1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Compare commits

..

No commits in common. "master" and "1.2.0" have entirely different histories.

5 changed files with 53 additions and 62 deletions

View File

@ -72,13 +72,13 @@ Depending on backend type, you might need Linux `sudo` to be used when running P
### Bluetooth Connection Options
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
- use `get_connection_auto()` to attempt backend auto-detection
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
- use `BleakConnection()` - if you use Bleak backend (`bleak` library prerequisite)
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
@ -86,9 +86,11 @@ All the functions above have optional arguments to specify adapter name and Hub
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
```python
from pylgbst.hub import MoveHub
from pylgbst import get_connection_gatt
from pylgbst.comms.cgatt import GattConnection
conn = GattConnection("hci1")
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
hub = MoveHub(conn)
```

View File

@ -246,12 +246,12 @@ def connection_from_url(url):
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
logging.basicConfig(level=logging.INFO)
parser = get_options()
options = parser.parse_args()
parameters = {}
try:
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
connection = connection_from_url(options.connection)
parameters['connection'] = connection
except ValueError as err:
parser.error(err.args[0])

View File

@ -78,7 +78,7 @@ class BleakDriver(object):
while not self._abort:
if resp_queue.qsize() != 0:
msg = resp_queue.get()
self._handler(msg[0], bytes(msg[1]))
self._handler(msg[0], msg[1])
time.sleep(0.01)
logging.info("Processing thread has exited")
@ -139,9 +139,9 @@ class BleakConnection(Connection):
:return: None
"""
log.info("Discovering devices... Press green button on Hub")
for i in range(0, 30):
devices = await discover(timeout=1)
devices = await discover(timeout=10)
log.debug("Devices: %s", devices)
for dev in devices:
log.debug(dev)
address = dev.address
@ -151,10 +151,6 @@ class BleakConnection(Connection):
self._device = dev
break
else:
continue
break
else:
raise ConnectionError('Device not found.')
self._client = BleakClient(self._device.address, self.loop)

View File

@ -137,7 +137,7 @@ class Hub(object):
if dev_type in PERIPHERAL_TYPES:
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
else:
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
self.peripherals[port] = Peripheral(self, port)
log.info("Attached peripheral: %s", self.peripherals[msg.port])
@ -184,8 +184,6 @@ class MoveHub(Hub):
:type motor_external: EncodedMotor
"""
DEFAULT_NAME = "LEGO Move Hub"
# PORTS
PORT_A = 0x00
PORT_B = 0x01
@ -199,9 +197,8 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def __init__(self, connection=None):
self._comm_lock = threading.RLock()
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
connection = get_connection_auto(hub_name="LEGO Move Hub")
super(MoveHub, self).__init__(connection)
self.info = {}
@ -253,7 +250,6 @@ class MoveHub(Hub):
# noinspection PyTypeChecker
def _handle_device_change(self, msg):
with self._comm_lock:
super(MoveHub, self)._handle_device_change(msg)
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
port = msg.port
@ -278,15 +274,12 @@ class MoveHub(Hub):
if type(self.peripherals[port]) == VisionSensor:
self.vision_sensor = self.peripherals[port]
elif type(self.peripherals[port]) == EncodedMotor \
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
self.motor_external = self.peripherals[port]
class TrainHub(Hub):
DEFAULT_NAME = 'TrainHub'
def __init__(self, connection=None):
if connection is None:
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
connection = get_connection_auto(hub_name='TrainHub')
super(TrainHub, self).__init__(connection)

View File

@ -45,7 +45,7 @@ class BleakDriverTest(unittest.TestCase):
driver.write(handle, data)
time.sleep(0.5) # processing time
self.assertEqual(handle, last_response[0], 'Verifying response handle')
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
self.assertEqual(data, last_response[1], 'Verifying response data')
driver.disconnect()
time.sleep(0.5) # processing time