1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

getting hub info

This commit is contained in:
Andrey Pohilko 2017-09-23 14:09:53 +03:00
parent a5b0a567df
commit acdf631b66
6 changed files with 66 additions and 17 deletions

View File

@ -287,8 +287,7 @@ Then push green button on MoveHub, so permanent BLE connection will be establish
## Roadmap & TODO
- handle device detach and device attach events on ports C/D
- generalize getting device info + give constants (low priority)
- work out completely synchronous command model - with either command error or confirmation
- organize requesting and printing device info on startup - firmware version at least
- document all API methods
- make sure unit tests cover all important code

View File

@ -77,7 +77,7 @@ class BLEConnection(Connection):
super(BLEConnection, self).__init__()
self.requester = None
def connect(self, bt_iface_name='hci0'):
def connect(self, bt_iface_name='hci0', hub_mac=None):
service = DiscoveryService(bt_iface_name)
while not self.requester:
@ -86,7 +86,7 @@ class BLEConnection(Connection):
log.debug("Devices: %s", devices)
for address, name in devices.items():
if name == LEGO_MOVE_HUB:
if name == LEGO_MOVE_HUB or hub_mac == address:
logging.info("Found %s at %s", name, address)
self.requester = Requester(address, True, bt_iface_name)
break

View File

@ -116,3 +116,21 @@ COLORS = {
COLOR_WHITE: "WHITE",
COLOR_NONE: "NONE"
}
# DEVICE INFO
INFO_DEVICE_NAME = 0x01
INFO_BUTTON_STATE = 0x02
INFO_FIRMWARE_VERSION = 0x03
INFO_SOME4 = 0x04
INFO_SOME5_JITTERING = 0x05
INFO_SOME6 = 0x06
INFO_SOME7 = 0x07
INFO_MANUFACTURER = 0x08
INFO_HW_VERSION = 0x09
INFO_SOME10 = 0x0a
INFO_SOME11 = 0x0b
INFO_SOME12 = 0x0c
INFO_ACTION_SUBSCRIBE = 0x02
INFO_ACTION_UNSUBSCRIBE = 0x03
INFO_ACTION_GET = 0x05

View File

@ -40,6 +40,7 @@ class MoveHub(object):
connection = BLEConnection()
self.connection = connection
self.info = {}
self.devices = {}
# shorthand fields
@ -59,6 +60,11 @@ class MoveHub(object):
self.connection.set_notify_handler(self._notify)
self._wait_for_devices()
self._report_status()
def send(self, msg_type, payload):
cmd = pack("<B", PACKET_VER) + pack("<B", msg_type) + payload
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
def _wait_for_devices(self):
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
@ -107,8 +113,12 @@ class MoveHub(object):
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
def _handle_device_info(self, data):
if usbyte(data, 3) == 2:
kind = usbyte(data, 3)
if kind == 2:
self.button.handle_port_data(data)
if usbyte(data, 4) == 0x06:
self.info[kind] = data[5:]
else:
log.warning("Unhandled device info: %s", str2hex(data))
@ -200,5 +210,27 @@ class MoveHub(object):
log.warning("Unhandled port: %s", PORTS[port])
def shutdown(self):
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
self.send(MSG_DEVICE_SHUTDOWN, b'')
def _report_status(self):
# TODO: add firmware version
log.info("%s by %s", self.info_get(INFO_DEVICE_NAME), self.info_get(INFO_MANUFACTURER))
self.__voltage = 0
def on_voltage(value):
self.__voltage = value
self.voltage.subscribe(on_voltage, granularity=0)
while not self.__voltage:
time.sleep(0.01)
self.voltage.unsubscribe(on_voltage)
log.info("Voltage: %d%%", self.__voltage * 100)
def info_get(self, info_type):
self.info[info_type] = None
self.send(MSG_DEVICE_INFO, pack("<B", info_type) + pack("<B", INFO_ACTION_GET))
while self.info[info_type] is None: # FIXME: will hang forever on error
time.sleep(0.05)
return self.info[info_type]

View File

@ -36,9 +36,8 @@ class Peripheral(object):
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port)
def _write_to_hub(self, msg_type, params):
cmd = pack("<B", PACKET_VER) + pack("<B", msg_type) + pack("<B", self.port)
cmd += params
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
cmd = pack("<B", self.port) + params
self.parent.send(msg_type, cmd)
def _port_subscribe(self, mode, granularity, enable):
params = pack("<B", mode)
@ -373,7 +372,7 @@ class ColorDistanceSensor(Peripheral):
class Voltage(Peripheral):
MODE1 = 0x00 # give less frequent notifications
MODE2 = 0x01 # give more frequent notifications, maybe different voltage (cpu vs board?)
MODE2 = 0x01 # give more frequent notifications, maybe different value (cpu vs board?)
def __init__(self, parent, port):
super(Voltage, self).__init__(parent, port)
@ -383,7 +382,7 @@ class Voltage(Peripheral):
super(Voltage, self).subscribe(callback, mode, granularity)
# we know only voltage subscription from it. is it really battery or just onboard voltage?
# device has turned off on 1b0e00060045 3b a800 - 168 dec / 1b0e00060045 3c 0803 / 1b0e000600453c 0703
# device has turned off on 1b0e00060045 3c 0803 / 1b0e000600453c 0703
# moderate 9v ~= 3840
# good 7.5v ~= 3892
# liion 5v ~= 2100
@ -397,7 +396,7 @@ class Voltage(Peripheral):
class Amperage(Peripheral):
MODE1 = 0x00 # give less frequent notifications
MODE2 = 0x01 # give more frequent notifications, maybe different voltage (cpu vs board?)
MODE2 = 0x01 # give more frequent notifications, maybe different value (cpu vs board?)
def __init__(self, parent, port):
super(Amperage, self).__init__(parent, port)
@ -421,9 +420,8 @@ class Button(Peripheral):
super(Button, self).__init__(parent, 0) # fake port 0
def subscribe(self, callback, mode=None, granularity=1, async=False):
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_INFO) + b'\x02\x02'
self.started()
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
self.parent.send(MSG_DEVICE_INFO, pack('<B', INFO_BUTTON_STATE) + pack('<B', INFO_ACTION_SUBSCRIBE))
self._wait_sync(async)
if callback:
@ -434,8 +432,7 @@ class Button(Peripheral):
self._subscribers.remove(callback)
if not self._subscribers:
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_INFO) + b'\x02\x03'
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
self.parent.send(MSG_DEVICE_INFO, pack('<B', INFO_BUTTON_STATE) + pack('<B', INFO_ACTION_UNSUBSCRIBE))
def handle_port_data(self, data):
param = usbyte(data, 5)

View File

@ -59,6 +59,9 @@ class HubMock(MoveHub):
def _wait_for_devices(self):
pass
def _report_status(self):
pass
class GeneralTest(unittest.TestCase):
def _wait_notifications_handled(self, hub):