1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Deeper tilt sensor support

This commit is contained in:
Andrey Pohilko 2017-09-14 16:40:57 +03:00
parent fc30bf2c1d
commit eeb1891cb6
6 changed files with 45 additions and 28 deletions

View File

@ -26,9 +26,9 @@ def demo_tilt_sensor_precise(movehub):
demo_tilt_sensor_simple.cnt = 0 demo_tilt_sensor_simple.cnt = 0
limit = 100 limit = 100
def callback(param1, param2): def callback(pitch, roll, yaw):
demo_tilt_sensor_simple.cnt += 1 demo_tilt_sensor_simple.cnt += 1
log.info("Tilt #%s of %s: %s %s", demo_tilt_sensor_simple.cnt, limit, param1, param2) log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw)
movehub.tilt_sensor.subscribe(callback, mode=TILT_SENSOR_MODE_FULL) movehub.tilt_sensor.subscribe(callback, mode=TILT_SENSOR_MODE_FULL)
while demo_tilt_sensor_simple.cnt < limit: while demo_tilt_sensor_simple.cnt < limit:
@ -129,9 +129,9 @@ if __name__ == '__main__':
connection = BLEConnection().connect() connection = BLEConnection().connect()
hub = MoveHub(connection) hub = MoveHub(connection)
#demo_tilt_sensor_precise(hub) demo_tilt_sensor_precise(hub)
demo_all(hub) # demo_all(hub)
log.info("Sleeping 60s") log.info("Sleeping 60s")
sleep(60) sleep(60)

View File

@ -81,6 +81,8 @@ class MoveHub(object):
self._handle_sensor_data(data) self._handle_sensor_data(data)
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK: elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)]) log.debug("Sensor subscribe ack on port %s", PORTS[get_byte(data, 3)])
elif msg_type == MSG_PORT_CMD_ERROR:
log.warning("Command error: %s", str2hex(data[3:]))
else: else:
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig)) log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
else: else:
@ -88,6 +90,9 @@ class MoveHub(object):
def _handle_sensor_data(self, data): def _handle_sensor_data(self, data):
port = get_byte(data, 3) port = get_byte(data, 3)
if port not in self.devices:
log.warning("Notification on port with no device: %s", PORTS[port])
return
sensor = self.devices[port] sensor = self.devices[port]
if isinstance(sensor, TiltSensor): if isinstance(sensor, TiltSensor):
sensor.handle_notification(data) sensor.handle_notification(data)

View File

@ -48,7 +48,7 @@ else:
def int2byte(val): def int2byte(val):
return bytes(val, ) return bytes((val,))
# noinspection PyMethodOverriding # noinspection PyMethodOverriding

View File

@ -59,10 +59,14 @@ PORTS = {
} }
# PACKET TYPES # PACKET TYPES
MSG_DEVICE_INFO = 0x01
MSG_PING_RESPONSE = 0x03
MSG_PORT_INFO = 0x04 MSG_PORT_INFO = 0x04
MSG_PORT_CMD_ERROR = 0x05
MSG_SET_PORT_VAL = 0x81 MSG_SET_PORT_VAL = 0x81
MSG_PORT_STATUS = 0x82 MSG_PORT_STATUS = 0x82
MSG_SENSOR_SUBSCRIBE = 0x41 MSG_SENSOR_SUBSCRIBE = 0x41
#MSG_SENSOR_UNSUBSCRIBE = 0x42
MSG_SENSOR_DATA = 0x45 MSG_SENSOR_DATA = 0x45
MSG_SENSOR_SUBSCRIBE_ACK = 0x47 MSG_SENSOR_SUBSCRIBE_ACK = 0x47
@ -91,10 +95,11 @@ STATUS_CONFLICT = 0x05
STATUS_FINISHED = 0x0a STATUS_FINISHED = 0x0a
# TILT # TILT
TILT_SENSOR_MODE_FULL = 0x00 TILT_SENSOR_MODE_2AXIS_FULL = 0x00
TILT_SENSOR_MODE_2AXIS = 0x01 TILT_SENSOR_MODE_2AXIS_SIMPLE = 0x01
TILT_SENSOR_MODE_BASIC = 0x02 TILT_SENSOR_MODE_BASIC = 0x02
TILT_SENSOR_MODE_OFF = 0x03 TILT_SENSOR_MODE_BUMP = 0x03
TILT_SENSOR_MODE_FULL = 0x04
TILT_HORIZONTAL = 0x00 TILT_HORIZONTAL = 0x00
TILT_UP = 0x01 TILT_UP = 0x01

View File

