1
0
mirror of https://github.com/no2chem/wideq.git synced 2025-05-21 01:20:11 -07:00
wideq/wideq/core.py
2020-04-10 09:29:31 -04:00

493 lines
14 KiB
Python

"""A low-level, general abstraction for the LG SmartThinQ API.
"""
import base64
import uuid
from urllib.parse import urljoin, urlencode, urlparse, parse_qs
import hashlib
import hmac
import datetime
import requests
import logging
from typing import Any, Dict, List, Tuple
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Endpoint queried by `Gateway.discover` to learn which hosts to use.
GATEWAY_URL = 'https://kic.lgthinq.com:46030/api/common/gatewayUriList'
# Sent on every `lgedm_post` request as the `x-thinq-application-key` and
# `x-thinq-security-key` headers, respectively.
APP_KEY = 'wideq'
SECURITY_KEY = 'nuts_securitykey'
# JSON key under which `lgedm_post` wraps the request payload and from which
# it unwraps the response payload.
DATA_ROOT = 'lgedmRoot'
# Query parameters included in the browser login URL built by `oauth_url`.
SVC_CODE = 'SVC202'
CLIENT_ID = 'LGAO221A02'
# Used by `refresh_auth`: the secret is the HMAC key for the request
# signature, and the client key is sent as the `lgemp-x-app-key` header.
OAUTH_SECRET_KEY = 'c053c2a6ddeb7ad97cb0eed0dcb31cf8'
OAUTH_CLIENT_KEY = 'LGAO221A02'
# `strftime` format of the timestamp that `refresh_auth` signs and sends as
# the `lgemp-x-date` header; the UTC offset is a literal part of the format.
DATE_FORMAT = '%a, %d %b %Y %H:%M:%S +0000'
# Fallbacks used by `Gateway.deserialize` when the stored data has no
# `country` / `language` entry.
DEFAULT_COUNTRY = 'US'
DEFAULT_LANGUAGE = 'en-US'
# Settings passed to the `Retry` policy built in `get_retry_session`.
NUM_RETRIES = 5 # Anecdotally this seems sufficient.
BACKOFF_FACTOR = 0.5
STATUS_FORCELIST = (502, 503, 504)
def get_wideq_logger() -> logging.Logger:
    """Return the shared "wideq" logger, configuring it on first use.

    The first call sets the level to INFO and attaches a single stream
    handler; the handler is colorized when the optional `colorlog`
    package can be imported and plain otherwise. Later calls return the
    same logger untouched, so calling this repeatedly neither duplicates
    log output nor undoes a level chosen by the caller.
    """
    logger = logging.getLogger("wideq")
    if logger.handlers:
        # Already configured: attaching another handler would emit every
        # record once per call, and resetting the level would discard
        # whatever the application has selected since.
        return logger

    fmt = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"
    logger.setLevel(logging.INFO)

    try:
        import colorlog  # type: ignore
    except ImportError:
        # `colorlog` is optional; fall back to an uncolored handler.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
    else:
        colorfmt = f"%(log_color)s{fmt}%(reset)s"
        handler = colorlog.StreamHandler()
        handler.setFormatter(
            colorlog.ColoredFormatter(
                colorfmt,
                datefmt=datefmt,
                reset=True,
                log_colors={
                    "DEBUG": "cyan",
                    "INFO": "green",
                    "WARNING": "yellow",
                    "ERROR": "red",
                    "CRITICAL": "red",
                },
            )
        )

    logger.addHandler(handler)
    return logger
