1. 개요

지난 글에 이어서 이번에는 kubeflow를 사용해서 데이터 전처리, 모델 학습, 평가까지의 파이프라인을 작성, 구동해보았다. 파이썬에는 kubeflow pipeline 작성을 위해 kfp 라는 라이브러리가 있는데 이를 활용해서 진행했다. 

2. 사용환경

OS: 윈도우 11

3. 파이프라인

Kubeflow에서 파이프라인을 작성하는 방법에는 크게 2가지 정도의 방향이 있다. 첫번째는 도커 이미지 단위, 두 번째는 함수 단위로 컴포넌트를 구성해서 파이프라인을 이어주는 방법이다. 첫 번째 방법은 kfp 라이브러리의 dsl 에서 ContainerOp 라는 함수를 사용해서 형성할 수 있다.

3.1 docker recret

kubeflow 파이프라인을 작성하는 방법은 여러가지가 있지만 보통 이미지 방식을 선택하는 것 같았다. 그래서 나도 이미지 기반의 파이프라인을 선택했으며 이때 kubernetes 에서 이미지를 당겨오려면 이미지 저장소에 인증할 정보를 미리 저장해두어야 한다. 여기서는 docker recret이라는 이름의 인증 정보를 미리 형성하여 개인 도커 허브 로그인 정보를 저장해두었다. 쿠버네티스를 시작한 후에 파이프라인이 동작할 네임 스페이스에서 시크릿을 형성해주면 된다.

kubectl create secret docker-registry --docker-server=https://index.docker.io/v1/ --docker-username={아이디} --docker-password={비밀번호} --docker-email={이메일}

위의 커맨드를 입력하면 docker secret이 형성된다. 비밀번호는 도커에 로그인할때 입력하는 실제 비밀번호가 아니라 개인 설정/security 에서 토큰을 발급받은 뒤에 해당 토큰을 입력해야 한다. 깃허브처럼 토큰을 형성하고 개인적으로 저장해두지 않으며 나중에 토큰을 다시 형성하는 번거로움이 있으니까 미리 저장해두자. 시크릿을 형성한 후에는 혹시 모르니까 제대로 시크릿이 형성되었는지 확인해야 한다.

 

3.2 Image based component

 data_op = dsl.ContainerOp(
    	name="example component",
        image="example_image/latest"
    ).add_volume(
    	k8s_client.V1Volume(
        	name='data-pvc'
        )
    )

위의 예시처럼 kubeflow ui 상에서 표시할 name과 pull할 베이스 이미지를 설정해주면 해당 이미지를 가져와서 자동으로 실행해주는 방식으로 동작한다. 그리고 예시처럼 add_volume이라는 메소드를 사용하면 원하는 저장소를 마운트할 수 있으며 해당 방식으로 파이프라인 컴포넌트간의 저장소를 공유할 수 있다.

다만 위처럼 이미지 방식의 파이프라인은 kubeflow에서 권장하는 방식은 아니며, kubeflow는 재사용가능한 형태의 파이프라인을 작성하는 것을 권장한다. 사용 방식에 익숙해지는 것에 시간이 조금 걸리긴 했지만 확실히 재사용가능한 함수 형태의 파이프라인을 작성하는 것이 수정, 관리에 훨씬 편하다는 느낌을 받았다.

3.3 function based component

두 번째 방식인 함수 형태의 컴포넌트 구성은 kfp.components의 create_component_from_func 이라는 함수를 사용해서 사용할 수 있다.

def example_component(ex_arg='test'):

    from library_to_use import example
   
    result = do_something_u_want(ex_arg)
    return result
    
@dsl.pipeline(
    name='Pipeline'
) 
def pipeline():
    example_op = comp.create_component_from_func(example_component)
    example_op = data_op(ex_arg='test')
    example_op.apply(onprem.mount_pvc(pvc_name="phmpipeline", volume_name='test-phm', volume_mount_path=volume_path))

해당 함수의 사용 방식은 조금 유의할 사항이 있다. 원하는 동작을 하는 함수 내에서 모든 것을 수행해야하며 이것은 필요한 라이브러리를 import 하는 것까지 포함한다. 이때 나는 약간의 문제가 생겼었는데 내가 만든 모델을 사용할 수가 없었다는 점이었다. 내가 작성한 모델을 포함해서 모든 내가 작성한 함수를 사용하려면 파이썬 기본 경로에 내가 만든 코드들을 넣어줘야 했는데 그렇게 되면 이후 관리가 너무 힘들고 번거로울 것 같다는 생각이 들었다. 그래서 생각한 방법은 작성한 모든 코드들을 담은 이미지 파일을 하나 형성하고 해당 이미지를 불러오면서 PYTHONPATH에 추가하고 그 위에서 component가 돌도록 설정하는 것이었다. 이것도 마찬가지로 번거롭지만 딱히 떠오르는 방식이 없었다. 함수 방식의 component가 이미지를 불러오려면 아래처럼 인자만 추가해주면 되었다.

example_op = comp.create_component_from_func(example_component, base_image=BASE_IMAGE_URL)

이때 BASE_IMAGE_URL은 이미지를 당겨올 때처럼 docker pull ~~ 에서 입력하는 형태의 주소를 넣어주면 된다. 나는 도커 허브에 이미지를 업로드하여 당겨오는 방식으로 구성했으며 이미지를 빌드할때 dockerfile에서 PYTHONPATH에 코드의 경로를 추가하는 과정을 넣어주었다. 

FROM pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
ENV PYTHONUNBUFFERED 1
RUN mkdir /src
COPY src/ /src/
ENV PYTHONPATH="${PYTHONPATH}:/src"
ENV PYTHONPATH="${PYTHONPATH}:/src/model"
ENV PATH="${PATH}:/src"
ENV PATH="${PATH}:/src/model"
WORKDIR /src
RUN mkdir /src/pv
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

PYTHONPATH, PATH를 모두 사용한 이유는 그냥 이래야 작동해서 추가했다... 내가 작성한 코드는 모두 src 폴더에 들어있으며 dockerfile은 이미지를 사용할때 src 폴더를 복사한 후 환경변수에 추가, 그리고 해당 폴더로 이동까지 과정이다. 작업 경로를 src 폴더로 지정했기 때문에 위에서 create_component_from_func을 작성할 때는 해당 경로에 있다고 생각하고 작업을 작성해주면 된다. 이때 굉장히 헷갈린 점이 있었는데 dockerfile에서 지정한 작업 경로와 파이프라인을 작성할 때의 경로, 그리고 마지막으로 컴포넌트에서 작동하는 함수의 경로를 주의해야 한다는 것이다.

 

