Source code for pyrokid_cxr_clientm.extend.controllers.wifi_controller

"""com.rokid.cxr.client-m:1.0.4 - extend/controllers/WifiController.java in Python

WifiController class gets the IP address of the glasses based on open ports
The original Android code actually did Wifi Direct/P2P connections, but I cannot do that.
"""

from __future__ import annotations
from abc import ABC, abstractmethod
from json import dumps as json_dumps
from threading import Thread
from time import sleep
import socket, struct, platform, subprocess, re
import concurrent.futures
from typing import Optional, List

from ...libcaps import Caps
from ...utils import LogUtil, ValueUtil
#from ...controllers.cxr_controller import CxrController

# https://learn.microsoft.com/en-us/windows/win32/api/wlanapi/nf-wlanapi-wfdstartopensession

def get_local_subnet() -> tuple[str, int]:
	"""Get local subnet and calculate number of hosts"""
	try:
		s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		s.connect(("8.8.8.8", 80))
		local_ip = s.getsockname()[0]
		s.close()
		
		system = platform.system()
		netmask = None
		
		if system == 'Windows':
			startupinfo = subprocess.STARTUPINFO()
			startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
			startupinfo.wShowWindow = subprocess.SW_HIDE
			
			result = subprocess.run(['ipconfig'], capture_output=True, text=True,
				startupinfo=startupinfo,
				creationflags=subprocess.CREATE_NO_WINDOW)
			
			lines = result.stdout.split('\n')
			for i, line in enumerate(lines):
				if local_ip in line:
					# Look for subnet mask in next few lines
					for j in range(i, min(i+10, len(lines))):
						if 'Subnet Mask' in lines[j]:
							mask_match = re.search(r'(\d+\.\d+\.\d+\.\d+)', lines[j])
							if mask_match:
								netmask = mask_match.group(1)
								break
					break
		
		elif system == 'Linux':
			# Try ip command first (modern)
			try:
				result = subprocess.run(['ip', 'addr', 'show'], 
					capture_output=True, text=True, timeout=5)
				
				lines = result.stdout.split('\n')
				for line in lines:
					if local_ip in line and 'inet ' in line:
						# Format: "inet 192.168.1.100/24 brd ..."
						match = re.search(rf'inet {re.escape(local_ip)}/(\d+)', line)
						if match:
							cidr = int(match.group(1))
							# Convert CIDR to netmask
							mask_int = (0xffffffff << (32 - cidr)) & 0xffffffff
							netmask = socket.inet_ntoa(struct.pack('>I', mask_int))
							break
			except:
				pass
			
			# Fallback to ifconfig
			if not netmask:
				try:
					result = subprocess.run(['ifconfig'], 
						capture_output=True, text=True, timeout=5)
					
					lines = result.stdout.split('\n')
					for i, line in enumerate(lines):
						if local_ip in line:
							# Look for netmask in same or next line
							for j in range(i, min(i+3, len(lines))):
								mask_match = re.search(r'netmask\s+(\d+\.\d+\.\d+\.\d+)', lines[j])
								if not mask_match:
									# Try hex format: netmask 0xffffff00
									hex_match = re.search(r'netmask\s+(0x[0-9a-fA-F]+)', lines[j])
									if hex_match:
										mask_int = int(hex_match.group(1), 16)
										netmask = socket.inet_ntoa(struct.pack('>I', mask_int))
										break
								else:
									netmask = mask_match.group(1)
									break
							break
				except:
					pass
		
		elif system == 'Darwin':  # macOS
			try:
				result = subprocess.run(['ifconfig'], 
					capture_output=True, text=True, timeout=5)
				
				lines = result.stdout.split('\n')
				for i, line in enumerate(lines):
					if local_ip in line and 'inet ' in line:
						# Format: "inet 192.168.1.100 netmask 0xffffff00"
						for j in range(i, min(i+3, len(lines))):
							# Look for hex netmask
							hex_match = re.search(r'netmask\s+(0x[0-9a-fA-F]+)', lines[j])
							if hex_match:
								mask_int = int(hex_match.group(1), 16)
								netmask = socket.inet_ntoa(struct.pack('>I', mask_int))
								break
							# Or decimal format
							mask_match = re.search(r'netmask\s+(\d+\.\d+\.\d+\.\d+)', lines[j])
							if mask_match:
								netmask = mask_match.group(1)
								break
						break
			except:
				pass
		
		# If we got a netmask, calculate CIDR and return
		if netmask:
			cidr = sum([bin(int(x)).count('1') for x in netmask.split('.')])
			num_hosts = 2**(32 - cidr) - 2
			if num_hosts == -1: num_hosts = 1
			
			# Calculate network address
			ip_int = struct.unpack('>I', socket.inet_aton(local_ip))[0]
			mask_int = struct.unpack('>I', socket.inet_aton(netmask))[0]
			network_int = ip_int & mask_int
			network = socket.inet_ntoa(struct.pack('>I', network_int))
			
			return f"{network}/{cidr}", num_hosts
		
		# Default fallback to /24
		base = '.'.join(local_ip.split('.')[0:3])
		return f"{base}.0/24", 254
		
	except Exception as e:
		print(f"Error getting subnet: {e}")
		return "192.168.1.0/24", 254

