 안녕하세요! 이번에는 YOLOv3을 PyTorch로 구현하고 학습까지 해보도록 하겠습니다. 작업환경은 Google Colab에서 진행했습니다.


 YOLOv3 논문 리뷰는 아래 포스팅에서 확인하실 수 있습니다.




이번에 읽어볼 논문은 'YOLOv3: An Incermetal Improvement' 입니다.  YOLOv3은 YOLOv2에서 개선된 버전입니다. 예를 들어, FPN을 사용하여 multi-scale에서 feature을 추출하고, shortcut connection을 활용한 D..


 전체 코드는 여기에서 확인하실 수 있습니다.


 아래 코드를 분석하여 구현해보았습니다.


1. Kaggle에서 VOC Dataset 다운로드

 우선, 학습에 필요한 Dataset을 다운로드 하겠습니다. 전처리된 voc dataset을 aladdinpersson 유튜버가 업로드 해두었습니다. 이것을 활용해서 custom dataset을 생성하겠습니다.


 kaggle API를 활용하면 쉽게 dataset을 다운로드 할 수 있습니다.

# download kaggle API
!pip install kaggle --upgrade


 kaggle APL을 사용하려면, kaggle.json 파일이 필요합니다. kaggle.json 파일은 kaggle 홈페이지에 로그인 한뒤에 account 탭에서 다운로드 받을 수 있습니다. 이 파일을 /root/.kaggle/ 에 업로드를 합니다.

# make .kaggle directory
# kaggle APL을 사용하려면, /root/.kaggle/ 에 kaggle.json을 업로드 해야합니다.
!mkdir .kaggle
cd /root/.kaggle/
# kaggle 홈페이지에서 로그인 한뒤에 account 탭에 들어가서 kaggle.json을 다운로드 합니다.
# 다운로드한 kaggle.json을 /root/.kaggle/ 에 업로드 합니다.
from google.colab import files
file_uploaded = files.upload()


 이제 원하는 위치에 VOC dataset을 다운로드 받습니다.

# download pre-processed voc dataset
!kaggle datasets download -d aladdinpersson/pascal-voc-dataset-used-in-yolov3-video


 압축을 해제합니다.

# unzip
!unzip -n pascal-voc-dataset-used-in-yolov3-video.zip


 augmentation을 위한 module을 설치합니다.

# install transformation package
!pip install -U albumentations


2. VOC Custom Dataset 생성하기

 다운로드 받은 VOC dataset으로 custom dataset을 생성하겠습니다.


 필요한 라이브러리를 import 합니다.

# import pakages
import torch
from torch import nn
from torch import optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import utils
from torchsummary import summary
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import to_pil_image
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import cv2
import os
import copy
import numpy as np
import pandas as pd
import random
import albumentations as A
from albumentations.pytorch import ToTensor
%matplotlib inline

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


 VOC class names를 정의합니다.

# VOC class names
classes = [


 Custom dataset을 정의합니다.

class VOCDataset(Dataset):
    def __init__(self, csv_file, img_dir, label_dir, transform=None, trans_params=None):
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        self.trans_params = trans_params

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1]) # /PASCAL_VOC/labels/000009.txt
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0]) # /PASCAL_VOC/images/000009.jpg
        image = np.array(Image.open(img_path).convert("RGB")) # albumentation을 적용하기 위해 np.array로 변환합니다.

        labels = None
        if os.path.exists(label_path):
            # np.roll: (class, cx, cy, w, h) -> (cx, cy, w, h, class)
            # np.loadtxt: txt 파일에서 data 불러오기
            labels = np.array(np.roll(np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1).tolist())
            # labels = np.loadtxt(label_path).reshape(-1, 5)

        if self.transform:
            # apply albumentations
            augmentations = self.transform(image=image, bboxes=labels)
            image = augmentations['image']
            targets = augmentations['bboxes']
            # for DataLoader
            # lables: ndarray -> tensor
            # dimension: [batch, cx, cy, w, h, class]
            if targets is not None:
                targets = torch.zeros((len(labels), 6))
                targets[:, 1:] = torch.tensor(labels) 
            targets = labels

        return image, targets, label_path


3. train dataset, val dataset 생성하기

 train dataset과 val dataset을 생성하겠습니다.

# train dataset 생성하기
train_csv_file = '/PASCAL_VOC/train.csv'
label_dir = '/PASCAL_VOC/labels'
img_dir = '/PASCAL_VOC/images'

train_ds = VOCDataset(train_csv_file, img_dir, label_dir)

img, labels, _ = train_ds[1]
print('number of data:',len(train_ds))
print('image size:', img.shape, type(img)) # HxWxC
print('labels shape:', labels.shape, type(labels))  # x1,y1,x2,y2
print('lables \n', labels)


 val dataset을 생성합니다.

# val dataset 생성하기
val_csv_file = '/PASCAL_VOC/test.csv'
label_dir = '/PASCAL_VOC/labels'
img_dir = '/PASCAL_VOC/images'

val_ds = VOCDataset(val_csv_file, img_dir, label_dir)

img, labels, _ = val_ds[1]
print('number of data:',len(val_ds))
print('image size:', img.shape, type(img))
print('labels shape:', labels.shape, type(labels))
print('lables \n', labels)


