mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Working on color and distance sensor
This commit is contained in:
parent
5ec9d52ce1
commit
d2d8fd54a7
23
demo.py
23
demo.py
@ -24,7 +24,7 @@ def demo_tilt_sensor_simple(movehub):
|
||||
def demo_tilt_sensor_precise(movehub):
|
||||
log.info("Tilt sensor precise test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 100
|
||||
limit = 50
|
||||
|
||||
def callback(pitch, roll, yaw):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
@ -40,7 +40,7 @@ def demo_tilt_sensor_precise(movehub):
|
||||
def demo_led_colors(movehub):
|
||||
# LED colors demo
|
||||
log.info("LED colors demo")
|
||||
for color in COLORS.keys():
|
||||
for color in COLORS.keys()[1:] + [COLOR_BLACK]:
|
||||
log.info("Setting LED color to: %s", COLORS[color])
|
||||
movehub.led.set_color(color)
|
||||
sleep(1)
|
||||
@ -110,6 +110,22 @@ def vernie_head(movehub):
|
||||
sleep(2)
|
||||
|
||||
|
||||
def demo_color_sensor(movehub):
|
||||
log.info("Color sensor test: give it 3 things to detect color")
|
||||
demo_color_sensor.cnt = 0
|
||||
|
||||
def callback(color, distance):
|
||||
demo_color_sensor.cnt += 1
|
||||
clr = COLORS[color] if color in COLORS else None
|
||||
log.info("#%s: Color %s, distance %s", demo_color_sensor.cnt, clr, distance)
|
||||
|
||||
movehub.color_distance_sensor.subscribe(callback)
|
||||
while demo_color_sensor.cnt < 300:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.color_distance_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_all(movehub):
|
||||
demo_led_colors(movehub)
|
||||
demo_motors_timed(movehub)
|
||||
@ -117,6 +133,7 @@ def demo_all(movehub):
|
||||
demo_port_cd_motor(movehub)
|
||||
demo_tilt_sensor_simple(movehub)
|
||||
demo_tilt_sensor_precise(movehub)
|
||||
demo_color_sensor(movehub)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@ -129,7 +146,7 @@ if __name__ == '__main__':
|
||||
connection = BLEConnection().connect()
|
||||
|
||||
hub = MoveHub(connection)
|
||||
demo_tilt_sensor_precise(hub)
|
||||
demo_color_sensor(hub)
|
||||
|
||||
# demo_all(hub)
|
||||
|
||||
|
@ -7,12 +7,12 @@ log = logging.getLogger('movehub')
|
||||
|
||||
class MoveHub(object):
|
||||
"""
|
||||
:type connection: pylegoboost.comms.Connection
|
||||
:type connection: pylgbst.comms.Connection
|
||||
:type devices: dict[int,Peripheral]
|
||||
:type led: LED
|
||||
:type tilt_sensor: TiltSensor
|
||||
:type button: Button
|
||||
:type color_distance_sensor: ColorDistanceSensor
|
||||
:type color_distance_sensor: pylgbst.peripherals.ColorDistanceSensor
|
||||
:type external_motor: EncodedMotor
|
||||
:type port_C: Peripheral
|
||||
:type port_D: Peripheral
|
||||
@ -67,35 +67,36 @@ class MoveHub(object):
|
||||
"""
|
||||
orig = data
|
||||
|
||||
if handle == MOVE_HUB_HARDWARE_HANDLE:
|
||||
data = data[3:]
|
||||
log.debug("Notification on %s: %s", handle, str2hex(orig))
|
||||
|
||||
msg_type = get_byte(data, 2)
|
||||
|
||||
if msg_type == MSG_PORT_INFO:
|
||||
self._handle_port_info(data)
|
||||
elif msg_type == MSG_PORT_STATUS:
|
||||
self._handle_port_status(data)
|
||||
elif msg_type == MSG_SENSOR_DATA:
|
||||
self._handle_sensor_data(data)
|
||||
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
|
||||
log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)])
|
||||
elif msg_type == MSG_PORT_CMD_ERROR:
|
||||
log.warning("Command error: %s", str2hex(data[3:]))
|
||||
else:
|
||||
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
|
||||
else:
|
||||
if handle != MOVE_HUB_HARDWARE_HANDLE:
|
||||
log.warning("Unsupported notification handle: 0x%s", handle)
|
||||
return
|
||||
|
||||
data = data[3:]
|
||||
log.debug("Notification on %s: %s", handle, str2hex(orig))
|
||||
|
||||
msg_type = get_byte(data, 2)
|
||||
|
||||
if msg_type == MSG_PORT_INFO:
|
||||
self._handle_port_info(data)
|
||||
elif msg_type == MSG_PORT_STATUS:
|
||||
self._handle_port_status(data)
|
||||
elif msg_type == MSG_SENSOR_DATA:
|
||||
self._handle_sensor_data(data)
|
||||
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
|
||||
log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)])
|
||||
elif msg_type == MSG_PORT_CMD_ERROR:
|
||||
log.warning("Command error: %s", str2hex(data[3:]))
|
||||
else:
|
||||
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
|
||||
|
||||
def _handle_sensor_data(self, data):
|
||||
port = get_byte(data, 3)
|
||||
if port not in self.devices:
|
||||
log.warning("Notification on port with no device: %s", PORTS[port])
|
||||
return
|
||||
sensor = self.devices[port]
|
||||
if isinstance(sensor, TiltSensor):
|
||||
sensor.handle_notification(data)
|
||||
|
||||
device = self.devices[port]
|
||||
device.handle_notification(data)
|
||||
|
||||
def _handle_port_status(self, data):
|
||||
port = get_byte(data, 3)
|
||||
@ -122,11 +123,11 @@ class MoveHub(object):
|
||||
if dev_type == TYPE_MOTOR:
|
||||
self.devices[port] = EncodedMotor(self, port)
|
||||
elif dev_type == TYPE_IMOTOR:
|
||||
self.devices[port] = EncodedMotor(self, port)
|
||||
self.external_motor = self.devices[port]
|
||||
self.external_motor = EncodedMotor(self, port)
|
||||
self.devices[port] = self.external_motor
|
||||
elif dev_type == TYPE_DISTANCE_COLOR_SENSOR:
|
||||
self.devices[port] = ColorDistanceSensor(self, port)
|
||||
self.color_distance_sensor = self.devices[port]
|
||||
self.color_distance_sensor = ColorDistanceSensor(self, port)
|
||||
self.devices[port] = self.color_distance_sensor
|
||||
elif dev_type == TYPE_LED:
|
||||
self.devices[port] = LED(self, port)
|
||||
elif dev_type == TYPE_TILT_SENSOR:
|
||||
|
@ -10,7 +10,7 @@ ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
|
||||
PACKET_VER = 0x01
|
||||
|
||||
# COLORS
|
||||
COLOR_OFF = 0x00
|
||||
COLOR_BLACK = 0x00
|
||||
COLOR_PINK = 0x01
|
||||
COLOR_PURPLE = 0x02
|
||||
COLOR_BLUE = 0x03
|
||||
@ -22,7 +22,7 @@ COLOR_ORANGE = 0x09
|
||||
COLOR_RED = 0x09
|
||||
COLOR_WHITE = 0x0a
|
||||
COLORS = {
|
||||
COLOR_OFF: "OFF",
|
||||
COLOR_BLACK: "BLACK",
|
||||
COLOR_PINK: "PINK",
|
||||
COLOR_PURPLE: "PURPLE",
|
||||
COLOR_BLUE: "BLUE",
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import struct
|
||||
import time
|
||||
|
||||
from pylgbst import get_byte, int2byte
|
||||
from pylgbst import get_byte, int2byte, str2hex
|
||||
from pylgbst.constants import *
|
||||
|
||||
log = logging.getLogger('peripherals')
|
||||
@ -33,10 +33,6 @@ class Peripheral(object):
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE,
|
||||
int2byte(len(cmd) + 1) + cmd) # should we +1 cmd len here?
|
||||
|
||||
def _set_port_val(self, value):
|
||||
# FIXME: became obsolete
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, value)
|
||||
|
||||
def started(self):
|
||||
self.working = True
|
||||
|
||||
@ -47,14 +43,21 @@ class Peripheral(object):
|
||||
for subscriber in self._subscribers:
|
||||
subscriber(*args, **kwargs)
|
||||
|
||||
def handle_notification(self, data):
|
||||
log.warning("Unhandled device notification for %s: %s", self, str2hex(data))
|
||||
|
||||
|
||||
class LED(Peripheral):
|
||||
def set_color(self, color):
|
||||
if color not in COLORS:
|
||||
raise ValueError("Color %s is not in list of available colors" % color)
|
||||
|
||||
cmd = b'\x11\x51\x00' + int2byte(color)
|
||||
self._set_port_val(cmd)
|
||||
cmd = b'\xFF\x51\x00' + int2byte(color)
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, cmd)
|
||||
|
||||
def finished(self):
|
||||
super(LED, self).finished()
|
||||
log.debug("LED has changed color")
|
||||
|
||||
|
||||
class EncodedMotor(Peripheral):
|
||||
@ -89,7 +92,7 @@ class EncodedMotor(Peripheral):
|
||||
|
||||
command += self.TRAILER
|
||||
|
||||
self._set_port_val(command)
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, command)
|
||||
|
||||
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
||||
if speed_secondary is None:
|
||||
@ -128,19 +131,22 @@ class TiltSensor(Peripheral):
|
||||
super(TiltSensor, self).__init__(parent, port)
|
||||
self.mode = None
|
||||
|
||||
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, threshold=1):
|
||||
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, granularity=1):
|
||||
self.mode = mode
|
||||
|
||||
params = int2byte(self.mode)
|
||||
params += int2byte(threshold)
|
||||
params += int2byte(granularity)
|
||||
params += self.TRAILER
|
||||
params += int2byte(1) # enable
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params + self.TRAILER)
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback):
|
||||
self._subscribers.remove(callback)
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + self.TRAILER + int2byte(0))
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + b'\x00\x00\x00' + int2byte(0))
|
||||
|
||||
def handle_notification(self, data):
|
||||
if self.mode == TILT_SENSOR_MODE_BASIC:
|
||||
@ -173,7 +179,29 @@ class TiltSensor(Peripheral):
|
||||
|
||||
|
||||
class ColorDistanceSensor(Peripheral):
|
||||
pass
|
||||
def subscribe(self, callback):
|
||||
params = b'\x08\x01\x00\x00\x00'
|
||||
params += int2byte(1) # enable
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, b'\x08\x01\x00\x00\x00' + int2byte(0))
|
||||
|
||||
def handle_notification(self, data):
|
||||
color = get_byte(data, 4)
|
||||
distance = get_byte(data, 5)
|
||||
partial = get_byte(data, 7)
|
||||
if partial:
|
||||
distance += 1.0 / partial
|
||||
self._notify_subscribers(color if color != 0xFF else None, float(distance))
|
||||
|
||||
|
||||
# 0a00 41 01 01 enable
|
||||
|
||||
|
||||
class Button(Peripheral):
|
||||
|
28
tests.py
28
tests.py
@ -1,12 +1,6 @@
|
||||
import logging
|
||||
import time
|
||||
import unittest
|
||||
from threading import Thread
|
||||
|
||||
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB
|
||||
from pylgbst.comms import Connection, str2hex, hex2str
|
||||
from pylgbst.constants import PORT_LED, TILT_STATES, TILT_SENSOR_MODE_2AXIS_FULL, TILT_SENSOR_MODE_2AXIS_SIMPLE, \
|
||||
MOVE_HUB_HARDWARE_HANDLE
|
||||
from pylgbst import *
|
||||
|
||||
HANDLE = MOVE_HUB_HARDWARE_HANDLE
|
||||
|
||||
@ -103,7 +97,7 @@ class GeneralTest(unittest.TestCase):
|
||||
|
||||
self._wait_notifications_handled(hub)
|
||||
hub.tilt_sensor.unsubscribe(callback)
|
||||
# self.assertEquals("0a01413a000100000001", hub.connection.writes[0][1])
|
||||
# TODO: assert
|
||||
|
||||
def test_motor(self):
|
||||
conn = ConnectionMock()
|
||||
@ -132,3 +126,21 @@ class GeneralTest(unittest.TestCase):
|
||||
hub = MoveHub(conn)
|
||||
# demo_all(hub)
|
||||
self._wait_notifications_handled(hub)
|
||||
|
||||
def test_color_sensor(self):
|
||||
#
|
||||
hub = HubMock()
|
||||
hub.connection.notifications.append((HANDLE, '1b0e000f0004010125000000001000000010'))
|
||||
time.sleep(1)
|
||||
|
||||
def callback(color, unk1, unk2):
|
||||
name = COLORS[color] if color is not None else 'NONE'
|
||||
log.info("Color: %s %s %s", name, unk1, unk2)
|
||||
|
||||
hub.color_distance_sensor.subscribe(callback)
|
||||
|
||||
hub.connection.notifications.append((HANDLE, "1b0e0008004501ff0aff00"))
|
||||
time.sleep(1)
|
||||
# TODO: assert
|
||||
self._wait_notifications_handled(hub)
|
||||
hub.color_distance_sensor.unsubscribe(callback)
|
||||
|
Loading…
x
Reference in New Issue
Block a user