In [None]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [3]:
# Install the Pipeline SDK - this only needs to be run once in the environment.
# You can find the latest package at https://github.com/kubeflow/pipelines/releases
#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.20/kfp.tar.gz'
#!pip3 install $KFP_PACKAGE --upgrade

In [2]:
import kfp
import kfp.dsl as dsl
from kfp.gcp import use_gcp_secret
from kubernetes import client as k8s_client
from kfp import compiler
from kfp import notebook
from kfp import components as comp

In [3]:
# --- GCP project and container images ----------------------------------------
PROJECT_NAME = 'chavoshi-dev-2'
DLMODEL_IMAGE = 'tensorflow/tensorflow:1.12.0-py3'
# GPU variant of the same image:
#DLMODEL_IMAGE = 'tensorflow/tensorflow:1.12.0-py3-gpu'
GCLOUD_SDK = 'google/cloud-sdk:latest'
NOOP_IMAGE = 'ubuntu:16.04'

# --- Experiment name, in-container working directory and dataset location ----
EXPERIMENT_NAME = 'Image_classification'
LOCAL_PATH = '/mnt/vol'
IMAGE_FOLDER = 'small_bolt_images'
IMAGE_SOURCE = 'gs://cisco-live-2019-demo/{}'.format(IMAGE_FOLDER)
OUTPUT_DIR = 'gs://chavoshi-dev-mlpipeline/{}'.format(EXPERIMENT_NAME)  # e.g. gs://bucket/object/path

# --- Image tags in the project's gcr.io registry ------------------------------
BASE_IMAGE = 'gcr.io/{}/pusherbase:latest'.format(PROJECT_NAME)
TARGET_IMAGE = 'gcr.io/{}/pusher:latest'.format(PROJECT_NAME)

In [1]:
from kfp.onprem import mount_pvc

In [2]:
#def copy_source_data_op(step_name='load-data'):
# return dsl.ContainerOp(
# name = step_name,
# image = GCLOUD_SDK,
# command=['sh', '-c'],
# arguments = [ 'rm -rf ' +LOCAL_PATH+ '/* ' + '&& gsutil -m cp -r -n '+IMAGE_SOURCE+' '+LOCAL_PATH +' && rm -rf /mnt/vol/saved_model && rm -rf /mnt/vol/retrain_logs && gcloud auth activate-service-account --key-file /secret/gcp-credentials/user-gcp-sa.json && gsutil -m rm -r gs://test-gtc-demo-2019/retrain_logs/* 2> /dev/null || true']
# ).add_volume(k8s_client.V1Volume(name='workdir', persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'))
# ).add_volume_mount(k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')).apply(use_gcp_secret('user-gcp-sa'))
def copy_source_data_op(step_name='load-data', pvc_name='nfs'):
    """Build the pipeline step that stages the training images in LOCAL_PATH.

    The step runs a single ``sh -c`` command line in the Cloud SDK image that,
    in order:

      1. empties LOCAL_PATH,
      2. copies IMAGE_SOURCE into LOCAL_PATH with ``gsutil cp -r -n``,
      3. removes ``saved_model`` and ``retrain_logs`` under LOCAL_PATH,
      4. activates the service-account key mounted by ``use_gcp_secret``,
      5. deletes the retrain logs stored in GCS (best effort).

    Steps 1-4 are joined with ``&&`` so a failure stops the chain and fails the
    step. Only the final GCS cleanup is allowed to fail silently.

    Args:
        step_name: Name given to the ContainerOp.
        pvc_name: Name of the PersistentVolumeClaim mounted at LOCAL_PATH.
            Defaults to 'nfs', the claim every other step in this notebook
            mounts at LOCAL_PATH, so the copied images are visible downstream.

    Returns:
        The configured dsl.ContainerOp.
    """
    shell_steps = [
        'rm -rf ' + LOCAL_PATH + '/*',
        'gsutil -m cp -r -n ' + IMAGE_SOURCE + ' ' + LOCAL_PATH,
        'rm -rf ' + LOCAL_PATH + '/saved_model',
        'rm -rf ' + LOCAL_PATH + '/retrain_logs',
        'gcloud auth activate-service-account --key-file /secret/gcp-credentials/user-gcp-sa.json',
        # Parenthesised so that "|| true" only forgives this cleanup (the logs
        # may not exist yet) instead of masking failures of the steps above.
        '(gsutil -m rm -r gs://test-gtc-demo-2019/retrain_logs/* 2> /dev/null || true)',
    ]
    return dsl.ContainerOp(
        name=step_name,
        image=GCLOUD_SDK,
        command=['sh', '-c'],
        arguments=[' && '.join(shell_steps)],
    ).apply(
        mount_pvc(pvc_name=pvc_name, volume_mount_path=LOCAL_PATH)
    ).apply(use_gcp_secret('user-gcp-sa'))


