InfiniTime/bootloader/ota-dfu-python/ble_legacy_dfu_controller.py

292 lines
11 KiB
Python
Raw Permalink Normal View History

import math
import pexpect
import time
from array import array
from util import *
from nrf_ble_dfu_controller import NrfBleDfuController
verbose = False
class Procedures:
START_DFU = 1
INITIALIZE_DFU = 2
RECEIVE_FIRMWARE_IMAGE = 3
VALIDATE_FIRMWARE = 4
ACTIVATE_IMAGE_AND_RESET = 5
RESET_SYSTEM = 6
REPORT_RECEIVED_IMAGE_SIZE = 7
PRN_REQUEST = 8
RESPONSE = 16
PACKET_RECEIPT_NOTIFICATION = 17
string_map = {
START_DFU : "START_DFU",
INITIALIZE_DFU : "INITIALIZE_DFU",
RECEIVE_FIRMWARE_IMAGE : "RECEIVE_FIRMWARE_IMAGE",
VALIDATE_FIRMWARE : "VALIDATE_FIRMWARE",
ACTIVATE_IMAGE_AND_RESET : "ACTIVATE_IMAGE_AND_RESET",
RESET_SYSTEM : "RESET_SYSTEM",
REPORT_RECEIVED_IMAGE_SIZE : "REPORT_RECEIVED_IMAGE_SIZE",
PRN_REQUEST : "PACKET_RECEIPT_NOTIFICATION_REQUEST",
RESPONSE : "RESPONSE",
PACKET_RECEIPT_NOTIFICATION : "PACKET_RECEIPT_NOTIFICATION",
}
@staticmethod
def to_string(proc):
return Procedures.string_map[proc]
@staticmethod
def from_string(proc_str):
return int(proc_str, 16)
class Responses:
SUCCESS = 1
INVALID_STATE = 2
NOT_SUPPORTED = 3
DATA_SIZE_EXCEEDS_LIMITS = 4
CRC_ERROR = 5
OPERATION_FAILED = 6
string_map = {
SUCCESS : "SUCCESS",
INVALID_STATE : "INVALID_STATE",
NOT_SUPPORTED : "NOT_SUPPORTED",
DATA_SIZE_EXCEEDS_LIMITS : "DATA_SIZE_EXCEEDS_LIMITS",
CRC_ERROR : "CRC_ERROR",
OPERATION_FAILED : "OPERATION_FAILED",
}
@staticmethod
def to_string(res):
return Responses.string_map[res]
@staticmethod
def from_string(res_str):
return int(res_str, 16)
class BleDfuControllerLegacy(NrfBleDfuController):
# Class constants
UUID_CONTROL_POINT = "00001531-1212-efde-1523-785feabcd123"
UUID_PACKET = "00001532-1212-efde-1523-785feabcd123"
UUID_VERSION = "00001534-1212-efde-1523-785feabcd123"
# Constructor inherited from abstract base class
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
def start(self, verbose=False):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
self.pkt_receipt_interval = 10
if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
# Subscribe to notifications from Control Point characteristic
if verbose: print("Enabling notifications")
self._enable_notifications(self.ctrlpt_cccd_handle)
# Send 'START DFU' + Application Command
if verbose: print("Sending START_DFU")
self._dfu_send_command(Procedures.START_DFU, [0x04])
# Transmit binary image size
# Need to pad the byte array with eight zero bytes
# (because that's what the bootloader is expecting...)
hex_size_array_lsb = uint32_to_bytes_le(len(self.bin_array))
zero_pad_array_le(hex_size_array_lsb, 8)
self._dfu_send_data(hex_size_array_lsb)
# Wait for response to Image Size
print("Waiting for Image Size notification")
self._wait_and_parse_notify()
# Send 'INIT DFU' + Init Packet Command
self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x00])
# Transmit the Init image (DAT).
self._dfu_send_init()
# Send 'INIT DFU' + Init Packet Complete Command
self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x01])
print("Waiting for INIT DFU notification")
# Wait for INIT DFU notification (indicates flash erase completed)
self._wait_and_parse_notify()
# Set the Packet Receipt Notification interval
if verbose: print("Setting pkt receipt notification interval")
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
self._dfu_send_command(Procedures.PRN_REQUEST, prn)
# Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state.
self._dfu_send_command(Procedures.RECEIVE_FIRMWARE_IMAGE)
# Send bin_array contents as as series of packets (burst mode).
# Each segment is pkt_payload_size bytes long.
# For every pkt_receipt_interval sends, wait for notification.
segment_count = 0
segment_total = int(math.ceil(self.image_size/float(self.pkt_payload_size)))
time_start = time.time()
last_send_time = time.time()
print("Begin DFU")
for i in range(0, self.image_size, self.pkt_payload_size):
segment = self.bin_array[i:i + self.pkt_payload_size]
self._dfu_send_data(segment)
segment_count += 1
# print "segment #{} of {}, dt = {}".format(segment_count, segment_total, time.time() - last_send_time)
# last_send_time = time.time()
if (segment_count == segment_total):
print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
duration = time.time() - time_start
print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
if verbose: print("segments sent: {}".format(segment_count))
print("Waiting for DFU complete notification")
# Wait for DFU complete notification
self._wait_and_parse_notify()
elif (segment_count % self.pkt_receipt_interval) == 0:
(proc, res, pkts) = self._wait_and_parse_notify()
# TODO: Check pkts == segment_count * pkt_payload_size
if res != Responses.SUCCESS:
raise Exception("bad notification status: {}".format(Responses.to_string(res)))
print_progress(pkts, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
# Send Validate Command
self._dfu_send_command(Procedures.VALIDATE_FIRMWARE)
print("Waiting for Firmware Validation notification")
# Wait for Firmware Validation notification
self._wait_and_parse_notify()
# Wait a bit for copy on the peer to be finished
time.sleep(1)
# Send Activate and Reset Command
print("Activate and reset")
self._dfu_send_command(Procedures.ACTIVATE_IMAGE_AND_RESET)
# --------------------------------------------------------------------------
# Check if the peripheral is running in bootloader (DFU) or application mode
# Returns True if the peripheral is in DFU mode
# --------------------------------------------------------------------------
def check_DFU_mode(self):
if verbose: print("Checking DFU State...")
cmd = 'char-read-uuid %s' % self.UUID_VERSION
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Skip two rows
try:
res = self.ble_conn.expect('handle:.*', timeout=10)
# res = self.ble_conn.expect('handle:', timeout=10)
except pexpect.TIMEOUT as e:
print("State timeout")
except:
pass
return self.ble_conn.after.find(b'value: 08 00')!=-1
def switch_to_dfu_mode(self):
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
# Enable notifications
cmd = 'char-write-req 0x%02x %02x' % (bl_cccd_handle, 1)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Reset the board in DFU mode. After reset the board will be disconnected
cmd = 'char-write-req 0x%02x 0104' % (bl_value_handle)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
time.sleep(0.5)
#print "Send 'START DFU' + Application Command"
#self._dfu_state_set(0x0104)
# Reconnect the board.
#ret = self.scan_and_connect()
#if verbose: print("Connected " + str(ret))
#return ret
return 1
# --------------------------------------------------------------------------
# Parse notification status results
# --------------------------------------------------------------------------
def _dfu_parse_notify(self, notify):
if len(notify) < 3:
print("notify data length error")
return None
if verbose: print(notify)
dfu_notify_opcode = Procedures.from_string(notify[0])
if dfu_notify_opcode == Procedures.RESPONSE:
dfu_procedure = Procedures.from_string(notify[1])
dfu_response = Responses.from_string(notify[2])
procedure_str = Procedures.to_string(dfu_procedure)
response_str = Responses.to_string(dfu_response)
if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, response_str))
return (dfu_procedure, dfu_response)
if dfu_notify_opcode == Procedures.PACKET_RECEIPT_NOTIFICATION:
receipt = bytes_to_uint32_le(notify[1:5])
return (dfu_notify_opcode, Responses.SUCCESS, receipt)
# --------------------------------------------------------------------------
# Wait for a notification and parse the response
# --------------------------------------------------------------------------
def _wait_and_parse_notify(self):
if verbose: print("Waiting for notification")
notify = self._dfu_wait_for_notify()
if notify is None:
raise Exception("No notification received")
if verbose: print("Parsing notification")
result = self._dfu_parse_notify(notify)
if result[1] != Responses.SUCCESS:
raise Exception("Error in {} procedure, reason: {}".format(
Procedures.to_string(result[0]),
Responses.to_string(result[1])))
return result
#--------------------------------------------------------------------------
# Send the Init info (*.dat file contents) to peripheral device.
#--------------------------------------------------------------------------
def _dfu_send_init(self):
if verbose: print("dfu_send_init")
# Open the DAT file and create array of its contents
init_bin_array = array('B', open(self.datfile_path, 'rb').read())
# Transmit Init info
self._dfu_send_data(init_bin_array)