1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Refactoring

This commit is contained in:
Andrey Pohilko 2017-09-19 21:48:53 +03:00
parent a9a0386634
commit 06cfd9c419
12 changed files with 339 additions and 363 deletions

19
demo.py
View File

@ -1,7 +1,10 @@
# coding=utf-8
import traceback
from time import sleep
from pylgbst import *
from pylgbst.comms import DebugServerConnection
from pylgbst.movehub import MoveHub
log = logging.getLogger("demo")
@ -73,7 +76,7 @@ def demo_tilt_sensor_simple(movehub):
def callback(param1):
demo_tilt_sensor_simple.cnt += 1
log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TILT_STATES[param1])
log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TiltSensor.TILT_STATES[param1])
movehub.tilt_sensor.subscribe(callback)
while demo_tilt_sensor_simple.cnt < limit:
@ -91,7 +94,7 @@ def demo_tilt_sensor_precise(movehub):
demo_tilt_sensor_simple.cnt += 1
log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw)
movehub.tilt_sensor.subscribe(callback, mode=TILT_MODE_FULL)
movehub.tilt_sensor.subscribe(callback, mode=TiltSensor.MODE_FULL)
while demo_tilt_sensor_simple.cnt < limit:
time.sleep(1)
@ -169,14 +172,4 @@ if __name__ == '__main__':
connection = BLEConnection().connect()
hub = MoveHub(connection)
mtr = hub.motor_A
# mtr.timed(5, async=True)
# time.sleep(1)
# mtr.stop()
mtr.test()
time.sleep(3)
mtr.stop()
time.sleep(1)
demo_all(hub)

View File

@ -1,170 +1,2 @@
from pylgbst.comms import *
from pylgbst.constants import *
from pylgbst.movehub import *
from pylgbst.peripherals import *
log = logging.getLogger('movehub')
class MoveHub(object):
"""
:type connection: pylgbst.comms.Connection
:type devices: dict[int,Peripheral]
:type led: LED
:type tilt_sensor: TiltSensor
:type button: Button
:type battery: Battery
:type color_distance_sensor: pylgbst.peripherals.ColorDistanceSensor
:type motor_external: EncodedMotor
:type port_C: Peripheral
:type port_D: Peripheral
:type motor_A: EncodedMotor
:type motor_B: EncodedMotor
:type motor_AB: EncodedMotor
"""
def __init__(self, connection=None):
if not connection:
connection = BLEConnection()
self.connection = connection
self.devices = {}
# shorthand fields
self.button = Button(self)
self.led = None
self.battery = None
self.motor_A = None
self.motor_B = None
self.motor_AB = None
self.tilt_sensor = None
self.color_distance_sensor = None
self.motor_external = None
self.port_C = None
self.port_D = None
self.connection.set_notify_handler(self._notify)
self._wait_for_devices()
def _wait_for_devices(self):
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
builtin_devices = ()
for num in range(0, 60):
builtin_devices = (self.led, self.motor_A, self.motor_B, self.motor_AB, self.tilt_sensor, self.button)
if None not in builtin_devices:
return
log.debug("Waiting for builtin devices to appear: %s", builtin_devices)
time.sleep(1)
log.warning("Got only these devices: %s", builtin_devices)
raise RuntimeError("Failed to obtain all builtin devices")
def _notify(self, handle, data):
orig = data
if handle != MOVE_HUB_HARDWARE_HANDLE:
log.warning("Unsupported notification handle: 0x%s", handle)
return
data = data[3:]
log.debug("Notification on %s: %s", handle, str2hex(orig))
msg_type = get_byte(data, 2)
if msg_type == MSG_PORT_INFO:
self._handle_port_info(data)
elif msg_type == MSG_PORT_STATUS:
self._handle_port_status(data)
elif msg_type == MSG_SENSOR_DATA:
self._handle_sensor_data(data)
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
port = get_byte(data, 3)
log.debug("Sensor subscribe ack on port %s", PORTS[port])
self.devices[port].finished()
elif msg_type == MSG_PORT_CMD_ERROR:
log.warning("Command error: %s", str2hex(data[3:]))
elif msg_type == MSG_DEVICE_SHUTDOWN:
log.warning("Device reported shutdown: %s", str2hex(data))
raise KeyboardInterrupt("Device shutdown")
elif msg_type == MSG_DEVICE_INFO:
self._handle_device_info(data)
else:
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
def _handle_device_info(self, data):
if get_byte(data, 3) == 2:
self.button.handle_port_data(data)
else:
log.warning("Unhandled device info: %s", str2hex(data))
def _handle_sensor_data(self, data):
port = get_byte(data, 3)
if port not in self.devices:
log.warning("Notification on port with no device: %s", PORTS[port])
return
device = self.devices[port]
device.queue_port_data(data)
def _handle_port_status(self, data):
port = get_byte(data, 3)
status = get_byte(data, 4)
if status == STATUS_STARTED:
self.devices[port].started()
elif status == STATUS_FINISHED:
self.devices[port].finished()
elif status == STATUS_CONFLICT:
log.warning("Command conflict on port %s", PORTS[port])
else:
log.warning("Unhandled status value: 0x%x", status)
def _handle_port_info(self, data):
port = get_byte(data, 3)
dev_type = get_byte(data, 5)
if port in PORTS and dev_type in DEVICE_TYPES:
log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port])
else:
log.warning("Device 0x%x at port 0x%x", dev_type, port)
if dev_type == DEV_MOTOR:
self.devices[port] = EncodedMotor(self, port)
elif dev_type == DEV_IMOTOR:
self.motor_external = EncodedMotor(self, port)
self.devices[port] = self.motor_external
elif dev_type == DEV_DCS:
self.color_distance_sensor = ColorDistanceSensor(self, port)
self.devices[port] = self.color_distance_sensor
elif dev_type == DEV_LED:
self.devices[port] = LED(self, port)
elif dev_type == DEV_TILT_SENSOR:
self.devices[port] = TiltSensor(self, port)
elif dev_type == DEV_BATTERY:
self.devices[port] = Battery(self, port)
else:
log.debug("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
self.devices[port] = Peripheral(self, port)
if port == PORT_A:
self.motor_A = self.devices[port]
elif port == PORT_B:
self.motor_B = self.devices[port]
elif port == PORT_AB:
self.motor_AB = self.devices[port]
elif port == PORT_C:
self.port_C = self.devices[port]
elif port == PORT_D:
self.port_D = self.devices[port]
elif port == PORT_LED:
self.led = self.devices[port]
elif port == PORT_TILT_SENSOR:
self.tilt_sensor = self.devices[port]
elif port == PORT_BATTERY:
self.battery = self.devices[port]
else:
log.debug("Unhandled port: %s", PORTS[port])
def shutdown(self):
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)

