mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
implement thread per device queue
This commit is contained in:
parent
81ce664d21
commit
1171ee2668
@ -102,7 +102,7 @@ class MoveHub(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
device = self.devices[port]
|
device = self.devices[port]
|
||||||
device.handle_port_data(data)
|
device.queue_port_data(data)
|
||||||
|
|
||||||
def _handle_port_status(self, data):
|
def _handle_port_status(self, data):
|
||||||
port = get_byte(data, 3)
|
port = get_byte(data, 3)
|
||||||
|
@ -43,11 +43,11 @@ class Requester(GATTRequester):
|
|||||||
self.notification_sink = None
|
self.notification_sink = None
|
||||||
|
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
self._notify_queue = queue.Queue()
|
self._notify_queue = queue.Queue() # this queue is to minimize time spent in gattlib C code
|
||||||
self._notifier_thread = Thread(target=self._dispatch_notifications)
|
thr = Thread(target=self._dispatch_notifications)
|
||||||
self._notifier_thread.setDaemon(True)
|
thr.setDaemon(True)
|
||||||
self._notifier_thread.setName("Notify queue dispatcher")
|
thr.setName("Notify queue dispatcher")
|
||||||
self._notifier_thread.start()
|
thr.start()
|
||||||
|
|
||||||
def on_notification(self, handle, data):
|
def on_notification(self, handle, data):
|
||||||
# log.debug("requester notified, sink: %s", self.notification_sink)
|
# log.debug("requester notified, sink: %s", self.notification_sink)
|
||||||
@ -63,7 +63,7 @@ class Requester(GATTRequester):
|
|||||||
try:
|
try:
|
||||||
self.notification_sink(handle, data)
|
self.notification_sink(handle, data)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
log.warning("Failed to dispatch notification: %s", str2hex(data))
|
log.warning("Data was: %s", str2hex(data))
|
||||||
log.warning("Failed to dispatch notification: %s", traceback.format_exc())
|
log.warning("Failed to dispatch notification: %s", traceback.format_exc())
|
||||||
else:
|
else:
|
||||||
log.warning("Dropped notification %s: %s", handle, str2hex(data))
|
log.warning("Dropped notification %s: %s", handle, str2hex(data))
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from struct import pack, unpack
|
from struct import pack, unpack
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
from six.moves import queue
|
||||||
|
|
||||||
from pylgbst import get_byte, str2hex
|
from pylgbst import get_byte, str2hex
|
||||||
from pylgbst.constants import *
|
from pylgbst.constants import *
|
||||||
@ -11,6 +14,7 @@ log = logging.getLogger('peripherals')
|
|||||||
class Peripheral(object):
|
class Peripheral(object):
|
||||||
"""
|
"""
|
||||||
:type parent: MoveHub
|
:type parent: MoveHub
|
||||||
|
:type _incoming_port_data: queue.Queue
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, parent, port):
|
def __init__(self, parent, port):
|
||||||
@ -24,6 +28,11 @@ class Peripheral(object):
|
|||||||
self._working = False
|
self._working = False
|
||||||
self._subscribers = set()
|
self._subscribers = set()
|
||||||
self._port_subscription_mode = None
|
self._port_subscription_mode = None
|
||||||
|
self._incoming_port_data = queue.Queue()
|
||||||
|
thr = Thread(target=self._queue_reader)
|
||||||
|
thr.setDaemon(True)
|
||||||
|
thr.setName("Port data queue: %s" % self)
|
||||||
|
thr.start()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port)
|
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port)
|
||||||
@ -71,10 +80,21 @@ class Peripheral(object):
|
|||||||
for subscriber in self._subscribers:
|
for subscriber in self._subscribers:
|
||||||
subscriber(*args, **kwargs)
|
subscriber(*args, **kwargs)
|
||||||
|
|
||||||
|
def queue_port_data(self, data):
|
||||||
|
self._incoming_port_data.put(data)
|
||||||
|
|
||||||
def handle_port_data(self, data):
|
def handle_port_data(self, data):
|
||||||
log.warning("Unhandled device notification for %s: %s", self, str2hex(data[4:]))
|
log.warning("Unhandled device notification for %s: %s", self, str2hex(data[4:]))
|
||||||
self._notify_subscribers(data[4:])
|
self._notify_subscribers(data[4:])
|
||||||
|
|
||||||
|
def _queue_reader(self):
|
||||||
|
while True:
|
||||||
|
data = self._incoming_port_data.get()
|
||||||
|
try:
|
||||||
|
self.handle_port_data(data)
|
||||||
|
except BaseException:
|
||||||
|
log.warning("Failed to handle port data by %s: %s", self, str2hex(data))
|
||||||
|
|
||||||
|
|
||||||
class LED(Peripheral):
|
class LED(Peripheral):
|
||||||
SOMETHING = b'\x51\x00'
|
SOMETHING = b'\x51\x00'
|
||||||
|
@ -74,8 +74,9 @@ class Vernie(MoveHub):
|
|||||||
self._head_position = 0
|
self._head_position = 0
|
||||||
self.motor_external.subscribe(self._external_motor_data)
|
self.motor_external.subscribe(self._external_motor_data)
|
||||||
|
|
||||||
self._reset_head()
|
# self._reset_head() FIXME: restore it
|
||||||
self.say("ready")
|
# self.say("ready")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
def say(self, phrase):
|
def say(self, phrase):
|
||||||
if phrase in self.SPEECH_LANG_MAP[self.language]:
|
if phrase in self.SPEECH_LANG_MAP[self.language]:
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from vernie import *
|
from vernie import *
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
robot = Vernie()
|
robot = Vernie()
|
||||||
running = True
|
running = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user