1
0
mirror of https://github.com/peterantypas/maiana.git synced 2025-05-28 05:10:40 -07:00

Fully functional but needs a lot of hardening

This commit is contained in:
Peter Antypas 2022-01-30 12:44:14 -08:00
parent fd00109ca4
commit e536b5b597
3 changed files with 153 additions and 4 deletions

View File

@ -0,0 +1,124 @@
from threading import Thread
import wx
import os
import binascii
import sys
from enum import Enum
EVT_FWUPDATE_ID = wx.NewId()
class EventType(Enum):
DFU_ENABLED = 1
TRANSFER_STARTED = 2
TRANSFER_PROGRESS = 3
TRANSFER_COMPLETE = 4
ERROR = 5
class FwUpdateEvent(wx.PyEvent):
def __init__(self, eventType, data):
wx.PyEvent.__init__(self)
self.SetEventType(EVT_FWUPDATE_ID)
self.eventType = eventType
self.data = data
class FWUpdateThread(Thread):
def __init__(self, window, port, filename):
Thread.__init__(self)
self.daemon = True
self.window = window
self.port = port
self.filesize = 0
self.crc32 = 0
try:
self.file = open(filename, "rb")
except:
print("Unable to open file {0}".format(filename))
sys.exit(2)
self.filesize = os.stat(filename).st_size
# print filesize
self.data = self.file.read()
self.crc32 = (binascii.crc32(self.data) & 0xFFFFFFFF)
self.file.seek(0)
def is_unit_running(self):
print("Checking if unit is running")
s = self.port.readline().strip().decode('utf-8')
if len(s) > 0:
# print s
tokens = s.split(',')
if len(tokens) > 3:
return True
return False
def drain_port(self):
self.port.flushInput()
s = self.port.readline()
while len(s) > 0:
s = self.port.readline()
def enable_dfu(self):
if self.is_unit_running():
print("Unit is running, switching to DFU mode")
self.port.write(b'dfu\r\n')
self.drain_port()
for x in range(5):
self.port.write(b'\r\n')
s = self.port.readline().strip()
if s.find(b"MAIANA bootloader") >= 0:
wx.PostEvent(self.window, FwUpdateEvent(EventType.DFU_ENABLED, None))
return True
wx.PostEvent(self.window, FwUpdateEvent(EventType.ERROR, b'Could not enter DFU mode'))
return False
def begin_transfer(self):
self.drain_port()
command = "load {0} {1:x}\r\n".format(self.filesize, self.crc32).encode('utf-8')
self.port.write(command)
print(command)
s = self.port.readline().strip()
if s == b"READY":
wx.PostEvent(self.window, FwUpdateEvent(EventType.TRANSFER_STARTED, None))
return True
wx.PostEvent(self.window, FwUpdateEvent(EventType.ERROR, 'Failed:{}'.format(s)))
return False
def run(self):
if not self.enable_dfu():
print("Could not get unit into DFU mode")
return
print("Unit is in DFU mode")
if not self.begin_transfer():
print("Unable to begin transfer, restart the unit and retry")
return
print("Starting transfer")
bytes = self.file.read(2048)
resp = ''
bytesSent = 0
while len(bytes) > 0:
self.port.write(bytes)
resp = self.port.readline().strip()
if resp.find(b"OK") < 0:
break
bytesSent += len(bytes)
progress = bytesSent/self.filesize
wx.PostEvent(self.window, FwUpdateEvent(EventType.TRANSFER_PROGRESS, progress))
bytes = self.file.read(2048)
wx.PostEvent(self.window, FwUpdateEvent(EventType.TRANSFER_COMPLETE, None))

View File

@ -222,8 +222,12 @@ class MainFrame ( wx.Frame ):
bSizer41.Add( gSizer5, 1, wx.EXPAND, 5 )
self.m_FWUpdateStatusLbl = wx.StaticText( self.m_FWUpdatePnl, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.DefaultSize, 0 )
self.m_FWUpdateStatusLbl.Wrap( -1 )
bSizer41.Add( ( 0, 0), 1, wx.EXPAND, 5 )
self.m_FWUpdateStatusLbl.SetMinSize( wx.Size( 520,-1 ) )
bSizer41.Add( self.m_FWUpdateStatusLbl, 0, wx.ALL, 5 )
self.m_FWUpdatePnl.SetSizer( bSizer41 )

View File

@ -2,11 +2,13 @@ from mainframe import MainFrame
from model import MaianaClient
import serial
import wx
from fwUpdateThread import *
class MainWindow(MainFrame):
def __init__(self):
MainFrame.__init__(self, None)
self.m_SerialPortChoice.Set(MaianaClient.serial_ports())
self.portname = None
self.port = None
self.data = None
@ -16,7 +18,8 @@ class MainWindow(MainFrame):
def onSerialBtnClick(self, event):
if self.port is None:
try:
self.port = serial.Serial(self.m_SerialPortChoice.GetString(self.m_SerialPortChoice.GetSelection()), 38400, timeout=1)
self.portname = self.m_SerialPortChoice.GetString(self.m_SerialPortChoice.GetSelection())
self.port = serial.Serial(self.portname, 38400, timeout=1)
except:
wx.MessageBox(b'Unable to open port, it may be in use', 'Error', wx.OK | wx.ICON_ERROR)
return
@ -55,10 +58,28 @@ class MainWindow(MainFrame):
def onFWBinarySelection(self, event):
event.Skip()
self.m_FWUpdateBtn.Enable()
def onFWUpdateBtnClick(self, event):
event.Skip()
self.Connect(-1, -1, EVT_FWUPDATE_ID, self.onFwUpdateEvent)
thr = FWUpdateThread(self, self.port, self.m_FWBinaryPicker.Path)
thr.start()
self.m_FWUpdateBtn.Disable()
def onFwUpdateEvent(self, evt):
print(evt.eventType, evt.data)
if evt.eventType == EventType.DFU_ENABLED:
self.m_FWUpdateStatusLbl.SetLabel("DFU enabled")
elif evt.eventType == EventType.TRANSFER_STARTED:
self.m_FWUpdateStatusLbl.SetLabel("Transferring")
elif evt.eventType == EventType.TRANSFER_PROGRESS:
self.m_FWProgress.SetValue(evt.data * 100)
elif evt.eventType == EventType.TRANSFER_COMPLETE:
self.m_FWProgress.SetValue(0)
self.m_FWUpdateStatusLbl.SetLabel("Transfer completed")
self.refreshSys()
elif evt.eventType == EventType.ERROR:
self.m_FWUpdateStatusLbl.SetLabel(evt.data)
def enableUI(self):
self.m_StationPnl.Enable()