Kubeflow for ML - Chapter 4
Chapter 4: Kubeflow Pipelines
This post was written while excerpting and summarizing the book Kubeflow for Machine Learning, with additional material added where needed.
- Chapter 1: Kubeflow: What It is and Who It Is For
- Chapter 2: Hello Kubeflow
- Chapter 3: Kubeflow Design: Beyond the Basics
- Chapter 4: Kubeflow Pipelines
- Chapter 5: Data and Feature Preparation
- Chapter 6: Artifact and Metadata Store
- Chapter 7: Training a Machine Learning Model
- Chapter 8: Model Inference
- Chapter 9: Case Study Using Multiple Tools
- Chapter 10: Hyperparameter Tuning And Automated Machine Learning
Getting Started with Pipelines
The Kubeflow Pipelines platform includes:
- A UI for managing, tracking, and running pipelines
- An engine for scheduling pipeline runs
- An SDK for defining, building, and deploying pipelines in Python
- Notebook support for using the SDK and running pipelines
Building a Simple Pipeline in Python
Kubeflow Pipelines are stored as YAML files that are executed by Argo. In Kubeflow, pipelines can be written with a Python DSL (Domain-Specific Language). A pipeline is essentially a graph of container executions: you specify the order in which the containers run, and you can pass arguments to the pipeline as a whole as well as between containers.
Pipelines are written in the following order:
- Create the containers (either simple Python functions or Docker containers)
- Create an operation that references the container along with its command-line arguments, data mounts, and variables to pass to it
- Sequence the operations, defining which may run in parallel and which have prerequisite steps
- Compile the pipeline defined in Python into YAML that Kubeflow Pipelines can consume
The following example builds a simple pipeline. First, the code that creates the operations making up the pipeline.
from collections import namedtuple
from typing import NamedTuple

import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.components as comp


def add(a: float, b: float) -> float:
    """Calculates sum of two arguments"""
    return a + b

# Convert the function to a pipeline operation
add_op = comp.func_to_container_op(add)


def my_divmod(dividend: float, divisor: float) -> \
        NamedTuple("MyDivmodOutput", [("quotient", float), ("remainder", float)]):
    """Divides two numbers and calculates the quotient and remainder"""
    # Import libraries inside the component function: the body runs in
    # its own container, so module-level imports are not available here
    import numpy as np
    from collections import namedtuple

    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    divmod_output = namedtuple("MyDivmodOutput", ["quotient", "remainder"])
    return divmod_output(quotient, remainder)

# Convert the function above to a pipeline operation,
# this time specifying a base image
divmod_op = comp.func_to_container_op(
    my_divmod,
    base_image="tensorflow/tensorflow:1.14.0-py3"
)
Once all the necessary operations are created, the pipeline itself can be composed.
@dsl.pipeline(
    name="Calculation pipeline",
    description="A toy pipeline that performs arithmetic calculations."
)
def calc_pipeline(
    a="a",
    b="7",
    c="17",
):
    # Pipeline parameters are passed in, and constants can also be
    # passed as operation arguments
    add_task = add_op(a, 4)

    # The output of the task created above is used as an argument to
    # the next operation. The operation above returns a single value,
    # so it is passed simply as `task.output`
    divmod_task = divmod_op(add_task.output, b)

    # This operation returns multiple values, so each one is passed
    # as `task.outputs["output_name"]`
    result_task = add_op(divmod_task.outputs["quotient"], c)
Finally, the pipeline is submitted to the client for execution, which returns links for the run and the experiment.
client = kfp.Client()
arguments = {"a": "7", "b": "8"}
client.create_run_from_pipeline_func(
    calc_pipeline,
    arguments=arguments
)
When the run succeeds, a link appears; clicking it shows the results of the pipeline run.
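The compile step from the list above can also be performed explicitly instead of launching a run directly. A minimal sketch using the kfp v1 compiler imported earlier (the output filename is arbitrary):

from kfp import compiler

# Compile calc_pipeline into an Argo-compatible package for upload
compiler.Compiler().compile(calc_pipeline, "calc_pipeline.tar.gz")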
Storing Data Between Steps
Kubeflow supports two ways of passing data between containers: persistent volumes inside the Kubernetes cluster, and cloud storage options such as S3.
Persistent volumes abstract the storage layer. Depending on the vendor, persistent volumes can be slow to provision and may have IO limits. Their access mode is one of the following:
- ReadWriteOnce: the volume can be mounted read-write by a single node
- ReadOnlyMany: the volume can be mounted read-only by many nodes
- ReadWriteMany: the volume can be mounted read-write by many nodes
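As far as I know, the kfp v1 DSL exposes these access modes as constants that can be passed to VolumeOp (the RWO constant appears in the example below):

import kfp.dsl as dsl

# Each constant wraps the corresponding Kubernetes access mode
print(dsl.VOLUME_MODE_RWO)  # ['ReadWriteOnce']
print(dsl.VOLUME_MODE_ROM)  # ['ReadOnlyMany']
print(dsl.VOLUME_MODE_RWM)  # ['ReadWriteMany']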
In Kubeflow Pipelines, VolumeOp creates an automatically managed persistent volume:
dvop = dsl.VolumeOp(name="create_pvc",
                    resource_name="my-pvc-2",
                    size="5Gi",
                    modes=dsl.VOLUME_MODE_RWO)
To attach the volume to an operation, call add_pvolumes:
download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})
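Here download_data_op and year come from a larger example in the book. A self-contained sketch of the same idea, with hypothetical step names and busybox standing in for real work (mounting the first step's pvolume in the second step also orders the steps, as far as I know):

import kfp.dsl as dsl

# First step writes a file into the VolumeOp-backed PVC
write_task = dsl.ContainerOp(
    name="write",
    image="busybox",
    command=["sh", "-c"],
    arguments=["echo hello > /data_processing/greeting.txt"],
).add_pvolumes({"/data_processing": dvop.volume})

# Second step mounts the same volume via write_task.pvolume and reads the file
read_task = dsl.ContainerOp(
    name="read",
    image="busybox",
    command=["sh", "-c"],
    arguments=["cat /data_processing/greeting.txt"],
).add_pvolumes({"/data_processing": write_task.pvolume})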
Kubeflow's built-in file_outputs mechanism automatically ships specified local files between pipeline steps via MinIO. To use file_outputs, have the container write the file and declare its path as an output parameter on the ContainerOp:
fetch = kfp.dsl.ContainerOp(
    name="download",
    image="busybox",
    command=["sh", "-c"],
    # A single shell script string built by implicit concatenation
    # (note: no commas between the pieces); data_url is assumed to
    # be defined earlier
    arguments=[
        "sleep 1;"
        "mkdir -p /tmp/data;"
        "wget " + data_url +
        " -O /tmp/data/result.csv"
    ],
    file_outputs={"downloaded": "/tmp/data/result.csv"}
)
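A downstream step can then reference the declared output, which wires up the data dependency. A sketch with a hypothetical consumer step (in kfp v1, a small file_outputs value is injected into the consumer as a string):

# Hypothetical consumer of the "downloaded" output declared above
process = kfp.dsl.ContainerOp(
    name="process",
    image="busybox",
    command=["sh", "-c"],
    arguments=["echo %s" % fetch.outputs["downloaded"]],
)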
Introduction to Kubeflow Pipelines Components
Argo: the Foundation of Pipelines
Kubeflow Pipelines builds on Argo Workflows, a container-native workflow engine for Kubernetes. When you install Kubeflow, all of the Argo components are installed with it. Argo does not have to be installed separately to use Kubeflow Pipelines, but the Argo command line is useful for debugging pipelines. Argo can be installed as follows:
$ # Download the binary
$ curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64
$ # Make binary executable
$ chmod +x argo-linux-amd64
$ # Move binary to path
$ mv ./argo-linux-amd64 ~/bin/argo
Once the installation is complete, bring up the Argo pods using the manifests:
$ kubectl create ns argo
$ kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml
A workflow can now be submitted to the Kubeflow namespace and inspected with the following commands:
$ argo submit -n kubeflow --watch https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/hello-world.yaml
$ argo list -n kubeflow
NAME                STATUS      AGE   DURATION   PRIORITY
hello-world-hxkrs   Succeeded   4h    26s        0
$ argo get hello-world-hxkrs -n kubeflow
Name:                hello-world-hxkrs
Namespace:           kubeflow
ServiceAccount:      default
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Started:             Sun Jul 03 16:57:04 +0900 (4 hours ago)
Finished:            Sun Jul 03 16:57:30 +0900 (4 hours ago)
Duration:            26 seconds
ResourcesDuration:   14s*cpu,4s*memory

STEP                  TEMPLATE  PODNAME            DURATION  MESSAGE
 ✔ hello-world-hxkrs  whalesay  hello-world-hxkrs  15s
$ argo logs hello-world-hxkrs -n kubeflow
hello-world-hxkrs: time="2022-07-03T07:57:12.005Z" level=info msg="capturing logs" argo=true
hello-world-hxkrs:  _____________
hello-world-hxkrs: < hello world >
hello-world-hxkrs:  -------------
hello-world-hxkrs:     \
hello-world-hxkrs:      \
hello-world-hxkrs:       \
hello-world-hxkrs:                     ##        .
hello-world-hxkrs:               ## ## ##       ==
hello-world-hxkrs:            ## ## ## ##      ===
hello-world-hxkrs:        /""""""""""""""""___/ ===
hello-world-hxkrs:   ~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ /  ===- ~~~
hello-world-hxkrs:        \______ o          __/
hello-world-hxkrs:         \    \        __/
hello-world-hxkrs:           \____\______/
$ argo delete hello-world-hxkrs -n kubeflow
Workflow 'hello-world-hxkrs' deleted
What Kubeflow Pipelines Adds to Argo Workflow
Using Argo directly requires defining workflows in YAML and containerizing your code yourself, which is quite cumbersome. With Kubeflow Pipelines, pipelines are defined and created through a Python API, which makes the work far more convenient. It is also much easier to add ML-specific components.
Building a Pipeline Using Existing Images
Kubeflow Pipelines can also orchestrate prebuilt Docker images, so steps implemented in different languages can run as part of a single pipeline.
The Kubernetes client can also be imported to use Kubernetes objects directly from Python:
from kubernetes import client as k8s_client
And because an experiment can only be created once in Pipelines, an existing experiment can be reused by name:
client = kfp.Client()
exp = client.get_experiment(experiment_name="mdupdate")
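For the very first run, the experiment must exist before it can be looked up. A minimal sketch, assuming the kfp v1 client API:

import kfp

client = kfp.Client()
# Create the experiment once...
exp = client.create_experiment(name="mdupdate")
# ...then reuse it by name on subsequent runs
exp = client.get_experiment(experiment_name="mdupdate")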
The following example builds a pipeline from images that are already built. The prebuilt containers are configured through MINIO_* environment variables, so add_env_variable is called to point them at the local MinIO. To make the ordering of the steps explicit, the after method is used.
@dsl.pipeline(
    name='Recommender model update',
    description='Demonstrate usage of pipelines for multi-step model update')
def recommender_pipeline():
    # Load new data
    data = dsl.ContainerOp(
        name='updatedata',
        image='lightbend/recommender-data-update-publisher:0.2') \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    # Train the model
    train = dsl.ContainerOp(
        name='trainmodel',
        image='lightbend/ml-tf-recommender:0.1') \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='minio-service.kubeflow.svc.cluster.local:9000')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))
    train.after(data)
    # Publish the new model
    publish = dsl.ContainerOp(
        name='publishmodel',
        image='lightbend/recommender-model-publisher:0.2') \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL', value='http://minio-service.kubeflow.svc.cluster.local:9000')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \
        .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \
        .add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS', value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \
        .add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL', value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \
        .add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL', value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))
    publish.after(train)
This pipeline can be compiled and then run, or executed directly with create_run_from_pipeline_func:
from kfp import compiler
compiler.Compiler().compile(recommender_pipeline, "pipeline.tar.gz")
run = client.run_pipeline(exp.id, "pipeline1", "pipeline.tar.gz")
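The direct route mentioned above skips the explicit compile step:

# Build and launch the run in one call, without producing pipeline.tar.gz
kfp.Client().create_run_from_pipeline_func(recommender_pipeline, arguments={})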
Kubeflow Pipeline Components
Kubeflow Pipelines also provides components that use other Kubernetes resources or external operations such as dataproc. Kubeflow components package ML tools while abstracting away the containers or CRDs they rely on.
Components like func_to_container_op are available as Python code, while others must be loaded through Kubeflow's component.yaml system. The approach the book recommends for using Kubeflow components is to download a specific tag of the repository and load the components with load_component_from_file:
$ wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
$ tar -xvf 0.2.5.tar.gz
The downloaded components can now be loaded:
gcs_download_component = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/google-cloud/storage/download/component.yaml"
)
The GCS download component takes a Google Cloud Storage path and downloads the file at that path:
dl_op = gcs_download_component(
    gcs_path="gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv"
)
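Components can also be loaded straight from a URL instead of a local checkout; a sketch assuming the same 0.2.5 tag layout as the download above:

# Load the component definition directly from the pinned tag on GitHub
gcs_download_component = kfp.components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/"
    "components/google-cloud/storage/download/component.yaml"
)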
Advanced Topics in Pipelines
Conditional Execution of Pipeline Stages
In Kubeflow Pipelines, dsl.Condition can be used to execute stages of a pipeline conditionally. First, a few functions are converted into operations.
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath


@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)"""
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result


@func_to_container_op
def process_small_op(data: int):
    """Process small numbers."""
    print("Processing small result", data)
    return


@func_to_container_op
def process_medium_op(data: int):
    """Process medium numbers."""
    print("Processing medium result", data)
    return


@func_to_container_op
def process_large_op(data: int):
    """Process large numbers."""
    print("Processing large result", data)
    return
Now dsl.Condition is used to run whichever operation matches the condition:
@dsl.pipeline(
    name="Conditional execution pipeline",
    description="Shows how to use dsl.Condition()."
)
def conditional_pipeline():
    number = get_random_int_op(0, 100).output
    with dsl.Condition(number < 10):
        process_small_op(number)
    # Python's `and` cannot combine two DSL condition expressions,
    # so nested Conditions are used to express 10 < number < 50
    with dsl.Condition(number > 10):
        with dsl.Condition(number < 50):
            process_medium_op(number)
    with dsl.Condition(number > 50):
        process_large_op(number)

kfp.Client().create_run_from_pipeline_func(conditional_pipeline, arguments={})
When the run completes, the resulting execution graph can be viewed in the Pipelines UI.
Running Pipelines on Schedule
Pipelines can also be run on a schedule: upload the pipeline once, then set the run type to "Recurring" and configure the schedule.
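The same schedule can be scripted with the SDK. A sketch assuming the kfp v1 client, a hypothetical experiment and job name, and a previously compiled package (KFP cron expressions use six fields, with seconds first):

import kfp

client = kfp.Client()
exp = client.create_experiment(name="scheduled-runs")  # hypothetical experiment
client.create_recurring_run(
    experiment_id=exp.id,
    job_name="calc-pipeline-nightly",    # hypothetical job name
    cron_expression="0 0 0 * * *",       # every day at midnight
    pipeline_package_path="calc_pipeline.tar.gz",
)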