mirror of
https://github.com/NaitLee/Cat-Printer.git
synced 2025-05-15 14:50:26 -07:00
370 lines
12 KiB
Python
370 lines
12 KiB
Python
'''
|
|
Cat-Printer: Web Interface Server
|
|
|
|
Copyright © 2021-2023 NaitLee Soft. All rights reserved.
|
|
|
|
License GPL-3.0-or-later: https://www.gnu.org/licenses/gpl-3.0.html
|
|
'''
|
|
|
|
# if pylint is annoying you, see file .pylintrc
|
|
|
|
import os
|
|
import io
|
|
import sys
|
|
import json
|
|
import warnings
|
|
import webbrowser
|
|
|
|
# For now we can't use `ThreadingHTTPServer`
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
|
|
# import `printer` first, to diagnostic some common errors
|
|
from printer import PrinterDriver, PrinterError, i18n, info
|
|
|
|
from bleak.exc import BleakDBusError, BleakError # pylint: disable=wrong-import-order
|
|
|
|
from printer_lib.ipp import IPP
|
|
|
|
# Supress non-sense asyncio warnings
|
|
warnings.simplefilter('ignore', RuntimeWarning, 0, True)
|
|
|
|
IsAndroid = (os.environ.get("P4A_BOOTSTRAP") is not None)
|
|
|
|
|
|
class DictAsObject(dict):
|
|
""" Let you use a dict like an object in JavaScript.
|
|
"""
|
|
def __getattr__(self, key):
|
|
return self.get(key, None)
|
|
def __setattr__(self, key, value):
|
|
self[key] = value
|
|
|
|
class PrinterServerError(PrinterError):
|
|
'Error of PrinterServer'
|
|
|
|
mime_type = {
|
|
'html': 'text/html;charset=utf-8',
|
|
'css': 'text/css;charset=utf-8',
|
|
'js': 'text/javascript;charset=utf-8',
|
|
'txt': 'text/plain;charset=utf-8',
|
|
'json': 'application/json;charset=utf-8',
|
|
'png': 'image/png',
|
|
'svg': 'image/svg+xml;charset=utf-8',
|
|
'wasm': 'application/wasm',
|
|
'octet-stream': 'application/octet-stream'
|
|
}
|
|
def mime(url: str):
|
|
'Get pre-defined MIME type of a certain url by extension name'
|
|
return mime_type.get(url.rsplit('.', 1)[-1], mime_type['octet-stream'])
|
|
|
|
def concat_files(*paths, prefix_format='', buffer=4 * 1024 * 1024) -> bytes:
|
|
'Generator, that yields buffered file content, with optional prefix'
|
|
for path in paths:
|
|
yield prefix_format.format(path).encode('utf-8')
|
|
with open(path, 'rb') as file:
|
|
while data := file.read(buffer):
|
|
yield data
|
|
|
|
class PrinterServerHandler(BaseHTTPRequestHandler):
|
|
'(Local) server handler for Cat Printer Web interface'
|
|
|
|
buffer = 4 * 1024 * 1024
|
|
|
|
max_payload = buffer * 16
|
|
|
|
settings = DictAsObject({
|
|
'config_path': 'config.json',
|
|
'version': 4,
|
|
'first_run': True,
|
|
'is_android': False,
|
|
'scan_time': 4.0,
|
|
'dry_run': False,
|
|
'energy': 64,
|
|
'quality': 36
|
|
})
|
|
_settings_blacklist = (
|
|
'printer', 'is_android'
|
|
)
|
|
all_script: list = []
|
|
|
|
printer: PrinterDriver = PrinterDriver()
|
|
|
|
ipp: IPP = None
|
|
|
|
def log_request(self, _code=200, _size=0):
|
|
pass
|
|
|
|
def log_error(self, *_args):
|
|
pass
|
|
|
|
def handle_one_request(self):
|
|
try:
|
|
# this handler would have only one instance
|
|
# broken pipe could make it die. ignore
|
|
super().handle_one_request()
|
|
except BrokenPipeError:
|
|
pass
|
|
|
|
def do_GET(self):
|
|
'Called when server got a GET http request'
|
|
# prepare
|
|
path, _, _args = self.path.partition('?')
|
|
if '/..' in path or '../' in path:
|
|
return
|
|
if path == '/':
|
|
path += 'index.html'
|
|
# special
|
|
if path.startswith('/~'):
|
|
action = path[2:]
|
|
if action == 'every.js':
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', mime(path))
|
|
self.end_headers()
|
|
for data in concat_files(*(self.all_script), prefix_format='\n// {0}\n'):
|
|
self.wfile.write(data)
|
|
return
|
|
path = 'www' + path
|
|
# not found
|
|
if not os.path.isfile(path):
|
|
self.send_response(404)
|
|
self.send_header('Content-Type', mime('txt'))
|
|
self.end_headers()
|
|
return
|
|
# static
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', mime(path))
|
|
# self.send_header('Content-Size', str(os.stat(path).st_size))
|
|
self.end_headers()
|
|
with open(path, 'rb') as file:
|
|
while True:
|
|
chunk = file.read(self.buffer)
|
|
if not self.wfile.write(chunk):
|
|
break
|
|
return
|
|
|
|
def api_success(self, body_json=None):
|
|
'Called when an API call is being considered successful'
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', mime('json'))
|
|
self.end_headers()
|
|
if body_json is None:
|
|
self.wfile.write(b'{}')
|
|
else:
|
|
self.wfile.write(json.dumps(body_json).encode('utf-8'))
|
|
|
|
def api_fail(self, error_json):
|
|
'Called when an API call is failed'
|
|
self.send_response(500)
|
|
self.send_header('Content-Type', mime('json'))
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(error_json).encode('utf-8'))
|
|
self.wfile.flush()
|
|
|
|
def load_config(self):
|
|
'Load config file, or if not exist, create one with default'
|
|
if IsAndroid:
|
|
self.settings['is_android'] = True
|
|
from android.storage import app_storage_path # pylint: disable=import-error
|
|
settings_path = app_storage_path()
|
|
os.makedirs(settings_path, exist_ok=True)
|
|
self.settings['config_path'] = os.path.join(
|
|
settings_path, 'config.json'
|
|
)
|
|
if os.path.exists(self.settings.config_path):
|
|
with open(self.settings.config_path, 'r', encoding='utf-8') as file:
|
|
settings = DictAsObject(json.load(file))
|
|
if (settings.version is None or
|
|
settings.version < self.settings.version):
|
|
# Version too old, start over
|
|
# TODO: selective?
|
|
self.save_config()
|
|
return
|
|
for key in settings:
|
|
self.settings[key] = settings[key]
|
|
else:
|
|
if os.name in ('posix',) or IsAndroid:
|
|
self.settings['scan_time'] = 2.0
|
|
self.save_config()
|
|
|
|
def save_config(self):
|
|
'Save config file'
|
|
with open(self.settings.config_path, 'w', encoding='utf-8') as file:
|
|
settings = {}
|
|
for i in self.settings:
|
|
if i not in self._settings_blacklist:
|
|
settings[i] = self.settings[i]
|
|
json.dump(settings, file, indent=4)
|
|
|
|
def update_printer(self):
|
|
'Update `PrinterDriver` state/config'
|
|
self.printer.dry_run = self.settings.dry_run
|
|
self.printer.scan_time = self.settings.scan_time
|
|
self.printer.fake = self.settings.fake
|
|
self.printer.dump = self.settings.dump
|
|
if self.settings.energy is not None:
|
|
self.printer.energy = int(self.settings.energy) * 0x100
|
|
if self.settings.quality is not None:
|
|
self.printer.speed = int(self.settings.quality)
|
|
self.printer.flip_h = self.settings.flip_h or self.settings.flip
|
|
self.printer.flip_v = self.settings.flip_v or self.settings.flip
|
|
self.printer.rtl = self.settings.force_rtl
|
|
|
|
def handle_api(self):
|
|
'Handle API request from POST'
|
|
content_length = int(self.headers.get('Content-Length'))
|
|
body = self.rfile.read(content_length)
|
|
api = self.path[1:]
|
|
if api == 'print':
|
|
self.update_printer()
|
|
self.printer.print(io.BytesIO(body))
|
|
self.api_success()
|
|
return
|
|
data = DictAsObject(json.loads(body))
|
|
if api == 'devices':
|
|
self.printer.connect(None)
|
|
devices_list = [{
|
|
'name': device.name,
|
|
'address': device.address
|
|
} for device in self.printer.scan(everything=data.get('everything'))]
|
|
self.api_success({
|
|
'devices': devices_list
|
|
})
|
|
return
|
|
if api == 'query':
|
|
self.load_config()
|
|
self.api_success(self.settings)
|
|
return
|
|
if api == 'set':
|
|
for key in data:
|
|
self.settings[key] = data[key]
|
|
self.save_config()
|
|
self.update_printer()
|
|
self.api_success()
|
|
return
|
|
if api == 'connect':
|
|
name, address = data['device'].split(',')
|
|
self.printer.connect(name, address)
|
|
self.api_success()
|
|
if api == 'exit':
|
|
self.api_success()
|
|
self.exit()
|
|
|
|
def exit(self):
|
|
'Stop correctly & cleanly'
|
|
self.save_config()
|
|
self.printer.unload()
|
|
sys.exit(0)
|
|
|
|
def do_POST(self):
|
|
'Called when server got a POST http request'
|
|
content_length = int(self.headers.get('Content-Length', -1))
|
|
if (content_length < -1 or
|
|
content_length > self.max_payload
|
|
):
|
|
return
|
|
if self.headers.get('Content-Type') == 'application/ipp':
|
|
if self.ipp is None:
|
|
self.ipp = IPP(self)
|
|
self.ipp.handle_ipp()
|
|
return
|
|
try:
|
|
self.handle_api()
|
|
return
|
|
except BleakDBusError as e:
|
|
# TODO: better error reporting
|
|
self.api_fail({
|
|
'name': e.dbus_error,
|
|
'details': e.dbus_error_details
|
|
})
|
|
except BleakError as e:
|
|
self.api_fail({
|
|
'name': 'BleakError',
|
|
'details': str(e)
|
|
})
|
|
except EOFError as e:
|
|
# mostly, device disconnected but not by this program
|
|
self.api_fail({
|
|
'name': 'EOFError',
|
|
'details': ''
|
|
})
|
|
except RuntimeError as e:
|
|
self.api_fail({
|
|
'name': 'RuntimeError',
|
|
'details': str(e)
|
|
})
|
|
except PrinterError as e:
|
|
self.api_fail({
|
|
'name': e.message,
|
|
'details': e.message_localized
|
|
})
|
|
except Exception as e:
|
|
self.api_fail({
|
|
'name': 'Exception',
|
|
'details': str(e)
|
|
})
|
|
raise
|
|
|
|
class PrinterServer(HTTPServer):
|
|
''' (local) server for Cat Printer Web Interface
|
|
The reason to override is to only init the handler once,
|
|
avoiding confliction, and stop cleanly
|
|
'''
|
|
|
|
handler_class = None
|
|
handler: PrinterServerHandler = None
|
|
|
|
def __init__(self, server_address, RequestHandlerClass):
|
|
self.handler_class = RequestHandlerClass
|
|
super().__init__(server_address, RequestHandlerClass)
|
|
|
|
def finish_request(self, request, client_address):
|
|
if self.handler is None:
|
|
self.handler = self.handler_class(request, client_address, self)
|
|
self.handler.load_config()
|
|
with open(os.path.join('www', 'all-scripts.txt'), 'r', encoding='utf-8') as file:
|
|
for path in file.read().split('\n'):
|
|
if path != '':
|
|
self.handler.all_script.append(os.path.join('www', path))
|
|
return
|
|
self.handler.__init__(request, client_address, self)
|
|
|
|
def server_close(self):
|
|
if self.handler is not None:
|
|
self.handler.exit()
|
|
super().server_close()
|
|
|
|
|
|
def serve():
|
|
'Start server'
|
|
address, port = '127.0.0.1', 8095
|
|
listen_all = False
|
|
if '-a' in sys.argv:
|
|
info(i18n('will-listen-on-all-addresses'))
|
|
listen_all = True
|
|
server = PrinterServer(('' if listen_all else address, port), PrinterServerHandler)
|
|
service_url = f'http://{address}:{port}/'
|
|
|
|
|
|
info(i18n('serving-at-0', service_url))
|
|
if '-s' not in sys.argv and not IsAndroid:
|
|
webbrowser.open(service_url)
|
|
# Request required bluetooth permissions (Android 12+)
|
|
if IsAndroid:
|
|
from android.permissions import request_permissions, Permission
|
|
try:
|
|
request_permissions([Permission.BLUETOOTH_SCAN, Permission.BLUETOOTH_CONNECT])
|
|
except Exception:
|
|
print('Exception on requesting Android Permissions. Continuing', file=sys.stderr)
|
|
pass
|
|
from android.app import Activity
|
|
from android.content import Intent
|
|
print(Intent.getIntent().getType())
|
|
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
server.server_close()
|
|
|
|
if __name__ == '__main__':
|
|
serve()
|