1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

LED demo works

This commit is contained in:
Andrey Pohilko 2017-09-13 09:39:41 +03:00
parent 6be73c45af
commit 14daa67ede
5 changed files with 156 additions and 153 deletions

36
demo.py Normal file
View File

@ -0,0 +1,36 @@
import logging
import traceback
from time import sleep
from pylegoboost import MoveHub, COLORS_MAP
from pylegoboost.transport import DebugServerConnection, BLEConnection
log = logging.getLogger("demo")
def demo_all(conn):
movehub = MoveHub(conn)
demo_led_colors(movehub)
def demo_led_colors(movehub):
# LED colors demo
log.info("LED colors demo")
for color in COLORS_MAP.keys():
log.info("Setting LED color to: %s", COLORS_MAP[color])
movehub.led.set_color(color)
sleep(1)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
try:
connection = DebugServerConnection()
except BaseException:
logging.warning("Failed to use debug server: %s", traceback.format_exc())
connection = BLEConnection().connect()
demo_all(connection)
sleep(10)

View File

@ -0,0 +1,49 @@
from pylegoboost.constants import *
class MoveHub(object):
"""
:type connection: pylegoboost.transport.Connection
:type led: LED
"""
def __init__(self, connection):
self.connection = connection
self.led = LED(self)
# self.motor_a
# self.motor_b
# self.motor_ab
# self.port_c
# self.port_d
# self.button
# self.tilt_sensor
class Peripheral(object):
"""
:type parent: MoveHub
"""
def __init__(self, parent):
super(Peripheral, self).__init__()
self.parent = parent
class LED(Peripheral):
def set_color(self, color):
if color not in COLORS_MAP:
raise ValueError("Color %s is not in list of available colors" % color)
cmd = CMD_SET_COLOR + chr(color)
self.parent.connection.write(MOVE_HUB_HARDWARE_HANDLE, cmd)
class InteractiveMotor(object):
pass
class ColorDistanceSensor(object):
pass

View File

