diff --git a/pylgbst/__init__.py b/pylgbst/__init__.py index b9ee15f..e3d09d2 100644 --- a/pylgbst/__init__.py +++ b/pylgbst/__init__.py @@ -111,6 +111,7 @@ class MoveHub(object): self.devices[port].finished() elif status == STATUS_CONFLICT: log.warning("Command conflict on port %s", PORTS[port]) + self.devices[port].finished() else: log.warning("Unhandled status value: 0x%x", status) diff --git a/pylgbst/comms.py b/pylgbst/comms.py index 28e7a56..a5a5c8b 100644 --- a/pylgbst/comms.py +++ b/pylgbst/comms.py @@ -1,19 +1,19 @@ """ This package holds communication aspects """ +import binascii import json import logging import socket import sys import time import traceback +from Queue import Queue from abc import abstractmethod from binascii import unhexlify from gattlib import DiscoveryService, GATTRequester from threading import Thread -import binascii - from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN log = logging.getLogger('transport') @@ -42,14 +42,31 @@ class Requester(GATTRequester): super(Requester, self).__init__(p_object, *args, **kwargs) self.notification_sink = None + self._notify_queue = Queue() + self._notifier_thread = Thread(target=self._dispatch_notifications) + self._notifier_thread.setDaemon(True) + self._notifier_thread.setName("Notify queue dispatcher") + self._notifier_thread.start() + def on_notification(self, handle, data): # log.debug("requester notified, sink: %s", self.notification_sink) - if self.notification_sink: - self.notification_sink(handle, data) + self._notify_queue.put((handle, data)) def on_indication(self, handle, data): log.debug("Indication on handle %s: %s", handle, str2hex(data)) + def _dispatch_notifications(self): + while True: + handle, data = self._notify_queue.get() + if self.notification_sink: + try: + self.notification_sink(handle, data) + except BaseException: + log.warning("Failed to dispatch notification: %s", str2hex(data)) + log.warning("Failed to dispatch notification: %s", traceback.format_exc()) + else: + log.warning("Dropped notification %s: %s", handle, str2hex(data)) + class Connection(object): @abstractmethod @@ -70,17 +87,17 @@ class BLEConnection(Connection): Main transport class, uses real Bluetooth LE connection. Loops with timeout of 1 seconds to find device named "Lego MOVE Hub" - :type requester: Requester + :type _requester: Requester """ def __init__(self): super(BLEConnection, self).__init__() - self.requester = None + self._requester = None def connect(self, bt_iface_name='hci0'): service = DiscoveryService(bt_iface_name) - while not self.requester: + while not self._requester: log.info("Discovering devices using %s...", bt_iface_name) devices = service.discover(1) log.debug("Devices: %s", devices) @@ -88,25 +105,22 @@ class BLEConnection(Connection): for address, name in devices.items(): if name == LEGO_MOVE_HUB: logging.info("Found %s at %s", name, address) - self._get_requester(address, bt_iface_name) + self._requester = Requester(address, True, bt_iface_name) break return self - def _get_requester(self, address, bt_iface_name): - self.requester = Requester(address, True, bt_iface_name) - def set_notify_handler(self, handler): - if self.requester: + if self._requester: log.debug("Setting notification handler: %s", handler) - self.requester.notification_sink = handler + self._requester.notification_sink = handler else: raise RuntimeError("No requester available") def read(self, handle): # FIXME: repeating reads hangs it... log.debug("Reading from: %s", handle) - data = self.requester.read_by_handle(handle) + data = self._requester.read_by_handle(handle) log.debug("Result: %s", data) if isinstance(data, list): data = data[0] @@ -114,7 +128,7 @@ class BLEConnection(Connection): def write(self, handle, data): log.debug("Writing to %s: %s", handle, str2hex(data)) - return self.requester.write_by_handle(handle, data) + return self._requester.write_by_handle(handle, data) class DebugServer(object): @@ -141,7 +155,7 @@ class DebugServer(object): conn, addr = self.sock.accept() if not self._running: raise KeyboardInterrupt("Shutdown") - self.ble.requester.notification_sink = lambda x, y: self._notify(conn, x, y) + self.ble._requester.notification_sink = lambda x, y: self._notify(conn, x, y) try: self._handle_conn(conn) except KeyboardInterrupt: @@ -149,14 +163,14 @@ class DebugServer(object): except BaseException: log.error("Problem handling incoming connection: %s", traceback.format_exc()) finally: - self.ble.requester.notification_sink = self._notify_dummy + self.ble._requester.notification_sink = self._notify_dummy conn.close() def __del__(self): self.sock.close() def _notify_dummy(self, handle, data): - log.debug("Notification from handle %s: %s", handle, unhexlify(data)) + log.debug("Notification from handle %s: %s", handle, hexlify(data.strip())) self._check_shutdown(data) def _notify(self, conn, handle, data): @@ -228,6 +242,7 @@ class DebugServerConnection(Connection): self.incoming = [] self.reader = Thread(target=self._recv) + self.reader.setName("Debug connection reader") self.reader.setDaemon(True) self.reader.start() @@ -265,7 +280,7 @@ class DebugServerConnection(Connection): data = self.sock.recv(1024) log.debug("Recv from debug server: %s", data.strip()) if not data: - break + raise KeyboardInterrupt("Server has closed connection") self.buf += data diff --git a/pylgbst/peripherals.py b/pylgbst/peripherals.py index 3029362..a682975 100644 --- a/pylgbst/peripherals.py +++ b/pylgbst/peripherals.py @@ -126,7 +126,6 @@ class EncodedMotor(Peripheral): command += self.TRAILER - self._working = -1 self._write_to_hub(MSG_SET_PORT_VAL, command) def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): @@ -141,6 +140,7 @@ class EncodedMotor(Peripheral): raise ValueError("Too large value for seconds: %s", seconds) command += pack('