아까 위에서 파이프라인을 작성할 때 아래처럼 apply 함수를 사용해서 pvc를 마운트해주었다. 적어두진 않았지만 volume_path 는 /src/pv 인데 여기서 작업 경로가 이미 /src 이니까 /pv 만 적으면 될 것이라 생각했었다. 당연히 마운트는 상위 경로의 /pv로 지정되었고 내가 원하는 방향으로 파이프라인이 동작하지 않았다. 컴포넌트를 작성할 때는 이미지에서 설정한 부분은 생각하지 말고 작성해야 한다. 컴포넌트에서 사용할 함수는 도커 이미지에서 설정한 데로 진행하면 된다. 

example_op = comp.create_component_from_func(example_component)
example_op = data_op(ex_arg='test')
example_op.apply(onprem.mount_pvc(pvc_name="phmpipeline", volume_name='test-phm', volume_mount_path=volume_path))

이런 방식으로 파이프라인 컴포넌트를 하나 하나 구성해서 순서를 지정하면 문제없이 파이프라인이 구동하게 된다. 순서를 지정하는 방법은 아래처럼 after 라는 함수를 사용해서 이어주면 된다.

# data processing
comp1 = comp.create_component_from_func(func1, base_image=BASE_IMAGE_URL)
comp1 = comp1(args)
comp1.apply(onprem.mount_pvc(pvc_name="phmpipeline", volume_name='test-phm', volume_mount_path=volume_path))

# training
comp2 = comp.create_component_from_func(func2, base_image=BASE_IMAGE_URL)
comp2 = comp2(args)
comp2.apply(onprem.mount_pvc(pvc_name="phmpipeline", volume_name='test-phm', volume_mount_path=volume_path))
comp2.after(comp1)

3.4 매트릭 추가

나는 데이터 전처리-모델 학습-모델 테스트의 총 3단계로 파이프라인을 구성했다. 그리고 마지막 테스트 단계에서 모델 결과를 확인하기 위해 metrics를 추가해주었다. metric를 추가하는 방법은 아래와 같이 컴포넌트 함수에 인자를 추가해주는 방식으로 가능하다.

from kfp.components import OutputPath

...
def predict(mlpipeline_metrics_path: OutputPath("Metrics"), model_path)
	import json
    
    accuracy = test_model(model_path)
    
    metrics = {
        "metrics": [
            {
                "name": "Accuracy",
                "numberValue": accuracy
            }
        ]
    }

    print("saving mlpipline metrics")

    with open(mlpipeline_metrics_path, 'w') as f:
        json.dump(metrics, f)
        
@dsl.pipeline(
    name='Pipeline'
)        
def pipeline():
	...
    pred_op = comp.create_component_from_func(predict, base_image=BASE_IMAGE_URL)
    pred_op = pred_op(model_path='pv/result')
    ...

 

특이한 점은 predict 함수에는 첫 번째 인자로 OutputPath 가 들어가지만 컴포넌트를 형성할 때는 이것을 신경쓰지 않고 이후의 인자만 입맛대로 넣어주면 된다는 것이다. 이렇게 해주면 파이프라인이 끝난 뒤에 output 에 알아서 결과가 출력된다.

3.5 파이프라인

파이프라인을 모두 작성했다면 아래의 코드를 실행해서 yaml 파일로 파이프라인을 작성할 수 있다. compile 함수 내에는 아까 @dsl.pipeline 을 달아주었던 함수를 넣어주면 된다. 

kfp.compiler.Compiler().compile(pipeline,'{}.yaml'.format(experiment_name))

그리고 위처럼 파이썬 내에서 선언을 할 수도 있고 아래의 커맨드라인의 형태로도 컴파일 가능하다.

dsl-compile --py $base_dir/pipeline.py --output $base_dir/experiment_name.yaml

4. 파이프라인 실행

위의 과정을 통해 형성된 yaml 파일을 kubeflow ui 에 접속해서 업로드해주면 파이프라인의 과정을 그래프로 확인할 수 있다. 그리고 큰 문제가 없다면 expriment에서 실행을 했을때 성공하는 화면을 볼 수 있다.

추가로 마지막 predict 컴포넌트에 추가했던 metric도 정상적으로 출력되는 것을 확인할 수 있다.

1. 개요

MLOps를 공부하면서 자연스럽게 kubeflow를 사용하게 될 일이 많아졌다. 이미 구축이 되어있던 공용 서버에서 작업을 하다가 혼자 연습을 하고 싶어서 집에 있는 윈도우 데스크톱에 kubeflow 구축을 시도해 봤다. 작년에도 시도는 해봤다가 스펙 문제로 실패했었는데 주말을 바친 끝에 마침내 성공했다... 개인적인 의견이지만 kubeflow는 아직도 진입장벽이 너무 높다는 생각이 든다. 

2. 목차

설치 환경은 다음과 같다.

  • 운영체제: 윈도우 11 64bit
  • 환경: Docker, WSL2, Minikube v1.29.0
  • Kubeflow 관련 스펙: kustomize v3.8.4, kubeflow v1.4

3. Minikube 설치

1. Minikube는 윈도우 power shell을 사용해서 설치했다. 설치하기 전에 우선 보안정책부터 수정해야 한다. 윈도우 Power shell을 관리자 권한으로 실행한 후에 다음 명령어를 실행한다

Set-ExecutionPolicy AllSigned

다만 위의 명령어를 실행한 후에 power shell을 실행하면 "profile.ps1 파일이 디지털 서명되지 않았습니다." 라는 에러 메시지가 나왔다. 이럴 경우 권한을 unrestricted로 변경할 경우 위의 에러 메시지가 더 이상 나오지 않았다. 관련해서 검색해 본 결과 위의 방식이 조금 더 안전한 세팅이라고 한다.

Set-ExecutionPolicy Unrestricted

2. 이어서 minikube를 설치를 위해 윈도우 패키지 관리자 chocolatey를 설치한다. 

iex ((New-Object System.Net.WebClient).DownloadString('https://community.chocolatey.org/install.ps1'))

3. Chocolatey가 정상적으로 설치되었다면 minikube를 설치한다.

choco install minikube

4. Docker 설치

4.1 WSL2 설치

Kubeflow는 쿠버네티스를 기반으로 하기 때문에 윈도우에서 kubeflow를 사용하려면 도커를 설치해야 한다. 그런데 윈도우에서 도커를 사용하려면 Hyper-V 설정을 허용해야 하는데 해당 설정은 윈도우 프로 버전에서만 가능한 설정이라고 한다. 그리고 윈도우 11에서는 Hyper-V 설정도 없다라고 하는데 그 부분은 정확히 확인하지 못했다. 결론적으로 나는 WSL2(Windows Subsystem for Linux 2)라는 것을 사용해서 도커를 설치해야만 했다.

 

1. 우선 power shell에서 간단한 설정을 해줘야한다.

dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart

