mirror of
https://github.com/dakhnod/fzfs.git
synced 2025-05-15 23:00:10 -07:00
initial commit
This commit is contained in:
commit
f0e2989d51
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
.vscode/
|
||||||
|
venv/
|
||||||
|
__pycache__/
|
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[submodule "flipperzero_protobuf_py"]
|
||||||
|
path = flipperzero_protobuf_py
|
||||||
|
url = git@github.com:flipperdevices/flipperzero_protobuf_py.git
|
26
README.md
Normal file
26
README.md
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
# Flipper Zero filesystem driver
|
||||||
|
|
||||||
|
This driver allows you to mount the flipper zero over its serial connection and manage it like a regular mass storage.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```
|
||||||
|
git clone git@github.com:dakhnod/fzfs.git
|
||||||
|
cd fzfs
|
||||||
|
git clone git@github.com:flipperdevices/flipperzero_protobuf_py.git
|
||||||
|
python3 -m venv venv
|
||||||
|
. venv/bin/activate
|
||||||
|
pip install protobuf
|
||||||
|
pip install fusepy
|
||||||
|
pip install pyserial
|
||||||
|
```
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
The script takes two arguments, the serial port and the mount point
|
||||||
|
|
||||||
|
```
|
||||||
|
venv/bin/python3 fzfs.py /dev/ttyACM0 /home/user/flipper-zero
|
||||||
|
```
|
||||||
|
|
||||||
|
Then you should be able to access your flipper files through file browser of the console in the mountpoint.
|
168
flipper_api.py
Normal file
168
flipper_api.py
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from decimal import InvalidContext
|
||||||
|
import sys
|
||||||
|
from tracemalloc import start
|
||||||
|
from urllib import response
|
||||||
|
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import application_pb2, flipper_pb2, storage_pb2
|
||||||
|
import serial
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper
|
||||||
|
from flipperzero_protobuf_py.cli_helpers import *
|
||||||
|
|
||||||
|
|
||||||
|
class FlipperAPI():
|
||||||
|
def __init__(self, serial_port) -> None:
|
||||||
|
self.serial_port = serial_port
|
||||||
|
self.proto = None
|
||||||
|
self.flipper = None
|
||||||
|
self.mutex=threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
with self.mutex:
|
||||||
|
self.flipper = serial.Serial(self.serial_port, timeout=1)
|
||||||
|
self.flipper.baudrate = 230400
|
||||||
|
self.flipper.flushOutput()
|
||||||
|
self.flipper.flushInput()
|
||||||
|
|
||||||
|
# disable timeout
|
||||||
|
self.flipper.timeout = None
|
||||||
|
|
||||||
|
# wait for prompt
|
||||||
|
self.flipper.read_until(b'>: ')
|
||||||
|
|
||||||
|
# send command and skip answer
|
||||||
|
self.flipper.write(b"start_rpc_session\r")
|
||||||
|
self.flipper.read_until(b'\n')
|
||||||
|
|
||||||
|
# construct protobuf worker
|
||||||
|
self.proto = ProtoFlipper(self.flipper)
|
||||||
|
|
||||||
|
print("Ping result: ")
|
||||||
|
print_hex(self.proto.cmd_system_ping())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def _cmd_storage_list_directory(self, path):
|
||||||
|
cmd_data = storage_pb2.ListRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_list_request')
|
||||||
|
|
||||||
|
def _cmd_storage_stat(self, path):
|
||||||
|
cmd_data = storage_pb2.StatRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
return self.proto._cmd_send_and_read_answer(cmd_data, 'storage_stat_request')
|
||||||
|
|
||||||
|
def _cmd_storage_read(self, path):
|
||||||
|
cmd_data = storage_pb2.ReadRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_read_request')
|
||||||
|
|
||||||
|
def _cmd_storage_mkdir(self, path):
|
||||||
|
cmd_data = storage_pb2.MkdirRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_mkdir_request')
|
||||||
|
|
||||||
|
def _cmd_storage_rmdir(self, path):
|
||||||
|
cmd_data = storage_pb2.RmdirRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_rmdir_request')
|
||||||
|
|
||||||
|
def _cmd_storage_rename(self, old_path, new_path):
|
||||||
|
cmd_data = storage_pb2.RenameRequest()
|
||||||
|
cmd_data.old_path = old_path
|
||||||
|
cmd_data.new_path = new_path
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_rename_request')
|
||||||
|
|
||||||
|
def _cmd_storage_delete(self, path, recursive):
|
||||||
|
cmd_data = storage_pb2.DeleteRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
cmd_data.recursive = recursive
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_delete_request')
|
||||||
|
|
||||||
|
def _cmd_storage_write(self, path, data):
|
||||||
|
cmd_data = storage_pb2.WriteRequest()
|
||||||
|
cmd_data.path = path
|
||||||
|
cmd_data.file.data = data
|
||||||
|
self.proto._cmd_send(cmd_data, 'storage_write_request')
|
||||||
|
|
||||||
|
def check_response_status(self, response):
|
||||||
|
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
|
||||||
|
raise InvalidNameError()
|
||||||
|
|
||||||
|
|
||||||
|
def list_directory(self, path, additional_data = {}):
|
||||||
|
with self.mutex:
|
||||||
|
self._cmd_storage_list_directory(path)
|
||||||
|
|
||||||
|
files = []
|
||||||
|
|
||||||
|
while True:
|
||||||
|
packet = self.proto._cmd_read_answer()
|
||||||
|
self.check_response_status(packet)
|
||||||
|
for file in packet.storage_list_response.file:
|
||||||
|
files.append({**{
|
||||||
|
'name': file.name,
|
||||||
|
'type': storage_pb2.File.FileType.Name(file.type)
|
||||||
|
}, **additional_data})
|
||||||
|
if not packet.has_next:
|
||||||
|
break
|
||||||
|
|
||||||
|
return files
|
||||||
|
|
||||||
|
def stat(self, path):
|
||||||
|
with self.mutex:
|
||||||
|
response = self._cmd_storage_stat(path)
|
||||||
|
|
||||||
|
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
|
||||||
|
raise InvalidNameError()
|
||||||
|
|
||||||
|
response = response.storage_stat_response
|
||||||
|
|
||||||
|
return {'size': response.file.size}
|
||||||
|
|
||||||
|
def read_file_contents(self, path):
|
||||||
|
with self.mutex:
|
||||||
|
self._cmd_storage_read(path)
|
||||||
|
|
||||||
|
contents = []
|
||||||
|
|
||||||
|
while True:
|
||||||
|
packet = self.proto._cmd_read_answer()
|
||||||
|
print(packet)
|
||||||
|
self.check_response_status(packet)
|
||||||
|
contents.extend(packet.storage_read_response.file.data)
|
||||||
|
if not packet.has_next:
|
||||||
|
break
|
||||||
|
|
||||||
|
return {'data': contents}
|
||||||
|
|
||||||
|
def mkdir(self, path):
|
||||||
|
with self.mutex:
|
||||||
|
print(f'mkdir {path}')
|
||||||
|
|
||||||
|
self._cmd_storage_mkdir(path)
|
||||||
|
|
||||||
|
def rmdir(self, path):
|
||||||
|
with self.mutex:
|
||||||
|
print(f'rmdir {path}')
|
||||||
|
|
||||||
|
self._cmd_storage_dirdir(path)
|
||||||
|
|
||||||
|
def rename(self, old_path, new_path):
|
||||||
|
with self.mutex:
|
||||||
|
self._cmd_storage_rename(old_path, new_path)
|
||||||
|
|
||||||
|
def delete(self, path, recursive):
|
||||||
|
with self.mutex:
|
||||||
|
self._cmd_storage_delete(path, recursive)
|
||||||
|
|
||||||
|
def write(self, path, data):
|
||||||
|
with self.mutex:
|
||||||
|
self._cmd_storage_write(path, data)
|
||||||
|
|
||||||
|
class InvalidNameError(RuntimeError):
|
||||||
|
pass
|
1
flipperzero_protobuf_py
Submodule
1
flipperzero_protobuf_py
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 3a13eee93218e0954a04cc46d14c398bdf810d9f
|
208
fzfs.py
Normal file
208
fzfs.py
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
import errno
|
||||||
|
from os import unlink
|
||||||
|
from stat import S_IFDIR, ST_ATIME, ST_CTIME, ST_MODE, ST_MTIME, ST_NLINK
|
||||||
|
|
||||||
|
from numpy import delete, full
|
||||||
|
import flipper_api
|
||||||
|
import sys
|
||||||
|
import fuse
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import stat
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print('starting')
|
||||||
|
try:
|
||||||
|
fs = fuse.FUSE(FlipperZeroFileSysten(sys.argv[1]), sys.argv[2], foreground=True)
|
||||||
|
except:
|
||||||
|
print('stopping')
|
||||||
|
fuse.fuse_exit()
|
||||||
|
|
||||||
|
|
||||||
|
class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
|
||||||
|
def __init__(self, serial_port) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.api = flipper_api.FlipperAPI(serial_port)
|
||||||
|
self.api.connect()
|
||||||
|
self.file_root = {
|
||||||
|
'type': 'DIR'
|
||||||
|
}
|
||||||
|
self.fd = 0
|
||||||
|
|
||||||
|
def find_child_by_name(self, parent, child_name):
|
||||||
|
for child in parent['children']:
|
||||||
|
if child['name'] == child_name:
|
||||||
|
return child
|
||||||
|
raise fuse.FuseOSError(errno.ENOENT)
|
||||||
|
|
||||||
|
def get_file_from_parts(self, parent, parts, index):
|
||||||
|
def list_dir(dir_path):
|
||||||
|
return self.api.list_directory(dir_path, {'full_path': dir_path, 'parent': parent})
|
||||||
|
|
||||||
|
if index <= len(parts):
|
||||||
|
full_path = f"/{'/'.join(parts[:index])}"
|
||||||
|
|
||||||
|
if parent['type'] == 'DIR':
|
||||||
|
try:
|
||||||
|
parent['children']
|
||||||
|
except KeyError:
|
||||||
|
parent['children'] = list_dir(full_path)
|
||||||
|
|
||||||
|
if index == len(parts):
|
||||||
|
return parent
|
||||||
|
|
||||||
|
child = self.find_child_by_name(parent, parts[index])
|
||||||
|
|
||||||
|
return self.get_file_from_parts(child, parts, index + 1)
|
||||||
|
|
||||||
|
return parent
|
||||||
|
|
||||||
|
|
||||||
|
def get_file_by_path(self, path_full: str):
|
||||||
|
path = path_full[:]
|
||||||
|
if path[0] == '/':
|
||||||
|
path = path[1:]
|
||||||
|
if path == '':
|
||||||
|
parts = []
|
||||||
|
else:
|
||||||
|
parts = path.split('/')
|
||||||
|
|
||||||
|
return self.get_file_from_parts(self.file_root, parts, 0)
|
||||||
|
|
||||||
|
def readdir(self, path, fh = None):
|
||||||
|
# print(f'requested {path}')
|
||||||
|
|
||||||
|
parent = self.get_file_by_path(path)
|
||||||
|
|
||||||
|
return ['.', '..'] + [child['name'] for child in parent['children']]
|
||||||
|
|
||||||
|
def getattr(self, path, fh=None):
|
||||||
|
# print(f'getattr {path}')
|
||||||
|
file = self.get_file_by_path(path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return file['attr']
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
print(f'getting attr for {path}')
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
attr = {
|
||||||
|
'st_mode': 0o777,
|
||||||
|
'st_ctime': now,
|
||||||
|
'st_mtime': now,
|
||||||
|
'st_atime': now
|
||||||
|
}
|
||||||
|
|
||||||
|
is_dir = (file['type'] == 'DIR')
|
||||||
|
|
||||||
|
if is_dir:
|
||||||
|
attr['st_mode'] |= stat.S_IFDIR
|
||||||
|
attr['st_nlink'] = 2
|
||||||
|
else:
|
||||||
|
response = self.api.stat(path)
|
||||||
|
attr['st_size'] = response['size']
|
||||||
|
attr['st_mode'] |= stat.S_IFREG
|
||||||
|
attr['st_nlink'] = 1
|
||||||
|
|
||||||
|
|
||||||
|
file['attr'] = attr
|
||||||
|
|
||||||
|
return attr
|
||||||
|
|
||||||
|
def read(self, path, size, offset, fh):
|
||||||
|
cached = self.get_file_by_path(path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return bytes(cached['contents'][offset:offset + size])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
data = None
|
||||||
|
|
||||||
|
print(f'reading {path}')
|
||||||
|
|
||||||
|
data = self.api.read_file_contents(path)['data']
|
||||||
|
|
||||||
|
cached['contents'] = data
|
||||||
|
return bytes(data[offset:offset + size])
|
||||||
|
|
||||||
|
def write(self, path, data, offset, fh):
|
||||||
|
print(f'write file: {path} offset: {offset} length: {len(data)} type: {type(data)}')
|
||||||
|
try:
|
||||||
|
cached = self.get_file_by_path(path)
|
||||||
|
except OSError:
|
||||||
|
self.create(path, None)
|
||||||
|
cached = self.get_file_by_path(path)
|
||||||
|
|
||||||
|
cached['contents'][offset:offset] = list(data)
|
||||||
|
cached['attr']['st_size'] = len(cached['contents'])
|
||||||
|
self.api.write(path, bytes(cached['contents']))
|
||||||
|
return len(data)
|
||||||
|
|
||||||
|
|
||||||
|
def get_filename_from_path(self, path):
|
||||||
|
parts = path[1:].split('/')
|
||||||
|
return parts[-1]
|
||||||
|
|
||||||
|
def get_parent_from_path(self, path):
|
||||||
|
return path[:-(len(self.get_filename_from_path(path)) + 1)]
|
||||||
|
|
||||||
|
def append_to_parend(self, child_path, child):
|
||||||
|
parent_path = self.get_parent_from_path(child_path)
|
||||||
|
parent = self.get_file_by_path(parent_path)
|
||||||
|
child['parent'] = parent
|
||||||
|
print(f'appending to {parent_path}')
|
||||||
|
parent['children'].append(child)
|
||||||
|
|
||||||
|
def mkdir(self, path, mode):
|
||||||
|
print(f'mkdir {path}')
|
||||||
|
self.append_to_parend(path, {
|
||||||
|
'name': self.get_filename_from_path(path),
|
||||||
|
'type': 'DIR'
|
||||||
|
})
|
||||||
|
self.api.mkdir(path)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def rename(self, old, new):
|
||||||
|
try:
|
||||||
|
new_file = self.get_file_by_path(new)
|
||||||
|
new_file['parent']['children'].remove(new_file)
|
||||||
|
self.api.delete(new, True)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
print(f'renaming {old} -> {new}')
|
||||||
|
cached = self.get_file_by_path(old)
|
||||||
|
self.api.rename(old, new)
|
||||||
|
parts = new.split('/')
|
||||||
|
cached['name'] = parts[-1]
|
||||||
|
|
||||||
|
def rmdir(self, path):
|
||||||
|
self.unlink(path)
|
||||||
|
|
||||||
|
def create(self, path, mode, fi=None):
|
||||||
|
print(f'creating {path}')
|
||||||
|
self.append_to_parend(path, {
|
||||||
|
'name': self.get_filename_from_path(path),
|
||||||
|
'type': 'FILE',
|
||||||
|
'contents': [],
|
||||||
|
})
|
||||||
|
self.api.write(path, bytes())
|
||||||
|
self.fd += 1
|
||||||
|
return self.fd
|
||||||
|
|
||||||
|
def unlink(self, path):
|
||||||
|
# print(f'unlinking {path}')
|
||||||
|
cached = self.get_file_by_path(path)
|
||||||
|
self.api.delete(path, True)
|
||||||
|
cached['parent']['children'].remove(cached)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
main()
|
Loading…
x
Reference in New Issue
Block a user