mirror of
https://github.com/dakhnod/fzfs.git
synced 2025-05-15 14:50:09 -07:00
152 lines
4.7 KiB
Python
152 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
# pylint: disable=protected-access
|
|
# pylint: disable=no-member
|
|
# pylint: disable=missing-class-docstring
|
|
# pylint: disable=missing-function-docstring
|
|
|
|
import threading
|
|
|
|
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import flipper_pb2, storage_pb2
|
|
from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper
|
|
from flipperzero_protobuf_py.cli_helpers import *
|
|
|
|
|
|
class FlipperAPI():
|
|
def __init__(self, flipper_serial) -> None:
|
|
self.serial_port = flipper_serial
|
|
self.proto = None
|
|
self.flipper = flipper_serial
|
|
self.mutex=threading.Lock()
|
|
|
|
def connect(self):
|
|
with self.mutex:
|
|
self.proto = ProtoFlipper(self.flipper)
|
|
|
|
print("Ping result: ")
|
|
print_hex(self.proto.cmd_system_ping())
|
|
|
|
|
|
def _cmd_storage_list_directory(self, path):
|
|
cmd_data = storage_pb2.ListRequest()
|
|
cmd_data.path = path
|
|
self.proto._cmd_send(cmd_data, 'storage_list_request')
|
|
|
|
def _cmd_storage_stat(self, path):
|
|
cmd_data = storage_pb2.StatRequest()
|
|
cmd_data.path = path
|
|
return self.proto._cmd_send_and_read_answer(cmd_data, 'storage_stat_request')
|
|
|
|
def _cmd_storage_read(self, path):
|
|
cmd_data = storage_pb2.ReadRequest()
|
|
cmd_data.path = path
|
|
self.proto._cmd_send(cmd_data, 'storage_read_request')
|
|
|
|
def _cmd_storage_mkdir(self, path):
|
|
cmd_data = storage_pb2.MkdirRequest()
|
|
cmd_data.path = path
|
|
self.proto._cmd_send(cmd_data, 'storage_mkdir_request')
|
|
|
|
def _cmd_storage_rmdir(self, path):
|
|
cmd_data = storage_pb2.RmdirRequest()
|
|
cmd_data.path = path
|
|
self.proto._cmd_send(cmd_data, 'storage_rmdir_request')
|
|
|
|
def _cmd_storage_rename(self, old_path, new_path):
|
|
cmd_data = storage_pb2.RenameRequest()
|
|
cmd_data.old_path = old_path
|
|
cmd_data.new_path = new_path
|
|
self.proto._cmd_send(cmd_data, 'storage_rename_request')
|
|
|
|
def _cmd_storage_delete(self, path, recursive):
|
|
cmd_data = storage_pb2.DeleteRequest()
|
|
cmd_data.path = path
|
|
cmd_data.recursive = recursive
|
|
self.proto._cmd_send(cmd_data, 'storage_delete_request')
|
|
|
|
def _cmd_storage_write(self, path, data):
|
|
cmd_data = storage_pb2.WriteRequest()
|
|
cmd_data.path = path
|
|
cmd_data.file.data = data
|
|
self.proto._cmd_send(cmd_data, 'storage_write_request')
|
|
|
|
def check_response_status(self, response):
|
|
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
|
|
raise InvalidNameError()
|
|
|
|
|
|
def list_directory(self, path, additional_data):
|
|
with self.mutex:
|
|
self._cmd_storage_list_directory(path)
|
|
|
|
files = []
|
|
|
|
while True:
|
|
packet = self.proto._cmd_read_answer()
|
|
self.check_response_status(packet)
|
|
for file in packet.storage_list_response.file:
|
|
files.append({**{
|
|
'name': file.name,
|
|
'type': storage_pb2.File.FileType.Name(file.type)
|
|
}, **additional_data})
|
|
if not packet.has_next:
|
|
break
|
|
|
|
return files
|
|
|
|
def stat(self, path):
|
|
with self.mutex:
|
|
response = self._cmd_storage_stat(path)
|
|
|
|
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
|
|
raise InvalidNameError()
|
|
|
|
response = response.storage_stat_response
|
|
|
|
return {'size': response.file.size}
|
|
|
|
def read_file_contents(self, path):
|
|
with self.mutex:
|
|
self._cmd_storage_read(path)
|
|
|
|
contents = []
|
|
|
|
while True:
|
|
packet = self.proto._cmd_read_answer()
|
|
print(packet)
|
|
self.check_response_status(packet)
|
|
contents.extend(packet.storage_read_response.file.data)
|
|
if not packet.has_next:
|
|
break
|
|
return {'data': contents}
|
|
|
|
def mkdir(self, path):
|
|
with self.mutex:
|
|
print(f'mkdir {path}')
|
|
|
|
self._cmd_storage_mkdir(path)
|
|
|
|
def rmdir(self, path):
|
|
with self.mutex:
|
|
print(f'rmdir {path}')
|
|
|
|
self._cmd_storage_dirdir(path)
|
|
|
|
def rename(self, old_path, new_path):
|
|
with self.mutex:
|
|
self._cmd_storage_rename(old_path, new_path)
|
|
|
|
def delete(self, path, recursive):
|
|
with self.mutex:
|
|
self._cmd_storage_delete(path, recursive)
|
|
|
|
def write(self, path, data):
|
|
with self.mutex:
|
|
self._cmd_storage_write(path, data)
|
|
|
|
def close(self):
|
|
with self.mutex:
|
|
self.proto.cmd_flipper_stop_session()
|
|
|
|
class InvalidNameError(RuntimeError):
|
|
pass
|