Kubeflow for ML - Chaper 4
Chapter 4: Kubeflow Pipelines
๐ ๋ณธ ํฌ์คํธ๋ Kubeflow for Machine Learning ์ฑ ์ ๋ฐ์ท/์์ฝํ๋ฉด์ ํ์ํ ๋ด์ฉ์ ์ถ๊ฐํ์ฌ ์์ฑํ์์ต๋๋ค.
- Chapter 1: Kubeflow: What It is and Who It Is For
- Chapter 2: Hello Kubeflow
- Chapter 3: Kubeflow Design: Beyond the Basics
- Chapter 4: Kubeflow Pipelines
- Chapter 5: Data and Feature Preparation
- Chapter 6: Artifact and Metadata Store
- Chapter 7: Training a Machine Learning Model
- Chapter 8: Model Inference
- Chapter 9: Case Study Using Multiple Tools
- Chapter 10: Hyperparameter Tuning And Automated Machine Learning
Getting Started with Pipelines
Kubeflow Pipelines ํ๋ซํผ์ ๋ค์์ ํฌํจํ๊ณ ์์ค๋น๋ค.
- ํ์ดํ๋ผ์ธ์ ๊ด๋ฆฌ, ์ถ์ ํ๊ณ ์คํํ ์ ์๋๋ก ํ๋ UI
- ํ์ดํ๋ผ์ธ ์คํ ์ค์ผ์ค๋ง์ ์ํ ์์ง
- Python์์ ํ์ดํ๋ผ์ธ์ ์ ์, ๊ตฌ์ถ, ๋ฐฐํฌํ๊ธฐ ์ํ SDK
- SDK์ ํ์ดํ๋ผ์ธ ์คํ์ ์ํ ๋ ธํธ๋ถ ์ง์
Building a Sipmle Pipeline in Python
Kubeflow Pipelines์ Argo๋ก ์คํ๋๋ YAML ํ์ผ๋ก ์ ์ฅ๋ฉ๋๋ค. Kubeflow์์๋ Python DSL (Domain-Specific Language)์ ์ด์ฉํด ํ์ดํ๋ผ์ธ์ ์์ฑํ ์ ์์ต๋๋ค. ํ์ดํ๋ผ์ธ์ ๋ณธ์ง์ ์ผ๋ก ์ปจํ ์ด๋ ์คํ์ผ๋ก ๊ตฌ์ฑ๋ ๊ทธ๋ํ์ ๋๋ค. ์คํํ ์ปจํ ์ด๋๋ฅผ ์์๋๋ก ์ง์ ํ ๋ฟ๋ง ์๋๋ผ ์ ์ฒด ํ์ดํ๋ผ์ธ๊ณผ ์ปจํ ์ด๋๋ค ์ฌ์ด์ ์ธ์๋ฅผ ์ ๋ฌํ ์๋ ์์ต๋๋ค.
ํ์ดํ๋ผ์ธ์ ๋ค์์ ์์๋ก ์์ ํฉ๋๋ค.
- ์ปจํ ์ด๋ ์์ฑ (๊ฐ๋จํ Python ํจ์๋ ๋์ปค ์ปจํ ์ด๋)
- ํด๋น ์ปจํ ์ด๋ ๋ฟ๋ง ์๋๋ผ ๋ช ๋ น์ค ์ธ์, ๋ฐ์ดํฐ ํ์ฌ, ์ปจํ ์ด๋์ ์ ๋ฌํ ๋ณ์ ๋ฑ์ ์ฐธ์กฐํ๋ ์์ (operation) ์์ฑ
- ๋ณ๋ ฌ๋ก ์งํํ ์์ ๊ณผ ์ฌ์ ์์ ๋ฑ์ ์ ์ํ์ฌ ์์๋๋ก ์ง์
- Python์ผ๋ก ์ ์ํ ํ์ดํ๋ผ์ธ์ Kubeflow Pipelines์ด ์ฌ์ฉํ ์ ์๋๋ก YAML๋ก ์ปดํ์ผ
๋ค์์ ๊ฐ๋จํ ํ์ดํ๋ผ์ธ์ ๋ง๋๋ ์์ ์ ๋๋ค. ์ฐ์ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ฑํ๋ ์คํผ๋ ์ด์ ์ ์์ฑํ๋ ์ฝ๋์ ๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from collections import namedtuple
from typing import NamedTuple
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.components as comp
def add(a: float, b: float) -> float:
"""Calculates sum of two arguments"""
return a+b
# ํจ์๋ฅผ ํ์ดํ๋ผ์ธ ์คํผ๋ ์ด์
์ผ๋ก ๋ณํ
add_op = comp.func_to_container_op(add)
def my_divmod(dividend: float, divisor: float) -> \
NamedTuple("MyDivmodOutput", [("quotient", float), ("remainder", float)]):
"""Divides two numbers and calculate the quotient and remainder"""
# ์ปดํฌ๋ํธ ํจ์ ์์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ํฌํธ
import numpy as np
def divmod_helper(dividend, divisor):
return np.divmod(dividend, divisor)
(quotient, remainder) = divmod_helper(dividend, divisor)
divmod_output = namedtuple("MyDivmodOutput", ["quotient", "remainder"])
return divmod_output(quotient, remainder)
# ์ ํจ์๋ฅผ ํ์ดํ๋ผ์ธ ์คํผ๋ ์ด์
์ผ๋ก ๋ณํํ๋ฉด์ ๋ฒ ์ด์ค ์ด๋ฏธ์ง ๋ช
์
divmod_op = comp.func_to_container_op(
my_divmod,
base_image="tensorflow/tensorflow:1.14.0-py3"
)
ํ์ํ ์คํผ๋ ์ด์ ์ ๋ค ๋ง๋ค์๋ค๋ฉด ์ด์ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ฑํ๋ฉด ๋ฉ๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@dsl.pipeline(
name="Calculation pipeline",
description="A toy pipeline that performs arithmetic calculations."
)
def calc_pipeline(
a="a",
b="7",
c="17",
):
# ํ์ดํ๋ผ์ธ ํ๋ผ๋ฏธํฐ๋ฅผ ์ ๋ฌํ๊ณ ์คํผ๋ ์ด์
์ธ์๋ก ์์๋ฅผ ์ ๋ฌ
add_task = add_op(a, 4)
# ์์์ ์์ฑํ ํ์คํฌ์ ์์ํ์ ๋ค์ ์คํผ๋ ์ด์
์ ์ธ์๋ก ์ฌ์ฉ
# ์ ์คํผ๋ ์ด์
์ ๋จ์ผ๊ฐ์ ๋ฐํํ๋ฏ๋ก ๋จ์ํ `task.output`์ผ๋ก ์ ๋ฌ
divmod_task = divmod_op(add_task.output, b)
# ์ ์คํผ๋ ์ด์
์ ์ฌ๋ฌ ๊ฐ์ ๋ฐํํ๋ฏ๋ก `task.outputs["output_name"]`์ผ๋ก
# ๊ฐ ์ ๋ฌ
result_task = add_op(divmod_task.outputs["quotient"], c)
๋ง์ง๋ง์ผ๋ก ์คํ๊ณผ ์คํ์ ๋ํ ๋งํฌ๋ฅผ ๋ฐํํ๋๋ก ์คํ์ฉ ํ์ดํ๋ผ์ธ์ ํด๋ผ์ด์ธํธ์ ๋ณด๋ ๋๋ค.
1
2
3
4
5
6
client = kfp.Client()
arguments = {"a": "7", "b": "8"}
client.create_run_from_pipeline_func(
calc_pipeline,
arguments=arguments
)
์ฌ๋ฐ๋ฅด๊ฒ ์คํ์ด ๋๋ฉด ๋งํฌ๊ฐ ๋จ๋๋ฐ ์ด๋ฅผ ํด๋ฆญํ๋ฉด ํ์ดํ๋ผ์ธ ์คํ ๊ฒฐ๊ณผ๋ฅผ ๋ณผ ์ ์์ต๋๋ค.