2. 그 후에 아래의 링크에서 최신 패키지를 다운로드한다.

 

이전 버전 WSL의 수동 설치 단계

wsl install 명령을 사용하지 않고 이전 버전의 Windows에 WSL을 수동으로 설치하는 방법에 대한 단계별 지침입니다.

learn.microsoft.com

3. 설치하기 전에 컴퓨터를 재부팅해주고 다운로드하였던 파일을 실행해 준다.

4. 윈도우 터미널을 실행해서 아래 명령어를 실행해 준다.

wsl --set-default-version 2

4.2 Docker desktop 설치

WLS2 설치가 끝났다면 도커 데스크톱을 설치할 차례이다. 아래의 링크에서 환경에 맞는 데스크톱을 다운로드한다.

1. https://www.docker.com/products/docker-desktop/

 

Download Docker Desktop | Docker

Docker Desktop is available to download for free on Mac, Windows, or Linux operating systems. Get started with Docker today!

www.docker.com

2. 설치가 끝났다면 도커 데스크톱을 실행하여 설정으로 이동한다. 설정에서 kubernetes 탭에서 관련된 설정을 모두 체크해 준 후에 재실행한다.

모든 설정이 제대로 되었다면 resources > advanced 탭에서 아래처럼 WSL2에 관련된 내용을 볼 수 있다.

3. 도커 설치까지 되었다면 minikube를 실행해서 제대로 실행이 되는지 확인한다.

minikube start --driver=docker --memory=16g --cpus=4 --disk-size 80GB --kubernetes-version=1.20.11 --profile=hm

메모리, CPU 등의 설정은 데스크톱 환경에 맞추어 변경해 주면 된다. 처음에 메모리를 12G를 할당했는데 무한 실행이 된 건지 kubeflow 가 실행되지 않았다. 넉넉하게 잡아주도록 하자. 이때 도커의 기본 설정에 의해서 데스크톱의 전체 메모리의 절반 정도로 사용량이 제한되어 있을텐데 .wslconfig 파일을 수정해서 제한량을 늘릴 수 있다. 

 

.wslconfig 파일은 기본적으로는 user/{사용자}에 자동으로 생성되나 없을 가능성도 있으며 나도 없어서 따로 형성을 해주었다. 메모장 파일이 아니라는 것을 주의해야 한다. 파일을 만들어줬다면 아래의 내용을 저장해 준다.

[wsl2]
memory=32GB
processors=6
swap=0

위의 설정을 저장해도 처음에는 도커에 반영이 되지 않았는데, 속 편하게 컴퓨터를 재부팅했더니 바로 반영이 되었다. 메모리까지 변경했다면 다시 minikube를 실행해 주자.

5. Kubeflow 설치

현재 kubeflow는 v1.6까지 배포되었다. 

Kubeflow를 설치하기 위해서는 kustomize라는 공식 패키지를 사용하면 된다. kustomize는 23.03.20 기준 v5.0.0이 최신 버전으로 아래의 링크에서 다운로드 가능하다. 현재 최신 버전의 kubeflow는 반드시 5.0 버전의 kustomize를 사용해야 한다. 

 

Release kustomize/v5.0.0 · kubernetes-sigs/kustomize

🎉The Kustomize team is so excited to release version 5.0! 🎉 This release is packed with exciting features and improvements. The full list is below, but here are some of our favorites: kustomize lo...

github.com

다만 무엇이 문제인지 kubeflow 설치가 제대로 되지 않아서 나는 kustomize v3.8.4, kubeflow v1.4를 사용했다. Kubeflow v1.4는 아래의 링크에서 branch를 변경하여 다운로드할 수 있다. 

 

GitHub - kubeflow/manifests: A repository for Kustomize manifests

A repository for Kustomize manifests. Contribute to kubeflow/manifests development by creating an account on GitHub.

github.com

Kubeflow는 2가지 설치 방법이 있다. 첫 번째는 싱글 커맨드로 간단하게 구성요소들을 설치하는 방법이며, 두 번째는 각 컴포넌트를 하나하나 직접 설치하는 방법이다. 당연히 두 번째 방법이 더 어렵고 숙련자를 위한 방법이기 때문에 나는 첫 번째 방법을 사용해서 설치를 진행했다. 윈도우 power shell을 관리자 권한으로 실행한 후에 manifests를 설치한 경로로 이동해서 아래의 명령어를 수행한다.

while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done

그런데 위의 명령어를 그대로 입력하면 계속 에러가 발생해서 아래의 명령어로 수정했다.

while($true) {kustomize build example | kubectl apply -f -}

공식 깃허브에서도 설명하듯이 위의 명령어를 실행해도 완전하게 kubeflow가 실행되려면 꽤 시간이 걸린다. 여러 권한과 pod들이 서로 얽혀있다 보니 설치, 실행이 완료될 때까지 무한루프를 기다려야 한다...

6. Kubeflow Dashboard 접속

Kubeflow 대시보드는 포트 포워딩을 해줘야 접속이 가능하다. 윈도우 터미널에서 아래의 명령어를 실행해 준 다음에 localhost:8080으로 접속하면 대시보드를 확인할 수 있다.

kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80

7. 마치며...

Kubeflow 설치는 작년에도 거의 2주일을 넘게 시도했지만 실패했던 기억이 있다. 그래서 거의 반포기 상태로 공용 서버에서 작업하고 있었는데 연습을 하기에는 적절하지 못한 환경이라서 이번에 다시 집 데스크톱에 설치를 도전해 보았다. 작년보다 컴퓨터 스펙이 많이 좋아지기는 했지만 똑같은 과정으로 설치를 했는데 왜 이번에는 되는 건지 모르겠다... 물론 이번에도 쉽지는 않았다. 이번에도 거의 포기하고 자려다가 마지막으로 다시 시도했을 때 성공했는데 주말 하루를 바친 값을 한 것 같다. 최신 버전으로 구성하지 못한 게 아쉽지만 원래 최신 버전은 함부로 쓰는 게 아니니... 다음에는 kubeflow를 활용해서 pipeline을 구성해보고자 한다.

 

 

 

'1. Engineering > Kubeflow' 카테고리의 다른 글

[Kubeflow] 파이프라인 실행  (1) 2023.04.23
[Kubeflow] ENAS RNN pipeline  (0) 2022.10.20
[Kubeflow] Kubeflow 설치  (0) 2022.10.20
[Kubeflow] GNN 파이프라인  (0) 2022.10.20

1. WandB 란?