4. transforms을 정의하고, dataset에 적용하기

 VOC dataset 바운딩박스는 x1, y1, x2, y2로 되어있습니다. Albumentations 모듈로 바운딩 박스를 yolo format인 cx, cy, w, h로 변경합니다.

# transforms 정의하기
scale = 1.0

# for train
train_transforms = A.Compose([
        # 이미지의 maxsize를 max_size로 rescale합니다. aspect ratio는 유지합니다.
        A.LongestMaxSize(max_size=int(IMAGE_SIZE * scale)),
        # min_size보다 작으면 pad
        A.PadIfNeeded(min_height=int(IMAGE_SIZE * scale), min_width=int(IMAGE_SIZE * scale), border_mode=cv2.BORDER_CONSTANT),
        # random crop
        A.RandomCrop(width=IMAGE_SIZE, height=IMAGE_SIZE),
        # brightness, contrast, saturation을 무작위로 변경합니다.
        A.ColorJitter(brightness=0.6, contrast=0.6, saturation=0.6, hue=0.6, p=0.4),
        # transforms 중 하나를 선택해 적용합니다.
                 # shift, scale, rotate 를 무작위로 적용합니다.
                 A.ShiftScaleRotate(rotate_limit=20, p=0.5, border_mode=cv2.BORDER_CONSTANT),
                 # affine 변환
                 A.IAAAffine(shear=15, p=0.5, mode='constant')
        ], p=1.0),
        # 수평 뒤집기
        # blur
        # Contrast Limited Adaptive Histogram Equalization 적용
        # 각 채널의 bit 감소
        # grayscale로 변환
        # 무작위로 channel을 섞기
        # normalize
        A.Normalize(mean=[0,0,0], std=[1,1,1], max_pixel_value=255),
        # (x1, y1, x2, y2) -> (cx, cy, w, h)
        bbox_params=A.BboxParams(format='yolo', min_visibility=0.4, label_fields=[])

