mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Refactoring
This commit is contained in:
parent
abd502b70c
commit
b40ee3ac1a
@ -8,36 +8,25 @@ import sys
|
|||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
|
from binascii import unhexlify
|
||||||
from gattlib import DiscoveryService, GATTRequester
|
from gattlib import DiscoveryService, GATTRequester
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
|
import binascii
|
||||||
|
|
||||||
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
|
from pylgbst.constants import LEGO_MOVE_HUB, MSG_DEVICE_SHUTDOWN
|
||||||
|
|
||||||
log = logging.getLogger('transport')
|
log = logging.getLogger('transport')
|
||||||
|
|
||||||
if sys.version_info[0] == 2:
|
|
||||||
def str2hex(data):
|
|
||||||
return data.encode("hex")
|
|
||||||
|
|
||||||
|
def str2hex(data): # TODO: eliminate it
|
||||||
def hex2str(data):
|
|
||||||
return data.decode("hex")
|
|
||||||
|
|
||||||
|
|
||||||
def get_byte(seq, index):
|
|
||||||
return ord(seq[index])
|
|
||||||
else:
|
|
||||||
import binascii
|
|
||||||
|
|
||||||
|
|
||||||
def str2hex(data):
|
|
||||||
return binascii.hexlify(data).decode("utf8")
|
return binascii.hexlify(data).decode("utf8")
|
||||||
|
|
||||||
|
|
||||||
def hex2str(data):
|
if sys.version_info[0] == 2:
|
||||||
return binascii.unhexlify(data)
|
def get_byte(seq, index):
|
||||||
|
return ord(seq[index])
|
||||||
|
else:
|
||||||
def get_byte(seq, index):
|
def get_byte(seq, index):
|
||||||
return seq[index]
|
return seq[index]
|
||||||
|
|
||||||
@ -167,7 +156,7 @@ class DebugServer(object):
|
|||||||
self.sock.close()
|
self.sock.close()
|
||||||
|
|
||||||
def _notify_dummy(self, handle, data):
|
def _notify_dummy(self, handle, data):
|
||||||
log.debug("Notification from handle %s: %s", handle, hex2str(data))
|
log.debug("Notification from handle %s: %s", handle, unhexlify(data))
|
||||||
self._check_shutdown(data)
|
self._check_shutdown(data)
|
||||||
|
|
||||||
def _notify(self, conn, handle, data):
|
def _notify(self, conn, handle, data):
|
||||||
@ -215,7 +204,7 @@ class DebugServer(object):
|
|||||||
|
|
||||||
def _handle_cmd(self, cmd):
|
def _handle_cmd(self, cmd):
|
||||||
if cmd['type'] == 'write':
|
if cmd['type'] == 'write':
|
||||||
self.ble.write(cmd['handle'], hex2str(cmd['data']))
|
self.ble.write(cmd['handle'], unhexlify(cmd['data']))
|
||||||
elif cmd['type'] == 'read':
|
elif cmd['type'] == 'read':
|
||||||
data = self.ble.read(cmd['handle'])
|
data = self.ble.read(cmd['handle'])
|
||||||
payload = {"type": "response", "data": str2hex(data)}
|
payload = {"type": "response", "data": str2hex(data)}
|
||||||
@ -264,7 +253,7 @@ class DebugServerConnection(Connection):
|
|||||||
for item in self.incoming:
|
for item in self.incoming:
|
||||||
if item['type'] == 'response':
|
if item['type'] == 'response':
|
||||||
self.incoming.remove(item)
|
self.incoming.remove(item)
|
||||||
return hex2str(item['data'])
|
return unhexlify(item['data'])
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
def _send(self, payload):
|
def _send(self, payload):
|
||||||
@ -286,7 +275,7 @@ class DebugServerConnection(Connection):
|
|||||||
if line:
|
if line:
|
||||||
item = json.loads(line)
|
item = json.loads(line)
|
||||||
if item['type'] == 'notification' and self.notify_handler:
|
if item['type'] == 'notification' and self.notify_handler:
|
||||||
self.notify_handler(item['handle'], hex2str(item['data']))
|
self.notify_handler(item['handle'], unhexlify(item['data']))
|
||||||
elif item['type'] == 'response':
|
elif item['type'] == 'response':
|
||||||
self.incoming.append(item)
|
self.incoming.append(item)
|
||||||
else:
|
else:
|
||||||
|
3
tests.py
3
tests.py
@ -1,4 +1,5 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
from binascii import unhexlify
|
||||||
|
|
||||||
from pylgbst import *
|
from pylgbst import *
|
||||||
|
|
||||||
@ -33,7 +34,7 @@ class ConnectionMock(Connection):
|
|||||||
if self.notification_handler:
|
if self.notification_handler:
|
||||||
while self.notifications:
|
while self.notifications:
|
||||||
handle, data = self.notifications.pop(0)
|
handle, data = self.notifications.pop(0)
|
||||||
self.notification_handler(handle, hex2str(data.replace(' ', '')))
|
self.notification_handler(handle, unhexlify(data.replace(' ', '')))
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
self.finished = True
|
self.finished = True
|
||||||
|
@ -22,6 +22,7 @@ class Vernie(MoveHub):
|
|||||||
self._color_detected = COLOR_NONE
|
self._color_detected = COLOR_NONE
|
||||||
self._sensor_distance = 10
|
self._sensor_distance = 10
|
||||||
self.color_distance_sensor.subscribe(self._color_distance_data)
|
self.color_distance_sensor.subscribe(self._color_distance_data)
|
||||||
|
log.info("Vernie is ready.")
|
||||||
|
|
||||||
def _external_motor_data(self, data):
|
def _external_motor_data(self, data):
|
||||||
log.debug("External motor position: %s", data)
|
log.debug("External motor position: %s", data)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user