Merge pull request #6 from teintuc/features/refactorisation

Features/refactorisation
This commit is contained in:
Daniel Dakhno 2023-02-23 12:20:29 +01:00 committed by GitHub
commit 242db96937
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 290 additions and 291 deletions

View File

@ -1,14 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# pylint: disable=protected-access
# pylint: disable=no-member
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
from decimal import InvalidContext
import sys
from tracemalloc import start
from urllib import response
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import application_pb2, flipper_pb2, storage_pb2
import serial
import time
import threading import threading
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import flipper_pb2, storage_pb2
from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper
from flipperzero_protobuf_py.cli_helpers import * from flipperzero_protobuf_py.cli_helpers import *
@ -20,7 +18,6 @@ class FlipperAPI():
self.flipper = flipper_serial self.flipper = flipper_serial
self.mutex=threading.Lock() self.mutex=threading.Lock()
def connect(self): def connect(self):
with self.mutex: with self.mutex:
self.proto = ProtoFlipper(self.flipper) self.proto = ProtoFlipper(self.flipper)
@ -29,7 +26,6 @@ class FlipperAPI():
print_hex(self.proto.cmd_system_ping()) print_hex(self.proto.cmd_system_ping())
def _cmd_storage_list_directory(self, path): def _cmd_storage_list_directory(self, path):
cmd_data = storage_pb2.ListRequest() cmd_data = storage_pb2.ListRequest()
cmd_data.path = path cmd_data.path = path
@ -78,7 +74,7 @@ class FlipperAPI():
raise InvalidNameError() raise InvalidNameError()
def list_directory(self, path, additional_data = {}): def list_directory(self, path, additional_data):
with self.mutex: with self.mutex:
self._cmd_storage_list_directory(path) self._cmd_storage_list_directory(path)
@ -121,7 +117,6 @@ class FlipperAPI():
contents.extend(packet.storage_read_response.file.data) contents.extend(packet.storage_read_response.file.data)
if not packet.has_next: if not packet.has_next:
break break
return {'data': contents} return {'data': contents}
def mkdir(self, path): def mkdir(self, path):

191
flipper_fs.py Normal file
View File

