mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
* It's HUB ID * Rename file * Working with official doc * Some progress * AttachedIO msg * device action impl * some better device alert impl * restructuring * Some port commands handled * Some command feedback waiting * Some more request-reply things * Some more request-reply things * Reworked msg classes * Getting back to UTs * Getting back to UTs * Facing sync lock problems * Facing sync lock problems * Testing it * Covering more with tests * handle actions * Hub class is almost covered * done coverage for Hub * done coverage for MoveHub * Button is tested * remove debug server from examples * Current and voltage tested * color sensor basic test * cover tilt sensor * motor sensor tested * constant motor * motor is tested * hold_speed impl for motor * motor commands recorded * some cleanup * some cleanup * some cleanup * debug * debug * FIX a bug * acc/dec profiles figured out * UT motor ops * UT motor ops * Get rid of weird piggyback * fix UT * Fix encoding? * fix test mb * More robust * Checked demo works * cosmetics * cosmetics * Maybe better test * fetching and decoding some device caps * describing devs * describing devs works * Applying modes we've learned * Simple and extensible dev attach * Reworking peripherals based on modes * Applying modes we've learned * implemented getting sensor data * fixed port subscribe * Added led out cmds on vision sensor * Worked on color-distance sensor * Introduce some locking for consistency * Improved it all * Travis flags * improve * improve * improve docs
744 lines
21 KiB
Python
744 lines
21 KiB
Python
import logging
|
|
from struct import pack, unpack
|
|
|
|
from pylgbst.utilities import str2hex
|
|
|
|
# logger used by this module, registered under the name 'hub'
log = logging.getLogger('hub')
|
|
|
|
|
|
class Message(object):
    """
    Base class of all protocol messages in this module.

    Subclasses set TYPE to their one-byte message type and put the bytes
    that follow the common header into `payload`.
    """
    TYPE = None

    def __init__(self):
        self.hub_id = 0x00  # not used according to official doc
        self.payload = b""

    def bytes(self):
        """
        Serialize the message: length byte, hub id, message type, then payload

        see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header

        :return: encoded message as bytes
        :raises AssertionError: if the message does not fit the 1-byte length encoding
        """
        msglen = len(self.payload) + 3  # 3 header bytes: length, hub id, type
        assert msglen < 127, "TODO: handle longer messages with 2-byte len"
        return pack("<BBB", msglen, self.hub_id, self.TYPE) + self.payload

    def __repr__(self):
        """
        Show class name with instance fields; bytes values are passed through str2hex,
        the 'hub_id' and 'needs_reply' fields are left out
        """
        data = {x: (str2hex(y) if isinstance(y, bytes) else y)
                for x, y in self.__dict__.items()
                if x not in ('hub_id', 'needs_reply')}
        return self.__class__.__name__ + "(%s)" % data
|
|
|
|
|
|
class DownstreamMsg(Message):
    """
    Base class for "downstream" messages; adds the `needs_reply` flag
    and the `is_reply` hook used to match an incoming message to a request
    """

    def __init__(self):
        super(DownstreamMsg, self).__init__()
        # subclasses switch this on when an answer is expected
        self.needs_reply = False

    def is_reply(self, msg):
        """
        Base implementation: no message is ever treated as a reply

        :param msg: incoming message (ignored here)
        :return: always False
        """
        return False
