1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Merge branch 'master' of github.com:undera/pylgbst

This commit is contained in:
Andrey Pokhilko 2019-08-13 15:45:59 +03:00
commit 3f19c983c2
11 changed files with 66 additions and 60 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@
*.pyc
build
*.avi
test_real.py
test_real.py
.vscode/settings.json

View File

@ -8,7 +8,7 @@ matrix:
- os: linux
python: 2.7
- os: linux
python: 3.4
python: 3.5
addons:
apt:
@ -27,7 +27,9 @@ addons:
- python3-dbus
- python3-gi
install:
- pip install codecov nose-exclude gattlib pygatt gatt pexpect bluepy
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
- pip install codecov nose-exclude pygatt gatt pexpect bluepy
script: coverage run --source=. -m nose tests -v --exclude-dir=examples

View File

@ -35,7 +35,7 @@ _Please note that this library requires one of Bluetooth backend libraries to be
Install library like this:
```bash
pip install https://github.com/undera/pylgbst/archive/1.0.tar.gz
pip install https://github.com/undera/pylgbst/archive/1.0.1.tar.gz
```
Then instantiate MoveHub object and start invoking its methods. Following is example to just print peripherals detected on Hub:

View File

@ -13,7 +13,7 @@ log = logging.getLogger("demo")
def demo_led_colors(movehub):
# LED colors demo
log.info("LED colors demo")
movehub.color_distance_sensor.subscribe(lambda x, y: None)
movehub.led.subscribe(lambda x, y: None)
for color in list(COLORS.keys())[1:] + [COLOR_BLACK]:
log.info("Setting LED color to: %s", COLORS[color])
movehub.led.set_color(color)
@ -111,11 +111,11 @@ def demo_color_sensor(movehub):
demo_color_sensor.cnt += 1
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, COLORS[color], distance)
movehub.color_distance_sensor.subscribe(callback)
movehub.vision_sensor.subscribe(callback)
while demo_color_sensor.cnt < limit:
time.sleep(1)
movehub.color_distance_sensor.unsubscribe(callback)
movehub.vision_sensor.unsubscribe(callback)
def demo_motor_sensors(movehub):

View File

@ -33,10 +33,11 @@ class GattoolConnection(Connection):
for dev in devices:
address = dev['address']
name = dev['name']
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._conn_hnd = adapter.connect(address)
break
if address != "00:00:00:00:00:00":
if (not hub_mac and name == LEGO_MOVE_HUB) or hub_mac == address:
logging.info("Found %s at %s", name, address)
self._conn_hnd = adapter.connect(address)
break
if self._conn_hnd:
break

View File

@ -61,19 +61,21 @@ class Hub(object):
:rtype: pylgbst.messages.UpstreamMsg
"""
log.debug("Send message: %r", msg)
self.connection.write(self.HUB_HARDWARE_HANDLE, msg.bytes())
msgbytes = msg.bytes()
if msg.needs_reply:
with self._sync_lock:
assert not self._sync_request, "Pending request %r while trying to put %r" % (self._sync_request, msg)
self._sync_request = msg
log.debug("Waiting for sync reply to %r...", msg)
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
resp = self._sync_replies.get()
log.debug("Fetched sync reply: %r", resp)
if isinstance(resp, MsgGenericError):
raise RuntimeError(resp.message())
return resp
else:
self.connection.write(self.HUB_HARDWARE_HANDLE, msgbytes)
return None
def _notify(self, handle, data):
@ -183,12 +185,12 @@ class MoveHub(Hub):
"""
# PORTS
PORT_C = 0x01
PORT_D = 0x02
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_D = 0x03
PORT_AB = 0x10
PORT_LED = 0x32
PORT_A = 0x37
PORT_B = 0x38
PORT_AB = 0x39
PORT_TILT_SENSOR = 0x3A
PORT_CURRENT = 0x3B
PORT_VOLTAGE = 0x3C

View File

@ -58,7 +58,7 @@ class UpstreamMsg(Message):
assert hub_id == 0
msg_type = msg._byte()
assert cls.TYPE == msg_type, "Message type does not match: %x!=%x" % (cls.TYPE, msg_type)
assert isinstance(msg.payload, bytes)
assert isinstance(msg.payload, (bytes, bytearray))
return msg
def __shift(self, vtype, vlen):

View File

@ -30,7 +30,7 @@ def usint(seq, index):
def str2hex(data): # we need it for python 2+3 compatibility
# if sys.version_info[0] == 3:
# data = bytes(data, 'ascii')
if not isinstance(data, bytes):
if not isinstance(data, (bytes, bytearray)):
data = bytes(data, 'ascii')
hexed = binascii.hexlify(data)
return hexed

View File

