import logging import math import traceback from struct import pack, unpack from threading import Thread from pylgbst.messages import MsgHubProperties, MsgPortOutput, MsgPortInputFmtSetupSingle, MsgPortInfoRequest, \ MsgPortModeInfoRequest, MsgPortInfo, MsgPortModeInfo, MsgPortInputFmtSingle from pylgbst.utilities import queue, str2hex, usbyte, ushort, usint log = logging.getLogger('peripherals') # COLORS COLOR_BLACK = 0x00 COLOR_PINK = 0x01 COLOR_PURPLE = 0x02 COLOR_BLUE = 0x03 COLOR_LIGHTBLUE = 0x04 COLOR_CYAN = 0x05 COLOR_GREEN = 0x06 COLOR_YELLOW = 0x07 COLOR_ORANGE = 0x09 COLOR_RED = 0x09 COLOR_WHITE = 0x0a COLOR_NONE = 0xFF COLORS = { COLOR_BLACK: "BLACK", COLOR_PINK: "PINK", COLOR_PURPLE: "PURPLE", COLOR_BLUE: "BLUE", COLOR_LIGHTBLUE: "LIGHTBLUE", COLOR_CYAN: "CYAN", COLOR_GREEN: "GREEN", COLOR_YELLOW: "YELLOW", COLOR_ORANGE: "ORANGE", COLOR_RED: "RED", COLOR_WHITE: "WHITE", COLOR_NONE: "NONE" } # TODO: support more types of peripherals from # https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#io-type-id class Peripheral(object): """ :type parent: pylgbst.hub.Hub :type _incoming_port_data: queue.Queue :type _port_mode: MsgPortInputFmtSingle """ def __init__(self, parent, port): """ :type parent: pylgbst.hub.Hub :type port: int """ super(Peripheral, self).__init__() self.virtual_ports = () self.hub = parent self.port = port self.is_buffered = False self._subscribers = set() self._port_mode = MsgPortInputFmtSingle(self.port, None, False, 1) self._incoming_port_data = queue.Queue(1) # limit 1 means we drop data if we can't handle it fast enough thr = Thread(target=self._queue_reader) thr.setDaemon(True) thr.setName("Port data queue: %s" % self) thr.start() def __repr__(self): msg = "%s on port 0x%x" % (self.__class__.__name__, self.port) if self.virtual_ports: msg += " (ports 0x%x and 0x%x combined)" % (self.virtual_ports[0], self.virtual_ports[1]) return msg def set_port_mode(self, mode, send_updates=None, update_delta=None): assert not self.virtual_ports, "TODO: support combined mode for sensors" if send_updates is None: send_updates = self._port_mode.upd_enabled log.debug("Implied update is enabled=%s", send_updates) if update_delta is None: update_delta = self._port_mode.upd_delta log.debug("Implied update delta=%s", update_delta) if self._port_mode.mode == mode \ and self._port_mode.upd_enabled == send_updates \ and self._port_mode.upd_delta == update_delta: log.debug("Already in target mode, no need to switch") return else: msg = MsgPortInputFmtSetupSingle(self.port, mode, update_delta, send_updates) resp = self.hub.send(msg) assert isinstance(resp, MsgPortInputFmtSingle) self._port_mode = resp def _send_output(self, msg): assert isinstance(msg, MsgPortOutput) msg.is_buffered = self.is_buffered # TODO: support buffering self.hub.send(msg) def get_sensor_data(self, mode): self.set_port_mode(mode) msg = MsgPortInfoRequest(self.port, MsgPortInfoRequest.INFO_PORT_VALUE) resp = self.hub.send(msg) return self._decode_port_data(resp) def subscribe(self, callback, mode=0x00, granularity=1): if self._port_mode.mode != mode and self._subscribers: raise ValueError("Port is in active mode %r, unsubscribe all subscribers first" % self._port_mode) self.set_port_mode(mode, True, granularity) if callback: self._subscribers.add(callback) def unsubscribe(self, callback=None): if callback in self._subscribers: self._subscribers.remove(callback) if not self._port_mode.upd_enabled: log.warning("Attempt to unsubscribe while port value updates are off: %s", self) elif not self._subscribers: self.set_port_mode(self._port_mode.mode, False) def _notify_subscribers(self, *args, **kwargs): for subscriber in self._subscribers: subscriber(*args, **kwargs) return args def queue_port_data(self, msg): try: self._incoming_port_data.put_nowait(msg) except queue.Full: log.debug("Dropped port data: %r", msg) def _decode_port_data(self, msg): """ :rtype: tuple """ log.warning("Unhandled port data: %r", msg) return () def _handle_port_data(self, msg): """ :type msg: pylgbst.messages.MsgPortValueSingle """ decoded = self._decode_port_data(msg) assert isinstance(decoded, (tuple, list)), "Unexpected data type: %s" % type(decoded) self._notify_subscribers(*decoded) def _queue_reader(self): while True: msg = self._incoming_port_data.get() try: self._handle_port_data(msg) except BaseException: log.warning("%s", traceback.format_exc()) log.warning("Failed to handle port data by %s: %r", self, msg) def describe_possible_modes(self): mode_info = self.hub.send(MsgPortInfoRequest(self.port, MsgPortInfoRequest.INFO_MODE_INFO)) assert isinstance(mode_info, MsgPortInfo) info = { "mode_count": mode_info.total_modes, "input_modes": [], "output_modes": [], "capabilities": { "logically_combinable": mode_info.is_combinable(), "synchronizable": mode_info.is_synchronizable(), "can_output": mode_info.is_output(), "can_input": mode_info.is_input(), } } if mode_info.is_combinable(): mode_combinations = self.hub.send(MsgPortInfoRequest(self.port, MsgPortInfoRequest.INFO_MODE_COMBINATIONS)) assert isinstance(mode_combinations, MsgPortInfo) info['possible_mode_combinations'] = mode_combinations.possible_mode_combinations info['modes'] = [] for mode in range(256): info['modes'].append(self._describe_mode(mode)) for mode in mode_info.output_modes: info['output_modes'].append(self._describe_mode(mode)) for mode in mode_info.input_modes: info['input_modes'].append(self._describe_mode(mode)) log.debug("Port info for 0x%x: %s", self.port, info) return info def _describe_mode(self, mode): descr = {"Mode": mode} for info in MsgPortModeInfoRequest.INFO_TYPES: try: resp = self.hub.send(MsgPortModeInfoRequest(self.port, mode, info)) assert isinstance(resp, MsgPortModeInfo) descr[MsgPortModeInfoRequest.INFO_TYPES[info]] = resp.value except RuntimeError: log.debug("Got error while requesting info 0x%x: %s", info, traceback.format_exc()) if info == MsgPortModeInfoRequest.INFO_NAME: break return descr class LEDRGB(Peripheral): MODE_INDEX = 0x00 MODE_RGB = 0x01 def __init__(self, parent, port): super(LEDRGB, self).__init__(parent, port) def set_color(self, color): if isinstance(color, (list, tuple)): assert len(color) == 3, "RGB color has to have 3 values" self.set_port_mode(self.MODE_RGB) payload = pack(" 1: log.warning("Speed cannot be more than 1") relative = 1 absolute = math.ceil(relative * 100) # scale of 100 is proven by experiments return int(absolute) def _write_direct_mode(self, subcmd, params): if self.virtual_ports: subcmd += 1 # de-facto rule params = pack("