Introduction
I built a simple MNIST classification CNN with the PyTorch package and wrote example code that uses BOHB, one of the HPO (hyperparameter optimization) techniques, to find the optimal hyperparameters. BOHB combines Bayesian optimization with Hyperband's early-stopping budget schedule, so promising configurations are given progressively larger training budgets.
GitHub link
GitHub - Hyunmok-Park/Pytorch_BOHB_mnist (github.com)
Model
CNN
The model simply applies two convolutional layers and uses CrossEntropyLoss.
import torch
import torch.nn as nn
import torch.nn.functional as F

class cnn(nn.Module):
    def __init__(self):
        super(cnn, self).__init__()
        # two conv layers: 1 -> 3 -> 10 channels, 5x5 kernels
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=5, stride=1)
        # after two conv+pool stages a 28x28 input shrinks to 10 x 4 x 4
        self.fc1 = nn.Linear(10 * 4 * 4, 50)
        self.fc2 = nn.Linear(50, 10)
        self.loss_function = nn.CrossEntropyLoss()

    def forward(self, input, target):
        x = self.conv1(input)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = x.view(-1, 10 * 4 * 4)  # flatten for the fully connected layers
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        loss = self.loss_function(x, target)
        return x, loss
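The 10 * 4 * 4 input size of fc1 follows from the shapes: a 28x28 image becomes 24x24 after the first 5x5 conv, 12x12 after pooling, 8x8 after the second conv, and 4x4 after the second pooling, with 10 channels. A minimal smoke test to confirm this (the import path is assumed from the repo layout):

import torch
from model.net import cnn  # import path assumed from the repo layout

model = cnn()
dummy_input = torch.randn(2, 1, 28, 28)    # a batch of 2 fake MNIST-sized images
dummy_target = torch.randint(0, 10, (2,))  # 2 random class labels
logits, loss = model(dummy_input, dummy_target)
print(logits.shape)  # torch.Size([2, 10])
print(loss.item())   # a scalar cross-entropy loss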
Dataloader
To feed data through the PyTorch pipeline I wrote a Dataset class; the data is the raw MNIST idx files.
import numpy as np
import torch
from struct import unpack
from torch.utils.data import Dataset

class DataSet(Dataset):
    def __init__(self, split='train'):
        if split == "train":
            fp_image = open('dataset/train-images-idx3-ubyte', 'rb')
            fp_label = open('dataset/train-labels-idx1-ubyte', 'rb')
        else:
            fp_image = open('dataset/t10k-images-idx3-ubyte', 'rb')
            fp_label = open('dataset/t10k-labels-idx1-ubyte', 'rb')

        s = fp_image.read(16)  # skip the 16-byte idx image header
        l = fp_label.read(8)   # skip the 8-byte idx label header

        k = 0
        lbl = [[], [], [], [], [], [], [], [], [], []]
        # print("LOADING DATA")
        while True:
            s = fp_image.read(784)  # read 784 bytes (one 28x28 image)
            l = fp_label.read(1)    # read 1 byte (one label)
            if not s:
                break
            if not l:
                break
            index = int(l[0])
            # unpack the raw bytes into a 28x28 array
            img = np.reshape(unpack(len(s) * 'B', s), (28, 28))
            lbl[index].append(img)  # bucket the image under its digit

        self.data_list = []
        if split == "train":
            # keep 10 images per digit for training
            for label, datas in enumerate(lbl):
                for _ in range(10):
                    data = {}
                    data['input'] = datas[_]
                    data['target'] = label
                    self.data_list.append(data)
                    k += 1
        else:
            # keep 1 image per digit for validation
            for label, datas in enumerate(lbl):
                for _ in range(1):
                    data = {}
                    data['input'] = datas[_]
                    data['target'] = label
                    self.data_list.append(data)
                    k += 1
        self.num_data = k
        # print("DATALOAD COMPLETE")
        # print(len(self.data_list))

    def __getitem__(self, index):
        return self.data_list[index]

    def __len__(self):
        return self.num_data

    def collate_fn(self, batch):
        data = {}
        data['input'] = torch.from_numpy(np.concatenate([[bch['input']] for bch in batch], axis=0).reshape(-1, 1, 28, 28)).float()
        data['target'] = torch.from_numpy(np.concatenate([[bch['target']] for bch in batch], axis=0))
        return data
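A quick sanity check of the loader, assuming the raw idx files sit under dataset/ as above; the sizes follow from the 10-per-digit / 1-per-digit subsampling:

from dataset.dataset import DataSet  # import path taken from the runner script

train_ds = DataSet(split='train')
val_ds = DataSet(split='val')
print(len(train_ds), len(val_ds))  # 100 10
sample = train_ds[0]
print(sample['input'].shape, sample['target'])  # (28, 28) 0
batch = train_ds.collate_fn([train_ds[0], train_ds[1]])
print(batch['input'].shape, batch['target'].shape)  # torch.Size([2, 1, 28, 28]) torch.Size([2])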
Main training function
A class that loads the model and data and manages the whole training process. BOHB treats this class's train method as the objective function and optimizes the validation loss it returns.
import numpy as np
import time
import torch
import torch.utils.data
import torch.optim as optim
from dataset.dataset import *
from model.net import *

class NeuralNetworkRunner():
    def __init__(self):
        self.config = 0

    def train(self, lr, batch_size, epoch):
        self.lr = lr
        self.epoch = int(epoch)  # cast in case BOHB hands over a float budget
        self.shuffle = True
        self.batch_size = batch_size

        TIK = time.time()
        torch.cuda.empty_cache()

        train_dataset = DataSet(split='train')
        val_dataset = DataSet(split='val')

        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
            collate_fn=train_dataset.collate_fn,
            drop_last=False)

        val_loader = torch.utils.data.DataLoader(
            val_dataset,
            batch_size=1,
            shuffle=False,
            collate_fn=val_dataset.collate_fn,
            drop_last=False)

        # create model
        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        model = cnn()
        model.to(device)

        # create optimizer
        params = filter(lambda p: p.requires_grad, model.parameters())
        optimizer = optim.Adam(params, lr=self.lr)

        # reset gradient
        optimizer.zero_grad()

        model.train()
        train_loss = []
        val_loss = []
        for epo in range(self.epoch):
            for idx, data in enumerate(train_loader):
                optimizer.zero_grad()
                x, t_loss = model(data['input'].to(device), data['target'].to(device))
                t_loss.backward()
                optimizer.step()
                train_loss.append(t_loss.item())

        # validate once after training; the mean validation loss is the objective
        model.eval()
        with torch.no_grad():
            for idx, data in enumerate(val_loader):
                x, v_loss = model(data['input'].to(device), data['target'].to(device))
                val_loss.append(v_loss.item())

        mean_train_loss = np.mean(train_loss)
        mean_val_loss = np.mean(val_loss)
        print("lr : {} / batch_size : {} / epoch : {} / loss : {}--{}".format(self.lr, self.batch_size, self.epoch, mean_train_loss, mean_val_loss))
        return mean_val_loss

if __name__ == "__main__":
    # standalone run with sample hyperparameters drawn from the config space below
    loss = NeuralNetworkRunner().train(lr=0.001, batch_size=4, epoch=10)
BOHB
Pass the main training function written above through the evaluate function, and the BOHB library finds the best hyperparameter combination within the specified config space.
from bohb import BOHB
import bohb.configspace as cs
import os
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
from runner import *

def evaluate(params, budget):
    # params is a sampled config (lr, batch_size); the budget becomes the epoch count
    loss = NeuralNetworkRunner().train(**params, epoch=budget)
    return loss

if __name__ == '__main__':
    lr = cs.CategoricalHyperparameter('lr', [0.001, 0.01, 0.1])
    batch_size = cs.CategoricalHyperparameter('batch_size', [1, 2, 4])
    configspace = cs.ConfigurationSpace([lr, batch_size])

    opt = BOHB(configspace, evaluate, max_budget=100, min_budget=1)
    logs = opt.optimize()
    print(logs)
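For intuition, here is what a single BOHB evaluation amounts to, with hypothetical sampled values (the actual params dict and budget are chosen by the optimizer at runtime):

# hypothetical values for illustration only
params = {'lr': 0.01, 'batch_size': 2}  # one sample from the config space
budget = 4                              # epochs granted in this rung
loss = evaluate(params, budget)
# equivalent to: NeuralNetworkRunner().train(lr=0.01, batch_size=2, epoch=4)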
Results
Running the script, BOHB reports the best hyperparameter values after several rounds of training across its brackets.
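For reference, a minimal sketch of the standard Hyperband bracket schedule that drives those rounds, assuming eta=3 (the library's internal constants and rounding may differ):

import math

eta, min_budget, max_budget = 3, 1, 100
s_max = int(math.log(max_budget / min_budget, eta))  # highest bracket index
for s in range(s_max, -1, -1):
    n = int(math.ceil((s_max + 1) / (s + 1)) * eta ** s)  # configs started in this bracket
    b = max_budget * eta ** (-s)                          # their starting budget (epochs)
    print(f"bracket {s}: {n} configs at budget {b:.2f} epochs")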