|
|
|
|
|
|
class UpstreamMsg(Message):
    """
    Base class for "upstream" messages: parses the common header in `decode`
    and offers helpers that consume little-endian values from the head of `payload`
    """

    def __init__(self):
        super(UpstreamMsg, self).__init__()

    @classmethod
    def decode(cls, data):
        """
        Build an instance from raw bytes, consuming the 3-byte common header

        see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header

        :param data: raw message bytes, starting with the length byte
        :return: instance of `cls` whose payload holds the bytes after the header
        """
        result = cls()
        result.payload = data

        length = result._byte()
        assert length < 127, "TODO: handle longer messages with 2-byte len"

        hub = result._byte()
        assert hub == 0

        received_type = result._byte()
        assert cls.TYPE == received_type, "Message type does not match: %x!=%x" % (cls.TYPE, received_type)
        assert isinstance(result.payload, bytes)
        return result

    def __shift(self, vtype, vlen):
        # cut `vlen` bytes off the payload head, then unpack them as struct type `vtype`
        head, tail = self.payload[:vlen], self.payload[vlen:]
        self.payload = tail
        return unpack("<" + vtype, head)[0]

    def _byte(self):
        """consume 1 byte as unsigned integer"""
        return self.__shift("B", 1)

    def _short(self):
        """consume 2 bytes as unsigned integer"""
        return self.__shift("H", 2)

    def _long(self):
        """consume 4 bytes as unsigned integer"""
        return self.__shift("I", 4)

    def _float(self):
        """consume 4 bytes as float"""
        return self.__shift("f", 4)

    def _bits_list(self, val):
        """
        :param val: integer bit mask
        :return: ascending list of indexes (0..16) of the bits set in `val`
        """
        return [bit for bit in range(16 + 1) if val & (1 << bit)]
|
|
|
|
|
|
class MsgHubProperties(DownstreamMsg, UpstreamMsg):
    """
    Hub property message, usable both as request and as decoded update

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-properties
    """
    TYPE = 0x01

    # property identifiers
    ADVERTISE_NAME = 0x01
    BUTTON = 0x02
    FW_VERSION = 0x03
    HW_VERSION = 0x04
    RSSI = 0x05
    VOLTAGE_PERC = 0x06
    BATTERY_TYPE = 0x07
    MANUFACTURER = 0x08
    RADIO_FW_VERSION = 0x09
    WIRELESS_PROTO_VERSION = 0x0A
    SYSTEM_TYPE_ID = 0x0B
    HW_NETW_ID = 0x0C
    PRIMARY_MAC = 0x0D
    SECONDARY_MAC = 0x0E
    HARDWARE_NETWORK_FAMILY = 0x0F

    # operations
    SET = 0x01
    UPD_ENABLE = 0x02
    UPD_DISABLE = 0x03
    RESET = 0x04
    UPD_REQUEST = 0x05
    UPSTREAM_UPDATE = 0x06

    def __init__(self, prop=None, operation=None, parameters=b""):
        super(MsgHubProperties, self).__init__()

        self.property = prop
        self.operation = operation
        self.parameters = parameters

    def bytes(self):
        """
        Encode property, operation and parameters; an update request expects a reply
        """
        if self.operation == self.UPD_REQUEST:
            self.needs_reply = True
        header = pack("<BB", self.property, self.operation)
        self.payload = header + self.parameters
        return super(MsgHubProperties, self).bytes()

    @classmethod
    def decode(cls, data):
        """
        Read property and operation bytes; whatever is left becomes `parameters`
        """
        decoded = super(MsgHubProperties, cls).decode(data)
        assert isinstance(decoded, MsgHubProperties)
        decoded.property = decoded._byte()
        decoded.operation = decoded._byte()
        decoded.parameters = decoded.payload
        return decoded

    def is_reply(self, msg):
        """
        A reply is an upstream update of the same property
        """
        if not isinstance(msg, MsgHubProperties):
            return False
        return msg.operation == self.UPSTREAM_UPDATE and msg.property == self.property