View File

@ -9,12 +9,12 @@ import sys
import traceback
from abc import abstractmethod
from binascii import unhexlify
from gattlib import DiscoveryService, GATTRequester
from threading import Thread
from gattlib import DiscoveryService, GATTRequester
from six.moves import queue
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
from pylgbst.constants import MSG_DEVICE_SHUTDOWN
log = logging.getLogger('comms')
@ -30,6 +30,8 @@ else:
def get_byte(seq, index):
return seq[index]
LEGO_MOVE_HUB = "LEGO Move Hub"
# noinspection PyMethodOverriding
class Requester(GATTRequester):
@ -174,7 +176,7 @@ class DebugServer(object):
self._check_shutdown(data)
def _check_shutdown(self, data):
if get_byte(data, 5) == MSG_DEVICE_SHUTDOWN:
if data[5] == MSG_DEVICE_SHUTDOWN:
log.warning("Device shutdown")
self._running = False

View File

@ -1,12 +1,7 @@
# GENERAL
LEGO_MOVE_HUB = "LEGO Move Hub"
DEVICE_NAME = 0x07
MOVE_HUB_HARDWARE_HANDLE = 0x0E
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
PACKET_VER = 0x01
# PORTS
@ -70,46 +65,6 @@ STATUS_STARTED = 0x01
STATUS_CONFLICT = 0x05
STATUS_FINISHED = 0x0a
# TILT SENSOR
TILT_MODE_2AXIS_FULL = 0x00
TILT_MODE_2AXIS_SIMPLE = 0x01
TILT_MODE_BASIC = 0x02
TILT_MODE_BUMP = 0x03
TILT_MODE_FULL = 0x04
TILT_HORIZONTAL = 0x00
TILT_UP = 0x01
TILT_DOWN = 0x02
TILT_RIGHT = 0x03
TILT_LEFT = 0x04
TILT_FRONT = 0x05
TILT_SOME1 = 0x07
TILT_SOME2 = 0x09
TILT_STATES = {
TILT_HORIZONTAL: "HORIZONTAL",
TILT_UP: "UP",
TILT_DOWN: "DOWN",
TILT_RIGHT: "RIGHT",
TILT_LEFT: "LEFT",
TILT_FRONT: "FRONT",
TILT_SOME1: "LEFT1",
TILT_SOME2: "SOME2",
}
# COLOR & DISTANCE SENSOR
CDS_MODE_COLOR_ONLY = 0x00
CDS_MODE_DISTANCE_INCHES = 0x01
CDS_MODE_COUNT_2INCH = 0x02
CDS_MODE_DISTANCE_HOW_CLOSE = 0x03
CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE = 0x04
CDS_MODE_OFF1 = 0x05
CDS_MODE_STREAM_3_VALUES = 0x06
CDS_MODE_OFF2 = 0x07
CDS_MODE_COLOR_DISTANCE_FLOAT = 0x08
CDS_MODE_LUMINOSITY = 0x09
CDS_MODE_SOME_20BYTES = 0x0a
# COLORS
COLOR_BLACK = 0x00
COLOR_PINK = 0x01
@ -137,8 +92,3 @@ COLORS = {
COLOR_WHITE: "WHITE",
COLOR_NONE: "NONE"
}
# MOTORS
MOTOR_MODE_SOMETHING1 = 0x00
MOTOR_MODE_SPEED = 0x01
MOTOR_MODE_ANGLE = 0x02

