Skip to content

Clean logs #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions datura/datura/consumers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@
class BaseConsumer(abc.ABC):
def __init__(self, websocket: WebSocket):
self.websocket = websocket
self.disconnected = True

@abc.abstractmethod
def accepted_request_type(self) -> type[BaseRequest]:
pass

async def connect(self):
await self.websocket.accept()
self.disconnected = False

async def receive_message(self) -> BaseRequest:
data = await self.websocket.receive_text()
return self.accepted_request_type().parse(data)

async def send_message(self, msg: BaseRequest):
await self.websocket.send_text(msg.json())
await self.websocket.send_text(msg.model_dump_json())

async def disconnect(self):
self.disconnected = True
try:
await self.websocket.close()
except Exception:
Expand All @@ -39,11 +42,12 @@ async def handle_message(self, data: BaseRequest):
async def handle(self):
# await self.connect()
try:
while True:
while not self.disconnected:
data: BaseRequest = await self.receive_message()
await self.handle_message(data)
except WebSocketDisconnect as ex:
logger.info("Websocket connection closed, e: %s", str(ex))
if ex.code != 1000 or len(ex.reason) > 0:
logger.info("Websocket connection closed, e: %s", str(ex))
await self.disconnect()
except Exception as ex:
logger.info("Handling message error: %s", str(ex))
Expand Down
18 changes: 11 additions & 7 deletions neurons/miners/src/consumers/validator_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def verify_auth_msg(self, msg: AuthenticateRequest) -> tuple[bool, str]:
keypair = bittensor.Keypair(ss58_address=self.validator_key)
if keypair.verify(msg.blob_for_signing(), msg.signature):
return True, ""
return False, "signature verification failed"


async def handle_authentication(self, msg: AuthenticateRequest):
# check if validator is registered
Expand Down Expand Up @@ -100,8 +102,13 @@ async def check_validator_allowance(self):
If no executors, decline job request
"""
executors = self.executor_service.get_executors_for_validator(self.validator_key)
if len(executors):
logger.info("Found %d executors for validator(%s)", len(executors), self.validator_key)

logger.info("Validator %s was authenticated and has %d assigned executors", self.validator_key, len(executors))

if len(executors) <= 0:
await self.send_message(DeclineJobRequest())
await self.disconnect()
else:
await self.send_message(
AcceptJobRequest(
executors=[
Expand All @@ -110,10 +117,6 @@ async def check_validator_allowance(self):
]
)
)
else:
logger.info("Not found any executors for validator(%s)", self.validator_key)
await self.send_message(DeclineJobRequest())
await self.disconnect()

async def handle_message(self, msg: BaseValidatorRequest):
if isinstance(msg, AuthenticateRequest):
Expand Down Expand Up @@ -143,7 +146,8 @@ async def handle_message(self, msg: BaseValidatorRequest):
logger.info("Sent AcceptSSHKeyRequest to validator %s", self.validator_key)
except Exception as e:
logger.error("Storing SSH key or Sending AcceptSSHKeyRequest failed: %s", str(e))
self.ssh_service.remove_pubkey_from_host(msg.public_key)
# Maybe we should remove key from all executors?
# self.ssh_service.remove_pubkey_from_host(msg.public_key)
await self.send_message(FailedRequest(details=str(e)))
return

Expand Down
11 changes: 7 additions & 4 deletions neurons/miners/src/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ def get_extra_info(extra: dict) -> dict:
def configure_logs_of_other_modules():
miner_hotkey = settings.get_bittensor_wallet().get_hotkey().ss58_address

logging.basicConfig(
level=logging.INFO,
format=f"Miner: {miner_hotkey} | Name: %(name)s | Time: %(asctime)s | Level: %(levelname)s | File: %(filename)s | Function: %(funcName)s | Line: %(lineno)s | Process: %(process)d | Message: %(message)s",
)
log_format = f"%(asctime)s [%(levelname)s] %(message)s"
log_level = logging.INFO if not settings.DEBUG else logging.DEBUG

if settings.ENV == 'dev':
log_format = f"%(asctime)s [%(levelname)s] [%(process)d] [%(name)s | %(funcName)s:%(lineno)s] %(message)s"

logging.basicConfig(force=True, level=log_level, format=log_format)

sqlalchemy_logger = logging.getLogger("sqlalchemy")
sqlalchemy_logger.setLevel(logging.WARNING)
Expand Down
26 changes: 21 additions & 5 deletions neurons/miners/src/services/executor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ async def send_pubkey_to_executor(
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error("API request failed to register SSH key. url=%s", url)
logger.error(
"Failed to register SSH key on executor. url=%s, status_code=%d, reason=%s",
url, response.status, response.reason
)
return None
response_obj: dict = await response.json()
logger.info(
Expand All @@ -59,7 +62,7 @@ async def send_pubkey_to_executor(
response_obj["uuid"] = str(executor.uuid)
response_obj["address"] = executor.address
response_obj["port"] = executor.port
return ExecutorSSHInfo.parse_obj(response_obj)
return ExecutorSSHInfo.model_validate(response_obj)
except Exception as e:
logger.error(
"API request failed to register SSH key. url=%s, error=%s", url, str(e)
Expand All @@ -83,11 +86,15 @@ async def remove_pubkey_from_executor(self, executor: Executor, pubkey: str):
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error("API request failed to register SSH key. url=%s", url)
logger.error(
"Failed to remove SSH key on executor. url=%s, status_code=%d, reason=%s",
url, response.status, response.reason
)
return None
return True
except Exception as e:
logger.error(
"API request failed to register SSH key. url=%s, error=%s", url, str(e)
"API request failed to remove SSH key. url=%s, error=%s", url, str(e)
)

async def register_pubkey(self, validator_hotkey: str, pubkey: bytes, executor_id: Optional[str] = None):
Expand Down Expand Up @@ -133,7 +140,16 @@ async def deregister_pubkey(self, validator_hotkey: str, pubkey: bytes, executor
)
for executor in self.get_executors_for_validator(validator_hotkey, executor_id)
]
await asyncio.gather(*tasks, return_exceptions=True)

total_executors = len(tasks)
results = [
result for result in await asyncio.gather(*tasks, return_exceptions=True) if result
]
logger.info(
"Send pubkey remove API requests to %d executors and received results from %d executors",
total_executors,
len(results),
)

async def get_pod_logs(
self, validator_hotkey: str, executor_id: str, container_name: str
Expand Down