|
|
|
|
|
|
class MsgHubAction(DownstreamMsg, UpstreamMsg):
    """
    Hub action message, usable both as command and as decoded notification

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-actions
    """
    TYPE = 0x02

    # actions that are sent to the hub
    SWITCH_OFF = 0x01
    DISCONNECT = 0x02
    VCC_PORT_CONTROL_ON = 0x03
    VCC_PORT_CONTROL_OFF = 0x04
    BUSY_INDICATION_ON = 0x05
    BUSY_INDICATION_OFF = 0x06
    SWITCH_OFF_IMMEDIATELY = 0x2F

    # actions reported by the hub
    UPSTREAM_SHUTDOWN = 0x30
    UPSTREAM_DISCONNECT = 0x31
    UPSTREAM_BOOT_MODE = 0x32

    def __init__(self, action=None):
        super(MsgHubAction, self).__init__()
        self.action = action

    def bytes(self):
        """
        Encode the action byte; only DISCONNECT and SWITCH_OFF expect a reply
        """
        self.payload = pack("<B", self.action)
        self.needs_reply = self.action in (self.DISCONNECT, self.SWITCH_OFF)
        return super(MsgHubAction, self).bytes()

    def is_reply(self, msg):
        """
        Tell whether `msg` confirms this action.

        Messages of other types are simply not a reply (same contract as the
        other request classes in this module) instead of failing an assertion.

        :param msg: any decoded upstream message
        :return: True if `msg` is the matching upstream notification, False otherwise
        """
        if not isinstance(msg, MsgHubAction):
            return False

        if self.action == self.DISCONNECT:
            return msg.action == self.UPSTREAM_DISCONNECT

        if self.action == self.SWITCH_OFF:
            return msg.action == self.UPSTREAM_SHUTDOWN

        return False

    @classmethod
    def decode(cls, data):
        """
        Read the single action byte that follows the common header
        """
        msg = super(MsgHubAction, cls).decode(data)
        assert isinstance(msg, MsgHubAction)
        msg.action = msg._byte()
        return msg
|
|
|
|
|
|
class MsgHubAlert(DownstreamMsg, UpstreamMsg):
    """
    Hub alert message, usable both as request and as decoded update

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-alerts
    """
    TYPE = 0x03

    # alert types
    LOW_VOLTAGE = 0x01
    HIGH_CURRENT = 0x02
    LOW_SIGNAL = 0x03
    OVER_POWER = 0x04

    # human-readable names of alert types
    DESCR = {
        LOW_VOLTAGE: "low voltage",
        HIGH_CURRENT: "high current",
        LOW_SIGNAL: "low signal",
        OVER_POWER: "over power"
    }

    # operations
    UPD_ENABLE = 0x01
    UPD_DISABLE = 0x02
    UPD_REQUEST = 0x03
    UPSTREAM_UPDATE = 0x04

    def __init__(self, atype=None, operation=None):
        super(MsgHubAlert, self).__init__()
        self.atype = atype
        self.operation = operation
        self.status = None

    def bytes(self):
        """
        Encode alert type and operation; an update request expects a reply
        """
        self.payload = pack("<BB", self.atype, self.operation)
        if self.operation == self.UPD_REQUEST:
            self.needs_reply = True
        return super(MsgHubAlert, self).bytes()

    @classmethod
    def decode(cls, data):
        """
        Read alert type, operation and status bytes; operation must be UPSTREAM_UPDATE
        """
        alert = super(MsgHubAlert, cls).decode(data)
        assert isinstance(alert, MsgHubAlert)
        alert.atype, alert.operation, alert.status = alert._byte(), alert._byte(), alert._byte()

        assert alert.operation == cls.UPSTREAM_UPDATE
        return alert

    def is_ok(self):
        """
        :return: True when `status` is falsy (zero or not set)
        """
        return not self.status

    def is_reply(self, msg):
        """
        A reply is an upstream update for the same alert type
        """
        if not isinstance(msg, MsgHubAlert):
            return False
        return msg.operation == self.UPSTREAM_UPDATE and msg.atype == self.atype
