1. 목표

딥러닝 이미지 모델로 유명한 모델인 U-Net을 pytorch로 구현하는 것을 목표로 했다. 평소에 해보고 싶었던 이미지 세그먼트를 수행했고, 데이터는 ISBI 2012 EM Segmentation Challenge에 사용된 membrane 데이터셋을 사용했다. 자동차 번호판 인식을 해보고 싶었는데 그건 다음에 시도해보도록 하겠다. 데이터셋은 train / val / test 로 모두 나눠서 저장했다.

test.zip
7.29MB

모든 소스 코드는 깃허브에 업로드했다.

[GitHub - Hyunmok-Park/Torch_U_Net

Contribute to Hyunmok-Park/Torch_U_Net development by creating an account on GitHub.

github.com](https://github.com/Hyunmok-Park/Torch_U_Net)

2. 데이터셋

데이터는 위의 dataset 폴더에 저장했으며 정규화 전처리 정도만 추가해서 사용했다. 흑백 이미지이기 때문에 채널수가 1개라서 의도적으로 차원을 확장해서 (1, 512, 512) 크기를 맞춰주었다.

import os.path
import random
import shutil

from glob import glob
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torchvision.transforms as transforms


class Dataset(Dataset):
    def __init__(self, data_path, mode='train', transform=None):
        self.transform = transform

        self.labels = [file for file in sorted(glob(f"{data_path}/{mode}/label_*.npy"))]
        self.inputs = [file for file in sorted(glob(f"{data_path}/{mode}/input_*.npy"))]

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        labels = np.load(self.labels[index])
        inputs = np.load(self.inputs[index])

        # 정규화
        labels = labels/255.0
        inputs = inputs/255.0

        if labels.ndim == 2:
            labels = labels[np.newaxis, :, :]
        if inputs.ndim == 2:
            inputs = inputs[np.newaxis, :, :]

        return inputs, labels


def load_dataset(data_path='dataset', mode='train', batch_size=4, img_size=512):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(0.5, 0.5)
    ])
    dataset = Dataset(data_path, mode, transform)
    data_loader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)
    return data_loader

3. 모델 소개

모델은 인코더, 디코더, 인코더와 디코더를 잇는 브릿지로 크게 3가지 단계로 구성했다. 인코더와 디코더에서는 반복되는 구조를 하나의 클래스로 작성해서 번거로움을 줄였다.
인코더, 디코더를 구성하는 Conv2d 는 모두 kernel-size=3, stride=1, padding=1 로 고정했고, pool의 kernel-size=2로 고정했다. 다만 padding=0으로 설정했어야 한다고 배웠는데 이 부분은 추가적인 확인이 필요했다. 현재는 padding=0 으로 설정할 경우, 이미지의 크기가 맞지 않아서 모델 학습이 불가능했다.

3.1 Encoder block

class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, bias, pool_kernel_size):
        super().__init__()

        self.convlayer1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=bias
        )

        self.convlayer2 = nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=bias
        )

        self.batchnorm1 = nn.BatchNorm2d(num_features=out_channels)
        self.batchnorm2 = nn.BatchNorm2d(num_features=out_channels)

        self.layer = nn.Sequential(
            self.convlayer1,
            self.batchnorm1,
            nn.ReLU(),
            self.convlayer2,
            self.batchnorm2,
            nn.ReLU(),
        )

        self.pool = nn.MaxPool2d(kernel_size=pool_kernel_size)

    def forward(self, inputs):
        output_for_decoder = self.layer(inputs)
        output_for_next = self.pool(output_for_decoder)
        return output_for_next, output_for_decoder

3.2 Decoder block

class DecoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, bias, pool_kernel_size):
        super().__init__()

        self.convlayer1 = nn.Conv2d(
            in_channels=out_channels * 2,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=bias
        )

        self.convlayer2 = nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=bias
        )

        self.batchnorm1 = nn.BatchNorm2d(num_features=out_channels)
        self.batchnorm2 = nn.BatchNorm2d(num_features=out_channels)

        self.pool = nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels, kernel_size=pool_kernel_size, stride=2, padding=0)

        self.layer = nn.Sequential(
            self.convlayer1,
            self.batchnorm1,
            nn.ReLU(),
            self.convlayer2,
            self.batchnorm2,
            nn.ReLU()
        )


    def forward(self, decoder_output, encoder_output):
        inputs = self.pool(decoder_output)
        inputs = torch.cat([encoder_output, inputs], dim=1)
        output = self.layer(inputs)
        return output

3.3 U-Net

class UNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.e_block1 = EncoderBlock(1, 64, 3, 1, 1, True, 2)
        self.e_block2 = EncoderBlock(64, 128, 3, 1, 1, True, 2)
        self.e_block3 = EncoderBlock(128, 256, 3, 1, 1, True, 2)
        self.e_block4 = EncoderBlock(256, 512, 3, 1, 1, True, 2)

        self.bridge = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU()
        )

        self.d_block1 = DecoderBlock(1024, 512, 3, 1, 1, True, 2)
        self.d_block2 = DecoderBlock(512, 256, 3, 1, 1, True, 2)
        self.d_block3 = DecoderBlock(256, 128, 3, 1, 1, True, 2)
        self.d_block4 = DecoderBlock(128, 64, 3, 1, 1, True, 2)

        self.final_conv = nn.Conv2d(in_channels=64, out_channels=1, kernel_size=1, stride=1, padding=0)

    def forward(self, inputs):
        e_output1, d_inputs1 = self.e_block1(inputs)
        e_output2, d_inputs2 = self.e_block2(e_output1)
        e_output3, d_inputs3 = self.e_block3(e_output2)
        e_output4, d_inputs4 = self.e_block4(e_output3)

        e_output = self.bridge(e_output4)

        d_output1 = self.d_block1(e_output, d_inputs4)
        d_output2 = self.d_block2(d_output1, d_inputs3)
        d_output3 = self.d_block3(d_output2, d_inputs2)
        d_output4 = self.d_block4(d_output3, d_inputs1)

        output = self.final_conv(d_output4)

        return output

4. 학습

학습 코드는 평소와 마찬가지로 config을 입력받아서 진행하도록 작성하였다. 모델 저장 기준으로 validation 데이터셋에 대한 결과를 추가했다. 테스트 단계에서 epoch 기준으로 저장한 모델과 validation 결과를 기준으로 저장한 모델을 비교한 결과, validation 모델이 조금은 더 좋아보였다.

from datetime import datetime
import os

import matplotlib.pyplot as plt
import torch
from torch import nn
from torch import optim
from tqdm import tqdm
import numpy as np

from data_factory.data_loader import load_dataset
from model.UNet import UNet
from torchvision.utils import save_image

def train(config):
    EXP_NAME = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    os.makedirs(f"result/{EXP_NAME}", exist_ok=True)
    os.makedirs(f"result/{EXP_NAME}/test", exist_ok=True)

    train_loader = load_dataset(config['data_path'], 'train', config['batch_size'], config['img_size'])
    val_loader = load_dataset(config['data_path'], 'val', config['batch_size'], config['img_size'])
    test_loader = load_dataset(config['data_path'], 'test', 1, config['img_size'])

    model = UNet().to(config['device'])
    criterion = nn.BCEWithLogitsLoss().to(config['device'])
    optimizer = optim.Adam(params=model.parameters(), lr=config['learning_rate'])

    train_loss = []
    val_loss = []

    min_val_loss = np.inf

    for epoch in tqdm(range(config['epoch']), desc='EPOCH'):
        train_loss_batch = []
        val_loss_batch = []

        ###########
        #  TRAIN  #
        ###########
        model.train()
        for batch, (inputs, label) in enumerate(train_loader):
            optimizer.zero_grad()
            inputs = inputs.to(config['device']).type(torch.float32)
            label = label.to(config['device']).type(torch.float32)
            output = model.forward(inputs)
            loss = criterion(label, output)
            loss.backward()
            optimizer.step()
            train_loss_batch.append(loss.item())

        torch.save(model.state_dict(), f'result/{EXP_NAME}/checkpoint.pth')
        train_loss.append(np.mean(train_loss_batch))

        ##########
        #  EVAL  #
        ##########
        model.eval()
        for batch, (inputs, label) in enumerate(val_loader):
            inputs = inputs.to(config['device']).type(torch.float32)
            label = label.to(config['device']).type(torch.float32)
            output = model.forward(inputs)
            loss = criterion(label, output)
            val_loss_batch.append(loss.item())

        if min_val_loss > np.mean(val_loss_batch):
            min_val_loss = np.mean(val_loss_batch)
            torch.save(model.state_dict(), f'result/{EXP_NAME}/checkpoint_best_val.pth')

        val_loss.append(np.mean(val_loss_batch))

    ##########
    #  TEST  #
    ##########
    to_class = lambda x: 1.0 * (x > 0.5)
    model.load_state_dict(torch.load(f'result/{EXP_NAME}/checkpoint.pth'))
    model.eval()
    for idx, (inputs, label) in enumerate(test_loader):
        inputs = inputs.to(config['device']).type(torch.float32)
        label = label.to(config['device']).type(torch.float32)
        output = model.forward(inputs)

        inputs = inputs.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        label = label.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        output = output.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        output = to_class(output)

        f, ax = plt.subplots(1, 3, figsize=(10, 4))
        ax[0].imshow(inputs, cmap='gray')
        ax[1].imshow(label, cmap='gray')
        ax[2].imshow(output, cmap='gray')

        f.savefig(f"result/{EXP_NAME}/test/{idx}.png")
        plt.close()

    model.load_state_dict(torch.load(f'result/{EXP_NAME}/checkpoint_best_val.pth'))
    model.eval()
    for idx, (inputs, label) in enumerate(test_loader):
        inputs = inputs.to(config['device']).type(torch.float32)
        label = label.to(config['device']).type(torch.float32)
        output = model.forward(inputs)

        inputs = inputs.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        label = label.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        output = output.detach().cpu().numpy().reshape(config['img_size'], config['img_size'])
        output = to_class(output)

        f, ax = plt.subplots(1, 3, figsize=(10, 4))
        ax[0].imshow(inputs, cmap='gray')
        ax[1].imshow(label, cmap='gray')
        ax[2].imshow(output, cmap='gray')

        f.savefig(f"result/{EXP_NAME}/test/{idx}_val.png")
        plt.close()

    f, ax = plt.subplots(1, 1, figsize=(10, 2))
    ax.plot(train_loss, color='blue')
    ax.plot(val_loss, color='red')

    f.savefig(f"result/{EXP_NAME}/train_loss.png")
    plt.close()