@ -1,137 +1,34 @@
LEGO_MOVE_HUB = "LEGO Move Hub"
DEVICE_NAME = 0x07
MOVE_HUB_HARDWARE_HANDLE = 0x0E
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
DEVICE_NAME = 0x07
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
# Ports
PORT_A = 0x37
PORT_B = 0x38
PORT_C = 0x01
PORT_D = 0x02
PORT_AB = 0x39
COLOR_OFF = 0x00
COLOR_PINK = 0x01
COLOR_PURPLE = 0x02
COLOR_BLUE = 0x03
COLOR_LIGHTBLUE = 0x04
COLOR_CYAN = 0x05
COLOR_GREEN = 0x06
COLOR_YELLOW = 0x07
COLOR_ORANGE = 0x09
COLOR_RED = 0x09
COLOR_WHITE = 0x0a
COLORS_MAP = {
COLOR_OFF: "OFF",
COLOR_PINK: "PINK",
COLOR_PURPLE: "PURPLE",
COLOR_BLUE: "BLUE",
COLOR_LIGHTBLUE: "LIGHTBLUE",
COLOR_CYAN: "CYAN",
COLOR_GREEN: "GREEN",
COLOR_YELLOW: "YELLOW",
COLOR_ORANGE: "ORANGE",
COLOR_RED: "RED",
COLOR_WHITE: "WHITE"
}
# Commands for setting RGB LED color
SET_LED_OFF = b'\x08\x00\x81\x32\x11\x51\x00\x00'
SET_LED_PINK = b'\x08\x00\x81\x32\x11\x51\x00\x01'
SET_LED_PURPLE = b'\x08\x00\x81\x32\x11\x51\x00\x02'
SET_LED_BLUE = b'\x08\x00\x81\x32\x11\x51\x00\x03'
SET_LED_LIGHTBLUE = b'\x08\x00\x81\x32\x11\x51\x00\x04'
SET_LED_CYAN = b'\x08\x00\x81\x32\x11\x51\x00\x05'
SET_LED_GREEN = b'\x08\x00\x81\x32\x11\x51\x00\x06'
SET_LED_YELLOW = b'\x08\x00\x81\x32\x11\x51\x00\x07'
SET_LED_ORANGE = b'\x08\x00\x81\x32\x11\x51\x00\x08'
SET_LED_RED = b'\x08\x00\x81\x32\x11\x51\x00\x09'
SET_LED_WHITE = b'\x08\x00\x81\x32\x11\x51\x00\x0A'
SET_LED_COLOR = [SET_LED_OFF,
SET_LED_PINK,
SET_LED_PURPLE,
SET_LED_BLUE,
SET_LED_LIGHTBLUE,
SET_LED_CYAN,
SET_LED_GREEN,
SET_LED_YELLOW,
SET_LED_ORANGE,
SET_LED_RED,
SET_LED_WHITE]
# Colors:
LED_COLORS = ['OFF', 'PINK', 'PURPLE', 'BLUE', 'LIGHTBLUE', 'CYAN', 'GREEN', 'YELLOW', 'ORANGE', 'RED', 'WHITE']
# Motors:
MOTOR_A = bytes([0x37])
MOTOR_B = bytes([0x38])
MOTOR_AB = bytes([0x39])
MOTOR_C = bytes([0x01])
MOTOR_D = bytes([0x02])
# a group of all single motors
MOTORS = [MOTOR_A, MOTOR_B, MOTOR_AB, MOTOR_C, MOTOR_D]
# a group of 1 is silly but there might be other pairs in the future
MOTOR_PAIRS = [MOTOR_AB]
# Commands for Interactive Motors (Timed):
# Motor A, B, C, D: 12-byte commands
# Motor AB: 13-byte commands
MOTOR_TIMED_INI = b'\x0c\x01\x81'
MOTOR_TIMED_MID = b'\x11\x09'
MOTOR_TIMED_END = b'\x64\x7f\x03'
MOTORS_TIMED_INI = b'\x0d\x01\x81'
MOTORS_TIMED_MID = b'\x11\x0A'
MOTORS_TIMED_END = b'\x64\x7f\x03'
# Commands for Interactive Motors (Angle):
# Motor A, B, C, D: 14-byte commands
# Motor AB: 15-byte commands
MOTOR_ANGLE_INI = b'\x0e\x01\x81'
MOTOR_ANGLE_MID = b'\x11\x0b'
MOTOR_ANGLE_END = b'\x64\x7f\x03'
MOTORS_ANGLE_INI = b'\x0f\x01\x81'
MOTORS_ANGLE_MID = b'\x11\x0c'
MOTORS_ANGLE_END = b'\x64\x7f\x03'
# Commands for WeDo Motors (just Duty Cycle):
MOTOR_WEDO_INI = b'\x08\x00\x81'
MOTOR_WEDO_MID = b'\x11\x51\x00'
# Commands for Color Sensor
LISTEN_COLOR_SENSOR_ON_C = b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01'
LISTEN_COLOR_SENSOR_ON_D = b'\x0a\x00\x41\x02\x08\x01\x00\x00\x00\x01'
# Sensor Colors:
COLOR_SENSOR_COLORS = ['BLACK', '', '', 'BLUE', '', 'GREEN', '', 'YELLOW', '', 'RED', 'WHITE']
# Commands for Distance Sensor
LISTEN_DIST_SENSOR_ON_C = b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01'
LISTEN_DIST_SENSOR_ON_D = b'\x0a\x00\x41\x02\x08\x01\x00\x00\x00\x01'
# Commands for Reading Encoders
LISTEN_ENCODER_ON_A = b'\x0a\x00\x41\x37\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_B = b'\x0a\x00\x41\x38\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_C = b'\x0a\x00\x41\x01\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_D = b'\x0a\x00\x41\x02\x02\x01\x00\x00\x00\x01'
#
ENCODER_MID = 2147483648
ENCODER_MAX = 4294967296
# Commands for Reading Button
LISTEN_BUTTON = b'\x05\x00\x01\x02\x02'
BUTTON_PRESSED = '\x01'
BUTTON_RELEASED = '\x00'
# Commands for Tilt Sensor
LISTEN_TILT_BASIC = b'\x0a\x00\x41\x3a\x02\x01\x00\x00\x00\x01'
LISTEN_TILT_FULL = b'\x0a\x00\x41\x3a\x00\x01\x00\x00\x00\x01'
TILT_HORIZ = 0x00
TILT_UP = 0x01
TILT_DOWN = 0x02
TILT_RIGHT = 0x03
TILT_LEFT = 0x04
TILT_INVERT = 0x05
TILT_BASIC_VALUES = [TILT_HORIZ, TILT_UP, TILT_DOWN, TILT_RIGHT, TILT_LEFT, TILT_INVERT]
TILT_BASIC_TEXT = ['TILT_HORIZ', 'TILT_UP', 'TILT_DOWN', 'TILT_RIGHT', 'TILT_LEFT', 'TILT_INVERT']
# Commands for WeDo Tilt Sensor
# There ARE more modes, use just this one for now
LISTEN_WEDO_TILT_ON_C = b'\x0a\x00\x41\x01\x00\x01\x00\x00\x00\x01'
LISTEN_WEDO_TILT_ON_D = b'\x0a\x00\x41\x02\x00\x01\x00\x00\x00\x01'
# Commands for WeDo Distance Sensor
# There MIGHT be more modes, use just this one for now
LISTEN_WEDO_DISTANCE_ON_C = b'\x0a\x00\x41\x01\x00\x01\x00\x00\x00\x01'
LISTEN_WEDO_DISTANCE_ON_D = b'\x0a\x00\x41\x02\x00\x01\x00\x00\x00\x01'
CMD_SET_COLOR = b'\x08\x00\x81\x32\x11\x51\x00'