|
|
|
|
|
|
class MsgHubAttachedIO(UpstreamMsg):
    """
    Upstream message carrying a port number and an attach/detach event

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#hub-attached-i-o
    """
    TYPE = 0x04

    # values of `event`
    EVENT_DETACHED = 0x00
    EVENT_ATTACHED = 0x01
    EVENT_ATTACHED_VIRTUAL = 0x02

    # DEVICE TYPES
    DEV_MOTOR = 0x0001
    DEV_SYSTEM_TRAIN_MOTOR = 0x0002
    DEV_BUTTON = 0x0005
    DEV_LED_LIGHT = 0x0008
    DEV_VOLTAGE = 0x0014
    DEV_CURRENT = 0x0015
    DEV_PIEZO_SOUND = 0x0016
    DEV_RGB_LIGHT = 0x0017
    DEV_TILT_EXTERNAL = 0x0022
    DEV_MOTION_SENSOR = 0x0023
    DEV_VISION_SENSOR = 0x0025
    DEV_MOTOR_EXTERNAL_TACHO = 0x0026
    DEV_MOTOR_INTERNAL_TACHO = 0x0027
    DEV_TILT_INTERNAL = 0x0028

    def __init__(self):
        super(MsgHubAttachedIO, self).__init__()
        self.port = None
        self.event = None

    @classmethod
    def decode(cls, data):
        """
        Read port and event bytes; the rest of the message stays in `payload`
        """
        attached = super(MsgHubAttachedIO, cls).decode(data)
        assert isinstance(attached, MsgHubAttachedIO)
        attached.port = attached._byte()
        attached.event = attached._byte()
        return attached
|
|
|
|
|
|
class MsgGenericError(UpstreamMsg):
    """
    Upstream error report: which command failed and with what error code

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#generic-error-messages
    """
    TYPE = 0x05

    ERR_ACK = 0x01  # ACK
    ERR_MACK = 0x02  # MACK
    ERR_BUFFER_OVERFLOW = 0x03  # Buffer Overflow
    ERR_TIMEOUT = 0x04  # Timeout
    ERR_WRONG_COMMAND = 0x05  # Command NOT recognized
    ERR_WRONG_PARAMS = 0x06  # Invalid use (e.g. parameter error(s)
    ERR_OVERCURRENT = 0x07
    ERR_INTERNAL = 0x08

    # human-readable descriptions of error codes
    DESCR = {
        ERR_ACK: "ACK",
        ERR_MACK: "MACK",
        ERR_BUFFER_OVERFLOW: "Buffer Overflow",
        ERR_TIMEOUT: "Timeout",
        ERR_WRONG_COMMAND: "Command NOT recognized",
        ERR_WRONG_PARAMS: "Invalid use (e.g. parameter error(s)",
        ERR_OVERCURRENT: "Overcurrent",
        ERR_INTERNAL: "Internal ERROR",
    }

    def __init__(self):
        super(MsgGenericError, self).__init__()
        self.cmd = None
        self.err = None

    @classmethod
    def decode(cls, data):
        """
        Read the command byte and the error code byte
        """
        msg = super(MsgGenericError, cls).decode(data)
        assert isinstance(msg, MsgGenericError)
        msg.cmd = msg._byte()
        msg.err = msg._byte()
        return msg

    def message(self):
        """
        :return: text describing the failed command and the error; codes that are
                 missing from DESCR are reported as "Unknown error" rather than
                 raising KeyError while an error is being reported
        """
        descr = self.DESCR.get(self.err, "Unknown error")
        return "Command 0x%x caused error 0x%x: %s" % (self.cmd, self.err, descr)
|
|
|
|
|
|
class MsgPortInfoRequest(DownstreamMsg):
    """
    This is sync request for value on port

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-information-request
    """
    TYPE = 0x21

    # values of `info_type`
    INFO_PORT_VALUE = 0x00
    INFO_MODE_INFO = 0x01
    INFO_MODE_COMBINATIONS = 0x02

    def __init__(self, port, info_type):
        super(MsgPortInfoRequest, self).__init__()
        self.port = port
        self.info_type = info_type
        self.needs_reply = True

    def bytes(self):
        """
        Encode port and information type bytes
        """
        self.payload = pack("<B", self.port) + pack("<B", self.info_type)
        return super(MsgPortInfoRequest, self).bytes()

    def is_reply(self, msg):
        """
        Tell whether `msg` answers this request.

        The type is checked before `msg.port` is read, because not every
        upstream message has a `port` attribute.

        :param msg: any decoded upstream message
        :return: True for a port value message (INFO_PORT_VALUE requests) or a
                 port info message (other requests) coming from the same port
        """
        if self.info_type == self.INFO_PORT_VALUE:
            expected = (MsgPortValueSingle, MsgPortValueCombined)
        else:
            expected = (MsgPortInfo,)

        return isinstance(msg, expected) and msg.port == self.port