@ -126,38 +126,45 @@ class EncodedMotor(Peripheral):
class TiltSensor(Peripheral): class TiltSensor(Peripheral):
TRAILER = b'\x01\x00\x00\x00'
def __init__(self, parent, port): def __init__(self, parent, port):
super(TiltSensor, self).__init__(parent, port) super(TiltSensor, self).__init__(parent, port)
self.mode = TILT_SENSOR_MODE_OFF self.mode = None
def _switch_mode(self, mode):
self.mode = mode
self._subscribe_on_port(int2byte(mode) + b'\x01\x00\x00\x00\x01')
def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC): def subscribe(self, callback, mode=TILT_SENSOR_MODE_BASIC):
if mode not in (TILT_SENSOR_MODE_BASIC, TILT_SENSOR_MODE_2AXIS, TILT_SENSOR_MODE_FULL): # TODO: check input for valid mode
raise ValueError("Wrong tilt sensor mode: 0x%x", mode) self.mode = mode
self._subscribe_on_port(int2byte(self.mode) + self.TRAILER + int2byte(1))
self._switch_mode(mode) self._subscribers.add(callback)
self._subscribers.add(callback) # TODO: maybe join it into `_subscribe_on_port`
def unsubscribe(self, callback): def unsubscribe(self, callback):
self._subscribers.remove(callback) self._subscribers.remove(callback)
if not self._subscribers: if not self._subscribers:
self._switch_mode(TILT_SENSOR_MODE_OFF) self._subscribe_on_port(int2byte(self.mode) + self.TRAILER + int2byte(0))
def handle_notification(self, data): def handle_notification(self, data):
if self.mode == TILT_SENSOR_MODE_BASIC: if self.mode == TILT_SENSOR_MODE_BASIC:
self._notify_subscribers(get_byte(data, 4)) state = get_byte(data, 4)
elif self.mode == TILT_SENSOR_MODE_FULL: self._notify_subscribers(state)
elif self.mode == TILT_SENSOR_MODE_2AXIS_SIMPLE:
# TODO: figure out right interpreting of this
state = get_byte(data, 4)
self._notify_subscribers(state)
elif self.mode == TILT_SENSOR_MODE_BUMP:
bump_count = get_byte(data, 4)
self._notify_subscribers(bump_count)
elif self.mode == TILT_SENSOR_MODE_2AXIS_FULL:
roll = self._byte2deg(get_byte(data, 4)) roll = self._byte2deg(get_byte(data, 4))
pitch = self._byte2deg(get_byte(data, 5)) pitch = self._byte2deg(get_byte(data, 5))
self._notify_subscribers(roll, pitch) self._notify_subscribers(roll, pitch)
elif self.mode == TILT_SENSOR_MODE_2AXIS: elif self.mode == TILT_SENSOR_MODE_FULL:
# TODO: figure out right interpreting of this roll = self._byte2deg(get_byte(data, 4))
self._notify_subscribers(get_byte(data, 4)) pitch = self._byte2deg(get_byte(data, 5))
yaw = self._byte2deg(get_byte(data, 6)) # did I get the order right?
self._notify_subscribers(roll, pitch, yaw)
else: else:
log.debug("Got tilt sensor data while in finished mode: %s", self.mode) log.debug("Got tilt sensor data while in unexpected mode: %s", self.mode)
def _byte2deg(self, val): def _byte2deg(self, val):
if val > 90: if val > 90:

View File

@ -5,7 +5,7 @@ from threading import Thread
from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB from pylgbst import MoveHub, COLOR_RED, LED, EncodedMotor, PORT_AB
from pylgbst.comms import Connection, str2hex, hex2str from pylgbst.comms import Connection, str2hex, hex2str
from pylgbst.constants import PORT_LED, TILT_STATES, TILT_SENSOR_MODE_FULL, TILT_SENSOR_MODE_2AXIS, \ from pylgbst.constants import PORT_LED, TILT_STATES, TILT_SENSOR_MODE_2AXIS_FULL, TILT_SENSOR_MODE_2AXIS_SIMPLE, \
MOVE_HUB_HARDWARE_HANDLE MOVE_HUB_HARDWARE_HANDLE
HANDLE = MOVE_HUB_HARDWARE_HANDLE HANDLE = MOVE_HUB_HARDWARE_HANDLE
@ -92,12 +92,12 @@ class GeneralTest(unittest.TestCase):
hub.connection.notifications.append((HANDLE, "1b0e000500453a05")) hub.connection.notifications.append((HANDLE, "1b0e000500453a05"))
hub.connection.notifications.append((HANDLE, "1b0e000a00473a010100000001")) hub.connection.notifications.append((HANDLE, "1b0e000a00473a010100000001"))
time.sleep(1) time.sleep(1)
hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_2AXIS) hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_2AXIS_SIMPLE)
hub.connection.notifications.append((HANDLE, "1b0e000500453a09")) hub.connection.notifications.append((HANDLE, "1b0e000500453a09"))
time.sleep(1) time.sleep(1)
hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_FULL) hub.tilt_sensor.subscribe(callback, TILT_SENSOR_MODE_2AXIS_FULL)
hub.connection.notifications.append((HANDLE, "1b0e000600453a04fe")) hub.connection.notifications.append((HANDLE, "1b0e000600453a04fe"))
time.sleep(1) time.sleep(1)