1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Compare commits

...

71 Commits

Author SHA1 Message Date
Andrey Pokhilko
e7e571b012 Fix UT 2020-09-20 11:06:59 +03:00
Dmitry Fink
f015d4b03b
Fix to work with Bleak on a mac (#78)
Sometimes the data returnes is not byte/bytes, but native objective-c class _NSInlineData.
Seems to be a bleak bug, just convert it to bytes as workaround for now.

Co-authored-by: Dmitry Fink <finik@dishero.com>
2020-09-20 11:04:39 +03:00
Andrey Pokhilko
17ce398595
Update README.md 2020-09-14 10:18:24 +03:00
Dmitry Fink
37c11c0682
Update README.md (#77)
Fix documentation, describe proper way to launch with custom connection
2020-09-14 10:17:48 +03:00
Andrey Pokhilko
35e3868a64 Add lock for device detects 2020-08-12 09:34:33 +03:00
Andrey Pokhilko
c73311528d Cosmetics around bleak 2020-07-04 09:29:55 +03:00
Andrey Pokhilko
afdbe4b2e0 Improve demo logging 2020-06-30 08:47:55 +03:00
Andrey Pokhilko
8d9bb94d87 Release 1.2.0 2020-06-27 15:41:55 +03:00
Andrey Pokhilko
7949f3477c Some docs 2020-06-27 15:39:50 +03:00
Andrey Pokhilko
777bc3ad32 Merge branch 'master' of github.com:undera/pylgbst 2020-06-27 15:32:56 +03:00
Andrey Pokhilko
94db2840f4
Allow finding hub by name (#68)
* Cosmetics

* Add hub name parameter

* Fix tests

* Fix test

* Fix conn priority
2020-06-27 15:26:47 +03:00
Andrey Pokhilko
93e1573e64 Cosmetics 2020-06-27 13:56:01 +03:00
Andrey Pokhilko
3c2f0b493b
Experiment with Bleak changes (#55)
* Introduce driver that works with Bleak, enables to use BLE devices in windows without a need of external BLE dongle.

* Fix issues in auto review.

* Add method description and UT.

* Fix docstring to comply with pep257

* Experiment

* Make test only work in 3.7+

* Fix versions

* One more try

* Kick it

* Kick

* cmon

* Dummm

* yeah yeah

* Add

* Fix version

Co-authored-by: mgr <tomekmgr@gmail.com>
2020-06-27 13:45:53 +03:00
Andrey Pokhilko
ba7594a081
Update README.md 2020-06-11 21:27:36 +03:00
Andrey Pokhilko
c7e24c10d4
Update README.md 2020-06-05 22:30:14 +03:00
Andrey Pokhilko
c47fb2326a
Offer alternative fix for start_power commands (#53)
* Offer alternative fix

* Fix test

* Dummy commit

* fix

* Decrease coverage

* fix test

* turn into comments

* Empty it

* Rollback experiment
2020-03-07 09:36:30 +03:00
Andrey Pokhilko
fef871946a experiment with codacy 2020-02-15 18:55:14 +03:00
Andrey Pokhilko
ff51129247 Fix test 2020-02-08 15:20:07 +03:00
laurentchar
69b234b924
peripheral:EncodedMotor: fixed goto_position() (#50) 2020-02-08 15:18:15 +03:00
laurentchar
d1019ac9f4
peripheral:Motor: fixed _speed_abs for END_STATE_BRAKE (#48) 2020-01-30 22:03:33 +03:00
MDE
dff312534f
Added test and fix for device matching (#46)
* Added test for device matching

Mocking the scanning and testing the connect() of each backend would be
better, but that requires more refactoring with prior agreement.
Added unittest2 dependency for subTest support, other solutions are
available.

* Fixed matching for Move Hub

If other BLE devices are around, an exception occurs on hub_mac.lower() if
default hub_mac (None) is used.

* fixup! Added test for device matching

Removed unittest2 dependency and features
2020-01-29 09:33:53 +03:00
Andrey Pokhilko
9e4fab4aae Fix tests 2020-01-28 22:00:29 +03:00
Andrey Pokhilko
17e22bf810 Use command codes in write direct 2020-01-28 20:53:28 +03:00
laurentchar
300268a2ab Update peripherals.py (#45) 2020-01-09 13:45:02 +03:00
laurentchar
4f8dbe852c demo.py (#44)
Line #155 should be removed
2020-01-08 10:56:57 +03:00
carnage
d271f251dd Add advanced button handler (#42)
* Add advanced button handler

* Rename pylgbst/extras/advancedbutton.py to examples/advancedbutton/advancedbutton.py

* Create README.md
2019-12-29 11:28:11 +03:00
Andrey Pokhilko
c71befdb66 Update handler signature 2019-12-27 22:36:14 +03:00
Andrey Pokhilko
6a49f5c840 Enable subscribing to LED notifications 2019-12-27 22:18:51 +03:00
Andrey Pokhilko
0a4227d132 Refactoring 2019-12-27 22:07:22 +03:00
Andrey Pohilko
1e48f23f61 Fix issues 2019-12-27 16:22:37 +03:00
Andrey Pohilko
f058ece155 Add meta info file 2019-12-27 10:50:06 +03:00
Andrey Pohilko
7ffab4fb0c Publish to PyPi 2019-12-27 10:47:41 +03:00
Andrey Pohilko
907a2dd561 Cosmetics 2019-12-27 10:34:04 +03:00
Mike C. Fletcher
c955820521 Bunch of Tiny Fixes and Enhancements (#41)
* Use setuptools to allow the extras_require to work in python3.6

This also declares some hidden dependencies for the underlying
connection protocols, but note that they are normally reliant
on system-packaged versions, which is a bit less than optimal.

* In message, on assert of incoming type, note failing type

* In utilities, guard against truncated input.

* In demo allow for specifying different connections and demos on command line

Also addresses a crash in led demo where parameters x and y were not provided
to an empty lamba that was passed in.

* Remove commentted line, apply black formatting

* Raise TypeError when an incorrectly-typed message is received

* Apply black automatic formatting to the utilities module
2019-12-27 10:27:59 +03:00
Andrey Pokhilko
7efd92700d
Update .travis.yml 2019-12-26 23:00:43 +03:00
Andrey Pokhilko
7dc8b806fa Update link 2019-12-26 14:42:44 +03:00
Andrey Pokhilko
64776eadc8 Fix issue #40 2019-12-26 14:37:23 +03:00
Andrey Pokhilko
9abe2495b0 Fix issue #39 2019-12-26 14:31:51 +03:00
Andrey Pokhilko
d3e4c58c5a Add one more video 2019-12-26 14:21:05 +03:00
Andrey Pokhilko
c11e8fbd18 A bit of refactoring 2019-12-20 14:39:34 +03:00
Andrey Pokhilko
8b970c0792
Update README.md 2019-12-09 16:33:00 +03:00
Andrey Pokhilko
fc8ed8ce2b
Release 1.1.1 2019-12-09 16:32:47 +03:00
Mariusz Woloszyn
cde1bea308 Orange color fix (#35) 2019-11-12 20:36:14 +02:00
Andrey Pokhilko
13919d7ecc Restoring bb8 connection 2019-09-07 23:10:50 +03:00
Andrey Pokhilko
a6a5f12e6c Make all backends aware of zero address 2019-09-07 23:10:34 +03:00
Andrey Pokhilko
5deafe9d4a Fix name of Hub 2019-08-23 13:24:32 +03:00
Andrey Pokhilko
de54a2edc0 Release 1.1.0 2019-08-17 20:42:52 +03:00
Andrey Pokhilko
43aba755cd Release 2.0.0 2019-08-17 20:41:08 +03:00
Andrey Pokhilko
77508273de Fix test better? 2019-08-15 13:07:52 +03:00
Andrey Pokhilko
462188b6b2 Fix test 2019-08-15 13:04:36 +03:00
Andrey Pokhilko
3a8d17737b BB8 joystick is ready 2019-08-15 11:43:26 +03:00
Andrey Pokhilko
e1e650220f figured out directions of joystick 2019-08-15 10:54:37 +03:00
Andrey Pokhilko
c9112d8fe4 Progressing 2019-08-14 23:29:08 +03:00
Andrey Pokhilko
bafdf3fc63 Keep researching BB8 and joyystick 2019-08-14 17:09:45 +03:00
Andrey Pokhilko
f3e4a9dbdb Preparing joystick operations 2019-08-14 13:58:55 +03:00
Andrey Pokhilko
33364e4e3b Fix hanging on button subscribe 2019-08-14 13:17:15 +03:00
Andrey Pokhilko
3f2b3dcb3a New machine is ready, start creating playground code 2019-08-14 11:05:16 +03:00
Andrey Pokhilko
fc08a495b1 It seems LEGO has renamed Move Hub in FW update 2019-08-14 11:04:58 +03:00
Andrey Pokhilko
6ad116fe1c Change button callback into 3-state 2019-08-14 11:04:29 +03:00
Andrey Pokhilko
544aa82cf5 Finish color pinned bot 2019-08-14 11:04:07 +03:00
Andrey Pokhilko
3f19c983c2 Merge branch 'master' of github.com:undera/pylgbst 2019-08-13 15:45:59 +03:00
Andrey Pokhilko
ee2c5f2923
Update .travis.yml 2019-08-12 10:52:19 +03:00
Andrey Pokhilko
274828528f
Try to fix gattlib 2019-08-12 10:49:31 +03:00
Andrey Pokhilko
cbff4756cd
Python 3.4 => 3.5 2019-08-12 10:38:37 +03:00
Ingo Jache
990ccdb268 New LEGO-Boost Firmware 2.0.00.0017 breaks pylgbst... (#33)
* - Updated Port-Numbers (New Firmware 2.0.00.0017)
- Skipping Device without Mac-Address
(Seems to occur randomly after Firmware-Update,
referring to https://github.com/LLK/scratch-vm/issues/2230 )
- Fixed racecondition in Hub.send()
(Reply could come before entering _sync_lock)

* - fixed Unit-Tests (Changed Port-Numbers)
- moved the write-call in hub.send() outside the lock
2019-08-12 10:38:04 +03:00
Andrey Pokhilko
72dd6f0214 Working code 2019-08-05 17:36:35 +03:00
Andrey Pokhilko
66c376b2bd Fixed hardware part for pin machine 2019-08-05 14:11:53 +03:00
Andrey Pokhilko
6da6797374 Working code for BB8 joystick 2019-08-04 23:44:28 +03:00
Andrey Pokhilko
9f347c233f Continue experiments 2019-08-04 12:11:28 +03:00
Andrey Pokhilko
6aec464283 Start preparing BB8-joystick 2019-08-04 11:36:34 +03:00
Andrey Pokhilko
b1c8667f63
Update README.md 2019-06-18 09:56:17 +03:00
36 changed files with 1203 additions and 249 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@
*.pyc
build
*.avi
test_real.py
test_real.py
.vscode/settings.json

View File

@ -1,14 +1,8 @@
sudo: false
language: python
virtualenv:
system_site_packages: true
matrix:
include:
- os: linux
python: 2.7
- os: linux
python: 3.4
python:
- 3.6
- 3.8
addons:
apt:
@ -21,17 +15,23 @@ addons:
- libdbus-1-dev
- libdbus-glib-1-dev
- libgirepository-1.0-1
- libgirepository1.0-dev
- python-dbus
- python-gi
- python3-dbus
- python3-gi
- bluez
install:
- pip install codecov nose-exclude gattlib pygatt gatt pexpect bluepy
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
- pip install codecov codacy-coverage pytest pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
- pip install --upgrade attrs
env:
- READTHEDOCS=True
script: coverage run --source=. -m nose tests -v --exclude-dir=examples
script:
- coverage run --omit="examples/*" --source=. -m pytest -v --ignore=examples --log-level=INFO tests
after_success:
- coverage report -m
- coverage xml
- codecov
- python-codacy-coverage -r coverage.xml

View File

@ -1,10 +1,10 @@
# Python library to interact with Move Hub
# Python library to interact with Move Hub / PoweredUp Hubs
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/en-us/boost)._
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/themes/boost)._
In fact, Move Hub is just Bluetooth hardware, all manipulations are done with commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of ways to issue these commands is to write Python program using this library.
In fact, Move Hub is just a Bluetooth hardware piece, and all manipulations with it are made by commands passed through Bluetooth Low Energy (BLE) wireless protocol. One of the ways to issue these commands is to write Python program using this library.
Best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
The best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
If you have Vernie assembled, you might run scripts from [`examples/vernie`](examples/vernie) directory.
@ -14,6 +14,8 @@ If you have Vernie assembled, you might run scripts from [`examples/vernie`](exa
[![Laser Engraver](http://img.youtube.com/vi/ZbKmqVBBMhM/0.jpg)](https://youtu.be/ZbKmqVBBMhM)
[![Color Sorter](http://img.youtube.com/vi/829RKT8v8M0/0.jpg)](https://youtu.be/829RKT8v8M0)
[![Face Tracker](http://img.youtube.com/vi/WUOa3j-6XfI/0.jpg)](https://youtu.be/WUOa3j-6XfI)
[![Color Pin Bot](http://img.youtube.com/vi/QY6nRYXQw_U/0.jpg)](https://youtu.be/QY6nRYXQw_U)
[![BB-8 Joystick](http://img.youtube.com/vi/55kE9I4IQSU/0.jpg)](https://youtu.be/55kE9I4IQSU)
## Features
@ -35,7 +37,7 @@ _Please note that this library requires one of Bluetooth backend libraries to be
Install library like this:
```bash
pip install https://github.com/undera/pylgbst/archive/1.0.tar.gz
pip install -U pylgbst
```
Then instantiate MoveHub object and start invoking its methods. Following is example to just print peripherals detected on Hub:
@ -53,12 +55,13 @@ Each peripheral kind has own methods to do actions and/or get sensor data. See [
## Bluetooth Backend Prerequisites
You have following options to install as Bluetooth backend:
You have following options to install as Bluetooth backend (some of them might require `sudo` on Linux):
- `pip install pygatt` - [pygatt](https://github.com/peplin/pygatt) lib, works on both Windows and Linux
- `pip install gatt` - [gatt](https://github.com/getsenic/gatt-python) lib, supports Linux, does not work on Windows
- `pip install gattlib` - [gattlib](https://bitbucket.org/OscarAcena/pygattlib) - supports Linux, does not work on Windows, requires `sudo`
- `pip install bluepy` - [bluepy](https://github.com/IanHarvey/bluepy) lib, supports Linux, including Raspbian, which allows connection to the hub from the Raspberry PI
- `pip install bleak` - [bleak](https://github.com/hbldh/bleak) lib, supports Linux/Windows/MacOS
Running on Windows requires [Bluegiga BLED112 Bluetooth Smart Dongle](https://www.silabs.com/products/wireless/bluetooth/bluetooth-low-energy-modules/bled112-bluetooth-smart-dongle) hardware piece, because no other hardware currently works on Windows with Python+BLE.
@ -67,26 +70,25 @@ _Please let author know if you have discovered any compatibility/preprequisite d
Depending on backend type, you might need Linux `sudo` to be used when running Python.
### Bluetooth Connection Options
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_auto()` to attempt backend auto-detection
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and MoveHub mac address. Please look function source code for details.
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst.hub import MoveHub
from pylgbst.comms.cgatt import GattConnection
conn = GattConnection("hci1")
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
from pylgbst import get_connection_gatt
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
hub = MoveHub(conn)
```
@ -117,4 +119,4 @@ Then push green button on MoveHub, so permanent BLE connection will be establish
- https://github.com/JorgePe/BOOSTreveng - initial source of protocol knowledge
- https://github.com/nathankellenicki/node-poweredup - JavaScript version of library
- https://github.com/spezifisch/sphero-python/blob/master/BB8joyDrive.py - example with another approach to bluetooth libs
- https://github.com/virantha/bricknil - for the lovers of async Python, alternative implementation of library to control PoweredUp Hubs

View File

@ -33,4 +33,11 @@ def callback(is_pressed):
hub = MoveHub()
hub.button.subscribe(callback)
```
```
The state for button has 3 possible values:
- `0` - not released
- `1` - pressed
- `2` - pressed
It is for now unknown why Hub always issues notification with `1` and immediately with `2`, after button is pressed.

View File

@ -0,0 +1,40 @@
### Advanced button
This example shows how you can add additional functionallity to the move hub button.
It adds three new actions which you can use instead of the standard subscription to a button press:
- Click - a single quick up/down press
- Double click - a double up/down press, second click must occur within .5 secs of first one
- Long press - a press and hold on the button for > .7 secs
```python
from pylgbst.hub import MoveHub
from advancedbutton import AdvancedButton
import time
hub = MoveHub()
b = AdvancedButton(hub)
def clicked():
print("button clicked")
def pressed():
print("button pressed")
def doubleclicked():
print("button double clicked")
b.click.subscribe(clicked)
b.double_click.subscribe(doubleclicked)
b.long_press.subscribe(pressed)
time.sleep(120)
```
You can alter the timings using the two constants `DOUBLE_CLICK_TIME` and `LONG_PRESS_TIME`

View File

@ -0,0 +1,73 @@
import time
import threading
DOUBLE_CLICK_TIME = 0.5
LONG_PRESS_TIME = 0.7
class AdvancedButton:
def __init__(self, hub):
self.state = 0
self.last_pressed = 0
self.press_time = None
self.hub = hub
self.hub.button.subscribe(self.pressed)
self.click = ButtonAction()
self.double_click = ButtonAction()
self.long_press = ButtonAction()
def pressed(self, state):
if state == 2:
return
press_time = time.time()
if state == 1:
self.state = 1
self.press_time = press_time
return
if state == 0 and self.state == 1:
self.state = 0
press_duration = press_time - self.press_time
else:
return
if press_duration > LONG_PRESS_TIME:
# long press
self.long_press.notify()
return
if (press_time - self.last_pressed) < DOUBLE_CLICK_TIME:
# double click
self.last_pressed = 0
self.double_click.notify()
return
# could be first of a double click, could be single click
self.last_pressed = press_time
def timeout():
time.sleep(DOUBLE_CLICK_TIME)
if self.last_pressed == press_time:
# not clicked while sleeping
# single click
self.click.notify()
threading.Thread(target=timeout).start()
class ButtonAction:
def __init__(self):
self.subscribers = set()
def subscribe(self, callback):
self.subscribers.add(callback)
def unsubscribe(self, callback=None):
if callback in self.subscribers:
self.subscribers.remove(callback)
def notify(self):
for subscriber in self.subscribers.copy():
subscriber()

View File

@ -2,12 +2,13 @@ import logging
import time
from collections import Counter
from pylgbst.hub import MoveHub, COLOR_NONE, COLOR_BLACK, COLORS, COLOR_CYAN, COLOR_BLUE, COLOR_RED
from pylgbst.hub import MoveHub, COLOR_NONE, COLOR_BLACK, COLORS, COLOR_CYAN, COLOR_BLUE, COLOR_RED, COLOR_YELLOW, \
COLOR_WHITE
from pylgbst.peripherals import EncodedMotor
class Automata(object):
BASE_SPEED = 0.5
BASE_SPEED = 1.0
def __init__(self):
super(Automata, self).__init__()
@ -31,52 +32,48 @@ class Automata(object):
def get_color(self):
res = self._sensor
self._sensor = []
logging.info("Sensor data: %s", res)
logging.debug("Sensor data: %s", res)
cnts = Counter([x[0] for x in res])
clr = cnts.most_common(1)[0][0] if cnts else COLOR_NONE
if clr == COLOR_CYAN:
clr = COLOR_BLUE
self.__hub.led.set_color(clr)
return clr
def left(self):
self.__hub.motor_A.angled(270, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_HOLD)
self.__hub.motor_AB.angled(290, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_FLOAT)
time.sleep(0.1)
self.__hub.motor_A.stop()
self.__hub.motor_AB.stop()
def right(self):
self.__hub.motor_B.angled(-320, self.BASE_SPEED, -self.BASE_SPEED, end_state=EncodedMotor.END_STATE_HOLD)
self.__hub.motor_AB.angled(270, -self.BASE_SPEED, self.BASE_SPEED, end_state=EncodedMotor.END_STATE_FLOAT)
time.sleep(0.1)
self.__hub.motor_B.stop()
self.__hub.motor_AB.stop()
def forward(self):
self.__hub.motor_AB.angled(450, self.BASE_SPEED)
self.__hub.motor_AB.angled(500, self.BASE_SPEED)
def backward(self):
self.__hub.motor_AB.angled(-450, self.BASE_SPEED)
self.__hub.motor_AB.angled(500, -self.BASE_SPEED)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
bot = Automata()
bot.forward()
bot.right()
bot.forward()
bot.left()
bot.forward()
exit(0)
color = COLOR_NONE
color = None
cmds = []
while color != COLOR_RED:
while color != COLOR_NONE:
bot.feed_tape()
color = bot.get_color()
logging.warning(COLORS[color])
cmds.append(COLORS[color])
exp = ['BLUE', 'BLUE', 'BLUE', 'WHITE', 'BLUE', 'BLUE', 'WHITE', 'BLUE', 'WHITE', 'YELLOW', 'BLUE', 'BLUE', 'BLUE',
'BLUE', 'YELLOW', 'WHITE', 'RED']
logging.info("Exp: %s", exp)
logging.info("Act: %s", cmds)
assert exp == cmds
logging.warning(COLORS[color])
if color == COLOR_BLUE:
bot.forward()
elif color == COLOR_RED:
bot.backward()
elif color == COLOR_YELLOW:
bot.left()
elif color == COLOR_WHITE:
bot.right()

34
examples/automata/bot.py Normal file
View File

@ -0,0 +1,34 @@
from examples.automata import Automata
from pylgbst.peripherals import *
# this program is written by Sofia in early 2019
def action_by_color(color):
if color == COLOR_BLUE:
bot.forward()
if color == COLOR_RED:
bot.backward()
if color == COLOR_WHITE:
bot.right()
if color == COLOR_YELLOW:
bot.left()
def read_color():
bot.feed_tape()
color = bot.get_color()
print(COLORS[color])
return color
bot = Automata()
number = 0
color = None
while color != COLOR_NONE:
color = read_color()
number = number + 1
action_by_color(color)
print(number)

View File

@ -0,0 +1,57 @@
import logging
import sys
import time
from examples.bb8joystick import joystick
from examples.bb8joystick.bb8 import BB8
from examples.bb8joystick.joystick import Joystick
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING)
bb8 = BB8()
joystick = Joystick()
def set_bb_color(flag):
if flag:
bb8.color(255, 255, 255)
else:
bb8.color(0, 0, 0)
def set_heading(angle):
a = int(angle) % 360
print("Angle", a)
bb8.heading(a)
def roll(speed, direction):
print("Roll", speed, direction)
if speed < 3:
speed = 0
bb8.roll(speed, direction)
def stop(state):
if state:
print("Stop")
bb8.roll(0, 0)
else:
print("Stabilize")
bb8.color(255, 0, 255)
bb8.stabilize()
bb8.color(0, 0, 0)
try:
joystick.on_button(set_bb_color)
joystick.on_button(stop)
joystick.on_rotation(set_heading)
joystick.on_joystick(roll)
print("All set up")
time.sleep(300)
finally:
bb8.disconnect()
joystick.disconnect()

113
examples/bb8joystick/bb8.py Normal file
View File

@ -0,0 +1,113 @@
import asyncio
import time
import spheropy
# noinspection PyProtectedMember
from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE, _DEVICE_ID_SPHERO
class BLEInterfaceGattool(spheropy.BleInterface):
def _find_adapter(self):
adapter = spheropy.pygatt.GATTToolBackend()
adapter.start()
adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT
self._adapter = adapter
self._adapter_type = adapter_type
return True
class _SpheroImproved(spheropy.Sphero):
async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False,
num_retry_attempts=1):
gattool = BLEInterfaceGattool(search_name)
return await
super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)
async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394
command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE,
command_id=0x22,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[(sleeptime >> 8), (sleeptime & 0xff), 0],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)
return await
self._send_command(command, response_timeout_in_seconds)
async def set_rotation_rate(self, rate, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py
command = _ClientCommandPacket(device_id=_DEVICE_ID_SPHERO,
command_id=0x03,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[rate],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)
return await
self._send_command(command, response_timeout_in_seconds)
class BB8(object):
def __init__(self, name="BB-CC13"):
self._heading = 0
# marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
print("Started to wake up BB-8...")
self._sphero = _SpheroImproved()
self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name))
self._loop.run_until_complete(self._sphero.set_stabilization(True))
self._loop.run_until_complete(self._sphero.set_rotation_rate(1))
self.color(0, 0xFF, 0)
self.stabilize()
print("BB-8 is ready for commands")
def disconnect(self):
print("BB8 enters sleep")
self._loop.run_until_complete(self._sphero.sleep(0))
self._sphero.disconnect()
def color(self, red, green, blue):
self._wait_loop()
self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue, wait_for_response=False))
def heading(self, heading):
self._wait_loop()
heading = 359 - heading
self._heading = heading
# self._loop.run_until_complete(self._sphero.set_heading(359 - heading))
self._loop.run_until_complete(self._sphero.roll(1, heading, spheropy.RollMode.IN_PLACE_ROTATE))
def roll(self, speed=10, direction=0):
self._wait_loop()
direction += self._heading
direction %= 360
speed = int(255 * speed / 10)
speed *= 0.75 # throttle down a bit
self._loop.run_until_complete(self._sphero.roll(int(speed), direction))
def stop(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.roll(0, 0))
def stabilize(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.self_level())
def _wait_loop(self):
while self._loop.is_running():
time.sleep(0.001)
if __name__ == '__main__':
bb8 = BB8()
bb8.color(255, 0, 0)
time.sleep(1)
bb8.color(0, 255, 0)
time.sleep(1)
bb8.color(0, 0, 255)
time.sleep(1)

View File

@ -0,0 +1,145 @@
import logging
import math
import sys
import time
from pylgbst.hub import MoveHub
def _clamp(minvalue, value, maxvalue):
return max(minvalue, min(value, maxvalue))
class Joystick(object):
RANGE_A = 40
RANGE_C = 30
def __init__(self):
super(Joystick, self).__init__()
self._on_joystick = set()
self.button_pressed = False
self._angle_A = 0
self.angle_B = 0
self._angle_C = 0
print("Starting search for Joystick...")
self._hub = MoveHub()
self._reset_sensors()
self._hub.button.subscribe(self._on_btn)
self._on_motor_a(self._on_a)
self.on_rotation(self._on_b)
self._on_motor_c(self._on_c)
print("Joystick is ready")
def disconnect(self):
print("Joystick disconnects")
self._hub.disconnect()
def _reset_sensors(self):
logging.info("Resetting motor encoders")
self._hub.motor_A.preset_encoder()
self._hub.motor_B.preset_encoder()
self._hub.motor_external.preset_encoder()
def on_button(self, callback):
"""
Notifies about button state change. ``callback(state)`` gets single bool parameter
"""
def wrapper(state):
if state in (0, 1):
callback(bool(state))
self._hub.button.subscribe(wrapper)
def _on_motor_a(self, callback):
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
angle = _clamp(-self.RANGE_A, angle, self.RANGE_A)
callback(angle)
self._hub.motor_A.subscribe(wrapper)
def on_rotation(self, callback):
"""
Notifies about B motor rotation. ``callback(state)`` gets single int parameter from 0 to 359
"""
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
val = angle % 360
val = val if val >= 0 else 360 - val - 1
val = 359 - val
callback(val)
self._hub.motor_B.subscribe(wrapper)
def _on_motor_c(self, callback):
def wrapper(angle):
logging.debug("Raw angle: %s", angle)
angle = _clamp(-self.RANGE_C, angle, self.RANGE_C)
callback(angle)
self._hub.motor_external.subscribe(wrapper)
def _on_btn(self, state):
self.button_pressed = bool(state)
def _on_a(self, angle):
logging.debug("A rotated: %s", angle)
self._angle_A = angle
self._calc_joystick()
def _on_b(self, angle):
logging.debug("B rotated: %s", angle)
self.angle_B = angle
def _on_c(self, angle):
logging.debug("C rotated: %s", angle)
self._angle_C = angle
self._calc_joystick()
def on_joystick(self, callback):
"""
Notifies about joystick change. ``callback(speed, direction)`` gets parameters:
- ``speed`` - int value from 0 to 10
- ``direction`` - int value from 0 to 359
"""
self._on_joystick.add(callback)
def _calc_joystick(self):
norm_a = self._angle_A / self.RANGE_A
norm_b = self._angle_C / self.RANGE_C
logging.debug("%s / %s", self._angle_A, self._angle_C)
logging.debug("%s / %s", norm_a, norm_b)
speed = math.sqrt(norm_a ** 2 + norm_b ** 2) # / math.sqrt(2)
speed = _clamp(-1.0, speed, 1.0)
maxsize = sys.maxsize if norm_a >= 0 else -sys.maxsize
direction = math.atan(norm_a / norm_b if norm_b else maxsize)
direction *= 180 / math.pi
if norm_a >= 0 and norm_b >= 0:
direction = 90 - direction
elif norm_a < 0 and norm_b >= 0:
direction = 90 - direction
elif norm_a < 0 and norm_b < 0:
direction = 270 - direction
else:
direction = 270 - direction
for callback in self._on_joystick:
callback(int(round(10 * speed)), int(direction))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
stick = Joystick()
stick.on_button(lambda x: logging.info("Button: %s" % x))
stick.on_rotation(lambda x: logging.info("Rotation: %s" % x))
stick.on_joystick(lambda speed, head: logging.info("Speed: %s, Direction: %s" % (speed, head)))
time.sleep(100)

View File

@ -0,0 +1,18 @@
import time
from examples.bb8joystick import BB8, Joystick
bb8 = BB8()
def button(p):
print(p)
if p == True:
bb8.color(255, 255, 255)
else:
bb8.color(0, 0, 0)
deck = Joystick()
deck.on_button(button)
time.sleep(60)

View File

@ -3,8 +3,7 @@ import time
from time import sleep
from pylgbst import *
from pylgbst.comms import DebugServerConnection
from pylgbst.hub import MoveHub, math
from pylgbst.hub import MoveHub
from pylgbst.peripherals import EncodedMotor, TiltSensor, Current, Voltage, COLORS, COLOR_BLACK
log = logging.getLogger("demo")
@ -13,7 +12,12 @@ log = logging.getLogger("demo")
def demo_led_colors(movehub):
# LED colors demo
log.info("LED colors demo")
movehub.led.subscribe(lambda x, y: None)
# We get a response with payload and port, not x and y here...
def colour_callback(named):
log.info("LED Color callback: %s", named)
movehub.led.subscribe(colour_callback)
for color in list(COLORS.keys())[1:] + [COLOR_BLACK]:
log.info("Setting LED color to: %s", COLORS[color])
movehub.led.set_color(color)
@ -148,7 +152,6 @@ def demo_motor_sensors(movehub):
movehub.motor_B.unsubscribe(callback_b)
if movehub.motor_external is not None:
demo_motor_sensors.states[movehub.motor_external] = None
movehub.motor_external.unsubscribe(callback_e)
@ -170,20 +173,92 @@ def demo_voltage(movehub):
def demo_all(movehub):
demo_voltage(movehub)
demo_led_colors(movehub)
demo_motors_timed(movehub)
demo_motors_angled(movehub)
demo_port_cd_motor(movehub)
demo_led_colors(movehub)
demo_tilt_sensor_simple(movehub)
demo_tilt_sensor_precise(movehub)
demo_color_sensor(movehub)
demo_motor_sensors(movehub)
demo_voltage(movehub)
DEMO_CHOICES = {
'all': demo_all,
'voltage': demo_voltage,
'led_colors': demo_led_colors,
'motors_timed': demo_motors_timed,
'motors_angled': demo_motors_angled,
'port_cd_motor': demo_port_cd_motor,
'tilt_sensor': demo_tilt_sensor_simple,
'tilt_sensor_precise': demo_tilt_sensor_precise,
'color_sensor': demo_color_sensor,
'motor_sensors': demo_motor_sensors,
}
def get_options():
import argparse
arg_parser = argparse.ArgumentParser(
description='Demonstrate move-hub communications',
)
arg_parser.add_argument(
'-c', '--connection',
default='auto://',
help='''Specify connection URL to use, `protocol://mac?param=X` with protocol in:
"gatt","pygatt","gattlib","gattool", "bluepy","bluegiga"'''
)
arg_parser.add_argument(
'-d', '--demo',
default='all',
choices=sorted(DEMO_CHOICES.keys()),
help="Run a particular demo, default all"
)
return arg_parser
def connection_from_url(url):
import pylgbst
if url == 'auto://':
return None
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
parsed = urlparse(url)
name = 'get_connection_%s' % parsed.scheme
factory = getattr(pylgbst, name, None)
if not factory:
msg = "Unrecognised URL scheme/protocol, expect a get_connection_<protocol> in pylgbst: %s"
raise ValueError(msg % parsed.protocol)
params = {}
if parsed.netloc.strip():
params['hub_mac'] = parsed.netloc
for key, value in parse_qs(parsed.query).items():
if len(value) == 1:
params[key] = value[0]
else:
params[key] = value
return factory(
**params
)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
parameters['connection'] = connection
except ValueError as err:
parser.error(err.args[0])
hub = MoveHub()
demo_all(hub)
hub.disconnect()
hub = MoveHub(**parameters)
try:
demo = DEMO_CHOICES[options.demo]
demo(hub)
finally:
hub.disconnect()

View File

@ -1,4 +1,12 @@
[
[125, 64, 64],
[145, 255, 250]
[
125,
64,
64
],
[
145,
255,
250
]
]

View File

@ -85,7 +85,7 @@ try:
sa = round(c + b / divider, 1)
sb = round(c - b / divider, 1)
logging.info("SpeedA=%s, SpeedB=%s", sa, sb)
robot.motor_AB.start_power(sa, sb)
robot.motor_AB.start_speed(sa, sb)
# time.sleep(0.5)
finally:
robot.motor_AB.stop()

View File

@ -1,5 +1,7 @@
import logging
from examples.vernie import Vernie
from pylgbst.peripherals import VisionSensor
from . import *
logging.basicConfig(level=logging.INFO)
@ -10,7 +12,8 @@ criterion = min
cur_luminosity = 0
def on_change_lum(lumn):
def on_change_lum(lumn, unknown):
del unknown
global cur_luminosity
cur_luminosity = lumn

View File

@ -1,5 +1,5 @@
from pylgbst.constants import COLOR_GREEN, COLOR_NONE
from . import *
from pylgbst.peripherals import COLOR_GREEN, COLOR_NONE
from vernie import *
robot = Vernie()
running = True
@ -11,7 +11,7 @@ def callback(color, distance):
secs = (10 - distance + 1) / 10.0
print("Distance is %.1f inches, I'm running back with %s%% speed!" % (distance, int(speed * 100)))
if speed <= 1:
robot.motor_AB.timed(secs / 1, -speed, is_async=True)
robot.motor_AB.timed(secs / 1, -speed)
robot.say("Ouch")
@ -31,6 +31,3 @@ while running:
robot.vision_sensor.unsubscribe(callback)
robot.button.unsubscribe(on_btn)
robot.led.set_color(COLOR_NONE)
while robot.led.in_progress():
time.sleep(1)

View File

@ -6,42 +6,57 @@ from pylgbst.comms import DebugServer
log = logging.getLogger('pylgbst')
def get_connection_bluegiga(controller=None, hub_mac=None):
def get_connection_bluegiga(controller=None, hub_mac=None, hub_name=None):
del controller # to prevent code analysis warning
from pylgbst.comms.cpygatt import BlueGigaConnection
return BlueGigaConnection().connect(hub_mac)
return BlueGigaConnection().connect(hub_mac, hub_name)
def get_connection_gattool(controller='hci0', hub_mac=None):
def get_connection_gattool(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cpygatt import GattoolConnection
return GattoolConnection(controller).connect(hub_mac)
return GattoolConnection(controller).connect(hub_mac, hub_name)
def get_connection_gatt(controller='hci0', hub_mac=None):
def get_connection_gatt(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cgatt import GattConnection
return GattConnection(controller).connect(hub_mac)
return GattConnection(controller).connect(hub_mac, hub_name)
def get_connection_gattlib(controller='hci0', hub_mac=None):
def get_connection_gattlib(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cgattlib import GattLibConnection
return GattLibConnection(controller).connect(hub_mac)
return GattLibConnection(controller).connect(hub_mac, hub_name)
def get_connection_bluepy(controller='hci0', hub_mac=None):
def get_connection_bluepy(controller='hci0', hub_mac=None, hub_name=None):
from pylgbst.comms.cbluepy import BluepyConnection
return BluepyConnection(controller).connect(hub_mac)
return BluepyConnection(controller).connect(hub_mac, hub_name)
def get_connection_auto(controller='hci0', hub_mac=None):
def get_connection_bleak(controller='hci0', hub_mac=None, hub_name=None):
"""
Return connection based with Bleak API as an endpoint.
:param controller: Not used, kept for compatibility with others.
:param hub_mac: Optional Lego HUB MAC to connect to.
:return: Driver object.
"""
del controller # to prevent code analysis warning
from pylgbst.comms.cbleak import BleakDriver
return BleakDriver(hub_mac, hub_name)
def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
fns = [
get_connection_bluepy,
get_connection_bluegiga,
get_connection_gatt,
get_connection_bleak,
get_connection_gattool,
get_connection_gattlib,
]
@ -50,7 +65,7 @@ def get_connection_auto(controller='hci0', hub_mac=None):
for fn in fns:
try:
logging.info("Trying %s", fn.__name__)
return fn(controller, hub_mac)
return fn(controller, hub_mac, hub_name)
except KeyboardInterrupt:
raise
except BaseException:

View File

@ -15,7 +15,6 @@ from pylgbst.utilities import str2hex
log = logging.getLogger('comms')
LEGO_MOVE_HUB = "LEGO Move Hub"
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
@ -46,6 +45,22 @@ class Connection(object):
def enable_notifications(self):
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
def _is_device_matched(self, address, dev_name, hub_mac, find_name):
assert hub_mac or find_name, 'You have to provide either hub_mac or hub_name in connection options'
log.debug("Checking device: %s, MAC: %s", dev_name, address)
matched = False
if address != "00:00:00:00:00:00":
if hub_mac:
if hub_mac.lower() == address.lower():
matched = True
elif dev_name == find_name:
matched = True
if matched:
log.info("Found %s at %s", dev_name, address)
return matched
class DebugServer(object):
"""

216
pylgbst/comms/cbleak.py Normal file
View File

@ -0,0 +1,216 @@
import asyncio
import logging
import queue
import threading
import time
from bleak import BleakClient, discover
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
log = logging.getLogger('comms-bleak')
# Queues to handle request / responses. Acts as a buffer between API and async BLE driver
resp_queue = queue.Queue()
req_queue = queue.Queue()
class BleakDriver(object):
"""Driver that provides interface between API and Bleak."""
def __init__(self, hub_mac=None, hub_name=None):
"""
Initialize new object of Bleak Driver class.
:param hub_mac: Optional Lego HUB MAC to connect to.
"""
self.hub_mac = hub_mac
self.hub_name = hub_name
self._handler = None
self._abort = False
self._connection_thread = None
self._processing_thread = None
def set_notify_handler(self, handler):
"""
Set handler function used to communicate with an API.
:param handler: Handler function called by driver when received data
:return: None
"""
self._handler = handler
def enable_notifications(self):
"""
Enable notifications, in our cases starts communication threads.
We cannot do this earlier, because API need to fist set notification handler.
:return: None
"""
self._connection_thread = threading.Thread(target=lambda: asyncio.run(self._bleak_thread()))
self._connection_thread.daemon = True
self._connection_thread.start()
self._processing_thread = threading.Thread(target=self._processing)
self._processing_thread.daemon = True
self._processing_thread.start()
async def _bleak_thread(self):
bleak = BleakConnection()
await bleak.connect(self.hub_mac, self.hub_name)
await bleak.set_notify_handler(self._safe_handler)
# After connecting, need to send any data or hub will drop the connection,
# below command is Advertising name request update
await bleak.write_char(MOVE_HUB_HW_UUID_CHAR, bytearray([0x05, 0x00, 0x01, 0x01, 0x05]))
while not self._abort:
await asyncio.sleep(0.1)
if req_queue.qsize() != 0:
data = req_queue.get()
await bleak.write(data[0], data[1])
logging.info("Communications thread has exited")
@staticmethod
def _safe_handler(handler, data):
resp_queue.put((handler, data))
def _processing(self):
while not self._abort:
if resp_queue.qsize() != 0:
msg = resp_queue.get()
self._handler(msg[0], bytes(msg[1]))
time.sleep(0.01)
logging.info("Processing thread has exited")
def write(self, handle, data):
"""
Send data to given handle number.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:raises ConnectionError" When internal threads are not working
:return: None
"""
if not self._connection_thread.is_alive() or not self._processing_thread.is_alive():
raise ConnectionError('Something went wrong, communication threads not functioning.')
req_queue.put((handle, data))
def disconnect(self):
"""
Disconnect and stops communication threads.
:return: None
"""
self._abort = True
def is_alive(self):
"""
Indicate whether driver is functioning or not.
:return: True if driver is functioning; False otherwise.
"""
if self._connection_thread is not None and self._processing_thread is not None:
return self._connection_thread.is_alive() and self._processing_thread.is_alive()
else:
return False
class BleakConnection(Connection):
"""Bleak driver for communicating with BLE device."""
def __init__(self):
"""Initialize new instance of BleakConnection class."""
Connection.__init__(self)
self.loop = asyncio.get_event_loop()
self._device = None
self._client = None
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.WARNING)
logging.getLogger('bleak.backends.bluezdbus.client').setLevel(logging.WARNING)
async def connect(self, hub_mac=None, hub_name=None):
"""
Connect to device.
:param hub_mac: Optional Lego HUB MAC to connect to.
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
:return: None
"""
log.info("Discovering devices... Press green button on Hub")
for i in range(0, 30):
devices = await discover(timeout=1)
log.debug("Devices: %s", devices)
for dev in devices:
log.debug(dev)
address = dev.address
name = dev.name
if self._is_device_matched(address, name, hub_mac, hub_name):
log.info('Device matched: %r', dev)
self._device = dev
break
else:
continue
break
else:
raise ConnectionError('Device not found.')
self._client = BleakClient(self._device.address, self.loop)
status = await self._client.connect()
log.debug('Connection status: {status}'.format(status=status))
async def write(self, handle, data):
"""
Send data to given handle number.
If handle cannot be found in service description, hardcoded LEGO uuid will be used.
:param handle: Handle number that will be translated into characteristic uuid
:param data: data to send
:return: None
"""
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
desc = self._client.services.get_descriptor(handle)
if not isinstance(data, bytearray):
data = bytearray(data)
if desc is None:
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
else:
await self._client.write_gatt_char(desc.characteristic_uuid, data)
async def write_char(self, characteristic_uuid, data):
"""
Send data to given handle number.
:param characteristic_uuid: Characteristic uuid used to send data
:param data: data to send
:return: None
"""
await self._client.write_gatt_char(characteristic_uuid, data)
async def set_notify_handler(self, handler):
"""
Set notification handler.
:param handler: Handle function to be called when receive any data.
:return: None
"""
def c(handle, data):
log.debug('Response: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
handler(handle, data)
await self._client.start_notify(MOVE_HUB_HW_UUID_CHAR, c)
def is_alive(self):
"""
To keep compatibility with the driver interface.
This method does nothing.
:return: None.
"""
pass

View File

@ -4,7 +4,7 @@ from threading import Thread, Event
from bluepy import btle
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.comms import Connection
from pylgbst.utilities import str2hex, queue
log = logging.getLogger('comms-bluepy')
@ -29,7 +29,7 @@ class BluepyDelegate(btle.DefaultDelegate):
self._handler = handler
def handleNotification(self, cHandle, data):
def handleNotification(self, cHandle, data):
log.debug('Incoming notification')
self._handler(cHandle, data)
@ -45,7 +45,7 @@ class BluepyThreadedPeripheral(object):
self._iface_number = _get_iface_number(controller)
self._disconnect_event = Event()
self._dispatcher_thread = Thread(target=self._dispatch_calls)
self._dispatcher_thread.setDaemon(True)
self._dispatcher_thread.setName("Bluepy call dispatcher")
@ -70,7 +70,6 @@ class BluepyThreadedPeripheral(object):
finally:
self._peripheral.disconnect()
def write(self, handle, data):
self._call_queue.put(lambda: self._peripheral.writeCharacteristic(handle, data))
@ -88,7 +87,7 @@ class BluepyConnection(Connection):
self._peripheral = None # :type BluepyThreadedPeripheral
self._controller = controller
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
scanner = btle.Scanner()
@ -99,13 +98,11 @@ class BluepyConnection(Connection):
for dev in devices:
address = dev.addr
addressType = dev.addrType
address_type = dev.addrType
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
log.debug("Found dev, name: {}, address: {}".format(name, address))
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._peripheral = BluepyThreadedPeripheral(address, addressType, self._controller)
if self._is_device_matched(address, name, hub_mac, hub_name):
self._peripheral = BluepyThreadedPeripheral(address, address_type, self._controller)
break
return self
@ -122,4 +119,3 @@ class BluepyConnection(Connection):
def is_alive(self):
return True

View File

@ -5,7 +5,7 @@ from time import sleep
import gatt
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, \
MOVE_HUB_HARDWARE_HANDLE
from pylgbst.utilities import str2hex
@ -88,7 +88,7 @@ class GattConnection(Connection):
self._manager_thread.setDaemon(True)
log.debug('Starting DeviceManager...')
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
self._manager_thread.start()
self._manager.start_discovery()
@ -100,8 +100,7 @@ class GattConnection(Connection):
for dev in devices:
address = dev.mac_address
name = dev.alias()
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
if self._is_device_matched(address, name, hub_mac, hub_name):
self._device = CustomDevice(address, self._manager)
break

View File

@ -5,7 +5,7 @@ from threading import Thread
from gattlib import DiscoveryService, GATTRequester
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.comms import Connection
from pylgbst.utilities import queue, str2hex
log = logging.getLogger('comms-gattlib')
@ -61,7 +61,7 @@ class GattLibConnection(Connection):
self.requester = None
self._iface = bt_iface_name
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
service = DiscoveryService(self._iface)
while not self.requester:
@ -70,8 +70,7 @@ class GattLibConnection(Connection):
log.debug("Devices: %s", devices)
for address, name in devices.items():
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
if self._is_device_matched(address, name, hub_mac, hub_name):
self.requester = Requester(address, True, self._iface)
break

View File

@ -2,7 +2,7 @@ import logging
import pygatt
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_CHAR
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
from pylgbst.utilities import str2hex
log = logging.getLogger('comms-pygatt')
@ -20,21 +20,21 @@ class GattoolConnection(Connection):
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
self._conn_hnd = None
def connect(self, hub_mac=None):
def connect(self, hub_mac=None, hub_name=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
adapter = self.backend()
adapter.start()
adapter.start() # enable or disable restart? What's the best?
while not self._conn_hnd:
log.info("Discovering devices...")
devices = adapter.scan(1)
log.debug("Devices: %s", devices)
# Pass each device found to _is_device_matched( ) to see if it the device we want
for dev in devices:
address = dev['address']
name = dev['name']
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
if self._is_device_matched(address, name, hub_mac, hub_name):
self._conn_hnd = adapter.connect(address)
break

View File

@ -4,8 +4,8 @@ import time
from pylgbst import get_connection_auto
from pylgbst.messages import *
from pylgbst.peripherals import *
from pylgbst.utilities import str2hex, usbyte, ushort
from pylgbst.utilities import queue
from pylgbst.utilities import str2hex, usbyte, ushort
log = logging.getLogger('hub')
@ -43,7 +43,7 @@ class Hub(object):
self.add_message_handler(MsgHubAction, self._handle_action)
if not connection:
connection = get_connection_auto()
connection = get_connection_auto() # TODO: how to identify the hub?
self.connection = connection
self.connection.set_notify_handler(self._notify)
self.connection.enable_notifications()
@ -61,19 +61,21 @@ class Hub(object):
:rtype: pylgbst.messages.UpstreamMsg
"""
log.debug("Send message: %r", msg)
self.connection.write(self.HUB_HARDWARE_HANDLE, msg.bytes())
msgbytes = msg.bytes()
if msg.needs_reply:
with self._sync_lock:
assert not self._sync_request, "Pending request %r while trying to put %r" % (self._sync_request, msg)
self._sync_request = msg
log.debug("Waiting for sync reply to %r...", msg)
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
resp = self._sync_replies.get()
log.debug("Fetched sync reply: %r", resp)
if isinstance(resp, MsgGenericError):
raise RuntimeError(resp.message())
return resp
else:
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
return None
def _notify(self, handle, data):
@ -135,7 +137,7 @@ class Hub(object):
if dev_type in PERIPHERAL_TYPES:
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
else:
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
self.peripherals[port] = Peripheral(self, port)
log.info("Attached peripheral: %s", self.peripherals[msg.port])
@ -182,19 +184,25 @@ class MoveHub(Hub):
:type motor_external: EncodedMotor
"""
DEFAULT_NAME = "LEGO Move Hub"
# PORTS
PORT_C = 0x01
PORT_D = 0x02
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_D = 0x03
PORT_AB = 0x10
PORT_LED = 0x32
PORT_A = 0x37
PORT_B = 0x38
PORT_AB = 0x39
PORT_TILT_SENSOR = 0x3A
PORT_CURRENT = 0x3B
PORT_VOLTAGE = 0x3C
# noinspection PyTypeChecker
def __init__(self, connection=None):
self._comm_lock = threading.RLock()
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(MoveHub, self).__init__(connection)
self.info = {}
@ -219,7 +227,7 @@ class MoveHub(Hub):
if not get_dev_set:
get_dev_set = lambda: (self.motor_A, self.motor_B, self.motor_AB, self.led, self.tilt_sensor,
self.current, self.voltage)
for num in range(0, 60):
for num in range(0, 100):
devices = get_dev_set()
if all(devices):
log.debug("All devices are present: %s", devices)
@ -245,29 +253,40 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def _handle_device_change(self, msg):
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
with self._comm_lock:
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor \
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
class TrainHub(Hub):
DEFAULT_NAME = 'TrainHub'
def __init__(self, connection=None):
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(TrainHub, self).__init__(connection)

View File

@ -18,14 +18,15 @@ class Message(object):
see https://lego.github.io/lego-ble-wireless-protocol-docs/#common-message-header
"""
msglen = len(self.payload) + 3
assert msglen < 127, "TODO: handle logner messages with 2-byte len"
assert msglen < 127, "TODO: handle longer messages with 2-byte len"
return pack("<B", msglen) + pack("<B", self.hub_id) + pack("<B", self.TYPE) + self.payload
def __repr__(self):
# assert self.bytes() # to trigger any field changes
data = self.__dict__
data = {x: (str2hex(y) if isinstance(y, bytes) else y)
for x, y in data.items()
if x not in ('hub_id', 'needs_reply')}
if x not in ('hub_id',)}
return self.__class__.__name__ + "(%s)" % data
@ -125,7 +126,7 @@ class MsgHubProperties(DownstreamMsg, UpstreamMsg):
self.parameters = parameters
def bytes(self):
if self.operation == self.UPD_REQUEST:
if self.operation in (self.UPD_REQUEST, self.UPD_ENABLE):
self.needs_reply = True
self.payload = pack("<B", self.property) + pack("<B", self.operation) + self.parameters
return super(MsgHubProperties, self).bytes()
@ -172,7 +173,8 @@ class MsgHubAction(DownstreamMsg, UpstreamMsg):
return super(MsgHubAction, self).bytes()
def is_reply(self, msg):
assert isinstance(msg, MsgHubAction)
if not isinstance(msg, MsgHubAction):
raise TypeError("Unexpected message type: %s" % (msg.__class__,))
if self.action == self.DISCONNECT and msg.action == self.UPSTREAM_DISCONNECT:
return True

View File

@ -19,7 +19,7 @@ COLOR_LIGHTBLUE = 0x04
COLOR_CYAN = 0x05
COLOR_GREEN = 0x06
COLOR_YELLOW = 0x07
COLOR_ORANGE = 0x09
COLOR_ORANGE = 0x08
COLOR_RED = 0x09
COLOR_WHITE = 0x0a
COLOR_NONE = 0xFF
@ -126,7 +126,7 @@ class Peripheral(object):
self.set_port_mode(self._port_mode.mode, False)
def _notify_subscribers(self, *args, **kwargs):
for subscriber in self._subscribers:
for subscriber in self._subscribers.copy():
subscriber(*args, **kwargs)
return args
@ -232,10 +232,16 @@ class LEDRGB(Peripheral):
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, payload)
self._send_output(msg)
def _decode_port_data(self, msg):
if len(msg.payload) == 3:
return usbyte(msg.payload, 0), usbyte(msg.payload, 1), usbyte(msg.payload, 2),
else:
return usbyte(msg.payload, 0),
class Motor(Peripheral):
SUBCMD_START_POWER = 0x01
# SUBCMD_START_POWER = 0x02
SUBCMD_START_POWER_GROUPED = 0x02
SUBCMD_SET_ACC_TIME = 0x05
SUBCMD_SET_DEC_TIME = 0x06
SUBCMD_START_SPEED = 0x07
@ -248,10 +254,10 @@ class Motor(Peripheral):
END_STATE_FLOAT = 0
def _speed_abs(self, relative):
if relative is None:
if relative == Motor.END_STATE_BRAKE or relative == Motor.END_STATE_HOLD:
# special value for BRAKE
# https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
return 127
return relative
if relative < -1:
log.warning("Speed cannot be less than -1")
@ -265,9 +271,6 @@ class Motor(Peripheral):
return int(absolute)
def _write_direct_mode(self, subcmd, params):
if self.virtual_ports:
subcmd += 1 # de-facto rule
params = pack("<B", subcmd) + params
msg = MsgPortOutput(self.port, MsgPortOutput.WRITE_DIRECT_MODE_DATA, params)
self._send_output(msg)
@ -279,22 +282,27 @@ class Motor(Peripheral):
msg = MsgPortOutput(self.port, subcmd, params)
self._send_output(msg)
def start_power(self, speed_primary=1.0, speed_secondary=None):
def start_power(self, power_primary=1.0, power_secondary=None):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startpower-power
"""
if speed_secondary is None:
speed_secondary = speed_primary
if power_secondary is None:
power_secondary = power_primary
if self.virtual_ports:
cmd = self.SUBCMD_START_POWER_GROUPED - 1 # because _send_cmd will do +1
else:
cmd = self.SUBCMD_START_POWER
params = b""
params += pack("<b", self._speed_abs(speed_primary))
params += pack("<b", self._speed_abs(power_primary))
if self.virtual_ports:
params += pack("<b", self._speed_abs(speed_secondary))
params += pack("<b", self._speed_abs(power_secondary))
self._write_direct_mode(self.SUBCMD_START_POWER, params)
self._send_cmd(cmd, params)
def stop(self):
self.start_speed(0)
self.timed(0)
def set_acc_profile(self, seconds, profile_no=0x00):
"""
@ -397,20 +405,20 @@ class EncodedMotor(Motor):
def goto_position(self, degrees_primary, degrees_secondary=None, speed=1.0, max_power=1.0,
end_state=Motor.END_STATE_BRAKE, use_profile=0b11):
"""
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-startspeedfordegrees-degrees-speed-maxpower-endstate-useprofile-0x0b
https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#output-sub-command-gotoabsoluteposition-abspos-speed-maxpower-endstate-useprofile-0x0d
"""
if degrees_secondary is None:
degrees_secondary = degrees_primary
params = b""
params += pack("<I", degrees_primary)
params += pack("<i", degrees_primary)
if self.virtual_ports:
params += pack("<I", degrees_secondary)
params += pack("<i", degrees_secondary)
params += pack("<b", self._speed_abs(speed))
params += pack("<B", end_state)
params += pack("<B", int(100 * max_power))
params += pack("<B", end_state)
params += pack("<B", use_profile)
self._send_cmd(self.SUBCMD_GOTO_ABSOLUTE_POSITION, params)
@ -420,11 +428,8 @@ class EncodedMotor(Motor):
if self._port_mode.mode == self.SENSOR_ANGLE:
angle = unpack("<l", data[0:4])[0]
return (angle,)
elif self._port_mode.mode == self.SENSOR_SOMETHING1:
smth = usbyte(data, 0)
return (smth,)
elif self._port_mode.mode == self.SENSOR_SPEED:
speed = unpack("<b", data[0])[0]
speed = unpack("<b", data[0:1])[0]
return (speed,)
else:
log.debug("Got motor sensor data while in unexpected mode: %r", self._port_mode)
@ -583,7 +588,7 @@ class VisionSensor(Peripheral):
elif self._port_mode.mode == self.CALIBRATE:
return [ushort(data, x * 2) for x in range(8)]
else:
log.debug("Unhandled data in mode %s: %s", self._port_mode.mode, str2hex(data))
log.debug("Unhandled VisionSensor data in mode %s: %s", self._port_mode.mode, str2hex(data))
return ()
def set_color(self, color):
@ -663,4 +668,4 @@ class Button(Peripheral):
:type msg: MsgHubProperties
"""
if msg.property == MsgHubProperties.BUTTON and msg.operation == MsgHubProperties.UPSTREAM_UPDATE:
self._notify_subscribers(bool(usbyte(msg.parameters, 0)))
self._notify_subscribers(usbyte(msg.parameters, 0))

View File

@ -7,6 +7,8 @@ import logging
import sys
from struct import unpack
log = logging.getLogger(__name__)
if sys.version_info[0] == 2:
import Queue as queue
else:
@ -15,22 +17,29 @@ else:
queue = queue # just to use it
def check_unpack(seq, index, pattern, size):
"""Check that we got size bytes, if so, unpack using pattern"""
data = seq[index: index + size]
assert len(data) == size, "Unexpected data len %d, expected %d" % (len(data), size)
return unpack(pattern, data)[0]
def usbyte(seq, index):
return unpack("<B", seq[index:index + 1])[0]
return check_unpack(seq, index, "<B", 1)
def ushort(seq, index):
return unpack("<H", seq[index:index + 2])[0]
return check_unpack(seq, index, "<H", 2)
def usint(seq, index):
return unpack("<I", seq[index:index + 4])[0]
return check_unpack(seq, index, "<I", 4)
def str2hex(data): # we need it for python 2+3 compatibility
# if sys.version_info[0] == 3:
# data = bytes(data, 'ascii')
if not isinstance(data, (bytes, bytearray)):
data = bytes(data, 'ascii')
data = bytes(data, "ascii")
hexed = binascii.hexlify(data)
return hexed

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[metadata]
description-file = README.md

View File

@ -1,16 +1,24 @@
from distutils.core import setup
from setuptools import setup
setup(name='pylgbst',
description='Python library to interact with LEGO Move Hub (from Lego BOOST set)',
version='1.0.1',
author='Andrey Pokhilko',
author_email='apc4@ya.ru',
packages=['pylgbst', "pylgbst.comms"],
requires=[],
extras_require={
'gatt': ["gatt"],
'gattlib': ["gattlib"],
'pygatt': ["pygatt"],
'bluepy': ["bluepy"],
}
)
setup(
name="pylgbst",
version="1.2.0",
author="Andrey Pokhilko",
author_email="apc4@ya.ru",
license="MIT",
description="Python library to interact with LEGO PoweredUp devices (Lego BOOST etc.)",
url='https://github.com/undera/pylgbst',
keywords=['LEGO', 'ROBOTICS', 'BLUETOOTH'],
packages=["pylgbst", "pylgbst.comms"],
requires=[],
extras_require={
# Note that dbus and gi are normally system packages
"gatt": ["gatt", "dbus", "gi"],
"gattlib": ["gattlib"],
"pygatt": ["pygatt", "pexpect"],
"bluepy": ["bluepy"],
"bleak": ["bleak"],
},
)

View File

@ -1,3 +1,4 @@
import sys
import time
from binascii import unhexlify
@ -5,7 +6,7 @@ from pylgbst.comms import Connection
from pylgbst.hub import MoveHub, Hub
from pylgbst.peripherals import *
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.INFO)
log = logging.getLogger('test')

View File

@ -622,7 +622,6 @@
]
]
},
"LEDRGB on port 0x32": {
"mode_count": 2,
"input_modes": [],
@ -694,7 +693,6 @@
"can_input": false
}
},
"ColorDistanceSensor on port 0x1": {
"mode_count": 11,
"input_modes": [
@ -1010,7 +1008,6 @@
]
]
},
"TiltSensor on port 0x3a": {
"mode_count": 8,
"input_modes": [
@ -1266,7 +1263,6 @@
]
]
},
"Current on port 0x3b": {
"mode_count": 2,
"input_modes": [

61
tests/test_cbleak.py Normal file
View File

@ -0,0 +1,61 @@
import sys
import time
import unittest
import bleak
from packaging import version
import pylgbst
import pylgbst.comms.cbleak as cbleak
bleak.BleakClient = object()
bleak.discover = object()
last_response = None
lt37 = version.parse(sys.version.split(' ')[0]) < version.parse("3.7")
class BleakDriverTest(unittest.TestCase):
def test_driver_creation(self):
connection = pylgbst.get_connection_bleak()
self.assertIsInstance(connection, cbleak.BleakDriver)
self.assertFalse(connection.is_alive(), 'Checking that factory returns not started driver')
@unittest.skipIf(lt37, "Python version is too low")
def test_communication(self):
driver = cbleak.BleakDriver()
async def fake_thread():
print('Fake thread initialized')
while not driver._abort:
time.sleep(0.1)
if cbleak.req_queue.qsize() != 0:
print('Received data, sending back')
data = cbleak.req_queue.get()
cbleak.resp_queue.put(data)
driver._bleak_thread = fake_thread
driver.set_notify_handler(BleakDriverTest.validation_handler)
driver.enable_notifications()
time.sleep(0.5) # time for driver initialization
self.assertTrue(driver.is_alive(), 'Checking that driver starts')
handle = 0x32
data = [0xD, 0xE, 0xA, 0xD, 0xB, 0xE, 0xE, 0xF]
driver.write(handle, data)
time.sleep(0.5) # processing time
self.assertEqual(handle, last_response[0], 'Verifying response handle')
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
driver.disconnect()
time.sleep(0.5) # processing time
self.assertFalse(driver.is_alive())
@staticmethod
def validation_handler(handle, data):
global last_response
last_response = (handle, data)
if __name__ == '__main__':
unittest.main()

40
tests/test_comms.py Normal file
View File

@ -0,0 +1,40 @@
import unittest
from pylgbst.comms import *
class ConnectionTestCase(unittest.TestCase):
def test_is_device_matched(self):
conn = Connection()
hub_address = '1a:2A:3A:4A:5A:6A'
other_address = 'A1:a2:a3:a4:a5:a6'
zero_address = '00:00:00:00:00:00'
hub_name = 'LEGO Move Hub'
other_name = 'HRM'
test_matrix = [
# address, name, hub_mac, expected
(hub_address, hub_name, hub_address, None, True),
(hub_address, hub_name, None, hub_name, True),
(hub_address, None, hub_address, None, True),
(hub_address, None, None, hub_name, False),
(hub_address, other_name, hub_address, None, True),
(hub_address, other_name, None, hub_name, False),
(other_address, hub_name, hub_address, None, False),
(other_address, hub_name, None, hub_name, True),
(other_address, None, hub_address, None, False),
(other_address, None, None, hub_name, False),
(other_address, other_name, hub_address, None, False),
(other_address, other_name, None, hub_name, False),
(zero_address, hub_name, hub_address, None, False),
(zero_address, hub_name, None, hub_name, False),
(zero_address, None, hub_address, None, False),
(zero_address, None, None, hub_name, False),
(zero_address, other_name, hub_address, None, False),
(zero_address, other_name, None, hub_name, False),
]
for address, name, hub_mac, fname, expected in test_matrix:
matched = conn._is_device_matched(address=address, dev_name=name, hub_mac=hub_mac, find_name=fname)
self.assertEqual(matched, expected)

View File

@ -122,20 +122,20 @@ class HubTest(unittest.TestCase):
class MoveHubTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append('0f00 04 01 0125000000001000000010')
conn.notifications.append('0f00 04 02 0126000000001000000010')
conn.notifications.append('0f00 04 37 0127000100000001000000')
conn.notifications.append('0f00 04 38 0127000100000001000000')
conn.notifications.append('0900 04 39 0227003738')
conn.notifications.append('0f00 04 02 0125000000001000000010')
conn.notifications.append('0f00 04 03 0126000000001000000010')
conn.notifications.append('0f00 04 00 0127000100000001000000')
conn.notifications.append('0f00 04 01 0127000100000001000000')
conn.notifications.append('0900 04 10 0227003738')
conn.notifications.append('0f00 04 32 0117000100000001000000')
conn.notifications.append('0f00 04 3a 0128000000000100000001')
conn.notifications.append('0f00 04 3b 0115000200000002000000')
conn.notifications.append('0f00 04 3c 0114000200000002000000')
conn.notification_delayed('12000101064c45474f204d6f766520487562', 0.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 0.3)
conn.notification_delayed('060001060600', 0.5)
conn.notification_delayed('0600030104ff', 0.7)
conn.notification_delayed('12000101064c45474f204d6f766520487562', 1.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 1.3)
conn.notification_delayed('060001060600', 1.5)
conn.notification_delayed('0600030104ff', 1.7)
MoveHub(conn.connect())
time.sleep(1)
conn.wait_notifications_handled()

View File

@ -21,6 +21,7 @@ class PeripheralsTest(unittest.TestCase):
def callback(pressed):
vals.append(pressed)
hub.connection.notification_delayed("060001020600", 0.0)
button.subscribe(callback)
time.sleep(0.1)
@ -33,7 +34,7 @@ class PeripheralsTest(unittest.TestCase):
time.sleep(0.1)
hub.connection.wait_notifications_handled()
self.assertEqual([False, True, False], vals)
self.assertEqual([0, 1, 0], vals)
self.assertEqual(b"0500010202", hub.writes[1][1])
self.assertEqual(b"0500010203", hub.writes[2][1])
@ -148,49 +149,49 @@ class PeripheralsTest(unittest.TestCase):
motor = EncodedMotor(hub, MoveHub.PORT_D)
hub.peripherals[MoveHub.PORT_D] = motor
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.start_power(1.0)
self.assertEqual(b"0800810211510164", hub.writes.pop(1)[1])
self.assertEqual(b"07008103110164", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.set_acc_profile(1.0)
self.assertEqual(b"090081021105e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081031105e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.set_dec_profile(1.0)
self.assertEqual(b"090081021106e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081031106e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.start_speed(1.0)
self.assertEqual(b"090081021107646403", hub.writes.pop(1)[1])
self.assertEqual(b"090081031107646403", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109000064647f03", hub.writes.pop(1)[1])
logging.debug("\n\n")
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.timed(1.0)
self.assertEqual(b"0c0081021109e80364647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109e80364647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.angled(180)
self.assertEqual(b"0e008102110bb400000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0e008103110bb400000064647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('050082030a', 0.2)
motor.preset_encoder(-180)
self.assertEqual(b"0b0081021151024cffffff", hub.writes.pop(1)[1])
self.assertEqual(b"0b0081031151024cffffff", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.goto_position(0)
self.assertEqual(b"0e008102110d00000000647f6403", hub.writes.pop(1)[1])
self.assertEqual(b"0e008103110d0000000064647f03", hub.writes.pop(1)[1])
hub.connection.wait_notifications_handled()
@ -204,15 +205,15 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a004701020100000001', 0.1)
hub.connection.notification_delayed('0a004702020100000001', 0.1)
motor.subscribe(callback)
hub.connection.notification_delayed("0800450100000000", 0.1)
hub.connection.notification_delayed("08004501ffffffff", 0.2)
hub.connection.notification_delayed("08004501feffffff", 0.3)
hub.connection.notification_delayed("0800450200000000", 0.1)
hub.connection.notification_delayed("08004502ffffffff", 0.2)
hub.connection.notification_delayed("08004502feffffff", 0.3)
time.sleep(0.4)
hub.connection.notification_delayed('0a004701020000000000', 0.1)
hub.connection.notification_delayed('0a004702020000000000', 0.1)
motor.unsubscribe(callback)
hub.connection.wait_notifications_handled()
@ -228,13 +229,13 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a00 4701080100000001', 0.1)
hub.connection.notification_delayed('0a00 4702080100000001', 0.1)
cds.subscribe(callback)
hub.connection.notification_delayed("08004501ff0aff00", 0.1)
hub.connection.notification_delayed("08004502ff0aff00", 0.1)
time.sleep(0.2)
hub.connection.notification_delayed('0a00 4701090100000001', 0.1)
hub.connection.notification_delayed('0a00 4702090100000001', 0.1)
cds.unsubscribe(callback)
hub.connection.wait_notifications_handled()