mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Motor subscription
This commit is contained in:
parent
88dd8d8f06
commit
885779ef4c
30
demo.py
30
demo.py
@ -30,7 +30,7 @@ def demo_tilt_sensor_precise(movehub):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw)
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback, mode=TILT_SENSOR_MODE_FULL)
|
||||
movehub.tilt_sensor.subscribe(callback, mode=TILT_MODE_FULL)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
@ -128,20 +128,28 @@ def demo_color_sensor(movehub):
|
||||
|
||||
|
||||
def demo_motor_sensors(movehub):
|
||||
log.info("Motor rotation sensors test")
|
||||
demo_color_sensor.cnt = 0
|
||||
limit = 20
|
||||
log.info("Motor rotation sensors test. Rotate all available motors once")
|
||||
demo_motor_sensors.states = {
|
||||
movehub.motor_A: [None, None], # callback and last value
|
||||
movehub.motor_B: [None, None], # callback and last value
|
||||
}
|
||||
|
||||
def callback(color, distance=None):
|
||||
demo_color_sensor.cnt += 1
|
||||
color = COLORS[color] if color in COLORS else color
|
||||
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, color, distance)
|
||||
if movehub.external_motor is not None:
|
||||
demo_motor_sensors.states[movehub.external_motor] = [None, None]
|
||||
|
||||
movehub.color_distance_sensor.subscribe(callback)
|
||||
while demo_color_sensor.cnt < limit:
|
||||
def callback(mtr, param1):
|
||||
demo_motor_sensors.states[mtr][1] = param1
|
||||
log.info("%s", {x: demo_motor_sensors.states[x][1] for x in demo_motor_sensors.states})
|
||||
|
||||
for motor in demo_motor_sensors.states:
|
||||
demo_motor_sensors.states[motor][0] = lambda x: callback(motor, x)
|
||||
motor.subscribe(demo_motor_sensors.states[motor][0])
|
||||
|
||||
while None in [x[1] for x in demo_motor_sensors.states.values()]: # demo_motor_sensors.states < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.color_distance_sensor.unsubscribe(callback)
|
||||
for motor in demo_motor_sensors.states:
|
||||
motor.unsubscribe(demo_motor_sensors.states[motor][0])
|
||||
|
||||
|
||||
def demo_all(movehub):
|
||||
|
@ -167,7 +167,7 @@ class DebugServer(object):
|
||||
self.sock.close()
|
||||
|
||||
def _notify_dummy(self, handle, data):
|
||||
log.debug("Notification from handle %s: %s", handle, data)
|
||||
log.debug("Notification from handle %s: %s", handle, hex2str(data))
|
||||
self._check_shutdown(data)
|
||||
|
||||
def _notify(self, conn, handle, data):
|
||||
|
@ -41,7 +41,7 @@ MSG_PORT_CMD_ERROR = 0x05
|
||||
MSG_SET_PORT_VAL = 0x81
|
||||
MSG_PORT_STATUS = 0x82
|
||||
MSG_SENSOR_SUBSCRIBE = 0x41
|
||||
MSG_SENSOR_SOMETHING = 0x42
|
||||
MSG_SENSOR_SOMETHING1 = 0x42 # it is seen close to sensor subscribe commands. Subscription options? Initial value?
|
||||
MSG_SENSOR_DATA = 0x45
|
||||
MSG_SENSOR_SUBSCRIBE_ACK = 0x47
|
||||
|
||||
@ -71,11 +71,11 @@ STATUS_CONFLICT = 0x05
|
||||
STATUS_FINISHED = 0x0a
|
||||
|
||||
# TILT SENSOR
|
||||
TILT_SENSOR_MODE_2AXIS_FULL = 0x00
|
||||
TILT_SENSOR_MODE_2AXIS_SIMPLE = 0x01
|
||||
TILT_SENSOR_MODE_BASIC = 0x02
|
||||
TILT_SENSOR_MODE_BUMP = 0x03
|
||||
TILT_SENSOR_MODE_FULL = 0x04
|
||||
TILT_MODE_2AXIS_FULL = 0x00
|
||||
TILT_MODE_2AXIS_SIMPLE = 0x01
|
||||
TILT_MODE_BASIC = 0x02
|
||||
TILT_MODE_BUMP = 0x03
|
||||
TILT_MODE_FULL = 0x04
|
||||
|
||||
TILT_HORIZONTAL = 0x00
|
||||
TILT_UP = 0x01
|
||||
@ -135,3 +135,8 @@ COLORS = {
|
||||
COLOR_RED: "RED",
|
||||
COLOR_WHITE: "WHITE"
|
||||
}
|
||||
|
||||
# MOTORS
|
||||
MOTOR_MODE_SOMETHING1 = 0x00
|
||||
MOTOR_MODE_SPEED = 0x01
|
||||
MOTOR_MODE_ANGLE = 0x02
|
||||
|
@ -23,7 +23,7 @@ class Peripheral(object):
|
||||
self.port = port
|
||||
self.working = False
|
||||
self._subscribers = set()
|
||||
self.port_subscription_mode = None
|
||||
self._port_subscription_mode = None
|
||||
|
||||
def __repr__(self):
|
||||
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A')
|
||||
@ -47,8 +47,8 @@ class Peripheral(object):
|
||||
self.working = False
|
||||
|
||||
def subscribe(self, callback, mode, granularity=1):
|
||||
self.port_subscription_mode = mode
|
||||
self._port_subscribe(self.port_subscription_mode, granularity, True)
|
||||
self._port_subscription_mode = mode
|
||||
self._port_subscribe(self._port_subscription_mode, granularity, True)
|
||||
if callback:
|
||||
self._subscribers.add(callback)
|
||||
|
||||
@ -57,8 +57,8 @@ class Peripheral(object):
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
self._port_subscribe(self.port_subscription_mode, 0, False)
|
||||
self.port_subscription_mode = None
|
||||
self._port_subscribe(self._port_subscription_mode, 0, False)
|
||||
self._port_subscription_mode = None
|
||||
|
||||
def _notify_subscribers(self, *args, **kwargs):
|
||||
for subscriber in self._subscribers:
|
||||
@ -153,36 +153,53 @@ class EncodedMotor(Peripheral):
|
||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||
# TODO: how to tell when motor has stopped?
|
||||
|
||||
def handle_port_data(self, data):
|
||||
if self._port_subscription_mode == MOTOR_MODE_ANGLE:
|
||||
rotation = unpack("<l", data[4:8])[0]
|
||||
self._notify_subscribers(rotation)
|
||||
elif self._port_subscription_mode == MOTOR_MODE_SOMETHING1:
|
||||
# TODO: understand what it means
|
||||
rotation = unpack("<B", data[4])[0]
|
||||
self._notify_subscribers(rotation)
|
||||
elif self._port_subscription_mode == MOTOR_MODE_SPEED:
|
||||
rotation = unpack("<b", data[4])[0]
|
||||
self._notify_subscribers(rotation)
|
||||
else:
|
||||
log.debug("Got motor sensor data while in unexpected mode: %s", self._port_subscription_mode)
|
||||
|
||||
def subscribe(self, callback, mode=MOTOR_MODE_ANGLE, granularity=1):
|
||||
super(EncodedMotor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
|
||||
class TiltSensor(Peripheral):
|
||||
def __init__(self, parent, port):
|
||||
super(TiltSensor, self).__init__(parent, port)
|
||||
|
||||
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC, granularity=1):
|
||||
def subscribe(self, callback, mode=TILT_MODE_BASIC, granularity=1):
|
||||
super(TiltSensor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
def handle_port_data(self, data):
|
||||
if self.port_subscription_mode == TILT_SENSOR_MODE_BASIC:
|
||||
if self._port_subscription_mode == TILT_MODE_BASIC:
|
||||
state = get_byte(data, 4)
|
||||
self._notify_subscribers(state)
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_2AXIS_SIMPLE:
|
||||
elif self._port_subscription_mode == TILT_MODE_2AXIS_SIMPLE:
|
||||
# TODO: figure out right interpreting of this
|
||||
state = get_byte(data, 4)
|
||||
self._notify_subscribers(state)
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_BUMP:
|
||||
elif self._port_subscription_mode == TILT_MODE_BUMP:
|
||||
bump_count = get_byte(data, 4)
|
||||
self._notify_subscribers(bump_count)
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_2AXIS_FULL:
|
||||
elif self._port_subscription_mode == TILT_MODE_2AXIS_FULL:
|
||||
roll = self._byte2deg(get_byte(data, 4))
|
||||
pitch = self._byte2deg(get_byte(data, 5))
|
||||
self._notify_subscribers(roll, pitch)
|
||||
elif self.port_subscription_mode == TILT_SENSOR_MODE_FULL:
|
||||
elif self._port_subscription_mode == TILT_MODE_FULL:
|
||||
roll = self._byte2deg(get_byte(data, 4))
|
||||
pitch = self._byte2deg(get_byte(data, 5))
|
||||
yaw = self._byte2deg(get_byte(data, 6)) # did I get the order right?
|
||||
self._notify_subscribers(roll, pitch, yaw)
|
||||
else:
|
||||
log.debug("Got tilt sensor data while in unexpected mode: %s", self.port_subscription_mode)
|
||||
log.debug("Got tilt sensor data while in unexpected mode: %s", self._port_subscription_mode)
|
||||
|
||||
def _byte2deg(self, val):
|
||||
if val > 90:
|
||||
@ -199,41 +216,41 @@ class ColorDistanceSensor(Peripheral):
|
||||
super(ColorDistanceSensor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
def handle_port_data(self, data):
|
||||
if self.port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT:
|
||||
if self._port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT:
|
||||
color = get_byte(data, 4)
|
||||
distance = get_byte(data, 5)
|
||||
partial = get_byte(data, 7)
|
||||
if partial:
|
||||
distance += 1.0 / partial
|
||||
self._notify_subscribers(color if color != 0xFF else None, float(distance))
|
||||
elif self.port_subscription_mode == CDS_MODE_COLOR_ONLY:
|
||||
elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY:
|
||||
color = get_byte(data, 4)
|
||||
self._notify_subscribers(color if color != 0xFF else None)
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
|
||||
elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE:
|
||||
elif self._port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
|
||||
elif self._port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(distance)
|
||||
elif self.port_subscription_mode == CDS_MODE_OFF1 or self.port_subscription_mode == CDS_MODE_OFF2:
|
||||
elif self._port_subscription_mode == CDS_MODE_OFF1 or self._port_subscription_mode == CDS_MODE_OFF2:
|
||||
log.info("Turned off led on %s", self)
|
||||
elif self.port_subscription_mode == CDS_MODE_COUNT_2INCH:
|
||||
elif self._port_subscription_mode == CDS_MODE_COUNT_2INCH:
|
||||
count = unpack("<L", data[4:8])[0] # is it all 4 bytes or just 2?
|
||||
self._notify_subscribers(count)
|
||||
elif self.port_subscription_mode == CDS_MODE_STREAM_3_VALUES:
|
||||
elif self._port_subscription_mode == CDS_MODE_STREAM_3_VALUES:
|
||||
# TODO: understand better meaning of these 3 values
|
||||
val1 = unpack("<H", data[4:6])[0]
|
||||
val2 = unpack("<H", data[6:8])[0]
|
||||
val3 = unpack("<H", data[8:10])[0]
|
||||
self._notify_subscribers(val1, val2, val3)
|
||||
elif self.port_subscription_mode == CDS_MODE_LUMINOSITY:
|
||||
elif self._port_subscription_mode == CDS_MODE_LUMINOSITY:
|
||||
luminosity = unpack("<H", data[4:6])[0]
|
||||
self._notify_subscribers(luminosity)
|
||||
else: # TODO: support whatever we forgot
|
||||
log.debug("Unhandled data in mode %s: %s", self.port_subscription_mode, str2hex(data))
|
||||
log.debug("Unhandled data in mode %s: %s", self._port_subscription_mode, str2hex(data))
|
||||
|
||||
|
||||
class Button(Peripheral):
|
||||
|
4
tests.py
4
tests.py
@ -86,12 +86,12 @@ class GeneralTest(unittest.TestCase):
|
||||
hub.connection.notifications.append((HANDLE, "1b0e000500453a05"))
|
||||
hub.connection.notifications.append((HANDLE, "1b0e000a00473a010100000001"))
|
||||
time.sleep(1)
|
||||
hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_2AXIS_SIMPLE)
|
||||
hub.tilt_sensor.subscribe(callback, TILT_MODE_2AXIS_SIMPLE)
|
||||
|
||||
hub.connection.notifications.append((HANDLE, "1b0e000500453a09"))
|
||||
time.sleep(1)
|
||||
|
||||
hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_2AXIS_FULL)
|
||||
hub.tilt_sensor.subscribe(callback, TILT_MODE_2AXIS_FULL)
|
||||
hub.connection.notifications.append((HANDLE, "1b0e000600453a04fe"))
|
||||
time.sleep(1)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user