1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

refactored code to detect devices

This commit is contained in:
Andrey Pohilko 2017-09-13 20:14:17 +03:00
parent 5b81e86632
commit e2fe66ac2d
6 changed files with 209 additions and 59 deletions

View File

@ -9,7 +9,8 @@ Best way to start is to look at [demo.py](demo.py) file, or maybe run it.
# Ideas
Make it 2/3 compatible
Add travis unit tests and coverage
# Links

36
demo.py
View File

@ -12,7 +12,7 @@ def demo_all(movehub):
demo_led_colors(movehub)
demo_motors_timed(movehub)
demo_motors_angled(movehub)
demo_port_c_motor(movehub)
demo_port_cd_motor(movehub)
def demo_led_colors(movehub):
@ -51,12 +51,20 @@ def demo_motors_angled(movehub):
sleep(1)
def demo_port_c_motor(movehub):
portd = EncodedMotor(movehub, PORT_D)
portd.angled(90, 1)
sleep(1)
portd.angled(90, -1)
sleep(1)
def demo_port_cd_motor(movehub):
motor = None
if isinstance(movehub.port_D, EncodedMotor):
motor = movehub.port_D
elif isinstance(movehub.port_C, EncodedMotor):
motor = movehub.port_D
else:
log.warning("Motor not found on ports C or D")
if motor:
motor.angled(20, 1)
sleep(1)
motor.angled(20, -1)
sleep(1)
def vernie_head(movehub):
@ -74,7 +82,7 @@ def vernie_head(movehub):
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
try:
connection = DebugServerConnection()
@ -84,10 +92,8 @@ if __name__ == '__main__':
hub = MoveHub(connection)
sleep(1)
#hub.get_name()
for x in range(1, 60):
sleep(1)
# demo_led_colors(hub)
# demo_all(movehub)
# hub.get_name()
demo_port_cd_motor(hub)
# demo_all(hub)
# demo_led_colors(hub)
sleep(1)

View File

@ -11,20 +11,22 @@ class MoveHub(object):
"""
:type connection: pylegoboost.comms.Connection
:type led: LED
:type devices: dict[int,Peripheral]
"""
def __init__(self, connection):
self.notified = False
self.connection = connection
self.devices = {}
self.led = LED(self)
self.motor_A = EncodedMotor(self, PORT_A)
self.motor_B = EncodedMotor(self, PORT_B)
self.motor_AB = EncodedMotor(self, PORT_AB)
# shorthand fields
self.led = None
self.motor_A = None
self.motor_B = None
self.motor_AB = None
self.tilt_sensor = None
self.color_distance_sensor = None
# self.button
# self.tilt_sensor
# enables notifications reading
self.connection.set_notify_handler(self._notify)
@ -34,28 +36,90 @@ class MoveHub(object):
# log.debug("Waiting to be notified")
# time.sleep(1)
# self.port_C = None
# self.port_D = None
self.port_C = None
self.port_D = None
# transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01')
def _notify(self, handle, data):
# TODO
log.debug("Notification on %s: %s", handle, data.encode("hex"))
def get_name(self):
# note: reading this too fast makes it hang
self.connection.read(DEVICE_NAME)
def _notify(self, handle, data):
"""
Using https://github.com/JorgePe/BOOSTreveng/blob/master/Notifications.md
"""
orig = data
log.debug("Notification on %s: %s", handle, orig.encode("hex"))
data = data[3:]
msg_type = ord(data[2])
if msg_type == MSG_PORT_INFO:
self._handle_port_info(data)
else:
log.warning("Unhandled msg type %s: %s", msg_type, orig.encode("hex"))
pass
def _handle_port_info(self, data):
port = ord(data[3])
dev_type = ord(data[5])
if port in PORTS and dev_type in DEVICE_TYPES:
log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port])
else:
log.debug("Device 0x%x at port 0x%x", dev_type, port)
if dev_type == TYPE_MOTOR:
self.devices[port] = EncodedMotor(self, port)
elif dev_type == TYPE_IMOTOR:
self.devices[port] = EncodedMotor(self, port)
elif dev_type == TYPE_DISTANCE_COLOR_SENSOR:
self.devices[port] = ColorDistanceSensor(self, port)
self.color_distance_sensor = self.devices[port]
elif dev_type == TYPE_LED:
self.devices[port] = LED(self, port)
elif dev_type == TYPE_TILT_SENSOR:
self.devices[port] = TiltSensor(self, port)
else:
log.warning("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
if port == PORT_A:
self.motor_A = self.devices[port]
elif port == PORT_B:
self.motor_B = self.devices[port]
elif port == PORT_AB:
self.motor_AB = self.devices[port]
elif port == PORT_C:
self.port_C = self.devices[port]
elif port == PORT_D:
self.port_D = self.devices[port]
elif port == PORT_LED:
self.led = self.devices[port]
elif port == PORT_TILT_SENSOR:
self.tilt_sensor = self.devices[port]
else:
log.warning("Unhandled port: %s", PORTS[port])
class Peripheral(object):
"""
:type parent: MoveHub
"""
PACKET_VER = b'\x01'
SET_PORT_VAL = b'\x81'
def __init__(self, parent):
def __init__(self, parent, port):
super(Peripheral, self).__init__()
self.parent = parent
self.port = port
def _set_port_val(self, value):
cmd = self.PACKET_VER + self.SET_PORT_VAL + chr(self.port)
cmd += value
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(cmd)) + cmd)
class LED(Peripheral):
@ -63,14 +127,12 @@ class LED(Peripheral):
if color not in COLORS:
raise ValueError("Color %s is not in list of available colors" % color)
cmd = CMD_SET_COLOR + chr(color)
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, cmd)
cmd = '\x11\x51\x00' + chr(color)
self._set_port_val(cmd)
class EncodedMotor(Peripheral):
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
PACKET_VER = b'\x01'
SET_PORT_VAL = b'\x81'
MOVEMENT_TYPE = b'\x11'
TIMED_SINGLE = b'\x09'
TIMED_GROUP = b'\x0A'
@ -78,10 +140,9 @@ class EncodedMotor(Peripheral):
ANGLED_GROUP = b'\x0C'
def __init__(self, parent, port):
super(EncodedMotor, self).__init__(parent)
if port not in PORTS:
super(EncodedMotor, self).__init__(parent, port)
if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]:
raise ValueError("Invalid port for motor: %s" % port)
self.port = port
def _speed_abs(self, relative):
if relative < -1 or relative > 1:
@ -94,7 +155,7 @@ class EncodedMotor(Peripheral):
def _wrap_and_write(self, command, speed_primary, speed_secondary):
# set for port
command = self.SET_PORT_VAL + chr(self.port) + self.MOVEMENT_TYPE + command
command = self.MOVEMENT_TYPE + command
command += chr(self._speed_abs(speed_primary))
if self.port == PORT_AB:
@ -102,7 +163,7 @@ class EncodedMotor(Peripheral):
command += self.TRAILER
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(command) + 1) + self.PACKET_VER + command)
self._set_port_val(command)
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
if speed_secondary is None:
@ -136,3 +197,7 @@ class EncodedMotor(Peripheral):
class ColorDistanceSensor(Peripheral):
pass
class TiltSensor(Peripheral):
pass

