"""py_agua_iot provides control of heating devices connected via the
Agua IOT platform of Micronova.
"""

import asyncio
import logging
import time

import httpx
import jwt
from simpleeval import simple_eval

_LOGGER = logging.getLogger(__name__)

API_PATH_APP_SIGNUP = "/appSignup"
API_PATH_LOGIN = "/userLogin"
API_PATH_REFRESH_TOKEN = "/refreshToken"
API_PATH_DEVICE_LIST = "/deviceList"
API_PATH_DEVICE_INFO = "/deviceGetInfo"
API_PATH_DEVICE_REGISTERS_MAP = "/deviceGetRegistersMap"
API_PATH_DEVICE_BUFFER_READING = "/deviceGetBufferReading"
API_PATH_DEVICE_JOB_STATUS = "/deviceJobStatus/"
API_PATH_DEVICE_WRITING = "/deviceRequestWriting"

HEADER_ACCEPT = "application/json, text/javascript, */*; q=0.01"
HEADER_CONTENT_TYPE = "application/json"
HEADER = {"Accept": HEADER_ACCEPT, "Content-Type": HEADER_CONTENT_TYPE}

# Helper functions made available to register formulas evaluated by simpleeval.
_FORMULA_FUNCTIONS = {
    "IF": lambda a, b, c: b if a else c,
    "int": lambda a: int(a),
}

# How many times an authenticated call is re-sent after a 401 response
# (each re-send is preceded by a token refresh).
_MAX_AUTH_RETRIES = 1


def _evaluate_formula(formula, value):
    """Evaluate a register formula with ``#`` replaced by ``value``.

    ``Mod`` in the formula is rewritten to the ``%`` operator before the
    expression is handed to ``simple_eval``.
    """
    expression = formula.replace("#", str(value)).replace("Mod", "%")
    return simple_eval(expression, functions=_FORMULA_FUNCTIONS)


def _token_expiry(token):
    """Return the ``exp`` claim of a JWT, or None when it has none.

    The signature is not verified; the token is only decoded to read the
    claim.
    """
    claimset = jwt.decode(
        token, options={"verify_signature": False}, algorithms=["none"]
    )
    return claimset.get("exp")