Weight and Bias 의 줄임말로 머신러닝 / 딥러닝 실험과정을 쉽게 관리할 수 있도록 도와주는 tool 이다. 딥러닝 연구를 하다보면 정말 많은 모델을 다루게 되고 모델에 크고 작은 변화를 주게 된다. 모델 구조의 변화 뿐만 아니라 하이퍼 파라미터, 데이터 전처리 등 여러가지 요인으로 인해서 수십 ~ 수백개의 경우의 수를 모두 비교해야하는 상황이 생긴다. 이때 보통 jupyter notebook 으로 실험결과를 시각화, 비교하는 경우가 많았는데 실험이 많아질수록 이 방법에는 한계가 존재하게 된다. 나는 보통 엑셀을 통해 실험결과를 정리하고 하이퍼 파라미터 튜닝, 시각화 코드를 직접 작성해서 일일이 비교를 진행했는데 보통 한달에 한번씩은 꼭 결과들이 꼬여서 다시 정리하는 일이 생겼고 그럴때마다 지옥을 경험했다... 지금까지는 이렇게 하는게 실험과 모델을 이해하는데 더 도움이 될 것이라고 판단했지만 그냥 미련했던 것 같다. WandB는 이러한 과정을 정말 보기 쉽고 편하게 정리해준다. 이러한 tool 들은 보통 MLOps 와 연관지어서 언급이 많이 되는데 WandB, MLflow, Kubernetes 등 다양한 툴이 존재하며 각자의 장/단점들이 명확하게 존재하기 때문에 본인의 용도에 맞춰서 사용하도록 하자. WandB는 개인적인 용도로 실험만을 관리하기에 가장 적합해보였다. 

2. WandB 시작하기

우선 wandb 홈페이지에 가서 가입을 한다. 구글, 깃허브 아이디를 사용하면 쉽게 가입할 수 있고 기관을 적으라는데... 나는 그냥 아무거나 적어서 제출했다. 가입 후 로그인을 하면 매우 친절하게 가이드를 알려주기 때문에 어렵지 않게 시작할 수 있다. 나는 jupyter notebook 을 통해 작업했는데 notebook 에서 로그인을 했더니 무언가 오류가 발생해서 그냥 pychram 터미털에서 작업을 해주었다. 어차피 코드 관리를 위해서는 pycharm 으로 연습하는게 좋아보인다.

3. 소스코드

모든 코드는 깃허브에 업로드하였고 아래에서는 주요 코드만 설명했다. 학습 관련 utils들은 링크를 통해 확인할 수 있다.

https://github.com/Hyunmok-Park/WandB_practice

 

GitHub - Hyunmok-Park/WandB_practice

Contribute to Hyunmok-Park/WandB_practice development by creating an account on GitHub.

github.com

3.1 모델(net.py)

모델은 아주 간단한 CNN을 사용했다. 코드는 pytorch 를 사용해서 작성하였고, 데이터는 torchvision 에서 제공하는 mnist 데이터를 사용했다.

from torch import nn

class ConvNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.layer3 = nn.Sequential(
            nn.Linear(64 * 7 * 7, 128, bias=True),
            nn.ReLU()
        )
        self.layer4 = nn.Sequential(
            nn.Linear(128, 84),
            nn.ReLU()
        )
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = x.view(x.size(0), -1)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.fc3(x)
        return x

3.2 학습함수(train_utils.py)

모델의 파라미터를 업데이트하고, wandb 라이브러리의 watch, log 함수를 사용해 학습 과정에 대한 로그를 기록할 수 있다.

import numpy as np

def train(model, train_loader, criterion, optimizer, device, config, wandb, epoch):
    model.train()
    wandb.watch(model, criterion, log="all", log_freq=10)

    loss_list = []
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        output = model(images)
        loss = criterion(output, labels)
        loss_list += [loss.detach().cpu().numpy().tolist()]

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_loss = np.mean(loss_list)
    wandb.log({"train_loss": avg_loss}, step=epoch)
    print(f"TRAIN: EPOCH {epoch + 1:04d} / {config.epochs:04d} | Epoch LOSS {avg_loss:.4f}")


def vaild(model, vali_loader, criterion, device, wandb, epoch):
    model.eval()
    val_loss = []
    for data, target in vali_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        val_loss += [criterion(output, target).item()]

    val_loss = np.mean(val_loss)
    wandb.log({"valid_loss": val_loss}, step=epoch)
    print(f"VALID: LOSS {val_loss:.4f} | Accuracy {val_loss:.4f} ")

3.3 메인함수(main.py)

WandB의 과정을 시작할 수 있는 코드로 wandb 라이브러리의 sweep, agent, init 함수를 사용해서 모든 과정을 시작할 수 있다. 이때 sweep 이라는 기능이 가장 중요한 기능인데 hyperparameter tuning 을 자동화할 수 있는 기능이다. WandB는 기본적으로 yaml 형식의 입력을 통해 관리된다. wandb.init() 함수에 우리가 원하는 모델의 스펙을 dictionary 형태(yaml)로 넣어주면 해당 스펙을 일종의 primary key 로서 활용하여 기록을 시작한다. 그러면 wandb 페이지에서는 run의 형태(아래의 run 함수와는 다름)로 해당 실험을 기록한다. run과 init이 1대1 형태로 기록이 되는 것이다.(아직 입문자라서 이 부분은 확실하지 않음...)

그리고 sweep 이라는 기능은 run-init 쌍을 우리가 원하는 만큼 자동으로 돌려주는 것이다. sweep도 init 과 마찬가지로 dictionary 형태로 관리된다. 다만 init 에 들어가는 형태와는 다르게 각 하이퍼 파라미터의 범위를 지정하게 된다. 그러면 sweep 함수는 알아서 init 에 들어가게 될 하나의 config를 만들어주고 run 을 형성한다. 우리는 sweep, agent 함수를 사용해서 sweep 전용 config, init 전용 config를 넣어줄 함수(아래는 run) 을 지정해주면 알아서 모든 것을 해준다.

import torch
from torch import nn

import wandb
from tqdm import tqdm

from dataloader import Load_Dataset
from train_utils import train, vaild
from net import ConvNet
from utils import build_optimizer
from make_config import *

DEVICE = torch.device('cuda')

def run(config=None):
    wandb.init(config=config)
    w_config = wandb.config

    criterion = nn.CrossEntropyLoss()
    train_loader, vaild_loader = Load_Dataset(w_config.batch_size)
    model = ConvNet().to(DEVICE)
    optimizer = build_optimizer(model, w_config.optimizer, w_config.learning_rate)

    for epoch in tqdm(range(w_config.epochs), desc='EPOCH'):
        train(model, train_loader, criterion, optimizer, DEVICE, w_config, wandb, epoch)
        vaild(model, vaild_loader, criterion, DEVICE, wandb, epoch)

if __name__ == '__main__':
    sweep_config = sweep_config
    sweep_id = wandb.sweep(sweep_config, project="sweep_tutorial", entity='hmpark1995')
    wandb.agent(sweep_id, run)
    # default_config = hyperparameter_defaults
    # run(default_config)

