mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e7e571b012 | ||
|
f015d4b03b | ||
|
17ce398595 | ||
|
37c11c0682 | ||
|
35e3868a64 | ||
|
c73311528d | ||
|
afdbe4b2e0 |
20
README.md
20
README.md
@ -72,13 +72,13 @@ Depending on backend type, you might need Linux `sudo` to be used when running P
|
||||
### Bluetooth Connection Options
|
||||
There is an optional parameter for `MoveHub` class constructor, accepting instance of `Connection` object. By default, it will try to use whatever `get_connection_auto()` returns. You have several options to manually control that:
|
||||
|
||||
- use `pylgbst.get_connection_auto()` to attempt backend auto-choice, autodetect uses
|
||||
- use `BlueGigaConnection()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
|
||||
- use `GattConnection()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
|
||||
- use `GattoolConnection()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
|
||||
- use `GattLibConnection()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
|
||||
- use `BluepyConnection()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
|
||||
- use `BleakConnection()` - if you use Bleak backend (`bleak` library prerequisite)
|
||||
- use `get_connection_auto()` to attempt backend auto-detection
|
||||
- use `get_connection_bluegiga()` - if you use BlueGiga Adapter (`pygatt` library prerequisite)
|
||||
- use `get_connection_gatt()` - if you use Gatt Backend on Linux (`gatt` library prerequisite)
|
||||
- use `get_connection_gattool()` - if you use GattTool Backend on Linux (`pygatt` library prerequisite)
|
||||
- use `get_connection_gattlib()` - if you use GattLib Backend on Linux (`gattlib` library prerequisite)
|
||||
- use `get_connection_bluepy()` - if you use Bluepy backend on Linux/Raspbian (`bluepy` library prerequisite)
|
||||
- use `get_connection_bleak()` - if you use Bleak backend (`bleak` library prerequisite)
|
||||
- pass instance of `DebugServerConnection` if you are using [Debug Server](#debug-server) (more details below).
|
||||
|
||||
All the functions above have optional arguments to specify adapter name and Hub name (or mac address). Please take a look at functions source code for details.
|
||||
@ -86,11 +86,9 @@ All the functions above have optional arguments to specify adapter name and Hub
|
||||
If you want to specify name for Bluetooth interface to use on local computer, you can pass that to class or function of getting a connection. Then pass connection object to `MoveHub` constructor. Like this:
|
||||
```python
|
||||
from pylgbst.hub import MoveHub
|
||||
from pylgbst.comms.cgatt import GattConnection
|
||||
|
||||
conn = GattConnection("hci1")
|
||||
conn.connect() # you can pass Hub mac address as parameter here, like 'AA:BB:CC:DD:EE:FF'
|
||||
from pylgbst import get_connection_gatt
|
||||
|
||||
conn = get_connection_gatt(hub_mac='AA:BB:CC:DD:EE:FF')
|
||||
hub = MoveHub(conn)
|
||||
```
|
||||
|
||||
|
@ -246,12 +246,12 @@ def connection_from_url(url):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.basicConfig(level=logging.INFO, format='%(relativeCreated)d\t%(levelname)s\t%(name)s\t%(message)s')
|
||||
parser = get_options()
|
||||
options = parser.parse_args()
|
||||
parameters = {}
|
||||
try:
|
||||
connection = connection_from_url(options.connection)
|
||||
connection = connection_from_url(options.connection) # get_connection_bleak(hub_name=MoveHub.DEFAULT_NAME)
|
||||
parameters['connection'] = connection
|
||||
except ValueError as err:
|
||||
parser.error(err.args[0])
|
||||
|
@ -78,7 +78,7 @@ class BleakDriver(object):
|
||||
while not self._abort:
|
||||
if resp_queue.qsize() != 0:
|
||||
msg = resp_queue.get()
|
||||
self._handler(msg[0], msg[1])
|
||||
self._handler(msg[0], bytes(msg[1]))
|
||||
|
||||
time.sleep(0.01)
|
||||
logging.info("Processing thread has exited")
|
||||
@ -139,17 +139,21 @@ class BleakConnection(Connection):
|
||||
:return: None
|
||||
"""
|
||||
log.info("Discovering devices... Press green button on Hub")
|
||||
devices = await discover(timeout=10)
|
||||
log.debug("Devices: %s", devices)
|
||||
for i in range(0, 30):
|
||||
devices = await discover(timeout=1)
|
||||
log.debug("Devices: %s", devices)
|
||||
for dev in devices:
|
||||
log.debug(dev)
|
||||
address = dev.address
|
||||
name = dev.name
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
log.info('Device matched: %r', dev)
|
||||
self._device = dev
|
||||
break
|
||||
else:
|
||||
continue
|
||||
|
||||
for dev in devices:
|
||||
log.debug(dev)
|
||||
address = dev.address
|
||||
name = dev.name
|
||||
if self._is_device_matched(address, name, hub_mac, hub_name):
|
||||
log.info('Device matched: %r', dev)
|
||||
self._device = dev
|
||||
break
|
||||
break
|
||||
else:
|
||||
raise ConnectionError('Device not found.')
|
||||
|
||||
|
@ -137,7 +137,7 @@ class Hub(object):
|
||||
if dev_type in PERIPHERAL_TYPES:
|
||||
self.peripherals[port] = PERIPHERAL_TYPES[dev_type](self, port)
|
||||
else:
|
||||
log.warning("Have not dedicated class for peripheral type 0x%x on port 0x%x", dev_type, port)
|
||||
log.warning("Have not dedicated class for peripheral type %x on port %x", dev_type, port)
|
||||
self.peripherals[port] = Peripheral(self, port)
|
||||
|
||||
log.info("Attached peripheral: %s", self.peripherals[msg.port])
|
||||
@ -184,6 +184,8 @@ class MoveHub(Hub):
|
||||
:type motor_external: EncodedMotor
|
||||
"""
|
||||
|
||||
DEFAULT_NAME = "LEGO Move Hub"
|
||||
|
||||
# PORTS
|
||||
PORT_A = 0x00
|
||||
PORT_B = 0x01
|
||||
@ -197,8 +199,9 @@ class MoveHub(Hub):
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
def __init__(self, connection=None):
|
||||
self._comm_lock = threading.RLock()
|
||||
if connection is None:
|
||||
connection = get_connection_auto(hub_name="LEGO Move Hub")
|
||||
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
|
||||
|
||||
super(MoveHub, self).__init__(connection)
|
||||
self.info = {}
|
||||
@ -250,36 +253,40 @@ class MoveHub(Hub):
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
def _handle_device_change(self, msg):
|
||||
super(MoveHub, self)._handle_device_change(msg)
|
||||
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
|
||||
port = msg.port
|
||||
if port == self.PORT_A:
|
||||
self.motor_A = self.peripherals[port]
|
||||
elif port == self.PORT_B:
|
||||
self.motor_B = self.peripherals[port]
|
||||
elif port == self.PORT_AB:
|
||||
self.motor_AB = self.peripherals[port]
|
||||
elif port == self.PORT_C:
|
||||
self.port_C = self.peripherals[port]
|
||||
elif port == self.PORT_D:
|
||||
self.port_D = self.peripherals[port]
|
||||
elif port == self.PORT_LED:
|
||||
self.led = self.peripherals[port]
|
||||
elif port == self.PORT_TILT_SENSOR:
|
||||
self.tilt_sensor = self.peripherals[port]
|
||||
elif port == self.PORT_CURRENT:
|
||||
self.current = self.peripherals[port]
|
||||
elif port == self.PORT_VOLTAGE:
|
||||
self.voltage = self.peripherals[port]
|
||||
with self._comm_lock:
|
||||
super(MoveHub, self)._handle_device_change(msg)
|
||||
if isinstance(msg, MsgHubAttachedIO) and msg.event != MsgHubAttachedIO.EVENT_DETACHED:
|
||||
port = msg.port
|
||||
if port == self.PORT_A:
|
||||
self.motor_A = self.peripherals[port]
|
||||
elif port == self.PORT_B:
|
||||
self.motor_B = self.peripherals[port]
|
||||
elif port == self.PORT_AB:
|
||||
self.motor_AB = self.peripherals[port]
|
||||
elif port == self.PORT_C:
|
||||
self.port_C = self.peripherals[port]
|
||||
elif port == self.PORT_D:
|
||||
self.port_D = self.peripherals[port]
|
||||
elif port == self.PORT_LED:
|
||||
self.led = self.peripherals[port]
|
||||
elif port == self.PORT_TILT_SENSOR:
|
||||
self.tilt_sensor = self.peripherals[port]
|
||||
elif port == self.PORT_CURRENT:
|
||||
self.current = self.peripherals[port]
|
||||
elif port == self.PORT_VOLTAGE:
|
||||
self.voltage = self.peripherals[port]
|
||||
|
||||
if type(self.peripherals[port]) == VisionSensor:
|
||||
self.vision_sensor = self.peripherals[port]
|
||||
elif type(self.peripherals[port]) == EncodedMotor and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
||||
self.motor_external = self.peripherals[port]
|
||||
if type(self.peripherals[port]) == VisionSensor:
|
||||
self.vision_sensor = self.peripherals[port]
|
||||
elif type(self.peripherals[port]) == EncodedMotor \
|
||||
and port not in (self.PORT_A, self.PORT_B, self.PORT_AB):
|
||||
self.motor_external = self.peripherals[port]
|
||||
|
||||
|
||||
class TrainHub(Hub):
|
||||
DEFAULT_NAME = 'TrainHub'
|
||||
|
||||
def __init__(self, connection=None):
|
||||
if connection is None:
|
||||
connection = get_connection_auto(hub_name='TrainHub')
|
||||
connection = get_connection_auto(hub_name=self.DEFAULT_NAME)
|
||||
super(TrainHub, self).__init__(connection)
|
||||
|
@ -45,7 +45,7 @@ class BleakDriverTest(unittest.TestCase):
|
||||
driver.write(handle, data)
|
||||
time.sleep(0.5) # processing time
|
||||
self.assertEqual(handle, last_response[0], 'Verifying response handle')
|
||||
self.assertEqual(data, last_response[1], 'Verifying response data')
|
||||
self.assertEqual(bytes(data), last_response[1], 'Verifying response data')
|
||||
|
||||
driver.disconnect()
|
||||
time.sleep(0.5) # processing time
|
||||
|
Loading…
x
Reference in New Issue
Block a user