"""Switcher water heater unofficial API and bridge, Network protocols.

.. codeauthor:: Tomer Figenblat <>


# fmt: off
from asyncio import (
    AbstractEventLoop, BaseTransport, DatagramProtocol, Event, Future, Queue,
    QueueEmpty, QueueFull, Transport, ensure_future)
from datetime import datetime
from functools import partial
from typing import Optional, Tuple, Union, cast

from .bridge.messages import SwitcherV2BroadcastMSG
from .devices import SwitcherV2Device

# fmt: on

[docs]class SwitcherV2UdpProtocolFactory(DatagramProtocol): """Represntation of the Asyncio UDP protocol factory. Args: loop: the event loop for the factory to run in. phone_id: the phone id of the desired device. device_id: the id of the desired device. device_password: the password of the desired device. queue: a ``asyncio.Queue`` for the factory to save messages in. run_factory_evt ``asyncio.Event`` for signaling the factory to run. Todo: * replace ``factory_future`` attribute with ``get_factory_future`` method. """ def __init__( self, loop: AbstractEventLoop, phone_id: str, device_id: str, device_password: str, queue: Queue, run_factory_evt: Event, ) -> None: """Initialize the protocol.""" self._loop = loop self._phone_id = phone_id self._device_id = device_id self._device_password = device_password self._queue = queue self._run_factory = run_factory_evt self._factory_future = self._loop.create_future() self._factory_future.add_done_callback(self.close_transport) self._accept_datagrams = Event() self.transport = None # type: Optional[Transport] self._device = None # type: Optional[SwitcherV2Device]
[docs] def connection_made(self, transport: BaseTransport) -> None: """Call on connection established.""" self.transport = cast(Transport, transport) self._accept_datagrams.set()
[docs] def datagram_received(self, data: Union[bytes, str], addr: Tuple) -> None: """Call on datagram recieved.""" if self._run_factory.is_set() and self._accept_datagrams.is_set(): ensure_future( self.handle_incoming_messages(data, addr), loop=self._loop )
[docs] def error_received(self, exc: Optional[Exception]) -> None: """Call on exception recieved.""" if exc: self.factory_future.set_exception(exc) else: self.factory_future.set_result(None)
[docs] def connection_lost(self, exc: Optional[Exception]) -> None: """Call on connection lost.""" self.factory_future.set_result(exc if exc else None)
[docs] def close_transport(self, future: Future) -> None: """Call for closing the transport.""" if self.transport: self.transport.close()
[docs] async def handle_incoming_messages( self, data: Union[bytes, str], addr: Tuple ) -> None: """Use for Handling incoming messages.""" self._accept_datagrams.clear() msg = SwitcherV2BroadcastMSG(self._loop, data) msg.init_future.add_done_callback( partial(self.get_device_from_message, addr[0]) ) return None
[docs] def get_device_from_message(self, ip_addr: str, future: Future) -> None: """Use for extracting the device from the broadcast message.""" msg = future.result() if msg.verified: if self._device_id == msg.device_id: if self._device: # Update known device self._device.update_device_data( ip_addr,, msg.device_state, msg.remaining_time_to_off, msg.auto_off_set, msg.power, msg.current, ( self._device.last_state_change if msg.device_state == self._device.state else ), ) else: # New device discoverd self._device = SwitcherV2Device( msg.device_id, ip_addr, msg.mac_address,, msg.device_state, msg.remaining_time_to_off, msg.auto_off_set, msg.power, msg.current, self._phone_id.lower(), self._device_password.lower(),, ) try: self._queue.put_nowait(self._device) except QueueFull: try: self._queue.get_nowait() except QueueEmpty: pass ensure_future( self._queue.put(self._device), loop=self._loop ) self._accept_datagrams.set()
@property def factory_future(self) -> Future: """asyncio.Future: Representing the initialization status.""" return self._factory_future