In [5]:
# Show the derived GCS output path and the base image tag.
print(OUTPUT_DIR, BASE_IMAGE)

gs://chavoshi-dev-mlpipeline/Image_classification gcr.io/chavoshi-dev-2/pusherbase:latest


In [6]:
%%docker {BASE_IMAGE} {OUTPUT_DIR}
# Training image: TensorFlow 1.12 (Python 3) plus tensorflow_hub and the TF Hub
# image retraining script, which is used as the container entrypoint.
FROM tensorflow/tensorflow:1.12.0-py3
# curl -f exits non-zero on an HTTP error, so a missing or moved retrain.py
# fails the build instead of saving an error page under that file name.
RUN pip3 install tensorflow_hub && \
    curl -fsSL -O https://raw.githubusercontent.com/tensorflow/hub/master/examples/image_retraining/retrain.py
ENTRYPOINT ["python", "retrain.py"]

2019-05-14 12:24:47:INFO:Checking path: gs://chavoshi-dev-mlpipeline/Image_classification...
2019-05-14 12:24:47:INFO:Generate build files.
2019-05-14 12:24:47:INFO:Start a kaniko job for build.
2019-05-14 12:24:47:INFO:Cannot Find local kubernetes config. Trying in-cluster config.
2019-05-14 12:24:47:INFO:Initialized with in-cluster config.
2019-05-14 12:24:52:INFO:5 seconds: waiting for job to complete
2019-05-14 12:24:57:INFO:10 seconds: waiting for job to complete
2019-05-14 12:25:02:INFO:15 seconds: waiting for job to complete
2019-05-14 12:25:07:INFO:20 seconds: waiting for job to complete
2019-05-14 12:25:12:INFO:25 seconds: waiting for job to complete
2019-05-14 12:25:17:INFO:30 seconds: waiting for job to complete
2019-05-14 12:25:22:INFO:35 seconds: waiting for job to complete
2019-05-14 12:25:27:INFO:40 seconds: waiting for job to complete
2019-05-14 12:25:32:INFO:45 seconds: waiting for job to complete
2019-05-14 12:25:37:INFO:50 seconds: waiting for job to complete
2019-05

In [7]:
# Show the in-container working directory and the image folder name.
print(LOCAL_PATH, IMAGE_FOLDER)

/mnt/vol small_bolt_images


In [8]:
# The first step only creates the bottleneck files: the number of training
# steps is set to zero. It is kept separate because the following steps may be
# run several times with different hyperparameters.

def pre_process_op(step_name='preprocess-data'):
    """Build the bottleneck-generation step.

    Runs BASE_IMAGE with zero training steps, pointing the image, label,
    summary and bottleneck paths at LOCAL_PATH. No saved model directory is
    passed and no GPU is requested; the step asks for 2 CPUs.

    Args:
        step_name: Name given to the ContainerOp.

    Returns:
        The dsl.ContainerOp with the 'nfs' claim mounted at LOCAL_PATH and the
        'user-gcp-sa' secret applied.
    """
    retrain_args = [
        '--image_dir', '{}/{}'.format(LOCAL_PATH, IMAGE_FOLDER),
        '--output_labels', '{}/output_labels.txt'.format(LOCAL_PATH),
        '--summaries_dir', '{}/retrain_logs'.format(LOCAL_PATH),
        '--how_many_training_steps', 0,
        '--learning_rate', 0.01,
        '--bottleneck_dir', '{}/bottleneck'.format(LOCAL_PATH),
        '--tfhub_module', 'https://tfhub.dev/google/imagenet/mobilenet_v2_140_224/classification/2',
    ]

    workdir_volume = k8s_client.V1Volume(
        name='workdir',
        persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'),
    )
    workdir_mount = k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')

    preprocess_step = dsl.ContainerOp(
        name=step_name,
        image=BASE_IMAGE,
        arguments=retrain_args,
    )
    return (
        preprocess_step
        .add_volume(workdir_volume)
        .add_volume_mount(workdir_mount)
        .apply(use_gcp_secret('user-gcp-sa'))
        .set_cpu_request('2')
    )