3.3 config(make_config.py)

sweep, init 에 넣어줄 config를 미리 지정해둔 함수이다. 이번에는 4개의 하이퍼 파라미터를 조정했다. values 라는 이름으로 원하는 파라미터 집합을 넣어주면 모든 조합을 알아서 측정해준다. 각 key 가 어떤 동작을 해주는 것인지는 점차 알아가도록 하자. 

sweep_config = {
    'name' : 'bayes-test',
    'method': 'random',
    'metric' : {
        'name': 'valid_loss',
        'goal': 'minimize'
        },
    'parameters' : {
        'optimizer': {
            'values': ['adam', 'sgd']
            },
        'epochs': {
            'values': [3, 4]
            },
        'learning_rate': {
            'values': [0.1, 0.01]
            },
        'batch_size': {
            'values': [32, 64]
            }
        }
    }

4. 결과

단일 실험(init)을 진행했을때 결과. 내가 log에 기록했던 valid, train loss 가 step에 맞추어 기록되었으며 watch 함수를 사용하면 아래처럼 gradients, parmeters도 확인할 수 있다. 

Sweep 을 통한 하이퍼 파라미터 튜닝 결과. log에 기록한 loss는 여기서도 확인이 가능하며 sweep 을 통해 최소화하고자 했던 valid_loss에 대한 기록은 더 자세하게 확인이 가능하다. 그리고 놀랍게도 각 하이퍼 파라미터와 sweep 목표의 연관성을 자동으로 측정해준다...

 

5. 요약

이번 포스트에서는 MLOps 에서 유명한 tool 중 하나인 WandB를 입문해보았다. 내가 생각했던 기능보다도 더 많은 기능을 제공했고 사용하기도 매우 쉬웠다. 다른 MLOps tool들과 비교해 serving 에는 조금 취약하다고는 하지만 개인적인 용도로서 실험을 관리하고 추적하는 목적에서는 매우 좋은 도구라고 생각된다. 

소개

  • ENAS
    • NAS 기법 중 parameter sharing 기법을 제안한 모델로 학습시간을 획기적으로 단축시켰던 모델이다.
  • Pipeline
    • Docker, kubernetes, kubeflow를 사용하여 데이터 준비, 모델 학습, 서비스 제공까지 일련의 자동화 과정을 구축하는 것
  • 깃허브 링크
 

GitHub - Hyunmok-Park/enas_pipeline

Contribute to Hyunmok-Park/enas_pipeline development by creating an account on GitHub.

github.com

모델

파이프라인

  • 콤포넌트
    • train
      • ENAS의 controller, shared를 학습하는 과정으로 출력으로 최적의 모델을 찾아낸다.
    • re-train
      • train 단계에서 찾아낸 최적의 모델을 다시 scratch부터 학습하는 단계
    • serve
      • bentoML 라이브러리를 사용해서 재학습한 네트워크를 API 형태로 제공
    • 위의 과정은 오픈 소스를 그대로 활용
  • 파이프라인
    • 파이썬 kfp 라이브러리를 사용해서 pipeline을 형성한다.
    • 출력 tar 파일을 kubeflow에 업로드하면 pipeline을 사용할 수 있다.
    • 도커 이미지 파일은 개인 pc에 private registry를 형성해서 사용했다.
    • 도커 private registry 형성 방법은 따로 정리해두었다.
    • 도커 private registry 만들기
import kfp.dsl as dsl
from kubernetes import client as k8s_client

def TrainOp(vop):
    return dsl.ContainerOp(
        name="training pipeline",
        image="xx.xxx.xx.xx:5000/phm:0.1-enas-train",

        command = [
            "sh", "run_train_container.sh"
        ],

        pvolumes={"src/data": vop},
    ).add_pod_label("app", "enas-application")

def ReTrainOp(trainop):
    return dsl.ContainerOp(
        name="retraining pipeline",
        image="xx.xxx.xx.xx:5000/phm:0.1-enas-retrain",

        command = [
            "sh", "run_retrain_container.sh"
        ],

        pvolumes={"src/data": trainop.pvolume},
    ).add_pod_label("app", "enas-application")

def ServeOp(retrainop):
    return dsl.ContainerOp(
        name="serve pipeline",
        image="xx.xxx.xx.xx:5000/phm:0.1-enas-serve",
        command = [
            "sh", "run_serve_container.sh"
        ],
        pvolumes={"src/data": retrainop.pvolume},
    ).add_pod_label("app", "enas-application")

def VolumnOp():
    return dsl.PipelineVolume(
        pvc="phm-volume"
    )

@dsl.pipeline(
    name='enas_pipeline',
    description='Probabilistic inference with graph neural network'
)
def enas_pipeline(
):
    print('enas_pipeline')

    vop = VolumnOp()

    dsl.get_pipeline_conf().set_image_pull_secrets([k8s_client.V1LocalObjectReference(name='regcredidc')])

    train_and_eval = TrainOp(vop)

    retrain = ReTrainOp(train_and_eval)
    retrain.after(train_and_eval)

    serve = ServeOp(retrain)
    serve.after(train_and_eval)

if __name__ == '__main__':
    import kfp.compiler as compiler
    # compiler.Compiler().compile(enas_pipeline, __file__ + '.tar.gz')
    compiler.Compiler().compile(enas_pipeline, __file__ + '.yaml')

결과

  • 학습 파라미터
    • 데이터: PTB
python main.py --network_type rnn --dataset ptb --controller_optim adam --controller_lr 0.00035 --controller_max_step=10 --controller_hid=32 --shared_max_step=10 --shared_hid=32 --shared_embed=32 --shared_optim sgd --shared_lr 20.0 --entropy_coeff 0.0001 --num_blocks=4 --max_epoch=10 --derive_num_sample=5
  • 모델 학습 결과
    • controller, shared weight parameter.pth 파일
    • best performance DAG 를 담은 json 파일
{"-1": [[0, "identity"]], "-2": [[0, "identity"]], "0": [[1, "identity"], [2, "sigmoid"]], "1": [[3, "ReLU"]], "2": [[4, "avg"]], "3": [[4, "avg"]], "4": [[5, "h[t]"]]}
  • bentoML

1.데이터 생성 및 전처리

  • 기본적인 격자(grid)의 모양의 그래프만 사용
  • 무방향 확률 그래프 모델(undirected probability graphical model)
  • 데이터 생성 코드
