Skip to content

Debug ws connections #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 172 additions & 23 deletions livechat/utils/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ def on_message(ws_client: WebSocketApp, message: str):


def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str):
logger.info('websocket closed:')
logger.info('websocket closed')

if close_status_code or close_msg:
logger.info('close status code: ' + str(close_status_code))
logger.info('close message: ' + str(close_msg))
logger.info(f'close status code: {close_status_code}')
logger.info(f'close message: {close_msg}')


def on_error(ws_client: WebSocketApp, error: Exception):
logger.error(f'websocket error occurred: {str(error)}')
error_details = {
'error_type': type(error).__name__,
'error_message': str(error),
'url': getattr(ws_client, 'url', 'unknown'),
}
logger.error(f'WebSocket error occurred: {error_details}')


class WebsocketClient(WebSocketApp):
Expand Down Expand Up @@ -67,6 +72,7 @@ def open(self,
logger.warning(
'Cannot open new websocket connection, already connected.')
return

self.response_timeout = response_timeout
run_forever_kwargs = {
'sslopt': {
Expand All @@ -76,12 +82,140 @@ def open(self,
'ping_timeout': ping_timeout,
'ping_interval': ping_interval,
}

if keep_alive:
ping_thread = threading.Thread(target=self.run_forever,
kwargs=run_forever_kwargs)
ping_thread.start()
self._wait_till_sock_connected(ws_conn_timeout)
logger.debug(
f'Starting WebSocket connection to:\n{self.url}\nwith header:\n{self.header}'
)

connection_event = threading.Event()
connection_error = threading.Event()
handshake_info = {
'status': None,
'headers': None,
'error': None,
'url': self.url
}

error_info = {'message': None}

original_on_open = getattr(self, 'on_open', None)
original_on_error = getattr(self, 'on_error', None)

def on_open_with_event(ws):
try:
if hasattr(ws.sock, 'handshake_response'):
handshake_info[
'status'] = ws.sock.handshake_response.status
handshake_info[
'headers'] = ws.sock.handshake_response.headers
logger.debug(
f'WebSocket handshake successful - Status: {handshake_info["status"]}'
)
logger.debug(
f'WebSocket handshake headers: {handshake_info["headers"]}'
)
else:
logger.debug(
'WebSocket handshake completed but no response details available'
)
except Exception as e:
logger.warning(f'Could not capture handshake details: {e}')

connection_event.set()
if original_on_open:
original_on_open(ws)

def on_error_with_event(ws, error):
error_type = type(error).__name__
error_msg = str(error)

try:
if hasattr(ws.sock, 'handshake_response'
) and ws.sock.handshake_response:
handshake_info[
'status'] = ws.sock.handshake_response.status
handshake_info[
'headers'] = ws.sock.handshake_response.headers
except Exception:
pass

handshake_info['error'] = {
'type': error_type,
'message': error_msg,
'during_handshake': True
}

logger.error(
f'WebSocket error during connection: {error_type}: {error_msg}'
)

error_info['message'] = error_msg
connection_error.set()
if original_on_error:
original_on_error(ws, error)

self.on_open = on_open_with_event
self.on_error = on_error_with_event

try:
ping_thread = threading.Thread(target=self.run_forever,
kwargs=run_forever_kwargs)
ping_thread.daemon = True
ping_thread.start()

if connection_event.wait(timeout=ws_conn_timeout):
logger.debug(
f'WebSocket connection established successfully.\nHandshake status: {handshake_info["status"]}'
)
elif connection_error.is_set():
error_msg = error_info[
'message'] or 'Unknown connection error'
logger.error(f'WebSocket connection failed: {error_msg}')
logger.error(
f'Handshake info:\n {json.dumps(handshake_info, indent=4)}'
)

raise TimeoutError(
f'WebSocket connection failed due to error: {error_msg}'
)
else:
logger.error(
'WebSocket connection timeout - no response within timeout period'
)
if self.sock:
if hasattr(self.sock, 'handshake_response'):
handshake_info[
'status'] = self.sock.handshake_response.status
handshake_info[
'headers'] = self.sock.handshake_response.headers
else:
handshake_info['status'] = 'unknown'
handshake_info['headers'] = 'unknown'
else:
handshake_info['status'] = 'no socket'
handshake_info['headers'] = 'no socket'

logger.error(
f'Timeout details: {ws_conn_timeout}s waiting for connection to {handshake_info["url"]}'
)
logger.error(
f'Handshake info:\n {json.dumps(handshake_info, indent=4)}'
)

raise TimeoutError(
f'WebSocket handshake timeout after {ws_conn_timeout}s - server did not respond to HTTP upgrade request'
)

except Exception as e:
logger.error(f'Failed to establish WebSocket connection: {e}')
raise
finally:
self.on_open = original_on_open
self.on_error = original_on_error

return

self.run_forever(**run_forever_kwargs)

def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict:
Expand All @@ -96,14 +230,23 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict:
RtmResponse: RTM response structure (`request_id`, `action`,
`type`, `success` and `payload` properties)
'''
if not self.is_connected():
raise WebSocketConnectionClosedException(
'Connection is already closed.')

request_id = str(random.randint(1, 9999999999))
request.update({'request_id': request_id})
request_json = json.dumps(request, indent=4)
logger.info(f'\nREQUEST:\n{request_json}')

if not self.sock or self.sock.send(request_json, opcode) == 0:
raise WebSocketConnectionClosedException(
'Connection is already closed.')
try:
send_result = self.sock.send(request_json, opcode)
if send_result == 0:
raise WebSocketConnectionClosedException(
'Failed to send data - connection closed.')
except Exception as e:
logger.error(f'Failed to send WebSocket message: {e}')
raise WebSocketConnectionClosedException(f'Connection error: {e}')

def await_message(stop_event: threading.Event) -> dict:
while not stop_event.is_set():
Expand All @@ -130,16 +273,22 @@ def await_message(stop_event: threading.Event) -> dict:

return RtmResponse(response)

def _wait_till_sock_connected(self,
timeout: Union[float, int] = 10) -> NoReturn:
''' Polls until `self.sock` is connected.
Args:
timeout (float): timeout value in seconds, default 10. '''
if timeout < 0:
raise TimeoutError('Timed out waiting for WebSocket to open.')
def is_connected(self) -> bool:
''' Check if WebSocket connection is active and healthy. '''
try:
assert self.sock.connected
return
except (AttributeError, AssertionError):
sleep(0.1)
return self._wait_till_sock_connected(timeout=timeout - 0.1)
return (self.sock is not None and hasattr(self.sock, 'connected')
and self.sock.connected
and getattr(self, 'keep_running', False))
except Exception:
return False

def close(self, code: int = 1000, reason: str = 'Normal closure') -> None:
''' Close WebSocket connection gracefully. '''
logger.info(
f'Closing WebSocket connection (code: {code}, reason: {reason})')
try:
if self.sock:
self.sock.close(code, reason)
self.keep_running = False
except Exception as e:
logger.warning(f'Error during WebSocket close: {e}')
Loading