import logging import time from struct import pack, unpack from threading import Thread from six.moves import queue from pylgbst import get_byte, str2hex from pylgbst.constants import * log = logging.getLogger('peripherals') class Peripheral(object): """ :type parent: MoveHub :type _incoming_port_data: queue.Queue """ def __init__(self, parent, port): """ :type parent: pylgbst.MoveHub :type port: int """ super(Peripheral, self).__init__() self.parent = parent self.port = port self._working = False self._subscribers = set() self._port_subscription_mode = None self._incoming_port_data = queue.Queue() thr = Thread(target=self._queue_reader) thr.setDaemon(True) thr.setName("Port data queue: %s" % self) thr.start() def __repr__(self): return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port) def _write_to_hub(self, msg_type, params): cmd = pack(" 1: log.warning("Speed cannot be more than 1") relative = 1 absolute = round(relative * 100) if absolute < 0: absolute += 255 return int(absolute) def _wrap_and_write(self, command, speed_primary, speed_secondary): # set for port command = self.MOVEMENT_TYPE + command command += pack(" 90: return val - 256 else: return val class ColorDistanceSensor(Peripheral): def __init__(self, parent, port): super(ColorDistanceSensor, self).__init__(parent, port) def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_FLOAT, granularity=1): super(ColorDistanceSensor, self).subscribe(callback, mode, granularity) def handle_port_data(self, data): if self._port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT: color = get_byte(data, 4) distance = get_byte(data, 5) partial = get_byte(data, 7) if partial: distance += 1.0 / partial self._notify_subscribers(color, float(distance)) elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY: color = get_byte(data, 4) self._notify_subscribers(color) elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_OFF1 or self._port_subscription_mode == CDS_MODE_OFF2: log.info("Turned off led on %s", self) elif self._port_subscription_mode == CDS_MODE_COUNT_2INCH: count = unpack("