# for validation
val_transforms = A.Compose([
        A.LongestMaxSize(max_size=int(IMAGE_SIZE * scale)),
        A.PadIfNeeded(min_height=int(IMAGE_SIZE * scale), min_width=int(IMAGE_SIZE * scale), border_mode=cv2.BORDER_CONSTANT),
        A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255),
        bbox_params=A.BboxParams(format='yolo', min_visibility=0.4, label_fields=[])


 이제 transforms를 데이터셋에 적용합니다.

# 데이터셋에 transforms 적용하기
train_ds.transform = train_transforms
val_ds.transform = val_transforms


5. sample image 출력하기

 transform이 적용된 sample image를 출력하겠습니다.

# 정규화된 x,y,w,h를 이미지 크기에 맞게 변경
def rescale_bbox(bb, W, H):
    x,y,w,h = bb
    return [x*W, y*H, w*W, h*H]

# 바운딩 박스 색상
COLORS = np.random.randint(0, 255, size=(80,3),dtype='uint8')

# image 출력 함수 정의
def show_img_bbox(img, targets, classes=classes):
    if torch.is_tensor(img):
    if torch.is_tensor(targets):
    W, H = img.size
    draw = ImageDraw.Draw(img)

    for tg in targets:
        xc,yc,w,h = bbox

        color = [int(c) for c in COLORS[id_]]

        draw.rectangle(((xc-w/2, yc-h/2), (xc+w/2, yc+h/2)), outline=tuple(color), width=3)
        draw.text((xc-w/2, yc-h/2), name, fill=(255,255,255,0))
# transforms가 적용된 sample image 확인

grid_size = 2
rnd_ind = np.random.randint(0, len(train_ds), grid_size)
print('image indices:',rnd_ind)

plt.figure(figsize=(20, 20))
for i, indice in enumerate(rnd_ind):
    img, label, _ = train_ds[indice]
    plt.subplot(1, grid_size, i+1)
    show_img_bbox(img, label)

6. DataLoader 생성하기

 dataloader 함수의 collate_fn 인자를 설정해야 합니다. 이미지마다 bounding box 개수가 다르므로, 이들을 한꺼번에 배치로 묶어야 하기 때문입니다.

# collate_fn 를 정의합니다.
# collate_fn은 DataLoader의 인자로 사용되며, batch 단위로 imgs와 targets를 묶습니다.
def collate_fn(batch):
    imgs, targets, paths = list(zip(*batch))
    # 빈 박스 제거하기
    targets = [boxes for boxes in targets if boxes is not None]
    # index 설정하기
    for b_i, boxes in enumerate(targets):
        boxes[:, 0] = b_i
    targets = torch.cat(targets, 0)
    imgs = torch.stack([img for img in imgs])
    return imgs, targets, paths


 dataloader를 생성합니다.

# make DataLoader
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_dl = DataLoader(val_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)

# check train_dl
for imgs_batch, tg_batch, path_batch in train_dl:
print(tg_batch.shape, tg_batch.dtype)


7. YOLOv3 모델 구축하기

 YOLOv3은 FPN을 활용하는 모델입니다. 13x13, 26x26, 52x52 3개의 피쳐맵에서 예측을 수행합니다.


 YOLOv3 을 구현하는 방법은 두 가지 입니다.

 1. Configuration file 분석하기

 2. 직접 모델 구현하기


 연습 삼아 두개 다 해보겠습니다.


7-1 configuration file 분석하기

 configuration file을 분석하여, 분석한 결과를 활용하겠습니다. config file은 https://github.com/pjreddie/darknet git을 연동 한뒤에, cfg 폴더에 있는 yolov3-voc.cfg 파일을 사용하겠습니다.

!git clone https://github.com/pjreddie/darknet.git
# yolov3-voc.cfg 경로를 저장합니다.
path2config = '/darknet/cfg/yolov3-voc.cfg'


 config 파일을 분석하는 함수를 정의합니다.

# config 파일을 분석하는 함수를 정의합니다.
def parse_model_config(path2file):
    # cfg 파일 열기
    cfg_file = open(path2file, 'r')
    # 문자열 데이터 읽어오기 
    lines = cfg_file.read().split('\n') #['[net]', '# Testing', '# batch=1', '....' ]

    # 데이터 전처리
    # startswith('#'): 문자열이 # 로 시작하는지 여부를 알려줍니다. 
    lines = [x for x in lines if x and not x.startswith('#')] # ['[net]', 'batch=64', '...']
    # 공백 제거
    lines = [x.rstrip().lstrip() for x in lines]

    blocks_list = []
    for line in lines:
        if line.startswith('['): # [net]
            blocks_list.append({}) # {}
            blocks_list[-1]['type'] = line[1:-1].rstrip() # [{'type': 'net'}]
            key, value = line.split('=') # batch=64 -> batch, 64
            value = value.strip() # 공백 제거
            blocks_list[-1][key.rstrip()] = value.strip() # 'batch':'64'

    return blocks_list


 cfg 파일을 분석하여, blocks_list 를 생성합니다.

# cfg 파일 분석
blocks_list = parse_model_config(path2config)



 cfg 파일을 분석한 blocks_list로 pytorch module list를 생성합니다.

# EmptyLayer를 정의합니다.
# EmptyLayer는 residual unit의 shortcut과 FPN의 lateral connection 용도로 사용합니다.
class EmptyLayer(nn.Module):
    def __init__(self):
# YOLOLayer를 정의합니다.
# YOLOLayer는 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
class YOLOLayer(nn.Module):
    def __init__(self, anchors, num_classes, img_dim=416):
        self.anchors = anchors # three anchor per YOLO layer
        self.num_anchors = len(anchors) # 3
        self.num_classes = num_classes
        self.img_dim = img_dim
        self.grid_size = 0

    def forward(self, x):
        # x: batch_size, channels, H, W
        batch_size = x.size(0)
        grid_size = x.size(2) # S = 13 or 26 or 52
        device = x.device

        prediction = x.view(batch_size, self.num_anchors, self.num_classes + 5, 
                            grid_size, grid_size) # shape = (batch, 3, 25, S, S)

        # (batch, 3, 25, S, S) -> (batch, 3, S, S, 25)
        prediction = prediction.permute(0, 1, 3, 4, 2)
        prediction = prediction.contiguous()

        obj_score = torch.sigmoid(prediction[..., 4]) # 클래스
        pred_cls = torch.sigmoid(prediction[..., 5:]) # 바운딩 박스 좌표

        if grid_size != self.grid_size:
            # grid_size 갱신, cell index 생성, anchor 정규화
            self.compute_grid_offsets(grid_size, cuda=x.is_cuda)

        # bounding box prediction
        pred_boxes = self.transform_outputs(prediction)

        # batch, num_anchor x S x S, 25
        # ex) at 13x13 -> [batch, 507, 25], at 26x26 -> [batch, 2028, 85], at 52x52 -> [batch, 10647, 85]
        # 최종적으로 YOLO는 10647개의 바운딩박스를 예측합니다.
        output = torch.cat((pred_boxes.view(batch_size, -1, 4),
                            obj_score.view(batch_size, -1, 1),
                            pred_cls.view(batch_size, -1, self.num_classes)), -1)
        return output

    def compute_grid_offsets(self, grid_size, cuda=True):
        self.grid_size = grid_size # ex) 13, 26, 52
        self.stride = self.img_dim / self.grid_size # ex) 32, 16, 8

        # cell index 생성
        # 1, 1, S, 1
        self.grid_x = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).type(torch.float32)
        # 1, 1, 1, S
        self.grid_y = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).transpose(3, 2).type(torch.float32)

        # anchors를 feature map 크기로 정규화, [0~1] 범위
        # ex) (10, 13), (16, 30), (33, 23) / stride
        scaled_anchors = [(a_w / self.stride, a_h / self.stride) for a_w, a_h in self.anchors]
        # tensor로 변환
        self.scaled_anchors = torch.tensor(scaled_anchors, device=device)

        # shape=(3,2) -> (1,1,3,1)
        self.anchor_w = self.scaled_anchors[:, 0:1].view((1, self.num_anchors, 1, 1))
        # shape=(3,2) -> (1,1,3,1)
        self.anchor_h = self.scaled_anchors[:, 1:2].view((1, self.num_anchors, 1, 1))

    def transform_outputs(self, prediction):
        # pridiction (batch, 3, S, S, 25)
        device = prediction.device
        x = torch.sigmoid(prediction[..., 0]) # sigmoid(box x), 예측값을 sigmoid로 감싸서 [0~1] 범위
        y = torch.sigmoid(prediction[..., 1]) # sigmoid(box y), 예측값을 sigmoid로 감싸서 [0~1] 범위
        w = prediction[..., 2] # 예측한 바운딩 박스 너비
        h = prediction[..., 3] # 예측한 바운딩 박스 높이

        pred_boxes = torch.zeros_like(prediction[..., :4]).to(device)
        pred_boxes[..., 0] = x.data + self.grid_x # sigmoid(box x) + cell x 좌표
        pred_boxes[..., 1] = y.data + self.grid_y # sigmoid(box y) + cell y 좌표
        pred_boxes[..., 2] = torch.exp(w.data) * self.anchor_w
        pred_boxes[..., 3] = torch.exp(h.data) * self.anchor_h

        return pred_boxes * self.stride


 layer를 생성하는 함수를 정의합니다.