import os import pickle import numpy as np from utils.topology import NetworkTopology, get_msg_graph from model.gt_inference import Enumerate from scipy.sparse import coo_matrix`

def mkdir(dir\_name):  
if not os.path.exists(dir\_name):  
os.makedirs(dir\_name)  
print('made directory {}'.format(dir\_name))

def main():  
num\_nodes\_I = 9  
std\_J\_A = 1/3  
std\_b\_A = 0.25  
save\_dir = './data\_temp'  
print('Generating training graphs!')

try:
    mkdir(save_dir)
    mkdir(os.path.join(save_dir, "train"))
except OSError:
    pass

for sample_id in range(100):
  seed_train = int(str(1111) + str(sample_id))
  topology = NetworkTopology(num_nodes=num_nodes_I, seed=seed_train)
  npr = np.random.RandomState(seed=seed_train)
  graph = {}
  G, W = topology.generate(topology='grid')
  J = npr.normal(0, std_J_A, size=[num_nodes_I, num_nodes_I])
  J = (J + J.transpose()) / 2.0
  J = J * W

  b = npr.normal(0, std_b_A, size=[num_nodes_I, 1])

  # Enumerate
  model = Enumerate(W, J, b)
  prob_gt = model.inference()

  graph['prob_gt'] = prob_gt # shape N x 2
  graph['J'] = coo_matrix(J)  # shape N X N
  graph['b'] = b  # shape N x 1
  graph['seed_train'] = seed_train
  graph['stdJ'] = std_J_A
  graph['stdb'] = std_b_A

  msg_node, msg_adj = get_msg_graph(G)
  msg_node, msg_adj = np.array(msg_node), np.array(msg_adj)
  idx_msg_edge = np.transpose(np.nonzero(msg_adj))
  J_msg = J[msg_node[:, 0], msg_node[:, 1]].reshape(-1, 1)

  graph['msg_node'] = msg_node
  graph['idx_msg_edge'] = idx_msg_edge
  graph['J_msg'] = J_msg

  file_name = os.path.join(save_dir, "train", 'graph_{}_nn{}_{:07d}.p'.format('grid', num_nodes_I, sample_id))
  with open(file_name, 'wb') as f:
    pickle.dump(graph, f)

if **name** == '**main**':  
main()
  • Train / val 데이터: validation 데이터는 random seed만 변경하여 생성
    • Train / val = 100개 / 10개 데이터

1.1 데이터 구조

  • 확률 그래프 모델

$$ p(\mathbf{x}) = {1\over{Z}}\ exp(\ \mathbf{b}\ \cdot\ \mathbf{x}\ +\ \mathbf{x \cdot J \cdot x} ) $$

  • 각 노드들이 -1 / 1의 state(x) 를 가지며 위의 식을 사용해 joint distribution값을 계산가능
    • 하지만 위의 식은 개별 노드들의 대한 확률값을 구하는 식이 아니며 이를 위해선 추가적인 계산 요구
      • Enumerate: 모든 경우의 수를 확률 테이블로 보관하여 개별 확률값을 계산하는 방법
    • Ising model
      • b: self-bias / J: coupling-strength / x: node state
  • Enumerate를 통해서 target value만 계산하고 J, b 만을 사용해 각 노드별 확률값을 구하는 것이 neural entwork의 목표
  • b: 각 노드별 bias 값들의 벡터
  • msg_node: 그래프의 연결상태를 노드순서로 나열한 집합
  • J_msg: msg_node에 맞춰서 각 edge의 coupling strength의 집합

2. 학습 모델 작성

2.1 Simple GNN

  • Gated Graph Neural Network(GG-NN)
    • 확인을 위해 단순히 input을 받아서 출력하는 predict 함수 작성
# gnn.py

import numpy as np  
import torch  
import torch.nn as nn

EPS = float(np.finfo(np.float32).eps)  
**all** = \['NodeGNN'\]

class NodeGNN(nn.Module):  
def **init**(self):  
""" A simplified implementation of NodeGNN """  
super(NodeGNN, self).**init**()  
self.hidden\_dim = 16  
self.num\_prop = 5  
self.aggregate\_type = 'sum'

# message function
self.msg_func = nn.Sequential(*[
  nn.Linear(2 * self.hidden_dim + 8, 64),
  nn.ReLU(),
  nn.Linear(64, self.hidden_dim)
])

# update function
self.update_func = nn.GRUCell(
  input_size=self.hidden_dim, hidden_size=self.hidden_dim)

# output function
self.output_func = nn.Sequential(*[
  nn.Linear(self.hidden_dim + 2, 64),
  nn.ReLU(),
  nn.Linear(64, 2),
])
self.loss_func = nn.KLDivLoss(reduction='batchmean')


def forward(self, J\_msg, b, msg\_node, target=None):  
num\_node = b.shape\[0\]  
num\_edge = msg\_node.shape\[0\]

edge_in = msg_node[:, 0]
edge_out = msg_node[:, 1].contiguous()

ff_in = torch.cat([b[edge_in], -b[edge_in], J_msg, -J_msg], dim=1)
ff_out = torch.cat([-b[edge_out], b[edge_out], -J_msg, J_msg], dim=1)

state = torch.zeros(num_node, self.hidden_dim).to(b.device)

def _prop(state_prev):
  # 1. compute messages
  state_in = state_prev[edge_in, :]  # shape |E| X D
  state_out = state_prev[edge_out, :]  # shape |E| X D
  msg = self.msg_func(torch.cat([state_in, ff_in, state_out, ff_out], dim=1)) # shape: |E| X D
  # 2. aggregate message
  scatter_idx = edge_out.view(-1, 1).expand(-1, self.hidden_dim)
  msg_agg = torch.zeros(num_node, self.hidden_dim).to(b.device) # shape: |V| X D
  msg_agg = msg_agg.scatter_add(0, scatter_idx, msg)
  avg_norm = torch.zeros(num_node).to(b.device).scatter_add_(0, edge_out, torch.ones(num_edge).to(b.device))
  msg_agg /= (avg_norm.view(-1, 1) + EPS)
  # 3. update state
  state_new = self.update_func(msg_agg, state_prev)  # GRU update
  return state_new

# propagation
for tt in range(self.num_prop):
  state = _prop(state)

# output
y = self.output_func(torch.cat([state, b, -b], dim=1))
y = torch.log_softmax(y, dim=1)
loss = self.loss_func(y, target)
return y, loss

def predict(self, J\_msg, b, msg\_node, prob\_gt):

J_msg = J_msg[0].long()
b = b[0].long()
msg_node = msg_node[0].long()

num_node = b.shape[0]
num_edge = msg_node.shape[0]

edge_in = msg_node[:, 0]
edge_out = msg_node[:, 1].contiguous()

ff_in = torch.cat([b[edge_in], -b[edge_in], J_msg, -J_msg], dim=1)
ff_out = torch.cat([-b[edge_out], b[edge_out], -J_msg, J_msg], dim=1)

state = torch.zeros(num_node, self.hidden_dim).to(b.device)

def _prop(state_prev):
  # 1. compute messages
  state_in = state_prev[edge_in, :]  # shape |E| X D
  state_out = state_prev[edge_out, :]  # shape |E| X D
  msg = self.msg_func(torch.cat([state_in, ff_in, state_out, ff_out], dim=1)) # shape: |E| X D
  # 2. aggregate message
  scatter_idx = edge_out.view(-1, 1).expand(-1, self.hidden_dim)
  msg_agg = torch.zeros(num_node, self.hidden_dim).to(b.device) # shape: |V| X D
  msg_agg = msg_agg.scatter_add(0, scatter_idx, msg)
  avg_norm = torch.zeros(num_node).to(b.device).scatter_add_(0, edge_out, torch.ones(num_edge).to(b.device))
  msg_agg /= (avg_norm.view(-1, 1) + EPS)
  # 3. update state
  state_new = self.update_func(msg_agg, state_prev)  # GRU update
  return state_new

# propagation
for tt in range(self.num_prop):
  state = _prop(state)

# output
res = dict()
y = self.output_func(torch.cat([state, b, -b], dim=1))
y = torch.log_softmax(y, dim=1)
loss = self.loss_func(y, prob_gt)
res["prob"] = np.exp(y.detach().cpu().numpy())
res["loss"] = loss.detach().cpu().numpy()

3. 모델 학습

  • Neural Network 학습 과정을 관리할 runner 클래스 작성
# inference\_runner.py

from **future** import (division, print\_function)  
import os  
import numpy as np  
from collections import defaultdict  
from tqdm import tqdm

import torch  
import torch.utils.data  
import torch.optim as optim  
from model.gnn import NodeGNN  
from dataset.dataloader import \*  
import bentoml

EPS = float(np.finfo(np.float32).eps)  
**all** = \['NeuralInferenceRunner'\]

class NeuralInferenceRunner(object):

def train(self):  
print("=== START TRAINING ===")  
\# create data loader  
train\_dataset = MyDataloader(split='train')  
val\_dataset = MyDataloader(split='val')

train_loader = torch.utils.data.DataLoader(
  train_dataset,
  batch_size=10,
  shuffle=True,
  collate_fn=train_dataset.collate_fn)

val_loader = torch.utils.data.DataLoader(
  val_dataset,
  batch_size=10,
  shuffle=True,
  collate_fn=val_dataset.collate_fn)

# create models
model = NodeGNN()

# create optimizer
params = filter(lambda p: p.requires_grad, model.parameters())
optimizer = optim.Adam(
    params,
    lr=0.001)

# reset gradient
optimizer.zero_grad()

#========================= Training Loop =============================#
best_val_loss = np.inf
for epoch in range(10):
  print("=== EPOCH : {} ===".format(epoch))
  # ===================== validation ============================ #
  model.eval()
  for data in tqdm(val_loader, desc="VALIDATION"):
    with torch.no_grad():
      _, loss = model(data['J_msg'], data['b'], data['msg_node'], target=data['prob_gt'])

  # ====================== training ============================= #
  model.train()
  for data in tqdm(train_loader, desc="TRAINING"):
    optimizer.zero_grad()
    _, loss = model(data['J_msg'], data['b'], data['msg_node'],  target=data['prob_gt'])
    loss.backward()
    optimizer.step()

snapshot(model, optimizer)
# bentoml.sklearn.save('simple_gnn', model)
return best_val_loss

def predict(self):
    return 0

def snapshot(model, optimizer):  
model\_snapshot = {  
"model": model.state\_dict(),  
"optimizer": optimizer.state\_dict(),  
}  
torch.save(model\_snapshot,  
os.path.join("model\_snapshot.pth"))
  • 학습
    $ python run\_exp\_local.py

4. bentoML 서비스

  • JSON file 또는 Pickle file을 입력으로 받을 수 있는 bentoML 서비스
  • bentofile.yaml 작성
# bentofile.yaml

service: "service:svc"  
labels:  
owner: bentoml-team  
stage: demo  
include:

-   "\*.py"  
    python:  
    packages:
    -   torch

-   service 작성
``` python
# service.py