|
|
|
|
|
|
class MsgPortModeInfoRequest(DownstreamMsg):
    """
    Request for one kind of information about one mode of a port

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-mode-information-request
    """
    TYPE = 0x22

    # values of `info_type`
    INFO_NAME = 0x00
    INFO_RAW_RANGE = 0x01
    INFO_PCT_RANGE = 0x02
    INFO_SI_RANGE = 0x03  # NOTE(review): 'SI' presumably means International System of Units - confirm
    INFO_UNITS = 0x04
    INFO_MAPPING = 0x05
    # INFO_INTERNAL = 0x06
    INFO_MOTOR_BIAS = 0x07
    INFO_CAPABILITY_BITS = 0x08
    INFO_VALUE_FORMAT = 0x80

    # human-readable names of information types
    INFO_TYPES = {
        INFO_NAME: "Name",
        INFO_RAW_RANGE: "Raw range",
        INFO_PCT_RANGE: "Percent range",
        INFO_SI_RANGE: "SI value range",
        INFO_UNITS: "Units",
        INFO_MAPPING: "Mapping",
        INFO_MOTOR_BIAS: "Motor bias",
        INFO_CAPABILITY_BITS: "Capabilities",
        INFO_VALUE_FORMAT: "Value encoding",
    }

    def __init__(self, port, mode, info_type):
        super(MsgPortModeInfoRequest, self).__init__()
        self.port = port
        self.mode = mode
        self.info_type = info_type
        # payload is final right away: port, mode, information type
        self.payload = pack("<BBB", port, mode, info_type)
        self.needs_reply = True

    def is_reply(self, msg):
        """
        A reply is a port mode info message with same port, mode and information type
        """
        if not isinstance(msg, MsgPortModeInfo):
            return False

        wanted = (self.port, self.mode, self.info_type)
        return (msg.port, msg.mode, msg.info_type) == wanted
|
|
|
|
|
|
class MsgPortInputFmtSetupSingle(DownstreamMsg):
    """
    Request to set input mode, delta and update flag for a port

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-setup-single
    """
    TYPE = 0x41

    def __init__(self, port, mode, delta=1, update_enable=0):
        super(MsgPortInputFmtSetupSingle, self).__init__()
        self.port = port
        self.mode = mode
        self.updates_enabled = update_enable
        self.update_delta = delta
        # port (1 byte), mode (1 byte), delta (4 bytes), update flag (1 byte)
        self.payload = pack("<BBIB", port, mode, delta, update_enable)
        self.needs_reply = True

    def is_reply(self, msg):
        """
        :return: True for a single input format message from the same port, otherwise None
        """
        matched = isinstance(msg, MsgPortInputFmtSingle) and msg.port == self.port
        return True if matched else None
|
|
|
|
|
|
class MsgPortInputFmtSetupCombined(DownstreamMsg):
    """
    Combined-mode counterpart of the single input format setup request

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-setup-combinedmode
    """
    TYPE = 0x42

    def __init__(self, port, mode, delta=1, update_enable=0):
        super(MsgPortInputFmtSetupCombined, self).__init__()
        self.port = port
        # port (1 byte), mode (1 byte), delta (4 bytes), update flag (1 byte)
        self.payload = pack("<BBIB", port, mode, delta, update_enable)
        self.needs_reply = True

    def is_reply(self, msg):
        """
        :return: True for a combined input format message from the same port, otherwise None
        """
        matched = isinstance(msg, MsgPortInputFmtCombined) and msg.port == self.port
        return True if matched else None
