Source code for pyrokid_cxr_clientm.libcaps

"""
com.rokid.cxr.client-m:1.0.4 - jni/arm64-v8a/libcaps.so library in Python

Caps class is THE way to decode and encode packets when communicating to the Rokid Glasses.
"""
from __future__ import annotations
from struct import pack as struct_pack, unpack as struct_unpack
from io import BytesIO

[docs] class Caps: """Main Caps container class for serialization/deserialization :param Optional[list[Caps.Value]] values: Optional values list, in case you already have a list of :type:`Caps.Value`'s. Else its recommended to use :func:`Caps.fromBytes` """ CAPS_VERSION = 5 """Current Caps version."""
[docs] class CapsError(Exception): """Base exception for :type:`Caps` operations""" pass
[docs] class IncorrectTypeException(RuntimeError): """Exception that throws when Type of a :type:`Caps.Value` is not what you were expecting""" pass
__a: list[Caps.Value]
[docs] def write(self, value: Any = None) -> Caps: """Add a value to the Caps object""" if isinstance(value, Caps.Value): self.__a.append(value) else: self.__a.append(Caps.Value(None, value)) return self
[docs] def writeInt32(self, value: int) -> Caps: """Write an integer value""" return self.write(Caps.Value(Caps.Value.TYPE_INT32, int(value)))
[docs] def writeUInt32(self, value: int) -> Caps: """Write an unsigned integer value""" return self.write(Caps.Value(Caps.Value.TYPE_UINT32, int(value)))
[docs] def writeInt64(self, value: int) -> Caps: """Write a long integer value""" return self.write(Caps.Value(Caps.Value.TYPE_INT64, int(value)))
[docs] def writeUInt64(self, value: int) -> Caps: """Write an unsigned long value""" return self.write(Caps.Value(Caps.Value.TYPE_UINT64, int(value)))
[docs] def writeFloat(self, value: float) -> Caps: """Write a float value""" return self.write(Caps.Value(Caps.Value.TYPE_FLOAT, float(value)))
[docs] def writeDouble(self, value: float) -> Caps: """Write a double value""" return self.write(Caps.Value(Caps.Value.TYPE_DOUBLE, float(value)))
[docs] def writeObject(self, value: Caps) -> Caps: """Write a nested :type:`Caps` object""" return self.write(Caps.Value(Caps.Value.TYPE_OBJECT, value))
[docs] def writeVoid(self) -> Caps: """Write a void value""" return self.write(Caps.Value(Caps.Value.TYPE_VOID, None))
[docs] def writeString(self, value: str) -> Caps: """Write a string value""" return self.write(Caps.Value(Caps.Value.TYPE_STRING, str(value)))
[docs] def writeBoolean(self, value: bool) -> Caps: """Write a void value""" return self.write(Caps.Value(Caps.Value.TYPE_UINT32, int(value)))
[docs] def writeBinary(self, value: bytes) -> Caps: """Write binary data""" return self.write(Caps.Value(Caps.Value.TYPE_BINARY, value))
def __init__(self, values: list[Caps.Value] = None): self.__a: list[Caps.Value] = values or []
[docs] @staticmethod def fromBytes(data: bytes) -> tuple[Caps, bytes]: """Parse a new Caps object from bytes""" caps = Caps() return caps.parse(data)
def __repr__(self) -> str: return "Caps(%s)" % self.__a
[docs] def dump(self) -> str: return repr(self)
[docs] def serialize(self) -> bytes: """ Serialize the Caps object to bytes Format: [4 bytes size][1 byte version][member descriptors][member data] """ # First pass: serialize member descriptors and data desc_buffer = BytesIO() data_buffer = BytesIO() # Write member count as ULEB128 desc_buffer.write(Caps._encode_uleb128(len(self.__a))) # Write type descriptors for member in self.__a: if not isinstance(member, Caps.Value): continue member_type = member.type() desc_buffer.write(bytes([member_type])) # Write member data for member in self.__a: if not isinstance(member, Caps.Value): continue Caps._serialize_member(member, data_buffer) # Combine everything desc_data = desc_buffer.getvalue() member_data = data_buffer.getvalue() total_size = 5 + len(desc_data) + len(member_data) result = BytesIO() result.write(struct_pack('>I', total_size)) # Big-endian size result.write(bytes([Caps.CAPS_VERSION])) result.write(desc_data) result.write(member_data) return result.getvalue()
[docs] def parse(self, data: bytes) -> tuple['Caps', bytes]: """Parse a bytes to the current Caps object""" if len(data) < 5: raise Caps.CapsError("Data too small") # Parse header size = struct_unpack('>I', data[0:4])[0] version = data[4] if version != Caps.CAPS_VERSION: raise Caps.CapsError("Unsupported version: %d" % (version)) if size > len(data): raise Caps.CapsError("Size mismatch: expected %d, got %d" % (size, len(data))) # Parse members buffer = BytesIO(data[5:]) # Read member count member_count, bytes_read = Caps._decode_uleb128(buffer) # Read type descriptors descriptors = [] for _ in range(member_count): desc = buffer.read(1) if not desc: raise Caps.CapsError("Truncated descriptor data") descriptors.append(desc[0]) # Read member data for desc in descriptors: value = Caps._parse_member(desc, buffer) self.__a.append(Caps.Value(desc, value)) return self, data[size:] # return the unparsed bytes ;)
def __len__(self) -> int: """Return the number of members""" return len(self.__a)
[docs] def size(self) -> int: """Return the number of members""" return self.__len__()
def __getitem__(self, index: int) -> Caps.Value: """Get member value by index""" if index < 0 or index >= len(self.__a): raise IndexError("Index out of range") return self.__a[index]
[docs] def at(self, index: int) -> Caps.Value: """Get member value by index""" return self.__getitem__(index)
[docs] def clear(self) -> None: """Clear all members""" self.__a.clear()
def __str__(self) -> str: return self.dump()
[docs] def toString(self) -> str: return self.dump()
@staticmethod def _serialize_member(member: Caps.Value, buffer: BytesIO) -> None: """Serialize a single member's data""" member_type = member.type() value = member.getValueNoType() if member_type == Caps.Value.TYPE_VOID: pass # No data elif member_type == Caps.Value.TYPE_INT32: buffer.write(Caps._encode_sleb128(value)) elif member_type == Caps.Value.TYPE_UINT32: buffer.write(Caps._encode_uleb128(value)) elif member_type == Caps.Value.TYPE_FLOAT: buffer.write(struct_pack('<f', value)) elif member_type == Caps.Value.TYPE_INT64: buffer.write(Caps._encode_sleb128(value)) elif member_type == Caps.Value.TYPE_UINT64: buffer.write(Caps._encode_uleb128(value)) elif member_type == Caps.Value.TYPE_DOUBLE: buffer.write(struct_pack('<d', value)) elif member_type == Caps.Value.TYPE_STRING: encoded = value.encode('utf-8') buffer.write(Caps._encode_uleb128(len(encoded))) buffer.write(encoded) elif member_type == Caps.Value.TYPE_BINARY: buffer.write(Caps._encode_uleb128(len(value))) buffer.write(value) elif member_type == Caps.Value.TYPE_OBJECT: nested_data = value.serialize() buffer.write(nested_data) else: raise Caps.CapsError("Unknown member type: %d" % (member_type)) @staticmethod def _parse_member(member_type: chr, buffer: BytesIO) -> Any: """Parse a single member from the buffer""" if member_type == Caps.Value.TYPE_VOID: return None elif member_type == Caps.Value.TYPE_INT32: value, _ = Caps._decode_sleb128(buffer) return value elif member_type == Caps.Value.TYPE_UINT32: value, _ = Caps._decode_uleb128(buffer) return value elif member_type == Caps.Value.TYPE_FLOAT: data = buffer.read(4) return struct_unpack('<f', data)[0] elif member_type == Caps.Value.TYPE_INT64: value, _ = Caps._decode_sleb128(buffer) return value elif member_type == Caps.Value.TYPE_UINT64: value, _ = Caps._decode_uleb128(buffer) return value elif member_type == Caps.Value.TYPE_DOUBLE: data = buffer.read(8) return struct_unpack('<d', data)[0] elif member_type == Caps.Value.TYPE_STRING: length, _ = Caps._decode_uleb128(buffer) data = buffer.read(length) return data.decode('utf-8') elif member_type == Caps.Value.TYPE_BINARY: length, _ = Caps._decode_uleb128(buffer) return buffer.read(length) elif member_type == Caps.Value.TYPE_OBJECT: # Read size header size_data = buffer.read(4) size = struct_unpack('>I', size_data)[0] # Read rest of caps data caps_data = size_data + buffer.read(size - 4) cls, _ = Caps.fromBytes(caps_data) return cls else: raise Caps.CapsError("Unknown member type: %d" % (member_type)) @staticmethod def _encode_uleb128(value: int) -> bytes: """Encode unsigned integer as ULEB128""" if value < 0: raise ValueError("ULEB128 requires non-negative value") result = bytearray() while True: byte = value & 0x7F value >>= 7 if value != 0: byte |= 0x80 result.append(byte) if value == 0: break return bytes(result) @staticmethod def _decode_uleb128(buffer: BytesIO) -> tuple[int, int]: """Decode ULEB128 from buffer, returns (value, bytes_read)""" result = 0 shift = 0 bytes_read = 0 while True: byte_data = buffer.read(1) if not byte_data: raise Caps.CapsError("Truncated ULEB128") byte = byte_data[0] bytes_read += 1 result |= (byte & 0x7F) << shift shift += 7 if (byte & 0x80) == 0: break if shift >= 64: raise Caps.CapsError("ULEB128 too large") return result, bytes_read @staticmethod def _encode_sleb128(value: int) -> bytes: """Encode signed integer as SLEB128""" result = bytearray() while True: byte = value & 0x7F value >>= 7 # Sign extend if value == 0 and (byte & 0x40) == 0: result.append(byte) break elif value == -1 and (byte & 0x40) != 0: result.append(byte) break else: result.append(byte | 0x80) return bytes(result) @staticmethod def _decode_sleb128(buffer: BytesIO) -> tuple[int, int]: """Decode SLEB128 from buffer, returns (value, bytes_read)""" result = 0 shift = 0 bytes_read = 0 byte = 0x80 while (byte & 0x80) != 0: byte_data = buffer.read(1) if not byte_data: raise Caps.CapsError("Truncated SLEB128") byte = byte_data[0] bytes_read += 1 result |= (byte & 0x7F) << shift shift += 7 # Sign extend if shift < 64 and (byte & 0x40) != 0: result |= -(1 << shift) return result, bytes_read
[docs] class Value: """Represents a single value in :type:`Caps` format""" TYPE_VOID = ord('V') # 0x56 TYPE_INT32 = ord('i') # 0x69 TYPE_UINT32 = ord('u') # 0x75 TYPE_FLOAT = ord('f') # 0x66 TYPE_INT64 = ord('l') # 0x6c TYPE_UINT64 = ord('k') # 0x6b TYPE_DOUBLE = ord('d') # 0x64 TYPE_STRING = ord('S') # 0x53 TYPE_BINARY = ord('B') # 0x42 TYPE_OBJECT = ord('O') # 0x4f def __init__(self, vType: chr = None, value: Any = None): self.__vType = vType self.__a = value
[docs] def type(self) -> chr: """Determine the type code for this value""" if self.__vType: return self.__vType if self.__a is None: return Caps.Value.TYPE_VOID elif isinstance(self.__a, bool): return Caps.Value.TYPE_UINT32 elif isinstance(self.__a, int): if -2**31 <= self.__a < 2**31: return Caps.Value.TYPE_INT32 elif 0 <= self.__a < 2**32: return Caps.Value.TYPE_UINT32 elif -2**63 <= self.__a < 2**63: return Caps.Value.TYPE_INT64 else: return Caps.Value.TYPE_UINT64 elif isinstance(self.__a, float): return Caps.Value.TYPE_FLOAT # Or double elif isinstance(self.__a, str): return Caps.Value.TYPE_STRING elif isinstance(self.__a, (bytes, bytearray)): return Caps.Value.TYPE_BINARY elif isinstance(self.__a, Caps): return Caps.Value.TYPE_OBJECT else: raise CapsError("Unsupported type: %s" % (type(self.__a)))
[docs] def getValueNoType(self) -> Any: return self.__a
[docs] def getInt(self) -> int: """Validate if value is int/IN32/UINT32 and return it""" if self.type() == Caps.Value.TYPE_INT32 or self.type() == Caps.Value.TYPE_UINT32: return int(self.__a) raise Caps.IncorrectTypeException()
[docs] def getLong(self) -> int: """Validate if value is long/INT64/UINT64 and return it""" if self.type() == Caps.Value.TYPE_INT64 or self.type() == Caps.Value.TYPE_UINT64: return int(self.__a) raise Caps.IncorrectTypeException()
[docs] def getFloat(self) -> float: """Validate if value is float/FLOAT and return it""" if self.type() == Caps.Value.TYPE_FLOAT: return float(self.__a) raise Caps.IncorrectTypeException()
[docs] def getDouble(self) -> float: """Validate if value is float/DOUBLE and return it""" if self.type() == Caps.Value.TYPE_DOUBLE: return float(self.__a) raise Caps.IncorrectTypeException()
[docs] def getString(self) -> str: """Validate if value is str/STRING and return it""" if self.type() == Caps.Value.TYPE_STRING: return str(self.__a) raise Caps.IncorrectTypeException()
[docs] def getBinary(self) -> bytes: """Validate if value is bytes/BINARY and return it""" if self.type() == Caps.Value.TYPE_BINARY: return bytes(self.__a) raise Caps.IncorrectTypeException()
[docs] def getObject(self) -> Caps: """Validate if value is Caps/OBJECT and return it""" if self.type() == Caps.Value.TYPE_OBJECT: return self.__a raise Caps.IncorrectTypeException()
def __repr__(self) -> str: current_type = self.type() a = [i for i in Caps.Value.__dict__.items() if i[1] == current_type] type_string = 'Caps.Value.' + a[0][0] if len(a) == 1 else current_type value_string = "'" + str(self.__a) + "'" if current_type == Caps.Value.TYPE_STRING else str(self.__a) return "Caps.Value(value=%s, vType=%s)" % (value_string, type_string)
# Example usage if __name__ == "__main__": # Create a Caps object caps = Caps() caps.writeUInt32(42) caps.writeString("Hello, World!") caps.writeDouble(3.14159) # Nested Caps nested = Caps() nested.writeString("nested string") nested.writeUInt32(123) caps.writeObject(nested) # Serialize data = caps.serialize() print("Serialized data:", data) print("Size:", len(data), "bytes") # Deserialize parsed, _ = Caps.fromBytes(data) print("\nParsed:", parsed, "\n") print("Value 0:", parsed.at(0).getInt()) print("Value 1:", parsed.at(1).getString()) print("Value 2:", parsed.at(2).getDouble()) print("Value 3 (nested):", parsed.at(3).getObject())