diff --git a/flipper_api.py b/flipper_api.py index be8b9c1..836c6db 100644 --- a/flipper_api.py +++ b/flipper_api.py @@ -1,14 +1,12 @@ #!/usr/bin/env python3 +# pylint: disable=protected-access +# pylint: disable=no-member +# pylint: disable=missing-class-docstring +# pylint: disable=missing-function-docstring -from decimal import InvalidContext -import sys -from tracemalloc import start -from urllib import response -from flipperzero_protobuf_py.flipperzero_protobuf_compiled import application_pb2, flipper_pb2, storage_pb2 -import serial -import time import threading +from flipperzero_protobuf_py.flipperzero_protobuf_compiled import flipper_pb2, storage_pb2 from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper from flipperzero_protobuf_py.cli_helpers import * @@ -20,7 +18,6 @@ class FlipperAPI(): self.flipper = flipper_serial self.mutex=threading.Lock() - def connect(self): with self.mutex: self.proto = ProtoFlipper(self.flipper) @@ -29,7 +26,6 @@ class FlipperAPI(): print_hex(self.proto.cmd_system_ping()) - def _cmd_storage_list_directory(self, path): cmd_data = storage_pb2.ListRequest() cmd_data.path = path @@ -78,7 +74,7 @@ class FlipperAPI(): raise InvalidNameError() - def list_directory(self, path, additional_data = {}): + def list_directory(self, path, additional_data): with self.mutex: self._cmd_storage_list_directory(path) @@ -120,8 +116,7 @@ class FlipperAPI(): self.check_response_status(packet) contents.extend(packet.storage_read_response.file.data) if not packet.has_next: - break - + break return {'data': contents} def mkdir(self, path): diff --git a/flipper_fs.py b/flipper_fs.py new file mode 100644 index 0000000..dbd3fef --- /dev/null +++ b/flipper_fs.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 + +# pylint: disable=missing-class-docstring +# pylint: disable=missing-function-docstring +# pylint: disable=missing-module-docstring + +import errno +import stat +import time + +import fuse + +import flipper_api + +class FlipperZeroFileSystem(fuse.Operations, fuse.LoggingMixIn): + def __init__(self, serial_port) -> None: + super().__init__() + self.api = flipper_api.FlipperAPI(serial_port) + self.api.connect() + self.file_root = { + 'type': 'DIR' + } + self._fd = 0 + + def find_child_by_name(self, parent, child_name): + for child in parent['children']: + if child['name'] == child_name: + return child + raise fuse.FuseOSError(errno.ENOENT) + + def get_file_from_parts(self, parent, parts, index): + def list_dir(dir_path): + return self.api.list_directory(dir_path, {'full_path': dir_path, 'parent': parent}) + + if index <= len(parts): + full_path = f"/{'/'.join(parts[:index])}" + + if parent['type'] == 'DIR': + try: + parent['children'] + except KeyError: + parent['children'] = list_dir(full_path) + + if index == len(parts): + return parent + + child = self.find_child_by_name(parent, parts[index]) + + return self.get_file_from_parts(child, parts, index + 1) + + return parent + + def get_file_by_path(self, path_full: str): + path = path_full[:] + if path[0] == '/': + path = path[1:] + if path == '': + parts = [] + else: + parts = path.split('/') + + return self.get_file_from_parts(self.file_root, parts, 0) + + def readdir(self, path, fh = None): + # print(f'requested {path}') + + parent = self.get_file_by_path(path) + + return ['.', '..'] + [child['name'] for child in parent['children']] + + def getattr(self, path, fh=None): + file = self.get_file_by_path(path) + + try: + return file['attr'] + except KeyError: + pass + + print(f'getting attr for {path}') + + now = time.time() + + attr = { + 'st_mode': 0o777, + 'st_ctime': now, + 'st_mtime': now, + 'st_atime': now + } + + if file['type'] == 'DIR': + attr['st_mode'] |= stat.S_IFDIR + attr['st_nlink'] = 2 + else: + response = self.api.stat(path) + attr['st_size'] = response['size'] + attr['st_mode'] |= stat.S_IFREG + attr['st_nlink'] = 1 + + file['attr'] = attr + return attr + + def read(self, path, size, offset, fh): + cached = self.get_file_by_path(path) + + try: + return bytes(cached['contents'][offset:offset + size]) + except KeyError: + pass + + data = None + + print(f'reading {path}') + + data = self.api.read_file_contents(path)['data'] + + cached['contents'] = data + return bytes(data[offset:offset + size]) + + def write(self, path, data, offset, fh): + print(f'write file: {path} offset: {offset} length: {len(data)}') + try: + cached = self.get_file_by_path(path) + except OSError: + self.create(path, None) + cached = self.get_file_by_path(path) + + cached['contents'][offset:offset] = list(data) + cached['attr']['st_size'] = len(cached['contents']) + self.api.write(path, bytes(cached['contents'])) + return len(data) + + def open(self, path, flags): + print(f'open {path} {flags}') + self._fd += 1 + return self._fd + + def get_filename_from_path(self, path): + parts = path[1:].split('/') + return parts[-1] + + def get_parent_from_path(self, path): + return path[:-(len(self.get_filename_from_path(path)) + 1)] + + def append_to_parend(self, child_path, child): + parent_path = self.get_parent_from_path(child_path) + parent = self.get_file_by_path(parent_path) + child['parent'] = parent + parent['children'].append(child) + + def mkdir(self, path, mode): + print(f'mkdir {path}') + self.append_to_parend(path, { + 'name': self.get_filename_from_path(path), + 'type': 'DIR' + }) + self.api.mkdir(path) + return + + def rename(self, old, new): + try: + new_file = self.get_file_by_path(new) + new_file['parent']['children'].remove(new_file) + self.api.delete(new, True) + except OSError: + pass + + print(f'renaming {old} -> {new}') + cached = self.get_file_by_path(old) + self.api.rename(old, new) + parts = new.split('/') + cached['name'] = parts[-1] + + def rmdir(self, path): + self.unlink(path) + + def create(self, path, mode, fi=None): + print(f'creating {path}') + self.append_to_parend(path, { + 'name': self.get_filename_from_path(path), + 'type': 'FILE', + 'contents': [], + }) + self.api.write(path, bytes()) + self._fd += 1 + return self._fd + + def unlink(self, path): + # print(f'unlinking {path}') + cached = self.get_file_by_path(path) + self.api.delete(path, True) + cached['parent']['children'].remove(cached) diff --git a/flipper_serial.py b/flipper_serial.py new file mode 100644 index 0000000..feb8722 --- /dev/null +++ b/flipper_serial.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +# pylint: disable=missing-class-docstring +# pylint: disable=missing-function-docstring +# pylint: disable=missing-module-docstring +# pylint: disable=line-too-long + + +import serial +import serial.tools.list_ports +import serial_ble + +class FlipperSerial(): + _flipperusb = "USB VID:PID=0483:5740" + _read_characteristic = '19ed82ae-ed21-4c9d-4145-228e61fe0000' + _write_characteristic = '19ed82ae-ed21-4c9d-4145-228e62fe0000' + _is_cli = True + + def discover(self): + ports = serial.tools.list_ports.comports() + for check_port in ports: + if self._flipperusb in check_port.hwid: + print("Found: ", check_port.description, "(",check_port.device,")") + return check_port.device + return None + + def open(self, **resource): + for key, value in resource.items(): + if key == "serial_device" and value is not None: + rsc = self._create_physical_serial(value) + if key == "ble_address" and value is not None: + rsc = self._create_ble_serial(value) + + if rsc is None: + raise FlipperSerialException + return rsc + + def close(self): + try: + self._serial_device.stop() + print('stopped bluetooth') + except AttributeError: + pass + + def _create_physical_serial(self, file): + resource = serial.Serial(file, timeout=1) + resource.baudrate = 230400 + resource.flushOutput() + resource.flushInput() + if self._is_cli: + resource.read_until(b'>: ') + resource.write(b"start_rpc_session\r") + resource.read_until(b'\n') + return resource + + def _create_ble_serial(self, address): + bluetooth = serial_ble.BLESerial(address, self._read_characteristic, self._write_characteristic) + print('connecting...') + bluetooth.start(None) + print('connected') + + return bluetooth + +class FlipperSerialException(Exception): + pass diff --git a/fzfs.py b/fzfs.py index ba48207..52a7878 100644 --- a/fzfs.py +++ b/fzfs.py @@ -1,35 +1,17 @@ -from ast import parse -from audioop import add -import errno -from fileinput import filename -from os import unlink -import pathlib -from signal import signal -from stat import S_IFDIR, ST_ATIME, ST_CTIME, ST_MODE, ST_MTIME, ST_NLINK -from turtle import back -import flipper_api -import fuse -import logging -import time -import stat -import os +# pylint: disable=missing-class-docstring +# pylint: disable=missing-function-docstring +# pylint: disable=missing-module-docstring +# pylint: disable=line-too-long + import argparse -import pathlib -import serial -import serial_ble -import serial.tools.list_ports -import sys +import logging +import os -flipperusbid = "USB VID:PID=0483:5740" +import fuse -def autodiscover(): - ports = serial.tools.list_ports.comports() - for check_port in ports: - if flipperusbid in check_port.hwid: - print("Found: ", check_port.description, "(",check_port.device,")") - return check_port.device - return None +import flipper_fs +import flipper_serial def main(): parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection') @@ -40,18 +22,14 @@ def main(): logging.basicConfig(level=logging.DEBUG) - mountpoint = args.mountpoint + flsrl = flipper_serial.FlipperSerial() - if not os.path.isdir(mountpoint): - print('mountpoint must be an empty folder') - return - - if len(os.listdir(mountpoint)) != 0: - print('mountpoint must be an empty folder') + if not os.path.isdir(args.mountpoint) and len(os.listdir(args.mountpoint)) != 0: + print(args.mountpoint, ': mountpoint must be an empty folder') return if args.serial_device is None: - args.serial_device = autodiscover() + args.serial_device = flsrl.discover() if args.serial_device is None and args.ble_address is None: print('either serial_device or ble_address required') @@ -61,254 +39,24 @@ def main(): print('only one of serial_device/ble_address required') return - serial_device = None - - def create_serial_device(): - if args.serial_device is not None: - if not os.path.exists(args.serial_device): - print('serial device not an actual file') - parser.print_usage() - exit() - - return create_physical_serial(args.serial_device, True) - if args.ble_address is not None: - def disconnect_handler(client): - print('disconnected') - sys.exit(0) - - return create_ble_serial(args.ble_address, None) - - serial_device = create_serial_device() - - if serial_device is None: - print('failed creating serial device') - - backend = FlipperZeroFileSysten(serial_device) - - fuse_started = True - # fuse_thread = threading.Thread(target=fuse.FUSE, kwargs={'operations': backend, 'mountpoint': mountpoint, 'foreground': True}) - def fuse_start(): - fuse.FUSE(backend, mountpoint, foreground=True) - - print('starting fs...') - fuse_start() - print('fuse stopped') + if not os.path.exists(args.serial_device): + print(args.serial_device,': no such file or directory') + parser.print_usage() + return try: - serial_device.stop() - print('stopped bluetooth') - except AttributeError: - pass - - -def create_physical_serial(file, is_cli): - s = serial.Serial(file, timeout=1) - s.baudrate = 230400 - s.flushOutput() - s.flushInput() - if is_cli: - s.read_until(b'>: ') - s.write(b"start_rpc_session\r") - s.read_until(b'\n') - return s - -def create_ble_serial(address, disconnected_handler): - s = serial_ble.BLESerial(address, '19ed82ae-ed21-4c9d-4145-228e61fe0000', '19ed82ae-ed21-4c9d-4145-228e62fe0000') - print('connecting...') - s.start(disconnected_handler) - print('connected') - - return s - - - -class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn): - def __init__(self, serial_port) -> None: - super().__init__() - self.api = flipper_api.FlipperAPI(serial_port) - self.api.connect() - self.file_root = { - 'type': 'DIR' - } - self.fd = 0 - - def find_child_by_name(self, parent, child_name): - for child in parent['children']: - if child['name'] == child_name: - return child - raise fuse.FuseOSError(errno.ENOENT) - - def get_file_from_parts(self, parent, parts, index): - def list_dir(dir_path): - return self.api.list_directory(dir_path, {'full_path': dir_path, 'parent': parent}) - - if index <= len(parts): - full_path = f"/{'/'.join(parts[:index])}" - - if parent['type'] == 'DIR': - try: - parent['children'] - except KeyError: - parent['children'] = list_dir(full_path) - - if index == len(parts): - return parent - - child = self.find_child_by_name(parent, parts[index]) - - return self.get_file_from_parts(child, parts, index + 1) - - return parent - - - def get_file_by_path(self, path_full: str): - path = path_full[:] - if path[0] == '/': - path = path[1:] - if path == '': - parts = [] - else: - parts = path.split('/') - - return self.get_file_from_parts(self.file_root, parts, 0) - - def readdir(self, path, fh = None): - # print(f'requested {path}') - - parent = self.get_file_by_path(path) - - return ['.', '..'] + [child['name'] for child in parent['children']] - - def getattr(self, path, fh=None): - file = self.get_file_by_path(path) - - try: - return file['attr'] - except KeyError: - pass - - print(f'getting attr for {path}') - - now = time.time() - - attr = { - 'st_mode': 0o777, - 'st_ctime': now, - 'st_mtime': now, - 'st_atime': now - } - - is_dir = (file['type'] == 'DIR') - - if is_dir: - attr['st_mode'] |= stat.S_IFDIR - attr['st_nlink'] = 2 - else: - response = self.api.stat(path) - attr['st_size'] = response['size'] - attr['st_mode'] |= stat.S_IFREG - attr['st_nlink'] = 1 - - - file['attr'] = attr - - return attr - - def read(self, path, size, offset, fh): - cached = self.get_file_by_path(path) - - try: - return bytes(cached['contents'][offset:offset + size]) - except KeyError: - pass - - data = None - - print(f'reading {path}') - - data = self.api.read_file_contents(path)['data'] - - cached['contents'] = data - return bytes(data[offset:offset + size]) - - def write(self, path, data, offset, fh): - print(f'write file: {path} offset: {offset} length: {len(data)}') - try: - cached = self.get_file_by_path(path) - except OSError: - self.create(path, None) - cached = self.get_file_by_path(path) - - cached['contents'][offset:offset] = list(data) - cached['attr']['st_size'] = len(cached['contents']) - self.api.write(path, bytes(cached['contents'])) - return len(data) - - - def open(self, path, flags): - print(f'open {path} {flags}') - self.fd += 1 - return self.fd - - - def get_filename_from_path(self, path): - parts = path[1:].split('/') - return parts[-1] - - def get_parent_from_path(self, path): - return path[:-(len(self.get_filename_from_path(path)) + 1)] - - def append_to_parend(self, child_path, child): - parent_path = self.get_parent_from_path(child_path) - parent = self.get_file_by_path(parent_path) - child['parent'] = parent - parent['children'].append(child) - - def mkdir(self, path, mode): - print(f'mkdir {path}') - self.append_to_parend(path, { - 'name': self.get_filename_from_path(path), - 'type': 'DIR' - }) - self.api.mkdir(path) + serial_device = flsrl.open(serial_device=args.serial_device, ble_address=args.ble_address) + except flipper_serial.FlipperSerialException: + print('Failed creating serial device') return - - def rename(self, old, new): - try: - new_file = self.get_file_by_path(new) - new_file['parent']['children'].remove(new_file) - self.api.delete(new, True) - except OSError: - pass - - print(f'renaming {old} -> {new}') - cached = self.get_file_by_path(old) - self.api.rename(old, new) - parts = new.split('/') - cached['name'] = parts[-1] - - def rmdir(self, path): - self.unlink(path) - - def create(self, path, mode, fi=None): - print(f'creating {path}') - self.append_to_parend(path, { - 'name': self.get_filename_from_path(path), - 'type': 'FILE', - 'contents': [], - }) - self.api.write(path, bytes()) - self.fd += 1 - return self.fd - - def unlink(self, path): - # print(f'unlinking {path}') - cached = self.get_file_by_path(path) - self.api.delete(path, True) - cached['parent']['children'].remove(cached) + print('starting fs...') + backend = flipper_fs.FlipperZeroFileSystem(serial_device) + fuse.FUSE(backend, args.mountpoint, foreground=True) + print('fuse stopped') + flsrl.close() if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - main() \ No newline at end of file + main()