|
|
|
|
|
|
class MsgPortInfo(UpstreamMsg):
    """
    Upstream description of a port: capabilities and modes, or mode combinations

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-information
    """
    TYPE = 0x43

    # bits of `capabilities`
    CAP_OUTPUT = 0b00000001
    CAP_INPUT = 0b00000010
    CAP_COMBINABLE = 0b00000100
    CAP_SYNCHRONIZABLE = 0b00001000

    def __init__(self):
        super(MsgPortInfo, self).__init__()
        self.port = None
        self.info_type = None
        self.capabilities = None
        self.total_modes = None
        self.input_modes = None
        self.output_modes = None
        self.possible_mode_combinations = []

    @classmethod
    def decode(cls, data):
        """
        Read port and information type, then either the mode info fields
        or the list of possible mode combinations
        """
        info = super(MsgPortInfo, cls).decode(data)
        assert isinstance(info, MsgPortInfo)
        info.port = info._byte()
        info.info_type = info._byte()

        if info.info_type != MsgPortInfoRequest.INFO_MODE_INFO:
            # https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#pos-m
            while info.payload:
                combo = info._short()
                info.possible_mode_combinations.append(info._bits_list(combo))
                if not combo:  # zero value ends the list (its empty bit list is still stored)
                    break
            return info

        info.capabilities = info._byte()
        info.total_modes = info._byte()
        info.input_modes = info._bits_list(info._short())
        info.output_modes = info._bits_list(info._short())
        return info

    def _has_capability(self, flag):
        # capabilities are decoded for mode info messages only
        assert self.info_type == MsgPortInfoRequest.INFO_MODE_INFO
        return bool(self.capabilities & flag)

    def is_output(self):
        """:return: True if CAP_OUTPUT bit is set"""
        return self._has_capability(self.CAP_OUTPUT)

    def is_input(self):
        """:return: True if CAP_INPUT bit is set"""
        return self._has_capability(self.CAP_INPUT)

    def is_combinable(self):
        """:return: True if CAP_COMBINABLE bit is set"""
        return self._has_capability(self.CAP_COMBINABLE)

    def is_synchronizable(self):
        """:return: True if CAP_SYNCHRONIZABLE bit is set"""
        return self._has_capability(self.CAP_SYNCHRONIZABLE)
|
|
|
|
|
|
class MsgPortModeInfo(UpstreamMsg):
    """
    Upstream answer to MsgPortModeInfoRequest; `value` is decoded according to `info_type`

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-mode-information
    """
    TYPE = 0x44

    # meaning of bit positions in the mapping bytes
    MAPPING_FLAGS = {
        7: "Supports NULL value",
        6: "Supports Functional Mapping 2.0+",
        5: "N/A",
        4: "Absolute [min..max]",
        3: "Relative [-1..1]",
        2: "Discrete [0, 1, 2, 3]",
        1: "N/A",
        0: "N/A",
    }

    # names of dataset type codes from the value format info
    DATASET_TYPES = {
        0b00: "8 bit",
        0b01: "16 bit",
        0b10: "32 bit",
        0b11: "FLOAT",
    }

    def __init__(self):
        super(MsgPortModeInfo, self).__init__()
        self.port = None
        self.mode = None
        self.info_type = None  # @see MsgPortModeInfoRequest
        self.value = None

    @classmethod
    def decode(cls, data):
        """
        Read port, mode and information type bytes, then decode the value
        """
        msg = super(MsgPortModeInfo, cls).decode(data)
        assert isinstance(msg, MsgPortModeInfo)
        msg.port = msg._byte()
        msg.mode = msg._byte()
        msg.info_type = msg._byte()
        msg.value = msg._value()
        return msg

    def _cstring(self):
        # text up to the first NUL byte; the whole payload if there is no NUL,
        # so a string that fills its field completely does not raise ValueError
        return self.payload.partition(b"\x00")[0].decode('ascii')

    def _value(self):
        """
        Decode the rest of the payload according to `info_type`

        :return: str for name and units, list of two floats for ranges, dict for
                 mapping and value format, int for motor bias, raw payload otherwise
        """
        info = MsgPortModeInfoRequest
        if self.info_type in (info.INFO_NAME, info.INFO_UNITS):
            return self._cstring()
        elif self.info_type in (info.INFO_RAW_RANGE, info.INFO_PCT_RANGE, info.INFO_SI_RANGE):
            return [self._float(), self._float()]
        elif self.info_type == info.INFO_MAPPING:
            inp = self._bits_list(self._byte())
            outp = self._bits_list(self._byte())
            return {
                "input": [self.MAPPING_FLAGS[x] for x in inp],
                "output": [self.MAPPING_FLAGS[x] for x in outp],
            }
        elif self.info_type == info.INFO_MOTOR_BIAS:
            return self._byte()
        elif self.info_type == info.INFO_VALUE_FORMAT:
            # bytes are consumed in the order the keys are listed
            return {
                "datasets": self._byte(),
                "type": self.DATASET_TYPES[self._byte()],
                "total_figures": self._byte(),
                "decimals": self._byte(),
            }
        else:
            return self.payload  # FIXME: will probably fail here
