mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Support push button handling
This commit is contained in:
parent
fa0ee2d77a
commit
185c33cd26
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
Requires `gattlib` to be installed, currently Python 2.7 only
|
Requires `gattlib` to be installed, currently Python 2.7 only
|
||||||
|
|
||||||
Best way to start is to look at [demo.py](demo.py) file, and run it.
|
Best way to start is to look into [demo.py](demo.py) file, and run it.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
@ -20,7 +20,6 @@ Best way to start is to look at [demo.py](demo.py) file, and run it.
|
|||||||
from pylgbst import MoveHub
|
from pylgbst import MoveHub
|
||||||
|
|
||||||
hub = MoveHub()
|
hub = MoveHub()
|
||||||
print(hub.get_name())
|
|
||||||
|
|
||||||
for device in hub.devices:
|
for device in hub.devices:
|
||||||
print(device)
|
print(device)
|
||||||
@ -39,6 +38,7 @@ sudo python -c "from pylgbst.comms import *; import logging; logging.basicConfig
|
|||||||
- Give nice documentation examples, don't forget to mention logging
|
- Give nice documentation examples, don't forget to mention logging
|
||||||
- make angled motors to be synchronous by default => 3-state status
|
- make angled motors to be synchronous by default => 3-state status
|
||||||
- make sure unit tests cover all important code
|
- make sure unit tests cover all important code
|
||||||
|
- generalize getting device info + give constants (low priority)
|
||||||
|
|
||||||
## Links
|
## Links
|
||||||
|
|
||||||
|
2
demo.py
2
demo.py
@ -177,6 +177,6 @@ if __name__ == '__main__':
|
|||||||
sleep(10)
|
sleep(10)
|
||||||
# demo_port_cd_motor(hub)
|
# demo_port_cd_motor(hub)
|
||||||
sleep(10)
|
sleep(10)
|
||||||
hub.button.unsubscribe(cb_log)
|
#hub.button.unsubscribe(cb_log)
|
||||||
sleep(10)
|
sleep(10)
|
||||||
# demo_all(hub)
|
# demo_all(hub)
|
||||||
|
@ -59,10 +59,6 @@ class MoveHub(object):
|
|||||||
log.warning("Got only these devices: %s", builtin_devices)
|
log.warning("Got only these devices: %s", builtin_devices)
|
||||||
raise RuntimeError("Failed to obtain all builtin devices")
|
raise RuntimeError("Failed to obtain all builtin devices")
|
||||||
|
|
||||||
def get_name(self):
|
|
||||||
# note: reading this too fast makes it hang
|
|
||||||
return self.connection.read(DEVICE_NAME)
|
|
||||||
|
|
||||||
def _notify(self, handle, data):
|
def _notify(self, handle, data):
|
||||||
orig = data
|
orig = data
|
||||||
|
|
||||||
|
@ -6,14 +6,14 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import time
|
|
||||||
import traceback
|
import traceback
|
||||||
from six.moves import queue
|
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
from gattlib import DiscoveryService, GATTRequester
|
from gattlib import DiscoveryService, GATTRequester
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
|
from six.moves import queue
|
||||||
|
|
||||||
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
|
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
|
||||||
|
|
||||||
log = logging.getLogger('comms')
|
log = logging.getLogger('comms')
|
||||||
@ -70,10 +70,6 @@ class Requester(GATTRequester):
|
|||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
@abstractmethod
|
|
||||||
def read(self, handle):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def write(self, handle, data):
|
def write(self, handle, data):
|
||||||
pass
|
pass
|
||||||
@ -88,17 +84,17 @@ class BLEConnection(Connection):
|
|||||||
Main transport class, uses real Bluetooth LE connection.
|
Main transport class, uses real Bluetooth LE connection.
|
||||||
Loops with timeout of 1 seconds to find device named "Lego MOVE Hub"
|
Loops with timeout of 1 seconds to find device named "Lego MOVE Hub"
|
||||||
|
|
||||||
:type _requester: Requester
|
:type requester: Requester
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(BLEConnection, self).__init__()
|
super(BLEConnection, self).__init__()
|
||||||
self._requester = None
|
self.requester = None
|
||||||
|
|
||||||
def connect(self, bt_iface_name='hci0'):
|
def connect(self, bt_iface_name='hci0'):
|
||||||
service = DiscoveryService(bt_iface_name)
|
service = DiscoveryService(bt_iface_name)
|
||||||
|
|
||||||
while not self._requester:
|
while not self.requester:
|
||||||
log.info("Discovering devices using %s...", bt_iface_name)
|
log.info("Discovering devices using %s...", bt_iface_name)
|
||||||
devices = service.discover(1)
|
devices = service.discover(1)
|
||||||
log.debug("Devices: %s", devices)
|
log.debug("Devices: %s", devices)
|
||||||
@ -106,30 +102,21 @@ class BLEConnection(Connection):
|
|||||||
for address, name in devices.items():
|
for address, name in devices.items():
|
||||||
if name == LEGO_MOVE_HUB:
|
if name == LEGO_MOVE_HUB:
|
||||||
logging.info("Found %s at %s", name, address)
|
logging.info("Found %s at %s", name, address)
|
||||||
self._requester = Requester(address, True, bt_iface_name)
|
self.requester = Requester(address, True, bt_iface_name)
|
||||||
break
|
break
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def set_notify_handler(self, handler):
|
def set_notify_handler(self, handler):
|
||||||
if self._requester:
|
if self.requester:
|
||||||
log.debug("Setting notification handler: %s", handler)
|
log.debug("Setting notification handler: %s", handler)
|
||||||
self._requester.notification_sink = handler
|
self.requester.notification_sink = handler
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("No requester available")
|
raise RuntimeError("No requester available")
|
||||||
|
|
||||||
def read(self, handle):
|
|
||||||
# FIXME: repeating reads hangs it...
|
|
||||||
log.debug("Reading from: %s", handle)
|
|
||||||
data = self._requester.read_by_handle(handle)
|
|
||||||
log.debug("Result: %s", data)
|
|
||||||
if isinstance(data, list):
|
|
||||||
data = data[0]
|
|
||||||
return data
|
|
||||||
|
|
||||||
def write(self, handle, data):
|
def write(self, handle, data):
|
||||||
log.debug("Writing to %s: %s", handle, str2hex(data))
|
log.debug("Writing to %s: %s", handle, str2hex(data))
|
||||||
return self._requester.write_by_handle(handle, data)
|
return self.requester.write_by_handle(handle, data)
|
||||||
|
|
||||||
|
|
||||||
class DebugServer(object):
|
class DebugServer(object):
|
||||||
@ -156,7 +143,7 @@ class DebugServer(object):
|
|||||||
conn, addr = self.sock.accept()
|
conn, addr = self.sock.accept()
|
||||||
if not self._running:
|
if not self._running:
|
||||||
raise KeyboardInterrupt("Shutdown")
|
raise KeyboardInterrupt("Shutdown")
|
||||||
self.ble._requester.notification_sink = lambda x, y: self._notify(conn, x, y)
|
self.ble.requester.notification_sink = lambda x, y: self._notify(conn, x, y)
|
||||||
try:
|
try:
|
||||||
self._handle_conn(conn)
|
self._handle_conn(conn)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
@ -164,7 +151,7 @@ class DebugServer(object):
|
|||||||
except BaseException:
|
except BaseException:
|
||||||
log.error("Problem handling incoming connection: %s", traceback.format_exc())
|
log.error("Problem handling incoming connection: %s", traceback.format_exc())
|
||||||
finally:
|
finally:
|
||||||
self.ble._requester.notification_sink = self._notify_dummy
|
self.ble.requester.notification_sink = self._notify_dummy
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@ -220,11 +207,6 @@ class DebugServer(object):
|
|||||||
def _handle_cmd(self, cmd):
|
def _handle_cmd(self, cmd):
|
||||||
if cmd['type'] == 'write':
|
if cmd['type'] == 'write':
|
||||||
self.ble.write(cmd['handle'], unhexlify(cmd['data']))
|
self.ble.write(cmd['handle'], unhexlify(cmd['data']))
|
||||||
elif cmd['type'] == 'read':
|
|
||||||
data = self.ble.read(cmd['handle'])
|
|
||||||
payload = {"type": "response", "data": str2hex(data)}
|
|
||||||
log.debug("Send response: %s", payload)
|
|
||||||
self.sock.send(json.dumps(payload) + "\n")
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("Unhandled cmd: %s", cmd)
|
raise ValueError("Unhandled cmd: %s", cmd)
|
||||||
|
|
||||||
@ -258,20 +240,6 @@ class DebugServerConnection(Connection):
|
|||||||
}
|
}
|
||||||
self._send(payload)
|
self._send(payload)
|
||||||
|
|
||||||
def read(self, handle):
|
|
||||||
payload = {
|
|
||||||
"type": "read",
|
|
||||||
"handle": handle
|
|
||||||
}
|
|
||||||
self._send(payload)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
for item in self.incoming:
|
|
||||||
if item['type'] == 'response':
|
|
||||||
self.incoming.remove(item)
|
|
||||||
return unhexlify(item['data'])
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
def _send(self, payload):
|
def _send(self, payload):
|
||||||
log.debug("Sending to debug server: %s", payload)
|
log.debug("Sending to debug server: %s", payload)
|
||||||
self.sock.send(json.dumps(payload) + "\n")
|
self.sock.send(json.dumps(payload) + "\n")
|
||||||
|
@ -34,6 +34,7 @@ PORTS = {
|
|||||||
|
|
||||||
# PACKET TYPES
|
# PACKET TYPES
|
||||||
MSG_DEVICE_INFO = 0x01
|
MSG_DEVICE_INFO = 0x01
|
||||||
|
# 0501010305 gives 090001030600000010
|
||||||
MSG_DEVICE_SHUTDOWN = 0x02 # sent when hub shuts down by button hold
|
MSG_DEVICE_SHUTDOWN = 0x02 # sent when hub shuts down by button hold
|
||||||
MSG_PING_RESPONSE = 0x03
|
MSG_PING_RESPONSE = 0x03
|
||||||
MSG_PORT_INFO = 0x04
|
MSG_PORT_INFO = 0x04
|
||||||
|
2
setup.py
2
setup.py
@ -6,5 +6,5 @@ setup(name='pylgbst',
|
|||||||
author='Andrey Pokhilko',
|
author='Andrey Pokhilko',
|
||||||
author_email='apc4@ya.ru',
|
author_email='apc4@ya.ru',
|
||||||
packages=['pylgbst'],
|
packages=['pylgbst'],
|
||||||
requires=['six', 'future', 'gattlib']
|
requires=['six', 'gattlib']
|
||||||
)
|
)
|
||||||
|
4
tests.py
4
tests.py
@ -42,10 +42,6 @@ class ConnectionMock(Connection):
|
|||||||
log.debug("Writing to %s: %s", handle, str2hex(data))
|
log.debug("Writing to %s: %s", handle, str2hex(data))
|
||||||
self.writes.append((handle, str2hex(data)))
|
self.writes.append((handle, str2hex(data)))
|
||||||
|
|
||||||
def read(self, handle):
|
|
||||||
log.debug("Reading from: %s", handle)
|
|
||||||
return None # TODO
|
|
||||||
|
|
||||||
|
|
||||||
class HubMock(MoveHub):
|
class HubMock(MoveHub):
|
||||||
def __init__(self, connection=None):
|
def __init__(self, connection=None):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user