Skip to content

Commit

Permalink
Fix linting in examples folder (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkcache authored Jul 7, 2022
1 parent 8c62b6b commit 8440ebc
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ format:
cargo +nightly fmt --all

lint:
pip install -e .
isort --check --diff --project=mosec ${PY_SOURCE_FILES}
black --check --diff ${PY_SOURCE_FILES}
pylint -j 8 --recursive=y mosec
pylint -j 8 --recursive=y --disable=import-error examples --generated-members=numpy.*,torch.*,cv2.*,cv.*
pydocstyle mosec
@-rm mosec/_version.py
mypy --install-types --non-interactive ${PY_SOURCE_FILES}
Expand Down
12 changes: 7 additions & 5 deletions examples/custom_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Custom Environment setup"""

import logging
import os

from mosec import Server, Worker
from mosec.errors import ValidationError

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
Expand All @@ -29,28 +29,30 @@


class Inference(Worker):
"""Customisable inference class."""

def __init__(self):
super().__init__()
# initialize your models here and allocate dedicated device to it
device = os.environ["CUDA_VISIBLE_DEVICES"]
logger.info(f"Initializing model on device={device}")
logger.info("Initializing model on device=%s", device)

def forward(self, _: dict) -> dict:
device = os.environ["CUDA_VISIBLE_DEVICES"]
# NOTE self.worker_id is 1-indexed
logger.info(f"Worker={self.worker_id} on device={device} is processing...")
logger.info("Worker=%d on device=%s is processing...", self.worker_id, device)
return {"device": device}


if __name__ == "__main__":
num_device = 4
NUM_DEVICE = 4

def _get_cuda_device(cid: int) -> dict:
return {"CUDA_VISIBLE_DEVICES": str(cid)}

server = Server()

server.append_worker(
Inference, num=num_device, env=[_get_cuda_device(x) for x in range(num_device)]
Inference, num=NUM_DEVICE, env=[_get_cuda_device(x) for x in range(NUM_DEVICE)]
)
server.run()
7 changes: 6 additions & 1 deletion examples/distil_bert_server_pytorch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Mosec with Pytorch Distil BERT."""

import logging
from typing import List, TypeVar
Expand Down Expand Up @@ -38,6 +39,8 @@


class Preprocess(Worker):
"""Preprocess BERT on current setup."""

def __init__(self):
super().__init__()
self.tokenizer = AutoTokenizer.from_pretrained(
Expand All @@ -55,12 +58,14 @@ def forward(self, data: str) -> T:


class Inference(Worker):
"""Pytorch Inference class"""

def __init__(self):
super().__init__()
self.device = (
torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
)
logger.info(f"using computing device: {self.device}")
logger.info("using computing device: %s", self.device)
self.model = AutoModelForSequenceClassification.from_pretrained(
"distilbert-base-uncased-finetuned-sst-2-english"
)
Expand Down
19 changes: 13 additions & 6 deletions examples/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Sample structures for using mosec server."""

import logging
import time
Expand All @@ -29,26 +30,32 @@


class Preprocess(Worker):
"""Sample Class."""

def forward(self, data: dict) -> float:
logger.debug(f"pre received {data}")
logger.debug("pre received %s", data)
# Customized, simple input validation
try:
time = float(data["time"])
count_time = float(data["time"])
except KeyError as err:
raise ValidationError(f"cannot find key {err}")
return time
raise ValidationError(f"cannot find key {err}") from err
return count_time


class Inference(Worker):
"""Sample Class."""

def forward(self, data: float) -> float:
logger.info(f"sleeping for {data} seconds")
logger.info("sleeping for %d seconds", data)
time.sleep(data)
return data


class Postprocess(Worker):
"""Sample Class."""

def forward(self, data: float) -> dict:
logger.debug(f"post received {data}")
logger.debug("post received %f", data)
return {"msg": f"sleep {data} seconds"}


Expand Down
23 changes: 15 additions & 8 deletions examples/plasma_shm_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example: Using Plasma store with mosec plugin PlasmaShmWrapper.
We start a subprocess for the plasma server, and pass the path
to the plasma client which serves as the shm wrapper.
We also register the plasma server process as a daemon, so
that when it exits the service is able to gracefully shutdown
and restarted by the orchestrator.
"""

from functools import partial

from pyarrow import plasma # type: ignore
Expand All @@ -22,27 +31,25 @@


class DataProducer(Worker):
"""Sample Data Producer."""

def forward(self, data: dict) -> bytes:
try:
data_bytes = b"a" * int(data["size"])
except KeyError as err:
raise ValidationError(err)
raise ValidationError(err) from err
return data_bytes


class DataConsumer(Worker):
"""Sample Data Consumer."""

def forward(self, data: bytes) -> dict:
return {"ipc test data length": len(data)}


if __name__ == "__main__":
"""
We start a subprocess for the plasma server, and pass the path
to the plasma client which serves as the shm wrapper.
We also register the plasma server process as a daemon, so
that when it exits the service is able to gracefully shutdown
and restarted by the orchestrator.
"""

# 200 Mb store, adjust the size according to your requirement
with plasma.start_plasma_store(plasma_store_memory=200 * 1000 * 1000) as (
shm_path,
Expand Down
26 changes: 17 additions & 9 deletions examples/python_side_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Adding metrics service."""

import logging
import os
Expand All @@ -20,6 +21,14 @@
from typing import List
from wsgiref.simple_server import make_server

from prometheus_client import ( # type: ignore
CONTENT_TYPE_LATEST,
CollectorRegistry,
Counter,
generate_latest,
multiprocess,
)

from mosec import Server, Worker
from mosec.errors import ValidationError

Expand All @@ -39,20 +48,15 @@
pathlib.Path(metric_dir_path).mkdir(parents=True, exist_ok=True)
os.environ["PROMETHEUS_MULTIPROC_DIR"] = metric_dir_path

from prometheus_client import ( # type: ignore
CONTENT_TYPE_LATEST,
CollectorRegistry,
Counter,
generate_latest,
multiprocess,
)

metric_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(metric_registry)
counter = Counter("inference_result", "statistic of result", ("status", "worker_id"))


def metric_app(environ, start_response):
def metric_app(start_response):
"""Sample Metric function."""

data = generate_latest(metric_registry)
start_response(
"200 OK",
Expand All @@ -62,11 +66,15 @@ def metric_app(environ, start_response):


def metric_service(host="", port=8080):
"""Sample metric service."""

with make_server(host, port, metric_app) as httpd:
httpd.serve_forever()


class Inference(Worker):
"""Sample Inference Worker."""

def __init__(self):
super().__init__()
self.worker_id = str(self.worker_id)
Expand All @@ -76,7 +84,7 @@ def deserialize(self, data: bytes) -> int:
try:
res = int(json_data.get("num"))
except Exception as err:
raise ValidationError(err)
raise ValidationError(err) from err
return res

def forward(self, data: List[int]) -> List[bool]:
Expand Down
1 change: 1 addition & 0 deletions examples/resnet50_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Sample Resnet client."""

import base64

Expand Down
41 changes: 24 additions & 17 deletions examples/resnet50_server_pytorch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Sample Resnet server."""

import base64
import logging
Expand Down Expand Up @@ -38,35 +39,39 @@


class Preprocess(Worker):
def forward(self, req: dict) -> np.ndarray:
"""Sample Preprocess worker"""

def forward(self, data: dict) -> np.ndarray:
# Customized validation for input key and field content; raise
# ValidationError so that the client can get 422 as http status
try:
image = req["image"]
im = np.frombuffer(base64.b64decode(image), np.uint8)
im = cv2.imdecode(im, cv2.IMREAD_COLOR)[:, :, ::-1] # bgr -> rgb
image = data["image"]
img = np.frombuffer(base64.b64decode(image), np.uint8)
img = cv2.imdecode(img, cv2.IMREAD_COLOR)[:, :, ::-1] # bgr -> rgb
except KeyError as err:
raise ValidationError(f"cannot find key {err}")
raise ValidationError(f"cannot find key {err}") from err
except Exception as err:
raise ValidationError(f"cannot decode as image data: {err}")
raise ValidationError(f"cannot decode as image data: {err}") from err

im = cv2.resize(im, (256, 256))
crop_im = (
im[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255
img = cv2.resize(img, (256, 256))
crop_img = (
img[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255
) # center crop
crop_im -= [0.485, 0.456, 0.406]
crop_im /= [0.229, 0.224, 0.225]
crop_im = np.transpose(crop_im, (2, 0, 1))
return crop_im
crop_img -= [0.485, 0.456, 0.406]
crop_img /= [0.229, 0.224, 0.225]
crop_img = np.transpose(crop_img, (2, 0, 1))
return crop_img


class Inference(Worker):
"""Sample Inference worker"""

def __init__(self):
super().__init__()
self.device = (
torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
)
logger.info(f"using computing device: {self.device}")
logger.info("using computing device: %s", self.device)
self.model = torchvision.models.resnet50(pretrained=True)
self.model.eval()
self.model.to(self.device)
Expand All @@ -77,7 +82,7 @@ def __init__(self):
] * INFERENCE_BATCH_SIZE

def forward(self, data: List[np.ndarray]) -> List[int]:
logger.info(f"processing batch with size: {len(data)}")
logger.info("processing batch with size: %d", len(data))
with torch.no_grad():
batch = torch.stack([torch.tensor(arr, device=self.device) for arr in data])
output = self.model(batch)
Expand All @@ -86,15 +91,17 @@ def forward(self, data: List[np.ndarray]) -> List[int]:


class Postprocess(Worker):
"""Sample Postprocess worker"""

def __init__(self):
super().__init__()
logger.info("loading categories file...")
local_filename, _ = urlretrieve(
"https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
)

with open(local_filename) as f:
self.categories = list(map(lambda x: x.strip(), f.readlines()))
with open(local_filename, encoding="utf8") as file:
self.categories = list(map(lambda x: x.strip(), file.readlines()))

def forward(self, data: int) -> dict:
return {"category": self.categories[data]}
Expand Down

0 comments on commit 8440ebc

Please # to comment.