Kubeflow for ML - Chapter 4





Chapter 4: Kubeflow Pipelines

πŸ‘€ This post was written by excerpting and summarizing the book Kubeflow for Machine Learning, with additional material added where needed.

Getting Started with Pipelines

Kubeflow Pipelines ν”Œλž«νΌμ€ λ‹€μŒμ„ ν¬ν•¨ν•˜κ³  μžˆμŠ€λΉˆλ‹€.

  • νŒŒμ΄ν”„λΌμΈμ„ 관리, μΆ”μ ν•˜κ³  μ‹€ν–‰ν•  수 μžˆλ„λ‘ ν•˜λŠ” UI
  • νŒŒμ΄ν”„λΌμΈ μ‹€ν–‰ μŠ€μΌ€μ€„λ§μ„ μœ„ν•œ 엔진
  • Pythonμ—μ„œ νŒŒμ΄ν”„λΌμΈμ„ μ •μ˜, ꡬ좕, λ°°ν¬ν•˜κΈ° μœ„ν•œ SDK
  • SDK와 νŒŒμ΄ν”„λΌμΈ 싀행을 μœ„ν•œ λ…ΈνŠΈλΆ 지원

Building a Simple Pipeline in Python

Kubeflow Pipelines are stored as YAML files that are executed by Argo. Kubeflow lets you author pipelines with a Python DSL (domain-specific language). A pipeline is essentially a graph of container executions: beyond specifying which containers run in what order, it lets you pass arguments to the pipeline as a whole and between its containers.

νŒŒμ΄ν”„λΌμΈμ€ λ‹€μŒμ˜ μˆœμ„œλ‘œ μž‘μ—…ν•©λ‹ˆλ‹€.

  • μ»¨ν…Œμ΄λ„ˆ 생성 (κ°„λ‹¨ν•œ Python ν•¨μˆ˜λ‚˜ 도컀 μ»¨ν…Œμ΄λ„ˆ)
  • ν•΄λ‹Ή μ»¨ν…Œμ΄λ„ˆ 뿐만 μ•„λ‹ˆλΌ λͺ…령쀄 인수, 데이터 νƒ‘μž¬, μ»¨ν…Œμ΄λ„ˆμ— 전달할 λ³€μˆ˜ 등을 μ°Έμ‘°ν•˜λŠ” μž‘μ—…(operation) 생성
  • λ³‘λ ¬λ‘œ 진행할 μž‘μ—…κ³Ό 사전 μž‘μ—… 등을 μ •μ˜ν•˜μ—¬ μˆœμ„œλŒ€λ‘œ 지정
  • Python으둜 μ •μ˜ν•œ νŒŒμ΄ν”„λΌμΈμ„ Kubeflow Pipelines이 μ‚¬μš©ν•  수 μžˆλ„λ‘ YAML둜 컴파일

λ‹€μŒμ€ κ°„λ‹¨ν•œ νŒŒμ΄ν”„λΌμΈμ„ λ§Œλ“œλŠ” μ˜ˆμ œμž…λ‹ˆλ‹€. μš°μ„  νŒŒμ΄ν”„λΌμΈμ„ κ΅¬μ„±ν•˜λŠ” μ˜€νΌλ ˆμ΄μ…˜μ„ μƒμ„±ν•˜λŠ” μ½”λ“œμž…λ‹ˆλ‹€.

from collections import namedtuple
from typing import NamedTuple

import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.components as comp

def add(a: float, b: float) -> float:
    """Calculates the sum of two arguments"""
    return a + b

# Convert the function into a pipeline operation
add_op = comp.func_to_container_op(add)

def my_divmod(dividend: float, divisor: float) -> \
        NamedTuple("MyDivmodOutput", [("quotient", float), ("remainder", float)]):
    """Divides two numbers and calculates the quotient and remainder"""
    # Import libraries inside the component function
    import numpy as np
    
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)
    
    (quotient, remainder) = divmod_helper(dividend, divisor)
    
    divmod_output = namedtuple("MyDivmodOutput", ["quotient", "remainder"])
    return divmod_output(quotient, remainder)

# Convert the function above into a pipeline operation, specifying a base image
divmod_op = comp.func_to_container_op(
    my_divmod, 
    base_image="tensorflow/tensorflow:1.14.0-py3"
)

