Source code for pyrokid_cxr_clientm.cxr_socket_protocol
"""
com.rokid.cxr.client-m:1.0.9 - CXRSocketProtocol.java in Python
CXRSocketProtocol is the layer that directly connects to the glasses.
It is not supposed to be used directly, as :class:`CxrApi` is supposed to be used to talk back and forth.
"""
from abc import ABC, abstractmethod
from enum import IntEnum
from typing import Callable
from .utils import ValueUtil
from .libcaps import Caps
[docs]
class PacketTypeIds(IntEnum):
"""Packet Type IDs used in :class:`CXRSocketProtocol` Requests and Responses"""
REQUEST = 0x1001 # request
RESPONSE = 0x1002 # response
NOTIFY = 0x1003 # notify
AUTH_REQUEST = 0x1004 # authRequest
AUTH_RESPONSE = 0x1005 # authResponse
ROKID_ACCOUNT_REQUEST = 0x1006 # changeRokidAccount
# Binary transfer related. Ai_TakePhoto for example
TRANSFER_START = 0x2001
TRANSFER_DATA = 0x2002
TRANSFER_END = 0x2003
# ARTC video frames
ARTC_START = 0x2011
ARTC_DATA = 0x2012
ARTC_END = 0x2013
# AudioStream frames
AUDIO_START = 0x3001
AUDIO_DATA = 0x3002
AUDIO_END = 0x3003
[docs]
class CXRSocketProtocol:
"""com.rokid.cxr.client.CXRSocketProtocol Java class in Python"""
a: str
"""mTag"""
b = None # BluetoothSocket
c = None # InputStream
d = None # OutputStream
e: Callable = None
f: int = 0
g: bool = False
h: bool = False
i: 'CXRSocketProtocol.Callback' = None
"""mCallback"""
j: int
"""size?"""
[docs]
class Callback(ABC):
"""com.rokid.cxr.client.CXRSocketProtocol.Callback Interface - Please extend this class and implement the methods!"""
[docs]
@abstractmethod
def onResponse(self, reqId: int, args: Caps) -> None: pass
[docs]
@abstractmethod
def onNotify(self, cmd: str, args: Caps) -> None: pass
[docs]
@abstractmethod
def onReceived(self, cmd: str, args: Caps, stream: bytes) -> None: pass
[docs]
@abstractmethod
def onStartAudioStream(self, i: int, codec: int, cmd: str, args: Caps) -> None: pass
[docs]
@abstractmethod
def onAudioStream(self, i: int, data: bytes, offset: int, size: int) -> None: pass
[docs]
@abstractmethod
def onAudioStreamFinish(self, codec: int) -> None: pass
[docs]
@abstractmethod
def onARTCFrame(self, data: bytes) -> None: pass
[docs]
@abstractmethod
def onDisconnect(self) -> None: pass
[docs]
def version(self) -> int: return 4
__all__ = ['CXRSocketProtocol', 'PacketTypeIds']