View File

@ -5,43 +5,62 @@ import traceback
from abc import abstractmethod
from gattlib import DiscoveryService, GATTRequester
from pylegoboost.constants import DEVICE_NAME
from pylegoboost.constants import DEVICE_NAME, LEGO_MOVE_HUB
log = logging.getLogger('transport')
LEGO_MOVE_HUB = "LEGO Move Hub"
def strtohex(sval):
return " ".join("{:02x}".format(ord(c)) for c in sval)
# noinspection PyMethodOverriding
class Requester(GATTRequester):
def __init__(self, p_object, *args, **kwargs):
super(Requester, self).__init__(p_object, *args, **kwargs)
self.notification_sink = None
def on_notification(self, handle, data):
log.debug("Notification on handle %s: %s", handle, strtohex(data))
if self.notification_sink:
self.notification_sink(handle, data)
def on_indication(self, handle, data):
log.debug("Indication on handle %s: %s", handle, strtohex(data))
log.debug("Indication on handle %s: %s", handle, data.encode("hex"))
class Transport(object):
class Connection(object):
@abstractmethod
def read(self, handle):
pass
# TODO: it always writes same handle, hardcode it?
@abstractmethod
def write(self, handle, data):
pass
@abstractmethod
def notify(self, handle, data):
pass
class BLETransport(Transport):
class ConnectionMock(Connection):
"""
For unit testing purposes
"""
def notify(self, handle, data):
pass
def write(self, handle, data):
pass
def read(self, handle):
pass
class BLEConnection(Connection):
"""
:type requester: Requester
"""
def __init__(self):
super(Transport, self).__init__()
super(Connection, self).__init__()
self.requester = None
def connect(self, bt_iface_name='hci0'):
@ -63,6 +82,7 @@ class BLETransport(Transport):
def _get_requester(self, address, bt_iface_name):
self.requester = Requester(address, True, bt_iface_name)
self.requester.notification_sink = self.notify
def read(self, handle):
log.debug("Reading from: %s", handle)
@ -73,9 +93,12 @@ class BLETransport(Transport):
return data
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data)
log.debug("Writing to %s: %s", handle, data.encode("hex"))
return self.requester.write_by_handle(handle, data)
def notify(self, handle, data):
log.debug("Notification on %s: %s", handle, data.encode("hex"))
class DebugServer(object):
def __init__(self, ble_trans):
@ -126,7 +149,7 @@ class DebugServer(object):
pass
class DebugServerTransport(Transport):
class DebugServerConnection(Connection):
def __init__(self):
self.sock = socket.socket()
self.sock.connect(('localhost', 9090))

16
test.py
View File

@ -1,20 +1,18 @@
import logging
import unittest
from time import sleep
from pylegoboost.constants import *
from pylegoboost.transport import BLETransport
from demo import demo_all
from pylegoboost.transport import ConnectionMock
logging.basicConfig(level=logging.DEBUG)
class GeneralTest(unittest.TestCase):
def test_1(self):
transport = BLETransport()
transport.connect()
transport.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
transport.write(MOVE_HUB_HARDWARE_HANDLE, LISTEN_DIST_SENSOR_ON_C)
sleep(60)
def test_capabilities(self):
conn = ConnectionMock()
demo_all(conn)
# transport.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
# transport.write(MOVE_HUB_HARDWARE_HANDLE, b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01')
# from pylegoboost import DebugServer
# srv = DebugServer(None)