import logging import math import time import traceback from struct import pack, unpack from threading import Thread from pylgbst.constants import PORTS, MSG_SENSOR_SUBSCRIBE, COLOR_NONE, COLOR_BLACK, COLORS, MSG_SET_PORT_VAL, PORT_AB, \ MSG_DEVICE_INFO, INFO_BUTTON_STATE, INFO_ACTION_SUBSCRIBE, INFO_ACTION_UNSUBSCRIBE from pylgbst.utilities import queue, str2hex, usbyte, ushort log = logging.getLogger('peripherals') class Peripheral(object): """ :type parent: pylgbst.movehub.MoveHub :type _incoming_port_data: queue.Queue """ def __init__(self, parent, port): """ :type parent: pylgbst.movehub.MoveHub :type port: int """ super(Peripheral, self).__init__() self.parent = parent self.port = port self._working = False self._subscribers = set() self._port_subscription_mode = None # TODO: maybe max queue len of 2? self._incoming_port_data = queue.Queue(1) # limit 1 means we drop data if we can't handle it fast enough thr = Thread(target=self._queue_reader) thr.setDaemon(True) thr.setName("Port data queue: %s" % self) thr.start() def __repr__(self): return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port) def _write_to_hub(self, msg_type, params): cmd = pack(" 1: log.warning("Speed cannot be more than 1") relative = 1 absolute = math.ceil(relative * 100) # scale of 100 is proven by experiments return int(absolute) def _wrap_and_write(self, mtype, params, speed_primary, speed_secondary): if self.port == PORT_AB: mtype += 1 # de-facto rule abs_primary = self._speed_abs(speed_primary) abs_secondary = self._speed_abs(speed_secondary) if mtype == self.ANGLED_GROUP and (not abs_secondary or not abs_primary): raise ValueError("Cannot have zero speed in double angled mode") # otherwise it gets nuts params = pack(" 90: return val - 256 else: return val class ColorDistanceSensor(Peripheral): COLOR_ONLY = 0x00 DISTANCE_INCHES = 0x01 COUNT_2INCH = 0x02 DISTANCE_HOW_CLOSE = 0x03 DISTANCE_SUBINCH_HOW_CLOSE = 0x04 OFF1 = 0x05 STREAM_3_VALUES = 0x06 OFF2 = 0x07 COLOR_DISTANCE_FLOAT = 0x08 LUMINOSITY = 0x09 SOME_20BYTES = 0x0a # TODO: understand it def __init__(self, parent, port): super(ColorDistanceSensor, self).__init__(parent, port) def subscribe(self, callback, mode=COLOR_DISTANCE_FLOAT, granularity=1, is_async=False): super(ColorDistanceSensor, self).subscribe(callback, mode, granularity) def handle_port_data(self, data): if self._port_subscription_mode == self.COLOR_DISTANCE_FLOAT: color = usbyte(data, 4) distance = usbyte(data, 5) partial = usbyte(data, 7) if partial: distance += 1.0 / partial self._notify_subscribers(color, float(distance)) elif self._port_subscription_mode == self.COLOR_ONLY: color = usbyte(data, 4) self._notify_subscribers(color) elif self._port_subscription_mode == self.DISTANCE_INCHES: distance = usbyte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == self.DISTANCE_HOW_CLOSE: distance = usbyte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == self.DISTANCE_SUBINCH_HOW_CLOSE: distance = usbyte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == self.OFF1 or self._port_subscription_mode == self.OFF2: log.info("Turned off led on %s", self) elif self._port_subscription_mode == self.COUNT_2INCH: count = unpack("