mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Implemented tilt sensor
This commit is contained in:
parent
1619868af7
commit
6067e4f218
@ -11,6 +11,7 @@ Best way to start is to look at [demo.py](demo.py) file, and run it.
|
||||
- permanent Bluetooth connection server for faster debugging
|
||||
- angled and timed movement for motors
|
||||
- LED color change
|
||||
- sensor data subscribe/unsubscribe
|
||||
|
||||
## Usage
|
||||
|
||||
|
56
demo.py
56
demo.py
@ -1,18 +1,40 @@
|
||||
import logging
|
||||
import traceback
|
||||
from time import sleep
|
||||
|
||||
from pylgbst import MoveHub, COLORS, EncodedMotor, PORT_D
|
||||
from pylgbst.comms import DebugServerConnection, BLEConnection
|
||||
from pylgbst import *
|
||||
|
||||
log = logging.getLogger("demo")
|
||||
|
||||
|
||||
def demo_all(movehub):
|
||||
demo_led_colors(movehub)
|
||||
demo_motors_timed(movehub)
|
||||
demo_motors_angled(movehub)
|
||||
demo_port_cd_motor(movehub)
|
||||
def demo_tilt_sensor_simple(movehub):
|
||||
log.info("Tilt sensor simple test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 10
|
||||
|
||||
def callback(param1):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TILT_STATES[param1])
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_tilt_sensor_precise(movehub):
|
||||
log.info("Tilt sensor precise test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 100
|
||||
|
||||
def callback(param1, param2):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: %s %s", demo_tilt_sensor_simple.cnt, limit, param1, param2)
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback, mode=TILT_SENSOR_MODE_FULL)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_led_colors(movehub):
|
||||
@ -88,6 +110,15 @@ def vernie_head(movehub):
|
||||
sleep(2)
|
||||
|
||||
|
||||
def demo_all(movehub):
|
||||
demo_led_colors(movehub)
|
||||
demo_motors_timed(movehub)
|
||||
demo_motors_angled(movehub)
|
||||
demo_port_cd_motor(movehub)
|
||||
demo_tilt_sensor_simple(movehub)
|
||||
demo_tilt_sensor_precise(movehub)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
@ -98,9 +129,12 @@ if __name__ == '__main__':
|
||||
connection = BLEConnection().connect()
|
||||
|
||||
hub = MoveHub(connection)
|
||||
hub.tilt_sensor.subscribe(lambda: log.info("Tilt"))
|
||||
#demo_tilt_sensor_precise(hub)
|
||||
|
||||
demo_all(hub)
|
||||
|
||||
log.info("Sleeping 60s")
|
||||
sleep(60)
|
||||
# demo_all(hub)
|
||||
|
||||
# sleep(1)
|
||||
# hub.get_name()
|
||||
|
@ -88,7 +88,7 @@ class MoveHub(object):
|
||||
port = get_byte(data, 3)
|
||||
sensor = self.devices[port]
|
||||
if isinstance(sensor, TiltSensor):
|
||||
sensor.notify_subscribers()
|
||||
sensor.handle_notification(data)
|
||||
|
||||
def _handle_port_status(self, data):
|
||||
port = get_byte(data, 3)
|
||||
|
@ -64,6 +64,7 @@ MSG_SET_PORT_VAL = 0x81
|
||||
MSG_PORT_STATUS = 0x82
|
||||
MSG_SENSOR_SUBSCRIBE = 0x41
|
||||
MSG_SENSOR_DATA = 0x45
|
||||
MSG_SENSOR_SUBSCRIBE_ACK = 0x47
|
||||
|
||||
# NOTIFICATIONS
|
||||
TYPE_DISTANCE_COLOR_SENSOR = 0x25
|
||||
@ -90,12 +91,18 @@ STATUS_CONFLICT = 0x05
|
||||
STATUS_FINISHED = 0x0a
|
||||
|
||||
# TILT
|
||||
TILT_SENSOR_MODE_FULL = 0x00
|
||||
TILT_SENSOR_MODE_SOME1 = 0x01
|
||||
TILT_SENSOR_MODE_BASIC = 0x02
|
||||
TILT_SENSOR_MODE_OFF = 0x03
|
||||
|
||||
TILT_HORIZ = 0x00
|
||||
TILT_UP = 0x01
|
||||
TILT_DOWN = 0x02
|
||||
TILT_RIGHT = 0x03
|
||||
TILT_LEFT = 0x04
|
||||
TILT_INVERT = 0x05
|
||||
TILT_SOME = 0x09
|
||||
|
||||
TILT_STATES = {
|
||||
TILT_HORIZ: "BACK",
|
||||
@ -104,4 +111,5 @@ TILT_STATES = {
|
||||
TILT_RIGHT: "RIGHT",
|
||||
TILT_LEFT: "LEFT",
|
||||
TILT_INVERT: "FRONT",
|
||||
TILT_SOME: "SOME",
|
||||
}
|
||||
|
@ -1,8 +1,12 @@
|
||||
import logging
|
||||
import struct
|
||||
import time
|
||||
|
||||
from pylgbst import get_byte
|
||||
from pylgbst.constants import *
|
||||
|
||||
log = logging.getLogger('peripherals')
|
||||
|
||||
|
||||
class Peripheral(object):
|
||||
"""
|
||||
@ -42,7 +46,7 @@ class Peripheral(object):
|
||||
def finished(self):
|
||||
self.working = False
|
||||
|
||||
def notify_subscribers(self, *args, **kwargs):
|
||||
def _notify_subscribers(self, *args, **kwargs):
|
||||
for subscriber in self._subscribers:
|
||||
subscriber(*args, **kwargs)
|
||||
|
||||
@ -125,12 +129,35 @@ class ColorDistanceSensor(Peripheral):
|
||||
|
||||
|
||||
class TiltSensor(Peripheral):
|
||||
def subscribe(self, callback):
|
||||
#params = b'\x00\x01\x00\x00\x00\x01' # full
|
||||
params = b'\x02\x01\x00\x00\x00\x01' # basic
|
||||
self._subscribe_on_port(params)
|
||||
def _switch_mode(self, simple):
|
||||
self._subscribe_on_port(chr(simple) + b'\x01\x00\x00\x00\x01')
|
||||
|
||||
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC):
|
||||
if mode not in (TILT_SENSOR_MODE_BASIC, TILT_SENSOR_MODE_SOME1, TILT_SENSOR_MODE_FULL):
|
||||
raise ValueError("Wrong tilt sensor mode: 0x%x", mode)
|
||||
self._switch_mode(mode)
|
||||
self._subscribers.append(callback) # TODO: maybe join it into `_subscribe_on_port`
|
||||
|
||||
# 1b0e00 0a00 47 3a020100000001
|
||||
# 1b0e00 0a00 47 3a020100000001
|
||||
|
||||
# 1b0e000a00 47 3a030100000001 - sent finish?
|
||||
|
||||
def unsubscribe(self, callback):
|
||||
self._subscribers.remove(callback)
|
||||
if not self._subscribers:
|
||||
self._switch_mode(3)
|
||||
|
||||
def handle_notification(self, data):
|
||||
if len(data) == 5:
|
||||
state = get_byte(data, 4)
|
||||
self._notify_subscribers(state)
|
||||
elif len(data) == 6:
|
||||
# TODO: how to interpret these 2 bytes?
|
||||
self._notify_subscribers(get_byte(data, 4), get_byte(data, 5))
|
||||
else:
|
||||
log.warning("Unexpected length for tilt sensor data: %s", len(data))
|
||||
|
||||
|
||||
class Button(Peripheral):
|
||||
def __init__(self, parent):
|
||||
@ -147,6 +174,3 @@ LISTEN_ENCODER_ON_A = b' \x0a\x00 \x41\x37 \x02\x01\x00\x00\x00\x01'
|
||||
LISTEN_ENCODER_ON_B = b' \x0a\x00 \x41\x38 \x02\x01\x00\x00\x00\x01'
|
||||
LISTEN_ENCODER_ON_C = b' \x0a\x00 \x41\x01 \x02\x01\x00\x00\x00\x01'
|
||||
LISTEN_ENCODER_ON_D = b' \x0a\x00 \x41\x02 \x02\x01\x00\x00\x00\x01'
|
||||
|
||||
LISTEN_TILT_BASIC = b' \x0a\x00 \x41\x3a \x02\x01\x00\x00\x00\x01'
|
||||
LISTEN_TILT_FULL = b' \x0a\x00 \x41\x3a \x00\x01\x00\x00\x00\x01'
|
||||
|
20
tests.py
20
tests.py
@ -5,7 +5,7 @@ from threading import Thread
|
||||
|
||||
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB
|
||||
from pylgbst.comms import Connection, str2hex, hex2str
|
||||
from pylgbst.constants import PORT_LED
|
||||
from pylgbst.constants import PORT_LED, TILT_STATES
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@ -61,9 +61,11 @@ class HubMock(MoveHub):
|
||||
class GeneralTest(unittest.TestCase):
|
||||
def _wait_notifications_handled(self, hub):
|
||||
hub.connection.running = False
|
||||
for _ in range(1, 1000):
|
||||
time.sleep(0.1)
|
||||
for _ in range(1, 180):
|
||||
time.sleep(1)
|
||||
log.debug("Waiting for notifications to process...")
|
||||
if hub.connection.finished:
|
||||
log.debug("Done waiting")
|
||||
break
|
||||
|
||||
def test_led(self):
|
||||
@ -77,14 +79,18 @@ class GeneralTest(unittest.TestCase):
|
||||
hub.connection.notifications.append((14, '1b0e00 0f00 04 3a 0128000000000100000001'))
|
||||
time.sleep(1)
|
||||
|
||||
def callback():
|
||||
log.debug("Tilt")
|
||||
def callback(param1, param2=None, param3=None):
|
||||
if param2 is None:
|
||||
log.debug("Tilt: %s", TILT_STATES[param1])
|
||||
else:
|
||||
log.debug("Tilt: %s %s %s", param1, param2, param3)
|
||||
|
||||
hub.tilt_sensor.subscribe(callback)
|
||||
hub.connection.notifications.append((14, "1b0e000500453a05"))
|
||||
hub.connection.notifications.append((14, "1b0e000600453a04fe"))
|
||||
time.sleep(10)
|
||||
self.assertEquals("0a01413a000100000001", hub.connection.writes[0][1])
|
||||
time.sleep(1)
|
||||
self._wait_notifications_handled(hub)
|
||||
# self.assertEquals("0a01413a000100000001", hub.connection.writes[0][1])
|
||||
|
||||
def test_motor(self):
|
||||
conn = ConnectionMock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user