Skip to content

Bump mypy version #7349

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ repos:
- id: isort
language_version: python3
- repo: https://github.com/asottile/pyupgrade
rev: v2.38.2
rev: v3.2.2
hooks:
- id: pyupgrade
args:
- --py38-plus
- repo: https://github.com/psf/black
rev: 22.8.0
rev: 22.10.0
hooks:
- id: black
language_version: python3
Expand All @@ -35,13 +35,13 @@ repos:
# dependency. Make sure to update flake8-bugbear manually on a regular basis.
- flake8-bugbear==22.9.23
- repo: https://github.com/codespell-project/codespell
rev: v2.1.0
rev: v2.2.2
hooks:
- id: codespell
types_or: [rst, markdown]
files: docs
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.982
rev: v0.991
hooks:
- id: mypy
# Override default --ignore-missing-imports
Expand Down
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def set_thread_ident():

self.io_loop.add_callback(set_thread_ident)
self._startup_lock = asyncio.Lock()
self.__startup_exc: Exception | None = None
self.__startup_exc = None

self.rpc = ConnectionPool(
limit=connection_limit,
Expand Down
2 changes: 1 addition & 1 deletion distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2222,7 +2222,7 @@ def task_stream_figure(clear_interval="20s", **kwargs):
PanTool(dimensions="width"),
WheelZoomTool(dimensions="width"),
)
if ExportTool:
if ExportTool: # type: ignore
export = ExportTool()
export.register_plot(root)
root.add_tools(export)
Expand Down
2 changes: 1 addition & 1 deletion distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
def template_variables():
from distributed.diagnostics.nvml import device_get_count