View File

@ -49,19 +49,6 @@ class Connection(object):
pass
class ConnectionMock(Connection):
"""
For unit testing purposes
"""
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data.encode("hex"))
def read(self, handle):
log.debug("Reading from: %s", handle)
return None # TODO
class BLEConnection(Connection):
"""
Main transport class, uses real Bluetooth LE connection.

View File

@ -33,13 +33,47 @@ COLORS = {
COLOR_WHITE: "WHITE"
}
CMD_SET_COLOR = b'\x08\x00\x81\x32\x11\x51\x00'
# PORTS
PORT_C = 0x01
PORT_D = 0x02
PORT_LED = 0x32
PORT_A = 0x37
PORT_B = 0x38
PORT_AB = 0x39
PORT_C = 0x01
PORT_D = 0x02
PORT_TILT_SENSOR = 0x3a
PORT_SOMETHING1 = 0x3B
PORT_SOMETHING2 = 0x3C
PORTS = [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]
PORTS = {
PORT_A: "A",
PORT_B: "B",
PORT_AB: "AB",
PORT_C: "C",
PORT_D: "D",
PORT_LED: "LED",
PORT_TILT_SENSOR: "TILT_SENSOR",
PORT_SOMETHING1: "UNK1",
PORT_SOMETHING2: "UNK2",
}
# NOTIFICATIONS
MSG_PORT_INFO = 0x04
TYPE_DISTANCE_COLOR_SENSOR = 0x25
TYPE_IMOTOR = 0x26
TYPE_MOTOR = 0x27
TYPE_TILT_SENSOR = 0x28
TYPE_LED = 0x17
# one of them is button? another is battery?
TYPE_SOMETHING1 = 0x15
TYPE_SOMETHING2 = 0x14
DEVICE_TYPES = {
TYPE_DISTANCE_COLOR_SENSOR: "DISTANCE_COLOR_SENSOR",
TYPE_IMOTOR: "IMOTOR",
TYPE_MOTOR: "MOTOR",
TYPE_TILT_SENSOR: "TILT_SENSOR",
TYPE_LED: "LED",
TYPE_SOMETHING1: "UNK1",
TYPE_SOMETHING2: "UNK2",
}

61
test.py
View File

@ -1,13 +1,70 @@
import logging
import time
import unittest
from threading import Thread
from demo import demo_all
from pylegoboost.comms import ConnectionMock
from pylegoboost import MoveHub
from pylegoboost.comms import Connection
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')
class ConnectionMock(Connection):
"""
For unit testing purposes
"""
def __init__(self):
super(ConnectionMock, self).__init__()
self.notifications = []
self.notification_handler = None
self.running = True
self.finished = False
def set_notify_handler(self, handler):
self.notification_handler = handler
thr = Thread(target=self.notifier)
thr.setDaemon(True)
thr.start()
def notifier(self):
while self.running:
if self.notification_handler:
while self.notifications:
handle, data = self.notifications.pop(0)
self.notification_handler(handle, data.replace(' ', '').decode('hex'))
time.sleep(0.1)
self.finished = True
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data.encode("hex"))
def read(self, handle):
log.debug("Reading from: %s", handle)
return None # TODO
class GeneralTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
demo_all(conn)
hub = MoveHub(conn)
time.sleep(1)
conn.notifications.append((14, '1b0e00 0f00 04 01 0125000000001000000010'))
conn.notifications.append((14, '1b0e00 0f00 04 02 0126000000001000000010'))
conn.notifications.append((14, '1b0e00 0f00 04 37 0127000100000001000000'))
conn.notifications.append((14, '1b0e00 0f00 04 38 0127000100000001000000'))
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
conn.notifications.append((14, '1b0e00 0f00 04 32 0117000100000001000000'))
conn.notifications.append((14, '1b0e00 0f00 04 3a 0128000000000100000001'))
conn.notifications.append((14, '1b0e00 0f00 04 3b 0115000200000002000000'))
conn.notifications.append((14, '1b0e00 0f00 04 3c 0114000200000002000000'))
time.sleep(1)
demo_all(hub)
conn.running = False
while not conn.finished:
time.sleep(0.1)