1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Bunch of Tiny Fixes and Enhancements (#41)

* Use setuptools to allow the extras_require to work in python3.6

This also declares some hidden dependencies for the underlying
connection protocols, but note that they are normally reliant
on system-packaged versions, which is a bit less than optimal.

* In message, on assert of incoming type, note failing type

* In utilities, guard against truncated input.

* In demo allow for specifying different connections and demos on command line

Also addresses a crash in led demo where parameters x and y were not provided
to an empty lamba that was passed in.

* Remove commentted line, apply black formatting

* Raise TypeError when an incorrectly-typed message is received

* Apply black automatic formatting to the utilities module
This commit is contained in:
Mike C. Fletcher 2019-12-27 02:27:59 -05:00 committed by Andrey Pokhilko
parent 7efd92700d
commit c955820521
4 changed files with 112 additions and 24 deletions

View File

@ -13,7 +13,10 @@ log = logging.getLogger("demo")
def demo_led_colors(movehub):
# LED colors demo
log.info("LED colors demo")
movehub.led.subscribe(lambda x, y: None)
# We get a response with payload and port, not x and y here...
def colour_callback(**named):
log.info("LED Color callback: %s", named)
movehub.led.subscribe(colour_callback)
for color in list(COLORS.keys())[1:] + [COLOR_BLACK]:
log.info("Setting LED color to: %s", COLORS[color])
movehub.led.set_color(color)
@ -180,10 +183,78 @@ def demo_all(movehub):
demo_color_sensor(movehub)
demo_motor_sensors(movehub)
DEMO_CHOICES = {
'all':demo_all,
'voltage':demo_voltage,
'led_colors':demo_led_colors,
'motors_timed':demo_motors_timed,
'motors_angled':demo_motors_angled,
'port_cd_motor':demo_port_cd_motor,
'tilt_sensor':demo_tilt_sensor_simple,
'tilt_sensor_precise':demo_tilt_sensor_precise,
'color_sensor':demo_color_sensor,
'motor_sensors':demo_motor_sensors,
}
def get_options():
import argparse
parser = argparse.ArgumentParser(
description='Demonstrate move-hub communications',
)
parser.add_argument(
'-c','--connection',
default='auto://',
help='''Specify connection URL to use, `protocol://mac?param=X` with protocol in:
"gatt","pygatt","gattlib","gattool", "bluepy","bluegiga"'''
)
parser.add_argument(
'-d','--demo',
default='all',
choices = sorted(DEMO_CHOICES.keys()),
help="Run a particular demo, default all"
)
return parser
def connection_from_url(url):
import pylgbst
if url == 'auto://':
return None
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
parsed = urlparse(url)
name = 'get_connection_%s'%(parsed.scheme)
factory = getattr( pylgbst, name, None)
if not factory:
raise ValueError("Unrecognised URL scheme/protocol, expect a get_connection_<protocol> in pylgbst: %s"%(parsed.protocol))
params = {}
if parsed.netloc.strip():
params['hub_mac'] = parsed.netloc
for key,value in parse_qs(parsed.query).items():
if len(value) == 1:
params[key] = value[0]
else:
params[key] = value
return factory(
**params
)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection)
parameters['connection'] = connection
except ValueError as err:
parser.error(error.args[0])
connection
hub = MoveHub()
demo_all(hub)
hub = MoveHub(**parameters)
try:
demo = DEMO_CHOICES[options.demo]
demo(hub)
finally:
hub.disconnect()

View File

@ -173,7 +173,8 @@ class MsgHubAction(DownstreamMsg, UpstreamMsg):
return super(MsgHubAction, self).bytes()
def is_reply(self, msg):
assert isinstance(msg, MsgHubAction)
if not isinstance(msg, MsgHubAction):
raise TypeError("Unexpected message type: %s"%(msg.__class__,))
if self.action == self.DISCONNECT and msg.action == self.UPSTREAM_DISCONNECT:
return True

View File

@ -7,6 +7,8 @@ import logging
import sys
from struct import unpack
log = logging.getLogger(__name__)
if sys.version_info[0] == 2:
import Queue as queue
else:
@ -15,22 +17,34 @@ else:
queue = queue # just to use it
def check_unpack(seq, index, pattern, size):
"""Check that we got size bytes, if so, unpack using pattern"""
data = seq[index : index + size]
if len(data) == size:
return unpack(pattern, data)[0]
else:
log.warning(
"Unpacking of %s bytes failed, insufficient data: %r", size, seq[index:]
)
raise ValueError(data, "Expected %s bytes" % (size,))
def usbyte(seq, index):
return unpack("<B", seq[index:index + 1])[0]
return check_unpack(seq, index, "<B", 1)
def ushort(seq, index):
return unpack("<H", seq[index:index + 2])[0]
return check_unpack(seq, index, "<H", 2)
def usint(seq, index):
return unpack("<I", seq[index:index + 4])[0]
return check_unpack(seq, index, "<I", 4)
def str2hex(data): # we need it for python 2+3 compatibility
# if sys.version_info[0] == 3:
# data = bytes(data, 'ascii')
if not isinstance(data, (bytes, bytearray)):
data = bytes(data, 'ascii')
data = bytes(data, "ascii")
hexed = binascii.hexlify(data)
return hexed

View File

@ -1,16 +1,18 @@
from distutils.core import setup
from setuptools import setup
setup(name='pylgbst',
description='Python library to interact with LEGO Move Hub (from Lego BOOST set)',
version='1.1.1',
author='Andrey Pokhilko',
author_email='apc4@ya.ru',
packages=['pylgbst', "pylgbst.comms"],
setup(
name="pylgbst",
description="Python library to interact with LEGO Move Hub (from Lego BOOST set)",
version="1.1.1",
author="Andrey Pokhilko",
author_email="apc4@ya.ru",
packages=["pylgbst", "pylgbst.comms"],
requires=[],
extras_require={
'gatt': ["gatt"],
'gattlib': ["gattlib"],
'pygatt': ["pygatt"],
'bluepy': ["bluepy"],
}
# Note that dbus and gi are normally system packages
"gatt": ["gatt", "dbus", "gi"],
"gattlib": ["gattlib"],
"pygatt": ["pygatt", "pexpect"],
"bluepy": ["bluepy"],
},
)