# Module-level handle on the "wideq" logger, obtained (and configured) at
# import time.
LOGGER = get_wideq_logger()
def get_retry_session():
    """Create a `requests` session that retries on both HTTP and HTTPS.

    A single `Retry` policy, built from `NUM_RETRIES`, `BACKOFF_FACTOR`
    and `STATUS_FORCELIST`, is shared by the adapters mounted for the
    two URL schemes.
    """
    # See https://www.peterbe.com/plog/best-practice-with-retries-with-requests
    # for the source of this retry mechanism
    retry_policy = Retry(
        total=NUM_RETRIES,
        read=NUM_RETRIES,
        connect=NUM_RETRIES,
        backoff_factor=BACKOFF_FACTOR,
        status_forcelist=STATUS_FORCELIST,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    http = requests.Session()
    for scheme in ('http://', 'https://'):
        http.mount(scheme, retrying_adapter)
    return http
# Shared retrying HTTP session; `lgedm_post` and `refresh_auth` send their
# requests through it.
SESSION = get_retry_session()
def set_log_level(level: int) -> None:
    """Set the level of the "wideq" logger.

    `level` is a `logging` level such as `logging.DEBUG`. The logger is
    looked up by name rather than re-created, so changing the level
    never attaches an additional handler.
    """
    logging.getLogger("wideq").setLevel(level)
def gen_uuid() -> str:
    """Return a newly generated random (version 4) UUID as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def oauth2_signature(message: str, secret: str) -> bytes:
    """Sign `message` with `secret` for an OAuth2 request.

    Both arguments are text strings and are encoded as UTF-8 before
    signing. The result is the SHA-1 HMAC digest, base64-encoded, as a
    bytestring.
    """
    mac = hmac.new(
        key=secret.encode('utf8'),
        msg=message.encode('utf8'),
        digestmod=hashlib.sha1,
    )
    return base64.b64encode(mac.digest())
def get_list(obj, key: str) -> List[Dict[str, Any]]:
    """Look up `key` in `obj` and always hand back a list.

    If `obj[key]` is already a list, it is returned unchanged. If it is
    anything else, it is wrapped in a single-element list. If the key
    does not exist, the result is an empty list.
    """
    try:
        found = obj[key]
    except KeyError:
        return []
    return found if isinstance(found, list) else [found]
class APIError(Exception):
    """An error reported by the API.

    Every instance, including instances of all subclasses, exposes
    `code` and `message` attributes so that a generic
    `except APIError` handler can read them safely. Either may be
    `None` when the failure did not come with that piece of
    information.
    """

    def __init__(self, code, message):
        super().__init__(code, message)
        self.code = code
        self.message = message


class NotLoggedInError(APIError):
    """The session is not valid or expired."""


class NotConnectedError(APIError):
    """The service can't contact the specified device."""


class TokenError(APIError):
    """An authentication token was rejected.

    Raised without arguments; there is no API return code for it, so
    `code` is `None`.
    """

    def __init__(self):
        # The base initializer is deliberately not chained: the
        # exception's `args` must stay empty so the instance can still be
        # re-created (copied/unpickled) through this zero-argument
        # constructor.
        self.code = None
        self.message = 'Authentication token was rejected'


class FailedRequestError(APIError):
    """A failed request typically indicates an unsupported control on a
    device.
    """


class InvalidRequestError(APIError):
    """The server rejected a request as invalid."""


class MonitorError(APIError):
    """Monitoring a device failed, possibly because the monitoring
    session failed and needs to be restarted.

    `device_id` identifies the device being monitored and `code` is the
    return code reported for the monitoring task.
    """

    def __init__(self, device_id, code):
        # As in `TokenError`, the base initializer is not chained so that
        # `args` keeps matching this constructor's own signature.
        self.device_id = device_id
        self.code = code
        self.message = f'Monitoring failed for device {device_id} ' \
                       f'(code {code})'
# Maps the `returnCd` value of an API response to the exception class that
# `lgedm_post` raises for it. Codes that are not listed here are raised as a
# plain `APIError`.
API_ERRORS = {
"0102": NotLoggedInError,
"0106": NotConnectedError,
"0100": FailedRequestError,
9000: InvalidRequestError, # Surprisingly, an integer (not a string).
}
def lgedm_post(url, data=None, access_token=None, session_id=None):
    """Make an HTTP request in the format used by the API servers.

    In this format, the request POST data is sent as JSON under a
    special key and authentication is sent in headers. Return the JSON
    data extracted from the response.

    The `access_token` and `session_id` are required for most normal,
    authenticated requests. They are not required, for example, to load
    the gateway server data or to start a session.

    Raise an `APIError` (or the more specific subclass registered in
    `API_ERRORS`) when the response carries a `returnCd` other than
    '0000'.
    """
    headers = {
        'x-thinq-application-key': APP_KEY,
        'x-thinq-security-key': SECURITY_KEY,
        'Accept': 'application/json',
    }
    if access_token:
        headers['x-thinq-token'] = access_token
    if session_id:
        headers['x-thinq-jsessionId'] = session_id

    res = SESSION.post(url, json={DATA_ROOT: data}, headers=headers)
    out = res.json()[DATA_ROOT]

    # Check for API errors.
    if 'returnCd' in out:
        code = out['returnCd']
        if code != '0000':
            # An error response without a message must still surface as
            # the API error it is, not as a KeyError from this lookup.
            message = out.get('returnMsg', '')
            raise API_ERRORS.get(code, APIError)(code, message)

    return out
def oauth_url(auth_base, country, language):
    """Build the URL a user opens in a browser to log in and start an
    authenticated session.

    The sign-in path is resolved against `auth_base`, and the country,
    language and fixed client parameters are appended as a query
    string.
    """
    sign_in = urljoin(auth_base, 'login/sign_in')
    params = [
        ('country', country),
        ('language', language),
        ('svcCode', SVC_CODE),
        ('authSvr', 'oauth2'),
        ('client_id', CLIENT_ID),
        ('division', 'ha'),
        ('grant_type', 'password'),
    ]
    return f'{sign_in}?{urlencode(params)}'
def parse_oauth_callback(url):
    """Extract the two tokens from the URL an OAuth login redirected to.

    Return an `(access_token, refresh_token)` pair: the access token
    provides API credentials and the refresh token is used to obtain
    updated access tokens. When a parameter occurs more than once, its
    first value is used.
    """
    query = parse_qs(urlparse(url).query)
    access_token = query['access_token'][0]
    refresh_token = query['refresh_token'][0]
    return access_token, refresh_token
def login(api_root, access_token, country, language):
    """Log into the API with an access token.

    Posts the token together with the country and language codes to the
    `member/login` endpoint under `api_root` and returns the session
    information from the response.
    """
    endpoint = urljoin(api_root + '/', 'member/login')
    payload = dict(
        countryCode=country,
        langCode=language,
        loginType='EMP',
        token=access_token,
    )
    return lgedm_post(endpoint, payload)
def refresh_auth(oauth_root, refresh_token):
    """Get a new access_token using a refresh_token.

    Posts a signed refresh request to the `/oauth2/token` endpoint of
    `oauth_root` and returns the `access_token` from the response.

    May raise a `TokenError`, which happens whenever the response does
    not report a `status` of 1.
    """
    token_url = urljoin(oauth_root, '/oauth2/token')
    data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }

    # The timestamp for labeling OAuth requests can be obtained
    # through a request to the date/time endpoint:
    # https://us.lgeapi.com/datetime
    # But we can also just generate a timestamp. An aware UTC "now" is
    # used instead of the deprecated `utcnow()`; the formatted text is
    # the same because `DATE_FORMAT` spells the offset out literally.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    timestamp = now_utc.strftime(DATE_FORMAT)

    # The signature for the requests is on a string consisting of two
    # parts: (1) a fake request URL containing the refresh token, and (2)
    # the timestamp.
    req_url = ('/oauth2/token?grant_type=refresh_token&refresh_token=' +
               refresh_token)
    sig = oauth2_signature('{}\n{}'.format(req_url, timestamp),
                           OAUTH_SECRET_KEY)

    headers = {
        'lgemp-x-app-key': OAUTH_CLIENT_KEY,
        'lgemp-x-signature': sig,
        'lgemp-x-date': timestamp,
        'Accept': 'application/json',
    }

    res = SESSION.post(token_url, data=data, headers=headers)
    res_data = res.json()

    # A response with no `status` at all is treated like any other
    # unsuccessful one, so callers only need to handle `TokenError`.
    if res_data.get('status') != 1:
        raise TokenError()
    return res_data['access_token']
