diff --git a/livechat/utils/ws_client.py b/livechat/utils/ws_client.py index 13d0c90..ea9e47b 100644 --- a/livechat/utils/ws_client.py +++ b/livechat/utils/ws_client.py @@ -23,15 +23,20 @@ def on_message(ws_client: WebSocketApp, message: str): def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str): - logger.info('websocket closed:') + logger.info('websocket closed') if close_status_code or close_msg: - logger.info('close status code: ' + str(close_status_code)) - logger.info('close message: ' + str(close_msg)) + logger.info(f'close status code: {close_status_code}') + logger.info(f'close message: {close_msg}') def on_error(ws_client: WebSocketApp, error: Exception): - logger.error(f'websocket error occurred: {str(error)}') + error_details = { + 'error_type': type(error).__name__, + 'error_message': str(error), + 'url': getattr(ws_client, 'url', 'unknown'), + } + logger.error(f'WebSocket error occurred: {error_details}') class WebsocketClient(WebSocketApp): @@ -67,6 +72,7 @@ def open(self, logger.warning( 'Cannot open new websocket connection, already connected.') return + self.response_timeout = response_timeout run_forever_kwargs = { 'sslopt': { @@ -76,12 +82,140 @@ def open(self, 'ping_timeout': ping_timeout, 'ping_interval': ping_interval, } + if keep_alive: - ping_thread = threading.Thread(target=self.run_forever, - kwargs=run_forever_kwargs) - ping_thread.start() - self._wait_till_sock_connected(ws_conn_timeout) + logger.debug( + f'Starting WebSocket connection to:\n{self.url}\nwith header:\n{self.header}' + ) + + connection_event = threading.Event() + connection_error = threading.Event() + handshake_info = { + 'status': None, + 'headers': None, + 'error': None, + 'url': self.url + } + + error_info = {'message': None} + + original_on_open = getattr(self, 'on_open', None) + original_on_error = getattr(self, 'on_error', None) + + def on_open_with_event(ws): + try: + if hasattr(ws.sock, 'handshake_response'): + handshake_info[ + 'status'] = ws.sock.handshake_response.status + handshake_info[ + 'headers'] = ws.sock.handshake_response.headers + logger.debug( + f'WebSocket handshake successful - Status: {handshake_info["status"]}' + ) + logger.debug( + f'WebSocket handshake headers: {handshake_info["headers"]}' + ) + else: + logger.debug( + 'WebSocket handshake completed but no response details available' + ) + except Exception as e: + logger.warning(f'Could not capture handshake details: {e}') + + connection_event.set() + if original_on_open: + original_on_open(ws) + + def on_error_with_event(ws, error): + error_type = type(error).__name__ + error_msg = str(error) + + try: + if hasattr(ws.sock, 'handshake_response' + ) and ws.sock.handshake_response: + handshake_info[ + 'status'] = ws.sock.handshake_response.status + handshake_info[ + 'headers'] = ws.sock.handshake_response.headers + except Exception: + pass + + handshake_info['error'] = { + 'type': error_type, + 'message': error_msg, + 'during_handshake': True + } + + logger.error( + f'WebSocket error during connection: {error_type}: {error_msg}' + ) + + error_info['message'] = error_msg + connection_error.set() + if original_on_error: + original_on_error(ws, error) + + self.on_open = on_open_with_event + self.on_error = on_error_with_event + + try: + ping_thread = threading.Thread(target=self.run_forever, + kwargs=run_forever_kwargs) + ping_thread.daemon = True + ping_thread.start() + + if connection_event.wait(timeout=ws_conn_timeout): + logger.debug( + f'WebSocket connection established successfully.\nHandshake status: {handshake_info["status"]}' + ) + elif connection_error.is_set(): + error_msg = error_info[ + 'message'] or 'Unknown connection error' + logger.error(f'WebSocket connection failed: {error_msg}') + logger.error( + f'Handshake info:\n {json.dumps(handshake_info, indent=4)}' + ) + + raise TimeoutError( + f'WebSocket connection failed due to error: {error_msg}' + ) + else: + logger.error( + 'WebSocket connection timeout - no response within timeout period' + ) + if self.sock: + if hasattr(self.sock, 'handshake_response'): + handshake_info[ + 'status'] = self.sock.handshake_response.status + handshake_info[ + 'headers'] = self.sock.handshake_response.headers + else: + handshake_info['status'] = 'unknown' + handshake_info['headers'] = 'unknown' + else: + handshake_info['status'] = 'no socket' + handshake_info['headers'] = 'no socket' + + logger.error( + f'Timeout details: {ws_conn_timeout}s waiting for connection to {handshake_info["url"]}' + ) + logger.error( + f'Handshake info:\n {json.dumps(handshake_info, indent=4)}' + ) + + raise TimeoutError( + f'WebSocket handshake timeout after {ws_conn_timeout}s - server did not respond to HTTP upgrade request' + ) + + except Exception as e: + logger.error(f'Failed to establish WebSocket connection: {e}') + raise + finally: + self.on_open = original_on_open + self.on_error = original_on_error + return + self.run_forever(**run_forever_kwargs) def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: @@ -96,14 +230,23 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: RtmResponse: RTM response structure (`request_id`, `action`, `type`, `success` and `payload` properties) ''' + if not self.is_connected(): + raise WebSocketConnectionClosedException( + 'Connection is already closed.') + request_id = str(random.randint(1, 9999999999)) request.update({'request_id': request_id}) request_json = json.dumps(request, indent=4) logger.info(f'\nREQUEST:\n{request_json}') - if not self.sock or self.sock.send(request_json, opcode) == 0: - raise WebSocketConnectionClosedException( - 'Connection is already closed.') + try: + send_result = self.sock.send(request_json, opcode) + if send_result == 0: + raise WebSocketConnectionClosedException( + 'Failed to send data - connection closed.') + except Exception as e: + logger.error(f'Failed to send WebSocket message: {e}') + raise WebSocketConnectionClosedException(f'Connection error: {e}') def await_message(stop_event: threading.Event) -> dict: while not stop_event.is_set(): @@ -130,16 +273,22 @@ def await_message(stop_event: threading.Event) -> dict: return RtmResponse(response) - def _wait_till_sock_connected(self, - timeout: Union[float, int] = 10) -> NoReturn: - ''' Polls until `self.sock` is connected. - Args: - timeout (float): timeout value in seconds, default 10. ''' - if timeout < 0: - raise TimeoutError('Timed out waiting for WebSocket to open.') + def is_connected(self) -> bool: + ''' Check if WebSocket connection is active and healthy. ''' try: - assert self.sock.connected - return - except (AttributeError, AssertionError): - sleep(0.1) - return self._wait_till_sock_connected(timeout=timeout - 0.1) + return (self.sock is not None and hasattr(self.sock, 'connected') + and self.sock.connected + and getattr(self, 'keep_running', False)) + except Exception: + return False + + def close(self, code: int = 1000, reason: str = 'Normal closure') -> None: + ''' Close WebSocket connection gracefully. ''' + logger.info( + f'Closing WebSocket connection (code: {code}, reason: {reason})') + try: + if self.sock: + self.sock.close(code, reason) + self.keep_running = False + except Exception as e: + logger.warning(f'Error during WebSocket close: {e}')