diff --git a/demo.py b/demo.py index 806dbaa..c02ecad 100644 --- a/demo.py +++ b/demo.py @@ -24,7 +24,7 @@ def demo_tilt_sensor_simple(movehub): def demo_tilt_sensor_precise(movehub): log.info("Tilt sensor precise test. Turn device in different ways.") demo_tilt_sensor_simple.cnt = 0 - limit = 100 + limit = 50 def callback(pitch, roll, yaw): demo_tilt_sensor_simple.cnt += 1 @@ -40,7 +40,7 @@ def demo_tilt_sensor_precise(movehub): def demo_led_colors(movehub): # LED colors demo log.info("LED colors demo") - for color in COLORS.keys(): + for color in COLORS.keys()[1:] + [COLOR_BLACK]: log.info("Setting LED color to: %s", COLORS[color]) movehub.led.set_color(color) sleep(1) @@ -110,6 +110,22 @@ def vernie_head(movehub): sleep(2) +def demo_color_sensor(movehub): + log.info("Color sensor test: give it 3 things to detect color") + demo_color_sensor.cnt = 0 + + def callback(color, distance): + demo_color_sensor.cnt += 1 + clr = COLORS[color] if color in COLORS else None + log.info("#%s: Color %s, distance %s", demo_color_sensor.cnt, clr, distance) + + movehub.color_distance_sensor.subscribe(callback) + while demo_color_sensor.cnt < 300: + time.sleep(1) + + movehub.color_distance_sensor.unsubscribe(callback) + + def demo_all(movehub): demo_led_colors(movehub) demo_motors_timed(movehub) @@ -117,6 +133,7 @@ def demo_all(movehub): demo_port_cd_motor(movehub) demo_tilt_sensor_simple(movehub) demo_tilt_sensor_precise(movehub) + demo_color_sensor(movehub) if __name__ == '__main__': @@ -129,7 +146,7 @@ if __name__ == '__main__': connection = BLEConnection().connect() hub = MoveHub(connection) - demo_tilt_sensor_precise(hub) + demo_color_sensor(hub) # demo_all(hub) diff --git a/pylgbst/__init__.py b/pylgbst/__init__.py index e006f1e..ea6ee25 100644 --- a/pylgbst/__init__.py +++ b/pylgbst/__init__.py @@ -7,12 +7,12 @@ log = logging.getLogger('movehub') class MoveHub(object): """ - :type connection: pylegoboost.comms.Connection + :type connection: pylgbst.comms.Connection :type devices: dict[int,Peripheral] :type led: LED :type tilt_sensor: TiltSensor :type button: Button - :type color_distance_sensor: ColorDistanceSensor + :type color_distance_sensor: pylgbst.peripherals.ColorDistanceSensor :type external_motor: EncodedMotor :type port_C: Peripheral :type port_D: Peripheral @@ -67,35 +67,36 @@ class MoveHub(object): """ orig = data - if handle == MOVE_HUB_HARDWARE_HANDLE: - data = data[3:] - log.debug("Notification on %s: %s", handle, str2hex(orig)) - - msg_type = get_byte(data, 2) - - if msg_type == MSG_PORT_INFO: - self._handle_port_info(data) - elif msg_type == MSG_PORT_STATUS: - self._handle_port_status(data) - elif msg_type == MSG_SENSOR_DATA: - self._handle_sensor_data(data) - elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK: - log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)]) - elif msg_type == MSG_PORT_CMD_ERROR: - log.warning("Command error: %s", str2hex(data[3:])) - else: - log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig)) - else: + if handle != MOVE_HUB_HARDWARE_HANDLE: log.warning("Unsupported notification handle: 0x%s", handle) + return + + data = data[3:] + log.debug("Notification on %s: %s", handle, str2hex(orig)) + + msg_type = get_byte(data, 2) + + if msg_type == MSG_PORT_INFO: + self._handle_port_info(data) + elif msg_type == MSG_PORT_STATUS: + self._handle_port_status(data) + elif msg_type == MSG_SENSOR_DATA: + self._handle_sensor_data(data) + elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK: + log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)]) + elif msg_type == MSG_PORT_CMD_ERROR: + log.warning("Command error: %s", str2hex(data[3:])) + else: + log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig)) def _handle_sensor_data(self, data): port = get_byte(data, 3) if port not in self.devices: log.warning("Notification on port with no device: %s", PORTS[port]) return - sensor = self.devices[port] - if isinstance(sensor, TiltSensor): - sensor.handle_notification(data) + + device = self.devices[port] + device.handle_notification(data) def _handle_port_status(self, data): port = get_byte(data, 3) @@ -122,11 +123,11 @@ class MoveHub(object): if dev_type == TYPE_MOTOR: self.devices[port] = EncodedMotor(self, port) elif dev_type == TYPE_IMOTOR: - self.devices[port] = EncodedMotor(self, port) - self.external_motor = self.devices[port] + self.external_motor = EncodedMotor(self, port) + self.devices[port] = self.external_motor elif dev_type == TYPE_DISTANCE_COLOR_SENSOR: - self.devices[port] = ColorDistanceSensor(self, port) - self.color_distance_sensor = self.devices[port] + self.color_distance_sensor = ColorDistanceSensor(self, port) + self.devices[port] = self.color_distance_sensor elif dev_type == TYPE_LED: self.devices[port] = LED(self, port) elif dev_type == TYPE_TILT_SENSOR: diff --git a/pylgbst/constants.py b/pylgbst/constants.py index ff121b1..20a7d18 100644 --- a/pylgbst/constants.py +++ b/pylgbst/constants.py @@ -10,7 +10,7 @@ ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00' PACKET_VER = 0x01 # COLORS -COLOR_OFF = 0x00 +COLOR_BLACK = 0x00 COLOR_PINK = 0x01 COLOR_PURPLE = 0x02 COLOR_BLUE = 0x03 @@ -22,7 +22,7 @@ COLOR_ORANGE = 0x09 COLOR_RED = 0x09 COLOR_WHITE = 0x0a COLORS = { - COLOR_OFF: "OFF", + COLOR_BLACK: "BLACK", COLOR_PINK: "PINK", COLOR_PURPLE: "PURPLE", COLOR_BLUE: "BLUE", diff --git a/pylgbst/peripherals.py b/pylgbst/peripherals.py index e6b9551..b62ad52 100644 --- a/pylgbst/peripherals.py +++ b/pylgbst/peripherals.py @@ -2,7 +2,7 @@ import logging import struct import time -from pylgbst import get_byte, int2byte +from pylgbst import get_byte, int2byte, str2hex from pylgbst.constants import * log = logging.getLogger('peripherals') @@ -33,10 +33,6 @@ class Peripheral(object): self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, int2byte(len(cmd) + 1) + cmd) # should we +1 cmd len here? - def _set_port_val(self, value): - # FIXME: became obsolete - self._write_to_hub(MSG_SET_PORT_VAL, value) - def started(self): self.working = True @@ -47,14 +43,21 @@ class Peripheral(object): for subscriber in self._subscribers: subscriber(*args, **kwargs) + def handle_notification(self, data): + log.warning("Unhandled device notification for %s: %s", self, str2hex(data)) + class LED(Peripheral): def set_color(self, color): if color not in COLORS: raise ValueError("Color %s is not in list of available colors" % color) - cmd = b'\x11\x51\x00' + int2byte(color) - self._set_port_val(cmd) + cmd = b'\xFF\x51\x00' + int2byte(color) + self._write_to_hub(MSG_SET_PORT_VAL, cmd) + + def finished(self): + super(LED, self).finished() + log.debug("LED has changed color") class EncodedMotor(Peripheral): @@ -89,7 +92,7 @@ class EncodedMotor(Peripheral): command += self.TRAILER - self._set_port_val(command) + self._write_to_hub(MSG_SET_PORT_VAL, command) def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): if speed_secondary is None: @@ -128,19 +131,22 @@ class TiltSensor(Peripheral): super(TiltSensor, self).__init__(parent, port) self.mode = None - def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, threshold=1): + def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, granularity=1): self.mode = mode + params = int2byte(self.mode) - params += int2byte(threshold) + params += int2byte(granularity) params += self.TRAILER params += int2byte(1) # enable - self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params + self.TRAILER) + self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params) self._subscribers.add(callback) - def unsubscribe(self, callback): - self._subscribers.remove(callback) + def unsubscribe(self, callback=None): + if callback in self._subscribers: + self._subscribers.remove(callback) + if not self._subscribers: - self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + self.TRAILER + int2byte(0)) + self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + b'\x00\x00\x00' + int2byte(0)) def handle_notification(self, data): if self.mode == TILT_SENSOR_MODE_BASIC: @@ -173,7 +179,29 @@ class TiltSensor(Peripheral): class ColorDistanceSensor(Peripheral): - pass + def subscribe(self, callback): + params = b'\x08\x01\x00\x00\x00' + params += int2byte(1) # enable + self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params) + self._subscribers.add(callback) + + def unsubscribe(self, callback=None): + if callback in self._subscribers: + self._subscribers.remove(callback) + + if not self._subscribers: + self._write_to_hub(MSG_SENSOR_SUBSCRIBE, b'\x08\x01\x00\x00\x00' + int2byte(0)) + + def handle_notification(self, data): + color = get_byte(data, 4) + distance = get_byte(data, 5) + partial = get_byte(data, 7) + if partial: + distance += 1.0 / partial + self._notify_subscribers(color if color != 0xFF else None, float(distance)) + + +# 0a00 41 01 01 enable class Button(Peripheral): diff --git a/tests.py b/tests.py index 72a1032..a47005a 100644 --- a/tests.py +++ b/tests.py @@ -1,12 +1,6 @@ -import logging -import time import unittest -from threading import Thread -from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB -from pylgbst.comms import Connection, str2hex, hex2str -from pylgbst.constants import PORT_LED, TILT_STATES, TILT_SENSOR_MODE_2AXIS_FULL, TILT_SENSOR_MODE_2AXIS_SIMPLE, \ - MOVE_HUB_HARDWARE_HANDLE +from pylgbst import * HANDLE = MOVE_HUB_HARDWARE_HANDLE @@ -103,7 +97,7 @@ class GeneralTest(unittest.TestCase): self._wait_notifications_handled(hub) hub.tilt_sensor.unsubscribe(callback) - # self.assertEquals("0a01413a000100000001", hub.connection.writes[0][1]) + # TODO: assert def test_motor(self): conn = ConnectionMock() @@ -132,3 +126,21 @@ class GeneralTest(unittest.TestCase): hub = MoveHub(conn) # demo_all(hub) self._wait_notifications_handled(hub) + + def test_color_sensor(self): + # + hub = HubMock() + hub.connection.notifications.append((HANDLE, '1b0e000f0004010125000000001000000010')) + time.sleep(1) + + def callback(color, unk1, unk2): + name = COLORS[color] if color is not None else 'NONE' + log.info("Color: %s %s %s", name, unk1, unk2) + + hub.color_distance_sensor.subscribe(callback) + + hub.connection.notifications.append((HANDLE, "1b0e0008004501ff0aff00")) + time.sleep(1) + # TODO: assert + self._wait_notifications_handled(hub) + hub.color_distance_sensor.unsubscribe(callback)