1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00
Ingo Jache 990ccdb268 New LEGO-Boost Firmware 2.0.00.0017 breaks pylgbst... (#33)
* - Updated Port-Numbers (New Firmware 2.0.00.0017)
- Skipping Device without Mac-Address
(Seems to occur randomly after Firmware-Update,
referring to https://github.com/LLK/scratch-vm/issues/2230 )
- Fixed racecondition in Hub.send()
(Reply could come before entering _sync_lock)

* - fixed Unit-Tests (Changed Port-Numbers)
- moved the write-call in hub.send() outside the lock
2019-08-12 10:38:04 +03:00

146 lines
5.7 KiB
Python

import time
import unittest
from pylgbst.hub import Hub, MoveHub
from pylgbst.messages import MsgHubAction, MsgHubAlert, MsgHubProperties
from pylgbst.peripherals import VisionSensor
from pylgbst.utilities import usbyte
from tests import ConnectionMock
class HubTest(unittest.TestCase):
def test_hub_properties(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed('060001060600', 0.1)
msg = MsgHubProperties(MsgHubProperties.VOLTAGE_PERC, MsgHubProperties.UPD_REQUEST)
resp = hub.send(msg)
assert isinstance(resp, MsgHubProperties)
self.assertEqual(1, len(resp.parameters))
self.assertEqual(0, usbyte(resp.parameters, 0))
conn.notification_delayed('12000101064c45474f204d6f766520487562', 0.1)
msg = MsgHubProperties(MsgHubProperties.ADVERTISE_NAME, MsgHubProperties.UPD_REQUEST)
resp = hub.send(msg)
assert isinstance(resp, MsgHubProperties)
self.assertEqual(b"LEGO Move Hub", resp.parameters)
conn.wait_notifications_handled()
def test_device_attached(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
# regular startup attaches
conn.notifications.append('0f0004010126000000001000000010')
conn.notifications.append('0f0004020125000000001000000010')
# detach and reattach
conn.notifications.append('0500040100')
conn.notifications.append('0500040200')
conn.notifications.append('0f0004010126000000001000000010')
conn.notifications.append('0f0004020125000000001000000010')
del hub
conn.wait_notifications_handled()
def test_hub_actions(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed('04000230', 0.1)
hub.send(MsgHubAction(MsgHubAction.UPSTREAM_SHUTDOWN))
conn.notification_delayed('04000231', 0.1)
hub.send(MsgHubAction(MsgHubAction.UPSTREAM_DISCONNECT))
conn.notification_delayed('04000232', 0.1)
hub.send(MsgHubAction(MsgHubAction.UPSTREAM_BOOT_MODE))
time.sleep(0.1)
conn.wait_notifications_handled()
def test_hub_alerts(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed('0600 03 0104ff', 0.1)
hub.send(MsgHubAlert(MsgHubAlert.LOW_VOLTAGE, MsgHubAlert.UPD_REQUEST))
conn.notification_delayed('0600 03 030400', 0.1)
hub.send(MsgHubAlert(MsgHubAlert.LOW_SIGNAL, MsgHubAlert.UPD_REQUEST))
conn.wait_notifications_handled()
def test_error(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed("0500056105", 0.1)
time.sleep(0.2)
conn.wait_notifications_handled()
def test_disconnect_off(self):
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed("04000231", 0.1)
hub.disconnect()
self.assertEqual(b"04000202", conn.writes[1][1])
conn = ConnectionMock().connect()
hub = Hub(conn)
conn.notification_delayed("04000230", 0.1)
hub.switch_off()
self.assertEqual(b"04000201", conn.writes[1][1])
def test_sensor(self):
conn = ConnectionMock().connect()
conn.notifications.append("0f0004020125000000001000000010") # add dev
hub = Hub(conn)
time.sleep(0.1)
dev = hub.peripherals[0x02]
assert isinstance(dev, VisionSensor)
conn.notification_delayed("0a004702080000000000", 0.1)
conn.notification_delayed("08004502ff0aff00", 0.2) # value for sensor
self.assertEqual((255, 10.0), dev.get_sensor_data(VisionSensor.COLOR_DISTANCE_FLOAT))
self.assertEqual(b"0a004102080100000000", conn.writes.pop(1)[1])
self.assertEqual(b"0500210200", conn.writes.pop(1)[1])
vals = []
cb = lambda x, y=None: vals.append((x, y))
conn.notification_delayed("0a004702080100000001", 0.1) # subscribe ack
dev.subscribe(cb, granularity=1)
self.assertEqual(b"0a004102080100000001", conn.writes.pop(1)[1])
conn.notification_delayed("08004502ff0aff00", 0.1) # value for sensor
time.sleep(0.3)
conn.notification_delayed("0a004702080000000000", 0.3) # unsubscribe ack
dev.unsubscribe(cb)
self.assertEqual(b"0a004102080100000000", conn.writes.pop(1)[1])
self.assertEqual([(255, 10.0)], vals)
class MoveHubTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append('0f00 04 02 0125000000001000000010')
conn.notifications.append('0f00 04 03 0126000000001000000010')
conn.notifications.append('0f00 04 00 0127000100000001000000')
conn.notifications.append('0f00 04 01 0127000100000001000000')
conn.notifications.append('0900 04 10 0227003738')
conn.notifications.append('0f00 04 32 0117000100000001000000')
conn.notifications.append('0f00 04 3a 0128000000000100000001')
conn.notifications.append('0f00 04 3b 0115000200000002000000')
conn.notifications.append('0f00 04 3c 0114000200000002000000')
conn.notification_delayed('12000101064c45474f204d6f766520487562', 1.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 1.3)
conn.notification_delayed('060001060600', 1.5)
conn.notification_delayed('0600030104ff', 1.7)
MoveHub(conn.connect())
time.sleep(1)
conn.wait_notifications_handled()
self.assertEqual(b"0500010105", conn.writes[1][1])
self.assertEqual(b"0500010d05", conn.writes[2][1])
self.assertEqual(b"0500010605", conn.writes[3][1])
self.assertEqual(b"0500030103", conn.writes[4][1])