class aguaiot(object):
    """Client for the Agua IOT HTTP API.

    Holds the credentials, the auth/refresh tokens and the list of
    ``Device`` objects found for the account.
    """

    def __init__(
        self,
        api_url,
        customer_code,
        email,
        password,
        unique_id,
        login_api_url=None,
        brand_id=None,
        brand=None,
        application_version="1.9.7",
        async_client=None,
        air_temp_fix=False,
        reading_error_fix=False,
        language="ENG",
        http_timeout=30,
        buffer_read_timeout=30,
    ):
        self.api_url = api_url.rstrip("/")
        self.customer_code = customer_code
        self.email = email
        self.password = password
        self.unique_id = unique_id
        self.brand_id = brand_id
        self.brand = brand
        self.login_api_url = login_api_url
        self.application_version = application_version
        self.token = None
        self.token_expires = None
        self.refresh_token = None
        self.devices = []
        self.async_client = async_client
        self.http_timeout = http_timeout
        self.buffer_read_timeout = buffer_read_timeout

        # Vendor specific fixes
        self.air_temp_fix = air_temp_fix
        self.reading_error_fix = reading_error_fix

        self.language = language

        if not self.async_client:
            self.async_client = httpx.AsyncClient()

    async def connect(self):
        """Register the app id, log in and load devices with their mappings."""
        await self.register_app_id()
        await self.login()
        await self.fetch_devices()
        await self.fetch_device_information()

    def _headers(self):
        """Correctly set headers for requests to Agua IOT."""
        headers = {
            "Accept": HEADER_ACCEPT,
            "Content-Type": HEADER_CONTENT_TYPE,
            "Origin": "file://",
            "id_brand": self.brand_id if self.brand_id is not None else "1",
            "customer_code": self.customer_code,
        }
        if self.brand is not None:
            headers["brand"] = self.brand
        return headers

    async def _send(self, method, url, payload, headers):
        """Send one request on the shared client and return the response.

        "POST" sends ``payload`` as JSON; any other method is sent as a GET
        with ``payload`` as query parameters.

        The shared client is deliberately not entered as a context manager:
        leaving an ``async with`` block closes the client, which would make
        every later request on it fail.

        Raises:
            AguaIOTConnectionError: on an httpx transport error.
        """
        try:
            if method == "POST":
                return await self.async_client.post(
                    url,
                    json=payload,
                    headers=headers,
                    follow_redirects=False,
                    timeout=self.http_timeout,
                )
            return await self.async_client.get(
                url,
                params=payload,
                headers=headers,
                follow_redirects=False,
                timeout=self.http_timeout,
            )
        except httpx.TransportError as e:
            raise AguaIOTConnectionError(f"Connection error to {url}: {e}") from e

    async def register_app_id(self):
        """Register app id with Agua IOT

        Returns True on success.

        Raises:
            AguaIOTConnectionError: on a transport error.
            AguaIOTUnauthorized: when the response status is not 201.
        """
        url = self.api_url + API_PATH_APP_SIGNUP

        payload = {
            "phone_type": "Android",
            "phone_id": self.unique_id,
            "phone_version": "1.0",
            "language": "en",
            "id_app": self.unique_id,
            "push_notification_token": self.unique_id,
            "push_notification_active": False,
        }
        headers = self._headers()

        _LOGGER.debug("POST Register app - HEADERS: %s DATA: %s", headers, payload)
        response = await self._send("POST", url, payload, headers)
        _LOGGER.debug(
            "RESPONSE Register app - CODE: %s DATA: %s",
            response.status_code,
            response.text,
        )

        if response.status_code != 201:
            _LOGGER.error(
                "Failed to register app id. Code: %s, Response: %s",
                response.status_code,
                response.text,
            )
            raise AguaIOTUnauthorized("Failed to register app id")

        return True

    async def login(self):
        """Authenticate with email and password to Agua IOT

        Stores the token, the refresh token and the token's ``exp`` claim.
        Returns True on success.

        Raises:
            AguaIOTConnectionError: on a transport error.
            AguaIOTUnauthorized: when the response status is not 200.
        """
        url = self.api_url + API_PATH_LOGIN

        payload = {"email": self.email, "password": self.password}

        headers = self._headers()
        headers.update({"local": "true", "Authorization": self.unique_id})

        if self.login_api_url is not None:
            # A separate login endpoint is configured: send the extra
            # headers and post to that URL instead of the API login path.
            headers.update(
                {
                    "applicationversion": self.application_version,
                    "url": API_PATH_LOGIN.lstrip("/"),
                    "userid": "null",
                    "aguaid": "null",
                }
            )
            url = self.login_api_url

        # The payload holds the password, so it is not logged.
        _LOGGER.debug("POST Login - HEADERS: %s DATA: ***", headers)
        response = await self._send("POST", url, payload, headers)
        _LOGGER.debug(
            "RESPONSE Login - CODE: %s DATA: %s",
            response.status_code,
            response.text,
        )

        if response.status_code != 200:
            _LOGGER.error(
                "Failed to login. Code: %s, Response: %s",
                response.status_code,
                response.text,
            )
            raise AguaIOTUnauthorized("Failed to login, please check credentials")

        res = response.json()
        self.token = res["token"]
        self.refresh_token = res["refresh_token"]
        self.token_expires = _token_expiry(res["token"])

        return True

    async def do_refresh_token(self):
        """Refresh auth token for Agua IOT

        When the refresh request does not answer with status 201 a full
        login is performed instead. Returns True on success.

        Raises:
            AguaIOTConnectionError: on a transport error.
            AguaIOTUnauthorized: when the fallback login fails.
        """
        url = self.api_url + API_PATH_REFRESH_TOKEN

        payload = {"refresh_token": self.refresh_token}
        headers = self._headers()

        _LOGGER.debug("POST Refresh token - HEADERS: %s DATA: %s", headers, payload)
        response = await self._send("POST", url, payload, headers)
        _LOGGER.debug(
            "RESPONSE Refresh token - CODE: %s DATA: %s",
            response.status_code,
            response.text,
        )

        if response.status_code != 201:
            _LOGGER.warning("Refresh auth token failed, forcing new login...")
            return await self.login()

        res = response.json()
        self.token = res["token"]
        self.token_expires = _token_expiry(res["token"])

        return True

    async def fetch_devices(self):
        """Fetch heating devices

        Replaces ``self.devices`` with freshly built ``Device`` objects, so
        calling this more than once does not duplicate devices.

        Raises:
            AguaIOTError: when the device list or a device info call fails.
        """
        url = self.api_url + API_PATH_DEVICE_LIST

        res = await self.handle_webcall("POST", url, {})
        if res is False:
            raise AguaIOTError("Error while fetching devices")

        info_url = self.api_url + API_PATH_DEVICE_INFO
        devices = []
        for dev in res["device"]:
            payload = {"id_device": dev["id_device"], "id_product": dev["id_product"]}
            info = await self.handle_webcall("POST", info_url, payload)
            if info is False:
                raise AguaIOTError("Error while fetching device info")

            devices.append(
                Device(
                    dev["id"],
                    dev["id_device"],
                    dev["id_product"],
                    dev["product_serial"],
                    dev["name"],
                    dev["is_online"],
                    dev["name_product"],
                    info["device_info"][0]["id_registers_map"],
                    self,
                )
            )

        self.devices = devices

    async def fetch_device_information(self):
        """Fetch device information of heating devices"""
        for dev in self.devices:
            await dev.update_mapping()

    async def update(self):
        """Refresh the register values of every known device."""
        for dev in self.devices:
            await dev.update()

    async def handle_webcall(self, method, url, payload):
        """Perform an authenticated API call.

        The token is refreshed beforehand when its known expiry time has
        passed. A 401 response triggers a token refresh and a bounded number
        of re-sends (``_MAX_AUTH_RETRIES``).

        Returns:
            The decoded JSON body on status 200, otherwise False.

        Raises:
            AguaIOTConnectionError: on a transport error.
        """
        # token_expires is None when the token carried no "exp" claim (or no
        # login happened yet); in that case rely on the 401 handling below.
        if self.token_expires is not None and time.time() > self.token_expires:
            await self.do_refresh_token()

        retries_left = _MAX_AUTH_RETRIES
        while True:
            headers = self._headers()
            headers.update({"local": "false", "Authorization": self.token})

            _LOGGER.debug("%s %s - HEADERS: %s DATA: %s", method, url, headers, payload)
            response = await self._send(method, url, payload, headers)
            _LOGGER.debug(
                "RESPONSE %s - CODE: %s DATA: %s",
                url,
                response.status_code,
                response.text,
            )

            if response.status_code != 401 or retries_left <= 0:
                break
            retries_left -= 1
            await self.do_refresh_token()

        if response.status_code != 200:
            _LOGGER.error(
                "Webcall failed. Code: %s, Response: %s",
                response.status_code,
                response.text,
            )
            return False

        return response.json()


