-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathpipeline.py
577 lines (523 loc) · 26.8 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Placeholder docstring"""
from __future__ import absolute_import
from typing import Callable, Optional, Dict, List, Union
import sagemaker
from sagemaker import ModelMetrics, Model
from sagemaker import local
from sagemaker import session
from sagemaker.config import (
ENDPOINT_CONFIG_KMS_KEY_ID_PATH,
MODEL_VPC_CONFIG_PATH,
MODEL_ENABLE_NETWORK_ISOLATION_PATH,
MODEL_EXECUTION_ROLE_ARN_PATH,
load_sagemaker_config,
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.metadata_properties import MetadataProperties
from sagemaker.model import ModelPackage
from sagemaker.model_card import (
ModelCard,
ModelPackageModelCard,
)
from sagemaker.session import Session
from sagemaker.utils import (
name_from_image,
update_container_with_inference_params,
resolve_value_from_config,
format_tags,
)
from sagemaker.transformer import Transformer
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.utils import instance_supports_kms
class PipelineModel(object):
"""A pipeline of SageMaker `Model` instances.
This pipeline can be deployed as an `Endpoint` on SageMaker.
"""
def __init__(
self,
models: List[Model],
role: str = None,
predictor_cls: Optional[Callable] = None,
name: Optional[str] = None,
vpc_config: Optional[Dict[str, List[Union[str, PipelineVariable]]]] = None,
sagemaker_session: Optional[Session] = None,
enable_network_isolation: Union[bool, PipelineVariable] = None,
):
"""Initialize a SageMaker `Model` instance.
The `Model` can be used to build an Inference Pipeline comprising of
multiple model containers.
Args:
models (list[sagemaker.Model]): For using multiple containers to
build an inference pipeline, you can pass a list of
``sagemaker.Model`` objects in the order you want the inference
to happen.
role (str): An AWS IAM role (either name or full ARN). The Amazon
SageMaker training jobs and APIs that create Amazon SageMaker
endpoints use this role to access training data and model
artifacts. After the endpoint is created, the inference code
might use the IAM role, if it needs to access an AWS resource.
predictor_cls (Callable[[string, sagemaker.session.Session], Any]): A
function to call to create a predictor (default: None). If not
None, ``deploy`` will return the result of invoking this
function on the created endpoint name.
name (str): The model name. If None, a default model name will be
selected on each ``deploy``.
vpc_config (dict[str, list[str]] or dict[str, list[PipelineVariable]]):
The VpcConfig set on the model (default: None)
* 'Subnets' (list[str]): List of subnet ids.
* 'SecurityGroupIds' (list[str]): List of security group ids.
sagemaker_session (sagemaker.session.Session): A SageMaker Session
object, used for SageMaker interactions (default: None). If not
specified, one is created using the default AWS configuration
chain.
enable_network_isolation (bool or PipelineVariable): Default False. if True,
enables network isolation in the endpoint, isolating the model
container. No inbound or outbound network calls can be made to
or from the model container.Boolean
"""
self.models = models
self.predictor_cls = predictor_cls
self.name = name
self.endpoint_name = None
self.sagemaker_session = sagemaker_session
# In case, sagemaker_session is None, get sagemaker_config from load_sagemaker_config()
# to resolve value from config for the respective parameter
self._sagemaker_config = (
load_sagemaker_config() if (self.sagemaker_session is None) else None
)
self.role = resolve_value_from_config(
role,
MODEL_EXECUTION_ROLE_ARN_PATH,
sagemaker_session=self.sagemaker_session,
sagemaker_config=self._sagemaker_config,
)
self.vpc_config = resolve_value_from_config(
vpc_config,
MODEL_VPC_CONFIG_PATH,
sagemaker_session=self.sagemaker_session,
sagemaker_config=self._sagemaker_config,
)
self.enable_network_isolation = resolve_value_from_config(
direct_input=enable_network_isolation,
config_path=MODEL_ENABLE_NETWORK_ISOLATION_PATH,
default_value=False,
sagemaker_session=self.sagemaker_session,
sagemaker_config=self._sagemaker_config,
)
if not self.role:
# Originally IAM role was a required parameter.
# Now we marked that as Optional because we can fetch it from SageMakerConfig
# Because of marking that parameter as optional, we should validate if it is None, even
# after fetching the config.
raise ValueError("An AWS IAM role is required to create a Pipeline Model.")
def pipeline_container_def(self, instance_type=None):
"""The pipeline definition for deploying this model.
This is the dict created by ``sagemaker.pipeline_container_def()``.
The instance type to be used may be specified.
Subclasses can override this to provide custom container definitions
for deployment to a specific instance type. Called by ``deploy()``.
Args:
instance_type (str): The EC2 instance type to deploy this Model to.
For example, 'ml.p2.xlarge'.
Returns:
list[dict[str, str]]: A list of container definition objects usable
with the CreateModel API in the scenario of multiple containers
(Inference Pipeline).
"""
return sagemaker.pipeline_container_def(self.models, instance_type)
def deploy(
self,
initial_instance_count,
instance_type,
serializer=None,
deserializer=None,
endpoint_name=None,
tags=None,
wait=True,
update_endpoint=False,
data_capture_config=None,
kms_key=None,
volume_size=None,
model_data_download_timeout=None,
container_startup_health_check_timeout=None,
):
"""Deploy the ``Model`` to an ``Endpoint``.
It optionally return a ``Predictor``.
Create a SageMaker ``Model`` and ``EndpointConfig``, and deploy an
``Endpoint`` from this ``Model``. If ``self.predictor_cls`` is not None,
this method returns a the result of invoking ``self.predictor_cls`` on
the created endpoint name.
The name of the created model is accessible in the ``name`` field of
this ``Model`` after deploy returns
The name of the created endpoint is accessible in the
``endpoint_name`` field of this ``Model`` after deploy returns.
Args:
initial_instance_count (int): The initial number of instances to run
in the ``Endpoint`` created from this ``Model``.
instance_type (str): The EC2 instance type to deploy this Model to.
For example, 'ml.p2.xlarge'.
serializer (:class:`~sagemaker.serializers.BaseSerializer`): A
serializer object, used to encode data for an inference endpoint
(default: None). If ``serializer`` is not None, then
``serializer`` will override the default serializer. The
default serializer is set by the ``predictor_cls``.
deserializer (:class:`~sagemaker.deserializers.BaseDeserializer`): A
deserializer object, used to decode data from an inference
endpoint (default: None). If ``deserializer`` is not None, then
``deserializer`` will override the default deserializer. The
default deserializer is set by the ``predictor_cls``.
endpoint_name (str): The name of the endpoint to create (default:
None). If not specified, a unique endpoint name will be created.
tags (List[dict[str, str]]): The list of tags to attach to this
specific endpoint.
wait (bool): Whether the call should wait until the deployment of
model completes (default: True).
update_endpoint (bool): Flag to update the model in an existing
Amazon SageMaker endpoint. If True, this will deploy a new
EndpointConfig to an already existing endpoint and delete
resources corresponding to the previous EndpointConfig. If
False, a new endpoint will be created. Default: False
data_capture_config (sagemaker.model_monitor.DataCaptureConfig): Specifies
configuration related to Endpoint data capture for use with
Amazon SageMaker Model Monitoring. Default: None.
kms_key (str): The ARN, Key ID or Alias of the KMS key that is used to
encrypt the data on the storage volume attached to the instance hosting
the endpoint.
volume_size (int): The size, in GB, of the ML storage volume attached to individual
inference instance associated with the production variant. Currenly only Amazon EBS
gp2 storage volumes are supported.
model_data_download_timeout (int): The timeout value, in seconds, to download and
extract model data from Amazon S3 to the individual inference instance associated
with this production variant.
container_startup_health_check_timeout (int): The timeout value, in seconds, for your
inference container to pass health check by SageMaker Hosting. For more information
about health check see:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html#your-algorithms-inference-algo-ping-requests
Returns:
Optional[Callable[[string, sagemaker.session.Session], Any]]: Invocation of
``self.predictor_cls`` on the created endpoint name, if ``self.predictor_cls``
is not None. Otherwise, return None.
"""
if not self.sagemaker_session:
self.sagemaker_session = Session(sagemaker_config=self._sagemaker_config)
containers = self.pipeline_container_def(instance_type)
self.name = self.name or name_from_image(containers[0]["Image"])
self.sagemaker_session.create_model(
self.name,
self.role,
containers,
vpc_config=self.vpc_config,
enable_network_isolation=self.enable_network_isolation,
)
production_variant = sagemaker.production_variant(
self.name,
instance_type,
initial_instance_count,
volume_size=volume_size,
model_data_download_timeout=model_data_download_timeout,
container_startup_health_check_timeout=container_startup_health_check_timeout,
)
self.endpoint_name = endpoint_name or self.name
kms_key = (
resolve_value_from_config(
kms_key, ENDPOINT_CONFIG_KMS_KEY_ID_PATH, sagemaker_session=self.sagemaker_session
)
if instance_supports_kms(instance_type)
else kms_key
)
data_capture_config_dict = None
if data_capture_config is not None:
data_capture_config_dict = data_capture_config._to_request_dict()
tags = format_tags(tags)
if update_endpoint:
endpoint_config_name = self.sagemaker_session.create_endpoint_config(
name=self.name,
model_name=self.name,
initial_instance_count=initial_instance_count,
instance_type=instance_type,
tags=tags,
kms_key=kms_key,
data_capture_config_dict=data_capture_config_dict,
volume_size=volume_size,
model_data_download_timeout=model_data_download_timeout,
container_startup_health_check_timeout=container_startup_health_check_timeout,
)
self.sagemaker_session.update_endpoint(
self.endpoint_name, endpoint_config_name, wait=wait
)
else:
self.sagemaker_session.endpoint_from_production_variants(
name=self.endpoint_name,
production_variants=[production_variant],
tags=tags,
kms_key=kms_key,
wait=wait,
data_capture_config_dict=data_capture_config_dict,
)
if self.predictor_cls:
predictor = self.predictor_cls(self.endpoint_name, self.sagemaker_session)
if serializer:
predictor.serializer = serializer
if deserializer:
predictor.deserializer = deserializer
return predictor
return None
@runnable_by_pipeline
def create(self, instance_type: str):
"""Create a SageMaker Model Entity
Args:
instance_type (str): The EC2 instance type that this Model will be
used for, this is only used to determine if the image needs GPU
support or not.
"""
self._create_sagemaker_pipeline_model(instance_type)
def _create_sagemaker_pipeline_model(self, instance_type):
"""Create a SageMaker Model Entity
Args:
instance_type (str): The EC2 instance type that this Model will be
used for, this is only used to determine if the image needs GPU
support or not.
"""
if not self.sagemaker_session:
self.sagemaker_session = Session(sagemaker_config=self._sagemaker_config)
containers = self.pipeline_container_def(instance_type)
self.name = self.name or name_from_image(containers[0]["Image"])
create_model_args = dict(
name=self.name,
role=self.role,
container_defs=containers,
vpc_config=self.vpc_config,
enable_network_isolation=self.enable_network_isolation,
)
self.sagemaker_session.create_model(**create_model_args)
@runnable_by_pipeline
def register(
self,
content_types: List[Union[str, PipelineVariable]] = None,
response_types: List[Union[str, PipelineVariable]] = None,
inference_instances: Optional[List[Union[str, PipelineVariable]]] = None,
transform_instances: Optional[List[Union[str, PipelineVariable]]] = None,
model_package_name: Optional[Union[str, PipelineVariable]] = None,
model_package_group_name: Optional[Union[str, PipelineVariable]] = None,
image_uri: Optional[Union[str, PipelineVariable]] = None,
model_metrics: Optional[ModelMetrics] = None,
metadata_properties: Optional[MetadataProperties] = None,
marketplace_cert: bool = False,
approval_status: Optional[Union[str, PipelineVariable]] = None,
description: Optional[str] = None,
drift_check_baselines: Optional[DriftCheckBaselines] = None,
customer_metadata_properties: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
domain: Optional[Union[str, PipelineVariable]] = None,
sample_payload_url: Optional[Union[str, PipelineVariable]] = None,
task: Optional[Union[str, PipelineVariable]] = None,
framework: Optional[Union[str, PipelineVariable]] = None,
framework_version: Optional[Union[str, PipelineVariable]] = None,
nearest_model_name: Optional[Union[str, PipelineVariable]] = None,
data_input_configuration: Optional[Union[str, PipelineVariable]] = None,
skip_model_validation: Optional[Union[str, PipelineVariable]] = None,
source_uri: Optional[Union[str, PipelineVariable]] = None,
model_card: Optional[Union[ModelPackageModelCard, ModelCard]] = None,
):
"""Creates a model package for creating SageMaker models or listing on Marketplace.
Args:
content_types (list[str] or list[PipelineVariable]): The supported MIME types
for the input data.
response_types (list[str] or list[PipelineVariable]): The supported MIME types
for the output data.
inference_instances (list[str] or list[PipelineVariable]): A list of the instance
types that are used to generate inferences in real-time (default: None).
transform_instances (list[str] or list[PipelineVariable]): A list of the instance types
on which a transformation job can be run or on which an endpoint can be deployed
(default: None).
model_package_name (str or PipelineVariable): Model Package name, exclusive to
`model_package_group_name`, using `model_package_name` makes the Model Package
un-versioned (default: None).
model_package_group_name (str or PipelineVariable): Model Package Group name,
exclusive to `model_package_name`, using `model_package_group_name` makes
the Model Package versioned (default: None).
image_uri (str or PipelineVariable): Inference image uri for the container.
Model class' self.image will be used if it is None (default: None).
model_metrics (ModelMetrics): ModelMetrics object (default: None).
metadata_properties (MetadataProperties): MetadataProperties object (default: None).
marketplace_cert (bool): A boolean value indicating if the Model Package is certified
for AWS Marketplace (default: False).
approval_status (str or PipelineVariable): Model Approval Status, values can
be "Approved", "Rejected", or "PendingManualApproval"
(default: "PendingManualApproval").
description (str): Model Package description (default: None).
drift_check_baselines (DriftCheckBaselines): DriftCheckBaselines object (default: None).
customer_metadata_properties (dict[str, str] or dict[str, PipelineVariable]):
A dictionary of key-value paired metadata properties (default: None).
domain (str or PipelineVariable): Domain values can be "COMPUTER_VISION",
"NATURAL_LANGUAGE_PROCESSING", "MACHINE_LEARNING" (default: None).
sample_payload_url (str or PipelineVariable): The S3 path where the sample payload
is stored (default: None).
task (str or PipelineVariable): Task values which are supported by Inference Recommender
are "FILL_MASK", "IMAGE_CLASSIFICATION", "OBJECT_DETECTION", "TEXT_GENERATION",
"IMAGE_SEGMENTATION", "CLASSIFICATION", "REGRESSION", "OTHER" (default: None).
framework (str or PipelineVariable): Machine learning framework of the model package
container image (default: None).
framework_version (str or PipelineVariable): Framework version of the Model Package
Container Image (default: None).
nearest_model_name (str or PipelineVariable): Name of a pre-trained machine learning
benchmarked by Amazon SageMaker Inference Recommender (default: None).
data_input_configuration (str or PipelineVariable): Input object for the model
(default: None).
skip_model_validation (str or PipelineVariable): Indicates if you want to skip model
validation. Values can be "All" or "None" (default: None).
source_uri (str or PipelineVariable): The URI of the source for the model package
(default: None).
model_card (ModeCard or ModelPackageModelCard): document contains qualitative and
quantitative information about a model (default: None).
Returns:
If ``sagemaker_session`` is a ``PipelineSession`` instance, returns pipeline step
arguments. Otherwise, returns ``None``
"""
for model in self.models:
if model.model_data is None:
raise ValueError("SageMaker Model Package cannot be created without model data.")
if model_package_group_name is not None:
container_def = self.pipeline_container_def(
inference_instances[0] if inference_instances else None
)
container_def = update_container_with_inference_params(
framework=framework,
framework_version=framework_version,
nearest_model_name=nearest_model_name,
data_input_configuration=data_input_configuration,
container_list=container_def,
)
else:
container_def = [
{
"Image": image_uri or model.image_uri,
"ModelDataUrl": model.model_data,
}
for model in self.models
]
model_pkg_args = sagemaker.get_model_package_args(
content_types,
response_types,
inference_instances=inference_instances,
transform_instances=transform_instances,
model_package_name=model_package_name,
model_package_group_name=model_package_group_name,
model_metrics=model_metrics,
metadata_properties=metadata_properties,
marketplace_cert=marketplace_cert,
approval_status=approval_status,
description=description,
container_def_list=container_def,
drift_check_baselines=drift_check_baselines,
customer_metadata_properties=customer_metadata_properties,
domain=domain,
sample_payload_url=sample_payload_url,
task=task,
skip_model_validation=skip_model_validation,
source_uri=source_uri,
model_card=model_card,
)
model_package = self.sagemaker_session.create_model_package_from_containers(
**model_pkg_args
)
if model_package is not None and "ModelPackageArn" in model_package:
return ModelPackage(
role=self.role,
model_package_arn=model_package.get("ModelPackageArn"),
sagemaker_session=self.sagemaker_session,
predictor_cls=self.predictor_cls,
)
return None
def transformer(
self,
instance_count,
instance_type,
strategy=None,
assemble_with=None,
output_path=None,
output_kms_key=None,
accept=None,
env=None,
max_concurrent_transforms=None,
max_payload=None,
tags=None,
volume_kms_key=None,
):
"""Return a ``Transformer`` that uses this Model.
Args:
instance_count (int): Number of EC2 instances to use.
instance_type (str): Type of EC2 instance to use, for example,
'ml.c4.xlarge'.
strategy (str): The strategy used to decide how to batch records in
a single request (default: None). Valid values: 'MultiRecord'
and 'SingleRecord'.
assemble_with (str): How the output is assembled (default: None).
Valid values: 'Line' or 'None'.
output_path (str): S3 location for saving the transform result. If
not specified, results are stored to a default bucket.
output_kms_key (str): Optional. KMS key ID for encrypting the
transform output (default: None).
accept (str): The accept header passed by the client to
the inference endpoint. If it is supported by the endpoint,
it will be the format of the batch transform output.
env (dict): Environment variables to be set for use during the
transform job (default: None).
max_concurrent_transforms (int): The maximum number of HTTP requests
to be made to each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP
request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If
none specified, then the tags used for the training job are used
for the transform job.
volume_kms_key (str): Optional. KMS key ID for encrypting the volume
attached to the ML compute instance (default: None).
"""
self._create_sagemaker_pipeline_model(instance_type)
return Transformer(
self.name,
instance_count,
instance_type,
strategy=strategy,
assemble_with=assemble_with,
output_path=output_path,
output_kms_key=output_kms_key,
accept=accept,
max_concurrent_transforms=max_concurrent_transforms,
max_payload=max_payload,
env=env,
tags=format_tags(tags),
base_transform_job_name=self.name,
volume_kms_key=volume_kms_key,
sagemaker_session=self.sagemaker_session,
)
def delete_model(self):
"""Delete the SageMaker model backing this pipeline model.
This does not delete the list of SageMaker models used in multiple containers to build
the inference pipeline.
"""
if self.name is None:
raise ValueError("The SageMaker model must be created before attempting to delete.")
self.sagemaker_session.delete_model(self.name)
def _init_sagemaker_session_if_does_not_exist(self, instance_type=None):
"""Set ``self.sagemaker_session`` to ``LocalSession`` or ``Session`` if it's not already.
The type of session object is determined by the instance type.
"""
if self.sagemaker_session:
return
if instance_type in ("local", "local_gpu"):
self.sagemaker_session = local.LocalSession(sagemaker_config=self._sagemaker_config)
else:
self.sagemaker_session = session.Session(sagemaker_config=self._sagemaker_config)