[논문 구현] PyTorch로 InceptionV4(2016) 구현하고 학습하기
이번 포스팅에서는 InceptionV4를 파이토치로 구현하고, 학습까지 진행해보겠습니다. 구현할 모델은 InceptionV4에 residual block을 사용하는 Inception-ResNet-V2 입니다. 작업 환경은 구글 코랩에서 진행했습니다.
논문 리뷰는 여기에서 확인하실 수 있습니다.
전체 코드는 여기에서 확인하실 수 있습니다.
1. 데이터셋 불러오기
데이터셋은 torchvision 패키지에서 제공하는 STL10 dataset을 이용하겠습니다. STL10 dataset은 10개의 label을 갖으며 train dataset 5000개, test dataset 8000개로 구성됩니다.
우선 google colab mount를 합니다.
from google.colab import drive
drive.mount('inceptionv4')
필요한 라이브러리를 import 합니다.
# import package
# model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary
from torch import optim
# dataset and transformation
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import os
# display images
from torchvision import utils
import matplotlib.pyplot as plt
%matplotlib inline
# utils
import numpy as np
from torchsummary import summary
import time
import copy
데이터셋을 불러옵니다.
# specift the data path
path2data = '/content/inceptionv4/MyDrive/data'
# if not exists the path, make the directory
if not os.path.exists(path2data):
os.mkdir(path2data)
# load dataset
train_ds = datasets.STL10(path2data, split='train', download=True, transform=transforms.ToTensor())
val_ds = datasets.STL10(path2data, split='test', download=True, transform=transforms.ToTensor())
print(len(train_ds))
print(len(val_ds))
데이터셋에 적용할 transformation 객체를 정의하고, 데이터셋에 적용합니다.
# define image transformation
transformation = transforms.Compose([
transforms.ToTensor(),
transforms.Resize(299)
])
train_ds.transform = transformation
val_ds.transform = transformation
데이터로더를 생성합니다. 배치사이즈는 8로 설정했습니다. 모델 용량이 너무 커서 8보다 더 크게 설정하면 out of memory가 발생하네요!
# create dataloader
train_dl = DataLoader(train_ds, batch_size=8, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=8, shuffle=True)
샘플 이미지를 확인합니다.
# display sample images
def show(img, y=None, color=True):
npimg = img.numpy()
npimg_tr = np.transpose(npimg, (1, 2, 0))
plt.imshow(npimg_tr)
if y is not None:
plt.title('labels:' + str(y))
np.random.seed(0)
torch.manual_seed(0)
grid_size = 4
rnd_ind = np.random.randint(0, len(train_ds), grid_size)
x_grid = [train_ds[i][0] for i in rnd_ind]
y_grid = [train_ds[i][1] for i in rnd_ind]
plt.figure(figsize=(10,10))
x_grid = utils.make_grid(x_grid, nrow=4, padding=2)
show(x_grid, y_grid)
2. 모델 구축하기
구현할 모델은 Inception-ResNet-v2 입니다. Inception-ResNet-v2는 Inception-v4에 residual block을 사용하는 모델입니다. 코드는 https://github.com/weiaicunzai/pytorch-cifar100/blob/master/models/inceptionv4.py 를 참고했습니다.
Inception-ResNet-v2 구조는 다음과 같습니다. 아래 그림에서 출력값은 Inception-ResNet-v1 기준이므로, v2 출력값은 논문을 확인해야 합니다.
conv layer 클래스를 정의합니다. 모델 구축할 때, 편리하게 이용할 수 있습니다.
class BasicConv2d(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, **kwargs):
super().__init__()
# bias=Fasle, because BN after conv includes bias.
self.conv = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size, bias=False, **kwargs),
nn.BatchNorm2d(out_channels),
nn.ReLU()
)
def forward(self, x):
x = self.conv(x)
return x
Stem을 정의합니다.
class Stem(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Sequential(
BasicConv2d(3, 32, 3, stride=2, padding=0), # 149 x 149 x 32
BasicConv2d(32, 32, 3, stride=1, padding=0), # 147 x 147 x 32
BasicConv2d(32, 64, 3, stride=1, padding=1), # 147 x 147 x 64
)
self.branch3x3_conv = BasicConv2d(64, 96, 3, stride=2, padding=0) # 73x73x96
# kernel_size=4: 피쳐맵 크기 73, kernel_size=3: 피쳐맵 크기 74
self.branch3x3_pool = nn.MaxPool2d(4, stride=2, padding=1) # 73x73x64
self.branch7x7a = nn.Sequential(
BasicConv2d(160, 64, 1, stride=1, padding=0),
BasicConv2d(64, 96, 3, stride=1, padding=0)
) # 71x71x96
self.branch7x7b = nn.Sequential(
BasicConv2d(160, 64, 1, stride=1, padding=0),
BasicConv2d(64, 64, (7,1), stride=1, padding=(3,0)),
BasicConv2d(64, 64, (1,7), stride=1, padding=(0,3)),
BasicConv2d(64, 96, 3, stride=1, padding=0)
) # 71x71x96
self.branchpoola = BasicConv2d(192, 192, 3, stride=2, padding=0) # 35x35x192
# kernel_size=4: 피쳐맵 크기 73, kernel_size=3: 피쳐맵 크기 74
self.branchpoolb = nn.MaxPool2d(4, 2, 1) # 35x35x192
def forward(self, x):
x = self.conv1(x)
x = torch.cat((self.branch3x3_conv(x), self.branch3x3_pool(x)), dim=1)
x = torch.cat((self.branch7x7a(x), self.branch7x7b(x)), dim=1)
x = torch.cat((self.branchpoola(x), self.branchpoolb(x)), dim=1)
return x
MaxPool을 3x3로 적용하면 피쳐맵크기가 맞지 않아 4x4로 적용했습니다.
Stem을 잘 구축했는지 확인합니다. 구조가 너무 복잡해서 실수하기가 쉽습니다. 중간중간 채널수와 피쳐맵 크기가 맞는지 확인해야 합니다. 전체 구조를 구축한뒤에 확인하면 오류가 어디에서 발생했는지 알아내기가 어렵습니다.
# check Stem
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
x = torch.randn((3, 3, 299, 299)).to(device)
model = Stem().to(device)
output_Stem = model(x)
print('Input size:', x.size())
print('Stem output size:', output_Stem.size())
Inception-ResNet-A 모듈을 정의합니다.
class Inception_Resnet_A(nn.Module):
def __init__(self, in_channels):
super().__init__()
self.branch1x1 = BasicConv2d(in_channels, 32, 1, stride=1, padding=0)
self.branch3x3 = nn.Sequential(
BasicConv2d(in_channels, 32, 1, stride=1, padding=0),
BasicConv2d(32, 32, 3, stride=1, padding=1)
)
self.branch3x3stack = nn.Sequential(
BasicConv2d(in_channels, 32, 1, stride=1, padding=0),
BasicConv2d(32, 48, 3, stride=1, padding=1),
BasicConv2d(48, 64, 3, stride=1, padding=1)
)
self.reduction1x1 = nn.Conv2d(128, 384, 1, stride=1, padding=0)
self.shortcut = nn.Conv2d(in_channels, 384, 1, stride=1, padding=0)
self.bn = nn.BatchNorm2d(384)
self.relu = nn.ReLU()
def forward(self, x):
x_shortcut = self.shortcut(x)
x = torch.cat((self.branch1x1(x), self.branch3x3(x), self.branch3x3stack(x)), dim=1)
x = self.reduction1x1(x)
x = self.bn(x_shortcut + x)
x = self.relu(x)
return x
Inception-resnetA가 잘 구축됬는지 확인합니다.
# check Inception_Resnet_A
model = Inception_Resnet_A(output_Stem.size()[1]).to(device)
output_resA = model(output_Stem)
print('Input size:', output_Stem.size())
print('output size:', output_resA.size())
ReductionA를 정의합니다.
Inception-Resnet-V2는 k=256, l=256, m=384, n=384를 사용합니다.
class ReductionA(nn.Module):
def __init__(self, in_channels, k, l, m, n):
super().__init__()
self.branchpool = nn.MaxPool2d(3, 2)
self.branch3x3 = BasicConv2d(in_channels, n, 3, stride=2, padding=0)
self.branch3x3stack = nn.Sequential(
BasicConv2d(in_channels, k, 1, stride=1, padding=0),
BasicConv2d(k, l, 3, stride=1, padding=1),
BasicConv2d(l, m, 3, stride=2, padding=0)
)
self.output_channels = in_channels + n + m
def forward(self, x):
x = torch.cat((self.branchpool(x), self.branch3x3(x), self.branch3x3stack(x)), dim=1)
return x
확인해보겠습니다.
# check ReductionA
print('input size:', output_resA.size())
model = ReductionA(output_resA.size()[1], 256, 256, 384, 384).to(device)
output_rA = model(output_resA)
print('output size:', output_rA.size())
Inception-ResnetB 모듈을 정의합니다.
class Inception_Resnet_B(nn.Module):
def __init__(self, in_channels):
super().__init__()
self.branch1x1 = BasicConv2d(in_channels, 192, 1, stride=1, padding=0)
self.branch7x7 = nn.Sequential(
BasicConv2d(in_channels, 128, 1, stride=1, padding=0),
BasicConv2d(128, 160, (1,7), stride=1, padding=(0,3)),
BasicConv2d(160, 192, (7,1), stride=1, padding=(3,0))
)
self.reduction1x1 = nn.Conv2d(384, 1152, 1, stride=1, padding=0)
self.shortcut = nn.Conv2d(in_channels, 1152, 1, stride=1, padding=0)
self.bn = nn.BatchNorm2d(1152)
self.relu = nn.ReLU()
def forward(self, x):
x_shortcut = self.shortcut(x)
x = torch.cat((self.branch1x1(x), self.branch7x7(x)), dim=1)
x = self.reduction1x1(x) * 0.1
x = self.bn(x + x_shortcut)
x = self.relu(x)
return x
확인해봅니다.
# check Inception_Resnet_B
model = Inception_Resnet_B(output_rA.size()[1]).to(device)
output_resB = model(output_rA)
print('Input size:', output_rA.size())
print('output size:', output_resB.size())
ReductionB를 정의합니다.
class ReductionB(nn.Module):
def __init__(self, in_channels):
super().__init__()
self.branchpool = nn.MaxPool2d(3, 2)
self.branch3x3a = nn.Sequential(
BasicConv2d(in_channels, 256, 1, stride=1, padding=0),
BasicConv2d(256, 384, 3, stride=2, padding=0)
)
self.branch3x3b = nn.Sequential(
BasicConv2d(in_channels, 256, 1, stride=1, padding=0),
BasicConv2d(256, 288, 3, stride=2, padding=0)
)
self.branch3x3stack = nn.Sequential(
BasicConv2d(in_channels, 256, 1, stride=1, padding=0),
BasicConv2d(256, 288, 3, stride=1, padding=1),
BasicConv2d(288, 320, 3, stride=2, padding=0)
)
def forward(self, x):
x = torch.cat((self.branchpool(x), self.branch3x3a(x), self.branch3x3b(x), self.branch3x3stack(x)), dim=1)
return x
확인해보겠습니다.
# check ReductionB
model = ReductionB(output_resB.size()[1]).to(device)
output_rB = model(output_resB)
print('Input size:', output_resB.size())
print('output size:', output_rB.size())
Inception_ResNet_C 모듈을 정의합니다.
class Inception_Resnet_C(nn.Module):
def __init__(self, in_channels):
super().__init__()
self.branch1x1 = BasicConv2d(in_channels, 192, 1, stride=1, padding=0)
self.branch3x3 = nn.Sequential(
BasicConv2d(in_channels, 192, 1, stride=1, padding=0),
BasicConv2d(192, 224, (1,3), stride=1, padding=(0,1)),
BasicConv2d(224, 256, (3,1), stride=1, padding=(1,0))
)
self.reduction1x1 = nn.Conv2d(448, 2144, 1, stride=1, padding=0)
self.shortcut = nn.Conv2d(in_channels, 2144, 1, stride=1, padding=0) # 2144
self.bn = nn.BatchNorm2d(2144)
self.relu = nn.ReLU()
def forward(self, x):
x_shortcut = self.shortcut(x)
x = torch.cat((self.branch1x1(x), self.branch3x3(x)), dim=1)
x = self.reduction1x1(x) * 0.1
x = self.bn(x_shortcut + x)
x = self.relu(x)
return x
확인해보겠습니다.
# check Inception_Resnet_C
model = Inception_Resnet_C(output_rB.size()[1]).to(device)
output_resC = model(output_rB)
print('Input size:', output_rB.size())
print('output size:', output_resC.size())
위에서 정의한 클래스를 활용해서 전체 구조를 정의합니다.
class InceptionResNetV2(nn.Module):
def __init__(self, A, B, C, k=256, l=256, m=384, n=384, num_classes=10, init_weights=True):
super().__init__()
blocks = []
blocks.append(Stem())
for i in range(A):
blocks.append(Inception_Resnet_A(384))
blocks.append(ReductionA(384, k, l, m, n))
for i in range(B):
blocks.append(Inception_Resnet_B(1152))
blocks.append(ReductionB(1152))
for i in range(C):
blocks.append(Inception_Resnet_C(2144))
self.features = nn.Sequential(*blocks)
self.avgpool = nn.AdaptiveAvgPool2d((1,1))
# drop out
self.dropout = nn.Dropout2d(0.2)
self.linear = nn.Linear(2144, num_classes)
# weights inittialization
if init_weights:
self._initialize_weights()
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
x = self.dropout(x)
x = self.linear(x)
return x
# define weight initialization function
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.constant_(m.bias, 0)
모델이 잘 구축됬는지 확인해보겠습니다.
# create InceptionResNetV2
model = InceptionResNetV2(10, 20, 10).to(device)
summary(model, (3, 299, 299), device=device.type)
헉...!! 모델 크기가 1426.65MB나 되네요!!! GoogLeNet(Inceptiov1)보다 무려 10배나 큽니다! 코랩 환경에서 학습이 가능할지 걱정이네요!ㅠㅠ
3. 학습하기
손실 함수와 optimizer, lr_scheduler를 정의합니다.
# define loss function and optimizer
loss_func = nn.CrossEntropyLoss(reduction='sum')
opt = optim.Adam(model.parameters(), lr=0.001)
from torch.optim.lr_scheduler import ReduceLROnPlateau
lr_scheduler = ReduceLROnPlateau(opt, mode='min', factor=0.1, patience=10)
학습에 필요한 함수들을 정의합니다.
# function to get current learning rate
def get_lr(opt):
for param_group in opt.param_groups:
return param_group['lr']
# function to calculate metric per mini-batch
def metric_batch(output, target):
pred = output.argmax(1, keepdim=True)
corrects = pred.eq(target.view_as(pred)).sum().item()
return corrects
# function to calculate loss per mini-batch
def loss_batch(loss_func, output, target, opt=None):
loss_b = loss_func(output, target)
metric_b = metric_batch(output, target)
if opt is not None:
opt.zero_grad()
loss_b.backward()
opt.step()
return loss_b.item(), metric_b
# function to calculate loss per epoch
def loss_epoch(model, loss_func, dataset_dl, sanity_check=False, opt=None):
running_loss = 0.0
running_metric = 0.0
len_data = len(dataset_dl.dataset)
for xb, yb in dataset_dl:
xb = xb.to(device)
yb = yb.to(device)
output = model(xb)
loss_b, metric_b = loss_batch(loss_func, output, yb, opt)
running_loss += loss_b
if metric_b is not None:
running_metric += metric_b
if sanity_check is True:
break
loss = running_loss / len_data
metric = running_metric / len_data
return loss, metric
# function to start training
def train_val(model, params):
num_epochs=params['num_epochs']
loss_func=params["loss_func"]
opt=params["optimizer"]
train_dl=params["train_dl"]
val_dl=params["val_dl"]
sanity_check=params["sanity_check"]
lr_scheduler=params["lr_scheduler"]
path2weights=params["path2weights"]
loss_history = {'train': [], 'val': []}
metric_history = {'train': [], 'val': []}
best_loss = float('inf')
start_time = time.time()
for epoch in range(num_epochs):
current_lr = get_lr(opt)
print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs-1, current_lr))
model.train()
train_loss, train_metric = loss_epoch(model, loss_func, train_dl, sanity_check, opt)
loss_history['train'].append(train_loss)
metric_history['train'].append(train_metric)
model.eval()
with torch.no_grad():
val_loss, val_metric = loss_epoch(model, loss_func, val_dl, sanity_check)
loss_history['val'].append(val_loss)
metric_history['val'].append(val_metric)
if val_loss < best_loss:
best_loss = val_loss
print('Get best val_loss!')
lr_scheduler.step(val_loss)
print('train loss: %.6f, val loss: %.6f, accuracy: %.2f, time: %.4f min' %(train_loss, val_loss, 100*val_metric, (time.time()-start_time)/60))
print('-'*10)
return model, loss_history, metric_history
training 파라미터를 정의합니다. 모델 크기가 너무 크기 때문에 5epoch만 학습하겠습니다.
# definc the training parameters
params_train = {
'num_epochs':5,
'optimizer':opt,
'loss_func':loss_func,
'train_dl':train_dl,
'val_dl':val_dl,
'sanity_check':False,
'lr_scheduler':lr_scheduler,
'path2weights':'./models/weights.pt',
}
# create the directory that stores weights.pt
def createFolder(directory):
try:
if not os.path.exists(directory):
os.makedirs(directory)
except OSerror:
print('Error')
createFolder('./models')
학습 시작!
model, loss_hist, metric_hist = train_val(model, params_train)
GoogLeNet은 1epoch만에 accuracy를 31% 달성했습니다. 하지만 Inception-v4는 12.51%입니다. 왜 이럴까요?!
깊은 모델일수록 학습 초기에 수렴하기가 어렵습니다. 4epoch까지 수렴을 못하고 있는 모습을 확인할 수 있네요. optimizer을 바꿔보거나 초기 학습률을 수정해보는 것도 좋은 방법입니다.
loss, accuracy progress를 출력합니다.
# Train-Validation Progress
num_epochs=params_train["num_epochs"]
# plot loss progress
plt.title("Train-Val Loss")
plt.plot(range(1,num_epochs+1),loss_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),loss_hist["val"],label="val")
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()
# plot accuracy progress
plt.title("Train-Val Accuracy")
plt.plot(range(1,num_epochs+1),metric_hist["train"],label="train")
plt.plot(range(1,num_epochs+1),metric_hist["val"],label="val")
plt.ylabel("Accuracy")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()
감사합니다.