Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create KFP artifacts using the SDK #333

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions backend/kale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ class PipelineParam(NamedTuple):
param_value: Any


class Artifact(NamedTuple):
    """A Step artifact.

    Immutable ``(name, path)`` pair describing one output file that a step
    exposes as a Kubeflow Pipelines artifact.
    """
    # Name identifying the artifact.
    name: str
    # Filesystem path of the file backing the artifact.
    path: str


from .step import Step, StepConfig
from .pipeline import Pipeline, PipelineConfig, VolumeConfig
from .compiler import Compiler
Expand Down
35 changes: 33 additions & 2 deletions backend/kale/common/runutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import signal
import logging

from typing import Callable
from kale.common import utils
from typing import Callable, Dict

from kale.common import utils, kfputils


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -68,3 +70,32 @@ def _ttl():
return res
return _ttl
return _decorator_ttl


def link_artifacts(artifacts: Dict[str, str], link: bool = True) -> None:
    """Link a series of artifacts to mlpipeline-ui-metadata.json.

    Every artifact path is validated first: it must exist, be absolute and
    point to a file (not a folder). Validated artifacts are then registered
    through `kfputils.update_uimetadata`, unless `link` is False.

    We use the `link` argument to avoid writing to
    `/tmp/mlpipeline-ui-metadata.json` when executing locally, otherwise
    we would get a permission error.

    Args:
        artifacts: Mapping of artifact name to the path of its file. An
            empty (or `None`) mapping is accepted and results in a no-op.
        link: When False, only validate the paths without registering them.

    Raises:
        RuntimeError: If a path does not exist or points to a folder.
        ValueError: If a path exists but is relative.
    """
    if not artifacts:
        # Return early so a missing mapping (None) is handled like an empty
        # one instead of failing on `.items()` below.
        log.info("This step has no artifacts")
        return

    log.info("Registering step's artifacts: %s",
             ", ".join("'%s'" % name for name in artifacts))

    for name, path in artifacts.items():
        # NOTE: existence is checked before absoluteness, so a relative path
        # is resolved against the current working directory here and only
        # reported as "relative" when it actually exists.
        if not os.path.exists(path):
            raise RuntimeError("Filepath '%s' for artifact '%s' does not"
                               " exist." % (path, name))
        if not os.path.isabs(path):
            raise ValueError("Path '%s' for artifact '%s' is a relative path."
                             " Please provide an absolute path."
                             % (path, name))
        if os.path.isdir(path):
            raise RuntimeError("Cannot create an artifact from path '%s':"
                               " it is a folder." % path)
        if link:
            # FIXME: Currently this supports just HTML artifacts
            kfputils.update_uimetadata(name)
4 changes: 2 additions & 2 deletions backend/kale/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.


from .api import pipeline, step
from kale.common import logutils
from .api import pipeline, step, artifact

from kale.common import logutils

logutils.get_or_create_logger(module=__name__, name="sdk")
del logutils
40 changes: 39 additions & 1 deletion backend/kale/sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import argparse

from kale.common import rokutils, utils
from kale import Compiler, Step, PythonProcessor, PipelineConfig, StepConfig
from kale import (Compiler, Step, PythonProcessor, PipelineConfig, StepConfig,
Artifact)


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -106,3 +107,40 @@ def _parse_cli_args():
help=("Compile the pipeline to KFP DSL."
" Requires --kfp."))
return parser.parse_args()


def artifact(name: str, path: str):
    """Decorate a step to create a KFP HTML artifact.

    Apply this decorator to a step to create a Kubeflow Pipelines artifact
    (https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/).
    In case the path does not point to a valid file, the step will fail with
    an error. The path must be absolute: relative paths are rejected when the
    step runs.

    To generate more than one artifact per step, apply the same decorator
    multiple times, as shown in the example below. Decorators are applied
    bottom-up, so `@step` must be the innermost one.

    ```python
    @artifact(name="artifact1", path="/path/to/figure.html")
    @artifact(name="artifact2", path="/path/to/plot.html")
    @step(name="artifact-generator")
    def foo():
        # ...
        # save something to plot.html and figure.html
        # ...
    ```

    **Note**: Currently the only supported format is HTML.

    Args:
        name: Artifact name
        path: Absolute path to an HTML file

    Returns:
        A decorator that appends the artifact to a `Step` and returns the
        same `Step` object.

    Raises:
        ValueError: Raised by the returned decorator when it is applied to
            something that is not a `Step` (e.g. a plain function).
    """

    def _decorate_step(step: Step):
        # The path is not checked here: the file is only expected to exist
        # once the step has run.
        if not isinstance(step, Step):
            raise ValueError("You should decorate functions that are decorated"
                             " with the @step decorator!")
        step.artifacts.append(Artifact(name, path))
        return step
    return _decorate_step
