1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Compare commits

..

No commits in common. "master" and "1.0" have entirely different histories.
master ... 1.0

36 changed files with 252 additions and 1206 deletions

1
.gitignore vendored
View File

@ -4,4 +4,3 @@
build
*.avi
test_real.py
.vscode/settings.json

View File

@ -1,8 +1,14 @@
sudo: false
language: python
python:
- 3.6
- 3.8
virtualenv:
system_site_packages: true
matrix:
include:
- os: linux
python: 2.7
- os: linux
python: 3.4
addons:
apt:
@ -15,23 +21,17 @@ addons:
- libdbus-1-dev
- libdbus-glib-1-dev
- libgirepository-1.0-1
- libgirepository1.0-dev
- bluez
- python-dbus
- python-gi
- python3-dbus
- python3-gi
install:
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
- pip install codecov codacy-coverage pytest pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
- pip install --upgrade attrs
- pip install codecov nose-exclude gattlib pygatt gatt pexpect bluepy
env:
- READTHEDOCS=True
script:
- coverage run --omit="examples/*" --source=. -m pytest -v --ignore=examples --log-level=INFO tests
script: coverage run --source=. -m nose tests -v --exclude-dir=examples
after_success:
- coverage report -m
- coverage xml
- codecov
- python-codacy-coverage -r coverage.xml

View File