import bentoml
import numpy as np
from bentoml.io import JSON

runner = bentoml.pytorch.load_runner(
    "simple_gnn:latest",
    predict_fn_name="predict"
)

svc = bentoml.Service("simple_gnn", runners=[runner])

@svc.api(input=JSON(), output=JSON())
def predict(input_arr: JSON):
    J_msg, b, msg_node, prob_gt = np.array(input_arr['J_msg']), np.array(input_arr['b']), np.array(input_arr['msg_node']), np.array(input_arr['prob_gt'])
    res = runner.run(J_msg, b, msg_node, prob_gt)
    return res
$ bentoml build
$ bentoml serve simple_gnn:latest --reload
  • 서비스 실행
    • input type을 Jsoninput으로 할 경우 import 에러가 발생하여 우선 numpy로 작성
  • 테스트
    • 입력
    {
    "J_msg":[[ 0.32481338],
           [ 0.10008402],
           [ 0.32481338],
           [-0.29213125],
           [ 0.13292147],
           [-0.29213125],
           [ 0.27904898],
           [ 0.10008402],
           [ 0.09995882],
           [ 0.18141661],
           [ 0.13292147],
           [ 0.09995882],
           [-0.14101666],
           [ 0.07256332],
           [ 0.27904898],
           [-0.14101666],
           [-0.00166374],
           [ 0.18141661],
           [ 0.01135302],
           [ 0.07256332],
           [ 0.01135302],
           [ 0.17131865],
           [-0.00166374],
           [ 0.17131865]],
    "b":[[-0.11840663],
           [-0.24949627],
           [ 0.04009551],
           [ 0.58055465],
           [-0.19123979],
           [-0.0364058 ],
           [ 0.01783222],
           [ 0.10532287],
           [-0.27045844]],
    "msg_node":[[0, 1],
           [0, 3],
           [1, 0],
           [1, 2],
           [1, 4],
           [2, 1],
           [2, 5],
           [3, 0],
           [3, 4],
           [3, 6],
           [4, 1],
           [4, 3],
           [4, 5],
           [4, 7],
           [5, 2],
           [5, 4],
           [5, 8],
           [6, 3],
           [6, 7],
           [7, 4],
           [7, 6],
           [7, 8],
           [8, 5],
           [8, 7]],
    "prob_gt":[[0.42438148, 0.57561852],
           [0.35530727, 0.64469273],
           [0.55791833, 0.44208167],
           [0.7471271 , 0.2528729 ],
           [0.41531799, 0.58468201],
           [0.51015097, 0.48984903],
           [0.55322386, 0.44677614],
           [0.52453344, 0.47546656],
           [0.37538241, 0.62461759]]
    }
    • 결과
  • { "prob": [ [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ], [ 0.49200597405433655, 0.5079939961433411 ] ], "loss": 0.02680271677672863 }

'1. Engineering > BentoML' 카테고리의 다른 글

[BentoML] 1. BentoML  (0) 2022.10.20
  1. 환경구성 및 bentoML 설치
  2. 학습 모델 작성
    1. iris-classifier
    2. 추가적인 모델 실습예정
      1. Simple GNN
  3. Service class 작성
  4. bentoML 서버 실행

1. 환경구성 및 bentoML 설치

  • 환경구성: anaconda 환경
    • env name: workspace
    • Python version: 3.6

1.1 bentoML 설치

$ conda activate workspace
$ pip install bentoml --pre

2. 학습 모델 작성

2.1 Iris-classifier

# train.py

import argparse
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
import json
import bentoml

def load_data(path):
    iris = pd.read_csv(path, sep=',')
    return iris

def get_train_test_data(iris):
    encode = LabelEncoder()

    iris['variety'] = encode.fit_transform(iris['variety'])

    train , test = train_test_split(iris, test_size=0.2, random_state=0)
    print('shape of training data : ', train.shape)
    print('shape of testing data', test.shape)

    X_train = train.drop(columns=['variety'], axis=1)
    y_train = train['variety']
    X_test = test.drop(columns=['variety'], axis=1)
    y_test = test['variety']

    return X_train, X_test, y_train, y_test

def evaluation(y_test, predict):
    print("accuarcy : ", accuracy_score(y_test, predict))

    accuracy = accuracy_score(y_test, predict)
    metrics = {
        'metrics': [{
            'name': 'accuracy-score',
            'numberValue': accuracy,
            'format': "PERCENTAGE",
        }]
    }
    with open('accuracy.json', 'w') as output:
        json.dump(accuracy, output)
    with open('accuracy.json', 'r') as data:
        print(data.readlines())

    with open('mlpipeline-metrics.json', 'w') as output:
        json.dump(metrics, output)

if __name__ == "__main__":
    argument_parser = argparse.ArgumentParser()

    argument_parser.add_argument(
        '--data',
        type=str,
        help="Input data csv"
    )

    args = argument_parser.parse_args()
    iris = args.data
    iris = load_data(iris)

    X_train, X_test, y_train, y_test = get_train_test_data(iris)

    model = LogisticRegression()
    model.fit(X_train, y_train)
    predict = model.predict(X_test)
    print('\\nevaluation : ')
    evaluation(y_test, predict)

    bentoml.sklearn.save('iris_classifier', model)

3. Service class 작성

# service.py

import numpy as np
import bentoml
from bentoml.io import NumpyNdarray

iris_clf_runner = bentoml.sklearn.load_runner("iris_classifier:latest")

svc = bentoml.Service("iris_classifier", runners=[iris_clf_runner])

@svc.api(input=NumpyNdarray(), output=NumpyNdarray())
def classify(input_series: np.ndarray) -> np.ndarray:
    print("---")
    return iris_clf_runner.run(input_series)

4. bentoML 서버 실행

  • 모델 학습
$ python train.py --data=iris.csv
  • bentoML 빌드
# bentofile.yaml

service: "service:svc"
description: "file: ./README.md"
labels:
  owner: bentoml-team
  stage: demo
include:
  - "*.py"
python:
  packages:
    - scikit-learn
    - pandas
$ bentoml build
$ bentoml serve iris-classifier:latest

테스트

  • serve 후 주소확인 후 접속: localhost:3000
  • API 확인
$ curl -X POST -H "content-type: application/json" --data "[1, 5, 4, 1]" <http://127.0.0.1:3000/classify>

'1. Engineering > BentoML' 카테고리의 다른 글

[BentoML] 2. Simple GNN with BentoML  (0) 2022.10.20
  1. NFS 설정
  2. PV, PVC, StorageClass 구성
  3. Kubeflow 설치

1. NFS 설정

  • Worker node 서버를 NFS 서버로 사용

1.1 NFS 서버 구성

  • worker node 접속 후 명령 수행
$ sudo -i 
$ apt-get update 
$ apt install nfs-common nfs-kernel-server portmap 
$ mkdir /home/share/nfs -p 
$ chmod 777 /home/share/nfs 
$ vi /etc/exports
# 내용 추가 /home/share/nfs *(rw,no_root_squash,sync,insecure,no_subtree_check) # 
$ service nfs-server restart $ systemctl status nfs-server.service 
$ showmount -e 127.0.0.1 
$ mount -t nfs 192.168.72.102:/home/share/nfs /mnt

 

  • master node 접속 후 명령 수행
$ sudo -i 
$ apt-get update
$ apt install nfs-common nfs-kernel-server portmap

2. PV, PVC, StorageClass 구성

  • Master node 접속 후 yaml 파일 생성
$ vim test.yaml

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
        name: my-storageclass
provisioner: kubernetes.io/no-provisioner
parameters:
        server: 192.168.72.102
        path: /home/share/nfs
        readOnly: "false"

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: nfs-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pc
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 192.168.72.102
    path: /home/share/nfs

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dpl-nginx
spec:
  selector:
    matchLabels:
      app: dpl-nginx
  replicas: 2
  template:
    metadata:
      labels:
        app: dpl-nginx
    spec:
      containers:
      - name: master
        image: nginx
        ports:
        - containerPort: 80
        volumeMounts:
        - mountPath: /mnt
          name: pvc-volume
      volumes:
      - name: pvc-volume
        persistentVolumeClaim:
          claimName: nfs-pvc

$ kubectl apply -f test.yaml
$ kubectl patch storageclass my-storageclass -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
$ sudo vim /etc/kubernetes/manifests/kube-apiserver.yaml

# 내용 추가
    - --enable-admission-plugins=NodeRestriction,PodNodeSelector,DefaultStorageClass
    - --service-account-signing-key-file=/etc/kubernetes/pki/sa.key
    - --service-account-issuer=kubernetes.default.svc
#
$ kubectl taint nodes --all node-role.kubernetes.io/master-

3. Kubeflow 설치

  • Master node 접속 후 명령 수행
$ git clone https://github.com/kubeflow/manifests.git
$ cd manifests
$ git checkout tags/v1.3.1
$ wget https://github.com/kubernetes-sigs/kustomize/releases/download/v3.2.0/kustomize_3.2.0_linux_amd64 -O kustomize
$ sudo mv ./kustomize /usr/local/bin/kustomize
$ chmod 777 /usr/local/bin/kustomize
$ while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
  • 설치완료 후 실행확인
kubectl get pod -A | egrep 'NAME|^auth|^cert-manager|^istio-system|^knative-|^kubeflow'

Trouble

  • 스펙 문제인지 현재 kubeflow관련 container들이 생성되는 도중에 VM이 다운되는 문제가 발생
    • 현재 스펙:
      • 4 CPU, Memory 12GB, VDI 100GB

+ Recent posts