In [9]:
# Multiple instances of training can run in parallel with different
# hyperparameters (e.g. learning rate); however, the same TF Hub module must be
# used as in the bottleneck creation step.
def train_op(step_name='train'):
    """Build the training step.

    Runs BASE_IMAGE for 10 training steps against the image, label, summary and
    bottleneck paths under LOCAL_PATH, and passes LOCAL_PATH/saved_model as the
    saved model directory. The step requests one GPU.

    Args:
        step_name: Name given to the ContainerOp.

    Returns:
        The dsl.ContainerOp with the 'nfs' claim mounted at LOCAL_PATH and the
        'user-gcp-sa' secret applied.
    """
    retrain_args = [
        '--image_dir', '{}/{}'.format(LOCAL_PATH, IMAGE_FOLDER),
        '--output_labels', '{}/output_labels.txt'.format(LOCAL_PATH),
        '--summaries_dir', '{}/retrain_logs'.format(LOCAL_PATH),
        '--how_many_training_steps', 10,
        '--learning_rate', 0.01,
        '--bottleneck_dir', '{}/bottleneck'.format(LOCAL_PATH),
        '--tfhub_module', 'https://tfhub.dev/google/imagenet/mobilenet_v2_140_224/classification/2',
        '--saved_model_dir', '{}/saved_model'.format(LOCAL_PATH),
    ]

    workdir_volume = k8s_client.V1Volume(
        name='workdir',
        persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'),
    )
    workdir_mount = k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')

    training_step = dsl.ContainerOp(
        name=step_name,
        image=BASE_IMAGE,
        arguments=retrain_args,
    )
    return (
        training_step
        .add_volume(workdir_volume)
        .add_volume_mount(workdir_mount)
        .apply(use_gcp_secret('user-gcp-sa'))
        .set_gpu_limit('1')
    )

In [10]:
def tensorboard_op2(step_name='tensorboard'):
    """Build the step that publishes the training logs for TensorBoard.

    The container (Cloud SDK image) runs one ``sh -c`` command line that:

      1. writes /mlpipeline-ui-metadata.json declaring a 'tensorboard' output
         whose source is the retrain_logs folder in GCS,
      2. activates the service-account key mounted by ``use_gcp_secret``,
      3. copies LOCAL_PATH/retrain_logs to the GCS bucket.

    The log directory is addressed through LOCAL_PATH as an absolute path, so
    the copy does not depend on the container's working directory.

    Args:
        step_name: Name given to the ContainerOp.

    Returns:
        The dsl.ContainerOp with the 'nfs' claim mounted at LOCAL_PATH and the
        'user-gcp-sa' secret applied.
    """
    ui_metadata = (
        '{"outputs": [{"source": "gs://test-gtc-demo-2019/retrain_logs", '
        '"type": "tensorboard"}]}'
    )
    shell_steps = [
        "echo '" + ui_metadata + "' > /mlpipeline-ui-metadata.json",
        "gcloud auth activate-service-account --key-file '/secret/gcp-credentials/user-gcp-sa.json'",
        'gsutil -m cp -R ' + LOCAL_PATH + '/retrain_logs gs://test-gtc-demo-2019',
    ]
    return dsl.ContainerOp(
        name=step_name,
        image=GCLOUD_SDK,
        command=['sh', '-c'],
        arguments=[' && '.join(shell_steps)],
    ).add_volume(
        k8s_client.V1Volume(
            name='workdir',
            persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'),
        )
    ).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')
    ).apply(use_gcp_secret('user-gcp-sa'))



In [11]:
def tensorboard_func():
    """Write the pipeline UI metadata file that declares a TensorBoard output.

    The function is wrapped with ``func_to_container_op`` below, so everything
    it needs is imported inside the body. It writes
    /mlpipeline-ui-metadata.json with a single 'tensorboard' output whose
    source is the retrain_logs folder in GCS.
    """
    import json
    from tensorflow.python.lib.io import file_io

    # Exports a sample tensorboard:
    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': 'gs://test-gtc-demo-2019/retrain_logs',
        }]
    }

    with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
        json.dump(metadata, f)

    # TODO: copy training files


# Component factory built from tensorboard_func, run in the TensorFlow 1.12 image.
tensorboard_op = comp.func_to_container_op(tensorboard_func, base_image='tensorflow/tensorflow:1.12.0-py3')