template_variables: dict = {
template_variables = {
"pages": [
"status",
"workers",
Expand Down
2 changes: 1 addition & 1 deletion distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def _get_plugin_name(plugin: SchedulerPlugin | WorkerPlugin | NannyPlugin) -> st

"""
if hasattr(plugin, "name"):
return plugin.name # type: ignore
return plugin.name
else:
return funcname(type(plugin)) + "-" + str(uuid.uuid4())

Expand Down
14 changes: 9 additions & 5 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from collections import defaultdict
from timeit import default_timer
from typing import ClassVar

from tlz import groupby, valmap

Expand Down Expand Up @@ -298,7 +299,10 @@ def restart(self, scheduler):
class GroupTiming(SchedulerPlugin):
"""Keep track of high-level timing information for task group progress"""

name = "group-timing"
name: ClassVar[str] = "group-timing"
time: list[float]
compute: dict[str, list[float]]
nthreads: list[float]

def __init__(self, scheduler):
self.scheduler = scheduler
Expand All @@ -309,17 +313,17 @@ def __init__(self, scheduler):
# Initialize our data structures.
self._init()

def _init(self):
def _init(self) -> None:
"""Shared initializatoin code between __init__ and restart"""
now = time()

# Timestamps for tracking compute durations by task group.
# Start with length 2 so that we always can compute a valid dt later.
self.time: list[float] = [now] * 2
self.time = [now] * 2
# The amount of compute since the last timestamp
self.compute: dict[str, list[float]] = {}
self.compute = {}
# The number of threads at the time
self.nthreads: list[float] = [self.scheduler.total_nthreads] * 2
self.nthreads = [self.scheduler.total_nthreads] * 2

def transition(self, key, start, finish, *args, **kwargs):
# We are mostly interested in when tasks complete for now, so just look
Expand Down
8 changes: 7 additions & 1 deletion distributed/http/scheduler/prometheus/stealing.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING

from distributed.http.prometheus import PrometheusCollector
from distributed.stealing import WorkStealing

if TYPE_CHECKING:
from prometheus_client.core import CounterMetricFamily


class WorkStealingMetricCollector(PrometheusCollector):
def __init__(self, server):
super().__init__(server)
self.subsystem = "stealing"

def collect(self):
def collect(self) -> Iterator[CounterMetricFamily]:
from prometheus_client.core import CounterMetricFamily

try:
Expand Down
2 changes: 1 addition & 1 deletion distributed/protocol/tests/test_to_pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def test_non_msgpack_serializable_layer(c, s, a, b):
with dask.config.set({"distributed.scheduler.allowed-imports": "test_to_pickle"}):
a = NonMsgPackSerializableLayer({"x": 42})
layers = {"a": a}
dependencies: dict[str, set] = {"a": set()}
dependencies = {"a": set()}
hg = HighLevelGraph(layers, dependencies)
res = await c.get(hg, "x", sync=False)
assert res == 42
3 changes: 3 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@
from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.variable import VariableExtension

# FIXME improve annotations. See also special treatment in setup.cfg.
# mypy: disable-error-code=annotation-unchecked

if TYPE_CHECKING:
# TODO import from typing (requires Python >=3.10)
from typing_extensions import TypeAlias
Expand Down
3 changes: 1 addition & 2 deletions distributed/shuffle/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from dask.blockwise import Blockwise
from dask.utils_test import hlg_layer_topological

from distributed.shuffle._shuffle_extension import ShuffleWorkerExtension
from distributed.utils_test import gen_cluster


Expand All @@ -35,7 +34,7 @@ async def test_basic_state(c, s, *workers):
df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
shuffled = df.shuffle("id", shuffle="p2p")

exts: list[ShuffleWorkerExtension] = [w.extensions["shuffle"] for w in workers]
exts = [w.extensions["shuffle"] for w in workers]
for ext in exts:
assert not ext.shuffles

Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(c, s, a, b

t = asyncio.create_task(c.retire_workers([a.address]))

amm: ActiveMemoryManagerExtension = s.extensions["amm"]
amm = s.extensions["amm"]
while not amm.policies:
await asyncio.sleep(0)
policy = next(iter(amm.policies))
Expand Down
9 changes: 4 additions & 5 deletions distributed/tests/test_multi_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from distributed import MultiLock, get_client
from distributed.metrics import time
from distributed.multi_lock import MultiLockExtension
from distributed.utils_test import gen_cluster


Expand All @@ -26,7 +25,7 @@ def f(_):

futures = c.map(f, range(20))
await c.gather(futures)
ext: MultiLockExtension = s.extensions["multi_locks"]
ext = s.extensions["multi_locks"]
assert not ext.events
assert not ext.requests
assert not ext.requests_left
Expand All @@ -35,7 +34,7 @@ def f(_):

@gen_cluster(client=True)
async def test_timeout(c, s, a, b):
ext: MultiLockExtension = s.extensions["multi_locks"]
ext = s.extensions["multi_locks"]
lock1 = MultiLock(names=["x"])
result = await lock1.acquire()
assert result is True
Expand Down Expand Up @@ -80,7 +79,7 @@ async def test_timeout_wake_waiter(s, a, b):

@gen_cluster(client=True)
async def test_multiple_locks(c, s, a, b):
ext: MultiLockExtension = s.extensions["multi_locks"]
ext = s.extensions["multi_locks"]
l1 = MultiLock(names=["l1"])
l2 = MultiLock(names=["l2"])
l3 = MultiLock(names=["l1", "l2"])
Expand Down Expand Up @@ -137,7 +136,7 @@ async def test_multiple_locks(c, s, a, b):

@gen_cluster(client=True)
async def test_num_locks(c, s, a, b):
ext: MultiLockExtension = s.extensions["multi_locks"]
ext = s.extensions["multi_locks"]
l1 = MultiLock(names=["l1", "l2", "l3"])
l2 = MultiLock(names=["l1", "l2", "l3"])
l3 = MultiLock(names=["l1", "l2", "l3", "l4"])
Expand Down
12 changes: 6 additions & 6 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from itertools import product
from textwrap import dedent
from time import sleep
from typing import ClassVar, Collection
from typing import Collection

import cloudpickle
import psutil
Expand Down Expand Up @@ -343,11 +343,11 @@ async def test_graph_execution_width(c, s, *workers):
class Refcount:
"Track how many instances of this class exist; logs the count at creation and deletion"

count: ClassVar[int] = 0
lock: ClassVar[dask.utils.SerializableLock] = dask.utils.SerializableLock()
log: ClassVar[list[int]] = []
count = 0
lock = dask.utils.SerializableLock()
log = []

def __init__(self) -> None:
def __init__(self):
with self.lock:
type(self).count += 1
self.log.append(self.count)
Expand Down Expand Up @@ -2511,7 +2511,7 @@ class NoSchedulerDelayWorker(Worker):
comparisons using times reported from workers.
"""

@property # type: ignore
@property
def scheduler_delay(self):
return 0

Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3645,8 +3645,8 @@ async def test_reconnect_argument_deprecated(s):
@gen_cluster(client=True, nthreads=[])
async def test_worker_running_before_running_plugins(c, s, caplog):
class InitWorkerNewThread(WorkerPlugin):
name: str = "init_worker_new_thread"
setup_status: Status | None = None
name = "init_worker_new_thread"
setup_status = None

def setup(self, worker):
self.setup_status = worker.status
Expand Down
21 changes: 9 additions & 12 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,28 +993,25 @@ def ensure_bytes(s):
return _ensure_bytes(s)


def ensure_memoryview(obj):
def ensure_memoryview(obj: bytes | bytearray | memoryview | PickleBuffer) -> memoryview:
"""Ensure `obj` is a 1-D contiguous `uint8` `memoryview`"""
mv: memoryview
if type(obj) is memoryview:
mv = obj
else:
mv = memoryview(obj)
if not isinstance(obj, memoryview):
obj = memoryview(obj)

if not mv.nbytes:
if not obj.nbytes:
# Drop `obj` reference to permit freeing underlying data
return memoryview(bytearray())
elif not mv.contiguous:
elif not obj.contiguous:
# Copy to contiguous form of expected shape & type
return memoryview(bytearray(mv))
elif mv.ndim != 1 or mv.format != "B":
return memoryview(bytearray(obj))
elif obj.ndim != 1 or obj.format != "B":
# Perform zero-copy reshape & cast
# Use `PickleBuffer.raw()` as `memoryview.cast()` fails with F-order
# xref: https://github.com/python/cpython/issues/91484
return PickleBuffer(mv).raw()
return PickleBuffer(obj).raw()
else:
# Return `memoryview` as it already meets requirements
return mv
return obj


def open_port(host: str = "") -> int:
Expand Down
4 changes: 1 addition & 3 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1628,9 +1628,7 @@ def check_thread_leak():

frames = sys._current_frames()
try:
lines: list[str] = [
f"{len(bad_threads)} thread(s) were leaked from test\n"
]
lines = [f"{len(bad_threads)} thread(s) were leaked from test\n"]
for i, thread in enumerate(bad_threads, 1):
lines.append(
f"------ Call stack of leaked thread {i}/{len(bad_threads)}: {thread} ------"
Expand Down
4 changes: 2 additions & 2 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ def handle_stimulus(self, *stims: StateMachineEvent) -> None:
super().handle_stimulus(*stims)
except Exception as e:
if hasattr(e, "to_event"):
topic, msg = e.to_event() # type: ignore
topic, msg = e.to_event()
self.log_event(topic, msg)
raise

Expand Down Expand Up @@ -2586,7 +2586,7 @@ def validate_state(self) -> None:
pdb.set_trace()

if hasattr(e, "to_event"):
topic, msg = e.to_event() # type: ignore
topic, msg = e.to_event()
self.log_event(topic, msg)

raise
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ warn_unused_ignores = true
warn_unreachable = true

# FIXME must clean these modules up
# Also look for 'mypy: disable-error-code' at the top of modules
[mypy-distributed.client]
allow_incomplete_defs = true
[mypy-distributed.scheduler]
Expand Down