1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

refactoring

This commit is contained in:
Andrey Pohilko 2017-09-14 09:35:22 +03:00
parent c55d796683
commit cded44a3b2
4 changed files with 56 additions and 20 deletions

View File

@ -8,8 +8,17 @@ log = logging.getLogger('movehub')
class MoveHub(object):
"""
:type connection: pylegoboost.comms.Connection
:type led: LED
:type devices: dict[int,Peripheral]
:type led: LED
:type tilt_sensor: TiltSensor
:type button: Button
:type color_distance_sensor: ColorDistanceSensor
:type external_motor: EncodedMotor
:type port_C: Peripheral
:type port_D: Peripheral
:type motor_A: EncodedMotor
:type motor_B: EncodedMotor
:type motor_AB: EncodedMotor
"""
def __init__(self, connection=None):
@ -20,6 +29,7 @@ class MoveHub(object):
self.devices = {}
# shorthand fields
self.button = Button(self)
self.led = None
self.motor_A = None
self.motor_B = None
@ -27,20 +37,25 @@ class MoveHub(object):
self.tilt_sensor = None
self.color_distance_sensor = None
self.external_motor = None
self.button = Button(self)
self.port_C = None
self.port_D = None
self._wait_for_devices()
def _wait_for_devices(self):
# enables notifications reading
self.connection.set_notify_handler(self._notify)
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
while None in (self.led, self.motor_A, self.motor_B, self.motor_AB, self.tilt_sensor):
log.debug("Waiting to be notified about devices...")
time.sleep(0.1)
self.port_C = None
self.port_D = None
# transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01')
builtin_devices = ()
for num in range(0, 60):
builtin_devices = (self.led, self.motor_A, self.motor_B, self.motor_AB, self.tilt_sensor, self.button)
if None not in builtin_devices:
return
log.debug("Waiting for builtin devices to appear: %s", builtin_devices)
time.sleep(1)
log.warning("Got only these devices: %s", builtin_devices)
raise RuntimeError("Failed to obtain all builtin devices")
def get_name(self):
# note: reading this too fast makes it hang

View File

@ -86,3 +86,20 @@ DEVICE_TYPES = {
STATUS_STARTED = 0x01
STATUS_CONFLICT = 0x05
STATUS_FINISHED = 0x0a
# TILT
TILT_HORIZ = 0x00
TILT_UP = 0x01
TILT_DOWN = 0x02
TILT_RIGHT = 0x03
TILT_LEFT = 0x04
TILT_INVERT = 0x05
TILT_STATES = {
TILT_HORIZ: "BACK",
TILT_UP: "UP",
TILT_DOWN: "DOWN",
TILT_RIGHT: "RIGHT",
TILT_LEFT: "LEFT",
TILT_INVERT: "FRONT",
}

View File

@ -82,7 +82,7 @@ class EncodedMotor(Peripheral):
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
# time
msec = int(seconds * 1000)
if msec > 255 * 255:
if msec >= pow(2, 16):
raise ValueError("Too large value for seconds: %s", seconds)
command += struct.pack('<H', msec)
@ -113,8 +113,7 @@ class TiltSensor(Peripheral):
class Button(Peripheral):
def __init__(self, parent, port):
del port
def __init__(self, parent):
super(Button, self).__init__(parent, None)

19
test.py
View File

@ -3,7 +3,7 @@ import time
import unittest
from threading import Thread
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB, Peripheral
from pylgbst.comms import Connection, str2hex, hex2str
from pylgbst.constants import PORT_LED
@ -50,19 +50,24 @@ class ConnectionMock(Connection):
return None # TODO
class HubMock(MoveHub):
def _wait_for_devices(self):
pass
class GeneralTest(unittest.TestCase):
def test_led(self):
conn = ConnectionMock()
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
hub = MoveHub(conn)
hub = HubMock(conn)
led = LED(hub, PORT_LED)
led.set_color(COLOR_RED)
self.assertEquals("0701813211510009", conn.writes[1][1])
self.assertEquals("0701813211510009", conn.writes[0][1])
def test_tilt_sensor(self):
conn = ConnectionMock()
conn.notifications.append((14, '1b0e000f00043a0128000000000100000001'))
hub = MoveHub(conn)
hub = HubMock(conn)
def callback():
pass
@ -73,12 +78,12 @@ class GeneralTest(unittest.TestCase):
def test_motor(self):
conn = ConnectionMock()
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
hub = MoveHub(conn)
hub = HubMock(conn)
motor = EncodedMotor(hub, PORT_AB)
motor.timed(1.5)
self.assertEquals("0c018139110adc056464647f03", conn.writes[1][1])
self.assertEquals("0c018139110adc056464647f03", conn.writes[0][1])
motor.angled(90)
self.assertEquals("0e018139110c5a0000006464647f03", conn.writes[2][1])
self.assertEquals("0e018139110c5a0000006464647f03", conn.writes[1][1])
def test_capabilities(self):
conn = ConnectionMock()