Skip to content

Commit aa18bac

Browse files
bfinerandbogunowicz
authored andcommitted
Pipelines Refactor - Initial Impl (#1287)
1 parent b4eb96e commit aa18bac

File tree

14 files changed

+620
-0
lines changed

14 files changed

+620
-0
lines changed

src/deepsparse/v2/__init__.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# flake8: noqa
2+
3+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .pipeline import *
18+
from .operators import *
19+
from .routers import *
20+
from .schedulers import *
21+
from .utils import *
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# flake8: noqa
2+
3+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .operator import *
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing,
10+
# software distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from abc import ABC, abstractmethod
16+
from typing import Optional, Type
17+
18+
from pydantic import BaseModel
19+
20+
from deepsparse.v2.utils import Context, OperatorSchema
21+
22+
23+
__all__ = ["Operator"]
24+
25+
26+
class Operator(ABC):
27+
"""
28+
Base operator class - can represent any part of an ML pipeline
29+
"""
30+
31+
# expected structured input and output types, to be defined by child classes
32+
input_schema: Optional[Type[OperatorSchema]] = None
33+
output_schema: Optional[Type[OperatorSchema]] = None
34+
35+
@abstractmethod
36+
def run(self, inp: OperatorSchema, context: Context) -> OperatorSchema:
37+
"""
38+
:param inp: operator input, as the defined input schema if applicable
39+
:param context: pipeline context of already run operators
40+
:return: result of this operator as the defined output schema if applicable
41+
"""
42+
raise NotImplementedError
43+
44+
@classmethod
45+
def has_input_schema(cls) -> bool:
46+
"""
47+
:return: True if this class has a defined pydantic input schema
48+
"""
49+
return issubclass(cls.input_schema, BaseModel)
50+
51+
@classmethod
52+
def has_output_schema(cls) -> bool:
53+
"""
54+
:return: True if this class has a defined pydantic input schema
55+
"""
56+
return issubclass(cls.output_schema, BaseModel)
57+
58+
def __call__(
59+
self,
60+
*args,
61+
context: Optional[Context] = None,
62+
**kwargs,
63+
) -> OperatorSchema:
64+
"""
65+
Parses inputs to this Operator and runs the run() method of this operator
66+
67+
:param args: an unnamed arg may only be provided
68+
if it is of the type of the input_schema
69+
:param context: pipeline context to pass to operator
70+
:param kwargs: kwargs when not initializing from an instantiated schema
71+
:return: operator output
72+
"""
73+
if len(args) > 1:
74+
raise ValueError(
75+
f"Only 1 unnamed arg may be supplied to an Operator, found {len(args)}"
76+
)
77+
78+
if len(args) == 1:
79+
if self.input_schema is not None and isinstance(args[0], self.input_schema):
80+
inference_input = args[0]
81+
else:
82+
raise ValueError(
83+
f"1 arg supplied to Operator {self.__class__.__name__} but was not "
84+
f"of expected type {self.input_schema}, found {type(args[0])}"
85+
)
86+
elif self.has_input_schema():
87+
inference_input = self.input_schema(**kwargs)
88+
else:
89+
inference_input = kwargs
90+
return self.run(inference_input, context=context)

src/deepsparse/v2/pipeline.py

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing,
10+
# software distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from typing import List
17+
18+
from pydantic import BaseModel, Field, PrivateAttr
19+
20+
from deepsparse.v2.operators import Operator
21+
from deepsparse.v2.routers import Router
22+
from deepsparse.v2.schedulers import OperatorScheduler, SchedulerGroup
23+
24+
25+
__all__ = ["Pipeline"]
26+
27+
28+
class Pipeline(BaseModel):
29+
"""
30+
Pipeline accepts a series of operators, schedulers, and a router which define
31+
an end to end ML transformation.
32+
33+
Calling a pipeline runs these transformations
34+
"""
35+
36+
stages: List[Operator] = Field(
37+
required=True,
38+
description="In-order list of operators that make up this pipeline",
39+
)
40+
router: Router = Field(
41+
default_factor=Router,
42+
description="Router object to determine order and run the stages. "
43+
"Defaults to the base Router object",
44+
)
45+
schedulers: List[OperatorScheduler] = Field(
46+
default_factor=lambda: [OperatorScheduler()],
47+
description="List of schedulers to run operators in order of priority",
48+
)
49+
50+
_scheduler_group: SchedulerGroup = PrivateAttr()
51+
52+
class Config:
53+
arbitrary_types_allowed = True
54+
55+
def __init__(self, *args, **kwargs):
56+
super().__init__(*args, **kwargs)
57+
58+
self.validate()
59+
60+
# SchedulerGroup handles running all schedulers in order of priority
61+
self._scheduler_group = SchedulerGroup(self.schedulers)
62+
63+
def __call__(self, *args, return_context: bool = False, **kwargs):
64+
"""
65+
:param return_context: if True, retrns tuple of the pipelien output
66+
and entire context. Default False
67+
:return: output of the pipeline stages ran with the router for the given input
68+
"""
69+
if len(args) > 1:
70+
raise ValueError(
71+
"Only 1 in-line argument may be supplied to Pipeline which "
72+
f"must be a Schema, found: {len(args)}"
73+
)
74+
if args and kwargs:
75+
raise ValueError(
76+
"Pipeline can only run either a single in-line argument schema or a "
77+
f"series of kwargs, found {len(args)} args and {len(kwargs)} kwargs"
78+
)
79+
80+
pipeline_input = args[0] or kwargs
81+
pipeline_output, context = self.router.run(
82+
inp=pipeline_input,
83+
operators=self.stages,
84+
scheduler=self._scheduler_group,
85+
)
86+
87+
if return_context:
88+
return pipeline_output, context
89+
90+
return pipeline_output
91+
92+
def validate(self):
93+
router_validation = self.router.validate(self.stages)
94+
95+
if router_validation is False:
96+
# default error message
97+
stage_types = [type(stage) for stage in self.stages]
98+
raise ValueError(
99+
f"Invalid Router: {type(self.router)} for stages: {stage_types}"
100+
)
101+
elif isinstance(router_validation, str):
102+
raise ValueError(f"Invalid Router for stages: {router_validation}")

src/deepsparse/v2/routers/__init__.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# flake8: noqa
2+
3+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .router import *

src/deepsparse/v2/routers/router.py

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing,
10+
# software distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from typing import List, Tuple, Union
17+
18+
from deepsparse.v2.operators import Operator
19+
from deepsparse.v2.schedulers import OperatorScheduler
20+
from deepsparse.v2.utils import Context, OperatorSchema
21+
22+
23+
__all__ = ["Router"]
24+
25+
26+
class Router:
27+
"""
28+
Routers must implement a run method which runs a series of operators
29+
for a pipeline for a given input. Base Router runs operators linearly
30+
in a series
31+
"""
32+
33+
@staticmethod
34+
def run(
35+
inp: OperatorSchema,
36+
operators: List[Operator],
37+
scheduler: OperatorScheduler,
38+
) -> Tuple[OperatorSchema, Context]:
39+
"""
40+
:param inp: input to the first operator of the series
41+
:param operators: list of operators to run
42+
:param scheduler: scheudler to submit operators to
43+
:return: final output of the operators
44+
"""
45+
context = Context()
46+
47+
# run operators linearly
48+
operator_input = inp
49+
for operator in operators:
50+
output_future = scheduler.submit(
51+
operator=operator, operator_input=operator_input, context=context
52+
)
53+
54+
# wait for future to resolve
55+
operator_output = output_future.result()
56+
57+
# update context
58+
context.update(
59+
operator=operator,
60+
input=operator_input,
61+
output=operator_output,
62+
)
63+
64+
# previous output becomes next input
65+
operator_input = operator_output
66+
67+
return operator_output, context
68+
69+
@staticmethod
70+
def validate(operators: List[Operator]) -> Union[bool, str]:
71+
"""
72+
:param operators: operators that this Router could potentially run over
73+
:return: True if this Router can run this series of operators. Base Router
74+
runs any series of operators that is non empty and whose input and output
75+
schemas align. If not valid, either False or an error string will be
76+
returned
77+
"""
78+
if len(operators) < 1:
79+
return "No operators found"
80+
81+
for idx in range(len(operators) - 1):
82+
current_output_schema = operators[idx].output_schema
83+
next_input_schema = operators[idx + 1].input_schema
84+
85+
if current_output_schema is None or next_input_schema is None:
86+
# if no input/output schema defined, assume operator can run
87+
# without schema
88+
continue
89+
90+
if current_output_schema != next_input_schema:
91+
return (
92+
f"Operator at idx {idx}: {type(operators[idx])} has invalid "
93+
f"output schema {current_output_schema} for next operator "
94+
f"{type(operators[idx + 1])} which requires {next_input_schema}"
95+
)
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# flake8: noqa
2+
3+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from .scheduler import *
18+
from .scheduler_group import *

0 commit comments

Comments
 (0)