Connect the SDK to the API
Overview
The Kubeflow Pipelines SDK provides a Python interface to interact with the Kubeflow Pipelines API. This guide will show you how to connect the SDK to the Pipelines API in various scenarios.
Kubeflow Platform
When running Kubeflow Pipelines as part of a multi-user Kubeflow Platform, how you authenticate the Pipelines SDK will depend on whether you are running your code inside or outside the cluster.
Kubeflow Platform - Inside the Cluster
A ServiceAccount token volume can be mounted to a Pod running in the same cluster as Kubeflow Pipelines. The Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API.
The following Python code creates a kfp.Client() using a ServiceAccount token for authentication:
import kfp
# by default, when run from inside a Kubernetes cluster:
# - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
# - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()
# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
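Before constructing the client, it can help to confirm that a token file is actually present. The following is a minimal sketch using only the standard library; the helper name `find_sa_token_path` is hypothetical and not part of the SDK. It reads the same `KF_PIPELINES_SA_TOKEN_PATH` variable and falls back to the default path shown in the Pod examples in this guide.

```python
import os
from pathlib import Path

# default token path, as used in the Pod examples in this guide
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def find_sa_token_path(env=None):
    """Return the token path if a non-empty token file exists, else None.

    Hypothetical helper for illustration; it only checks the file system
    and does not validate the token itself.
    """
    env = os.environ if env is None else env
    path = Path(env.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH))
    if path.is_file() and path.stat().st_size > 0:
        return path
    return None


if __name__ == "__main__":
    token_path = find_sa_token_path()
    if token_path is None:
        print("No ServiceAccount token found; is the token volume mounted?")
    else:
        print(f"Found ServiceAccount token at {token_path}")
```

If the helper returns None, the Pod is probably missing the token volume described in the next section.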
ServiceAccount Token Volume
To use the preceding code, you will need to run it from a Pod that has a ServiceAccount token volume mounted.
You may manually add a volume and volumeMount to your PodSpec, or use Kubeflow’s PodDefaults to inject the required volume.
Option 1 - manually add a volume to your PodSpec:
apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
    - image: hello-world:latest
      name: hello-world
      env:
        - ## this environment variable is automatically read by `kfp.Client()`
          ## this is the default value, but we show it here for clarity
          name: KF_PIPELINES_SA_TOKEN_PATH
          value: /var/run/secrets/kubeflow/pipelines/token
      volumeMounts:
        - mountPath: /var/run/secrets/kubeflow/pipelines
          name: volume-kf-pipeline-token
          readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
Option 2 - use a PodDefault to inject the volume:
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
Tip
- PodDefaults are namespaced resources, so you need to create one inside each of your Kubeflow Profile namespaces.
- The Notebook Spawner UI will be aware of any PodDefaults in the user’s namespace (they are selectable under the “configurations” section).
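When debugging authentication failures, it can be useful to inspect the claims of the mounted token, in particular the audience and expiry configured in the projected volume above. Kubernetes ServiceAccount tokens are JWTs, so the payload can be decoded with the standard library. This is a sketch for inspection only: the helper name `decode_jwt_payload` is hypothetical, and the code does not verify the token signature.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying its signature.

    Hypothetical helper, intended only for inspecting claims such as
    `aud` and `exp` while debugging.
    """
    parts = token.strip().split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload = parts[1]
    # JWT segments are base64url-encoded without padding; restore the padding
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def read_token_claims(path="/var/run/secrets/kubeflow/pipelines/token") -> dict:
    """Read the token file at `path` and return its decoded claims."""
    with open(path) as f:
        return decode_jwt_payload(f.read())
```

Inside a Pod with the token volume mounted, `read_token_claims()` should return a dictionary whose `aud` claim is expected to contain the configured audience (`pipelines.kubeflow.org` in the examples above).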
RBAC Authorization
The Kubeflow Pipelines API respects Kubernetes RBAC, and will check RoleBindings assigned to the ServiceAccount before allowing it to take Pipelines API actions.
For example, this RoleBinding allows Pods with the default-editor ServiceAccount in namespace-2 to manage Kubeflow Pipelines in namespace-1:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
Tip
- Review the ClusterRole called aggregate-to-kubeflow-pipelines-edit for a list of some important pipelines.kubeflow.org RBAC verbs.
- Kubeflow Notebooks pods run as the default-editor ServiceAccount by default, so the RoleBindings for default-editor apply to them and give them access to submit pipelines in their own namespace.
- For more information about profiles, see the Manage Profile Contributors guide.
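If you need the same RoleBinding for several namespace pairs, the manifest can be generated rather than copied by hand. The following sketch builds the RoleBinding shown above as a Python dictionary and prints it as JSON, which `kubectl apply -f` accepts in place of YAML. The function name `kubeflow_edit_rolebinding` is hypothetical, and the generated metadata name simply follows the naming pattern of the example.

```python
import json


def kubeflow_edit_rolebinding(target_namespace, sa_namespace, sa_name="default-editor"):
    """Build a RoleBinding that lets a ServiceAccount in `sa_namespace`
    manage Kubeflow Pipelines in `target_namespace`.

    Hypothetical helper mirroring the manifest shown in this guide.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {
            "name": f"allow-{sa_namespace}-kubeflow-edit",
            # the RoleBinding lives in the namespace it grants access to
            "namespace": target_namespace,
        },
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "kubeflow-edit",
        },
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": sa_name,
                # the ServiceAccount lives in the other namespace
                "namespace": sa_namespace,
            }
        ],
    }


if __name__ == "__main__":
    manifest = kubeflow_edit_rolebinding("namespace-1", "namespace-2")
    print(json.dumps(manifest, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl like any other manifest.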
Kubeflow Platform - Outside the Cluster
Kubeflow Notebooks
Because Kubeflow Notebooks run on Pods inside the cluster, they can NOT use the following method to authenticate the Pipelines SDK; use the inside the cluster method instead.
The precise method to authenticate from outside the cluster depends on how you deployed Kubeflow Platform. Because most distributions use Dex as their identity provider, this example shows how to authenticate with Dex using a Python script.
You will need to make the Kubeflow Pipelines API accessible on the remote machine. If your Kubeflow Istio gateway is already exposed, skip this step and use that URL directly.
The following command exposes the istio-ingressgateway service on localhost:8080:
# TIP: svc/istio-ingressgateway may be called something else,
# or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
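Once the port-forward is running, a quick TCP check can confirm that the local port is accepting connections before you attempt to authenticate. This is a minimal sketch; the helper name `port_is_open` is hypothetical, and a successful connection only shows that something is listening, not that it is the Kubeflow gateway.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts, and name-resolution errors
        return False


if __name__ == "__main__":
    # 8080 matches the local port used in the port-forward command above
    if port_is_open("localhost", 8080):
        print("localhost:8080 is accepting connections")
    else:
        print("localhost:8080 is not reachable; is the port-forward running?")
```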
The following Python code defines a KFPClientManager() class that creates an authenticated kfp.Client() by interacting with Dex:
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # suppress insecure-request warnings when TLS verification is skipped
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex

        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """
        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                "Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
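The URL rewriting inside `_get_session_cookies` is easier to follow in isolation. The sketch below extracts the two rewrites into standalone functions (the function names are hypothetical, and the `/dex/auth` URL in the usage lines is an illustrative assumption, since the real path depends on your deployment). It uses the same `urlsplit`, `_replace`, and `re.sub` calls as the class above.

```python
import re
from urllib.parse import urlencode, urlsplit


def oauth2_start_url(current_url: str) -> str:
    """Rewrite a URL to the oauth2-proxy sign-in entry point.

    The original path is passed in the `rd` query parameter so the proxy
    can redirect back to it after sign-in.
    """
    url_obj = urlsplit(current_url)
    url_obj = url_obj._replace(
        path="/oauth2/start", query=urlencode({"rd": url_obj.path})
    )
    return url_obj.geturl()


def dex_auth_type_url(current_url: str, auth_type: str) -> str:
    """Append the auth type to a Dex URL whose path ends in `/auth`.

    URLs with any other path are returned unchanged; the query string
    is always preserved.
    """
    url_obj = urlsplit(current_url)
    url_obj = url_obj._replace(
        path=re.sub(r"/auth$", f"/auth/{auth_type}", url_obj.path)
    )
    return url_obj.geturl()


if __name__ == "__main__":
    print(oauth2_start_url("http://localhost:8080/pipeline"))
    # hypothetical Dex URL, for illustration only
    print(dex_auth_type_url("http://localhost:8080/dex/auth?client_id=x&state=y", "local"))
```

Note that `urlencode` percent-encodes the slash in the `rd` value, so `/pipeline` becomes `%2Fpipeline` in the resulting query string.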
The following Python code shows how to use the KFPClientManager() class to create a kfp.Client():
# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,
    dex_username="user@example.com",
    dex_password="12341234",
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)
# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()
# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
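For long-running processes, one way to handle session expiry is to wrap client creation in a small cache that rebuilds the client after a fixed age. The following is a sketch under stated assumptions: the class name `RefreshingFactory` is hypothetical, and the maximum age is a value you would choose based on your own Dex session lifetime, which this guide does not specify.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RefreshingFactory(Generic[T]):
    """Cache the result of `create()` and rebuild it once it is too old.

    Hypothetical helper; `max_age_seconds` should be shorter than the
    session lifetime of your identity provider.
    """

    def __init__(
        self,
        create: Callable[[], T],
        max_age_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._create = create
        self._max_age = max_age_seconds
        self._clock = clock
        self._value: Optional[T] = None
        self._created_at: Optional[float] = None

    def get(self) -> T:
        """Return the cached value, creating a new one if missing or expired."""
        now = self._clock()
        if self._created_at is None or now - self._created_at >= self._max_age:
            self._value = self._create()
            self._created_at = now
        return self._value
```

With the manager from the example above, this could be used as `clients = RefreshingFactory(kfp_client_manager.create_kfp_client, max_age_seconds=3600)`, calling `clients.get()` wherever a client is needed. The one-hour value is only a placeholder.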
Standalone Kubeflow Pipelines
When running Kubeflow Pipelines in standalone mode, there is no concept of multi-user authentication or RBAC. The specific steps depend on whether you are running your code inside or outside the cluster.
Standalone KFP - Inside the Cluster
When running inside the Kubernetes cluster, you may connect the Pipelines SDK directly to the ml-pipeline-ui service via cluster-internal service DNS resolution.
Warning
In standalone deployments of Kubeflow Pipelines, there is no authentication enforced on the ml-pipeline-ui service.
When running in the same namespace as Kubeflow:
import kfp
client = kfp.Client(host="http://ml-pipeline-ui:80")
print(client.list_experiments())
When running in a different namespace to Kubeflow:
import kfp
# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow"
client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")
print(client.list_experiments())
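The two cases above can be folded into a single helper that picks the host based on the Pod's own namespace. This is a minimal sketch: the function names are hypothetical, and it assumes the Pod has the standard Kubernetes ServiceAccount files mounted, which include a `namespace` file at the path used below.

```python
from pathlib import Path
from typing import Optional

# standard location of the Pod's namespace when a ServiceAccount is mounted
NAMESPACE_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def current_namespace(namespace_file: Path = NAMESPACE_FILE) -> Optional[str]:
    """Return the namespace this Pod runs in, or None if it cannot be read."""
    try:
        return namespace_file.read_text().strip() or None
    except OSError:
        return None


def ml_pipeline_ui_host(
    kfp_namespace: str = "kubeflow", pod_namespace: Optional[str] = None
) -> str:
    """Pick the ml-pipeline-ui host for a standalone deployment.

    Uses the short service name when the Pod shares the namespace with
    Kubeflow Pipelines, and the namespace-qualified name otherwise.
    """
    if pod_namespace == kfp_namespace:
        return "http://ml-pipeline-ui:80"
    return f"http://ml-pipeline-ui.{kfp_namespace}"


if __name__ == "__main__":
    print(ml_pipeline_ui_host("kubeflow", current_namespace()))
```

The returned string can be passed as the `host` argument of `kfp.Client()`, as in the examples above.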
Standalone KFP - Outside the Cluster
When running outside the Kubernetes cluster, you may connect the Pipelines SDK to the ml-pipeline-ui service by using kubectl port-forwarding.
Warning
In standalone deployments of Kubeflow Pipelines, there is no authentication enforced on the ml-pipeline-ui service.
Step 1: run the following command on your external system to initiate port-forwarding:
# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80
Step 2: the following code creates a kfp.Client() against your port-forwarded ml-pipeline-ui service:
import kfp
client = kfp.Client(host="http://localhost:3000")
print(client.list_experiments())