Storing Data Between Steps
Kubeflow์์๋ ์ปจํ ์ด๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ ๋ ๋ ๊ฐ์ง ๋ฐฉ์์ ์ง์ํฉ๋๋ค. Kubernetes ํด๋ฌ์คํฐ ๋ด์ ํผ์์คํดํธ ๋ณผ๋ฅจ (persistent volumes)๊ณผ S3์ ๊ฐ์ ํด๋ผ์ฐ๋ ์ ์ฅ์ ์ต์ ์ ๋๋ค.
ํผ์์คํดํธ ๋ณผ๋ฅจ์ ์ ์ฅ์ ๋ ์ด์ด๋ฅผ ์ถ์ํํฉ๋๋ค. ํผ์์คํดํธ ๋ณผ๋ฅจ์ ๋ฒค๋์ ๋ฐ๋ผ ํ๋ก๋น์ ๋ ์๋๊ฐ ๋๋ฆฌ๊ณ IO ์ ํ์ด ์์ ์ ์์ต๋๋ค. ์ ์ฅ์ ํด๋์ค๋ ๋ค์ ์ค ํ๋๊ฐ ๋ฉ๋๋ค.
- ReadWriteOnce : ํ๋์ ๋ ธ๋์์ ํด๋น ๋ณผ๋ฅจ์ด ์ฝ๊ธฐ-์ฐ๊ธฐ๋ก ๋ง์ดํธ๋ ์ ์์
- ReadOnlyMany : ๋ณผ๋ฅจ์ด ๋ค์์ ๋ ธ๋์์ ์ฝ๊ธฐ ์ ์ฉ์ผ๋ก ๋ง์ดํธ๋ ์ ์์
- ReadWriteMany : ๋ณผ๋ฅจ์ด ๋ค์์ ๋ ธ๋์์ ์ฝ๊ธฐ-์ฐ๊ธฐ๋ก ๋ง์ดํธ๋ ์ ์์
Kubeflow Pipelines์์ VolumeOp
๋ฅผ ์ฌ์ฉํ๋ฉด ์๋์ผ๋ก ๊ด๋ฆฌ๋๋ ํผ์์คํดํธ ๋ณผ๋ฅจ์ ์์ฑํ ์ ์์ต๋๋ค.
1
2
3
4
dvop = dsl.VolumeOp(name="create_pvc",
resource_name="my-pvc-2",
size="5Gi",
modes=dsl.VOLUME_MODE_RWO)
์คํผ๋ ์ด์
์ ๋ณผ๋ฅจ์ ์ถ๊ฐํ๊ณ ์ถ๋ค๋ฉด add_pvolumes
์ ํธ์ถํ์ฌ ์ฌ์ฉํ๋ฉด ๋ฉ๋๋ค.
1
download_data_op(year).add_pvolumes({"/data_processing": dvop.volumeยด})
Kubeflow์ ๋นํธ์ธ file_output
๋ฉ์ปค๋์ฆ์ ํ์ดํ๋ผ์ธ ์ฌ์ด์์ ํน์ ๋ก์ปฌ ํ์ผ์ MinIO๋ก ์๋์ผ๋ก ์ ์กํ ์ ์๊ฒ ํฉ๋๋ค. file_output
์ ์ฌ์ฉํ์ฌ ์ปจํ
์ด๋ ๋ด์ ํ์ผ์ ์ธ ์๋ ์๊ณ ContainerOp
์ ํ๋ผ๋ฏธํฐ๋ฅผ ํน์ ํ ์๋ ์์ต๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
fecth = kfp.dsl.ContainerOp(
name="download",
image="busybox",
command=["sh", "-c"],
arguments=[
"sleep 1;",
"mkdir -p /tmp/data;",
"wget " + data_url +
" -O /tmp/dat/result.csv"
],
file_outputs={"downloaded", "/tmp/data"}
)
Introduction to Kubeflow Pipelines Components
Argo: the Foundation of Pipelines
Kubeflow Pipelines๋ Kubenetes๋ฅผ ์ฌ์ฉํ๋ ์ปจํ ์ด๋ ๋ค์ดํฐ๋ธ ์ํฌํ๋ก์ฐ ์์ง์ธ Argo Workflows๋ฅผ ๋น๋ํฉ๋๋ค. Kubeflow๋ฅผ ์ค์นํ ๋๋ ๋ชจ๋ Argo ์ปดํฌ๋ํธ๊ฐ ์ค์น๋๋๋ฐ์. Kubeflow Pipelines์ ์ฌ์ฉํ๊ธฐ ์ํด Argo๋ฅผ ์ค์นํ ํ์๋ ์์ง๋ง Argo ์ปค๋งจ๋๋ผ์ธ์ ํตํด ํ์ดํ๋ผ์ธ์ ๋๋ฒ๊น ํ ๋ ์ฌ์ฉํ ์ ์์ต๋๋ค. Argo ์ค์น๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
1
2
3
4
5
6
7
8
$ # Download the binary
$ curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64
$ # Make binary executable
$ chmod +x argo-linux-amd64
$ # Move binary to path
$ mv ./argo-linux-amd64 ~/bin/argo
์ค์น๊ฐ ์๋ฃ๋๋ค๋ฉด Manifest๋ฅผ ์ด์ฉํด Argo Pod์ ๋์์ค๋๋ค.
1
2
$ kubectl create ns argo
$ kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml
์ด์ ๋ค์์ ๋ช ๋ น์ด๋ก Kubeflow ๋ค์์คํ์ด์ค๋ก ํ์ดํ๋ผ์ธ์ ๋ณด๋ผ ์ ์์ต๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
$ argo submit -n kubeflow --watch https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/hello-world.yaml
$ argo list -n kubeflowgg
NAME STATUS AGE DURATION PRIORITY
hello-world-hxkrs Succeeded 4h 26s 0
$ argo get hello-world-hxkrs -n kubeflow
Name: hello-world-hxkrs
Namespace: kubeflow
ServiceAccount: default
Status: Succeeded
Conditions:
PodRunning False
Completed True
Created: Sun Jul 03 16:57:04 +0900 (4 hours ago)
Started: Sun Jul 03 16:57:04 +0900 (4 hours ago)
Finished: Sun Jul 03 16:57:30 +0900 (4 hours ago)
Duration: 26 seconds
ResourcesDuration: 14s*cpu,4s*memory
STEP TEMPLATE PODNAME DURATION MESSAGE
โ hello-world-hxkrs whalesay hello-world-hxkrs 15s
$ argo logs hello-world-hxkrs -n kubeflow
hello-world-hxkrs: time="2022-07-03T07:57:12.005Z" level=info msg="capturing logs" argo=true
hello-world-hxkrs: _____________
hello-world-hxkrs: < hello world >
hello-world-hxkrs: -------------
hello-world-hxkrs: \
hello-world-hxkrs: \
hello-world-hxkrs: \
hello-world-hxkrs: ## .
hello-world-hxkrs: ## ## ## ==
hello-world-hxkrs: ## ## ## ## ===
hello-world-hxkrs: /""""""""""""""""___/ ===
hello-world-hxkrs: ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ / ===- ~~~
hello-world-hxkrs: \______ o __/
hello-world-hxkrs: \ \ __/
hello-world-hxkrs: \____\______/
$ argo delete hello-world-hxkrs -n kubeflow
Workflow 'hello-world-hxkrs' deleted
What Kubeflow Pipelines Adds to Argo Workflow
Argo๋ฅผ ์ง์ ์ฌ์ฉํ๋ ค๋ฉด YAML๋ก ์ํฌํ๋ก์ฐ๋ฅผ ์ ์ํด์ผ ํ๊ณ ์ฝ๋๋ฅผ ์ปจํ ์ด๋ํํด์ผ ํ๋๋ฐ ๋งค์ฐ ๊น๋ค๋ก์ด ์์ ์ ๋๋ค. ๋ฐ๋ฉด Kubeflow Pipelines์ ์ฌ์ฉํ๋ฉด Python API๋ฅผ ์ด์ฉํด ํ์ดํ๋ผ์ธ์ ์ ์ํ๊ณ ์์ฑํ ์ ์๊ธฐ ๋๋ฌธ์ ํธํ๊ฒ ์์ ํ ์ ์์ต๋๋ค. ๋ ๋์๊ฐ ML ๊ด๋ จ ์ปดํฌ๋ํธ๋ฅผ ์ถ๊ฐํ๊ธฐ์๋ ํจ์ฌ ์ฉ์ดํฉ๋๋ค.
Building a Pipeline Using Existing Images
Kubeflow Pipelines์ ์ด๋ฏธ ๋น๋๋ Docker ์ด๋ฏธ์ง๋ฅผ ์ด์ฉํด ์ฌ๋ฌ ์ธ์ด๋ก ๊ตฌํ๋ ๊ฒ๋ค์ ์คํํด ์ค์ผ์คํธ๋ ์ด์ ํ๋ ๊ธฐ๋ฅ์ ๊ฐ๊ณ ์์ต๋๋ค.
๋ํ Python ๋ด์์ ์ง์ Kubernetes ํจ์๋ฅผ ์ฌ์ฉํ๋๋ก Kubernetes ํด๋ผ์ด์ธํธ๋ฅผ ์ํฌํธํ ์ ์์ต๋๋ค.
1
from kubernetes import client as k8s_client
๊ทธ๋ฆฌ๊ณ ํ์ดํ๋ผ์ธ์์ ์คํ์ ํ ๋ฒ๋ง ๋ง๋ค ์ ์๊ธฐ ๋๋ฌธ์ ๊ธฐ์กด์ ์คํ ์ด๋ฆ์ ํตํด ํด๋น ์คํ์ ์ฌ์ฉํ ์๋ ์์ต๋๋ค.
1
2
client = kfp.Client()
exp = client.get_experiment(experiment_name="mdupdate")
๋ค์ ์์ ๋ ์ด๋ฏธ ๋น๋๋์ด ์๋ ์ด๋ฏธ์ง๋ค์ ์ด์ฉํ์ฌ ํ์ดํ๋ผ์ธ์ ์์ฑํ๋ ์ฝ๋์
๋๋ค. ์๋ ์ฝ๋์์ ํ๋ฆฌ๋น๋๋ ์ปจํ
์ด๋๋ MINIO_*
ํ๊ฒฝ ๋ณ์๋ก ์ค์ ๋์ด ์์ด add_env_variable
์ ํธ์ถํ์ฌ ๋ก์ปฌ MinIO๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ๊ฐ ๋จ๊ณ๋ฅผ ๋ช
์ํ๊ธฐ ์ํด after
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@dsl.pipeline(
name='Recommender model update',
description='Demonstrate usage of pipelines for multi-step model update')
def recommender_pipeline():
# Load new data
data = dsl.ContainerOp(
name='updatedata',
image='lightbend/recommender-data-update-publisher:0.2') \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
# Train the model
train = dsl.ContainerOp(
name='trainmodel',
image='lightbend/ml-tf-recommender:0.1') \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='minio-service.kubeflow.svc.cluster.local:9000')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
train.after(data)
# Publish new model model
publish = dsl.ContainerOp(
name='publishmodel',
image='lightbend/recommender-model-publisher:0.2') \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
.add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \
.add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS', value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \
.add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL', value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \
.add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL', value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))
publish.after(train)
์ด ํ์ดํ๋ผ์ธ์ ์ปดํ์ผํ๊ฑฐ๋ ๋ฐ๋ก create_run_from_pipeline_func
์ผ๋ก ์คํํ ์ ์์ต๋๋ค.
1
2
3
4
from kfp import compiler
compiler.Compiler().compile(recommender_pipeline, "pipeline.tar.gz")
run = client.run_pipeline(exp.id, "pipeline1", "pipeline.tar.gz")
Kubeflow Pipeline Components
Kubeflow Pipelines์ ๋ค๋ฅธ Kubernetes ๋ฆฌ์์ค๋ dataproc
๊ณผ ๊ฐ์ ์ธ๋ถ ์คํผ๋ ์ด์
์ ์ฌ์ฉํ๋ ์ปดํฌ๋ํธ๋ ์ ๊ณตํฉ๋๋ค. Kubeflow ์ปดํฌ๋ํธ๋ ML ํด์ ํจํค์งํ๋ ๋์์ ์ฌ์ฉํ ์ปจํ
์ด๋๋ CRD๋ฅผ ์ถ์ํํ ์ ์์ต๋๋ค.
func_to_container
๊ฐ์ ์ปดํฌ๋ํธ๋ Python ์ฝ๋๋ก ์ฌ์ฉํ ์ ์๊ณ , ์ด๋ค ์ปดํฌ๋ํธ๋ Kubeflow์ component.yaml
์์คํ
์ ์ฌ์ฉํ์ฌ ๋ถ๋ฌ์์ผ ํฉ๋๋ค. ๋ณธ ์ฑ
์์ ์ ์ํ๋ Kubeflow ์ปดํฌ๋ํธ๋ฅผ ์ ๋๋ก ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ์ ์ฅ์์ ํน์ ํ๊ทธ๋ฅผ load_components_from_file
์ ์ด์ฉํด ๋ค์ด๋ก๋ํ๋ ๊ฒ์
๋๋ค.
1
2
$ wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
$ tar -xvf 0.2.5.tar.gz
์ด์ ๋ค์ด๋ก๋ํ ์ปดํฌ๋ํธ๋ฅผ ๋ถ๋ฌ์ฌ ์ ์์ต๋๋ค.
1
2
3
gcs_download_component = kfp.components.load_component_from_file(
"pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)
GCS ๋ค์ด๋ก๋ ์ปดํฌ๋ํธ๋ Google Cloud Storage ๊ฒฝ๋ก๋ฅผ ์ ๋ ฅํ์ฌ ํ์ผ์ ๋ค์ด๋ก๋ ํ ์ ์๊ฒ ํฉ๋๋ค.
1
2
3
dl_op = gcs_download_component(
gcs_path="gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)
Advanced Topic in Pipelines
Conditional Execution of Pipeline Stages
Kubeflow Pipelines์์๋ dsl.Condition
์ ์ด์ฉํด์ ์กฐ๊ฑด์ ๋ง๊ฒ ํ์ดํ๋ผ์ธ์ ์คํํ ์ ์์ต๋๋ค. ํจ์ ๋ช ๊ฐ๋ฅผ ์คํผ๋ ์ด์
์ผ๋ก ๋ฐ๊พธ๊ฒ ์ต๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath
@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
"""Generate a random number between minimum and maximum (inclusive)"""
import random
result = random.randint(minimum, maximum)
print(result)
return result
@func_to_container_op
def process_small_op(data: int):
"""Process small numbers."""
print("Processing small result", data)
return
@func_to_container_op
def process_medium_op(data: int):
"""Process medium numbers."""
print("Processing medium result", data)
return
@func_to_container_op
def process_large_op(data: int):
"""Process large numbers."""
print("Processing large result", data)
return
์ฌ๊ธฐ์ dsl.Condition
์ ์ด์ฉํด ์กฐ๊ฑด์ ๋ง๋ ์คํผ๋ ์ด์
์ ์คํํ ์ ์์ต๋๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@dsl.pipeline(
name="Conditional execution pipeline",
description="Shows how to use dsl.Condition()."
)
def conditional_pipeline():
number = get_random_int_op(0, 100).output
with dsl.Condition(number < 10):
process_small_op(number)
with dsl.Condition(number > 10 and number < 50):
process_medium_op(number)
with dsl.Condition(number > 50):
process_large_op(number)
kfp.Client().create_run_from_pipeline_func(conditional_pipeline, arguments={})
์คํ์ด ์๋ฃ๋๋ฉด ๋ค์๊ณผ ๊ฐ์ ์คํ ๊ทธ๋ํ๋ฅผ ์ป์ ์ ์์ต๋๋ค.

Running Pipelines on Schedule
ํ์ดํ๋ผ์ธ์ ์ค์ผ์ค๋งํ ์๋ ์์ต๋๋ค. ํ์ดํ๋ผ์ธ์ ํ ๋ฒ ์ ๋ก๋ ํ ๋ค์ ์คํ ํ์ ์ โRecurringโ์ผ๋ก ์ค์ ํ ๋ค์ ์ค์ผ์ค๋งํ ์ ์์ต๋๋ค.