Once all the required operations are created, we can assemble the pipeline:

@dsl.pipeline(
    name="Calculation pipeline",
    description="A toy pipeline that performs arithmetic calculations."
)
def calc_pipeline(
    a="a",
    b="7",
    c="17",
):
    # Pass a pipeline parameter and a constant as operation arguments
    add_task = add_op(a, 4)

    # Use the output of the task above as an argument to the next operation.
    # That operation returns a single value, so it is passed simply as
    # `task.output`
    divmod_task = divmod_op(add_task.output, b)

    # This operation returns multiple values, so each value is passed as
    # `task.outputs["output_name"]`
    result_task = add_op(divmod_task.outputs["quotient"], c)

Finally, we submit the pipeline for execution to the client, which returns links to the run and the experiment:

client = kfp.Client()
arguments = {"a": "7", "b": "8"}
client.create_run_from_pipeline_func(
    calc_pipeline, 
    arguments=arguments
)

If the run starts correctly, a link appears; clicking it takes you to the result of the pipeline run.

Pipeline execution

Storing Data Between Steps

Kubeflowμ—μ„œλŠ” μ»¨ν…Œμ΄λ„ˆλ‘œ 데이터λ₯Ό 전달할 λ•Œ 두 가지 방식을 μ§€μ›ν•©λ‹ˆλ‹€. Kubernetes ν΄λŸ¬μŠ€ν„° λ‚΄μ˜ νΌμ‹œμŠ€ν„΄νŠΈ λ³Όλ₯¨ (persistent volumes)κ³Ό S3와 같은 ν΄λΌμš°λ“œ μ €μž₯μ†Œ μ˜΅μ…˜μž…λ‹ˆλ‹€.

νΌμ‹œμŠ€ν„΄νŠΈ λ³Όλ₯¨μ€ μ €μž₯μ†Œ λ ˆμ΄μ–΄λ₯Ό μΆ”μƒν™”ν•©λ‹ˆλ‹€. νΌμ‹œμŠ€ν„΄νŠΈ λ³Όλ₯¨μ€ 벀더에 따라 ν”„λ‘œλΉ„μ €λ‹ 속도가 느리고 IO μ œν•œμ΄ μžˆμ„ 수 μžˆμŠ΅λ‹ˆλ‹€. μ €μž₯μ†Œ ν΄λž˜μŠ€λŠ” λ‹€μŒ 쀑 ν•˜λ‚˜κ°€ λ©λ‹ˆλ‹€.

  • ReadWriteOnce: the volume can be mounted read-write by a single node
  • ReadOnlyMany: the volume can be mounted read-only by many nodes
  • ReadWriteMany: the volume can be mounted read-write by many nodes

Kubeflow Pipelinesμ—μ„œ VolumeOpλ₯Ό μ‚¬μš©ν•˜λ©΄ μžλ™μœΌλ‘œ κ΄€λ¦¬λ˜λŠ” νΌμ‹œμŠ€ν„΄νŠΈ λ³Όλ₯¨μ„ 생성할 수 μžˆμŠ΅λ‹ˆλ‹€.

dvop = dsl.VolumeOp(name="create_pvc",
                    resource_name="my-pvc-2",
                    size="5Gi",
                    modes=dsl.VOLUME_MODE_RWO)

μ˜€νΌλ ˆμ΄μ…˜μ— λ³Όλ₯¨μ„ μΆ”κ°€ν•˜κ³  μ‹Άλ‹€λ©΄ add_pvolumes을 ν˜ΈμΆœν•˜μ—¬ μ‚¬μš©ν•˜λ©΄ λ©λ‹ˆλ‹€.

download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})

Kubeflow's built-in file_outputs mechanism automatically transfers the specified local files to MinIO between pipeline steps. To use it, write a file inside the container and declare its path in the file_outputs parameter of the ContainerOp:

fetch = kfp.dsl.ContainerOp(
    name="download",
    image="busybox",
    command=["sh", "-c"],
    arguments=[
        "sleep 1; "
        "mkdir -p /tmp/data; "
        "wget " + data_url +
        " -O /tmp/data/results.csv"
    ],
    file_outputs={"downloaded": "/tmp/data/results.csv"}
)