class Device(object):
    """Agua IOT heating device representation"""

    def __init__(
        self,
        id,
        id_device,
        id_product,
        product_serial,
        name,
        is_online,
        name_product,
        id_registers_map,
        aguaiot,
    ):
        self.id = id
        self.id_device = id_device
        self.id_product = id_product
        self.product_serial = product_serial
        self.name = name
        self.is_online = is_online
        self.name_product = name_product
        self.id_registers_map = id_registers_map
        self.__aguaiot = aguaiot
        # Register definitions keyed by lower-cased "reg_key".
        self.__register_map_dict = dict()
        # Latest buffer reading: buffer item -> value.
        self.__information_dict = dict()

    async def update_mapping(self):
        """Reload the register definitions of this device."""
        await self.__update_device_registers_mapping()

    async def update(self):
        """Reload the current register values of this device."""
        await self.__update_device_information()

    async def __update_device_registers_mapping(self):
        """Fetch the registers map and keep the one matching this device.

        Raises:
            AguaIOTError: when the webcall fails.
        """
        url = self.__aguaiot.api_url + API_PATH_DEVICE_REGISTERS_MAP

        payload = {
            "id_device": self.id_device,
            "id_product": self.id_product,
            "last_update": "2018-06-03T08:59:54.043",
        }
        res = await self.__aguaiot.handle_webcall("POST", url, payload)
        if res is False:
            raise AguaIOTError("Error while fetching registers map")

        registers = dict()
        for registers_map in res["device_registers_map"]["registers_map"]:
            if registers_map["id"] == self.id_registers_map:
                registers = {
                    reg["reg_key"].lower(): reg for reg in registers_map["registers"]
                }

        self.__register_map_dict = registers

    async def __update_device_information(self):
        """Request a buffer read and store the values once the job completes.

        Raises:
            AguaIOTError: when the read request itself fails.
            AguaIOTUpdateError: on timeout, on a failed status poll, on a
                job status other than "completed", or on malformed job data.
        """
        url = self.__aguaiot.api_url + API_PATH_DEVICE_BUFFER_READING

        payload = {
            "id_device": self.id_device,
            "id_product": self.id_product,
            "BufferId": 1,
        }
        res_req = await self.__aguaiot.handle_webcall("POST", url, payload)
        if res_req is False:
            raise AguaIOTError("Error while making device buffer read request.")

        async def buffer_read_loop(id_request):
            """Poll the job status until it is no longer "waiting".

            Returns the job response, or False when a status poll fails.
            The wait between polls grows by one second per attempt.
            """
            url = self.__aguaiot.api_url + API_PATH_DEVICE_JOB_STATUS + id_request

            sleep_secs = 1
            attempts = 1
            while True:
                await asyncio.sleep(sleep_secs)

                _LOGGER.debug("BUFFER READ (%s) ATTEMPT %s", id_request, attempts)
                res_get = await self.__aguaiot.handle_webcall("GET", url, {})
                if res_get is False:
                    # Failed poll: let the caller report the read error
                    # instead of calling .get() on False.
                    return False

                status = res_get.get("jobAnswerStatus")
                _LOGGER.debug("BUFFER READ (%s) STATUS: %s", id_request, status)
                if status != "waiting":
                    return res_get

                sleep_secs += 1
                attempts += 1

        try:
            res = await asyncio.wait_for(
                buffer_read_loop(res_req["idRequest"]),
                self.__aguaiot.buffer_read_timeout,
            )
        except asyncio.TimeoutError as e:
            raise AguaIOTUpdateError(
                f"Timeout on waiting device buffer read to complete within {self.__aguaiot.buffer_read_timeout} seconds."
            ) from e

        if not res:
            raise AguaIOTUpdateError("Error while reading device buffer response.")

        if res.get("jobAnswerStatus") != "completed":
            raise AguaIOTUpdateError(
                f"Received unexpected 'jobAnswerStatus' while reading buffers: {res.get('jobAnswerStatus')}"
            )

        try:
            answer = res["jobAnswerData"]
            # "Items" and "Values" are parallel lists: pair them by position.
            information_dict = {
                item: answer["Values"][index]
                for index, item in enumerate(answer["Items"])
            }
        except (KeyError, IndexError, TypeError) as e:
            raise AguaIOTUpdateError("Error in data received from device.") from e

        self.__information_dict = information_dict

    def __prepare_value_for_writing(self, item, value, limit_value_raw=False):
        """Convert a user value into the raw integer written to the device.

        With ``limit_value_raw`` False the given value is range-checked
        against the register's ``set_min``/``set_max``; with True the
        converted raw value is checked instead.

        Raises:
            ValueError: when the checked value is out of range.
        """
        register = self.__register_map_dict[item]
        set_min = register["set_min"]
        set_max = register["set_max"]

        if not limit_value_raw and (float(value) < set_min or float(value) > set_max):
            raise ValueError(f"Value must be between {set_min} and {set_max}: {value}")

        value = int(_evaluate_formula(register["formula_inverse"], value))

        if limit_value_raw and (float(value) < set_min or float(value) > set_max):
            raise ValueError(
                f"Raw value must be between {set_min} and {set_max}: {value}"
            )

        if register["is_hex"]:
            # Re-read the decimal digits as a hexadecimal number.
            value = int(f"0x{value}", 16)

        return value

    async def __request_writing(self, items):
        """Write raw register values and wait for the write job to complete.

        ``items`` maps register keys to already prepared raw values. The job
        status is polled once per second, for at most 10 extra polls.

        Raises:
            AguaIOTError: when the request fails or the job does not
                complete with a "Cmd" entry in its answer data.
        """
        url = self.__aguaiot.api_url + API_PATH_DEVICE_WRITING

        set_items = []
        set_masks = []
        set_bits = []
        set_endians = []
        set_values = []

        for key, raw_value in items.items():
            register = self.__register_map_dict[key]
            set_items.append(int(register["offset"]))
            set_masks.append(int(register["mask"]))
            set_values.append(raw_value)
            set_bits.append(8)
            set_endians.append("L")

        payload = {
            "id_device": self.id_device,
            "id_product": self.id_product,
            "Protocol": "RWMSmaster",
            "BitData": set_bits,
            "Endianess": set_endians,
            "Items": set_items,
            "Masks": set_masks,
            "Values": set_values,
        }

        res = await self.__aguaiot.handle_webcall("POST", url, payload)
        if res is False:
            raise AguaIOTError("Error while request device writing")

        id_request = res["idRequest"]

        url = self.__aguaiot.api_url + API_PATH_DEVICE_JOB_STATUS + id_request

        payload = {}
        retry_count = 0
        res = await self.__aguaiot.handle_webcall("GET", url, payload)
        while (
            res is False or res["jobAnswerStatus"] != "completed"
        ) and retry_count < 10:
            await asyncio.sleep(1)
            res = await self.__aguaiot.handle_webcall("GET", url, payload)
            retry_count = retry_count + 1

        if (
            res is False
            or res["jobAnswerStatus"] != "completed"
            or "Cmd" not in res["jobAnswerData"]
        ):
            raise AguaIOTError("Error while request device writing")

    @property
    def registers(self):
        """List of the known register keys."""
        return list(self.__register_map_dict.keys())

    def get_register(self, key):
        """Return the register definition for ``key`` plus its current value.

        The result is a copy of the stored definition (an empty dict for an
        unknown key). When the current reading allows it, "value_raw" (the
        masked buffer value as a string) and "value" (the register formula
        applied to it) are added.
        """
        # Work on a copy so computed values never leak into the stored map.
        register = dict(self.__register_map_dict.get(key, {}))
        try:
            register["value_raw"] = str(
                self.__information_dict[register["offset"]] & register["mask"]
            )
            register["value"] = _evaluate_formula(
                register["formula"], register["value_raw"]
            )
        except (KeyError, ValueError):
            # No reading (or no definition) available: return what we have.
            pass

        return register

    def get_register_value(self, key):
        """Return the computed value of a register, or None.

        None is returned when no value is available or when one of the
        enabled vendor fixes drops the reading.
        """
        register = self.get_register(key)
        value = register.get("value")

        # Fix for reading errors from wifi module
        if (
            self.__aguaiot.reading_error_fix
            and int(register.get("value_raw", 0)) == 32768
        ):
            _LOGGER.debug(
                "Applied reading_error_fix. Dropped value %s for register %s",
                value,
                key,
            )
            return None

        # Fix for stoves abusing air temp register
        if (
            self.__aguaiot.air_temp_fix
            and key.endswith("air_get")
            and value
            and int(value) > 100
        ):
            _LOGGER.debug(
                "Applied air_temp_fix. Dropped value %s for register %s",
                value,
                key,
            )
            return None

        return value

    def get_register_value_min(self, key):
        """Return the register's "set_min", or None when absent."""
        return self.get_register(key).get("set_min")

    def get_register_value_max(self, key):
        """Return the register's "set_max", or None when absent."""
        return self.get_register(key).get("set_max")

    def get_register_value_formatted(self, key):
        """Return the register value rendered with its "format_string"."""
        register = self.get_register(key)
        return str.format(register.get("format_string"), register.get("value"))

    def get_register_value_description(self, key, language=None):
        """Return the description of the current value, or the value itself.

        The value itself is returned when the register has no options or
        the value has no matching description.
        """
        options = self.get_register_value_options(key, language)
        value = self.get_register_value(key)
        if options:
            return options.get(value, value)
        return value

    def get_register_value_options(self, key, language=None):
        """Return {value: description} for a register's "enc_val" entries.

        Uses ``language`` (default: the client's language) and falls back to
        "ENG" when the register has no entries for it. Returns an empty dict
        when the register has no "enc_val".
        """
        register = self.get_register(key)
        if "enc_val" not in register:
            return {}

        lang = language if language else self.__aguaiot.language
        if lang not in self.get_register_value_options_languages(key):
            lang = "ENG"

        return {
            item["value"]: item["description"]
            for item in register.get("enc_val")
            if item["lang"] == lang
        }

    def get_register_value_options_languages(self, key):
        """Return the set of languages present in a register's "enc_val"."""
        register = self.get_register(key)
        if "enc_val" in register:
            return {item["lang"] for item in register.get("enc_val")}
        return set()

    def get_register_enabled(self, key):
        """Tell whether the register is enabled by its "<prefix>_enable" register.

        The enable key is ``key`` with its last "_"-separated part replaced
        by "enable".

        Raises:
            AguaIOTError: when that register is not of type ENABLE.
        """
        enable_key = key.rsplit("_", 1)[0] + "_enable"
        enable_register = (
            self.get_register(enable_key) if enable_key in self.registers else None
        )

        if not enable_register:
            # Always enabled if no enable register present
            return True

        if enable_register.get("reg_type") != "ENABLE":
            raise AguaIOTError(f"Not a register of type ENABLE: {key}")

        if "enable_val" in enable_register:
            enabled_values = [d["value"] for d in enable_register.get("enable_val")]
            return self.get_register_value(enable_key) in enabled_values

        return self.get_register_value(enable_key) == 1

    async def set_register_value(self, key, value, limit_value_raw=False):
        """Write a single register value.

        Raises:
            AguaIOTError: when ``value`` is None or the write fails.
            ValueError: when the value is out of range.
        """
        if value is None:
            raise AguaIOTError(f"Error while trying to set '{key}' to None")

        value = self.__prepare_value_for_writing(
            key, value, limit_value_raw=limit_value_raw
        )
        items = {key: value}

        try:
            await self.__request_writing(items)
        except AguaIOTError as e:
            raise AguaIOTError(
                f"Error while trying to set: key={key} value={value}"
            ) from e

    async def set_register_values(self, items, limit_value_raw=False):
        """Write several register values in one request.

        ``items`` maps register keys to user values; it is not modified.

        Raises:
            AguaIOTError: when the write fails.
            ValueError: when a value is out of range.
        """
        prepared = {
            key: self.__prepare_value_for_writing(
                key, value, limit_value_raw=limit_value_raw
            )
            for key, value in items.items()
        }

        try:
            await self.__request_writing(prepared)
        except AguaIOTError as e:
            raise AguaIOTError(f"Error while trying to set: items={prepared}") from e

    async def set_register_value_description(
        self, key, value_description, value_fallback=None, language=None
    ):
        """Write a register by the description of one of its options.

        When no option has that description, ``value_description`` itself is
        used as the value. A value that cannot be converted to float is
        replaced by ``value_fallback``.
        """
        value = value_description
        options = self.get_register_value_options(key, language)
        for option_value, description in options.items():
            if description == value_description:
                # First matching option wins.
                value = option_value
                break

        try:
            value = float(value)
        except (TypeError, ValueError):
            value = value_fallback

        await self.set_register_value(key, value)


class AguaIOTError(Exception):
    """Exception type for Agua IOT"""

    def __init__(self, message):
        Exception.__init__(self, message)


class AguaIOTUnauthorized(AguaIOTError):
    """Unauthorized"""

    def __init__(self, message):
        super().__init__(message)


class AguaIOTConnectionError(AguaIOTError):
    """Connection error"""

    def __init__(self, message):
        super().__init__(message)


class AguaIOTUpdateError(AguaIOTError):
    """Update error"""

    def __init__(self, message):
        super().__init__(message)