177
pylgbst/movehub.py Normal file
View File

@ -0,0 +1,177 @@
import logging
import time
from struct import pack
from pylgbst.comms import BLEConnection, str2hex, get_byte
from pylgbst.constants import *
from pylgbst.peripherals import Button, EncodedMotor, ColorDistanceSensor, LED, TiltSensor, Battery, Peripheral
log = logging.getLogger('movehub')
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
class MoveHub(object):
"""
:type connection: pylgbst.comms.Connection
:type devices: dict[int,Peripheral]
:type led: LED
:type tilt_sensor: TiltSensor
:type button: Button
:type battery: Battery
:type color_distance_sensor: pylgbst.peripherals.ColorDistanceSensor
:type motor_external: EncodedMotor
:type port_C: Peripheral
:type port_D: Peripheral
:type motor_A: EncodedMotor
:type motor_B: EncodedMotor
:type motor_AB: EncodedMotor
"""
def __init__(self, connection=None):
if not connection:
connection = BLEConnection()
self.connection = connection
self.devices = {}
# shorthand fields
self.button = Button(self)
self.led = None
self.battery = None
self.motor_A = None
self.motor_B = None
self.motor_AB = None
self.tilt_sensor = None
self.color_distance_sensor = None
self.motor_external = None
self.port_C = None
self.port_D = None
self.connection.set_notify_handler(self._notify)
self._wait_for_devices()
def _wait_for_devices(self):
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
builtin_devices = ()
for num in range(0, 60):
builtin_devices = (self.led, self.motor_A, self.motor_B, self.motor_AB, self.tilt_sensor, self.button)
if None not in builtin_devices:
return
log.debug("Waiting for builtin devices to appear: %s", builtin_devices)
time.sleep(1)
log.warning("Got only these devices: %s", builtin_devices)
raise RuntimeError("Failed to obtain all builtin devices")
def _notify(self, handle, data):
orig = data
if handle != MOVE_HUB_HARDWARE_HANDLE:
log.warning("Unsupported notification handle: 0x%s", handle)
return
data = data[3:]
log.debug("Notification on %s: %s", handle, str2hex(orig))
msg_type = get_byte(data, 2)
if msg_type == MSG_PORT_INFO:
self._handle_port_info(data)
elif msg_type == MSG_PORT_STATUS:
self._handle_port_status(data)
elif msg_type == MSG_SENSOR_DATA:
self._handle_sensor_data(data)
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
port = get_byte(data, 3)
log.debug("Sensor subscribe ack on port %s", PORTS[port])
self.devices[port].finished()
elif msg_type == MSG_PORT_CMD_ERROR:
log.warning("Command error: %s", str2hex(data[3:]))
elif msg_type == MSG_DEVICE_SHUTDOWN:
log.warning("Device reported shutdown: %s", str2hex(data))
raise KeyboardInterrupt("Device shutdown")
elif msg_type == MSG_DEVICE_INFO:
self._handle_device_info(data)
else:
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
def _handle_device_info(self, data):
if get_byte(data, 3) == 2:
self.button.handle_port_data(data)
else:
log.warning("Unhandled device info: %s", str2hex(data))
def _handle_sensor_data(self, data):
port = get_byte(data, 3)
if port not in self.devices:
log.warning("Notification on port with no device: %s", PORTS[port])
return
device = self.devices[port]
device.queue_port_data(data)
def _handle_port_status(self, data):
port = get_byte(data, 3)
status = get_byte(data, 4)
if status == STATUS_STARTED:
self.devices[port].started()
elif status == STATUS_FINISHED:
self.devices[port].finished()
elif status == STATUS_CONFLICT:
log.warning("Command conflict on port %s", PORTS[port])
else:
log.warning("Unhandled status value: 0x%x", status)
def _handle_port_info(self, data):
port = get_byte(data, 3)
dev_type = get_byte(data, 5)
if port in PORTS and dev_type in DEVICE_TYPES:
log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port])
else:
log.warning("Device 0x%x at port 0x%x", dev_type, port)
if dev_type == DEV_MOTOR:
self.devices[port] = EncodedMotor(self, port)
elif dev_type == DEV_IMOTOR:
self.motor_external = EncodedMotor(self, port)
self.devices[port] = self.motor_external
elif dev_type == DEV_DCS:
self.color_distance_sensor = ColorDistanceSensor(self, port)
self.devices[port] = self.color_distance_sensor
elif dev_type == DEV_LED:
self.devices[port] = LED(self, port)
elif dev_type == DEV_TILT_SENSOR:
self.devices[port] = TiltSensor(self, port)
elif dev_type == DEV_BATTERY:
self.devices[port] = Battery(self, port)
else:
log.debug("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
self.devices[port] = Peripheral(self, port)
if port == PORT_A:
self.motor_A = self.devices[port]
elif port == PORT_B:
self.motor_B = self.devices[port]
elif port == PORT_AB:
self.motor_AB = self.devices[port]
elif port == PORT_C:
self.port_C = self.devices[port]
elif port == PORT_D:
self.port_D = self.devices[port]
elif port == PORT_LED:
self.led = self.devices[port]
elif port == PORT_TILT_SENSOR:
self.tilt_sensor = self.devices[port]
elif port == PORT_BATTERY:
self.battery = self.devices[port]
else:
log.debug("Unhandled port: %s", PORTS[port])
def shutdown(self):
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)

