added BLE mode

This commit is contained in:
Daniel Dakhno 2022-09-17 00:09:09 +02:00
parent eb389ee1cd
commit b3284ed938
6 changed files with 229 additions and 36 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.vscode/
venv/
__pycache__/
mount/

View File

@ -9,19 +9,37 @@ git clone --recursive git@github.com:dakhnod/fzfs.git
cd fzfs
python3 -m venv venv
. venv/bin/activate
pip install protobuf fusepy pyserial numpy
pip install -r requirements.txt
```
## Usage
## Connect via USB Serial
The script takes two arguments, the serial port and the mount point
```
venv/bin/python3 fzfs.py /dev/ttyACM0 /home/user/flipper-zero
venv/bin/python3 fzfs.py -d /dev/ttyACM0 -m /home/user/flipper-zero
```
Then you should be able to access your flipper files through file browser of the console in the mountpoint.
## Connect via BLE Serial
First, you need to pair your flipper with your computer. Tihs process varies, but a good starting point is:
```
bluetoothctl
agent on
pair youf_flipper_mac_address
```
This should ask you for a confirmation code and pair your device.
After that, disconnect your Flipper from your computer.
Then, you can run
```
venv/bin/python3 fzfs.py -a "youf_flipper_mac_address" -m /home/user/flipper-zero
```
## Disclaimer
This software is still work in progress and may have errors despite my best efforts, so use with caution.

View File

@ -14,31 +14,15 @@ from flipperzero_protobuf_py.cli_helpers import *
class FlipperAPI():
def __init__(self, serial_port) -> None:
self.serial_port = serial_port
def __init__(self, flipper_serial) -> None:
self.serial_port = flipper_serial
self.proto = None
self.flipper = None
self.flipper = flipper_serial
self.mutex=threading.Lock()
def connect(self):
with self.mutex:
self.flipper = serial.Serial(self.serial_port, timeout=1)
self.flipper.baudrate = 230400
self.flipper.flushOutput()
self.flipper.flushInput()
# disable timeout
self.flipper.timeout = None
# wait for prompt
self.flipper.read_until(b'>: ')
# send command and skip answer
self.flipper.write(b"start_rpc_session\r")
self.flipper.read_until(b'\n')
# construct protobuf worker
self.proto = ProtoFlipper(self.flipper)
print("Ping result: ")

107
fzfs.py
View File

@ -1,24 +1,35 @@
from ast import parse
from audioop import add
import errno
from fileinput import filename
from os import unlink
import pathlib
from signal import signal
from stat import S_IFDIR, ST_ATIME, ST_CTIME, ST_MODE, ST_MTIME, ST_NLINK
from turtle import back
from numpy import delete, full
import flipper_api
import sys
import fuse
import logging
import time
import threading
import stat
import os
import argparse
import pathlib
import serial
import serial_ble
import sys
def main():
if len(sys.argv) != 3:
print('usage: python fzfs.py serial-device mountpoint')
print('example: python fzfs.py /dev/ttyACM0 /home/user/flipper-zero')
return
parser = argparse.ArgumentParser(description='FUSE driver for flipper serial connection')
parser.add_argument('-d', '--device', help='Serial device to connect to', dest='serial_device')
parser.add_argument('-a', '--address', help='Flipper BLE address', dest='ble_address')
parser.add_argument('-m', '--mount', help='Mount point to mount the FZ to', dest='mountpoint', required=True)
args = parser.parse_args()
mountpoint = sys.argv[2]
logging.basicConfig(level=logging.DEBUG)
mountpoint = args.mountpoint
if not os.path.isdir(mountpoint):
print('mountpoint must be an empty folder')
@ -28,10 +39,74 @@ def main():
print('mountpoint must be an empty folder')
return
if args.serial_device is None and args.ble_address is None:
print('either serial_device or ble_address required')
return
if args.serial_device is not None and args.ble_address is not None:
print('only one of serial_device/ble_address required')
return
serial_device = None
def create_serial_device():
if args.serial_device is not None:
if not os.path.exists(args.serial_device):
print('serial device not an actual file')
parser.print_usage()
exit()
return create_physical_serial(args.serial_device, True)
if args.ble_address is not None:
def disconnect_handler(client):
print('disconnected')
sys.exit(0)
return create_ble_serial(args.ble_address, None)
serial_device = create_serial_device()
if serial_device is None:
print('failed creating serial device')
backend = FlipperZeroFileSysten(serial_device)
fuse_started = True
# fuse_thread = threading.Thread(target=fuse.FUSE, kwargs={'operations': backend, 'mountpoint': mountpoint, 'foreground': True})
def fuse_start():
fuse.FUSE(backend, mountpoint, foreground=True)
print('starting fs...')
fuse_start()
print('fuse stopped')
try:
fs = fuse.FUSE(FlipperZeroFileSysten(sys.argv[1]), sys.argv[2], foreground=True)
except:
fuse.fuse_exit()
serial_device.stop()
print('stopped bluetooth')
except AttributeError:
pass
def create_physical_serial(file, is_cli):
s = serial.Serial(file, timeout=1)
s.baudrate = 230400
s.flushOutput()
s.flushInput()
if is_cli:
s.read_until(b'>: ')
s.write(b"start_rpc_session\r")
s.read_until(b'\n')
return s
def create_ble_serial(address, disconnected_handler):
s = serial_ble.BLESerial(address, '19ed82ae-ed21-4c9d-4145-228e61fe0000', '19ed82ae-ed21-4c9d-4145-228e62fe0000')
print('connecting...')
s.start(disconnected_handler)
print('connected')
return s
class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
@ -92,7 +167,6 @@ class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
return ['.', '..'] + [child['name'] for child in parent['children']]
def getattr(self, path, fh=None):
# print(f'getattr {path}')
file = self.get_file_by_path(path)
try:
@ -145,7 +219,7 @@ class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
return bytes(data[offset:offset + size])
def write(self, path, data, offset, fh):
print(f'write file: {path} offset: {offset} length: {len(data)} type: {type(data)}')
print(f'write file: {path} offset: {offset} length: {len(data)}')
try:
cached = self.get_file_by_path(path)
except OSError:
@ -157,6 +231,12 @@ class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
self.api.write(path, bytes(cached['contents']))
return len(data)
def open(self, path, flags):
print(f'open {path} {flags}')
self.fd += 1
return self.fd
def get_filename_from_path(self, path):
parts = path[1:].split('/')
@ -169,7 +249,6 @@ class FlipperZeroFileSysten(fuse.Operations, fuse.LoggingMixIn):
parent_path = self.get_parent_from_path(child_path)
parent = self.get_file_by_path(parent_path)
child['parent'] = parent
print(f'appending to {parent_path}')
parent['children'].append(child)
def mkdir(self, path, mode):

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
async-timeout==4.0.2
bleak==0.17.0
dbus-fast==1.4.0
fusepy==3.0.1
numpy==1.23.3
protobuf==4.21.6
pyserial==3.5

104
serial_ble.py Normal file
View File

@ -0,0 +1,104 @@
from audioop import add
from concurrent.futures import thread
from sqlite3 import connect
import bleak
import serial
import asyncio
import threading
import time
class BLESerial(serial.Serial):
def __init__(self, address: str, read_characteristic: str, write_characteristic: str, read_timeout=1):
self.address = address
self.read_characteristic = read_characteristic
self.write_characteristic = write_characteristic
self.client = None
self.read_buffer = []
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(target=self.loop.run_forever)
self.connect_condition = threading.Condition()
self.write_condition = threading.Condition()
self.read_condition = threading.Condition()
self.read_timeout = read_timeout
self.exception = None
def start(self, disconnect_handler):
self.thread.start()
asyncio.run_coroutine_threadsafe(self.connect(60, disconnect_handler), self.loop)
with self.connect_condition:
self.connect_condition.wait()
def stop(self):
asyncio.run_coroutine_threadsafe(self.disconnect(), self.loop)
with self.connect_condition:
self.connect_condition.wait(20)
with self.connect_condition:
self.connect_condition.notify()
with self.read_condition:
self.read_condition.notify()
with self.write_condition:
self.write_condition.notify()
self.loop.stop()
async def disconnect(self):
await self.client.disconnect()
with self.connect_condition:
self.connect_condition.notify()
async def connect(self, timeout, disconnect_handler=None):
self.client = bleak.BleakClient(self.address, addr_type='random')
self.client.set_disconnected_callback(disconnect_handler)
connected = False
for i in range(10):
try:
print(f'connect attempt {i + 1}/10')
result = await self.client.connect(timeout)
connected = True
break
except bleak.BleakError as e:
pass
if not connected:
raise f'Could not connect to {self.address}'
await self.client.start_notify(self.read_characteristic, self.on_serial_data)
with self.connect_condition:
self.connect_condition.notify()
def on_serial_data(self, size, data):
# print(f'received {list(data)}')
self.read_buffer.extend(list(data))
with self.read_condition:
self.read_condition.notify()
def read(self, size: int):
# print(f'reading {size}, available {len(self.read_buffer)}')
if len(self.read_buffer) < size:
with self.read_condition:
self.read_condition.wait(self.read_timeout)
if not self.client.is_connected:
raise Exception('Device disconnected')
data = self.read_buffer[:size]
self.read_buffer = self.read_buffer[size:]
return bytes(data)
async def write_char(self, char, data):
try:
result = await self.client.write_gatt_char(char, data)
except Exception as e:
print(e)
with self.write_condition:
self.write_condition.notify()
def write(self, data):
if not self.client.is_connected:
raise Exception('Device disconnected')
# print(f'writing {list(data)}')
asyncio.run_coroutine_threadsafe(self.write_char(self.write_characteristic, data), self.loop)
with self.write_condition:
self.write_condition.wait(5)
if not self.client.is_connected:
raise Exception('Device disconnected')