def get_and_parse_subnet() -> tuple[str, str, int]:
	"""Get the local subnet and parse subnet CIDR notation and return base IP, start IP, and count"""
	subnet, num_hosts = get_local_subnet()

	ip_part, cidr = subnet.split('/')
	cidr = int(cidr)
	
	# Convert IP to integer
	ip_int = struct.unpack('>I', socket.inet_aton(ip_part))[0]
	
	# Calculate network address
	mask = (0xffffffff << (32 - cidr)) & 0xffffffff
	network = ip_int & mask
	
	# First usable IP (network + 1)
	first_ip_int = network + 1
	first_ip = socket.inet_ntoa(struct.pack('>I', first_ip_int))
	
	# Base for iteration (network address)
	base_ip = socket.inet_ntoa(struct.pack('>I', network))
	
	return base_ip, first_ip, num_hosts

def scan_port_manual(port: int, timeout: float = 0.5) -> List[str]:
	"""Manually scan subnet for open port"""
	
	# Parse subnet to get actual IP range
	base_ip, first_ip, num_hosts = get_and_parse_subnet()
	
	# Get base as integer for iteration
	base_int = struct.unpack('>I', socket.inet_aton(base_ip))[0]
	
	def check_port(ip_int: int) -> Optional[str]:
		try:
			ip = socket.inet_ntoa(struct.pack('>I', ip_int))
			sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
			sock.settimeout(timeout)
			result = sock.connect_ex((ip, port))
			sock.close()
			
			if result == 0: return ip
		except:
			pass
		return None
	
	ips = []
	
	# Scan all IPs in parallel
	with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
		futures = []
		# Start from network + 1, end at network + num_hosts
		for i in range(1, num_hosts + 1):
			ip_int = base_int + i
			futures.append(executor.submit(check_port, ip_int))
		
		for future in concurrent.futures.as_completed(futures):
			result = future.result()
			if result: ips.append(result)
	
	return ips

def find_device_by_port(port: int = 8848) -> Optional[str]:
	"""Find device with specific port open"""

	ips = scan_port_manual(port)

	if ips: return ips[0]
	return None

