import logging import time from struct import pack, unpack from pylgbst import get_byte, str2hex from pylgbst.constants import * log = logging.getLogger('peripherals') class Peripheral(object): """ :type parent: MoveHub """ def __init__(self, parent, port): """ :type parent: pylgbst.MoveHub :type port: int """ super(Peripheral, self).__init__() self.parent = parent self.port = port self._working = 0 # 3-state, -1 means command sent, 1 means notified on command, 0 means notified on finish self._subscribers = set() self._port_subscription_mode = None def __repr__(self): return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A') def _write_to_hub(self, msg_type, params): cmd = pack(" 1: log.warning("Speed cannot be more than 1") relative = 1 absolute = round(relative * 100) if absolute < 0: absolute += 255 return int(absolute) def _wrap_and_write(self, command, speed_primary, speed_secondary): # set for port command = self.MOVEMENT_TYPE + command command += pack("= pow(2, 16): raise ValueError("Too large value for seconds: %s", seconds) command += pack(' 90: return val - 256 else: return val class ColorDistanceSensor(Peripheral): def __init__(self, parent, port): super(ColorDistanceSensor, self).__init__(parent, port) def subscribe(self, callback, mode=CDS_MODE_COLOR_DISTANCE_FLOAT, granularity=1): super(ColorDistanceSensor, self).subscribe(callback, mode, granularity) def handle_port_data(self, data): if self._port_subscription_mode == CDS_MODE_COLOR_DISTANCE_FLOAT: color = get_byte(data, 4) distance = get_byte(data, 5) partial = get_byte(data, 7) if partial: distance += 1.0 / partial self._notify_subscribers(color, float(distance)) elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY: color = get_byte(data, 4) self._notify_subscribers(color) elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_DISTANCE_HOW_CLOSE: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_DISTANCE_SUBINCH_HOW_CLOSE: distance = get_byte(data, 4) self._notify_subscribers(distance) elif self._port_subscription_mode == CDS_MODE_OFF1 or self._port_subscription_mode == CDS_MODE_OFF2: log.info("Turned off led on %s", self) elif self._port_subscription_mode == CDS_MODE_COUNT_2INCH: count = unpack("