1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Started experiments, notifications work

This commit is contained in:
Andrey Pohilko 2017-09-12 22:25:16 +03:00
commit 6be73c45af
6 changed files with 321 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea
*.iml
*.pyc

21
README.md Normal file
View File

@ -0,0 +1,21 @@
# Ideas
Make a server that will listen to wire proto to speed up debug
# Links
https://github.com/JorgePe/BOOSTreveng
https://github.com/hobbyquaker/node-movehub
https://github.com/JorgePe/pyb00st
https://github.com/RealTimeWeb/blockpy
https://ru.wikipedia.org/wiki/App_Inventor
https://en.wikipedia.org/wiki/Blockly

0
pylegoboost/__init__.py Normal file
View File

137
pylegoboost/constants.py Normal file
View File

@ -0,0 +1,137 @@
MOVE_HUB_HARDWARE_HANDLE = 0x0E
MOVE_HUB_HARDWARE_UUID = '00001624-1212-efde-1623-785feabcd123'
DEVICE_NAME = 0x07
ENABLE_NOTIFICATIONS_HANDLE = 0x000f
ENABLE_NOTIFICATIONS_VALUE = b'\x01\x00'
# Ports
PORT_A = 0x37
PORT_B = 0x38
PORT_C = 0x01
PORT_D = 0x02
PORT_AB = 0x39
# Commands for setting RGB LED color
SET_LED_OFF = b'\x08\x00\x81\x32\x11\x51\x00\x00'
SET_LED_PINK = b'\x08\x00\x81\x32\x11\x51\x00\x01'
SET_LED_PURPLE = b'\x08\x00\x81\x32\x11\x51\x00\x02'
SET_LED_BLUE = b'\x08\x00\x81\x32\x11\x51\x00\x03'
SET_LED_LIGHTBLUE = b'\x08\x00\x81\x32\x11\x51\x00\x04'
SET_LED_CYAN = b'\x08\x00\x81\x32\x11\x51\x00\x05'
SET_LED_GREEN = b'\x08\x00\x81\x32\x11\x51\x00\x06'
SET_LED_YELLOW = b'\x08\x00\x81\x32\x11\x51\x00\x07'
SET_LED_ORANGE = b'\x08\x00\x81\x32\x11\x51\x00\x08'
SET_LED_RED = b'\x08\x00\x81\x32\x11\x51\x00\x09'
SET_LED_WHITE = b'\x08\x00\x81\x32\x11\x51\x00\x0A'
SET_LED_COLOR = [SET_LED_OFF,
SET_LED_PINK,
SET_LED_PURPLE,
SET_LED_BLUE,
SET_LED_LIGHTBLUE,
SET_LED_CYAN,
SET_LED_GREEN,
SET_LED_YELLOW,
SET_LED_ORANGE,
SET_LED_RED,
SET_LED_WHITE]
# Colors:
LED_COLORS = ['OFF', 'PINK', 'PURPLE', 'BLUE', 'LIGHTBLUE', 'CYAN', 'GREEN', 'YELLOW', 'ORANGE', 'RED', 'WHITE']
# Motors:
MOTOR_A = bytes([0x37])
MOTOR_B = bytes([0x38])
MOTOR_AB = bytes([0x39])
MOTOR_C = bytes([0x01])
MOTOR_D = bytes([0x02])
# a group of all single motors
MOTORS = [MOTOR_A, MOTOR_B, MOTOR_AB, MOTOR_C, MOTOR_D]
# a group of 1 is silly but there might be other pairs in the future
MOTOR_PAIRS = [MOTOR_AB]
# Commands for Interactive Motors (Timed):
# Motor A, B, C, D: 12-byte commands
# Motor AB: 13-byte commands
MOTOR_TIMED_INI = b'\x0c\x01\x81'
MOTOR_TIMED_MID = b'\x11\x09'
MOTOR_TIMED_END = b'\x64\x7f\x03'
MOTORS_TIMED_INI = b'\x0d\x01\x81'
MOTORS_TIMED_MID = b'\x11\x0A'
MOTORS_TIMED_END = b'\x64\x7f\x03'
# Commands for Interactive Motors (Angle):
# Motor A, B, C, D: 14-byte commands
# Motor AB: 15-byte commands
MOTOR_ANGLE_INI = b'\x0e\x01\x81'
MOTOR_ANGLE_MID = b'\x11\x0b'
MOTOR_ANGLE_END = b'\x64\x7f\x03'
MOTORS_ANGLE_INI = b'\x0f\x01\x81'
MOTORS_ANGLE_MID = b'\x11\x0c'
MOTORS_ANGLE_END = b'\x64\x7f\x03'
# Commands for WeDo Motors (just Duty Cycle):
MOTOR_WEDO_INI = b'\x08\x00\x81'
MOTOR_WEDO_MID = b'\x11\x51\x00'
# Commands for Color Sensor
LISTEN_COLOR_SENSOR_ON_C = b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01'
LISTEN_COLOR_SENSOR_ON_D = b'\x0a\x00\x41\x02\x08\x01\x00\x00\x00\x01'
# Sensor Colors:
COLOR_SENSOR_COLORS = ['BLACK', '', '', 'BLUE', '', 'GREEN', '', 'YELLOW', '', 'RED', 'WHITE']
# Commands for Distance Sensor
LISTEN_DIST_SENSOR_ON_C = b'\x0a\x00\x41\x01\x08\x01\x00\x00\x00\x01'
LISTEN_DIST_SENSOR_ON_D = b'\x0a\x00\x41\x02\x08\x01\x00\x00\x00\x01'
# Commands for Reading Encoders
LISTEN_ENCODER_ON_A = b'\x0a\x00\x41\x37\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_B = b'\x0a\x00\x41\x38\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_C = b'\x0a\x00\x41\x01\x02\x01\x00\x00\x00\x01'
LISTEN_ENCODER_ON_D = b'\x0a\x00\x41\x02\x02\x01\x00\x00\x00\x01'
#
ENCODER_MID = 2147483648
ENCODER_MAX = 4294967296
# Commands for Reading Button
LISTEN_BUTTON = b'\x05\x00\x01\x02\x02'
BUTTON_PRESSED = '\x01'
BUTTON_RELEASED = '\x00'
# Commands for Tilt Sensor
LISTEN_TILT_BASIC = b'\x0a\x00\x41\x3a\x02\x01\x00\x00\x00\x01'
LISTEN_TILT_FULL = b'\x0a\x00\x41\x3a\x00\x01\x00\x00\x00\x01'
TILT_HORIZ = 0x00
TILT_UP = 0x01
TILT_DOWN = 0x02
TILT_RIGHT = 0x03
TILT_LEFT = 0x04
TILT_INVERT = 0x05
TILT_BASIC_VALUES = [TILT_HORIZ, TILT_UP, TILT_DOWN, TILT_RIGHT, TILT_LEFT, TILT_INVERT]
TILT_BASIC_TEXT = ['TILT_HORIZ', 'TILT_UP', 'TILT_DOWN', 'TILT_RIGHT', 'TILT_LEFT', 'TILT_INVERT']
# Commands for WeDo Tilt Sensor
# There ARE more modes, use just this one for now
LISTEN_WEDO_TILT_ON_C = b'\x0a\x00\x41\x01\x00\x01\x00\x00\x00\x01'
LISTEN_WEDO_TILT_ON_D = b'\x0a\x00\x41\x02\x00\x01\x00\x00\x00\x01'
# Commands for WeDo Distance Sensor
# There MIGHT be more modes, use just this one for now
LISTEN_WEDO_DISTANCE_ON_C = b'\x0a\x00\x41\x01\x00\x01\x00\x00\x00\x01'
LISTEN_WEDO_DISTANCE_ON_D = b'\x0a\x00\x41\x02\x00\x01\x00\x00\x00\x01'

