mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
286 lines
10 KiB
Python
286 lines
10 KiB
Python
import logging
|
|
import time
|
|
from struct import pack, unpack
|
|
|
|
from pylgbst import get_byte, str2hex
|
|
from pylgbst.constants import *
|
|
|
|
log = logging.getLogger('peripherals')
|
|
|
|
|
|
class Peripheral(object):
|
|
"""
|
|
:type parent: MoveHub
|
|
"""
|
|
|
|
def __init__(self, parent, port):
|
|
"""
|
|
:type parent: pylgbst.MoveHub
|
|
:type port: int
|
|
"""
|
|
super(Peripheral, self).__init__()
|
|
self.parent = parent
|
|
self.port = port
|
|
self._working = False
|
|
self._subscribers = set()
|
|
self._port_subscription_mode = None
|
|
|
|
def __repr__(self):
|
|
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A')
|
|
|
|
def _write_to_hub(self, msg_type, params):
|
|
cmd = pack("<B", PACKET_VER) + pack("<B", msg_type) + pack("<B", self.port)
|
|
cmd += params
|
|
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
|
|
|
|
def _port_subscribe(self, mode, granularity, enable):
|
|
params = pack("<B", mode)
|
|
params += pack("<H", granularity)
|
|
params += b'\x00\x00' # maybe also bytes of granularity
|
|
params += pack("<?", bool(enable))
|
|
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
|
|
|
def started(self):
|
|
log.debug("Started: %s", self)
|
|
self._working = True
|
|
|
|
def finished(self):
|
|
log.debug("Finished: %s", self)
|
|
self._working = False
|
|
|
|
def is_working(self):
|
|
return bool(self._working)
|
|
|
|
def subscribe(self, callback, mode, granularity=1):
|
|
self._port_subscription_mode = mode
|
|
self._port_subscribe(self._port_subscription_mode, granularity, True)
|
|
if callback:
|
|
self._subscribers.add(callback)
|
|
|
|
def unsubscribe(self, callback=None):
|
|
if callback in self._subscribers:
|
|
self._subscribers.remove(callback)
|
|
|
|
if not self._port_subscription_mode:
|
|
log.warning("Attempt to unsubscribe while never subscribed: %s", self)
|
|
elif not self._subscribers:
|
|
self._port_subscribe(self._port_subscription_mode, 0, False)
|
|
self._port_subscription_mode = None
|
|
|
|
def _notify_subscribers(self, *args, **kwargs):
|
|
for subscriber in self._subscribers:
|
|
subscriber(*args, **kwargs)
|
|
|
|
def handle_port_data(self, data):
|
|
log.warning("Unhandled device notification for %s: %s", self, str2hex(data[4:]))
|
|
self._notify_subscribers(data[4:])
|
|
|
|
|
|
class LED(Peripheral):
|
|
SOMETHING = b'\x51\x00'
|
|
|
|
def set_color(self, color, do_notify=True):
|
|
if color == COLOR_NONE:
|
|
color = COLOR_BLACK
|
|
|
|
if color not in COLORS:
|
|
raise ValueError("Color %s is not in list of available colors" % color)
|
|
|
|
cmd = pack("<?", do_notify) + self.SOMETHING + pack("<B", color)
|
|
self.started()
|
|
self._write_to_hub(MSG_SET_PORT_VAL, cmd)
|
|
|
|
def finished(self):
|
|
super(LED, self).finished()
|
|
log.debug("LED has changed color")
|
|
self._notify_subscribers()
|
|
|
|
def subscribe(self, callback, mode=None, granularity=None):
|
|
self._subscribers.add(callback)
|
|
|
|
def unsubscribe(self, callback=None):
|
|
self._subscribers.add(callback)
|
|
|
|
|
|
class EncodedMotor(Peripheral):
|
|
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
|
|
MOVEMENT_TYPE = b'\x11'
|
|
TIMED_SINGLE = b'\x09'
|
|
TIMED_GROUP = b'\x0A'
|
|
ANGLED_SINGLE = b'\x0B'
|
|
ANGLED_GROUP = b'\x0C'
|
|
|
|
def _speed_abs(self, relative):
|
|
if relative < -1:
|
|
log.warning("Speed cannot be less than -1")
|
|
relative = -1
|
|
|
|
if relative > 1:
|
|
log.warning("Speed cannot be more than 1")
|
|
relative = 1
|
|
|
|
absolute = round(relative * 100)
|
|
if absolute < 0:
|
|
absolute += 255
|
|
return int(absolute)
|
|
|
|
def _wrap_and_write(self, command, speed_primary, speed_secondary):
|
|
# set for port
|
|
command = self.MOVEMENT_TYPE + command
|
|
|
|
command += pack("<B", self._speed_abs(speed_primary))
|
|
if self.port == PORT_AB:
|
|
command += pack("<B", self._speed_abs(speed_secondary))
|
|
|
|
command += self.TRAILER
|
|
|
|
self._write_to_hub(MSG_SET_PORT_VAL, command)
|
|
|
|
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
|
if speed_secondary is None:
|
|
speed_secondary = speed_primary
|
|
|
|
# movement type
|
|
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
|
|
# time
|
|
msec = int(seconds * 1000)
|
|
if msec >= pow(2, 16):
|
|
raise ValueError("Too large value for seconds: %s", seconds)
|
|
command += pack('<H', msec)
|
|
|
|
self.started()
|
|
self._wrap_and_write(command, speed_primary, speed_secondary)
|
|
self.__wait_sync(async)
|
|
|
|
def angled(self, angle, speed_primary=1, speed_secondary=None, async=False):
|
|
if speed_secondary is None:
|
|
speed_secondary = speed_primary
|
|
|
|
if angle < 0:
|
|
angle = -angle
|
|
speed_primary = -speed_primary
|
|
speed_secondary = -speed_secondary
|
|
|
|
# movement type
|
|
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
|
|
# angle
|
|
command += pack('<I', angle)
|
|
|
|
self.started()
|
|
self._wrap_and_write(command, speed_primary, speed_secondary)
|
|
self.__wait_sync(async)
|
|
|
|
def __wait_sync(self, async):
|
|
if not async:
|
|
log.debug("Waiting for sync command work to finish...")
|
|
while self.is_working():
|
|
time.sleep(0.5)
|
|
log.debug("Command has finished.")
|
|
|
|
def handle_port_data(self, data):
|
|
if self._port_subscription_mode == MOTOR_MODE_ANGLE:
|
|
rotation = unpack("<l", data[4:8])[0]
|
|
self._notify_subscribers(rotation)
|
|
elif self._port_subscription_mode == MOTOR_MODE_SOMETHING1:
|
|
# TODO: understand what it means
|
|
rotation = unpack("<B", data[4])[0]
|
|
self._notify_subscribers(rotation)
|
|
elif self._port_subscription_mode == MOTOR_MODE_SPEED:
|
|
rotation = unpack("<b", data[4])[0]
|
|
self._notify_subscribers(rotation)
|
|
else:
|
|
log.debug("Got motor sensor data while in unexpected mode: %s", self._port_subscription_mode)
|
|
|
|
def subscribe(self, callback, mode=MOTOR_MODE_ANGLE, granularity=1):
|
|
super(EncodedMotor, self).subscribe(callback, mode, granularity)
|
|
|
|
|
|
class TiltSensor(Peripheral):
|
|
def subscribe(self, callback, mode=TILT_MODE_BASIC, granularity=1):
|
|
super(TiltSensor, self).subscribe(callback, mode, granularity)
|
|
|
|
def handle_port_data(self, data):
|
|
if self._port_subscription_mode == TILT_MODE_BASIC:
|
|
state = get_byte(data, 4)
|
|
self._notify_subscribers(state)
|
|
elif self._port_subscription_mode == TILT_MODE_2AXIS_SIMPLE:
|
|
# TODO: figure out right interpreting of this
|
|
state = get_byte(data, 4)
|
|
self._notify_subscribers(state)
|
|
elif self._port_subscription_mode == TILT_MODE_BUMP:
|
|
bump_count = get_byte(data, 4)
|
|
self._notify_subscribers(bump_count)
|
|
elif self._port_subscription_mode == TILT_MODE_2AXIS_FULL:
|
|
roll = self._byte2deg(get_byte(data, 4))
|
|
pitch = self._byte2deg(get_byte(data, 5))
|
|
self._notify_subscribers(roll, pitch)
|
|
elif self._port_subscription_mode == TILT_MODE_FULL:
|
|
roll = self._byte2deg(get_byte(data, 4))
|
|
pitch = self._byte2deg(get_byte(data, 5))
|
|
yaw = self._byte2deg(get_byte(data, 6)) # did I get the order right?
|
|
self._notify_subscribers(roll, pitch, yaw)
|
|
else:
|
|
log.debug("Got tilt sensor data while in unexpected mode: %s", self._port_subscription_mode)
|
|
|
|
def _byte2deg(self, val):
|
|
if val > 90:
|
|
return val - 256
|
|
else:
|
|
return val
|
|
|
|
|
|
class ColorDistanceSensor(Peripheral):
|
|
def __init__(self, parent, port):
|
|
super(ColorDistanceSensor, self).__init__(parent, port)
|
|
|
|
def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_FLOAT, granularity=1):
|
|
super(ColorDistanceSensor, self).subscribe(callback, mode, granularity)
|
|
|
|
def handle_port_data(self, data):
|
|
if self._port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT:
|
|
color = get_byte(data, 4)
|
|
distance = get_byte(data, 5)
|
|
partial = get_byte(data, 7)
|
|
if partial:
|
|
distance += 1.0 / partial
|
|
self._notify_subscribers(color, float(distance))
|
|
elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY:
|
|
color = get_byte(data, 4)
|
|
self._notify_subscribers(color)
|
|
elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
|
|
distance = get_byte(data, 4)
|
|
self._notify_subscribers(distance)
|
|
elif self._port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE:
|
|
distance = get_byte(data, 4)
|
|
self._notify_subscribers(distance)
|
|
elif self._port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
|
|
distance = get_byte(data, 4)
|
|
self._notify_subscribers(distance)
|
|
elif self._port_subscription_mode == CDS_MODE_OFF1 or self._port_subscription_mode == CDS_MODE_OFF2:
|
|
log.info("Turned off led on %s", self)
|
|
elif self._port_subscription_mode == CDS_MODE_COUNT_2INCH:
|
|
count = unpack("<L", data[4:8])[0] # is it all 4 bytes or just 2?
|
|
self._notify_subscribers(count)
|
|
elif self._port_subscription_mode == CDS_MODE_STREAM_3_VALUES:
|
|
# TODO: understand better meaning of these 3 values
|
|
val1 = unpack("<H", data[4:6])[0]
|
|
val2 = unpack("<H", data[6:8])[0]
|
|
val3 = unpack("<H", data[8:10])[0]
|
|
self._notify_subscribers(val1, val2, val3)
|
|
elif self._port_subscription_mode == CDS_MODE_LUMINOSITY:
|
|
luminosity = unpack("<H", data[4:6])[0]
|
|
self._notify_subscribers(luminosity)
|
|
else: # TODO: support whatever we forgot
|
|
log.debug("Unhandled data in mode %s: %s", self._port_subscription_mode, str2hex(data))
|
|
|
|
|
|
class Battery(Peripheral):
|
|
# we know only voltage subscription from it. is it really battery or just onboard voltage?
|
|
def handle_port_data(self, data):
|
|
self._notify_subscribers(unpack("<H", data[4:6]))
|
|
|
|
|
|
class Button(Peripheral):
|
|
def __init__(self, parent):
|
|
super(Button, self).__init__(parent, 0)
|