1
0
mirror of https://github.com/no2chem/wideq.git synced 2025-05-15 14:50:28 -07:00

Initial thinq2 support, works with LA090HSV5 ac

This commit is contained in:
Michael Wei 2020-11-20 19:43:04 +00:00
parent 267a880340
commit 8617831453
5 changed files with 324 additions and 214 deletions

2
.gitignore vendored
View File

@ -6,3 +6,5 @@ __pycache__
.vscode/
# Ignore Mac system files
.DS_Store
# Ignore devcontainer for now (should this go in source control?)
.devcontainer

View File

@ -9,6 +9,7 @@ import re
import os.path
import logging
from typing import List
from pprint import pprint
STATE_FILE = "wideq_state.json"
LOGGER = logging.getLogger("wideq.example")
@ -34,6 +35,13 @@ def ls(client):
print("{0.id}: {0.name} ({0.type.name} {0.model_id})".format(device))
def info(client, device_id):
"""Dump info on a device."""
device = client.get_device(device_id)
pprint(vars(device), indent=4, width=1)
def gen_mon(client, device_id):
"""Monitor any other device but AC device,
displaying generic information about its status.
@ -51,9 +59,11 @@ def gen_mon(client, device_id):
if data:
try:
res = model.decode_monitor(data)
print(res)
except ValueError:
print("status data: {!r}".format(data))
else:
"""
else:
for key, value in res.items():
try:
desc = model.value(key)
@ -66,13 +76,9 @@ def gen_mon(client, device_id):
)
)
elif isinstance(desc, wideq.RangeValue):
print(
"- {0}: {1} ({2.min}-{2.max})".format(
key,
value,
desc,
)
)
print('- {0}: {1} ({2.min}-{2.max})'.format(
key, value, desc,
)) """
except KeyboardInterrupt:
pass
@ -204,6 +210,7 @@ EXAMPLE_COMMANDS = {
"set-temp-freezer": set_temp_freezer,
"turn": turn,
"ac-config": ac_config,
"info": info,
}

View File

@ -155,6 +155,7 @@ class ACOp(enum.Enum):
"""Whether a device is on or off."""
OFF = "@AC_MAIN_OPERATION_OFF_W"
ON = "@AC_MAIN_OPERATION_ON_W" # (single) on
RIGHT_ON = "@AC_MAIN_OPERATION_RIGHT_ON_W" # Right fan only.
LEFT_ON = "@AC_MAIN_OPERATION_LEFT_ON_W" # Left fan only.
ALL_ON = "@AC_MAIN_OPERATION_ALL_ON_W" # Both fans (or only fan) on.
@ -202,7 +203,7 @@ class ACDevice(Device):
def supported_operations(self):
"""Get a list of the ACOp Operations the device supports."""
mapping = self.model.value("Operation").options
mapping = self.model.value("airState.operation").options
return [ACOp(o) for i, o in mapping.items()]
@property
@ -239,7 +240,7 @@ class ACDevice(Device):
def set_celsius(self, c):
"""Set the device's target temperature in Celsius degrees."""
self._set_control("TempCfg", c)
self._set_control("airState.tempState.target", c)
def set_fahrenheit(self, f):
"""Set the device's target temperature in Fahrenheit degrees."""
@ -276,86 +277,80 @@ class ACDevice(Device):
`set_zones`.
"""
return self._get_config("DuctZone")
# don't have api data for v2 zones, not sure about format
return []
def set_jet_mode(self, jet_opt):
"""Set jet mode to a value from the `ACJetMode` enum."""
jet_opt_value = self.model.enum_value("Jet", jet_opt.value)
self._set_control("Jet", jet_opt_value)
jet_opt_value = self.model.enum_value(
"airState.wMode.jet", jet_opt.value
)
self._set_control("airState.wMode.jet", jet_opt_value)
def set_fan_speed(self, speed):
"""Set the fan speed to a value from the `ACFanSpeed` enum."""
speed_value = self.model.enum_value("WindStrength", speed.value)
self._set_control("WindStrength", speed_value)
speed_value = self.model.enum_value(
"airState.windStrength", speed.value
)
self._set_control("airState.windStrength", speed_value)
def set_horz_swing(self, swing):
"""Set the horizontal swing to a value from the `ACHSwingMode` enum."""
swing_value = self.model.enum_value("WDirHStep", swing.value)
self._set_control("WDirHStep", swing_value)
swing_value = self.model.enum_value("airState.wDir.hStep", swing.value)
self._set_control("airState.wDir.hStep", swing_value)
def set_vert_swing(self, swing):
"""Set the vertical swing to a value from the `ACVSwingMode` enum."""
swing_value = self.model.enum_value("WDirVStep", swing.value)
self._set_control("WDirVStep", swing_value)
swing_value = self.model.enum_value("airState.wDir.vStep", swing.value)
self._set_control("airState.wDir.vStep", swing_value)
def set_mode(self, mode):
"""Set the device's operating mode to an `OpMode` value."""
mode_value = self.model.enum_value("OpMode", mode.value)
self._set_control("OpMode", mode_value)
mode_value = self.model.enum_value("airState.opMode", mode.value)
self._set_control("airState.opMode", mode_value)
def set_on(self, is_on):
"""Turn on or off the device (according to a boolean)."""
op = self.supported_on_operation if is_on else ACOp.OFF
op_value = self.model.enum_value("Operation", op.value)
self._set_control("Operation", op_value)
op_value = self.model.enum_value("airState.operation", op.value)
self._set_control("airState.operation", op_value, command="Operation")
def get_filter_state(self):
"""Get information about the filter."""
return self._get_config("Filter")
return self.get_status().filter_state
def get_mfilter_state(self):
"""Get information about the "MFilter" (not sure what this is)."""
return self._get_config("MFilter")
return self.get_status().filter_state_max_time
def get_energy_target(self):
"""Get the configured energy target data."""
return self._get_config("EnergyDesiredValue")
return self.get_status().energy_on_current
def get_outdoor_power(self):
"""Get instant power usage in watts of the outdoor unit"""
try:
value = self._get_config("OutTotalInstantPower")
return value["OutTotalInstantPower"]
except InvalidRequestError:
# Device does not support outdoor unit instant power usage
return 0
return self.get_status().energy_on_current
def get_power(self):
"""Get the instant power usage in watts of the whole unit"""
try:
value = self._get_config("InOutInstantPower")
return value["InOutInstantPower"]
except InvalidRequestError:
# Device does not support whole unit instant power usage
return 0
return self.get_status().energy_on_current
def get_light(self):
"""Get a Boolean indicating whether the display light is on."""
try:
value = self._get_control("DisplayControl")
return value == "0" # Seems backwards, but isn't.
return self.get_status().light
except FailedRequestError:
# Device does not support reporting display light status.
# Since it's probably not changeable the it must be on.
@ -363,12 +358,14 @@ class ACDevice(Device):
def get_volume(self):
"""Get the speaker volume level."""
return 0 # Device does not support volume control.
try:
value = self._get_control("SpkVolume")
return int(value)
except FailedRequestError:
return 0 # Device does not support volume control.
def get_status(self):
"""Get status information
This method retrieves the entire device snapshot....
"""
res = self._get_deviceinfo_from_snapshot()
return ACStatus(self, res)
def poll(self):
"""Poll the device's current state.
@ -413,7 +410,7 @@ class ACStatus(object):
@property
def temp_cur_c(self):
return self._str_to_num(self.data["TempCur"])
return self._str_to_num(self.data["airState.tempState.current"])
@property
def temp_cur_f(self):
@ -421,7 +418,7 @@ class ACStatus(object):
@property
def temp_cfg_c(self):
return self._str_to_num(self.data["TempCfg"])
return self._str_to_num(self.data["airState.tempState.target"])
@property
def temp_cfg_f(self):
@ -429,23 +426,47 @@ class ACStatus(object):
@property
def mode(self):
return ACMode(lookup_enum("OpMode", self.data, self.ac))
return ACMode(lookup_enum("airState.opMode", self.data, self.ac))
@property
def fan_speed(self):
return ACFanSpeed(lookup_enum("WindStrength", self.data, self.ac))
return ACFanSpeed(
lookup_enum("airState.windStrength", self.data, self.ac)
)
@property
def horz_swing(self):
return ACHSwingMode(lookup_enum("WDirHStep", self.data, self.ac))
return ACHSwingMode(
lookup_enum("airState.wDir.hStep", self.data, self.ac)
)
@property
def vert_swing(self):
return ACVSwingMode(lookup_enum("WDirVStep", self.data, self.ac))
return ACVSwingMode(
lookup_enum("airState.wDir.vStep", self.data, self.ac)
)
@property
def filter_state(self):
return self._str_to_num(self.data["airState.filterMngStates.useTime"])
@property
def filter_state_max_time(self):
return self._str_to_num(self.data["airState.filterMngStates.maxTime"])
@property
def energy_on_current(self):
return self._str_to_num(self.data["airState.energy.onCurrent"])
@property
def light(self):
return self._str_to_num(
self.data["airState.lightingState.displayControl"]
)
@property
def is_on(self):
op = ACOp(lookup_enum("Operation", self.data, self.ac))
op = ACOp(lookup_enum("airState.operation", self.data, self.ac))
return op != ACOp.OFF
def __str__(self):

View File

@ -1,6 +1,7 @@
"""A high-level, convenient abstraction for interacting with the LG
SmartThinQ API for most use cases.
"""
from io import UnsupportedOperation
import json
import enum
import logging
@ -31,23 +32,29 @@ class Monitor(object):
self.device_id = device_id
def start(self) -> None:
self.work_id = self.session.monitor_start(self.device_id)
"""Nothing to do for v2"""
# self.work_id = self.session.monitor_start(self.device_id)
def stop(self) -> None:
self.session.monitor_stop(self.device_id, self.work_id)
"""Nothing to do for v2"""
# self.session.monitor_stop(self.device_id, self.work_id)
def poll(self) -> Optional[bytes]:
def poll(self) -> Dict[str, Any]:
"""Get the current status data (a bytestring) or None if the
device is not yet ready.
"""
# return self.session.monitor_poll(self.device_id, self.work_id)
# in v2, the data is available only in the snapshot,
# getting better info without querying all devices seems to require
# mqtt
devices = self.session.get_devices()
for device in (DeviceInfo(d) for d in devices):
if device.id == self.device_id:
pollDevice = device
if pollDevice is None:
raise core.DeviceNotFoundError()
try:
return self.session.monitor_poll(self.device_id, self.work_id)
except core.MonitorError:
# Try to restart the task.
self.stop()
self.start()
return None
return pollDevice.data["snapshot"]
@staticmethod
def decode_json(data: bytes) -> Dict[str, Any]:
@ -60,8 +67,7 @@ class Monitor(object):
decoded status result (or None if status is not available).
"""
data = self.poll()
return self.decode_json(data) if data else None
return self.poll()
def __enter__(self) -> "Monitor":
self.start()
@ -172,7 +178,11 @@ class Client(object):
if "auth" in state:
data = state["auth"]
client._auth = core.Auth(
client.gateway, data["access_token"], data["refresh_token"]
client.gateway,
data["access_token"],
data["refresh_token"],
data["user_number"],
data["oauth_root"],
)
if "session" in state:
@ -225,13 +235,9 @@ class Client(object):
to reload the gateway servers and restart the session.
"""
client = cls(
country=country or core.DEFAULT_COUNTRY,
language=language or core.DEFAULT_LANGUAGE,
)
client._auth = core.Auth(client.gateway, None, refresh_token)
client.refresh()
return client
# This operation is no longer supported in v2, the
# user no is required.
raise UnsupportedOperation("User number required in v2")
def model_info(self, device: "DeviceInfo") -> "ModelInfo":
"""For a DeviceInfo object, get a ModelInfo object describing
@ -284,7 +290,7 @@ class DeviceInfo(object):
@property
def model_id(self) -> str:
return self.data["modelNm"]
return self.data["modelName"]
@property
def id(self) -> str:
@ -292,7 +298,7 @@ class DeviceInfo(object):
@property
def model_info_url(self) -> str:
return self.data["modelJsonUrl"]
return self.data["modelJsonUri"]
@property
def name(self) -> str:
@ -306,7 +312,7 @@ class DeviceInfo(object):
def load_model_info(self):
"""Load JSON data describing the model's capabilities."""
return requests.get(self.model_info_url).json()
return requests.get(self.model_info_url, verify=False).json()
BitValue = namedtuple("BitValue", ["options"])
@ -333,21 +339,21 @@ class ModelInfo(object):
:raises ValueError: If an unsupported type is encountered.
"""
d = self.data["Value"][name]
if d["type"] in ("Enum", "enum"):
return EnumValue(d["option"])
elif d["type"] == "Range":
if d["data_type"] in ("Enum", "enum"):
return EnumValue(d["value_mapping"])
elif d["data_type"] == "Range":
return RangeValue(
d["option"]["min"],
d["option"]["max"],
d["option"].get("step", 1),
)
elif d["type"].lower() == "bit":
elif d["data_type"].lower() == "bit":
bit_values = {opt["startbit"]: opt["value"] for opt in d["option"]}
return BitValue(bit_values)
elif d["type"].lower() == "reference":
elif d["data_type"].lower() == "reference":
ref = d["option"][0]
return ReferenceValue(self.data[ref])
elif d["type"].lower() == "string":
elif d["data_type"].lower() == "string":
return StringValue(d.get("_comment", ""))
else:
raise ValueError(
@ -368,17 +374,17 @@ class ModelInfo(object):
def enum_name(self, key, value):
"""Look up the friendly enum name for an encoded value."""
options = self.value(key).options
if value not in options:
if str(int(value)) not in options:
LOGGER.warning(
"Value `%s` for key `%s` not in options: %s. Values from API: "
"%s",
value,
str(int(value)),
key,
options,
self.data["Value"][key]["option"],
self.data["Value"][key]["value_mapping"],
)
return _UNKNOWN
return options[value]
return options[str(int(value))]
def reference_name(self, key: str, value: Any) -> Optional[str]:
"""Look up the friendly name for an encoded reference value.
@ -418,10 +424,11 @@ class ModelInfo(object):
def decode_monitor(self, data):
"""Decode status data."""
if self.binary_monitor_data:
return self.decode_monitor_binary(data)
else:
return self.decode_monitor_json(data)
return data
# if self.binary_monitor_data:
# return self.decode_monitor_binary(data)
# else:
# return self.decode_monitor_json(data)
class Device(object):
@ -440,21 +447,36 @@ class Device(object):
self.device = device
self.model: ModelInfo = client.model_info(device)
def _set_control(self, key, value):
def _get_deviceinfo_from_snapshot(self):
# probably should cache the snapshot somehow
devices = self.client.session.get_devices()
for device in (DeviceInfo(d) for d in devices):
if device.id == self.device.id:
pollDevice = device
if pollDevice is None:
raise core.DeviceNotFoundError()
return pollDevice.data["snapshot"]
def _set_control(self, key, value, command="Set", ctrlKey="basicCtrl"):
"""Set a device's control for `key` to `value`."""
self.client.session.set_device_controls(
self.client.session.device_control(
self.device.id,
{key: value},
{
"ctrlKey": ctrlKey,
"command": command,
"dataKey": key,
"dataValue": value,
},
)
def _get_config(self, key):
def _get_config(self, key, ctrlKey="basicCtrl"):
"""Look up a device's configuration for a given value.
The response is parsed as base64-encoded JSON.
"""
data = self.client.session.get_device_config(
data = self.client.session.device_control(
self.device.id,
key,
{"ctrlKey": ctrlKey, "command": "Get", "dataKey": key},
)
data = base64.b64decode(data).decode("utf8")
try:
@ -470,15 +492,9 @@ class Device(object):
def _get_control(self, key):
"""Look up a device's control value."""
data = self.client.session.get_device_config(
self.device.id,
key,
"Control",
)
# The response comes in a funky key/value format: "(key:value)".
_, value = data[1:-1].split(":")
return value
# unsupported api in v2- use snapshot or mqtt
# currently just returns empty value
return ""
def monitor_start(self):
"""Start monitoring the device's status."""

View File

@ -1,6 +1,7 @@
"""A low-level, general abstraction for the LG SmartThinQ API.
"""
import base64
from enum import Enum
import uuid
from urllib.parse import urljoin, urlencode, urlparse, parse_qs
import hashlib
@ -12,18 +13,39 @@ from typing import Any, Dict, List, Tuple
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
GATEWAY_URL = "https://kic.lgthinq.com:46030/api/common/gatewayUriList"
APP_KEY = "wideq"
GATEWAY_URL = (
"https://route.lgthinq.com:46030/v1/service/application/gateway-uri"
)
SECURITY_KEY = "nuts_securitykey"
DATA_ROOT = "lgedmRoot"
APP_KEY = "wideq"
DATA_ROOT = "result"
POST_DATA_ROOT = "lgedmRoot"
RETURN_CODE_ROOT = "resultCode"
RETURN_MESSAGE_ROOT = "returnMsg"
SVC_CODE = "SVC202"
CLIENT_ID = "LGAO221A02"
OAUTH_SECRET_KEY = "c053c2a6ddeb7ad97cb0eed0dcb31cf8"
OAUTH_CLIENT_KEY = "LGAO221A02"
DATE_FORMAT = "%a, %d %b %Y %H:%M:%S +0000"
DEFAULT_COUNTRY = "US"
DEFAULT_LANGUAGE = "en-US"
# v2
API_KEY = "VGhpblEyLjAgU0VSVklDRQ=="
# the client id is a SHA512 hash of the phone MFR,MODEL,SERIAL,
# and the build id of the thinq app it can also just be a random
# string, we use the same client id used for oauth
CLIENT_ID = OAUTH_CLIENT_KEY
MESSAGE_ID = "wideq"
SVC_PHASE = "OP"
APP_LEVEL = "PRD"
APP_OS = "ANDROID"
APP_TYPE = "NUTS"
APP_VER = "3.5.1200"
OAUTH_REDIRECT_URI = "https://kr.m.lgaccount.com/login/iabClose"
RETRY_COUNT = 5 # Anecdotally this seems sufficient.
RETRY_FACTOR = 0.5
RETRY_STATUSES = (502, 503, 504)
@ -129,7 +151,7 @@ def get_list(obj, key: str) -> List[Dict[str, Any]]:
class APIError(Exception):
"""An error reported by the API."""
def __init__(self, code, message):
def __init__(self, code, message=None):
self.code = code
self.message = message
@ -159,6 +181,10 @@ class InvalidRequestError(APIError):
"""The server rejected a request as invalid."""
class DeviceNotFoundError:
"""The device couldn't be found."""
class MonitorError(APIError):
"""Monitoring a device failed, possibly because the monitoring
session failed and needs to be restarted.
@ -185,7 +211,21 @@ API_ERRORS = {
}
def lgedm_post(url, data=None, access_token=None, session_id=None):
class RequestMethod(Enum):
GET = "get"
POST = "post"
def thinq_request(
method,
url,
data=None,
access_token=None,
session_id=None,
user_number=None,
country=DEFAULT_COUNTRY,
language=DEFAULT_LANGUAGE,
):
"""Make an HTTP request in the format used by the API servers.
In this format, the request POST data sent as JSON under a special
@ -197,30 +237,45 @@ def lgedm_post(url, data=None, access_token=None, session_id=None):
the gateway server data or to start a session.
"""
headers = {
"x-thinq-application-key": APP_KEY,
"x-thinq-security-key": SECURITY_KEY,
"Accept": "application/json",
"x-api-key": API_KEY,
"x-client-id": CLIENT_ID,
"x-country-code": country,
"x-language-code": language,
"x-message-id": MESSAGE_ID,
"x-service-code": SVC_CODE,
"x-service-phase": SVC_PHASE,
"x-thinq-app-level": APP_LEVEL,
"x-thinq-app-os": APP_OS,
"x-thinq-app-type": APP_TYPE,
"x-thinq-app-ver": APP_VER,
}
if access_token:
headers["x-thinq-token"] = access_token
if session_id:
headers["x-thinq-jsessionId"] = session_id
headers["x-emp-token"] = access_token
if user_number:
headers["x-user-no"] = user_number
with retry_session() as session:
res = session.post(url, json={DATA_ROOT: data}, headers=headers)
out = res.json()[DATA_ROOT]
if method == RequestMethod.POST:
res = session.post(url, json=data, headers=headers)
elif method == RequestMethod.GET:
res = session.get(url, headers=headers)
else:
raise ValueError("Unsupported request method")
out = res.json()
# Check for API errors.
if "returnCd" in out:
code = out["returnCd"]
if RETURN_CODE_ROOT in out:
code = out[RETURN_CODE_ROOT]
if code != "0000":
message = out["returnMsg"]
if code in API_ERRORS:
raise API_ERRORS[code](code, message)
raise API_ERRORS[code](code)
else:
raise APIError(code, message)
raise APIError(code)
return out
return out[DATA_ROOT]
def oauth_url(auth_base, country, language):
@ -228,18 +283,20 @@ def oauth_url(auth_base, country, language):
authenticated session.
"""
url = urljoin(auth_base, "login/sign_in")
url = urljoin(auth_base, "spx/login/signIn")
query = urlencode(
{
"country": country,
"language": language,
"svcCode": SVC_CODE,
"authSvr": "oauth2",
"svc_list": SVC_CODE,
"client_id": CLIENT_ID,
"division": "ha",
"grant_type": "password",
"redirect_uri": OAUTH_REDIRECT_URI,
"state": uuid.uuid1().hex,
"show_thirdparty_login": "AMZ,FBK",
}
)
return "{}?{}".format(url, query)
@ -250,35 +307,37 @@ def parse_oauth_callback(url):
"""
params = parse_qs(urlparse(url).query)
return params["access_token"][0], params["refresh_token"][0]
return (
params["oauth2_backend_url"][0],
params["code"][0],
params["user_number"][0],
)
def login(api_root, access_token, country, language):
"""Use an access token to log into the API and obtain a session and
return information about the session.
"""
url = urljoin(api_root + "/", "member/login")
data = {
"countryCode": country,
"langCode": language,
"loginType": "EMP",
"token": access_token,
}
return lgedm_post(url, data)
class OAuthGrant(Enum):
REFRESH_TOKEN = "refresh_token"
AUTHORIZATION_CODE = "authorization_code"
def refresh_auth(oauth_root, refresh_token):
"""Get a new access_token using a refresh_token.
def oauth_request(grant, oauth_root, token):
"""Make an oauth_request with a specific grant type
May raise a `TokenError`.
"""
token_url = urljoin(oauth_root, "/oauth2/token")
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
oauth_path = "/oauth/1.0/oauth2/token"
token_url = urljoin(oauth_root, oauth_path)
data = {}
if grant == OAuthGrant.REFRESH_TOKEN:
data["grant_type"] = "refresh_token"
data["refresh_token"] = token
elif grant == OAuthGrant.AUTHORIZATION_CODE:
data["code"] = token
data["grant_type"] = "authorization_code"
data["redirect_uri"] = OAUTH_REDIRECT_URI
else:
raise ValueError("Unsupported grant")
# The timestamp for labeling OAuth requests can be obtained
# through a request to the date/time endpoint:
@ -289,17 +348,15 @@ def refresh_auth(oauth_root, refresh_token):
# The signature for the requests is on a string consisting of two
# parts: (1) a fake request URL containing the refresh token, and (2)
# the timestamp.
req_url = (
"/oauth2/token?grant_type=refresh_token&refresh_token=" + refresh_token
)
req_url = "{}?{}".format(oauth_path, urlencode(data))
sig = oauth2_signature(
"{}\n{}".format(req_url, timestamp), OAUTH_SECRET_KEY
)
headers = {
"lgemp-x-app-key": OAUTH_CLIENT_KEY,
"lgemp-x-signature": sig,
"lgemp-x-date": timestamp,
"x-lge-appkey": CLIENT_ID,
"x-lge-oauth-signature": sig,
"x-lge-oauth-date": timestamp,
"Accept": "application/json",
}
@ -307,16 +364,16 @@ def refresh_auth(oauth_root, refresh_token):
res = session.post(token_url, data=data, headers=headers)
res_data = res.json()
if res_data["status"] != 1:
if res.status_code != 200:
raise TokenError()
return res_data["access_token"]
return res_data
class Gateway(object):
def __init__(self, auth_base, api_root, oauth_root, country, language):
def __init__(self, auth_base, api_root, country, language):
self.auth_base = auth_base
self.api_root = api_root
self.oauth_root = oauth_root
self.country = country
self.language = language
@ -327,12 +384,12 @@ class Gateway(object):
`country` and `language` are codes, like "US" and "en-US,"
respectively.
"""
gw = lgedm_post(
GATEWAY_URL, {"countryCode": country, "langCode": language}
)
return cls(
gw["empUri"], gw["thinqUri"], gw["oauthUri"], country, language
gw = thinq_request(
RequestMethod.GET,
GATEWAY_URL,
{"countryCode": country, "langCode": language},
)
return cls(gw["empUri"], gw["thinq2Uri"], country, language)
def oauth_url(self):
return oauth_url(self.auth_base, self.country, self.language)
@ -341,7 +398,6 @@ class Gateway(object):
return {
"auth_base": self.auth_base,
"api_root": self.api_root,
"oauth_root": self.oauth_root,
"country": self.country,
"language": self.language,
}
@ -351,56 +407,68 @@ class Gateway(object):
return cls(
data["auth_base"],
data["api_root"],
data["oauth_root"],
data.get("country", DEFAULT_COUNTRY),
data.get("language", DEFAULT_LANGUAGE),
)
class Auth(object):
def __init__(self, gateway, access_token, refresh_token):
def __init__(
self, gateway, access_token, refresh_token, user_number, oauth_root
):
self.gateway = gateway
self.access_token = access_token
self.refresh_token = refresh_token
self.user_number = user_number
self.oauth_root = oauth_root
@classmethod
def from_url(cls, gateway, url):
"""Create an authentication using an OAuth callback URL."""
access_token, refresh_token = parse_oauth_callback(url)
return cls(gateway, access_token, refresh_token)
oauth_root, auth_code, user_number = parse_oauth_callback(url)
out = oauth_request(
OAuthGrant.AUTHORIZATION_CODE, oauth_root, auth_code
)
return cls(
gateway,
out["access_token"],
out["refresh_token"],
user_number,
oauth_root,
)
def start_session(self) -> Tuple["Session", List[Dict[str, Any]]]:
"""Start an API session for the logged-in user. Return the
Session object and a list of the user's devices.
"""
session_info = login(
self.gateway.api_root,
self.access_token,
self.gateway.country,
self.gateway.language,
)
session_id = session_info["jsessionId"]
return Session(self, session_id), get_list(session_info, "item")
return Session(self), []
def refresh(self):
"""Refresh the authentication, returning a new Auth object."""
new_access_token = refresh_auth(
self.gateway.oauth_root, self.refresh_token
new_access_token = oauth_request(
OAuthGrant.REFRESH_TOKEN, self.oauth_root, self.refresh_token
)["access_token"]
return Auth(
self.gateway,
new_access_token,
self.refresh_token,
self.user_number,
self.oauth_root,
)
return Auth(self.gateway, new_access_token, self.refresh_token)
def serialize(self) -> Dict[str, str]:
return {
"access_token": self.access_token,
"refresh_token": self.refresh_token,
"user_number": self.user_number,
"oauth_root": self.oauth_root,
}
class Session(object):
def __init__(self, auth, session_id) -> None:
def __init__(self, auth, session_id=None) -> None:
self.auth = auth
self.session_id = session_id
@ -412,7 +480,30 @@ class Session(object):
"""
url = urljoin(self.auth.gateway.api_root + "/", path)
return lgedm_post(url, data, self.auth.access_token, self.session_id)
return thinq_request(
RequestMethod.POST,
url,
data,
self.auth.access_token,
user_number=self.auth.user_number,
)
def get(self, path):
"""Make a GET request to the API server.
This is like `lgedm_get`, but it pulls the context for the
request from an active Session.
"""
url = urljoin(self.auth.gateway.api_root + "/", path)
return thinq_request(
RequestMethod.GET,
url,
access_token=self.auth.access_token,
user_number=self.auth.user_number,
country=self.auth.gateway.country,
language=self.auth.gateway.language,
)
def get_devices(self) -> List[Dict[str, Any]]:
"""Get a list of devices associated with the user's account.
@ -420,7 +511,7 @@ class Session(object):
Return a list of dicts with information about the devices.
"""
return get_list(self.post("device/deviceList"), "item")
return get_list(self.get("service/application/dashboard"), "item")
def monitor_start(self, device_id):
"""Begin monitoring a device's status.
@ -488,40 +579,13 @@ class Session(object):
},
)
def set_device_controls(self, device_id, values):
def device_control(self, device_id, data):
"""Control a device's settings.
`values` is a key/value map containing the settings to update.
"""
return self.post(
"rti/rtiControl",
{
"cmd": "Control",
"cmdOpt": "Set",
"value": values,
"deviceId": device_id,
"workId": gen_uuid(),
"data": "",
},
)
controlPath = "service/devices/{}/control-sync".format(device_id)
def get_device_config(self, device_id, key, category="Config"):
"""Get a device configuration option.
The `category` string should probably either be "Config" or
"Control"; the right choice appears to depend on the key.
"""
res = self.post(
"rti/rtiControl",
{
"cmd": category,
"cmdOpt": "Get",
"value": key,
"deviceId": device_id,
"workId": gen_uuid(),
"data": "",
},
)
return res["returnData"]
res = self.post(controlPath, data)
return res