mirror of
https://github.com/undera/pylgbst.git
synced 2020-11-18 19:37:26 -08:00
Timed motor demo works
This commit is contained in:
parent
e7d43f391b
commit
48eef133a1
23
demo.py
23
demo.py
@ -2,7 +2,7 @@ import logging
|
||||
import traceback
|
||||
from time import sleep
|
||||
|
||||
from pylegoboost import MoveHub, COLORS_MAP
|
||||
from pylegoboost import MoveHub, COLORS
|
||||
from pylegoboost.comms import DebugServerConnection, BLEConnection
|
||||
|
||||
log = logging.getLogger("demo")
|
||||
@ -10,19 +10,31 @@ log = logging.getLogger("demo")
|
||||
|
||||
def demo_all(conn):
|
||||
movehub = MoveHub(conn)
|
||||
|
||||
demo_led_colors(movehub)
|
||||
# demo_led_colors(movehub)
|
||||
demo_motors_timed(movehub)
|
||||
|
||||
|
||||
def demo_led_colors(movehub):
|
||||
# LED colors demo
|
||||
log.info("LED colors demo")
|
||||
for color in COLORS_MAP.keys():
|
||||
log.info("Setting LED color to: %s", COLORS_MAP[color])
|
||||
for color in COLORS.keys():
|
||||
log.info("Setting LED color to: %s", COLORS[color])
|
||||
movehub.led.set_color(color)
|
||||
sleep(1)
|
||||
|
||||
|
||||
def demo_motors_timed(movehub):
|
||||
log.info("Motors movement demo: timed")
|
||||
for level in range(0, 101, 5):
|
||||
level /= 100.0
|
||||
log.info("Speed level: %s%%", level)
|
||||
movehub.motor_A.timed(0.2, level)
|
||||
movehub.motor_B.timed(0.2, -level)
|
||||
movehub.motor_AB.timed(1.5, -0.2, 0.2)
|
||||
movehub.motor_AB.timed(0.5, 1)
|
||||
movehub.motor_AB.timed(0.5, -1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
@ -33,4 +45,3 @@ if __name__ == '__main__':
|
||||
connection = BLEConnection().connect()
|
||||
|
||||
demo_all(connection)
|
||||
sleep(10)
|
||||
|
@ -1,3 +1,6 @@
|
||||
import struct
|
||||
import time
|
||||
|
||||
from pylegoboost.constants import *
|
||||
|
||||
|
||||
@ -11,9 +14,9 @@ class MoveHub(object):
|
||||
self.connection = connection
|
||||
|
||||
self.led = LED(self)
|
||||
# self.motor_a
|
||||
# self.motor_b
|
||||
# self.motor_ab
|
||||
self.motor_A = EncodedMotor(self, PORT_A)
|
||||
self.motor_B = EncodedMotor(self, PORT_B)
|
||||
self.motor_AB = EncodedMotor(self, PORT_AB)
|
||||
|
||||
# self.port_c
|
||||
# self.port_d
|
||||
@ -37,16 +40,67 @@ class Peripheral(object):
|
||||
|
||||
class LED(Peripheral):
|
||||
def set_color(self, color):
|
||||
if color not in COLORS_MAP:
|
||||
if color not in COLORS:
|
||||
raise ValueError("Color %s is not in list of available colors" % color)
|
||||
|
||||
cmd = CMD_SET_COLOR + chr(color)
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, cmd)
|
||||
|
||||
|
||||
class InteractiveMotor(object):
|
||||
pass
|
||||
class EncodedMotor(Peripheral):
|
||||
PACKET_VER = b'\x01'
|
||||
SET_PORT_VAL = b'\x81'
|
||||
MOTOR_TIMED_END = b'\x64\x7f\x03'
|
||||
TRAILER = b'\x64\x7f\x03' # NOTE: \x64 is 100, might mean something
|
||||
TIMED_GROUP = b'\x0A'
|
||||
TIMED_SINGLE = b'\x09'
|
||||
MOVEMENT_TYPE = b'\x11'
|
||||
|
||||
def __init__(self, parent, port):
|
||||
super(EncodedMotor, self).__init__(parent)
|
||||
if port not in PORTS:
|
||||
raise ValueError("Invalid port for motor: %s" % port)
|
||||
self.port = port
|
||||
|
||||
def _speed_abs(self, relative):
|
||||
relative *= 100
|
||||
if relative < 0:
|
||||
relative += 255
|
||||
return int(relative)
|
||||
|
||||
def timed(self, seconds, speed_primary=1, speed_secondary=None, async=False):
|
||||
if speed_primary < -1 or speed_primary > 1:
|
||||
raise ValueError("Invalid primary motor speed value: %s", speed_primary)
|
||||
|
||||
if speed_secondary is None:
|
||||
speed_secondary = speed_primary
|
||||
|
||||
if speed_secondary < -1 or speed_secondary > 1:
|
||||
raise ValueError("Invalid secondary motor speed value: %s", speed_primary)
|
||||
|
||||
time_ms = int(seconds * 1000)
|
||||
|
||||
# set for port
|
||||
command = self.SET_PORT_VAL + chr(self.port)
|
||||
|
||||
# movement type
|
||||
command += self.MOVEMENT_TYPE
|
||||
command += self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
|
||||
|
||||
command += struct.pack('<H', time_ms)
|
||||
|
||||
command += chr(self._speed_abs(speed_primary))
|
||||
|
||||
###
|
||||
if self.port == PORT_AB:
|
||||
command += chr(self._speed_abs(speed_secondary))
|
||||
|
||||
command += self.TRAILER
|
||||
|
||||
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, chr(len(command)) + self.PACKET_VER + command)
|
||||
if not async:
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
class ColorDistanceSensor(object):
|
||||
class ColorDistanceSensor(Peripheral):
|
||||
pass
|
||||
|
@ -37,7 +37,6 @@ class Connection(object):
|
||||
def read(self, handle):
|
||||
pass
|
||||
|
||||
# TODO: it always writes same handle, hardcode it?
|
||||
@abstractmethod
|
||||
def write(self, handle, data):
|
||||
pass
|
||||
@ -57,10 +56,11 @@ class ConnectionMock(Connection):
|
||||
pass
|
||||
|
||||
def write(self, handle, data):
|
||||
pass
|
||||
log.debug("Writing to %s: %s", handle, data.encode("hex"))
|
||||
|
||||
def read(self, handle):
|
||||
pass
|
||||
log.debug("Reading from: %s", handle)
|
||||
return None # TODO
|
||||
|
||||
|
||||
class BLEConnection(Connection):
|
||||
|
@ -1,3 +1,4 @@
|
||||
# GENERAL
|
||||
LEGO_MOVE_HUB = "LEGO Move Hub"
|
||||
DEVICE_NAME = 0x07
|
||||
MOVE_HUB_HARDWARE_HANDLE = 0x0E
|
||||
@ -6,6 +7,7 @@ MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
|
||||
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
|
||||
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
|
||||
|
||||
# COLORS
|
||||
COLOR_OFF = 0x00
|
||||
COLOR_PINK = 0x01
|
||||
COLOR_PURPLE = 0x02
|
||||
@ -17,7 +19,7 @@ COLOR_YELLOW = 0x07
|
||||
COLOR_ORANGE = 0x09
|
||||
COLOR_RED = 0x09
|
||||
COLOR_WHITE = 0x0a
|
||||
COLORS_MAP = {
|
||||
COLORS = {
|
||||
COLOR_OFF: "OFF",
|
||||
COLOR_PINK: "PINK",
|
||||
COLOR_PURPLE: "PURPLE",
|
||||
@ -32,3 +34,12 @@ COLORS_MAP = {
|
||||
}
|
||||
|
||||
CMD_SET_COLOR = b'\x08\x00\x81\x32\x11\x51\x00'
|
||||
|
||||
# PORTS
|
||||
PORT_A = 0x37
|
||||
PORT_B = 0x38
|
||||
PORT_AB = 0x39
|
||||
PORT_C = 0x01
|
||||
PORT_D = 0x02
|
||||
|
||||
PORTS = [PORT_A, PORT_B, PORT_AB, PORT_C, PORT_D]
|
||||
|
6
test.py
6
test.py
@ -2,12 +2,12 @@ import logging
|
||||
import unittest
|
||||
|
||||
from demo import demo_all
|
||||
from pylegoboost.comms import ConnectionMock, DebugServerConnection
|
||||
from pylegoboost.comms import ConnectionMock
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
class GeneralTest(unittest.TestCase):
|
||||
def test_capabilities(self):
|
||||
#conn = ConnectionMock()
|
||||
demo_all(DebugServerConnection())
|
||||
conn = ConnectionMock()
|
||||
demo_all(conn)
|
||||
|
Loading…
x
Reference in New Issue
Block a user