In [None]:
# This step converts the SavedModel written by the training step into a
# TensorFlow Lite model stored next to it on the mounted volume.
def tflite_transform_func():
    """Convert the trained SavedModel to TensorFlow Lite.

    Wrapped with ``func_to_container_op`` below, so the import lives inside the
    body and the paths are literals rather than notebook globals. Reads the
    SavedModel from /mnt/vol/saved_model (the directory the training step
    passes as --saved_model_dir) and writes /mnt/vol/converted_model.tflite.
    The caller is expected to mount the working volume at /mnt/vol.
    """
    import tensorflow as tf

    # NOTE(review): the base image below is TensorFlow 1.12; confirm that the
    # tf.lite namespace is available in that release (it may still be under
    # tf.contrib.lite there).
    converter = tf.lite.TFLiteConverter.from_saved_model('/mnt/vol/saved_model')
    tflite_model = converter.convert()

    # Write onto the mounted volume so the result outlives the container, and
    # use a context manager so the file is flushed and closed.
    with open('/mnt/vol/converted_model.tflite', 'wb') as tflite_file:
        tflite_file.write(tflite_model)


# Component factory used by the pipeline: calling tflite_transform_op() yields
# a ContainerOp that runs tflite_transform_func in the TensorFlow image.
tflite_transform_op = comp.func_to_container_op(tflite_transform_func, base_image='tensorflow/tensorflow:1.12.0-py3')

In [12]:
# This step simply opens up the permissions on the drive so that JupyterHub
# and other locations can access it.
def publish_op(step_name='publish content'):
    """Build the step that recursively chmods the working volume to 0777.

    Args:
        step_name: Name given to the ContainerOp.

    Returns:
        The dsl.ContainerOp with the 'nfs' claim mounted at LOCAL_PATH and the
        'user-gcp-sa' secret applied.
    """
    workdir_volume = k8s_client.V1Volume(
        name='workdir',
        persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'),
    )
    workdir_mount = k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')

    publish_step = dsl.ContainerOp(
        name=step_name,
        image=GCLOUD_SDK,
        command=['sh', '-c'],
        arguments=['chmod -R 0777 /mnt/vol/ '],
    )
    return (
        publish_step
        .add_volume(workdir_volume)
        .add_volume_mount(workdir_mount)
        .apply(use_gcp_secret('user-gcp-sa'))
    )


In [15]:
@dsl.pipeline(
    name='TFHub Image Classifier',
    description='Users TFHub based models such as Mobilenetv2 and NasNet to train an image classifer.'
)
def tfhub_image_classifier_dag(
    model_version: dsl.PipelineParam = dsl.PipelineParam(name='model-version', value='1'),
):
    """Wire the image-classifier steps into a pipeline.

    Execution order:
        load-data -> preprocess-data -> train -> {tensorboard, tflite, publish}

    The three final steps each depend only on the training step.

    Args:
        model_version: Pipeline parameter 'model-version' (default '1'). It is
            declared on the pipeline but not passed to any step here.
    """
    # Copy the source data.
    copy_source_data = copy_source_data_op()

    pre_process_data = pre_process_op()
    pre_process_data.after(copy_source_data)

    train = train_op()
    train.after(pre_process_data)

    tensorboard = tensorboard_op2()
    tensorboard.after(train)

    # tflite_transform_op() must return a ContainerOp for this chain to work;
    # the working volume is attached here rather than inside the op.
    tflite = tflite_transform_op().add_volume(
        k8s_client.V1Volume(
            name='workdir',
            persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs'),
        )
    ).add_volume_mount(
        k8s_client.V1VolumeMount(mount_path=LOCAL_PATH, name='workdir')
    )
    tflite.after(train)

    publish = publish_op()
    publish.after(train)

    #deploy = deploy_op()
    #deploy.after(publish)
 
 

In [25]:
# Create a Kubeflow Pipelines client with its default settings.
client = kfp.Client()
# Pick the first experiment returned by the API.
# NOTE(review): EXPERIMENT_NAME is defined in the config cell but is not used
# here, and this line assumes at least one experiment already exists - confirm
# that is intended.
exp = client.list_experiments().experiments[0]

In [31]:
# Compile the pipeline function into the package file 'tfhub_image_classifier_dag.tar.gz'.
from kfp import compiler
compiler.Compiler().compile(tfhub_image_classifier_dag, 'tfhub_image_classifier_dag.tar.gz')

In [34]:
# Start a run named 'TF Hub Image Classifier' in the selected experiment from
# the compiled package, passing an empty parameter dict.
run = client.run_pipeline(exp.id, 'TF Hub Image Classifier', 'tfhub_image_classifier_dag.tar.gz',
 params={})