diff --git a/flipper_serial.py b/flipper_serial.py new file mode 100644 index 0000000..feb8722 --- /dev/null +++ b/flipper_serial.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +# pylint: disable=missing-class-docstring +# pylint: disable=missing-function-docstring +# pylint: disable=missing-module-docstring +# pylint: disable=line-too-long + + +import serial +import serial.tools.list_ports +import serial_ble + +class FlipperSerial(): + _flipperusb = "USB VID:PID=0483:5740" + _read_characteristic = '19ed82ae-ed21-4c9d-4145-228e61fe0000' + _write_characteristic = '19ed82ae-ed21-4c9d-4145-228e62fe0000' + _is_cli = True + + def discover(self): + ports = serial.tools.list_ports.comports() + for check_port in ports: + if self._flipperusb in check_port.hwid: + print("Found: ", check_port.description, "(",check_port.device,")") + return check_port.device + return None + + def open(self, **resource): + for key, value in resource.items(): + if key == "serial_device" and value is not None: + rsc = self._create_physical_serial(value) + if key == "ble_address" and value is not None: + rsc = self._create_ble_serial(value) + + if rsc is None: + raise FlipperSerialException + return rsc + + def close(self): + try: + self._serial_device.stop() + print('stopped bluetooth') + except AttributeError: + pass + + def _create_physical_serial(self, file): + resource = serial.Serial(file, timeout=1) + resource.baudrate = 230400 + resource.flushOutput() + resource.flushInput() + if self._is_cli: + resource.read_until(b'>: ') + resource.write(b"start_rpc_session\r") + resource.read_until(b'\n') + return resource + + def _create_ble_serial(self, address): + bluetooth = serial_ble.BLESerial(address, self._read_characteristic, self._write_characteristic) + print('connecting...') + bluetooth.start(None) + print('connected') + + return bluetooth + +class FlipperSerialException(Exception): + pass diff --git a/fzfs.py b/fzfs.py index 8c73e75..e71fd5a 100644 --- a/fzfs.py +++ b/fzfs.py @@ -21,15 +21,8 @@ import serial_ble import serial.tools.list_ports import sys -flipperusbid = "USB VID:PID=0483:5740" -def autodiscover(): - ports = serial.tools.list_ports.comports() - for check_port in ports: - if flipperusbid in check_port.hwid: - print("Found: ", check_port.description, "(",check_port.device,")") - return check_port.device - return None +import flipper_serial def main(): parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection') @@ -45,13 +38,14 @@ def main(): if not os.path.isdir(mountpoint): print('mountpoint must be an empty folder') return + flsrl = flipper_serial.FlipperSerial() if len(os.listdir(mountpoint)) != 0: print('mountpoint must be an empty folder') return if args.serial_device is None: - args.serial_device = autodiscover() + args.serial_device = flsrl.discover() if args.serial_device is None and args.ble_address is None: print('either serial_device or ble_address required') @@ -61,29 +55,10 @@ def main(): print('only one of serial_device/ble_address required') return - serial_device = None - - def create_serial_device(): - if args.serial_device is not None: - if not os.path.exists(args.serial_device): - print('serial device not an actual file') - parser.print_usage() - exit() - - return create_physical_serial(args.serial_device, True) - if args.ble_address is not None: - def disconnect_handler(client): - print('disconnected') - sys.exit(0) - - return create_ble_serial(args.ble_address, None) - - serial_device = create_serial_device() - - if serial_device is None: - print('failed creating serial device') - - backend = FlipperZeroFileSysten(serial_device) + if not os.path.exists(args.serial_device): + print(args.serial_device,': no such file or directory') + parser.print_usage() + return fuse_started = True # fuse_thread = threading.Thread(target=fuse.FUSE, kwargs={'operations': backend, 'mountpoint': mountpoint, 'foreground': True}) @@ -94,31 +69,7 @@ def main(): fuse_start() print('fuse stopped') - try: - serial_device.stop() - print('stopped bluetooth') - except AttributeError: - pass - - -def create_physical_serial(file, is_cli): - s = serial.Serial(file, timeout=1) - s.baudrate = 230400 - s.flushOutput() - s.flushInput() - if is_cli: - s.read_until(b'>: ') - s.write(b"start_rpc_session\r") - s.read_until(b'\n') - return s - -def create_ble_serial(address, disconnected_handler): - s = serial_ble.BLESerial(address, '19ed82ae-ed21-4c9d-4145-228e61fe0000', '19ed82ae-ed21-4c9d-4145-228e62fe0000') - print('connecting...') - s.start(disconnected_handler) - print('connected') - - return bluetooth + flsrl.close() if __name__ == '__main__': logging.basicConfig(level=logging.INFO)