# layer를 생성하는 함수를 정의합니다.
def create_layers(blocks_list):
    hyperparams = blocks_list[0]
    channels_list = [int(hyperparams['channels'])]
    module_list = nn.ModuleList()

    for layer_ind, layer_dict in enumerate(blocks_list[1:]):
        modules = nn.Sequential()

        if layer_dict['type'] == 'convolutional':
            filters = int(layer_dict['filters'])
            kernel_size = int(layer_dict['size'])
            pad = (kernel_size - 1) // 2
            bn = layer_dict.get('batch_normalize', 0)

            conv2d = nn.Conv2d(in_channels=channels_list[-1], out_channels=filters, kernel_size=kernel_size,
                               stride=int(layer_dict['stride']), padding=pad, bias=not bn)
            modules.add_module('conv_{0}'.format(layer_ind), conv2d)

            if bn:
                bn_layer = nn.BatchNorm2d(filters, momentum=0.9, eps=1e-5)
                modules.add_module('batch_norm_{0}'.format(layer_ind), bn_layer)
            if layer_dict['activation'] == 'leaky':
                activn = nn.LeakyReLU(0.1)
                modules.add_module('leky_{0}'.format(layer_ind), activn)

        elif layer_dict["type"] == "upsample":
            stride = int(layer_dict["stride"])
            upsample = nn.Upsample(scale_factor = stride)
            modules.add_module("upsample_{}".format(layer_ind), upsample) 

        elif layer_dict["type"] == "shortcut":
            filters = channels_list[1:][backwards]
            modules.add_module("shortcut_{}".format(layer_ind), EmptyLayer())
        elif layer_dict["type"] == "route":
            layers = [int(x) for x in layer_dict["layers"].split(",")]
            filters = sum([channels_list[1:][l] for l in layers])
            modules.add_module("route_{}".format(layer_ind), EmptyLayer())

        elif layer_dict["type"] == "yolo":
            anchors = [int(a) for a in layer_dict["anchors"].split(",")]
            anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]

            # ex) at 13x13, 'mask': '6,7,8'
            # mask는 anchors index를 의미합니다.
            # yolo layer당 3개의 anchors를 할당 합니다.
            # mask는 yolo layer feature map size에 알맞는 anchors를 설정합니다.
            mask = [int(m) for m in layer_dict["mask"].split(",")]
            anchors = [anchors[i] for i in mask] # 3 anchors
            num_classes = int(layer_dict["classes"]) # 20
            img_size = int(hyperparams["height"]) # 416 
            yolo_layer = YOLOLayer(anchors, num_classes, img_size)
            modules.add_module("yolo_{}".format(layer_ind), yolo_layer)

    return hyperparams, module_list


 생성한 module_list로 Darknet을 정의합니다.


class Darknet(nn.Module):
    def __init__(self, config_path, img_size=416):
        super(Darknet, self).__init__()
        self.blocks_list = parse_model_config(config_path)
        self.hyperparams, self.module_list = create_layers(self.blocks_list)
        self.img_size = img_size
    def forward(self, x):
        img_dim = x.shape[2]
        layer_outputs, yolo_outputs = [], []
        # blocks_list: config 파일 분석한 결과
        # module_list: blocks_list로 생성한 module
        for block, module in zip(self.blocks_list[1:], self.module_list):
            if block["type"] in ["convolutional", "upsample", "maxpool"]:
                x = module(x)        
            elif block["type"] == "shortcut":
                layer_ind = int(block["from"]) # -3
                x = layer_outputs[-1] + layer_outputs[layer_ind] # shortcut connection

            #  {'type': 'yolo', 'mask': '3,4,5', 'anchors': '10,13, ...}
            elif block["type"] == "yolo":
                x= module[0](x) # get yolo layer output
            elif block["type"] == "route": #  {'type': 'route', 'layers': '-1, 61'}
                x = torch.cat([layer_outputs[int(l_i)] 
                               for l_i in block["layers"].split(",")], 1)
        yolo_out_cat = torch.cat(yolo_outputs, 1) # 3개의 output을 하나로 연결
        return yolo_out_cat, yolo_outputs


 모델을 생성하고 확인합니다.

