mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Get rid of six as dependency
This commit is contained in:
parent
d286f61fa4
commit
b77831f225
@ -5,31 +5,16 @@ import binascii
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
import sys
|
|
||||||
import traceback
|
import traceback
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
|
from gattlib import DiscoveryService, GATTRequester
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from gattlib import DiscoveryService, GATTRequester
|
from pylgbst.constants import MSG_DEVICE_SHUTDOWN, queue, str2hex
|
||||||
from six.moves import queue
|
|
||||||
|
|
||||||
from pylgbst.constants import MSG_DEVICE_SHUTDOWN
|
|
||||||
|
|
||||||
log = logging.getLogger('comms')
|
log = logging.getLogger('comms')
|
||||||
|
|
||||||
|
|
||||||
def str2hex(data): # TODO: eliminate it
|
|
||||||
return binascii.hexlify(data).decode("utf8")
|
|
||||||
|
|
||||||
|
|
||||||
if sys.version_info[0] == 2:
|
|
||||||
def get_byte(seq, index):
|
|
||||||
return ord(seq[index])
|
|
||||||
else:
|
|
||||||
def get_byte(seq, index):
|
|
||||||
return seq[index]
|
|
||||||
|
|
||||||
LEGO_MOVE_HUB = "LEGO Move Hub"
|
LEGO_MOVE_HUB = "LEGO Move Hub"
|
||||||
|
|
||||||
|
|
||||||
@ -44,7 +29,6 @@ class Requester(GATTRequester):
|
|||||||
super(Requester, self).__init__(p_object, *args, **kwargs)
|
super(Requester, self).__init__(p_object, *args, **kwargs)
|
||||||
self.notification_sink = None
|
self.notification_sink = None
|
||||||
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
self._notify_queue = queue.Queue() # this queue is to minimize time spent in gattlib C code
|
self._notify_queue = queue.Queue() # this queue is to minimize time spent in gattlib C code
|
||||||
thr = Thread(target=self._dispatch_notifications)
|
thr = Thread(target=self._dispatch_notifications)
|
||||||
thr.setDaemon(True)
|
thr.setDaemon(True)
|
||||||
|
@ -1,3 +1,23 @@
|
|||||||
|
import binascii
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
|
||||||
|
if sys.version_info[0] == 2:
|
||||||
|
import Queue as queue
|
||||||
|
else:
|
||||||
|
import queue as queue
|
||||||
|
|
||||||
|
queue = queue # just to use it
|
||||||
|
|
||||||
|
|
||||||
|
def str2hex(data): # TODO: eliminate it
|
||||||
|
return binascii.hexlify(data).decode("utf8")
|
||||||
|
|
||||||
|
|
||||||
|
def get_byte(seq, index):
|
||||||
|
return struct.unpack("<B", seq[index:index + 1])[0]
|
||||||
|
|
||||||
|
|
||||||
# GENERAL
|
# GENERAL
|
||||||
MOVE_HUB_HARDWARE_HANDLE = 0x0E
|
MOVE_HUB_HARDWARE_HANDLE = 0x0E
|
||||||
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
|
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
|
||||||
|
@ -2,7 +2,7 @@ import logging
|
|||||||
import time
|
import time
|
||||||
from struct import pack
|
from struct import pack
|
||||||
|
|
||||||
from pylgbst.comms import BLEConnection, str2hex, get_byte
|
from pylgbst.comms import BLEConnection
|
||||||
from pylgbst.constants import *
|
from pylgbst.constants import *
|
||||||
from pylgbst.peripherals import Button, EncodedMotor, ColorDistanceSensor, LED, TiltSensor, Voltage, Peripheral, \
|
from pylgbst.peripherals import Button, EncodedMotor, ColorDistanceSensor, LED, TiltSensor, Voltage, Peripheral, \
|
||||||
Amperage
|
Amperage
|
||||||
@ -31,6 +31,10 @@ class MoveHub(object):
|
|||||||
:type motor_external: EncodedMotor
|
:type motor_external: EncodedMotor
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
DEV_STATUS_DETACHED = 0x00
|
||||||
|
DEV_STATUS_DEVICE = 0x01
|
||||||
|
DEV_STATUS_GROUP = 0x02
|
||||||
|
|
||||||
def __init__(self, connection=None):
|
def __init__(self, connection=None):
|
||||||
if not connection:
|
if not connection:
|
||||||
connection = BLEConnection()
|
connection = BLEConnection()
|
||||||
@ -132,12 +136,26 @@ class MoveHub(object):
|
|||||||
|
|
||||||
def _handle_port_info(self, data):
|
def _handle_port_info(self, data):
|
||||||
port = get_byte(data, 3)
|
port = get_byte(data, 3)
|
||||||
dev_type = get_byte(data, 5)
|
status = get_byte(data, 4)
|
||||||
|
|
||||||
if port in PORTS and dev_type in DEVICE_TYPES:
|
if status == self.DEV_STATUS_DETACHED:
|
||||||
log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port])
|
log.info("Detached %s", self.devices[port])
|
||||||
|
self.devices[port] = None
|
||||||
|
elif status == self.DEV_STATUS_DEVICE or status == self.DEV_STATUS_GROUP:
|
||||||
|
dev_type = get_byte(data, 5)
|
||||||
|
self._attach_device(dev_type, port)
|
||||||
else:
|
else:
|
||||||
log.warning("Device 0x%x at port 0x%x", dev_type, port)
|
raise ValueError("Unhandled device status: %s", status)
|
||||||
|
|
||||||
|
self._update_field(port)
|
||||||
|
if self.devices[port] is None:
|
||||||
|
del self.devices[port]
|
||||||
|
|
||||||
|
def _attach_device(self, dev_type, port):
|
||||||
|
if port in PORTS and dev_type in DEVICE_TYPES:
|
||||||
|
log.info("Attached %s at port %s", DEVICE_TYPES[dev_type], PORTS[port])
|
||||||
|
else:
|
||||||
|
log.warning("Attached device 0x%x at port 0x%x", dev_type, port)
|
||||||
|
|
||||||
if dev_type == DEV_MOTOR:
|
if dev_type == DEV_MOTOR:
|
||||||
self.devices[port] = EncodedMotor(self, port)
|
self.devices[port] = EncodedMotor(self, port)
|
||||||
@ -156,9 +174,10 @@ class MoveHub(object):
|
|||||||
elif dev_type == DEV_VOLTAGE:
|
elif dev_type == DEV_VOLTAGE:
|
||||||
self.devices[port] = Voltage(self, port)
|
self.devices[port] = Voltage(self, port)
|
||||||
else:
|
else:
|
||||||
log.debug("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
|
log.warning("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
|
||||||
self.devices[port] = Peripheral(self, port)
|
self.devices[port] = Peripheral(self, port)
|
||||||
|
|
||||||
|
def _update_field(self, port):
|
||||||
if port == PORT_A:
|
if port == PORT_A:
|
||||||
self.motor_A = self.devices[port]
|
self.motor_A = self.devices[port]
|
||||||
elif port == PORT_B:
|
elif port == PORT_B:
|
||||||
@ -178,7 +197,7 @@ class MoveHub(object):
|
|||||||
elif port == PORT_VOLTAGE:
|
elif port == PORT_VOLTAGE:
|
||||||
self.voltage = self.devices[port]
|
self.voltage = self.devices[port]
|
||||||
else:
|
else:
|
||||||
log.debug("Unhandled port: %s", PORTS[port])
|
log.warning("Unhandled port: %s", PORTS[port])
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
|
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
|
||||||
|
@ -4,9 +4,6 @@ import traceback
|
|||||||
from struct import pack, unpack
|
from struct import pack, unpack
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from six.moves import queue
|
|
||||||
|
|
||||||
from pylgbst.comms import str2hex
|
|
||||||
from pylgbst.constants import *
|
from pylgbst.constants import *
|
||||||
|
|
||||||
log = logging.getLogger('peripherals')
|
log = logging.getLogger('peripherals')
|
||||||
@ -29,7 +26,6 @@ class Peripheral(object):
|
|||||||
self._working = False
|
self._working = False
|
||||||
self._subscribers = set()
|
self._subscribers = set()
|
||||||
self._port_subscription_mode = None
|
self._port_subscription_mode = None
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
self._incoming_port_data = queue.Queue()
|
self._incoming_port_data = queue.Queue()
|
||||||
thr = Thread(target=self._queue_reader)
|
thr = Thread(target=self._queue_reader)
|
||||||
thr.setDaemon(True)
|
thr.setDaemon(True)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user