if __name__ == '__main__':
    config = {}
    config['num_channel'] = 3
    config['img_size'] = 512

    config['data_path'] = 'dataset'

    config['batch_size'] = 4
    config['learning_rate'] = 0.0001
    config['epoch'] = 200

    config['device'] = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    train(config)

5. 결과

왼쪽의 결과가 validation 기준, 오른쪽이 epoch 기준으로 저장한 모델이다. 하나의 이미지에서는 왼쪽부터 실제 입력, 정답, 테스트 결과이다. 어느 쪽이 더 좋다고 단언하기는 어렵지만 왼쪽의 모델이 조금 더 세밀하게 조각을 나누지 않았나싶다.

6. 마치며

Pytorch 라이브러리를 사용해서 U-Net을 직접 구현해보고 학습까지 진행했다. 이미지쪽은 워낙 아는 바가 없어서 구현에서 실행까지 생각보다 시간을 오래 잡아먹었던 것 같다. 얕게나마 알아본 결과, 실제 U-net에서는 다양한 학습 테크닉을 추가했던 것 같다. 이미지라는게 다양한 기법을 통해 학습 이미지를 약간 변형해서 성능을 높일 수 있는 것으로 알고 있다. 다음에는 다른 데이터셋을 사용해서 성능을 확인해보고자 한다.

'3. Dev > 모델 구현' 카테고리의 다른 글

Pytorch로 GAN 구현하기(+ mnist 데이터)  (0) 2023.01.27
Pytorch로 GAN 구현하기  (1) 2023.01.24
Pytorch로 Transformer 구현하기  (0) 2022.10.20
Pytorch로 DQN 구현하기(+ 팩맨)  (0) 2022.10.20

1. 수정사항

모델 적절성 판단을 위해 MNIST 데이터셋 추가
https://github.com/Hyunmok-Park/PePe_GAN/pull/1
모델 레이어 일부 수정
https://github.com/Hyunmok-Park/PePe_GAN/pull/2

2. MNIST 적용

2.1 MNIST 데이터셋 추가

모델의 적절성을 판단하기 위해 MNIST 데이터셋을 사용해서 성능을 확인했다. 기존 데이터 로더 코드에 torchvision.dsets.MNIST를 추가하였고, 간단한 정규화만 적용해서 우선 결과를 확인했다.

if dataset == 'mnist':
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(0.5, 0.5)
    ])
    dataset = dsets.MNIST(root=f'{data_path}/MNIST_data/',
                          train=True,
                          transform=transform,
                          download=True)

기존과 동일한 모델을 사용하여 학습한 결과, 어느 정도 숫자가 생성되었지만 분명하지 않은 데이터가 대부분이었다. 그래서 모델 자체가 한계가 존재한다고 판단했고 일부 수정과정을 거쳤다.

2.2 성능개선

