1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Try make it py3 compatible

This commit is contained in:
Andrey Pohilko 2017-09-13 21:32:59 +03:00
parent 8c85d249ed
commit b4dcf03823
3 changed files with 36 additions and 15 deletions

View File

@ -2,6 +2,7 @@ import logging
import struct
import time
from pylgbst.comms import str2hex
from pylgbst.constants import *
log = logging.getLogger('movehub')
@ -50,7 +51,7 @@ class MoveHub(object):
Using https://github.com/JorgePe/BOOSTreveng/blob/master/Notifications.md
"""
orig = data
log.debug("Notification on %s: %s", handle, orig.encode("hex"))
log.debug("Notification on %s: %s", handle, str2hex(orig))
data = data[3:]
msg_type = ord(data[2])
@ -60,7 +61,7 @@ class MoveHub(object):
elif msg_type == MSG_PORT_STATUS:
self._handle_port_status(data)
else:
log.warning("Unhandled msg type 0x%x: %s", msg_type, orig.encode("hex"))
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
pass

View File

@ -4,16 +4,36 @@ This package holds communication aspects
import json
import logging
import socket
import sys
import time
import traceback
from abc import abstractmethod
from gattlib import DiscoveryService, GATTRequester
from threading import Thread
from gattlib import DiscoveryService, GATTRequester
from pylgbst.constants import DEVICE_NAME, LEGO_MOVE_HUB
log = logging.getLogger('transport')
if sys.version_info[0] == 2:
def str2hex(data):
return data.encode("hex")
def hex2str(data):
return data.decode("hex")
else:
import binascii
def str2hex(data):
return binascii.hexlify(data)
def hex2str(data):
return binascii.unhexlify(data)
# noinspection PyMethodOverriding
class Requester(GATTRequester):
@ -32,7 +52,7 @@ class Requester(GATTRequester):
self.notification_sink(handle, data)
def on_indication(self, handle, data):
log.debug("Indication on handle %s: %s", handle, data.encode("hex"))
log.debug("Indication on handle %s: %s", handle, str2hex(data))
class Connection(object):
@ -97,7 +117,7 @@ class BLEConnection(Connection):
return data
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data.encode("hex"))
log.debug("Writing to %s: %s", handle, str2hex(data))
return self.requester.write_by_handle(handle, data)
@ -134,7 +154,7 @@ class DebugServer(object):
self.sock.close()
def _notify(self, conn, handle, data):
payload = {"type": "notification", "handle": handle, "data": data.encode('hex')}
payload = {"type": "notification", "handle": handle, "data": str2hex(data)}
log.debug("Send notification: %s", payload)
try:
conn.send(json.dumps(payload) + "\n")
@ -167,10 +187,10 @@ class DebugServer(object):
def _handle_cmd(self, cmd):
if cmd['type'] == 'write':
self.ble.write(cmd['handle'], cmd['data'].decode('hex'))
self.ble.write(cmd['handle'], hex2str(cmd['data']))
elif cmd['type'] == 'read':
data = self.ble.read(cmd['handle'])
payload = {"type": "response", "data": data.encode('hex')}
payload = {"type": "response", "data": str2hex(data)}
log.debug("Send response: %s", payload)
self.sock.send(json.dumps(payload) + "\n")
else:
@ -201,7 +221,7 @@ class DebugServerConnection(Connection):
payload = {
"type": "write",
"handle": handle,
"data": data.encode("hex")
"data": str2hex(data)
}
self._send(payload)
@ -216,7 +236,7 @@ class DebugServerConnection(Connection):
for item in self.incoming:
if item['type'] == 'response':
self.incoming.remove(item)
return item['data'].decode('hex')
return hex2str(item['data'])
time.sleep(0.1)
def _send(self, payload):
@ -238,7 +258,7 @@ class DebugServerConnection(Connection):
if line:
item = json.loads(line)
if item['type'] == 'notification' and self.notify_handler:
self.notify_handler(item['handle'], item['data'].decode('hex'))
self.notify_handler(item['handle'], hex2str(item['data']))
elif item['type'] == 'response':
self.incoming.append(item)
else:

View File

@ -4,7 +4,7 @@ import unittest
from threading import Thread
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB
from pylgbst.comms import Connection
from pylgbst.comms import Connection, str2hex
from pylgbst.constants import PORT_LED
logging.basicConfig(level=logging.DEBUG)
@ -36,14 +36,14 @@ class ConnectionMock(Connection):
if self.notification_handler:
while self.notifications:
handle, data = self.notifications.pop(0)
self.notification_handler(handle, data.replace(' ', '').decode('hex'))
self.notification_handler(handle, hex2str(data.replace(' ', '')))
time.sleep(0.1)
self.finished = True
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data.encode("hex"))
self.writes.append((handle, data.encode("hex")))
log.debug("Writing to %s: %s", handle, str2hex(data))
self.writes.append((handle, str2hex(data)))
def read(self, handle):
log.debug("Reading from: %s", handle)