Kubeflow for ML - Chapter 4




Chapter 4: Kubeflow Pipelines

👀 This post consists of excerpts and summaries from the book Kubeflow for Machine Learning, with additional material added where needed.

Getting Started with Pipelines

The Kubeflow Pipelines platform includes the following:

  • A UI for managing, tracking, and running pipelines
  • An engine for scheduling pipeline runs
  • An SDK for defining, building, and deploying pipelines in Python
  • Notebook support for using the SDK and running pipelines

Building a Simple Pipeline in Python

Kubeflow Pipelines์€ Argo๋กœ ์‹คํ–‰๋˜๋Š” YAML ํŒŒ์ผ๋กœ ์ €์žฅ๋ฉ๋‹ˆ๋‹ค. Kubeflow์—์„œ๋Š” Python DSL (Domain-Specific Language)์„ ์ด์šฉํ•ด ํŒŒ์ดํ”„๋ผ์ธ์„ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํŒŒ์ดํ”„๋ผ์ธ์€ ๋ณธ์งˆ์ ์œผ๋กœ ์ปจํ…Œ์ด๋„ˆ ์‹คํ–‰์œผ๋กœ ๊ตฌ์„ฑ๋œ ๊ทธ๋ž˜ํ”„์ž…๋‹ˆ๋‹ค. ์‹คํ–‰ํ•  ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ˆœ์„œ๋Œ€๋กœ ์ง€์ •ํ•  ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ๊ณผ ์ปจํ…Œ์ด๋„ˆ๋“ค ์‚ฌ์ด์— ์ธ์ž๋ฅผ ์ „๋‹ฌํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

ํŒŒ์ดํ”„๋ผ์ธ์€ ๋‹ค์Œ์˜ ์ˆœ์„œ๋กœ ์ž‘์—…ํ•ฉ๋‹ˆ๋‹ค.

  • ์ปจํ…Œ์ด๋„ˆ ์ƒ์„ฑ (๊ฐ„๋‹จํ•œ Python ํ•จ์ˆ˜๋‚˜ ๋„์ปค ์ปจํ…Œ์ด๋„ˆ)
  • ํ•ด๋‹น ์ปจํ…Œ์ด๋„ˆ ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ๋ช…๋ น์ค„ ์ธ์ˆ˜, ๋ฐ์ดํ„ฐ ํƒ‘์žฌ, ์ปจํ…Œ์ด๋„ˆ์— ์ „๋‹ฌํ•  ๋ณ€์ˆ˜ ๋“ฑ์„ ์ฐธ์กฐํ•˜๋Š” ์ž‘์—…(operation) ์ƒ์„ฑ
  • ๋ณ‘๋ ฌ๋กœ ์ง„ํ–‰ํ•  ์ž‘์—…๊ณผ ์‚ฌ์ „ ์ž‘์—… ๋“ฑ์„ ์ •์˜ํ•˜์—ฌ ์ˆœ์„œ๋Œ€๋กœ ์ง€์ •
  • Python์œผ๋กœ ์ •์˜ํ•œ ํŒŒ์ดํ”„๋ผ์ธ์„ Kubeflow Pipelines์ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก YAML๋กœ ์ปดํŒŒ์ผ

๋‹ค์Œ์€ ๊ฐ„๋‹จํ•œ ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“œ๋Š” ์˜ˆ์ œ์ž…๋‹ˆ๋‹ค. ์šฐ์„  ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์„ฑํ•˜๋Š” ์˜คํผ๋ ˆ์ด์…˜์„ ์ƒ์„ฑํ•˜๋Š” ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.

from collections import namedtuple
from typing import NamedTuple

import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.components as comp

def add(a: float, b: float) -> float:
    """Calculates sum of two arguments"""
    return a+b

# ํ•จ์ˆ˜๋ฅผ ํŒŒ์ดํ”„๋ผ์ธ ์˜คํผ๋ ˆ์ด์…˜์œผ๋กœ ๋ณ€ํ™˜
add_op = comp.func_to_container_op(add)

def my_divmod(dividend: float, divisor: float) -> \
	NamedTuple("MyDivmodOutput", [("quotient", float), ("remainder", float)]):
    """Divides two numbers and calculate the quotient and remainder"""
    # Import libraries inside the component function
    import numpy as np
    
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)
    
    (quotient, remainder) = divmod_helper(dividend, divisor)
    
    divmod_output = namedtuple("MyDivmodOutput", ["quotient", "remainder"])
    return divmod_output(quotient, remainder)

# Convert the function above to a pipeline operation, specifying a base image
divmod_op = comp.func_to_container_op(
    my_divmod, 
    base_image="tensorflow/tensorflow:1.14.0-py3"
)
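Because the component bodies are ordinary Python functions, they can be sanity-checked locally before being wrapped with func_to_container_op. A minimal sketch (using the built-in divmod in place of NumPy so it runs anywhere; the local helper name is mine, not from the book):

```python
from typing import NamedTuple

def add(a: float, b: float) -> float:
    """Sum of two arguments (same body as the component above)."""
    return a + b

class MyDivmodOutput(NamedTuple):
    quotient: float
    remainder: float

def my_divmod_local(dividend: float, divisor: float) -> MyDivmodOutput:
    """Local stand-in for my_divmod, using the built-in divmod."""
    quotient, remainder = divmod(dividend, divisor)
    return MyDivmodOutput(quotient, remainder)

print(add(11.0, 4.0))              # 15.0
print(my_divmod_local(15.0, 7.0))  # MyDivmodOutput(quotient=2.0, remainder=1.0)
```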

Once all the operations are created, you can compose the pipeline:

@dsl.pipeline(
	name="Calculation pipeline",
    description="A toy pipeline that performs arithmetic calculations."
)
def calc_pipeline(
	a="a",
    b="7",
    c="17",
):
    # Pass a pipeline parameter, plus a constant, as operation arguments
    add_task = add_op(a, 4)
    
    # Use the output of the task above as an argument to the next operation.
    # That operation returns a single value, so just pass `task.output`
    divmod_task = divmod_op(add_task.output, b)
    
    # This operation returns multiple values, so pass each one via
    # `task.outputs["output_name"]`
    result_task = add_op(divmod_task.outputs["quotient"], c)

Finally, submit the pipeline to the client for execution, which returns links to the run and the experiment:

client = kfp.Client()
arguments = {"a": "7", "b": "8"}
client.create_run_from_pipeline_func(
    calc_pipeline, 
    arguments=arguments
)

If the run starts correctly, a link appears; clicking it shows the results of the pipeline execution.

Pipeline execution

Storing Data Between Steps

Kubeflow supports two ways of passing data between containers: persistent volumes within the Kubernetes cluster, and cloud storage options such as S3.

Persistent volumes abstract away the storage layer. Depending on the vendor, persistent volumes can be slow to provision and subject to IO limits. The volume's access mode is one of the following:

  • ReadWriteOnce : the volume can be mounted read-write by a single node
  • ReadOnlyMany : the volume can be mounted read-only by many nodes
  • ReadWriteMany : the volume can be mounted read-write by many nodes

Kubeflow Pipelines์—์„œ VolumeOp๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ž๋™์œผ๋กœ ๊ด€๋ฆฌ๋˜๋Š” ํผ์‹œ์Šคํ„ดํŠธ ๋ณผ๋ฅจ์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

dvop = dsl.VolumeOp(name="create_pvc",
                    resource_name="my-pvc-2",
                    size="5Gi",
                    modes=dsl.VOLUME_MODE_RWO)

์˜คํผ๋ ˆ์ด์…˜์— ๋ณผ๋ฅจ์„ ์ถ”๊ฐ€ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด add_pvolumes์„ ํ˜ธ์ถœํ•˜์—ฌ ์‚ฌ์šฉํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})
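Putting the two pieces together, here is a hedged sketch of a writer and a reader step sharing one VolumeOp-provisioned volume (op names, images, and paths are illustrative, not from the book; kfp is imported inside the function so the sketch stays self-contained):

```python
def build_shared_volume_steps():
    """Sketch: two steps sharing one auto-provisioned PVC (KFP v1 DSL).

    In practice these ops would be created inside a @dsl.pipeline-decorated
    function; names and images here are hypothetical.
    """
    import kfp.dsl as dsl  # imported here so the sketch is self-contained

    vop = dsl.VolumeOp(name="create_pvc",
                       resource_name="shared-pvc",
                       size="5Gi",
                       modes=dsl.VOLUME_MODE_RWO)

    # The writer mounts the volume at /data and produces a file there.
    writer = dsl.ContainerOp(
        name="write",
        image="busybox",
        command=["sh", "-c", "echo hello > /data/out.txt"],
    ).add_pvolumes({"/data": vop.volume})

    # The reader mounts the same (now populated) volume via writer.pvolume,
    # which also records the writer -> reader dependency.
    reader = dsl.ContainerOp(
        name="read",
        image="busybox",
        command=["sh", "-c", "cat /data/out.txt"],
    ).add_pvolumes({"/data": writer.pvolume})
    return writer, reader
```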

Kubeflow์˜ ๋นŒํŠธ์ธ file_output ๋ฉ”์ปค๋‹ˆ์ฆ˜์€ ํŒŒ์ดํ”„๋ผ์ธ ์‚ฌ์ด์—์„œ ํŠน์ • ๋กœ์ปฌ ํŒŒ์ผ์„ MinIO๋กœ ์ž๋™์œผ๋กœ ์ „์†กํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค. file_output์„ ์‚ฌ์šฉํ•˜์—ฌ ์ปจํ…Œ์ด๋„ˆ ๋‚ด์— ํŒŒ์ผ์„ ์“ธ ์ˆ˜๋„ ์žˆ๊ณ  ContainerOp์— ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ํŠน์ •ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

fetch = kfp.dsl.ContainerOp(
    name="download",
    image="busybox",
    command=["sh", "-c"],
    arguments=[
        # no commas: these literals concatenate into a single shell command
        "sleep 1;"
        "mkdir -p /tmp/data;"
        "wget " + data_url +
        " -O /tmp/data/result.csv"
    ],
    file_outputs={"downloaded": "/tmp/data/result.csv"}
)

Introduction to Kubeflow Pipelines Components

Argo: the Foundation of Pipelines

Kubeflow Pipelines builds on Argo Workflows, a container-native workflow engine for Kubernetes. Installing Kubeflow installs all of the Argo components as well. While you don't need to install Argo separately to use Kubeflow Pipelines, the Argo command line is useful for debugging pipelines. Argo can be installed as follows:

$ # Download the binary 
$ curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64

$ # Make binary executable 
$ chmod +x argo-linux-amd64

$ # Move binary to path 
$ mv ./argo-linux-amd64 ~/bin/argo

์„ค์น˜๊ฐ€ ์™„๋ฃŒ๋๋‹ค๋ฉด Manifest๋ฅผ ์ด์šฉํ•ด Argo Pod์„ ๋„์›Œ์ค๋‹ˆ๋‹ค.

$ kubectl create ns argo
$ kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml

Now you can submit a pipeline to the Kubeflow namespace with the following commands:

$ argo submit -n kubeflow --watch https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/hello-world.yaml

$ argo list -n kubeflow
NAME                STATUS      AGE   DURATION   PRIORITY
hello-world-hxkrs   Succeeded   4h    26s        0

$ argo get hello-world-hxkrs -n kubeflow
Name:                hello-world-hxkrs
Namespace:           kubeflow
ServiceAccount:      default
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Started:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Finished:            Sun Jul 03 16:57:30 +0900 (4 hours ago)
Duration:            26 seconds
ResourcesDuration:   14s*cpu,4s*memory

STEP                  TEMPLATE  PODNAME            DURATION  MESSAGE
 ✔ hello-world-hxkrs  whalesay  hello-world-hxkrs  15s
 
$ argo logs hello-world-hxkrs -n kubeflow
hello-world-hxkrs: time="2022-07-03T07:57:12.005Z" level=info msg="capturing logs" argo=true
hello-world-hxkrs:  _____________
hello-world-hxkrs: < hello world >
hello-world-hxkrs:  -------------
hello-world-hxkrs:     \
hello-world-hxkrs:      \
hello-world-hxkrs:       \
hello-world-hxkrs:                     ##        .
hello-world-hxkrs:               ## ## ##       ==
hello-world-hxkrs:            ## ## ## ##      ===
hello-world-hxkrs:        /""""""""""""""""___/ ===
hello-world-hxkrs:   ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
hello-world-hxkrs:        \______ o          __/
hello-world-hxkrs:         \    \        __/
hello-world-hxkrs:           \____\______/

$ argo delete hello-world-hxkrs -n kubeflow
Workflow 'hello-world-hxkrs' deleted

What Kubeflow Pipelines Adds to Argo Workflow

Using Argo directly means defining workflows in YAML and containerizing your code yourself, which is quite cumbersome. With Kubeflow Pipelines you can instead define and build pipelines through a Python API, which is far more convenient, and it is also much easier to add ML-specific components.

Building a Pipeline Using Existing Images

Kubeflow Pipelines์€ ์ด๋ฏธ ๋นŒ๋“œ๋œ Docker ์ด๋ฏธ์ง€๋ฅผ ์ด์šฉํ•ด ์—ฌ๋Ÿฌ ์–ธ์–ด๋กœ ๊ตฌํ˜„๋œ ๊ฒƒ๋“ค์„ ์‹คํ–‰ํ•ด ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜ํ•˜๋Š” ๊ธฐ๋Šฅ์„ ๊ฐ–๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.

You can also import the Kubernetes client to call Kubernetes functions directly from Python:

from kubernetes import client as k8s_client

Also, since an experiment in Pipelines can be created only once, you can reuse an existing experiment by name:

client = kfp.Client()
exp = client.get_experiment(experiment_name="mdupdate")

๋‹ค์Œ ์˜ˆ์ œ๋Š” ์ด๋ฏธ ๋นŒ๋“œ๋˜์–ด ์žˆ๋Š” ์ด๋ฏธ์ง€๋“ค์„ ์ด์šฉํ•˜์—ฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•˜๋Š” ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค. ์•„๋ž˜ ์ฝ”๋“œ์—์„œ ํ”„๋ฆฌ๋นŒ๋“œ๋œ ์ปจํ…Œ์ด๋„ˆ๋Š” MINIO_* ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋กœ ์„ค์ •๋˜์–ด ์žˆ์–ด add_env_variable์„ ํ˜ธ์ถœํ•˜์—ฌ ๋กœ์ปฌ MinIO๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๊ฐ ๋‹จ๊ณ„๋ฅผ ๋ช…์‹œํ•˜๊ธฐ ์œ„ํ•ด after ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@dsl.pipeline(
    name='Recommender model update',
    description='Demonstrate usage of pipelines for multi-step model update')
def recommender_pipeline():
    # Load new data
    data = dsl.ContainerOp(
        name='updatedata',
        image='lightbend/recommender-data-update-publisher:0.2') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    # Train the model
    train = dsl.ContainerOp(
        name='trainmodel',
        image='lightbend/ml-tf-recommender:0.1') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    train.after(data)
    # Publish the new model
    publish = dsl.ContainerOp(
        name='publishmodel',
        image='lightbend/recommender-model-publisher:0.2') \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
      .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \
      .add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS', value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \
      .add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL', value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \
      .add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL', value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))
    publish.after(train)

You can compile this pipeline, or run it directly with create_run_from_pipeline_func:

from kfp import compiler
compiler.Compiler().compile(recommender_pipeline, "pipeline.tar.gz")

run = client.run_pipeline(exp.id, "pipeline1", "pipeline.tar.gz")
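The compiled package can also be uploaded so the pipeline appears in the Pipelines UI. A minimal sketch, assuming the KFP v1 client (kfp is imported inside the function so the sketch stays self-contained; the display name is illustrative):

```python
def upload_compiled_pipeline(package_path: str = "pipeline.tar.gz"):
    """Sketch: register a compiled pipeline package in the Pipelines UI."""
    import kfp  # imported here so the sketch is self-contained

    client = kfp.Client()
    # upload_pipeline registers the package under the given display name.
    return client.upload_pipeline(package_path,
                                  pipeline_name="recommender-update")
```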

Kubeflow Pipeline Components

Kubeflow Pipelines์€ ๋‹ค๋ฅธ Kubernetes ๋ฆฌ์†Œ์Šค๋‚˜ dataproc๊ณผ ๊ฐ™์€ ์™ธ๋ถ€ ์˜คํผ๋ ˆ์ด์…˜์„ ์‚ฌ์šฉํ•˜๋Š” ์ปดํฌ๋„ŒํŠธ๋„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. Kubeflow ์ปดํฌ๋„ŒํŠธ๋Š” ML ํˆด์„ ํŒจํ‚ค์ง•ํ•˜๋Š” ๋™์‹œ์— ์‚ฌ์šฉํ•œ ์ปจํ…Œ์ด๋„ˆ๋‚˜ CRD๋ฅผ ์ถ”์ƒํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Components such as func_to_container are usable directly from Python code, while others must be loaded through Kubeflow's component.yaml system. The approach this book recommends is to download a specific tagged release of the repository and load components from it with load_component_from_file:

$ wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
$ tar -xvf 0.2.5.tar.gz

Now you can load the downloaded component:

gcs_download_component = kfp.components.load_component_from_file(
	"pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)

The GCS download component takes a Google Cloud Storage path and downloads the file:

dl_op = gcs_download_component(
	gcs_path="gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)

Advanced Topics in Pipelines

Conditional Execution of Pipeline Stages

Kubeflow Pipelines์—์„œ๋Š” dsl.Condition์„ ์ด์šฉํ•ด์„œ ์กฐ๊ฑด์— ๋งž๊ฒŒ ํŒŒ์ดํ”„๋ผ์ธ์„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•จ์ˆ˜ ๋ช‡ ๊ฐœ๋ฅผ ์˜คํผ๋ ˆ์ด์…˜์œผ๋กœ ๋ฐ”๊พธ๊ฒ ์Šต๋‹ˆ๋‹ค.

import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)"""
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result

@func_to_container_op
def process_small_op(data: int):
    """Process small numbers."""
    print("Processing small result", data)
    return

@func_to_container_op
def process_medium_op(data: int):
    """Process medium numbers."""
    print("Processing medium result", data)
    return

@func_to_container_op
def process_large_op(data: int):
    """Process large numbers."""
    print("Processing large result", data)
    return

Now dsl.Condition runs each operation only when its condition holds:

@dsl.pipeline(
	name="Conditional execution pipeline",
    description="Shows how to use dsl.Condition()."
)
def conditional_pipeline():
    number = get_random_int_op(0, 100).output
    with dsl.Condition(number < 10):
        process_small_op(number)
    # dsl.Condition takes a single comparison; Python's `and` cannot combine
    # PipelineParam conditions, so nest two conditions instead.
    with dsl.Condition(number > 10):
        with dsl.Condition(number < 50):
            process_medium_op(number)
    with dsl.Condition(number > 50):
        process_large_op(number)

kfp.Client().create_run_from_pipeline_func(conditional_pipeline, arguments={})

When the run completes, you get an execution graph like the following:

Conditional execution

Running Pipelines on Schedule

ํŒŒ์ดํ”„๋ผ์ธ์„ ์Šค์ผ€์ค„๋งํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ํŒŒ์ดํ”„๋ผ์ธ์„ ํ•œ ๋ฒˆ ์—…๋กœ๋“œ ํ•œ ๋‹ค์Œ ์‹คํ–‰ ํƒ€์ž…์„ โ€œRecurringโ€์œผ๋กœ ์„ค์ •ํ•œ ๋‹ค์Œ ์Šค์ผ€์ค„๋งํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Pipeline scheduling


This post is licensed under CC BY 4.0 by the author.