Skip to content

Commit

Permalink
Hookup new Cipher type with auto-detect
Browse files Browse the repository at this point in the history
Tests do not pass yet though...
  • Loading branch information
cmroche committed Jul 7, 2024
1 parent 94b4412 commit 0319337
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 55 deletions.
20 changes: 20 additions & 0 deletions .idea/runConfigurations/pytest_in__.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 44 additions & 18 deletions greeclimate/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import re
from asyncio import AbstractEventLoop
from enum import IntEnum, unique
from typing import Union

from greeclimate.cipher import CipherV1
from greeclimate.cipher import CipherV1, CipherV2, CipherBase
from greeclimate.deviceinfo import DeviceInfo
from greeclimate.network import DeviceProtocol2
from greeclimate.exceptions import DeviceNotBoundError, DeviceTimeoutError
from greeclimate.network import DeviceProtocol2
from greeclimate.taskable import Taskable


Expand Down Expand Up @@ -43,6 +44,12 @@ class Props(enum.Enum):
UNKNOWN_HEATCOOLTYPE = "HeatCoolType"


GENERIC_CIPHERS_KEYS = {
CipherV1: b'a3K8Bx%2r8Y7#xDh',
CipherV2: b'{yxAHAY_Lm6pbC/<'
}


@unique
class TemperatureUnits(IntEnum):
C = 0
Expand Down Expand Up @@ -154,6 +161,13 @@ class Device(DeviceProtocol2, Taskable):
water_full: A bool to indicate the water tank is full
"""

""" Device properties """
hid = None
version = None
check_version = True
_properties = {}
_dirty = []

def __init__(self, device_info: DeviceInfo, timeout: int = 120, loop: AbstractEventLoop = None):
"""Initialize the device object
Expand All @@ -165,17 +179,9 @@ def __init__(self, device_info: DeviceInfo, timeout: int = 120, loop: AbstractEv
DeviceProtocol2.__init__(self, timeout)
Taskable.__init__(self, loop)
self._logger = logging.getLogger(__name__)

self.device_info: DeviceInfo = device_info

""" Device properties """
self.hid = None
self.version = None
self.check_version = True
self._properties = {}
self._dirty = []

async def bind(self, key=None):
async def bind(self, key: str = None, cipher_type: Union[type[Union[CipherV1, CipherV2]], None] = None):
"""Run the binding procedure.
Binding is a finicky procedure, and happens in 1 of 2 ways:
Expand All @@ -186,14 +192,18 @@ async def bind(self, key=None):
Both approaches result in a device_key which is used as like a persistent session id.
Args:
cipher_type (type): The cipher type to use for encryption, if None will attempt to detect the correct one
key (str): The device key, when provided binding is a NOOP, if None binding will
attempt to negotiate the key with the device.
attempt to negotiate the key with the device. cipher_type must be provided.
Raises:
DeviceNotBoundError: If binding was unsuccessful and no key returned
DeviceTimeoutError: The device didn't respond
"""

if key and not cipher_type:
raise ValueError("cipher_type must be provided when key is provided")

if not self.device_info:
raise DeviceNotBoundError

Expand All @@ -206,12 +216,18 @@ async def bind(self, key=None):

try:
if key:
self.device_cipher = CipherV1(key.encode())
self.device_cipher = cipher_type(key.encode())
else:
await self.send(self.create_bind_message(self.device_info))
# Special case, wait for binding to complete so we know that the device is ready
task = asyncio.create_task(self.ready.wait())
await asyncio.wait_for(task, timeout=self._timeout)
if cipher_type is not None:
await self.__bind_internal(cipher_type)
else:
""" Try binding with CipherV1 first, if that fails try CipherV2"""
try:
self._logger.info("Attempting to bind to device using CipherV1")
await self.__bind_internal(CipherV1)
except asyncio.TimeoutError:
self._logger.info("Attempting to bind to device using CipherV2")
await self.__bind_internal(CipherV2)

except asyncio.TimeoutError:
raise DeviceTimeoutError
Expand All @@ -221,9 +237,19 @@ async def bind(self, key=None):
else:
self._logger.info("Bound to device using key %s", self.device_cipher.key)

async def __bind_internal(self, cipher_type: type[Union[CipherV1, CipherV2]]):
"""Internal binding procedure, do not call directly"""
default_key = GENERIC_CIPHERS_KEYS.get(cipher_type)
await self.send(self.create_bind_message(self.device_info), cipher=cipher_type(default_key))
task = asyncio.create_task(self.ready.wait())
await asyncio.wait_for(task, timeout=self._timeout)

def handle_device_bound(self, key: str) -> None:
"""Handle the device bound message from the device"""
self.device_cipher = CipherV1(key.encode())
cipher_type = type(self.device_cipher)
if not issubclass(cipher_type, CipherBase):
raise ValueError(f"Invalid cipher type {cipher_type}")
self.device_cipher = cipher_type(key.encode())

async def request_version(self) -> None:
"""Request the firmware version from the device."""
Expand Down
50 changes: 33 additions & 17 deletions greeclimate/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
from enum import Enum
from typing import Any, Dict, Tuple, Union

from greeclimate.cipher import CipherBase, CipherV1
from greeclimate.cipher import CipherBase
from greeclimate.deviceinfo import DeviceInfo

NETWORK_TIMEOUT = 10
GENERIC_CIPHERS_KEYS = [
b'a3K8Bx%2r8Y7#xDh',
b'{yxAHAY_Lm6pbC/<'
]

_LOGGER = logging.getLogger(__name__)

IPAddr = Tuple[str, int]
Expand Down Expand Up @@ -43,6 +38,9 @@ class IPInterface:
class DeviceProtocolBase2(asyncio.DatagramProtocol):
"""Event driven device protocol class."""

_transport: Union[asyncio.transports.DatagramTransport, None] = None
_cipher: Union[CipherBase, None] = None

def __init__(self, timeout: int = 10, drained: asyncio.Event = None) -> None:
"""Initialize the device protocol object.
Expand All @@ -53,8 +51,6 @@ def __init__(self, timeout: int = 10, drained: asyncio.Event = None) -> None:
self._timeout: int = timeout
self._drained: asyncio.Event = drained or asyncio.Event()
self._drained.set()
self._transport: Union[asyncio.transports.DatagramTransport, None] = None
self._cipher: Union[CipherBase, None] = None

# This event need to be implemented to handle incoming requests
def packet_received(self, obj, addr: IPAddr) -> None:
Expand All @@ -76,6 +72,20 @@ def device_cipher(self, value: CipherBase):
"""Gets the encryption key used for device data."""
self._cipher = value

@property
def device_key(self) -> str:
"""Gets the encryption key used for device data."""
if self._cipher is None:
raise ValueError("Cipher object not set")
return self._cipher.key

@device_key.setter
def device_key(self, value: str):
"""Sets the encryption key used for device data."""
if self._cipher is None:
raise ValueError("Cipher object not set")
self._cipher.key = value

def close(self) -> None:
"""Close the UDP transport."""
try:
Expand Down Expand Up @@ -121,22 +131,28 @@ def datagram_received(self, data: bytes, addr: IPAddr) -> None:

obj = json.loads(data)

# It could be either a v1 or v2 key
cipher = CipherV1(GENERIC_CIPHERS_KEYS[0]) if obj.get("i") == 1 else self._cipher

if obj.get("pack"):
obj["pack"] = cipher.decrypt(obj["pack"])
obj["pack"] = self._cipher.decrypt(obj["pack"])

_LOGGER.debug("Received packet from %s:\n<- %s", addr[0], json.dumps(obj))
self.packet_received(obj, addr)

async def send(self, obj, addr: IPAddr = None) -> None:
"""Send encode and send JSON command to the device."""
async def send(self, obj, addr: IPAddr = None, cipher: Union[CipherBase, None] = None) -> None:
"""Send encode and send JSON command to the device.
Args:
addr (object): (Optional) Address to send the message
cipher (object): (Optional) Initial cipher to use for SCANNING and BINDING
"""
_LOGGER.debug("Sending packet:\n-> %s", json.dumps(obj))

if obj.get("pack"):
cipher = CipherV1(GENERIC_CIPHERS_KEYS[0]) if obj.get("i") == 1 else self._cipher
obj["pack"], tag = cipher.encrypt(obj["pack"])
if obj.get("i") == 1:
if cipher is None:
raise ValueError("Cipher must be supplied for SCAN or BIND messages")
self._cipher = cipher

obj["pack"], tag = self._cipher.encrypt(obj["pack"])
if tag:
obj["tag"] = tag

Expand All @@ -161,6 +177,7 @@ def connection_made(self, transport: asyncio.transports.DatagramTransport) -> No

class DeviceProtocol2(DeviceProtocolBase2):
"""Protocol handler for direct device communication."""
_handlers = {}

def __init__(self, timeout: int = 10, drained: asyncio.Event = None) -> None:
"""Initialize the device protocol object.
Expand All @@ -172,7 +189,6 @@ def __init__(self, timeout: int = 10, drained: asyncio.Event = None) -> None:
DeviceProtocolBase2.__init__(self, timeout, drained)
self._ready = asyncio.Event()
self._ready.clear()
self._handlers = {}

@property
def ready(self) -> asyncio.Event:
Expand Down
4 changes: 2 additions & 2 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from unittest.mock import Mock

from greeclimate.cipher import CipherV1, CipherBase
from greeclimate.network import GENERIC_CIPHERS_KEYS
from greeclimate.device import GENERIC_CIPHERS_KEYS

DEFAULT_TIMEOUT = 1
DISCOVERY_REQUEST = {"t": "scan"}
Expand Down Expand Up @@ -96,7 +96,7 @@ def get_mock_device_info():
def encrypt_payload(data):
"""Encrypt the payload of responses quickly."""
d = data.copy()
cipher = CipherV1(GENERIC_CIPHERS_KEYS[0])
cipher = CipherV1(GENERIC_CIPHERS_KEYS[CipherV1])
d["pack"], _ = cipher.encrypt(d["pack"])
return d

Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def netifaces_fixture():
@pytest.fixture(name="cipher")
def cipher_fixture():
"""Patch the cipher object."""
with patch("greeclimate.network.CipherV1") as mock1, patch("greeclimate.cipher.CipherV2") as mock2:
with patch("greeclimate.device.CipherV1") as mock1, patch("greeclimate.device.CipherV2") as mock2:
mock1.return_value = FakeCipher(b"1234567890123456")
mock2.return_value = FakeCipher(b"1234567890123456")
yield
Loading

0 comments on commit 0319337

Please # to comment.