mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
handle push button
This commit is contained in:
parent
33e97e474f
commit
fa0ee2d77a
12
demo.py
12
demo.py
@ -158,11 +158,12 @@ def demo_all(movehub):
|
||||
|
||||
|
||||
def cb_log(val1, val2=None, val3=None):
|
||||
log.info("V1:%s\tV2:%s\tV3:%s", unpack("<H", val1), val2, val3)
|
||||
pass
|
||||
# log.info("V1:%s\tV2:%s\tV3:%s", unpack("<H", val1), val2, val3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
try:
|
||||
connection = DebugServerConnection()
|
||||
@ -172,9 +173,10 @@ if __name__ == '__main__':
|
||||
|
||||
hub = MoveHub(connection)
|
||||
|
||||
#hub.devices[PORT_SOMETHING1].subscribe(cb_log, 0x00, granularity=1)
|
||||
hub.button.subscribe(cb_log, 0x00, granularity=1)
|
||||
sleep(10)
|
||||
demo_port_cd_motor(hub)
|
||||
# demo_port_cd_motor(hub)
|
||||
sleep(10)
|
||||
hub.button.unsubscribe(cb_log)
|
||||
sleep(10)
|
||||
#hub.devices[PORT_SOMETHING1].unsubscribe(cb_log)
|
||||
# demo_all(hub)
|
||||
|
@ -88,9 +88,17 @@ class MoveHub(object):
|
||||
elif msg_type == MSG_DEVICE_SHUTDOWN:
|
||||
log.warning("Device reported shutdown: %s", str2hex(data))
|
||||
raise KeyboardInterrupt("Device shutdown")
|
||||
elif msg_type == MSG_DEVICE_INFO:
|
||||
self._handle_device_info(data)
|
||||
else:
|
||||
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
|
||||
|
||||
def _handle_device_info(self, data):
|
||||
if get_byte(data, 3) == 2:
|
||||
self.button.handle_port_data(data)
|
||||
else:
|
||||
log.warning("Unhandled device info: %s", str2hex(data))
|
||||
|
||||
def _handle_sensor_data(self, data):
|
||||
port = get_byte(data, 3)
|
||||
if port not in self.devices:
|
||||
@ -159,3 +167,7 @@ class MoveHub(object):
|
||||
self.battery = self.devices[port]
|
||||
else:
|
||||
log.warning("Unhandled port: %s", PORTS[port])
|
||||
|
||||
def shutdown(self):
|
||||
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_SHUTDOWN)
|
||||
self.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
|
||||
|
@ -16,7 +16,7 @@ PORT_LED = 0x32
|
||||
PORT_A = 0x37
|
||||
PORT_B = 0x38
|
||||
PORT_AB = 0x39
|
||||
PORT_TILT_SENSOR = 0x3a
|
||||
PORT_TILT_SENSOR = 0x3A
|
||||
PORT_SOMETHING1 = 0x3B
|
||||
PORT_BATTERY = 0x3C # subscribing on this port showed correlation with power voltage
|
||||
|
||||
@ -46,7 +46,7 @@ MSG_SENSOR_DATA = 0x45
|
||||
MSG_SENSOR_SUBSCRIBE_ACK = 0x47
|
||||
|
||||
# DEVICE TYPES
|
||||
DEV_UNKNOWN1 = 0x15 # one of them is button?
|
||||
DEV_UNKNOWN1 = 0x15 # one of them is button? onboard temperature? maybe another kind of voltage, they have same params
|
||||
DEV_BATTERY = 0x14
|
||||
DEV_DCS = 0x25
|
||||
DEV_IMOTOR = 0x26
|
||||
@ -61,7 +61,7 @@ DEVICE_TYPES = {
|
||||
DEV_TILT_SENSOR: "TILT_SENSOR",
|
||||
DEV_LED: "LED",
|
||||
DEV_UNKNOWN1: "UNKNOWN #1",
|
||||
DEV_BATTERY: "UNKNOWN #2",
|
||||
DEV_BATTERY: "BATTERY",
|
||||
}
|
||||
|
||||
# NOTIFICATIONS
|
||||
|
@ -26,7 +26,7 @@ class Peripheral(object):
|
||||
self._port_subscription_mode = None
|
||||
|
||||
def __repr__(self):
|
||||
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else 'N/A')
|
||||
return "%s on port %s" % (self.__class__.__name__, PORTS[self.port] if self.port in PORTS else self.port)
|
||||
|
||||
def _write_to_hub(self, msg_type, params):
|
||||
cmd = pack("<B", PACKET_VER) + pack("<B", msg_type) + pack("<B", self.port)
|
||||
@ -275,11 +275,38 @@ class ColorDistanceSensor(Peripheral):
|
||||
|
||||
|
||||
class Battery(Peripheral):
|
||||
def __init__(self, parent, port):
|
||||
super(Battery, self).__init__(parent, port)
|
||||
self.last_value = None
|
||||
|
||||
# we know only voltage subscription from it. is it really battery or just onboard voltage?
|
||||
# device has turned off on 1b0e000600453ba800
|
||||
def handle_port_data(self, data):
|
||||
self._notify_subscribers(unpack("<H", data[4:6]))
|
||||
self.last_value = unpack("<H", data[4:6])[0]
|
||||
self._notify_subscribers(self.last_value)
|
||||
|
||||
|
||||
class Button(Peripheral):
|
||||
"""
|
||||
It's not really a peripheral, we use MSG_DEVICE_INFO commands to interact with it
|
||||
"""
|
||||
|
||||
def __init__(self, parent):
|
||||
super(Button, self).__init__(parent, 0)
|
||||
|
||||
def subscribe(self, callback, mode=None, granularity=1):
|
||||
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_INFO) + b'\x02\x02'
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
|
||||
if callback:
|
||||
self._subscribers.add(callback)
|
||||
|
||||
def unsubscribe(self, callback=None):
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
if not self._subscribers:
|
||||
cmd = pack("<B", PACKET_VER) + pack("<B", MSG_DEVICE_INFO) + b'\x02\x03'
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
|
||||
|
||||
def handle_port_data(self, data):
|
||||
self._notify_subscribers(bool(unpack("<B", data[5:6])[0]))
|
||||
|
17
tests.py
17
tests.py
@ -1,5 +1,4 @@
|
||||
import unittest
|
||||
from binascii import unhexlify
|
||||
|
||||
from pylgbst import *
|
||||
|
||||
@ -149,3 +148,19 @@ class GeneralTest(unittest.TestCase):
|
||||
# TODO: assert
|
||||
self._wait_notifications_handled(hub)
|
||||
hub.color_distance_sensor.unsubscribe(callback)
|
||||
|
||||
def test_button(self):
|
||||
hub = HubMock()
|
||||
time.sleep(1)
|
||||
|
||||
def callback(pressed):
|
||||
log.info("Pressed: %s", pressed)
|
||||
|
||||
hub.button.subscribe(callback)
|
||||
|
||||
hub.connection.notifications.append((HANDLE, "1b0e00060001020601"))
|
||||
hub.connection.notifications.append((HANDLE, "1b0e00060001020600"))
|
||||
time.sleep(1)
|
||||
# TODO: assert
|
||||
self._wait_notifications_handled(hub)
|
||||
hub.button.unsubscribe(callback)
|
||||
|
Loading…
x
Reference in New Issue
Block a user