@ -1,10 +1,10 @@
# Python library to interact with Move Hub / PoweredUp Hubs
# Python library to interact with Move Hub
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/themes/boost)._
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/en-us/boost)._
In fact, Move Hub is just a Bluetooth hardware piece, and all manipulations with it are made by commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of the ways to issue these commands is to write Python program using this library.
In fact, Move Hub is just Bluetooth hardware, all manipulations are done with commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of ways to issue these commands is to write Python program using this library.
The best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
Best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
If you have Vernie assembled, you might run scripts from [`examples/vernie`](examples/vernie) directory.
@ -14,8 +14,6 @@ If you have Vernie assembled, you might run scripts from [`examples/vernie`](exa
[![Laser Engraver](http://img.youtube.com/vi/ZbKmqVBBMhM/0.jpg)](https://youtu.be/ZbKmqVBBMhM)
[![Color Sorter](http://img.youtube.com/vi/829RKT8v8M0/0.jpg)](https://youtu.be/829RKT8v8M0)
[![Face Tracker](http://img.youtube.com/vi/WUOa3j-6XfI/0.jpg)](https://youtu.be/WUOa3j-6XfI)
[![Color Pin Bot](http://img.youtube.com/vi/QY6nRYXQw_U/0.jpg)](https://youtu.be/QY6nRYXQw_U)
[![BB-8 Joystick](http://img.youtube.com/vi/55kE9I4IQSU/0.jpg)](https://youtu.be/55kE9I4IQSU)
## Features
@ -37,7 +35,7 @@ _Please note that this library requires one of Bluetooth backend libraries to be
Install library like this:
```bash
pip install -U pylgbst
pip install https://github.com/undera/pylgbst/archive/1.0.tar.gz
```
Then instantiate MoveHub object and start invoking its methods. Following is example to just print peripherals detected on Hub:
@ -55,13 +53,12 @@ Each peripheral kind has own methods to do actions and/or get sensor data. See [
## Bluetooth Backend Prerequisites
You have following options to install as Bluetooth backend (some of them might require `sudo` on Linux):
You have following options to install as Bluetooth backend:
- `pip install pygatt` - [pygatt](https://github.com/peplin/pygatt) lib, works on both Windows and Linux
- `pip install gatt` - [gatt](https://github.com/getsenic/gatt-python) lib, supports Linux, does not work on Windows
- `pip install gattlib` - [gattlib](https://bitbucket.org/OscarAcena/pygattlib) - supports Linux, does not work on Windows, requires `sudo`
- `pip install bluepy` - [bluepy](https://github.com/IanHarvey/bluepy) lib, supports Linux, including Raspbian, which allows connection to the hub from the Raspberry PI
- `pip install bleak` - [bleak](https://github.com/hbldh/bleak) lib, supports Linux/Windows/MacOS
Running on Windows requires [Bluegiga BLED112 Bluetooth Smart Dongle](https://www.silabs.com/products/wireless/bluetooth/bluetooth-low-energy-modules/bled112-bluetooth-smart-dongle) hardware piece, because no other hardware currently works on Windows with Python+BLE.
@ -70,25 +67,26 @@ _Please let author know if you have discovered any compatibility/preprequisite d
Depending on backend type, you might need Linux `sudo` to be used when running Python.
### Bluetooth Connection Options
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `get_connection_auto()` to attempt backend auto-detection
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
All the functions above have optional arguments to specify adapter name and MoveHub mac address. Please look function source code for details.
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst.hub import MoveHub
from pylgbst import get_connection_gatt
from pylgbst.comms.cgatt import GattConnection
conn = GattConnection("hci1")
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
hub = MoveHub(conn)
```
@ -119,4 +117,4 @@ Then push green button on MoveHub, so permanent BLE connection will be establish
- https://github.com/JorgePe/BOOSTreveng - initial source of protocol knowledge
- https://github.com/nathankellenicki/node-poweredup - JavaScript version of library
- https://github.com/spezifisch/sphero-python/blob/master/BB8joyDrive.py - example with another approach to bluetooth libs
- https://github.com/virantha/bricknil - for the lovers of async Python, alternative implementation of library to control PoweredUp Hubs

View File

@ -34,10 +34,3 @@ def callback(is_pressed):
hub = MoveHub()
hub.button.subscribe(callback)
```
The state for button has 3 possible values:
- `0` - not released
- `1` - pressed
- `2` - pressed
It is for now unknown why Hub always issues notification with `1` and immediately with `2`, after button is pressed.

View File

@ -1,40 +0,0 @@
### Advanced button
This example shows how you can add additional functionallity to the move hub button.
It adds three new actions which you can use instead of the standard subscription to a button press:
- Click - a single quick up/down press
- Double click - a double up/down press, second click must occur within .5 secs of first one
- Long press - a press and hold on the button for > .7 secs
```python
from pylgbst.hub import MoveHub
from advancedbutton import AdvancedButton
import time
hub = MoveHub()
b = AdvancedButton(hub)
def clicked():
print("button clicked")
def pressed():
print("button pressed")
def doubleclicked():
print("button double clicked")
b.click.subscribe(clicked)
b.double_click.subscribe(doubleclicked)
b.long_press.subscribe(pressed)
time.sleep(120)
```
You can alter the timings using the two constants `DOUBLE_CLICK_TIME` and `LONG_PRESS_TIME`

View File

@ -1,73 +0,0 @@
import time
import threading
DOUBLE_CLICK_TIME = 0.5
LONG_PRESS_TIME = 0.7
class AdvancedButton:
def __init__(self, hub):
self.state = 0
self.last_pressed = 0
self.press_time = None
self.hub = hub
self.hub.button.subscribe(self.pressed)
self.click = ButtonAction()
self.double_click = ButtonAction()
self.long_press = ButtonAction()
def pressed(self, state):
if state == 2:
return
press_time = time.time()
if state == 1:
self.state = 1
self.press_time = press_time
return
if state == 0 and self.state == 1:
self.state = 0
press_duration = press_time - self.press_time
else:
return
if press_duration > LONG_PRESS_TIME:
# long press
self.long_press.notify()
return
if (press_time - self.last_pressed) < DOUBLE_CLICK_TIME:
# double click
self.last_pressed = 0
self.double_click.notify()
return
# could be first of a double click, could be single click
self.last_pressed = press_time
def timeout():
time.sleep(DOUBLE_CLICK_TIME)
if self.last_pressed == press_time:
# not clicked while sleeping
# single click
self.click.notify()
threading.Thread(target=timeout).start()
class ButtonAction:
def __init__(self):
self.subscribers = set()
def subscribe(self, callback):
self.subscribers.add(callback)
def unsubscribe(self, callback=None):
if callback in self.subscribers:
self.subscribers.remove(callback)
def notify(self):
for subscriber in self.subscribers.copy():
subscriber()

View File

@ -2,13 +2,12 @@ import logging
import time
from collections import Counter
from pylgbst.hub import MoveHub, COLOR_NONE, COLOR_BLACK, COLORS, COLOR_CYAN, COLOR_BLUE, COLOR_RED, COLOR_YELLOW, \
COLOR_WHITE
from pylgbst.hub import MoveHub, COLOR_NONE, COLOR_BLACK, COLORS, COLOR_CYAN, COLOR_BLUE, COLOR_RED
from pylgbst.peripherals import EncodedMotor
class Automata(object):
BASE_SPEED = 1.0
BASE_SPEED = 0.5
def __init__(self):
super(Automata, self).__init__()
@ -32,48 +31,52 @@ class Automata(object):
def get_color(self):
res = self._sensor
self._sensor = []
logging.debug("Sensor data: %s", res)
logging.info("Sensor data: %s", res)
cnts = Counter([x[0] for x in res])
clr = cnts.most_common(1)[0][0] if cnts else COLOR_NONE
if clr == COLOR_CYAN:
clr = COLOR_BLUE
self.__hub.led.set_color(clr)
return clr
def left(self):
self.__hub.motor_AB.angled(290, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_FLOAT)
self.__hub.motor_A.angled(270, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_HOLD)
time.sleep(0.1)
self.__hub.motor_AB.stop()
self.__hub.motor_A.stop()
def right(self):
self.__hub.motor_AB.angled(270, -self.BASE_SPEED, self.BASE_SPEED, end_state=EncodedMotor.END_STATE_FLOAT)
self.__hub.motor_B.angled(-320, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_HOLD)
time.sleep(0.1)
self.__hub.motor_AB.stop()
self.__hub.motor_B.stop()
def forward(self):
self.__hub.motor_AB.angled(500, self.BASE_SPEED)
self.__hub.motor_AB.angled(450, self.BASE_SPEED)
def backward(self):
self.__hub.motor_AB.angled(500, -self.BASE_SPEED)
self.__hub.motor_AB.angled(-450, self.BASE_SPEED)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
bot = Automata()
color = None
bot.forward()
bot.right()
bot.forward()
bot.left()
bot.forward()
exit(0)
color = COLOR_NONE
cmds = []
while color != COLOR_NONE:
while color != COLOR_RED:
bot.feed_tape()
color = bot.get_color()
logging.warning(COLORS[color])
cmds.append(COLORS[color])
if color == COLOR_BLUE:
bot.forward()
elif color == COLOR_RED:
bot.backward()
elif color == COLOR_YELLOW:
bot.left()
elif color == COLOR_WHITE:
bot.right()
exp = ['BLUE', 'BLUE', 'BLUE', 'WHITE', 'BLUE', 'BLUE', 'WHITE', 'BLUE', 'WHITE', 'YELLOW', 'BLUE', 'BLUE', 'BLUE',
'BLUE', 'YELLOW', 'WHITE', 'RED']
logging.info("Exp: %s", exp)
logging.info("Act: %s", cmds)
assert exp == cmds

View File

@ -1,34 +0,0 @@
from examples.automata import Automata
from pylgbst.peripherals import *
# this program is written by Sofia in early 2019
def action_by_color(color):
if color == COLOR_BLUE:
bot.forward()
if color == COLOR_RED:
bot.backward()
if color == COLOR_WHITE:
bot.right()
if color == COLOR_YELLOW:
bot.left()
def read_color():
bot.feed_tape()
color = bot.get_color()
print(COLORS[color])
return color
bot = Automata()
number = 0
color = None
while color != COLOR_NONE:
color = read_color()
number = number + 1
action_by_color(color)
print(number)

View File

@ -1,57 +0,0 @@
import logging
import sys
import time
from examples.bb8joystick import joystick
from examples.bb8joystick.bb8 import BB8
from examples.bb8joystick.joystick import Joystick
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING)
bb8 = BB8()
joystick = Joystick()
def set_bb_color(flag):
if flag:
bb8.color(255, 255, 255)
else:
bb8.color(0, 0, 0)
def set_heading(angle):
a = int(angle) % 360
print("Angle", a)
bb8.heading(a)
def roll(speed, direction):
print("Roll", speed, direction)
if speed < 3:
speed = 0
bb8.roll(speed, direction)
def stop(state):
if state:
print("Stop")
bb8.roll(0, 0)
else:
print("Stabilize")
bb8.color(255, 0, 255)
bb8.stabilize()
bb8.color(0, 0, 0)
try:
joystick.on_button(set_bb_color)
joystick.on_button(stop)
joystick.on_rotation(set_heading)
joystick.on_joystick(roll)
print("All set up")
time.sleep(300)
finally:
bb8.disconnect()
joystick.disconnect()

View File

@ -1,113 +0,0 @@
import asyncio
import time
import spheropy
# noinspection PyProtectedMember
from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE, _DEVICE_ID_SPHERO
class BLEInterfaceGattool(spheropy.BleInterface):
def _find_adapter(self):
adapter = spheropy.pygatt.GATTToolBackend()
adapter.start()
adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT
self._adapter = adapter
self._adapter_type = adapter_type
return True
class _SpheroImproved(spheropy.Sphero):
async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False,
num_retry_attempts=1):
gattool = BLEInterfaceGattool(search_name)
return await
super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)
async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394
command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE,
command_id=0x22,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[(sleeptime >> 8), (sleeptime & 0xff), 0],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)
return await
self._send_command(command, response_timeout_in_seconds)
async def set_rotation_rate(self, rate, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py
command = _ClientCommandPacket(device_id=_DEVICE_ID_SPHERO,
command_id=0x03,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[rate],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)
return await
self._send_command(command, response_timeout_in_seconds)
class BB8(object):
def __init__(self, name="BB-CC13"):
self._heading = 0
# marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
print("Started to wake up BB-8...")
self._sphero = _SpheroImproved()
self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name))
self._loop.run_until_complete(self._sphero.set_stabilization(True))
self._loop.run_until_complete(self._sphero.set_rotation_rate(1))
self.color(0, 0xFF, 0)
self.stabilize()
print("BB-8 is ready for commands")
def disconnect(self):
print("BB8 enters sleep")
self._loop.run_until_complete(self._sphero.sleep(0))
self._sphero.disconnect()
def color(self, red, green, blue):
self._wait_loop()
self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue, wait_for_response=False))
def heading(self, heading):
self._wait_loop()
heading = 359 - heading
self._heading = heading
# self._loop.run_until_complete(self._sphero.set_heading(359 - heading))
self._loop.run_until_complete(self._sphero.roll(1, heading, spheropy.RollMode.IN_PLACE_ROTATE))
def roll(self, speed=10, direction=0):
self._wait_loop()
direction += self._heading
direction %= 360
speed = int(255 * speed / 10)
speed *= 0.75 # throttle down a bit
self._loop.run_until_complete(self._sphero.roll(int(speed), direction))
def stop(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.roll(0, 0))
def stabilize(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.self_level())
def _wait_loop(self):
while self._loop.is_running():
time.sleep(0.001)
if __name__ == '__main__':
bb8 = BB8()
bb8.color(255, 0, 0)
time.sleep(1)
bb8.color(0, 255, 0)
time.sleep(1)
bb8.color(0, 0, 255)
time.sleep(1)

View File

@ -1,145 +0,0 @@
import logging
import math
import sys
import time
from pylgbst.hub import MoveHub
def _clamp(minvalue, value, maxvalue):
return max(minvalue, min(value, maxvalue))
class Joystick(object):
RANGE_A = 40
RANGE_C = 30
def __init__(self):
super(Joystick, self).__init__()
self._on_joystick = set()
self.button_pressed = False
self._angle_A = 0
self.angle_B = 0
self._angle_C = 0
print("Starting search for Joystick...")
self._hub = MoveHub()
self._reset_sensors()
self._hub.button.subscribe(self._on_btn)
self._on_motor_a(self._on_a)
self.on_rotation(self._on_b)
self._on_motor_c(self._on_c)
print("Joystick is ready")
def disconnect(self):
print("Joystick disconnects")
self._hub.disconnect()
def _reset_sensors(self):
logging.info("Resetting motor encoders")
self._hub.motor_A.preset_encoder()
self._hub.motor_B.preset_encoder()
self._hub.motor_external.preset_encoder()
def on_button(self, callback):
"""
Notifies about button state change. ``callback(state)`` gets single bool parameter
"""
def wrapper(state):
if state in (0, 1):
callback(bool(state))
self._hub.button.subscribe(wrapper)
def _on_motor_a(self, callback):
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
angle = _clamp(-self.RANGE_A, angle, self.RANGE_A)
callback(angle)
self._hub.motor_A.subscribe(wrapper)
def on_rotation(self, callback):
"""
Notifies about B motor rotation. ``callback(state)`` gets single int parameter from 0 to 359
"""
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
val = angle % 360
val = val if val >= 0 else 360 - val - 1
val = 359 - val
callback(val)
self._hub.motor_B.subscribe(wrapper)
def _on_motor_c(self, callback):
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
angle = _clamp(-self.RANGE_C, angle, self.RANGE_C)
callback(angle)
self._hub.motor_external.subscribe(wrapper)
def _on_btn(self, state):
self.button_pressed = bool(state)
def _on_a(self, angle):
logging.debug("A rotated: %s", angle)
self._angle_A = angle
self._calc_joystick()
def _on_b(self, angle):
logging.debug("B rotated: %s", angle)
self.angle_B = angle
def _on_c(self, angle):
logging.debug("C rotated: %s", angle)
self._angle_C = angle
self._calc_joystick()
def on_joystick(self, callback):
"""
Notifies about joystick change. ``callback(speed, direction)`` gets parameters:
- ``speed`` - int value from 0 to 10
- ``direction`` - int value from 0 to 359
"""
self._on_joystick.add(callback)
def _calc_joystick(self):
norm_a = self._angle_A / self.RANGE_A
norm_b = self._angle_C / self.RANGE_C
logging.debug("%s / %s", self._angle_A, self._angle_C)
logging.debug("%s / %s", norm_a, norm_b)
speed = math.sqrt(norm_a ** 2 + norm_b ** 2) # / math.sqrt(2)
speed = _clamp(-1.0, speed, 1.0)
maxsize = sys.maxsize if norm_a >= 0 else -sys.maxsize
direction = math.atan(norm_a / norm_b if norm_b else maxsize)
direction *= 180 / math.pi
if norm_a >= 0 and norm_b >= 0:
direction = 90 - direction
elif norm_a < 0 and norm_b >= 0:
direction = 90 - direction
elif norm_a < 0 and norm_b < 0:
direction = 270 - direction
else:
direction = 270 - direction
for callback in self._on_joystick:
callback(int(round(10 * speed)), int(direction))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
stick = Joystick()
stick.on_button(lambda x: logging.info("Button: %s" % x))
stick.on_rotation(lambda x: logging.info("Rotation: %s" % x))
stick.on_joystick(lambda speed, head: logging.info("Speed: %s, Direction: %s" % (speed, head)))
time.sleep(100)

View File

@ -1,18 +0,0 @@
import time
from examples.bb8joystick import BB8, Joystick
bb8 = BB8()
def button(p):
print(p)
if p == True:
bb8.color(255, 255, 255)
else:
bb8.color(0, 0, 0)
deck = Joystick()
deck.on_button(button)
time.sleep(60)

View File

@ -3,7 +3,8 @@ import time
from time import sleep
from pylgbst import *
from pylgbst.hub import MoveHub
from pylgbst.comms import DebugServerConnection
from pylgbst.hub import MoveHub, math
from pylgbst.peripherals import EncodedMotor, TiltSensor, Current, Voltage, COLORS, COLOR_BLACK
log = logging.getLogger("demo")
@ -12,12 +13,7 @@ log = logging.getLogger("demo")
def demo_led_colors(movehub):
# LED colors demo
log.info("LED colors demo")
# We get a response with payload and port, not x and y here...
def colour_callback(named):
log.info("LED Color callback: %s", named)
movehub.led.subscribe(colour_callback)
movehub.color_distance_sensor.subscribe(lambda x, y: None)
for color in list(COLORS.keys())[1:] + [COLOR_BLACK]:
log.info("Setting LED color to: %s", COLORS[color])
movehub.led.set_color(color)
@ -115,11 +111,11 @@ def demo_color_sensor(movehub):
demo_color_sensor.cnt += 1
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, COLORS[color], distance)
movehub.vision_sensor.subscribe(callback)
movehub.color_distance_sensor.subscribe(callback)
while demo_color_sensor.cnt < limit:
time.sleep(1)
movehub.vision_sensor.unsubscribe(callback)
movehub.color_distance_sensor.unsubscribe(callback)
def demo_motor_sensors(movehub):
@ -152,6 +148,7 @@ def demo_motor_sensors(movehub):
movehub.motor_B.unsubscribe(callback_b)
if movehub.motor_external is not None:
demo_motor_sensors.states[movehub.motor_external] = None
movehub.motor_external.unsubscribe(callback_e)
@ -173,92 +170,20 @@ def demo_voltage(movehub):
def demo_all(movehub):
demo_voltage(movehub)
demo_led_colors(movehub)
demo_motors_timed(movehub)
demo_motors_angled(movehub)
demo_port_cd_motor(movehub)
demo_led_colors(movehub)
demo_tilt_sensor_simple(movehub)
demo_tilt_sensor_precise(movehub)
demo_color_sensor(movehub)
demo_motor_sensors(movehub)
demo_voltage(movehub)
DEMO_CHOICES = {
'all': demo_all,
'voltage': demo_voltage,
'led_colors': demo_led_colors,
'motors_timed': demo_motors_timed,
'motors_angled': demo_motors_angled,
'port_cd_motor': demo_port_cd_motor,
'tilt_sensor': demo_tilt_sensor_simple,
'tilt_sensor_precise': demo_tilt_sensor_precise,
'color_sensor': demo_color_sensor,
'motor_sensors': demo_motor_sensors,
}
def get_options():
import argparse
arg_parser = argparse.ArgumentParser(
description='Demonstrate move-hub communications',
)
arg_parser.add_argument(
'-c', '--connection',
default='auto://',
help='''Specify connection URL to use, `protocol://mac?param=X` with protocol in:
"gatt","pygatt","gattlib","gattool", "bluepy","bluegiga"'''
)
arg_parser.add_argument(
'-d', '--demo',
default='all',
choices=sorted(DEMO_CHOICES.keys()),
help="Run a particular demo, default all"
)
return arg_parser
def connection_from_url(url):
import pylgbst
if url == 'auto://':
return None
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
parsed = urlparse(url)
name = 'get_connection_%s' % parsed.scheme
factory = getattr(pylgbst, name, None)
if not factory:
msg = "Unrecognised URL scheme/protocol, expect a get_connection_<protocol> in pylgbst: %s"
raise ValueError(msg % parsed.protocol)
params = {}
if parsed.netloc.strip():
params['hub_mac'] = parsed.netloc
for key, value in parse_qs(parsed.query).items():
if len(value) == 1:
params[key] = value[0]
else:
params[key] = value
return factory(
**params
)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
parameters['connection'] = connection
except ValueError as err:
parser.error(err.args[0])
logging.basicConfig(level=logging.INFO)
hub = MoveHub(**parameters)
try:
demo = DEMO_CHOICES[options.demo]
demo(hub)
finally:
hub.disconnect()
hub = MoveHub()
demo_all(hub)
hub.disconnect()

View File

@ -1,12 +1,4 @@
[
[
125,
64,
64
],
[
145,
255,
250
]
[125, 64, 64],
[145, 255, 250]
]

View File

@ -85,7 +85,7 @@ try:
sa = round(c + b / divider, 1)
sb = round(c - b / divider, 1)
logging.info("SpeedA=%s, SpeedB=%s", sa, sb)
robot.motor_AB.start_speed(sa, sb)
robot.motor_AB.start_power(sa, sb)
# time.sleep(0.5)
finally:
robot.motor_AB.stop()

View File

@ -1,7 +1,5 @@
import logging
from examples.vernie import Vernie
from pylgbst.peripherals import VisionSensor
from . import *
logging.basicConfig(level=logging.INFO)
@ -12,8 +10,7 @@ criterion = min
cur_luminosity = 0
def on_change_lum(lumn, unknown):
del unknown
def on_change_lum(lumn):
global cur_luminosity
cur_luminosity = lumn

View File

@ -1,5 +1,5 @@
from pylgbst.peripherals import COLOR_GREEN, COLOR_NONE
from vernie import *
from pylgbst.constants import COLOR_GREEN, COLOR_NONE
from . import *
robot = Vernie()
running = True
@ -11,7 +11,7 @@ def callback(color, distance):
secs = (10 - distance + 1) / 10.0
print("Distance is %.1f inches, I'm running back with %s%% speed!" % (distance, int(speed * 100)))
if speed <= 1:
robot.motor_AB.timed(secs / 1, -speed)
robot.motor_AB.timed(secs / 1, -speed, is_async=True)
robot.say("Ouch")
@ -31,3 +31,6 @@ while running:
robot.vision_sensor.unsubscribe(callback)
robot.button.unsubscribe(on_btn)
robot.led.set_color(COLOR_NONE)
while robot.led.in_progress():
time.sleep(1)

View File

@ -6,57 +6,42 @@ from pylgbst.comms import DebugServer
log = logging.getLogger('pylgbst')
def get_connection_bluegiga(controller=None, hub_mac=None, hub_name=None):
def get_connection_bluegiga(controller=None, hub_mac=None):
del controller # to prevent code analysis warning
from pylgbst.comms.cpygatt import BlueGigaConnection
return BlueGigaConnection().connect(hub_mac, hub_name)
return BlueGigaConnection().connect(hub_mac)
def get_connection_gattool(controller='hci0', hub_mac=None, hub_name=None):
def get_connection_gattool(controller='hci0', hub_mac=None):
from pylgbst.comms.cpygatt import GattoolConnection
return GattoolConnection(controller).connect(hub_mac, hub_name)
return GattoolConnection(controller).connect(hub_mac)
def get_connection_gatt(controller='hci0', hub_mac=None, hub_name=None):
def get_connection_gatt(controller='hci0', hub_mac=None):
from pylgbst.comms.cgatt import GattConnection
return GattConnection(controller).connect(hub_mac, hub_name)
return GattConnection(controller).connect(hub_mac)
def get_connection_gattlib(controller='hci0', hub_mac=None, hub_name=None):
def get_connection_gattlib(controller='hci0', hub_mac=None):
from pylgbst.comms.cgattlib import GattLibConnection
return GattLibConnection(controller).connect(hub_mac, hub_name)
return GattLibConnection(controller).connect(hub_mac)
def get_connection_bluepy(controller='hci0', hub_mac=None, hub_name=None):
def get_connection_bluepy(controller='hci0', hub_mac=None):
from pylgbst.comms.cbluepy import BluepyConnection
return BluepyConnection(controller).connect(hub_mac, hub_name)
return BluepyConnection(controller).connect(hub_mac)
def get_connection_bleak(controller='hci0', hub_mac=None, hub_name=None):
"""
Return connection based with Bleak API as an endpoint.
:param controller: Not used, kept for compatibility with others.
:param hub_mac: Optional Lego HUB MAC to connect to.
:return: Driver object.
"""
del controller # to prevent code analysis warning
from pylgbst.comms.cbleak import BleakDriver
return BleakDriver(hub_mac, hub_name)
def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
def get_connection_auto(controller='hci0', hub_mac=None):
fns = [
get_connection_bluepy,
get_connection_bluegiga,
get_connection_gatt,
get_connection_bleak,
get_connection_gattool,
get_connection_gattlib,
]
@ -65,7 +50,7 @@ def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
for fn in fns:
try:
logging.info("Trying %s", fn.__name__)
return fn(controller, hub_mac, hub_name)
return fn(controller, hub_mac)
except KeyboardInterrupt:
raise
except BaseException:

View File

@ -15,6 +15,7 @@ from pylgbst.utilities import str2hex
log = logging.getLogger('comms')
LEGO_MOVE_HUB = "LEGO Move Hub"
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
@ -45,22 +46,6 @@ class Connection(object):
def enable_notifications(self):
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
def _is_device_matched(self, address, dev_name, hub_mac, find_name):
assert hub_mac or find_name, 'You have to provide either hub_mac or hub_name in connection options'
log.debug("Checking device: %s, MAC: %s", dev_name, address)
matched = False
if address != "00:00:00:00:00:00":
if hub_mac:
if hub_mac.lower() == address.lower():
matched = True
elif dev_name == find_name:
matched = True
if matched:
log.info("Found %s at %s", dev_name, address)
return matched
class DebugServer(object):
"""

View File

@ -1,216 +0,0 @@
import asyncio
import logging
import queue
import threading
import time
from bleak import BleakClient, discover
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
log = logging.getLogger('comms-bleak')
# Queues to handle request / responses. Acts as a buffer between API and async BLE driver
resp_queue = queue.Queue()
req_queue = queue.Queue()
class BleakDriver(object):
"""Driver that provides interface between API and Bleak."""
def __init__(self, hub_mac=None, hub_name=None):
"""
Initialize new object of Bleak Driver class.
:param hub_mac: Optional Lego HUB MAC to connect to.
"""
self.hub_mac = hub_mac
self.hub_name = hub_name
self._handler = None
self._abort = False
self._connection_thread = None
self._processing_thread = None
def set_notify_handler(self, handler):
"""
Set handler function used to communicate with an API.
:param handler: Handler function called by driver when received data
:return: None
"""
self._handler = handler
def enable_notifications(self):
"""
Enable notifications, in our cases starts communication threads.
We cannot do this earlier, because API need to fist set notification handler.
:return: None
"""
self._connection_thread = threading.Thread(target=lambda: asyncio.run(self._bleak_thread()))
self._connection_thread.daemon = True
self._connection_thread.start()
self._processing_thread = threading.Thread(target=self._processing)
self._processing_thread.daemon = True
self._processing_thread.start()
async def _bleak_thread(self):
bleak = BleakConnection()
await bleak.connect(self.hub_mac, self.hub_name)
await bleak.set_notify_handler(self._safe_handler)
# After connecting, need to send any data or hub will drop the connection,
# below command is Advertising name request update
await bleak.write_char(MOVE_HUB_HW_UUID_CHAR, bytearray([0x05, 0x00, 0x01, 0x01, 0x05]))
while not self._abort:
await asyncio.sleep(0.1)
if req_queue.qsize() != 0:
data = req_queue.get()
await bleak.write(data[0], data[1])
logging.info("Communications thread has exited")
@staticmethod
def _safe_handler(handler, data):
resp_queue.put((handler, data))
def _processing(self):
while not self._abort:
if resp_queue.qsize() != 0:
msg = resp_queue.get()
self._handler(msg[0], bytes(msg[1]))
time.sleep(0.01)
logging.info("Processing thread has exited")
def write(self, handle, data):
"""
Send data to given handle number.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:raises ConnectionError" When internal threads are not working
:return: None
"""
if not self._connection_thread.is_alive() or not self._processing_thread.is_alive():
raise ConnectionError('Something went wrong, communication threads not functioning.')
req_queue.put((handle, data))
def disconnect(self):
"""
Disconnect and stops communication threads.
:return: None
"""
self._abort = True
def is_alive(self):
"""
Indicate whether driver is functioning or not.
:return: True if driver is functioning; False otherwise.
"""
if self._connection_thread is not None and self._processing_thread is not None:
return self._connection_thread.is_alive() and self._processing_thread.is_alive()
else:
return False
class BleakConnection(Connection):
"""Bleak driver for communicating with BLE device."""
def __init__(self):
"""Initialize new instance of BleakConnection class."""
Connection.__init__(self)
self.loop = asyncio.get_event_loop()
self._device = None
self._client = None
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.WARNING)
logging.getLogger('bleak.backends.bluezdbus.client').setLevel(logging.WARNING)
async def connect(self, hub_mac=None, hub_name=None):
"""
Connect to device.
:param hub_mac: Optional Lego HUB MAC to connect to.
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
:return: None
"""
log.info("Discovering devices... Press green button on Hub")
for i in range(0, 30):
devices = await discover(timeout=1)
log.debug("Devices: %s", devices)
for dev in devices:
log.debug(dev)
address = dev.address
name = dev.name
if self._is_device_matched(address, name, hub_mac, hub_name):
log.info('Device matched: %r', dev)
self._device = dev
break
else:
continue
break
else:
raise ConnectionError('Device not found.')
self._client = BleakClient(self._device.address, self.loop)
status = await self._client.connect()
log.debug('Connection status: {status}'.format(status=status))
async def write(self, handle, data):
"""
Send data to given handle number.
If handle cannot be found in service description, hardcoded LEGO uuid will be used.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:return: None
"""
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
desc = self._client.services.get_descriptor(handle)
if not isinstance(data, bytearray):
data = bytearray(data)
if desc is None:
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
else:
await self._client.write_gatt_char(desc.characteristic_uuid, data)
async def write_char(self, characteristic_uuid, data):
"""
Send data to given handle number.
:param characteristic_uuid: Characteristic uuid used to send data
:param data: data to send
:return: None
"""
await self._client.write_gatt_char(characteristic_uuid, data)
async def set_notify_handler(self, handler):
"""
Set notification handler.
:param handler: Handle function to be called when receive any data.
:return: None
"""
def c(handle, data):
log.debug('Response: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
handler(handle, data)
await self._client.start_notify(MOVE_HUB_HW_UUID_CHAR, c)
def is_alive(self):
"""
To keep compatibility with the driver interface.
This method does nothing.
:return: None.
"""
pass

View File

@ -4,7 +4,7 @@ from threading import Thread, Event
from bluepy import btle
from pylgbst.comms import Connection
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.utilities import str2hex, queue
log = logging.getLogger('comms-bluepy')
@ -70,6 +70,7 @@ class BluepyThreadedPeripheral(object):
finally:
self._peripheral.disconnect()
def write(self, handle, data):
self._call_queue.put(lambda: self._peripheral.writeCharacteristic(handle, data))
@ -87,7 +88,7 @@ class BluepyConnection(Connection):
self._peripheral = None # :type BluepyThreadedPeripheral
self._controller = controller
def connect(self, hub_mac=None, hub_name=None):
def connect(self, hub_mac=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
scanner = btle.Scanner()
@ -98,11 +99,13 @@ class BluepyConnection(Connection):
for dev in devices:
address = dev.addr
address_type = dev.addrType
addressType = dev.addrType
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
log.debug("Found dev, name: {}, address: {}".format(name, address))
if self._is_device_matched(address, name, hub_mac, hub_name):
self._peripheral = BluepyThreadedPeripheral(address, address_type, self._controller)
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._peripheral = BluepyThreadedPeripheral(address, addressType, self._controller)
break
return self
@ -119,3 +122,4 @@ class BluepyConnection(Connection):
def is_alive(self):
return True

View File

@ -5,7 +5,7 @@ from time import sleep
import gatt
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
MOVE_HUB_HARDWARE_HANDLE
from pylgbst.utilities import str2hex
@ -88,7 +88,7 @@ class GattConnection(Connection):
self._manager_thread.setDaemon(True)
log.debug('Starting DeviceManager...')
def connect(self, hub_mac=None, hub_name=None):
def connect(self, hub_mac=None):
self._manager_thread.start()
self._manager.start_discovery()
@ -100,7 +100,8 @@ class GattConnection(Connection):
for dev in devices:
address = dev.mac_address
name = dev.alias()
if self._is_device_matched(address, name, hub_mac, hub_name):
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._device = CustomDevice(address, self._manager)
break

View File

@ -5,7 +5,7 @@ from threading import Thread
from gattlib import DiscoveryService, GATTRequester
from pylgbst.comms import Connection
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.utilities import queue, str2hex
log = logging.getLogger('comms-gattlib')
@ -61,7 +61,7 @@ class GattLibConnection(Connection):
self.requester = None
self._iface = bt_iface_name
def connect(self, hub_mac=None, hub_name=None):
def connect(self, hub_mac=None):
service = DiscoveryService(self._iface)
while not self.requester:
@ -70,7 +70,8 @@ class GattLibConnection(Connection):
log.debug("Devices: %s", devices)
for address, name in devices.items():
if self._is_device_matched(address, name, hub_mac, hub_name):
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self.requester = Requester(address, True, self._iface)
break

View File

@ -2,7 +2,7 @@ import logging
import pygatt
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_CHAR
from pylgbst.utilities import str2hex
log = logging.getLogger('comms-pygatt')
@ -20,21 +20,21 @@ class GattoolConnection(Connection):
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
self._conn_hnd = None
def connect(self, hub_mac=None, hub_name=None):
def connect(self, hub_mac=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
adapter = self.backend()
adapter.start() # enable or disable restart? What's the best?
adapter.start()
while not self._conn_hnd:
log.info("Discovering devices...")
devices = adapter.scan(1)
log.debug("Devices: %s", devices)
# Pass each device found to _is_device_matched( ) to see if it the device we want
for dev in devices:
address = dev['address']
name = dev['name']
if self._is_device_matched(address, name, hub_mac, hub_name):
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._conn_hnd = adapter.connect(address)
break

View File

@ -4,8 +4,8 @@ import time
from pylgbst import get_connection_auto
from pylgbst.messages import *
from pylgbst.peripherals import *
from pylgbst.utilities import queue
from pylgbst.utilities import str2hex, usbyte, ushort
from pylgbst.utilities import queue
log = logging.getLogger('hub')
@ -43,7 +43,7 @@ class Hub(object):
self.add_message_handler(MsgHubAction, self._handle_action)
if not connection:
connection = get_connection_auto() # TODO: how to identify the hub?
connection = get_connection_auto()
self.connection = connection
self.connection.set_notify_handler(self._notify)
self.connection.enable_notifications()
@ -61,21 +61,19 @@ class Hub(object):
:rtype: pylgbst.messages.UpstreamMsg
"""
log.debug("Send message: %r", msg)
msgbytes = msg.bytes()
self.connection.write(self.HUB_HARDWARE_HANDLE, msg.bytes())
if msg.needs_reply:
with self._sync_lock:
assert not self._sync_request, "Pending request %r while trying to put %r" % (self._sync_request, msg)
self._sync_request = msg
log.debug("Waiting for sync reply to %r...", msg)
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
resp = self._sync_replies.get()
log.debug("Fetched sync reply: %r", resp)
if isinstance(resp, MsgGenericError):
raise RuntimeError(resp.message())
return resp
else:
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
return None
def _notify(self, handle, data):
@ -137,7 +135,7 @@ class Hub(object):
if dev_type in PERIPHERAL_TYPES:
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
else:
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
self.peripherals[port] = Peripheral(self, port)
log.info("Attached peripheral: %s", self.peripherals[msg.port])
@ -184,25 +182,19 @@ class MoveHub(Hub):
:type motor_external: EncodedMotor
"""
DEFAULT_NAME = "LEGO Move Hub"
# PORTS
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_D = 0x03
PORT_AB = 0x10
PORT_C = 0x01
PORT_D = 0x02
PORT_LED = 0x32
PORT_A = 0x37
PORT_B = 0x38
PORT_AB = 0x39
PORT_TILT_SENSOR = 0x3A
PORT_CURRENT = 0x3B
PORT_VOLTAGE = 0x3C
# noinspection PyTypeChecker
def __init__(self, connection=None):
self._comm_lock = threading.RLock()
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(MoveHub, self).__init__(connection)
self.info = {}
@ -227,7 +219,7 @@ class MoveHub(Hub):
if not get_dev_set:
get_dev_set = lambda: (self.motor_A, self.motor_B, self.motor_AB, self.led, self.tilt_sensor,
self.current, self.voltage)
for num in range(0, 100):
for num in range(0, 60):
devices = get_dev_set()
if all(devices):
log.debug("All devices are present: %s", devices)
@ -253,40 +245,29 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def _handle_device_change(self, msg):
with self._comm_lock:
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor \
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
class TrainHub(Hub):
DEFAULT_NAME = 'TrainHub'
def __init__(self, connection=None):
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(TrainHub, self).__init__(connection)
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]

View File

@ -18,15 +18,14 @@ class Message(object):
see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header
"""
msglen = len(self.payload) + 3
assert msglen < 127, "TODO: handle longer messages with 2-byte len"
assert msglen < 127, "TODO: handle logner messages with 2-byte len"
return pack("<B", msglen) + pack("<B", self.hub_id) + pack("<B", self.TYPE) + self.payload
def __repr__(self):
# assert self.bytes() # to trigger any field changes
data = self.__dict__
data = {x: (str2hex(y) if isinstance(y, bytes) else y)
for x, y in data.items()
if x not in ('hub_id',)}
if x not in ('hub_id', 'needs_reply')}
return self.__class__.__name__ + "(%s)" % data
@ -59,7 +58,7 @@ class UpstreamMsg(Message):
assert hub_id == 0
msg_type = msg._byte()
assert cls.TYPE == msg_type, "Message type does not match: %x!=%x" % (cls.TYPE, msg_type)
assert isinstance(msg.payload, (bytes, bytearray))
assert isinstance(msg.payload, bytes)
return msg
def __shift(self, vtype, vlen):
@ -126,7 +125,7 @@ class MsgHubProperties(DownstreamMsg, UpstreamMsg):
self.parameters = parameters
def bytes(self):
if self.operation in (self.UPD_REQUEST, self.UPD_ENABLE):
if self.operation == self.UPD_REQUEST:
self.needs_reply = True
self.payload = pack("<B", self.property) + pack("<B", self.operation) + self.parameters
return super(MsgHubProperties, self).bytes()
@ -173,8 +172,7 @@ class MsgHubAction(DownstreamMsg, UpstreamMsg):
return super(MsgHubAction, self).bytes()
def is_reply(self, msg):
if not isinstance(msg, MsgHubAction):
raise TypeError("Unexpected message type: %s" % (msg.__class__,))
assert isinstance(msg, MsgHubAction)
if self.action == self.DISCONNECT and msg.action == self.UPSTREAM_DISCONNECT:
return True

View File

@ -19,7 +19,7 @@ COLOR_LIGHTBLUE = 0x04
COLOR_CYAN = 0x05
COLOR_GREEN = 0x06
COLOR_YELLOW = 0x07
COLOR_ORANGE = 0x08
COLOR_ORANGE = 0x09
COLOR_RED = 0x09
COLOR_WHITE = 0x0a
COLOR_NONE = 0xFF
@ -126,7 +126,7 @@ class Peripheral(object):
self.set_port_mode(self._port_mode.mode, False)
def _notify_subscribers(self, *args, **kwargs):
for subscriber in self._subscribers.copy():
for subscriber in self._subscribers:
subscriber(*args, **kwargs)
return args
@ -232,16 +232,10 @@ class LEDRGB(Peripheral):
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, payload)
self._send_output(msg)
def _decode_port_data(self, msg):
if len(msg.payload) == 3:
return usbyte(msg.payload, 0), usbyte(msg.payload, 1), usbyte(msg.payload, 2),
else:
return usbyte(msg.payload, 0),
class Motor(Peripheral):
SUBCMD_START_POWER = 0x01
SUBCMD_START_POWER_GROUPED = 0x02
# SUBCMD_START_POWER = 0x02
SUBCMD_SET_ACC_TIME = 0x05
SUBCMD_SET_DEC_TIME = 0x06
SUBCMD_START_SPEED = 0x07
@ -254,10 +248,10 @@ class Motor(Peripheral):
END_STATE_FLOAT = 0
def _speed_abs(self, relative):
if relative == Motor.END_STATE_BRAKE or relative == Motor.END_STATE_HOLD:
if relative is None:
# special value for BRAKE
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
return relative
return 127
if relative < -1:
log.warning("Speed cannot be less than -1")
@ -271,6 +265,9 @@ class Motor(Peripheral):
return int(absolute)
def _write_direct_mode(self, subcmd, params):
if self.virtual_ports:
subcmd += 1 # de-facto rule
params = pack("<B", subcmd) + params
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
self._send_output(msg)
@ -282,27 +279,22 @@ class Motor(Peripheral):
msg = MsgPortOutput(self.port, subcmd, params)
self._send_output(msg)
def start_power(self, power_primary=1.0, power_secondary=None):
def start_power(self, speed_primary=1.0, speed_secondary=None):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
"""
if power_secondary is None:
power_secondary = power_primary
if self.virtual_ports:
cmd = self.SUBCMD_START_POWER_GROUPED - 1 # because _send_cmd will do +1
else:
cmd = self.SUBCMD_START_POWER
if speed_secondary is None:
speed_secondary = speed_primary
params = b""
params += pack("<b", self._speed_abs(power_primary))
params += pack("<b", self._speed_abs(speed_primary))
if self.virtual_ports:
params += pack("<b", self._speed_abs(power_secondary))
params += pack("<b", self._speed_abs(speed_secondary))
self._send_cmd(cmd, params)
self._write_direct_mode(self.SUBCMD_START_POWER, params)
def stop(self):
self.timed(0)
self.start_speed(0)
def set_acc_profile(self, seconds, profile_no=0x00):
"""
@ -405,20 +397,20 @@ class EncodedMotor(Motor):
def goto_position(self, degrees_primary, degrees_secondary=None, speed=1.0, max_power=1.0,
end_state=Motor.END_STATE_BRAKE, use_profile=0b11):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-gotoabsoluteposition-abspos-speed-maxpower-endstate-useprofile-0x0d
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startspeedfordegrees-degrees-speed-maxpower-endstate-useprofile-0x0b
"""
if degrees_secondary is None:
degrees_secondary = degrees_primary
params = b""
params += pack("<i", degrees_primary)
params += pack("<I", degrees_primary)
if self.virtual_ports:
params += pack("<i", degrees_secondary)
params += pack("<I", degrees_secondary)
params += pack("<b", self._speed_abs(speed))
params += pack("<B", int(100 * max_power))
params += pack("<B", end_state)
params += pack("<B", int(100 * max_power))
params += pack("<B", use_profile)
self._send_cmd(self.SUBCMD_GOTO_ABSOLUTE_POSITION, params)
@ -428,8 +420,11 @@ class EncodedMotor(Motor):
if self._port_mode.mode == self.SENSOR_ANGLE:
angle = unpack("<l", data[0:4])[0]
return (angle,)
elif self._port_mode.mode == self.SENSOR_SOMETHING1:
smth = usbyte(data, 0)
return (smth,)
elif self._port_mode.mode == self.SENSOR_SPEED:
speed = unpack("<b", data[0:1])[0]
speed = unpack("<b", data[0])[0]
return (speed,)
else:
log.debug("Got motor sensor data while in unexpected mode: %r", self._port_mode)
@ -588,7 +583,7 @@ class VisionSensor(Peripheral):
elif self._port_mode.mode == self.CALIBRATE:
return [ushort(data, x * 2) for x in range(8)]
else:
log.debug("Unhandled VisionSensor data in mode %s: %s", self._port_mode.mode, str2hex(data))
log.debug("Unhandled data in mode %s: %s", self._port_mode.mode, str2hex(data))
return ()
def set_color(self, color):
@ -668,4 +663,4 @@ class Button(Peripheral):
:type msg: MsgHubProperties
"""
if msg.property == MsgHubProperties.BUTTON and msg.operation == MsgHubProperties.UPSTREAM_UPDATE:
self._notify_subscribers(usbyte(msg.parameters, 0))
self._notify_subscribers(bool(usbyte(msg.parameters, 0)))

View File

@ -7,8 +7,6 @@ import logging
import sys
from struct import unpack
log = logging.getLogger(__name__)
if sys.version_info[0] == 2:
import Queue as queue
else:
@ -17,29 +15,22 @@ else:
queue = queue # just to use it
def check_unpack(seq, index, pattern, size):
"""Check that we got size bytes, if so, unpack using pattern"""
data = seq[index: index + size]
assert len(data) == size, "Unexpected data len %d, expected %d" % (len(data), size)
return unpack(pattern, data)[0]
def usbyte(seq, index):
return check_unpack(seq, index, "<B", 1)
return unpack("<B", seq[index:index + 1])[0]
def ushort(seq, index):
return check_unpack(seq, index, "<H", 2)
return unpack("<H", seq[index:index + 2])[0]
def usint(seq, index):
return check_unpack(seq, index, "<I", 4)
return unpack("<I", seq[index:index + 4])[0]
def str2hex(data): # we need it for python 2+3 compatibility
# if sys.version_info[0] == 3:
# data = bytes(data, 'ascii')
if not isinstance(data, (bytes, bytearray)):
data = bytes(data, "ascii")
if not isinstance(data, bytes):
data = bytes(data, 'ascii')
hexed = binascii.hexlify(data)
return hexed

View File

@ -1,2 +0,0 @@
[metadata]
description-file = README.md

View File

@ -1,24 +1,16 @@
from setuptools import setup
from distutils.core import setup
setup(
name="pylgbst",
version="1.2.0",
author="Andrey Pokhilko",
author_email="apc4@ya.ru",
license="MIT",
description="Python library to interact with LEGO PoweredUp devices (Lego BOOST etc.)",
url='https://github.com/undera/pylgbst',
keywords=['LEGO', 'ROBOTICS', 'BLUETOOTH'],
packages=["pylgbst", "pylgbst.comms"],
requires=[],
extras_require={
# Note that dbus and gi are normally system packages
"gatt": ["gatt", "dbus", "gi"],
"gattlib": ["gattlib"],
"pygatt": ["pygatt", "pexpect"],
"bluepy": ["bluepy"],
"bleak": ["bleak"],
},
)
setup(name='pylgbst',
description='Python library to interact with LEGO Move Hub (from Lego BOOST set)',
version='1.0',
author='Andrey Pokhilko',
author_email='apc4@ya.ru',
packages=['pylgbst', "pylgbst.comms"],
requires=[],
extras_require={
'gatt': ["gatt"],
'gattlib': ["gattlib"],
'pygatt': ["pygatt"],
'bluepy': ["bluepy"],
}
)

View File

@ -1,4 +1,3 @@
import sys
import time
from binascii import unhexlify
@ -6,7 +5,7 @@ from pylgbst.comms import Connection
from pylgbst.hub import MoveHub, Hub
from pylgbst.peripherals import *
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.INFO)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')

View File

@ -622,6 +622,7 @@
]
]
},
"LEDRGB on port 0x32": {
"mode_count": 2,
"input_modes": [],
@ -693,6 +694,7 @@
"can_input": false
}
},
"ColorDistanceSensor on port 0x1": {
"mode_count": 11,
"input_modes": [
@ -1008,6 +1010,7 @@
]
]
},
"TiltSensor on port 0x3a": {
"mode_count": 8,
"input_modes": [
@ -1263,6 +1266,7 @@
]
]
},
"Current on port 0x3b": {
"mode_count": 2,
"input_modes": [

View File

@ -1,61 +0,0 @@
import sys
import time
import unittest
import bleak
from packaging import version
import pylgbst
import pylgbst.comms.cbleak as cbleak
bleak.BleakClient = object()
bleak.discover = object()
last_response = None
lt37 = version.parse(sys.version.split(' ')[0]) < version.parse("3.7")
class BleakDriverTest(unittest.TestCase):
def test_driver_creation(self):
connection = pylgbst.get_connection_bleak()
self.assertIsInstance(connection, cbleak.BleakDriver)
self.assertFalse(connection.is_alive(), 'Checking that factory returns not started driver')
@unittest.skipIf(lt37, "Python version is too low")
def test_communication(self):
driver = cbleak.BleakDriver()
async def fake_thread():
print('Fake thread initialized')
while not driver._abort:
time.sleep(0.1)
if cbleak.req_queue.qsize() != 0:
print('Received data, sending back')
data = cbleak.req_queue.get()
cbleak.resp_queue.put(data)
driver._bleak_thread = fake_thread
driver.set_notify_handler(BleakDriverTest.validation_handler)
driver.enable_notifications()
time.sleep(0.5) # time for driver initialization
self.assertTrue(driver.is_alive(), 'Checking that driver starts')
handle = 0x32
data = [0xD, 0xE, 0xA, 0xD, 0xB, 0xE, 0xE, 0xF]
driver.write(handle, data)
time.sleep(0.5) # processing time
self.assertEqual(handle, last_response[0], 'Verifying response handle')
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
driver.disconnect()
time.sleep(0.5) # processing time
self.assertFalse(driver.is_alive())
@staticmethod
def validation_handler(handle, data):
global last_response
last_response = (handle, data)
if __name__ == '__main__':
unittest.main()

View File

@ -1,40 +0,0 @@
import unittest
from pylgbst.comms import *
class ConnectionTestCase(unittest.TestCase):
def test_is_device_matched(self):
conn = Connection()
hub_address = '1a:2A:3A:4A:5A:6A'
other_address = 'A1:a2:a3:a4:a5:a6'
zero_address = '00:00:00:00:00:00'
hub_name = 'LEGO Move Hub'
other_name = 'HRM'
test_matrix = [
# address, name, hub_mac, expected
(hub_address, hub_name, hub_address, None, True),
(hub_address, hub_name, None, hub_name, True),
(hub_address, None, hub_address, None, True),
(hub_address, None, None, hub_name, False),
(hub_address, other_name, hub_address, None, True),
(hub_address, other_name, None, hub_name, False),
(other_address, hub_name, hub_address, None, False),
(other_address, hub_name, None, hub_name, True),
(other_address, None, hub_address, None, False),
(other_address, None, None, hub_name, False),
(other_address, other_name, hub_address, None, False),
(other_address, other_name, None, hub_name, False),
(zero_address, hub_name, hub_address, None, False),
(zero_address, hub_name, None, hub_name, False),
(zero_address, None, hub_address, None, False),
(zero_address, None, None, hub_name, False),
(zero_address, other_name, hub_address, None, False),
(zero_address, other_name, None, hub_name, False),
]
for address, name, hub_mac, fname, expected in test_matrix:
matched = conn._is_device_matched(address=address, dev_name=name, hub_mac=hub_mac, find_name=fname)
self.assertEqual(matched, expected)

View File

@ -122,20 +122,20 @@ class HubTest(unittest.TestCase):
class MoveHubTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append('0f00 04 02 0125000000001000000010')
conn.notifications.append('0f00 04 03 0126000000001000000010')
conn.notifications.append('0f00 04 00 0127000100000001000000')
conn.notifications.append('0f00 04 01 0127000100000001000000')
conn.notifications.append('0900 04 10 0227003738')
conn.notifications.append('0f00 04 01 0125000000001000000010')
conn.notifications.append('0f00 04 02 0126000000001000000010')
conn.notifications.append('0f00 04 37 0127000100000001000000')
conn.notifications.append('0f00 04 38 0127000100000001000000')
conn.notifications.append('0900 04 39 0227003738')
conn.notifications.append('0f00 04 32 0117000100000001000000')
conn.notifications.append('0f00 04 3a 0128000000000100000001')
conn.notifications.append('0f00 04 3b 0115000200000002000000')
conn.notifications.append('0f00 04 3c 0114000200000002000000')
conn.notification_delayed('12000101064c45474f204d6f766520487562', 1.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 1.3)
conn.notification_delayed('060001060600', 1.5)
conn.notification_delayed('0600030104ff', 1.7)
conn.notification_delayed('12000101064c45474f204d6f766520487562', 0.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 0.3)
conn.notification_delayed('060001060600', 0.5)
conn.notification_delayed('0600030104ff', 0.7)
MoveHub(conn.connect())
time.sleep(1)
conn.wait_notifications_handled()

View File

@ -21,7 +21,6 @@ class PeripheralsTest(unittest.TestCase):
def callback(pressed):
vals.append(pressed)
hub.connection.notification_delayed("060001020600", 0.0)
button.subscribe(callback)
time.sleep(0.1)
@ -34,7 +33,7 @@ class PeripheralsTest(unittest.TestCase):
time.sleep(0.1)
hub.connection.wait_notifications_handled()
self.assertEqual([0, 1, 0], vals)
self.assertEqual([False, True, False], vals)
self.assertEqual(b"0500010202", hub.writes[1][1])
self.assertEqual(b"0500010203", hub.writes[2][1])
@ -149,49 +148,49 @@ class PeripheralsTest(unittest.TestCase):
motor = EncodedMotor(hub, MoveHub.PORT_D)
hub.peripherals[MoveHub.PORT_D] = motor
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.start_power(1.0)
self.assertEqual(b"07008103110164", hub.writes.pop(1)[1])
self.assertEqual(b"0800810211510164", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.stop()
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.set_acc_profile(1.0)
self.assertEqual(b"090081031105e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081021105e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.set_dec_profile(1.0)
self.assertEqual(b"090081031106e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081021106e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.start_speed(1.0)
self.assertEqual(b"090081031107646403", hub.writes.pop(1)[1])
self.assertEqual(b"090081021107646403", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
hub.connection.notification_delayed('050082020a', 0.1)
motor.stop()
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
logging.debug("\n\n")
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
motor.timed(1.0)
self.assertEqual(b"0c0081031109e80364647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081021109e80364647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
motor.angled(180)
self.assertEqual(b"0e008103110bb400000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0e008102110bb400000064647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.2)
hub.connection.notification_delayed('050082020a', 0.2)
motor.preset_encoder(-180)
self.assertEqual(b"0b0081031151024cffffff", hub.writes.pop(1)[1])
self.assertEqual(b"0b0081021151024cffffff", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
motor.goto_position(0)
self.assertEqual(b"0e008103110d0000000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0e008102110d00000000647f6403", hub.writes.pop(1)[1])
hub.connection.wait_notifications_handled()
@ -205,15 +204,15 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a004702020100000001', 0.1)
hub.connection.notification_delayed('0a004701020100000001', 0.1)
motor.subscribe(callback)
hub.connection.notification_delayed("0800450200000000", 0.1)
hub.connection.notification_delayed("08004502ffffffff", 0.2)
hub.connection.notification_delayed("08004502feffffff", 0.3)
hub.connection.notification_delayed("0800450100000000", 0.1)
hub.connection.notification_delayed("08004501ffffffff", 0.2)
hub.connection.notification_delayed("08004501feffffff", 0.3)
time.sleep(0.4)
hub.connection.notification_delayed('0a004702020000000000', 0.1)
hub.connection.notification_delayed('0a004701020000000000', 0.1)
motor.unsubscribe(callback)
hub.connection.wait_notifications_handled()
@ -229,13 +228,13 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a00 4702080100000001', 0.1)
hub.connection.notification_delayed('0a00 4701080100000001', 0.1)
cds.subscribe(callback)
hub.connection.notification_delayed("08004502ff0aff00", 0.1)
hub.connection.notification_delayed("08004501ff0aff00", 0.1)
time.sleep(0.2)
hub.connection.notification_delayed('0a00 4702090100000001', 0.1)
hub.connection.notification_delayed('0a00 4701090100000001', 0.1)
cds.unsubscribe(callback)
hub.connection.wait_notifications_handled()