Introduction to Kubeflow Pipelines Components

Argo: the Foundation of Pipelines

Kubeflow Pipelines builds on Argo Workflows, a container-native workflow engine for Kubernetes, and all of the Argo components are installed together with Kubeflow. While you do not need to install Argo separately to use Kubeflow Pipelines, the Argo command line is handy for debugging pipelines. Argo is installed as follows:

$ # Download the binary 
$ curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64

$ # Make binary executable 
$ chmod +x argo-linux-amd64

$ # Move binary to path 
$ mv ./argo-linux-amd64 ~/bin/argo

Once the binary is installed, bring up the Argo pods with the manifest:

$ kubectl create ns argo
$ kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml

You can now submit a pipeline to the Kubeflow namespace with the following commands:

$ argo submit -n kubeflow --watch https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/hello-world.yaml

$ argo list -n kubeflow
NAME                STATUS      AGE   DURATION   PRIORITY
hello-world-hxkrs   Succeeded   4h    26s        0

$ argo get hello-world-hxkrs -n kubeflow
Name:                hello-world-hxkrs
Namespace:           kubeflow
ServiceAccount:      default
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Started:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Finished:            Sun Jul 03 16:57:30 +0900 (4 hours ago)
Duration:            26 seconds
ResourcesDuration:   14s*cpu,4s*memory

STEP                  TEMPLATE  PODNAME            DURATION  MESSAGE
 βœ” hello-world-hxkrs  whalesay  hello-world-hxkrs  15s
 
$ argo logs hello-world-hxkrs -n kubeflow
hello-world-hxkrs: time="2022-07-03T07:57:12.005Z" level=info msg="capturing logs" argo=true
hello-world-hxkrs:  _____________
hello-world-hxkrs: < hello world >
hello-world-hxkrs:  -------------
hello-world-hxkrs:     \
hello-world-hxkrs:      \
hello-world-hxkrs:       \
hello-world-hxkrs:                     ##        .
hello-world-hxkrs:               ## ## ##       ==
hello-world-hxkrs:            ## ## ## ##      ===
hello-world-hxkrs:        /""""""""""""""""___/ ===
hello-world-hxkrs:   ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
hello-world-hxkrs:        \______ o          __/
hello-world-hxkrs:         \    \        __/
hello-world-hxkrs:           \____\______/

$ argo delete hello-world-hxkrs -n kubeflow
Workflow 'hello-world-hxkrs' deleted

What Kubeflow Pipelines Adds to Argo Workflow

Using Argo directly requires defining workflows in YAML and containerizing your code, which is quite cumbersome. With Kubeflow Pipelines, on the other hand, you can define and create pipelines through a Python API, which makes the work much more comfortable, and it is also far easier to add ML-specific components.

Building a Pipeline Using Existing Images

Kubeflow Pipelines can orchestrate steps implemented in many languages by executing pre-built Docker images.

λ˜ν•œ Python λ‚΄μ—μ„œ 직접 Kubernetes ν•¨μˆ˜λ₯Ό μ‚¬μš©ν•˜λ„λ‘ Kubernetes ν΄λΌμ΄μ–ΈνŠΈλ₯Ό μž„ν¬νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.

from kubernetes import client as k8s_client

Also, because an experiment can be created only once in Pipelines, you can reuse an existing experiment by referring to its name:

client = kfp.Client()
exp = client.get_experiment(experiment_name="mdupdate")

λ‹€μŒ μ˜ˆμ œλŠ” 이미 λΉŒλ“œλ˜μ–΄ μžˆλŠ” 이미지듀을 μ΄μš©ν•˜μ—¬ νŒŒμ΄ν”„λΌμΈμ„ μƒμ„±ν•˜λŠ” μ½”λ“œμž…λ‹ˆλ‹€. μ•„λž˜ μ½”λ“œμ—μ„œ ν”„λ¦¬λΉŒλ“œλœ μ»¨ν…Œμ΄λ„ˆλŠ” MINIO_* ν™˜κ²½ λ³€μˆ˜λ‘œ μ„€μ •λ˜μ–΄ μžˆμ–΄ add_env_variable을 ν˜ΈμΆœν•˜μ—¬ 둜컬 MinIOλ₯Ό μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 그리고 각 단계λ₯Ό λͺ…μ‹œν•˜κΈ° μœ„ν•΄ after λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

@dsl.pipeline(
    name='Recommender model update',
    description='Demonstrate usage of pipelines for multi-step model update')
def recommender_pipeline():
    # Load new data
    data = dsl.ContainerOp(
        name='updatedata',
        image='lightbend/recommender-data-update-publisher:0.2') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    # Train the model
    train = dsl.ContainerOp(
        name='trainmodel',
        image='lightbend/ml-tf-recommender:0.1') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    train.after(data)
    # Publish the new model
    publish = dsl.ContainerOp(
        name='publishmodel',
        image='lightbend/recommender-model-publisher:0.2') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \
      .add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS', value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \
      .add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL', value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \
      .add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL', value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))
    publish.after(train)

이 νŒŒμ΄ν”„λΌμΈμ„ μ»΄νŒŒμΌν•˜κ±°λ‚˜ λ°”λ‘œ create_run_from_pipeline_func으둜 μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

from kfp import compiler
compiler.Compiler().compile(recommender_pipeline, "pipeline.tar.gz")

run = client.run_pipeline(exp.id, "pipeline1", "pipeline.tar.gz")

Kubeflow Pipeline Components

Kubeflow Pipelines also ships components that use other Kubernetes resources or external operations such as Dataproc. Kubeflow components package ML tools while abstracting away the containers and CRDs they rely on.

Some components, such as func_to_container_op, are usable directly from Python code, while others have to be loaded through Kubeflow's component.yaml system. The approach this book recommends is to download a specific tagged release of the repository and load components with load_component_from_file:

$ wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
$ tar -xvf 0.2.5.tar.gz

Now the downloaded component can be loaded:

gcs_download_component = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)

The GCS download component takes a Google Cloud Storage path as input and downloads the file at that path:

dl_op = gcs_download_component(
    gcs_path="gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)

Advanced Topics in Pipelines

Conditional Execution of Pipeline Stages

Kubeflow Pipelinesμ—μ„œλŠ” dsl.Condition을 μ΄μš©ν•΄μ„œ 쑰건에 맞게 νŒŒμ΄ν”„λΌμΈμ„ μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€. ν•¨μˆ˜ λͺ‡ 개λ₯Ό μ˜€νΌλ ˆμ΄μ…˜μœΌλ‘œ λ°”κΎΈκ² μŠ΅λ‹ˆλ‹€.

import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)"""
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result

