[논문 구현] PyTorch로 AlexNet(2012) 구현하기
AlexNet(2012) 논문 리뷰는 여기에서 확인하실 수 있습니다.
전체 코드는 여기에서 확인하실 수 있습니다.


1. 데이터셋 불러오고 Transformation 적용하기
데이터셋은 torchvision에서 제공하는 STL-10 dataset을 사용합니다. STL-10 dataset은 train과 test dataset만 제공합니다. 따라서, test dataset을 분할해서 val dataset을 따로 만들어 주겠습니다. transformation은 resize, horizontal flip, normalize를 적용합니다.
우선 구글 코랩에 구글 드라이브를 마운트합니다.
from google.colab import drive
STL-10 dataset을 불러옵니다. torchvision 패키지에서 데이터셋을 제공합니다.
# loading training dataset
from torchvision import datasets
import torchvision.transforms as transforms
import os
# specify a data path
path2data = '/data'

# create the data directory (and any missing parents) if it does not exist yet
os.makedirs(path2data, exist_ok=True)

# load STL10 train dataset; ToTensor is applied so that the per-channel
# statistics can be computed from tensors afterwards
data_transformer = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.STL10(path2data, split='train', download=True, transform=data_transformer)

# load STL10 test dataset
test0_ds = datasets.STL10(path2data, split='test', download=True, transform=data_transformer)
데이터 정규화를 위해 평균, 표준편차 값을 계산합니다.
# Per-channel normalization statistics of train_ds.
# Every image contributes one mean vector and one std vector (reduced over
# axes 1 and 2 of the tensor); the dataset-level value for a channel is the
# average of those per-image values.
import numpy as np

meanRGB = [np.mean(image.numpy(), axis=(1, 2)) for image, _ in train_ds]
stdRGB = [np.std(image.numpy(), axis=(1, 2)) for image, _ in train_ds]

meanR, meanG, meanB = (
    np.mean([per_image[channel] for per_image in meanRGB]) for channel in range(3)
)
stdR, stdG, stdB = (
    np.mean([per_image[channel] for per_image in stdRGB]) for channel in range(3)
)

print(meanR, meanG, meanB)
print(stdR, stdG, stdB)
image transformation을 정의합니다.
# define the image transformation for train_ds
# The paper augments with crops and horizontal reflections; here the images are
# resized to the 227x227 input that AlexNet.forward expects, randomly flipped,
# converted to tensors and normalized with the statistics computed above.
# NOTE(review): the Resize / RandomHorizontalFlip / ToTensor steps are
# reconstructed from the accompanying text (resize, horizontal flip,
# normalize) -- confirm against the original notebook.
train_transformer = transforms.Compose([
    transforms.Resize((227, 227)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # Normalize operates on tensors, not PIL images
    transforms.Normalize([meanR, meanG, meanB], [stdR, stdG, stdB]),
])

# define the image transformation for test0_ds (no random augmentation)
test_transformer = transforms.Compose([
    transforms.Resize((227, 227)),
    transforms.ToTensor(),
    transforms.Normalize([meanR, meanG, meanB], [stdR, stdG, stdB]),
])
불러온 데이터셋에 transformation을 적용합니다.
# swap the ToTensor-only pipeline of both datasets for the final pipelines
train_ds.transform, test0_ds.transform = train_transformer, test_transformer
transformation이 적용된 sample image를 확인합니다.
import torch
from torchvision import utils
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
# display the transformed sample images from train_ds
# define helper function to show images
def show(img, y=None, color=True):
    """Draw an image tensor on the current matplotlib figure.

    Args:
        img: tensor exposing ``.numpy()``; its first axis is moved to the last
            position before plotting (presumably a (C, H, W) image -- confirm).
        y: optional label; when given it is shown in the title.
        color: not used by this function; kept so existing calls keep working.
    """
    npimg = img.numpy()
    # matplotlib expects the channel axis last
    npimg_tr = np.transpose(npimg, (1, 2, 0))
    plt.imshow(npimg_tr)
    if y is not None:
        plt.title('labels: ' + str(y))
# pick a random sample image
# (a scalar draw: converting a size-1 array with int() is deprecated in recent NumPy)
rnd_inds = int(np.random.randint(0, len(train_ds)))
img, label = train_ds[rnd_inds]
print('images indices: ', rnd_inds)

# draw the transformed sample together with its label
plt.figure(figsize=(10, 10))
show(img, label)
색상이 좀 기괴하네요ㅎㅎ
귀여운 강아지...가 사라졌습니다.
색상 정규화를 적용하지 않으면 귀여운 강아지가 다시 나타날 겁니다ㅎㅎ
train_ds에서 카테고리당 이미지 개수를 확인합니다.
# tally how many images of train_ds carry each label
import collections

y_train = [sample[1] for sample in train_ds]
counter_train = collections.Counter(y_train)
10개의 label이 존재하고 각 label에 500개의 이미지가 있네요
STL-10 dataset은 val dataset을 제공하지 않으므로 test dataset에서 val dataset을 분할하겠습니다.
StratifiedShuffleSplit 함수는 각 레이블에서 동일한 비율로 이미지 인덱스를 추출합니다.
이제 Subset 함수를 활용하여 추출한 인덱스로 데이터셋을 분할해주면 됩니다!!
# STL10 provides no validation split, so one is carved out of test0_ds:
# the indices of test0_ds are divided into a test group and a validation
# group (test_size=0.2 goes to validation), keeping the label proportions
# the same in both groups.
from sklearn.model_selection import StratifiedShuffleSplit

indices = list(range(len(test0_ds)))
y_test0 = [sample[1] for sample in test0_ds]

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)

