How-to: Assemble Report Segments¶
AVSS sensor reports can be arbitrarily large and thus they need to be transfered in as segments that fit in the Bluetooth Low Energy MTU. This page outlines an algorithm for assembling report segments into a complete report.
Algorithm to Assemble Report Segments¶
When a segment is received, examine the header to determine its position within
the sequence. If the segment is identified as the initial segment (indicated by
First = 1), reset the buffer and initialize the sequence number to the value
provided in the header of the initial segment. Each subsequent segment is
checked to ensure it matches the expected sequence order.
If a segment is detected out of order, reset the buffer to discard any incorrect
data. If the segment is identified as the final segment (indicated by
Last = 1), a full report has been received in the buffer and is ready to be
processed.
The sequence number wraps around to 0 after reaching 63 and continues from there. Note that there are no guarantees regarding the sequence number of the initial segment.
Listing: Assembling Report Segments with Python¶
The code listing below demonstrates how the report re-assembly algorithm can be implemented in practice. You are free to use this code directly or adapt it to your preferred programming language.
REPORT_HEADER_FIRST = 0x80
REPORT_HEADER_LAST = 0x40
REPORT_HEADER_SEQUENCE_NUMBER = 0x3F
class AvssReportBuffer:
def __init__(self):
self.first_seen: bool = False
self.full_report: bool = False
self.next_segment_number: int | None = None
self.buffer = bytearray()
def reset(self):
"""
Reset the state of the report buffer
Discards all data in the report buffer, starting anew.
"""
self.first_seen = False
self.full_report = False
self.next_segment_number = None
self.buffer = bytearray()
def push(self, data: bytes):
"""
Push a segment to the report buffer.
This function handles the re-assembly of segments. If a complete report
has been received, `get_report()` will return a `bytes()` object.
Any errors in reassembly, such as out-of-order segments, are handled
by discarding the data.
Args:
data: the segment to push to the buffer
"""
header = data[0]
payload = data[1:]
if header & REPORT_HEADER_FIRST:
self.reset()
self.first_seen = True
self.next_segment_no = header & REPORT_HEADER_SEQUENCE_NUMBER
if not self.first_seen:
return
if self.next_segment_no != header & REPORT_HEADER_SEQUENCE_NUMBER:
self.reset()
return
self.buffer.extend(payload)
if header & REPORT_HEADER_LAST:
self.full_report = True
self.next_segment_no = None
def get_report(self) -> bytes | None:
"""
Attempt to get a full report from the report buffer.
This function should be called to extract a full report from the report
buffer. It should be called after each call to `push()` to check if a
full report has been received.
"""
if self.full_report:
return bytes(self.buffer)
else:
return None