From 59e04b4472bb0ddca50af3d4f85eaea6b74db2f8 Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Thu, 14 Sep 2017 08:35:57 +0300 Subject: [PATCH] REfactoring --- README.md | 18 +++++- pylgbst/__init__.py | 129 ++++++----------------------------------- pylgbst/constants.py | 4 +- pylgbst/peripherals.py | 113 ++++++++++++++++++++++++++++++++++++ test.py | 11 ++++ 5 files changed, 160 insertions(+), 115 deletions(-) create mode 100644 pylgbst/peripherals.py diff --git a/README.md b/README.md index f8abb0d..bb44d55 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,22 @@ Best way to start is to look at [demo.py](demo.py) file, and run it. - angled and timed movement for motors - LED color change -## Ideas +## Usage -Make it 2/3 compatible -Add travis unit tests and coverage +```python +from pylgbst import MoveHub + +hub=MoveHub() +print (hub.get_name()) +for device in hub.devices: + print (device) +``` + +## Roadmap + +- Make it 2/3 compatible +- Add travis unit tests and coverage +- Give nice documentation examples, don't forget to mention logging ## Links diff --git a/pylgbst/__init__.py b/pylgbst/__init__.py index e30098b..153fc7d 100644 --- a/pylgbst/__init__.py +++ b/pylgbst/__init__.py @@ -1,9 +1,6 @@ -import logging -import struct -import time - -from pylgbst.comms import str2hex +from pylgbst.comms import * from pylgbst.constants import * +from pylgbst.peripherals import * log = logging.getLogger('movehub') @@ -15,7 +12,10 @@ class MoveHub(object): :type devices: dict[int,Peripheral] """ - def __init__(self, connection): + def __init__(self, connection=None): + if not connection: + connection = BLEConnection() + self.connection = connection self.devices = {} @@ -24,7 +24,6 @@ class MoveHub(object): self.motor_A = None self.motor_B = None self.motor_AB = None - self.tilt_sensor = None self.color_distance_sensor = None # self.button @@ -33,7 +32,7 @@ class MoveHub(object): self.connection.set_notify_handler(self._notify) self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE) - while not self.devices: + while len(self.devices): log.debug("Waiting to be notified about devices...") time.sleep(0.1) @@ -44,7 +43,7 @@ class MoveHub(object): def get_name(self): # note: reading this too fast makes it hang - self.connection.read(DEVICE_NAME) + return self.connection.read(DEVICE_NAME) def _notify(self, handle, data): """ @@ -119,108 +118,16 @@ class MoveHub(object): log.warning("Unhandled port: %s", PORTS[port]) -class Peripheral(object): - """ - :type parent: MoveHub - """ - PACKET_VER = b'\x01' - SET_PORT_VAL = b'\x81' +LISTEN_COLOR_SENSOR_ON_C = b'\x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01' +LISTEN_COLOR_SENSOR_ON_D = b'\x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01' - def __init__(self, parent, port): - super(Peripheral, self).__init__() - self.parent = parent - self.port = port - self.working = False +LISTEN_DIST_SENSOR_ON_C = b'\x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01' +LISTEN_DIST_SENSOR_ON_D = b'\x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01' - def _set_port_val(self, value): - cmd = self.PACKET_VER + self.SET_PORT_VAL + chr(self.port) - cmd += value +LISTEN_ENCODER_ON_A = b'\x0a\x00 \x41\x37 \x02\x01\x00\x00\x00\x01' +LISTEN_ENCODER_ON_B = b'\x0a\x00 \x41\x38 \x02\x01\x00\x00\x00\x01' +LISTEN_ENCODER_ON_C = b'\x0a\x00 \x41\x01 \x02\x01\x00\x00\x00\x01' +LISTEN_ENCODER_ON_D = b'\x0a\x00 \x41\x02 \x02\x01\x00\x00\x00\x01' - self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(cmd)) + cmd) - - def started(self): - self.working = True - - def finished(self): - self.working = False - - -class LED(Peripheral): - def set_color(self, color): - if color not in COLORS: - raise ValueError("Color %s is not in list of available colors" % color) - - cmd = '\x11\x51\x00' + chr(color) - self._set_port_val(cmd) - - -class EncodedMotor(Peripheral): - TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something - MOVEMENT_TYPE = b'\x11' - TIMED_SINGLE = b'\x09' - TIMED_GROUP = b'\x0A' - ANGLED_SINGLE = b'\x0B' - ANGLED_GROUP = b'\x0C' - - def __init__(self, parent, port): - super(EncodedMotor, self).__init__(parent, port) - if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]: - raise ValueError("Invalid port for motor: %s" % port) - - def _speed_abs(self, relative): - if relative < -1 or relative > 1: - raise ValueError("Invalid speed value: %s", relative) - - absolute = round(relative * 100) - if absolute < 0: - absolute += 255 - return int(absolute) - - def _wrap_and_write(self, command, speed_primary, speed_secondary): - # set for port - command = self.MOVEMENT_TYPE + command - - command += chr(self._speed_abs(speed_primary)) - if self.port == PORT_AB: - command += chr(self._speed_abs(speed_secondary)) - - command += self.TRAILER - - self._set_port_val(command) - - def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): - if speed_secondary is None: - speed_secondary = speed_primary - - # movement type - command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE - # time - msec = int(seconds * 1000) - if msec > 255 * 255: - raise ValueError("Too large value for seconds: %s", seconds) - command += struct.pack(' 1: + raise ValueError("Invalid speed value: %s", relative) + + absolute = round(relative * 100) + if absolute < 0: + absolute += 255 + return int(absolute) + + def _wrap_and_write(self, command, speed_primary, speed_secondary): + # set for port + command = self.MOVEMENT_TYPE + command + + command += chr(self._speed_abs(speed_primary)) + if self.port == PORT_AB: + command += chr(self._speed_abs(speed_secondary)) + + command += self.TRAILER + + self._set_port_val(command) + + def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): + if speed_secondary is None: + speed_secondary = speed_primary + + # movement type + command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE + # time + msec = int(seconds * 1000) + if msec > 255 * 255: + raise ValueError("Too large value for seconds: %s", seconds) + command += struct.pack('