|
|
|
|
|
|
class MsgPortValueSingle(UpstreamMsg):
    """
    Upstream port value message; only the port is decoded, the value stays in `payload`

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-value-single
    """
    TYPE = 0x45

    def __init__(self):
        super(MsgPortValueSingle, self).__init__()
        self.port = None

    @classmethod
    def decode(cls, data):
        """
        Read the port byte that follows the common header
        """
        value_msg = super(MsgPortValueSingle, cls).decode(data)
        assert isinstance(value_msg, MsgPortValueSingle)
        value_msg.port = value_msg._byte()
        return value_msg
|
|
|
|
|
|
class MsgPortValueCombined(UpstreamMsg):
    """
    Upstream combined-mode port value message; only the port is decoded,
    the rest stays in `payload`

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-value-combinedmode
    """
    TYPE = 0x46

    def __init__(self):
        super(MsgPortValueCombined, self).__init__()
        self.port = None

    @classmethod
    def decode(cls, data):
        """
        Read the port byte that follows the common header
        """
        value_msg = super(MsgPortValueCombined, cls).decode(data)
        assert isinstance(value_msg, MsgPortValueCombined)
        value_msg.port = value_msg._byte()
        return value_msg
|
|
|
|
|
|
class MsgPortInputFmtSingle(UpstreamMsg):
    """
    Upstream report of the input format that is set for a port

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
    """
    TYPE = 0x47

    def __init__(self, port=None, mode=None, upd_enabled=None, upd_delta=None):
        super(MsgPortInputFmtSingle, self).__init__()
        self.port = port
        self.mode = mode
        self.upd_delta = upd_delta
        self.upd_enabled = upd_enabled

    @classmethod
    def decode(cls, data):
        """
        Read port, mode and 4-byte delta; the update flag is read only if a byte is left
        """
        fmt = super(MsgPortInputFmtSingle, cls).decode(data)
        assert isinstance(fmt, MsgPortInputFmtSingle)
        fmt.port = fmt._byte()
        fmt.mode = fmt._byte()
        fmt.upd_delta = fmt._long()
        if fmt.payload:
            fmt.upd_enabled = fmt._byte()

        return fmt
|
|
|
|
|
|
class MsgPortInputFmtCombined(UpstreamMsg):  # TODO
    """
    Upstream report of combined-mode input format; only the port is decoded so far,
    `combined_control` is never filled and the rest of the message stays in `payload`

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
    """
    TYPE = 0x48

    def __init__(self):
        super(MsgPortInputFmtCombined, self).__init__()
        self.port = None
        self.combined_control = None

    @classmethod
    def decode(cls, data):
        """
        Read the port byte that follows the common header
        """
        msg = super(MsgPortInputFmtCombined, cls).decode(data)
        # check against this class; checking the "single" class rejected every message
        assert isinstance(msg, MsgPortInputFmtCombined)
        msg.port = msg._byte()
        return msg
