mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Merge branch 'master' of github.com:undera/pylgbst
This commit is contained in:
commit
777bc3ad32
@ -21,14 +21,14 @@ addons:
|
|||||||
install:
|
install:
|
||||||
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
|
- wget https://github.com/labapart/gattlib/releases/download/dev/gattlib_dbus_0.2-dev_x86_64.deb
|
||||||
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
|
- sudo dpkg -i gattlib_dbus_0.2-dev_x86_64.deb
|
||||||
- pip install codecov codacy-coverage nose-exclude pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
|
- pip install codecov codacy-coverage pytest pygatt gatt pexpect bluepy bleak packaging dbus-python pygobject
|
||||||
- pip install --upgrade attrs
|
- pip install --upgrade attrs
|
||||||
|
|
||||||
env:
|
env:
|
||||||
- READTHEDOCS=True
|
- READTHEDOCS=True
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- coverage run --omit="examples/*" --source=. -m nose tests -v --exclude-dir=examples
|
- coverage run --omit="examples/*" --source=. -m pytest -v --ignore=examples --log-level=INFO tests
|
||||||
|
|
||||||
after_success:
|
after_success:
|
||||||
- coverage report -m
|
- coverage report -m
|
||||||
|
@ -173,15 +173,15 @@ def demo_voltage(movehub):
|
|||||||
|
|
||||||
|
|
||||||
def demo_all(movehub):
|
def demo_all(movehub):
|
||||||
demo_voltage(movehub)
|
|
||||||
demo_led_colors(movehub)
|
|
||||||
demo_motors_timed(movehub)
|
demo_motors_timed(movehub)
|
||||||
demo_motors_angled(movehub)
|
demo_motors_angled(movehub)
|
||||||
demo_port_cd_motor(movehub)
|
demo_port_cd_motor(movehub)
|
||||||
|
demo_led_colors(movehub)
|
||||||
demo_tilt_sensor_simple(movehub)
|
demo_tilt_sensor_simple(movehub)
|
||||||
demo_tilt_sensor_precise(movehub)
|
demo_tilt_sensor_precise(movehub)
|
||||||
demo_color_sensor(movehub)
|
demo_color_sensor(movehub)
|
||||||
demo_motor_sensors(movehub)
|
demo_motor_sensors(movehub)
|
||||||
|
demo_voltage(movehub)
|
||||||
|
|
||||||
|
|
||||||
DEMO_CHOICES = {
|
DEMO_CHOICES = {
|
||||||
|
@ -6,38 +6,38 @@ from pylgbst.comms import DebugServer
|
|||||||
log = logging.getLogger('pylgbst')
|
log = logging.getLogger('pylgbst')
|
||||||
|
|
||||||
|
|
||||||
def get_connection_bluegiga(controller=None, hub_mac=None):
|
def get_connection_bluegiga(controller=None, hub_mac=None, hub_name=None):
|
||||||
del controller # to prevent code analysis warning
|
del controller # to prevent code analysis warning
|
||||||
from pylgbst.comms.cpygatt import BlueGigaConnection
|
from pylgbst.comms.cpygatt import BlueGigaConnection
|
||||||
|
|
||||||
return BlueGigaConnection().connect(hub_mac)
|
return BlueGigaConnection().connect(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_gattool(controller='hci0', hub_mac=None):
|
def get_connection_gattool(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
from pylgbst.comms.cpygatt import GattoolConnection
|
from pylgbst.comms.cpygatt import GattoolConnection
|
||||||
|
|
||||||
return GattoolConnection(controller).connect(hub_mac)
|
return GattoolConnection(controller).connect(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_gatt(controller='hci0', hub_mac=None):
|
def get_connection_gatt(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
from pylgbst.comms.cgatt import GattConnection
|
from pylgbst.comms.cgatt import GattConnection
|
||||||
|
|
||||||
return GattConnection(controller).connect(hub_mac)
|
return GattConnection(controller).connect(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_gattlib(controller='hci0', hub_mac=None):
|
def get_connection_gattlib(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
from pylgbst.comms.cgattlib import GattLibConnection
|
from pylgbst.comms.cgattlib import GattLibConnection
|
||||||
|
|
||||||
return GattLibConnection(controller).connect(hub_mac)
|
return GattLibConnection(controller).connect(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_bluepy(controller='hci0', hub_mac=None):
|
def get_connection_bluepy(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
from pylgbst.comms.cbluepy import BluepyConnection
|
from pylgbst.comms.cbluepy import BluepyConnection
|
||||||
|
|
||||||
return BluepyConnection(controller).connect(hub_mac)
|
return BluepyConnection(controller).connect(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_bleak(controller='hci0', hub_mac=None):
|
def get_connection_bleak(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
"""
|
"""
|
||||||
Return connection based with Bleak API as an endpoint.
|
Return connection based with Bleak API as an endpoint.
|
||||||
|
|
||||||
@ -48,24 +48,24 @@ def get_connection_bleak(controller='hci0', hub_mac=None):
|
|||||||
del controller # to prevent code analysis warning
|
del controller # to prevent code analysis warning
|
||||||
from pylgbst.comms.cbleak import BleakDriver
|
from pylgbst.comms.cbleak import BleakDriver
|
||||||
|
|
||||||
return BleakDriver(hub_mac)
|
return BleakDriver(hub_mac, hub_name)
|
||||||
|
|
||||||
|
|
||||||
def get_connection_auto(controller='hci0', hub_mac=None):
|
def get_connection_auto(controller='hci0', hub_mac=None, hub_name=None):
|
||||||
fns = [
|
fns = [
|
||||||
get_connection_bluepy,
|
get_connection_bluepy,
|
||||||
get_connection_bluegiga,
|
get_connection_bluegiga,
|
||||||
get_connection_gatt,
|
get_connection_gatt,
|
||||||
|
get_connection_bleak,
|
||||||
get_connection_gattool,
|
get_connection_gattool,
|
||||||
get_connection_gattlib,
|
get_connection_gattlib,
|
||||||
get_connection_bleak,
|
|
||||||
]
|
]
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
for fn in fns:
|
for fn in fns:
|
||||||
try:
|
try:
|
||||||
logging.info("Trying %s", fn.__name__)
|
logging.info("Trying %s", fn.__name__)
|
||||||
return fn(controller, hub_mac)
|
return fn(controller, hub_mac, hub_name)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
raise
|
raise
|
||||||
except BaseException:
|
except BaseException:
|
||||||
|
@ -15,7 +15,6 @@ from pylgbst.utilities import str2hex
|
|||||||
|
|
||||||
log = logging.getLogger('comms')
|
log = logging.getLogger('comms')
|
||||||
|
|
||||||
LEGO_MOVE_HUB = "LEGO Move Hub"
|
|
||||||
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
|
MOVE_HUB_HW_UUID_SERV = '00001623-1212-efde-1623-785feabcd123'
|
||||||
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
|
MOVE_HUB_HW_UUID_CHAR = '00001624-1212-efde-1623-785feabcd123'
|
||||||
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
|
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
|
||||||
@ -46,18 +45,19 @@ class Connection(object):
|
|||||||
def enable_notifications(self):
|
def enable_notifications(self):
|
||||||
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
|
self.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
|
||||||
|
|
||||||
def _is_device_matched(self, address, name, hub_mac):
|
def _is_device_matched(self, address, dev_name, hub_mac, find_name):
|
||||||
log.debug("Checking device name: %s, MAC: %s", name, address)
|
assert hub_mac or find_name, 'You have to provide either hub_mac or hub_name in connection options'
|
||||||
|
log.debug("Checking device: %s, MAC: %s", dev_name, address)
|
||||||
matched = False
|
matched = False
|
||||||
if address != "00:00:00:00:00:00":
|
if address != "00:00:00:00:00:00":
|
||||||
if hub_mac:
|
if hub_mac:
|
||||||
if hub_mac.lower() == address.lower():
|
if hub_mac.lower() == address.lower():
|
||||||
matched = True
|
matched = True
|
||||||
elif name == LEGO_MOVE_HUB:
|
elif dev_name == find_name:
|
||||||
matched = True
|
matched = True
|
||||||
|
|
||||||
if matched:
|
if matched:
|
||||||
log.info("Found %s at %s", name, address)
|
log.info("Found %s at %s", dev_name, address)
|
||||||
|
|
||||||
return matched
|
return matched
|
||||||
|
|
||||||
|
@ -18,13 +18,14 @@ req_queue = queue.Queue()
|
|||||||
class BleakDriver(object):
|
class BleakDriver(object):
|
||||||
"""Driver that provides interface between API and Bleak."""
|
"""Driver that provides interface between API and Bleak."""
|
||||||
|
|
||||||
def __init__(self, hub_mac=None):
|
def __init__(self, hub_mac=None, hub_name=None):
|
||||||
"""
|
"""
|
||||||
Initialize new object of Bleak Driver class.
|
Initialize new object of Bleak Driver class.
|
||||||
|
|
||||||
:param hub_mac: Optional Lego HUB MAC to connect to.
|
:param hub_mac: Optional Lego HUB MAC to connect to.
|
||||||
"""
|
"""
|
||||||
self.hub_mac = hub_mac
|
self.hub_mac = hub_mac
|
||||||
|
self.hub_name = hub_name
|
||||||
self._handler = None
|
self._handler = None
|
||||||
self._abort = False
|
self._abort = False
|
||||||
self._connection_thread = None
|
self._connection_thread = None
|
||||||
@ -56,7 +57,7 @@ class BleakDriver(object):
|
|||||||
|
|
||||||
async def _bleak_thread(self):
|
async def _bleak_thread(self):
|
||||||
bleak = BleakConnection()
|
bleak = BleakConnection()
|
||||||
await bleak.connect(self.hub_mac)
|
await bleak.connect(self.hub_mac, self.hub_name)
|
||||||
await bleak.set_notify_handler(self._safe_handler)
|
await bleak.set_notify_handler(self._safe_handler)
|
||||||
# After connecting, need to send any data or hub will drop the connection,
|
# After connecting, need to send any data or hub will drop the connection,
|
||||||
# below command is Advertising name request update
|
# below command is Advertising name request update
|
||||||
@ -67,6 +68,8 @@ class BleakDriver(object):
|
|||||||
data = req_queue.get()
|
data = req_queue.get()
|
||||||
await bleak.write(data[0], data[1])
|
await bleak.write(data[0], data[1])
|
||||||
|
|
||||||
|
logging.info("Communications thread has exited")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _safe_handler(handler, data):
|
def _safe_handler(handler, data):
|
||||||
resp_queue.put((handler, data))
|
resp_queue.put((handler, data))
|
||||||
@ -77,7 +80,8 @@ class BleakDriver(object):
|
|||||||
msg = resp_queue.get()
|
msg = resp_queue.get()
|
||||||
self._handler(msg[0], msg[1])
|
self._handler(msg[0], msg[1])
|
||||||
|
|
||||||
time.sleep(0.1)
|
time.sleep(0.01)
|
||||||
|
logging.info("Processing thread has exited")
|
||||||
|
|
||||||
def write(self, handle, data):
|
def write(self, handle, data):
|
||||||
"""
|
"""
|
||||||
@ -123,9 +127,10 @@ class BleakConnection(Connection):
|
|||||||
|
|
||||||
self._device = None
|
self._device = None
|
||||||
self._client = None
|
self._client = None
|
||||||
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.getLogger().level)
|
logging.getLogger('bleak.backends.dotnet.client').setLevel(logging.WARNING)
|
||||||
|
logging.getLogger('bleak.backends.bluezdbus.client').setLevel(logging.WARNING)
|
||||||
|
|
||||||
async def connect(self, hub_mac=None):
|
async def connect(self, hub_mac=None, hub_name=None):
|
||||||
"""
|
"""
|
||||||
Connect to device.
|
Connect to device.
|
||||||
|
|
||||||
@ -133,20 +138,19 @@ class BleakConnection(Connection):
|
|||||||
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
|
:raises ConnectionError: When cannot connect to given MAC or name matching fails.
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
log.info("Discovering devices... Press Green button on lego MoveHub")
|
log.info("Discovering devices... Press green button on Hub")
|
||||||
devices = await discover()
|
devices = await discover(timeout=10)
|
||||||
log.debug("Devices: %s", devices)
|
log.debug("Devices: %s", devices)
|
||||||
|
|
||||||
for dev in devices:
|
for dev in devices:
|
||||||
log.debug(dev)
|
log.debug(dev)
|
||||||
address = dev.address
|
address = dev.address
|
||||||
name = dev.name
|
name = dev.name
|
||||||
if self._is_device_matched(address, name, hub_mac):
|
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||||
log.info('Device matched')
|
log.info('Device matched: %r', dev)
|
||||||
self._device = dev
|
self._device = dev
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
if not self._device:
|
|
||||||
raise ConnectionError('Device not found.')
|
raise ConnectionError('Device not found.')
|
||||||
|
|
||||||
self._client = BleakClient(self._device.address, self.loop)
|
self._client = BleakClient(self._device.address, self.loop)
|
||||||
@ -164,6 +168,10 @@ class BleakConnection(Connection):
|
|||||||
"""
|
"""
|
||||||
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
|
log.debug('Request: {handle} {payload}'.format(handle=handle, payload=[hex(x) for x in data]))
|
||||||
desc = self._client.services.get_descriptor(handle)
|
desc = self._client.services.get_descriptor(handle)
|
||||||
|
|
||||||
|
if not isinstance(data, bytearray):
|
||||||
|
data = bytearray(data)
|
||||||
|
|
||||||
if desc is None:
|
if desc is None:
|
||||||
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
|
# dedicated handle not found, try to send by using LEGO Move Hub default characteristic
|
||||||
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
|
await self._client.write_gatt_char(MOVE_HUB_HW_UUID_CHAR, data)
|
||||||
|
@ -87,7 +87,7 @@ class BluepyConnection(Connection):
|
|||||||
self._peripheral = None # :type BluepyThreadedPeripheral
|
self._peripheral = None # :type BluepyThreadedPeripheral
|
||||||
self._controller = controller
|
self._controller = controller
|
||||||
|
|
||||||
def connect(self, hub_mac=None):
|
def connect(self, hub_mac=None, hub_name=None):
|
||||||
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
||||||
scanner = btle.Scanner()
|
scanner = btle.Scanner()
|
||||||
|
|
||||||
@ -98,11 +98,11 @@ class BluepyConnection(Connection):
|
|||||||
|
|
||||||
for dev in devices:
|
for dev in devices:
|
||||||
address = dev.addr
|
address = dev.addr
|
||||||
addressType = dev.addrType
|
address_type = dev.addrType
|
||||||
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
|
name = dev.getValueText(COMPLETE_LOCAL_NAME_ADTYPE)
|
||||||
|
|
||||||
if self._is_device_matched(address, name, hub_mac):
|
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||||
self._peripheral = BluepyThreadedPeripheral(address, addressType, self._controller)
|
self._peripheral = BluepyThreadedPeripheral(address, address_type, self._controller)
|
||||||
break
|
break
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
@ -88,7 +88,7 @@ class GattConnection(Connection):
|
|||||||
self._manager_thread.setDaemon(True)
|
self._manager_thread.setDaemon(True)
|
||||||
log.debug('Starting DeviceManager...')
|
log.debug('Starting DeviceManager...')
|
||||||
|
|
||||||
def connect(self, hub_mac=None):
|
def connect(self, hub_mac=None, hub_name=None):
|
||||||
self._manager_thread.start()
|
self._manager_thread.start()
|
||||||
self._manager.start_discovery()
|
self._manager.start_discovery()
|
||||||
|
|
||||||
@ -100,7 +100,7 @@ class GattConnection(Connection):
|
|||||||
for dev in devices:
|
for dev in devices:
|
||||||
address = dev.mac_address
|
address = dev.mac_address
|
||||||
name = dev.alias()
|
name = dev.alias()
|
||||||
if self._is_device_matched(address, name, hub_mac):
|
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||||
self._device = CustomDevice(address, self._manager)
|
self._device = CustomDevice(address, self._manager)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ class GattLibConnection(Connection):
|
|||||||
self.requester = None
|
self.requester = None
|
||||||
self._iface = bt_iface_name
|
self._iface = bt_iface_name
|
||||||
|
|
||||||
def connect(self, hub_mac=None):
|
def connect(self, hub_mac=None, hub_name=None):
|
||||||
service = DiscoveryService(self._iface)
|
service = DiscoveryService(self._iface)
|
||||||
|
|
||||||
while not self.requester:
|
while not self.requester:
|
||||||
@ -70,7 +70,7 @@ class GattLibConnection(Connection):
|
|||||||
log.debug("Devices: %s", devices)
|
log.debug("Devices: %s", devices)
|
||||||
|
|
||||||
for address, name in devices.items():
|
for address, name in devices.items():
|
||||||
if self._is_device_matched(address, name, hub_mac):
|
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||||
self.requester = Requester(address, True, self._iface)
|
self.requester = Requester(address, True, self._iface)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ import logging
|
|||||||
|
|
||||||
import pygatt
|
import pygatt
|
||||||
|
|
||||||
from pylgbst.comms import Connection, LEGO_MOVE_HUB, MOVE_HUB_HW_UUID_CHAR
|
from pylgbst.comms import Connection, MOVE_HUB_HW_UUID_CHAR
|
||||||
from pylgbst.utilities import str2hex
|
from pylgbst.utilities import str2hex
|
||||||
|
|
||||||
log = logging.getLogger('comms-pygatt')
|
log = logging.getLogger('comms-pygatt')
|
||||||
@ -20,20 +20,21 @@ class GattoolConnection(Connection):
|
|||||||
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
|
self.backend = lambda: pygatt.GATTToolBackend(hci_device=controller)
|
||||||
self._conn_hnd = None
|
self._conn_hnd = None
|
||||||
|
|
||||||
def connect(self, hub_mac=None):
|
def connect(self, hub_mac=None, hub_name=None):
|
||||||
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
log.debug("Trying to connect client to MoveHub with MAC: %s", hub_mac)
|
||||||
adapter = self.backend()
|
adapter = self.backend()
|
||||||
adapter.start()
|
adapter.start() # enable or disable restart? What's the best?
|
||||||
|
|
||||||
while not self._conn_hnd:
|
while not self._conn_hnd:
|
||||||
log.info("Discovering devices...")
|
log.info("Discovering devices...")
|
||||||
devices = adapter.scan(1)
|
devices = adapter.scan(1)
|
||||||
log.debug("Devices: %s", devices)
|
log.debug("Devices: %s", devices)
|
||||||
|
|
||||||
|
# Pass each device found to _is_device_matched( ) to see if it the device we want
|
||||||
for dev in devices:
|
for dev in devices:
|
||||||
address = dev['address']
|
address = dev['address']
|
||||||
name = dev['name']
|
name = dev['name']
|
||||||
if self._is_device_matched(address, name, hub_mac):
|
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||||
self._conn_hnd = adapter.connect(address)
|
self._conn_hnd = adapter.connect(address)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -4,8 +4,8 @@ import time
|
|||||||
from pylgbst import get_connection_auto
|
from pylgbst import get_connection_auto
|
||||||
from pylgbst.messages import *
|
from pylgbst.messages import *
|
||||||
from pylgbst.peripherals import *
|
from pylgbst.peripherals import *
|
||||||
from pylgbst.utilities import str2hex, usbyte, ushort
|
|
||||||
from pylgbst.utilities import queue
|
from pylgbst.utilities import queue
|
||||||
|
from pylgbst.utilities import str2hex, usbyte, ushort
|
||||||
|
|
||||||
log = logging.getLogger('hub')
|
log = logging.getLogger('hub')
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ class Hub(object):
|
|||||||
self.add_message_handler(MsgHubAction, self._handle_action)
|
self.add_message_handler(MsgHubAction, self._handle_action)
|
||||||
|
|
||||||
if not connection:
|
if not connection:
|
||||||
connection = get_connection_auto()
|
connection = get_connection_auto() # TODO: how to identify the hub?
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.connection.set_notify_handler(self._notify)
|
self.connection.set_notify_handler(self._notify)
|
||||||
self.connection.enable_notifications()
|
self.connection.enable_notifications()
|
||||||
@ -197,6 +197,9 @@ class MoveHub(Hub):
|
|||||||
|
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
def __init__(self, connection=None):
|
def __init__(self, connection=None):
|
||||||
|
if connection is None:
|
||||||
|
connection = get_connection_auto(hub_name="LEGO Move Hub")
|
||||||
|
|
||||||
super(MoveHub, self).__init__(connection)
|
super(MoveHub, self).__init__(connection)
|
||||||
self.info = {}
|
self.info = {}
|
||||||
|
|
||||||
@ -273,3 +276,10 @@ class MoveHub(Hub):
|
|||||||
self.vision_sensor = self.peripherals[port]
|
self.vision_sensor = self.peripherals[port]
|
||||||
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
||||||
self.motor_external = self.peripherals[port]
|
self.motor_external = self.peripherals[port]
|
||||||
|
|
||||||
|
|
||||||
|
class TrainHub(Hub):
|
||||||
|
def __init__(self, connection=None):
|
||||||
|
if connection is None:
|
||||||
|
connection = get_connection_auto(hub_name='TrainHub')
|
||||||
|
super(TrainHub, self).__init__(connection)
|
||||||
|
@ -2,6 +2,7 @@ import unittest
|
|||||||
|
|
||||||
from pylgbst.comms import *
|
from pylgbst.comms import *
|
||||||
|
|
||||||
|
|
||||||
class ConnectionTestCase(unittest.TestCase):
|
class ConnectionTestCase(unittest.TestCase):
|
||||||
def test_is_device_matched(self):
|
def test_is_device_matched(self):
|
||||||
conn = Connection()
|
conn = Connection()
|
||||||
@ -9,30 +10,31 @@ class ConnectionTestCase(unittest.TestCase):
|
|||||||
hub_address = '1a:2A:3A:4A:5A:6A'
|
hub_address = '1a:2A:3A:4A:5A:6A'
|
||||||
other_address = 'A1:a2:a3:a4:a5:a6'
|
other_address = 'A1:a2:a3:a4:a5:a6'
|
||||||
zero_address = '00:00:00:00:00:00'
|
zero_address = '00:00:00:00:00:00'
|
||||||
hub_name = LEGO_MOVE_HUB
|
hub_name = 'LEGO Move Hub'
|
||||||
other_name = 'HRM'
|
other_name = 'HRM'
|
||||||
|
|
||||||
test_matrix = [
|
test_matrix = [
|
||||||
# address, name, hub_mac, expected
|
# address, name, hub_mac, expected
|
||||||
(hub_address, hub_name, hub_address, True),
|
(hub_address, hub_name, hub_address, None, True),
|
||||||
(hub_address, hub_name, None, True),
|
(hub_address, hub_name, None, hub_name, True),
|
||||||
(hub_address, None, hub_address, True),
|
(hub_address, None, hub_address, None, True),
|
||||||
(hub_address, None, None, False),
|
(hub_address, None, None, hub_name, False),
|
||||||
(hub_address, other_name, hub_address, True),
|
(hub_address, other_name, hub_address, None, True),
|
||||||
(hub_address, other_name, None, False),
|
(hub_address, other_name, None, hub_name, False),
|
||||||
(other_address, hub_name, hub_address, False),
|
(other_address, hub_name, hub_address, None, False),
|
||||||
(other_address, hub_name, None, True),
|
(other_address, hub_name, None, hub_name, True),
|
||||||
(other_address, None, hub_address, False),
|
(other_address, None, hub_address, None, False),
|
||||||
(other_address, None, None, False),
|
(other_address, None, None, hub_name, False),
|
||||||
(other_address, other_name, hub_address, False),
|
(other_address, other_name, hub_address, None, False),
|
||||||
(other_address, other_name, None, False),
|
(other_address, other_name, None, hub_name, False),
|
||||||
(zero_address, hub_name, hub_address, False),
|
(zero_address, hub_name, hub_address, None, False),
|
||||||
(zero_address, hub_name, None, False),
|
(zero_address, hub_name, None, hub_name, False),
|
||||||
(zero_address, None, hub_address, False),
|
(zero_address, None, hub_address, None, False),
|
||||||
(zero_address, None, None, False),
|
(zero_address, None, None, hub_name, False),
|
||||||
(zero_address, other_name, hub_address, False),
|
(zero_address, other_name, hub_address, None, False),
|
||||||
(zero_address, other_name, None, False),
|
(zero_address, other_name, None, hub_name, False),
|
||||||
]
|
]
|
||||||
|
|
||||||
for address, name, hub_mac, expected in test_matrix:
|
for address, name, hub_mac, fname, expected in test_matrix:
|
||||||
self.assertEqual(conn._is_device_matched(address=address, name=name, hub_mac=hub_mac), expected)
|
matched = conn._is_device_matched(address=address, dev_name=name, hub_mac=hub_mac, find_name=fname)
|
||||||
|
self.assertEqual(matched, expected)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user