9 changes: 6 additions & 3 deletions backend/kale/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

from typing import Any, Dict, List, Callable, Union

from kale import PipelineParam
from kale.common import astutils
from kale.marshal import Marshaller
from kale import PipelineParam, Artifact
from kale.common import astutils, runutils
from kale.config import Config, Field, validators

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -53,6 +53,7 @@ def __init__(self,
self.source = source
self.ins = ins or []
self.outs = outs or []
self.artifacts: List[Artifact] = list()

self.config = StepConfig(**kwargs)

Expand All @@ -76,8 +77,10 @@ def run(self, pipeline_parameters_values: Dict[str, PipelineParam]):
marshaller = Marshaller(func=self.source, ins=self.ins, outs=self.outs,
parameters=_params, marshal_dir='.marshal/')
marshaller()
log.info("%s Successfully run step '%s'... %s", "-" * 10, self.name,
log.info("%s Successfully ran step '%s'... %s", "-" * 10, self.name,
"-" * 10)
runutils.link_artifacts({a.name: a.path for a in self.artifacts},
link=False)

@property
def name(self):
Expand Down
6 changes: 6 additions & 0 deletions backend/kale/templates/pipeline_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ def auto_generated_pipeline({%- for arg in pipeline.pps_names -%}
_kale_output_artifacts.update({'mlpipeline-ui-metadata': '/tmp/mlpipeline-ui-metadata.json'})
_kale_output_artifacts.update({'{{ step.name }}': '/{{ step.name }}.html'})
{%- endif %}
{%- if pipeline.processor.id == "py" and step.artifacts and step.name != "final_auto_snapshot" and step.name != "pipeline_metrics" %}
_kale_output_artifacts.update({'mlpipeline-ui-metadata': '/tmp/mlpipeline-ui-metadata.json'})
{%- for artifact in step.artifacts %}
_kale_output_artifacts.update({'{{ artifact["name"] }}': '{{ artifact["path"] }}'})
{%- endfor %}
{%- endif %}
_kale_{{ step.name }}_task.output_artifact_paths.update(_kale_output_artifacts)
_kale_{{ step.name }}_task.add_pod_label("pipelines.kubeflow.org/metadata_written", "true")
_kale_dep_names = (_kale_{{ step.name }}_task.dependent_names +
Expand Down
22 changes: 17 additions & 5 deletions backend/kale/templates/py_function_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ def {{ step.name }}({%- for arg in step.pps_names -%}
{% endif %}


{%- if step.config.timeout %}from kale.common.runutils import ttl{% endif %}
from kale.marshal.decorator import marshal
{% if step.config.timeout %}from kale.common.runutils import ttl as _kale_ttl{% endif %}
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {
_kale_pipeline_parameters = {
{%- if step.pps_names|length %}
{%- for arg in step.pps_names -%}
"{{ arg }}": {{ arg }}
Expand All @@ -33,12 +34,23 @@ def {{ step.name }}({%- for arg in step.pps_names -%}
{%- endfor -%}
{%- endif %}}

{%- if step.config.timeout %}@ttl({{ step.config.timeout }}){% endif %}
@marshal({{ step.ins }}, {{ step.outs }}, pipeline_parameters, "{{ marshal_path }}")
{% if step.config.timeout %}@_kale_ttl({{ step.config.timeout }}){% endif %}
@_kale_marshal({{ step.ins }}, {{ step.outs }}, _kale_pipeline_parameters, "{{ marshal_path }}")
{{ step.rendered_source|indent(4, True) }}

{{ step.source.__name__ }}()

_kale_artifacts = {
{%- if step.artifacts|length %}
{%- for artifact in step.artifacts -%}
"{{ artifact.name }}": "{{ artifact.path }}"
{#- Separate entries with a comma, driven by this loop's own position: no comma after the last artifact. -#}
{%- if not loop.last -%},
{%- endif -%}
{%- endfor -%}
{%- endif %}}

_kale_link_artifacts(_kale_artifacts)

{%- if autosnapshot %}
{%- if step.name == 'final_auto_snapshot' %}
from kale.common import rokutils as _kale_rokutils
Expand Down
44 changes: 32 additions & 12 deletions backend/kale/tests/assets/kfp_dsl/pipeline_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ def step1():
"",
before=True)

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {}
_kale_pipeline_parameters = {}

@marshal([], ['data'], pipeline_parameters, "/marshal")
@_kale_marshal([], ['data'], _kale_pipeline_parameters, "/marshal")
def step1():
return 10

step1()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_rok_snapshot_task = _kale_rokutils.snapshot_pipeline_step(
"test",
"step1",
Expand All @@ -50,15 +55,20 @@ def step3(b: str):
"",
before=True)

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {"b": b}
_kale_pipeline_parameters = {"b": b}

@marshal(['b', 'data'], [], pipeline_parameters, "/marshal")
@_kale_marshal(['b', 'data'], [], _kale_pipeline_parameters, "/marshal")
def step3(st, st2):
print(st)

step3()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_rok_snapshot_task = _kale_rokutils.snapshot_pipeline_step(
"test",
"step3",
Expand All @@ -81,16 +91,21 @@ def step2(a: int, c: int):
"",
before=True)

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {"a": a, "c": c}
_kale_pipeline_parameters = {"a": a, "c": c}

@marshal(['c', 'a', 'data'], ['res'], pipeline_parameters, "/marshal")
@_kale_marshal(['c', 'a', 'data'], ['res'], _kale_pipeline_parameters, "/marshal")
def step2(var1, var2, data):
print(var1 + var2)
return 'Test'

step2()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_rok_snapshot_task = _kale_rokutils.snapshot_pipeline_step(
"test",
"step2",
Expand All @@ -105,15 +120,20 @@ def final_auto_snapshot():
from kale.common import mlmdutils as _kale_mlmdutils
_kale_mlmdutils.init_metadata()

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {}
_kale_pipeline_parameters = {}

@marshal([], [], pipeline_parameters, "/marshal")
@_kale_marshal([], [], _kale_pipeline_parameters, "/marshal")
def _no_op():
pass

_no_op()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
from kale.common import rokutils as _kale_rokutils
_kale_mlmdutils.call("link_input_rok_artifacts")
_rok_snapshot_task = _kale_rokutils.snapshot_pipeline_step(
Expand Down
33 changes: 24 additions & 9 deletions backend/kale/tests/assets/kfp_dsl/simple_data_passing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,67 @@ def step1():
from kale.common import mlmdutils as _kale_mlmdutils
_kale_mlmdutils.init_metadata()

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {}
_kale_pipeline_parameters = {}

@marshal([], ['_b', '_a'], pipeline_parameters, "/marshal")
@_kale_marshal([], ['_b', '_a'], _kale_pipeline_parameters, "/marshal")
def step1():
a = 1
b = 2
return a, b

step1()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_kale_mlmdutils.call("mark_execution_complete")


def step2():
from kale.common import mlmdutils as _kale_mlmdutils
_kale_mlmdutils.init_metadata()

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {}
_kale_pipeline_parameters = {}

@marshal(['_b', '_a'], ['_c'], pipeline_parameters, "/marshal")
@_kale_marshal(['_b', '_a'], ['_c'], _kale_pipeline_parameters, "/marshal")
def step2(a, b):
c = a + b
print(c)
return c

step2()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_kale_mlmdutils.call("mark_execution_complete")


def step3():
from kale.common import mlmdutils as _kale_mlmdutils
_kale_mlmdutils.init_metadata()

from kale.marshal.decorator import marshal
from kale.marshal.decorator import marshal as _kale_marshal
from kale.common.runutils import link_artifacts as _kale_link_artifacts

pipeline_parameters = {}
_kale_pipeline_parameters = {}

@marshal(['_a', '_c'], [], pipeline_parameters, "/marshal")
@_kale_marshal(['_a', '_c'], [], _kale_pipeline_parameters, "/marshal")
def step3(a, c):
d = c + a
print(d)

step3()

_kale_artifacts = {}

_kale_link_artifacts(_kale_artifacts)
_kale_mlmdutils.call("mark_execution_complete")


Expand Down
Loading