1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Compare commits

...

31 Commits

Author SHA1 Message Date
Andrey Pokhilko
e7e571b012 Fix UT 2020-09-20 11:06:59 +03:00
Dmitry Fink
f015d4b03b
Fix to work with Bleak on a mac (#78)
Sometimes the data returnes is not byte/bytes, but native objective-c class _NSInlineData.
Seems to be a bleak bug, just convert it to bytes as workaround for now.

Co-authored-by: Dmitry Fink <finik@dishero.com>
2020-09-20 11:04:39 +03:00
Andrey Pokhilko
17ce398595
Update README.md 2020-09-14 10:18:24 +03:00
Dmitry Fink
37c11c0682
Update README.md (#77)
Fix documentation, describe proper way to launch with custom connection
2020-09-14 10:17:48 +03:00
Andrey Pokhilko
35e3868a64 Add lock for device detects 2020-08-12 09:34:33 +03:00
Andrey Pokhilko
c73311528d Cosmetics around bleak 2020-07-04 09:29:55 +03:00
Andrey Pokhilko
afdbe4b2e0 Improve demo logging 2020-06-30 08:47:55 +03:00
Andrey Pokhilko
8d9bb94d87 Release 1.2.0 2020-06-27 15:41:55 +03:00
Andrey Pokhilko
7949f3477c Some docs 2020-06-27 15:39:50 +03:00
Andrey Pokhilko
777bc3ad32 Merge branch 'master' of github.com:undera/pylgbst 2020-06-27 15:32:56 +03:00
Andrey Pokhilko
94db2840f4
Allow finding hub by name (#68)
* Cosmetics

* Add hub name parameter

* Fix tests

* Fix test

* Fix conn priority
2020-06-27 15:26:47 +03:00
Andrey Pokhilko
93e1573e64 Cosmetics 2020-06-27 13:56:01 +03:00
Andrey Pokhilko
3c2f0b493b
Experiment with Bleak changes (#55)
* Introduce driver that works with Bleak, enables to use BLE devices in windows without a need of external BLE dongle.

* Fix issues in auto review.

* Add method description and UT.

* Fix docstring to comply with pep257

* Experiment

* Make test only work in 3.7+

* Fix versions

* One more try

* Kick it

* Kick

* cmon

* Dummm

* yeah yeah

* Add

* Fix version

Co-authored-by: mgr <tomekmgr@gmail.com>
2020-06-27 13:45:53 +03:00
Andrey Pokhilko
ba7594a081
Update README.md 2020-06-11 21:27:36 +03:00
Andrey Pokhilko
c7e24c10d4
Update README.md 2020-06-05 22:30:14 +03:00
Andrey Pokhilko
c47fb2326a
Offer alternative fix for start_power commands (#53)
* Offer alternative fix

* Fix test

* Dummy commit

* fix

* Decrease coverage

* fix test

* turn into comments

* Empty it

* Rollback experiment
2020-03-07 09:36:30 +03:00
Andrey Pokhilko
fef871946a experiment with codacy 2020-02-15 18:55:14 +03:00
Andrey Pokhilko
ff51129247 Fix test 2020-02-08 15:20:07 +03:00
laurentchar
69b234b924
peripheral:EncodedMotor: fixed goto_position() (#50) 2020-02-08 15:18:15 +03:00
laurentchar
d1019ac9f4
peripheral:Motor: fixed _speed_abs for END_STATE_BRAKE (#48) 2020-01-30 22:03:33 +03:00
MDE
dff312534f
Added test and fix for device matching (#46)
* Added test for device matching

Mocking the scanning and testing the connect() of each backend would be
better, but that requires more refactoring with prior agreement.
Added unittest2 dependency for subTest support, other solutions are
available.

* Fixed matching for Move Hub

If other BLE devices are around, an exception occurs on hub_mac.lower() if
default hub_mac (None) is used.

* fixup! Added test for device matching

Removed unittest2 dependency and features
2020-01-29 09:33:53 +03:00
Andrey Pokhilko
9e4fab4aae Fix tests 2020-01-28 22:00:29 +03:00
Andrey Pokhilko
17e22bf810 Use command codes in write direct 2020-01-28 20:53:28 +03:00
laurentchar
300268a2ab Update peripherals.py (#45) 2020-01-09 13:45:02 +03:00
laurentchar
4f8dbe852c demo.py (#44)
Line #155 should be removed
2020-01-08 10:56:57 +03:00
carnage
d271f251dd Add advanced button handler (#42)
* Add advanced button handler

* Rename pylgbst/extras/advancedbutton.py to examples/advancedbutton/advancedbutton.py

* Create README.md
2019-12-29 11:28:11 +03:00
Andrey Pokhilko
c71befdb66 Update handler signature 2019-12-27 22:36:14 +03:00
Andrey Pokhilko
6a49f5c840 Enable subscribing to LED notifications 2019-12-27 22:18:51 +03:00
Andrey Pokhilko
0a4227d132 Refactoring 2019-12-27 22:07:22 +03:00
Andrey Pohilko
1e48f23f61 Fix issues 2019-12-27 16:22:37 +03:00
Andrey Pohilko
f058ece155 Add meta info file 2019-12-27 10:50:06 +03:00
24 changed files with 609 additions and 135 deletions

View File

@ -1,14 +1,8 @@
sudo: false
language: python
virtualenv:
system_site_packages: true
matrix:
include:
- os: linux
python: 2.7
- os: linux
python: 3.5
python:
- 3.6
- 3.8
addons:
apt:
@ -21,19 +15,23 @@ addons:
- libdbus-1-dev
- libdbus-glib-1-dev
- libgirepository-1.0-1
- libgirepository1.0-dev
- python-dbus
- python-gi
- python3-dbus
- python3-gi
- bluez
install:
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
- pip install codecov nose-exclude pygatt gatt pexpect bluepy
- pip install codecov codacy-coverage pytest pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
- pip install --upgrade attrs
env:
- READTHEDOCS=True
script: coverage run --omit="examples/*" --source=. -m nose tests -v --exclude-dir=examples
script:
- coverage run --omit="examples/*" --source=. -m pytest -v --ignore=examples --log-level=INFO tests
after_success:
- coverage report -m
- coverage xml
- codecov
- python-codacy-coverage -r coverage.xml

View File

@ -1,10 +1,10 @@
# Python library to interact with Move Hub
# Python library to interact with Move Hub / PoweredUp Hubs
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/themes/boost)._
In fact, Move Hub is just Bluetooth hardware, all manipulations are done with commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of ways to issue these commands is to write Python program using this library.
In fact, Move Hub is just a Bluetooth hardware piece, and all manipulations with it are made by commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of the ways to issue these commands is to write Python program using this library.
Best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
The best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
If you have Vernie assembled, you might run scripts from [`examples/vernie`](examples/vernie) directory.
@ -55,12 +55,13 @@ Each peripheral kind has own methods to do actions and/or get sensor data. See [
## Bluetooth Backend Prerequisites
You have following options to install as Bluetooth backend:
You have following options to install as Bluetooth backend (some of them might require `sudo` on Linux):
- `pip install pygatt` - [pygatt](https://github.com/peplin/pygatt) lib, works on both Windows and Linux
- `pip install gatt` - [gatt](https://github.com/getsenic/gatt-python) lib, supports Linux, does not work on Windows
- `pip install gattlib` - [gattlib](https://bitbucket.org/OscarAcena/pygattlib) - supports Linux, does not work on Windows, requires `sudo`
- `pip install bluepy` - [bluepy](https://github.com/IanHarvey/bluepy) lib, supports Linux, including Raspbian, which allows connection to the hub from the Raspberry PI
- `pip install bleak` - [bleak](https://github.com/hbldh/bleak) lib, supports Linux/Windows/MacOS
Running on Windows requires [Bluegiga BLED112 Bluetooth Smart Dongle](https://www.silabs.com/products/wireless/bluetooth/bluetooth-low-energy-modules/bled112-bluetooth-smart-dongle) hardware piece, because no other hardware currently works on Windows with Python+BLE.
@ -69,26 +70,25 @@ _Please let author know if you have discovered any compatibility/preprequisite d
Depending on backend type, you might need Linux `sudo` to be used when running Python.
### Bluetooth Connection Options
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_auto()` to attempt backend auto-detection
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and MoveHub mac address. Please look function source code for details.
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst.hub import MoveHub
from pylgbst.comms.cgatt import GattConnection
conn = GattConnection("hci1")
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
from pylgbst import get_connection_gatt
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
hub = MoveHub(conn)
```
@ -119,4 +119,4 @@ Then push green button on MoveHub, so permanent BLE connection will be establish
- https://github.com/JorgePe/BOOSTreveng - initial source of protocol knowledge
- https://github.com/nathankellenicki/node-poweredup - JavaScript version of library
- https://github.com/spezifisch/sphero-python/blob/master/BB8joyDrive.py - example with another approach to bluetooth libs
- https://github.com/virantha/bricknil - for the lovers of async Python, alternative implementation of library to control PoweredUp Hubs

View File

@ -0,0 +1,40 @@
### Advanced button
This example shows how you can add additional functionallity to the move hub button.
It adds three new actions which you can use instead of the standard subscription to a button press:
- Click - a single quick up/down press
- Double click - a double up/down press, second click must occur within .5 secs of first one
- Long press - a press and hold on the button for > .7 secs
```python
from pylgbst.hub import MoveHub
from advancedbutton import AdvancedButton
import time
hub = MoveHub()
b = AdvancedButton(hub)
def clicked():
print("button clicked")
def pressed():
print("button pressed")
def doubleclicked():
print("button double clicked")
b.click.subscribe(clicked)
b.double_click.subscribe(doubleclicked)
b.long_press.subscribe(pressed)
time.sleep(120)
```
You can alter the timings using the two constants `DOUBLE_CLICK_TIME` and `LONG_PRESS_TIME`

View File

@ -0,0 +1,73 @@
import time
import threading
DOUBLE_CLICK_TIME = 0.5
LONG_PRESS_TIME = 0.7
class AdvancedButton:
def __init__(self, hub):
self.state = 0
self.last_pressed = 0
self.press_time = None
self.hub = hub
self.hub.button.subscribe(self.pressed)
self.click = ButtonAction()
self.double_click = ButtonAction()
self.long_press = ButtonAction()
def pressed(self, state):
if state == 2:
return
press_time = time.time()
if state == 1:
self.state = 1
self.press_time = press_time
return
if state == 0 and self.state == 1:
self.state = 0
press_duration = press_time - self.press_time
else:
return
if press_duration > LONG_PRESS_TIME:
# long press
self.long_press.notify()
return
if (press_time - self.last_pressed) < DOUBLE_CLICK_TIME:
# double click
self.last_pressed = 0
self.double_click.notify()
return
# could be first of a double click, could be single click
self.last_pressed = press_time
def timeout():
time.sleep(DOUBLE_CLICK_TIME)
if self.last_pressed == press_time:
# not clicked while sleeping
# single click
self.click.notify()
threading.Thread(target=timeout).start()
class ButtonAction:
def __init__(self):
self.subscribers = set()
def subscribe(self, callback):
self.subscribers.add(callback)
def unsubscribe(self, callback=None):
if callback in self.subscribers:
self.subscribers.remove(callback)
def notify(self):
for subscriber in self.subscribers.copy():
subscriber()

View File

@ -14,7 +14,7 @@ def demo_led_colors(movehub):
log.info("LED colors demo")
# We get a response with payload and port, not x and y here...
def colour_callback(**named):
def colour_callback(named):
log.info("LED Color callback: %s", named)
movehub.led.subscribe(colour_callback)
@ -152,7 +152,6 @@ def demo_motor_sensors(movehub):
movehub.motor_B.unsubscribe(callback_b)
if movehub.motor_external is not None:
demo_motor_sensors.states[movehub.motor_external] = None
movehub.motor_external.unsubscribe(callback_e)
@ -174,15 +173,15 @@ def demo_voltage(movehub):
def demo_all(movehub):
demo_voltage(movehub)
demo_led_colors(movehub)
demo_motors_timed(movehub)
demo_motors_angled(movehub)
demo_port_cd_motor(movehub)
demo_led_colors(movehub)
demo_tilt_sensor_simple(movehub)
demo_tilt_sensor_precise(movehub)
demo_color_sensor(movehub)
demo_motor_sensors(movehub)
demo_voltage(movehub)
DEMO_CHOICES = {
@ -247,12 +246,12 @@ def connection_from_url(url):
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection)
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
parameters['connection'] = connection
except ValueError as err:
parser.error(err.args[0])

View File

@ -85,7 +85,7 @@ try:
sa = round(c + b / divider, 1)
sb = round(c - b / divider, 1)
logging.info("SpeedA=%s, SpeedB=%s", sa, sb)
robot.motor_AB.start_power(sa, sb)
robot.motor_AB.start_speed(sa, sb)
# time.sleep(0.5)
finally:
robot.motor_AB.stop()

View File

@ -1,5 +1,7 @@
import logging
from examples.vernie import Vernie
from pylgbst.peripherals import VisionSensor
from . import *
logging.basicConfig(level=logging.INFO)
@ -10,7 +12,8 @@ criterion = min
cur_luminosity = 0
def on_change_lum(lumn):
def on_change_lum(lumn, unknown):
del unknown
global cur_luminosity
cur_luminosity = lumn

View File

@ -1,5 +1,5 @@
from pylgbst.constants import COLOR_GREEN, COLOR_NONE
from . import *
from pylgbst.peripherals import COLOR_GREEN, COLOR_NONE
from vernie import *
robot = Vernie()
running = True
@ -11,7 +11,7 @@ def callback(color, distance):
secs = (10 - distance + 1) / 10.0
print("Distance is %.1f inches, I'm running back with %s%% speed!" % (distance, int(speed * 100)))
if speed <= 1:
robot.motor_AB.timed(secs / 1, -speed, is_async=True)
robot.motor_AB.timed(secs / 1, -speed)
robot.say("Ouch")
@ -31,6 +31,3 @@ while running:
robot.vision_sensor.unsubscribe(callback)
robot.button.unsubscribe(on_btn)
robot.led.set_color(COLOR_NONE)
while robot.led.in_progress():
time.sleep(1)

View File

@ -6,42 +6,57 @@ from pylgbst.comms import DebugServer
log = logging.getLogger('pylgbst')
def get_connection_bluegiga(controller=None, hub_mac=None):
def get_connection_bluegiga(controller=None, hub_mac=None, hub_name=None):
del controller # to prevent code analysis warning
from pylgbst.comms.cpygatt import BlueGigaConnection
return BlueGigaConnection().connect(hub_mac)
return BlueGigaConnection().connect(hub_mac, hub_name)
def get_connection_gattool(controller='hci0', hub_mac=None):
def get_connection_gattool(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cpygatt import GattoolConnection
return GattoolConnection(controller).connect(hub_mac)
return GattoolConnection(controller).connect(hub_mac, hub_name)
def get_connection_gatt(controller='hci0', hub_mac=None):
def get_connection_gatt(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cgatt import GattConnection
return GattConnection(controller).connect(hub_mac)
return GattConnection(controller).connect(hub_mac, hub_name)
def get_connection_gattlib(controller='hci0', hub_mac=None):
def get_connection_gattlib(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cgattlib import GattLibConnection
return GattLibConnection(controller).connect(hub_mac)
return GattLibConnection(controller).connect(hub_mac, hub_name)
def get_connection_bluepy(controller='hci0', hub_mac=None):
def get_connection_bluepy(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cbluepy import BluepyConnection
return BluepyConnection(controller).connect(hub_mac)
return BluepyConnection(controller).connect(hub_mac, hub_name)
def get_connection_auto(controller='hci0', hub_mac=None):
def get_connection_bleak(controller='hci0', hub_mac=None, hub_name=None):
"""
Return connection based with Bleak API as an endpoint.
:param controller: Not used, kept for compatibility with others.
:param hub_mac: Optional Lego HUB MAC to connect to.
:return: Driver object.
"""
del controller # to prevent code analysis warning
from pylgbst.comms.cbleak import BleakDriver
return BleakDriver(hub_mac, hub_name)
def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
fns = [
get_connection_bluepy,
get_connection_bluegiga,
get_connection_gatt,
get_connection_bleak,
get_connection_gattool,
get_connection_gattlib,
]
@ -50,7 +65,7 @@ def get_connection_auto(controller='hci0', hub_mac=None):
for fn in fns:
try:
logging.info("Trying %s", fn.__name__)
return fn(controller, hub_mac)
return fn(controller, hub_mac, hub_name)
except KeyboardInterrupt:
raise
except BaseException:

View File

@ -15,7 +15,6 @@ from pylgbst.utilities import str2hex
log = logging.getLogger('comms')
LEGO_MOVE_HUB = "LEGO Move Hub"
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
@ -46,13 +45,21 @@ class Connection(object):
def enable_notifications(self):
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
def _is_device_matched(self, address, name, hub_mac):
log.debug("Checking device name: %s, MAC: %s", name, address)
def _is_device_matched(self, address, dev_name, hub_mac, find_name):
assert hub_mac or find_name, 'You have to provide either hub_mac or hub_name in connection options'
log.debug("Checking device: %s, MAC: %s", dev_name, address)
matched = False
if address != "00:00:00:00:00:00":
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac.lower() == address.lower():
log.info("Found %s at %s", name, address)
return True
return False
if hub_mac:
if hub_mac.lower() == address.lower():
matched = True
elif dev_name == find_name:
matched = True
if matched:
log.info("Found %s at %s", dev_name, address)
return matched
class DebugServer(object):

216
pylgbst/comms/cbleak.py Normal file
View File

@ -0,0 +1,216 @@
import asyncio
import logging
import queue
import threading
import time
from bleak import BleakClient, discover
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
log = logging.getLogger('comms-bleak')
# Queues to handle request / responses. Acts as a buffer between API and async BLE driver
resp_queue = queue.Queue()
req_queue = queue.Queue()
class BleakDriver(object):
"""Driver that provides interface between API and Bleak."""
def __init__(self, hub_mac=None, hub_name=None):
"""
Initialize new object of Bleak Driver class.
:param hub_mac: Optional Lego HUB MAC to connect to.
"""
self.hub_mac = hub_mac
self.hub_name = hub_name
self._handler = None
self._abort = False
self._connection_thread = None
self._processing_thread = None
def set_notify_handler(self, handler):
"""
Set handler function used to communicate with an API.
:param handler: Handler function called by driver when received data
:return: None
"""
self._handler = handler
def enable_notifications(self):
"""
Enable notifications, in our cases starts communication threads.
We cannot do this earlier, because API need to fist set notification handler.
:return: None
"""
self._connection_thread = threading.Thread(target=lambda: asyncio.run(self._bleak_thread()))
self._connection_thread.daemon = True
self._connection_thread.start()
self._processing_thread = threading.Thread(target=self._processing)
self._processing_thread.daemon = True
self._processing_thread.start()
async def _bleak_thread(self):
bleak = BleakConnection()
await bleak.connect(self.hub_mac, self.hub_name)
await bleak.set_notify_handler(self._safe_handler)
# After connecting, need to send any data or hub will drop the connection,
# below command is Advertising name request update
await bleak.write_char(MOVE_HUB_HW_UUID_CHAR, bytearray([0x05, 0x00, 0x01, 0x01, 0x05]))
while not self._abort:
await asyncio.sleep(0.1)
if req_queue.qsize() != 0:
data = req_queue.get()
await bleak.write(data[0], data[1])
logging.info("Communications thread has exited")
@staticmethod
def _safe_handler(handler, data):
resp_queue.put((handler, data))
def _processing(self):
while not self._abort:
if resp_queue.qsize() != 0:
msg = resp_queue.get()
self._handler(msg[0], bytes(msg[1]))
time.sleep(0.01)
logging.info("Processing thread has exited")
def write(self, handle, data):
"""
Send data to given handle number.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:raises ConnectionError" When internal threads are not working
:return: None
"""
if not self._connection_thread.is_alive() or not self._processing_thread.is_alive():
raise ConnectionError('Something went wrong, communication threads not functioning.')
req_queue.put((handle, data))
def disconnect(self):
"""
Disconnect and stops communication threads.
:return: None
"""
self._abort = True
def is_alive(self):
"""
Indicate whether driver is functioning or not.
:return: True if driver is functioning; False otherwise.
"""
if self._connection_thread is not None and self._processing_thread is not None:
return self._connection_thread.is_alive() and self._processing_thread.is_alive()
else:
return False
class BleakConnection(Connection):
"""Bleak driver for communicating with BLE device."""
def __init__(self):
"""Initialize new instance of BleakConnection class."""
Connection.__init__(self)
self.loop = asyncio.get_event_loop()
self._device = None
self._client = None
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.WARNING)
logging.getLogger('bleak.backends.bluezdbus.client').setLevel(logging.WARNING)
async def connect(self, hub_mac=None, hub_name=None):
"""
Connect to device.
:param hub_mac: Optional Lego HUB MAC to connect to.
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
:return: None
"""
log.info("Discovering devices... Press green button on Hub")
for i in range(0, 30):
devices = await discover(timeout=1)
log.debug("Devices: %s", devices)
for dev in devices:
log.debug(dev)
address = dev.address
name = dev.name
if self._is_device_matched(address, name, hub_mac, hub_name):
log.info('Device matched: %r', dev)
self._device = dev
break
else:
continue
break
else:
raise ConnectionError('Device not found.')
self._client = BleakClient(self._device.address, self.loop)
status = await self._client.connect()
log.debug('Connection status: {status}'.format(status=status))
async def write(self, handle, data):
"""
Send data to given handle number.
If handle cannot be found in service description, hardcoded LEGO uuid will be used.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:return: None
"""
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
desc = self._client.services.get_descriptor(handle)
if not isinstance(data, bytearray):
data = bytearray(data)
if desc is None:
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
else:
await self._client.write_gatt_char(desc.characteristic_uuid, data)
async def write_char(self, characteristic_uuid, data):
"""
Send data to given handle number.
:param characteristic_uuid: Characteristic uuid used to send data
:param data: data to send
:return: None
"""
await self._client.write_gatt_char(characteristic_uuid, data)
async def set_notify_handler(self, handler):
"""
Set notification handler.
:param handler: Handle function to be called when receive any data.
:return: None
"""
def c(handle, data):
log.debug('Response: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
handler(handle, data)
await self._client.start_notify(MOVE_HUB_HW_UUID_CHAR, c)
def is_alive(self):
"""
To keep compatibility with the driver interface.
This method does nothing.
:return: None.
"""
pass

View File

@ -87,7 +87,7 @@ class BluepyConnection(Connection):
self._peripheral = None # :type BluepyThreadedPeripheral
self._controller = controller
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
scanner = btle.Scanner()
@ -98,11 +98,11 @@ class BluepyConnection(Connection):
for dev in devices:
address = dev.addr
addressType = dev.addrType
address_type = dev.addrType
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
if self._is_device_matched(address, name, hub_mac):
self._peripheral = BluepyThreadedPeripheral(address, addressType, self._controller)
if self._is_device_matched(address, name, hub_mac, hub_name):
self._peripheral = BluepyThreadedPeripheral(address, address_type, self._controller)
break
return self

View File

@ -88,7 +88,7 @@ class GattConnection(Connection):
self._manager_thread.setDaemon(True)
log.debug('Starting DeviceManager...')
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
self._manager_thread.start()
self._manager.start_discovery()
@ -100,7 +100,7 @@ class GattConnection(Connection):
for dev in devices:
address = dev.mac_address
name = dev.alias()
if self._is_device_matched(address, name, hub_mac):
if self._is_device_matched(address, name, hub_mac, hub_name):
self._device = CustomDevice(address, self._manager)
break

View File

@ -61,7 +61,7 @@ class GattLibConnection(Connection):
self.requester = None
self._iface = bt_iface_name
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
service = DiscoveryService(self._iface)
while not self.requester:
@ -70,7 +70,7 @@ class GattLibConnection(Connection):
log.debug("Devices: %s", devices)
for address, name in devices.items():
if self._is_device_matched(address, name, hub_mac):
if self._is_device_matched(address, name, hub_mac, hub_name):
self.requester = Requester(address, True, self._iface)
break

View File

@ -2,7 +2,7 @@ import logging
import pygatt
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_CHAR
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
from pylgbst.utilities import str2hex
log = logging.getLogger('comms-pygatt')
@ -20,20 +20,21 @@ class GattoolConnection(Connection):
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
self._conn_hnd = None
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
adapter = self.backend()
adapter.start()
adapter.start() # enable or disable restart? What's the best?
while not self._conn_hnd:
log.info("Discovering devices...")
devices = adapter.scan(1)
log.debug("Devices: %s", devices)
# Pass each device found to _is_device_matched( ) to see if it the device we want
for dev in devices:
address = dev['address']
name = dev['name']
if self._is_device_matched(address, name, hub_mac):
if self._is_device_matched(address, name, hub_mac, hub_name):
self._conn_hnd = adapter.connect(address)
break

View File

@ -4,8 +4,8 @@ import time
from pylgbst import get_connection_auto
from pylgbst.messages import *
from pylgbst.peripherals import *
from pylgbst.utilities import str2hex, usbyte, ushort
from pylgbst.utilities import queue
from pylgbst.utilities import str2hex, usbyte, ushort
log = logging.getLogger('hub')
@ -43,7 +43,7 @@ class Hub(object):
self.add_message_handler(MsgHubAction, self._handle_action)
if not connection:
connection = get_connection_auto()
connection = get_connection_auto() # TODO: how to identify the hub?
self.connection = connection
self.connection.set_notify_handler(self._notify)
self.connection.enable_notifications()
@ -137,7 +137,7 @@ class Hub(object):
if dev_type in PERIPHERAL_TYPES:
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
else:
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
self.peripherals[port] = Peripheral(self, port)
log.info("Attached peripheral: %s", self.peripherals[msg.port])
@ -184,6 +184,8 @@ class MoveHub(Hub):
:type motor_external: EncodedMotor
"""
DEFAULT_NAME = "LEGO Move Hub"
# PORTS
PORT_A = 0x00
PORT_B = 0x01
@ -197,6 +199,10 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def __init__(self, connection=None):
self._comm_lock = threading.RLock()
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(MoveHub, self).__init__(connection)
self.info = {}
@ -247,29 +253,40 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def _handle_device_change(self, msg):
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
with self._comm_lock:
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor \
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
class TrainHub(Hub):
DEFAULT_NAME = 'TrainHub'
def __init__(self, connection=None):
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(TrainHub, self).__init__(connection)

View File

@ -232,10 +232,16 @@ class LEDRGB(Peripheral):
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, payload)
self._send_output(msg)
def _decode_port_data(self, msg):
if len(msg.payload) == 3:
return usbyte(msg.payload, 0), usbyte(msg.payload, 1), usbyte(msg.payload, 2),
else:
return usbyte(msg.payload, 0),
class Motor(Peripheral):
SUBCMD_START_POWER = 0x01
# SUBCMD_START_POWER = 0x02
SUBCMD_START_POWER_GROUPED = 0x02
SUBCMD_SET_ACC_TIME = 0x05
SUBCMD_SET_DEC_TIME = 0x06
SUBCMD_START_SPEED = 0x07
@ -248,10 +254,10 @@ class Motor(Peripheral):
END_STATE_FLOAT = 0
def _speed_abs(self, relative):
if relative is None:
if relative == Motor.END_STATE_BRAKE or relative == Motor.END_STATE_HOLD:
# special value for BRAKE
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
return 127
return relative
if relative < -1:
log.warning("Speed cannot be less than -1")
@ -265,9 +271,6 @@ class Motor(Peripheral):
return int(absolute)
def _write_direct_mode(self, subcmd, params):
if self.virtual_ports:
subcmd += 1 # de-facto rule
params = pack("<B", subcmd) + params
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
self._send_output(msg)
@ -279,22 +282,27 @@ class Motor(Peripheral):
msg = MsgPortOutput(self.port, subcmd, params)
self._send_output(msg)
def start_power(self, speed_primary=1.0, speed_secondary=None):
def start_power(self, power_primary=1.0, power_secondary=None):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
"""
if speed_secondary is None:
speed_secondary = speed_primary
if power_secondary is None:
power_secondary = power_primary
if self.virtual_ports:
cmd = self.SUBCMD_START_POWER_GROUPED - 1 # because _send_cmd will do +1
else:
cmd = self.SUBCMD_START_POWER
params = b""
params += pack("<b", self._speed_abs(speed_primary))
params += pack("<b", self._speed_abs(power_primary))
if self.virtual_ports:
params += pack("<b", self._speed_abs(speed_secondary))
params += pack("<b", self._speed_abs(power_secondary))
self._write_direct_mode(self.SUBCMD_START_POWER, params)
self._send_cmd(cmd, params)
def stop(self):
self.start_speed(0)
self.timed(0)
def set_acc_profile(self, seconds, profile_no=0x00):
"""
@ -409,8 +417,8 @@ class EncodedMotor(Motor):
params += pack("<b", self._speed_abs(speed))
params += pack("<B", end_state)
params += pack("<B", int(100 * max_power))
params += pack("<B", end_state)
params += pack("<B", use_profile)
self._send_cmd(self.SUBCMD_GOTO_ABSOLUTE_POSITION, params)
@ -580,7 +588,7 @@ class VisionSensor(Peripheral):
elif self._port_mode.mode == self.CALIBRATE:
return [ushort(data, x * 2) for x in range(8)]
else:
log.debug("Unhandled data in mode %s: %s", self._port_mode.mode, str2hex(data))
log.debug("Unhandled VisionSensor data in mode %s: %s", self._port_mode.mode, str2hex(data))
return ()
def set_color(self, color):

View File

@ -20,13 +20,8 @@ queue = queue # just to use it
def check_unpack(seq, index, pattern, size):
"""Check that we got size bytes, if so, unpack using pattern"""
data = seq[index: index + size]
if len(data) == size:
return unpack(pattern, data)[0]
else:
log.warning(
"Unpacking of %s bytes failed, insufficient data: %r", size, seq[index:]
)
raise ValueError(data, "Expected %s bytes" % (size,))
assert len(data) == size, "Unexpected data len %d, expected %d" % (len(data), size)
return unpack(pattern, data)[0]
def usbyte(seq, index):

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[metadata]
description-file = README.md

View File

@ -2,12 +2,12 @@ from setuptools import setup
setup(
name="pylgbst",
version="1.1.2",
version="1.2.0",
author="Andrey Pokhilko",
author_email="apc4@ya.ru",
license="MIT",
description="Python library to interact with LEGO Move Hub (from Lego BOOST set)",
description="Python library to interact with LEGO PoweredUp devices (Lego BOOST etc.)",
url='https://github.com/undera/pylgbst',
keywords=['LEGO', 'ROBOTICS', 'BLUETOOTH'],
@ -19,5 +19,6 @@ setup(
"gattlib": ["gattlib"],
"pygatt": ["pygatt", "pexpect"],
"bluepy": ["bluepy"],
"bleak": ["bleak"],
},
)

View File

@ -1,3 +1,4 @@
import sys
import time
from binascii import unhexlify
@ -5,7 +6,7 @@ from pylgbst.comms import Connection
from pylgbst.hub import MoveHub, Hub
from pylgbst.peripherals import *
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.INFO)
log = logging.getLogger('test')

61
tests/test_cbleak.py Normal file
View File

@ -0,0 +1,61 @@
import sys
import time
import unittest
import bleak
from packaging import version
import pylgbst
import pylgbst.comms.cbleak as cbleak
bleak.BleakClient = object()
bleak.discover = object()
last_response = None
lt37 = version.parse(sys.version.split(' ')[0]) < version.parse("3.7")
class BleakDriverTest(unittest.TestCase):
def test_driver_creation(self):
connection = pylgbst.get_connection_bleak()
self.assertIsInstance(connection, cbleak.BleakDriver)
self.assertFalse(connection.is_alive(), 'Checking that factory returns not started driver')
@unittest.skipIf(lt37, "Python version is too low")
def test_communication(self):
driver = cbleak.BleakDriver()
async def fake_thread():
print('Fake thread initialized')
while not driver._abort:
time.sleep(0.1)
if cbleak.req_queue.qsize() != 0:
print('Received data, sending back')
data = cbleak.req_queue.get()
cbleak.resp_queue.put(data)
driver._bleak_thread = fake_thread
driver.set_notify_handler(BleakDriverTest.validation_handler)
driver.enable_notifications()
time.sleep(0.5) # time for driver initialization
self.assertTrue(driver.is_alive(), 'Checking that driver starts')
handle = 0x32
data = [0xD, 0xE, 0xA, 0xD, 0xB, 0xE, 0xE, 0xF]
driver.write(handle, data)
time.sleep(0.5) # processing time
self.assertEqual(handle, last_response[0], 'Verifying response handle')
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
driver.disconnect()
time.sleep(0.5) # processing time
self.assertFalse(driver.is_alive())
@staticmethod
def validation_handler(handle, data):
global last_response
last_response = (handle, data)
if __name__ == '__main__':
unittest.main()

40
tests/test_comms.py Normal file
View File

@ -0,0 +1,40 @@
import unittest
from pylgbst.comms import *
class ConnectionTestCase(unittest.TestCase):
def test_is_device_matched(self):
conn = Connection()
hub_address = '1a:2A:3A:4A:5A:6A'
other_address = 'A1:a2:a3:a4:a5:a6'
zero_address = '00:00:00:00:00:00'
hub_name = 'LEGO Move Hub'
other_name = 'HRM'
test_matrix = [
# address, name, hub_mac, expected
(hub_address, hub_name, hub_address, None, True),
(hub_address, hub_name, None, hub_name, True),
(hub_address, None, hub_address, None, True),
(hub_address, None, None, hub_name, False),
(hub_address, other_name, hub_address, None, True),
(hub_address, other_name, None, hub_name, False),
(other_address, hub_name, hub_address, None, False),
(other_address, hub_name, None, hub_name, True),
(other_address, None, hub_address, None, False),
(other_address, None, None, hub_name, False),
(other_address, other_name, hub_address, None, False),
(other_address, other_name, None, hub_name, False),
(zero_address, hub_name, hub_address, None, False),
(zero_address, hub_name, None, hub_name, False),
(zero_address, None, hub_address, None, False),
(zero_address, None, None, hub_name, False),
(zero_address, other_name, hub_address, None, False),
(zero_address, other_name, None, hub_name, False),
]
for address, name, hub_mac, fname, expected in test_matrix:
matched = conn._is_device_matched(address=address, dev_name=name, hub_mac=hub_mac, find_name=fname)
self.assertEqual(matched, expected)

View File

@ -151,11 +151,11 @@ class PeripheralsTest(unittest.TestCase):
hub.connection.notification_delayed('050082030a', 0.1)
motor.start_power(1.0)
self.assertEqual(b"0800810311510164", hub.writes.pop(1)[1])
self.assertEqual(b"07008103110164", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082030a', 0.1)
motor.set_acc_profile(1.0)
@ -171,7 +171,7 @@ class PeripheralsTest(unittest.TestCase):
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
logging.debug("\n\n")
hub.connection.notification_delayed('0500820301', 0.1)
@ -191,7 +191,7 @@ class PeripheralsTest(unittest.TestCase):
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.goto_position(0)
self.assertEqual(b"0e008103110d00000000647f6403", hub.writes.pop(1)[1])
self.assertEqual(b"0e008103110d0000000064647f03", hub.writes.pop(1)[1])
hub.connection.wait_notifications_handled()