[논문 구현] PyTorch로 YOLOv3(2018) 구현하고 학습하기
안녕하세요! 이번에는 YOLOv3을 PyTorch로 구현하고 학습까지 해보도록 하겠습니다. 작업환경은 Google Colab에서 진행했습니다.
YOLOv3 논문 리뷰는 아래 포스팅에서 확인하실 수 있습니다.
전체 코드는 여기에서 확인하실 수 있습니다.
아래 코드를 분석하여 구현해보았습니다.
- https://github.com/ayooshkathuria/YOLO_v3_tutorial_from_scratch
- https://github.com/aladdinpersson/Machine-Learning-Collection/tree/master/ML/Pytorch/object_detection/YOLOv3
- https://github.com/PacktPublishing/PyTorch-Computer-Vision-Cookbook
- https://github.com/pjreddie/darknet
- https://github.com/eriklindernoren/PyTorch-YOLOv3
1. Kaggle에서 VOC Dataset 다운로드
우선, 학습에 필요한 Dataset을 다운로드 하겠습니다. 전처리된 voc dataset을 aladdinpersson 유튜버가 업로드 해두었습니다. 이것을 활용해서 custom dataset을 생성하겠습니다.
kaggle API를 활용하면 쉽게 dataset을 다운로드 할 수 있습니다.
# download kaggle API
!pip install kaggle --upgrade
kaggle APL을 사용하려면, kaggle.json 파일이 필요합니다. kaggle.json 파일은 kaggle 홈페이지에 로그인 한뒤에 account 탭에서 다운로드 받을 수 있습니다. 이 파일을 /root/.kaggle/ 에 업로드를 합니다.
# make .kaggle directory
# kaggle APL을 사용하려면, /root/.kaggle/ 에 kaggle.json을 업로드 해야합니다.
!mkdir .kaggle
cd /root/.kaggle/
# kaggle 홈페이지에서 로그인 한뒤에 account 탭에 들어가서 kaggle.json을 다운로드 합니다.
# 다운로드한 kaggle.json을 /root/.kaggle/ 에 업로드 합니다.
from google.colab import files
file_uploaded = files.upload()
이제 원하는 위치에 VOC dataset을 다운로드 받습니다.
# download pre-processed voc dataset
!kaggle datasets download -d aladdinpersson/pascal-voc-dataset-used-in-yolov3-video
압축을 해제합니다.
# unzip
!unzip -n pascal-voc-dataset-used-in-yolov3-video.zip
augmentation을 위한 module을 설치합니다.
# install transformation package
!pip install -U albumentations
2. VOC Custom Dataset 생성하기
다운로드 받은 VOC dataset으로 custom dataset을 생성하겠습니다.
필요한 라이브러리를 import 합니다.
# import pakages
import torch
from torch import nn
from torch import optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import utils
from torchsummary import summary
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import to_pil_image
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import cv2
import os
import copy
import numpy as np
import pandas as pd
import random
import albumentations as A
from albumentations.pytorch import ToTensor
%matplotlib inline
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
VOC class names를 정의합니다.
# VOC class names
classes = [
"aeroplane",
"bicycle",
"bird",
"boat",
"bottle",
"bus",
"car",
"cat",
"chair",
"cow",
"diningtable",
"dog",
"horse",
"motorbike",
"person",
"pottedplant",
"sheep",
"sofa",
"train",
"tvmonitor"
]
Custom dataset을 정의합니다.
class VOCDataset(Dataset):
def __init__(self, csv_file, img_dir, label_dir, transform=None, trans_params=None):
self.annotations = pd.read_csv(csv_file)
self.img_dir = img_dir
self.label_dir = label_dir
self.transform = transform
self.trans_params = trans_params
def __len__(self):
return len(self.annotations)
def __getitem__(self, index):
label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1]) # /PASCAL_VOC/labels/000009.txt
img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0]) # /PASCAL_VOC/images/000009.jpg
image = np.array(Image.open(img_path).convert("RGB")) # albumentation을 적용하기 위해 np.array로 변환합니다.
labels = None
if os.path.exists(label_path):
# np.roll: (class, cx, cy, w, h) -> (cx, cy, w, h, class)
# np.loadtxt: txt 파일에서 data 불러오기
labels = np.array(np.roll(np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1).tolist())
# labels = np.loadtxt(label_path).reshape(-1, 5)
if self.transform:
# apply albumentations
augmentations = self.transform(image=image, bboxes=labels)
image = augmentations['image']
targets = augmentations['bboxes']
# for DataLoader
# lables: ndarray -> tensor
# dimension: [batch, cx, cy, w, h, class]
if targets is not None:
targets = torch.zeros((len(labels), 6))
targets[:, 1:] = torch.tensor(labels)
else:
targets = labels
return image, targets, label_path
3. train dataset, val dataset 생성하기
train dataset과 val dataset을 생성하겠습니다.
# train dataset 생성하기
train_csv_file = '/PASCAL_VOC/train.csv'
label_dir = '/PASCAL_VOC/labels'
img_dir = '/PASCAL_VOC/images'
train_ds = VOCDataset(train_csv_file, img_dir, label_dir)
img, labels, _ = train_ds[1]
print('number of data:',len(train_ds))
print('image size:', img.shape, type(img)) # HxWxC
print('labels shape:', labels.shape, type(labels)) # x1,y1,x2,y2
print('lables \n', labels)
val dataset을 생성합니다.
# val dataset 생성하기
val_csv_file = '/PASCAL_VOC/test.csv'
label_dir = '/PASCAL_VOC/labels'
img_dir = '/PASCAL_VOC/images'
val_ds = VOCDataset(val_csv_file, img_dir, label_dir)
img, labels, _ = val_ds[1]
print('number of data:',len(val_ds))
print('image size:', img.shape, type(img))
print('labels shape:', labels.shape, type(labels))
print('lables \n', labels)
4. transforms을 정의하고, dataset에 적용하기
VOC dataset 바운딩박스는 x1, y1, x2, y2로 되어있습니다. Albumentations 모듈로 바운딩 박스를 yolo format인 cx, cy, w, h로 변경합니다.
# transforms 정의하기
IMAGE_SIZE = 416
scale = 1.0
# for train
train_transforms = A.Compose([
# 이미지의 maxsize를 max_size로 rescale합니다. aspect ratio는 유지합니다.
A.LongestMaxSize(max_size=int(IMAGE_SIZE * scale)),
# min_size보다 작으면 pad
A.PadIfNeeded(min_height=int(IMAGE_SIZE * scale), min_width=int(IMAGE_SIZE * scale), border_mode=cv2.BORDER_CONSTANT),
# random crop
A.RandomCrop(width=IMAGE_SIZE, height=IMAGE_SIZE),
# brightness, contrast, saturation을 무작위로 변경합니다.
A.ColorJitter(brightness=0.6, contrast=0.6, saturation=0.6, hue=0.6, p=0.4),
# transforms 중 하나를 선택해 적용합니다.
A.OneOf([
# shift, scale, rotate 를 무작위로 적용합니다.
A.ShiftScaleRotate(rotate_limit=20, p=0.5, border_mode=cv2.BORDER_CONSTANT),
# affine 변환
A.IAAAffine(shear=15, p=0.5, mode='constant')
], p=1.0),
# 수평 뒤집기
A.HorizontalFlip(p=0.5),
# blur
A.Blur(p=0.1),
# Contrast Limited Adaptive Histogram Equalization 적용
A.CLAHE(p=0.1),
# 각 채널의 bit 감소
A.Posterize(p=0.1),
# grayscale로 변환
A.ToGray(p=0.1),
# 무작위로 channel을 섞기
A.ChannelShuffle(p=0.05),
# normalize
A.Normalize(mean=[0,0,0], std=[1,1,1], max_pixel_value=255),
ToTensor()
],
# (x1, y1, x2, y2) -> (cx, cy, w, h)
bbox_params=A.BboxParams(format='yolo', min_visibility=0.4, label_fields=[])
)
# for validation
val_transforms = A.Compose([
A.LongestMaxSize(max_size=int(IMAGE_SIZE * scale)),
A.PadIfNeeded(min_height=int(IMAGE_SIZE * scale), min_width=int(IMAGE_SIZE * scale), border_mode=cv2.BORDER_CONSTANT),
A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255),
ToTensor(),
],
bbox_params=A.BboxParams(format='yolo', min_visibility=0.4, label_fields=[])
)
이제 transforms를 데이터셋에 적용합니다.
# 데이터셋에 transforms 적용하기
train_ds.transform = train_transforms
val_ds.transform = val_transforms
5. sample image 출력하기
transform이 적용된 sample image를 출력하겠습니다.
# 정규화된 x,y,w,h를 이미지 크기에 맞게 변경
def rescale_bbox(bb, W, H):
x,y,w,h = bb
return [x*W, y*H, w*W, h*H]
# 바운딩 박스 색상
COLORS = np.random.randint(0, 255, size=(80,3),dtype='uint8')
# image 출력 함수 정의
def show_img_bbox(img, targets, classes=classes):
if torch.is_tensor(img):
img=to_pil_image(img)
if torch.is_tensor(targets):
targets=targets.numpy()[:,1:]
W, H = img.size
draw = ImageDraw.Draw(img)
for tg in targets:
id_=int(tg[4])
bbox=tg[:4]
bbox=rescale_bbox(bbox,W,H)
xc,yc,w,h = bbox
color = [int(c) for c in COLORS[id_]]
name=classes[id_]
draw.rectangle(((xc-w/2, yc-h/2), (xc+w/2, yc+h/2)), outline=tuple(color), width=3)
draw.text((xc-w/2, yc-h/2), name, fill=(255,255,255,0))
plt.imshow(np.array(img))
# transforms가 적용된 sample image 확인
np.random.seed(2)
grid_size = 2
rnd_ind = np.random.randint(0, len(train_ds), grid_size)
print('image indices:',rnd_ind)
plt.figure(figsize=(20, 20))
for i, indice in enumerate(rnd_ind):
img, label, _ = train_ds[indice]
plt.subplot(1, grid_size, i+1)
show_img_bbox(img, label)
6. DataLoader 생성하기
dataloader 함수의 collate_fn 인자를 설정해야 합니다. 이미지마다 bounding box 개수가 다르므로, 이들을 한꺼번에 배치로 묶어야 하기 때문입니다.
# collate_fn 를 정의합니다.
# collate_fn은 DataLoader의 인자로 사용되며, batch 단위로 imgs와 targets를 묶습니다.
def collate_fn(batch):
imgs, targets, paths = list(zip(*batch))
# 빈 박스 제거하기
targets = [boxes for boxes in targets if boxes is not None]
# index 설정하기
for b_i, boxes in enumerate(targets):
boxes[:, 0] = b_i
targets = torch.cat(targets, 0)
imgs = torch.stack([img for img in imgs])
return imgs, targets, paths
dataloader를 생성합니다.
# make DataLoader
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_dl = DataLoader(val_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)
# check train_dl
torch.manual_seed(1)
for imgs_batch, tg_batch, path_batch in train_dl:
break
print(imgs_batch.shape)
print(tg_batch.shape, tg_batch.dtype)
print(tg_batch)
7. YOLOv3 모델 구축하기
YOLOv3은 FPN을 활용하는 모델입니다. 13x13, 26x26, 52x52 3개의 피쳐맵에서 예측을 수행합니다.
YOLOv3 을 구현하는 방법은 두 가지 입니다.
1. Configuration file 분석하기
2. 직접 모델 구현하기
연습 삼아 두개 다 해보겠습니다.
7-1 configuration file 분석하기
configuration file을 분석하여, 분석한 결과를 활용하겠습니다. config file은 https://github.com/pjreddie/darknet git을 연동 한뒤에, cfg 폴더에 있는 yolov3-voc.cfg 파일을 사용하겠습니다.
!git clone https://github.com/pjreddie/darknet.git
# yolov3-voc.cfg 경로를 저장합니다.
path2config = '/darknet/cfg/yolov3-voc.cfg'
config 파일을 분석하는 함수를 정의합니다.
# config 파일을 분석하는 함수를 정의합니다.
def parse_model_config(path2file):
# cfg 파일 열기
cfg_file = open(path2file, 'r')
# 문자열 데이터 읽어오기
lines = cfg_file.read().split('\n') #['[net]', '# Testing', '# batch=1', '....' ]
# 데이터 전처리
# startswith('#'): 문자열이 # 로 시작하는지 여부를 알려줍니다.
lines = [x for x in lines if x and not x.startswith('#')] # ['[net]', 'batch=64', '...']
# 공백 제거
lines = [x.rstrip().lstrip() for x in lines]
blocks_list = []
for line in lines:
if line.startswith('['): # [net]
blocks_list.append({}) # {}
blocks_list[-1]['type'] = line[1:-1].rstrip() # [{'type': 'net'}]
else:
key, value = line.split('=') # batch=64 -> batch, 64
value = value.strip() # 공백 제거
blocks_list[-1][key.rstrip()] = value.strip() # 'batch':'64'
return blocks_list
cfg 파일을 분석하여, blocks_list 를 생성합니다.
# cfg 파일 분석
blocks_list = parse_model_config(path2config)
cfg 파일을 분석한 blocks_list로 pytorch module list를 생성합니다.
# EmptyLayer를 정의합니다.
# EmptyLayer는 residual unit의 shortcut과 FPN의 lateral connection 용도로 사용합니다.
class EmptyLayer(nn.Module):
def __init__(self):
super().__init__()
# YOLOLayer를 정의합니다.
# YOLOLayer는 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
class YOLOLayer(nn.Module):
def __init__(self, anchors, num_classes, img_dim=416):
super().__init__()
self.anchors = anchors # three anchor per YOLO layer
self.num_anchors = len(anchors) # 3
self.num_classes = num_classes
self.img_dim = img_dim
self.grid_size = 0
def forward(self, x):
# x: batch_size, channels, H, W
batch_size = x.size(0)
grid_size = x.size(2) # S = 13 or 26 or 52
device = x.device
prediction = x.view(batch_size, self.num_anchors, self.num_classes + 5,
grid_size, grid_size) # shape = (batch, 3, 25, S, S)
# (batch, 3, 25, S, S) -> (batch, 3, S, S, 25)
prediction = prediction.permute(0, 1, 3, 4, 2)
prediction = prediction.contiguous()
obj_score = torch.sigmoid(prediction[..., 4]) # 클래스
pred_cls = torch.sigmoid(prediction[..., 5:]) # 바운딩 박스 좌표
if grid_size != self.grid_size:
# grid_size 갱신, cell index 생성, anchor 정규화
self.compute_grid_offsets(grid_size, cuda=x.is_cuda)
# bounding box prediction
pred_boxes = self.transform_outputs(prediction)
# batch, num_anchor x S x S, 25
# ex) at 13x13 -> [batch, 507, 25], at 26x26 -> [batch, 2028, 85], at 52x52 -> [batch, 10647, 85]
# 최종적으로 YOLO는 10647개의 바운딩박스를 예측합니다.
output = torch.cat((pred_boxes.view(batch_size, -1, 4),
obj_score.view(batch_size, -1, 1),
pred_cls.view(batch_size, -1, self.num_classes)), -1)
return output
def compute_grid_offsets(self, grid_size, cuda=True):
self.grid_size = grid_size # ex) 13, 26, 52
self.stride = self.img_dim / self.grid_size # ex) 32, 16, 8
# cell index 생성
# 1, 1, S, 1
self.grid_x = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).type(torch.float32)
# 1, 1, 1, S
self.grid_y = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).transpose(3, 2).type(torch.float32)
# anchors를 feature map 크기로 정규화, [0~1] 범위
# ex) (10, 13), (16, 30), (33, 23) / stride
scaled_anchors = [(a_w / self.stride, a_h / self.stride) for a_w, a_h in self.anchors]
# tensor로 변환
self.scaled_anchors = torch.tensor(scaled_anchors, device=device)
# shape=(3,2) -> (1,1,3,1)
self.anchor_w = self.scaled_anchors[:, 0:1].view((1, self.num_anchors, 1, 1))
# shape=(3,2) -> (1,1,3,1)
self.anchor_h = self.scaled_anchors[:, 1:2].view((1, self.num_anchors, 1, 1))
def transform_outputs(self, prediction):
# pridiction (batch, 3, S, S, 25)
device = prediction.device
x = torch.sigmoid(prediction[..., 0]) # sigmoid(box x), 예측값을 sigmoid로 감싸서 [0~1] 범위
y = torch.sigmoid(prediction[..., 1]) # sigmoid(box y), 예측값을 sigmoid로 감싸서 [0~1] 범위
w = prediction[..., 2] # 예측한 바운딩 박스 너비
h = prediction[..., 3] # 예측한 바운딩 박스 높이
pred_boxes = torch.zeros_like(prediction[..., :4]).to(device)
pred_boxes[..., 0] = x.data + self.grid_x # sigmoid(box x) + cell x 좌표
pred_boxes[..., 1] = y.data + self.grid_y # sigmoid(box y) + cell y 좌표
pred_boxes[..., 2] = torch.exp(w.data) * self.anchor_w
pred_boxes[..., 3] = torch.exp(h.data) * self.anchor_h
return pred_boxes * self.stride
layer를 생성하는 함수를 정의합니다.
# layer를 생성하는 함수를 정의합니다.
def create_layers(blocks_list):
hyperparams = blocks_list[0]
channels_list = [int(hyperparams['channels'])]
module_list = nn.ModuleList()
for layer_ind, layer_dict in enumerate(blocks_list[1:]):
modules = nn.Sequential()
if layer_dict['type'] == 'convolutional':
filters = int(layer_dict['filters'])
kernel_size = int(layer_dict['size'])
pad = (kernel_size - 1) // 2
bn = layer_dict.get('batch_normalize', 0)
conv2d = nn.Conv2d(in_channels=channels_list[-1], out_channels=filters, kernel_size=kernel_size,
stride=int(layer_dict['stride']), padding=pad, bias=not bn)
modules.add_module('conv_{0}'.format(layer_ind), conv2d)
if bn:
bn_layer = nn.BatchNorm2d(filters, momentum=0.9, eps=1e-5)
modules.add_module('batch_norm_{0}'.format(layer_ind), bn_layer)
if layer_dict['activation'] == 'leaky':
activn = nn.LeakyReLU(0.1)
modules.add_module('leky_{0}'.format(layer_ind), activn)
elif layer_dict["type"] == "upsample":
stride = int(layer_dict["stride"])
upsample = nn.Upsample(scale_factor = stride)
modules.add_module("upsample_{}".format(layer_ind), upsample)
elif layer_dict["type"] == "shortcut":
backwards=int(layer_dict["from"])
filters = channels_list[1:][backwards]
modules.add_module("shortcut_{}".format(layer_ind), EmptyLayer())
elif layer_dict["type"] == "route":
layers = [int(x) for x in layer_dict["layers"].split(",")]
filters = sum([channels_list[1:][l] for l in layers])
modules.add_module("route_{}".format(layer_ind), EmptyLayer())
elif layer_dict["type"] == "yolo":
anchors = [int(a) for a in layer_dict["anchors"].split(",")]
anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
# ex) at 13x13, 'mask': '6,7,8'
# mask는 anchors index를 의미합니다.
# yolo layer당 3개의 anchors를 할당 합니다.
# mask는 yolo layer feature map size에 알맞는 anchors를 설정합니다.
mask = [int(m) for m in layer_dict["mask"].split(",")]
anchors = [anchors[i] for i in mask] # 3 anchors
num_classes = int(layer_dict["classes"]) # 20
img_size = int(hyperparams["height"]) # 416
yolo_layer = YOLOLayer(anchors, num_classes, img_size)
modules.add_module("yolo_{}".format(layer_ind), yolo_layer)
module_list.append(modules)
channels_list.append(filters)
return hyperparams, module_list
생성한 module_list로 Darknet을 정의합니다.
class Darknet(nn.Module):
def __init__(self, config_path, img_size=416):
super(Darknet, self).__init__()
self.blocks_list = parse_model_config(config_path)
self.hyperparams, self.module_list = create_layers(self.blocks_list)
self.img_size = img_size
def forward(self, x):
img_dim = x.shape[2]
layer_outputs, yolo_outputs = [], []
# blocks_list: config 파일 분석한 결과
# module_list: blocks_list로 생성한 module
for block, module in zip(self.blocks_list[1:], self.module_list):
if block["type"] in ["convolutional", "upsample", "maxpool"]:
x = module(x)
elif block["type"] == "shortcut":
layer_ind = int(block["from"]) # -3
x = layer_outputs[-1] + layer_outputs[layer_ind] # shortcut connection
# {'type': 'yolo', 'mask': '3,4,5', 'anchors': '10,13, ...}
elif block["type"] == "yolo":
x= module[0](x) # get yolo layer output
yolo_outputs.append(x)
elif block["type"] == "route": # {'type': 'route', 'layers': '-1, 61'}
x = torch.cat([layer_outputs[int(l_i)]
for l_i in block["layers"].split(",")], 1)
layer_outputs.append(x)
yolo_out_cat = torch.cat(yolo_outputs, 1) # 3개의 output을 하나로 연결
return yolo_out_cat, yolo_outputs
모델을 생성하고 확인합니다.
# check model
model = Darknet(path2config).to(device)
x=torch.rand(1,3,416,416).to(device)
with torch.no_grad():
yolo_out_cat, yolo_outputs=model.forward(x)
print(yolo_out_cat.shape)
print(yolo_outputs[0].shape,yolo_outputs[1].shape,yolo_outputs[2].shape)
model summary를 출력합니다.
summary(model, (3, 416, 416))
7-2 직접 모델 구현하기
YOLOv3은 DarkNet으로 특징을 추출하고, FPN을 거쳐서 예측을 합니다. 전체 구조입니다.
DarkNet 구조입니다.
우선 BasicConv class를 정의합니다.
class BasicConv(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False),
nn.BatchNorm2d(out_channels),
nn.LeakyReLU(0.1)
)
def forward(self, x):
return self.conv(x)
residual block을 정의합니다.
class ResidualBlock(nn.Module):
def __init__(self, channels):
super().__init__()
self.residual = nn.Sequential(
BasicConv(channels, channels//2, 1, stride=1, padding=0),
BasicConv(channels//2, channels, 3, stride=1, padding=1)
)
self.shortcut = nn.Sequential()
def forward(self, x):
x_shortcut = self.shortcut(x)
x_residual = self.residual(x)
return x_shortcut + x_residual
FPN에서 사용하는 Top_down layer를 정의합니다.
# FPN의 Top_down layer 입니다.
# lateral connection과 Upsampling이 concatate 한 뒤에 수행합니다.
class Top_down(nn.Module):
def __init__(self, in_channels, out_channels):
super().__init__()
self.conv = nn.Sequential(
BasicConv(in_channels, out_channels, 1, stride=1, padding=0),
BasicConv(out_channels, out_channels*2, 3, stride=1, padding=1),
BasicConv(out_channels*2, out_channels, 1, stride=1, padding=0),
BasicConv(out_channels, out_channels*2, 3, stride=1, padding=1),
BasicConv(out_channels*2, out_channels, 1, stride=1, padding=0)
)
def forward(self, x):
return self.conv(x)
YOLO layer를 정의합니다.
# YOLO Layer를 정의합니다.
# YOLO Layer는 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
class YOLOLayer(nn.Module):
def __init__(self, channels, anchors, num_classes=20, img_dim=416):
super().__init__()
self.anchors = anchors # three anchors per YOLO Layer
self.num_anchors = len(anchors) # 3
self.num_classes = num_classes # VOC classes 20
self.img_dim = img_dim # 입력 이미지 크기 416
self.grid_size = 0
# 예측을 수행하기 전, smooth conv layer 입니다.
self.conv = nn.Sequential(
BasicConv(channels, channels*2, 3, stride=1, padding=1),
nn.Conv2d(channels*2, 75, 1, stride=1, padding=0)
)
def forward(self, x):
x = self.conv(x)
# prediction
# x: batch, channels, W, H
batch_size = x.size(0)
grid_size = x.size(2) # S = 13 or 26 or 52
device = x.device
prediction = x.view(batch_size, self.num_anchors, self.num_classes + 5,
grid_size, grid_size) # shape = (batch, 3, 25, S, S)
# shape change (batch, 3, 25, S, S) -> (batch, 3, S, S, 25)
prediction = prediction.permute(0, 1, 3, 4, 2)
prediction = prediction.contiguous()
obj_score = torch.sigmoid(prediction[..., 4]) # Confidence: 1 if object, else 0
pred_cls = torch.sigmoid(prediction[..., 5:]) # 바운딩 박스 좌표
# grid_size 갱신
if grid_size != self.grid_size:
# grid_size를 갱신하고, transform_outputs 함수를 위해 anchor 박스를 전처리 합니다.
self.compute_grid_offsets(grid_size, cuda=x.is_cuda)
# calculate bounding box coordinates
pred_boxes = self.transform_outputs(prediction)
# output shape(batch, num_anchors x S x S, 25)
# ex) at 13x13 -> [batch, 507, 25], at 26x26 -> [batch, 2028, 25], at 52x52 -> [batch, 10647, 25]
# 최종적으로 YOLO는 10647개의 바운딩박스를 예측합니다.
output = torch.cat((pred_boxes.view(batch_size, -1, 4),
obj_score.view(batch_size, -1, 1),
pred_cls.view(batch_size, -1, self.num_classes)), -1)
return output
# grid_size를 갱신하고, transform_outputs 함수를 위해 anchor 박스를 전처리 합니다.
def compute_grid_offsets(self, grid_size, cuda=True):
self.grid_size = grid_size # ex) 13, 26, 52
self.stride = self.img_dim / self.grid_size
# cell index 생성
# transform_outputs 함수에서 바운딩 박스의 x, y좌표를 예측할 때 사용합니다.
# 1, 1, S, S
self.grid_x = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).type(torch.float32)
# 1, 1, S, S
self.grid_y = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).transpose(3,2).type(torch.float32)
# anchors를 feature map 크기로 정규화, [0~1] 범위
scaled_anchors = [(a_w / self.stride, a_h / self.stride) for a_w, a_h in self.anchors]
# tensor로 변환
self.scaled_anchors = torch.tensor(scaled_anchors, device=device)
# transform_outputs 함수에서 바운딩 박스의 w, h를 예측할 때 사용합니다.
# shape=(3,2) -> (1, 3, 1, 1)
self.anchor_w = self.scaled_anchors[:, 0:1].view((1, self.num_anchors, 1, 1))
self.anchor_h = self.scaled_anchors[:, 1:2].view((1, self.num_anchors, 1, 1))
# 예측한 바운딩 박스 좌표를 계산하는 함수입니다.
def transform_outputs(self, prediction):
# prediction = (batch, num_anchors, S, S, coordinates + classes)
device = prediction.device
x = torch.sigmoid(prediction[..., 0]) # sigmoid(box x), 예측값을 sigmoid로 감싸서 [0~1] 범위
y = torch.sigmoid(prediction[..., 1]) # sigmoid(box y), 예측값을 sigmoid로 감싸서 [0~1] 범위
w = prediction[..., 2] # 예측한 바운딩 박스 너비
h = prediction[..., 3] # 예측한 바운딩 박스 높이
pred_boxes = torch.zeros_like(prediction[..., :4]).to(device)
pred_boxes[..., 0] = x.data + self.grid_x # sigmoid(box x) + cell x 좌표
pred_boxes[..., 1] = y.data + self.grid_y # sigmoid(box y) + cell y 좌표
pred_boxes[..., 2] = torch.exp(w.data) * self.anchor_w
pred_boxes[..., 3] = torch.exp(h.data) * self.anchor_h
return pred_boxes * self.stride
DarkNet을 정의합니다.
class DarkNet(nn.Module):
def __init__(self, anchors, num_blocks=[1,2,8,8,4], num_classes=20):
super().__init__()
# feature extractor
self.conv1 = BasicConv(3, 32, 3, stride=1, padding=1)
self.res_block_1 = self._make_residual_block(64, num_blocks[0]) # 208x208
self.res_block_2 = self._make_residual_block(128, num_blocks[1]) # 104x104
self.res_block_3 = self._make_residual_block(256, num_blocks[2]) # 52x52, FPN lateral connection
self.res_block_4 = self._make_residual_block(512, num_blocks[3]) # 26x26, FPN lateral connection
self.res_block_5 = self._make_residual_block(1024, num_blocks[4]) # 13x13, Top layer
# FPN Top down, conv + upsampling을 수행합니다.
self.topdown_1 = Top_down(1024, 512)
self.topdown_2 = Top_down(768, 256)
self.topdown_3 = Top_down(384, 128)
# FPN lateral connection
# 차원 축소를 위해 사용합니다.
self.lateral_1 = BasicConv(512, 256, 1, stride=1, padding=0)
self.lateral_2 = BasicConv(256, 128, 1, stride=1, padding=0)
# prediction, 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
self.yolo_1 = YOLOLayer(512, anchors=anchors[2]) # 13x13
self.yolo_2 = YOLOLayer(256, anchors=anchors[1]) # 26x26
self.yolo_3 = YOLOLayer(128, anchors=anchors[0]) # 52x52
self.upsample = nn.Upsample(scale_factor=2)
def forward(self, x):
# feature extractor
x = self.conv1(x)
c1 = self.res_block_1(x)
c2 = self.res_block_2(c1)
c3 = self.res_block_3(c2)
c4 = self.res_block_4(c3)
c5 = self.res_block_5(c4)
# FPN Top-downm, Upsample and lateral connection
p5 = self.topdown_1(c5)
p4 = self.topdown_2(torch.cat((self.upsample(p5), self.lateral_1(c4)), 1))
p3 = self.topdown_3(torch.cat((self.upsample(p4), self.lateral_2(c3)), 1))
# prediction
yolo_1 = self.yolo_1(p5)
yolo_2 = self.yolo_2(p4)
yolo_3 = self.yolo_3(p3)
return torch.cat((yolo_1, yolo_2, yolo_3), 1), [yolo_1, yolo_2, yolo_3]
def _make_residual_block(self,in_channels, num_block):
blocks = []
# down sample
blocks.append(BasicConv(in_channels//2, in_channels, 3, stride=2, padding=1))
for i in range(num_block):
blocks.append(ResidualBlock(in_channels))
return nn.Sequential(*blocks)
구축한 모델을 확인합니다.
anchors = [[(10,13),(16,30),(33,23)],[(30,61),(62,45),(59,119)],[(116,90),(156,198),(373,326)]]
x = torch.randn(1, 3, 416, 416)
with torch.no_grad():
model = DarkNet(anchors)
output_cat , output = model(x)
print(output_cat.size())
print(output[0].size(), output[1].size(), output[2].size())
잘 작동하네요!
8. 손실 함수 정의하기
손실 함수를 구현하는 것이 너무 복잡해, 아래 깃허브에서 갖고 왔습니다 .
def get_loss_batch(output,targets, params_loss, opt=None):
ignore_thres=params_loss["ignore_thres"]
scaled_anchors= params_loss["scaled_anchors"] # 정규화된 anchor
mse_loss= params_loss["mse_loss"] # nn.MSELoss
bce_loss= params_loss["bce_loss"] # nn.BCELoss, 이진 분류에서 사용
num_yolos=params_loss["num_yolos"] # 3
num_anchors= params_loss["num_anchors"] # 3
obj_scale= params_loss["obj_scale"] # 1
noobj_scale= params_loss["noobj_scale"] # 100
loss = 0.0
for yolo_ind in range(num_yolos):
yolo_out = output[yolo_ind] # yolo_out: batch, num_boxes, class+coordinates
batch_size, num_bbxs, _ = yolo_out.shape
# get grid size
gz_2 = num_bbxs/num_anchors # ex) at 13x13, 507 / 3
grid_size=int(np.sqrt(gz_2))
# (batch, num_boxes, class+coordinates) -> (batch, num_anchors, S, S, class+coordinates)
yolo_out = yolo_out.view(batch_size, num_anchors, grid_size, grid_size, -1)
pred_boxes = yolo_out[:,:,:,:,:4] # get box coordinates
x,y,w,h = transform_bbox(pred_boxes, scaled_anchors[yolo_ind]) # cell 내에서 x,y 좌표와
pred_conf = yolo_out[:,:,:,:,4] # get confidence
pred_cls_prob = yolo_out[:,:,:,:,5:]
yolo_targets = get_yolo_targets({
'pred_cls_prob':pred_cls_prob,
'pred_boxes':pred_boxes,
'targets':targets,
'anchors':scaled_anchors[yolo_ind],
'ignore_thres':ignore_thres,
})
obj_mask=yolo_targets["obj_mask"]
noobj_mask=yolo_targets["noobj_mask"]
tx=yolo_targets["tx"]
ty=yolo_targets["ty"]
tw=yolo_targets["tw"]
th=yolo_targets["th"]
tcls=yolo_targets["tcls"]
t_conf=yolo_targets["t_conf"]
loss_x = mse_loss(x[obj_mask], tx[obj_mask])
loss_y = mse_loss(y[obj_mask], ty[obj_mask])
loss_w = mse_loss(w[obj_mask], tw[obj_mask])
loss_h = mse_loss(h[obj_mask], th[obj_mask])
loss_conf_obj = bce_loss(pred_conf[obj_mask], t_conf[obj_mask])
loss_conf_noobj = bce_loss(pred_conf[noobj_mask], t_conf[noobj_mask])
loss_conf = obj_scale * loss_conf_obj + noobj_scale * loss_conf_noobj
loss_cls = bce_loss(pred_cls_prob[obj_mask], tcls[obj_mask])
loss += loss_x + loss_y + loss_w + loss_h + loss_conf + loss_cls
if opt is not None:
opt.zero_grad()
loss.backward()
opt.step()
return loss.item()
transform_bbox는 전체 이미지의 x,y 좌표에서 샐 내의 xy 좌표로 변경합니다. 또한 w, h를 anchor box에 맞게 변경합니다.
def transform_bbox(bbox, anchors):
# bbox: predicted bbox coordinates
# anchors: scaled anchors
x = bbox[:,:,:,:,0]
y = bbox[:,:,:,:,1]
w = bbox[:,:,:,:,2]
h = bbox[:,:,:,:,3]
anchor_w = anchors[:,0].view((1,3,1,1))
anchor_h = anchors[:,1].view((1,3,1,1))
x=x-x.floor() # 전체 이미지의 x 좌표에서 셀 내의 x좌표로 변경
y=y-y.floor() # 전체 이미지의 y 좌표에서 셀 내의 y좌표로 변경
w=torch.log(w / anchor_w + 1e-16)
h=torch.log(h / anchor_h + 1e-16)
return x, y, w, h
get_target_yolo는 ground truth와 iou가 가장 높은 anchor를 object가 있다고 할당하고, iou > threshhold인 anchor도 object 가 있다고 할당합니다. 나머지 anchor는 무시합니다. 또한 바운딩 박스 예측 좌표, cls, confidence를 생성합니다.
def get_yolo_targets(params):
pred_boxes = params['pred_boxes']
pred_cls_prob = params['pred_cls_prob']
target = params['targets'] # batchsize, cls, cx, cy, w, h
anchors = params['anchors']
ignore_thres = params['ignore_thres']
batch_size = pred_boxes.size(0)
num_anchors = pred_boxes.size(1)
grid_size = pred_boxes.size(2)
num_cls = pred_cls_prob.size(-1)
sizeT = batch_size, num_anchors, grid_size, grid_size
obj_mask = torch.zeros(sizeT, device=device, dtype=torch.uint8)
noobj_mask = torch.ones(sizeT, device=device, dtype=torch.uint8)
tx = torch.zeros(sizeT, device=device, dtype=torch.float32)
ty = torch.zeros(sizeT, device=device, dtype=torch.float32)
tw = torch.zeros(sizeT, device=device, dtype=torch.float32)
th = torch.zeros(sizeT, device=device, dtype=torch.float32)
sizeT = batch_size, num_anchors, grid_size, grid_size, num_cls
tcls = torch.zeros(sizeT, device=device, dtype=torch.float32)
# target = batch, cx, cy, w, h, class
target_bboxes = target[:, 1:5] * grid_size
t_xy = target_bboxes[:, :2]
t_wh = target_bboxes[:, 2:]
t_x, t_y = t_xy.t() # .t(): 전치
t_w, t_h = t_wh.t() # .t(): 전치
grid_i, grid_j = t_xy.long().t() # .long(): int로 변환
# anchor와 target의 iou 계산
iou_with_anchors = [get_iou_WH(anchor, t_wh) for anchor in anchors]
iou_with_anchors = torch.stack(iou_with_anchors)
best_iou_wa, best_anchor_ind = iou_with_anchors.max(0) # iou가 가장 높은 anchor 추출
batch_inds, target_labels = target[:, 0].long(), target[:, 5].long()
obj_mask[batch_inds, best_anchor_ind, grid_j, grid_i] = 1 # iou가 가장 높은 anchor 할당
noobj_mask[batch_inds, best_anchor_ind, grid_j, grid_i] = 0
# threshold 보다 높은 iou를 지닌 anchor
# iou가 가장 높은 anchor만 할당하면 되기 때문입니다.
for ind, iou_wa in enumerate(iou_with_anchors.t()):
noobj_mask[batch_inds[ind], iou_wa > ignore_thres, grid_j[ind], grid_i[ind]] = 0
# cell 내에서 x,y로 변환
tx[batch_inds, best_anchor_ind, grid_j, grid_i] = t_x - t_x.float()
ty[batch_inds, best_anchor_ind, grid_j, grid_i] = t_y - t_y.float()
anchor_w = anchors[best_anchor_ind][:, 0]
tw[batch_inds, best_anchor_ind, grid_j, grid_i] = torch.log(t_w / anchor_w + 1e-16)
anchor_h = anchors[best_anchor_ind][:, 1]
th[batch_inds, best_anchor_ind, grid_j, grid_i] = torch.log(t_h / anchor_h + 1e-16)
tcls[batch_inds, best_anchor_ind, grid_j, grid_i, target_labels] = 1
output = {
'obj_mask': obj_mask,
'noobj_mask': noobj_mask,
'tx': tx,
'ty': ty,
'tw': tw,
'th': th,
'tcls': tcls,
't_conf': obj_mask.float(),
}
return output
anchor와 target box의 iou를 계산하는 함수입니다.
# anchor와 target box의 iou 계산하는 함수입니다.
def get_iou_WH(wh1, wh2):
wh2 = wh2.t()
w1, h1 = wh1[0], wh1[1]
w2, h2 = wh2[0], wh2[1]
inter_area = torch.min(w1, w2) * torch.min(h1, h2)
union_area = (w1 * h1 + 1e-16) + w2 * h2 - inter_area
return inter_area / union_area
9. 모델 학습하기
학습에 필요한 함수를 정의하고, 학습을 진행합니다.
opt = optim.Adam(model.parameters(), lr=1e-3)
lr_scheduler = ReduceLROnPlateau(opt, mode='min',factor=0.5, patience=20,verbose=1)
# 현재 lr 계산하는 함수
def get_lr(opt):
for param_group in opt.param_groups:
return param_group['lr']
# epoch당 loss 계산하는 함수
def loss_epoch(model,params_loss,dataset_dl,sanity_check=False,opt=None):
running_loss=0.0
len_data=len(dataset_dl.dataset)
running_metrics= {}
for xb, yb,_ in dataset_dl:
yb=yb.to(device)
_,output=model(xb.to(device))
loss_b=get_loss_batch(output,yb, params_loss,opt)
running_loss+=loss_b
if sanity_check is True:
break
loss=running_loss/float(len_data)
return loss
import time
def train_val(model, params):
num_epochs=params["num_epochs"]
params_loss=params["params_loss"]
opt=params["optimizer"]
train_dl=params["train_dl"]
val_dl=params["val_dl"]
sanity_check=params["sanity_check"]
lr_scheduler=params["lr_scheduler"]
path2weights=params["path2weights"]
loss_history={
"train": [],
"val": [],
}
best_model_wts = copy.deepcopy(model.state_dict())
best_loss=float('inf')
start_time = time.time()
for epoch in range(num_epochs):
current_lr=get_lr(opt)
print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs - 1, current_lr))
model.train()
train_loss=loss_epoch(model,params_loss,train_dl,sanity_check,opt)
loss_history["train"].append(train_loss)
model.eval()
with torch.no_grad():
val_loss=loss_epoch(model,params_loss,val_dl,sanity_check)
loss_history["val"].append(val_loss)
if val_loss < best_loss:
best_loss = val_loss
best_model_wts = copy.deepcopy(model.state_dict())
torch.save(model.state_dict(), path2weights)
print("Copied best model weights!")
print('Get best val loss')
lr_scheduler.step(val_loss)
if current_lr != get_lr(opt):
print("Loading best model weights!")
model.load_state_dict(best_model_wts)
print("train loss: %.6f, val loss: %.6f, time: %.4f min" %(train_loss, val_loss, (time.time()-start_time)/60))
print("-"*10)
model.load_state_dict(best_model_wts)
return model, loss_history
path2models= "./models/"
if not os.path.exists(path2models):
os.mkdir(path2models)
scaled_anchors=[model.module_list[82][0].scaled_anchors,
model.module_list[94][0].scaled_anchors,
model.module_list[106][0].scaled_anchors]
mse_loss = nn.MSELoss(reduction="sum")
bce_loss = nn.BCELoss(reduction="sum")
params_loss={
"scaled_anchors" : scaled_anchors,
"ignore_thres": 0.5,
"mse_loss": mse_loss,
"bce_loss": bce_loss,
"num_yolos": 3,
"num_anchors": 3,
"obj_scale": 1,
"noobj_scale": 100,
}
params_train={
"num_epochs": 3,
"optimizer": opt,
"params_loss": params_loss,
"train_dl": train_dl,
"val_dl": val_dl,
"sanity_check": False,
"lr_scheduler": lr_scheduler,
"path2weights": path2models+"weights.pt",
}
model,loss_hist=train_val(model,params_train)
1epoch당 25분이 소요되네요. 구현과 학습이 목적이기 때문에 3epoch만 진행해보았습니다. colab 환경에서 학습이 너무 오래 걸리므로 pre trained model을 불러오는 것이 현명합니다.
loss progress를 출력합니다.
num_epochs = params_train['num_epochs']
# Plot train-val loss
plt.title('Train-Val Loss')
plt.plot(range(1, num_epochs+1), loss_hist['train'], label='train')
plt.plot(range(1, num_epochs+1), loss_hist['val'], label='val')
plt.ylabel('Loss')
plt.xlabel('Training Epochs')
plt.legend()
plt.show()
감사합니다.