fzfs/flipper_api.py
2023-04-23 22:01:25 -06:00

152 lines
4.7 KiB
Python

#!/usr/bin/env python3
# pylint: disable=protected-access
# pylint: disable=no-member
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
import threading
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import flipper_pb2, storage_pb2
from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper
from flipperzero_protobuf_py.cli_helpers import *
class FlipperAPI():
def __init__(self, flipper_serial) -> None:
self.serial_port = flipper_serial
self.proto = None
self.flipper = flipper_serial
self.mutex=threading.Lock()
def connect(self):
with self.mutex:
self.proto = ProtoFlipper(self.flipper)
print("Ping result: ")
print_hex(self.proto.cmd_system_ping())
def _cmd_storage_list_directory(self, path):
cmd_data = storage_pb2.ListRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_list_request')
def _cmd_storage_stat(self, path):
cmd_data = storage_pb2.StatRequest()
cmd_data.path = path
return self.proto._cmd_send_and_read_answer(cmd_data, 'storage_stat_request')
def _cmd_storage_read(self, path):
cmd_data = storage_pb2.ReadRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_read_request')
def _cmd_storage_mkdir(self, path):
cmd_data = storage_pb2.MkdirRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_mkdir_request')
def _cmd_storage_rmdir(self, path):
cmd_data = storage_pb2.RmdirRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_rmdir_request')
def _cmd_storage_rename(self, old_path, new_path):
cmd_data = storage_pb2.RenameRequest()
cmd_data.old_path = old_path
cmd_data.new_path = new_path
self.proto._cmd_send(cmd_data, 'storage_rename_request')
def _cmd_storage_delete(self, path, recursive):
cmd_data = storage_pb2.DeleteRequest()
cmd_data.path = path
cmd_data.recursive = recursive
self.proto._cmd_send(cmd_data, 'storage_delete_request')
def _cmd_storage_write(self, path, data):
cmd_data = storage_pb2.WriteRequest()
cmd_data.path = path
cmd_data.file.data = data
self.proto._cmd_send(cmd_data, 'storage_write_request')
def check_response_status(self, response):
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
raise InvalidNameError()
def list_directory(self, path, additional_data):
with self.mutex:
self._cmd_storage_list_directory(path)
files = []
while True:
packet = self.proto._cmd_read_answer()
self.check_response_status(packet)
for file in packet.storage_list_response.file:
files.append({**{
'name': file.name,
'type': storage_pb2.File.FileType.Name(file.type)
}, **additional_data})
if not packet.has_next:
break
return files
def stat(self, path):
with self.mutex:
response = self._cmd_storage_stat(path)
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
raise InvalidNameError()
response = response.storage_stat_response
return {'size': response.file.size}
def read_file_contents(self, path):
with self.mutex:
self._cmd_storage_read(path)
contents = []
while True:
packet = self.proto._cmd_read_answer()
print(packet)
self.check_response_status(packet)
contents.extend(packet.storage_read_response.file.data)
if not packet.has_next:
break
return {'data': contents}
def mkdir(self, path):
with self.mutex:
print(f'mkdir {path}')
self._cmd_storage_mkdir(path)
def rmdir(self, path):
with self.mutex:
print(f'rmdir {path}')
self._cmd_storage_dirdir(path)
def rename(self, old_path, new_path):
with self.mutex:
self._cmd_storage_rename(old_path, new_path)
def delete(self, path, recursive):
with self.mutex:
self._cmd_storage_delete(path, recursive)
def write(self, path, data):
with self.mutex:
self._cmd_storage_write(path, data)
def close(self):
with self.mutex:
self.proto.cmd_flipper_stop_session()
class InvalidNameError(RuntimeError):
pass