mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
* Cosmetics * Harmonograph demo * Cleanup * Original * Original * Cosmetics * Original file * Fixes * cosmetics * separate classes * Cosmetics * Cosmetics * fix tests * Remove plotter tests * Add bluegiga * Rename it * Progress * Fix tests * Cosmetics * Found a way for pygatt! * Playing with gatt * Fix hung subscribe * rename class * add test * skeleton for autodetect * safer order * Fix tests * Fix test * Add dbus install * another try * 2 * 3 * 34 * 6 * 7 * Isolate some tests * 8 * back to roots * Try more * 9 * Help * rep * site-packs * Fix? * Py3 come on * dbus * busss * dev null! * Fix test * Cleanup * Fix tests * Fix after review * add package * FIx package paths * Cosmetics * Update * More doc
61 lines
1.6 KiB
Python
61 lines
1.6 KiB
Python
from binascii import unhexlify
|
|
|
|
from pylgbst.comms import Connection
|
|
from pylgbst.movehub import MoveHub
|
|
from pylgbst.peripherals import *
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
log = logging.getLogger('test')
|
|
|
|
|
|
class HubMock(MoveHub):
|
|
# noinspection PyUnresolvedReferences
|
|
def __init__(self, connection=None):
|
|
"""
|
|
:type connection: ConnectionMock
|
|
"""
|
|
super(HubMock, self).__init__(connection if connection else ConnectionMock())
|
|
self.notify_mock = self.connection.notifications
|
|
self.writes = self.connection.writes
|
|
|
|
def _wait_for_devices(self):
|
|
pass
|
|
|
|
def _report_status(self):
|
|
pass
|
|
|
|
|
|
class ConnectionMock(Connection):
|
|
"""
|
|
For unit testing purposes
|
|
"""
|
|
|
|
def __init__(self):
|
|
super(ConnectionMock, self).__init__()
|
|
self.writes = []
|
|
self.notifications = []
|
|
self.notification_handler = None
|
|
self.running = True
|
|
self.finished = False
|
|
|
|
def set_notify_handler(self, handler):
|
|
self.notification_handler = handler
|
|
thr = Thread(target=self.notifier)
|
|
thr.setDaemon(True)
|
|
thr.start()
|
|
|
|
def notifier(self):
|
|
while self.running:
|
|
if self.notification_handler:
|
|
while self.notifications:
|
|
handle, data = self.notifications.pop(0)
|
|
self.notification_handler(handle, unhexlify(data.replace(' ', '')))
|
|
time.sleep(0.1)
|
|
|
|
self.finished = True
|
|
|
|
def write(self, handle, data):
|
|
log.debug("Writing to %s: %s", handle, str2hex(data))
|
|
self.writes.append((handle, str2hex(data)))
|