Source code for aioswitcher.tools

"""Switcher water heater unofficial API and bridge, Tools and Helpers.

.. codeauthor:: Tomer Figenblat <tomer.figenblat@gmail.com>

"""

# fmt: off
from asyncio import AbstractEventLoop
from binascii import crc_hqx, hexlify, unhexlify
from datetime import time as datetime_time
from datetime import timedelta
from struct import pack
from time import localtime, mktime, strftime, strptime
from time import time as time_time
from typing import List

from .consts import (ENCODING_CODEC, HANDLED_EXCEPTIONS, HEX_TO_DAY_DICT,
                     REMOTE_KEY, STRUCT_PACKING_FORMAT)
from .errors import CalculationError, DecodingError, EncodingError

# fmt: on


[docs]def _convert_seconds_to_iso_time(all_seconds: int) -> str: """Convert seconds to iso time. Args: all_seconds: the total number of seconds to convert. Return: A string represnting the converted iso time in %H:%M:%S format. e.g. "02:24:37". Raises: aioswitcher.erros.CalculationError: when failed to convert the argument. Note: This is a private function containing blocking code. Please consider using ``convert_seconds_to_iso_time`` (without the `_`), to schedule as a task in the event loop. """ try: minutes, seconds = divmod(int(all_seconds), 60) hours, minutes = divmod(minutes, 60) return datetime_time( hour=hours, minute=minutes, second=seconds ).isoformat() except HANDLED_EXCEPTIONS as ex: raise CalculationError("failed to convert seconds to iso time") from ex
[docs]async def convert_seconds_to_iso_time( loop: AbstractEventLoop, all_seconds: int ) -> str: """Asynchronous wrapper for _convert_seconds_to_iso_time. Use as async wrapper for calling _convert_seconds_to_iso_time, calculating the next runtime of the schedule. Args: loop: the event loop to execute the function in. all_seconds: the total number of seconds to convert. Returns: A string represnting the converted iso time in %H:%M:%S format. e.g. "02:24:37". """ return await loop.run_in_executor( None, _convert_seconds_to_iso_time, all_seconds )
[docs]def _crc_sign_full_packet_com_key(data: str) -> str: """Calculate the crc for packets before send. Args: data: packet data to sign. Return: The calculated and signed packet data. Raises: aioswitcher.erros.EncodingError: when failed to sign the packet. Note: This is a private function containing blocking code. Please consider using ``crc_sign_full_packet_com_key`` (without the `_`), to schedule as a task in the event loop. """ try: crc = hexlify(pack(">I", crc_hqx(unhexlify(data), 0x1021))).decode( ENCODING_CODEC ) data = data + crc[6:8] + crc[4:6] crc = ( crc[6:8] + crc[4:6] + (hexlify(REMOTE_KEY)).decode(ENCODING_CODEC) ) crc = hexlify(pack(">I", crc_hqx(unhexlify(crc), 0x1021))).decode( ENCODING_CODEC ) data = data + crc[6:8] + crc[4:6] return data except HANDLED_EXCEPTIONS as ex: raise EncodingError("failed to sign crc") from ex
[docs]async def crc_sign_full_packet_com_key( loop: AbstractEventLoop, data: str ) -> str: """Asynchronous wrapper for _crc_sign_full_packet_com_key. Use as async wrapper for calling _crc_sign_full_packet_com_key, performing crc sign to the packet data. Args: data: packet data to sign. Return: The calculated and signed packet data. """ return await loop.run_in_executor( None, _crc_sign_full_packet_com_key, data )
[docs]def _convert_minutes_to_timer(minutes: str) -> str: """Convert on-timer minutes to hexadecimal before sending. Args: minutes: on-timer minutes to convert. Return: Hexadecimal represntation of the mintues argument. Raises: aioswitcher.erros.EncodingError: when failed hexlify. Note: This is a private function containing blocking code. Please consider using ``convert_minutes_to_timer`` (without the `_`), to schedule as a task in the event loop. """ try: return hexlify(pack(STRUCT_PACKING_FORMAT, int(minutes) * 60)).decode( ENCODING_CODEC ) except HANDLED_EXCEPTIONS as ex: raise EncodingError( "failed to create timer from {} minutes".format(str(minutes)) ) from ex
[docs]async def convert_minutes_to_timer( loop: AbstractEventLoop, minutes: str ) -> str: """Asynchronous wrapper for _convert_minutes_to_timer. Use as async wrapper for calling _convert_minutes_to_timer, converting on-timer minutes to hexadecimal. Args: minutes: on-timer minutes to convert. Return: Hexadecimal represntation of the mintues argument. """ return await loop.run_in_executor(None, _convert_minutes_to_timer, minutes)
[docs]def _convert_timedelta_to_auto_off(full_time: timedelta) -> str: """Convert timedelta object for auto-shutdown to hexadecimal. Args: full_time: timedelta object represnting the auto-shutdown time. Return: Hexadecimal represntation of the full_time argument. Raises: aioswitcher.erros.EncodingError: when failed hexlify. Note: This is a private function containing blocking code. Please consider using ``convert_timedelta_to_auto_off`` (without the `_`), to schedule as a task in the event loop. """ try: minutes = full_time.total_seconds() / 60 hours, minutes = divmod(minutes, 60) seconds = int(hours) * 3600 + int(minutes) * 60 if 3599 < seconds < 86341: return hexlify(pack(STRUCT_PACKING_FORMAT, int(seconds))).decode( ENCODING_CODEC ) raise ValueError("can only handle 1 to 24 hours on auto-off set") except HANDLED_EXCEPTIONS as ex: raise EncodingError( "failed to create auto-off from {} timedelta".format( str(full_time) ) ) from ex
[docs]async def convert_timedelta_to_auto_off( loop: AbstractEventLoop, full_time: timedelta ) -> str: """Asynchronous wrapper for _convert_timedelta_to_auto_off. Use as async wrapper for calling _convert_timedelta_to_auto_off, converting timedelta auto-shutdown configuration to hexadecimal. Args: full_time: timedelta object represnting the auto-shutdown time. Return: Hexadecimal represntation of the full_time argument. """ return await loop.run_in_executor( None, _convert_timedelta_to_auto_off, full_time )
[docs]def _convert_string_to_device_name(name: str) -> str: """Convert string device name to hexadecimal before sending. Args: name: the desired name for conerting. Return: Hexadecimal represntation of the name argument. Raises: aioswitcher.erros.EncodingError: when failed hexlify. Note: This is a private function containing blocking code. Please consider using ``convert_string_to_device_name`` (without the `_`), to schedule as a task in the event loop. """ try: if 1 < len(name) < 33: return ( hexlify(name.encode(ENCODING_CODEC)) + ((32 - len(name)) * "00").encode(ENCODING_CODEC) ).decode(ENCODING_CODEC) raise ValueError("name length can vary from 2 to 32.") except HANDLED_EXCEPTIONS as ex: raise EncodingError( "failed to convert " + str(name) + " to device name" ) from ex
[docs]async def convert_string_to_device_name( loop: AbstractEventLoop, name: str ) -> str: """Asynchronous wrapper for _convert_string_to_device_name. Use as async wrapper for calling _convert_string_to_device_name, converting name to hexadecimal. Args: name: the desired name for conerting. Return: Hexadecimal represntation of the name argument. """ return await loop.run_in_executor( None, _convert_string_to_device_name, name )
[docs]def _get_days_list_from_bytes(data: int) -> List[str]: """Extract week days from shcedule bytes data. Args: data: bytes representing the days list. Return: List of string represntation the week days included in the list. See ``aioswitcher.consts`` for days literals. Raises: aioswitcher.erros.DecodingError: when failed to analyze the argument. Note: This is a private function containing blocking code. Please consider using ``get_days_list_from_bytes`` (without the `_`), to schedule as a task in the event loop. """ days_list = [] try: for day in HEX_TO_DAY_DICT: if day & data != 0x00: days_list.append(HEX_TO_DAY_DICT[day]) return days_list except HANDLED_EXCEPTIONS as ex: raise DecodingError( "failed to extract week days from schedule" ) from ex
[docs]async def get_days_list_from_bytes( loop: AbstractEventLoop, data: int ) -> List[str]: """Asynchronous wrapper for _get_days_list_from_bytes. Use as async wrapper for calling _get_days_list_from_bytes, extracting week days from shcedule bytes data. Args: data: bytes representing the days list. Return: List of string represntation the week days included in the list. See ``aioswitcher.consts`` for days literals. """ return await loop.run_in_executor(None, _get_days_list_from_bytes, data)
[docs]def _get_time_from_bytes(data: bytes) -> str: """Extract start/end time from schedule bytes. Args: data: bytes representing the start or the end time for the schedule. Return: Time string in %H:%M format. e.g. "20:30". Raises: aioswitcher.erros.DecodingError: when failed to analyze the argument. Note: This is a private function containing blocking code. Please consider using ``get_time_from_bytes`` (without the `_`), to schedule as a task in the event loop. """ try: timestamp = int(data[6:8] + data[4:6] + data[2:4] + data[0:2], 16) return strftime("%H:%M", localtime(timestamp)) except HANDLED_EXCEPTIONS as ex: raise DecodingError("failed to extract time from schedule") from ex
[docs]async def get_time_from_bytes(loop: AbstractEventLoop, data: bytes) -> str: """Asynchronous wrapper for _get_time_from_bytes. Use as async wrapper for calling _get_time_from_bytes, extracting start/end time from schedule bytes data. Args: data: bytes representing the start or the end time for the schedule. Return: Time string in %H:%M format. e.g. "20:30". """ return await loop.run_in_executor(None, _get_time_from_bytes, data)
[docs]def _get_timestamp() -> str: """Generate hexadecimal represntation of the current timestamp. Return: Hexadecimal represntation of the current unix time retrieved by ``time.time``. Raises: aioswitcher.erros.DecodingError: when failed to analyze the timestamp. Note: This is a private function containing blocking code. Please consider using ``get_timestamp`` (without the `_`), to schedule as a task in the event loop. """ try: return hexlify( pack(STRUCT_PACKING_FORMAT, int(round(time_time()))) ).decode(ENCODING_CODEC) except HANDLED_EXCEPTIONS as ex: raise DecodingError("failed to generate timestamp") from ex
[docs]async def get_timestamp(loop: AbstractEventLoop) -> str: """Asynchronous wrapper for _get_timestamp. Use as async wrapper for calling _get_timestamp, creating hexadecimal represntation of the current timestamp. Return: Hexadecimal represntation of the current unix time retrieved by ``time.time``. """ return await loop.run_in_executor(None, _get_timestamp)
[docs]def _create_weekdays_value(requested_days: List[int]) -> str: """Create hex value from list of requested days for schedule updating. Args: data: list of integers represnting the requested days. check ``aioswitcher.consts`` for the correct values. Return: Hexadecimal representation of the days list. Raises: aioswitcher.erros.EncodingError: when failed to convert the argument. Note: This is a private function containing blocking code. Please consider using ``create_weekdays_value`` (without the `_`), to schedule as a task in the event loop. """ try: if requested_days: return "{:02x}".format(int(sum(requested_days))) raise ValueError("days list is empty") except HANDLED_EXCEPTIONS as ex: raise EncodingError("failed to create weekdays value") from ex
[docs]async def create_weekdays_value( loop: AbstractEventLoop, requested_days: List[int] ) -> str: """Asynchronous wrapper for _create_weekdays_value. Use as async wrapper for calling _create_weekdays_value, creating hex value from list of requested days for schedule updating. Args: data: list of integers represnting the requested days. check ``aioswitcher.consts`` for the correct values. Return: Hexadecimal representation of the days list. """ return await loop.run_in_executor( None, _create_weekdays_value, requested_days )
[docs]def _timedelta_str_to_schedule_time(time_value: str) -> str: """Convert time string to schedule start/end time to hexadecimale. Args: data: time to convert. e.g. "21:00". Return: Hexadecimal representation of time_value argument. Raises: aioswitcher.erros.EncodingError: when failed to convert the argument. Note: This is a private function containing blocking code. Please consider using ``timedelta_str_to_schedule_time`` (without the `_`), to schedule as a task in the event loop. """ try: return_time = mktime( strptime( strftime("%d/%m/%Y") + " " + time_value.split(":")[0] + ":" + time_value.split(":")[1], "%d/%m/%Y %H:%M", ) ) return hexlify(pack(STRUCT_PACKING_FORMAT, int(return_time))).decode( ENCODING_CODEC ) except HANDLED_EXCEPTIONS as ex: raise EncodingError( "failed to convert time value to schedule time" ) from ex
[docs]async def timedelta_str_to_schedule_time( loop: AbstractEventLoop, time_value: str ) -> str: """Asynchronous wrapper for _timedelta_str_to_schedule_time. Use as async wrapper for calling _timedelta_str_to_schedule_time, converting time string to schedule start/end time to hexadecimale. Args: data: time to convert. e.g. "21:00". Return: Hexadecimal representation of time_value argument. """ return await loop.run_in_executor( None, _timedelta_str_to_schedule_time, time_value )