View File

@ -3,9 +3,10 @@ import time
from struct import pack, unpack
from threading import Thread
# noinspection PyUnresolvedReferences
from six.moves import queue
from pylgbst import get_byte, str2hex
from pylgbst.comms import str2hex
from pylgbst.constants import *
log = logging.getLogger('peripherals')
@ -19,7 +20,7 @@ class Peripheral(object):
def __init__(self, parent, port):
"""
:type parent: pylgbst.MoveHub
:type parent: pylgbst.movehub.MoveHub
:type port: int
"""
super(Peripheral, self).__init__()
@ -28,6 +29,7 @@ class Peripheral(object):
self._working = False
self._subscribers = set()
self._port_subscription_mode = None
# noinspection PyUnresolvedReferences
self._incoming_port_data = queue.Queue()
thr = Thread(target=self._queue_reader)
thr.setDaemon(True)
@ -130,7 +132,7 @@ class LED(Peripheral):
log.debug("LED has changed color to %s", COLORS[self._last_color_set])
self._notify_subscribers(self._last_color_set)
def subscribe(self, callback, mode=None, granularity=None):
def subscribe(self, callback, mode=None, granularity=None, async=False):
self._subscribers.add(callback)
def unsubscribe(self, callback=None):
@ -140,16 +142,22 @@ class LED(Peripheral):
class EncodedMotor(Peripheral):
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
MOVEMENT_TYPE = b'\x11'
# trailer is not required for all movement types
MOVEMENT_TYPE = 0x11
CONSTANT_SINGLE = b'\x01'
CONSTANT_GROUP = b'\x02'
SOME_SINGLE = b'\x07'
SOME_GROUP = b'\x08'
TIMED_SINGLE = b'\x09'
TIMED_GROUP = b'\x0A'
ANGLED_SINGLE = b'\x0B'
ANGLED_GROUP = b'\x0C'
CONSTANT_SINGLE = 0x01
CONSTANT_GROUP = 0x02
SOME_SINGLE = 0x07
SOME_GROUP = 0x08
TIMED_SINGLE = 0x09
TIMED_GROUP = 0x0A
ANGLED_SINGLE = 0x0B
ANGLED_GROUP = 0x0C
# MOTORS
MOTOR_MODE_SOMETHING1 = 0x00
MOTOR_MODE_SPEED = 0x01
MOTOR_MODE_ANGLE = 0x02
def _speed_abs(self, relative):
if relative < -1:
@ -165,29 +173,28 @@ class EncodedMotor(Peripheral):
absolute += 255
return int(absolute)
def _wrap_and_write(self, command, speed_primary, speed_secondary):
# set for port
command = self.MOVEMENT_TYPE + command
command += pack("<B", self._speed_abs(speed_primary))
def _wrap_and_write(self, mtype, params, speed_primary, speed_secondary):
if self.port == PORT_AB:
command += pack("<B", self._speed_abs(speed_secondary))
mtype += 1 # de-facto rule
command += self.TRAILER
params = pack("<B", self.MOVEMENT_TYPE) + pack("<B", mtype) + params
self._write_to_hub(MSG_SET_PORT_VAL, command)
params += pack("<B", self._speed_abs(speed_primary))
if self.port == PORT_AB:
params += pack("<B", self._speed_abs(speed_secondary))
params += self.TRAILER
self._write_to_hub(MSG_SET_PORT_VAL, params)
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
if speed_secondary is None:
speed_secondary = speed_primary
# movement type
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
# time
command += pack('<H', int(seconds * 1000))
params = pack('<H', int(seconds * 1000))
self.started()
self._wrap_and_write(command, speed_primary, speed_secondary)
self._wrap_and_write(self.TIMED_SINGLE, params, speed_primary, speed_secondary)
self._wait_sync(async)
def angled(self, angle, speed_primary=1, speed_secondary=None, async=False):
@ -199,107 +206,98 @@ class EncodedMotor(Peripheral):
speed_primary = -speed_primary
speed_secondary = -speed_secondary
# movement type
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
# angle
command += pack('<I', angle)
params = pack('<I', angle)
self.started()
self._wrap_and_write(command, speed_primary, speed_secondary)
self._wrap_and_write(self.ANGLED_SINGLE, params, speed_primary, speed_secondary)
self._wait_sync(async)
def constant(self, speed_primary=1, speed_secondary=None, async=False):
if speed_secondary is None:
speed_secondary = speed_primary
# movement type
command = self.CONSTANT_GROUP if self.port == PORT_AB else self.CONSTANT_SINGLE
self.started()
self._wrap_and_write(command, speed_primary, speed_secondary)
self._wrap_and_write(self.CONSTANT_SINGLE, b"", speed_primary, speed_secondary)
self._wait_sync(async)
def __some(self, speed_primary=1, speed_secondary=None, async=False):
if speed_secondary is None:
speed_secondary = speed_primary
# movement type
command = self.SOME_GROUP if self.port == PORT_AB else self.SOME_SINGLE
self.started()
self._wrap_and_write(command, speed_primary, speed_secondary)
self._wrap_and_write(self.SOME_SINGLE, b"", speed_primary, speed_secondary)
self._wait_sync(async)
def stop(self):
self.constant(0, async=True)
def __test(self, speed_primary=1, speed_secondary=None):
if speed_secondary is None:
speed_secondary = speed_primary
self.started()
# self._wrap_and_write(command, 0.2, 0.2)
# set for port
command = self.MOVEMENT_TYPE + b"\x07"
command += pack('<H', self._speed_abs(0.2))
command += pack('<H', 1000)
# command += pack('<B', self._speed_abs(speed_primary)) # time or angle - first param
# command += pack('<B', self._speed_abs(speed_secondary)) # time or angle - first param
speed_primary = 0.5
speed_secondary = -0.5
# command += pack("<B", self._speed_abs(speed_primary))
# if self.port == PORT_AB:
# command += pack("<B", self._speed_abs(speed_secondary))
# command += self.TRAILER
self._write_to_hub(MSG_SET_PORT_VAL, command)
def handle_port_data(self, data):
if self._port_subscription_mode == MOTOR_MODE_ANGLE:
if self._port_subscription_mode == self.MOTOR_MODE_ANGLE:
rotation = unpack("<l", data[4:8])[0]
self._notify_subscribers(rotation)
elif self._port_subscription_mode == MOTOR_MODE_SOMETHING1:
elif self._port_subscription_mode == self.MOTOR_MODE_SOMETHING1:
# TODO: understand what it means
rotation = unpack("<B", data[4])[0]
self._notify_subscribers(rotation)
elif self._port_subscription_mode == MOTOR_MODE_SPEED:
elif self._port_subscription_mode == self.MOTOR_MODE_SPEED:
rotation = unpack("<b", data[4])[0]
self._notify_subscribers(rotation)
else:
log.debug("Got motor sensor data while in unexpected mode: %s", self._port_subscription_mode)
def subscribe(self, callback, mode=MOTOR_MODE_ANGLE, granularity=1):
def subscribe(self, callback, mode=MOTOR_MODE_ANGLE, granularity=1, async=False):
super(EncodedMotor, self).subscribe(callback, mode, granularity)
class TiltSensor(Peripheral):
def subscribe(self, callback, mode=TILT_MODE_BASIC, granularity=1):
MODE_2AXIS_FULL = 0x00
MODE_2AXIS_SIMPLE = 0x01
MODE_BASIC = 0x02
MODE_BUMP = 0x03
MODE_FULL = 0x04
HORIZONTAL = 0x00
UP = 0x01
DOWN = 0x02
RIGHT = 0x03
LEFT = 0x04
FRONT = 0x05
SOME1 = 0x07 # TODO
SOME2 = 0x09 # TODO
TILT_STATES = {
HORIZONTAL: "HORIZONTAL",
UP: "UP",
DOWN: "DOWN",
RIGHT: "RIGHT",
LEFT: "LEFT",
FRONT: "FRONT",
SOME1: "LEFT1",
SOME2: "RIGHT1",
}
def subscribe(self, callback, mode=MODE_BASIC, granularity=1, async=False):
super(TiltSensor, self).subscribe(callback, mode, granularity)
def handle_port_data(self, data):
if self._port_subscription_mode == TILT_MODE_BASIC:
state = get_byte(data, 4)
if self._port_subscription_mode == self.MODE_BASIC:
state = data[4]
self._notify_subscribers(state)
elif self._port_subscription_mode == TILT_MODE_2AXIS_SIMPLE:
elif self._port_subscription_mode == self.MODE_2AXIS_SIMPLE:
# TODO: figure out right interpreting of this
state = get_byte(data, 4)
state = data[4]
self._notify_subscribers(state)
elif self._port_subscription_mode == TILT_MODE_BUMP:
bump_count = get_byte(data, 4)
elif self._port_subscription_mode == self.MODE_BUMP:
bump_count = data[4]
self._notify_subscribers(bump_count)
elif self._port_subscription_mode == TILT_MODE_2AXIS_FULL:
roll = self._byte2deg(get_byte(data, 4))
pitch = self._byte2deg(get_byte(data, 5))
elif self._port_subscription_mode == self.MODE_2AXIS_FULL:
roll = self._byte2deg(data[4])
pitch = self._byte2deg(data[5])
self._notify_subscribers(roll, pitch)
elif self._port_subscription_mode == TILT_MODE_FULL:
roll = self._byte2deg(get_byte(data, 4))
pitch = self._byte2deg(get_byte(data, 5))
yaw = self._byte2deg(get_byte(data, 6)) # did I get the order right?
elif self._port_subscription_mode == self.MODE_FULL:
roll = self._byte2deg(data[4])
pitch = self._byte2deg(data[5])
yaw = self._byte2deg(data[6]) # did I get the order right?
self._notify_subscribers(roll, pitch, yaw)
else:
log.debug("Got tilt sensor data while in unexpected mode: %s", self._port_subscription_mode)
@ -312,44 +310,56 @@ class TiltSensor(Peripheral):
class ColorDistanceSensor(Peripheral):
COLOR_ONLY = 0x00
DISTANCE_INCHES = 0x01
COUNT_2INCH = 0x02
DISTANCE_HOW_CLOSE = 0x03
DISTANCE_SUBINCH_HOW_CLOSE = 0x04
OFF1 = 0x05
STREAM_3_VALUES = 0x06
OFF2 = 0x07
COLOR_DISTANCE_FLOAT = 0x08
LUMINOSITY = 0x09
SOME_20BYTES = 0x0a
def __init__(self, parent, port):
super(ColorDistanceSensor, self).__init__(parent, port)
def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_FLOAT, granularity=1):
def subscribe(self, callback, mode=COLOR_DISTANCE_FLOAT, granularity=1, async=False):
super(ColorDistanceSensor, self).subscribe(callback, mode, granularity)
def handle_port_data(self, data):
if self._port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT:
color = get_byte(data, 4)
distance = get_byte(data, 5)
partial = get_byte(data, 7)
if self._port_subscription_mode == self.COLOR_DISTANCE_FLOAT:
color = data[4]
distance = data[5]
partial = data[7]
if partial:
distance += 1.0 / partial
self._notify_subscribers(color, float(distance))
elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY:
color = get_byte(data, 4)
elif self._port_subscription_mode == self.COLOR_ONLY:
color = data[4]
self._notify_subscribers(color)
elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
distance = get_byte(data, 4)
elif self._port_subscription_mode == self.DISTANCE_INCHES:
distance = data[4]
self._notify_subscribers(distance)
elif self._port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE:
distance = get_byte(data, 4)
elif self._port_subscription_mode == self.DISTANCE_HOW_CLOSE:
distance = data[4]
self._notify_subscribers(distance)
elif self._port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE:
distance = get_byte(data, 4)
elif self._port_subscription_mode == self.DISTANCE_SUBINCH_HOW_CLOSE:
distance = data[4]
self._notify_subscribers(distance)
elif self._port_subscription_mode == CDS_MODE_OFF1 or self._port_subscription_mode == CDS_MODE_OFF2:
elif self._port_subscription_mode == self.OFF1 or self._port_subscription_mode == self.OFF2:
log.info("Turned off led on %s", self)
elif self._port_subscription_mode == CDS_MODE_COUNT_2INCH:
elif self._port_subscription_mode == self.COUNT_2INCH:
count = unpack("<L", data[4:8])[0] # is it all 4 bytes or just 2?
self._notify_subscribers(count)
elif self._port_subscription_mode == CDS_MODE_STREAM_3_VALUES:
elif self._port_subscription_mode == self.STREAM_3_VALUES:
# TODO: understand better meaning of these 3 values
val1 = unpack("<H", data[4:6])[0]
val2 = unpack("<H", data[6:8])[0]
val3 = unpack("<H", data[8:10])[0]
self._notify_subscribers(val1, val2, val3)
elif self._port_subscription_mode == CDS_MODE_LUMINOSITY:
elif self._port_subscription_mode == self.LUMINOSITY:
luminosity = unpack("<H", data[4:6])[0] / 1023.0
self._notify_subscribers(luminosity)
else: # TODO: support whatever we forgot
@ -361,7 +371,7 @@ class Battery(Peripheral):
super(Battery, self).__init__(parent, port)
self.last_value = None
def subscribe(self, callback, mode=0, granularity=1):
def subscribe(self, callback, mode=0, granularity=1, async=False):
super(Battery, self).subscribe(callback, mode, granularity)
# we know only voltage subscription from it. is it really battery or just onboard voltage?
@ -383,7 +393,7 @@ class Button(Peripheral):
def __init__(self, parent):
super(Button, self).__init__(parent, 0)
def subscribe(self, callback, mode=None, granularity=1):
def subscribe(self, callback, mode=None, granularity=1, async=False):
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_INFO) + b'\x02\x02'
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
if callback:

