From 06cfd9c4194f421020822c21ebadd2490ee8d21e Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Tue, 19 Sep 2017 21:48:53 +0300 Subject: [PATCH] Refactoring --- demo.py | 19 +-- pylgbst/__init__.py | 170 +-------------------------- pylgbst/comms.py | 8 +- pylgbst/constants.py | 50 -------- pylgbst/movehub.py | 177 ++++++++++++++++++++++++++++ pylgbst/peripherals.py | 212 ++++++++++++++++++---------------- tests.py | 37 +++--- vernie/__init__.py | 7 +- vernie/android_remote.py | 6 +- vernie/go_towards_light.py | 10 +- vernie/read_typed_commands.py | 2 +- vernie/run_commands_file.py | 4 +- 12 files changed, 339 insertions(+), 363 deletions(-) create mode 100644 pylgbst/movehub.py diff --git a/demo.py b/demo.py index fccaebb..e42b451 100644 --- a/demo.py +++ b/demo.py @@ -1,7 +1,10 @@ # coding=utf-8 +import traceback from time import sleep from pylgbst import * +from pylgbst.comms import DebugServerConnection +from pylgbst.movehub import MoveHub log = logging.getLogger("demo") @@ -73,7 +76,7 @@ def demo_tilt_sensor_simple(movehub): def callback(param1): demo_tilt_sensor_simple.cnt += 1 - log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TILT_STATES[param1]) + log.info("Tilt #%s of %s: %s", demo_tilt_sensor_simple.cnt, limit, TiltSensor.TILT_STATES[param1]) movehub.tilt_sensor.subscribe(callback) while demo_tilt_sensor_simple.cnt < limit: @@ -91,7 +94,7 @@ def demo_tilt_sensor_precise(movehub): demo_tilt_sensor_simple.cnt += 1 log.info("Tilt #%s of %s: roll:%s pitch:%s yaw:%s", demo_tilt_sensor_simple.cnt, limit, pitch, roll, yaw) - movehub.tilt_sensor.subscribe(callback, mode=TILT_MODE_FULL) + movehub.tilt_sensor.subscribe(callback, mode=TiltSensor.MODE_FULL) while demo_tilt_sensor_simple.cnt < limit: time.sleep(1) @@ -169,14 +172,4 @@ if __name__ == '__main__': connection = BLEConnection().connect() hub = MoveHub(connection) - - mtr = hub.motor_A - - # mtr.timed(5, async=True) - # time.sleep(1) - # mtr.stop() - - mtr.test() - time.sleep(3) - mtr.stop() - time.sleep(1) + demo_all(hub) diff --git a/pylgbst/__init__.py b/pylgbst/__init__.py index 5ac375c..6d0e618 100644 --- a/pylgbst/__init__.py +++ b/pylgbst/__init__.py @@ -1,170 +1,2 @@ -from pylgbst.comms import * -from pylgbst.constants import * +from pylgbst.movehub import * from pylgbst.peripherals import * - -log = logging.getLogger('movehub') - - -class MoveHub(object): - """ - :type connection: pylgbst.comms.Connection - :type devices: dict[int,Peripheral] - :type led: LED - :type tilt_sensor: TiltSensor - :type button: Button - :type battery: Battery - :type color_distance_sensor: pylgbst.peripherals.ColorDistanceSensor - :type motor_external: EncodedMotor - :type port_C: Peripheral - :type port_D: Peripheral - :type motor_A: EncodedMotor - :type motor_B: EncodedMotor - :type motor_AB: EncodedMotor - """ - - def __init__(self, connection=None): - if not connection: - connection = BLEConnection() - - self.connection = connection - self.devices = {} - - # shorthand fields - self.button = Button(self) - self.led = None - self.battery = None - self.motor_A = None - self.motor_B = None - self.motor_AB = None - self.tilt_sensor = None - self.color_distance_sensor = None - self.motor_external = None - self.port_C = None - self.port_D = None - - self.connection.set_notify_handler(self._notify) - - self._wait_for_devices() - - def _wait_for_devices(self): - self.connection.write(ENABLE_NOTIFICATIONS_HANDLE, ENABLE_NOTIFICATIONS_VALUE) - - builtin_devices = () - for num in range(0, 60): - builtin_devices = (self.led, self.motor_A, self.motor_B, self.motor_AB, self.tilt_sensor, self.button) - if None not in builtin_devices: - return - log.debug("Waiting for builtin devices to appear: %s", builtin_devices) - time.sleep(1) - log.warning("Got only these devices: %s", builtin_devices) - raise RuntimeError("Failed to obtain all builtin devices") - - def _notify(self, handle, data): - orig = data - - if handle != MOVE_HUB_HARDWARE_HANDLE: - log.warning("Unsupported notification handle: 0x%s", handle) - return - - data = data[3:] - log.debug("Notification on %s: %s", handle, str2hex(orig)) - - msg_type = get_byte(data, 2) - - if msg_type == MSG_PORT_INFO: - self._handle_port_info(data) - elif msg_type == MSG_PORT_STATUS: - self._handle_port_status(data) - elif msg_type == MSG_SENSOR_DATA: - self._handle_sensor_data(data) - elif msg_type == MSG_SENSOR_SUBSCRIBE_ACK: - port = get_byte(data, 3) - log.debug("Sensor subscribe ack on port %s", PORTS[port]) - self.devices[port].finished() - elif msg_type == MSG_PORT_CMD_ERROR: - log.warning("Command error: %s", str2hex(data[3:])) - elif msg_type == MSG_DEVICE_SHUTDOWN: - log.warning("Device reported shutdown: %s", str2hex(data)) - raise KeyboardInterrupt("Device shutdown") - elif msg_type == MSG_DEVICE_INFO: - self._handle_device_info(data) - else: - log.warning("Unhandled msg type 0x%x: %s", msg_type, str2hex(orig)) - - def _handle_device_info(self, data): - if get_byte(data, 3) == 2: - self.button.handle_port_data(data) - else: - log.warning("Unhandled device info: %s", str2hex(data)) - - def _handle_sensor_data(self, data): - port = get_byte(data, 3) - if port not in self.devices: - log.warning("Notification on port with no device: %s", PORTS[port]) - return - - device = self.devices[port] - device.queue_port_data(data) - - def _handle_port_status(self, data): - port = get_byte(data, 3) - status = get_byte(data, 4) - - if status == STATUS_STARTED: - self.devices[port].started() - elif status == STATUS_FINISHED: - self.devices[port].finished() - elif status == STATUS_CONFLICT: - log.warning("Command conflict on port %s", PORTS[port]) - else: - log.warning("Unhandled status value: 0x%x", status) - - def _handle_port_info(self, data): - port = get_byte(data, 3) - dev_type = get_byte(data, 5) - - if port in PORTS and dev_type in DEVICE_TYPES: - log.debug("Device %s at port %s", DEVICE_TYPES[dev_type], PORTS[port]) - else: - log.warning("Device 0x%x at port 0x%x", dev_type, port) - - if dev_type == DEV_MOTOR: - self.devices[port] = EncodedMotor(self, port) - elif dev_type == DEV_IMOTOR: - self.motor_external = EncodedMotor(self, port) - self.devices[port] = self.motor_external - elif dev_type == DEV_DCS: - self.color_distance_sensor = ColorDistanceSensor(self, port) - self.devices[port] = self.color_distance_sensor - elif dev_type == DEV_LED: - self.devices[port] = LED(self, port) - elif dev_type == DEV_TILT_SENSOR: - self.devices[port] = TiltSensor(self, port) - elif dev_type == DEV_BATTERY: - self.devices[port] = Battery(self, port) - else: - log.debug("Unhandled peripheral type 0x%x on port 0x%x", dev_type, port) - self.devices[port] = Peripheral(self, port) - - if port == PORT_A: - self.motor_A = self.devices[port] - elif port == PORT_B: - self.motor_B = self.devices[port] - elif port == PORT_AB: - self.motor_AB = self.devices[port] - elif port == PORT_C: - self.port_C = self.devices[port] - elif port == PORT_D: - self.port_D = self.devices[port] - elif port == PORT_LED: - self.led = self.devices[port] - elif port == PORT_TILT_SENSOR: - self.tilt_sensor = self.devices[port] - elif port == PORT_BATTERY: - self.battery = self.devices[port] - else: - log.debug("Unhandled port: %s", PORTS[port]) - - def shutdown(self): - cmd = pack("') + 1:]) yyy = float(parts[2][parts[2].rfind('>') + 1:]) zzz = float(parts[3][parts[3].rfind('>') + 1:]) @@ -58,7 +58,7 @@ try: message = data except KeyboardInterrupt: raise - except: + except BaseException: break if not message: diff --git a/vernie/go_towards_light.py b/vernie/go_towards_light.py index cd23976..ed72ebf 100644 --- a/vernie/go_towards_light.py +++ b/vernie/go_towards_light.py @@ -9,9 +9,9 @@ criterion = min cur_luminosity = 0 -def on_change_lum(lum): +def on_change_lum(lumn): global cur_luminosity - cur_luminosity = lum + cur_luminosity = lumn lum_values = {} @@ -23,12 +23,12 @@ def on_btn(pressed): running = False -def on_turn(angle): - lum_values[angle] = cur_luminosity +def on_turn(angl): + lum_values[angl] = cur_luminosity robot.button.subscribe(on_btn) -robot.color_distance_sensor.subscribe(on_change_lum, CDS_MODE_LUMINOSITY, granularity=1) +robot.color_distance_sensor.subscribe(on_change_lum, ColorDistanceSensor.LUMINOSITY, granularity=1) robot.motor_A.subscribe(on_turn, granularity=30) # TODO: add bump detect to go back? while running: diff --git a/vernie/read_typed_commands.py b/vernie/read_typed_commands.py index 9782203..70a25e4 100644 --- a/vernie/read_typed_commands.py +++ b/vernie/read_typed_commands.py @@ -5,7 +5,7 @@ robot = Vernie() robot.say('type commands') -def confirmation(cmd): +def confirmation(_): robot.say("ok") diff --git a/vernie/run_commands_file.py b/vernie/run_commands_file.py index 01a800c..2e3d0f6 100644 --- a/vernie/run_commands_file.py +++ b/vernie/run_commands_file.py @@ -5,8 +5,8 @@ robot = Vernie() robot.say("commands from file") -def confirmation(cmd): - robot.say(cmd[0]) +def confirmation(command): + robot.say(command[0]) with open("vernie.commands") as fhd: