diff --git a/Makefile b/Makefile index f27649f3..9efa1b25 100644 --- a/Makefile +++ b/Makefile @@ -54,9 +54,11 @@ format: cargo +nightly fmt --all lint: + pip install -e . isort --check --diff --project=mosec ${PY_SOURCE_FILES} black --check --diff ${PY_SOURCE_FILES} pylint -j 8 --recursive=y mosec + pylint -j 8 --recursive=y --disable=import-error examples --generated-members=numpy.*,torch.*,cv2.*,cv.* pydocstyle mosec @-rm mosec/_version.py mypy --install-types --non-interactive ${PY_SOURCE_FILES} diff --git a/examples/custom_env.py b/examples/custom_env.py index 54cebbd4..1d0433e0 100644 --- a/examples/custom_env.py +++ b/examples/custom_env.py @@ -11,12 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Custom Environment setup""" import logging import os from mosec import Server, Worker -from mosec.errors import ValidationError logger = logging.getLogger() logger.setLevel(logging.DEBUG) @@ -29,21 +29,23 @@ class Inference(Worker): + """Customisable inference class.""" + def __init__(self): super().__init__() # initialize your models here and allocate dedicated device to it device = os.environ["CUDA_VISIBLE_DEVICES"] - logger.info(f"Initializing model on device={device}") + logger.info("Initializing model on device=%s", device) def forward(self, _: dict) -> dict: device = os.environ["CUDA_VISIBLE_DEVICES"] # NOTE self.worker_id is 1-indexed - logger.info(f"Worker={self.worker_id} on device={device} is processing...") + logger.info("Worker=%d on device=%s is processing...", self.worker_id, device) return {"device": device} if __name__ == "__main__": - num_device = 4 + NUM_DEVICE = 4 def _get_cuda_device(cid: int) -> dict: return {"CUDA_VISIBLE_DEVICES": str(cid)} @@ -51,6 +53,6 @@ def _get_cuda_device(cid: int) -> dict: server = Server() server.append_worker( - Inference, num=num_device, env=[_get_cuda_device(x) for x in range(num_device)] + Inference, num=NUM_DEVICE, env=[_get_cuda_device(x) for x in range(NUM_DEVICE)] ) server.run() diff --git a/examples/distil_bert_server_pytorch.py b/examples/distil_bert_server_pytorch.py index 1fa1187c..bcc665db 100644 --- a/examples/distil_bert_server_pytorch.py +++ b/examples/distil_bert_server_pytorch.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Mosec with Pytorch Distil BERT.""" import logging from typing import List, TypeVar @@ -38,6 +39,8 @@ class Preprocess(Worker): + """Preprocess BERT on current setup.""" + def __init__(self): super().__init__() self.tokenizer = AutoTokenizer.from_pretrained( @@ -55,12 +58,14 @@ def forward(self, data: str) -> T: class Inference(Worker): + """Pytorch Inference class""" + def __init__(self): super().__init__() self.device = ( torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") ) - logger.info(f"using computing device: {self.device}") + logger.info("using computing device: %s", self.device) self.model = AutoModelForSequenceClassification.from_pretrained( "distilbert-base-uncased-finetuned-sst-2-english" ) diff --git a/examples/echo.py b/examples/echo.py index 108acb30..1c659b4a 100644 --- a/examples/echo.py +++ b/examples/echo.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Sample structures for using mosec server.""" import logging import time @@ -29,26 +30,32 @@ class Preprocess(Worker): + """Sample Class.""" + def forward(self, data: dict) -> float: - logger.debug(f"pre received {data}") + logger.debug("pre received %s", data) # Customized, simple input validation try: - time = float(data["time"]) + count_time = float(data["time"]) except KeyError as err: - raise ValidationError(f"cannot find key {err}") - return time + raise ValidationError(f"cannot find key {err}") from err + return count_time class Inference(Worker): + """Sample Class.""" + def forward(self, data: float) -> float: - logger.info(f"sleeping for {data} seconds") + logger.info("sleeping for %d seconds", data) time.sleep(data) return data class Postprocess(Worker): + """Sample Class.""" + def forward(self, data: float) -> dict: - logger.debug(f"post received {data}") + logger.debug("post received %f", data) return {"msg": f"sleep {data} seconds"} diff --git a/examples/plasma_shm_ipc.py b/examples/plasma_shm_ipc.py index cb6cb232..c38d2cc5 100644 --- a/examples/plasma_shm_ipc.py +++ b/examples/plasma_shm_ipc.py @@ -12,6 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Using Plasma store with mosec plugin PlasmaShmWrapper. + +We start a subprocess for the plasma server, and pass the path +to the plasma client which serves as the shm wrapper. +We also register the plasma server process as a daemon, so +that when it exits the service is able to gracefully shutdown +and restarted by the orchestrator. +""" + from functools import partial from pyarrow import plasma # type: ignore @@ -22,27 +31,25 @@ class DataProducer(Worker): + """Sample Data Producer.""" + def forward(self, data: dict) -> bytes: try: data_bytes = b"a" * int(data["size"]) except KeyError as err: - raise ValidationError(err) + raise ValidationError(err) from err return data_bytes class DataConsumer(Worker): + """Sample Data Consumer.""" + def forward(self, data: bytes) -> dict: return {"ipc test data length": len(data)} if __name__ == "__main__": - """ - We start a subprocess for the plasma server, and pass the path - to the plasma client which serves as the shm wrapper. - We also register the plasma server process as a daemon, so - that when it exits the service is able to gracefully shutdown - and restarted by the orchestrator. - """ + # 200 Mb store, adjust the size according to your requirement with plasma.start_plasma_store(plasma_store_memory=200 * 1000 * 1000) as ( shm_path, diff --git a/examples/python_side_metrics.py b/examples/python_side_metrics.py index 88ec123c..8fadb45c 100644 --- a/examples/python_side_metrics.py +++ b/examples/python_side_metrics.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Adding metrics service.""" import logging import os @@ -20,6 +21,14 @@ from typing import List from wsgiref.simple_server import make_server +from prometheus_client import ( # type: ignore + CONTENT_TYPE_LATEST, + CollectorRegistry, + Counter, + generate_latest, + multiprocess, +) + from mosec import Server, Worker from mosec.errors import ValidationError @@ -39,20 +48,15 @@ pathlib.Path(metric_dir_path).mkdir(parents=True, exist_ok=True) os.environ["PROMETHEUS_MULTIPROC_DIR"] = metric_dir_path -from prometheus_client import ( # type: ignore - CONTENT_TYPE_LATEST, - CollectorRegistry, - Counter, - generate_latest, - multiprocess, -) metric_registry = CollectorRegistry() multiprocess.MultiProcessCollector(metric_registry) counter = Counter("inference_result", "statistic of result", ("status", "worker_id")) -def metric_app(environ, start_response): +def metric_app(start_response): + """Sample Metric function.""" + data = generate_latest(metric_registry) start_response( "200 OK", @@ -62,11 +66,15 @@ def metric_app(environ, start_response): def metric_service(host="", port=8080): + """Sample metric service.""" + with make_server(host, port, metric_app) as httpd: httpd.serve_forever() class Inference(Worker): + """Sample Inference Worker.""" + def __init__(self): super().__init__() self.worker_id = str(self.worker_id) @@ -76,7 +84,7 @@ def deserialize(self, data: bytes) -> int: try: res = int(json_data.get("num")) except Exception as err: - raise ValidationError(err) + raise ValidationError(err) from err return res def forward(self, data: List[int]) -> List[bool]: diff --git a/examples/resnet50_client.py b/examples/resnet50_client.py index 0d8e140b..fe06e462 100644 --- a/examples/resnet50_client.py +++ b/examples/resnet50_client.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Sample Resnet client.""" import base64 diff --git a/examples/resnet50_server_pytorch.py b/examples/resnet50_server_pytorch.py index d77eb592..fd847f9d 100644 --- a/examples/resnet50_server_pytorch.py +++ b/examples/resnet50_server_pytorch.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Example: Sample Resnet server.""" import base64 import logging @@ -38,35 +39,39 @@ class Preprocess(Worker): - def forward(self, req: dict) -> np.ndarray: + """Sample Preprocess worker""" + + def forward(self, data: dict) -> np.ndarray: # Customized validation for input key and field content; raise # ValidationError so that the client can get 422 as http status try: - image = req["image"] - im = np.frombuffer(base64.b64decode(image), np.uint8) - im = cv2.imdecode(im, cv2.IMREAD_COLOR)[:, :, ::-1] # bgr -> rgb + image = data["image"] + img = np.frombuffer(base64.b64decode(image), np.uint8) + img = cv2.imdecode(img, cv2.IMREAD_COLOR)[:, :, ::-1] # bgr -> rgb except KeyError as err: - raise ValidationError(f"cannot find key {err}") + raise ValidationError(f"cannot find key {err}") from err except Exception as err: - raise ValidationError(f"cannot decode as image data: {err}") + raise ValidationError(f"cannot decode as image data: {err}") from err - im = cv2.resize(im, (256, 256)) - crop_im = ( - im[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255 + img = cv2.resize(img, (256, 256)) + crop_img = ( + img[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255 ) # center crop - crop_im -= [0.485, 0.456, 0.406] - crop_im /= [0.229, 0.224, 0.225] - crop_im = np.transpose(crop_im, (2, 0, 1)) - return crop_im + crop_img -= [0.485, 0.456, 0.406] + crop_img /= [0.229, 0.224, 0.225] + crop_img = np.transpose(crop_img, (2, 0, 1)) + return crop_img class Inference(Worker): + """Sample Inference worker""" + def __init__(self): super().__init__() self.device = ( torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") ) - logger.info(f"using computing device: {self.device}") + logger.info("using computing device: %s", self.device) self.model = torchvision.models.resnet50(pretrained=True) self.model.eval() self.model.to(self.device) @@ -77,7 +82,7 @@ def __init__(self): ] * INFERENCE_BATCH_SIZE def forward(self, data: List[np.ndarray]) -> List[int]: - logger.info(f"processing batch with size: {len(data)}") + logger.info("processing batch with size: %d", len(data)) with torch.no_grad(): batch = torch.stack([torch.tensor(arr, device=self.device) for arr in data]) output = self.model(batch) @@ -86,6 +91,8 @@ def forward(self, data: List[np.ndarray]) -> List[int]: class Postprocess(Worker): + """Sample Postprocess worker""" + def __init__(self): super().__init__() logger.info("loading categories file...") @@ -93,8 +100,8 @@ def __init__(self): "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt" ) - with open(local_filename) as f: - self.categories = list(map(lambda x: x.strip(), f.readlines())) + with open(local_filename, encoding="utf8") as file: + self.categories = list(map(lambda x: x.strip(), file.readlines())) def forward(self, data: int) -> dict: return {"category": self.categories[data]}