[docs] class WifiController: """com.rokid.cxr.client.extend.controllers.WifiController Java class to Python""" a = None # Context """mContext""" b: ValueUtil.CxrStatus = None """mWifiStatus""" c: 'WifiController.Callback' = None """mCallback""" d: str = None """mDeviceName""" n: int = 0 """mRetryCount""" o: bool = False """mIsConnected""" _CxrController = None """Temporary variable, that should NOT be used, unless you have the implementation of the CxrController"""
[docs] class Callback(ABC): """com.rokid.cxr.client.extend.controllers.WifiController.Callback Interface - Please extend this class and implement the methods"""
[docs] @abstractmethod def onStatusUpdate(self, cxrStatus: ValueUtil.CxrStatus, cxrWifiErrorCode: ValueUtil.CxrWifiErrorCode) -> None: pass
[docs] @abstractmethod def onAddress(self, address: str) -> None: pass
def __init__(self): LogUtil.i("WifiController", "WifiController constructed")
[docs] @staticmethod def getInstance() -> WifiController: LogUtil.v("WifiController", "getInstance") return _f.a
[docs] def init(self, context, deviceName: str, deviceAddress: str, callback: 'WifiController.Callback'): LogUtil.i("WifiController", "init") try: self.b = ValueUtil.CxrStatus.WIFI_INIT self.c = None self.deinit(ValueUtil.CxrWifiErrorCode.SUCCEED) self.a = context self.d = deviceName # self.h = deviceAddress # TODOOOOOO self.c = callback self.connectP2pService() except Exception as exception: LogUtil.e("WifiController", exception)
[docs] def isConnected(self) -> bool: LogUtil.d("WifiController", "isConnected") return self.o
[docs] def deinit(self, cxrWifiErrorCode: ValueUtil.CxrWifiErrorCode) -> ValueUtil.CxrStatus: LogUtil.i("WifiController", "deinit") cxrStatus = ValueUtil.CxrStatus.REQUEST_FAILED try: if self.c is not None: LogUtil.i("WifiController", "notify glass to close wifi p2p") strValue = json_dumps({ "type": "Android" }) LogUtil.i("WifiController", "typeJson: %s", strValue) caps = Caps() caps.write("Sync_Stop") caps.write(strValue) if self._CxrController is not None: cxrStatus = self._CxrController.getInstance().request(1, "Med", caps, None) else: LogUtil.e("WifiController", "mCallback is null") self.d = None self.n = 0 self.o = False self.updateStatus(ValueUtil.CxrStatus.WIFI_UNAVAILABLE, cxrWifiErrorCode) self.c = None except Exception as exception: LogUtil.e("WifiController", exception) return cxrStatus
[docs] def updateStatus(self, cxrStatus: ValueUtil.CxrStatus, cxrWifiErrorCode: ValueUtil.CxrWifiErrorCode) -> None: LogUtil.i("WifiController", "updateStatus: %s, errorCode: %s", cxrStatus, cxrWifiErrorCode) callback: WifiController.Callback = self.c if callback is None: LogUtil.e("WifiController", "mCallback is null") else: if self.b == cxrStatus: LogUtil.e("WifiController", "mWifiStatus == status") return self.b = cxrStatus self.o = (cxrStatus == ValueUtil.CxrStatus.WIFI_AVAILABLE) callback.onStatusUpdate(cxrStatus, cxrWifiErrorCode)
[docs] def connectToDevice(self) -> None: LogUtil.i("WifiController", "connectToDevice") hostAddress = find_device_by_port(port=8848) if hostAddress: self.n = 0 self.handleWifiConnectionChanged(hostAddress) return LogUtil.i("WifiController", "mRetryCount: %d", self.n) i = self.n if i < 10: # was 3, but the original WifiController also has a timeout of 15s, so this would be fine self.n = i + 1 sleep(1) self.connectToDevice() else: LogUtil.e("WifiController", "mRetryCount == WIFI_MAX_RETRY_COUNT") self.deinit(ValueUtil.CxrWifiErrorCode.WIFI_CONNECT_FAILED)
[docs] def handleWifiConnectionChanged(self, hostAddress: str) -> None: LogUtil.i("WifiController", "handleWifiConnectionChanged") if hostAddress is not None and isinstance(hostAddress, str): LogUtil.i("WifiController", "address: %s", hostAddress) self.n = 0 callback: WifiController.Callback = self.c if callback is not None: callback.onAddress(hostAddress) else: LogUtil.e("WifiController", "mCallback is null") self.updateStatus(ValueUtil.CxrStatus.WIFI_AVAILABLE, ValueUtil.CxrWifiErrorCode.SUCCEED)
[docs] def connectP2pService(self) -> None: LogUtil.i("WifiController", "connectP2pService") self.n = 0 Thread(target=self.connectToDevice, daemon=True).start()
class _f: a: WifiController = WifiController() __all__ = ['WifiController']