import logging import time import traceback from struct import pack, unpack from threading import Thread from six.moves import queue from pylgbst.comms import str2hex from pylgbst.constants import * log = logging.getLogger('peripherals') class Peripheral(object): """ :type parent: MoveHub :type _incoming_port_data: queue.Queue """ def __init__(self, parent, port): """ :type parent: pylgbst.movehub.MoveHub :type port: int """ super(Peripheral, self).__init__() self.parent = parent self.port = port self._working = False self._subscribers = set() self._port_subscription_mode = None # noinspection PyUnresolvedReferences self._incoming_port_data = queue.Queue() thr = Thread(target=self._queue_reader) thr.setDaemon(True) thr.setName("Port data queue: %s" % self) thr.start() def __repr__(self): return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port) def _write_to_hub(self, msg_type, params): cmd = pack(" 1: log.warning("Speed cannot be more than 1") relative = 1 absolute = round(relative * 100) # scale of 100 is proven by experiments if absolute < 0: absolute += 255 return int(absolute) def _wrap_and_write(self, mtype, params, speed_primary, speed_secondary): if self.port == PORT_AB: mtype += 1 # de-facto rule params = pack(" 90: return val - 256 else: return val class ColorDistanceSensor(Peripheral): COLOR_ONLY = 0x00 DISTANCE_INCHES = 0x01 COUNT_2INCH = 0x02 DISTANCE_HOW_CLOSE = 0x03 DISTANCE_SUBINCH_HOW_CLOSE = 0x04 OFF1 = 0x05 STREAM_3_VALUES = 0x06 OFF2 = 0x07 COLOR_DISTANCE_FLOAT = 0x08 LUMINOSITY = 0x09 SOME_20BYTES = 0x0a # TODO: understand it def __init__(self, parent, port): super(ColorDistanceSensor, self).__init__(parent, port) def subscribe(self, callback, mode=COLOR_DISTANCE_FLOAT, granularity=1, async=False): super(ColorDistanceSensor, self).subscribe(callback, mode, granularity) def handle_port_data(self, data): if self._port_subscription_mode == self.COLOR_DISTANCE_FLOAT: color = unpack("