성능 개선을 위해 수정한 사항은 크게 2가지였다. 첫번째는 생성자, 판별자에 레이어를 추가 및 변경하였다. 기존에는 2-layer ReLU를 사용했지만 4-layer LeakyReLU(0.2)를 사용했으며 생성자, 판별자의 특성에 맞게 점진적으로 유닛수를 증가/감소하도록 조절했다. 두번째 수정사항은 데이터 전처리(정규화)이다. 기존에는 mean / val = 0 / 1로 전처리했지만 이럴 경우 텐서의 값이 0 ~ 1사이의 값이 나왔다. 생성자의 마지막 레이어에 tanh를 사용하는데 이럴 경우 서로 조화를 이루지 못한다. 그래서 더 찾아본 결과 mean / val = 0.5 / 0.5 로 조절해야 하는 것을 확인했다. 이 부분에 대해서는 torchvision.transforms 의 기능을 더 찾아봐야 할 것 같다. 위의 수정사항을 적용하고 파라미터 튜닝을 거친 결과, 기존보다 훨씬 명확한 이미지를 얻을 수 있었다.

 

3. PePeGAN

3.1 학습 데이터 추가

성능 개선을 위해 학습 데이터를 기존 36개에서 846개로 대폭 증가시켰다. 일부 겹치거나 비슷한 이미지도 있었지만 현재로서는 이게 최선이었다. 크롬 브라우저의 Image Downloader 라는 플러그인을 사용하면 쉽게 대량의 이미지를 얻을 수 있다. 수차례의 하이퍼 파라미터 튜닝을 거친 결과 아래의 결과를 얻을 수 있었다. 여전히 불만족스러운 결과지만 눈과 입이 나타났다는 큰 소득이 있었다. 구글링을 해본 결과, GAN으로 생성한 이미지가 원본 이미지들보다 다소 어둡게 나오는 경우가 많이 있는 것 같다. 아마 이 부분까지 해결하려면 학습 데이터도 더 세밀하게 수집하고 모델도 다른 장치를 추가해야 할 것 같다.

눈, 입이 나타났다...!
epoch이 증가하면서 조금씩 달라지는 모습

4. 마치며

대략 2주의 기간동안 vanilla GAN 구조로 데이터 수집, 학습, 최적화까지 직접 진행해보았다. 비록 결과가 완벽하지는 않지만 생성모델에 입문할 수 있는 좋은 기회였다. 최적화를 진행하며 생성된 이미지를 보면서 일차적으로 생성된 이미지를 다시 입력으로 사용할 수 있지 않을까라는 생각했는데 이게 결국 diffusion이 아닐까라는 생각이 들었다. GAN을 여러번 중첩하면서 거기에 마코브 특성을 결합한게 현재의 diffusion과 유사한 개념이 아닐까... 다른 프로젝트를 위해 PePeGAN은 한동안 보류할 예정이지만 완벽하게 선명한 페페를 얻을때까지 다른 생성모델들도 자세히 공부하면서 진행하도록 하겠다.

'3. Dev > 모델 구현' 카테고리의 다른 글

Pytorch로 U-Net 구현하기  (0) 2023.02.12
Pytorch로 GAN 구현하기  (1) 2023.01.24
Pytorch로 Transformer 구현하기  (0) 2022.10.20
Pytorch로 DQN 구현하기(+ 팩맨)  (0) 2022.10.20

1. 목표

간단한 생성 모델(GAN)을 직접 구현해보기

https://github.com/Hyunmok-Park/PePe_GAN

 

GitHub - Hyunmok-Park/PePe_GAN

Contribute to Hyunmok-Park/PePe_GAN development by creating an account on GitHub.

github.com

2. 모델 소개

2.1 데이터셋

데이터셋은 유명한 개구리 페페 사진을 사용했다.

ㅋㅋㅋ

학습 데이터는 구글링을 해서 36개의 페페 얼굴 이미지를 사용했으며 결과에서 다시 언급하겠지만 너무 적었던게 아닌가 생각된다. 데이터 로딩은 torchvision.datasets.dset 라이브러리를 그대로 사용했다. 최상위 경로를 지정해주고 그 안에 클래스별로 폴더를 만들어주어 각 폴더에 이미지만 저장해두면 알아서 이미지를 텐서로 변환해주고 각 클래스에 맞게 라벨링까지 로딩해준다. 이미지의 크기를 통일해주기 위해서 resize, centercrop을 추가해주었고 마지막에 정규화까지 추가해주었다. 정규화의 파라미터는 나중에 수정해주어야 할 것 같다.

import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader


def ImageLoader(data_path='dataset', batch_size=4, img_size=36):
    dataset = dset.ImageFolder(root=data_path,
                               transform=transforms.Compose([
                                   transforms.Resize(img_size),
                                   transforms.CenterCrop(img_size),
                                   transforms.ToTensor(),
                                   transforms.Normalize(0, 1),
                               ]))
    data_loader = DataLoader(dataset=dataset, batch_size=batch_size)
    return data_loader

2.2 생성자

모델은 매우 간단한 2-layer 구조를 사용했다.