@ -0,0 +1,191 @@
#!/usr/bin/env python3
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
# pylint: disable=missing-module-docstring
import errno
import stat
import time
import fuse
import flipper_api
class FlipperZeroFileSystem(fuse.Operations, fuse.LoggingMixIn):
def __init__(self, serial_port) -> None:
super().__init__()
self.api = flipper_api.FlipperAPI(serial_port)
self.api.connect()
self.file_root = {
'type': 'DIR'
}
self._fd = 0
def find_child_by_name(self, parent, child_name):
for child in parent['children']:
if child['name'] == child_name:
return child
raise fuse.FuseOSError(errno.ENOENT)
def get_file_from_parts(self, parent, parts, index):
def list_dir(dir_path):
return self.api.list_directory(dir_path, {'full_path': dir_path, 'parent': parent})
if index <= len(parts):
full_path = f"/{'/'.join(parts[:index])}"
if parent['type'] == 'DIR':
try:
parent['children']
except KeyError:
parent['children'] = list_dir(full_path)
if index == len(parts):
return parent
child = self.find_child_by_name(parent, parts[index])
return self.get_file_from_parts(child, parts, index + 1)
return parent
def get_file_by_path(self, path_full: str):
path = path_full[:]
if path[0] == '/':
path = path[1:]
if path == '':
parts = []
else:
parts = path.split('/')
return self.get_file_from_parts(self.file_root, parts, 0)
def readdir(self, path, fh = None):
# print(f'requested {path}')
parent = self.get_file_by_path(path)
return ['.', '..'] + [child['name'] for child in parent['children']]
def getattr(self, path, fh=None):
file = self.get_file_by_path(path)
try:
return file['attr']
except KeyError:
pass
print(f'getting attr for {path}')
now = time.time()
attr = {
'st_mode': 0o777,
'st_ctime': now,
'st_mtime': now,
'st_atime': now
}
if file['type'] == 'DIR':
attr['st_mode'] |= stat.S_IFDIR
attr['st_nlink'] = 2
else:
response = self.api.stat(path)
attr['st_size'] = response['size']
attr['st_mode'] |= stat.S_IFREG
attr['st_nlink'] = 1
file['attr'] = attr
return attr
def read(self, path, size, offset, fh):
cached = self.get_file_by_path(path)
try:
return bytes(cached['contents'][offset:offset + size])
except KeyError:
pass
data = None
print(f'reading {path}')
data = self.api.read_file_contents(path)['data']
cached['contents'] = data
return bytes(data[offset:offset + size])
def write(self, path, data, offset, fh):
print(f'write file: {path} offset: {offset} length: {len(data)}')
try:
cached = self.get_file_by_path(path)
except OSError:
self.create(path, None)
cached = self.get_file_by_path(path)
cached['contents'][offset:offset] = list(data)
cached['attr']['st_size'] = len(cached['contents'])
self.api.write(path, bytes(cached['contents']))
return len(data)
def open(self, path, flags):
print(f'open {path} {flags}')
self._fd += 1
return self._fd
def get_filename_from_path(self, path):
parts = path[1:].split('/')
return parts[-1]
def get_parent_from_path(self, path):
return path[:-(len(self.get_filename_from_path(path)) + 1)]
def append_to_parend(self, child_path, child):
parent_path = self.get_parent_from_path(child_path)
parent = self.get_file_by_path(parent_path)
child['parent'] = parent
parent['children'].append(child)
def mkdir(self, path, mode):
print(f'mkdir {path}')
self.append_to_parend(path, {
'name': self.get_filename_from_path(path),
'type': 'DIR'
})
self.api.mkdir(path)
return
def rename(self, old, new):
try:
new_file = self.get_file_by_path(new)
new_file['parent']['children'].remove(new_file)
self.api.delete(new, True)
except OSError:
pass
print(f'renaming {old} -> {new}')
cached = self.get_file_by_path(old)
self.api.rename(old, new)
parts = new.split('/')
cached['name'] = parts[-1]
def rmdir(self, path):
self.unlink(path)
def create(self, path, mode, fi=None):
print(f'creating {path}')
self.append_to_parend(path, {
'name': self.get_filename_from_path(path),
'type': 'FILE',
'contents': [],
})
self.api.write(path, bytes())
self._fd += 1
return self._fd
def unlink(self, path):
# print(f'unlinking {path}')
cached = self.get_file_by_path(path)
self.api.delete(path, True)
cached['parent']['children'].remove(cached)

65
flipper_serial.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
# pylint: disable=missing-module-docstring
# pylint: disable=line-too-long
import serial
import serial.tools.list_ports
import serial_ble
class FlipperSerial():
_flipperusb = "USB VID:PID=0483:5740"
_read_characteristic = '19ed82ae-ed21-4c9d-4145-228e61fe0000'
_write_characteristic = '19ed82ae-ed21-4c9d-4145-228e62fe0000'
_is_cli = True
def discover(self):
ports = serial.tools.list_ports.comports()
for check_port in ports:
if self._flipperusb in check_port.hwid:
print("Found: ", check_port.description, "(",check_port.device,")")
return check_port.device
return None
def open(self, **resource):
for key, value in resource.items():
if key == "serial_device" and value is not None:
rsc = self._create_physical_serial(value)
if key == "ble_address" and value is not None:
rsc = self._create_ble_serial(value)
if rsc is None:
raise FlipperSerialException
return rsc
def close(self):
try:
self._serial_device.stop()
print('stopped bluetooth')
except AttributeError:
pass
def _create_physical_serial(self, file):
resource = serial.Serial(file, timeout=1)
resource.baudrate = 230400
resource.flushOutput()
resource.flushInput()
if self._is_cli:
resource.read_until(b'>: ')
resource.write(b"start_rpc_session\r")
resource.read_until(b'\n')
return resource
def _create_ble_serial(self, address):
bluetooth = serial_ble.BLESerial(address, self._read_characteristic, self._write_characteristic)
print('connecting...')
bluetooth.start(None)
print('connected')
return bluetooth
class FlipperSerialException(Exception):
pass

