diff --git a/.gitignore b/.gitignore index 71be0c3..a876294 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ __pycache__ .vscode/ # Ignore Mac system files .DS_Store +# Ignore devcontainer for now (should this go in source control?) +.devcontainer \ No newline at end of file diff --git a/example.py b/example.py index d72a1d6..a0c4ebb 100755 --- a/example.py +++ b/example.py @@ -9,6 +9,7 @@ import re import os.path import logging from typing import List +from pprint import pprint STATE_FILE = "wideq_state.json" LOGGER = logging.getLogger("wideq.example") @@ -34,6 +35,13 @@ def ls(client): print("{0.id}: {0.name} ({0.type.name} {0.model_id})".format(device)) +def info(client, device_id): + """Dump info on a device.""" + + device = client.get_device(device_id) + pprint(vars(device), indent=4, width=1) + + def gen_mon(client, device_id): """Monitor any other device but AC device, displaying generic information about its status. @@ -51,9 +59,11 @@ def gen_mon(client, device_id): if data: try: res = model.decode_monitor(data) + print(res) except ValueError: print("status data: {!r}".format(data)) - else: + """ + else: for key, value in res.items(): try: desc = model.value(key) @@ -66,13 +76,9 @@ def gen_mon(client, device_id): ) ) elif isinstance(desc, wideq.RangeValue): - print( - "- {0}: {1} ({2.min}-{2.max})".format( - key, - value, - desc, - ) - ) + print('- {0}: {1} ({2.min}-{2.max})'.format( + key, value, desc, + )) """ except KeyboardInterrupt: pass @@ -204,6 +210,7 @@ EXAMPLE_COMMANDS = { "set-temp-freezer": set_temp_freezer, "turn": turn, "ac-config": ac_config, + "info": info, } diff --git a/wideq/ac.py b/wideq/ac.py index 5115317..f5e76ce 100644 --- a/wideq/ac.py +++ b/wideq/ac.py @@ -155,6 +155,7 @@ class ACOp(enum.Enum): """Whether a device is on or off.""" OFF = "@AC_MAIN_OPERATION_OFF_W" + ON = "@AC_MAIN_OPERATION_ON_W" # (single) on RIGHT_ON = "@AC_MAIN_OPERATION_RIGHT_ON_W" # Right fan only. LEFT_ON = "@AC_MAIN_OPERATION_LEFT_ON_W" # Left fan only. ALL_ON = "@AC_MAIN_OPERATION_ALL_ON_W" # Both fans (or only fan) on. @@ -202,7 +203,7 @@ class ACDevice(Device): def supported_operations(self): """Get a list of the ACOp Operations the device supports.""" - mapping = self.model.value("Operation").options + mapping = self.model.value("airState.operation").options return [ACOp(o) for i, o in mapping.items()] @property @@ -239,7 +240,7 @@ class ACDevice(Device): def set_celsius(self, c): """Set the device's target temperature in Celsius degrees.""" - self._set_control("TempCfg", c) + self._set_control("airState.tempState.target", c) def set_fahrenheit(self, f): """Set the device's target temperature in Fahrenheit degrees.""" @@ -276,86 +277,80 @@ class ACDevice(Device): `set_zones`. """ - return self._get_config("DuctZone") + # don't have api data for v2 zones, not sure about format + return [] def set_jet_mode(self, jet_opt): """Set jet mode to a value from the `ACJetMode` enum.""" - jet_opt_value = self.model.enum_value("Jet", jet_opt.value) - self._set_control("Jet", jet_opt_value) + jet_opt_value = self.model.enum_value( + "airState.wMode.jet", jet_opt.value + ) + self._set_control("airState.wMode.jet", jet_opt_value) def set_fan_speed(self, speed): """Set the fan speed to a value from the `ACFanSpeed` enum.""" - speed_value = self.model.enum_value("WindStrength", speed.value) - self._set_control("WindStrength", speed_value) + speed_value = self.model.enum_value( + "airState.windStrength", speed.value + ) + self._set_control("airState.windStrength", speed_value) def set_horz_swing(self, swing): """Set the horizontal swing to a value from the `ACHSwingMode` enum.""" - swing_value = self.model.enum_value("WDirHStep", swing.value) - self._set_control("WDirHStep", swing_value) + swing_value = self.model.enum_value("airState.wDir.hStep", swing.value) + self._set_control("airState.wDir.hStep", swing_value) def set_vert_swing(self, swing): """Set the vertical swing to a value from the `ACVSwingMode` enum.""" - swing_value = self.model.enum_value("WDirVStep", swing.value) - self._set_control("WDirVStep", swing_value) + swing_value = self.model.enum_value("airState.wDir.vStep", swing.value) + self._set_control("airState.wDir.vStep", swing_value) def set_mode(self, mode): """Set the device's operating mode to an `OpMode` value.""" - mode_value = self.model.enum_value("OpMode", mode.value) - self._set_control("OpMode", mode_value) + mode_value = self.model.enum_value("airState.opMode", mode.value) + self._set_control("airState.opMode", mode_value) def set_on(self, is_on): """Turn on or off the device (according to a boolean).""" op = self.supported_on_operation if is_on else ACOp.OFF - op_value = self.model.enum_value("Operation", op.value) - self._set_control("Operation", op_value) + op_value = self.model.enum_value("airState.operation", op.value) + self._set_control("airState.operation", op_value, command="Operation") def get_filter_state(self): """Get information about the filter.""" - return self._get_config("Filter") + return self.get_status().filter_state def get_mfilter_state(self): """Get information about the "MFilter" (not sure what this is).""" - return self._get_config("MFilter") + return self.get_status().filter_state_max_time def get_energy_target(self): """Get the configured energy target data.""" - return self._get_config("EnergyDesiredValue") + return self.get_status().energy_on_current def get_outdoor_power(self): """Get instant power usage in watts of the outdoor unit""" - try: - value = self._get_config("OutTotalInstantPower") - return value["OutTotalInstantPower"] - except InvalidRequestError: - # Device does not support outdoor unit instant power usage - return 0 + return self.get_status().energy_on_current def get_power(self): """Get the instant power usage in watts of the whole unit""" - try: - value = self._get_config("InOutInstantPower") - return value["InOutInstantPower"] - except InvalidRequestError: - # Device does not support whole unit instant power usage - return 0 + return self.get_status().energy_on_current def get_light(self): """Get a Boolean indicating whether the display light is on.""" try: - value = self._get_control("DisplayControl") - return value == "0" # Seems backwards, but isn't. + return self.get_status().light except FailedRequestError: # Device does not support reporting display light status. # Since it's probably not changeable the it must be on. @@ -363,12 +358,14 @@ class ACDevice(Device): def get_volume(self): """Get the speaker volume level.""" + return 0 # Device does not support volume control. - try: - value = self._get_control("SpkVolume") - return int(value) - except FailedRequestError: - return 0 # Device does not support volume control. + def get_status(self): + """Get status information + This method retrieves the entire device snapshot.... + """ + res = self._get_deviceinfo_from_snapshot() + return ACStatus(self, res) def poll(self): """Poll the device's current state. @@ -413,7 +410,7 @@ class ACStatus(object): @property def temp_cur_c(self): - return self._str_to_num(self.data["TempCur"]) + return self._str_to_num(self.data["airState.tempState.current"]) @property def temp_cur_f(self): @@ -421,7 +418,7 @@ class ACStatus(object): @property def temp_cfg_c(self): - return self._str_to_num(self.data["TempCfg"]) + return self._str_to_num(self.data["airState.tempState.target"]) @property def temp_cfg_f(self): @@ -429,23 +426,47 @@ class ACStatus(object): @property def mode(self): - return ACMode(lookup_enum("OpMode", self.data, self.ac)) + return ACMode(lookup_enum("airState.opMode", self.data, self.ac)) @property def fan_speed(self): - return ACFanSpeed(lookup_enum("WindStrength", self.data, self.ac)) + return ACFanSpeed( + lookup_enum("airState.windStrength", self.data, self.ac) + ) @property def horz_swing(self): - return ACHSwingMode(lookup_enum("WDirHStep", self.data, self.ac)) + return ACHSwingMode( + lookup_enum("airState.wDir.hStep", self.data, self.ac) + ) @property def vert_swing(self): - return ACVSwingMode(lookup_enum("WDirVStep", self.data, self.ac)) + return ACVSwingMode( + lookup_enum("airState.wDir.vStep", self.data, self.ac) + ) + + @property + def filter_state(self): + return self._str_to_num(self.data["airState.filterMngStates.useTime"]) + + @property + def filter_state_max_time(self): + return self._str_to_num(self.data["airState.filterMngStates.maxTime"]) + + @property + def energy_on_current(self): + return self._str_to_num(self.data["airState.energy.onCurrent"]) + + @property + def light(self): + return self._str_to_num( + self.data["airState.lightingState.displayControl"] + ) @property def is_on(self): - op = ACOp(lookup_enum("Operation", self.data, self.ac)) + op = ACOp(lookup_enum("airState.operation", self.data, self.ac)) return op != ACOp.OFF def __str__(self): diff --git a/wideq/client.py b/wideq/client.py index 19e0c03..c54a83c 100644 --- a/wideq/client.py +++ b/wideq/client.py @@ -1,6 +1,7 @@ """A high-level, convenient abstraction for interacting with the LG SmartThinQ API for most use cases. """ +from io import UnsupportedOperation import json import enum import logging @@ -31,23 +32,29 @@ class Monitor(object): self.device_id = device_id def start(self) -> None: - self.work_id = self.session.monitor_start(self.device_id) + """Nothing to do for v2""" + # self.work_id = self.session.monitor_start(self.device_id) def stop(self) -> None: - self.session.monitor_stop(self.device_id, self.work_id) + """Nothing to do for v2""" + # self.session.monitor_stop(self.device_id, self.work_id) - def poll(self) -> Optional[bytes]: + def poll(self) -> Dict[str, Any]: """Get the current status data (a bytestring) or None if the device is not yet ready. """ + # return self.session.monitor_poll(self.device_id, self.work_id) + # in v2, the data is available only in the snapshot, + # getting better info without querying all devices seems to require + # mqtt + devices = self.session.get_devices() + for device in (DeviceInfo(d) for d in devices): + if device.id == self.device_id: + pollDevice = device + if pollDevice is None: + raise core.DeviceNotFoundError() - try: - return self.session.monitor_poll(self.device_id, self.work_id) - except core.MonitorError: - # Try to restart the task. - self.stop() - self.start() - return None + return pollDevice.data["snapshot"] @staticmethod def decode_json(data: bytes) -> Dict[str, Any]: @@ -60,8 +67,7 @@ class Monitor(object): decoded status result (or None if status is not available). """ - data = self.poll() - return self.decode_json(data) if data else None + return self.poll() def __enter__(self) -> "Monitor": self.start() @@ -172,7 +178,11 @@ class Client(object): if "auth" in state: data = state["auth"] client._auth = core.Auth( - client.gateway, data["access_token"], data["refresh_token"] + client.gateway, + data["access_token"], + data["refresh_token"], + data["user_number"], + data["oauth_root"], ) if "session" in state: @@ -225,13 +235,9 @@ class Client(object): to reload the gateway servers and restart the session. """ - client = cls( - country=country or core.DEFAULT_COUNTRY, - language=language or core.DEFAULT_LANGUAGE, - ) - client._auth = core.Auth(client.gateway, None, refresh_token) - client.refresh() - return client + # This operation is no longer supported in v2, the + # user no is required. + raise UnsupportedOperation("User number required in v2") def model_info(self, device: "DeviceInfo") -> "ModelInfo": """For a DeviceInfo object, get a ModelInfo object describing @@ -284,7 +290,7 @@ class DeviceInfo(object): @property def model_id(self) -> str: - return self.data["modelNm"] + return self.data["modelName"] @property def id(self) -> str: @@ -292,7 +298,7 @@ class DeviceInfo(object): @property def model_info_url(self) -> str: - return self.data["modelJsonUrl"] + return self.data["modelJsonUri"] @property def name(self) -> str: @@ -306,7 +312,7 @@ class DeviceInfo(object): def load_model_info(self): """Load JSON data describing the model's capabilities.""" - return requests.get(self.model_info_url).json() + return requests.get(self.model_info_url, verify=False).json() BitValue = namedtuple("BitValue", ["options"]) @@ -333,21 +339,21 @@ class ModelInfo(object): :raises ValueError: If an unsupported type is encountered. """ d = self.data["Value"][name] - if d["type"] in ("Enum", "enum"): - return EnumValue(d["option"]) - elif d["type"] == "Range": + if d["data_type"] in ("Enum", "enum"): + return EnumValue(d["value_mapping"]) + elif d["data_type"] == "Range": return RangeValue( d["option"]["min"], d["option"]["max"], d["option"].get("step", 1), ) - elif d["type"].lower() == "bit": + elif d["data_type"].lower() == "bit": bit_values = {opt["startbit"]: opt["value"] for opt in d["option"]} return BitValue(bit_values) - elif d["type"].lower() == "reference": + elif d["data_type"].lower() == "reference": ref = d["option"][0] return ReferenceValue(self.data[ref]) - elif d["type"].lower() == "string": + elif d["data_type"].lower() == "string": return StringValue(d.get("_comment", "")) else: raise ValueError( @@ -368,17 +374,17 @@ class ModelInfo(object): def enum_name(self, key, value): """Look up the friendly enum name for an encoded value.""" options = self.value(key).options - if value not in options: + if str(int(value)) not in options: LOGGER.warning( "Value `%s` for key `%s` not in options: %s. Values from API: " "%s", - value, + str(int(value)), key, options, - self.data["Value"][key]["option"], + self.data["Value"][key]["value_mapping"], ) return _UNKNOWN - return options[value] + return options[str(int(value))] def reference_name(self, key: str, value: Any) -> Optional[str]: """Look up the friendly name for an encoded reference value. @@ -418,10 +424,11 @@ class ModelInfo(object): def decode_monitor(self, data): """Decode status data.""" - if self.binary_monitor_data: - return self.decode_monitor_binary(data) - else: - return self.decode_monitor_json(data) + return data + # if self.binary_monitor_data: + # return self.decode_monitor_binary(data) + # else: + # return self.decode_monitor_json(data) class Device(object): @@ -440,21 +447,36 @@ class Device(object): self.device = device self.model: ModelInfo = client.model_info(device) - def _set_control(self, key, value): + def _get_deviceinfo_from_snapshot(self): + # probably should cache the snapshot somehow + devices = self.client.session.get_devices() + for device in (DeviceInfo(d) for d in devices): + if device.id == self.device.id: + pollDevice = device + if pollDevice is None: + raise core.DeviceNotFoundError() + return pollDevice.data["snapshot"] + + def _set_control(self, key, value, command="Set", ctrlKey="basicCtrl"): """Set a device's control for `key` to `value`.""" - self.client.session.set_device_controls( + self.client.session.device_control( self.device.id, - {key: value}, + { + "ctrlKey": ctrlKey, + "command": command, + "dataKey": key, + "dataValue": value, + }, ) - def _get_config(self, key): + def _get_config(self, key, ctrlKey="basicCtrl"): """Look up a device's configuration for a given value. The response is parsed as base64-encoded JSON. """ - data = self.client.session.get_device_config( + data = self.client.session.device_control( self.device.id, - key, + {"ctrlKey": ctrlKey, "command": "Get", "dataKey": key}, ) data = base64.b64decode(data).decode("utf8") try: @@ -470,15 +492,9 @@ class Device(object): def _get_control(self, key): """Look up a device's control value.""" - data = self.client.session.get_device_config( - self.device.id, - key, - "Control", - ) - - # The response comes in a funky key/value format: "(key:value)". - _, value = data[1:-1].split(":") - return value + # unsupported api in v2- use snapshot or mqtt + # currently just returns empty value + return "" def monitor_start(self): """Start monitoring the device's status.""" diff --git a/wideq/core.py b/wideq/core.py index e169f11..11a0576 100644 --- a/wideq/core.py +++ b/wideq/core.py @@ -1,6 +1,7 @@ """A low-level, general abstraction for the LG SmartThinQ API. """ import base64 +from enum import Enum import uuid from urllib.parse import urljoin, urlencode, urlparse, parse_qs import hashlib @@ -12,18 +13,39 @@ from typing import Any, Dict, List, Tuple from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry -GATEWAY_URL = "https://kic.lgthinq.com:46030/api/common/gatewayUriList" -APP_KEY = "wideq" +GATEWAY_URL = ( + "https://route.lgthinq.com:46030/v1/service/application/gateway-uri" +) SECURITY_KEY = "nuts_securitykey" -DATA_ROOT = "lgedmRoot" +APP_KEY = "wideq" +DATA_ROOT = "result" +POST_DATA_ROOT = "lgedmRoot" +RETURN_CODE_ROOT = "resultCode" +RETURN_MESSAGE_ROOT = "returnMsg" SVC_CODE = "SVC202" -CLIENT_ID = "LGAO221A02" OAUTH_SECRET_KEY = "c053c2a6ddeb7ad97cb0eed0dcb31cf8" OAUTH_CLIENT_KEY = "LGAO221A02" DATE_FORMAT = "%a, %d %b %Y %H:%M:%S +0000" DEFAULT_COUNTRY = "US" DEFAULT_LANGUAGE = "en-US" +# v2 +API_KEY = "VGhpblEyLjAgU0VSVklDRQ==" + +# the client id is a SHA512 hash of the phone MFR,MODEL,SERIAL, +# and the build id of the thinq app it can also just be a random +# string, we use the same client id used for oauth +CLIENT_ID = OAUTH_CLIENT_KEY +MESSAGE_ID = "wideq" +SVC_PHASE = "OP" +APP_LEVEL = "PRD" +APP_OS = "ANDROID" +APP_TYPE = "NUTS" +APP_VER = "3.5.1200" + + +OAUTH_REDIRECT_URI = "https://kr.m.lgaccount.com/login/iabClose" + RETRY_COUNT = 5 # Anecdotally this seems sufficient. RETRY_FACTOR = 0.5 RETRY_STATUSES = (502, 503, 504) @@ -129,7 +151,7 @@ def get_list(obj, key: str) -> List[Dict[str, Any]]: class APIError(Exception): """An error reported by the API.""" - def __init__(self, code, message): + def __init__(self, code, message=None): self.code = code self.message = message @@ -159,6 +181,10 @@ class InvalidRequestError(APIError): """The server rejected a request as invalid.""" +class DeviceNotFoundError: + """The device couldn't be found.""" + + class MonitorError(APIError): """Monitoring a device failed, possibly because the monitoring session failed and needs to be restarted. @@ -185,7 +211,21 @@ API_ERRORS = { } -def lgedm_post(url, data=None, access_token=None, session_id=None): +class RequestMethod(Enum): + GET = "get" + POST = "post" + + +def thinq_request( + method, + url, + data=None, + access_token=None, + session_id=None, + user_number=None, + country=DEFAULT_COUNTRY, + language=DEFAULT_LANGUAGE, +): """Make an HTTP request in the format used by the API servers. In this format, the request POST data sent as JSON under a special @@ -197,30 +237,45 @@ def lgedm_post(url, data=None, access_token=None, session_id=None): the gateway server data or to start a session. """ headers = { - "x-thinq-application-key": APP_KEY, - "x-thinq-security-key": SECURITY_KEY, "Accept": "application/json", + "x-api-key": API_KEY, + "x-client-id": CLIENT_ID, + "x-country-code": country, + "x-language-code": language, + "x-message-id": MESSAGE_ID, + "x-service-code": SVC_CODE, + "x-service-phase": SVC_PHASE, + "x-thinq-app-level": APP_LEVEL, + "x-thinq-app-os": APP_OS, + "x-thinq-app-type": APP_TYPE, + "x-thinq-app-ver": APP_VER, } + if access_token: - headers["x-thinq-token"] = access_token - if session_id: - headers["x-thinq-jsessionId"] = session_id + headers["x-emp-token"] = access_token + if user_number: + headers["x-user-no"] = user_number with retry_session() as session: - res = session.post(url, json={DATA_ROOT: data}, headers=headers) - out = res.json()[DATA_ROOT] + if method == RequestMethod.POST: + res = session.post(url, json=data, headers=headers) + elif method == RequestMethod.GET: + res = session.get(url, headers=headers) + else: + raise ValueError("Unsupported request method") + + out = res.json() # Check for API errors. - if "returnCd" in out: - code = out["returnCd"] + if RETURN_CODE_ROOT in out: + code = out[RETURN_CODE_ROOT] if code != "0000": - message = out["returnMsg"] if code in API_ERRORS: - raise API_ERRORS[code](code, message) + raise API_ERRORS[code](code) else: - raise APIError(code, message) + raise APIError(code) - return out + return out[DATA_ROOT] def oauth_url(auth_base, country, language): @@ -228,18 +283,20 @@ def oauth_url(auth_base, country, language): authenticated session. """ - url = urljoin(auth_base, "login/sign_in") + url = urljoin(auth_base, "spx/login/signIn") query = urlencode( { "country": country, "language": language, - "svcCode": SVC_CODE, - "authSvr": "oauth2", + "svc_list": SVC_CODE, "client_id": CLIENT_ID, "division": "ha", - "grant_type": "password", + "redirect_uri": OAUTH_REDIRECT_URI, + "state": uuid.uuid1().hex, + "show_thirdparty_login": "AMZ,FBK", } ) + return "{}?{}".format(url, query) @@ -250,35 +307,37 @@ def parse_oauth_callback(url): """ params = parse_qs(urlparse(url).query) - return params["access_token"][0], params["refresh_token"][0] + return ( + params["oauth2_backend_url"][0], + params["code"][0], + params["user_number"][0], + ) -def login(api_root, access_token, country, language): - """Use an access token to log into the API and obtain a session and - return information about the session. - """ - - url = urljoin(api_root + "/", "member/login") - data = { - "countryCode": country, - "langCode": language, - "loginType": "EMP", - "token": access_token, - } - return lgedm_post(url, data) +class OAuthGrant(Enum): + REFRESH_TOKEN = "refresh_token" + AUTHORIZATION_CODE = "authorization_code" -def refresh_auth(oauth_root, refresh_token): - """Get a new access_token using a refresh_token. +def oauth_request(grant, oauth_root, token): + """Make an oauth_request with a specific grant type May raise a `TokenError`. """ - token_url = urljoin(oauth_root, "/oauth2/token") - data = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - } + oauth_path = "/oauth/1.0/oauth2/token" + token_url = urljoin(oauth_root, oauth_path) + data = {} + + if grant == OAuthGrant.REFRESH_TOKEN: + data["grant_type"] = "refresh_token" + data["refresh_token"] = token + elif grant == OAuthGrant.AUTHORIZATION_CODE: + data["code"] = token + data["grant_type"] = "authorization_code" + data["redirect_uri"] = OAUTH_REDIRECT_URI + else: + raise ValueError("Unsupported grant") # The timestamp for labeling OAuth requests can be obtained # through a request to the date/time endpoint: @@ -289,17 +348,15 @@ def refresh_auth(oauth_root, refresh_token): # The signature for the requests is on a string consisting of two # parts: (1) a fake request URL containing the refresh token, and (2) # the timestamp. - req_url = ( - "/oauth2/token?grant_type=refresh_token&refresh_token=" + refresh_token - ) + req_url = "{}?{}".format(oauth_path, urlencode(data)) sig = oauth2_signature( "{}\n{}".format(req_url, timestamp), OAUTH_SECRET_KEY ) headers = { - "lgemp-x-app-key": OAUTH_CLIENT_KEY, - "lgemp-x-signature": sig, - "lgemp-x-date": timestamp, + "x-lge-appkey": CLIENT_ID, + "x-lge-oauth-signature": sig, + "x-lge-oauth-date": timestamp, "Accept": "application/json", } @@ -307,16 +364,16 @@ def refresh_auth(oauth_root, refresh_token): res = session.post(token_url, data=data, headers=headers) res_data = res.json() - if res_data["status"] != 1: + if res.status_code != 200: raise TokenError() - return res_data["access_token"] + + return res_data class Gateway(object): - def __init__(self, auth_base, api_root, oauth_root, country, language): + def __init__(self, auth_base, api_root, country, language): self.auth_base = auth_base self.api_root = api_root - self.oauth_root = oauth_root self.country = country self.language = language @@ -327,12 +384,12 @@ class Gateway(object): `country` and `language` are codes, like "US" and "en-US," respectively. """ - gw = lgedm_post( - GATEWAY_URL, {"countryCode": country, "langCode": language} - ) - return cls( - gw["empUri"], gw["thinqUri"], gw["oauthUri"], country, language + gw = thinq_request( + RequestMethod.GET, + GATEWAY_URL, + {"countryCode": country, "langCode": language}, ) + return cls(gw["empUri"], gw["thinq2Uri"], country, language) def oauth_url(self): return oauth_url(self.auth_base, self.country, self.language) @@ -341,7 +398,6 @@ class Gateway(object): return { "auth_base": self.auth_base, "api_root": self.api_root, - "oauth_root": self.oauth_root, "country": self.country, "language": self.language, } @@ -351,56 +407,68 @@ class Gateway(object): return cls( data["auth_base"], data["api_root"], - data["oauth_root"], data.get("country", DEFAULT_COUNTRY), data.get("language", DEFAULT_LANGUAGE), ) class Auth(object): - def __init__(self, gateway, access_token, refresh_token): + def __init__( + self, gateway, access_token, refresh_token, user_number, oauth_root + ): self.gateway = gateway self.access_token = access_token self.refresh_token = refresh_token + self.user_number = user_number + self.oauth_root = oauth_root @classmethod def from_url(cls, gateway, url): """Create an authentication using an OAuth callback URL.""" - access_token, refresh_token = parse_oauth_callback(url) - return cls(gateway, access_token, refresh_token) + oauth_root, auth_code, user_number = parse_oauth_callback(url) + out = oauth_request( + OAuthGrant.AUTHORIZATION_CODE, oauth_root, auth_code + ) + return cls( + gateway, + out["access_token"], + out["refresh_token"], + user_number, + oauth_root, + ) def start_session(self) -> Tuple["Session", List[Dict[str, Any]]]: """Start an API session for the logged-in user. Return the Session object and a list of the user's devices. """ - - session_info = login( - self.gateway.api_root, - self.access_token, - self.gateway.country, - self.gateway.language, - ) - session_id = session_info["jsessionId"] - return Session(self, session_id), get_list(session_info, "item") + return Session(self), [] def refresh(self): """Refresh the authentication, returning a new Auth object.""" - new_access_token = refresh_auth( - self.gateway.oauth_root, self.refresh_token + new_access_token = oauth_request( + OAuthGrant.REFRESH_TOKEN, self.oauth_root, self.refresh_token + )["access_token"] + return Auth( + self.gateway, + new_access_token, + self.refresh_token, + self.user_number, + self.oauth_root, ) - return Auth(self.gateway, new_access_token, self.refresh_token) def serialize(self) -> Dict[str, str]: return { "access_token": self.access_token, "refresh_token": self.refresh_token, + "user_number": self.user_number, + "oauth_root": self.oauth_root, } class Session(object): - def __init__(self, auth, session_id) -> None: + def __init__(self, auth, session_id=None) -> None: self.auth = auth self.session_id = session_id @@ -412,7 +480,30 @@ class Session(object): """ url = urljoin(self.auth.gateway.api_root + "/", path) - return lgedm_post(url, data, self.auth.access_token, self.session_id) + return thinq_request( + RequestMethod.POST, + url, + data, + self.auth.access_token, + user_number=self.auth.user_number, + ) + + def get(self, path): + """Make a GET request to the API server. + + This is like `lgedm_get`, but it pulls the context for the + request from an active Session. + """ + + url = urljoin(self.auth.gateway.api_root + "/", path) + return thinq_request( + RequestMethod.GET, + url, + access_token=self.auth.access_token, + user_number=self.auth.user_number, + country=self.auth.gateway.country, + language=self.auth.gateway.language, + ) def get_devices(self) -> List[Dict[str, Any]]: """Get a list of devices associated with the user's account. @@ -420,7 +511,7 @@ class Session(object): Return a list of dicts with information about the devices. """ - return get_list(self.post("device/deviceList"), "item") + return get_list(self.get("service/application/dashboard"), "item") def monitor_start(self, device_id): """Begin monitoring a device's status. @@ -488,40 +579,13 @@ class Session(object): }, ) - def set_device_controls(self, device_id, values): + def device_control(self, device_id, data): """Control a device's settings. `values` is a key/value map containing the settings to update. """ - return self.post( - "rti/rtiControl", - { - "cmd": "Control", - "cmdOpt": "Set", - "value": values, - "deviceId": device_id, - "workId": gen_uuid(), - "data": "", - }, - ) + controlPath = "service/devices/{}/control-sync".format(device_id) - def get_device_config(self, device_id, key, category="Config"): - """Get a device configuration option. - - The `category` string should probably either be "Config" or - "Control"; the right choice appears to depend on the key. - """ - - res = self.post( - "rti/rtiControl", - { - "cmd": category, - "cmdOpt": "Get", - "value": key, - "deviceId": device_id, - "workId": gen_uuid(), - "data": "", - }, - ) - return res["returnData"] + res = self.post(controlPath, data) + return res