How-to: Assemble Report Segments

AVSS sensor reports can be arbitrarily large, so they are transferred as segments that each fit within the Bluetooth Low Energy MTU. This page outlines an algorithm for assembling report segments into a complete report.

Algorithm to Assemble Report Segments

When a segment is received, examine its header to determine its position within the sequence. If the segment is the initial segment (First = 1), reset the buffer and set the expected sequence number to the value in that segment's header. Check every segment against the expected sequence number; after each accepted segment, the expected number advances by one. If a segment arrives out of order, reset the buffer to discard the incomplete report. If the segment is the final segment (Last = 1), the buffer holds a complete report that is ready to be processed.

The sequence number is a 6-bit value: it wraps around to 0 after reaching 63 and continues from there. Note that there are no guarantees regarding the sequence number of the initial segment, so a report may start at any value.
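The header layout and the wrap-around rule can be sketched in isolation as follows. This is a minimal sketch, assuming the one-byte header layout implied by the masks in the listing on this page; `SegmentHeader`, `parse_header` and `next_sequence_number` are hypothetical helper names and not part of any AVSS API.

```python
from typing import NamedTuple

# Bit masks for the one-byte segment header, matching the listing on this page.
REPORT_HEADER_FIRST = 0x80
REPORT_HEADER_LAST = 0x40
REPORT_HEADER_SEQUENCE_NUMBER = 0x3F


class SegmentHeader(NamedTuple):
   """Decoded header fields (hypothetical helper type)."""
   first: bool
   last: bool
   sequence_number: int


def parse_header(header: int) -> SegmentHeader:
   """Split a header byte into its First, Last and sequence number fields."""
   return SegmentHeader(
      first=bool(header & REPORT_HEADER_FIRST),
      last=bool(header & REPORT_HEADER_LAST),
      sequence_number=header & REPORT_HEADER_SEQUENCE_NUMBER,
   )


def next_sequence_number(current: int) -> int:
   """Return the sequence number expected after `current`, wrapping 63 -> 0."""
   return (current + 1) & REPORT_HEADER_SEQUENCE_NUMBER
```

Masking with `0x3F` after the increment is what produces the wrap-around: 63 + 1 = 64, and 64 & 0x3F = 0.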

Listing: Assembling Report Segments with Python

The code listing below demonstrates how the report re-assembly algorithm can be implemented in practice. You are free to use this code directly or adapt it to your preferred programming language.

REPORT_HEADER_FIRST = 0x80
REPORT_HEADER_LAST = 0x40
REPORT_HEADER_SEQUENCE_NUMBER = 0x3F

class AvssReportBuffer:
   def __init__(self):
      self.first_seen: bool = False
      self.full_report: bool = False
      self.next_segment_number: int | None = None
      self.buffer = bytearray()

   def reset(self):
      """
      Reset the state of the report buffer

      Discards all data in the report buffer, starting anew.
      """
      self.first_seen = False
      self.full_report = False
      self.next_segment_number = None
      self.buffer = bytearray()

   def push(self, data: bytes):
      """
      Push a segment to the report buffer.

      This function handles the re-assembly of segments. If a complete report
      has been received, `get_report()` will return a `bytes()` object.
      Any errors in reassembly, such as out-of-order segments, are handled
      by discarding the data.

      Args:
         data: the segment to push to the buffer
      """

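      if not data:
         # A segment without a header byte cannot be interpreted; ignore it.
         return

      header = data[0]
      payload = data[1:]
      sequence_number = header & REPORT_HEADER_SEQUENCE_NUMBER

      if header & REPORT_HEADER_FIRST:
         # The initial segment starts a new report and defines the
         # starting sequence number.
         self.reset()
         self.first_seen = True
         self.next_segment_number = sequence_number

      if not self.first_seen:
         # No initial segment has been seen yet; ignore stray segments.
         return

      if self.next_segment_number != sequence_number:
         # Out-of-order segment: discard the partial report.
         self.reset()
         return

      self.buffer.extend(payload)

      # Advance the expected sequence number, wrapping to 0 after 63.
      self.next_segment_number = (
         (sequence_number + 1) & REPORT_HEADER_SEQUENCE_NUMBER
      )

      if header & REPORT_HEADER_LAST:
         self.full_report = True
         self.next_segment_number = None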

   def get_report(self) -> bytes | None:
      """
      Attempt to get a full report from the report buffer.

      This function should be called to extract a full report from the report
      buffer. It should be called after each call to `push()` to check if a
      full report has been received.
      """
      if self.full_report:
         return bytes(self.buffer)
      else:
         return None
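
To exercise reassembly code without a sensor, it can help to generate segments from a known payload. The sketch below does the reverse of the algorithm described on this page: it splits a report into header-prefixed segments. It is a test aid under stated assumptions, not a description of how the sensor segments its reports; `split_into_segments`, `max_payload` and `start` are hypothetical names.

```python
REPORT_HEADER_FIRST = 0x80
REPORT_HEADER_LAST = 0x40
REPORT_HEADER_SEQUENCE_NUMBER = 0x3F


def split_into_segments(report: bytes, max_payload: int, start: int = 0) -> list[bytes]:
   """Split `report` into header-prefixed segments (hypothetical test helper)."""
   if max_payload < 1:
      raise ValueError("max_payload must be at least 1")

   # Always emit at least one segment, even for an empty report.
   chunks = [
      report[i:i + max_payload] for i in range(0, len(report), max_payload)
   ] or [b""]

   segments = []
   for index, chunk in enumerate(chunks):
      # The sequence number starts at `start` and wraps to 0 after 63.
      header = (start + index) & REPORT_HEADER_SEQUENCE_NUMBER
      if index == 0:
         header |= REPORT_HEADER_FIRST
      if index == len(chunks) - 1:
         header |= REPORT_HEADER_LAST
      segments.append(bytes([header]) + chunk)
   return segments
```

The segments can then be pushed, in order, to a report buffer such as `AvssReportBuffer` to check that the reassembled bytes equal the original report. Starting at `start=63` is a convenient way to cover the wrap-around case.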