|
|
|
|
|
|
class MsgVirtualPortSetup(DownstreamMsg):
    """
    Command to connect two ports into a virtual one, or to disconnect a virtual port

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#virtual-port-setup
    """
    TYPE = 0x61

    CMD_DISCONNECT = 0x00
    CMD_CONNECT = 0x01

    def __init__(self, cmd, port):
        """
        :param cmd: CMD_DISCONNECT or CMD_CONNECT
        :param port: int port number for disconnect, list/tuple of two port numbers otherwise
        """
        super(MsgVirtualPortSetup, self).__init__()
        self.payload = pack("<B", cmd)
        if cmd != self.CMD_DISCONNECT:
            assert isinstance(port, (list, tuple))
            self.payload += pack("<B", port[0]) + pack("<B", port[1])
        else:
            assert isinstance(port, int)
            self.payload += pack("<B", port)
|
|
|
|
|
|
class MsgPortOutput(DownstreamMsg):
    """
    Output command for a port: subcommand byte plus already-encoded parameters

    https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-output-command
    """
    TYPE = 0x81

    # bits of the startup/completion byte
    SC_NO_BUFFER = 0b00000001
    SC_FEEDBACK = 0b00010000

    WRITE_DIRECT = 0x50
    WRITE_DIRECT_MODE_DATA = 0x51

    def __init__(self, port, subcommand, params):
        super(MsgPortOutput, self).__init__()
        self.port = port
        self.is_buffered = False
        self.do_feedback = True
        self.subcommand = subcommand
        self.params = params

    def bytes(self):
        """
        Encode port, startup/completion flags, subcommand and params;
        asking for feedback also marks the message as needing a reply
        """
        flags = 0 if self.is_buffered else self.SC_NO_BUFFER

        if self.do_feedback:
            flags |= self.SC_FEEDBACK
            self.needs_reply = True

        self.payload = pack("<BBB", self.port, flags, self.subcommand) + self.params
        return super(MsgPortOutput, self).bytes()

    def is_reply(self, msg):
        """
        A reply is output feedback from the same port that reports completion
        (any feedback from the port is enough when the command is buffered)
        """
        if not isinstance(msg, MsgPortOutputFeedback):
            return False
        if not msg.port == self.port:
            return False
        return msg.is_completed() or self.is_buffered
|
|
|
|
|
|
class MsgPortOutputFeedback(UpstreamMsg):
    """
    Upstream feedback for an output command: port number and a status bit mask
    """
    TYPE = 0x82

    # bits of `status`
    _IN_PROGRESS = 0b0001
    _COMPLETED = 0b0010
    _DISCARDED = 0b0100
    _IDLE = 0b1000

    def __init__(self):
        super(MsgPortOutputFeedback, self).__init__()
        self.port = None
        self.status = None

    @classmethod
    def decode(cls, data):
        """
        Read port and status bytes; only single-port feedback (2 bytes) is accepted
        """
        feedback = super(MsgPortOutputFeedback, cls).decode(data)
        assert isinstance(feedback, MsgPortOutputFeedback)
        assert len(feedback.payload) == 2, "TODO: implement multi-port feedback message"
        feedback.port = feedback._byte()
        feedback.status = feedback._byte()
        return feedback

    def is_in_progress(self):
        """:return: masked status bit (non-zero int when set), not a bool"""
        return self.status & self._IN_PROGRESS

    def is_completed(self):
        """:return: masked status bit (non-zero int when set), not a bool"""
        return self.status & self._COMPLETED

    def is_discarded(self):
        """:return: masked status bit (non-zero int when set), not a bool"""
        return self.status & self._DISCARDED

    def is_idle(self):
        """:return: masked status bit (non-zero int when set), not a bool"""
        return self.status & self._IDLE
|
|
|
|
|
|
# all message classes of this module that have a decode() method
UPSTREAM_MSGS = (
    MsgHubProperties,
    MsgHubAction,
    MsgHubAlert,
    MsgHubAttachedIO,
    MsgGenericError,
    MsgPortInfo,
    MsgPortModeInfo,
    MsgPortValueSingle,
    MsgPortValueCombined,
    MsgPortInputFmtSingle,
    MsgPortInputFmtCombined,
    MsgPortOutputFeedback,
)
|