class Gateway(object):
    """The set of hosts used to talk to the API, plus the country and
    language codes they were selected for.
    """

    def __init__(self, auth_base, api_root, oauth_root, country, language):
        self.auth_base = auth_base
        self.api_root = api_root
        self.oauth_root = oauth_root
        self.country = country
        self.language = language

    @classmethod
    def discover(cls, country, language) -> 'Gateway':
        """Ask the gateway endpoint which hosts to use for API
        interaction.

        `country` and `language` are codes, like "US" and "en-US,"
        respectively.
        """
        request = {'countryCode': country, 'langCode': language}
        hosts = lgedm_post(GATEWAY_URL, request)
        return cls(
            hosts['empUri'],
            hosts['thinqUri'],
            hosts['oauthUri'],
            country,
            language,
        )

    def oauth_url(self):
        """Return the browser login URL for this gateway."""
        return oauth_url(self.auth_base, self.country, self.language)

    def serialize(self) -> Dict[str, str]:
        """Return the gateway's fields as a plain dictionary."""
        fields = ('auth_base', 'api_root', 'oauth_root', 'country',
                  'language')
        return {name: getattr(self, name) for name in fields}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'Gateway':
        """Rebuild a gateway from a dictionary made by `serialize`.

        The three host entries are required; a missing `country` or
        `language` falls back to the module defaults.
        """
        country = data.get('country', DEFAULT_COUNTRY)
        language = data.get('language', DEFAULT_LANGUAGE)
        return cls(data['auth_base'], data['api_root'], data['oauth_root'],
                   country, language)
