Files

712 lines
23 KiB
Python

"""py_agua_iot provides controlling heating devices connected via
the IOT Agua platform of Micronova
"""
import asyncio
import jwt
import logging
import time
import httpx
from simpleeval import simple_eval
_LOGGER = logging.getLogger(__name__)
API_PATH_APP_SIGNUP = "/appSignup"
API_PATH_LOGIN = "/userLogin"
API_PATH_REFRESH_TOKEN = "/refreshToken"
API_PATH_DEVICE_LIST = "/deviceList"
API_PATH_DEVICE_INFO = "/deviceGetInfo"
API_PATH_DEVICE_REGISTERS_MAP = "/deviceGetRegistersMap"
API_PATH_DEVICE_BUFFER_READING = "/deviceGetBufferReading"
API_PATH_DEVICE_JOB_STATUS = "/deviceJobStatus/"
API_PATH_DEVICE_WRITING = "/deviceRequestWriting"
HEADER_ACCEPT = "application/json, text/javascript, */*; q=0.01"
HEADER_CONTENT_TYPE = "application/json"
HEADER = {"Accept": HEADER_ACCEPT, "Content-Type": HEADER_CONTENT_TYPE}
class aguaiot(object):
def __init__(
self,
api_url,
customer_code,
email,
password,
unique_id,
login_api_url=None,
brand_id=None,
brand=None,
application_version="1.9.7",
async_client=None,
air_temp_fix=False,
reading_error_fix=False,
language="ENG",
http_timeout=30,
buffer_read_timeout=30,
):
self.api_url = api_url.rstrip("/")
self.customer_code = customer_code
self.email = email
self.password = password
self.unique_id = unique_id
self.brand_id = brand_id
self.brand = brand
self.login_api_url = login_api_url
self.application_version = application_version
self.token = None
self.token_expires = None
self.refresh_token = None
self.devices = list()
self.async_client = async_client
self.http_timeout = http_timeout
self.buffer_read_timeout = buffer_read_timeout
# Vendor specific fixes
self.air_temp_fix = air_temp_fix
self.reading_error_fix = reading_error_fix
self.language = language
if not self.async_client:
self.async_client = httpx.AsyncClient()
async def connect(self):
await self.register_app_id()
await self.login()
await self.fetch_devices()
await self.fetch_device_information()
def _headers(self):
"""Correctly set headers for requests to Agua IOT."""
headers = {
"Accept": HEADER_ACCEPT,
"Content-Type": HEADER_CONTENT_TYPE,
"Origin": "file://",
"id_brand": self.brand_id if self.brand_id is not None else "1",
"customer_code": self.customer_code,
}
if self.brand is not None:
headers["brand"] = self.brand
return headers
async def register_app_id(self):
"""Register app id with Agua IOT"""
url = self.api_url + API_PATH_APP_SIGNUP
payload = {
"phone_type": "Android",
"phone_id": self.unique_id,
"phone_version": "1.0",
"language": "en",
"id_app": self.unique_id,
"push_notification_token": self.unique_id,
"push_notification_active": False,
}
try:
_LOGGER.debug(
"POST Register app - HEADERS: %s DATA: %s", self._headers(), payload
)
async with self.async_client as client:
response = await client.post(
url,
json=payload,
headers=self._headers(),
follow_redirects=False,
timeout=self.http_timeout,
)
_LOGGER.debug(
"RESPONSE Register app - CODE: %s DATA: %s",
response.status_code,
response.text,
)
except httpx.TransportError as e:
raise AguaIOTConnectionError(f"Connection error to {url}: {e}")
if response.status_code != 201:
_LOGGER.error(
"Failed to register app id. Code: %s, Response: %s",
response.status_code,
response.text,
)
raise AguaIOTUnauthorized("Failed to register app id")
return True
async def login(self):
"""Authenticate with email and password to Agua IOT"""
url = self.api_url + API_PATH_LOGIN
payload = {"email": self.email, "password": self.password}
extra_headers = {"local": "true", "Authorization": self.unique_id}
headers = self._headers()
headers.update(extra_headers)
if self.login_api_url is not None:
extra_login_headers = {
"applicationversion": self.application_version,
"url": API_PATH_LOGIN.lstrip("/"),
"userid": "null",
"aguaid": "null",
}
headers.update(extra_login_headers)
url = self.login_api_url
try:
_LOGGER.debug("POST Login - HEADERS: %s DATA: ***", headers)
async with self.async_client as client:
response = await client.post(
url,
json=payload,
headers=headers,
follow_redirects=False,
timeout=self.http_timeout,
)
_LOGGER.debug(
"RESPONSE Login - CODE: %s DATA: %s",
response.status_code,
response.text,
)
except httpx.TransportError as e:
raise AguaIOTConnectionError(f"Connection error to {url}: {e}")
if response.status_code != 200:
_LOGGER.error(
"Failed to login. Code: %s, Response: %s",
response.status_code,
response.text,
)
raise AguaIOTUnauthorized("Failed to login, please check credentials")
res = response.json()
self.token = res["token"]
self.refresh_token = res["refresh_token"]
claimset = jwt.decode(
res["token"], options={"verify_signature": False}, algorithms=["none"]
)
self.token_expires = claimset.get("exp")
return True
async def do_refresh_token(self):
"""Refresh auth token for Agua IOT"""
url = self.api_url + API_PATH_REFRESH_TOKEN
payload = {"refresh_token": self.refresh_token}
try:
_LOGGER.debug(
"POST Refresh token - HEADERS: %s DATA: %s", self._headers(), payload
)
async with self.async_client as client:
response = await client.post(
url,
json=payload,
headers=self._headers(),
follow_redirects=False,
timeout=self.http_timeout,
)
_LOGGER.debug(
"RESPONSE Refresh token - CODE: %s DATA: %s",
response.status_code,
response.text,
)
except httpx.TransportError as e:
raise AguaIOTConnectionError(f"Connection error to {url}: {e}")
if response.status_code != 201:
_LOGGER.warning("Refresh auth token failed, forcing new login...")
await self.login()
return
res = response.json()
self.token = res["token"]
claimset = jwt.decode(
res["token"], options={"verify_signature": False}, algorithms=["none"]
)
self.token_expires = claimset.get("exp")
return True
async def fetch_devices(self):
"""Fetch heating devices"""
url = self.api_url + API_PATH_DEVICE_LIST
payload = {}
res = await self.handle_webcall("POST", url, payload)
if res is False:
raise AguaIOTError("Error while fetching devices")
for dev in res["device"]:
url = self.api_url + API_PATH_DEVICE_INFO
payload = {"id_device": dev["id_device"], "id_product": dev["id_product"]}
res2 = await self.handle_webcall("POST", url, payload)
if res2 is False:
raise AguaIOTError("Error while fetching device info")
self.devices.append(
Device(
dev["id"],
dev["id_device"],
dev["id_product"],
dev["product_serial"],
dev["name"],
dev["is_online"],
dev["name_product"],
res2["device_info"][0]["id_registers_map"],
self,
)
)
async def fetch_device_information(self):
"""Fetch device information of heating devices"""
for dev in self.devices:
await dev.update_mapping()
async def update(self):
for dev in self.devices:
await dev.update()
async def handle_webcall(self, method, url, payload):
if time.time() > self.token_expires:
await self.do_refresh_token()
extra_headers = {"local": "false", "Authorization": self.token}
headers = self._headers()
headers.update(extra_headers)
try:
_LOGGER.debug("%s %s - HEADERS: %s DATA: %s", method, url, headers, payload)
if method == "POST":
async with self.async_client as client:
response = await client.post(
url,
json=payload,
headers=headers,
follow_redirects=False,
timeout=self.http_timeout,
)
else:
async with self.async_client as client:
response = await client.get(
url,
params=payload,
headers=headers,
follow_redirects=False,
timeout=self.http_timeout,
)
_LOGGER.debug(
"RESPONSE %s - CODE: %s DATA: %s",
url,
response.status_code,
response.text,
)
except httpx.TransportError as e:
raise AguaIOTConnectionError(f"Connection error to {url}: {e}")
if response.status_code == 401:
await self.do_refresh_token()
return await self.handle_webcall(method, url, payload)
elif response.status_code != 200:
_LOGGER.error(
"Webcall failed. Code: %s, Response: %s",
response.status_code,
response.text,
)
return False
return response.json()
class Device(object):
"""Agua IOT heating device representation"""
def __init__(
self,
id,
id_device,
id_product,
product_serial,
name,
is_online,
name_product,
id_registers_map,
aguaiot,
):
self.id = id
self.id_device = id_device
self.id_product = id_product
self.product_serial = product_serial
self.name = name
self.is_online = is_online
self.name_product = name_product
self.id_registers_map = id_registers_map
self.__aguaiot = aguaiot
self.__register_map_dict = dict()
self.__information_dict = dict()
async def update_mapping(self):
await self.__update_device_registers_mapping()
async def update(self):
await self.__update_device_information()
async def __update_device_registers_mapping(self):
url = self.__aguaiot.api_url + API_PATH_DEVICE_REGISTERS_MAP
registers = dict()
payload = {
"id_device": self.id_device,
"id_product": self.id_product,
"last_update": "2018-06-03T08:59:54.043",
}
res = await self.__aguaiot.handle_webcall("POST", url, payload)
if res is False:
raise AguaIOTError("Error while fetching registers map")
for registers_map in res["device_registers_map"]["registers_map"]:
if registers_map["id"] == self.id_registers_map:
registers = {
reg["reg_key"].lower(): reg for reg in registers_map["registers"]
}
self.__register_map_dict = registers
async def __update_device_information(self):
url = self.__aguaiot.api_url + API_PATH_DEVICE_BUFFER_READING
payload = {
"id_device": self.id_device,
"id_product": self.id_product,
"BufferId": 1,
}
res_req = await self.__aguaiot.handle_webcall("POST", url, payload)
if res_req is False:
raise AguaIOTError("Error while making device buffer read request.")
async def buffer_read_loop(id_request):
url = self.__aguaiot.api_url + API_PATH_DEVICE_JOB_STATUS + id_request
sleep_secs = 1
attempts = 1
try:
while True:
await asyncio.sleep(sleep_secs)
_LOGGER.debug("BUFFER READ (%s) ATTEMPT %s", id_request, attempts)
res_get = await self.__aguaiot.handle_webcall("GET", url, {})
_LOGGER.debug(
"BUFFER READ (%s) STATUS: %s",
id_request,
res_get.get("jobAnswerStatus"),
)
if res_get.get("jobAnswerStatus") != "waiting":
return res_get
sleep_secs += 1
attempts += 1
except asyncio.CancelledError:
raise
try:
res = await asyncio.wait_for(
buffer_read_loop(res_req["idRequest"]),
self.__aguaiot.buffer_read_timeout,
)
except asyncio.TimeoutError:
raise AguaIOTUpdateError(
f"Timeout on waiting device buffer read to complete within {self.__aguaiot.buffer_read_timeout} seconds."
)
if not res:
raise AguaIOTUpdateError("Error while reading device buffer response.")
if res.get("jobAnswerStatus") == "completed":
current_i = 0
information_dict = dict()
try:
for item in res["jobAnswerData"]["Items"]:
information_dict.update(
{item: res["jobAnswerData"]["Values"][current_i]}
)
current_i = current_i + 1
except KeyError:
raise AguaIOTUpdateError("Error in data received from device.")
self.__information_dict = information_dict
else:
raise AguaIOTUpdateError(
f"Received unexpected 'jobAnswerStatus' while reading buffers: {res.get('jobAnswerStatus')}"
)
def __prepare_value_for_writing(self, item, value, limit_value_raw=False):
set_min = self.__register_map_dict[item]["set_min"]
set_max = self.__register_map_dict[item]["set_max"]
if not limit_value_raw and (float(value) < set_min or float(value) > set_max):
raise ValueError(f"Value must be between {set_min} and {set_max}: {value}")
formula = self.__register_map_dict[item]["formula_inverse"]
formula = formula.replace("#", str(value))
formula = formula.replace("Mod", "%")
eval_formula = simple_eval(
formula,
functions={"IF": lambda a, b, c: b if a else c, "int": lambda a: int(a)},
)
value = int(eval_formula)
if limit_value_raw and (float(value) < set_min or float(value) > set_max):
raise ValueError(
f"Raw value must be between {set_min} and {set_max}: {value}"
)
if self.__register_map_dict[item]["is_hex"]:
value = int(f"0x{value}", 16)
return value
async def __request_writing(self, items):
url = self.__aguaiot.api_url + API_PATH_DEVICE_WRITING
set_items = []
set_masks = []
set_bits = []
set_endians = []
set_values = []
for key in items:
set_items.append(int(self.__register_map_dict[key]["offset"]))
set_masks.append(int(self.__register_map_dict[key]["mask"]))
set_values.append(items[key])
set_bits.append(8)
set_endians.append("L")
payload = {
"id_device": self.id_device,
"id_product": self.id_product,
"Protocol": "RWMSmaster",
"BitData": set_bits,
"Endianess": set_endians,
"Items": set_items,
"Masks": set_masks,
"Values": set_values,
}
res = await self.__aguaiot.handle_webcall("POST", url, payload)
if res is False:
raise AguaIOTError("Error while request device writing")
id_request = res["idRequest"]
url = self.__aguaiot.api_url + API_PATH_DEVICE_JOB_STATUS + id_request
payload = {}
retry_count = 0
res = await self.__aguaiot.handle_webcall("GET", url, payload)
while (
res is False or res["jobAnswerStatus"] != "completed"
) and retry_count < 10:
await asyncio.sleep(1)
res = await self.__aguaiot.handle_webcall("GET", url, payload)
retry_count = retry_count + 1
if (
res is False
or res["jobAnswerStatus"] != "completed"
or "Cmd" not in res["jobAnswerData"]
):
raise AguaIOTError("Error while request device writing")
@property
def registers(self):
return list(self.__register_map_dict.keys())
def get_register(self, key):
register = self.__register_map_dict.get(key, {})
try:
register["value_raw"] = str(
self.__information_dict[register["offset"]] & register["mask"]
)
formula = register["formula"].replace("#", register["value_raw"])
formula = formula.replace("Mod", "%")
register["value"] = simple_eval(
formula,
functions={
"IF": lambda a, b, c: b if a else c,
"int": lambda a: int(a),
},
)
except (KeyError, ValueError):
pass
return register
def get_register_value(self, key):
value = self.get_register(key).get("value")
# Fix for reading errors from wifi module
if (
self.__aguaiot.reading_error_fix
and int(self.get_register(key).get("value_raw", 0)) == 32768
):
_LOGGER.debug(
f"Applied reading_error_fix. Dropped value {value} for register {key}"
)
return
# Fix for stoves abusing air temp register
if (
self.__aguaiot.air_temp_fix
and key.endswith("air_get")
and value
and int(value) > 100
):
_LOGGER.debug(
f"Applied air_temp_fix. Dropped value {value} for register {key}"
)
return
return value
def get_register_value_min(self, key):
return self.get_register(key).get("set_min")
def get_register_value_max(self, key):
return self.get_register(key).get("set_max")
def get_register_value_formatted(self, key):
return str.format(
self.get_register(key).get("format_string"),
self.get_register(key).get("value"),
)
def get_register_value_description(self, key, language=None):
options = self.get_register_value_options(key, language)
if options:
return options.get(
self.get_register_value(key), self.get_register_value(key)
)
else:
return self.get_register_value(key)
def get_register_value_options(self, key, language=None):
if "enc_val" in self.get_register(key):
lang = language if language else self.__aguaiot.language
if lang not in self.get_register_value_options_languages(key):
lang = "ENG"
return {
item["value"]: item["description"]
for item in self.get_register(key).get("enc_val")
if item["lang"] == lang
}
return {}
def get_register_value_options_languages(self, key):
if "enc_val" in self.get_register(key):
return {item["lang"] for item in self.get_register(key).get("enc_val")}
return set()
def get_register_enabled(self, key):
enable_key = key.rsplit("_", 1)[0] + "_enable"
if enable_key not in self.registers or not self.get_register(enable_key):
# Always enabled if no enable register present
return True
if self.get_register(enable_key).get("reg_type") != "ENABLE":
raise AguaIOTError(f"Not a register of type ENABLE: {key}")
if "enable_val" in self.get_register(enable_key):
enabled_values = [
d["value"] for d in self.get_register(enable_key).get("enable_val")
]
return self.get_register_value(enable_key) in enabled_values
else:
return self.get_register_value(enable_key) == 1
async def set_register_value(self, key, value, limit_value_raw=False):
if value is None:
raise AguaIOTError(f"Error while trying to set '{key}' to None")
value = self.__prepare_value_for_writing(
key, value, limit_value_raw=limit_value_raw
)
items = {key: value}
try:
await self.__request_writing(items)
except AguaIOTError:
raise AguaIOTError(f"Error while trying to set: key={key} value={value}")
async def set_register_values(self, items, limit_value_raw=False):
for key in items:
items[key] = self.__prepare_value_for_writing(
key, items[key], limit_value_raw=limit_value_raw
)
try:
await self.__request_writing(items)
except AguaIOTError:
raise AguaIOTError(f"Error while trying to set: items={items}")
async def set_register_value_description(
self, key, value_description, value_fallback=None, language=None
):
try:
options = self.get_register_value_options(key, language)
value = list(options.keys())[
list(options.values()).index(value_description)
]
except (AttributeError, ValueError):
value = value_description
try:
value = float(value)
except ValueError:
value = value_fallback
await self.set_register_value(key, value)
class AguaIOTError(Exception):
"""Exception type for Agua IOT"""
def __init__(self, message):
Exception.__init__(self, message)
class AguaIOTUnauthorized(AguaIOTError):
"""Unauthorized"""
def __init__(self, message):
super().__init__(message)
class AguaIOTConnectionError(AguaIOTError):
"""Connection error"""
def __init__(self, message):
super().__init__(message)
class AguaIOTUpdateError(AguaIOTError):
"""Update error"""
def __init__(self, message):
super().__init__(message)