from torch import nn

class Generator(nn.Module):
    def __init__(self, img_size=128 * 128 * 3, hidden_dim=512, latent_dim=64):
        super().__init__()

        self.latent_dim = latent_dim
        self.img_size = img_size
        self.hidden_dim = hidden_dim

        self.generator = nn.Sequential(
            nn.Linear(self.latent_dim, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, self.img_size * self.img_size * 3),
            nn.Tanh()
        )

    def forward(self, z):
        x = self.generator(z)
        return x

2.3 판별자

판별자도 마찬가지로 2-layer 구조로 간단하게 작성했다. 단순하게 FCN을 사용해도 무방하지는 모르겠지만 나중에 성능이 잘 안나오면 CNN 구조를 사용하는 것도 고려 중이다.

from torch import nn

class Discriminator(nn.Module):
    def __init__(self, img_size=128 * 128 * 3, hidden_dim=512):
        super().__init__()

        self.img_size = img_size
        self.hidden_dim = hidden_dim

        self.discriminator = nn.Sequential(
            nn.Linear(self.img_size * self.img_size * 3, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.discriminator(x)
        return x

2.4 GAN

위에서 만들어준 생성자, 판별자 클래스를 불러와서 모델을 구성하는 형태로 코드를 나눠주었다. 클래스 명은 PePe_GAN으로 했으며 하위에 생성자 학습, 판별자 학습, 이미지 샘플링 메소드를 추가했다. 디자인 패턴에 대한 공부를 확실히 해야할 것 같다...

 

생성자 학습에서는 노이즈(z)를 만들어서 생성자를 통해 가짜 이미지(fake images)를 만들어준다. 해당 이미지들은 판별자를 속이기 위한 데이터이기 때문에 모두 리얼 이미지라는 라벨링(1)을 부여한 후에 판별자에게 이미지만 넘겨준다. 그 후에 판별자가 분류한 결과와 가짜 라벨링이 유사해지도록 생성자 학습이 진행된다.

 

판별자 학습에서는 리얼 이미지(페페)와 가짜 이미지를 모두 사용한다. 이번에는 리얼 이미지에는 1, 가짜 이미지에는 0으로 제대로 라벨링을 부여한 후에 판별자가 이대로 분류할 수 있도록 학습이 진행된다.

import torch
from torch import nn
from torch import optim

from net import Discriminator
from net import Generator

class PePe_GAN(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.img_size = config['img_size']
        self.hidden_dim = config['hidden_dim']
        self.latent_dim = config['latent_dim']
        self.learning_rate = config['learning_rate']
        self.device = config['device']

        self.generator = Generator(img_size=self.img_size, hidden_dim=self.hidden_dim, latent_dim=self.latent_dim)
        self.discriminator = Discriminator(img_size=self.img_size, hidden_dim=self.hidden_dim)

        self.criterion = nn.BCELoss()
        self.generator_optimizer = optim.Adam(params=self.generator.parameters(), lr=self.learning_rate)
        self.discriminatorr_optimizer = optim.Adam(params=self.discriminator.parameters(), lr=self.learning_rate)

    def update_generator(self, batch_size):
        self.generator_optimizer.zero_grad()
        self.discriminatorr_optimizer.zero_grad()

        z = torch.randn(batch_size, self.latent_dim).to(self.device)
        fake_images = self.generator(z)
        all_true_labels = torch.ones([batch_size, 1], dtype=torch.float32).to(self.device)

        generator_loss = self.criterion(self.discriminator(fake_images), all_true_labels) / 100
        generator_loss.backward()
        self.generator_optimizer.step()
        return generator_loss

    def update_discriminator(self, real_images, batch_size):
        self.generator_optimizer.zero_grad()
        self.discriminatorr_optimizer.zero_grad()

        z = torch.randn(batch_size, self.latent_dim).to(self.device)
        fake_images = self.generator(z)
        all_true_labels = torch.ones([real_images.size(0), 1], dtype=torch.float32).to(self.device)
        all_false_labels = torch.zeros([fake_images.size(0), 1], dtype=torch.float32).to(self.device)

        real_images_loss = self.criterion(self.discriminator(real_images), all_true_labels)
        fake_images_loss = self.criterion(self.discriminator(fake_images), all_false_labels)
        discriminator_loss = real_images_loss + fake_images_loss
        discriminator_loss.backward()
        self.discriminatorr_optimizer.step()
        return discriminator_loss

    def generate_image(self):
        z = torch.randn(1, self.latent_dim).to(self.device)
        fake_images = self.generator(z)
        return fake_images

2.5 메인 코드

학습 과정, 결과와 연관된 부분은 모두 여기서 관리하도록 작성했다. 생성자, 판별자의 학습 곡선, 실제 모델에 입력된 이미지(확인을 위해), 테스트, 모델 저장 등을 여기서 관리한다. 모델은 어떤 시점을 저장하는 것이 적절한지 아직 판단이 안되서 마지막 epoch과 생성자 기준으로 오류가 가장 적을때의 모델을 저장했다. 결과적으로는 마지막 모델을 사용하는 것이 이미지가 그나마 괜찮았다. 모델 하이퍼 파라미터는 모두 config 형태로 관리할 예정이고 나중에 WandB도 사용해서 최적화를 할 예정이기 때문에 이렇게 작성했다.

from datetime import datetime
import os

import torch
from tqdm import tqdm
import numpy as np

from data_factory.data_loader import ImageLoader
from model.gan import PePe_GAN
from torchvision.utils import save_image

def train(config):
    EXP_NAME = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    os.makedirs(f"result/{EXP_NAME}")
    os.makedirs(f"result/{EXP_NAME}/train_image")
    os.makedirs(f"result/{EXP_NAME}/test_image")

    data_loader = ImageLoader(config['data_path'], config['batch_size'], config['img_size'])
    model = PePe_GAN(config).to(config['device'])
    g_loss_hist = []
    d_loss_hist = []
    min_g_loss = np.inf
    min_d_loss = np.inf

    for epoch in tqdm(range(config['epoch']), desc='EPOCH'):
        gloss = []
        dloss = []
        for batch, (real_image, label) in enumerate(data_loader):
            for idx, img in enumerate(real_image):
                save_image(img, f'result/{EXP_NAME}/train_image/{epoch}_{batch}_{idx}.png')

            inputs = real_image.view(-1, config['img_size'] * config['img_size'] * 3).to(config['device'])

            g_loss = model.update_generator(config['batch_size'])
            d_loss = model.update_discriminator(inputs, config['batch_size'])
            g_loss_hist.append(g_loss.detach().item())
            d_loss_hist.append(d_loss.detach().item())
            gloss.append(g_loss.detach().item())
            dloss.append(d_loss.detach().item())
            mean_gloss = np.mean(gloss)
            mean_dloss = np.mean(dloss)

            if min_g_loss > mean_gloss:
                min_g_loss = mean_gloss
                torch.save(model.state_dict(), f'result/{EXP_NAME}/checkpoint.pth')
    torch.save(model.state_dict(), f'result/{EXP_NAME}/last_checkpoint.pth')

    model.load_state_dict(torch.load(f'result/{EXP_NAME}/checkpoint.pth'))
    for i in range(10):
        img = model.generate_image()
        img = img.detach().view(3, config['img_size'], config['img_size'])
        save_image(img, f'result/{EXP_NAME}/test_image/{i}.png')

    model.load_state_dict(torch.load(f'result/{EXP_NAME}/last_checkpoint.pth'))
    for i in range(10):
        img = model.generate_image()
        img = img.detach().view(3, config['img_size'], config['img_size'])
        save_image(img, f'result/{EXP_NAME}/test_image/last_{i}.png')

    import matplotlib.pyplot as plt
    f, ax = plt.subplots(1, 1, figsize = (20, 4))
    ax.plot(g_loss_hist, color='blue')
    ax2 = ax.twinx()
    ax2.plot(d_loss_hist, color='red')
    plt.savefig(f'result/{EXP_NAME}/loss.png')

if __name__ == '__main__':
    config = {}
    config['latent_dim'] = 32
    config['hidden_dim'] = 64
    config['img_size'] = 128

    config['data_path'] = 'dataset'

    config['batch_size'] = 4
    config['learning_rate'] = 0.001
    config['epoch'] = 100

    config['device'] = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    train(config)

3. 결과

처음 테스트 결과는 완전히 노이즈에 가까운 이미지를 얻었다.

조금 보이는 것 같기도..?
학습 곡선(빨: 생성자, 파: 판별자)

몇번의 파라미터 튜닝을 거친 결과 그나마 괜찮은 이미지를 얻을 수 있었다.

학습 곡선(빨: 생성자, 파: 판별자)

4. 정리

페페 이미지를 사용해서 간단한 GAN모델에 입문해보았다. 간단한 모델을 사용하고 학습 데이터를 겨우 36개 사용한거 치고는 괜찮은 결과인가 싶기도 했다. 결과에 대한 자세한 해석을 해봐야 알겠지만 학습 곡선만 보았을때는 명확한 차이를 확인하기 어려웠다. 다음에는 mnist 데이터를 사용해서 우선 모델의 학습 능력을 확인할 필요도 있어보인다. mnist 에서도 안되면 모델을 수정한 후에 페페 이미지를 더 확보해서 학습하면 성능이 더 좋아지지 않을까 생각해본다.

소개

  • Attention is all you need 라는 제목으로 엄청난 영향력을 끼쳤던 transformer 모델을 pytorch 를 사용하여 직접 구현하고 학습까지 진행해서 어떤 결과를 볼 수 있는지 확인해보았다.
  • 원본 눈문 https://arxiv.org/pdf/1706.03762.pdf
  • 코드 구현 깃허브 링크
 

GitHub - Hyunmok-Park/My_Torch_Transformer

Contribute to Hyunmok-Park/My_Torch_Transformer development by creating an account on GitHub.

github.com

 

'3. Dev > 모델 구현' 카테고리의 다른 글

Pytorch로 U-Net 구현하기  (0) 2023.02.12
Pytorch로 GAN 구현하기(+ mnist 데이터)  (0) 2023.01.27
Pytorch로 GAN 구현하기  (1) 2023.01.24
Pytorch로 DQN 구현하기(+ 팩맨)  (0) 2022.10.20

소개

  • 강화학습과 딥러닝을 결합하는 뼈대를 제공한 DQN 모델로 간단한 그리드 월드에서 열매(?)를 찾아가는 법을 학습시키는 모델을 만들어보았다.
  • Github 링크
 

GitHub - Hyunmok-Park/RL_snake

Contribute to Hyunmok-Park/RL_snake development by creating an account on GitHub.

github.com

모델 소개

  • 환경 : 그리드 월드
  • state : (현재 팩맨의 위치, 열매위치)
  • 보상 : 기본 이동시 -1, 열매를 획득한다면 +1
  • 행동 : 4방향 이동

규칙

  • 모델 초기에 그리드 월드의 가로, 세로 길이를 지정한다.
  • 팩맨은 게임 시작시에 (0,0) 위치에서 출발하며, 열매는 랜덤한 위치에 1개 형성된다.
  • 팩맨은 매 단계에서 4방향 중 한가지 방향을 선택해서 이동하며 열매를 획득한다면 +1 보상, 그렇지 못하면 -1 보상을 획득한다.
  • 추가로 장애물, 몸의 길이가 늘어나는 규칙도 추가할 예정

모델환경

  • 환경 클래스는 액션을 받아서 환경을 업데이트하고 그에 따른 보상을 리턴한다.
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np

class myenv():
    def __init__(self, height=10, width=10):
        self.height = height
        self.width = width
        self.x = 0
        self.y = 0
        self.dx = [1, 0, -1, 0]
        self.dy = [0, 1, 0, -1]

        self.food_x = None
        self.food_y = None

        self.create_food()

    def reset(self):
        self.x = 0
        self.y = 0
        self.create_food()
        return self.x, self.y, self.food_x, self.food_y

    def step(self, action):

        if action == 0: #right
            if self.x == (self.width - 1):
                pass
            else:
                self.x = self.x + 1
        elif action == 1: #up
            if self.y == 0:
                pass
            else:
                self.y = self.y - 1
        elif action == 2: #left
            if self.x == 0:
                pass
            else:
                self.x = self.x - 1
        elif action == 3 : #down
            if self.y == (self.height - 1):
                pass
            else:
                self.y = self.y + 1

        if self.x == self.food_x and self.y == self.food_y:
            reward = 1
            done = True
        else:
            done = False
            reward = -1

        return (self.x, self.y, self.food_x, self.food_y), reward, done

    def create_food(self):
        done = False
        while not done:
            x = np.random.choice([i for i in range(self.width)])
            y = np.random.choice([i for i in range(self.height)])

            if x == self.food_x and y == self.food_y:
                done = False
            else:
                done = True

        self.food_x = x
        self.food_y = y

    def draw_world(self):
        return 0
  • Qnet
    • 간단한 MLP구조로 출력으로 4가지 action에 대한 value 값을 리턴한다.
import torch
import torch.nn as nn
import torch.nn.functional as F

import random

class Qnet(nn.Module):
    def __init__(self, hidden_dim):
        super(Qnet, self).__init__()

        self.nn = nn.Sequential(
            nn.Linear(4, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 4)
        )

    def forward(self, state):
        return self.nn(state)

    def sample_action(self, state, eps):
        action = self.forward(state)
        if random.random() < eps:
            return random.choice([i for i in range(4)])
        else:
            return action.argmax().item()
  • Buffer
    • DQN에서 사용하는 replay buffer로 간단한 deque를 사용해서 최대 10000개, 최소 2000개의 transition 데이터를 사용해 학습한다.
import collections
import random

import torch

class ReplayBuffer():
    def __init__(self, buffer_limit):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self.buffer.append(transition)

    def sample(self, num_sample):
        mini_batch = random.sample(self.buffer, num_sample)
        s_list = []
        a_list = []
        r_list = []
        next_s_list = []
        done_list = []

        for tran in mini_batch:
            s_list.append(tran[0])
            a_list.append([tran[1]])
            r_list.append([tran[2]])
            next_s_list.append(tran[3])
            done_list.append([tran[4]])

        s_list = torch.tensor(s_list, dtype=torch.float)
        a_list = torch.tensor(a_list)
        r_list = torch.tensor(r_list)
        next_s_list = torch.tensor(next_s_list, dtype=torch.float)
        done_list = torch.tensor(done_list)

        return s_list, a_list, r_list, next_s_list, done_list

    def len(self):
        return len(self.buffer)
  • 메인 학습
    • 학습 및 테스트를 진행하는 코드
    • DQN은 offline 학습 구조를 따르기 때문에 q, q_target 이라는 2개의 별도의 qnet을 형성한다.
    • 이 모델에서는 20번의 에피소드가 끝날때마다 q_target 을 업데이트해주었다.
    • 감마값은 0.98로 설정
import torch
import torch.nn.functional as F
import torch.optim

import numpy as np
from tqdm import tqdm

from net import Qnet
from grid_world import myenv
from buffer import ReplayBuffer

def main():
    device = torch.device('mps')

    q = Qnet(128)
    q_target = Qnet(128)
    env = myenv(10, 10)
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer(buffer_limit=10000)

    update_interval = 20
    optimizer = torch.optim.Adam(q.parameters(), lr=0.001)

    for n_epi in tqdm(range(10000), desc="n_epi"):
        eps = max(0.3, 0.9 - 0.01 * (n_epi / 200))
        s = env.reset() #(x,y,f_x,f_y)
        done = False
        score = 0

        while not done:
            a = q.sample_action(torch.from_numpy(np.array(s)).float(), eps)
            next_s, reward, done = env.step(a)
            done_mask = 0.0 if done else 1.0
            memory.put((s, a, reward, next_s, done_mask))
            s = next_s
            score += reward
            if done:
                break

        if memory.len() > 2000:
            train(q, q_target, memory, optimizer)

        if n_epi % update_interval == 0:
            q_target.load_state_dict(q.state_dict())
            score = 0

    torch.save(q_target.state_dict(), "qnet")

    x, y, f_x, f_y = env.reset()
    q_target.eval()
    print(x, y, f_x, f_y)
    while True:
        action = q_target(torch.tensor([x, y, f_x, f_y]).float()).argmax().item()
        next_s, reward, done = env.step(action)
        print(action, next_s)
        x, y = next_s[0], next_s[1]
        if done:
            break

def train(q, q_target, memory, opt):
    for i in range(10):
        s_list, a_list, r_list, next_s_list, done_list = memory.sample(32)
        q_out = q(s_list)
        q_a = q_out.gather(1, a_list)
        max_q_prime = q_target(next_s_list).max(1)[0].unsqueeze(1)
        target = r_list + 0.98 * max_q_prime * done_list
        loss = F.smooth_l1_loss(q_a, target)

        opt.zero_grad()
        loss.backward()
        opt.step()

if __name__ == '__main__':
    main()

결과

  • 테스트 결과 (0,0)에서 출발한 팩맨이 (2,9) 위치의 열매를 찾기 위해 최단경로로 이동하는 것을 확인했다.
    • 첫째줄 : (팩맨_x, 팩맨_y, 열매_x, 열매_y)
    • 둘째줄부터 : action, (팩맨_x, 팩맨_y, 열매_x, 열매_y)
0 0 2 9
3 (0, 1, 2, 9)
3 (0, 2, 2, 9)
3 (0, 3, 2, 9)
3 (0, 4, 2, 9)
3 (0, 5, 2, 9)
3 (0, 6, 2, 9)
3 (0, 7, 2, 9)
0 (1, 7, 2, 9)
0 (2, 7, 2, 9)
3 (2, 8, 2, 9)
3 (2, 9, 2, 9)

근데 이렇게 만들면 안될수가 없다... 좌표값을 그대로 주는 것이 아니라 이미지 인식으로 해야한다.

'3. Dev > 모델 구현' 카테고리의 다른 글

Pytorch로 U-Net 구현하기  (0) 2023.02.12
Pytorch로 GAN 구현하기(+ mnist 데이터)  (0) 2023.01.27
Pytorch로 GAN 구현하기  (1) 2023.01.24
Pytorch로 Transformer 구현하기  (0) 2022.10.20

+ Recent posts