import logging import struct import time from pylgbst import get_byte, int2byte from pylgbst.constants import * log = logging.getLogger('peripherals') class Peripheral(object): """ :type parent: MoveHub """ def __init__(self, parent, port): """ :type parent: pylgbst.MoveHub :type port: int """ super(Peripheral, self).__init__() self.parent = parent self.port = port self.working = False self._subscribers = set() def __repr__(self): return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A') def _write_to_hub(self, msg_type, params): cmd = int2byte(PACKET_VER) + int2byte(msg_type) + int2byte(self.port) cmd += params self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, int2byte(len(cmd) + 1) + cmd) # should we +1 cmd len here? def _set_port_val(self, value): # FIXME: became obsolete self._write_to_hub(MSG_SET_PORT_VAL, value) def _subscribe_on_port(self, params): # FIXME: became obsolete self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params) def started(self): self.working = True def finished(self): self.working = False def _notify_subscribers(self, *args, **kwargs): for subscriber in self._subscribers: subscriber(*args, **kwargs) class LED(Peripheral): def set_color(self, color): if color not in COLORS: raise ValueError("Color %s is not in list of available colors" % color) cmd = b'\x11\x51\x00' + int2byte(color) self._set_port_val(cmd) class EncodedMotor(Peripheral): TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something MOVEMENT_TYPE = b'\x11' TIMED_SINGLE = b'\x09' TIMED_GROUP = b'\x0A' ANGLED_SINGLE = b'\x0B' ANGLED_GROUP = b'\x0C' def __init__(self, parent, port): super(EncodedMotor, self).__init__(parent, port) if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]: raise ValueError("Invalid port for motor: %s" % port) def _speed_abs(self, relative): if relative < -1 or relative > 1: raise ValueError("Invalid speed value: %s", relative) absolute = round(relative * 100) if absolute < 0: absolute += 255 return int(absolute) def _wrap_and_write(self, command, speed_primary, speed_secondary): # set for port command = self.MOVEMENT_TYPE + command command += int2byte(self._speed_abs(speed_primary)) if self.port == PORT_AB: command += int2byte(self._speed_abs(speed_secondary)) command += self.TRAILER self._set_port_val(command) def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False): if speed_secondary is None: speed_secondary = speed_primary # movement type command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE # time msec = int(seconds * 1000) if msec >= pow(2, 16): raise ValueError("Too large value for seconds: %s", seconds) command += struct.pack(' 90: return val - 256 else: return val class ColorDistanceSensor(Peripheral): pass class Button(Peripheral): def __init__(self, parent): super(Button, self).__init__(parent, 0) LISTEN_COLOR_SENSOR_ON_C = b' \x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01' LISTEN_COLOR_SENSOR_ON_D = b' \x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01' LISTEN_DIST_SENSOR_ON_C = b' \x0a\x00 \x41\x01 \x08\x01\x00\x00\x00\x01' LISTEN_DIST_SENSOR_ON_D = b' \x0a\x00 \x41\x02 \x08\x01\x00\x00\x00\x01' LISTEN_ENCODER_ON_A = b' \x0a\x00 \x41\x37 \x02\x01\x00\x00\x00\x01' LISTEN_ENCODER_ON_B = b' \x0a\x00 \x41\x38 \x02\x01\x00\x00\x00\x01' LISTEN_ENCODER_ON_C = b' \x0a\x00 \x41\x01 \x02\x01\x00\x00\x00\x01' LISTEN_ENCODER_ON_D = b' \x0a\x00 \x41\x02 \x02\x01\x00\x00\x00\x01'