# n_splits=1, so this loop runs exactly once
for test_index, val_index in sss.split(indices, y_test0):
    print('test :', len(test_index) , 'val :', len(val_index))
Subset 함수로 test dataset과 val dataset을 나눕니다.
test0 dataset이 업데이트되면 test dataset과 val dataset 모두 업데이트 됩니다.
# create two datasets from test0_ds
from torch.utils.data import Subset

# both subsets wrap test0_ds itself, so an update of test0_ds
# is visible through val_ds and test_ds as well
test_ds = Subset(test0_ds, test_index)
val_ds = Subset(test0_ds, val_index)

# count the number of images per class in val_ds and test_ds
import collections
import numpy as np

y_val = [sample[1] for sample in val_ds]
y_test = [sample[1] for sample in test_ds]
counter_val = collections.Counter(y_val)
counter_test = collections.Counter(y_test)
DataLoader를 생성합니다.
# create dataloaders from train_ds and val_ds
from torch.utils.data import DataLoader

train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=32, shuffle=False)

# check dataloader: print the shapes of the first mini-batch of each loader
for x, y in train_dl:
    print(x.shape)
    print(y.shape)
    break

for x, y in val_dl:
    print(x.shape)
    print(y.shape)
    break
2. 모델 구축하기
우선 device를 정의합니다.
# use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
import torch.nn as nn
import torch.nn.functional as F
class AlexNet(nn.Module):
    """AlexNet (2012) for RGB inputs of size 227 x 227.

    ``self.net`` holds the five convolution layers with ReLU, local response
    normalization and max pooling; ``self.classifier`` holds the three fully
    connected layers with dropout.

    Args:
        num_classes: number of output scores per sample.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # input size : (b x 3 x 227 x 227)
        # The paper states a 224-pixel input, but that is a typo;
        # 227 x 227 is used here.

        # Conv layers. The ReLU layers are part of the index layout that
        # init_weight relies on (conv layers sit at 0, 4, 8, 10, 12).
        self.net = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=0),  # (b x 96 x 55 x 55)
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),  # (b x 96 x 27 x 27)
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),  # (b x 256 x 27 x 27)
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),  # (b x 256 x 13 x 13)
            nn.Conv2d(256, 384, 3, 1, 1),  # (b x 384 x 13 x 13)
            nn.ReLU(),
            nn.Conv2d(384, 384, 3, 1, 1),  # (b x 384 x 13 x 13)
            nn.ReLU(),
            nn.Conv2d(384, 256, 3, 1, 1),  # (b x 256 x 13 x 13)
            nn.ReLU(),
            nn.MaxPool2d(3, 2),  # (b x 256 x 6 x 6)
        )

        # fc layers. Dropout is applied out of place so that it can never
        # overwrite an activation that autograd still needs for backward.
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(in_features=(256 * 6 * 6), out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=num_classes),
        )

        # weight initialization
        self.init_weight()

    def init_weight(self):
        """Initialize the conv layers: N(0, 0.01) weights, constant biases."""
        for layer in self.net:
            if isinstance(layer, nn.Conv2d):
                nn.init.normal_(layer.weight, mean=0, std=0.01)
                nn.init.constant_(layer.bias, 0)
        # in paper, initialize bias to 1 for conv2, 4, 5 layer
        nn.init.constant_(self.net[4].bias, 1)
        nn.init.constant_(self.net[10].bias, 1)
        nn.init.constant_(self.net[12].bias, 1)

    def forward(self, x):
        """Return the class scores, shape (batch, num_classes), for batch *x*."""
        x = self.net(x)
        # Flatten per sample. Keeping the batch dimension fixed makes a wrong
        # input size fail in the first Linear layer instead of silently
        # changing the batch size, as view(-1, 256 * 6 * 6) would.
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x
구축한 모델을 생성하고 확인합니다.
# build the network, then move its parameters to the selected device
model = AlexNet()
model = model.to(device)
model summary를 출력합니다.
# get the model summary
from torchsummary import summary

# pass the model instance created above (the name `Model` is not defined anywhere)
summary(model, input_size=(3, 227, 227), device=device.type)
모델이 잘 생성되었네요ㅎㅎ
가중치 초기화가 잘 적용됐는지 확인해봅니다.
# check weight initialization: print only the first parameter tensor
# NOTE(review): the loop body is missing in the source; print-and-break is
# a reconstruction -- confirm against the original notebook.
for p in model.parameters():
    print(p)
    break
3. 모델 학습
# define the loss function
# reduction='sum' adds up the per-sample losses of a batch; loss_epoch later
# divides the accumulated total by the dataset size
loss_func = nn.CrossEntropyLoss(reduction='sum')

# define the optimizer: SGD with momentum and weight decay
from torch import optim

opt = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005,
)
# alternative that was tried: opt = optim.Adam(model.parameters(), lr=0.01)
# read the current value of the learning rate using the following function
def get_lr(opt):
    """Return the current learning rate of an optimizer.

    Args:
        opt: object exposing ``param_groups``, an iterable of dicts that each
            have an ``'lr'`` key.

    Returns:
        The ``'lr'`` value of the first parameter group, or ``None`` when
        there are no parameter groups.
    """
    for param_group in opt.param_groups:
        # only the first group is consulted
        return param_group['lr']
    return None
# define lr_scheduler: multiply the learning rate by 0.1 every 30 scheduler steps
from torch.optim.lr_scheduler import StepLR

lr_scheduler = StepLR(opt, step_size=30, gamma=0.1)
# define a function to count the number of correct predictions per mini-batch
def metrics_batch(output, target):
    """Count the correct predictions in one mini-batch.

    Args:
        output: tensor of per-class scores; the predicted class of each row
            is the argmax over dim 1.
        target: tensor of class indices with one entry per row of *output*.

    Returns:
        Number of rows whose predicted class equals the target, as a Python int.
    """
    # get output class; keepdim=True keeps a trailing axis of size 1
    pred = output.argmax(dim=1, keepdim=True)
    # compare output class with target class (target reshaped to match pred)
    corrects = pred.eq(target.view_as(pred)).sum().item()
    return corrects
# define a function to compute the loss value per mini-batch
def loss_batch(loss_func, output, target, opt=None):
    """Compute loss and correct count for one mini-batch; optionally train.

    Args:
        loss_func: callable ``loss_func(output, target)`` returning a scalar
            tensor.
        output: model output for the batch.
        target: ground-truth labels for the batch.
        opt: optimizer. When given, one optimization step is performed
            (zero the gradients, backpropagate, step); when ``None`` the
            batch is only evaluated.

    Returns:
        ``(loss_value, correct_count)``: the loss as a Python float and the
        result of ``metrics_batch`` for this batch.
    """
    loss = loss_func(output, target)
    metric_b = metrics_batch(output, target)

    if opt is not None:
        # training mode: update the weights with this batch
        opt.zero_grad()
        loss.backward()
        opt.step()

    return loss.item(), metric_b
# develop a function to compute the loss value and the performance metric for the epoch
def loss_epoch(model, loss_func, dataset_dl, sanity_check=False, opt=None):
    """Run one pass over *dataset_dl* and return the average loss and accuracy.

    Args:
        model: callable network applied to each input batch.
        loss_func: loss callable forwarded to ``loss_batch``.
        dataset_dl: iterable of ``(inputs, targets)`` batches that exposes the
            underlying dataset as ``dataset_dl.dataset``.
        sanity_check: when ``True`` only the first mini-batch is processed.
        opt: optimizer forwarded to ``loss_batch``; ``None`` means evaluation.

    Returns:
        ``(loss, metric)``: accumulated loss and accumulated correct count,
        each divided by the size of the whole dataset. With
        ``sanity_check=True`` the divisor is still the full dataset size, so
        both values come out small.
    """
    running_loss = 0
    running_metric = 0
    len_data = len(dataset_dl.dataset)

    for xb, yb in dataset_dl:
        # move batch to device
        xb = xb.to(device)
        yb = yb.to(device)

        # get model output
        output = model(xb)

        # get loss per batch
        loss_b, metric_b = loss_batch(loss_func, output, yb, opt)

        # update running loss
        running_loss += loss_b

        # update running metric
        if metric_b is not None:
            running_metric += metric_b

        # break the loop in case of sanity check
        if sanity_check is True:
            break

    # average loss value and metric value
    loss = running_loss / float(len_data)
    metric = running_metric / float(len_data)
    return loss, metric
import time
import copy
# develop train_val function
def train_val(model, params):
    """Train *model*, validate after every epoch and keep the best weights.

    Args:
        model: network to train; must provide ``state_dict`` and
            ``load_state_dict``.
        params: dict with the keys ``'num_epochs'``, ``'loss_func'``,
            ``'optimizer'``, ``'train_dl'``, ``'val_dl'``, ``'sanity_check'``,
            ``'lr_scheduler'`` and ``'path2weights'``.

    Returns:
        ``(model, loss_history, metric_history)``. The model carries the
        weights with the lowest validation loss; each history is a dict with
        a ``'train'`` and a ``'val'`` list holding one value per epoch.
    """
    # extract model parameters
    num_epochs = params['num_epochs']
    loss_func = params['loss_func']
    opt = params['optimizer']
    train_dl = params['train_dl']
    val_dl = params['val_dl']
    sanity_check = params['sanity_check']
    lr_scheduler = params['lr_scheduler']
    path2weights = params['path2weights']

    # keep a history of the loss and the metric value
    loss_history = {
        'train': [],
        'val': [],
    }
    metric_history = {
        'train': [],
        'val': [],
    }

    # snapshot of the best performing weights seen so far
    best_model_wts = copy.deepcopy(model.state_dict())

    # initialize the best loss to an infinite value
    best_loss = float('inf')

    for epoch in range(num_epochs):
        start_time = time.time()

        # get current learning rate
        current_lr = get_lr(opt)
        print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs-1, current_lr))

        # train model on training dataset (dropout active)
        model.train()
        train_loss, train_metric = loss_epoch(model, loss_func, train_dl, sanity_check, opt)

        # collect loss and metric for the training dataset
        loss_history['train'].append(train_loss)
        metric_history['train'].append(train_metric)

        # evaluate model on validation dataset (dropout disabled, no gradients)
        model.eval()
        with torch.no_grad():
            val_loss, val_metric = loss_epoch(model, loss_func, val_dl, sanity_check)

        # store the best model
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())

            # store weights into a local file
            torch.save(model.state_dict(), path2weights)
            print('Copied best model weights')

        # collect loss and metric for validation dataset
        loss_history['val'].append(val_loss)
        metric_history['val'].append(val_metric)

        # update the learning rate
        # NOTE(review): assumes a scheduler that is stepped once per epoch
        # without arguments (e.g. StepLR) -- confirm
        lr_scheduler.step()
        if current_lr != get_lr(opt):
            # the learning rate just changed: continue from the best weights
            print('Loading best model weights!')
            model.load_state_dict(best_model_wts)

        # print the loss and accuracy values
        print('train loss: %.6f, dev loss: %.6f, accuracy: %.2f, time: %.4f s' %(train_loss, val_loss, 100*val_metric, time.time()-start_time))

    # load best model weights and return the trained model
    model.load_state_dict(best_model_wts)
    return model, loss_history, metric_history
# define the training parameters (one entry per key that train_val reads)
# NOTE(review): the values below are reconstructed -- 3 epochs with
# sanity_check=True follow the accompanying text; the weights directory
# './models' is an assumption. Confirm against the original notebook.
path2weights = './models/weights.pt'
# torch.save needs the target directory to exist
os.makedirs(os.path.dirname(path2weights), exist_ok=True)

params_train = {
    'num_epochs': 3,
    'optimizer': opt,
    'loss_func': loss_func,
    'train_dl': train_dl,
    'val_dl': val_dl,
    'sanity_check': True,
    'lr_scheduler': lr_scheduler,
    'path2weights': path2weights,
}
# check the directory to save weights.pt
def createFolder(directory):
    """Create *directory*, including missing parents, if it does not exist.

    A failure to create the directory is reported on stdout rather than
    raised, so a notebook run is not interrupted by it.

    Args:
        directory: path of the directory to create.
    """
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError as err:
        print('Error: could not create directory ' + str(directory) + ': ' + str(err))
# train model
# sanity_check is set to True here to verify that training runs at all.
model, loss_hist, metric_hist = train_val(model, params_train)
학습을 위한 함수가 많이 기네요...ㅎㅎ
sanity_check = True로 설정하여 1 epoch당 1 mini-batch, 총 3 epoch 학습시켰습니다.
학습이 진행되는 걸로 보아 위 함수가 잘 작동하네요!
sanity_check = False로 80 epoch까지 학습을 시키다가 코랩이 끊겨버렸습니다!!!
그때 결과를 보았을 때, 수렴이 되지 않았던 걸로 기억하네요ㅠㅠ
아마 데이터셋도 너무 적고 AlexNet이 얕은 신경망이기 때문이라고 생각합니다....ㅎㅎ
loss가 낮은 이유는 1epoch당 1 mini-batch를 계산하도록 설정했기 때문입니다. 전체 로스는 데이터셋 수로 나눠주므로 낮은 수가 나올 수 밖에 없습니다. 정확도는 0.31%인 것으로 보아 모델이 거의 예측을 못하고 있네요...ㅋㅋ 아마 400 epoch정도 학습시켜주면 수렴하지 않을까.... 생각하고 있습니다!
아래 코드를 이용하면 저장했던 train_loss, val_loss를 그래프로 그려, 시각적으로 확인할 수 있습니다!
# Train-Validation Progress
# NOTE(review): the plot / ylabel / legend / show calls are reconstructed;
# the source only kept the title and xlabel lines -- confirm.
# one x position per recorded epoch
epochs_axis = range(1, len(loss_hist['train']) + 1)

# plot loss progress
plt.title("Train-Val Loss")
plt.plot(epochs_axis, loss_hist['train'], label='train')
plt.plot(epochs_axis, loss_hist['val'], label='val')
plt.ylabel("Loss")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()

# plot accuracy progress
plt.title("Train-Val Accuracy")
plt.plot(epochs_axis, metric_hist['train'], label='train')
plt.plot(epochs_axis, metric_hist['val'], label='val')
plt.ylabel("Accuracy")
plt.xlabel("Training Epochs")
plt.legend()
plt.show()
다음에는 VGGnet 논문 구현 포스팅으로 뵙겠습니다. 읽어주셔서 감사합니다..!!