mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Compare commits
40 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e7e571b012 | ||
|
f015d4b03b | ||
|
17ce398595 | ||
|
37c11c0682 | ||
|
35e3868a64 | ||
|
c73311528d | ||
|
afdbe4b2e0 | ||
|
8d9bb94d87 | ||
|
7949f3477c | ||
|
777bc3ad32 | ||
|
94db2840f4 | ||
|
93e1573e64 | ||
|
3c2f0b493b | ||
|
ba7594a081 | ||
|
c7e24c10d4 | ||
|
c47fb2326a | ||
|
fef871946a | ||
|
ff51129247 | ||
|
69b234b924 | ||
|
d1019ac9f4 | ||
|
dff312534f | ||
|
9e4fab4aae | ||
|
17e22bf810 | ||
|
300268a2ab | ||
|
4f8dbe852c | ||
|
d271f251dd | ||
|
c71befdb66 | ||
|
6a49f5c840 | ||
|
0a4227d132 | ||
|
1e48f23f61 | ||
|
f058ece155 | ||
|
7ffab4fb0c | ||
|
907a2dd561 | ||
|
c955820521 | ||
|
7efd92700d | ||
|
7dc8b806fa | ||
|
64776eadc8 | ||
|
9abe2495b0 | ||
|
d3e4c58c5a | ||
|
c11e8fbd18 |
28
.travis.yml
28
.travis.yml
@ -1,14 +1,8 @@
|
||||
sudo: false
|
||||
language: python
|
||||
virtualenv:
|
||||
system_site_packages: true
|
||||
|
||||
matrix:
|
||||
include:
|
||||
- os: linux
|
||||
python: 2.7
|
||||
- os: linux
|
||||
python: 3.5
|
||||
python:
|
||||
- 3.6
|
||||
- 3.8
|
||||
|
||||
addons:
|
||||
apt:
|
||||
@ -21,19 +15,23 @@ addons:
|
||||
- libdbus-1-dev
|
||||
- libdbus-glib-1-dev
|
||||
- libgirepository-1.0-1
|
||||
- libgirepository1.0-dev
|
||||
|
||||
- python-dbus
|
||||
- python-gi
|
||||
- python3-dbus
|
||||
- python3-gi
|
||||
- bluez
|
||||
install:
|
||||
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
|
||||
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
|
||||
- pip install codecov nose-exclude pygatt gatt pexpect bluepy
|
||||
- pip install codecov codacy-coverage pytest pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
|
||||
- pip install --upgrade attrs
|
||||
|
||||
env:
|
||||
- READTHEDOCS=True
|
||||
|
||||
script: coverage run --source=. -m nose tests -v --exclude-dir=examples
|
||||
script:
|
||||
- coverage run --omit="examples/*" --source=. -m pytest -v --ignore=examples --log-level=INFO tests
|
||||
|
||||
after_success:
|
||||
- coverage report -m
|
||||
- coverage xml
|
||||
- codecov
|
||||
- python-codacy-coverage -r coverage.xml
|
||||
|
39
README.md
39
README.md
@ -1,10 +1,10 @@
|
||||
# Python library to interact with Move Hub
|
||||
# Python library to interact with Move Hub / PoweredUp Hubs
|
||||
|
||||
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/en-us/boost)._
|
||||
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/themes/boost)._
|
||||
|
||||
In fact, Move Hub is just Bluetooth hardware, all manipulations are done with commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of ways to issue these commands is to write Python program using this library.
|
||||
In fact, Move Hub is just a Bluetooth hardware piece, and all manipulations with it are made by commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of the ways to issue these commands is to write Python program using this library.
|
||||
|
||||
Best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
|
||||
The best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
|
||||
|
||||
If you have Vernie assembled, you might run scripts from [`examples/vernie`](examples/vernie) directory.
|
||||
|
||||
@ -15,6 +15,7 @@ If you have Vernie assembled, you might run scripts from [`examples/vernie`](exa
|
||||
[](https://youtu.be/829RKT8v8M0)
|
||||
[](https://youtu.be/WUOa3j-6XfI)
|
||||
[](https://youtu.be/QY6nRYXQw_U)
|
||||
[](https://youtu.be/55kE9I4IQSU)
|
||||
|
||||
|
||||
## Features
|
||||
@ -36,7 +37,7 @@ _Please note that this library requires one of Bluetooth backend libraries to be
|
||||
|
||||
Install library like this:
|
||||
```bash
|
||||
pip install https://github.com/undera/pylgbst/archive/1.1.1.tar.gz
|
||||
pip install -U pylgbst
|
||||
```
|
||||
|
||||
Then instantiate MoveHub object and start invoking its methods. Following is example to just print peripherals detected on Hub:
|
||||
@ -54,12 +55,13 @@ Each peripheral kind has own methods to do actions and/or get sensor data. See [
|
||||
|
||||
## Bluetooth Backend Prerequisites
|
||||
|
||||
You have following options to install as Bluetooth backend:
|
||||
You have following options to install as Bluetooth backend (some of them might require `sudo` on Linux):
|
||||
|
||||
- `pip install pygatt` - [pygatt](https://github.com/peplin/pygatt) lib, works on both Windows and Linux
|
||||
- `pip install gatt` - [gatt](https://github.com/getsenic/gatt-python) lib, supports Linux, does not work on Windows
|
||||
- `pip install gattlib` - [gattlib](https://bitbucket.org/OscarAcena/pygattlib) - supports Linux, does not work on Windows, requires `sudo`
|
||||
- `pip install bluepy` - [bluepy](https://github.com/IanHarvey/bluepy) lib, supports Linux, including Raspbian, which allows connection to the hub from the Raspberry PI
|
||||
- `pip install bleak` - [bleak](https://github.com/hbldh/bleak) lib, supports Linux/Windows/MacOS
|
||||
|
||||
Running on Windows requires [Bluegiga BLED112 Bluetooth Smart Dongle](https://www.silabs.com/products/wireless/bluetooth/bluetooth-low-energy-modules/bled112-bluetooth-smart-dongle) hardware piece, because no other hardware currently works on Windows with Python+BLE.
|
||||
|
||||
@ -68,26 +70,25 @@ _Please let author know if you have discovered any compatibility/preprequisite d
|
||||
Depending on backend type, you might need Linux `sudo` to be used when running Python.
|
||||
|
||||
### Bluetooth Connection Options
|
||||
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
|
||||
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
|
||||
|
||||
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
|
||||
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
|
||||
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
|
||||
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
|
||||
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
|
||||
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
|
||||
- use `get_connection_auto()` to attempt backend auto-detection
|
||||
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
|
||||
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
|
||||
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
|
||||
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
|
||||
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
|
||||
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
|
||||
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
|
||||
|
||||
All the functions above have optional arguments to specify adapter name and MoveHub mac address. Please look function source code for details.
|
||||
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
|
||||
|
||||
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
|
||||
```python
|
||||
from pylgbst.hub import MoveHub
|
||||
from pylgbst.comms.cgatt import GattConnection
|
||||
|
||||
conn = GattConnection("hci1")
|
||||
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
|
||||
from pylgbst import get_connection_gatt
|
||||
|
||||
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
|
||||
hub = MoveHub(conn)
|
||||
```
|
||||
|
||||
@ -118,4 +119,4 @@ Then push green button on MoveHub, so permanent BLE connection will be establish
|
||||
- https://github.com/JorgePe/BOOSTreveng - initial source of protocol knowledge
|
||||
- https://github.com/nathankellenicki/node-poweredup - JavaScript version of library
|
||||
- https://github.com/spezifisch/sphero-python/blob/master/BB8joyDrive.py - example with another approach to bluetooth libs
|
||||
|
||||
- https://github.com/virantha/bricknil - for the lovers of async Python, alternative implementation of library to control PoweredUp Hubs
|
||||
|
40
examples/advancedbutton/README.md
Normal file
40
examples/advancedbutton/README.md
Normal file
@ -0,0 +1,40 @@
|
||||
### Advanced button
|
||||
|
||||
This example shows how you can add additional functionallity to the move hub button.
|
||||
|
||||
It adds three new actions which you can use instead of the standard subscription to a button press:
|
||||
|
||||
- Click - a single quick up/down press
|
||||
- Double click - a double up/down press, second click must occur within .5 secs of first one
|
||||
- Long press - a press and hold on the button for > .7 secs
|
||||
|
||||
```python
|
||||
from pylgbst.hub import MoveHub
|
||||
from advancedbutton import AdvancedButton
|
||||
import time
|
||||
|
||||
|
||||
hub = MoveHub()
|
||||
b = AdvancedButton(hub)
|
||||
|
||||
|
||||
def clicked():
|
||||
print("button clicked")
|
||||
|
||||
|
||||
def pressed():
|
||||
print("button pressed")
|
||||
|
||||
|
||||
def doubleclicked():
|
||||
print("button double clicked")
|
||||
|
||||
|
||||
b.click.subscribe(clicked)
|
||||
b.double_click.subscribe(doubleclicked)
|
||||
b.long_press.subscribe(pressed)
|
||||
|
||||
time.sleep(120)
|
||||
```
|
||||
|
||||
You can alter the timings using the two constants `DOUBLE_CLICK_TIME` and `LONG_PRESS_TIME`
|
73
examples/advancedbutton/advancedbutton.py
Normal file
73
examples/advancedbutton/advancedbutton.py
Normal file
@ -0,0 +1,73 @@
|
||||
import time
|
||||
import threading
|
||||
|
||||
DOUBLE_CLICK_TIME = 0.5
|
||||
LONG_PRESS_TIME = 0.7
|
||||
|
||||
|
||||
class AdvancedButton:
|
||||
def __init__(self, hub):
|
||||
self.state = 0
|
||||
self.last_pressed = 0
|
||||
self.press_time = None
|
||||
self.hub = hub
|
||||
self.hub.button.subscribe(self.pressed)
|
||||
self.click = ButtonAction()
|
||||
self.double_click = ButtonAction()
|
||||
self.long_press = ButtonAction()
|
||||
|
||||
def pressed(self, state):
|
||||
if state == 2:
|
||||
return
|
||||
|
||||
press_time = time.time()
|
||||
|
||||
if state == 1:
|
||||
self.state = 1
|
||||
self.press_time = press_time
|
||||
return
|
||||
|
||||
if state == 0 and self.state == 1:
|
||||
self.state = 0
|
||||
press_duration = press_time - self.press_time
|
||||
else:
|
||||
return
|
||||
|
||||
if press_duration > LONG_PRESS_TIME:
|
||||
# long press
|
||||
self.long_press.notify()
|
||||
return
|
||||
|
||||
if (press_time - self.last_pressed) < DOUBLE_CLICK_TIME:
|
||||
# double click
|
||||
self.last_pressed = 0
|
||||
self.double_click.notify()
|
||||
return
|
||||
|
||||
# could be first of a double click, could be single click
|
||||
self.last_pressed = press_time
|
||||
|
||||
def timeout():
|
||||
time.sleep(DOUBLE_CLICK_TIME)
|
||||
if self.last_pressed == press_time:
|
||||
# not clicked while sleeping
|
||||
# single click
|
||||
self.click.notify()
|
||||
|
||||
threading.Thread(target=timeout).start()
|
||||
|
||||
|
||||
class ButtonAction:
|
||||
def __init__(self):
|
||||
self.subscribers = set()
|
||||
|
||||
def subscribe(self, callback):
|
||||
self.subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self.subscribers:
|
||||
self.subscribers.remove(callback)
|
||||
|
||||
def notify(self):
|
||||
for subscriber in self.subscribers.copy():
|
||||
subscriber()
|
@ -32,4 +32,3 @@ while color != COLOR_NONE:
|
||||
action_by_color(color)
|
||||
|
||||
print(number)
|
||||
|
||||
|
@ -22,7 +22,8 @@ class _SpheroImproved(spheropy.Sphero):
|
||||
async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False,
|
||||
num_retry_attempts=1):
|
||||
gattool = BLEInterfaceGattool(search_name)
|
||||
return await super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)
|
||||
return await
|
||||
super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)
|
||||
|
||||
async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
|
||||
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394
|
||||
@ -33,7 +34,8 @@ class _SpheroImproved(spheropy.Sphero):
|
||||
wait_for_response=False,
|
||||
reset_inactivity_timeout=reset_inactivity_timeout)
|
||||
|
||||
return await self._send_command(command, response_timeout_in_seconds)
|
||||
return await
|
||||
self._send_command(command, response_timeout_in_seconds)
|
||||
|
||||
async def set_rotation_rate(self, rate, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
|
||||
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py
|
||||
@ -44,7 +46,8 @@ class _SpheroImproved(spheropy.Sphero):
|
||||
wait_for_response=False,
|
||||
reset_inactivity_timeout=reset_inactivity_timeout)
|
||||
|
||||
return await self._send_command(command, response_timeout_in_seconds)
|
||||
return await
|
||||
self._send_command(command, response_timeout_in_seconds)
|
||||
|
||||
|
||||
class BB8(object):
|
||||
|
@ -3,8 +3,7 @@ import time
|
||||
from time import sleep
|
||||
|
||||
from pylgbst import *
|
||||
from pylgbst.comms import DebugServerConnection
|
||||
from pylgbst.hub import MoveHub, math
|
||||
from pylgbst.hub import MoveHub
|
||||
from pylgbst.peripherals import EncodedMotor, TiltSensor, Current, Voltage, COLORS, COLOR_BLACK
|
||||
|
||||
log = logging.getLogger("demo")
|
||||
@ -13,7 +12,12 @@ log = logging.getLogger("demo")
|
||||
def demo_led_colors(movehub):
|
||||
# LED colors demo
|
||||
log.info("LED colors demo")
|
||||
movehub.led.subscribe(lambda x, y: None)
|
||||
|
||||
# We get a response with payload and port, not x and y here...
|
||||
def colour_callback(named):
|
||||
log.info("LED Color callback: %s", named)
|
||||
|
||||
movehub.led.subscribe(colour_callback)
|
||||
for color in list(COLORS.keys())[1:] + [COLOR_BLACK]:
|
||||
log.info("Setting LED color to: %s", COLORS[color])
|
||||
movehub.led.set_color(color)
|
||||
@ -148,7 +152,6 @@ def demo_motor_sensors(movehub):
|
||||
movehub.motor_B.unsubscribe(callback_b)
|
||||
|
||||
if movehub.motor_external is not None:
|
||||
demo_motor_sensors.states[movehub.motor_external] = None
|
||||
movehub.motor_external.unsubscribe(callback_e)
|
||||
|
||||
|
||||
@ -170,20 +173,92 @@ def demo_voltage(movehub):
|
||||
|
||||
|
||||
def demo_all(movehub):
|
||||
demo_voltage(movehub)
|
||||
demo_led_colors(movehub)
|
||||
demo_motors_timed(movehub)
|
||||
demo_motors_angled(movehub)
|
||||
demo_port_cd_motor(movehub)
|
||||
demo_led_colors(movehub)
|
||||
demo_tilt_sensor_simple(movehub)
|
||||
demo_tilt_sensor_precise(movehub)
|
||||
demo_color_sensor(movehub)
|
||||
demo_motor_sensors(movehub)
|
||||
demo_voltage(movehub)
|
||||
|
||||
|
||||
DEMO_CHOICES = {
|
||||
'all': demo_all,
|
||||
'voltage': demo_voltage,
|
||||
'led_colors': demo_led_colors,
|
||||
'motors_timed': demo_motors_timed,
|
||||
'motors_angled': demo_motors_angled,
|
||||
'port_cd_motor': demo_port_cd_motor,
|
||||
'tilt_sensor': demo_tilt_sensor_simple,
|
||||
'tilt_sensor_precise': demo_tilt_sensor_precise,
|
||||
'color_sensor': demo_color_sensor,
|
||||
'motor_sensors': demo_motor_sensors,
|
||||
}
|
||||
|
||||
|
||||
def get_options():
|
||||
import argparse
|
||||
arg_parser = argparse.ArgumentParser(
|
||||
description='Demonstrate move-hub communications',
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
'-c', '--connection',
|
||||
default='auto://',
|
||||
help='''Specify connection URL to use, `protocol://mac?param=X` with protocol in:
|
||||
"gatt","pygatt","gattlib","gattool", "bluepy","bluegiga"'''
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
'-d', '--demo',
|
||||
default='all',
|
||||
choices=sorted(DEMO_CHOICES.keys()),
|
||||
help="Run a particular demo, default all"
|
||||
)
|
||||
return arg_parser
|
||||
|
||||
|
||||
def connection_from_url(url):
|
||||
import pylgbst
|
||||
if url == 'auto://':
|
||||
return None
|
||||
try:
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
except ImportError:
|
||||
from urlparse import urlparse, parse_qs
|
||||
parsed = urlparse(url)
|
||||
name = 'get_connection_%s' % parsed.scheme
|
||||
factory = getattr(pylgbst, name, None)
|
||||
if not factory:
|
||||
msg = "Unrecognised URL scheme/protocol, expect a get_connection_<protocol> in pylgbst: %s"
|
||||
raise ValueError(msg % parsed.protocol)
|
||||
params = {}
|
||||
if parsed.netloc.strip():
|
||||
params['hub_mac'] = parsed.netloc
|
||||
for key, value in parse_qs(parsed.query).items():
|
||||
if len(value) == 1:
|
||||
params[key] = value[0]
|
||||
else:
|
||||
params[key] = value
|
||||
return factory(
|
||||
**params
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
|
||||
parser = get_options()
|
||||
options = parser.parse_args()
|
||||
parameters = {}
|
||||
try:
|
||||
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
|
||||
parameters['connection'] = connection
|
||||
except ValueError as err:
|
||||
parser.error(err.args[0])
|
||||
|
||||
hub = MoveHub()
|
||||
demo_all(hub)
|
||||
hub.disconnect()
|
||||
hub = MoveHub(**parameters)
|
||||
try:
|
||||
demo = DEMO_CHOICES[options.demo]
|
||||
demo(hub)
|
||||
finally:
|
||||
hub.disconnect()
|
||||
|
@ -1,4 +1,12 @@
|
||||
[
|
||||
[125, 64, 64],
|
||||
[145, 255, 250]
|
||||
[
|
||||
125,
|
||||
64,
|
||||
64
|
||||
],
|
||||
[
|
||||
145,
|
||||
255,
|
||||
250
|
||||
]
|
||||
]
|
@ -85,7 +85,7 @@ try:
|
||||
sa = round(c + b / divider, 1)
|
||||
sb = round(c - b / divider, 1)
|
||||
logging.info("SpeedA=%s, SpeedB=%s", sa, sb)
|
||||
robot.motor_AB.start_power(sa, sb)
|
||||
robot.motor_AB.start_speed(sa, sb)
|
||||
# time.sleep(0.5)
|
||||
finally:
|
||||
robot.motor_AB.stop()
|
||||
|
@ -1,5 +1,7 @@
|
||||
import logging
|
||||
|
||||
from examples.vernie import Vernie
|
||||
from pylgbst.peripherals import VisionSensor
|
||||
from . import *
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
@ -10,7 +12,8 @@ criterion = min
|
||||
cur_luminosity = 0
|
||||
|
||||
|
||||
def on_change_lum(lumn):
|
||||
def on_change_lum(lumn, unknown):
|
||||
del unknown
|
||||
global cur_luminosity
|
||||
cur_luminosity = lumn
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
from pylgbst.constants import COLOR_GREEN, COLOR_NONE
|
||||
from . import *
|
||||
from pylgbst.peripherals import COLOR_GREEN, COLOR_NONE
|
||||
from vernie import *
|
||||
|
||||
robot = Vernie()
|
||||
running = True
|
||||
@ -11,7 +11,7 @@ def callback(color, distance):
|
||||
secs = (10 - distance + 1) / 10.0
|
||||
print("Distance is %.1f inches, I'm running back with %s%% speed!" % (distance, int(speed * 100)))
|
||||
if speed <= 1:
|
||||
robot.motor_AB.timed(secs / 1, -speed, is_async=True)
|
||||
robot.motor_AB.timed(secs / 1, -speed)
|
||||
robot.say("Ouch")
|
||||
|
||||
|
||||
@ -31,6 +31,3 @@ while running:
|
||||
|
||||
robot.vision_sensor.unsubscribe(callback)
|
||||
robot.button.unsubscribe(on_btn)
|
||||
robot.led.set_color(COLOR_NONE)
|
||||
while robot.led.in_progress():
|
||||
time.sleep(1)
|
||||
|
@ -6,42 +6,57 @@ from pylgbst.comms import DebugServer
|
||||
log = logging.getLogger('pylgbst')
|
||||
|
||||
|
||||
def get_connection_bluegiga(controller=None, hub_mac=None):
|
||||
def get_connection_bluegiga(controller=None, hub_mac=None, hub_name=None):
|
||||
del controller # to prevent code analysis warning
|
||||
from pylgbst.comms.cpygatt import BlueGigaConnection
|
||||
|
||||
return BlueGigaConnection().connect(hub_mac)
|
||||
return BlueGigaConnection().connect(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_gattool(controller='hci0', hub_mac=None):
|
||||
def get_connection_gattool(controller='hci0', hub_mac=None, hub_name=None):
|
||||
from pylgbst.comms.cpygatt import GattoolConnection
|
||||
|
||||
return GattoolConnection(controller).connect(hub_mac)
|
||||
return GattoolConnection(controller).connect(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_gatt(controller='hci0', hub_mac=None):
|
||||
def get_connection_gatt(controller='hci0', hub_mac=None, hub_name=None):
|
||||
from pylgbst.comms.cgatt import GattConnection
|
||||
|
||||
return GattConnection(controller).connect(hub_mac)
|
||||
return GattConnection(controller).connect(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_gattlib(controller='hci0', hub_mac=None):
|
||||
def get_connection_gattlib(controller='hci0', hub_mac=None, hub_name=None):
|
||||
from pylgbst.comms.cgattlib import GattLibConnection
|
||||
|
||||
return GattLibConnection(controller).connect(hub_mac)
|
||||
return GattLibConnection(controller).connect(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_bluepy(controller='hci0', hub_mac=None):
|
||||
def get_connection_bluepy(controller='hci0', hub_mac=None, hub_name=None):
|
||||
from pylgbst.comms.cbluepy import BluepyConnection
|
||||
|
||||
return BluepyConnection(controller).connect(hub_mac)
|
||||
return BluepyConnection(controller).connect(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_auto(controller='hci0', hub_mac=None):
|
||||
def get_connection_bleak(controller='hci0', hub_mac=None, hub_name=None):
|
||||
"""
|
||||
Return connection based with Bleak API as an endpoint.
|
||||
|
||||
:param controller: Not used, kept for compatibility with others.
|
||||
:param hub_mac: Optional Lego HUB MAC to connect to.
|
||||
:return: Driver object.
|
||||
"""
|
||||
del controller # to prevent code analysis warning
|
||||
from pylgbst.comms.cbleak import BleakDriver
|
||||
|
||||
return BleakDriver(hub_mac, hub_name)
|
||||
|
||||
|
||||
def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
|
||||
fns = [
|
||||
get_connection_bluepy,
|
||||
get_connection_bluegiga,
|
||||
get_connection_gatt,
|
||||
get_connection_bleak,
|
||||
get_connection_gattool,
|
||||
get_connection_gattlib,
|
||||
]
|
||||
@ -50,7 +65,7 @@ def get_connection_auto(controller='hci0', hub_mac=None):
|
||||
for fn in fns:
|
||||
try:
|
||||
logging.info("Trying %s", fn.__name__)
|
||||
return fn(controller, hub_mac)
|
||||
return fn(controller, hub_mac, hub_name)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except BaseException:
|
||||
|
@ -15,7 +15,6 @@ from pylgbst.utilities import str2hex
|
||||
|
||||
log = logging.getLogger('comms')
|
||||
|
||||
LEGO_MOVE_HUB = "LEGO Move Hub"
|
||||
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
|
||||
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
|
||||
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
|
||||
@ -46,6 +45,22 @@ class Connection(object):
|
||||
def enable_notifications(self):
|
||||
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
|
||||
|
||||
def _is_device_matched(self, address, dev_name, hub_mac, find_name):
|
||||
assert hub_mac or find_name, 'You have to provide either hub_mac or hub_name in connection options'
|
||||
log.debug("Checking device: %s, MAC: %s", dev_name, address)
|
||||
matched = False
|
||||
if address != "00:00:00:00:00:00":
|
||||
if hub_mac:
|
||||
if hub_mac.lower() == address.lower():
|
||||
matched = True
|
||||
elif dev_name == find_name:
|
||||
matched = True
|
||||
|
||||
if matched:
|
||||
log.info("Found %s at %s", dev_name, address)
|
||||
|
||||
return matched
|
||||
|
||||
|
||||
class DebugServer(object):
|
||||
"""
|
||||
|
216
pylgbst/comms/cbleak.py
Normal file
216
pylgbst/comms/cbleak.py
Normal file
@ -0,0 +1,216 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
|
||||
from bleak import BleakClient, discover
|
||||
|
||||
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
|
||||
|
||||
log = logging.getLogger('comms-bleak')
|
||||
|
||||
# Queues to handle request / responses. Acts as a buffer between API and async BLE driver
|
||||
resp_queue = queue.Queue()
|
||||
req_queue = queue.Queue()
|
||||
|
||||
|
||||
class BleakDriver(object):
|
||||
"""Driver that provides interface between API and Bleak."""
|
||||
|
||||
def __init__(self, hub_mac=None, hub_name=None):
|
||||
"""
|
||||
Initialize new object of Bleak Driver class.
|
||||
|
||||
:param hub_mac: Optional Lego HUB MAC to connect to.
|
||||
"""
|
||||
self.hub_mac = hub_mac
|
||||
self.hub_name = hub_name
|
||||
self._handler = None
|
||||
self._abort = False
|
||||
self._connection_thread = None
|
||||
self._processing_thread = None
|
||||
|
||||
def set_notify_handler(self, handler):
|
||||
"""
|
||||
Set handler function used to communicate with an API.
|
||||
|
||||
:param handler: Handler function called by driver when received data
|
||||
:return: None
|
||||
"""
|
||||
self._handler = handler
|
||||
|
||||
def enable_notifications(self):
|
||||
"""
|
||||
Enable notifications, in our cases starts communication threads.
|
||||
|
||||
We cannot do this earlier, because API need to fist set notification handler.
|
||||
:return: None
|
||||
"""
|
||||
self._connection_thread = threading.Thread(target=lambda: asyncio.run(self._bleak_thread()))
|
||||
self._connection_thread.daemon = True
|
||||
self._connection_thread.start()
|
||||
|
||||
self._processing_thread = threading.Thread(target=self._processing)
|
||||
self._processing_thread.daemon = True
|
||||
self._processing_thread.start()
|
||||
|
||||
async def _bleak_thread(self):
|
||||
bleak = BleakConnection()
|
||||
await bleak.connect(self.hub_mac, self.hub_name)
|
||||
await bleak.set_notify_handler(self._safe_handler)
|
||||
# After connecting, need to send any data or hub will drop the connection,
|
||||
# below command is Advertising name request update
|
||||
await bleak.write_char(MOVE_HUB_HW_UUID_CHAR, bytearray([0x05, 0x00, 0x01, 0x01, 0x05]))
|
||||
while not self._abort:
|
||||
await asyncio.sleep(0.1)
|
||||
if req_queue.qsize() != 0:
|
||||
data = req_queue.get()
|
||||
await bleak.write(data[0], data[1])
|
||||
|
||||
logging.info("Communications thread has exited")
|
||||
|
||||
@staticmethod
|
||||
def _safe_handler(handler, data):
|
||||
resp_queue.put((handler, data))
|
||||
|
||||
def _processing(self):
|
||||
while not self._abort:
|
||||
if resp_queue.qsize() != 0:
|
||||
msg = resp_queue.get()
|
||||
self._handler(msg[0], bytes(msg[1]))
|
||||
|
||||
time.sleep(0.01)
|
||||
logging.info("Processing thread has exited")
|
||||
|
||||
def write(self, handle, data):
|
||||
"""
|
||||
Send data to given handle number.
|
||||
|
||||
:param handle: Handle number that will be translated into characteristic uuid
|
||||
:param data: data to send
|
||||
:raises ConnectionError" When internal threads are not working
|
||||
:return: None
|
||||
"""
|
||||
if not self._connection_thread.is_alive() or not self._processing_thread.is_alive():
|
||||
raise ConnectionError('Something went wrong, communication threads not functioning.')
|
||||
|
||||
req_queue.put((handle, data))
|
||||
|
||||
def disconnect(self):
|
||||
"""
|
||||
Disconnect and stops communication threads.
|
||||
|
||||
:return: None
|
||||
"""
|
||||
self._abort = True
|
||||
|
||||
def is_alive(self):
|
||||
"""
|
||||
Indicate whether driver is functioning or not.
|
||||
|
||||
:return: True if driver is functioning; False otherwise.
|
||||
"""
|
||||
if self._connection_thread is not None and self._processing_thread is not None:
|
||||
return self._connection_thread.is_alive() and self._processing_thread.is_alive()
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class BleakConnection(Connection):
|
||||
"""Bleak driver for communicating with BLE device."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize new instance of BleakConnection class."""
|
||||
Connection.__init__(self)
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
self._device = None
|
||||
self._client = None
|
||||
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.WARNING)
|
||||
logging.getLogger('bleak.backends.bluezdbus.client').setLevel(logging.WARNING)
|
||||
|
||||
async def connect(self, hub_mac=None, hub_name=None):
|
||||
"""
|
||||
Connect to device.
|
||||
|
||||
:param hub_mac: Optional Lego HUB MAC to connect to.
|
||||
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
|
||||
:return: None
|
||||
"""
|
||||
log.info("Discovering devices... Press green button on Hub")
|
||||
for i in range(0, 30):
|
||||
devices = await discover(timeout=1)
|
||||
log.debug("Devices: %s", devices)
|
||||
for dev in devices:
|
||||
log.debug(dev)
|
||||
address = dev.address
|
||||
name = dev.name
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
log.info('Device matched: %r', dev)
|
||||
self._device = dev
|
||||
break
|
||||
else:
|
||||
continue
|
||||
|
||||
break
|
||||
else:
|
||||
raise ConnectionError('Device not found.')
|
||||
|
||||
self._client = BleakClient(self._device.address, self.loop)
|
||||
status = await self._client.connect()
|
||||
log.debug('Connection status: {status}'.format(status=status))
|
||||
|
||||
async def write(self, handle, data):
|
||||
"""
|
||||
Send data to given handle number.
|
||||
|
||||
If handle cannot be found in service description, hardcoded LEGO uuid will be used.
|
||||
:param handle: Handle number that will be translated into characteristic uuid
|
||||
:param data: data to send
|
||||
:return: None
|
||||
"""
|
||||
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
|
||||
desc = self._client.services.get_descriptor(handle)
|
||||
|
||||
if not isinstance(data, bytearray):
|
||||
data = bytearray(data)
|
||||
|
||||
if desc is None:
|
||||
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
|
||||
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
|
||||
else:
|
||||
await self._client.write_gatt_char(desc.characteristic_uuid, data)
|
||||
|
||||
async def write_char(self, characteristic_uuid, data):
|
||||
"""
|
||||
Send data to given handle number.
|
||||
|
||||
:param characteristic_uuid: Characteristic uuid used to send data
|
||||
:param data: data to send
|
||||
:return: None
|
||||
"""
|
||||
await self._client.write_gatt_char(characteristic_uuid, data)
|
||||
|
||||
async def set_notify_handler(self, handler):
|
||||
"""
|
||||
Set notification handler.
|
||||
|
||||
:param handler: Handle function to be called when receive any data.
|
||||
:return: None
|
||||
"""
|
||||
|
||||
def c(handle, data):
|
||||
log.debug('Response: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
|
||||
handler(handle, data)
|
||||
|
||||
await self._client.start_notify(MOVE_HUB_HW_UUID_CHAR, c)
|
||||
|
||||
def is_alive(self):
|
||||
"""
|
||||
To keep compatibility with the driver interface.
|
||||
|
||||
This method does nothing.
|
||||
:return: None.
|
||||
"""
|
||||
pass
|
@ -4,7 +4,7 @@ from threading import Thread, Event
|
||||
|
||||
from bluepy import btle
|
||||
|
||||
from pylgbst.comms import Connection, LEGO_MOVE_HUB
|
||||
from pylgbst.comms import Connection
|
||||
from pylgbst.utilities import str2hex, queue
|
||||
|
||||
log = logging.getLogger('comms-bluepy')
|
||||
@ -87,7 +87,7 @@ class BluepyConnection(Connection):
|
||||
self._peripheral = None # :type BluepyThreadedPeripheral
|
||||
self._controller = controller
|
||||
|
||||
def connect(self, hub_mac=None):
|
||||
def connect(self, hub_mac=None, hub_name=None):
|
||||
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
||||
scanner = btle.Scanner()
|
||||
|
||||
@ -98,15 +98,12 @@ class BluepyConnection(Connection):
|
||||
|
||||
for dev in devices:
|
||||
address = dev.addr
|
||||
addressType = dev.addrType
|
||||
address_type = dev.addrType
|
||||
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
|
||||
log.debug("Found dev, name: {}, address: {}".format(name, address))
|
||||
|
||||
if address != "00:00:00:00:00:00":
|
||||
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
|
||||
logging.info("Found %s at %s", name, address)
|
||||
self._peripheral = BluepyThreadedPeripheral(address, addressType, self._controller)
|
||||
break
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
self._peripheral = BluepyThreadedPeripheral(address, address_type, self._controller)
|
||||
break
|
||||
|
||||
return self
|
||||
|
||||
|
@ -5,7 +5,7 @@ from time import sleep
|
||||
|
||||
import gatt
|
||||
|
||||
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
|
||||
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
|
||||
MOVE_HUB_HARDWARE_HANDLE
|
||||
from pylgbst.utilities import str2hex
|
||||
|
||||
@ -88,7 +88,7 @@ class GattConnection(Connection):
|
||||
self._manager_thread.setDaemon(True)
|
||||
log.debug('Starting DeviceManager...')
|
||||
|
||||
def connect(self, hub_mac=None):
|
||||
def connect(self, hub_mac=None, hub_name=None):
|
||||
self._manager_thread.start()
|
||||
self._manager.start_discovery()
|
||||
|
||||
@ -100,12 +100,9 @@ class GattConnection(Connection):
|
||||
for dev in devices:
|
||||
address = dev.mac_address
|
||||
name = dev.alias()
|
||||
logging.debug("Device %s at %s", name, address)
|
||||
if address != "00:00:00:00:00:00":
|
||||
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
|
||||
logging.info("Found %s at %s", name, address)
|
||||
self._device = CustomDevice(address, self._manager)
|
||||
break
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
self._device = CustomDevice(address, self._manager)
|
||||
break
|
||||
|
||||
if not self._device:
|
||||
sleep(1)
|
||||
|
@ -5,7 +5,7 @@ from threading import Thread
|
||||
|
||||
from gattlib import DiscoveryService, GATTRequester
|
||||
|
||||
from pylgbst.comms import Connection, LEGO_MOVE_HUB
|
||||
from pylgbst.comms import Connection
|
||||
from pylgbst.utilities import queue, str2hex
|
||||
|
||||
log = logging.getLogger('comms-gattlib')
|
||||
@ -61,7 +61,7 @@ class GattLibConnection(Connection):
|
||||
self.requester = None
|
||||
self._iface = bt_iface_name
|
||||
|
||||
def connect(self, hub_mac=None):
|
||||
def connect(self, hub_mac=None, hub_name=None):
|
||||
service = DiscoveryService(self._iface)
|
||||
|
||||
while not self.requester:
|
||||
@ -70,11 +70,9 @@ class GattLibConnection(Connection):
|
||||
log.debug("Devices: %s", devices)
|
||||
|
||||
for address, name in devices.items():
|
||||
if address != "00:00:00:00:00:00":
|
||||
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
|
||||
logging.info("Found %s at %s", name, address)
|
||||
self.requester = Requester(address, True, self._iface)
|
||||
break
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
self.requester = Requester(address, True, self._iface)
|
||||
break
|
||||
|
||||
if self.requester:
|
||||
break
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
|
||||
import pygatt
|
||||
|
||||
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_CHAR
|
||||
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
|
||||
from pylgbst.utilities import str2hex
|
||||
|
||||
log = logging.getLogger('comms-pygatt')
|
||||
@ -20,24 +20,23 @@ class GattoolConnection(Connection):
|
||||
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
|
||||
self._conn_hnd = None
|
||||
|
||||
def connect(self, hub_mac=None):
|
||||
def connect(self, hub_mac=None, hub_name=None):
|
||||
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
||||
adapter = self.backend()
|
||||
adapter.start()
|
||||
adapter.start() # enable or disable restart? What's the best?
|
||||
|
||||
while not self._conn_hnd:
|
||||
log.info("Discovering devices...")
|
||||
devices = adapter.scan(1)
|
||||
log.debug("Devices: %s", devices)
|
||||
|
||||
# Pass each device found to _is_device_matched( ) to see if it the device we want
|
||||
for dev in devices:
|
||||
address = dev['address']
|
||||
name = dev['name']
|
||||
if address != "00:00:00:00:00:00":
|
||||
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
|
||||
logging.info("Found %s at %s", name, address)
|
||||
self._conn_hnd = adapter.connect(address)
|
||||
break
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
self._conn_hnd = adapter.connect(address)
|
||||
break
|
||||
|
||||
if self._conn_hnd:
|
||||
break
|
||||
|
@ -4,8 +4,8 @@ import time
|
||||
from pylgbst import get_connection_auto
|
||||
from pylgbst.messages import *
|
||||
from pylgbst.peripherals import *
|
||||
from pylgbst.utilities import str2hex, usbyte, ushort
|
||||
from pylgbst.utilities import queue
|
||||
from pylgbst.utilities import str2hex, usbyte, ushort
|
||||
|
||||
log = logging.getLogger('hub')
|
||||
|
||||
@ -43,7 +43,7 @@ class Hub(object):
|
||||
self.add_message_handler(MsgHubAction, self._handle_action)
|
||||
|
||||
if not connection:
|
||||
connection = get_connection_auto()
|
||||
connection = get_connection_auto() # TODO: how to identify the hub?
|
||||
self.connection = connection
|
||||
self.connection.set_notify_handler(self._notify)
|
||||
self.connection.enable_notifications()
|
||||
@ -137,7 +137,7 @@ class Hub(object):
|
||||
if dev_type in PERIPHERAL_TYPES:
|
||||
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
|
||||
else:
|
||||
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
|
||||
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
|
||||
self.peripherals[port] = Peripheral(self, port)
|
||||
|
||||
log.info("Attached peripheral: %s", self.peripherals[msg.port])
|
||||
@ -184,6 +184,8 @@ class MoveHub(Hub):
|
||||
:type motor_external: EncodedMotor
|
||||
"""
|
||||
|
||||
DEFAULT_NAME = "LEGO Move Hub"
|
||||
|
||||
# PORTS
|
||||
PORT_A = 0x00
|
||||
PORT_B = 0x01
|
||||
@ -197,6 +199,10 @@ class MoveHub(Hub):
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
def __init__(self, connection=None):
|
||||
self._comm_lock = threading.RLock()
|
||||
if connection is None:
|
||||
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
|
||||
|
||||
super(MoveHub, self).__init__(connection)
|
||||
self.info = {}
|
||||
|
||||
@ -247,29 +253,40 @@ class MoveHub(Hub):
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
def _handle_device_change(self, msg):
|
||||
super(MoveHub, self)._handle_device_change(msg)
|
||||
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
|
||||
port = msg.port
|
||||
if port == self.PORT_A:
|
||||
self.motor_A = self.peripherals[port]
|
||||
elif port == self.PORT_B:
|
||||
self.motor_B = self.peripherals[port]
|
||||
elif port == self.PORT_AB:
|
||||
self.motor_AB = self.peripherals[port]
|
||||
elif port == self.PORT_C:
|
||||
self.port_C = self.peripherals[port]
|
||||
elif port == self.PORT_D:
|
||||
self.port_D = self.peripherals[port]
|
||||
elif port == self.PORT_LED:
|
||||
self.led = self.peripherals[port]
|
||||
elif port == self.PORT_TILT_SENSOR:
|
||||
self.tilt_sensor = self.peripherals[port]
|
||||
elif port == self.PORT_CURRENT:
|
||||
self.current = self.peripherals[port]
|
||||
elif port == self.PORT_VOLTAGE:
|
||||
self.voltage = self.peripherals[port]
|
||||
with self._comm_lock:
|
||||
super(MoveHub, self)._handle_device_change(msg)
|
||||
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
|
||||
port = msg.port
|
||||
if port == self.PORT_A:
|
||||
self.motor_A = self.peripherals[port]
|
||||
elif port == self.PORT_B:
|
||||
self.motor_B = self.peripherals[port]
|
||||
elif port == self.PORT_AB:
|
||||
self.motor_AB = self.peripherals[port]
|
||||
elif port == self.PORT_C:
|
||||
self.port_C = self.peripherals[port]
|
||||
elif port == self.PORT_D:
|
||||
self.port_D = self.peripherals[port]
|
||||
elif port == self.PORT_LED:
|
||||
self.led = self.peripherals[port]
|
||||
elif port == self.PORT_TILT_SENSOR:
|
||||
self.tilt_sensor = self.peripherals[port]
|
||||
elif port == self.PORT_CURRENT:
|
||||
self.current = self.peripherals[port]
|
||||
elif port == self.PORT_VOLTAGE:
|
||||
self.voltage = self.peripherals[port]
|
||||
|
||||
if type(self.peripherals[port]) == VisionSensor:
|
||||
self.vision_sensor = self.peripherals[port]
|
||||
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
||||
self.motor_external = self.peripherals[port]
|
||||
if type(self.peripherals[port]) == VisionSensor:
|
||||
self.vision_sensor = self.peripherals[port]
|
||||
elif type(self.peripherals[port]) == EncodedMotor \
|
||||
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
||||
self.motor_external = self.peripherals[port]
|
||||
|
||||
|
||||
class TrainHub(Hub):
|
||||
DEFAULT_NAME = 'TrainHub'
|
||||
|
||||
def __init__(self, connection=None):
|
||||
if connection is None:
|
||||
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
|
||||
super(TrainHub, self).__init__(connection)
|
||||
|
@ -173,7 +173,8 @@ class MsgHubAction(DownstreamMsg, UpstreamMsg):
|
||||
return super(MsgHubAction, self).bytes()
|
||||
|
||||
def is_reply(self, msg):
|
||||
assert isinstance(msg, MsgHubAction)
|
||||
if not isinstance(msg, MsgHubAction):
|
||||
raise TypeError("Unexpected message type: %s" % (msg.__class__,))
|
||||
if self.action == self.DISCONNECT and msg.action == self.UPSTREAM_DISCONNECT:
|
||||
return True
|
||||
|
||||
|
@ -232,10 +232,16 @@ class LEDRGB(Peripheral):
|
||||
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, payload)
|
||||
self._send_output(msg)
|
||||
|
||||
def _decode_port_data(self, msg):
|
||||
if len(msg.payload) == 3:
|
||||
return usbyte(msg.payload, 0), usbyte(msg.payload, 1), usbyte(msg.payload, 2),
|
||||
else:
|
||||
return usbyte(msg.payload, 0),
|
||||
|
||||
|
||||
class Motor(Peripheral):
|
||||
SUBCMD_START_POWER = 0x01
|
||||
# SUBCMD_START_POWER = 0x02
|
||||
SUBCMD_START_POWER_GROUPED = 0x02
|
||||
SUBCMD_SET_ACC_TIME = 0x05
|
||||
SUBCMD_SET_DEC_TIME = 0x06
|
||||
SUBCMD_START_SPEED = 0x07
|
||||
@ -248,10 +254,10 @@ class Motor(Peripheral):
|
||||
END_STATE_FLOAT = 0
|
||||
|
||||
def _speed_abs(self, relative):
|
||||
if relative is None:
|
||||
if relative == Motor.END_STATE_BRAKE or relative == Motor.END_STATE_HOLD:
|
||||
# special value for BRAKE
|
||||
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
|
||||
return 127
|
||||
return relative
|
||||
|
||||
if relative < -1:
|
||||
log.warning("Speed cannot be less than -1")
|
||||
@ -265,9 +271,6 @@ class Motor(Peripheral):
|
||||
return int(absolute)
|
||||
|
||||
def _write_direct_mode(self, subcmd, params):
|
||||
if self.virtual_ports:
|
||||
subcmd += 1 # de-facto rule
|
||||
|
||||
params = pack("<B", subcmd) + params
|
||||
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
|
||||
self._send_output(msg)
|
||||
@ -279,22 +282,27 @@ class Motor(Peripheral):
|
||||
msg = MsgPortOutput(self.port, subcmd, params)
|
||||
self._send_output(msg)
|
||||
|
||||
def start_power(self, speed_primary=1.0, speed_secondary=None):
|
||||
def start_power(self, power_primary=1.0, power_secondary=None):
|
||||
"""
|
||||
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
|
||||
"""
|
||||
if speed_secondary is None:
|
||||
speed_secondary = speed_primary
|
||||
if power_secondary is None:
|
||||
power_secondary = power_primary
|
||||
|
||||
if self.virtual_ports:
|
||||
cmd = self.SUBCMD_START_POWER_GROUPED - 1 # because _send_cmd will do +1
|
||||
else:
|
||||
cmd = self.SUBCMD_START_POWER
|
||||
|
||||
params = b""
|
||||
params += pack("<b", self._speed_abs(speed_primary))
|
||||
params += pack("<b", self._speed_abs(power_primary))
|
||||
if self.virtual_ports:
|
||||
params += pack("<b", self._speed_abs(speed_secondary))
|
||||
params += pack("<b", self._speed_abs(power_secondary))
|
||||
|
||||
self._write_direct_mode(self.SUBCMD_START_POWER, params)
|
||||
self._send_cmd(cmd, params)
|
||||
|
||||
def stop(self):
|
||||
self.start_speed(0)
|
||||
self.timed(0)
|
||||
|
||||
def set_acc_profile(self, seconds, profile_no=0x00):
|
||||
"""
|
||||
@ -397,20 +405,20 @@ class EncodedMotor(Motor):
|
||||
def goto_position(self, degrees_primary, degrees_secondary=None, speed=1.0, max_power=1.0,
|
||||
end_state=Motor.END_STATE_BRAKE, use_profile=0b11):
|
||||
"""
|
||||
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startspeedfordegrees-degrees-speed-maxpower-endstate-useprofile-0x0b
|
||||
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-gotoabsoluteposition-abspos-speed-maxpower-endstate-useprofile-0x0d
|
||||
"""
|
||||
if degrees_secondary is None:
|
||||
degrees_secondary = degrees_primary
|
||||
|
||||
params = b""
|
||||
params += pack("<I", degrees_primary)
|
||||
params += pack("<i", degrees_primary)
|
||||
if self.virtual_ports:
|
||||
params += pack("<I", degrees_secondary)
|
||||
params += pack("<i", degrees_secondary)
|
||||
|
||||
params += pack("<b", self._speed_abs(speed))
|
||||
|
||||
params += pack("<B", end_state)
|
||||
params += pack("<B", int(100 * max_power))
|
||||
params += pack("<B", end_state)
|
||||
params += pack("<B", use_profile)
|
||||
|
||||
self._send_cmd(self.SUBCMD_GOTO_ABSOLUTE_POSITION, params)
|
||||
@ -421,7 +429,7 @@ class EncodedMotor(Motor):
|
||||
angle = unpack("<l", data[0:4])[0]
|
||||
return (angle,)
|
||||
elif self._port_mode.mode == self.SENSOR_SPEED:
|
||||
speed = unpack("<b", data[0])[0]
|
||||
speed = unpack("<b", data[0:1])[0]
|
||||
return (speed,)
|
||||
else:
|
||||
log.debug("Got motor sensor data while in unexpected mode: %r", self._port_mode)
|
||||
@ -580,7 +588,7 @@ class VisionSensor(Peripheral):
|
||||
elif self._port_mode.mode == self.CALIBRATE:
|
||||
return [ushort(data, x * 2) for x in range(8)]
|
||||
else:
|
||||
log.debug("Unhandled data in mode %s: %s", self._port_mode.mode, str2hex(data))
|
||||
log.debug("Unhandled VisionSensor data in mode %s: %s", self._port_mode.mode, str2hex(data))
|
||||
return ()
|
||||
|
||||
def set_color(self, color):
|
||||
|
@ -7,6 +7,8 @@ import logging
|
||||
import sys
|
||||
from struct import unpack
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
if sys.version_info[0] == 2:
|
||||
import Queue as queue
|
||||
else:
|
||||
@ -15,22 +17,29 @@ else:
|
||||
queue = queue # just to use it
|
||||
|
||||
|
||||
def check_unpack(seq, index, pattern, size):
|
||||
"""Check that we got size bytes, if so, unpack using pattern"""
|
||||
data = seq[index: index + size]
|
||||
assert len(data) == size, "Unexpected data len %d, expected %d" % (len(data), size)
|
||||
return unpack(pattern, data)[0]
|
||||
|
||||
|
||||
def usbyte(seq, index):
|
||||
return unpack("<B", seq[index:index + 1])[0]
|
||||
return check_unpack(seq, index, "<B", 1)
|
||||
|
||||
|
||||
def ushort(seq, index):
|
||||
return unpack("<H", seq[index:index + 2])[0]
|
||||
return check_unpack(seq, index, "<H", 2)
|
||||
|
||||
|
||||
def usint(seq, index):
|
||||
return unpack("<I", seq[index:index + 4])[0]
|
||||
return check_unpack(seq, index, "<I", 4)
|
||||
|
||||
|
||||
def str2hex(data): # we need it for python 2+3 compatibility
|
||||
# if sys.version_info[0] == 3:
|
||||
# data = bytes(data, 'ascii')
|
||||
if not isinstance(data, (bytes, bytearray)):
|
||||
data = bytes(data, 'ascii')
|
||||
data = bytes(data, "ascii")
|
||||
hexed = binascii.hexlify(data)
|
||||
return hexed
|
||||
|
38
setup.py
38
setup.py
@ -1,16 +1,24 @@
|
||||
from distutils.core import setup
|
||||
from setuptools import setup
|
||||
|
||||
setup(name='pylgbst',
|
||||
description='Python library to interact with LEGO Move Hub (from Lego BOOST set)',
|
||||
version='1.1.1',
|
||||
author='Andrey Pokhilko',
|
||||
author_email='apc4@ya.ru',
|
||||
packages=['pylgbst', "pylgbst.comms"],
|
||||
requires=[],
|
||||
extras_require={
|
||||
'gatt': ["gatt"],
|
||||
'gattlib': ["gattlib"],
|
||||
'pygatt': ["pygatt"],
|
||||
'bluepy': ["bluepy"],
|
||||
}
|
||||
)
|
||||
setup(
|
||||
name="pylgbst",
|
||||
version="1.2.0",
|
||||
|
||||
author="Andrey Pokhilko",
|
||||
author_email="apc4@ya.ru",
|
||||
license="MIT",
|
||||
description="Python library to interact with LEGO PoweredUp devices (Lego BOOST etc.)",
|
||||
url='https://github.com/undera/pylgbst',
|
||||
keywords=['LEGO', 'ROBOTICS', 'BLUETOOTH'],
|
||||
|
||||
packages=["pylgbst", "pylgbst.comms"],
|
||||
requires=[],
|
||||
extras_require={
|
||||
# Note that dbus and gi are normally system packages
|
||||
"gatt": ["gatt", "dbus", "gi"],
|
||||
"gattlib": ["gattlib"],
|
||||
"pygatt": ["pygatt", "pexpect"],
|
||||
"bluepy": ["bluepy"],
|
||||
"bleak": ["bleak"],
|
||||
},
|
||||
)
|
||||
|
@ -1,3 +1,4 @@
|
||||
import sys
|
||||
import time
|
||||
from binascii import unhexlify
|
||||
|
||||
@ -5,7 +6,7 @@ from pylgbst.comms import Connection
|
||||
from pylgbst.hub import MoveHub, Hub
|
||||
from pylgbst.peripherals import *
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.INFO)
|
||||
|
||||
log = logging.getLogger('test')
|
||||
|
||||
|
@ -622,7 +622,6 @@
|
||||
]
|
||||
]
|
||||
},
|
||||
|
||||
"LEDRGB on port 0x32": {
|
||||
"mode_count": 2,
|
||||
"input_modes": [],
|
||||
@ -694,7 +693,6 @@
|
||||
"can_input": false
|
||||
}
|
||||
},
|
||||
|
||||
"ColorDistanceSensor on port 0x1": {
|
||||
"mode_count": 11,
|
||||
"input_modes": [
|
||||
@ -1010,7 +1008,6 @@
|
||||
]
|
||||
]
|
||||
},
|
||||
|
||||
"TiltSensor on port 0x3a": {
|
||||
"mode_count": 8,
|
||||
"input_modes": [
|
||||
@ -1266,7 +1263,6 @@
|
||||
]
|
||||
]
|
||||
},
|
||||
|
||||
"Current on port 0x3b": {
|
||||
"mode_count": 2,
|
||||
"input_modes": [
|
||||
|
61
tests/test_cbleak.py
Normal file
61
tests/test_cbleak.py
Normal file
@ -0,0 +1,61 @@
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import bleak
|
||||
from packaging import version
|
||||
|
||||
import pylgbst
|
||||
import pylgbst.comms.cbleak as cbleak
|
||||
|
||||
bleak.BleakClient = object()
|
||||
bleak.discover = object()
|
||||
|
||||
last_response = None
|
||||
lt37 = version.parse(sys.version.split(' ')[0]) < version.parse("3.7")
|
||||
|
||||
|
||||
class BleakDriverTest(unittest.TestCase):
|
||||
def test_driver_creation(self):
|
||||
connection = pylgbst.get_connection_bleak()
|
||||
self.assertIsInstance(connection, cbleak.BleakDriver)
|
||||
self.assertFalse(connection.is_alive(), 'Checking that factory returns not started driver')
|
||||
|
||||
@unittest.skipIf(lt37, "Python version is too low")
|
||||
def test_communication(self):
|
||||
driver = cbleak.BleakDriver()
|
||||
|
||||
async def fake_thread():
|
||||
print('Fake thread initialized')
|
||||
while not driver._abort:
|
||||
time.sleep(0.1)
|
||||
if cbleak.req_queue.qsize() != 0:
|
||||
print('Received data, sending back')
|
||||
data = cbleak.req_queue.get()
|
||||
cbleak.resp_queue.put(data)
|
||||
|
||||
driver._bleak_thread = fake_thread
|
||||
driver.set_notify_handler(BleakDriverTest.validation_handler)
|
||||
driver.enable_notifications()
|
||||
|
||||
time.sleep(0.5) # time for driver initialization
|
||||
self.assertTrue(driver.is_alive(), 'Checking that driver starts')
|
||||
handle = 0x32
|
||||
data = [0xD, 0xE, 0xA, 0xD, 0xB, 0xE, 0xE, 0xF]
|
||||
driver.write(handle, data)
|
||||
time.sleep(0.5) # processing time
|
||||
self.assertEqual(handle, last_response[0], 'Verifying response handle')
|
||||
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
|
||||
|
||||
driver.disconnect()
|
||||
time.sleep(0.5) # processing time
|
||||
self.assertFalse(driver.is_alive())
|
||||
|
||||
@staticmethod
|
||||
def validation_handler(handle, data):
|
||||
global last_response
|
||||
last_response = (handle, data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
40
tests/test_comms.py
Normal file
40
tests/test_comms.py
Normal file
@ -0,0 +1,40 @@
|
||||
import unittest
|
||||
|
||||
from pylgbst.comms import *
|
||||
|
||||
|
||||
class ConnectionTestCase(unittest.TestCase):
|
||||
def test_is_device_matched(self):
|
||||
conn = Connection()
|
||||
|
||||
hub_address = '1a:2A:3A:4A:5A:6A'
|
||||
other_address = 'A1:a2:a3:a4:a5:a6'
|
||||
zero_address = '00:00:00:00:00:00'
|
||||
hub_name = 'LEGO Move Hub'
|
||||
other_name = 'HRM'
|
||||
|
||||
test_matrix = [
|
||||
# address, name, hub_mac, expected
|
||||
(hub_address, hub_name, hub_address, None, True),
|
||||
(hub_address, hub_name, None, hub_name, True),
|
||||
(hub_address, None, hub_address, None, True),
|
||||
(hub_address, None, None, hub_name, False),
|
||||
(hub_address, other_name, hub_address, None, True),
|
||||
(hub_address, other_name, None, hub_name, False),
|
||||
(other_address, hub_name, hub_address, None, False),
|
||||
(other_address, hub_name, None, hub_name, True),
|
||||
(other_address, None, hub_address, None, False),
|
||||
(other_address, None, None, hub_name, False),
|
||||
(other_address, other_name, hub_address, None, False),
|
||||
(other_address, other_name, None, hub_name, False),
|
||||
(zero_address, hub_name, hub_address, None, False),
|
||||
(zero_address, hub_name, None, hub_name, False),
|
||||
(zero_address, None, hub_address, None, False),
|
||||
(zero_address, None, None, hub_name, False),
|
||||
(zero_address, other_name, hub_address, None, False),
|
||||
(zero_address, other_name, None, hub_name, False),
|
||||
]
|
||||
|
||||
for address, name, hub_mac, fname, expected in test_matrix:
|
||||
matched = conn._is_device_matched(address=address, dev_name=name, hub_mac=hub_mac, find_name=fname)
|
||||
self.assertEqual(matched, expected)
|
@ -151,11 +151,11 @@ class PeripheralsTest(unittest.TestCase):
|
||||
|
||||
hub.connection.notification_delayed('050082030a', 0.1)
|
||||
motor.start_power(1.0)
|
||||
self.assertEqual(b"0800810311510164", hub.writes.pop(1)[1])
|
||||
self.assertEqual(b"07008103110164", hub.writes.pop(1)[1])
|
||||
|
||||
hub.connection.notification_delayed('050082030a', 0.1)
|
||||
motor.stop()
|
||||
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
|
||||
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
|
||||
|
||||
hub.connection.notification_delayed('050082030a', 0.1)
|
||||
motor.set_acc_profile(1.0)
|
||||
@ -171,7 +171,7 @@ class PeripheralsTest(unittest.TestCase):
|
||||
|
||||
hub.connection.notification_delayed('050082030a', 0.1)
|
||||
motor.stop()
|
||||
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
|
||||
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
|
||||
|
||||
logging.debug("\n\n")
|
||||
hub.connection.notification_delayed('0500820301', 0.1)
|
||||
@ -191,7 +191,7 @@ class PeripheralsTest(unittest.TestCase):
|
||||
hub.connection.notification_delayed('0500820301', 0.1)
|
||||
hub.connection.notification_delayed('050082030a', 0.2)
|
||||
motor.goto_position(0)
|
||||
self.assertEqual(b"0e008103110d00000000647f6403", hub.writes.pop(1)[1])
|
||||
self.assertEqual(b"0e008103110d0000000064647f03", hub.writes.pop(1)[1])
|
||||
|
||||
hub.connection.wait_notifications_handled()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user