mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Some refactorings
This commit is contained in:
parent
2819dd4c3c
commit
88dd8d8f06
31
demo.py
31
demo.py
@ -113,14 +113,31 @@ def vernie_head(movehub):
|
||||
def demo_color_sensor(movehub):
|
||||
log.info("Color sensor test: wave your hand in front of it")
|
||||
demo_color_sensor.cnt = 0
|
||||
limit = 2000
|
||||
limit = 20
|
||||
|
||||
def callback(color, distance=None, param=None):
|
||||
def callback(color, distance=None):
|
||||
demo_color_sensor.cnt += 1
|
||||
#color = COLORS[color] if color in COLORS else color
|
||||
log.info("#%s/%s: Color %s, distance %s, param %s", demo_color_sensor.cnt, limit, color, distance, param)
|
||||
color = COLORS[color] if color in COLORS else color
|
||||
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, color, distance)
|
||||
|
||||
movehub.color_distance_sensor.subscribe(callback, CDS_MODE_STREAM_3_VALUES, granularity=3)
|
||||
movehub.color_distance_sensor.subscribe(callback)
|
||||
while demo_color_sensor.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.color_distance_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_motor_sensors(movehub):
|
||||
log.info("Motor rotation sensors test")
|
||||
demo_color_sensor.cnt = 0
|
||||
limit = 20
|
||||
|
||||
def callback(color, distance=None):
|
||||
demo_color_sensor.cnt += 1
|
||||
color = COLORS[color] if color in COLORS else color
|
||||
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, color, distance)
|
||||
|
||||
movehub.color_distance_sensor.subscribe(callback)
|
||||
while demo_color_sensor.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
@ -135,6 +152,7 @@ def demo_all(movehub):
|
||||
demo_tilt_sensor_simple(movehub)
|
||||
demo_tilt_sensor_precise(movehub)
|
||||
demo_color_sensor(movehub)
|
||||
demo_motor_sensors(movehub)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@ -147,7 +165,8 @@ if __name__ == '__main__':
|
||||
connection = BLEConnection().connect()
|
||||
|
||||
hub = MoveHub(connection)
|
||||
demo_color_sensor(hub)
|
||||
|
||||
demo_motor_sensors(hub)
|
||||
|
||||
# demo_all(hub)
|
||||
|
||||
|
@ -86,6 +86,9 @@ class MoveHub(object):
|
||||
log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)])
|
||||
elif msg_type == MSG_PORT_CMD_ERROR:
|
||||
log.warning("Command error: %s", str2hex(data[3:]))
|
||||
elif msg_type == MSG_DEVICE_SHUTDOWN:
|
||||
log.warning("Device reported shutdown: %s", str2hex(data))
|
||||
raise KeyboardInterrupt("Device shutdown")
|
||||
else:
|
||||
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
|
||||
|
||||
@ -96,7 +99,7 @@ class MoveHub(object):
|
||||
return
|
||||
|
||||
device = self.devices[port]
|
||||
device.handle_sensor_data(data)
|
||||
device.handle_port_data(data)
|
||||
|
||||
def _handle_port_status(self, data):
|
||||
port = get_byte(data, 3)
|
||||
@ -120,17 +123,17 @@ class MoveHub(object):
|
||||
else:
|
||||
log.warning("Device 0x%x at port 0x%x", dev_type, port)
|
||||
|
||||
if dev_type == TYPE_MOTOR:
|
||||
if dev_type == DEV_MOTOR:
|
||||
self.devices[port] = EncodedMotor(self, port)
|
||||
elif dev_type == TYPE_IMOTOR:
|
||||
elif dev_type == DEV_IMOTOR:
|
||||
self.external_motor = EncodedMotor(self, port)
|
||||
self.devices[port] = self.external_motor
|
||||
elif dev_type == TYPE_DISTANCE_COLOR_SENSOR:
|
||||
elif dev_type == DEV_DCS:
|
||||
self.color_distance_sensor = ColorDistanceSensor(self, port)
|
||||
self.devices[port] = self.color_distance_sensor
|
||||
elif dev_type == TYPE_LED:
|
||||
elif dev_type == DEV_LED:
|
||||
self.devices[port] = LED(self, port)
|
||||
elif dev_type == TYPE_TILT_SENSOR:
|
||||
elif dev_type == DEV_TILT_SENSOR:
|
||||
self.devices[port] = TiltSensor(self, port)
|
||||
else:
|
||||
log.warning("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
|
||||
|
@ -11,11 +11,10 @@ from abc import abstractmethod
|
||||
from gattlib import DiscoveryService, GATTRequester
|
||||
from threading import Thread
|
||||
|
||||
from pylgbst.constants import LEGO_MOVE_HUB
|
||||
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
|
||||
|
||||
log = logging.getLogger('transport')
|
||||
|
||||
# could use `six` here, but just for 1 function
|
||||
if sys.version_info[0] == 2:
|
||||
def str2hex(data):
|
||||
return data.encode("hex")
|
||||
@ -27,10 +26,6 @@ if sys.version_info[0] == 2:
|
||||
|
||||
def get_byte(seq, index):
|
||||
return ord(seq[index])
|
||||
|
||||
|
||||
def int2byte(val):
|
||||
return chr(val)
|
||||
else:
|
||||
import binascii
|
||||
|
||||
@ -47,10 +42,6 @@ else:
|
||||
return seq[index]
|
||||
|
||||
|
||||
def int2byte(val):
|
||||
return bytes((val,))
|
||||
|
||||
|
||||
# noinspection PyMethodOverriding
|
||||
class Requester(GATTRequester):
|
||||
"""
|
||||
@ -147,6 +138,7 @@ class DebugServer(object):
|
||||
"""
|
||||
|
||||
def __init__(self, ble_trans):
|
||||
self._running = False
|
||||
self.sock = socket.socket()
|
||||
self.ble = ble_trans
|
||||
|
||||
@ -154,9 +146,12 @@ class DebugServer(object):
|
||||
self.sock.bind(('', port))
|
||||
self.sock.listen(1)
|
||||
|
||||
while True:
|
||||
self._running = True
|
||||
while self._running:
|
||||
log.info("Accepting connections at %s", port)
|
||||
conn, addr = self.sock.accept()
|
||||
if not self._running:
|
||||
raise KeyboardInterrupt("Shutdown")
|
||||
self.ble.requester.notification_sink = lambda x, y: self._notify(conn, x, y)
|
||||
try:
|
||||
self._handle_conn(conn)
|
||||
@ -165,12 +160,16 @@ class DebugServer(object):
|
||||
except BaseException:
|
||||
log.error("Problem handling incoming connection: %s", traceback.format_exc())
|
||||
finally:
|
||||
self.ble.requester.notification_sink = None
|
||||
self.ble.requester.notification_sink = self._notify_dummy
|
||||
conn.close()
|
||||
|
||||
def __del__(self):
|
||||
self.sock.close()
|
||||
|
||||
def _notify_dummy(self, handle, data):
|
||||
log.debug("Notification from handle %s: %s", handle, data)
|
||||
self._check_shutdown(data)
|
||||
|
||||
def _notify(self, conn, handle, data):
|
||||
payload = {"type": "notification", "handle": handle, "data": str2hex(data)}
|
||||
log.debug("Send notification: %s", payload)
|
||||
@ -181,6 +180,13 @@ class DebugServer(object):
|
||||
except BaseException:
|
||||
log.error("Problem sending notification: %s", traceback.format_exc())
|
||||
|
||||
self._check_shutdown(data)
|
||||
|
||||
def _check_shutdown(self, data):
|
||||
if get_byte(data, 5) == MSG_DEVICE_SHUTDOWN:
|
||||
log.warning("Device shutdown")
|
||||
self._running = False
|
||||
|
||||
def _handle_conn(self, conn):
|
||||
"""
|
||||
:type conn: socket._socketobject
|
||||
|
@ -9,32 +9,6 @@ ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
|
||||
|
||||
PACKET_VER = 0x01
|
||||
|
||||
# COLORS
|
||||
COLOR_BLACK = 0x00
|
||||
COLOR_PINK = 0x01
|
||||
COLOR_PURPLE = 0x02
|
||||
COLOR_BLUE = 0x03
|
||||
COLOR_LIGHTBLUE = 0x04
|
||||
COLOR_CYAN = 0x05
|
||||
COLOR_GREEN = 0x06
|
||||
COLOR_YELLOW = 0x07
|
||||
COLOR_ORANGE = 0x09
|
||||
COLOR_RED = 0x09
|
||||
COLOR_WHITE = 0x0a
|
||||
COLORS = {
|
||||
COLOR_BLACK: "BLACK",
|
||||
COLOR_PINK: "PINK",
|
||||
COLOR_PURPLE: "PURPLE",
|
||||
COLOR_BLUE: "BLUE",
|
||||
COLOR_LIGHTBLUE: "LIGHTBLUE",
|
||||
COLOR_CYAN: "CYAN",
|
||||
COLOR_GREEN: "GREEN",
|
||||
COLOR_YELLOW: "YELLOW",
|
||||
COLOR_ORANGE: "ORANGE",
|
||||
COLOR_RED: "RED",
|
||||
COLOR_WHITE: "WHITE"
|
||||
}
|
||||
|
||||
# PORTS
|
||||
PORT_C = 0x01
|
||||
PORT_D = 0x02
|
||||
@ -60,6 +34,7 @@ PORTS = {
|
||||
|
||||
# PACKET TYPES
|
||||
MSG_DEVICE_INFO = 0x01
|
||||
MSG_DEVICE_SHUTDOWN = 0x02
|
||||
MSG_PING_RESPONSE = 0x03
|
||||
MSG_PORT_INFO = 0x04
|
||||
MSG_PORT_CMD_ERROR = 0x05
|
||||
@ -70,31 +45,32 @@ MSG_SENSOR_SOMETHING = 0x42
|
||||
MSG_SENSOR_DATA = 0x45
|
||||
MSG_SENSOR_SUBSCRIBE_ACK = 0x47
|
||||
|
||||
# NOTIFICATIONS
|
||||
TYPE_DISTANCE_COLOR_SENSOR = 0x25
|
||||
TYPE_IMOTOR = 0x26
|
||||
TYPE_MOTOR = 0x27
|
||||
TYPE_TILT_SENSOR = 0x28
|
||||
TYPE_LED = 0x17
|
||||
# one of them is button? another is battery?
|
||||
TYPE_SOMETHING1 = 0x15
|
||||
TYPE_SOMETHING2 = 0x14
|
||||
# DEVICE TYPES
|
||||
DEV_UNKNOWN1 = 0x15 # one of them is button?
|
||||
DEV_UNKNOWN2 = 0x14 # another is battery?
|
||||
|
||||
DEV_DCS = 0x25
|
||||
DEV_IMOTOR = 0x26
|
||||
DEV_MOTOR = 0x27
|
||||
DEV_TILT_SENSOR = 0x28
|
||||
DEV_LED = 0x17
|
||||
|
||||
DEVICE_TYPES = {
|
||||
TYPE_DISTANCE_COLOR_SENSOR: "DISTANCE_COLOR_SENSOR",
|
||||
TYPE_IMOTOR: "IMOTOR",
|
||||
TYPE_MOTOR: "MOTOR",
|
||||
TYPE_TILT_SENSOR: "TILT_SENSOR",
|
||||
TYPE_LED: "LED",
|
||||
TYPE_SOMETHING1: "UNK1",
|
||||
TYPE_SOMETHING2: "UNK2",
|
||||
DEV_DCS: "DISTANCE_COLOR_SENSOR",
|
||||
DEV_IMOTOR: "IMOTOR",
|
||||
DEV_MOTOR: "MOTOR",
|
||||
DEV_TILT_SENSOR: "TILT_SENSOR",
|
||||
DEV_LED: "LED",
|
||||
DEV_UNKNOWN1: "UNKNOWN #1",
|
||||
DEV_UNKNOWN2: "UNKNOWN #2",
|
||||
}
|
||||
|
||||
# NOTIFICATIONS
|
||||
STATUS_STARTED = 0x01
|
||||
STATUS_CONFLICT = 0x05
|
||||
STATUS_FINISHED = 0x0a
|
||||
|
||||
# TILT
|
||||
# TILT SENSOR
|
||||
TILT_SENSOR_MODE_2AXIS_FULL = 0x00
|
||||
TILT_SENSOR_MODE_2AXIS_SIMPLE = 0x01
|
||||
TILT_SENSOR_MODE_BASIC = 0x02
|
||||
@ -130,6 +106,32 @@ CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE = 0x04
|
||||
CDS_MODE_OFF1 = 0x05
|
||||
CDS_MODE_STREAM_3_VALUES = 0x06
|
||||
CDS_MODE_OFF2 = 0x07
|
||||
CDS_MODE_COLOR_DISTANCE_INCHES_SUBINCHES = 0x08
|
||||
CDS_MODE_COLOR_DISTANCE_FLOAT = 0x08
|
||||
CDS_MODE_LUMINOSITY = 0x09
|
||||
CDS_MODE_SOME_20BYTES = 0x0a
|
||||
|
||||
# COLORS
|
||||
COLOR_BLACK = 0x00
|
||||
COLOR_PINK = 0x01
|
||||
COLOR_PURPLE = 0x02
|
||||
COLOR_BLUE = 0x03
|
||||
COLOR_LIGHTBLUE = 0x04
|
||||
COLOR_CYAN = 0x05
|
||||
COLOR_GREEN = 0x06
|
||||
COLOR_YELLOW = 0x07
|
||||
COLOR_ORANGE = 0x09
|
||||
COLOR_RED = 0x09
|
||||
COLOR_WHITE = 0x0a
|
||||
COLORS = {
|
||||
COLOR_BLACK: "BLACK",
|
||||
COLOR_PINK: "PINK",
|
||||
COLOR_PURPLE: "PURPLE",
|
||||
COLOR_BLUE: "BLUE",
|
||||
COLOR_LIGHTBLUE: "LIGHTBLUE",
|
||||
COLOR_CYAN: "CYAN",
|
||||
COLOR_GREEN: "GREEN",
|
||||
COLOR_YELLOW: "YELLOW",
|
||||
COLOR_ORANGE: "ORANGE",
|
||||
COLOR_RED: "RED",
|
||||
COLOR_WHITE: "WHITE"
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
import logging
|
||||
import struct
|
||||
import time
|
||||
from struct import pack, unpack
|
||||
|
||||
from pylgbst import get_byte, int2byte, str2hex
|
||||
from pylgbst import get_byte, str2hex
|
||||
from pylgbst.constants import *
|
||||
|
||||
log = logging.getLogger('peripherals')
|
||||
@ -23,15 +23,22 @@ class Peripheral(object):
|
||||
self.port = port
|
||||
self.working = False
|
||||
self._subscribers = set()
|
||||
self.port_subscription_mode = None
|
||||
|
||||
def __repr__(self):
|
||||
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A')
|
||||
|
||||
def _write_to_hub(self, msg_type, params):
|
||||
cmd = int2byte(PACKET_VER) + int2byte(msg_type) + int2byte(self.port)
|
||||
cmd = pack("<B", PACKET_VER) + pack("<B", msg_type) + pack("<B", self.port)
|
||||
cmd += params
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE,
|
||||
int2byte(len(cmd) + 1) + cmd) # should we +1 cmd len here?
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
|
||||
|
||||
def _port_subscribe(self, mode, granularity, enable):
|
||||
params = pack("<B", mode)
|
||||
params += pack("<H", granularity)
|
||||
params += b'\x00\x00' # maybe also bytes of granularity
|
||||
params += pack("<?", bool(enable))
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
|
||||
def started(self):
|
||||
self.working = True
|
||||
@ -39,25 +46,48 @@ class Peripheral(object):
|
||||
def finished(self):
|
||||
self.working = False
|
||||
|
||||
def subscribe(self, callback, mode, granularity=1):
|
||||
self.port_subscription_mode = mode
|
||||
self._port_subscribe(self.port_subscription_mode, granularity, True)
|
||||
if callback:
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._port_subscribe(self.port_subscription_mode, 0, False)
|
||||
self.port_subscription_mode = None
|
||||
|
||||
def _notify_subscribers(self, *args, **kwargs):
|
||||
for subscriber in self._subscribers:
|
||||
subscriber(*args, **kwargs)
|
||||
|
||||
def handle_sensor_data(self, data):
|
||||
def handle_port_data(self, data):
|
||||
log.warning("Unhandled device notification for %s: %s", self, str2hex(data))
|
||||
|
||||
|
||||
class LED(Peripheral):
|
||||
def set_color(self, color):
|
||||
SOMETHING = b'\x51\x00'
|
||||
|
||||
def set_color(self, color, do_notify=True):
|
||||
if color not in COLORS:
|
||||
raise ValueError("Color %s is not in list of available colors" % color)
|
||||
|
||||
cmd = b'\x01\x51\x00' + int2byte(color)
|
||||
cmd = pack("<?", do_notify) + self.SOMETHING + pack("<B", color)
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, cmd)
|
||||
|
||||
def finished(self):
|
||||
super(LED, self).finished()
|
||||
log.debug("LED has changed color")
|
||||
self._notify_subscribers()
|
||||
|
||||
def subscribe(self, callback, mode=None, granularity=None):
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
self._subscribers.add(callback)
|
||||
|
||||
|
||||
class EncodedMotor(Peripheral):
|
||||
@ -86,9 +116,9 @@ class EncodedMotor(Peripheral):
|
||||
# set for port
|
||||
command = self.MOVEMENT_TYPE + command
|
||||
|
||||
command += int2byte(self._speed_abs(speed_primary))
|
||||
command += pack("<B", self._speed_abs(speed_primary))
|
||||
if self.port == PORT_AB:
|
||||
command += int2byte(self._speed_abs(speed_secondary))
|
||||
command += pack("<B", self._speed_abs(speed_secondary))
|
||||
|
||||
command += self.TRAILER
|
||||
|
||||
@ -104,7 +134,7 @@ class EncodedMotor(Peripheral):
|
||||
msec = int(seconds * 1000)
|
||||
if msec >= pow(2, 16):
|
||||
raise ValueError("Too large value for seconds: %s", seconds)
|
||||
command += struct.pack('<H', msec)
|
||||
command += pack('<H', msec)
|
||||
|
||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||
|
||||
@ -118,59 +148,41 @@ class EncodedMotor(Peripheral):
|
||||
# movement type
|
||||
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
|
||||
# angle
|
||||
command += struct.pack('<I', angle)
|
||||
command += pack('<I', angle)
|
||||
|
||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||
# TODO: how to tell when motor has stopped?
|
||||
|
||||
|
||||
class TiltSensor(Peripheral):
|
||||
TRAILER = b'\x00\x00\x00'
|
||||
|
||||
def __init__(self, parent, port):
|
||||
super(TiltSensor, self).__init__(parent, port)
|
||||
self.mode = None
|
||||
|
||||
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, granularity=1):
|
||||
self.mode = mode
|
||||
super(TiltSensor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
params = int2byte(self.mode)
|
||||
params += int2byte(granularity)
|
||||
params += self.TRAILER
|
||||
params += int2byte(1) # enable
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + b'\x00\x00\x00' + int2byte(0))
|
||||
self.mode = None
|
||||
|
||||
def handle_sensor_data(self, data):
|
||||
if self.mode == TILT_SENSOR_MODE_BASIC:
|
||||
def handle_port_data(self, data):
|
||||
if self.port_subscription_mode == TILT_SENSOR_MODE_BASIC:
|
||||
state = get_byte(data, 4)
|
||||
self._notify_subscribers(state)
|
||||
elif self.mode == TILT_SENSOR_MODE_2AXIS_SIMPLE:
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_2AXIS_SIMPLE:
|
||||
# TODO: figure out right interpreting of this
|
||||
state = get_byte(data, 4)
|
||||
self._notify_subscribers(state)
|
||||
elif self.mode == TILT_SENSOR_MODE_BUMP:
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_BUMP:
|
||||
bump_count = get_byte(data, 4)
|
||||
self._notify_subscribers(bump_count)
|
||||
elif self.mode == TILT_SENSOR_MODE_2AXIS_FULL:
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_2AXIS_FULL:
|
||||
roll = self._byte2deg(get_byte(data, 4))
|
||||
pitch = self._byte2deg(get_byte(data, 5))
|
||||
self._notify_subscribers(roll, pitch)
|
||||
elif self.mode == TILT_SENSOR_MODE_FULL:
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_FULL:
|
||||
roll = self._byte2deg(get_byte(data, 4))
|
||||
pitch = self._byte2deg(get_byte(data, 5))
|
||||
yaw = self._byte2deg(get_byte(data, 6)) # did I get the order right?
|
||||
self._notify_subscribers(roll, pitch, yaw)
|
||||
else:
|
||||
log.debug("Got tilt sensor data while in unexpected mode: %s", self.mode)
|
||||
log.debug("Got tilt sensor data while in unexpected mode: %s", self.port_subscription_mode)
|
||||
|
||||
def _byte2deg(self, val):
|
||||
if val > 90:
|
||||
@ -182,64 +194,46 @@ class TiltSensor(Peripheral):
|
||||
class ColorDistanceSensor(Peripheral):
|
||||
def __init__(self, parent, port):
|
||||
super(ColorDistanceSensor, self).__init__(parent, port)
|
||||
self.mode = None
|
||||
|
||||
def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_INCHES_SUBINCHES, granularity=1):
|
||||
self.mode = mode
|
||||
params = int2byte(mode)
|
||||
params += int2byte(granularity)
|
||||
params += b'\x00\x00\x00'
|
||||
params += int2byte(1) # enable
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
self._subscribers.add(callback)
|
||||
def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_FLOAT, granularity=1):
|
||||
super(ColorDistanceSensor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, int2byte(self.mode) + b'\x01\x00\x00\x00' + int2byte(0))
|
||||
self.mode = None
|
||||
|
||||
def handle_sensor_data(self, data):
|
||||
if self.mode == CDS_MODE_COLOR_DISTANCE_INCHES_SUBINCHES:
|
||||
def handle_port_data(self, data):
|
||||
if self.port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT:
|
||||
color = get_byte(data, 4)
|
||||
distance = get_byte(data, 5)
|
||||
partial = get_byte(data, 7)
|
||||
if partial:
|
||||
distance += 1.0 / partial
|
||||
self._notify_subscribers(color if color != 0xFF else None, float(distance))
|
||||
elif self.mode == CDS_MODE_COLOR_ONLY:
|
||||
elif self.port_subscription_mode == CDS_MODE_COLOR_ONLY:
|
||||
color = get_byte(data, 4)
|
||||
self._notify_subscribers(color if color != 0xFF else None)
|
||||
elif self.mode == CDS_MODE_DISTANCE_INCHES:
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(float(distance))
|
||||
elif self.mode == CDS_MODE_DISTANCE_HOW_CLOSE:
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(float(distance))
|
||||
elif self.mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(float(distance))
|
||||
elif self.mode == CDS_MODE_OFF1 or self.mode == CDS_MODE_OFF2:
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_OFF1 or self.port_subscription_mode == CDS_MODE_OFF2:
|
||||
log.info("Turned off led on %s", self)
|
||||
elif self.mode == CDS_MODE_COUNT_2INCH:
|
||||
count = struct.unpack("<L", data[4:8])[0] # is it all 4 bytes or just 2?
|
||||
elif self.port_subscription_mode == CDS_MODE_COUNT_2INCH:
|
||||
count = unpack("<L", data[4:8])[0] # is it all 4 bytes or just 2?
|
||||
self._notify_subscribers(count)
|
||||
elif self.mode == CDS_MODE_STREAM_3_VALUES:
|
||||
elif self.port_subscription_mode == CDS_MODE_STREAM_3_VALUES:
|
||||
# TODO: understand better meaning of these 3 values
|
||||
val1 = struct.unpack("<H", data[4:6])[0]
|
||||
val2 = struct.unpack("<H", data[6:8])[0]
|
||||
val3 = struct.unpack("<H", data[8:10])[0]
|
||||
val1 = unpack("<H", data[4:6])[0]
|
||||
val2 = unpack("<H", data[6:8])[0]
|
||||
val3 = unpack("<H", data[8:10])[0]
|
||||
self._notify_subscribers(val1, val2, val3)
|
||||
elif self.mode == CDS_MODE_LUMINOSITY:
|
||||
luminosity = struct.unpack("<H", data[4:6])[0]
|
||||
elif self.port_subscription_mode == CDS_MODE_LUMINOSITY:
|
||||
luminosity = unpack("<H", data[4:6])[0]
|
||||
self._notify_subscribers(luminosity)
|
||||
else: # TODO: support whatever we forgot
|
||||
log.warning("Unhandled data in mode %s: %s", self.mode, str2hex(data))
|
||||
|
||||
|
||||
# 0a00 41 01 01 enable
|
||||
log.debug("Unhandled data in mode %s: %s", self.port_subscription_mode, str2hex(data))
|
||||
|
||||
|
||||
class Button(Peripheral):
|
||||
|
Loading…
x
Reference in New Issue
Block a user