@ -2,7 +2,7 @@ from distutils.core import setup
setup(name='pylgbst',
description='Python library to interact with LEGO Move Hub (from Lego BOOST set)',
version='1.0',
version='1.0.1',
author='Andrey Pokhilko',
author_email='apc4@ya.ru',
packages=['pylgbst', "pylgbst.comms"],

View File

@ -122,20 +122,20 @@ class HubTest(unittest.TestCase):
class MoveHubTest(unittest.TestCase):
def test_capabilities(self):
conn = ConnectionMock()
conn.notifications.append('0f00 04 01 0125000000001000000010')
conn.notifications.append('0f00 04 02 0126000000001000000010')
conn.notifications.append('0f00 04 37 0127000100000001000000')
conn.notifications.append('0f00 04 38 0127000100000001000000')
conn.notifications.append('0900 04 39 0227003738')
conn.notifications.append('0f00 04 02 0125000000001000000010')
conn.notifications.append('0f00 04 03 0126000000001000000010')
conn.notifications.append('0f00 04 00 0127000100000001000000')
conn.notifications.append('0f00 04 01 0127000100000001000000')
conn.notifications.append('0900 04 10 0227003738')
conn.notifications.append('0f00 04 32 0117000100000001000000')
conn.notifications.append('0f00 04 3a 0128000000000100000001')
conn.notifications.append('0f00 04 3b 0115000200000002000000')
conn.notifications.append('0f00 04 3c 0114000200000002000000')
conn.notification_delayed('12000101064c45474f204d6f766520487562', 0.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 0.3)
conn.notification_delayed('060001060600', 0.5)
conn.notification_delayed('0600030104ff', 0.7)
conn.notification_delayed('12000101064c45474f204d6f766520487562', 1.1)
conn.notification_delayed('0b00010d06001653a0d1d4', 1.3)
conn.notification_delayed('060001060600', 1.5)
conn.notification_delayed('0600030104ff', 1.7)
MoveHub(conn.connect())
time.sleep(1)
conn.wait_notifications_handled()

View File

@ -148,49 +148,49 @@ class PeripheralsTest(unittest.TestCase):
motor = EncodedMotor(hub, MoveHub.PORT_D)
hub.peripherals[MoveHub.PORT_D] = motor
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.start_power(1.0)
self.assertEqual(b"0800810211510164", hub.writes.pop(1)[1])
self.assertEqual(b"0800810311510164", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.set_acc_profile(1.0)
self.assertEqual(b"090081021105e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081031105e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.set_dec_profile(1.0)
self.assertEqual(b"090081021106e80300", hub.writes.pop(1)[1])
self.assertEqual(b"090081031106e80300", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.start_speed(1.0)
self.assertEqual(b"090081021107646403", hub.writes.pop(1)[1])
self.assertEqual(b"090081031107646403", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.1)
hub.connection.notification_delayed('050082030a', 0.1)
motor.stop()
self.assertEqual(b"090081021107006403", hub.writes.pop(1)[1])
self.assertEqual(b"090081031107006403", hub.writes.pop(1)[1])
logging.debug("\n\n")
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.timed(1.0)
self.assertEqual(b"0c0081021109e80364647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0c0081031109e80364647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.angled(180)
self.assertEqual(b"0e008102110bb400000064647f03", hub.writes.pop(1)[1])
self.assertEqual(b"0e008103110bb400000064647f03", hub.writes.pop(1)[1])
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('050082030a', 0.2)
motor.preset_encoder(-180)
self.assertEqual(b"0b0081021151024cffffff", hub.writes.pop(1)[1])
self.assertEqual(b"0b0081031151024cffffff", hub.writes.pop(1)[1])
hub.connection.notification_delayed('0500820201', 0.1)
hub.connection.notification_delayed('050082020a', 0.2)
hub.connection.notification_delayed('0500820301', 0.1)
hub.connection.notification_delayed('050082030a', 0.2)
motor.goto_position(0)
self.assertEqual(b"0e008102110d00000000647f6403", hub.writes.pop(1)[1])
self.assertEqual(b"0e008103110d00000000647f6403", hub.writes.pop(1)[1])
hub.connection.wait_notifications_handled()
@ -204,15 +204,15 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a004701020100000001', 0.1)
hub.connection.notification_delayed('0a004702020100000001', 0.1)
motor.subscribe(callback)
hub.connection.notification_delayed("0800450100000000", 0.1)
hub.connection.notification_delayed("08004501ffffffff", 0.2)
hub.connection.notification_delayed("08004501feffffff", 0.3)
hub.connection.notification_delayed("0800450200000000", 0.1)
hub.connection.notification_delayed("08004502ffffffff", 0.2)
hub.connection.notification_delayed("08004502feffffff", 0.3)
time.sleep(0.4)
hub.connection.notification_delayed('0a004701020000000000', 0.1)
hub.connection.notification_delayed('0a004702020000000000', 0.1)
motor.unsubscribe(callback)
hub.connection.wait_notifications_handled()
@ -228,13 +228,13 @@ class PeripheralsTest(unittest.TestCase):
def callback(*args):
vals.append(args)
hub.connection.notification_delayed('0a00 4701080100000001', 0.1)
hub.connection.notification_delayed('0a00 4702080100000001', 0.1)
cds.subscribe(callback)
hub.connection.notification_delayed("08004501ff0aff00", 0.1)
hub.connection.notification_delayed("08004502ff0aff00", 0.1)
time.sleep(0.2)
hub.connection.notification_delayed('0a00 4701090100000001', 0.1)
hub.connection.notification_delayed('0a00 4702090100000001', 0.1)
cds.unsubscribe(callback)
hub.connection.wait_notifications_handled()