139
pylegoboost/transport.py Normal file
View File

@ -0,0 +1,139 @@
import json
import logging
import socket
import traceback
from abc import abstractmethod
from gattlib import DiscoveryService, GATTRequester
from pylegoboost.constants import DEVICE_NAME
log = logging.getLogger('transport')
LEGO_MOVE_HUB = "LEGO Move Hub"
def strtohex(sval):
return " ".join("{:02x}".format(ord(c)) for c in sval)
# noinspection PyMethodOverriding
class Requester(GATTRequester):
def on_notification(self, handle, data):
log.debug("Notification on handle %s: %s", handle, strtohex(data))
def on_indication(self, handle, data):
log.debug("Indication on handle %s: %s", handle, strtohex(data))
class Transport(object):
@abstractmethod
def read(self, handle):
pass
@abstractmethod
def write(self, handle, data):
pass
class BLETransport(Transport):
"""
:type requester: Requester
"""
def __init__(self):
super(Transport, self).__init__()
self.requester = None
def connect(self, bt_iface_name='hci0'):
service = DiscoveryService(bt_iface_name)
while not self.requester:
log.info("Discovering devices using %s...", bt_iface_name)
devices = service.discover(5)
log.debug("Devices: %s", devices)
for address, name in devices.items():
if name == LEGO_MOVE_HUB:
logging.info("Found %s at %s", name, address)
self._get_requester(address, bt_iface_name)
break
log.info("Device declares itself as: %s", self.read(DEVICE_NAME))
return self
def _get_requester(self, address, bt_iface_name):
self.requester = Requester(address, True, bt_iface_name)
def read(self, handle):
log.debug("Reading from: %s", handle)
data = self.requester.read_by_handle(handle)
log.debug("Result: %s", data)
if isinstance(data, list):
data = data[0]
return data
def write(self, handle, data):
log.debug("Writing to %s: %s", handle, data)
return self.requester.write_by_handle(handle, data)
class DebugServer(object):
def __init__(self, ble_trans):
self.sock = socket.socket()
self.ble = ble_trans
def start(self, port=9090):
self.sock.bind(('', port))
self.sock.listen(1)
while True:
conn, addr = self.sock.accept()
try:
self._handle_conn(conn)
finally:
conn.close()
def __del__(self):
self.sock.close()
def _handle_conn(self, conn):
"""
:type conn: socket._socketobject
"""
buf = ""
while True:
data = conn.recv(1024)
log.debug("Recv: %s", data)
if not data:
break
buf += data
if "\n" in buf:
line = buf[:buf.index("\n")]
buf = buf[buf.index("\n") + 1:]
if line:
log.debug("Cmd line: %s", line)
try:
self._handle_cmd(json.loads(line))
except BaseException:
log.error("Failed to handle cmd: %s", traceback.format_exc())
# conn.send(data.upper())
def _handle_cmd(self, line):
pass
class DebugServerTransport(Transport):
def __init__(self):
self.sock = socket.socket()
self.sock.connect(('localhost', 9090))
# sock.send('hello, world!')
# data = sock.recv(1024)
def __del__(self):
self.sock.close()

21
test.py Normal file
View File

@ -0,0 +1,21 @@
import logging
import unittest
from time import sleep
from pylegoboost.constants import *
from pylegoboost.transport import BLETransport
logging.basicConfig(level=logging.DEBUG)
class GeneralTest(unittest.TestCase):
def test_1(self):
transport = BLETransport()
transport.connect()
transport.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE)
transport.write(MOVE_HUB_HARDWARE_HANDLE, LISTEN_DIST_SENSOR_ON_C)
sleep(60)
# from pylegoboost import DebugServer
# srv = DebugServer(None)
# srv.start()