def wait_for_port(port: int, host: str = "localhost", timeout: float = 10.0):
    """Wait until a port starts accepting TCP connections.

    The function repeatedly tries to open a TCP connection to ``(host, port)``
    and returns as soon as one attempt succeeds. The overall wait is bounded by
    a single deadline: each connection attempt is only given the time that is
    still left, so a hanging connect cannot stretch the total wait to roughly
    twice ``timeout``.

    Args:
        port: Port number.
        host: Host address on which the port should exist.
        timeout: In seconds. How long to wait before raising errors.

    Raises:
        TimeoutError: The port isn't accepting connection after time specified
            in `timeout`. The last connection error is attached as the cause.
    """
    # A connect timeout of 0 would put the socket in non-blocking mode, which
    # makes every attempt fail; always allow a tiny amount of time instead.
    min_attempt_timeout = 0.01
    poll_interval = 0.01

    deadline = time.perf_counter() + timeout
    while True:
        remaining = deadline - time.perf_counter()
        try:
            with socket.create_connection(
                (host, port), timeout=max(remaining, min_attempt_timeout)
            ):
                return
        except OSError as ex:
            remaining = deadline - time.perf_counter()
            if remaining <= 0:
                raise TimeoutError(
                    f"Waited too long for the port {port} on host {host} "
                    "to start accepting connections."
                ) from ex
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))
class PortReservationManager:
    """Manages port reservations to prevent race conditions.

    A reservation is a TCP socket bound to an OS-assigned port on
    ``127.0.0.1``. The socket is kept open (and tracked by port number) until
    the reservation is released, so the same port cannot be handed out twice
    by this manager while it is held.
    """

    def __init__(self) -> None:
        """Create an empty reservation table guarded by a lock."""
        # Imported locally so the class does not depend on a module-level
        # ``threading`` import being present.
        import threading

        self._lock = threading.Lock()
        # Maps reserved port number -> the bound socket holding it.
        self._reserved_ports = {}

    def reserve_port(self) -> int:
        """Reserve a port and keep the socket open until released.

        Returns:
            The port number the operating system assigned.

        Raises:
            OSError: If the socket cannot be configured or bound. The socket
                is closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Port 0 asks the OS to pick any free port.
            sock.bind(("127.0.0.1", 0))
            port = sock.getsockname()[1]
        except OSError:
            # Do not leak the file descriptor when setup fails.
            sock.close()
            raise

        with self._lock:
            self._reserved_ports[port] = sock
        return port

    def release_port(self, port: int):
        """Release a reserved port.

        Closes the socket that holds ``port``. Releasing a port that is not
        currently reserved is a no-op.

        Args:
            port: Port number previously returned by ``reserve_port``.
        """
        with self._lock:
            sock = self._reserved_ports.pop(port, None)
        if sock is not None:
            sock.close()

    @contextmanager
    def reserve_port_context(self):
        """Context manager for port reservation.

        Yields:
            The reserved port number. The reservation is released when the
            ``with`` block exits, whether normally or through an exception.
        """
        port = self.reserve_port()
        try:
            yield port
        finally:
            self.release_port(port)
# Single module-level manager instance shared by the helpers in this module.
_port_manager = PortReservationManager()
def get_reserved_port() -> int:
    """Reserve a port through the module-level manager and return its number.

    Returns:
        The port number handed back by ``_port_manager.reserve_port()``.
    """
    reserved = _port_manager.reserve_port()
    return reserved
async def _probe_tcp_port(host: str, port: int, connect_timeout: float) -> None:
    """Open and immediately close a TCP connection without blocking the loop."""
    _, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout=connect_timeout
    )
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # The connection was established, which is all the probe needs; an
        # error while tearing it down does not make the port less ready.
        pass


async def wait_for_service_ready(
    port: int,
    host: str = "localhost",
    health_path: Optional[str] = None,
    custom_check: Optional[Callable[[int], Awaitable[bool]]] = None,
    timeout: float = 10.0,
    retry_interval: float = 0.1,
):
    """Wait for a service to be ready by checking health endpoint or custom check.

    Exactly one readiness probe is used per attempt, chosen in this order:
    ``custom_check`` if given, otherwise an HTTP GET on ``health_path`` if
    given, otherwise a plain TCP connection to ``(host, port)``.

    Args:
        port: Port number.
        host: Host address.
        health_path: Optional health check endpoint path. The service is
            ready when the GET returns status 200.
        custom_check: Optional custom readiness check function. It is awaited
            with ``port`` and the service is ready when it returns a truthy
            value.
        timeout: Total timeout in seconds.
        retry_interval: Time between retries in seconds.

    Raises:
        TimeoutError: If service is not ready within timeout. If the last
            attempt failed with an exception, it is attached as the cause.
    """
    deadline = time.perf_counter() + timeout
    base_url = f"http://{host}:{port}"
    last_error = None

    while time.perf_counter() < deadline:
        try:
            if custom_check:
                if await custom_check(port):
                    return
            elif health_path:
                async with httpx.AsyncClient() as client:
                    response = await client.get(f"{base_url}{health_path}")
                if response.status_code == 200:
                    return
            else:
                # Fall back to simple port check, done asynchronously so the
                # event loop is not stalled while connecting.
                await _probe_tcp_port(host, port, connect_timeout=1.0)
                return
            # The probe ran but reported "not ready"; no error to report.
            last_error = None
        except Exception as ex:
            # Best effort: any probe failure just means "not ready yet".
            last_error = ex

        # Pause after every unsuccessful attempt (not only after exceptions),
        # otherwise a probe that answers "not ready" would be retried in a
        # tight loop. Never sleep past the deadline.
        remaining = deadline - time.perf_counter()
        if remaining <= 0:
            break
        await asyncio.sleep(min(retry_interval, remaining))

    raise TimeoutError(
        f"Service not ready on {host}:{port} after {timeout} seconds"
    ) from last_error