mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Try using our own thing
This commit is contained in:
parent
9400d881e0
commit
abd502b70c
@ -25,12 +25,15 @@ for device in hub.devices:
|
||||
print(device)
|
||||
```
|
||||
|
||||
## Debug Server
|
||||
|
||||
`sudo python -c "from pylgbst.comms import *; import logging; logging.basicConfig(level=logging.DEBUG); DebugServer(BLEConnection().connect()).start()"`
|
||||
|
||||
## Roadmap
|
||||
|
||||
- Make it 2/3 compatible
|
||||
- Add travis unit tests and coverage
|
||||
- Give nice documentation examples, don't forget to mention logging
|
||||
- make angled motors to be synchronous by default
|
||||
- make angled motors to be synchronous by default => 3-state status
|
||||
- make sure unit tests cover all important code
|
||||
|
||||
## Links
|
||||
|
||||
|
80
demo.py
80
demo.py
@ -5,38 +5,6 @@ from pylgbst import *
|
||||
log = logging.getLogger("demo")
|
||||
|
||||
|
||||
def demo_tilt_sensor_simple(movehub):
|
||||
log.info("Tilt sensor simple test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 10
|
||||
|
||||
def callback(param1):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TILT_STATES[param1])
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_tilt_sensor_precise(movehub):
|
||||
log.info("Tilt sensor precise test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 50
|
||||
|
||||
def callback(pitch, roll, yaw):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw)
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback, mode=TILT_MODE_FULL)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_led_colors(movehub):
|
||||
# LED colors demo
|
||||
log.info("LED colors demo")
|
||||
@ -96,18 +64,36 @@ def demo_port_cd_motor(movehub):
|
||||
sleep(1)
|
||||
|
||||
|
||||
def vernie_head(movehub):
|
||||
portd = EncodedMotor(movehub, PORT_D)
|
||||
while True:
|
||||
angle = 20
|
||||
portd.angled(angle, 0.2)
|
||||
sleep(2)
|
||||
portd.angled(angle, -0.2)
|
||||
sleep(2)
|
||||
portd.angled(angle, -0.2)
|
||||
sleep(2)
|
||||
portd.angled(angle, 0.2)
|
||||
sleep(2)
|
||||
def demo_tilt_sensor_simple(movehub):
|
||||
log.info("Tilt sensor simple test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 10
|
||||
|
||||
def callback(param1):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TILT_STATES[param1])
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_tilt_sensor_precise(movehub):
|
||||
log.info("Tilt sensor precise test. Turn device in different ways.")
|
||||
demo_tilt_sensor_simple.cnt = 0
|
||||
limit = 50
|
||||
|
||||
def callback(pitch, roll, yaw):
|
||||
demo_tilt_sensor_simple.cnt += 1
|
||||
log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw)
|
||||
|
||||
movehub.tilt_sensor.subscribe(callback, mode=TILT_MODE_FULL)
|
||||
while demo_tilt_sensor_simple.cnt < limit:
|
||||
time.sleep(1)
|
||||
|
||||
movehub.tilt_sensor.unsubscribe(callback)
|
||||
|
||||
|
||||
def demo_color_sensor(movehub):
|
||||
@ -117,8 +103,7 @@ def demo_color_sensor(movehub):
|
||||
|
||||
def callback(color, distance=None):
|
||||
demo_color_sensor.cnt += 1
|
||||
color = COLORS[color] if color in COLORS else color
|
||||
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, color, distance)
|
||||
log.info("#%s/%s: Color %s, distance %s", demo_color_sensor.cnt, limit, COLORS[color], distance)
|
||||
|
||||
movehub.color_distance_sensor.subscribe(callback)
|
||||
while demo_color_sensor.cnt < limit:
|
||||
@ -183,7 +168,8 @@ if __name__ == '__main__':
|
||||
|
||||
hub = MoveHub(connection)
|
||||
|
||||
demo_all(hub)
|
||||
demo_motor_sensors(hub)
|
||||
# demo_all(hub)
|
||||
|
||||
log.info("Sleeping 60s")
|
||||
sleep(60)
|
||||
|
@ -122,6 +122,7 @@ COLOR_YELLOW = 0x07
|
||||
COLOR_ORANGE = 0x09
|
||||
COLOR_RED = 0x09
|
||||
COLOR_WHITE = 0x0a
|
||||
COLOR_NONE = 0xFF
|
||||
COLORS = {
|
||||
COLOR_BLACK: "BLACK",
|
||||
COLOR_PINK: "PINK",
|
||||
@ -133,7 +134,8 @@ COLORS = {
|
||||
COLOR_YELLOW: "YELLOW",
|
||||
COLOR_ORANGE: "ORANGE",
|
||||
COLOR_RED: "RED",
|
||||
COLOR_WHITE: "WHITE"
|
||||
COLOR_WHITE: "WHITE",
|
||||
COLOR_NONE: "NONE"
|
||||
}
|
||||
|
||||
# MOTORS
|
||||
|
@ -21,7 +21,7 @@ class Peripheral(object):
|
||||
super(Peripheral, self).__init__()
|
||||
self.parent = parent
|
||||
self.port = port
|
||||
self.working = False
|
||||
self._working = 0 # 3-state, -1 means command sent, 1 means notified on command, 0 means notified on finish
|
||||
self._subscribers = set()
|
||||
self._port_subscription_mode = None
|
||||
|
||||
@ -41,10 +41,13 @@ class Peripheral(object):
|
||||
self._write_to_hub(MSG_SENSOR_SUBSCRIBE, params)
|
||||
|
||||
def started(self):
|
||||
self.working = True
|
||||
self._working = 1
|
||||
|
||||
def finished(self):
|
||||
self.working = False
|
||||
self._working = 0
|
||||
|
||||
def is_working(self):
|
||||
return bool(self._working)
|
||||
|
||||
def subscribe(self, callback, mode, granularity=1):
|
||||
self._port_subscription_mode = mode
|
||||
@ -76,6 +79,7 @@ class LED(Peripheral):
|
||||
raise ValueError("Color %s is not in list of available colors" % color)
|
||||
|
||||
cmd = pack("<?", do_notify) + self.SOMETHING + pack("<B", color)
|
||||
self._working = -1
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, cmd)
|
||||
|
||||
def finished(self):
|
||||
@ -98,14 +102,14 @@ class EncodedMotor(Peripheral):
|
||||
ANGLED_SINGLE = b'\x0B'
|
||||
ANGLED_GROUP = b'\x0C'
|
||||
|
||||
def __init__(self, parent, port):
|
||||
super(EncodedMotor, self).__init__(parent, port)
|
||||
if port not in [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]:
|
||||
raise ValueError("Invalid port for motor: %s" % port)
|
||||
|
||||
def _speed_abs(self, relative):
|
||||
if relative < -1 or relative > 1:
|
||||
raise ValueError("Invalid speed value: %s", relative)
|
||||
if relative < -1:
|
||||
log.warning("Speed cannot be less than -1")
|
||||
relative = -1
|
||||
|
||||
if relative > 1:
|
||||
log.warning("Speed cannot be more than 1")
|
||||
relative = 1
|
||||
|
||||
absolute = round(relative * 100)
|
||||
if absolute < 0:
|
||||
@ -122,6 +126,7 @@ class EncodedMotor(Peripheral):
|
||||
|
||||
command += self.TRAILER
|
||||
|
||||
self._working = -1
|
||||
self._write_to_hub(MSG_SET_PORT_VAL, command)
|
||||
|
||||
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
||||
@ -137,21 +142,32 @@ class EncodedMotor(Peripheral):
|
||||
command += pack('<H', msec)
|
||||
|
||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||
self.__wait_sync(async)
|
||||
|
||||
if not async:
|
||||
time.sleep(seconds)
|
||||
|
||||
def angled(self, angle, speed_primary=1, speed_secondary=None):
|
||||
def angled(self, angle, speed_primary=1, speed_secondary=None, async=False):
|
||||
if speed_secondary is None:
|
||||
speed_secondary = speed_primary
|
||||
|
||||
if angle < 0:
|
||||
angle = -angle
|
||||
speed_primary = -speed_primary
|
||||
speed_secondary = -speed_secondary
|
||||
|
||||
# movement type
|
||||
command = self.ANGLED_GROUP if self.port == PORT_AB else self.ANGLED_SINGLE
|
||||
# angle
|
||||
command += pack('<I', angle)
|
||||
|
||||
self._wrap_and_write(command, speed_primary, speed_secondary)
|
||||
# TODO: how to tell when motor has stopped?
|
||||
self.__wait_sync(async)
|
||||
|
||||
def __wait_sync(self, async):
|
||||
if not async:
|
||||
log.debug("Waiting for sync command work to finish...")
|
||||
while self.is_working():
|
||||
time.sleep(0.05)
|
||||
|
||||
# TODO: how to tell when motor has stopped?
|
||||
|
||||
def handle_port_data(self, data):
|
||||
if self._port_subscription_mode == MOTOR_MODE_ANGLE:
|
||||
@ -172,9 +188,6 @@ class EncodedMotor(Peripheral):
|
||||
|
||||
|
||||
class TiltSensor(Peripheral):
|
||||
def __init__(self, parent, port):
|
||||
super(TiltSensor, self).__init__(parent, port)
|
||||
|
||||
def subscribe(self, callback, mode=TILT_MODE_BASIC, granularity=1):
|
||||
super(TiltSensor, self).subscribe(callback, mode, granularity)
|
||||
|
||||
@ -222,10 +235,10 @@ class ColorDistanceSensor(Peripheral):
|
||||
partial = get_byte(data, 7)
|
||||
if partial:
|
||||
distance += 1.0 / partial
|
||||
self._notify_subscribers(color if color != 0xFF else None, float(distance))
|
||||
self._notify_subscribers(color, float(distance))
|
||||
elif self._port_subscription_mode == CDS_MODE_COLOR_ONLY:
|
||||
color = get_byte(data, 4)
|
||||
self._notify_subscribers(color if color != 0xFF else None)
|
||||
self._notify_subscribers(color)
|
||||
elif self._port_subscription_mode == CDS_MODE_DISTANCE_INCHES:
|
||||
distance = get_byte(data, 4)
|
||||
self._notify_subscribers(distance)
|
||||
|
69
vernie.py
Normal file
69
vernie.py
Normal file
@ -0,0 +1,69 @@
|
||||
from pylgbst import *
|
||||
|
||||
right = RIGHT = 1
|
||||
left = LEFT = -1
|
||||
straight = STRAIGHT = 0
|
||||
|
||||
|
||||
class Vernie(MoveHub):
|
||||
def __init__(self, conn=None):
|
||||
super(Vernie, self).__init__(conn)
|
||||
|
||||
while True:
|
||||
required_devices = (self.color_distance_sensor, self.motor_external)
|
||||
if None not in required_devices:
|
||||
break
|
||||
log.debug("Waiting for required devices to appear: %s", required_devices)
|
||||
time.sleep(1)
|
||||
|
||||
self._head_position = 0
|
||||
self.motor_external.subscribe(self._external_motor_data)
|
||||
|
||||
self._color_detected = COLOR_NONE
|
||||
self._sensor_distance = 10
|
||||
self.color_distance_sensor.subscribe(self._color_distance_data)
|
||||
|
||||
def _external_motor_data(self, data):
|
||||
log.debug("External motor position: %s", data)
|
||||
self._head_position = data
|
||||
|
||||
def _color_distance_data(self, color, distance):
|
||||
log.debug("Color & Distance data: %s %s", COLORS[color], distance)
|
||||
self._sensor_distance = distance
|
||||
self._color_detected = color
|
||||
self.led.set_color(self._color_detected if self._color_detected != COLOR_NONE else COLOR_BLACK)
|
||||
|
||||
def head_to(self, direction=RIGHT, speed=0.25, angle=25):
|
||||
if direction == STRAIGHT:
|
||||
angle = -self._head_position
|
||||
direction = 1
|
||||
|
||||
self.motor_external.angled(direction * angle, speed)
|
||||
|
||||
def program(self):
|
||||
time.sleep(1)
|
||||
while True:
|
||||
self.head_to(LEFT)
|
||||
time.sleep(1)
|
||||
|
||||
self.head_to(STRAIGHT)
|
||||
time.sleep(1)
|
||||
|
||||
self.head_to(RIGHT)
|
||||
time.sleep(1)
|
||||
|
||||
self.head_to(STRAIGHT)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
try:
|
||||
connection = DebugServerConnection()
|
||||
except BaseException:
|
||||
logging.warning("Failed to use debug server: %s", traceback.format_exc())
|
||||
connection = BLEConnection().connect()
|
||||
|
||||
vernie = Vernie(connection)
|
||||
vernie.program()
|
Loading…
x
Reference in New Issue
Block a user