1
0
mirror of https://github.com/undera/pylgbst.git synced 2020-11-18 19:37:26 -08:00

Add some text-to-speech into vernie demo script

This commit is contained in:
Andrey Pohilko 2017-09-16 14:52:37 +03:00
parent 185c33cd26
commit 8560b7df9b
5 changed files with 57 additions and 29 deletions

View File

@ -33,10 +33,8 @@ sudo python -c "from pylgbst.comms import *; import logging; logging.basicConfig
## Roadmap
- make notifications from pygattlib to quickly get into parallel thread, to avoid segfaults
- support working with push button
- experiment with motor commands, find what is hidden there
- Give nice documentation examples, don't forget to mention logging
- make angled motors to be synchronous by default => 3-state status
- make sure unit tests cover all important code
- generalize getting device info + give constants (low priority)

24
demo.py
View File

@ -1,6 +1,8 @@
# coding=utf-8
from time import sleep
from pylgbst import *
from vernie import say
log = logging.getLogger("demo")
@ -157,11 +159,6 @@ def demo_all(movehub):
demo_motor_sensors(movehub)
def cb_log(val1, val2=None, val3=None):
pass
# log.info("V1:%s\tV2:%s\tV3:%s", unpack("<H", val1), val2, val3)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
@ -171,12 +168,15 @@ if __name__ == '__main__':
logging.warning("Failed to use debug server: %s", traceback.format_exc())
connection = BLEConnection().connect()
hub = MoveHub(connection)
hub.button.subscribe(cb_log, 0x00, granularity=1)
sleep(10)
# demo_port_cd_motor(hub)
sleep(10)
#hub.button.unsubscribe(cb_log)
sleep(10)
def cb_log(val1, val2=None, val3=None):
log.info("V1:%s\tV2:%s\tV3:%s", val1, val2, val3)
hub = MoveHub(connection)
hub.color_distance_sensor.subscribe(cb_log, CDS_MODE_STREAM_3_VALUES, granularity=3)
sleep(60)
# hub.motor_AB.timed(10, 0.1, async=True)
# sleep(1)
# hub.motor_AB.timed(0, 0)
# demo_all(hub)

View File

@ -114,7 +114,6 @@ class MoveHub(object):
self.devices[port].finished()
elif status == STATUS_CONFLICT:
log.warning("Command conflict on port %s", PORTS[port])
self.devices[port].finished()
else:
log.warning("Unhandled status value: 0x%x", status)
@ -142,7 +141,7 @@ class MoveHub(object):
elif dev_type == DEV_BATTERY:
self.devices[port] = Battery(self, port)
else:
log.warning("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
log.debug("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port)
self.devices[port] = Peripheral(self, port)
if port == PORT_A:

View File

@ -143,10 +143,7 @@ class EncodedMotor(Peripheral):
# movement type
command = self.TIMED_GROUP if self.port == PORT_AB else self.TIMED_SINGLE
# time
msec = int(seconds * 1000)
if msec >= pow(2, 16):
raise ValueError("Too large value for seconds: %s", seconds)
command += pack('<H', msec)
command += pack('<H', int(seconds * 1000))
self.started()
self._wrap_and_write(command, speed_primary, speed_secondary)
@ -279,10 +276,16 @@ class Battery(Peripheral):
super(Battery, self).__init__(parent, port)
self.last_value = None
def subscribe(self, callback, mode=0, granularity=1):
super(Battery, self).subscribe(callback, mode, granularity)
# we know only voltage subscription from it. is it really battery or just onboard voltage?
# device has turned off on 1b0e000600453ba800
# device has turned off on 1b0e000600453ba800 - 168d
# moderate 9v ~= 3840
# good 7.5v ~= 3892
# liion 5v ~= 2100
def handle_port_data(self, data):
self.last_value = unpack("<H", data[4:6])[0]
self.last_value = unpack("<h", data[4:6])[0]
self._notify_subscribers(self.last_value)

View File

@ -1,3 +1,11 @@
# coding=utf-8
import hashlib
import os
import re
import subprocess
import gtts
from pylgbst import *
forward = FORWARD = right = RIGHT = 1
@ -5,13 +13,30 @@ backward = BACKWARD = left = LEFT = -1
straight = STRAIGHT = 0
def say(text):
if isinstance(text, str):
text = text.decode("utf-8")
md5 = hashlib.md5(text.encode('utf-8')).hexdigest()
fname = "/tmp/%s.mp3" % md5
if not os.path.exists(fname):
myre = re.compile('[[A-Za-z]', re.UNICODE)
lang = 'en' if myre.match(text) else 'ru'
logging.getLogger('requests').setLevel(logging.getLogger('').getEffectiveLevel())
tts = gtts.gTTS(text=text, lang=lang, slow=False)
tts.save(fname)
with open(os.devnull, 'w') as fnull:
subprocess.call("mplayer %s" % fname, shell=True, stderr=fnull, stdout=fnull)
class Vernie(MoveHub):
def __init__(self):
try:
conn = DebugServerConnection()
except BaseException:
logging.debug("Failed to use debug server: %s", traceback.format_exc())
conn = BLEConnection().connect()
conn = BLEConnection().connect()
super(Vernie, self).__init__(conn)
@ -42,8 +67,7 @@ class Vernie(MoveHub):
log.debug("Color & Distance data: %s %s", COLORS[color], distance)
self._sensor_distance = distance
self._color_detected = color
if self._color_detected != COLOR_NONE:
self.led.set_color(self._color_detected)
say(COLORS[self._color_detected])
def _reset_head(self):
self.motor_external.timed(1, -0.2)
@ -66,8 +90,7 @@ class Vernie(MoveHub):
self.head(STRAIGHT, speed=0.5)
self.motor_AB.angled(distance * 450, speed * direction, speed * direction)
def program(self):
# while True:
def program_carton_board(self):
self.move(FORWARD)
self.move(FORWARD)
self.turn(RIGHT)
@ -89,9 +112,14 @@ class Vernie(MoveHub):
self.move(BACKWARD, 2)
# TODO: distance sensor game
# TODO: find and follow the lightest direction game
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
comms.log.setLevel(logging.INFO)
vernie = Vernie()
vernie.program()
say("Робот Веернии 01 готов к работе")
say("Вправо-влево, назад-вперед")
# vernie.program()