1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Debug server works

This commit is contained in:
Andrey Pohilko 2017-09-13 10:12:47 +03:00
parent 14daa67ede
commit e7d43f391b
4 changed files with 84 additions and 18 deletions

View File

@ -3,7 +3,7 @@ import traceback
from time import sleep
from pylegoboost import MoveHub, COLORS_MAP
from pylegoboost.transport import DebugServerConnection, BLEConnection
from pylegoboost.comms import DebugServerConnection, BLEConnection
log = logging.getLogger("demo")

View File

@ -3,7 +3,7 @@ from pylegoboost.constants import *
class MoveHub(object):
"""
:type connection: pylegoboost.transport.Connection
:type connection: pylegoboost.comms.Connection
:type led: LED
"""
@ -21,6 +21,9 @@ class MoveHub(object):
# self.button
# self.tilt_sensor
# transport.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
# transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01')
class Peripheral(object):
"""

View File

@ -1,3 +1,6 @@
"""
This package holds communication aspects
"""
import json
import logging
import socket
@ -12,6 +15,11 @@ log = logging.getLogger('transport')
# noinspection PyMethodOverriding
class Requester(GATTRequester):
"""
Wrapper to access `on_notification` capability of GATT
Set "notification_sink" field to a callable that will handle incoming data
"""
def __init__(self, p_object, *args, **kwargs):
super(Requester, self).__init__(p_object, *args, **kwargs)
self.notification_sink = None
@ -45,6 +53,7 @@ class ConnectionMock(Connection):
"""
def notify(self, handle, data):
# TODO
pass
def write(self, handle, data):
@ -56,6 +65,9 @@ class ConnectionMock(Connection):
class BLEConnection(Connection):
"""
Main transport class, uses real Bluetooth LE connection.
Loops with timeout of 5 seconds to find device named "Lego MOVE Hub"
:type requester: Requester
"""
@ -97,10 +109,19 @@ class BLEConnection(Connection):
return self.requester.write_by_handle(handle, data)
def notify(self, handle, data):
# TODO
log.debug("Notification on %s: %s", handle, data.encode("hex"))
class DebugServer(object):
"""
Starts TCP server to be used with DebugServerConnection to speed-up development process
It holds BLE connection to Move Hub, so no need to re-start it every time
Usage: DebugServer(BLEConnection().connect()).start()
:type ble: BLEConnection
"""
def __init__(self, ble_trans):
self.sock = socket.socket()
self.ble = ble_trans
@ -110,6 +131,7 @@ class DebugServer(object):
self.sock.listen(1)
while True:
log.info("Accepting connections at %s", port)
conn, addr = self.sock.accept()
try:
self._handle_conn(conn)
@ -126,7 +148,7 @@ class DebugServer(object):
buf = ""
while True:
data = conn.recv(1024)
log.debug("Recv: %s", data)
log.debug("Recv: %s", data.strip())
if not data:
break
@ -145,18 +167,65 @@ class DebugServer(object):
# conn.send(data.upper())
def _handle_cmd(self, line):
pass
def _handle_cmd(self, cmd):
if cmd['type'] == 'write':
self.ble.write(cmd['handle'], cmd['data'].decode('hex'))
elif cmd['type'] == 'read':
self.sock.send(self.ble.read(cmd['handle']).encode('hex') + "\n")
else:
raise ValueError("Unhandled cmd: %s", cmd)
class DebugServerConnection(Connection):
"""
Connection type to be used with DebugServer, replaces BLEConnection
"""
def __init__(self):
self.buf = ""
self.sock = socket.socket()
self.sock.connect(('localhost', 9090))
# sock.send('hello, world!')
# data = sock.recv(1024)
def __del__(self):
self.sock.close()
def notify(self, handle, data):
# TODO
pass
def write(self, handle, data):
payload = {
"type": "write",
"handle": handle,
"data": data.encode("hex")
}
self._send(payload)
def read(self, handle):
payload = {
"type": "read",
"handle": handle
}
self._send(payload)
return self._recv()
def _send(self, payload):
log.debug("Sending to debug server: %s", payload)
self.sock.send(json.dumps(payload) + "\n")
def _recv(self):
while True:
data = self.sock.recv(1024)
log.debug("Recv from debug server: %s", data.strip())
if not data:
break
self.buf += data
if "\n" in self.buf:
line = self.buf[:self.buf.index("\n")]
self.buf = self.buf[self.buf.index("\n") + 1:]
if line:
return line.decode("hex")
break
raise RuntimeError("No data read")

12
test.py
View File

@ -2,18 +2,12 @@ import logging
import unittest
from demo import demo_all
from pylegoboost.transport import ConnectionMock
from pylegoboost.comms import ConnectionMock, DebugServerConnection
logging.basicConfig(level=logging.DEBUG)
class GeneralTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
demo_all(conn)
# transport.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
# transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01')
# from pylegoboost import DebugServer
# srv = DebugServer(None)
# srv.start()
#conn = ConnectionMock()
demo_all(DebugServerConnection())