Use Platform-Specific Features
Overview
One of the benefits of KFP is cross-platform portability. The KFP SDK compiles pipeline definitions to IR YAML which can be read and executed by different backends, including the Kubeflow Pipelines open source backend and Vertex AI Pipelines.
For cases where features are not portable across platforms, users may author pipelines with platform-specific functionality via KFP SDK platform-specific plugin libraries. In general, platform-specific plugin libraries provide functions that act on tasks similarly to task-level configuration methods provided by the KFP SDK directly.
kfp-kubernetes
Currently, the only KFP SDK platform-specific plugin library is kfp-kubernetes, which is supported by the Kubeflow Pipelines open source backend and enables direct access to some Kubernetes resources and functionality. For more information, see the kfp-kubernetes documentation.
Kubernetes PersistentVolumeClaims
In this example, we will use kfp-kubernetes to create a PersistentVolumeClaim (PVC), use the PVC to pass data between tasks, and then delete the PVC. We assume you have basic familiarity with PersistentVolume and PersistentVolumeClaim resources in Kubernetes, as well as with authoring components and pipelines in KFP.
Step 1: Install the kfp-kubernetes library
Run the following command to install the kfp-kubernetes library:
pip install kfp[kubernetes]
Step 2: Create components that read/write to the mount path
Create two simple components that read and write to a file in the /data directory. In a later step, we will mount a PVC volume to the /data directory.
from kfp import dsl

@dsl.component
def producer() -> str:
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

@dsl.component
def consumer() -> str:
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content
Step 3: Dynamically provision a PVC using CreatePVC
Now that we have our components, we can begin constructing a pipeline. We need a PVC to mount, so we will create one using the kubernetes.CreatePVC pre-baked component:
from kfp import kubernetes

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )
This component provisions a 5 GiB PVC from the 'standard' StorageClass with the ReadWriteMany access mode. The PVC will be named after the underlying Argo workflow that creates it, concatenated with the suffix -my-pvc. The CreatePVC component returns this name as the output 'name'.
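For illustration, the provisioned claim corresponds roughly to a Kubernetes manifest like the following. This is a sketch based on the arguments above; the actual object is created by the backend, and the real name is generated at runtime:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  # the real name is the Argo workflow name plus the '-my-pvc' suffix,
  # generated at runtime; <workflow-name> is a placeholder
  name: <workflow-name>-my-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
```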
Step 4: Read and write data to the PVC
Next, we'll use the mount_pvc task modifier with the producer and consumer components. We schedule task2 to run after task1 so that the components don't read and write to the PVC at the same time.
    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read from the PVC
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    task2.after(task1)
Step 5: Delete the PVC
Finally, we can schedule deletion of the PVC after task2 finishes to clean up the Kubernetes resources we created.
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']
    ).after(task2)
For the full pipeline and more information, see a similar example in the kfp-kubernetes documentation.