300
fzfs.py
View File

@ -1,35 +1,17 @@
from ast import parse
from audioop import add
import errno
from fileinput import filename
from os import unlink
import pathlib
from signal import signal
from stat import S_IFDIR, ST_ATIME, ST_CTIME, ST_MODE, ST_MTIME, ST_NLINK
from turtle import back
import flipper_api # pylint: disable=missing-class-docstring
import fuse # pylint: disable=missing-function-docstring
import logging # pylint: disable=missing-module-docstring
import time # pylint: disable=line-too-long
import stat
import os
import argparse import argparse
import pathlib import logging
import serial import os
import serial_ble
import serial.tools.list_ports
import sys
flipperusbid = "USB VID:PID=0483:5740" import fuse
def autodiscover(): import flipper_fs
ports = serial.tools.list_ports.comports() import flipper_serial
for check_port in ports:
if flipperusbid in check_port.hwid:
print("Found: ", check_port.description, "(",check_port.device,")")
return check_port.device
return None
def main(): def main():
parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection') parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection')
@ -40,18 +22,14 @@ def main():
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
mountpoint = args.mountpoint flsrl = flipper_serial.FlipperSerial()
if not os.path.isdir(mountpoint): if not os.path.isdir(args.mountpoint) and len(os.listdir(args.mountpoint)) != 0:
print('mountpoint must be an empty folder') print(args.mountpoint, ': mountpoint must be an empty folder')
return
if len(os.listdir(mountpoint)) != 0:
print('mountpoint must be an empty folder')
return return
if args.serial_device is None: if args.serial_device is None:
args.serial_device = autodiscover() args.serial_device = flsrl.discover()
if args.serial_device is None and args.ble_address is None: if args.serial_device is None and args.ble_address is None:
print('either serial_device or ble_address required') print('either serial_device or ble_address required')
@ -61,253 +39,23 @@ def main():
print('only one of serial_device/ble_address required') print('only one of serial_device/ble_address required')
return return
serial_device = None
def create_serial_device():
if args.serial_device is not None:
if not os.path.exists(args.serial_device): if not os.path.exists(args.serial_device):
print('serial device not an actual file') print(args.serial_device,': no such file or directory')
parser.print_usage() parser.print_usage()
exit()
return create_physical_serial(args.serial_device, True)
if args.ble_address is not None:
def disconnect_handler(client):
print('disconnected')
sys.exit(0)
return create_ble_serial(args.ble_address, None)
serial_device = create_serial_device()
if serial_device is None:
print('failed creating serial device')
backend = FlipperZeroFileSysten(serial_device)
fuse_started = True
# fuse_thread = threading.Thread(target=fuse.FUSE, kwargs={'operations': backend, 'mountpoint': mountpoint, 'foreground': True})
def fuse_start():
fuse.FUSE(backend, mountpoint, foreground=True)
print('starting fs...')
fuse_start()
print('fuse stopped')
try:
serial_device.stop()
print('stopped bluetooth')
except AttributeError:
pass
def create_physical_serial(file, is_cli):
s = serial.Serial(file, timeout=1)
s.baudrate = 230400
s.flushOutput()
s.flushInput()
if is_cli:
s.read_until(b'>: ')
s.write(b"start_rpc_session\r")
s.read_until(b'\n')
return s
def create_ble_serial(address, disconnected_handler):
s = serial_ble.BLESerial(address, '19ed82ae-ed21-4c9d-4145-228e61fe0000', '19ed82ae-ed21-4c9d-4145-228e62fe0000')
print('connecting...')
s.start(disconnected_handler)
print('connected')
return s
class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
def __init__(self, serial_port) -> None:
super().__init__()
self.api = flipper_api.FlipperAPI(serial_port)
self.api.connect()
self.file_root = {
'type': 'DIR'
}
self.fd = 0
def find_child_by_name(self, parent, child_name):
for child in parent['children']:
if child['name'] == child_name:
return child
raise fuse.FuseOSError(errno.ENOENT)
def get_file_from_parts(self, parent, parts, index):
def list_dir(dir_path):
return self.api.list_directory(dir_path, {'full_path': dir_path, 'parent': parent})
if index <= len(parts):
full_path = f"/{'/'.join(parts[:index])}"
if parent['type'] == 'DIR':
try:
parent['children']
except KeyError:
parent['children'] = list_dir(full_path)
if index == len(parts):
return parent
child = self.find_child_by_name(parent, parts[index])
return self.get_file_from_parts(child, parts, index + 1)
return parent
def get_file_by_path(self, path_full: str):
path = path_full[:]
if path[0] == '/':
path = path[1:]
if path == '':
parts = []
else:
parts = path.split('/')
return self.get_file_from_parts(self.file_root, parts, 0)
def readdir(self, path, fh = None):
# print(f'requested {path}')
parent = self.get_file_by_path(path)
return ['.', '..'] + [child['name'] for child in parent['children']]
def getattr(self, path, fh=None):
file = self.get_file_by_path(path)
try:
return file['attr']
except KeyError:
pass
print(f'getting attr for {path}')
now = time.time()
attr = {
'st_mode': 0o777,
'st_ctime': now,
'st_mtime': now,
'st_atime': now
}
is_dir = (file['type'] == 'DIR')
if is_dir:
attr['st_mode'] |= stat.S_IFDIR
attr['st_nlink'] = 2
else:
response = self.api.stat(path)
attr['st_size'] = response['size']
attr['st_mode'] |= stat.S_IFREG
attr['st_nlink'] = 1
file['attr'] = attr
return attr
def read(self, path, size, offset, fh):
cached = self.get_file_by_path(path)
try:
return bytes(cached['contents'][offset:offset + size])
except KeyError:
pass
data = None
print(f'reading {path}')
data = self.api.read_file_contents(path)['data']
cached['contents'] = data
return bytes(data[offset:offset + size])
def write(self, path, data, offset, fh):
print(f'write file: {path} offset: {offset} length: {len(data)}')
try:
cached = self.get_file_by_path(path)
except OSError:
self.create(path, None)
cached = self.get_file_by_path(path)
cached['contents'][offset:offset] = list(data)
cached['attr']['st_size'] = len(cached['contents'])
self.api.write(path, bytes(cached['contents']))
return len(data)
def open(self, path, flags):
print(f'open {path} {flags}')
self.fd += 1
return self.fd
def get_filename_from_path(self, path):
parts = path[1:].split('/')
return parts[-1]
def get_parent_from_path(self, path):
return path[:-(len(self.get_filename_from_path(path)) + 1)]
def append_to_parend(self, child_path, child):
parent_path = self.get_parent_from_path(child_path)
parent = self.get_file_by_path(parent_path)
child['parent'] = parent
parent['children'].append(child)
def mkdir(self, path, mode):
print(f'mkdir {path}')
self.append_to_parend(path, {
'name': self.get_filename_from_path(path),
'type': 'DIR'
})
self.api.mkdir(path)
return return
def rename(self, old, new):
try: try:
new_file = self.get_file_by_path(new) serial_device = flsrl.open(serial_device=args.serial_device, ble_address=args.ble_address)
new_file['parent']['children'].remove(new_file) except flipper_serial.FlipperSerialException:
self.api.delete(new, True) print('Failed creating serial device')
except OSError: return
pass
print(f'renaming {old} -> {new}') print('starting fs...')
cached = self.get_file_by_path(old) backend = flipper_fs.FlipperZeroFileSystem(serial_device)
self.api.rename(old, new) fuse.FUSE(backend, args.mountpoint, foreground=True)
parts = new.split('/') print('fuse stopped')
cached['name'] = parts[-1]
def rmdir(self, path):
self.unlink(path)
def create(self, path, mode, fi=None):
print(f'creating {path}')
self.append_to_parend(path, {
'name': self.get_filename_from_path(path),
'type': 'FILE',
'contents': [],
})
self.api.write(path, bytes())
self.fd += 1
return self.fd
def unlink(self, path):
# print(f'unlinking {path}')
cached = self.get_file_by_path(path)
self.api.delete(path, True)
cached['parent']['children'].remove(cached)
flsrl.close()
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)