
Awesome KubeFlow Pipeline

A collection of awesome examples and tips regarding KubeFlow Pipeline.

Examples

Tips

Use additional packages

You can specify additional packages with packages_to_install. This lets you use extra packages without building a new Docker image.

slack_post_op = func_to_container_op(slack_post, packages_to_install=["slack_sdk"])
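
A fuller sketch of the pattern (the slack_post function, the webhook_url parameter, and the pipeline name below are illustrative assumptions, not code from this repository):

from kfp import dsl
from kfp.components import func_to_container_op

def slack_post(webhook_url: str, message: str):
    # slack_sdk is installed into the step's container at runtime
    # via packages_to_install, so no custom image is required.
    from slack_sdk.webhook import WebhookClient
    WebhookClient(webhook_url).send(text=message)

slack_post_op = func_to_container_op(slack_post, packages_to_install=["slack_sdk"])

@dsl.pipeline(name="slack-notify-example")
def pipeline(webhook_url: str):
    slack_post_op(webhook_url, "Pipeline finished!")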

Condition

You can use dsl.Condition like an if statement in a pipeline function.

with dsl.Condition(flip_task.output == "heads"):
	# Executed only when flip_task.output is 'heads'.
	print_op("You got heads!!")

ExitHandler

The op passed to ExitHandler will be executed whether the pipeline succeeds or fails.

Example

exit_op = ContainerOp(...)
with ExitHandler(exit_op):
	op1 = ContainerOp(...)
	op2 = ContainerOp(...)

Note: ExitHandler requires a ContainerOp as its argument, not a ResourceOp or other op types. If you want to create/delete/apply k8s resources with ExitHandler, please see this example.
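
A compilable sketch under that constraint, using a hypothetical notify lightweight component as the exit handler:

from kfp import dsl
from kfp.components import func_to_container_op

def notify(message: str):
    print(message)

notify_op = func_to_container_op(notify)

@dsl.pipeline(name="exit-handler-example")
def pipeline():
    # exit_task runs whether the ops inside the handler succeed or fail.
    exit_task = notify_op("Pipeline finished.")
    with dsl.ExitHandler(exit_task):
        op1 = notify_op("step 1")
        op2 = notify_op("step 2")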

ParallelFor

dsl.ParallelFor represents a parallel for loop over a set of items, either a static list or a previous task's output.

Example:

A simple literal example. In this case op1 would be executed twice: once with args=['echo apple:2'] and once with args=['echo apple:3'] (op2 similarly loops over the banana values).

with dsl.ParallelFor([{'apple': 2, 'banana': 4}, {'apple': 3, 'banana': 20}]) as item:
	op1 = ContainerOp(..., args=['echo apple:{}'.format(item.apple)])
	op2 = ContainerOp(..., args=['echo banana:{}'.format(item.banana)])

In this example, the previous task's output is used as the loop arguments.

list_task = list_generator_op(parallelism)
parallel_tasks = dsl.ParallelFor(list_task.output)
with parallel_tasks as msg:
	print_op(msg)
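
For reference, a self-contained sketch of this dynamic pattern (generate_list, print_msg, and the pipeline name are illustrative assumptions):

from kfp import dsl
from kfp.components import func_to_container_op

def generate_list(parallelism: int) -> str:
    # Emit the loop items as a JSON-encoded list.
    import json
    return json.dumps(["message {}".format(i) for i in range(parallelism)])

def print_msg(msg: str):
    print(msg)

list_generator_op = func_to_container_op(generate_list)
print_op = func_to_container_op(print_msg)

@dsl.pipeline(name="parallelfor-example")
def pipeline(parallelism: int = 3):
    list_task = list_generator_op(parallelism)
    with dsl.ParallelFor(list_task.output) as msg:
        print_op(msg)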

Set the number of parallel tasks

You can set the number of parallel tasks inside the pipeline function.

def pipeline(parallelism: int):
    # Set the maximum number of tasks that run in parallel.
    default_conf = kfp.dsl.get_pipeline_conf()
    default_conf.set_parallelism(2)

Please refer to the official documentation or our example.

Caching

We recommend reading the official KubeFlow documentation about caching.

Disabling/enabling caching in a namespace

For disabling:

# Make sure mutatingwebhookconfiguration exists in your cluster
export NAMESPACE=<Namespace where KFP is installed>
kubectl get mutatingwebhookconfiguration cache-webhook-${NAMESPACE}

kubectl patch mutatingwebhookconfiguration cache-webhook-${NAMESPACE} --type='json' -p='[{"op":"replace", "path": "/webhooks/0/rules/0/operations/0", "value": "DELETE"}]'

For enabling:

kubectl patch mutatingwebhookconfiguration cache-webhook-${NAMESPACE} --type='json' -p='[{"op":"replace", "path": "/webhooks/0/rules/0/operations/0", "value": "CREATE"}]'

Managing caching staleness

To control the maximum staleness of reused cached data, you can set the step's max_cache_staleness parameter, which is in RFC 3339 duration format (e.g. "P30D" for 30 days).

To disable caching for a ContainerOp, set max_cache_staleness to "P0D":

task = dsl.ContainerOp(...)
task.execution_options.caching_strategy.max_cache_staleness = "P0D"
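
Conversely, to keep caching but only reuse results that are at most one day old, you can set a non-zero duration. A small sketch (the train component below is just an illustration):

from kfp import dsl
from kfp.components import func_to_container_op

def train():
    print("training...")

train_op = func_to_container_op(train)

@dsl.pipeline(name="staleness-example")
def pipeline():
    task = train_op()
    # Reuse a cached result only if it is less than one day old.
    task.execution_options.caching_strategy.max_cache_staleness = "P1D"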

Because ResourceOp doesn't have an execution_options attribute, you can use add_pod_annotation instead:

rop = dsl.ResourceOp(...)
rop.add_pod_annotation("pipelines.kubeflow.org/max_cache_staleness", "P0D")

If you're using KubeFlow Pipeline SDK v2, you can use set_caching_options for both ContainerOp and ResourceOp:

task = dsl.ContainerOp(...)
task.set_caching_options(False)

rop = dsl.ResourceOp(...)
rop.set_caching_options(False)
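
Depending on your SDK version, caching can also be turned off for an entire run when you submit it. A sketch, assuming kfp.Client can reach your KFP endpoint and supports the enable_caching argument:

import kfp

client = kfp.Client()
client.create_run_from_pipeline_func(
    pipeline,              # a pipeline function such as the ones above
    arguments={},
    enable_caching=False,  # disable caching for every step in this run
)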

See our example for more information.

Contributing

Interested in contributing? Awesome😎

Fork the repository and run make init to set up.

Then create a PR and let us review it!

See Google's best practices, which we follow in this repository.
