1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

refactoring

This commit is contained in:
Andrey Pohilko 2017-09-23 10:58:27 +03:00
parent b77831f225
commit a5b0a567df
3 changed files with 40 additions and 38 deletions

View File

@ -10,14 +10,18 @@ else:
queue = queue # just to use it
def str2hex(data): # TODO: eliminate it
def str2hex(data):
return binascii.hexlify(data).decode("utf8")
def get_byte(seq, index):
def usbyte(seq, index):
return struct.unpack("<B", seq[index:index + 1])[0]
def ushort(seq, index):
return struct.unpack("<H", seq[index:index + 2])[0]
# GENERAL
MOVE_HUB_HARDWARE_HANDLE = 0x0E
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'

View File

@ -84,7 +84,7 @@ class MoveHub(object):
data = data[3:]
log.debug("Notification on %s: %s", handle, str2hex(orig))
msg_type = get_byte(data, 2)
msg_type = usbyte(data, 2)
if msg_type == MSG_PORT_INFO:
self._handle_port_info(data)
@ -93,7 +93,7 @@ class MoveHub(object):
elif msg_type == MSG_SENSOR_DATA:
self._handle_sensor_data(data)
elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK:
port = get_byte(data, 3)
port = usbyte(data, 3)
log.debug("Sensor subscribe ack on port %s", PORTS[port])
self.devices[port].finished()
elif msg_type == MSG_PORT_CMD_ERROR:
@ -107,13 +107,13 @@ class MoveHub(object):
log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig))
def _handle_device_info(self, data):
if get_byte(data, 3) == 2:
if usbyte(data, 3) == 2:
self.button.handle_port_data(data)
else:
log.warning("Unhandled device info: %s", str2hex(data))
def _handle_sensor_data(self, data):
port = get_byte(data, 3)
port = usbyte(data, 3)
if port not in self.devices:
log.warning("Notification on port with no device: %s", PORTS[port])
return
@ -122,8 +122,8 @@ class MoveHub(object):
device.queue_port_data(data)
def _handle_port_status(self, data):
port = get_byte(data, 3)
status = get_byte(data, 4)
port = usbyte(data, 3)
status = usbyte(data, 4)
if status == STATUS_STARTED:
self.devices[port].started()
@ -135,14 +135,14 @@ class MoveHub(object):
log.warning("Unhandled status value: 0x%x", status)
def _handle_port_info(self, data):
port = get_byte(data, 3)
status = get_byte(data, 4)
port = usbyte(data, 3)
status = usbyte(data, 4)
if status == self.DEV_STATUS_DETACHED:
log.info("Detached %s", self.devices[port])
self.devices[port] = None
elif status == self.DEV_STATUS_DEVICE or status == self.DEV_STATUS_GROUP:
dev_type = get_byte(data, 5)
dev_type = usbyte(data, 5)
self._attach_device(dev_type, port)
else:
raise ValueError("Unhandled device status: %s", status)

View File

