mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
REfactoring
This commit is contained in:
parent
0cb01d5e30
commit
59e04b4472
18
README.md
18
README.md
@ -12,10 +12,22 @@ Best way to start is to look at [demo.py](demo.py) file, and run it.
|
|||||||
- angled and timed movement for motors
|
- angled and timed movement for motors
|
||||||
- LED color change
|
- LED color change
|
||||||
|
|
||||||
## Ideas
|
## Usage
|
||||||
|
|
||||||
Make it 2/3 compatible
|
```python
|
||||||
Add travis unit tests and coverage
|
from pylgbst import MoveHub
|
||||||
|
|
||||||
|
hub=MoveHub()
|
||||||
|
print (hub.get_name())
|
||||||
|
for device in hub.devices:
|
||||||
|
print (device)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Roadmap
|
||||||
|
|
||||||
|
- Make it 2/3 compatible
|
||||||
|
- Add travis unit tests and coverage
|
||||||
|
- Give nice documentation examples, don't forget to mention logging
|
||||||
|
|
||||||
## Links
|
## Links
|
||||||
|
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
import logging
|
from pylgbst.comms import *
|
||||||
import struct
|
|
||||||
import time
|
|
||||||
|
|
||||||
from pylgbst.comms import str2hex
|
|
||||||
from pylgbst.constants import *
|
from pylgbst.constants import *
|
||||||
|
from pylgbst.peripherals import *
|
||||||
|
|
||||||
log = logging.getLogger('movehub')
|
log = logging.getLogger('movehub')
|
||||||
|
|
||||||
@ -15,7 +12,10 @@ class MoveHub(object):
|
|||||||
:type devices: dict[int,Peripheral]
|
:type devices: dict[int,Peripheral]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, connection):
|
def __init__(self, connection=None):
|
||||||
|
if not connection:
|
||||||
|
connection = BLEConnection()
|
||||||
|
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.devices = {}
|
self.devices = {}
|
||||||
|
|
||||||
@ -24,7 +24,6 @@ class MoveHub(object):
|
|||||||
self.motor_A = None
|
self.motor_A = None
|
||||||
self.motor_B = None
|
self.motor_B = None
|
||||||
self.motor_AB = None
|
self.motor_AB = None
|
||||||
|
|
||||||
self.tilt_sensor = None
|
self.tilt_sensor = None
|
||||||
self.color_distance_sensor = None
|
self.color_distance_sensor = None
|
||||||
# self.button
|
# self.button
|
||||||
@ -33,7 +32,7 @@ class MoveHub(object):
|
|||||||
self.connection.set_notify_handler(self._notify)
|
self.connection.set_notify_handler(self._notify)
|
||||||
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
|
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
|
||||||
|
|
||||||
while not self.devices:
|
while len(self.devices):
|
||||||
log.debug("Waiting to be notified about devices...")
|
log.debug("Waiting to be notified about devices...")
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
@ -44,7 +43,7 @@ class MoveHub(object):
|
|||||||
|
|
||||||
def get_name(self):
|
def get_name(self):
|
||||||
# note: reading this too fast makes it hang
|
# note: reading this too fast makes it hang
|
||||||
self.connection.read(DEVICE_NAME)
|
return self.connection.read(DEVICE_NAME)
|
||||||
|
|
||||||
def _notify(self, handle, data):
|
def _notify(self, handle, data):
|
||||||
"""
|
"""
|
||||||
@ -119,108 +118,16 @@ class MoveHub(object):
|
|||||||
log.warning("Unhandled port: %s", PORTS[port])
|
log.warning("Unhandled port: %s", PORTS[port])
|
||||||
|
|
||||||
|
|
||||||
class Peripheral(object):
|
LISTEN_COLOR_SENSOR_ON_C = b'\x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01'
|
||||||
"""
|
LISTEN_COLOR_SENSOR_ON_D = b'\x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01'
|
||||||
:type parent: MoveHub
|
|
||||||
"""
|
|
||||||
PACKET_VER = b'\x01'
|
|
||||||
SET_PORT_VAL = b'\x81'
|
|
||||||
|
|
||||||
def __init__(self, parent, port):
|
LISTEN_DIST_SENSOR_ON_C = b'\x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01'
|
||||||
super(Peripheral, self).__init__()
|
LISTEN_DIST_SENSOR_ON_D = b'\x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01'
|
||||||
self.parent = parent
|
|
||||||
self.port = port
|
|
||||||
self.working = False
|
|
||||||
|
|
||||||
def _set_port_val(self, value):
|
LISTEN_ENCODER_ON_A = b'\x0a\x00 \x41\x37 \x02\x01\x00\x00\x00\x01'
|
||||||
cmd = self.PACKET_VER + self.SET_PORT_VAL + chr(self.port)
|
LISTEN_ENCODER_ON_B = b'\x0a\x00 \x41\x38 \x02\x01\x00\x00\x00\x01'
|
||||||
cmd += value
|
LISTEN_ENCODER_ON_C = b'\x0a\x00 \x41\x01 \x02\x01\x00\x00\x00\x01'
|
||||||
|
LISTEN_ENCODER_ON_D = b'\x0a\x00 \x41\x02 \x02\x01\x00\x00\x00\x01'
|
||||||
|
|
||||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(cmd)) + cmd)
|
LISTEN_TILT_BASIC = b'\x0a\x00 \x41\x3a \x02\x01\x00\x00\x00\x01'
|
||||||
|
LISTEN_TILT_FULL = b'\x0a\x00 \x41\x3a \x00\x01\x00\x00\x00\x01'
|
||||||
def started(self):
|
|
||||||
self.working = True
|
|
||||||
|
|
||||||
def finished(self):
|
|
||||||
self.working = False
|
|
||||||
|
|
||||||
|
|
||||||
class LED(Peripheral):
|
|
||||||
def set_color(self, color):
|
|
||||||
if color not in COLORS:
|
|
||||||
raise ValueError("Color %s is not in list of available colors" % color)
|
|
||||||
|
|
||||||
cmd = '\x11\x51\x00' + chr(color)
|
|
||||||
self._set_port_val(cmd)
|
|
||||||
|
|
||||||
|
|
||||||
class EncodedMotor(Peripheral):
|
|
||||||
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
|
|
||||||
MOVEMENT_TYPE = b'\x11'
|
|
||||||
TIMED_SINGLE = b'\x09'
|
|
||||||
TIMED_GROUP = b'\x0A'
|
|
||||||
ANGLED_SINGLE = b'\x0B'
|
|
||||||
ANGLED_GROUP = b'\x0C'
|
|
||||||
|
|
||||||
def __init__(self, parent, port):
|
|
||||||
super(EncodedMotor, self).__init__(parent, port)
|
|
||||||
if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]:
|
|
||||||
raise ValueError("Invalid port for motor: %s" % port)
|
|
||||||
|
|
||||||
def _speed_abs(self, relative):
|
|
||||||
if relative < -1 or relative > 1:
|
|
||||||
raise ValueError("Invalid speed value: %s", relative)
|
|
||||||
|
|
||||||
absolute = round(relative * 100)
|
|
||||||
if absolute < 0:
|
|
||||||
absolute += 255
|
|
||||||
return int(absolute)
|
|
||||||
|
|
||||||
def _wrap_and_write(self, command, speed_primary, speed_secondary):
|
|
||||||
# set for port
|
|
||||||
command = self.MOVEMENT_TYPE + command
|
|
||||||
|
|
||||||
command += chr(self._speed_abs(speed_primary))
|
|
||||||
if self.port == PORT_AB:
|
|
||||||
command += chr(self._speed_abs(speed_secondary))
|
|
||||||
|
|
||||||
command += self.TRAILER
|
|
||||||
|
|
||||||
self._set_port_val(command)
|
|
||||||
|
|
||||||
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
|
||||||
if speed_secondary is None:
|
|
||||||
speed_secondary = speed_primary
|
|
||||||
|
|
||||||
# movement type
|
|
||||||
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
|
|
||||||
# time
|
|
||||||
msec = int(seconds * 1000)
|
|
||||||
if msec > 255 * 255:
|
|
||||||
raise ValueError("Too large value for seconds: %s", seconds)
|
|
||||||
command += struct.pack('<H', msec)
|
|
||||||
|
|
||||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
|
||||||
|
|
||||||
if not async:
|
|
||||||
time.sleep(seconds)
|
|
||||||
|
|
||||||
def angled(self, angle, speed_primary=1, speed_secondary=None):
|
|
||||||
if speed_secondary is None:
|
|
||||||
speed_secondary = speed_primary
|
|
||||||
|
|
||||||
# movement type
|
|
||||||
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
|
|
||||||
# angle
|
|
||||||
command += struct.pack('<I', angle)
|
|
||||||
|
|
||||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
|
||||||
# TODO: how to tell when motor has stopped?
|
|
||||||
|
|
||||||
|
|
||||||
class ColorDistanceSensor(Peripheral):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TiltSensor(Peripheral):
|
|
||||||
pass
|
|
||||||
|
@ -56,10 +56,12 @@ PORTS = {
|
|||||||
PORT_SOMETHING2: "UNK2",
|
PORT_SOMETHING2: "UNK2",
|
||||||
}
|
}
|
||||||
|
|
||||||
# NOTIFICATIONS
|
# PACKET TYPES
|
||||||
MSG_PORT_INFO = 0x04
|
MSG_PORT_INFO = 0x04
|
||||||
MSG_PORT_STATUS = 0x82
|
MSG_PORT_STATUS = 0x82
|
||||||
|
MSG_SET_PORT_VAL = 0x81
|
||||||
|
|
||||||
|
# NOTIFICATIONS
|
||||||
TYPE_DISTANCE_COLOR_SENSOR = 0x25
|
TYPE_DISTANCE_COLOR_SENSOR = 0x25
|
||||||
TYPE_IMOTOR = 0x26
|
TYPE_IMOTOR = 0x26
|
||||||
TYPE_MOTOR = 0x27
|
TYPE_MOTOR = 0x27
|
||||||
|
113
pylgbst/peripherals.py
Normal file
113
pylgbst/peripherals.py
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
import struct
|
||||||
|
import time
|
||||||
|
|
||||||
|
from pylgbst.constants import *
|
||||||
|
|
||||||
|
|
||||||
|
class Peripheral(object):
|
||||||
|
"""
|
||||||
|
:type parent: MoveHub
|
||||||
|
"""
|
||||||
|
PACKET_VER = b'\x01'
|
||||||
|
|
||||||
|
def __init__(self, parent, port):
|
||||||
|
super(Peripheral, self).__init__()
|
||||||
|
self.parent = parent
|
||||||
|
self.port = port
|
||||||
|
self.working = False
|
||||||
|
|
||||||
|
def _set_port_val(self, value):
|
||||||
|
cmd = self.PACKET_VER + chr(MSG_SET_PORT_VAL) + chr(self.port)
|
||||||
|
cmd += value
|
||||||
|
|
||||||
|
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(cmd)) + cmd) # should we +1 cmd len here?
|
||||||
|
|
||||||
|
def started(self):
|
||||||
|
self.working = True
|
||||||
|
|
||||||
|
def finished(self):
|
||||||
|
self.working = False
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port])
|
||||||
|
|
||||||
|
|
||||||
|
class LED(Peripheral):
|
||||||
|
def set_color(self, color):
|
||||||
|
if color not in COLORS:
|
||||||
|
raise ValueError("Color %s is not in list of available colors" % color)
|
||||||
|
|
||||||
|
cmd = '\x11\x51\x00' + chr(color)
|
||||||
|
self._set_port_val(cmd)
|
||||||
|
|
||||||
|
|
||||||
|
class EncodedMotor(Peripheral):
|
||||||
|
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
|
||||||
|
MOVEMENT_TYPE = b'\x11'
|
||||||
|
TIMED_SINGLE = b'\x09'
|
||||||
|
TIMED_GROUP = b'\x0A'
|
||||||
|
ANGLED_SINGLE = b'\x0B'
|
||||||
|
ANGLED_GROUP = b'\x0C'
|
||||||
|
|
||||||
|
def __init__(self, parent, port):
|
||||||
|
super(EncodedMotor, self).__init__(parent, port)
|
||||||
|
if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]:
|
||||||
|
raise ValueError("Invalid port for motor: %s" % port)
|
||||||
|
|
||||||
|
def _speed_abs(self, relative):
|
||||||
|
if relative < -1 or relative > 1:
|
||||||
|
raise ValueError("Invalid speed value: %s", relative)
|
||||||
|
|
||||||
|
absolute = round(relative * 100)
|
||||||
|
if absolute < 0:
|
||||||
|
absolute += 255
|
||||||
|
return int(absolute)
|
||||||
|
|
||||||
|
def _wrap_and_write(self, command, speed_primary, speed_secondary):
|
||||||
|
# set for port
|
||||||
|
command = self.MOVEMENT_TYPE + command
|
||||||
|
|
||||||
|
command += chr(self._speed_abs(speed_primary))
|
||||||
|
if self.port == PORT_AB:
|
||||||
|
command += chr(self._speed_abs(speed_secondary))
|
||||||
|
|
||||||
|
command += self.TRAILER
|
||||||
|
|
||||||
|
self._set_port_val(command)
|
||||||
|
|
||||||
|
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
||||||
|
if speed_secondary is None:
|
||||||
|
speed_secondary = speed_primary
|
||||||
|
|
||||||
|
# movement type
|
||||||
|
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
|
||||||
|
# time
|
||||||
|
msec = int(seconds * 1000)
|
||||||
|
if msec > 255 * 255:
|
||||||
|
raise ValueError("Too large value for seconds: %s", seconds)
|
||||||
|
command += struct.pack('<H', msec)
|
||||||
|
|
||||||
|
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||||
|
|
||||||
|
if not async:
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
def angled(self, angle, speed_primary=1, speed_secondary=None):
|
||||||
|
if speed_secondary is None:
|
||||||
|
speed_secondary = speed_primary
|
||||||
|
|
||||||
|
# movement type
|
||||||
|
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
|
||||||
|
# angle
|
||||||
|
command += struct.pack('<I', angle)
|
||||||
|
|
||||||
|
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||||
|
# TODO: how to tell when motor has stopped?
|
||||||
|
|
||||||
|
|
||||||
|
class ColorDistanceSensor(Peripheral):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TiltSensor(Peripheral):
|
||||||
|
pass
|
11
test.py
11
test.py
@ -59,6 +59,17 @@ class GeneralTest(unittest.TestCase):
|
|||||||
led.set_color(COLOR_RED)
|
led.set_color(COLOR_RED)
|
||||||
self.assertEquals("0701813211510009", conn.writes[1][1])
|
self.assertEquals("0701813211510009", conn.writes[1][1])
|
||||||
|
|
||||||
|
def test_tilt_sensor(self):
|
||||||
|
conn = ConnectionMock()
|
||||||
|
conn.notifications.append((14, '1b0e000f00043a0128000000000100000001'))
|
||||||
|
hub = MoveHub(conn)
|
||||||
|
|
||||||
|
def callback():
|
||||||
|
pass
|
||||||
|
|
||||||
|
hub.tilt_sensor.subscribe(callback)
|
||||||
|
self.assertEquals("0701813211510009", conn.writes[1][1])
|
||||||
|
|
||||||
def test_motor(self):
|
def test_motor(self):
|
||||||
conn = ConnectionMock()
|
conn = ConnectionMock()
|
||||||
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
|
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user