1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Merge branch 'master' of github.com:undera/pylgbst

This commit is contained in:
Andrey Pohilko 2018-07-18 13:59:16 +03:00
commit 7bfe700a75
27 changed files with 835 additions and 423 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
.idea
*.iml
*.pyc
*.pyc
build

View File

@ -1,14 +1,14 @@
sudo: false
language: python
virtualenv:
system_site_packages: true
matrix:
include:
- os: linux
python: 2.7
- os: linux
python: 3.5
- os: linux
python: 3.6
python: 3.4
addons:
apt:
@ -16,10 +16,22 @@ addons:
- libboost-python-dev
- libboost-thread-dev
- libbluetooth-dev
- libglib2.0-dev
- libdbus-1-dev
- libdbus-glib-1-dev
- libgirepository-1.0-1
- python-dbus
- python-gi
- python3-dbus
- python3-gi
install:
- pip install codecov gattlib
script: coverage run --source=. `which nosetests` . --nocapture
- pip install codecov nose-exclude gattlib pygatt gatt pexpect
script: coverage run --source=. `which nosetests` tests --nocapture --exclude-dir=examples
after_success:
- coverage report -m
- codecov

View File

@ -1,4 +1,4 @@
# Python library to interact with Move Hub
from pylgbst.comms_gatt import GattConnection# Python library to interact with Move Hub
_Move Hub is central controller block of [LEGO® Boost Robotics Set](https://www.lego.com/en-us/boost)._
@ -6,7 +6,7 @@ In fact, Move Hub is just Bluetooth hardware, all manipulations are done with co
Best way to start is to look into [`demo.py`](examples/demo.py) file, and run it (assuming you have installed library).
If you have Vernie assembled, you might run scripts from [`vernie`](examples/vernie/) directory.
If you have Vernie assembled, you might run scripts from [`examples/vernie`](examples/vernie/) directory.
Demonstrational videos:
@ -29,17 +29,17 @@ Demonstrational videos:
## Usage
_Please note that it requires [gattlib](https://bitbucket.org/OscarAcena/pygattlib) to be installed, which is not supported on Windows._
_Please note that this library requires one of Bluetooth backend libraries to be installed, please read section [here](#general-notes) for details_
Install library like this:
```bash
pip install https://github.com/undera/pylgbst/archive/0.5.tar.gz
pip install https://github.com/undera/pylgbst/archive/0.6.tar.gz
```
Then instantiate MoveHub object and start invoking its methods. Following is example to just print peripherals detected on Hub:
```python
from pylgbst import MoveHub
from pylgbst.movehub import MoveHub
hub = MoveHub()
@ -67,7 +67,7 @@ All these methods are synchronous by default, means method does not return until
An example:
```python
from pylgbst import MoveHub
from pylgbst.movehub import MoveHub
import time
hub = MoveHub()
@ -92,7 +92,7 @@ hub.motor_external.stop()
Any motor allows to subscribe to its rotation sensor. Two sensor modes are available: rotation angle (`EncodedMotor.SENSOR_ANGLE`) and rotation speed (`EncodedMotor.SENSOR_SPEED`). Example:
```python
from pylgbst import MoveHub, EncodedMotor
from pylgbst.movehub import MoveHub, EncodedMotor
import time
def callback(angle):
@ -112,7 +112,7 @@ MoveHub's internal tilt sensor is available through `tilt_sensor` field. There a
An example:
```python
from pylgbst import MoveHub, TiltSensor
from pylgbst.movehub import MoveHub, TiltSensor
import time
def callback(pitch, roll, yaw):
@ -159,7 +159,7 @@ Distance works in range of 0-10 inches, with ability to measure last inch in hig
Simple example of subscribing to sensor:
```python
from pylgbst import MoveHub, ColorDistanceSensor
from pylgbst.movehub import MoveHub, ColorDistanceSensor
import time
def callback(clr, distance):
@ -194,7 +194,7 @@ You can obtain colors are present as constants `COLOR_*` and also a map of avail
Additionally, you can subscribe to LED color change events, using callback function as shown in example below.
```python
from pylgbst import MoveHub, COLORS, COLOR_NONE, COLOR_RED
from pylgbst.movehub import MoveHub, COLORS, COLOR_NONE, COLOR_RED
import time
def callback(clr):
@ -221,7 +221,7 @@ Tip: blinking orange color of LED means battery is low.
Note that `Button` class is not real `Peripheral`, as it has no port and not listed in `devices` field of Hub. Still, subscribing to button is done usual way:
```python
from pylgbst import MoveHub
from pylgbst.movehub import MoveHub
def callback(is_pressed):
print("Btn pressed: %s" % is_pressed)
@ -235,7 +235,7 @@ hub.button.subscribe(callback)
`MoveHub` class has field `voltage` to subscribe to battery voltage status. Callback accepts single parameter with current value. The range of values is float between `0` and `1.0`. Every time data is received, value is also written into `last_value` field of `Voltage` object. Values less than `0.2` are known as lowest values, when unit turns off.
```python
from pylgbst import MoveHub
from pylgbst.movehub import MoveHub
import time
def callback(value):
@ -249,19 +249,56 @@ print ("Value: " % hub.voltage.last_value)
### General Notes
#### Bluetooth Connection
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it uses instance of `BLEConnection` to connect directly to Move Hub. You can specify instance of `DebugServerConnection` if you are using Debug Server (more details below).
#### Bluetooth Backend Prerequisites
If you want to specify name for Bluetooth interface to use on local computer, create instance of `BLEConnection` and call `connect(if_name)` method of connection. Then pass it to `MoveHub` constructor. Like this:
You have following options to install as Bluetooth backend:
- `pip install pygatt` - [pygatt](https://github.com/peplin/pygatt) lib, works on both Windows and Linux
- `pip install gatt` - [gatt](https://github.com/getsenic/gatt-python) lib, supports Linux, does not work on Windows
- `pip install gattlib` - [gattlib](https://bitbucket.org/OscarAcena/pygattlib) - supports Linux, does not work on Windows, requires `sudo`
_Please let author know if you have discovered any compatibility/preprequisite details, so we will update this section to help future users_
Depending on backend type, you might need Linux `sudo` to be used when running Python.
#### Bluetooth Connection Options
There is optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use GattTool Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattTool Backend on Linux (`gattlib` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and MoveHub mac address. Please look function source code for details.
If you want to specify name for Bluetooth interface to use on local computer, you can passthat to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst import BLEConnection, MoveHub
from pylgbst.movehub import MoveHub
from pylgbst.comms_gatt import GattConnection
conn = BLEConnection()
conn.connect("hci1")
conn = GattConnection("hci1")
conn.connect() # you can pass MoveHub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
hub = MoveHub(conn)
```
#### Use Disconnect in `finally`
It is recommended to make sure `disconnect()` method is called on connection object after you have finished your program. This ensures Bluetooth subsystem is cleared and avoids problems for subsequent re-connects of MoveHub. The best way to do that in Python is to use `try ... finally` clause:
```python
from pylgbst import get_connection_auto
from pylgbst.movehub import MoveHub
conn=get_connection_auto() # ! don't put this into `try` block
try:
hub = MoveHub(conn)
finally:
conn.disconnect()
```
#### Devices Detecting
As part of instantiating process, `MoveHub` waits up to 1 minute for all builtin devices to appear, such as motors on ports A and B, tilt sensor, button and battery. This not guarantees that external motor and/or color sensor will be present right after `MoveHub` instantiated. Usually, sleeping for couple of seconds gives it enough time to detect everything.
@ -274,17 +311,15 @@ It is possible to subscribe with multiple times for the same sensor. Only one, v
Good practice for any program is to unsubscribe from all sensor subscriptions before ending, especially when used with `DebugServer`.
## Debug Server
Running debug server opens permanent BLE connection to Hub and listening on TCP port for communications. This avoids the need to re-start Hub all the time.
There is `DebugServerConnection` class that you can use with it, instead of `BLEConnection`.
Starting debug server is done like this:
Starting debug server is done like this (you may need to run it with `sudo`, depending on your BLE backend):
```bash
sudo python -c "from pylgbst.comms import *; \
import logging; logging.basicConfig(level=logging.DEBUG); \
DebugServer(BLEConnection().connect()).start()"
python -c "import logging; logging.basicConfig(level=logging.DEBUG); \
import pylgbst; pylgbst.start_debug_server()"
```
Then push green button on MoveHub, so permanent BLE connection will be established.

View File

@ -1,9 +1,11 @@
# coding=utf-8
import time
from time import sleep
from pylgbst import *
from pylgbst.comms import DebugServerConnection
from pylgbst.movehub import MoveHub
from pylgbst.movehub import MoveHub, COLORS, COLOR_BLACK
from pylgbst.peripherals import EncodedMotor, TiltSensor, Amperage, Voltage
log = logging.getLogger("demo")
@ -185,9 +187,12 @@ if __name__ == '__main__':
try:
connection = DebugServerConnection()
except BaseException:
logging.warning("Failed to use debug server: %s", traceback.format_exc())
connection = BLEConnection().connect()
logging.debug("Failed to use debug server: %s", traceback.format_exc())
connection = get_connection_auto()
hub = MoveHub(connection)
sleep(10000)
#demo_all(hub)
try:
hub = MoveHub(connection)
sleep(1)
# demo_all(hub)
finally:
connection.disconnect()

View File

@ -1,10 +1,9 @@
import logging
import traceback
import time
from pylgbst import MoveHub
from pylgbst.comms import DebugServerConnection, BLEConnection
from pylgbst import get_connection_auto
from pylgbst.comms import DebugServerConnection
from pylgbst.movehub import MoveHub
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
@ -13,13 +12,13 @@ if __name__ == '__main__':
conn = DebugServerConnection()
except BaseException:
logging.warning("Failed to use debug server: %s", traceback.format_exc())
conn = BLEConnection().connect()
conn = get_connection_auto()
hub = MoveHub(conn)
try:
hub.motor_AB.constant(0.45, 0.45)
hub.motor_external.angled(12590, 0.1)
#time.sleep(180)
# time.sleep(180)
finally:
hub.motor_AB.stop()
if hub.motor_external:

View File

@ -2,7 +2,8 @@ import logging
import math
import time
from pylgbst import ColorDistanceSensor, COLORS, COLOR_RED, COLOR_CYAN
from pylgbst.constants import COLOR_RED, COLOR_CYAN, COLORS
from pylgbst.peripherals import ColorDistanceSensor
class Plotter(object):

View File

@ -6,8 +6,9 @@ import traceback
import six
from examples.plotter import Plotter
from pylgbst import EncodedMotor, PORT_AB, PORT_C, PORT_A, PORT_B, MoveHub
from pylgbst.comms import DebugServerConnection, BLEConnection
from pylgbst import get_connection_auto
from pylgbst.comms import DebugServerConnection
from pylgbst.movehub import EncodedMotor, PORT_AB, PORT_C, PORT_A, PORT_B, MoveHub
from tests import HubMock
@ -223,7 +224,7 @@ if __name__ == '__main__':
conn = DebugServerConnection()
except BaseException:
logging.warning("Failed to use debug server: %s", traceback.format_exc())
conn = BLEConnection().connect()
conn = get_connection_auto()
hub = MoveHub(conn) if 1 else get_hub_mock()

View File

@ -1,8 +1,10 @@
import logging
import traceback
from pylgbst import MoveHub, COLORS, COLOR_RED, COLOR_YELLOW, COLOR_CYAN, COLOR_BLUE, COLOR_BLACK
from pylgbst.comms import DebugServerConnection, BLEConnection
from pylgbst import get_connection_auto
from pylgbst.comms import DebugServerConnection
from pylgbst.constants import COLORS, COLOR_YELLOW, COLOR_BLUE, COLOR_CYAN, COLOR_RED, COLOR_BLACK
from pylgbst.movehub import MoveHub
class ColorSorter(MoveHub):
@ -82,7 +84,7 @@ if __name__ == '__main__':
conn = DebugServerConnection()
except BaseException:
logging.warning("Failed to use debug server: %s", traceback.format_exc())
conn = BLEConnection().connect()
conn = get_connection_auto()
sorter = ColorSorter(conn)
empty = 0

View File

@ -3,9 +3,11 @@ import hashlib
import os
import re
import subprocess
import time
from pylgbst import *
from pylgbst.comms import DebugServerConnection
from pylgbst.movehub import MoveHub
try:
import gtts
@ -41,7 +43,8 @@ SPEECH_LANG_MAP = {
"commands help": "Available commands are: "
"forward, backward, turn left, turn right, "
"head left, head right, head straight, shot and say",
"finished": "Thank you! Robot is now turning off"
"finished": "Thank you! Robot is now turning off",
"text is empty": "Please, enter not empty text to say!"
},
"ru": {
"ready": "Робот Веернии готов к работе",
@ -49,9 +52,10 @@ SPEECH_LANG_MAP = {
"ok": "хорошо",
"commands help": "Доступные команды это: вперёд, назад, влево, вправо, "
"голову влево, голову вправо, голову прямо, выстрел, скажи",
"Finished": "Робот завершает работу. Спасибо!",
"finished": "Робот завершает работу. Спасибо!",
"commands from file": "Исполняю команды из файла",
"fire": "Выстрел!",
"text is empty": "Пожалуйста, наберите не пустой текст!"
}
}
@ -64,8 +68,8 @@ class Vernie(MoveHub):
try:
conn = DebugServerConnection()
except BaseException:
logging.debug("Failed to use debug server: %s", traceback.format_exc())
conn = BLEConnection().connect()
logging.warning("Failed to use debug server: %s", traceback.format_exc())
conn = get_connection_auto()
super(Vernie, self).__init__(conn)
self.language = language
@ -132,6 +136,9 @@ class Vernie(MoveHub):
confirm(cmd)
self.head(STRAIGHT)
elif cmd[0] in ("say", "скажи", "сказать"):
if not cmd[1:]:
self.say("text is empty")
return
say(' '.join(cmd[1:]))
elif cmd[0] in ("fire", "shot", "огонь", "выстрел"):
say("fire")

View File

@ -1,3 +1,4 @@
from pylgbst.peripherals import ColorDistanceSensor
from . import *
logging.basicConfig(level=logging.INFO)

View File

@ -1,3 +1,4 @@
from pylgbst.constants import COLOR_GREEN, COLOR_NONE
from . import *
robot = Vernie()

View File

@ -1,3 +1,5 @@
import sys
from . import *
robot = Vernie()
@ -9,7 +11,7 @@ def confirmation(command):
robot.say(command[0])
with open("vernie.commands") as fhd:
with open(os.path.join(os.path.dirname(__file__), "vernie.commands")) as fhd:
for cmd in fhd.readlines():
sys.stdout.write("%s" % cmd)
robot.interpret_command(cmd, confirmation)

View File

@ -1,2 +1,62 @@
from pylgbst.movehub import *
from pylgbst.peripherals import *
import logging
import traceback
from pylgbst.comms import DebugServer
log = logging.getLogger('pylgbst')
def get_connection_bluegiga(mac):
from pylgbst.comms_pygatt import BlueGigaConnection
return BlueGigaConnection().connect(mac)
def get_connection_gattool(controller='hci0', hub_mac=None):
from pylgbst.comms_pygatt import GattoolConnection
return GattoolConnection(controller).connect(hub_mac)
def get_connection_gatt(controller='hci0', hub_mac=None):
from pylgbst.comms_gatt import GattConnection
return GattConnection(controller).connect(hub_mac)
def get_connection_gattlib(controller='hci0', hub_mac=None):
from pylgbst.comms_gattlib import GattLibConnection
return GattLibConnection(controller).connect(hub_mac)
def get_connection_auto(controller='hci0', hub_mac=None):
conn = None
try:
return get_connection_bluegiga(hub_mac)
except BaseException:
logging.debug("Failed: %s", traceback.format_exc())
try:
conn = get_connection_gatt(controller, hub_mac)
except BaseException:
logging.debug("Failed: %s", traceback.format_exc())
try:
conn = get_connection_gattool(controller, hub_mac)
except BaseException:
logging.debug("Failed: %s", traceback.format_exc())
try:
conn = get_connection_gattlib(controller, hub_mac)
except BaseException:
logging.debug("Failed: %s", traceback.format_exc())
if conn is None:
raise Exception("Failed to autodetect connection, make sure you have installed prerequisites")
return conn
def start_debug_server(iface="hci0", port=9090):
server = DebugServer(get_connection_auto(iface))
server.start(port)

View File

@ -8,54 +8,23 @@ import socket
import traceback
from abc import abstractmethod
from binascii import unhexlify
from gattlib import DiscoveryService, GATTRequester
from threading import Thread
from pylgbst.constants import MSG_DEVICE_SHUTDOWN, queue, str2hex
from pylgbst.constants import MSG_DEVICE_SHUTDOWN, ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE
from pylgbst.utilities import str2hex
log = logging.getLogger('comms')
LEGO_MOVE_HUB = "LEGO Move Hub"
# noinspection PyMethodOverriding
class Requester(GATTRequester):
"""
Wrapper to access `on_notification` capability of GATT
Set "notification_sink" field to a callable that will handle incoming data
"""
def __init__(self, p_object, *args, **kwargs):
super(Requester, self).__init__(p_object, *args, **kwargs)
self.notification_sink = None
self._notify_queue = queue.Queue() # this queue is to minimize time spent in gattlib C code
thr = Thread(target=self._dispatch_notifications)
thr.setDaemon(True)
thr.setName("Notify queue dispatcher")
thr.start()
def on_notification(self, handle, data):
# log.debug("requester notified, sink: %s", self.notification_sink)
self._notify_queue.put((handle, data))
def on_indication(self, handle, data):
log.debug("Indication on handle %s: %s", handle, str2hex(data))
def _dispatch_notifications(self):
while True:
handle, data = self._notify_queue.get()
if self.notification_sink:
try:
self.notification_sink(handle, data)
except BaseException:
log.warning("Data was: %s", str2hex(data))
log.warning("Failed to dispatch notification: %s", traceback.format_exc())
else:
log.warning("Dropped notification %s: %s", handle, str2hex(data))
class Connection(object):
def connect(self, hub_mac=None):
pass
def disconnect(self):
pass
@abstractmethod
def write(self, handle, data):
pass
@ -64,48 +33,8 @@ class Connection(object):
def set_notify_handler(self, handler):
pass
class BLEConnection(Connection):
"""
Main transport class, uses real Bluetooth LE connection.
Loops with timeout of 1 seconds to find device named "Lego MOVE Hub"
:type requester: Requester
"""
def __init__(self):
super(BLEConnection, self).__init__()
self.requester = None
def connect(self, bt_iface_name='hci0', hub_mac=None):
service = DiscoveryService(bt_iface_name)
while not self.requester:
log.info("Discovering devices using %s...", bt_iface_name)
devices = service.discover(1)
log.debug("Devices: %s", devices)
for address, name in devices.items():
if name == LEGO_MOVE_HUB or hub_mac == address:
logging.info("Found %s at %s", name, address)
self.requester = Requester(address, True, bt_iface_name)
break
if self.requester:
break
return self
def set_notify_handler(self, handler):
if self.requester:
log.debug("Setting notification handler: %s", handler)
self.requester.notification_sink = handler
else:
raise RuntimeError("No requester available")
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, str2hex(data))
return self.requester.write_by_handle(handle, data)
def enable_notifications(self):
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
class DebugServer(object):
@ -114,13 +43,13 @@ class DebugServer(object):
It holds BLE connection to Move Hub, so no need to re-start it every time
Usage: DebugServer(BLEConnection().connect()).start()
:type ble: BLEConnection
:type connection: BLEConnection
"""
def __init__(self, ble_trans):
def __init__(self, connection):
self._running = False
self.sock = socket.socket()
self.ble = ble_trans
self.connection = connection
def start(self, port=9090):
self.sock.bind(('', port))
@ -128,11 +57,11 @@ class DebugServer(object):
self._running = True
while self._running:
log.info("Accepting connections at %s", port)
log.info("Accepting MoveHub debug connections at %s", port)
conn, addr = self.sock.accept()
if not self._running:
raise KeyboardInterrupt("Shutdown")
self.ble.requester.notification_sink = lambda x, y: self._notify(conn, x, y)
self.connection.requester.notification_sink = lambda x, y: self._notify(conn, x, y)
try:
self._handle_conn(conn)
except KeyboardInterrupt:
@ -140,7 +69,7 @@ class DebugServer(object):
except BaseException:
log.error("Problem handling incoming connection: %s", traceback.format_exc())
finally:
self.ble.requester.notification_sink = self._notify_dummy
self.connection.requester.notification_sink = self._notify_dummy
conn.close()
def __del__(self):
@ -195,7 +124,7 @@ class DebugServer(object):
def _handle_cmd(self, cmd):
if cmd['type'] == 'write':
self.ble.write(cmd['handle'], unhexlify(cmd['data']))
self.connection.write(cmd['handle'], unhexlify(cmd['data']))
else:
raise ValueError("Unhandled cmd: %s", cmd)
@ -259,10 +188,3 @@ class DebugServerConnection(Connection):
def set_notify_handler(self, handler):
self.notify_handler = handler
def start_debug_server(iface="hci0", port=9090):
ble = BLEConnection()
ble.connect(iface)
server = DebugServer(ble)
server.start(port)

120
pylgbst/comms_gatt.py Normal file
View File

@ -0,0 +1,120 @@
import logging
import re
import threading
from time import sleep
import gatt
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.constants import MOVE_HUB_HW_UUID_SERV, MOVE_HUB_HW_UUID_CHAR, MOVE_HUB_HARDWARE_HANDLE
from pylgbst.utilities import str2hex
log = logging.getLogger('comms-gatt')
class CustomDevice(gatt.Device, object):
def __init__(self, mac_address, manager):
gatt.Device.__init__(self, mac_address=mac_address, manager=manager)
self._notify_callback = lambda hnd, val: None
self._handle = None
def connect(self):
gatt.Device.connect(self)
log.info("Waiting for device connection...")
while self._handle is None:
log.debug("Sleeping...")
sleep(1)
if isinstance(self._handle, BaseException):
exc = self._handle
self._handle = None
raise exc
def write(self, data):
log.debug("Writing to handle: %s", str2hex(data))
return self._handle.write_value(data)
def enable_notifications(self):
log.debug('Enable Notifications...')
self._handle.enable_notifications()
def set_notific_handler(self, func_hnd):
self._notify_callback = func_hnd
def services_resolved(self):
log.debug('Getting MoveHub services and characteristics...')
gatt.Device.services_resolved(self)
log.debug("[%s] Resolved services", self.mac_address)
for service in self.services:
log.debug("[%s] Service [%s]", self.mac_address, service.uuid)
for characteristic in service.characteristics:
log.debug("[%s] Characteristic [%s]", self.mac_address, characteristic.uuid)
if service.uuid == MOVE_HUB_HW_UUID_SERV and characteristic.uuid == MOVE_HUB_HW_UUID_CHAR:
log.debug('MoveHub characteristic found')
self._handle = characteristic
if self._handle is None:
self.manager.stop()
self._handle = RuntimeError("Failed to obtain MoveHub handle")
def characteristic_value_updated(self, characteristic, value):
value = self._fix_weird_bug(value)
log.debug('Notification in GattDevice: %s', str2hex(value))
self._notify_callback(MOVE_HUB_HARDWARE_HANDLE, value)
def _fix_weird_bug(self, value):
if isinstance(value, str) and "dbus.Array" in value: # weird bug from gatt on my Ubuntu 16.04!
log.debug("Fixing broken notify string: %s", value)
return ''.join([chr(int(x.group(1))) for x in re.finditer(r"dbus.Byte\((\d+)\)", value)])
return value
class GattConnection(Connection):
"""
:type _device: CustomDevice
"""
def __init__(self, bt_iface_name='hci0'):
super(GattConnection, self).__init__()
self._device = None
self._iface = bt_iface_name
def connect(self, hub_mac=None):
dev_manager = gatt.DeviceManager(adapter_name=self._iface)
dman_thread = threading.Thread(target=dev_manager.run)
dman_thread.setDaemon(True)
log.debug('Starting DeviceManager...')
dman_thread.start()
dev_manager.start_discovery()
while not self._device:
log.info("Discovering devices...")
devices = dev_manager.devices()
log.debug("Devices: %s", devices)
for dev in devices:
address = dev.mac_address
name = dev.alias()
if name == LEGO_MOVE_HUB or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._device = CustomDevice(address, dev_manager)
break
if not self._device:
sleep(1)
self._device.connect()
return self
def disconnect(self):
self._device.disconnect()
def write(self, handle, data):
self._device.write(data)
def set_notify_handler(self, handler):
self._device.set_notific_handler(handler)
def enable_notifications(self):
self._device.enable_notifications()

91
pylgbst/comms_gattlib.py Normal file
View File

@ -0,0 +1,91 @@
# noinspection PyMethodOverriding
import logging
import traceback
from gattlib import DiscoveryService, GATTRequester
from threading import Thread
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.utilities import queue, str2hex
log = logging.getLogger('comms-gattlib')
class Requester(GATTRequester):
"""
Wrapper to access `on_notification` capability of GATT
Set "notification_sink" field to a callable that will handle incoming data
"""
def __init__(self, p_object, *args, **kwargs):
super(Requester, self).__init__(p_object, *args, **kwargs)
self.notification_sink = None
self._notify_queue = queue.Queue() # this queue is to minimize time spent in gattlib C code
thr = Thread(target=self._dispatch_notifications)
thr.setDaemon(True)
thr.setName("Notify queue dispatcher")
thr.start()
def on_notification(self, handle, data):
# log.debug("requester notified, sink: %s", self.notification_sink)
self._notify_queue.put((handle, data))
def on_indication(self, handle, data):
log.debug("Indication on handle %s: %s", handle, str2hex(data))
def _dispatch_notifications(self):
while True:
handle, data = self._notify_queue.get()
data = data[3:] # for some reason, there are extra bytes
if self.notification_sink:
try:
self.notification_sink(handle, data)
except BaseException:
log.warning("Data was: %s", str2hex(data))
log.warning("Failed to dispatch notification: %s", traceback.format_exc())
else:
log.warning("Dropped notification %s: %s", handle, str2hex(data))
class GattLibConnection(Connection):
"""
Main transport class, uses real Bluetooth LE connection.
Loops with timeout of 1 seconds to find device named "Lego MOVE Hub"
:type requester: Requester
"""
def __init__(self, bt_iface_name='hci0'):
super(GattLibConnection, self).__init__()
self.requester = None
self._iface = bt_iface_name
def connect(self, hub_mac=None):
service = DiscoveryService(self._iface)
while not self.requester:
log.info("Discovering devices using %s...", self._iface)
devices = service.discover(1)
log.debug("Devices: %s", devices)
for address, name in devices.items():
if name == LEGO_MOVE_HUB or hub_mac == address:
logging.info("Found %s at %s", name, address)
self.requester = Requester(address, True, self._iface)
break
if self.requester:
break
return self
def set_notify_handler(self, handler):
if self.requester:
log.debug("Setting notification handler: %s", handler)
self.requester.notification_sink = handler
else:
raise RuntimeError("No requester available")
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, str2hex(data))
return self.requester.write_by_handle(handle, data)

61
pylgbst/comms_pygatt.py Normal file
View File

@ -0,0 +1,61 @@
import logging
import pygatt
from pylgbst.comms import Connection, LEGO_MOVE_HUB
from pylgbst.constants import MOVE_HUB_HW_UUID_CHAR
from pylgbst.utilities import str2hex
log = logging.getLogger('comms-pygatt')
class GattoolConnection(Connection):
"""
Used for connecting to
:type _conn_hnd: pygatt.backends.bgapi.device.BGAPIBLEDevice
"""
def __init__(self, controller='hci0'):
Connection.__init__(self)
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
self._conn_hnd = None
def connect(self, hub_mac=None):
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
adapter = self.backend()
adapter.start()
while not self._conn_hnd:
log.info("Discovering devices...")
devices = adapter.scan(1)
log.debug("Devices: %s", devices)
for dev in devices:
address = dev['address']
name = dev['name']
if name == LEGO_MOVE_HUB or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._conn_hnd = adapter.connect(address)
break
if self._conn_hnd:
break
return self
def disconnect(self):
self._conn_hnd.disconnect()
def write(self, handle, data):
log.debug("Writing to handle %s: %s", handle, str2hex(data))
return self._conn_hnd.char_write_handle(handle, bytearray(data))
def set_notify_handler(self, handler):
self._conn_hnd.subscribe(MOVE_HUB_HW_UUID_CHAR, handler)
class BlueGigaConnection(GattoolConnection):
def __init__(self):
super(BlueGigaConnection, self).__init__()
self.backend = lambda: pygatt.GATTToolBackend()

View File

@ -1,32 +1,12 @@
import binascii
import struct
import sys
if sys.version_info[0] == 2:
import Queue as queue
else:
import queue as queue
queue = queue # just to use it
def str2hex(data):
return binascii.hexlify(data).decode("utf8")
def usbyte(seq, index):
return struct.unpack("<B", seq[index:index + 1])[0]
def ushort(seq, index):
return struct.unpack("<H", seq[index:index + 2])[0]
# GENERAL
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
MOVE_HUB_HARDWARE_HANDLE = 0x0E
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
PACKET_VER = 0x01
LEGO_MOVE_HUB = "LEGO Move Hub"
# PORTS
PORT_C = 0x01

View File

@ -2,10 +2,11 @@ import logging
import time
from struct import pack
from pylgbst.comms import BLEConnection
from pylgbst import get_connection_auto
from pylgbst.constants import *
from pylgbst.peripherals import Button, EncodedMotor, ColorDistanceSensor, LED, TiltSensor, Voltage, Peripheral, \
Amperage
from pylgbst.utilities import str2hex, usbyte
log = logging.getLogger('movehub')
@ -37,7 +38,7 @@ class MoveHub(object):
def __init__(self, connection=None):
if not connection:
connection = BLEConnection().connect()
connection = get_connection_auto()
self.connection = connection
self.info = {}
@ -67,13 +68,14 @@ class MoveHub(object):
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
def _wait_for_devices(self):
self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
self.connection.enable_notifications()
builtin_devices = ()
for num in range(0, 60):
builtin_devices = (self.led, self.motor_A, self.motor_B,
self.motor_AB, self.tilt_sensor, self.button, self.amperage, self.voltage)
if None not in builtin_devices:
log.debug("All devices are present")
return
log.debug("Waiting for builtin devices to appear: %s", builtin_devices)
time.sleep(0.05)
@ -87,7 +89,6 @@ class MoveHub(object):
log.warning("Unsupported notification handle: 0x%s", handle)
return
data = data[3:]
log.debug("Notification on %s: %s", handle, str2hex(orig))
msg_type = usbyte(data, 2)

View File

@ -5,7 +5,9 @@ import traceback
from struct import pack, unpack
from threading import Thread
from pylgbst.constants import *
from pylgbst.constants import PORTS, MSG_SENSOR_SUBSCRIBE, COLOR_NONE, COLOR_BLACK, COLORS, MSG_SET_PORT_VAL, PORT_AB, \
MSG_DEVICE_INFO, INFO_BUTTON_STATE, INFO_ACTION_SUBSCRIBE, INFO_ACTION_UNSUBSCRIBE
from pylgbst.utilities import queue, str2hex, usbyte, ushort
log = logging.getLogger('peripherals')
@ -49,11 +51,11 @@ class Peripheral(object):
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
def started(self):
log.debug("Started: %s", self)
log.debug("Peripheral Started: %s", self)
self._working = True
def finished(self):
log.debug("Finished: %s", self)
log.debug("Peripheral Finished: %s", self)
self._working = False
def in_progress(self):
@ -69,14 +71,16 @@ class Peripheral(object):
if callback:
self._subscribers.add(callback)
def unsubscribe(self, callback=None):
def unsubscribe(self, callback=None, async=False):
if callback in self._subscribers:
self._subscribers.remove(callback)
if self._port_subscription_mode is None:
log.warning("Attempt to unsubscribe while never subscribed: %s", self)
elif not self._subscribers:
self.started()
self._port_subscribe(self._port_subscription_mode, 0, False)
self._wait_sync(async)
self._port_subscription_mode = None
def _notify_subscribers(self, *args, **kwargs):
@ -87,7 +91,7 @@ class Peripheral(object):
try:
self._incoming_port_data.put_nowait(data)
except queue.Full:
logging.debug("Dropped port data: %s", data)
log.debug("Dropped port data: %s", data)
def handle_port_data(self, data):
log.warning("Unhandled device notification for %s: %s", self, str2hex(data[4:]))
@ -137,7 +141,7 @@ class LED(Peripheral):
def subscribe(self, callback, mode=None, granularity=None, async=False):
self._subscribers.add(callback)
def unsubscribe(self, callback=None):
def unsubscribe(self, callback=None, async=False):
if callback in self._subscribers:
self._subscribers.remove(callback)
@ -403,7 +407,7 @@ class Voltage(Peripheral):
val = ushort(data, 4)
self.last_value = val / 4096.0
if self.last_value < 0.2:
logging.warning("Battery low! %s%%", int(100 * self.last_value))
log.warning("Battery low! %s%%", int(100 * self.last_value))
self._notify_subscribers(self.last_value)
@ -440,12 +444,13 @@ class Button(Peripheral):
if callback:
self._subscribers.add(callback)
def unsubscribe(self, callback=None):
def unsubscribe(self, callback=None, async=False):
if callback in self._subscribers:
self._subscribers.remove(callback)
if not self._subscribers:
self.parent.send(MSG_DEVICE_INFO, pack('<B', INFO_BUTTON_STATE) + pack('<B', INFO_ACTION_UNSUBSCRIBE))
# FIXME: will this require sync wait?
def handle_port_data(self, data):
param = usbyte(data, 5)

26
pylgbst/utilities.py Normal file
View File

@ -0,0 +1,26 @@
"""
This module offers some utilities, in a way they are work in both Python 2 and 3
"""
import binascii
import sys
from struct import unpack
if sys.version_info[0] == 2:
import Queue as queue
else:
import queue as queue
queue = queue # just to use it
def usbyte(seq, index):
return unpack("<B", seq[index:index + 1])[0]
def ushort(seq, index):
return unpack("<H", seq[index:index + 2])[0]
def str2hex(data): # we need it for python 2+3 compatibility
return binascii.hexlify(data).decode("utf8")

View File

@ -6,5 +6,10 @@ setup(name='pylgbst',
author='Andrey Pokhilko',
author_email='apc4@ya.ru',
packages=['pylgbst'],
requires=['gattlib']
requires=[],
extras_require={
'gatt': ["gatt"],
'gattlib': ["gattlib"],
'pygatt': ["pygatt"],
}
)

235
tests.py
View File

@ -1,235 +0,0 @@
import unittest
from binascii import unhexlify
from examples.plotter import Plotter
from pylgbst import *
from pylgbst.comms import Connection
from pylgbst.movehub import MoveHub
HANDLE = MOVE_HUB_HARDWARE_HANDLE
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')
class ConnectionMock(Connection):
"""
For unit testing purposes
"""
def __init__(self):
super(ConnectionMock, self).__init__()
self.writes = []
self.notifications = []
self.notification_handler = None
self.running = True
self.finished = False
def set_notify_handler(self, handler):
self.notification_handler = handler
thr = Thread(target=self.notifier)
thr.setDaemon(True)
thr.start()
def notifier(self):
while self.running:
if self.notification_handler:
while self.notifications:
handle, data = self.notifications.pop(0)
self.notification_handler(handle, unhexlify(data.replace(' ', '')))
time.sleep(0.1)
self.finished = True
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, str2hex(data))
self.writes.append((handle, str2hex(data)))
class HubMock(MoveHub):
# noinspection PyUnresolvedReferences
def __init__(self, connection=None):
"""
:type connection: ConnectionMock
"""
super(HubMock, self).__init__(connection if connection else ConnectionMock())
self.notify_mock = self.connection.notifications
self.writes = self.connection.writes
def _wait_for_devices(self):
pass
def _report_status(self):
pass
class GeneralTest(unittest.TestCase):
def _wait_notifications_handled(self, hub):
hub.connection.running = False
for _ in range(1, 180):
time.sleep(1)
log.debug("Waiting for notifications to process...")
if hub.connection.finished:
log.debug("Done waiting")
break
def test_led(self):
hub = HubMock()
led = LED(hub, PORT_LED)
led.set_color(COLOR_RED)
self.assertEqual("0801813201510009", hub.writes[0][1])
def test_tilt_sensor(self):
hub = HubMock()
hub.notify_mock.append((HANDLE, '1b0e00 0f00 04 3a 0128000000000100000001'))
time.sleep(1)
def callback(param1, param2=None, param3=None):
if param2 is None:
log.debug("Tilt: %s", TiltSensor.DUO_STATES[param1])
else:
log.debug("Tilt: %s %s %s", param1, param2, param3)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback)
hub.notify_mock.append((HANDLE, "1b0e000500453a05"))
hub.notify_mock.append((HANDLE, "1b0e000a00473a010100000001"))
time.sleep(1)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_SIMPLE)
hub.notify_mock.append((HANDLE, "1b0e000500453a09"))
time.sleep(1)
self._inject_notification(hub, '1b0e000a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_FULL)
hub.notify_mock.append((HANDLE, "1b0e000600453a04fe"))
time.sleep(1)
self._wait_notifications_handled(hub)
hub.tilt_sensor.unsubscribe(callback)
# TODO: assert
def test_motor(self):
conn = ConnectionMock()
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
hub = HubMock(conn)
time.sleep(0.1)
conn.notifications.append((14, '1b0e00050082390a'))
hub.motor_AB.timed(1.5)
self.assertEqual("0d018139110adc056464647f03", conn.writes[0][1])
conn.notifications.append((14, '1b0e00050082390a'))
hub.motor_AB.angled(90)
self.assertEqual("0f018139110c5a0000006464647f03", conn.writes[1][1])
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append((14, '1b0e00 0f00 04 01 0125000000001000000010'))
conn.notifications.append((14, '1b0e00 0f00 04 02 0126000000001000000010'))
conn.notifications.append((14, '1b0e00 0f00 04 37 0127000100000001000000'))
conn.notifications.append((14, '1b0e00 0f00 04 38 0127000100000001000000'))
conn.notifications.append((14, '1b0e00 0900 04 39 0227003738'))
conn.notifications.append((14, '1b0e00 0f00 04 32 0117000100000001000000'))
conn.notifications.append((14, '1b0e00 0f00 04 3a 0128000000000100000001'))
conn.notifications.append((14, '1b0e00 0f00 04 3b 0115000200000002000000'))
conn.notifications.append((14, '1b0e00 0f00 04 3c 0114000200000002000000'))
conn.notifications.append((14, '1b0e00 0f00 8202 01'))
conn.notifications.append((14, '1b0e00 0f00 8202 0a'))
self._inject_notification(conn, '1b0e00 1200 0101 06 4c45474f204d6f766520487562', 1)
self._inject_notification(conn, '1b0e00 1200 0108 06 4c45474f204d6f766520487562', 2)
self._inject_notification(conn, '1b0e00 0900 47 3c 0227003738', 3)
self._inject_notification(conn, '1b0e00 0600 45 3c 020d', 4)
hub = MoveHub(conn)
# demo_all(hub)
self._wait_notifications_handled(hub)
def test_color_sensor(self):
#
hub = HubMock()
hub.notify_mock.append((HANDLE, '1b0e000f00 04010125000000001000000010'))
time.sleep(1)
def callback(color, unk1, unk2=None):
name = COLORS[color] if color is not None else 'NONE'
log.info("Color: %s %s %s", name, unk1, unk2)
self._inject_notification(hub, '1b0e000a00 4701090100000001', 1)
hub.color_distance_sensor.subscribe(callback)
hub.notify_mock.append((HANDLE, "1b0e0008004501ff0aff00"))
time.sleep(1)
# TODO: assert
self._wait_notifications_handled(hub)
hub.color_distance_sensor.unsubscribe(callback)
def test_button(self):
hub = HubMock()
time.sleep(1)
def callback(pressed):
log.info("Pressed: %s", pressed)
hub.notify_mock.append((HANDLE, "1b0e00060001020600"))
hub.button.subscribe(callback)
hub.notify_mock.append((HANDLE, "1b0e00060001020601"))
hub.notify_mock.append((HANDLE, "1b0e00060001020600"))
time.sleep(1)
# TODO: assert
self._wait_notifications_handled(hub)
hub.button.unsubscribe(callback)
def _inject_notification(self, hub, notify, pause):
def inject():
time.sleep(pause)
if isinstance(hub, ConnectionMock):
hub.notifications.append((HANDLE, notify))
else:
hub.notify_mock.append((HANDLE, notify))
Thread(target=inject).start()
class TestPlotter(unittest.TestCase):
def test_calc1(self):
self.assertEqual((100, 1, 0.5), Plotter._calc_motor_timed(100, 50))
def test_calc2(self):
self.assertEqual((200, 0.5, 1), Plotter._calc_motor_timed(100, 200))
def test_calc_xoverflow(self):
self.assertEqual((800, 0.0125, 1), Plotter._calc_motor_timed(10, 800))
def test_calc3(self):
self.assertEqual((100, 1, 0), Plotter._calc_motor_timed(100, 0))
def test_calc4(self):
self.assertEqual((200, 0, 1), Plotter._calc_motor_timed(0, 200))
def test_calc5(self):
parts = 2
for x in range(0, parts + 1):
res = Plotter._calc_motor_angled(1.0, x * 1.0 / parts)
logging.debug("%s", res)
for x in range(0, parts + 1):
res = Plotter._calc_motor_angled(x * 1.0 / parts, 1.0)
logging.debug("%s", res)
def test_zeroes(self):
res = Plotter._calc_motor_angled(1.0, 0.0)
self.assertNotEqual(0, res[1])
res = Plotter._calc_motor_angled(0.0, 1.0)
self.assertNotEqual(0, res[2])
def test_calc6(self):
res = Plotter._calc_motor_angled(1.0, 0.2)
logging.debug("%s", res)
res = Plotter._calc_motor_angled(1.0, 0.5)
logging.debug("%s", res)
res = Plotter._calc_motor_angled(1.0, 0.8)
logging.debug("%s", res)

60
tests/__init__.py Normal file
View File

@ -0,0 +1,60 @@
from binascii import unhexlify
from pylgbst.comms import Connection
from pylgbst.movehub import MoveHub
from pylgbst.peripherals import *
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')
class HubMock(MoveHub):
# noinspection PyUnresolvedReferences
def __init__(self, connection=None):
"""
:type connection: ConnectionMock
"""
super(HubMock, self).__init__(connection if connection else ConnectionMock())
self.notify_mock = self.connection.notifications
self.writes = self.connection.writes
def _wait_for_devices(self):
pass
def _report_status(self):
pass
class ConnectionMock(Connection):
"""
For unit testing purposes
"""
def __init__(self):
super(ConnectionMock, self).__init__()
self.writes = []
self.notifications = []
self.notification_handler = None
self.running = True
self.finished = False
def set_notify_handler(self, handler):
self.notification_handler = handler
thr = Thread(target=self.notifier)
thr.setDaemon(True)
thr.start()
def notifier(self):
while self.running:
if self.notification_handler:
while self.notifications:
handle, data = self.notifications.pop(0)
self.notification_handler(handle, unhexlify(data.replace(' ', '')))
time.sleep(0.1)
self.finished = True
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, str2hex(data))
self.writes.append((handle, str2hex(data)))

43
tests/test_gatt.py Normal file
View File

@ -0,0 +1,43 @@
import sys
import unittest
import dbus
from gatt import DeviceManager
from pylgbst.comms_gatt import CustomDevice
from tests import log, str2hex
class MockBus(object):
def __init__(self, *args, **kwargs):
# super(MockBus, self).__init__(*args, **kwargs)
pass
def get_object(self, bus_name, object_path, introspect=True, follow_name_owner_changes=False, **kwargs):
return None
dbus.SystemBus = lambda: MockBus()
class DeviceManagerMock(DeviceManager, object):
def update_devices(self):
pass
class TestGatt(unittest.TestCase):
def test_one(self):
log.debug("")
manager = DeviceManagerMock("hci0")
obj = CustomDevice("AA", manager)
def callback(handle, value):
log.debug("%s: %s", type(value), str2hex(value))
if sys.version_info[0] == 2:
self.assertEquals("0f0004020126000000001000000010", str2hex(value))
obj.set_notific_handler(callback)
arr = "dbus.Array([dbus.Byte(15), dbus.Byte(0), dbus.Byte(4), dbus.Byte(2), dbus.Byte(1), dbus.Byte(38), " \
"dbus.Byte(0), dbus.Byte(0), dbus.Byte(0), dbus.Byte(0), dbus.Byte(16), dbus.Byte(0), dbus.Byte(0), " \
"dbus.Byte(0), dbus.Byte(16)], signature=dbus.Signature('y'), variant_level=1)"
obj.characteristic_value_updated(None, arr if sys.version_info[0] == 2 else bytes(arr, 'ascii'))

141
tests/test_movehub.py Normal file
View File

@ -0,0 +1,141 @@
import time
import unittest
from pylgbst.movehub import MoveHub, MOVE_HUB_HARDWARE_HANDLE, PORT_LED, COLOR_RED
from pylgbst.peripherals import LED, TiltSensor, COLORS
from tests import log, HubMock, ConnectionMock, Thread
HANDLE = MOVE_HUB_HARDWARE_HANDLE
class GeneralTest(unittest.TestCase):
def _wait_notifications_handled(self, hub):
hub.connection.running = False
for _ in range(1, 180):
time.sleep(1)
log.debug("Waiting for notifications to process...")
if hub.connection.finished:
log.debug("Done waiting for notifications to process")
break
def test_led(self):
hub = HubMock()
led = LED(hub, PORT_LED)
led.set_color(COLOR_RED)
self.assertEqual("0801813201510009", hub.writes[0][1])
def test_tilt_sensor(self):
hub = HubMock()
hub.notify_mock.append((HANDLE, '0f00 04 3a 0128000000000100000001'))
time.sleep(1)
def callback(param1, param2=None, param3=None):
if param2 is None:
log.debug("Tilt: %s", TiltSensor.DUO_STATES[param1])
else:
log.debug("Tilt: %s %s %s", param1, param2, param3)
self._inject_notification(hub, '0a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback)
hub.notify_mock.append((HANDLE, "0500453a05"))
hub.notify_mock.append((HANDLE, "0a00473a010100000001"))
time.sleep(1)
self._inject_notification(hub, '0a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_SIMPLE)
hub.notify_mock.append((HANDLE, "0500453a09"))
time.sleep(1)
self._inject_notification(hub, '0a00 47 3a 090100000001', 1)
hub.tilt_sensor.subscribe(callback, TiltSensor.MODE_2AXIS_FULL)
hub.notify_mock.append((HANDLE, "0600453a04fe"))
time.sleep(1)
self._inject_notification(hub, '0a00 47 3a 090100000001', 1)
hub.tilt_sensor.unsubscribe(callback)
self._wait_notifications_handled(hub)
# TODO: assert
def test_motor(self):
conn = ConnectionMock()
conn.notifications.append((14, '0900 04 39 0227003738'))
hub = HubMock(conn)
time.sleep(0.1)
conn.notifications.append((14, '050082390a'))
hub.motor_AB.timed(1.5)
self.assertEqual("0d018139110adc056464647f03", conn.writes[0][1])
conn.notifications.append((14, '050082390a'))
hub.motor_AB.angled(90)
self.assertEqual("0f018139110c5a0000006464647f03", conn.writes[1][1])
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append((14, '0f00 04 01 0125000000001000000010'))
conn.notifications.append((14, '0f00 04 02 0126000000001000000010'))
conn.notifications.append((14, '0f00 04 37 0127000100000001000000'))
conn.notifications.append((14, '0f00 04 38 0127000100000001000000'))
conn.notifications.append((14, '0900 04 39 0227003738'))
conn.notifications.append((14, '0f00 04 32 0117000100000001000000'))
conn.notifications.append((14, '0f00 04 3a 0128000000000100000001'))
conn.notifications.append((14, '0f00 04 3b 0115000200000002000000'))
conn.notifications.append((14, '0f00 04 3c 0114000200000002000000'))
conn.notifications.append((14, '0f00 8202 01'))
conn.notifications.append((14, '0f00 8202 0a'))
self._inject_notification(conn, '1200 0101 06 4c45474f204d6f766520487562', 1)
self._inject_notification(conn, '1200 0108 06 4c45474f204d6f766520487562', 2)
self._inject_notification(conn, '0900 47 3c 0227003738', 3)
self._inject_notification(conn, '0600 45 3c 020d', 4)
self._inject_notification(conn, '0900 47 3c 0227003738', 5)
hub = MoveHub(conn)
# demo_all(hub)
self._wait_notifications_handled(hub)
def test_color_sensor(self):
#
hub = HubMock()
hub.notify_mock.append((HANDLE, '0f00 04010125000000001000000010'))
time.sleep(1)
def callback(color, unk1, unk2=None):
name = COLORS[color] if color is not None else 'NONE'
log.info("Color: %s %s %s", name, unk1, unk2)
self._inject_notification(hub, '0a00 4701090100000001', 1)
hub.color_distance_sensor.subscribe(callback)
hub.notify_mock.append((HANDLE, "08004501ff0aff00"))
time.sleep(1)
# TODO: assert
self._inject_notification(hub, '0a00 4701090100000001', 1)
hub.color_distance_sensor.unsubscribe(callback)
self._wait_notifications_handled(hub)
def test_button(self):
hub = HubMock()
time.sleep(1)
def callback(pressed):
log.info("Pressed: %s", pressed)
hub.notify_mock.append((HANDLE, "060001020600"))
hub.button.subscribe(callback)
hub.notify_mock.append((HANDLE, "060001020601"))
hub.notify_mock.append((HANDLE, "060001020600"))
time.sleep(1)
# TODO: assert
hub.button.unsubscribe(callback)
self._wait_notifications_handled(hub)
def _inject_notification(self, hub, notify, pause):
def inject():
time.sleep(pause)
if isinstance(hub, ConnectionMock):
hub.notifications.append((HANDLE, notify))
else:
hub.notify_mock.append((HANDLE, notify))
Thread(target=inject).start()

65
tests/test_pygatt.py Normal file
View File

@ -0,0 +1,65 @@
import unittest
import pygatt
import serial
from pygatt import BLEAddressType
from pygatt.backends.bgapi.bgapi import MAX_CONNECTION_ATTEMPTS
from pygatt.backends.bgapi.device import BGAPIBLEDevice
from pylgbst.comms_pygatt import GattoolConnection
from tests import log
class SerialMock(serial.Serial):
def write(self, data):
self.is_open = True
log.debug("Write data to serial: %s", data)
return len(data)
def flush(self, *args, **kwargs):
pass
def close(self):
pass
def read(self, size=1):
return bytes()
class BGAPIBLEDeviceMock(BGAPIBLEDevice):
def subscribe(self, uuid, callback=None, indication=False):
log.debug("Mock subscribing")
def char_write_handle(self, char_handle, value, wait_for_response=False):
log.debug("Mock write: %s", value)
class BlueGigaBackendMock(pygatt.BGAPIBackend):
def _open_serial_port(self, max_connection_attempts=MAX_CONNECTION_ATTEMPTS):
log.debug("Mock open serial port")
self._ser = SerialMock()
def expect(self, expected, *args, **kargs):
log.debug("Mock expect")
data = {
"packet_type": 0x04,
"sender": "abcdef".encode('ascii'),
"data": [1, 2, 3],
"rssi": 1
}
self._ble_evt_gap_scan_response(data)
def connect(self, address, timeout=5, address_type=BLEAddressType.public, interval_min=60, interval_max=76,
supervision_timeout=100, latency=0):
log.debug("Mock connect")
device = BGAPIBLEDeviceMock("address", 0, self)
return device
class BlueGigaTests(unittest.TestCase):
def test_1(self):
obj = GattoolConnection()
obj.backend = BlueGigaBackendMock
obj.connect(u'66:65:64:63:62:61')
obj.write(0, "test".encode('ascii'))
obj.set_notify_handler(lambda x: None)