1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Compare commits

...

7 Commits

Author SHA1 Message Date
Andrey Pokhilko
e7e571b012 Fix UT 2020-09-20 11:06:59 +03:00
Dmitry Fink
f015d4b03b
Fix to work with Bleak on a mac (#78)
Sometimes the data returnes is not byte/bytes, but native objective-c class _NSInlineData.
Seems to be a bleak bug, just convert it to bytes as workaround for now.

Co-authored-by: Dmitry Fink <finik@dishero.com>
2020-09-20 11:04:39 +03:00
Andrey Pokhilko
17ce398595
Update README.md 2020-09-14 10:18:24 +03:00
Dmitry Fink
37c11c0682
Update README.md (#77)
Fix documentation, describe proper way to launch with custom connection
2020-09-14 10:17:48 +03:00
Andrey Pokhilko
35e3868a64 Add lock for device detects 2020-08-12 09:34:33 +03:00
Andrey Pokhilko
c73311528d Cosmetics around bleak 2020-07-04 09:29:55 +03:00
Andrey Pokhilko
afdbe4b2e0 Improve demo logging 2020-06-30 08:47:55 +03:00
5 changed files with 62 additions and 53 deletions

View File

@ -72,13 +72,13 @@ Depending on backend type, you might need Linux `sudo` to be used when running P
### Bluetooth Connection Options
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `BleakConnection()` - if you use Bleak backend (`bleak` library prerequisite)
- use `get_connection_auto()` to attempt backend auto-detection
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
@ -86,11 +86,9 @@ All the functions above have optional arguments to specify adapter name and Hub
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst.hub import MoveHub
from pylgbst.comms.cgatt import GattConnection
conn = GattConnection("hci1")
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
from pylgbst import get_connection_gatt
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
hub = MoveHub(conn)
```

View File

@ -246,12 +246,12 @@ def connection_from_url(url):
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection)
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
parameters['connection'] = connection
except ValueError as err:
parser.error(err.args[0])

View File

@ -78,7 +78,7 @@ class BleakDriver(object):
while not self._abort:
if resp_queue.qsize() != 0:
msg = resp_queue.get()
self._handler(msg[0], msg[1])
self._handler(msg[0], bytes(msg[1]))
time.sleep(0.01)
logging.info("Processing thread has exited")
@ -139,17 +139,21 @@ class BleakConnection(Connection):
:return: None
"""
log.info("Discovering devices... Press green button on Hub")
devices = await discover(timeout=10)
log.debug("Devices: %s", devices)
for i in range(0, 30):
devices = await discover(timeout=1)
log.debug("Devices: %s", devices)
for dev in devices:
log.debug(dev)
address = dev.address
name = dev.name
if self._is_device_matched(address, name, hub_mac, hub_name):
log.info('Device matched: %r', dev)
self._device = dev
break
else:
continue
for dev in devices:
log.debug(dev)
address = dev.address
name = dev.name
if self._is_device_matched(address, name, hub_mac, hub_name):
log.info('Device matched: %r', dev)
self._device = dev
break
break
else:
raise ConnectionError('Device not found.')

View File

@ -137,7 +137,7 @@ class Hub(object):
if dev_type in PERIPHERAL_TYPES:
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
else:
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
self.peripherals[port] = Peripheral(self, port)
log.info("Attached peripheral: %s", self.peripherals[msg.port])
@ -184,6 +184,8 @@ class MoveHub(Hub):
:type motor_external: EncodedMotor
"""
DEFAULT_NAME = "LEGO Move Hub"
# PORTS
PORT_A = 0x00
PORT_B = 0x01
@ -197,8 +199,9 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def __init__(self, connection=None):
self._comm_lock = threading.RLock()
if connection is None:
connection = get_connection_auto(hub_name="LEGO Move Hub")
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(MoveHub, self).__init__(connection)
self.info = {}
@ -250,36 +253,40 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def _handle_device_change(self, msg):
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
with self._comm_lock:
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
if port == self.PORT_A:
self.motor_A = self.peripherals[port]
elif port == self.PORT_B:
self.motor_B = self.peripherals[port]
elif port == self.PORT_AB:
self.motor_AB = self.peripherals[port]
elif port == self.PORT_C:
self.port_C = self.peripherals[port]
elif port == self.PORT_D:
self.port_D = self.peripherals[port]
elif port == self.PORT_LED:
self.led = self.peripherals[port]
elif port == self.PORT_TILT_SENSOR:
self.tilt_sensor = self.peripherals[port]
elif port == self.PORT_CURRENT:
self.current = self.peripherals[port]
elif port == self.PORT_VOLTAGE:
self.voltage = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor \
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
class TrainHub(Hub):
DEFAULT_NAME = 'TrainHub'
def __init__(self, connection=None):
if connection is None:
connection = get_connection_auto(hub_name='TrainHub')
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
super(TrainHub, self).__init__(connection)

View File

@ -45,7 +45,7 @@ class BleakDriverTest(unittest.TestCase):
driver.write(handle, data)
time.sleep(0.5) # processing time
self.assertEqual(handle, last_response[0], 'Verifying response handle')
self.assertEqual(data, last_response[1], 'Verifying response data')
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
driver.disconnect()
time.sleep(0.5) # processing time