View File

@ -1,6 +1,9 @@
import unittest
from binascii import unhexlify
from pylgbst import *
from pylgbst.comms import Connection
from pylgbst.movehub import MoveHub
HANDLE = MOVE_HUB_HARDWARE_HANDLE
@ -44,8 +47,14 @@ class ConnectionMock(Connection):
class HubMock(MoveHub):
# noinspection PyUnresolvedReferences
def __init__(self, connection=None):
"""
:type connection: ConnectionMock
"""
super(HubMock, self).__init__(connection if connection else ConnectionMock())
self.notify_mock = self.connection.notifications
self.writes = self.connection.writes
def _wait_for_devices(self):
pass
@ -65,33 +74,33 @@ class GeneralTest(unittest.TestCase):
hub = HubMock()
led = LED(hub, PORT_LED)
led.set_color(COLOR_RED)
self.assertEqual("0801813201510009", hub.connection.writes[0][1])
self.assertEqual("0801813201510009", hub.writes[0][1])
def test_tilt_sensor(self):
hub = HubMock()
hub.connection.notifications.append((HANDLE, '1b0e00 0f00 04 3a 0128000000000100000001'))
hub.notify_mock.append((HANDLE, '1b0e00 0f00 04 3a 0128000000000100000001'))
time.sleep(1)
def callback(param1, param2=None, param3=None):
if param2 is None:
log.debug("Tilt: %s", TILT_STATES[param1])
log.debug("Tilt: %s", TiltSensor.TILT_STATES[param1])
else:
log.debug("Tilt: %s %s %s", param1, param2, param3)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback)
hub.connection.notifications.append((HANDLE, "1b0e000500453a05"))
hub.connection.notifications.append((HANDLE, "1b0e000a00473a010100000001"))
hub.notify_mock.append((HANDLE, "1b0e000500453a05"))
hub.notify_mock.append((HANDLE, "1b0e000a00473a010100000001"))
time.sleep(1)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TILT_MODE_2AXIS_SIMPLE)
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_SIMPLE)
hub.connection.notifications.append((HANDLE, "1b0e000500453a09"))
hub.notify_mock.append((HANDLE, "1b0e000500453a09"))
time.sleep(1)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TILT_MODE_2AXIS_FULL)
hub.connection.notifications.append((HANDLE, "1b0e000600453a04fe"))
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_FULL)
hub.notify_mock.append((HANDLE, "1b0e000600453a04fe"))
time.sleep(1)
self._wait_notifications_handled(hub)
@ -133,7 +142,7 @@ class GeneralTest(unittest.TestCase):
def test_color_sensor(self):
#
hub = HubMock()
hub.connection.notifications.append((HANDLE, '1b0e000f00 04010125000000001000000010'))
hub.notify_mock.append((HANDLE, '1b0e000f00 04010125000000001000000010'))
time.sleep(1)
def callback(color, unk1, unk2=None):
@ -143,7 +152,7 @@ class GeneralTest(unittest.TestCase):
self._inject_notification(hub, '1b0e000a00 4701090100000001', 1)
hub.color_distance_sensor.subscribe(callback)
hub.connection.notifications.append((HANDLE, "1b0e0008004501ff0aff00"))
hub.notify_mock.append((HANDLE, "1b0e0008004501ff0aff00"))
time.sleep(1)
# TODO: assert
self._wait_notifications_handled(hub)
@ -158,8 +167,8 @@ class GeneralTest(unittest.TestCase):
hub.button.subscribe(callback)
hub.connection.notifications.append((HANDLE, "1b0e00060001020601"))
hub.connection.notifications.append((HANDLE, "1b0e00060001020600"))
hub.notify_mock.append((HANDLE, "1b0e00060001020601"))
hub.notify_mock.append((HANDLE, "1b0e00060001020600"))
time.sleep(1)
# TODO: assert
self._wait_notifications_handled(hub)
@ -168,6 +177,6 @@ class GeneralTest(unittest.TestCase):
def _inject_notification(self, hub, notify, pause):
def inject():
time.sleep(pause)
hub.connection.notifications.append((HANDLE, notify))
hub.notify_mock.append((HANDLE, notify))
Thread(target=inject).start()