@func_to_container_op
def process_small_op(data: int):
    """Process small numbers."""
    print("Processing small result", data)
    return

@func_to_container_op
def process_medium_op(data: int):
    """Process medium numbers."""
    print("Processing medium result", data)
    return

@func_to_container_op
def process_large_op(data: int):
    """Process large numbers."""
    print("Processing large result", data)
    return

With dsl.Condition, we can now execute whichever operation matches the condition:

@dsl.pipeline(
    name="Conditional execution pipeline",
    description="Shows how to use dsl.Condition()."
)
def conditional_pipeline():
    number = get_random_int_op(0, 100).output
    with dsl.Condition(number < 10):
        process_small_op(number)
    # Python's `and` does not work on pipeline parameters;
    # nest conditions to combine them instead
    with dsl.Condition(number > 10):
        with dsl.Condition(number < 50):
            process_medium_op(number)
    with dsl.Condition(number > 50):
        process_large_op(number)

kfp.Client().create_run_from_pipeline_func(conditional_pipeline, arguments={})

When the run completes, you get an execution graph like the following:

Conditional execution

Running Pipelines on Schedule

νŒŒμ΄ν”„λΌμΈμ„ μŠ€μΌ€μ€„λ§ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€. νŒŒμ΄ν”„λΌμΈμ„ ν•œ 번 μ—…λ‘œλ“œ ν•œ λ‹€μŒ μ‹€ν–‰ νƒ€μž…μ„ β€œRecurringβ€μœΌλ‘œ μ„€μ •ν•œ λ‹€μŒ μŠ€μΌ€μ€„λ§ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Pipeline scheduling


This post is licensed under CC BY 4.0 by the author.