Skip to content

Commit 922fc72

Browse files
authored
Merge pull request #133 from opentensor/feat/thewhaleking/add-websocket-logging
Adds ability to log raw websockets for debugging.
2 parents dfca638 + c573189 commit 922fc72

File tree

3 files changed

+32
-4
lines changed

3 files changed

+32
-4
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]]
7676

7777
logger = logging.getLogger("async_substrate_interface")
78+
raw_websocket_logger = logging.getLogger("raw_websocket")
7879

7980

8081
class AsyncExtrinsicReceipt:
@@ -505,6 +506,7 @@ def __init__(
505506
max_connections=100,
506507
shutdown_timer=5,
507508
options: Optional[dict] = None,
509+
_log_raw_websockets: bool = False,
508510
):
509511
"""
510512
Websocket manager object. Allows for the use of a single websocket connection by multiple
@@ -532,6 +534,8 @@ def __init__(
532534
self._exit_task = None
533535
self._open_subscriptions = 0
534536
self._options = options if options else {}
537+
self._log_raw_websockets = _log_raw_websockets
538+
535539
try:
536540
now = asyncio.get_running_loop().time()
537541
except RuntimeError:
@@ -615,7 +619,10 @@ async def shutdown(self):
615619
async def _recv(self) -> None:
616620
try:
617621
# TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic
618-
response = json.loads(await self.ws.recv(decode=False))
622+
recd = await self.ws.recv(decode=False)
623+
if self._log_raw_websockets:
624+
raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}")
625+
response = json.loads(recd)
619626
self.last_received = await self.loop_time()
620627
async with self._lock:
621628
# note that these 'subscriptions' are all waiting sent messages which have not received
@@ -660,7 +667,10 @@ async def send(self, payload: dict) -> int:
660667
# self._open_subscriptions += 1
661668
await self.max_subscriptions.acquire()
662669
try:
663-
await self.ws.send(json.dumps({**payload, **{"id": original_id}}))
670+
to_send = {**payload, **{"id": original_id}}
671+
if self._log_raw_websockets:
672+
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
673+
await self.ws.send(json.dumps(to_send))
664674
self.last_sent = await self.loop_time()
665675
return original_id
666676
except (ConnectionClosed, ssl.SSLError, EOFError):
@@ -699,6 +709,7 @@ def __init__(
699709
max_retries: int = 5,
700710
retry_timeout: float = 60.0,
701711
_mock: bool = False,
712+
_log_raw_websockets: bool = False,
702713
):
703714
"""
704715
The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to
@@ -716,16 +727,19 @@ def __init__(
716727
max_retries: number of times to retry RPC requests before giving up
717728
retry_timeout: how to long wait since the last ping to retry the RPC request
718729
_mock: whether to use mock version of the subtensor interface
730+
_log_raw_websockets: whether to log raw websocket requests during RPC requests
719731
720732
"""
721733
self.max_retries = max_retries
722734
self.retry_timeout = retry_timeout
723735
self.chain_endpoint = url
724736
self.url = url
725737
self._chain = chain_name
738+
self._log_raw_websockets = _log_raw_websockets
726739
if not _mock:
727740
self.ws = Websocket(
728741
url,
742+
_log_raw_websockets=_log_raw_websockets,
729743
options={
730744
"max_size": self.ws_max_size,
731745
"write_limit": 2**16,

async_substrate_interface/substrate_addons.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def __init__(
117117
max_retries: int = 5,
118118
retry_timeout: float = 60.0,
119119
_mock: bool = False,
120+
_log_raw_websockets: bool = False,
120121
archive_nodes: Optional[list[str]] = None,
121122
):
122123
fallback_chains = fallback_chains or []
@@ -151,6 +152,7 @@ def __init__(
151152
_mock=_mock,
152153
retry_timeout=retry_timeout,
153154
max_retries=max_retries,
155+
_log_raw_websockets=_log_raw_websockets,
154156
)
155157
initialized = True
156158
logger.info(f"Connected to {chain_url}")
@@ -260,6 +262,7 @@ def __init__(
260262
max_retries: int = 5,
261263
retry_timeout: float = 60.0,
262264
_mock: bool = False,
265+
_log_raw_websockets: bool = False,
263266
archive_nodes: Optional[list[str]] = None,
264267
):
265268
fallback_chains = fallback_chains or []
@@ -287,6 +290,7 @@ def __init__(
287290
_mock=_mock,
288291
retry_timeout=retry_timeout,
289292
max_retries=max_retries,
293+
_log_raw_websockets=_log_raw_websockets,
290294
)
291295
self._original_methods = {
292296
method: getattr(self, method) for method in RETRY_METHODS

async_substrate_interface/sync_substrate.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
ResultHandler = Callable[[dict, Any], tuple[dict, bool]]
5454

5555
logger = logging.getLogger("async_substrate_interface")
56+
raw_websocket_logger = logging.getLogger("raw_websocket")
5657

5758

5859
class ExtrinsicReceipt:
@@ -485,6 +486,7 @@ def __init__(
485486
max_retries: int = 5,
486487
retry_timeout: float = 60.0,
487488
_mock: bool = False,
489+
_log_raw_websockets: bool = False,
488490
):
489491
"""
490492
The sync compatible version of the subtensor interface commands we use in bittensor. Use this instance only
@@ -501,6 +503,7 @@ def __init__(
501503
max_retries: number of times to retry RPC requests before giving up
502504
retry_timeout: how to long wait since the last ping to retry the RPC request
503505
_mock: whether to use mock version of the subtensor interface
506+
_log_raw_websockets: whether to log raw websocket requests during RPC requests
504507
505508
"""
506509
self.max_retries = max_retries
@@ -527,6 +530,7 @@ def __init__(
527530
self.registry_type_map = {}
528531
self.type_id_to_name = {}
529532
self._mock = _mock
533+
self.log_raw_websockets = _log_raw_websockets
530534
if not _mock:
531535
self.ws = self.connect(init=True)
532536
self.initialize()
@@ -1831,12 +1835,18 @@ def _make_rpc_request(
18311835
ws = self.connect(init=False if attempt == 1 else True)
18321836
for payload in payloads:
18331837
item_id = get_next_id()
1834-
ws.send(json.dumps({**payload["payload"], **{"id": item_id}}))
1838+
to_send = {**payload["payload"], **{"id": item_id}}
1839+
if self.log_raw_websockets:
1840+
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
1841+
ws.send(json.dumps(to_send))
18351842
request_manager.add_request(item_id, payload["id"])
18361843

18371844
while True:
18381845
try:
1839-
response = json.loads(ws.recv(timeout=self.retry_timeout, decode=False))
1846+
recd = ws.recv(timeout=self.retry_timeout, decode=False)
1847+
if self.log_raw_websockets:
1848+
raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}")
1849+
response = json.loads(recd)
18401850
except (TimeoutError, ConnectionClosed):
18411851
if attempt >= self.max_retries:
18421852
logger.warning(

0 commit comments

Comments
 (0)