class Auth(object):
    """A gateway together with the access and refresh tokens of a
    logged-in user.
    """

    def __init__(self, gateway, access_token, refresh_token):
        self.gateway = gateway
        self.access_token = access_token
        self.refresh_token = refresh_token

    @classmethod
    def from_url(cls, gateway, url):
        """Create an authentication from an OAuth callback URL."""
        tokens = parse_oauth_callback(url)
        return cls(gateway, *tokens)

    def start_session(self) -> Tuple['Session', List[Dict[str, Any]]]:
        """Start an API session for the logged-in user.

        Return the new Session object and a list of the user's devices.
        """
        gateway = self.gateway
        info = login(gateway.api_root, self.access_token,
                     gateway.country, gateway.language)
        session = Session(self, info['jsessionId'])
        devices = get_list(info, 'item')
        return session, devices

    def refresh(self):
        """Refresh the authentication, returning a new Auth object.

        The new object carries a fresh access token and the same
        gateway and refresh token as this one.
        """
        fresh_token = refresh_auth(self.gateway.oauth_root,
                                   self.refresh_token)
        return Auth(self.gateway, fresh_token, self.refresh_token)

    def serialize(self) -> Dict[str, str]:
        """Return the two tokens as a plain dictionary."""
        return dict(
            access_token=self.access_token,
            refresh_token=self.refresh_token,
        )
class Session(object):
    """An API session: an `Auth` paired with the session ID obtained at
    login.
    """

    def __init__(self, auth, session_id) -> None:
        self.auth = auth
        self.session_id = session_id

    def post(self, path, data=None):
        """Make a POST request to the API server.

        This works like `lgedm_post`, except that the API root, access
        token and session ID are taken from this Session.
        """
        auth = self.auth
        endpoint = urljoin(auth.gateway.api_root + '/', path)
        return lgedm_post(endpoint, data, auth.access_token,
                          self.session_id)

    def get_devices(self) -> List[Dict[str, Any]]:
        """Get a list of devices associated with the user's account.

        Return a list of dicts with information about the devices.
        """
        listing = self.post('device/deviceList')
        return get_list(listing, 'item')

    def monitor_start(self, device_id):
        """Begin monitoring a device's status.

        Return a "work ID" that can be used to retrieve the result of
        monitoring.
        """
        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Start',
            'deviceId': device_id,
            'workId': gen_uuid(),
        }
        return self.post('rti/rtiMon', request)['workId']

    def monitor_poll(self, device_id, work_id):
        """Get the result of a monitoring task.

        `work_id` is a string ID retrieved from `monitor_start`. Return
        a status result, which is a bytestring, or None if the
        monitoring is not yet ready.

        May raise a `MonitorError`, in which case the right course of
        action is probably to restart the monitoring task.
        """
        query = {'workList': [{'deviceId': device_id, 'workId': work_id}]}
        result = self.post('rti/rtiResult', query)['workList']

        # When monitoring first starts, it usually takes a few
        # iterations before data becomes available. In the initial
        # "warmup" phase, `returnCode` is missing from the response.
        if 'returnCode' not in result:
            return None

        # Anything other than the success code is an error.
        status = result['returnCode']
        if status != '0000':
            raise MonitorError(device_id, status)

        # The return data may or may not be present, depending on the
        # monitoring task status.
        if 'returnData' not in result:
            return None

        # The main response payload is base64-encoded binary data in
        # the `returnData` field. This sometimes contains JSON data
        # and sometimes other binary data.
        return base64.b64decode(result['returnData'])

    def monitor_stop(self, device_id, work_id):
        """Stop monitoring a device."""
        request = {
            'cmd': 'Mon',
            'cmdOpt': 'Stop',
            'deviceId': device_id,
            'workId': work_id,
        }
        self.post('rti/rtiMon', request)

    def _rti_control(self, cmd, cmd_opt, value, device_id):
        """Post one `rti/rtiControl` request under a fresh work ID and
        return the response.
        """
        return self.post('rti/rtiControl', {
            'cmd': cmd,
            'cmdOpt': cmd_opt,
            'value': value,
            'deviceId': device_id,
            'workId': gen_uuid(),
            'data': '',
        })

    def set_device_controls(self, device_id, values):
        """Control a device's settings.

        `values` is a key/value map containing the settings to update.
        """
        return self._rti_control('Control', 'Set', values, device_id)

    def get_device_config(self, device_id, key, category='Config'):
        """Get a device configuration option.

        The `category` string should probably either be "Config" or
        "Control"; the right choice appears to depend on the key.
        """
        response = self._rti_control(category, 'Get', key, device_id)
        return response['returnData']