# check model
model = Darknet(path2config).to(device)
with torch.no_grad():
    yolo_out_cat, yolo_outputs=model.forward(x)


 model summary를 출력합니다.

summary(model, (3, 416, 416))


7-2 직접 모델 구현하기

YOLOv3은 DarkNet으로 특징을 추출하고, FPN을 거쳐서 예측을 합니다. 전체 구조입니다.


 DarkNet 구조입니다.


 우선 BasicConv class를 정의합니다.

class BasicConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):

        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False),

    def forward(self, x):
        return self.conv(x)


 residual block을 정의합니다.

class ResidualBlock(nn.Module):
    def __init__(self, channels):

        self.residual = nn.Sequential(
            BasicConv(channels, channels//2, 1, stride=1, padding=0),
            BasicConv(channels//2, channels, 3, stride=1, padding=1)

        self.shortcut = nn.Sequential()

    def forward(self, x):
        x_shortcut = self.shortcut(x)
        x_residual = self.residual(x)

        return x_shortcut + x_residual


 FPN에서 사용하는 Top_down layer를 정의합니다.

# FPN의 Top_down layer 입니다.
# lateral connection과 Upsampling이 concatate 한 뒤에 수행합니다.
class Top_down(nn.Module):
    def __init__(self, in_channels, out_channels):

        self.conv = nn.Sequential(
            BasicConv(in_channels, out_channels, 1, stride=1, padding=0),
            BasicConv(out_channels, out_channels*2, 3, stride=1, padding=1),
            BasicConv(out_channels*2, out_channels, 1, stride=1, padding=0),
            BasicConv(out_channels, out_channels*2, 3, stride=1, padding=1),
            BasicConv(out_channels*2, out_channels, 1, stride=1, padding=0)

    def forward(self, x):
        return self.conv(x)


 YOLO layer를 정의합니다.

# YOLO Layer를 정의합니다.
# YOLO Layer는 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
class YOLOLayer(nn.Module):
    def __init__(self, channels, anchors, num_classes=20, img_dim=416):
        self.anchors = anchors # three anchors per YOLO Layer
        self.num_anchors = len(anchors) # 3
        self.num_classes = num_classes # VOC classes 20
        self.img_dim = img_dim # 입력 이미지 크기 416
        self.grid_size = 0

        # 예측을 수행하기 전, smooth conv layer 입니다.
        self.conv = nn.Sequential(
            BasicConv(channels, channels*2, 3, stride=1, padding=1),
            nn.Conv2d(channels*2, 75, 1, stride=1, padding=0)

    def forward(self, x):
        x = self.conv(x)

        # prediction
        # x: batch, channels, W, H
        batch_size = x.size(0)
        grid_size = x.size(2) # S = 13 or 26 or 52
        device = x.device

        prediction = x.view(batch_size, self.num_anchors, self.num_classes + 5,
                            grid_size, grid_size) # shape = (batch, 3, 25, S, S)
        # shape change (batch, 3, 25, S, S) -> (batch, 3, S, S, 25)
        prediction = prediction.permute(0, 1, 3, 4, 2)
        prediction = prediction.contiguous()

        obj_score = torch.sigmoid(prediction[..., 4]) # Confidence: 1 if object, else 0
        pred_cls = torch.sigmoid(prediction[..., 5:]) # 바운딩 박스 좌표

        # grid_size 갱신
        if grid_size != self.grid_size:
            # grid_size를 갱신하고, transform_outputs 함수를 위해 anchor 박스를 전처리 합니다.
            self.compute_grid_offsets(grid_size, cuda=x.is_cuda)

        # calculate bounding box coordinates
        pred_boxes = self.transform_outputs(prediction)

        # output shape(batch, num_anchors x S x S, 25)
        # ex) at 13x13 -> [batch, 507, 25], at 26x26 -> [batch, 2028, 25], at 52x52 -> [batch, 10647, 25]
        # 최종적으로 YOLO는 10647개의 바운딩박스를 예측합니다.
        output = torch.cat((pred_boxes.view(batch_size, -1, 4),
                    obj_score.view(batch_size, -1, 1),
                    pred_cls.view(batch_size, -1, self.num_classes)), -1)
        return output

    # grid_size를 갱신하고, transform_outputs 함수를 위해 anchor 박스를 전처리 합니다.
    def compute_grid_offsets(self, grid_size, cuda=True):
        self.grid_size = grid_size # ex) 13, 26, 52
        self.stride = self.img_dim / self.grid_size

        # cell index 생성
        # transform_outputs 함수에서 바운딩 박스의 x, y좌표를 예측할 때 사용합니다.
        # 1, 1, S, S
        self.grid_x = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).type(torch.float32)
        # 1, 1, S, S
        self.grid_y = torch.arange(grid_size, device=device).repeat(1, 1, grid_size, 1).transpose(3,2).type(torch.float32)

        # anchors를 feature map 크기로 정규화, [0~1] 범위
        scaled_anchors = [(a_w / self.stride, a_h / self.stride) for a_w, a_h in self.anchors]
        # tensor로 변환
        self.scaled_anchors = torch.tensor(scaled_anchors, device=device)

        # transform_outputs 함수에서 바운딩 박스의 w, h를 예측할 때 사용합니다.
        # shape=(3,2) -> (1, 3, 1, 1)
        self.anchor_w = self.scaled_anchors[:, 0:1].view((1, self.num_anchors, 1, 1))
        self.anchor_h = self.scaled_anchors[:, 1:2].view((1, self.num_anchors, 1, 1))

    # 예측한 바운딩 박스 좌표를 계산하는 함수입니다.
    def transform_outputs(self, prediction):
        # prediction = (batch, num_anchors, S, S, coordinates + classes)
        device = prediction.device
        x = torch.sigmoid(prediction[..., 0]) # sigmoid(box x), 예측값을 sigmoid로 감싸서 [0~1] 범위
        y = torch.sigmoid(prediction[..., 1]) # sigmoid(box y), 예측값을 sigmoid로 감싸서 [0~1] 범위
        w = prediction[..., 2] # 예측한 바운딩 박스 너비
        h = prediction[..., 3] # 예측한 바운딩 박스 높이

        pred_boxes = torch.zeros_like(prediction[..., :4]).to(device)
        pred_boxes[..., 0] = x.data + self.grid_x # sigmoid(box x) + cell x 좌표
        pred_boxes[..., 1] = y.data + self.grid_y # sigmoid(box y) + cell y 좌표
        pred_boxes[..., 2] = torch.exp(w.data) * self.anchor_w
        pred_boxes[..., 3] = torch.exp(h.data) * self.anchor_h

        return pred_boxes * self.stride


 DarkNet을 정의합니다.

class DarkNet(nn.Module):
    def __init__(self, anchors, num_blocks=[1,2,8,8,4], num_classes=20):

        # feature extractor
        self.conv1 = BasicConv(3, 32, 3, stride=1, padding=1)
        self.res_block_1 = self._make_residual_block(64, num_blocks[0]) # 208x208
        self.res_block_2 = self._make_residual_block(128, num_blocks[1]) # 104x104
        self.res_block_3 = self._make_residual_block(256, num_blocks[2]) # 52x52, FPN lateral connection
        self.res_block_4 = self._make_residual_block(512, num_blocks[3]) # 26x26, FPN lateral connection
        self.res_block_5 = self._make_residual_block(1024, num_blocks[4]) # 13x13, Top layer

        # FPN Top down, conv + upsampling을 수행합니다.
        self.topdown_1 = Top_down(1024, 512)
        self.topdown_2 = Top_down(768, 256)
        self.topdown_3 = Top_down(384, 128)

        # FPN lateral connection
        # 차원 축소를 위해 사용합니다.
        self.lateral_1 = BasicConv(512, 256, 1, stride=1, padding=0)
        self.lateral_2 = BasicConv(256, 128, 1, stride=1, padding=0)

        # prediction, 13x13, 26x26, 52x52 피쳐맵에서 예측을 수행합니다.
        self.yolo_1 = YOLOLayer(512, anchors=anchors[2]) # 13x13
        self.yolo_2 = YOLOLayer(256, anchors=anchors[1]) # 26x26
        self.yolo_3 = YOLOLayer(128, anchors=anchors[0]) # 52x52

        self.upsample = nn.Upsample(scale_factor=2)

    def forward(self, x):
        # feature extractor
        x = self.conv1(x)
        c1 = self.res_block_1(x)
        c2 = self.res_block_2(c1)
        c3 = self.res_block_3(c2)
        c4 = self.res_block_4(c3)
        c5 = self.res_block_5(c4)

        # FPN Top-downm, Upsample and lateral connection 
        p5 = self.topdown_1(c5)
        p4 = self.topdown_2(torch.cat((self.upsample(p5), self.lateral_1(c4)), 1))
        p3 = self.topdown_3(torch.cat((self.upsample(p4), self.lateral_2(c3)), 1))

        # prediction
        yolo_1 = self.yolo_1(p5)
        yolo_2 = self.yolo_2(p4)
        yolo_3 = self.yolo_3(p3)

        return torch.cat((yolo_1, yolo_2, yolo_3), 1), [yolo_1, yolo_2, yolo_3]

    def _make_residual_block(self,in_channels, num_block):
        blocks = []

        # down sample
        blocks.append(BasicConv(in_channels//2, in_channels, 3, stride=2, padding=1))

        for i in range(num_block):
        return nn.Sequential(*blocks)


 구축한 모델을 확인합니다.

anchors = [[(10,13),(16,30),(33,23)],[(30,61),(62,45),(59,119)],[(116,90),(156,198),(373,326)]]
x = torch.randn(1, 3, 416, 416)
with torch.no_grad():
    model = DarkNet(anchors)
    output_cat , output = model(x)
    print(output[0].size(), output[1].size(), output[2].size())


 잘 작동하네요!


8. 손실 함수 정의하기

 손실 함수를 구현하는 것이 너무 복잡해, 아래 깃허브에서 갖고 왔습니다 .



def get_loss_batch(output,targets, params_loss, opt=None):
    scaled_anchors= params_loss["scaled_anchors"] # 정규화된 anchor   
    mse_loss= params_loss["mse_loss"] # nn.MSELoss
    bce_loss= params_loss["bce_loss"] # nn.BCELoss, 이진 분류에서 사용
    num_yolos=params_loss["num_yolos"] # 3
    num_anchors= params_loss["num_anchors"] # 3
    obj_scale= params_loss["obj_scale"] # 1
    noobj_scale= params_loss["noobj_scale"] # 100

    loss = 0.0

    for yolo_ind in range(num_yolos):
        yolo_out = output[yolo_ind] # yolo_out: batch, num_boxes, class+coordinates
        batch_size, num_bbxs, _ = yolo_out.shape

        # get grid size
        gz_2 = num_bbxs/num_anchors # ex) at 13x13, 507 / 3

        # (batch, num_boxes, class+coordinates) -> (batch, num_anchors, S, S, class+coordinates)
        yolo_out = yolo_out.view(batch_size, num_anchors, grid_size, grid_size, -1)

        pred_boxes = yolo_out[:,:,:,:,:4] # get box coordinates
        x,y,w,h = transform_bbox(pred_boxes, scaled_anchors[yolo_ind]) # cell 내에서 x,y 좌표와  
        pred_conf = yolo_out[:,:,:,:,4] # get confidence
        pred_cls_prob = yolo_out[:,:,:,:,5:]

        yolo_targets = get_yolo_targets({


        loss_x = mse_loss(x[obj_mask], tx[obj_mask])
        loss_y = mse_loss(y[obj_mask], ty[obj_mask])
        loss_w = mse_loss(w[obj_mask], tw[obj_mask])
        loss_h = mse_loss(h[obj_mask], th[obj_mask])
        loss_conf_obj = bce_loss(pred_conf[obj_mask], t_conf[obj_mask])
        loss_conf_noobj = bce_loss(pred_conf[noobj_mask], t_conf[noobj_mask])
        loss_conf = obj_scale * loss_conf_obj + noobj_scale * loss_conf_noobj
        loss_cls = bce_loss(pred_cls_prob[obj_mask], tcls[obj_mask])
        loss += loss_x + loss_y + loss_w + loss_h + loss_conf + loss_cls
    if opt is not None:
    return loss.item()


 transform_bbox는 전체 이미지의 x,y 좌표에서 샐 내의 xy 좌표로 변경합니다. 또한 w, h를 anchor box에 맞게 변경합니다.

def transform_bbox(bbox, anchors):
    # bbox: predicted bbox coordinates
    # anchors: scaled anchors

    x = bbox[:,:,:,:,0]
    y = bbox[:,:,:,:,1]
    w = bbox[:,:,:,:,2]
    h = bbox[:,:,:,:,3]
    anchor_w = anchors[:,0].view((1,3,1,1))
    anchor_h = anchors[:,1].view((1,3,1,1))

    x=x-x.floor() # 전체 이미지의 x 좌표에서 셀 내의 x좌표로 변경
    y=y-y.floor() # 전체 이미지의 y 좌표에서 셀 내의 y좌표로 변경
    w=torch.log(w / anchor_w + 1e-16)
    h=torch.log(h / anchor_h + 1e-16)
    return x, y, w, h


 get_target_yolo는 ground truth와 iou가 가장 높은 anchor를 object가 있다고 할당하고, iou > threshhold인 anchor도 object 가 있다고 할당합니다. 나머지 anchor는 무시합니다. 또한 바운딩 박스 예측 좌표, cls, confidence를 생성합니다.

def get_yolo_targets(params):
    pred_boxes = params['pred_boxes']
    pred_cls_prob = params['pred_cls_prob']
    target = params['targets'] # batchsize, cls, cx, cy, w, h
    anchors = params['anchors']
    ignore_thres = params['ignore_thres']

    batch_size = pred_boxes.size(0)
    num_anchors = pred_boxes.size(1)
    grid_size = pred_boxes.size(2)
    num_cls = pred_cls_prob.size(-1)

    sizeT = batch_size, num_anchors, grid_size, grid_size
    obj_mask = torch.zeros(sizeT, device=device, dtype=torch.uint8)
    noobj_mask = torch.ones(sizeT, device=device, dtype=torch.uint8)
    tx = torch.zeros(sizeT, device=device, dtype=torch.float32)
    ty = torch.zeros(sizeT, device=device, dtype=torch.float32)
    tw = torch.zeros(sizeT, device=device, dtype=torch.float32)
    th = torch.zeros(sizeT, device=device, dtype=torch.float32)

    sizeT = batch_size, num_anchors, grid_size, grid_size, num_cls
    tcls = torch.zeros(sizeT, device=device, dtype=torch.float32)

    # target = batch, cx, cy, w, h, class
    target_bboxes = target[:, 1:5] * grid_size
    t_xy = target_bboxes[:, :2]
    t_wh = target_bboxes[:, 2:]
    t_x, t_y = t_xy.t() # .t(): 전치
    t_w, t_h = t_wh.t() # .t(): 전치

    grid_i, grid_j = t_xy.long().t() # .long(): int로 변환

    # anchor와 target의 iou 계산
    iou_with_anchors = [get_iou_WH(anchor, t_wh) for anchor in anchors]
    iou_with_anchors = torch.stack(iou_with_anchors)
    best_iou_wa, best_anchor_ind = iou_with_anchors.max(0) # iou가 가장 높은 anchor 추출

    batch_inds, target_labels = target[:, 0].long(), target[:, 5].long()
    obj_mask[batch_inds, best_anchor_ind, grid_j, grid_i] = 1 # iou가 가장 높은 anchor 할당
    noobj_mask[batch_inds, best_anchor_ind, grid_j, grid_i] = 0

    # threshold 보다 높은 iou를 지닌 anchor
    # iou가 가장 높은 anchor만 할당하면 되기 때문입니다.
    for ind, iou_wa in enumerate(iou_with_anchors.t()):
        noobj_mask[batch_inds[ind], iou_wa > ignore_thres, grid_j[ind], grid_i[ind]] = 0

    # cell 내에서 x,y로 변환
    tx[batch_inds, best_anchor_ind, grid_j, grid_i] = t_x - t_x.float()
    ty[batch_inds, best_anchor_ind, grid_j, grid_i] = t_y - t_y.float()

    anchor_w = anchors[best_anchor_ind][:, 0]
    tw[batch_inds, best_anchor_ind, grid_j, grid_i] = torch.log(t_w / anchor_w + 1e-16)

    anchor_h = anchors[best_anchor_ind][:, 1]
    th[batch_inds, best_anchor_ind, grid_j, grid_i] = torch.log(t_h / anchor_h + 1e-16)

    tcls[batch_inds, best_anchor_ind, grid_j, grid_i, target_labels] = 1

    output = {
        'obj_mask': obj_mask,
        'noobj_mask': noobj_mask,
        'tx': tx,
        'ty': ty,
        'tw': tw,
        'th': th,
        'tcls': tcls,
        't_conf': obj_mask.float(),
    return output


 anchor와 target box의 iou를 계산하는 함수입니다.

# anchor와 target box의 iou 계산하는 함수입니다.
def get_iou_WH(wh1, wh2):
    wh2 = wh2.t()
    w1, h1 = wh1[0], wh1[1]
    w2, h2 = wh2[0], wh2[1]
    inter_area = torch.min(w1, w2) * torch.min(h1, h2)
    union_area = (w1 * h1 + 1e-16) + w2 * h2 - inter_area
    return inter_area / union_area


9. 모델 학습하기

 학습에 필요한 함수를 정의하고, 학습을 진행합니다.

opt = optim.Adam(model.parameters(), lr=1e-3)
lr_scheduler = ReduceLROnPlateau(opt, mode='min',factor=0.5, patience=20,verbose=1)
# 현재 lr 계산하는 함수
def get_lr(opt):
    for param_group in opt.param_groups:
        return param_group['lr']
# epoch당 loss 계산하는 함수
def loss_epoch(model,params_loss,dataset_dl,sanity_check=False,opt=None):
    running_metrics= {}
    for xb, yb,_ in dataset_dl:
        loss_b=get_loss_batch(output,yb, params_loss,opt)
        if sanity_check is True:
    return loss
import time
def train_val(model, params):
        "train": [],
        "val": [],
    best_model_wts = copy.deepcopy(model.state_dict())
    start_time = time.time()
    for epoch in range(num_epochs):
        print('Epoch {}/{}, current lr={}'.format(epoch, num_epochs - 1, current_lr)) 
        with torch.no_grad():
        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), path2weights)
            print("Copied best model weights!")
            print('Get best val loss')
        if current_lr != get_lr(opt):
            print("Loading best model weights!")
        print("train loss: %.6f, val loss: %.6f, time: %.4f min" %(train_loss, val_loss, (time.time()-start_time)/60))
    return model, loss_history
path2models= "./models/"
if not os.path.exists(path2models):
mse_loss = nn.MSELoss(reduction="sum")
bce_loss = nn.BCELoss(reduction="sum")
    "scaled_anchors" : scaled_anchors,
    "ignore_thres": 0.5,
    "mse_loss": mse_loss,
    "bce_loss": bce_loss,
    "num_yolos": 3,
    "num_anchors": 3,
    "obj_scale": 1,
    "noobj_scale": 100,
    "num_epochs": 3,
    "optimizer": opt,
    "params_loss": params_loss,
    "train_dl": train_dl,
    "val_dl": val_dl,
    "sanity_check": False,
    "lr_scheduler": lr_scheduler,
    "path2weights": path2models+"weights.pt",


 1epoch당 25분이 소요되네요. 구현과 학습이 목적이기 때문에 3epoch만 진행해보았습니다. colab 환경에서 학습이 너무 오래 걸리므로 pre trained model을 불러오는 것이 현명합니다.


 loss progress를 출력합니다.

num_epochs = params_train['num_epochs']

# Plot train-val loss
plt.title('Train-Val Loss')
plt.plot(range(1, num_epochs+1), loss_hist['train'], label='train')
plt.plot(range(1, num_epochs+1), loss_hist['val'], label='val')
plt.xlabel('Training Epochs')


