Add python script to DFU from a linux PC to the Pinetime

This commit is contained in:
JF 2020-06-01 15:21:58 +02:00
parent dca559aad5
commit b41a856b9d
10 changed files with 1509 additions and 2 deletions

View file

@ -41,10 +41,10 @@ Pack the image into a .zip file for the NRF DFU protocol:
adafruit-nrfutil dfu genpkg --dev-type 0x0052 --application image.bin dfu.zip adafruit-nrfutil dfu genpkg --dev-type 0x0052 --application image.bin dfu.zip
` `
Use NRFConnect or dfu.py to upload the zip file to the device: Use NRFConnect or dfu.py (in <project root>/bootloader/ota-dfu-python) to upload the zip file to the device:
` `
sudo dfu.py -z /home/jf/nrf52/bootloader/dfu.zip -a <pinetime MAC address> --legacy sudo dfu.py -z /home/jf/nrf52/bootloader/dfu.zip -a <pinetime MAC address> --legacy
` `
**TODO** : dfu.py **Note** : dfu.py is a slightly modified version of [this repo](https://github.com/daniel-thompson/ota-dfu-python).

View file

@ -0,0 +1 @@
This directory contains source forked from https://github.com/daniel-thompson/ota-dfu-python.

View file

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@ -0,0 +1,118 @@
# Python nRF5 OTA DFU Controller
So... this is my fork of dingara's fork of astronomer80's fork of
foldedtoad's Python OTA DFU utility.
My own contribution is little more than a brute force conversion to
python3. It is sparsely tested so there are likely to be a few
remaining bytes versus string bugs remaining in the places I didn't test
. I used it primarily as part of
[wasp-os](https://github.com/daniel-thompson/wasp-os) as a way to
deliver OTA updates to nRF52-based smart watches, especially the
[Pine64 PineTime](https://www.pine64.org/pinetime/).
## What does it do?
This is a Python program that uses `gatttool` (provided with the Linux BlueZ driver) to achieve Over The Air (OTA) Device Firmware Updates (DFU) to a Nordic Semiconductor nRF5 (either nRF51 or nRF52) device via Bluetooth Low Energy (BLE).
### Main features:
* Perform OTA DFU to an nRF5 peripheral without an external USB BLE dongle.
* Ability to detect if the peripheral is running in application mode or bootloader, and automatically switch if needed (buttonless).
* Support for both Legacy (SDK <= 11) and Secure (SDK >= 12) bootloader.
Before using this utility the nRF5 peripheral device needs to be programmed with a DFU bootloader (see Nordic Semiconductor documentation/examples for instructions on that).
## Prerequisites
* BlueZ 5.4 or above
* Python 3.6
* Python `pexpect` module (available via pip)
* Python `intelhex` module (available via pip)
## Firmware Build Requirement
* Your nRF5 peripheral firmware build method will produce a firmware file ending with either `*.hex` or `*.bin`.
* Your nRF5 firmware build method will produce an Init file ending with `.dat`.
* The typical naming convention is `application.bin` and `application.dat`, but this utility will accept other names.
## Generating init files
### Legacy bootloader
Use the `gen_dat` application (you need to compile it with `gcc gen_dat.c -o gen_dat` on first run) to generate a `.dat` file from your `.bin` file. Example:
./gen_dat application.bin application.dat
Note: The `gen_dat` utility expects a `.bin` file input, so you'll get Cyclic Redundancy Check (CRC) errors during DFU using a `.dat` file generated from a `.hex` file.
An alternative is to use `nrfutil` from Nordic Semiconductor, but I've found this method to be easier. You may need to edit the `gen_dat` source to fit your specific application.
### Secure bootloader
You need to use `nrfutil` to generate firmware packages for the new secure bootloader (SDK > 12) as the package needs to be signed with a private/public key pair. Note that the bootloader will need to be programmed with the corresponding public key. See the [nrfutil repo](https://github.com/NordicSemiconductor/pc-nrfutil) for details.
Note: I've had problems with the pip version of `nrfutil`. I recommend [installing from source](https://github.com/NordicSemiconductor/pc-nrfutil#running-and-installing-from-source) instead.
## Usage
There are two ways to specify firmware files for this utility. Either by specifying both the `.hex` or `.bin` file with the `.dat` file, or more easily by the `.zip` file, which contains both the hex and dat files.
The new `.zip` file form is encouraged by Nordic, but the older hex/bin + dat file methods should still work.
## Usage Examples
> sudo ./dfu.py -f ~/application.hex -d ~/application.dat -a CD:E3:4A:47:1C:E4
or:
> sudo ./dfu.py -z ~/application.zip -a CD:E3:4A:47:1C:E4
You can use the `hcitool lescan` to figure out the address of a DFU target, for example:
$ sudo hcitool -i hci0 lescan
LE Scan ...
CD:E3:4A:47:1C:E4 <TARGET_NAME>
CD:E3:4A:47:1C:E4 (unknown)
## Example Output
================================
== ==
== DFU Server ==
== ==
================================
Sending file application.bin to CD:E3:4A:47:1C:E4
bin array size: 60788
Checking DFU State...
Board needs to switch in DFU mode
Switching to DFU mode
Enable Notifications in DFU mode
Sending hex file size
Waiting for Image Size notification
Waiting for INIT DFU notification
Begin DFU
Progress: |xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx| 100.0% Complete (60788 of 60788 bytes)
Upload complete in 0 minutes and 14 seconds
segments sent: 3040
Waiting for DFU complete notification
Waiting for Firmware Validation notification
Activate and reset
DFU Server done
## TODO:
* Implement link-loss procedure for Legacy Controller.
* Update example output in readme.
* Add makefile examples.
* More code cleanup.
## Info & References
* [Nordic Legacy DFU Service](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v11.0.0/bledfu_transport_bleservice.html?cp=4_0_3_4_3_1_4_1)
* [Nordic Legacy DFU sequence diagrams](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v11.0.0/bledfu_transport_bleprofile.html?cp=4_0_3_4_3_1_4_0_1_6#ota_profile_pkt_rcpt_notif)
* [Nordic Secure DFU bootloader](http://infocenter.nordicsemi.com/topic/com.nordic.infocenter.sdk5.v12.2.0/lib_dfu_transport_ble.html?cp=4_0_1_3_5_2_2)
* [nrfutil](https://github.com/NordicSemiconductor/pc-nrfutil)

View file

@ -0,0 +1,291 @@
import math
import pexpect
import time
from array import array
from util import *
from nrf_ble_dfu_controller import NrfBleDfuController
verbose = False
class Procedures:
START_DFU = 1
INITIALIZE_DFU = 2
RECEIVE_FIRMWARE_IMAGE = 3
VALIDATE_FIRMWARE = 4
ACTIVATE_IMAGE_AND_RESET = 5
RESET_SYSTEM = 6
REPORT_RECEIVED_IMAGE_SIZE = 7
PRN_REQUEST = 8
RESPONSE = 16
PACKET_RECEIPT_NOTIFICATION = 17
string_map = {
START_DFU : "START_DFU",
INITIALIZE_DFU : "INITIALIZE_DFU",
RECEIVE_FIRMWARE_IMAGE : "RECEIVE_FIRMWARE_IMAGE",
VALIDATE_FIRMWARE : "VALIDATE_FIRMWARE",
ACTIVATE_IMAGE_AND_RESET : "ACTIVATE_IMAGE_AND_RESET",
RESET_SYSTEM : "RESET_SYSTEM",
REPORT_RECEIVED_IMAGE_SIZE : "REPORT_RECEIVED_IMAGE_SIZE",
PRN_REQUEST : "PACKET_RECEIPT_NOTIFICATION_REQUEST",
RESPONSE : "RESPONSE",
PACKET_RECEIPT_NOTIFICATION : "PACKET_RECEIPT_NOTIFICATION",
}
@staticmethod
def to_string(proc):
return Procedures.string_map[proc]
@staticmethod
def from_string(proc_str):
return int(proc_str, 16)
class Responses:
SUCCESS = 1
INVALID_STATE = 2
NOT_SUPPORTED = 3
DATA_SIZE_EXCEEDS_LIMITS = 4
CRC_ERROR = 5
OPERATION_FAILED = 6
string_map = {
SUCCESS : "SUCCESS",
INVALID_STATE : "INVALID_STATE",
NOT_SUPPORTED : "NOT_SUPPORTED",
DATA_SIZE_EXCEEDS_LIMITS : "DATA_SIZE_EXCEEDS_LIMITS",
CRC_ERROR : "CRC_ERROR",
OPERATION_FAILED : "OPERATION_FAILED",
}
@staticmethod
def to_string(res):
return Responses.string_map[res]
@staticmethod
def from_string(res_str):
return int(res_str, 16)
class BleDfuControllerLegacy(NrfBleDfuController):
# Class constants
UUID_CONTROL_POINT = "00001531-1212-efde-1523-785feabcd123"
UUID_PACKET = "00001532-1212-efde-1523-785feabcd123"
UUID_VERSION = "00001534-1212-efde-1523-785feabcd123"
# Constructor inherited from abstract base class
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
def start(self, verbose=False):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
self.pkt_receipt_interval = 10
if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
# Subscribe to notifications from Control Point characteristic
if verbose: print("Enabling notifications")
self._enable_notifications(self.ctrlpt_cccd_handle)
# Send 'START DFU' + Application Command
if verbose: print("Sending START_DFU")
self._dfu_send_command(Procedures.START_DFU, [0x04])
# Transmit binary image size
# Need to pad the byte array with eight zero bytes
# (because that's what the bootloader is expecting...)
hex_size_array_lsb = uint32_to_bytes_le(len(self.bin_array))
zero_pad_array_le(hex_size_array_lsb, 8)
self._dfu_send_data(hex_size_array_lsb)
# Wait for response to Image Size
print("Waiting for Image Size notification")
self._wait_and_parse_notify()
# Send 'INIT DFU' + Init Packet Command
self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x00])
# Transmit the Init image (DAT).
self._dfu_send_init()
# Send 'INIT DFU' + Init Packet Complete Command
self._dfu_send_command(Procedures.INITIALIZE_DFU, [0x01])
print("Waiting for INIT DFU notification")
# Wait for INIT DFU notification (indicates flash erase completed)
self._wait_and_parse_notify()
# Set the Packet Receipt Notification interval
if verbose: print("Setting pkt receipt notification interval")
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
self._dfu_send_command(Procedures.PRN_REQUEST, prn)
# Send 'RECEIVE FIRMWARE IMAGE' command to set DFU in firmware receive state.
self._dfu_send_command(Procedures.RECEIVE_FIRMWARE_IMAGE)
# Send bin_array contents as as series of packets (burst mode).
# Each segment is pkt_payload_size bytes long.
# For every pkt_receipt_interval sends, wait for notification.
segment_count = 0
segment_total = int(math.ceil(self.image_size/float(self.pkt_payload_size)))
time_start = time.time()
last_send_time = time.time()
print("Begin DFU")
for i in range(0, self.image_size, self.pkt_payload_size):
segment = self.bin_array[i:i + self.pkt_payload_size]
self._dfu_send_data(segment)
segment_count += 1
# print "segment #{} of {}, dt = {}".format(segment_count, segment_total, time.time() - last_send_time)
# last_send_time = time.time()
if (segment_count == segment_total):
print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
duration = time.time() - time_start
print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
if verbose: print("segments sent: {}".format(segment_count))
print("Waiting for DFU complete notification")
# Wait for DFU complete notification
self._wait_and_parse_notify()
elif (segment_count % self.pkt_receipt_interval) == 0:
(proc, res, pkts) = self._wait_and_parse_notify()
# TODO: Check pkts == segment_count * pkt_payload_size
if res != Responses.SUCCESS:
raise Exception("bad notification status: {}".format(Responses.to_string(res)))
print_progress(pkts, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
# Send Validate Command
self._dfu_send_command(Procedures.VALIDATE_FIRMWARE)
print("Waiting for Firmware Validation notification")
# Wait for Firmware Validation notification
self._wait_and_parse_notify()
# Wait a bit for copy on the peer to be finished
time.sleep(1)
# Send Activate and Reset Command
print("Activate and reset")
self._dfu_send_command(Procedures.ACTIVATE_IMAGE_AND_RESET)
# --------------------------------------------------------------------------
# Check if the peripheral is running in bootloader (DFU) or application mode
# Returns True if the peripheral is in DFU mode
# --------------------------------------------------------------------------
def check_DFU_mode(self):
if verbose: print("Checking DFU State...")
cmd = 'char-read-uuid %s' % self.UUID_VERSION
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Skip two rows
try:
res = self.ble_conn.expect('handle:.*', timeout=10)
# res = self.ble_conn.expect('handle:', timeout=10)
except pexpect.TIMEOUT as e:
print("State timeout")
except:
pass
return self.ble_conn.after.find(b'value: 08 00')!=-1
def switch_to_dfu_mode(self):
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
# Enable notifications
cmd = 'char-write-req 0x%02x %02x' % (bl_cccd_handle, 1)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Reset the board in DFU mode. After reset the board will be disconnected
cmd = 'char-write-req 0x%02x 0104' % (bl_value_handle)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
time.sleep(0.5)
#print "Send 'START DFU' + Application Command"
#self._dfu_state_set(0x0104)
# Reconnect the board.
#ret = self.scan_and_connect()
#if verbose: print("Connected " + str(ret))
#return ret
return 1
# --------------------------------------------------------------------------
# Parse notification status results
# --------------------------------------------------------------------------
def _dfu_parse_notify(self, notify):
if len(notify) < 3:
print("notify data length error")
return None
if verbose: print(notify)
dfu_notify_opcode = Procedures.from_string(notify[0])
if dfu_notify_opcode == Procedures.RESPONSE:
dfu_procedure = Procedures.from_string(notify[1])
dfu_response = Responses.from_string(notify[2])
procedure_str = Procedures.to_string(dfu_procedure)
response_str = Responses.to_string(dfu_response)
if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, response_str))
return (dfu_procedure, dfu_response)
if dfu_notify_opcode == Procedures.PACKET_RECEIPT_NOTIFICATION:
receipt = bytes_to_uint32_le(notify[1:5])
return (dfu_notify_opcode, Responses.SUCCESS, receipt)
# --------------------------------------------------------------------------
# Wait for a notification and parse the response
# --------------------------------------------------------------------------
def _wait_and_parse_notify(self):
if verbose: print("Waiting for notification")
notify = self._dfu_wait_for_notify()
if notify is None:
raise Exception("No notification received")
if verbose: print("Parsing notification")
result = self._dfu_parse_notify(notify)
if result[1] != Responses.SUCCESS:
raise Exception("Error in {} procedure, reason: {}".format(
Procedures.to_string(result[0]),
Responses.to_string(result[1])))
return result
#--------------------------------------------------------------------------
# Send the Init info (*.dat file contents) to peripheral device.
#--------------------------------------------------------------------------
def _dfu_send_init(self):
if verbose: print("dfu_send_init")
# Open the DAT file and create array of its contents
init_bin_array = array('B', open(self.datfile_path, 'rb').read())
# Transmit Init info
self._dfu_send_data(init_bin_array)

View file

@ -0,0 +1,323 @@
import math
import pexpect
import time
from array import array
from util import *
from nrf_ble_dfu_controller import NrfBleDfuController
verbose = False
class Procedures:
CREATE = 0x01
SET_PRN = 0x02
CALC_CHECKSUM = 0x03
EXECUTE = 0x04
SELECT = 0x06
RESPONSE = 0x60
PARAM_COMMAND = 0x01
PARAM_DATA = 0x02
string_map = {
CREATE : "CREATE",
SET_PRN : "SET_PRN",
CALC_CHECKSUM : "CALC_CHECKSUM",
EXECUTE : "EXECUTE",
SELECT : "SELECT",
RESPONSE : "RESPONSE",
}
@staticmethod
def to_string(proc):
return Procedures.string_map[proc]
@staticmethod
def from_string(proc_str):
return int(proc_str, 16)
class Results:
INVALID_CODE = 0x00
SUCCESS = 0x01
OPCODE_NOT_SUPPORTED = 0x02
INVALID_PARAMETER = 0x03
INSUFF_RESOURCES = 0x04
INVALID_OBJECT = 0x05
UNSUPPORTED_TYPE = 0x07
OPERATION_NOT_PERMITTED = 0x08
OPERATION_FAILED = 0x0A
string_map = {
INVALID_CODE : "INVALID_CODE",
SUCCESS : "SUCCESS",
OPCODE_NOT_SUPPORTED : "OPCODE_NOT_SUPPORTED",
INVALID_PARAMETER : "INVALID_PARAMETER",
INSUFF_RESOURCES : "INSUFFICIENT_RESOURCES",
INVALID_OBJECT : "INVALID_OBJECT",
UNSUPPORTED_TYPE : "UNSUPPORTED_TYPE",
OPERATION_NOT_PERMITTED : "OPERATION_NOT_PERMITTED",
OPERATION_FAILED : "OPERATION_FAILED",
}
@staticmethod
def to_string(res):
return Results.string_map[res]
@staticmethod
def from_string(res_str):
return int(res_str, 16)
class BleDfuControllerSecure(NrfBleDfuController):
# Class constants
UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50'
UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50'
UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50'
# Constructor inherited from abstract base class
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
def start(self):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
# Subscribe to notifications from Control Point characteristic
self._enable_notifications(self.ctrlpt_cccd_handle)
# Set the Packet Receipt Notification interval
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
self._dfu_send_command(Procedures.SET_PRN, prn)
self._dfu_send_init()
self._dfu_send_image()
# --------------------------------------------------------------------------
# Check if the peripheral is running in bootloader (DFU) or application mode
# Returns True if the peripheral is in DFU mode
# --------------------------------------------------------------------------
def check_DFU_mode(self):
print("Checking DFU State...")
self.ble_conn.sendline('characteristics')
dfu_mode = False
try:
self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2)
except pexpect.TIMEOUT as e:
dfu_mode = True
return dfu_mode
def switch_to_dfu_mode(self):
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS)
self._enable_notifications(bl_cccd_handle)
# Reset the board in DFU mode. After reset the board will be disconnected
cmd = 'char-write-req 0x%04x 01' % (bl_value_handle)
self.ble_conn.sendline(cmd)
# Wait some time for board to reboot
time.sleep(0.5)
# Increase the mac address by one and reconnect
self.target_mac_increase(1)
return self.scan_and_connect()
# --------------------------------------------------------------------------
# Parse notification status results
# --------------------------------------------------------------------------
def _dfu_parse_notify(self, notify):
if len(notify) < 3:
print("notify data length error")
return None
if verbose: print(notify)
dfu_notify_opcode = Procedures.from_string(notify[0])
if dfu_notify_opcode == Procedures.RESPONSE:
dfu_procedure = Procedures.from_string(notify[1])
dfu_result = Results.from_string(notify[2])
procedure_str = Procedures.to_string(dfu_procedure)
result_str = Results.to_string(dfu_result)
# if verbose: print "opcode: {0}, proc: {1}, res: {2}".format(dfu_notify_opcode, procedure_str, result_str)
if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, result_str))
# Packet Receipt notifications are sent in the exact same format
# as responses to the CALC_CHECKSUM procedure.
if(dfu_procedure == Procedures.CALC_CHECKSUM and dfu_result == Results.SUCCESS):
offset = bytes_to_uint32_le(notify[3:7])
crc32 = bytes_to_uint32_le(notify[7:11])
return (dfu_procedure, dfu_result, offset, crc32)
elif(dfu_procedure == Procedures.SELECT and dfu_result == Results.SUCCESS):
max_size = bytes_to_uint32_le(notify[3:7])
offset = bytes_to_uint32_le(notify[7:11])
crc32 = bytes_to_uint32_le(notify[11:15])
return (dfu_procedure, dfu_result, max_size, offset, crc32)
else:
return (dfu_procedure, dfu_result)
# --------------------------------------------------------------------------
# Wait for a notification and parse the response
# --------------------------------------------------------------------------
def _wait_and_parse_notify(self):
if verbose: print("Waiting for notification")
notify = self._dfu_wait_for_notify()
if notify is None:
raise Exception("No notification received")
if verbose: print("Parsing notification")
result = self._dfu_parse_notify(notify)
if result[1] != Results.SUCCESS:
raise Exception("Error in {} procedure, reason: {}".format(
Procedures.to_string(result[0]),
Results.to_string(result[1])))
return result
# --------------------------------------------------------------------------
# Send the Init info (*.dat file contents) to peripheral device.
# --------------------------------------------------------------------------
def _dfu_send_init(self):
if verbose: print("dfu_send_init")
# Open the DAT file and create array of its contents
init_bin_array = array('B', open(self.datfile_path, 'rb').read())
init_size = len(init_bin_array)
init_crc = 0;
# Select command
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_COMMAND]);
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
if offset != init_size or crc32 != init_crc:
if offset == 0 or offset > init_size:
# Create command
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_COMMAND] + uint32_to_bytes_le(init_size))
res = self._wait_and_parse_notify()
segment_count = 0
segment_total = int(math.ceil(init_size/float(self.pkt_payload_size)))
for i in range(0, init_size, self.pkt_payload_size):
segment = init_bin_array[i:i + self.pkt_payload_size]
self._dfu_send_data(segment)
segment_count += 1
if (segment_count % self.pkt_receipt_interval) == 0:
(proc, res, offset, crc32) = self._wait_and_parse_notify()
if res != Results.SUCCESS:
raise Exception("bad notification status: {}".format(Results.to_string(res)))
# Calculate CRC
self._dfu_send_command(Procedures.CALC_CHECKSUM)
self._wait_and_parse_notify()
# Execute command
self._dfu_send_command(Procedures.EXECUTE)
self._wait_and_parse_notify()
print("Init packet successfully transfered")
# --------------------------------------------------------------------------
# Send the Firmware image to peripheral device.
# --------------------------------------------------------------------------
def _dfu_send_image(self):
if verbose: print("dfu_send_image")
# Select Data Object
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_DATA])
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
# Split the firmware into multiple objects
num_objects = int(math.ceil(self.image_size / float(max_size)))
print("Max object size: %d, num objects: %d, offset: %d, total size: %d" % (max_size, num_objects, offset, self.image_size))
time_start = time.time()
last_send_time = time.time()
obj_offset = (offset/max_size)*max_size
while(obj_offset < self.image_size):
# print "\nSending object {} of {}".format(obj_offset/max_size+1, num_objects)
obj_offset += self._dfu_send_object(obj_offset, max_size)
# Image uploaded successfully, update the progress bar
print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
duration = time.time() - time_start
print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
# --------------------------------------------------------------------------
# Send a single data object of given size and offset.
# --------------------------------------------------------------------------
def _dfu_send_object(self, offset, obj_max_size):
if offset != self.image_size:
if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Create Data Object
size = min(obj_max_size, self.image_size - offset)
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size))
self._wait_and_parse_notify()
segment_count = 0
segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size)))
segment_begin = offset
segment_end = min(offset+obj_max_size, self.image_size)
for i in range(segment_begin, segment_end, self.pkt_payload_size):
num_bytes = min(self.pkt_payload_size, segment_end - i)
segment = self.bin_array[i:i + num_bytes]
self._dfu_send_data(segment)
segment_count += 1
# print "j: {} i: {}, end: {}, bytes: {}, size: {} segment #{} of {}".format(
# offset, i, segment_end, num_bytes, self.image_size, segment_count, segment_total)
if (segment_count % self.pkt_receipt_interval) == 0:
try:
(proc, res, offset, crc32) = self._wait_and_parse_notify()
except e:
# Likely no notification received, need to re-transmit object
return 0
if res != Results.SUCCESS:
raise Exception("bad notification status: {}".format(Results.to_string(res)))
if crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Something went wrong, need to re-transmit this object
return 0
print_progress(offset, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
# Calculate CRC
self._dfu_send_command(Procedures.CALC_CHECKSUM)
(proc, res, offset, crc32) = self._wait_and_parse_notify()
if(crc32 != crc32_unsigned(self.bin_array[0:offset])):
# Need to re-transmit object
return 0
# Execute command
self._dfu_send_command(Procedures.EXECUTE)
self._wait_and_parse_notify()
# If everything executed correctly, return amount of bytes transfered
return obj_max_size

188
bootloader/ota-dfu-python/dfu.py Executable file
View file

@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""
------------------------------------------------------------------------------
DFU Server for Nordic nRF51 based systems.
Conforms to nRF51_SDK 11.0 BLE_DFU requirements.
------------------------------------------------------------------------------
"""
import os, re
import sys
import optparse
import time
import math
import traceback
from unpacker import Unpacker
from ble_secure_dfu_controller import BleDfuControllerSecure
from ble_legacy_dfu_controller import BleDfuControllerLegacy
def main():
init_msg = """
================================
== ==
== DFU Server ==
== ==
================================
"""
# print "DFU Server start"
print(init_msg)
try:
parser = optparse.OptionParser(usage='%prog -f <hex_file> -a <dfu_target_address>\n\nExample:\n\tdfu.py -f application.hex -d application.dat -a cd:e3:4a:47:1c:e4',
version='0.5')
parser.add_option('-a', '--address',
action='store',
dest="address",
type="string",
default=None,
help='DFU target address.'
)
parser.add_option('-f', '--file',
action='store',
dest="hexfile",
type="string",
default=None,
help='hex file to be uploaded.'
)
parser.add_option('-d', '--dat',
action='store',
dest="datfile",
type="string",
default=None,
help='dat file to be uploaded.'
)
parser.add_option('-z', '--zip',
action='store',
dest="zipfile",
type="string",
default=None,
help='zip file to be used.'
)
parser.add_option('--secure',
action='store_true',
dest='secure_dfu',
default=True,
help='Use secure bootloader (Nordic SDK > 12)'
)
parser.add_option('--legacy',
action='store_false',
dest='secure_dfu',
help='Use secure bootloader (Nordic SDK < 12)'
)
options, args = parser.parse_args()
except Exception as e:
print(e)
print("For help use --help")
sys.exit(2)
try:
''' Validate input parameters '''
if not options.address:
parser.print_help()
exit(2)
unpacker = None
hexfile = None
datfile = None
if options.zipfile != None:
if (options.hexfile != None) or (options.datfile != None):
print("Conflicting input directives")
exit(2)
unpacker = Unpacker()
#print options.zipfile
try:
hexfile, datfile = unpacker.unpack_zipfile(options.zipfile)
except Exception as e:
print("ERR")
print(e)
pass
else:
if (not options.hexfile) or (not options.datfile):
parser.print_help()
exit(2)
if not os.path.isfile(options.hexfile):
print("Error: Hex file doesn't exist")
exit(2)
if not os.path.isfile(options.datfile):
print("Error: DAT file doesn't exist")
exit(2)
hexfile = options.hexfile
datfile = options.datfile
''' Start of Device Firmware Update processing '''
if options.secure_dfu:
ble_dfu = BleDfuControllerSecure(options.address.upper(), hexfile, datfile)
else:
ble_dfu = BleDfuControllerLegacy(options.address.upper(), hexfile, datfile)
# Initialize inputs
ble_dfu.input_setup()
# Connect to peer device. Assume application mode.
if ble_dfu.scan_and_connect():
if not ble_dfu.check_DFU_mode():
print("Need to switch to DFU mode")
success = ble_dfu.switch_to_dfu_mode()
if not success:
print("Couldn't reconnect")
else:
# The device might already be in DFU mode (MAC + 1)
ble_dfu.target_mac_increase(1)
# Try connection with new address
print("Couldn't connect, will try DFU MAC")
if not ble_dfu.scan_and_connect():
raise Exception("Can't connect to device")
ble_dfu.start()
# Disconnect from peer device if not done already and clean up.
ble_dfu.disconnect()
except Exception as e:
# print traceback.format_exc()
print("Exception at line {}: {}".format(sys.exc_info()[2].tb_lineno, e))
pass
except:
pass
# If Unpacker for zipfile used then delete Unpacker
if unpacker != None:
unpacker.delete()
print("DFU Server done")
"""
------------------------------------------------------------------------------
------------------------------------------------------------------------------
"""
if __name__ == '__main__':
# Do not litter the world with broken .pyc files.
sys.dont_write_bytecode = True
main()

View file

@ -0,0 +1,263 @@
import os
import pexpect
import re
from abc import ABCMeta, abstractmethod
from array import array
from util import *
verbose = False
class NrfBleDfuController(object, metaclass=ABCMeta):
ctrlpt_handle = 0
ctrlpt_cccd_handle = 0
data_handle = 0
pkt_receipt_interval = 10
pkt_payload_size = 20
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
@abstractmethod
def start(self):
pass
# --------------------------------------------------------------------------
# Check if the peripheral is running in bootloader (DFU) or application mode
# Returns True if the peripheral is in DFU mode
# --------------------------------------------------------------------------
@abstractmethod
def check_DFU_mode(self):
pass
@abstractmethod
# --------------------------------------------------------------------------
# Switch from application to bootloader (DFU)
# --------------------------------------------------------------------------
def switch_to_dfu_mode(self):
pass
# --------------------------------------------------------------------------
# Parse notification status results
# --------------------------------------------------------------------------
@abstractmethod
def _dfu_parse_notify(self, notify):
pass
# --------------------------------------------------------------------------
# Wait for a notification and parse the response
# --------------------------------------------------------------------------
@abstractmethod
def _wait_and_parse_notify(self):
pass
def __init__(self, target_mac, firmware_path, datfile_path):
self.target_mac = target_mac
self.firmware_path = firmware_path
self.datfile_path = datfile_path
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % target_mac)
self.ble_conn.delaybeforesend = 0
# --------------------------------------------------------------------------
# Start the firmware update process
# --------------------------------------------------------------------------
def start(self):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
# Subscribe to notifications from Control Point characteristic
self._enable_notifications(self.ctrlpt_cccd_handle)
# Set the Packet Receipt Notification interval
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
self._dfu_send_command(Procedures.SET_PRN, prn)
self._dfu_send_init()
self._dfu_send_image()
# --------------------------------------------------------------------------
# Initialize:
# Hex: read and convert hexfile into bin_array
# Bin: read binfile into bin_array
# --------------------------------------------------------------------------
def input_setup(self):
print("Sending file " + os.path.split(self.firmware_path)[1] + " to " + self.target_mac)
if self.firmware_path == None:
raise Exception("input invalid")
name, extent = os.path.splitext(self.firmware_path)
if extent == ".bin":
self.bin_array = array('B', open(self.firmware_path, 'rb').read())
self.image_size = len(self.bin_array)
print("Binary imge size: %d" % self.image_size)
print("Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)))
return
if extent == ".hex":
intelhex = IntelHex(self.firmware_path)
self.bin_array = intelhex.tobinarray()
self.image_size = len(self.bin_array)
print("bin array size: ", self.image_size)
return
raise Exception("input invalid")
# --------------------------------------------------------------------------
# Perform a scan and connect via gatttool.
# Will return True if a connection was established, False otherwise
# --------------------------------------------------------------------------
def scan_and_connect(self, timeout=2):
if verbose: print("scan_and_connect")
print("Connecting to %s" % (self.target_mac))
try:
self.ble_conn.expect('\[LE\]>', timeout=timeout)
except pexpect.TIMEOUT as e:
return False
self.ble_conn.sendline('connect')
try:
res = self.ble_conn.expect('.*Connection successful.*', timeout=timeout)
except pexpect.TIMEOUT as e:
return False
return True
# --------------------------------------------------------------------------
# Disconnect from the peripheral and close the gatttool connection
# --------------------------------------------------------------------------
def disconnect(self):
self.ble_conn.sendline('exit')
self.ble_conn.close()
def target_mac_increase(self, inc):
self.target_mac = uint_to_mac_string(mac_string_to_uint(self.target_mac) + inc)
# Re-start gatttool with the new address
self.disconnect()
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % self.target_mac)
self.ble_conn.delaybeforesend = 0
# --------------------------------------------------------------------------
# Fetch handles for a given UUID.
# Will return a three-tuple: (char handle, value handle, CCCD handle)
# Will raise an exception if the UUID is not found
# --------------------------------------------------------------------------
def _get_handles(self, uuid):
self.ble_conn.before = ""
self.ble_conn.sendline('characteristics')
try:
self.ble_conn.expect([uuid], timeout=2)
handles = re.findall(b'.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before)
(handle, value_handle) = handles[-1]
except pexpect.TIMEOUT as e:
raise Exception("UUID not found: {}".format(uuid))
return (int(handle, 16), int(value_handle, 16), int(value_handle, 16)+1)
# --------------------------------------------------------------------------
# Wait for notification to arrive.
# Example format: "Notification handle = 0x0019 value: 10 01 01"
# --------------------------------------------------------------------------
def _dfu_wait_for_notify(self):
while True:
if verbose: print("dfu_wait_for_notify")
if not self.ble_conn.isalive():
print("connection not alive")
return None
try:
index = self.ble_conn.expect('Notification handle = .*? \r\n', timeout=30)
except pexpect.TIMEOUT:
#
# The gatttool does not report link-lost directly.
# The only way found to detect it is monitoring the prompt '[CON]'
# and if it goes to '[ ]' this indicates the connection has
# been broken.
# In order to get a updated prompt string, issue an empty
# sendline(''). If it contains the '[ ]' string, then
# raise an exception. Otherwise, if not a link-lost condition,
# continue to wait.
#
self.ble_conn.sendline('')
string = self.ble_conn.before
if '[ ]' in string:
print('Connection lost! ')
raise Exception('Connection Lost')
return None
if index == 0:
after = self.ble_conn.after
hxstr = after.split()[3:]
handle = int(float.fromhex(hxstr[0].decode('UTF-8')))
return hxstr[2:]
else:
print("unexpeced index: {0}".format(index))
return None
# --------------------------------------------------------------------------
# Send a procedure + any parameters required
# --------------------------------------------------------------------------
def _dfu_send_command(self, procedure, params=[]):
if verbose: print('_dfu_send_command')
cmd = 'char-write-req 0x%04x %02x' % (self.ctrlpt_handle, procedure)
cmd += array_to_hex_string(params)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Verify that command was successfully written
try:
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
except pexpect.TIMEOUT as e:
print("State timeout")
# --------------------------------------------------------------------------
# Send an array of bytes
# --------------------------------------------------------------------------
def _dfu_send_data(self, data):
cmd = 'char-write-cmd 0x%04x' % (self.data_handle)
cmd += ' '
cmd += array_to_hex_string(data)
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# --------------------------------------------------------------------------
# Enable notifications from the Control Point Handle
# --------------------------------------------------------------------------
def _enable_notifications(self, cccd_handle):
if verbose: print('_enable_notifications')
cmd = 'char-write-req 0x%04x %s' % (cccd_handle, '0100')
if verbose: print(cmd)
self.ble_conn.sendline(cmd)
# Verify that command was successfully written
try:
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
except pexpect.TIMEOUT as e:
print("State timeout")

View file

@ -0,0 +1,52 @@
import os.path
import zipfile
import tempfile
import random
import string
import shutil
import re
from os.path import basename
class Unpacker(object):
#--------------------------------------------------------------------------
#
#--------------------------------------------------------------------------
def entropy(self, length):
return ''.join(random.choice('abcdefghijklmnopqrstuvwxyz') for i in range (length))
#--------------------------------------------------------------------------
#
#--------------------------------------------------------------------------
def unpack_zipfile(self, file):
if not os.path.isfile(file):
raise Exception("Error: file, not found!")
# Create unique working direction into which the zip file is expanded
self.unzip_dir = "{0}/{1}_{2}".format(tempfile.gettempdir(), os.path.splitext(basename(file))[0], self.entropy(6))
datfilename = ""
binfilename = ""
with zipfile.ZipFile(file, 'r') as zip:
files = [item.filename for item in zip.infolist()]
datfilename = [m.group(0) for f in files for m in [re.search('.*\.dat', f)] if m].pop()
binfilename = [m.group(0) for f in files for m in [re.search('.*\.bin', f)] if m].pop()
zip.extractall(r'{0}'.format(self.unzip_dir))
datfile = "{0}/{1}".format(self.unzip_dir, datfilename)
binfile = "{0}/{1}".format(self.unzip_dir, binfilename)
# print "DAT file: " + datfile
# print "BIN file: " + binfile
return binfile, datfile
#--------------------------------------------------------------------------
#
#--------------------------------------------------------------------------
def delete(self):
# delete self.unzip_dir and its contents
shutil.rmtree(self.unzip_dir)

View file

@ -0,0 +1,70 @@
import sys
import binascii
import re
def bytes_to_uint32_le(bytes):
return (int(bytes[3], 16) << 24) | (int(bytes[2], 16) << 16) | (int(bytes[1], 16) << 8) | (int(bytes[0], 16) << 0)
def uint32_to_bytes_le(uint32):
return [(uint32 >> 0) & 0xff,
(uint32 >> 8) & 0xff,
(uint32 >> 16) & 0xff,
(uint32 >> 24) & 0xff]
def uint16_to_bytes_le(value):
return [(value >> 0 & 0xFF),
(value >> 8 & 0xFF)]
def zero_pad_array_le(data, padsize):
for i in range(0, padsize):
data.insert(0, 0)
def array_to_hex_string(arr):
hex_str = ""
for val in arr:
if val > 255:
raise Exception("Value is greater than it is possible to represent with one byte")
hex_str += "%02x" % val
return hex_str
def crc32_unsigned(bytestring):
return binascii.crc32(bytestring.encode('UTF-8')) % (1 << 32)
def mac_string_to_uint(mac):
parts = list(re.match('(..):(..):(..):(..):(..):(..)', mac).groups())
ints = [int(x, 16) for x in parts]
res = 0
for i in range(0, len(ints)):
res += (ints[len(ints)-1 - i] << 8*i)
return res
def uint_to_mac_string(mac):
ints = [0, 0, 0, 0, 0, 0]
for i in range(0, len(ints)):
ints[len(ints)-1 - i] = (mac >> 8*i) & 0xff
return ':'.join(['{:02x}'.format(x).upper() for x in ints])
# Print a nice console progress bar
def print_progress(iteration, total, prefix = '', suffix = '', decimals = 1, barLength = 100):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
barLength - Optional : character length of bar (Int)
"""
formatStr = "{0:." + str(decimals) + "f}"
percents = formatStr.format(100 * (iteration / float(total)))
filledLength = int(round(barLength * iteration / float(total)))
bar = 'x' * filledLength + '-' * (barLength - filledLength)
sys.stdout.write('\r%s |%s| %s%s %s (%d of %d bytes)' % (prefix, bar, percents, '%', suffix, iteration, total)),
if iteration == total:
sys.stdout.write('\n')
sys.stdout.flush()