1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00
Andrey Pohilko 907a2dd561 Cosmetics
2019-12-27 10:34:04 +03:00

746 lines
22 KiB
Python

import logging
from struct import pack, unpack
from pylgbst.utilities import str2hex
log = logging.getLogger('hub')
class Message(object):
TYPE = None
def __init__(self):
self.hub_id = 0x00 # not used according to official doc
self.payload = b""
def bytes(self):
"""
see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header
"""
msglen = len(self.payload) + 3
assert msglen < 127, "TODO: handle longer messages with 2-byte len"
return pack("<B", msglen) + pack("<B", self.hub_id) + pack("<B", self.TYPE) + self.payload
def __repr__(self):
# assert self.bytes() # to trigger any field changes
data = self.__dict__
data = {x: (str2hex(y) if isinstance(y, bytes) else y)
for x, y in data.items()
if x not in ('hub_id',)}
return self.__class__.__name__ + "(%s)" % data
class DownstreamMsg(Message):
def __init__(self):
super(DownstreamMsg, self).__init__()
self.needs_reply = False
def is_reply(self, msg):
del msg
return False
class UpstreamMsg(Message):
def __init__(self):
super(UpstreamMsg, self).__init__()
@classmethod
def decode(cls, data):
"""
see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header
"""
msg = cls()
msg.payload = data
msglen = msg._byte()
assert msglen < 127, "TODO: handle longer messages with 2-byte len"
hub_id = msg._byte()
assert hub_id == 0
msg_type = msg._byte()
assert cls.TYPE == msg_type, "Message type does not match: %x!=%x" % (cls.TYPE, msg_type)
assert isinstance(msg.payload, (bytes, bytearray))
return msg
def __shift(self, vtype, vlen):
val = self.payload[0:vlen]
self.payload = self.payload[vlen:]
return unpack("<" + vtype, val)[0]
def _byte(self):
return self.__shift("B", 1)
def _short(self):
return self.__shift("H", 2)
def _long(self):
return self.__shift("I", 4)
def _float(self):
return self.__shift("f", 4)
def _bits_list(self, val):
res = []
x = 1
for i in range(16 + 1):
if val & x:
res.append(i)
x <<= 1
return res
class MsgHubProperties(DownstreamMsg, UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-properties
"""
TYPE = 0x01
ADVERTISE_NAME = 0x01
BUTTON = 0x02
FW_VERSION = 0x03
HW_VERSION = 0x04
RSSI = 0x05
VOLTAGE_PERC = 0x06
BATTERY_TYPE = 0x07
MANUFACTURER = 0x08
RADIO_FW_VERSION = 0x09
WIRELESS_PROTO_VERSION = 0x0A
SYSTEM_TYPE_ID = 0x0B
HW_NETW_ID = 0x0C
PRIMARY_MAC = 0x0D
SECONDARY_MAC = 0x0E
HARDWARE_NETWORK_FAMILY = 0x0F
SET = 0x01
UPD_ENABLE = 0x02
UPD_DISABLE = 0x03
RESET = 0x04
UPD_REQUEST = 0x05
UPSTREAM_UPDATE = 0x06
def __init__(self, prop=None, operation=None, parameters=b""):
super(MsgHubProperties, self).__init__()
self.property = prop
self.operation = operation
self.parameters = parameters
def bytes(self):
if self.operation in (self.UPD_REQUEST, self.UPD_ENABLE):
self.needs_reply = True
self.payload = pack("<B", self.property) + pack("<B", self.operation) + self.parameters
return super(MsgHubProperties, self).bytes()
@classmethod
def decode(cls, data):
msg = super(MsgHubProperties, cls).decode(data)
assert isinstance(msg, MsgHubProperties)
msg.property = msg._byte()
msg.operation = msg._byte()
msg.parameters = msg.payload
return msg
def is_reply(self, msg):
return isinstance(msg, MsgHubProperties) \
and msg.operation == self.UPSTREAM_UPDATE and msg.property == self.property
class MsgHubAction(DownstreamMsg, UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-actions
"""
TYPE = 0x02
SWITCH_OFF = 0x01
DISCONNECT = 0x02
VCC_PORT_CONTROL_ON = 0x03
VCC_PORT_CONTROL_OFF = 0x04
BUSY_INDICATION_ON = 0x05
BUSY_INDICATION_OFF = 0x06
SWITCH_OFF_IMMEDIATELY = 0x2F
UPSTREAM_SHUTDOWN = 0x30
UPSTREAM_DISCONNECT = 0x31
UPSTREAM_BOOT_MODE = 0x32
def __init__(self, action=None):
super(MsgHubAction, self).__init__()
self.action = action
def bytes(self):
self.payload = pack("<B", self.action)
self.needs_reply = self.action in (self.DISCONNECT, self.SWITCH_OFF)
return super(MsgHubAction, self).bytes()
def is_reply(self, msg):
if not isinstance(msg, MsgHubAction):
raise TypeError("Unexpected message type: %s" % (msg.__class__,))
if self.action == self.DISCONNECT and msg.action == self.UPSTREAM_DISCONNECT:
return True
if self.action == self.SWITCH_OFF and msg.action == self.UPSTREAM_SHUTDOWN:
return True
@classmethod
def decode(cls, data):
msg = super(MsgHubAction, cls).decode(data)
assert isinstance(msg, MsgHubAction)
msg.action = msg._byte()
return msg
class MsgHubAlert(DownstreamMsg, UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-alerts
"""
TYPE = 0x03
LOW_VOLTAGE = 0x01
HIGH_CURRENT = 0x02
LOW_SIGNAL = 0x03
OVER_POWER = 0x04
DESCR = {
LOW_VOLTAGE: "low voltage",
HIGH_CURRENT: "high current",
LOW_SIGNAL: "low signal",
OVER_POWER: "over power"
}
UPD_ENABLE = 0x01
UPD_DISABLE = 0x02
UPD_REQUEST = 0x03
UPSTREAM_UPDATE = 0x04
def __init__(self, atype=None, operation=None):
super(MsgHubAlert, self).__init__()
self.atype = atype
self.operation = operation
self.status = None
def bytes(self):
self.payload = pack("<B", self.atype) + pack("<B", self.operation)
if self.operation == self.UPD_REQUEST:
self.needs_reply = True
return super(MsgHubAlert, self).bytes()
@classmethod
def decode(cls, data):
msg = super(MsgHubAlert, cls).decode(data)
assert isinstance(msg, MsgHubAlert)
msg.atype = msg._byte()
msg.operation = msg._byte()
msg.status = msg._byte()
assert msg.operation == cls.UPSTREAM_UPDATE
return msg
def is_ok(self):
return not self.status
def is_reply(self, msg):
return isinstance(msg, MsgHubAlert) \
and msg.operation == self.UPSTREAM_UPDATE and msg.atype == self.atype
class MsgHubAttachedIO(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-attached-i-o
"""
TYPE = 0x04
EVENT_DETACHED = 0x00
EVENT_ATTACHED = 0x01
EVENT_ATTACHED_VIRTUAL = 0x02
# DEVICE TYPES
DEV_MOTOR = 0x0001
DEV_SYSTEM_TRAIN_MOTOR = 0x0002
DEV_BUTTON = 0x0005
DEV_LED_LIGHT = 0x0008
DEV_VOLTAGE = 0x0014
DEV_CURRENT = 0x0015
DEV_PIEZO_SOUND = 0x0016
DEV_RGB_LIGHT = 0x0017
DEV_TILT_EXTERNAL = 0x0022
DEV_MOTION_SENSOR = 0x0023
DEV_VISION_SENSOR = 0x0025
DEV_MOTOR_EXTERNAL_TACHO = 0x0026
DEV_MOTOR_INTERNAL_TACHO = 0x0027
DEV_TILT_INTERNAL = 0x0028
def __init__(self):
super(MsgHubAttachedIO, self).__init__()
self.port = None
self.event = None
@classmethod
def decode(cls, data):
msg = super(MsgHubAttachedIO, cls).decode(data)
assert isinstance(msg, MsgHubAttachedIO)
msg.port = msg._byte()
msg.event = msg._byte()
return msg
class MsgGenericError(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#generic-error-messages
"""
TYPE = 0x05
ERR_ACK = 0x01 # ACK
ERR_MACK = 0x02 # MACK
ERR_BUFFER_OVERFLOW = 0x03 # Buffer Overflow
ERR_TIMEOUT = 0x04 # Timeout
ERR_WRONG_COMMAND = 0x05 # Command NOT recognized
ERR_WRONG_PARAMS = 0x06 # Invalid use (e.g. parameter error(s)
ERR_OVERCURRENT = 0x07
ERR_INTERNAL = 0x08
DESCR = {
ERR_ACK: "ACK",
ERR_MACK: "MACK",
ERR_BUFFER_OVERFLOW: "Buffer Overflow",
ERR_TIMEOUT: "Timeout",
ERR_WRONG_COMMAND: "Command NOT recognized",
ERR_WRONG_PARAMS: "Invalid use (e.g. parameter error(s)",
ERR_OVERCURRENT: "Overcurrent",
ERR_INTERNAL: "Internal ERROR",
}
def __init__(self):
super(MsgGenericError, self).__init__()
self.cmd = None
self.err = None
@classmethod
def decode(cls, data):
msg = super(MsgGenericError, cls).decode(data)
assert isinstance(msg, MsgGenericError)
msg.cmd = msg._byte()
msg.err = msg._byte()
return msg
def message(self):
return "Command 0x%x caused error 0x%x: %s" % (self.cmd, self.err, self.DESCR[self.err])
class MsgPortInfoRequest(DownstreamMsg):
"""
This is sync request for value on port
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-information-request
"""
TYPE = 0x21
INFO_PORT_VALUE = 0x00
INFO_MODE_INFO = 0x01
INFO_MODE_COMBINATIONS = 0x02
def __init__(self, port, info_type):
super(MsgPortInfoRequest, self).__init__()
self.port = port
self.info_type = info_type
self.needs_reply = True
def bytes(self):
self.payload = pack("<B", self.port) + pack("<B", self.info_type)
return super(MsgPortInfoRequest, self).bytes()
def is_reply(self, msg):
if msg.port != self.port:
return False
if self.info_type == self.INFO_PORT_VALUE:
return isinstance(msg, (MsgPortValueSingle, MsgPortValueCombined))
else:
return isinstance(msg, (MsgPortInfo,))
class MsgPortModeInfoRequest(DownstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-mode-information-request
"""
TYPE = 0x22
INFO_NAME = 0x00
INFO_RAW_RANGE = 0x01
INFO_PCT_RANGE = 0x02
INFO_SI_RANGE = 0x03 # no idea what 'SI' stands for
INFO_UNITS = 0x04
INFO_MAPPING = 0x05
# INFO_INTERNAL = 0x06
INFO_MOTOR_BIAS = 0x07
INFO_CAPABILITY_BITS = 0x08
INFO_VALUE_FORMAT = 0x80
INFO_TYPES = {
INFO_NAME: "Name",
INFO_RAW_RANGE: "Raw range",
INFO_PCT_RANGE: "Percent range",
INFO_SI_RANGE: "SI value range",
INFO_UNITS: "Units",
INFO_MAPPING: "Mapping",
INFO_MOTOR_BIAS: "Motor bias",
INFO_CAPABILITY_BITS: "Capabilities",
INFO_VALUE_FORMAT: "Value encoding",
}
def __init__(self, port, mode, info_type):
super(MsgPortModeInfoRequest, self).__init__()
self.port = port
self.mode = mode
self.info_type = info_type
self.payload = pack("<B", port) + pack("<B", mode) + pack("<B", info_type)
self.needs_reply = True
def is_reply(self, msg):
if not isinstance(msg, MsgPortModeInfo):
return False
if msg.port != self.port or msg.mode != self.mode or msg.info_type != self.info_type:
return False
return True
class MsgPortInputFmtSetupSingle(DownstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-setup-single
"""
TYPE = 0x41
def __init__(self, port, mode, delta=1, update_enable=0):
super(MsgPortInputFmtSetupSingle, self).__init__()
self.port = port
self.mode = mode
self.updates_enabled = update_enable
self.update_delta = delta
self.payload = pack("<B", port) + pack("<B", mode) + pack("<I", delta) + pack("<B", update_enable)
self.needs_reply = True
def is_reply(self, msg):
if isinstance(msg, MsgPortInputFmtSingle) and msg.port == self.port:
return True
class MsgPortInputFmtSetupCombined(DownstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-setup-combinedmode
"""
TYPE = 0x42
def __init__(self, port, mode, delta=1, update_enable=0):
super(MsgPortInputFmtSetupCombined, self).__init__()
self.port = port
self.payload = pack("<B", port) + pack("<B", mode) + pack("<I", delta) + pack("<B", update_enable)
self.needs_reply = True
def is_reply(self, msg):
if isinstance(msg, MsgPortInputFmtCombined) and msg.port == self.port:
return True
class MsgPortInfo(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-information
"""
TYPE = 0x43
CAP_OUTPUT = 0b00000001
CAP_INPUT = 0b00000010
CAP_COMBINABLE = 0b00000100
CAP_SYNCHRONIZABLE = 0b00001000
def __init__(self):
super(MsgPortInfo, self).__init__()
self.port = None
self.info_type = None
self.capabilities = None
self.total_modes = None
self.input_modes = None
self.output_modes = None
self.possible_mode_combinations = []
@classmethod
def decode(cls, data):
msg = super(MsgPortInfo, cls).decode(data)
assert isinstance(msg, MsgPortInfo)
msg.port = msg._byte()
msg.info_type = msg._byte()
if msg.info_type == MsgPortInfoRequest.INFO_MODE_INFO:
msg.capabilities = msg._byte()
msg.total_modes = msg._byte()
msg.input_modes = msg._bits_list(msg._short())
msg.output_modes = msg._bits_list(msg._short())
else:
while msg.payload:
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#pos-m
val = msg._short()
msg.possible_mode_combinations.append(msg._bits_list(val))
if not val:
break
return msg
def is_output(self):
assert self.info_type == MsgPortInfoRequest.INFO_MODE_INFO
return bool(self.capabilities & self.CAP_OUTPUT)
def is_input(self):
assert self.info_type == MsgPortInfoRequest.INFO_MODE_INFO
return bool(self.capabilities & self.CAP_INPUT)
def is_combinable(self):
assert self.info_type == MsgPortInfoRequest.INFO_MODE_INFO
return bool(self.capabilities & self.CAP_COMBINABLE)
def is_synchronizable(self):
assert self.info_type == MsgPortInfoRequest.INFO_MODE_INFO
return bool(self.capabilities & self.CAP_SYNCHRONIZABLE)
class MsgPortModeInfo(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-mode-information
"""
TYPE = 0x44
MAPPING_FLAGS = {
7: "Supports NULL value",
6: "Supports Functional Mapping 2.0+",
5: "N/A",
4: "Absolute [min..max]",
3: "Relative [-1..1]",
2: "Discrete [0, 1, 2, 3]",
1: "N/A",
0: "N/A",
}
DATASET_TYPES = {
0b00: "8 bit",
0b01: "16 bit",
0b10: "32 bit",
0b11: "FLOAT",
}
def __init__(self):
super(MsgPortModeInfo, self).__init__()
self.port = None
self.mode = None
self.info_type = None # @see MsgPortModeInfoRequest
self.value = None
@classmethod
def decode(cls, data):
msg = super(MsgPortModeInfo, cls).decode(data)
assert isinstance(msg, MsgPortModeInfo)
msg.port = msg._byte()
msg.mode = msg._byte()
msg.info_type = msg._byte()
msg.value = msg._value()
return msg
def _value(self):
info = MsgPortModeInfoRequest
if self.info_type == info.INFO_NAME:
return self.payload[:self.payload.index(b"\00")].decode('ascii')
elif self.info_type in (info.INFO_RAW_RANGE, info.INFO_PCT_RANGE, info.INFO_SI_RANGE):
return [self._float(), self._float()]
elif self.info_type == info.INFO_UNITS:
return self.payload[:self.payload.index(b"\00")].decode('ascii')
elif self.info_type == info.INFO_MAPPING:
inp = self._bits_list(self._byte())
outp = self._bits_list(self._byte())
return {
"input": [self.MAPPING_FLAGS[x] for x in inp],
"output": [self.MAPPING_FLAGS[x] for x in outp],
}
elif self.info_type == info.INFO_MOTOR_BIAS:
return self._byte()
elif self.info_type == info.INFO_VALUE_FORMAT:
return {
"datasets": self._byte(),
"type": self.DATASET_TYPES[self._byte()],
"total_figures": self._byte(),
"decimals": self._byte(),
}
else:
return self.payload # FIXME: will probably fail here
class MsgPortValueSingle(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-value-single
"""
TYPE = 0x45
def __init__(self):
super(MsgPortValueSingle, self).__init__()
self.port = None
@classmethod
def decode(cls, data):
msg = super(MsgPortValueSingle, cls).decode(data)
assert isinstance(msg, MsgPortValueSingle)
msg.port = msg._byte()
return msg
class MsgPortValueCombined(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-value-combinedmode
"""
TYPE = 0x46
def __init__(self):
super(MsgPortValueCombined, self).__init__()
self.port = None
@classmethod
def decode(cls, data):
msg = super(MsgPortValueCombined, cls).decode(data)
assert isinstance(msg, MsgPortValueCombined)
msg.port = msg._byte()
return msg
class MsgPortInputFmtSingle(UpstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
"""
TYPE = 0x47
def __init__(self, port=None, mode=None, upd_enabled=None, upd_delta=None):
super(MsgPortInputFmtSingle, self).__init__()
self.port = port
self.mode = mode
self.upd_delta = upd_delta
self.upd_enabled = upd_enabled
@classmethod
def decode(cls, data):
msg = super(MsgPortInputFmtSingle, cls).decode(data)
assert isinstance(msg, MsgPortInputFmtSingle)
msg.port = msg._byte()
msg.mode = msg._byte()
msg.upd_delta = msg._long()
if len(msg.payload):
msg.upd_enabled = msg._byte()
return msg
class MsgPortInputFmtCombined(UpstreamMsg): # TODO
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
"""
TYPE = 0x48
def __init__(self):
super(MsgPortInputFmtCombined, self).__init__()
self.port = None
self.combined_control = None
@classmethod
def decode(cls, data):
msg = super(MsgPortInputFmtCombined, cls).decode(data)
assert isinstance(msg, MsgPortInputFmtSingle)
msg.port = msg._byte()
return msg
class MsgVirtualPortSetup(DownstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#virtual-port-setup
"""
TYPE = 0x61
CMD_DISCONNECT = 0x00
CMD_CONNECT = 0x01
def __init__(self, cmd, port):
super(MsgVirtualPortSetup, self).__init__()
self.payload = pack("<B", cmd)
if cmd == self.CMD_DISCONNECT:
assert isinstance(port, int)
self.payload += pack("<B", port)
else:
assert isinstance(port, (list, tuple))
self.payload += pack("<B", port[0]) + pack("<B", port[1])
class MsgPortOutput(DownstreamMsg):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-output-command
"""
TYPE = 0x81
SC_NO_BUFFER = 0b00000001
SC_FEEDBACK = 0b00010000
WRITE_DIRECT = 0x50
WRITE_DIRECT_MODE_DATA = 0x51
def __init__(self, port, subcommand, params):
super(MsgPortOutput, self).__init__()
self.port = port
self.is_buffered = False
self.do_feedback = True
self.subcommand = subcommand
self.params = params
def bytes(self):
startup_completion_flags = 0
if not self.is_buffered:
startup_completion_flags |= self.SC_NO_BUFFER
if self.do_feedback:
startup_completion_flags |= self.SC_FEEDBACK
self.needs_reply = True
self.payload = pack("<B", self.port) + pack("<B", startup_completion_flags) \
+ pack("<B", self.subcommand) + self.params
return super(MsgPortOutput, self).bytes()
def is_reply(self, msg):
return isinstance(msg, MsgPortOutputFeedback) and msg.port == self.port \
and (msg.is_completed() or self.is_buffered)
class MsgPortOutputFeedback(UpstreamMsg):
TYPE = 0x82
def __init__(self):
super(MsgPortOutputFeedback, self).__init__()
self.port = None
self.status = None
@classmethod
def decode(cls, data):
msg = super(MsgPortOutputFeedback, cls).decode(data)
assert isinstance(msg, MsgPortOutputFeedback)
assert len(msg.payload) == 2, "TODO: implement multi-port feedback message"
msg.port = msg._byte()
msg.status = msg._byte()
return msg
def is_in_progress(self):
return self.status & 0b0001
def is_completed(self):
return self.status & 0b0010
def is_discarded(self):
return self.status & 0b0100
def is_idle(self):
return self.status & 0b1000
UPSTREAM_MSGS = (
MsgHubProperties, MsgHubAction, MsgHubAlert, MsgHubAttachedIO, MsgGenericError,
MsgPortInfo, MsgPortModeInfo,
MsgPortValueSingle, MsgPortValueCombined, MsgPortInputFmtSingle, MsgPortInputFmtCombined,
MsgPortOutputFeedback
)