View File

@ -3,6 +3,11 @@ import hashlib
import os
import re
import subprocess
import sys
import traceback
from pylgbst import *
from pylgbst.comms import DebugServerConnection
try:
import gtts
@ -27,8 +32,6 @@ except BaseException:
def say(text):
sys.stdout.write("%s\n", text)
from pylgbst import *
forward = FORWARD = right = RIGHT = 1
backward = BACKWARD = left = LEFT = -1
straight = STRAIGHT = 0

View File

@ -22,8 +22,8 @@ def on_btn(pressed):
running = False
def decode_xml(messageString):
parts = messageString.split("</")
def decode_xml(msg):
parts = msg.split("</")
xxx = float(parts[1][parts[1].rfind('>') + 1:])
yyy = float(parts[2][parts[2].rfind('>') + 1:])
zzz = float(parts[3][parts[3].rfind('>') + 1:])
@ -58,7 +58,7 @@ try:
message = data
except KeyboardInterrupt:
raise
except:
except BaseException:
break
if not message:

View File

@ -9,9 +9,9 @@ criterion = min
cur_luminosity = 0
def on_change_lum(lum):
def on_change_lum(lumn):
global cur_luminosity
cur_luminosity = lum
cur_luminosity = lumn
lum_values = {}
@ -23,12 +23,12 @@ def on_btn(pressed):
running = False
def on_turn(angle):
lum_values[angle] = cur_luminosity
def on_turn(angl):
lum_values[angl] = cur_luminosity
robot.button.subscribe(on_btn)
robot.color_distance_sensor.subscribe(on_change_lum, CDS_MODE_LUMINOSITY, granularity=1)
robot.color_distance_sensor.subscribe(on_change_lum, ColorDistanceSensor.LUMINOSITY, granularity=1)
robot.motor_A.subscribe(on_turn, granularity=30)
# TODO: add bump detect to go back?
while running:

View File

@ -5,7 +5,7 @@ robot = Vernie()
robot.say('type commands')
def confirmation(cmd):
def confirmation(_):
robot.say("ok")

View File

@ -5,8 +5,8 @@ robot = Vernie()
robot.say("commands from file")
def confirmation(cmd):
robot.say(cmd[0])
def confirmation(command):
robot.say(command[0])
with open("vernie.commands") as fhd: