diff --git a/examples/sorter/__init__.py b/examples/sorter/__init__.py new file mode 100644 index 0000000..3bdecd5 --- /dev/null +++ b/examples/sorter/__init__.py @@ -0,0 +1,89 @@ +import logging +import traceback + +from pylgbst import MoveHub, COLORS, COLOR_RED, COLOR_YELLOW, COLOR_CYAN, COLOR_BLUE, COLOR_BLACK +from pylgbst.comms import DebugServerConnection, BLEConnection + + +class ColorSorter(MoveHub): + positions = [COLOR_YELLOW, COLOR_BLUE, COLOR_CYAN, COLOR_RED] + + def __init__(self, connection=None): + super(ColorSorter, self).__init__(connection) + self.position = len(self.positions) + self.color = 0 + self.distance = 10 + self._last_wheel_dir = 1 + self.color_distance_sensor.subscribe(self.on_color) + + def on_color(self, colr, dist): + if colr not in (COLOR_BLACK,) or dist < 2.5: + logging.debug("%s %s", COLORS[colr], dist) + self.color = colr + self.distance = dist + + def feed(self): + self.motor_A.angled(1080) # 1080 was true + + def move_to_bucket(self, color): + logging.info("Bucket: %s", COLORS[color]) + if color in self.positions: + newpos = self.positions.index(color) + else: + newpos = len(self.positions) + + if newpos == self.position: + return + + offset = newpos - self.position + wheel_dir = offset / abs(offset) + + if wheel_dir != self._last_wheel_dir: + self.motor_B.angled(270 * wheel_dir) + + self.motor_B.angled(offset * 360 * 15) + + self.position = newpos + self._last_wheel_dir = wheel_dir + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + + try: + conn = DebugServerConnection() + except BaseException: + logging.warning("Failed to use debug server: %s", traceback.format_exc()) + conn = BLEConnection().connect() + + queue = [None for _ in range(0, 1)] + + sorter = ColorSorter(conn) + empty = 0 + try: + while True: + empty += 1 + item = (sorter.color, sorter.distance) # read once + # logging.info("%s %.2f", COLORS[item[0]], item[1]) + if item[1] <= 5.0: + logging.info("Detected: %s", COLORS[item[0]]) + queue.append(item) + empty = 0 + else: + queue.append(None) + + logging.debug("%s", [COLORS[x[0]] if x else None for x in queue]) + + last = queue.pop(0) + if last: + sorter.move_to_bucket(last[0]) + empty = 0 + + if empty > 20: + sorter.move_to_bucket(COLOR_BLACK) + + sorter.feed() + finally: + if not sorter.motor_B.in_progress(): + sorter.move_to_bucket(COLOR_BLACK) + sorter.motor_AB.stop(async=True)