@ -167,8 +167,6 @@ class EncodedMotor(Peripheral):
relative = 1
absolute = round(relative * 100) # scale of 100 is proven by experiments
if absolute < 0:
absolute += 255
return int(absolute)
def _wrap_and_write(self, mtype, params, speed_primary, speed_secondary):
@ -177,9 +175,9 @@ class EncodedMotor(Peripheral):
params = pack("<B", self.MOVEMENT_TYPE) + pack("<B", mtype) + params
params += pack("<B", self._speed_abs(speed_primary))
params += pack("<b", self._speed_abs(speed_primary))
if self.port == PORT_AB:
params += pack("<B", self._speed_abs(speed_secondary))
params += pack("<b", self._speed_abs(speed_secondary))
params += self.TRAILER
@ -235,7 +233,7 @@ class EncodedMotor(Peripheral):
self._notify_subscribers(rotation)
elif self._port_subscription_mode == self.SENSOR_SOMETHING1:
# TODO: understand what it means
rotation = unpack("<B", data[4])[0]
rotation = usbyte(data, 4)
self._notify_subscribers(rotation)
elif self._port_subscription_mode == self.SENSOR_SPEED:
rotation = unpack("<b", data[4])[0]
@ -289,22 +287,22 @@ class TiltSensor(Peripheral):
def handle_port_data(self, data):
if self._port_subscription_mode == self.MODE_3AXIS_SIMPLE:
state = unpack("<B", data[4:5])[0]
state = usbyte(data, 4)
self._notify_subscribers(state)
elif self._port_subscription_mode == self.MODE_2AXIS_SIMPLE:
state = unpack("<B", data[4:5])[0]
state = usbyte(data, 4)
self._notify_subscribers(state)
elif self._port_subscription_mode == self.MODE_BUMP_COUNT:
bump_count = unpack("<H", data[4:6])[0]
bump_count = ushort(data, 4)
self._notify_subscribers(bump_count)
elif self._port_subscription_mode == self.MODE_2AXIS_FULL:
roll = self._byte2deg(unpack("<B", data[4:5])[0])
pitch = self._byte2deg(unpack("<B", data[5:6])[0])
roll = self._byte2deg(usbyte(data, 4))
pitch = self._byte2deg(usbyte(data, 5))
self._notify_subscribers(roll, pitch)
elif self._port_subscription_mode == self.MODE_3AXIS_FULL:
roll = self._byte2deg(unpack("<B", data[4:5])[0])
pitch = self._byte2deg(unpack("<B", data[5:6])[0])
yaw = self._byte2deg(unpack("<B", data[6:7])[0]) # did I get the order right?
roll = self._byte2deg(usbyte(data, 4))
pitch = self._byte2deg(usbyte(data, 5))
yaw = self._byte2deg(usbyte(data, 6)) # did I get the order right?
self._notify_subscribers(roll, pitch, yaw)
else:
log.debug("Got tilt sensor data while in unexpected mode: %s", self._port_subscription_mode)
@ -337,23 +335,23 @@ class ColorDistanceSensor(Peripheral):
def handle_port_data(self, data):
if self._port_subscription_mode == self.COLOR_DISTANCE_FLOAT:
color = unpack("<B", data[4:5])[0]
distance = unpack("<B", data[5:6])[0]
partial = unpack("<B", data[7:8])[0]
color = usbyte(data, 4)
distance = usbyte(data, 5)
partial = usbyte(data, 7)
if partial:
distance += 1.0 / partial
self._notify_subscribers(color, float(distance))
elif self._port_subscription_mode == self.COLOR_ONLY:
color = unpack("<B", data[4:5])[0]
color = usbyte(data, 4)
self._notify_subscribers(color)
elif self._port_subscription_mode == self.DISTANCE_INCHES:
distance = unpack("<B", data[4:5])[0]
distance = usbyte(data, 4)
self._notify_subscribers(distance)
elif self._port_subscription_mode == self.DISTANCE_HOW_CLOSE:
distance = unpack("<B", data[4:5])[0]
distance = usbyte(data, 4)
self._notify_subscribers(distance)
elif self._port_subscription_mode == self.DISTANCE_SUBINCH_HOW_CLOSE:
distance = unpack("<B", data[4:5])[0]
distance = usbyte(data, 4)
self._notify_subscribers(distance)
elif self._port_subscription_mode == self.OFF1 or self._port_subscription_mode == self.OFF2:
log.info("Turned off led on %s", self)
@ -362,12 +360,12 @@ class ColorDistanceSensor(Peripheral):
self._notify_subscribers(count)
elif self._port_subscription_mode == self.STREAM_3_VALUES:
# TODO: understand better meaning of these 3 values
val1 = unpack("<H", data[4:6])[0]
val2 = unpack("<H", data[6:8])[0]
val3 = unpack("<H", data[8:10])[0]
val1 = ushort(data, 4)
val2 = ushort(data, 6)
val3 = ushort(data, 8)
self._notify_subscribers(val1, val2, val3)
elif self._port_subscription_mode == self.LUMINOSITY:
luminosity = unpack("<H", data[4:6])[0] / 1023.0
luminosity = ushort(data, 4) / 1023.0
self._notify_subscribers(luminosity)
else: # TODO: support whatever we forgot
log.debug("Unhandled data in mode %s: %s", self._port_subscription_mode, str2hex(data))
@ -390,7 +388,7 @@ class Voltage(Peripheral):
# good 7.5v ~= 3892
# liion 5v ~= 2100
def handle_port_data(self, data):
val = unpack("<H", data[4:6])[0]
val = ushort(data, 4)
self.last_value = val / 4096.0
if self.last_value < 0.2:
logging.warning("Battery low! %s%%", int(100 * self.last_value))
@ -409,7 +407,7 @@ class Amperage(Peripheral):
super(Amperage, self).subscribe(callback, mode, granularity)
def handle_port_data(self, data):
val = unpack("<H", data[4:6])[0]
val = ushort(data, 4)
self.last_value = val / 4096.0
self._notify_subscribers(self.last_value)
@ -440,7 +438,7 @@ class Button(Peripheral):
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, pack("<B", len(cmd) + 1) + cmd)
def handle_port_data(self, data):
param = unpack("<B", data[5:6])[0]
param = usbyte(data, 5)
if self.in_progress():
self.finished()
self._notify_subscribers(bool(param))