264 lines
10 KiB
Python
264 lines
10 KiB
Python
|
import os
|
||
|
import pexpect
|
||
|
import re
|
||
|
|
||
|
from abc import ABCMeta, abstractmethod
|
||
|
from array import array
|
||
|
from util import *
|
||
|
|
||
|
verbose = False
|
||
|
|
||
|
class NrfBleDfuController(object, metaclass=ABCMeta):
|
||
|
ctrlpt_handle = 0
|
||
|
ctrlpt_cccd_handle = 0
|
||
|
data_handle = 0
|
||
|
|
||
|
pkt_receipt_interval = 10
|
||
|
pkt_payload_size = 20
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Start the firmware update process
|
||
|
# --------------------------------------------------------------------------
|
||
|
@abstractmethod
|
||
|
def start(self):
|
||
|
pass
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Check if the peripheral is running in bootloader (DFU) or application mode
|
||
|
# Returns True if the peripheral is in DFU mode
|
||
|
# --------------------------------------------------------------------------
|
||
|
@abstractmethod
|
||
|
def check_DFU_mode(self):
|
||
|
pass
|
||
|
|
||
|
@abstractmethod
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Switch from application to bootloader (DFU)
|
||
|
# --------------------------------------------------------------------------
|
||
|
def switch_to_dfu_mode(self):
|
||
|
pass
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Parse notification status results
|
||
|
# --------------------------------------------------------------------------
|
||
|
@abstractmethod
|
||
|
def _dfu_parse_notify(self, notify):
|
||
|
pass
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Wait for a notification and parse the response
|
||
|
# --------------------------------------------------------------------------
|
||
|
@abstractmethod
|
||
|
def _wait_and_parse_notify(self):
|
||
|
pass
|
||
|
|
||
|
def __init__(self, target_mac, firmware_path, datfile_path):
|
||
|
self.target_mac = target_mac
|
||
|
|
||
|
self.firmware_path = firmware_path
|
||
|
self.datfile_path = datfile_path
|
||
|
|
||
|
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % target_mac)
|
||
|
self.ble_conn.delaybeforesend = 0
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Start the firmware update process
|
||
|
# --------------------------------------------------------------------------
|
||
|
def start(self):
|
||
|
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
|
||
|
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
|
||
|
|
||
|
if verbose:
|
||
|
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
|
||
|
print('Packet handle: 0x%04x' % (self.data_handle))
|
||
|
|
||
|
# Subscribe to notifications from Control Point characteristic
|
||
|
self._enable_notifications(self.ctrlpt_cccd_handle)
|
||
|
|
||
|
# Set the Packet Receipt Notification interval
|
||
|
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
|
||
|
self._dfu_send_command(Procedures.SET_PRN, prn)
|
||
|
|
||
|
self._dfu_send_init()
|
||
|
|
||
|
self._dfu_send_image()
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Initialize:
|
||
|
# Hex: read and convert hexfile into bin_array
|
||
|
# Bin: read binfile into bin_array
|
||
|
# --------------------------------------------------------------------------
|
||
|
def input_setup(self):
|
||
|
print("Sending file " + os.path.split(self.firmware_path)[1] + " to " + self.target_mac)
|
||
|
|
||
|
if self.firmware_path == None:
|
||
|
raise Exception("input invalid")
|
||
|
|
||
|
name, extent = os.path.splitext(self.firmware_path)
|
||
|
|
||
|
if extent == ".bin":
|
||
|
self.bin_array = array('B', open(self.firmware_path, 'rb').read())
|
||
|
|
||
|
self.image_size = len(self.bin_array)
|
||
|
print("Binary imge size: %d" % self.image_size)
|
||
|
print("Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)))
|
||
|
|
||
|
return
|
||
|
|
||
|
if extent == ".hex":
|
||
|
intelhex = IntelHex(self.firmware_path)
|
||
|
self.bin_array = intelhex.tobinarray()
|
||
|
self.image_size = len(self.bin_array)
|
||
|
print("bin array size: ", self.image_size)
|
||
|
return
|
||
|
|
||
|
raise Exception("input invalid")
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Perform a scan and connect via gatttool.
|
||
|
# Will return True if a connection was established, False otherwise
|
||
|
# --------------------------------------------------------------------------
|
||
|
def scan_and_connect(self, timeout=2):
|
||
|
if verbose: print("scan_and_connect")
|
||
|
|
||
|
print("Connecting to %s" % (self.target_mac))
|
||
|
|
||
|
try:
|
||
|
self.ble_conn.expect('\[LE\]>', timeout=timeout)
|
||
|
except pexpect.TIMEOUT as e:
|
||
|
return False
|
||
|
|
||
|
self.ble_conn.sendline('connect')
|
||
|
|
||
|
try:
|
||
|
res = self.ble_conn.expect('.*Connection successful.*', timeout=timeout)
|
||
|
except pexpect.TIMEOUT as e:
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Disconnect from the peripheral and close the gatttool connection
|
||
|
# --------------------------------------------------------------------------
|
||
|
def disconnect(self):
|
||
|
self.ble_conn.sendline('exit')
|
||
|
self.ble_conn.close()
|
||
|
|
||
|
def target_mac_increase(self, inc):
|
||
|
self.target_mac = uint_to_mac_string(mac_string_to_uint(self.target_mac) + inc)
|
||
|
|
||
|
# Re-start gatttool with the new address
|
||
|
self.disconnect()
|
||
|
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % self.target_mac)
|
||
|
self.ble_conn.delaybeforesend = 0
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Fetch handles for a given UUID.
|
||
|
# Will return a three-tuple: (char handle, value handle, CCCD handle)
|
||
|
# Will raise an exception if the UUID is not found
|
||
|
# --------------------------------------------------------------------------
|
||
|
def _get_handles(self, uuid):
|
||
|
self.ble_conn.before = ""
|
||
|
self.ble_conn.sendline('characteristics')
|
||
|
|
||
|
try:
|
||
|
self.ble_conn.expect([uuid], timeout=2)
|
||
|
handles = re.findall(b'.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before)
|
||
|
(handle, value_handle) = handles[-1]
|
||
|
except pexpect.TIMEOUT as e:
|
||
|
raise Exception("UUID not found: {}".format(uuid))
|
||
|
|
||
|
return (int(handle, 16), int(value_handle, 16), int(value_handle, 16)+1)
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Wait for notification to arrive.
|
||
|
# Example format: "Notification handle = 0x0019 value: 10 01 01"
|
||
|
# --------------------------------------------------------------------------
|
||
|
def _dfu_wait_for_notify(self):
|
||
|
while True:
|
||
|
if verbose: print("dfu_wait_for_notify")
|
||
|
|
||
|
if not self.ble_conn.isalive():
|
||
|
print("connection not alive")
|
||
|
return None
|
||
|
|
||
|
try:
|
||
|
index = self.ble_conn.expect('Notification handle = .*? \r\n', timeout=30)
|
||
|
|
||
|
except pexpect.TIMEOUT:
|
||
|
#
|
||
|
# The gatttool does not report link-lost directly.
|
||
|
# The only way found to detect it is monitoring the prompt '[CON]'
|
||
|
# and if it goes to '[ ]' this indicates the connection has
|
||
|
# been broken.
|
||
|
# In order to get a updated prompt string, issue an empty
|
||
|
# sendline(''). If it contains the '[ ]' string, then
|
||
|
# raise an exception. Otherwise, if not a link-lost condition,
|
||
|
# continue to wait.
|
||
|
#
|
||
|
self.ble_conn.sendline('')
|
||
|
string = self.ble_conn.before
|
||
|
if '[ ]' in string:
|
||
|
print('Connection lost! ')
|
||
|
raise Exception('Connection Lost')
|
||
|
return None
|
||
|
|
||
|
if index == 0:
|
||
|
after = self.ble_conn.after
|
||
|
hxstr = after.split()[3:]
|
||
|
handle = int(float.fromhex(hxstr[0].decode('UTF-8')))
|
||
|
return hxstr[2:]
|
||
|
|
||
|
else:
|
||
|
print("unexpeced index: {0}".format(index))
|
||
|
return None
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Send a procedure + any parameters required
|
||
|
# --------------------------------------------------------------------------
|
||
|
def _dfu_send_command(self, procedure, params=[]):
|
||
|
if verbose: print('_dfu_send_command')
|
||
|
|
||
|
cmd = 'char-write-req 0x%04x %02x' % (self.ctrlpt_handle, procedure)
|
||
|
cmd += array_to_hex_string(params)
|
||
|
|
||
|
if verbose: print(cmd)
|
||
|
|
||
|
self.ble_conn.sendline(cmd)
|
||
|
|
||
|
# Verify that command was successfully written
|
||
|
try:
|
||
|
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
|
||
|
except pexpect.TIMEOUT as e:
|
||
|
print("State timeout")
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Send an array of bytes
|
||
|
# --------------------------------------------------------------------------
|
||
|
def _dfu_send_data(self, data):
|
||
|
cmd = 'char-write-cmd 0x%04x' % (self.data_handle)
|
||
|
cmd += ' '
|
||
|
cmd += array_to_hex_string(data)
|
||
|
|
||
|
if verbose: print(cmd)
|
||
|
|
||
|
self.ble_conn.sendline(cmd)
|
||
|
|
||
|
# --------------------------------------------------------------------------
|
||
|
# Enable notifications from the Control Point Handle
|
||
|
# --------------------------------------------------------------------------
|
||
|
def _enable_notifications(self, cccd_handle):
|
||
|
if verbose: print('_enable_notifications')
|
||
|
|
||
|
cmd = 'char-write-req 0x%04x %s' % (cccd_handle, '0100')
|
||
|
|
||
|
if verbose: print(cmd)
|
||
|
|
||
|
self.ble_conn.sendline(cmd)
|
||
|
|
||
|
# Verify that command was successfully written
|
||
|
try:
|
||
|
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
|
||
|
except pexpect.TIMEOUT as e:
|
||
|
print("State timeout")
|