Add a serial class

This commit is contained in:
Charles Teinturier 2023-02-03 16:09:40 +01:00
parent 84cc4600f2
commit 5d96d94c63
2 changed files with 73 additions and 57 deletions

65
flipper_serial.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
# pylint: disable=missing-module-docstring
# pylint: disable=line-too-long
import serial
import serial.tools.list_ports
import serial_ble
class FlipperSerial():
_flipperusb = "USB VID:PID=0483:5740"
_read_characteristic = '19ed82ae-ed21-4c9d-4145-228e61fe0000'
_write_characteristic = '19ed82ae-ed21-4c9d-4145-228e62fe0000'
_is_cli = True
def discover(self):
ports = serial.tools.list_ports.comports()
for check_port in ports:
if self._flipperusb in check_port.hwid:
print("Found: ", check_port.description, "(",check_port.device,")")
return check_port.device
return None
def open(self, **resource):
for key, value in resource.items():
if key == "serial_device" and value is not None:
rsc = self._create_physical_serial(value)
if key == "ble_address" and value is not None:
rsc = self._create_ble_serial(value)
if rsc is None:
raise FlipperSerialException
return rsc
def close(self):
try:
self._serial_device.stop()
print('stopped bluetooth')
except AttributeError:
pass
def _create_physical_serial(self, file):
resource = serial.Serial(file, timeout=1)
resource.baudrate = 230400
resource.flushOutput()
resource.flushInput()
if self._is_cli:
resource.read_until(b'>: ')
resource.write(b"start_rpc_session\r")
resource.read_until(b'\n')
return resource
def _create_ble_serial(self, address):
bluetooth = serial_ble.BLESerial(address, self._read_characteristic, self._write_characteristic)
print('connecting...')
bluetooth.start(None)
print('connected')
return bluetooth
class FlipperSerialException(Exception):
pass

65
fzfs.py
View File

@ -21,15 +21,8 @@ import serial_ble
import serial.tools.list_ports
import sys
flipperusbid = "USB VID:PID=0483:5740"
def autodiscover():
ports = serial.tools.list_ports.comports()
for check_port in ports:
if flipperusbid in check_port.hwid:
print("Found: ", check_port.description, "(",check_port.device,")")
return check_port.device
return None
import flipper_serial
def main():
parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection')
@ -45,13 +38,14 @@ def main():
if not os.path.isdir(mountpoint):
print('mountpoint must be an empty folder')
return
flsrl = flipper_serial.FlipperSerial()
if len(os.listdir(mountpoint)) != 0:
print('mountpoint must be an empty folder')
return
if args.serial_device is None:
args.serial_device = autodiscover()
args.serial_device = flsrl.discover()
if args.serial_device is None and args.ble_address is None:
print('either serial_device or ble_address required')
@ -61,29 +55,10 @@ def main():
print('only one of serial_device/ble_address required')
return
serial_device = None
def create_serial_device():
if args.serial_device is not None:
if not os.path.exists(args.serial_device):
print('serial device not an actual file')
parser.print_usage()
exit()
return create_physical_serial(args.serial_device, True)
if args.ble_address is not None:
def disconnect_handler(client):
print('disconnected')
sys.exit(0)
return create_ble_serial(args.ble_address, None)
serial_device = create_serial_device()
if serial_device is None:
print('failed creating serial device')
backend = FlipperZeroFileSysten(serial_device)
if not os.path.exists(args.serial_device):
print(args.serial_device,': no such file or directory')
parser.print_usage()
return
fuse_started = True
# fuse_thread = threading.Thread(target=fuse.FUSE, kwargs={'operations': backend, 'mountpoint': mountpoint, 'foreground': True})
@ -94,31 +69,7 @@ def main():
fuse_start()
print('fuse stopped')
try:
serial_device.stop()
print('stopped bluetooth')
except AttributeError:
pass
def create_physical_serial(file, is_cli):
s = serial.Serial(file, timeout=1)
s.baudrate = 230400
s.flushOutput()
s.flushInput()
if is_cli:
s.read_until(b'>: ')
s.write(b"start_rpc_session\r")
s.read_until(b'\n')
return s
def create_ble_serial(address, disconnected_handler):
s = serial_ble.BLESerial(address, '19ed82ae-ed21-4c9d-4145-228e61fe0000', '19ed82ae-ed21-4c9d-4145-228e62fe0000')
print('connecting...')
s.start(disconnected_handler)
print('connected')
return bluetooth
flsrl.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)