From e2fe66ac2d3d6e9c9cff4e371318997bdc63923c Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Wed, 13 Sep 2017 20:14:17 +0300 Subject: [PATCH] refactored code to detect devices --- README.md | 3 +- demo.py | 36 +++++++------ pylegoboost/__init__.py | 111 +++++++++++++++++++++++++++++++-------- pylegoboost/comms.py | 13 ----- pylegoboost/constants.py | 44 ++++++++++++++-- test.py | 61 ++++++++++++++++++++- 6 files changed, 209 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 7fc213e..1a8792d 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ Best way to start is to look at [demo.py](demo.py) file, or maybe run it. # Ideas - +Make it 2/3 compatible +Add travis unit tests and coverage # Links diff --git a/demo.py b/demo.py index 0bbff79..668cbd2 100644 --- a/demo.py +++ b/demo.py @@ -12,7 +12,7 @@ def demo_all(movehub): demo_led_colors(movehub) demo_motors_timed(movehub) demo_motors_angled(movehub) - demo_port_c_motor(movehub) + demo_port_cd_motor(movehub) def demo_led_colors(movehub): @@ -51,12 +51,20 @@ def demo_motors_angled(movehub): sleep(1) -def demo_port_c_motor(movehub): - portd = EncodedMotor(movehub, PORT_D) - portd.angled(90, 1) - sleep(1) - portd.angled(90, -1) - sleep(1) +def demo_port_cd_motor(movehub): + motor = None + if isinstance(movehub.port_D, EncodedMotor): + motor = movehub.port_D + elif isinstance(movehub.port_C, EncodedMotor): + motor = movehub.port_D + else: + log.warning("Motor not found on ports C or D") + + if motor: + motor.angled(20, 1) + sleep(1) + motor.angled(20, -1) + sleep(1) def vernie_head(movehub): @@ -74,7 +82,7 @@ def vernie_head(movehub): if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.INFO) try: connection = DebugServerConnection() @@ -84,10 +92,8 @@ if __name__ == '__main__': hub = MoveHub(connection) sleep(1) - #hub.get_name() - - for x in range(1, 60): - sleep(1) - - # demo_led_colors(hub) - # demo_all(movehub) + # hub.get_name() + demo_port_cd_motor(hub) + # demo_all(hub) + # demo_led_colors(hub) + sleep(1) diff --git a/pylegoboost/__init__.py b/pylegoboost/__init__.py index 51c3f90..9411647 100644 --- a/pylegoboost/__init__.py +++ b/pylegoboost/__init__.py @@ -11,20 +11,22 @@ class MoveHub(object): """ :type connection: pylegoboost.comms.Connection :type led: LED + :type devices: dict[int,Peripheral] """ def __init__(self, connection): - self.notified = False - self.connection = connection + self.devices = {} - self.led = LED(self) - self.motor_A = EncodedMotor(self, PORT_A) - self.motor_B = EncodedMotor(self, PORT_B) - self.motor_AB = EncodedMotor(self, PORT_AB) + # shorthand fields + self.led = None + self.motor_A = None + self.motor_B = None + self.motor_AB = None + self.tilt_sensor = None + self.color_distance_sensor = None # self.button - # self.tilt_sensor # enables notifications reading self.connection.set_notify_handler(self._notify) @@ -34,28 +36,90 @@ class MoveHub(object): # log.debug("Waiting to be notified") # time.sleep(1) - # self.port_C = None - # self.port_D = None + self.port_C = None + self.port_D = None # transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01') - def _notify(self, handle, data): - # TODO - log.debug("Notification on %s: %s", handle, data.encode("hex")) - def get_name(self): # note: reading this too fast makes it hang self.connection.read(DEVICE_NAME) + def _notify(self, handle, data): + """ + Using https://github.com/JorgePe/BOOSTreveng/blob/master/Notifications.md + """ + orig = data + log.debug("Notification on %s: %s", handle, orig.encode("hex")) + data = data[3:] + + msg_type = ord(data[2]) + + if msg_type == MSG_PORT_INFO: + self._handle_port_info(data) + else: + log.warning("Unhandled msg type %s: %s", msg_type, orig.encode("hex")) + + pass + + def _handle_port_info(self, data): + port = ord(data[3]) + dev_type = ord(data[5]) + + if port in PORTS and dev_type in DEVICE_TYPES: + log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port]) + else: + log.debug("Device 0x%x at port 0x%x", dev_type, port) + + if dev_type == TYPE_MOTOR: + self.devices[port] = EncodedMotor(self, port) + elif dev_type == TYPE_IMOTOR: + self.devices[port] = EncodedMotor(self, port) + elif dev_type == TYPE_DISTANCE_COLOR_SENSOR: + self.devices[port] = ColorDistanceSensor(self, port) + self.color_distance_sensor = self.devices[port] + elif dev_type == TYPE_LED: + self.devices[port] = LED(self, port) + elif dev_type == TYPE_TILT_SENSOR: + self.devices[port] = TiltSensor(self, port) + else: + log.warning("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port) + + if port == PORT_A: + self.motor_A = self.devices[port] + elif port == PORT_B: + self.motor_B = self.devices[port] + elif port == PORT_AB: + self.motor_AB = self.devices[port] + elif port == PORT_C: + self.port_C = self.devices[port] + elif port == PORT_D: + self.port_D = self.devices[port] + elif port == PORT_LED: + self.led = self.devices[port] + elif port == PORT_TILT_SENSOR: + self.tilt_sensor = self.devices[port] + else: + log.warning("Unhandled port: %s", PORTS[port]) + class Peripheral(object): """ :type parent: MoveHub """ + PACKET_VER = b'\x01' + SET_PORT_VAL = b'\x81' - def __init__(self, parent): + def __init__(self, parent, port): super(Peripheral, self).__init__() self.parent = parent + self.port = port + + def _set_port_val(self, value): + cmd = self.PACKET_VER + self.SET_PORT_VAL + chr(self.port) + cmd += value + + self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(cmd)) + cmd) class LED(Peripheral): @@ -63,14 +127,12 @@ class LED(Peripheral): if color not in COLORS: raise ValueError("Color %s is not in list of available colors" % color) - cmd = CMD_SET_COLOR + chr(color) - self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, cmd) + cmd = '\x11\x51\x00' + chr(color) + self._set_port_val(cmd) class EncodedMotor(Peripheral): TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something - PACKET_VER = b'\x01' - SET_PORT_VAL = b'\x81' MOVEMENT_TYPE = b'\x11' TIMED_SINGLE = b'\x09' TIMED_GROUP = b'\x0A' @@ -78,10 +140,9 @@ class EncodedMotor(Peripheral): ANGLED_GROUP = b'\x0C' def __init__(self, parent, port): - super(EncodedMotor, self).__init__(parent) - if port not in PORTS: + super(EncodedMotor, self).__init__(parent, port) + if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]: raise ValueError("Invalid port for motor: %s" % port) - self.port = port def _speed_abs(self, relative): if relative < -1 or relative > 1: @@ -94,7 +155,7 @@ class EncodedMotor(Peripheral): def _wrap_and_write(self, command, speed_primary, speed_secondary): # set for port - command = self.SET_PORT_VAL + chr(self.port) + self.MOVEMENT_TYPE + command + command = self.MOVEMENT_TYPE + command command += chr(self._speed_abs(speed_primary)) if self.port == PORT_AB: @@ -102,7 +163,7 @@ class EncodedMotor(Peripheral): command += self.TRAILER - self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(command) + 1) + self.PACKET_VER + command) + self._set_port_val(command) def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): if speed_secondary is None: @@ -136,3 +197,7 @@ class EncodedMotor(Peripheral): class ColorDistanceSensor(Peripheral): pass + + +class TiltSensor(Peripheral): + pass diff --git a/pylegoboost/comms.py b/pylegoboost/comms.py index 9856c68..cfa4b65 100644 --- a/pylegoboost/comms.py +++ b/pylegoboost/comms.py @@ -49,19 +49,6 @@ class Connection(object): pass -class ConnectionMock(Connection): - """ - For unit testing purposes - """ - - def write(self, handle, data): - log.debug("Writing to %s: %s", handle, data.encode("hex")) - - def read(self, handle): - log.debug("Reading from: %s", handle) - return None # TODO - - class BLEConnection(Connection): """ Main transport class, uses real Bluetooth LE connection. diff --git a/pylegoboost/constants.py b/pylegoboost/constants.py index e68c6fc..a13c4b0 100644 --- a/pylegoboost/constants.py +++ b/pylegoboost/constants.py @@ -33,13 +33,47 @@ COLORS = { COLOR_WHITE: "WHITE" } -CMD_SET_COLOR = b'\x08\x00\x81\x32\x11\x51\x00' - # PORTS +PORT_C = 0x01 +PORT_D = 0x02 +PORT_LED = 0x32 PORT_A = 0x37 PORT_B = 0x38 PORT_AB = 0x39 -PORT_C = 0x01 -PORT_D = 0x02 +PORT_TILT_SENSOR = 0x3a +PORT_SOMETHING1 = 0x3B +PORT_SOMETHING2 = 0x3C -PORTS = [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D] +PORTS = { + PORT_A: "A", + PORT_B: "B", + PORT_AB: "AB", + PORT_C: "C", + PORT_D: "D", + PORT_LED: "LED", + PORT_TILT_SENSOR: "TILT_SENSOR", + PORT_SOMETHING1: "UNK1", + PORT_SOMETHING2: "UNK2", +} + +# NOTIFICATIONS +MSG_PORT_INFO = 0x04 + +TYPE_DISTANCE_COLOR_SENSOR = 0x25 +TYPE_IMOTOR = 0x26 +TYPE_MOTOR = 0x27 +TYPE_TILT_SENSOR = 0x28 +TYPE_LED = 0x17 +# one of them is button? another is battery? +TYPE_SOMETHING1 = 0x15 +TYPE_SOMETHING2 = 0x14 + +DEVICE_TYPES = { + TYPE_DISTANCE_COLOR_SENSOR: "DISTANCE_COLOR_SENSOR", + TYPE_IMOTOR: "IMOTOR", + TYPE_MOTOR: "MOTOR", + TYPE_TILT_SENSOR: "TILT_SENSOR", + TYPE_LED: "LED", + TYPE_SOMETHING1: "UNK1", + TYPE_SOMETHING2: "UNK2", +} diff --git a/test.py b/test.py index 2f703d0..18a880b 100644 --- a/test.py +++ b/test.py @@ -1,13 +1,70 @@ import logging +import time import unittest +from threading import Thread from demo import demo_all -from pylegoboost.comms import ConnectionMock +from pylegoboost import MoveHub +from pylegoboost.comms import Connection logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger('test') + + +class ConnectionMock(Connection): + """ + For unit testing purposes + """ + + def __init__(self): + super(ConnectionMock, self).__init__() + self.notifications = [] + self.notification_handler = None + self.running = True + self.finished = False + + def set_notify_handler(self, handler): + self.notification_handler = handler + thr = Thread(target=self.notifier) + thr.setDaemon(True) + thr.start() + + def notifier(self): + while self.running: + if self.notification_handler: + while self.notifications: + handle, data = self.notifications.pop(0) + self.notification_handler(handle, data.replace(' ', '').decode('hex')) + time.sleep(0.1) + + self.finished = True + + def write(self, handle, data): + log.debug("Writing to %s: %s", handle, data.encode("hex")) + + def read(self, handle): + log.debug("Reading from: %s", handle) + return None # TODO + class GeneralTest(unittest.TestCase): def test_capabilities(self): conn = ConnectionMock() - demo_all(conn) + hub = MoveHub(conn) + time.sleep(1) + conn.notifications.append((14, '1b0e00 0f00 04 01 0125000000001000000010')) + conn.notifications.append((14, '1b0e00 0f00 04 02 0126000000001000000010')) + conn.notifications.append((14, '1b0e00 0f00 04 37 0127000100000001000000')) + conn.notifications.append((14, '1b0e00 0f00 04 38 0127000100000001000000')) + conn.notifications.append((14, '1b0e00 0900 04 39 0227003738')) + conn.notifications.append((14, '1b0e00 0f00 04 32 0117000100000001000000')) + conn.notifications.append((14, '1b0e00 0f00 04 3a 0128000000000100000001')) + conn.notifications.append((14, '1b0e00 0f00 04 3b 0115000200000002000000')) + conn.notifications.append((14, '1b0e00 0f00 04 3c 0114000200000002000000')) + time.sleep(1) + demo_all(hub) + conn.running = False + + while not conn.finished: + time.sleep(0.1)