diff --git a/autogen/agentchat/conversable_agent.py b/autogen/agentchat/conversable_agent.py index 1c18d4f4ded8..2b417bd6f0ad 100644 --- a/autogen/agentchat/conversable_agent.py +++ b/autogen/agentchat/conversable_agent.py @@ -38,7 +38,7 @@ from ..function_utils import get_function_schema, load_basemodels_if_needed, serialize_to_str from ..io.base import IOStream from ..oai.client import ModelClient, OpenAIWrapper -from ..runtime_logging import log_event, log_function_use, log_new_agent, logging_enabled +from ..runtime_logging import log_event, log_flow, log_function_use, log_new_agent, logging_enabled from .agent import Agent, LLMAgent from .chat import ChatResult, a_initiate_chats, initiate_chats from .utils import consolidate_chat_info, gather_usage_summary @@ -430,7 +430,23 @@ def _summary_from_nested_chats( chat_to_run = ConversableAgent._get_chats_to_run(chat_queue, recipient, messages, sender, config) if not chat_to_run: return True, None + + if logging_enabled(): # Nested chat log - start + import uuid + + unique_nested_id = uuid.uuid4() + log_flow( + source=recipient, + code_point="_summary_from_nested_chat start", + code_point_id=str(unique_nested_id), + sender=sender.name, + ) + res = initiate_chats(chat_to_run) + + if logging_enabled(): # Nested chat log - end + log_flow(source=recipient, code_point="_summary_from_nested_chat end", code_point_id=str(unique_nested_id)) + return True, res[-1].summary @staticmethod @@ -856,7 +872,7 @@ def _process_received_message(self, message: Union[Dict, str], sender: Agent, si # When the agent receives a message, the role of the message is "user". (If 'role' exists and is 'function', it will remain unchanged.) valid = self._append_oai_message(message, "user", sender, is_sending=False) if logging_enabled(): - log_event(self, "received_message", message=message, sender=sender.name, valid=valid) + log_event(self, "received_message", message=message, sender=sender.name, valid=valid, silent=silent) if not valid: raise ValueError( @@ -1093,6 +1109,9 @@ def my_message(sender: ConversableAgent, recipient: ConversableAgent, context: d if msg2send is None: break self.send(msg2send, recipient, request_reply=True, silent=silent) + else: + if logging_enabled(): # Log max turns being hit + log_flow(source=self, code_point="_initiate_chat max_turns", code_point_id=None, turns=max_turns) else: self._prepare_chat(recipient, clear_history) if isinstance(message, Callable): @@ -1159,6 +1178,9 @@ async def a_initiate_chat( if msg2send is None: break await self.a_send(msg2send, recipient, request_reply=True, silent=silent) + else: + if logging_enabled(): # Log max turns being hit + log_flow(source=self, code_point="_initiate_chat max_turns", code_point_id=None, turns=max_turns) else: self._prepare_chat(recipient, clear_history) if isinstance(message, Callable): @@ -1243,6 +1265,10 @@ def _last_msg_as_summary(sender, recipient, summary_args) -> str: ) except (IndexError, AttributeError) as e: warnings.warn(f"Cannot extract summary using last_msg: {e}. Using an empty str as summary.", UserWarning) + + if logging_enabled(): + log_flow(source=sender, code_point="_last_msg_as_summary", code_point_id=None, summary=summary) + return summary @staticmethod @@ -1265,6 +1291,17 @@ def _reflection_with_llm_as_summary(sender, recipient, summary_args): f"Cannot extract summary using reflection_with_llm: {e}. Using an empty str as summary.", UserWarning ) summary = "" + + if logging_enabled(): + log_flow( + source=sender, + code_point="_reflection_with_llm_as_summary", + code_point_id=None, + prompt=prompt, + msg_list=msg_list, + summary=summary, + ) + return summary def _reflection_with_llm( @@ -1727,6 +1764,19 @@ def generate_tool_calls_reply( "role": "tool", "content": content, } + + if logging_enabled(): # Logging, including function name + log_flow( + source=self, + code_point="generate_tool_calls_reply", + code_point_id=None, + tool_call_id=str(tool_call_id) if tool_call_id is not None else "", + function_name=function_call["name"], + function_arguments=function_call["arguments"], + return_value=content, + sender=sender.name, + ) + tool_returns.append(tool_call_response) if tool_returns: return True, { @@ -1759,10 +1809,37 @@ async def a_generate_tool_calls_reply( messages = self._oai_messages[sender] message = messages[-1] async_tool_calls = [] + tool_calls_info = [] # List to store tool call info for logging for tool_call in message.get("tool_calls", []): + function_call = tool_call.get("function", {}) + tool_calls_info.append( + { + "tool_call_id": tool_call.get("id"), + "function_name": function_call.get("name"), + "function_arguments": function_call.get("arguments", {}), + } + ) + async_tool_calls.append(self._a_execute_tool_call(tool_call)) if async_tool_calls: tool_returns = await asyncio.gather(*async_tool_calls) + + # Log each tool return along with the corresponding function info + if logging_enabled(): + for tool_return, tool_call_info in zip(tool_returns, tool_calls_info): + log_flow( + source=self, + code_point="a_generate_tool_calls_reply", + code_point_id=None, + tool_call_id=( + str(tool_call_info["tool_call_id"]) if tool_call_info["tool_call_id"] is not None else "" + ), + function_name=tool_call_info["function_name"], + function_arguments=tool_call_info["function_arguments"], + return_value=tool_return["content"], + sender=sender.name if sender else "", + ) + return True, { "role": "tool", "tool_responses": tool_returns, diff --git a/autogen/agentchat/groupchat.py b/autogen/agentchat/groupchat.py index b9784de48202..9d534477a7c5 100644 --- a/autogen/agentchat/groupchat.py +++ b/autogen/agentchat/groupchat.py @@ -10,6 +10,7 @@ import random import re import sys +import uuid from dataclasses import dataclass, field from typing import Callable, Dict, List, Literal, Optional, Tuple, Union @@ -18,7 +19,7 @@ from ..formatting_utils import colored from ..graph_utils import check_graph_validity, invert_disallowed_to_allowed from ..io.base import IOStream -from ..runtime_logging import log_new_agent, logging_enabled +from ..runtime_logging import log_flow, log_new_agent, logging_enabled from .agent import Agent from .chat import ChatResult from .contrib.capabilities import transform_messages @@ -272,6 +273,18 @@ def __post_init__(self): if self.select_speaker_auto_verbose is None or not isinstance(self.select_speaker_auto_verbose, bool): raise ValueError("select_speaker_auto_verbose cannot be None or non-bool") + if logging_enabled(): + log_flow( + "[GroupChat]", + "__post_init__", + "", + agent_names=self.agent_names, + groupchat_name=self.admin_name, + speaker_selection_method=self.speaker_selection_method, + ) + + self._unique_id = str(uuid.uuid4()) # unique identifier for tracking if multiple group chats + @property def agent_names(self) -> List[str]: """Return the names of the agents in the group chat.""" @@ -402,6 +415,14 @@ def manual_select_speaker(self, agents: Optional[List[Agent]] = None) -> Union[A break i = int(i) if i > 0 and i <= _n_agents: + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="manual_select_speaker", + code_point_id=None, + groupchat_name=self.admin_name, + next_agent=agents[i - 1].name, + ) return agents[i - 1] else: raise ValueError @@ -413,7 +434,19 @@ def random_select_speaker(self, agents: Optional[List[Agent]] = None) -> Union[A """Randomly select the next speaker.""" if agents is None: agents = self.agents - return random.choice(agents) + + random_selection = random.choice(agents) + + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="random_select_speaker", + code_point_id=None, + groupchat_name=self.admin_name, + next_agent=random_selection.name, + ) + + return random_selection def _prepare_and_select_agents( self, @@ -424,6 +457,24 @@ def _prepare_and_select_agents( speaker_selection_method = self.speaker_selection_method if isinstance(self.speaker_selection_method, Callable): selected_agent = self.speaker_selection_method(last_speaker, self) + + if logging_enabled() and selected_agent: + log_flow( + source="[GroupChat]", + code_point=f"_prepare_and_select_agents:callable:{self.speaker_selection_method.__name__}", + code_point_id=None, + groupchat_unique_id=self._unique_id, + last_speaker=last_speaker.name, + next_agent=( + selected_agent.name + if selected_agent is not None and isinstance(selected_agent, Agent) + else "[NONE]" + ), + returned_speaker_selection_method=( + selected_agent if selected_agent is not None and isinstance(selected_agent, str) else None + ), + ) + if selected_agent is None: raise NoEligibleSpeaker("Custom speaker selection function returned None. Terminating conversation.") elif isinstance(selected_agent, Agent): @@ -530,6 +581,17 @@ def _prepare_and_select_agents( selected_agent = self.manual_select_speaker(graph_eligible_agents) elif speaker_selection_method.lower() == "round_robin": selected_agent = self.next_agent(last_speaker, graph_eligible_agents) + + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="round_robin", + code_point_id=None, + groupchat_name=self.admin_name, + last_speaker=last_speaker.name, + next_agent=selected_agent.name, + ) + elif speaker_selection_method.lower() == "random": selected_agent = self.random_select_speaker(graph_eligible_agents) else: # auto @@ -540,6 +602,17 @@ def _prepare_and_select_agents( select_speaker_messages[-1] = dict(select_speaker_messages[-1], function_call=None) if select_speaker_messages[-1].get("tool_calls", False): select_speaker_messages[-1] = dict(select_speaker_messages[-1], tool_calls=None) + + if logging_enabled() and selected_agent: + log_flow( + source="[GroupChat]", + code_point=f"speaker_selection_method:{speaker_selection_method}", + code_point_id=None, + groupchat_unique_id=self._unique_id, + last_speaker=last_speaker.name, + next_agent=selected_agent.name, + ) + return selected_agent, graph_eligible_agents, select_speaker_messages def select_speaker(self, last_speaker: Agent, selector: ConversableAgent) -> Agent: @@ -551,7 +624,19 @@ def select_speaker(self, last_speaker: Agent, selector: ConversableAgent) -> Age return selected_agent elif self.speaker_selection_method == "manual": # An agent has not been selected while in manual mode, so move to the next agent - return self.next_agent(last_speaker) + next_agent = self.next_agent(last_speaker) + + if logging_enabled() and selected_agent: + log_flow( + source="[GroupChat]", + code_point="speaker_selection_method:manual-not-selected", + code_point_id=None, + groupchat_name=self.admin_name, + last_speaker=last_speaker.name, + next_agent=next_agent.name, + ) + + return next_agent # auto speaker selection with 2-agent chat return self._auto_select_speaker(last_speaker, selector, messages, agents) @@ -564,7 +649,19 @@ async def a_select_speaker(self, last_speaker: Agent, selector: ConversableAgent return selected_agent elif self.speaker_selection_method == "manual": # An agent has not been selected while in manual mode, so move to the next agent - return self.next_agent(last_speaker) + next_agent = self.next_agent(last_speaker) + + if logging_enabled() and selected_agent: + log_flow( + source="[GroupChat]", + code_point="speaker_selection_method:manual-not-selected", + code_point_id=None, + groupchat_name=self.admin_name, + last_speaker=last_speaker.name, + next_agent=next_agent.name, + ) + + return next_agent # auto speaker selection with 2-agent chat return await self.a_auto_select_speaker(last_speaker, selector, messages, agents) @@ -614,6 +711,17 @@ def _auto_select_speaker( Dict: a counter for mentioned agents. """ + if logging_enabled(): + auto_select_speaker_id = uuid.uuid4() + log_flow( + source="[GroupChat]", + code_point="_auto_select_speaker start", + code_point_id=None, + groupchat_name=self.admin_name, + selector=selector.name, + auto_select_speaker_id=str(auto_select_speaker_id), + ) + # If no agents are passed in, assign all the group chat's agents if agents is None: agents = self.agents @@ -689,6 +797,16 @@ def validate_speaker_name(recipient, messages, sender, config) -> Tuple[bool, Un silent=not self.select_speaker_auto_verbose, # Base silence on the verbose attribute ) + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="_auto_select_speaker end", + code_point_id=None, + groupchat_name=self.admin_name, + selector=selector.name, + auto_select_speaker_id=str(auto_select_speaker_id), + ) + return self._process_speaker_selection_result(result, last_speaker, agents) async def a_auto_select_speaker( @@ -718,6 +836,17 @@ async def a_auto_select_speaker( Dict: a counter for mentioned agents. """ + if logging_enabled(): + auto_select_speaker_id = uuid.uuid4() + log_flow( + source="[GroupChat]", + code_point="a_auto_select_speaker start", + code_point_id=None, + groupchat_name=self.admin_name, + selector=selector.name, + auto_select_speaker_id=str(auto_select_speaker_id), + ) + # If no agents are passed in, assign all the group chat's agents if agents is None: agents = self.agents @@ -787,6 +916,16 @@ def validate_speaker_name(recipient, messages, sender, config) -> Tuple[bool, Un silent=not self.select_speaker_auto_verbose, # Base silence on the verbose attribute ) + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="a_auto_select_speaker end", + code_point_id=None, + groupchat_name=self.admin_name, + selector=selector.name, + auto_select_speaker_id=str(auto_select_speaker_id), + ) + return self._process_speaker_selection_result(result, last_speaker, agents) def _validate_speaker_name( @@ -990,14 +1129,15 @@ def __init__( system_message=system_message, **kwargs, ) - if logging_enabled(): - log_new_agent(self, locals()) + # Store groupchat self._groupchat = groupchat - self._last_speaker = None self._silent = silent + if logging_enabled(): + log_new_agent(self, locals()) + # Order of register_reply is important. # Allow sync chat if initiated using initiate_chat self.register_reply(Agent, GroupChatManager.run_chat, config=groupchat, reset_config=GroupChat.reset) @@ -1099,6 +1239,19 @@ def run_chat( send_introductions = getattr(groupchat, "send_introductions", False) silent = getattr(self, "_silent", False) + if logging_enabled(): + unique_nested_id = uuid.uuid4() + log_flow( + source="[GroupChat]", + code_point="run_chat start", + code_point_id=str(unique_nested_id), + groupchatmanager_source_id=id(self), + groupchatmanager_name=self.name, + sender=sender.name if sender else "None", + send_introductions=send_introductions, + silent=silent, + ) + if send_introductions: # Broadcast the intro intro = groupchat.introductions_msg() @@ -1120,6 +1273,18 @@ def run_chat( self.send(message, agent, request_reply=False, silent=True) if self._is_termination_msg(message) or i == groupchat.max_round - 1: # The conversation is over or it's the last round + + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="run_chat " + + ("is_termination_msg" if self._is_termination_msg(message) else "max_rounds"), + code_point_id=str(unique_nested_id), + groupchatmanager_source_id=id(self), + groupchatmanager_name=self.name, + sender=sender.name if sender else "None", + ) + break try: # select the next speaker @@ -1162,6 +1327,17 @@ def run_chat( for a in groupchat.agents: a.client_cache = a.previous_cache a.previous_cache = None + + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="run_chat end", + code_point_id=str(unique_nested_id), + groupchatmanager_source_id=id(self), + groupchatmanager_name=self.name, + sender=sender.name if sender else "None", + ) + return True, None async def a_run_chat( @@ -1179,6 +1355,19 @@ async def a_run_chat( send_introductions = getattr(groupchat, "send_introductions", False) silent = getattr(self, "_silent", False) + if logging_enabled(): + unique_nested_id = uuid.uuid4() + log_flow( + source="[GroupChat]", + code_point="a_run_chat start", + code_point_id=str(unique_nested_id), + groupchatmanager_source_id=id(self), + groupchatmanager_name=self.name, + sender=sender.name if sender else "None", + send_introductions=send_introductions, + silent=silent, + ) + if send_introductions: # Broadcast the intro intro = groupchat.introductions_msg() @@ -1228,6 +1417,17 @@ async def a_run_chat( for a in groupchat.agents: a.client_cache = a.previous_cache a.previous_cache = None + + if logging_enabled(): + log_flow( + source="[GroupChat]", + code_point="run_chat end", + code_point_id=str(unique_nested_id), + groupchatmanager_source_id=id(self), + groupchatmanager_name=self.name, + sender=sender.name if sender else "None", + ) + return True, None def resume( diff --git a/autogen/logger/base_logger.py b/autogen/logger/base_logger.py index 12a08dd944c1..c3d97270d850 100644 --- a/autogen/logger/base_logger.py +++ b/autogen/logger/base_logger.py @@ -105,14 +105,29 @@ def log_new_client( self, client: Union[AzureOpenAI, OpenAI], wrapper: OpenAIWrapper, init_args: Dict[str, Any] ) -> None: """ - Log the birth of a new OpenAIWrapper. + Log the creation of a new client. Args: - wrapper (OpenAI): The OpenAI client to log. - init_args (dict): The arguments passed to the construct the client + client (AzureOpenAI or OpenAI) The client class to log + wrapper (OpenAIWrapper): The OpenAIWrapper to log + init_args (dict): The arguments passed to the construct the client """ ... + @abstractmethod + def log_new_custom_client( + self, client: Any, wrapper: OpenAIWrapper, init_args: Dict[str, Any], model_client_cls_name: str + ) -> None: + """ + Log the creation of a new custom client. + + Args: + client (Any): The client class to log + wrapper (OpenAIWrapper): The OpenAI wrapper to log + init_args (dict): The arguments passed to the construct the client + model_client_cls_name: The custom client class name + """ + @abstractmethod def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None: """ @@ -125,6 +140,20 @@ def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[st returns (any): The return """ + @abstractmethod + def log_flow( + self, source: Union[str, Agent], code_point: str, code_point_id: str, **kwargs: Dict[str, Any] + ) -> None: + """ + Log a point in code flow + + Args: + source (str or Agent): The source/creator of the event as a string name or an Agent instance + code_point (str): The code point name + code_point_id (str): A unique id to associated related flow code points + kwargs (dict): The flow information to log + """ + @abstractmethod def stop(self) -> None: """ diff --git a/autogen/logger/file_logger.py b/autogen/logger/file_logger.py index 935cd670510b..fc2ea4740f3c 100644 --- a/autogen/logger/file_logger.py +++ b/autogen/logger/file_logger.py @@ -17,7 +17,7 @@ from openai.types.chat import ChatCompletion from autogen.logger.base_logger import BaseLogger -from autogen.logger.logger_utils import get_current_ts, to_dict +from autogen.logger.logger_utils import get_current_ts, to_dict, try_to_dict from .base_logger import LLMConfig @@ -139,7 +139,7 @@ def log_new_agent(self, agent: ConversableAgent, init_args: Dict[str, Any] = {}) "session_id": self.session_id, "current_time": get_current_ts(), "agent_type": type(agent).__name__, - "args": to_dict(init_args), + "args": try_to_dict(init_args, ignore_callable=True), "thread_id": thread_id, } ) @@ -204,7 +204,7 @@ def log_new_wrapper( { "wrapper_id": id(wrapper), "session_id": self.session_id, - "json_state": json.dumps(init_args), + "json_state": try_to_dict(init_args), "timestamp": get_current_ts(), "thread_id": thread_id, } @@ -243,14 +243,38 @@ def log_new_client( "wrapper_id": id(wrapper), "session_id": self.session_id, "class": type(client).__name__, - "json_state": json.dumps(init_args), + "json_state": try_to_dict(init_args), "timestamp": get_current_ts(), "thread_id": thread_id, } ) self.logger.info(log_data) except Exception as e: - self.logger.error(f"[file_logger] Failed to log event {e}") + self.logger.error(f"[file_logger] Failed to log new client {e}") + + def log_new_custom_client( + self, client: Any, wrapper: OpenAIWrapper, init_args: Dict[str, Any], model_client_cls_name: str + ) -> None: + """ + Log a new client instance. + """ + thread_id = threading.get_ident() + + try: + log_data = json.dumps( + { + "client_id": id(client), + "wrapper_id": id(wrapper), + "session_id": self.session_id, + "model_client_class": model_client_cls_name, + "json_state": try_to_dict(init_args), + "timestamp": get_current_ts(), + "thread_id": thread_id, + } + ) + self.logger.info(log_data) + except Exception as e: + self.logger.error(f"[file_logger] Failed to log new custom client class {e}") def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[str, Any], returns: Any) -> None: """ @@ -273,7 +297,53 @@ def log_function_use(self, source: Union[str, Agent], function: F, args: Dict[st ) self.logger.info(log_data) except Exception as e: - self.logger.error(f"[file_logger] Failed to log event {e}") + self.logger.error(f"[file_logger] Failed to log new function use {e}") + + def log_flow( + self, source: Union[str, Agent], code_point: str, code_point_id: str, **kwargs: Dict[str, Any] + ) -> None: + """ + Log a point in code flow + """ + from autogen import Agent + + json_args = json.dumps(kwargs, default=lambda o: f"<>") + thread_id = threading.get_ident() + + if isinstance(source, Agent): + try: + log_data = json.dumps( + { + "source_id": id(source), + "source_name": str(source.name) if hasattr(source, "name") else source, + "agent_module": source.__module__, + "agent_class": source.__class__.__name__, + "code_point": code_point, + "code_point_id": code_point_id, + "info": json_args, + "timestamp": get_current_ts(), + "thread_id": thread_id, + } + ) + self.logger.info(log_data) + except Exception as e: + self.logger.error(f"[file_logger] Failed to log flow {e}") + else: + try: + log_data = json.dumps( + { + "source_id": id(source), + "source_name": str(source.name) if hasattr(source, "name") else source, + "code_point": code_point, + "code_point_id": code_point_id, + "info": json_args, + "timestamp": get_current_ts(), + "thread_id": thread_id, + } + ) + self.logger.info(log_data) + except Exception as e: + self.logger.error(f"[file_logger] Failed to log flow {e}") def get_connection(self) -> None: """Method is intentionally left blank because there is no specific connection needed for the FileLogger.""" diff --git a/autogen/logger/logger_utils.py b/autogen/logger/logger_utils.py index f70b960d4df7..a1e087003dea 100644 --- a/autogen/logger/logger_utils.py +++ b/autogen/logger/logger_utils.py @@ -6,6 +6,8 @@ # SPDX-License-Identifier: MIT import datetime import inspect +import json +from pathlib import Path from typing import Any, Dict, List, Tuple, Union __all__ = ("get_current_ts", "to_dict") @@ -15,28 +17,80 @@ def get_current_ts() -> str: return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f") +def try_to_dict( + obj: Union[int, float, str, bool, Dict[Any, Any], List[Any], Tuple[Any, ...], Any], ignore_callable: bool = False +) -> Any: + """Attempts to convert to dictionary, ensuring that all values are JSON serializable""" + try: + result = to_dict(obj, ignore_callable=ignore_callable) # Attempt conversion using to_dict + + # Validate if the result is JSON serializable by attempting to dump it + json.dumps(result) # This will throw a TypeError or OSError if not serializable + return result + + except (TypeError, ValueError, OSError): # Catch JSON serialization and OSError + # Handle non-serializable types like PosixPath or objects with missing class definitions + if isinstance(obj, Path): + return None # Skip the PosixPath (or convert it with str(obj)) + elif isinstance(obj, (list, tuple)): + return [ + try_to_dict(item, ignore_callable=ignore_callable) for item in obj + ] # Recursively handle lists/tuples + elif isinstance(obj, dict): + return { + key: try_to_dict(value, ignore_callable=ignore_callable) for key, value in obj.items() + } # Recursively handle dicts + else: + # Fallback to string representation for unrecognized or dynamic objects + return None # repr(obj) + + def to_dict( obj: Union[int, float, str, bool, Dict[Any, Any], List[Any], Tuple[Any, ...], Any], exclude: Tuple[str, ...] = (), no_recursive: Tuple[Any, ...] = (), + ignore_callable: bool = False, + _depth: int = 0, + _max_depth: int = 3, # Maximum depth to extract for log ) -> Any: - if isinstance(obj, (int, float, str, bool)): - return obj - elif callable(obj): - return inspect.getsource(obj).strip() - elif isinstance(obj, dict): - return { - str(k): to_dict(str(v)) if isinstance(v, no_recursive) else to_dict(v, exclude, no_recursive) - for k, v in obj.items() - if k not in exclude - } - elif isinstance(obj, (list, tuple)): - return [to_dict(str(v)) if isinstance(v, no_recursive) else to_dict(v, exclude, no_recursive) for v in obj] - elif hasattr(obj, "__dict__"): - return { - str(k): to_dict(str(v)) if isinstance(v, no_recursive) else to_dict(v, exclude, no_recursive) - for k, v in vars(obj).items() - if k not in exclude - } - else: - return obj + if _depth > _max_depth: + return None # Maximum depth reached, stop here + + try: + if isinstance(obj, (int, float, str, bool)): + return obj + elif callable(obj): + return None if ignore_callable else inspect.getsource(obj).strip() + elif isinstance(obj, dict): + return { + str(k): ( + to_dict(str(v), ignore_callable=ignore_callable, _depth=_depth + 1) + if isinstance(v, no_recursive) + else to_dict(v, exclude, no_recursive, ignore_callable, _depth=_depth + 1) + ) + for k, v in obj.items() + if k not in exclude + } + elif isinstance(obj, (list, tuple)): + return [ + ( + to_dict(str(v), ignore_callable=ignore_callable, _depth=_depth + 1) + if isinstance(v, no_recursive) + else to_dict(v, exclude, no_recursive, ignore_callable, _depth=_depth + 1) + ) + for v in obj + ] + elif hasattr(obj, "__dict__"): + return { + str(k): ( + to_dict(str(v), ignore_callable=ignore_callable, _depth=_depth + 1) + if isinstance(v, no_recursive) + else to_dict(v, exclude, no_recursive, ignore_callable=ignore_callable, _depth=_depth + 1) + ) + for k, v in vars(obj).items() + if k not in exclude + } + else: + return obj + except RecursionError: + return f"RECURSION ERROR: {type(obj).__name__}" diff --git a/autogen/logger/sqlite_logger.py b/autogen/logger/sqlite_logger.py index d8f8e4b834ff..4e6f55baf9d9 100644 --- a/autogen/logger/sqlite_logger.py +++ b/autogen/logger/sqlite_logger.py @@ -123,6 +123,19 @@ class TEXT, -- type or class name of cli """ self._run_query(query=query) + query = """ + CREATE TABLE IF NOT EXISTS custom_clients ( + id INTEGER PRIMARY KEY, -- Key assigned by the database + client_id INTEGER, -- result of python id(client) + wrapper_id INTEGER, -- result of python id(wrapper) + session_id TEXT, + model_client_class TEXT, -- type or class name of client + init_args TEXT, -- JSON serialization of constructor + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE(client_id, session_id)) + """ + self._run_query(query=query) + query = """ CREATE TABLE IF NOT EXISTS version ( id INTEGER PRIMARY KEY CHECK (id = 1), -- id of the logging database @@ -157,6 +170,18 @@ class TEXT, -- type or class name of cli """ self._run_query(query=query) + query = """ + CREATE TABLE IF NOT EXISTS flows ( + source_id INTEGER, + source_name TEXT, + code_point TEXT, + code_point_id TEXT, + info TEXT DEFAULT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ); + """ + self._run_query(query=query) + current_verion = self._get_current_db_version() if current_verion is None: self._run_query( @@ -449,6 +474,57 @@ def log_new_client( ) self._run_query(query=query, args=args) + def log_new_custom_client( + self, client: Any, wrapper: OpenAIWrapper, init_args: Dict[str, Any], model_client_cls_name: str + ) -> None: + if self.con is None: + return + + args = to_dict( + init_args, + exclude=( + "self", + "__class__", + "api_key", + ), + ) + + query = """ + INSERT INTO custom_clients (client_id, wrapper_id, session_id, model_client_class, init_args, timestamp) VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (client_id, session_id) DO NOTHING; + """ + args = ( + id(client), + id(wrapper), + self.session_id, + model_client_cls_name, + json.dumps(args), + get_current_ts(), + ) + self._run_query(query=query, args=args) + + def log_flow( + self, source: Union[str, Agent], code_point: str, code_point_id: str, **kwargs: Dict[str, Any] + ) -> None: + + if self.con is None: + return + + json_args = json.dumps(kwargs, default=lambda o: f"<>") + + query = """ + INSERT INTO flows (source_id, source_name, code_point, code_point_id, info, timestamp) VALUES (?, ?, ?, ?, ?, ?) + """ + query_args: Tuple[Any, ...] = ( + id(source), + source.name if hasattr(source, "name") else source, + code_point, + code_point_id, + json_args, + get_current_ts(), + ) + self._run_query(query=query, args=query_args) + def stop(self) -> None: if self.con: self.con.close() diff --git a/autogen/oai/client.py b/autogen/oai/client.py index 87916319d082..3df69f022beb 100644 --- a/autogen/oai/client.py +++ b/autogen/oai/client.py @@ -19,7 +19,13 @@ from autogen.io.base import IOStream from autogen.logger.logger_utils import get_current_ts from autogen.oai.openai_utils import OAI_PRICE1K, get_key, is_valid_api_key -from autogen.runtime_logging import log_chat_completion, log_new_client, log_new_wrapper, logging_enabled +from autogen.runtime_logging import ( + log_chat_completion, + log_new_client, + log_new_custom_client, + log_new_wrapper, + logging_enabled, +) from autogen.token_count_utils import count_token TOOL_ENABLED = False @@ -554,11 +560,13 @@ def _register_default_client(self, config: Dict[str, Any], openai_config: Dict[s if model_client_cls_name is not None: # a config for a custom client is set # adding placeholder until the register_model_client is called with the appropriate class - self._clients.append(PlaceHolderClient(config)) + custom_client = PlaceHolderClient(config) + self._clients.append(custom_client) logger.info( f"Detected custom model client in config: {model_client_cls_name}, model client can not be used until register_model_client is called." ) - # TODO: logging for custom client + if logging_enabled(): + log_new_custom_client(custom_client, self, config, model_client_cls_name) else: if api_type is not None and api_type.startswith("azure"): self._configure_azure_openai(config, openai_config) diff --git a/autogen/runtime_logging.py b/autogen/runtime_logging.py index 73bd62120e10..c5422bd97815 100644 --- a/autogen/runtime_logging.py +++ b/autogen/runtime_logging.py @@ -144,6 +144,24 @@ def log_new_client( autogen_logger.log_new_client(client, wrapper, init_args) +def log_new_custom_client( + client: Any, wrapper: OpenAIWrapper, init_args: Dict[str, Any], model_client_cls_name: str +) -> None: + if autogen_logger is None: + logger.error("[runtime logging] log_new_client: autogen logger is None") + return + + autogen_logger.log_new_custom_client(client, wrapper, init_args, model_client_cls_name) + + +def log_flow(source: Union[str, Agent], code_point: str, code_point_id: str, **kwargs: Dict[str, Any]) -> None: + if autogen_logger is None: + logger.error("[runtime logging] log_new_flow: autogen logger is None") + return + + autogen_logger.log_flow(source, code_point, code_point_id, **kwargs) + + def stop() -> None: global is_logging if autogen_logger: diff --git a/autogen/visualize/__init__.py b/autogen/visualize/__init__.py new file mode 100644 index 000000000000..7ca774795c56 --- /dev/null +++ b/autogen/visualize/__init__.py @@ -0,0 +1,11 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/autogen-ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/microsoft/autogen are under the MIT License. +# SPDX-License-Identifier: MIT +from .visualization import Visualize + +__all__ = [ + "Visualize", +] diff --git a/autogen/visualize/visualization.py b/autogen/visualize/visualization.py new file mode 100644 index 000000000000..2371d6625e72 --- /dev/null +++ b/autogen/visualize/visualization.py @@ -0,0 +1,751 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/autogen-ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/microsoft/autogen are under the MIT License. +# SPDX-License-Identifier: MIT + +import re +from typing import Dict, List, Union +from uuid import uuid4 + +from graphviz import Digraph + +from ..logger import FileLogger, SqliteLogger +from .visualize_base import LogAgent, LogClient, LogEvent, LogFlow, LogInvocation, LogSession +from .visualize_graphviz_utils import ( + add_agent_to_agent_edge, + add_agent_to_eventflow_edge, + add_agent_to_info_edge, + add_code_execution_to_agent_return_edge, + add_eventflow_to_agent_return_edge, + add_eventflow_to_eventflow_edge, + add_eventflow_to_node_edge, + add_invocation_to_agent_return_edge, + add_invocation_to_eventflow_return_edge, + add_node_agent, + add_node_code_execution, + add_node_custom_reply_func, + add_node_eventflow_reply_func_executed, + add_node_human, + add_node_info, + add_node_invocation, + add_node_start, + add_node_summary, + add_node_terminate, + agent_id_by_name, + assign_agent_color, + create_tooltip, + darken_color, + extract_code_exitcode, + extract_invocation_response, + gcm_id_by_groupchat, + truncate_string, +) +from .visualize_log_utils import load_log_file + + +class Visualize: + def __init__(self, log_source: Union[str, FileLogger, SqliteLogger]) -> None: + """Visualizes an AutoGen workflow + + Uses an AutoGen log or logger to create a diagram. Uses GraphViz engine for diagram production. + + Args: + log_source (Union[str, BaseLogger]): A string with a path/filename to a text log file or a Logger object based on BaseLogger + """ + + self.log_source = log_source + + if isinstance(log_source, str): + self.source = "logfile" + elif isinstance(log_source, FileLogger): + + if len(log_source.logger.handlers) != 0: + raise Exception("File logging must be stopped for visualization.") + + self.source = "filelogger" + elif isinstance(log_source, SqliteLogger): + self.source = "sqlitelogger" + else: + raise Exception("Invalid log source.") + + # Colours, fonts, shapes + # Future work to be able to provide your own style + # Shapes: https://graphviz.org/doc/info/shapes.html + self.design_config: Dict = { + "canvas_replace_bg": "#123456", # Note: colour will be replaced by "url(#bg_pattern)" which is a pattern defined in the SVG (added post-creation). Colour should be unique. + "canvas_pattern_bg": "#222222", + "canvas_pattern_color": "#2A2A2A", + "nested_bg": "#18184F", + "groupchat_bg": "#004F4F", + "start_bg": "#222222", + "start_font_color": "#FFFFFF", + "start_border_color": "#6666FF", + "fill_color": "#DDFFF7", + "border_color": "#00BE92", + "font_color": "#FAFAFA", + "node_font_color": "#222222", + "edge_color": "#6666FF", + "edge_success_color": "#00FF00", + "edge_unsuccessful_color": "#FF0000", + "edge_style": "solid", + "edge_style_silent": "dashed", + "font_names": "Helvetica, DejaVu Sans, Arial, Courier, sans-serif", + "label_distance": "5.0", + "node_pen_width": "3.0", + "node_shape": { + "agent": "oval", + "summary": "parallelogram", + "terminate": "octagon", + "invocation": "invhouse", + "info": "note", + "code_execution": "cds", + "custom_reply_func": "septagon", + "human": "Mdiamond", + }, + } + + def create_svg(self, output_path_filename: str, cleanup: bool = True): + """Creates a diagram in SVG format + + Args: + path_filename (str): Path and Filename to create. Removes .svg if it exists (as GraphViz adds it) + cleanup (bool): Remove the GraphViz DOT file (which is created alongside the SVG) + """ + + # Append .svg if needed + if output_path_filename.lower().endswith(".svg"): + output_path_filename = output_path_filename[:-4] + + # Ensure the file exists + + # Objects to populate from log + clients: Dict[int, LogClient] = {} + agents: Dict[int, LogAgent] = {} + events: Dict[float, LogEvent] = {} + flows: Dict[float, LogFlow] = {} + invocations: Dict[str, LogInvocation] = {} + all_ordered: List[LogClient | LogAgent | LogEvent | LogFlow | LogInvocation | LogSession] = [] + + # Keep track of wrapper_ids and client_ids, this can help us reconcile when a client_id doesn't exist we can use a previous client with the same wrapper + wrapper_clients: Dict[int, List] = {} + + if self.source == "logfile" or self.source == "filelogger": + + load_log_file( + self.log_source if self.source == "logfile" else self.log_source.log_file, + clients, + wrapper_clients, + agents, + events, + flows, + invocations, + all_ordered, + ) + + # Visualise the logs + final_dot = self.visualize_execution( + clients, wrapper_clients, agents, events, flows, invocations, all_ordered + ) + + # Render the diagram to file + file_path = final_dot.render(filename=output_path_filename, format="svg", cleanup=cleanup) + + # Post-visualisation, add background, bold content with anchors + self.svg_post_process(file_path) + else: + raise "Sorry, only supporting text log files for now" + + def svg_post_process(self, output_path_filename: str): + """Post-processing the SVG, adding the background pattern + + Args: + output_path_filename (str): SVG file to process + """ + + background_pattern = ( + 'AG' + ) + + # Read the original SVG content + with open(output_path_filename, "r") as file: + svg_content = file.read() + + # Check if tag exists, if not, insert it before the closing tag + if "" not in svg_content: + insert_position = svg_content.find("") + if insert_position == -1: + raise ValueError("Invalid SVG file: no closing tag found.") + + # Add section and new content + defs_section = f"{background_pattern}\n" + updated_svg = svg_content[:insert_position] + defs_section + svg_content[insert_position:] + else: + # If exists, insert the content inside it + insert_position = svg_content.find("") + len("") + updated_svg = svg_content[:insert_position] + background_pattern + svg_content[insert_position:] + + # Update the background fill + updated_svg = updated_svg.replace(self.design_config["canvas_replace_bg"], "url(#bg_pattern)") + + # Bold text with anchors + updated_svg = Visualize.svg_highlight_anchors(updated_svg) + + # Write the updated SVG content to a new file or overwrite the original + with open(output_path_filename, "w") as file: + file.write(updated_svg) + + @staticmethod + def svg_highlight_anchors(svg_string: str): + """Bolds text within anchors and changes cursor to pointer - to indicate hover over + + Args: + svg_string (str): SVG file content + """ + + def modify_anchor_and_text(match): + a_tag = match.group(1) + text_tag = match.group(2) + closing_a = match.group(3) + + # Add cursor: pointer to the anchor tag if it's not already there + if "style" not in a_tag: + a_tag = a_tag.replace(">", ' style="cursor: pointer;">', 1) + elif "cursor: pointer" not in a_tag: + a_tag = a_tag.replace('style="', 'style="cursor: pointer; ', 1) + + # Add font-weight: bold to the text tag if it's not already there + if "font-weight" not in text_tag: + text_tag = text_tag.replace("]*>)([\s\S]*?)([\s\S]*?)" + return re.sub(pattern, modify_anchor_and_text, svg_string) + + def visualize_execution( + self, + clients: Dict[int, LogClient], + wrapper_clients: Dict[int, List], + agents: Dict[int, LogAgent], + events: Dict[float, LogEvent], + flows: Dict[float, LogFlow], + invocations: Dict[str, LogInvocation], + all_ordered: List[LogClient | LogAgent | LogEvent | LogInvocation | LogSession], + ) -> Digraph: + """Create the diagram of the program execution + + Args: + clients (Dict[int, LogClient]): Log clients + wrapper_clients (Dict[int, List]): Log wrapper / clients + agents (Dict[int, LogAgent]): log agents + events (Dict[float, LogEvent]): Log events + flows (Dict[float, LogFlow]): Log flows + invocations (Dict[str, LogInvocation]): Log invocations + all_orderedList ([LogClient | LogAgent | LogEvent | LogFlow | LogInvocation | LogSession]): All log objects in order + + Returns: + A GraphViz digraph + """ + + def summary_text(summary: Union[str, dict]) -> str: + """Returns the summary text for a summary result""" + + if isinstance(summary, str): + return summary + elif isinstance(summary, dict): + if "summary" in summary: + return summary["summary"]["content"] + elif "content" in summary: + return summary["content"] + else: + raise "Can't summarise!" + else: + raise "Can't summarise" + + def create_nested_digraph(nested_chat_node_name: str, label: str, color: str) -> Digraph: + """Creates a nested digraph""" + nested_graph = Digraph(nested_chat_node_name) + nested_graph.attr( + style="rounded, filled", + color=darken_color(color, 0.1), + fillcolor=color, + label=label, + labeljust="r", + labelloc="b", + penwidth="5", + margin="35", + fontcolor=self.design_config["font_color"], + fontname=self.design_config["font_names"], + ) + return nested_graph + + def process_level( + top_level: bool, + current_level: Digraph, + level_id: str, + items: List, + start_index: int, + current_nested_chat_id: str, + parent_agent: LogAgent, + ) -> int: + current_agent: LogAgent = None + last_termination: LogEvent = None + # last_nested_summarize_level_and_event: List[Digraph, LogEvent] = None + available_invocations = [] + last_nested_agent: LogAgent = ( + None # Coming out of a nested chat, we track the last agent to see if we link to it on the next event + ) + + i = start_index + while i < len(items): + item = items[i] + + if isinstance(item, LogClient): + client: LogClient = item + + # For each client, add the client_id to the wrapper dictionary so we can resolve any client_ids that don't appear in the log file + # Can occur with group chat's inner auto speaker select chat + if client.wrapper_id in wrapper_clients: + wrapper_clients[client.wrapper_id].append(client.client_id) + else: + wrapper_clients[client.wrapper_id] = [client.client_id] + + elif isinstance(item, LogAgent): + agent: LogAgent = agents[item.id] + + # Assign an number to the agent as we'll be repeating the agent throughout the diagram + # Every new instance will have an incremented number + if "index" not in agent.visualization_params: + agent.visualization_params["index"] = 0 + + # Assign a colour to the agent + if agent.id not in agent_colors: + assign_agent_color(agent_colors, agent.id) + agent.visualization_params["color"] = agent_colors[agent.id] + + elif isinstance(item, LogFlow): + flow: LogFlow = item + + if flow.code_point == "_summary_from_nested_chat start": + # Start of a nested chat + + nested_chat_id = flow.code_point_id + next_level_id = str(uuid4()) + nested_chat_node_name = f"cluster_{next_level_id}" + + new_nested = create_nested_digraph( + nested_chat_node_name, "Nested Chat", self.design_config["nested_bg"] + ) + + nested_chats[nested_chat_id] = { + "type": "Nested Chat", + "edge_label": "Nested Chat", + "level_index": next_level_id, + "parent_agent_node_id": agent_id_by_name(agents, flow.source_name), + "nested_chat_node_name": nested_chat_node_name, + "linked_to_parent_agent_node": False, + } + nested_graphs[next_level_id] = new_nested + + i, last_nested_agent = process_level( + False, new_nested, next_level_id, items, i + 1, nested_chat_id, current_agent + ) + current_level.subgraph(new_nested) + continue + + elif flow.code_point in [ + "_auto_select_speaker start", + "a_auto_select_speaker start", + ]: + # Start of a group chat auto select speaker + + nested_chat_id = flow.code_point_id + next_level_id = str(uuid4()) + nested_chat_node_name = f"cluster_{next_level_id}" + + new_nested = create_nested_digraph( + nested_chat_node_name, "Group Chat Auto Select Speaker", self.design_config["groupchat_bg"] + ) + + nested_chats[nested_chat_id] = { + "type": "Group Chat", + "edge_label": "Auto Select Speaker", + "level_index": next_level_id, + "parent_agent_node_id": agent_id_by_name(agents, flow.info["selector"]), + "nested_chat_node_name": nested_chat_node_name, + "linked_to_parent_agent_node": False, + } + nested_graphs[next_level_id] = new_nested + + i, last_nested_agent = process_level( + False, new_nested, next_level_id, items, i + 1, nested_chat_id, current_agent + ) + current_level.subgraph(new_nested) + continue + + elif flow.code_point in [ + "_summary_from_nested_chat end", + "_auto_select_speaker end", + "a_auto_select_speaker end", + ]: + + # End the nested chat + return i + 1, current_agent + + elif flow.code_point == "_initiate_chat max_turns": + + # Maximum turns hit, add a termination node + add_node_terminate(self.design_config, current_level, flow) + + # Link it to the agent + add_agent_to_eventflow_edge( + self.design_config, + current_level, + current_agent, + flow, + f"Max turns hit ({flow.info['turns']})", + ) + + last_termination = flow + + elif ( + flow.code_point == "_reflection_with_llm_as_summary" + or flow.code_point == "_last_msg_as_summary" + ): + + # Add summary node + add_node_summary(self.design_config, current_level, flow) + + # Link it to the agent or termination (which ever is later) + summary = summary_text(flow.info["summary"]) + if ( + last_termination is not None + and last_termination.timestamp > current_agent.current_timestamp + ): + add_eventflow_to_eventflow_edge( + self.design_config, current_level, last_termination, flow, flow.code_point, summary + ) + else: + add_agent_to_eventflow_edge( + self.design_config, current_level, current_agent, flow, flow.code_point, summary + ) + + # If we have available invocations, add them to the agent in a return sequence + if len(available_invocations) > 0: + for invocation in available_invocations: + add_invocation_to_eventflow_return_edge( + self.design_config, current_level, flow, invocation, flow.code_point, summary + ) + + # Once added, we clear the available invocations + available_invocations.clear() + + # If we're at the final stage of a group chat auto select speaker, show the name of the agent selected + if ( + current_level.name is not None + and current_nested_chat_id in nested_chats + and nested_chats[current_nested_chat_id]["type"] == "Group Chat" + ): + node_id = add_node_info(self.design_config, current_level, truncate_string(summary, 30)) + add_eventflow_to_node_edge(self.design_config, current_level, flow, node_id, "next speaker") + + # Track last summarize event so we can connect it to the outside + # of the nested chat + # Not used at this stage but may need to be used to link back to summarise nodes + # last_nested_summarize_level_and_event = [current_level, event] + + elif ( + flow.code_point == "generate_tool_calls_reply" + or flow.code_point == "a_generate_tool_calls_reply" + ): + # Tool calls will be like invocations - it will call the tool and loop back + + # Add function call node + add_node_eventflow_reply_func_executed( + self.design_config, + dot, + flow, + flow.info["function_name"], + self.design_config["node_shape"]["code_execution"], + ) + + # Create the edge loop + add_eventflow_to_agent_return_edge( + self.design_config, + current_level, + current_agent, + flow, + flow.code_point, + flow.info["return_value"], + ) + + elif flow.code_point.startswith("_prepare_and_select_agents:callable:"): + # Group Chat callable speaker selection + + callable_name = flow.code_point.split(":")[2] + + # Get the group chat manager for this groupchat as the source + source_agent = agents[gcm_id_by_groupchat(agents, flow.info["groupchat_unique_id"])] + + # Add function call node + add_node_eventflow_reply_func_executed( + self.design_config, + dot, + flow, + callable_name, + self.design_config["node_shape"]["code_execution"], + ) + + # Create the edge loop + add_eventflow_to_agent_return_edge( + self.design_config, current_level, source_agent, flow, flow.info["next_agent"] + ) + + elif flow.code_point == "manual_select_speaker": + # TODO TODO + raise Exception("manual_select_speaker not handled.") + + elif flow.code_point == "random_select_speaker": + # TODO TODO + raise Exception("random_select_speaker not handled.") + + elif flow.code_point == "round_robin": + + next_agent_name = flow.info["next_agent"] + + if current_agent.agent_name == next_agent_name: + # Add info that this agent was selected by round robin + node_id = add_node_info(self.design_config, current_level, "Round Robin") + add_agent_to_info_edge( + self.design_config, current_level, current_agent, node_id, "selected" + ) + else: + print( + f"** flow.code_point == 'round_robin', current agent expected {next_agent_name}, not {current_agent.agent_name}. **" + ) + + elif flow.code_point.startswith("speaker_selection_method:"): + # TODO TODO + print(f"** {flow.code_point} not handled. **") + pass + + elif isinstance(item, LogEvent): + event: LogEvent = item + + if event.event_name == "received_message": + + """ + # If we want to ignore silent events restore, however we do need to show some for group chat. + if "silent" in event.json_state and event.json_state["silent"] == True: + # Ignore silent messages + i += 1 + continue + """ + + is_silent = "silent" in event.json_state and event.json_state["silent"] + + sender_agent = agents[agent_id_by_name(agents, event.json_state["sender"])] + recipient_agent = agents[agent_id_by_name(agents, event.source_name)] + + if top_level and current_agent is None: + add_node_agent(self.design_config, agents, current_level, sender_agent) + + if not top_level and not nested_chats[current_nested_chat_id]["linked_to_parent_agent_node"]: + if nested_chats[current_nested_chat_id]["parent_agent_node_id"] != sender_agent.id: + # If the first agent in this nested/group chat is different to the previous agent then we need to create the agent inside and link to it + add_node_agent(self.design_config, agents, current_level, sender_agent) + is_group_chat = nested_chats[current_nested_chat_id]["type"] == "Group Chat" + add_agent_to_agent_edge( + self.design_config, + agents, + nested_graphs[nested_chats[current_nested_chat_id]["level_index"]], + agents[nested_chats[current_nested_chat_id]["parent_agent_node_id"]], + sender_agent, + nested_chats[current_nested_chat_id]["edge_label"], + dir="both" if is_group_chat else "forward", + style=self.design_config["edge_style" if not is_silent else "edge_style_silent"], + ) + nested_chats[current_nested_chat_id]["linked_to_parent_agent_node"] = True + elif last_nested_agent is not None and last_nested_agent.id == sender_agent.id: + pass + + add_node_agent(self.design_config, agents, current_level, recipient_agent) + + edge_text = event.event_name + tooltip_text = create_tooltip(event.json_state["message"]) + add_agent_to_agent_edge( + self.design_config, + agents, + current_level, + sender_agent, + recipient_agent, + edge_text, + tooltip_text, + style=self.design_config["edge_style" if not is_silent else "edge_style_silent"], + ) + + current_agent = recipient_agent + + elif event.event_name == "reply_func_executed": + # Reply function executed, such as termination, generate oai reply, etc. + + # We'll only consider the final ones for now + if event.json_state["final"]: + # Get the name of the function + reply_func_name = event.json_state["reply_func_name"] + + # Specific function handling + if reply_func_name == "check_termination_and_human_reply": + + if event.json_state["reply"] is None: + # No human reply or human chose to exit, so this is a termination. + + # Add summary node + add_node_terminate(self.design_config, current_level, event) + + # Link it to the agent + add_agent_to_eventflow_edge( + self.design_config, current_level, current_agent, event, reply_func_name + ) + + last_termination = event + else: + # We have a human reply + + # Add human node + add_node_human(self.design_config, current_level, event) + + # Link it to the agent + source_agent = agents[agent_id_by_name(agents, event.source_name)] + add_eventflow_to_agent_return_edge( + self.design_config, + current_level, + source_agent, + event, + reply_func_name, + event.json_state["reply"]["content"], + ) + + """ + # Add a human node (which we'll denote as a human) + if not _have_human_agent(agents): + human_agent = _add_human_agent(agents) + + if not "index" in human_agent.visualization_params: + human_agent.visualization_params["index"] = 0 + + # Assign a colour to the agent + if not human_agent.id in agent_colors: + assign_agent_color(agent_colors, human_agent.id) + human_agent.visualization_params["color"] = agent_colors[human_agent.id] + else: + human_agent = _get_human_agent(agents) + + # Link to the human node + add_agent_to_agent_edge(self.design_config, agents, current_level, current_agent, human_agent, reply_func_name, event.json_state['reply']['content']) + + current_agent = human_agent + """ + + elif reply_func_name == "_summary_from_nested_chats": + + # This may be necessary - review during development for first release. + # + # Add recipient agent to diagram + # recipient_agent = agents[agent_id_by_name(agents, event.source_name)] + # add_node_agent(self.design_config, agents, current_level, recipient_agent) + + # _add_event_to_agent_edge(self.design_config, current_level, last_nested_summarize_level_and_event[1], recipient_agent, reply_func_name, event.json_state['reply']) + + pass + + elif reply_func_name == "generate_code_execution_reply": # Executed code + exitcode = extract_code_exitcode(event.json_state["reply"]) + + if exitcode != -1: + # Add the exitcode node and result + + executing_agent = agents[agent_id_by_name(agents, event.source_name)] + + add_node_code_execution( + self.design_config, current_level, event, exitcode, event.json_state["reply"] + ) + + # Link it to the agent + add_code_execution_to_agent_return_edge( + self.design_config, current_level, executing_agent, event, exitcode + ) + + elif not event.json_state["reply_func_module"].startswith("autogen."): + # Custom reply function, assuming it's not part of the AutoGen package + # Keep this elif at the bottom + + add_node_custom_reply_func(self.design_config, current_level, event, reply_func_name) + add_eventflow_to_agent_return_edge( + self.design_config, + current_level, + current_agent, + event, + "", + create_tooltip(event.json_state["reply"]), + ) + + # If we have available invocations, add them to the agent in a return sequence + if len(available_invocations) > 0: + for invocation in available_invocations: + # Add them to the source agent + source_agent = agents[agent_id_by_name(agents, event.source_name)] + + add_invocation_to_agent_return_edge( + self.design_config, + current_level, + source_agent, + invocation, + reply_func_name, + extract_invocation_response(invocation), + ) + + # Once added, we clear the available invocations + available_invocations.clear() + + else: + pass + + elif isinstance(item, LogInvocation): + invocation: LogInvocation = item + + # Add the invocation node + add_node_invocation(self.design_config, clients, wrapper_clients, dot, invocation) + + available_invocations.append(invocation) + + i += 1 + + return i, None + + # Initialize the main diagram + dot = Digraph() + dot.attr(bgcolor=self.design_config["canvas_replace_bg"]) + + # Tracks each agent's node color + agent_colors = {} + + # Nested graphs (nested chat and group chat) + nested_chats: Dict[str, Dict] = {"0": {}} + nested_graphs: Dict[uuid4, Digraph] = {} + + # Start node + add_node_start(self.design_config, dot) + + # Process the entire diagram recursively, start with top level + process_level(True, dot, "0", all_ordered, 0, "0", None) + + return dot diff --git a/autogen/visualize/visualize_base.py b/autogen/visualize/visualize_base.py new file mode 100644 index 000000000000..019cb6e9ad8b --- /dev/null +++ b/autogen/visualize/visualize_base.py @@ -0,0 +1,212 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/autogen-ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/microsoft/autogen are under the MIT License. +# SPDX-License-Identifier: MIT +import json +from datetime import datetime +from typing import Any, Dict + + +class LogBase: + def __init__(self): + pass + + +class LogSession(LogBase): + def __init__(self, session_id: str): + super().__init__() + self.session_id = session_id + + +class LogClient(LogBase): + def __init__( + self, + client_id: int, + wrapper_id: int, + session_id: str, + class_name: str, + json_state: str, + timestamp: str, + thread_id: int, + is_custom_class: bool, + ): + super().__init__() + self.client_id = client_id + self.wrapper_id = wrapper_id + self.session_id = session_id + if class_name.endswith("Client"): + self.class_name = class_name.replace("Client", "") + else: + self.class_name = class_name + self.json_state = json_state + self.timestamp = timestamp + self.thread_id = thread_id + self.is_custom_class = is_custom_class + + def __str__(self): + return f"Client ({self.client_id}) - {self.class_name}" + + +class LogAgent(LogBase): + def __init__( + self, + id: int, + agent_name: str, + wrapper_id: int, + session_id: str, + current_time: str, + agent_type: str, + args: Dict, + thread_id: int, + ): + super().__init__() + self.id = id + self.agent_name = agent_name + self.wrapper_id = wrapper_id + self.session_id = session_id + self.current_time = current_time + self.current_timestamp = datetime.fromisoformat(current_time).timestamp() + self.agent_type = agent_type + self.args = args + self.thread_id = thread_id + + # Group chat object for a group chat manager + if self.agent_type == "GroupChatManager" and "self" in args and "_groupchat_sourceid" in args["self"]: + self.groupchat_source_id = self.args["self"]["_groupchat_sourceid"] + else: + self.groupchat_source_id = None + + self.visualization_params = {} # For tracking colours and what index we're up to + + def __str__(self): + return f"Agent ({self.id}) - {self.agent_name}" + + +class LogEvent(LogBase): + def __init__( + self, + source_id: int, + source_name: str, + event_name: str, + agent_module: str, + agent_class: str, + json_state: str, + timestamp: str, + thread_id: int, + ): + super().__init__() + self.event_id = _get_id_str(timestamp) + self.source_id = source_id + self.source_name = source_name + self.event_name = event_name + self.agent_module = agent_module + self.agent_class = agent_class + self.json_state = json.loads(json_state) if json_state else "{}" + self.timestamp = _to_unix_timestamp(timestamp) + self.thread_id = thread_id + + """ SAMPLE LOG ENTRIES + + event_name == "received_message" + # json_state + # '{"message": "We\'re launching a new drink, it\'s flavoured like water and is called \'h2-oh\'.\\nKey facts are:\\n- No calories, sugar-free\\n- Can be enjoyed hot or cold\\n- No expiry date\\n- One ingredient, water\\n\\nTargeted to everyone.\\n\\nPlease prepare a brief with this structure:\\nA. Taglines:\\n\\nB. Short-form video summary:\\n\\nC. Alternative product names:\\n", "sender": "user_proxy", "valid": true}' + + event_name == "reply_func_executed" + # json_state + # '{"reply_func_module": "autogen.agentchat.conversable_agent", "reply_func_name": "check_termination_and_human_reply", "final": false, "reply": null}' + # '{"reply_func_module": "autogen.agentchat.conversable_agent", "reply_func_name": "generate_function_call_reply", "final": false, "reply": null}' + # '{"reply_func_module": "autogen.agentchat.conversable_agent", "reply_func_name": "generate_tool_calls_reply", "final": false, "reply": null}' + # '{"reply_func_module": "autogen.agentchat.conversable_agent", "reply_func_name": "generate_oai_reply", "final": true, "reply": {"content": "Agency Red: \\n\\nHere\'s our pitch for the \'h2-oh\' campaign:\\n\\n**A. Taglines:**\\n1. \\"Pure, Simple, Refreshing\\"\\n2. \\"Water, Evolved.\\"\\n3. \\"The Drink that\'s Still Water.\\"\\n\\n**B. Short-form video summary:** \\n(60-second spot)\\n\\n[Scene 1: Close-up of a person taking a sip from a glass of \'h2-oh\']\\nNarrator (Voiceover): \\"We drink it every day, but never truly see it.\\"\\n[Scene 2: A splashy montage of people drinking \'h2-oh\' in different settings]\\nNarrator (Voiceover): \\"Introducing h2-oh, water that\'s still water.\\"\\n[Scene 3: Close-up of the \'h2-oh\' bottle with the label and tagline on screen]\\nNarrator (Voiceover): \\"No calories, no sugar, just pure refreshment.\\"\\n[Scene 4: People enjoying \'h2-oh\' hot and cold]\\nNarrator (Voiceover): \\"Enjoy it hot or cold, whenever you need it.\\"\\n[Scene 5: Close-up of the person taking a sip again with a satisfied expression]\\nNarrator (Voiceover): \\"Experience water, reimagined.\\"\\n\\n**C. Alternative product names:**\\n1. AquaFresh\\n2. Purezza\\n3. HydroFlow", "refusal": null, "role": "assistant", "function_call": null, "tool_calls": null}}' + """ + + def __str__(self): + return ( + f"Event ({self.timestamp}) - {self.source_name}, {self.event_name}, {self.agent_module}, {self.agent_class}" + ) + + +class LogFlow(LogBase): + def __init__( + self, + source_id: int, + source_name: str, + code_point: str, + code_point_id: str, + info: str, + timestamp: str, + thread_id: int, + ): + super().__init__() + self.flow_id = _get_id_str(timestamp) + self.source_id = source_id + self.source_name = source_name + self.code_point = code_point + self.code_point_id = code_point_id + self.info = json.loads(info) if info else "{}" + self.timestamp = _to_unix_timestamp(timestamp) + self.thread_id = thread_id + + """ SAMPLE LOG ENTRIES + + code_point == "_summary_from_nested_chat start" + # "nested_chat_id": "7cadd935-2408-4a6b-90e1-06a8b25b1a82" + # info + # '{"chat_queue": [{"recipient": "<>", "message": "<>", "summary_method": "last_msg", "max_turns": 1}], "sender": "agency_red"}' + + code_point == "_summary_from_nested_chat end" + # "nested_chat_id": "7cadd935-2408-4a6b-90e1-06a8b25b1a82" + + code_point == "generate_tool_calls_reply" + # info + # {"tool_call_id": "ollama_manual_func_1546", "function_name": "currency_calculator", "function_arguments": "{\"base_amount\": 123.45, \"base_currency\": \"EUR\", \"quote_currency\": \"USD\"}", "return_value": "135.80 USD", "sender": "chatbot"} + """ + + def __str__(self): + return f"Flow ({self.timestamp}) - {self.source_name}, {self.code_point}, {self.code_point_id}" + + +class LogInvocation(LogBase): + def __init__( + self, + invocation_id: str, + client_id: int, + wrapper_id: int, + request: Dict, + response: Any, + is_cached: int, + cost: float, + start_time: str, + end_time: str, + thread_id: int, + source_name: str, + ): + super().__init__() + self.invocation_id = invocation_id + self.client_id = client_id + self.wrapper_id = wrapper_id + self.request = request + self.response = response + self.is_cached = is_cached + self.cost = cost + self.start_time = start_time + self.end_time = end_time + self.thread_id = thread_id + self.source_name = source_name + + def __str__(self): + return f"Invocation ({self.invocation_id})" + + +# Timestamp will be key, convert to a number +def _to_unix_timestamp(timestamp_str: str) -> float: + """Convert unix timestamp to a float number""" + dt = datetime.fromisoformat(timestamp_str) + return dt.timestamp() + + +def _get_id_str(timestamp_str: str) -> str: + """Convert timestamp string to a float then to a string""" + id = str(_to_unix_timestamp(timestamp_str)) + return id diff --git a/autogen/visualize/visualize_graphviz_utils.py b/autogen/visualize/visualize_graphviz_utils.py new file mode 100644 index 000000000000..10a7ae9c65f9 --- /dev/null +++ b/autogen/visualize/visualize_graphviz_utils.py @@ -0,0 +1,691 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/autogen-ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/microsoft/autogen are under the MIT License. +# SPDX-License-Identifier: MIT + +# Functions to support visualisation around GraphViz and messages + +import json +import re +from typing import Dict, List, Union +from uuid import uuid4 + +from graphviz import Digraph + +from .visualize_base import LogAgent, LogClient, LogEvent, LogFlow, LogInvocation + +# for * imports, import all functions +__all__ = [ + "assign_agent_color", + "darken_color", + "extract_code_exitcode", + "agent_id_by_name", + "gcm_id_by_groupchat", + "client_by_id", + "has_agent_nodes", + "get_agent_node_id", + "add_node_start", + "add_node_agent", + "add_node_summary", + "add_node_terminate", + "add_node_code_execution", + "add_node_custom_reply_func", + "add_node_human", + "add_node_eventflow_reply_func_executed", + "add_node_invocation", + "add_node_info", + "add_agent_to_agent_edge", + "add_agent_to_eventflow_edge", + "add_agent_to_info_edge", + "add_eventflow_to_eventflow_edge", + "add_eventflow_to_node_edge", + "add_event_to_agent_edge", + "add_invocation_to_agent_return_edge", + "add_invocation_to_eventflow_return_edge", + "add_eventflow_to_agent_return_edge", + "add_code_execution_to_agent_return_edge", + "add_start_to_agent_edge", + "add_invocation_to_event_edge", + "add_agent_info_loop_edge", + "create_tooltip", + "truncate_string", + "extract_invocation_response", +] + + +def assign_agent_color(agent_colors, agent_id) -> str: + """Assigns a color to an agent in a deterministic order. + + Args: + agent_colors: A dictionary mapping agent ids to their assigned colors. + agent_name: The name of the agent to assign a color to. + + Returns: + The color assigned to the agent. + """ + available_colors = [ + "#FAF4D0", + "#C0DFB7", + "#EDB7AD", + "#FBDBD5", + "#E4EEE9", + "#CDD5C6", + "#A9C9D4", + "#E8C4C6", + "#EBCFB9", + "#FF0080", + "#808080", + "#ADD8E6", + "#90EE90", + "#FFFFE0", + ] + color_index = len(agent_colors) % len(available_colors) # Cycle through colors + color = available_colors[color_index] + agent_colors[agent_id] = color + return color + + +def darken_color(color, amount=0.1): + """Darkens a color by a given amount. + + Args: + color: The color to darken, as a hex string (e.g., '#FF0000'). + amount: The amount to darken (0.0 - 1.0). 0.0 is no change, 1.0 is maximum darkness. + + Returns: + The darkened color as a hex string. + """ + c = int(color[1:], 16) + r = max(0, int((c >> 16) - (255 * amount))) # Clamp to 0 + g = max(0, int(((c >> 8) & 0xFF) - (255 * amount))) # Clamp to 0 + b = max(0, int((c & 0xFF) - (255 * amount))) # Clamp to 0 + return f"#{r:02X}{g:02X}{b:02X}" + + +def extract_code_exitcode(log_message: str) -> int: + """Extracts the exitcode from a log message. The format is 'exitcode: 1'""" + match = re.search(r"exitcode:\s*(\d+)", log_message) + if match: + exit_code = int(match.group(1)) + return exit_code + else: + print("Unable to extract exitcode.") + return -1 # Unknown + + +def agent_id_by_name(agents: Dict[int, LogAgent], agent_name: str) -> int: + """Retrieves an agent id by their name""" + for agent in agents.values(): + if agent.agent_name == agent_name: + return agent.id + + raise Exception(f"Unknown agent, name: {agent_name}") + + +def gcm_id_by_groupchat(agents: Dict[int, LogAgent], groupchat_unique_id: str) -> int: + """Retrieves the groupchat manager for a groupchat""" + for agent in agents.values(): + if ( + agent.agent_type == "GroupChatManager" + and "groupchat" in agent.args + and "_unique_id" in agent.args["groupchat"] + and agent.args["groupchat"]["_unique_id"] == groupchat_unique_id + ): + return agent.id + + raise Exception(f"Couldn't find GroupChat Manager for GroupChat with unique_id: {groupchat_unique_id}") + + +def client_by_id( + clients: Dict[int, LogClient], wrapper_clients: Dict[int, List], client_id: int, wrapper_id: int +) -> LogClient: + """Retrieves a client by id. If it can't find it, look up a client based on the wrapper_id""" + if client_id in clients: + return clients[client_id] + elif wrapper_id in wrapper_clients: + return clients[wrapper_clients[wrapper_id][0]] # Return the first client in the wrapper + else: + raise Exception( + f"Unknown client, id: {client_id}. Additionally, can't find other clients with wrapper_id: {wrapper_id}" + ) + + +def has_agent_nodes(agents: Dict[int, LogAgent]): + """Are there agent nodes""" + + for agent in agents.values(): + if len(agent.visualization_params) != 0: + if agent.visualization_params["index"] != 0: + return True + + return False + + +def get_agent_node_id(agent: LogAgent) -> str: + """Gets the unique node id for an agent node""" + return f"{agent.id}_{agent.visualization_params['index']}" + + +def format_tooltip(tooltip: str) -> str: + """Validates that a tooltip is valid, particularly the length which can't be more than 16K""" + if len(tooltip) > 16384: + return tooltip[:16384] + else: + return tooltip + + +def add_node_start(design_config: Dict, dot: Digraph): + + dot.node( + "start", + "START", + color=design_config["start_border_color"], + style="filled", + fillcolor=design_config["start_bg"], + fontcolor=design_config["start_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_agent(design_config: Dict, agents: Dict[int, LogAgent], dot: Digraph, agent: LogAgent): + """Add an agent node to the diagram""" + + starting_point = not has_agent_nodes(agents) + + # Increment the index of the agent + agent.visualization_params["index"] = agent.visualization_params["index"] + 1 + + # Create a unique node id + node_id = get_agent_node_id(agent) + + if starting_point: + # If this is the start of the program, add a start node first and then link to it once we have the agent node + add_node_start(design_config, dot) + + # Add the node to the diagram + color = agent.visualization_params["color"] + dot.node( + node_id, + f"{agent.agent_name} ({agent.visualization_params['index']})", + shape=design_config["node_shape"]["agent"], + color=darken_color(color, 0.2), + style="filled", + fillcolor=color, + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + if starting_point: + # Link the start to the agent + add_start_to_agent_edge(design_config, dot, agent) + + +def add_node_summary(design_config: Dict, dot: Digraph, eventflow: Union[LogEvent | LogFlow]): + """Add a summary node to the diagram""" + + dot.node( + _get_eventflow_id(eventflow), + "Summarize", + shape=design_config["node_shape"]["summary"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_terminate(design_config: Dict, dot: Digraph, eventflow: Union[LogEvent | LogFlow]): + """Add a termination node to the diagram""" + + dot.node( + _get_eventflow_id(eventflow), + "Termination", + shape=design_config["node_shape"]["terminate"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_code_execution( + design_config: Dict, dot: Digraph, event: LogEvent, exitcode: int, tooltip_text: str = "", href_text: str = "" +): + """Add a code execution node to the diagram""" + + edge_color = design_config["edge_success_color"] if exitcode == 0 else design_config["edge_unsuccessful_color"] + + dot.node( + event.event_id, + "Code Execution", + shape=design_config["node_shape"]["code_execution"], + tooltip=format_tooltip(tooltip_text), + href_text=href_text, + color=edge_color, + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_custom_reply_func(design_config: Dict, dot: Digraph, event: LogEvent, reply_func_name: str): + """Add a custom reply function event node to the diagram""" + + dot.node( + event.event_id, + reply_func_name, + shape=design_config["node_shape"]["custom_reply_func"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_human(design_config: Dict, dot: Digraph, event: LogEvent): + """Add a human input node to the diagram""" + + dot.node( + event.event_id, + "Human Reply", + shape=design_config["node_shape"]["human"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_eventflow_reply_func_executed( + design_config: Dict, dot: Digraph, eventflow: Union[LogEvent | LogFlow], label: str, shape_name: str +): + """Add an event node to the diagram""" + + dot.node( + _get_eventflow_id(eventflow), + label, + shape=shape_name, + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_invocation( + design_config: Dict, + clients: Dict[int, LogClient], + wrapper_clients: Dict[int, List], + dot: Digraph, + invocation: LogInvocation, +): + """Add an invocation node to the diagram""" + + client_name = client_by_id(clients, wrapper_clients, invocation.client_id, invocation.wrapper_id).class_name + + dot.node( + invocation.invocation_id, + client_name, + shape=design_config["node_shape"]["invocation"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + +def add_node_info(design_config: Dict, dot: Digraph, label: str) -> str: + """Add an info node to the diagram an returns the name of the node""" + + new_id = str(uuid4()) + + dot.node( + new_id, + label, + shape=design_config["node_shape"]["info"], + color=design_config["border_color"], + style="filled", + fillcolor=design_config["fill_color"], + fontcolor=design_config["node_font_color"], + fontname=design_config["font_names"], + penwidth=design_config["node_pen_width"], + ) + + return new_id + + +def add_agent_to_agent_edge( + design_config: Dict, + agents: Dict[int, LogAgent], + dot: Digraph, + sender_agent: LogAgent, + recipient_agent: LogAgent, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", + dir: str = "forward", + style: str = "solid", +): + """Adds an edge between nodes""" + + # Ensure the agent nodes exist (e.g. they aren't index 0) + if sender_agent.visualization_params["index"] == 0: + add_node_agent(design_config, agents, dot, sender_agent) + + if recipient_agent.visualization_params["index"] == 0: + add_node_agent(design_config, agents, dot, recipient_agent) + + dot.edge( + get_agent_node_id(sender_agent), + get_agent_node_id(recipient_agent), + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + dir=dir, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + style=style, + ) + + +def add_agent_to_eventflow_edge( + design_config: Dict, + dot: Digraph, + agent: LogAgent, + eventflow: Union[LogEvent | LogFlow], + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between an agent and an event""" + + dot.edge( + get_agent_node_id(agent), + _get_eventflow_id(eventflow), + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + + +def add_agent_to_info_edge( + design_config: Dict, + dot: Digraph, + agent: LogAgent, + node_id: str, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between an agent and an event""" + + dot.edge( + get_agent_node_id(agent), + node_id, + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + + +def add_eventflow_to_eventflow_edge( + design_config: Dict, + dot: Digraph, + eventflow_one: Union[LogEvent | LogEvent], + eventflow_two: Union[LogEvent | LogEvent], + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between two events""" + + dot.edge( + _get_eventflow_id(eventflow_one), + _get_eventflow_id(eventflow_two), + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + + +def add_eventflow_to_node_edge( + design_config: Dict, + dot: Digraph, + eventflow: Union[LogEvent | LogFlow], + node_id: str, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between an event and a node""" + + dot.edge( + _get_eventflow_id(eventflow), + node_id, + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + + +def add_event_to_agent_edge( + design_config: Dict, + dot: Digraph, + event: LogEvent, + agent: LogAgent, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between an event and an agent""" + + dot.edge( + event.event_id, + get_agent_node_id(agent), + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + + +def add_invocation_to_agent_return_edge( + design_config: Dict, + dot: Digraph, + agent: LogAgent, + invocation: LogInvocation, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between agent and invocation and a return edge""" + + dot.edge( + get_agent_node_id(agent), + invocation.invocation_id, + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + dot.edge(invocation.invocation_id, get_agent_node_id(agent), color=design_config["edge_color"]) + + +def add_invocation_to_eventflow_return_edge( + design_config: Dict, + dot: Digraph, + eventflow: Union[LogEvent | LogFlow], + invocation: LogInvocation, + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between event and invocation and a return edge""" + + dot.edge( + _get_eventflow_id(eventflow), + invocation.invocation_id, + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + dot.edge(invocation.invocation_id, _get_eventflow_id(eventflow), color=design_config["edge_color"]) + + +def add_eventflow_to_agent_return_edge( + design_config: Dict, + dot: Digraph, + agent: LogAgent, + eventflow: Union[LogEvent | LogFlow], + edge_text: str, + tooltip_text: str = "", + href_text: str = "", +): + """Adds an edge between agent and event/flow and a return edge""" + + dot.edge( + get_agent_node_id(agent), + _get_eventflow_id(eventflow), + label=edge_text, + labeltooltip=format_tooltip(tooltip_text), + labelhref=href_text, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + color=design_config["edge_color"], + fontname=design_config["font_names"], + ) + dot.edge(_get_eventflow_id(eventflow), get_agent_node_id(agent), color=design_config["edge_color"]) + + +def add_code_execution_to_agent_return_edge( + design_config: Dict, dot: Digraph, agent: LogAgent, event: LogEvent, exitcode: int +): + """Adds an edge between agent and code execution and a return edge with result information""" + + label_text = "Success" if exitcode == 0 else "Unsuccessful" + edge_color = design_config["edge_success_color"] if exitcode == 0 else design_config["edge_unsuccessful_color"] + + dot.edge( + event.event_id, + get_agent_node_id(agent), + label=label_text, + color=edge_color, + labeldistance=design_config["label_distance"], + fontcolor=design_config["font_color"], + fontname=design_config["font_names"], + dir="both", + ) + + +def add_start_to_agent_edge(design_config: Dict, dot: Digraph, agent: LogAgent): + """Adds an edge from the start node to an agent""" + + dot.edge("start", get_agent_node_id(agent), color=design_config["edge_color"]) + + +def add_invocation_to_event_edge( + design_config: Dict, dot: Digraph, event: LogEvent, invocation: LogInvocation, label: str +): + """Adds an edge between an event and an invocation""" + + dot.edge(event.event_id, invocation.invocation_id, label, color=design_config["edge_color"]) + + +def add_agent_info_loop_edge( + design_config: Dict, dot: Digraph, agent: LogAgent, edge_text: str, tooltip_text: str = "", href_text: str = "" +): + """Adds an information-only loop edge to/from an agent""" + + dot.edge(get_agent_node_id(agent), get_agent_node_id(agent), label=edge_text, color=design_config["edge_color"]) + + +def create_tooltip(message): + """Create tool tips based on a message""" + if isinstance(message, str): + return message + elif isinstance(message, dict): + if isinstance(message["content"], list): + # If it's a list, just serialise it + return str(message["content"]) + else: + tooltip_text = message["content"] if message["content"] else "" + if "tool_calls" in message and message["tool_calls"] is not None: + for tool_call in message["tool_calls"]: + tooltip_text += f"\nTool call: {json.dumps(tool_call['function'])}" + return tooltip_text + else: + return "Unable to create tooltip" + + +def truncate_string(string, max_length): + """Keeps string lengths to a max length""" + if len(string) <= max_length: + return string + else: + return string[: max_length - 3] + "..." + + +def extract_invocation_response(invocation: LogInvocation) -> str: + """Extract an invocation's response and return as a string""" + + if "ChatCompletionMessage" in invocation.response: + pattern = r'ChatCompletionMessage\(content=(\'|")(.+?)(? Union[LogSession, LogClient, LogEvent, LogFlow, LogAgent, LogInvocation, None]: + """Parses a line of log data and returns the corresponding object. + + Args: + line_data (Dict): The log data as a dictionary. + + Returns: + Union[LogSession, LogClient, LogEvent, LogFlow, LogAgent, LogInvocation, None]: The parsed object, or None if the line cannot be parsed. + """ + + # Check for session ID + if "Session ID:" in line_data: + session_id = line_data.split("Session ID: ")[-1].strip() + return LogSession(session_id) + + # Check for client data + if next(iter(line_data)) == "client_id" and ( + "class" in line_data or "model_client_class" in line_data + ): # First key is client_id + return LogClient( + client_id=line_data.get("client_id"), + wrapper_id=line_data.get("wrapper_id"), + session_id=line_data.get("session_id"), + class_name=line_data.get("class") if "class" in line_data else line_data.get("model_client_class"), + json_state=line_data.get("json_state"), + timestamp=line_data.get("timestamp"), + thread_id=line_data.get("thread_id"), + is_custom_class=False if "class" in line_data else True, + ) + + # Check for agent data + if next(iter(line_data)) == "id" and "agent_name" in line_data: + return LogAgent( + id=line_data.get("id"), + agent_name=line_data.get("agent_name"), + wrapper_id=line_data.get("wrapper_id"), + session_id=line_data.get("session_id"), + current_time=line_data.get("current_time"), + agent_type=line_data.get("agent_type"), + args=line_data.get("args"), + thread_id=line_data.get("thread_id"), + ) + + # Check for flow data + if next(iter(line_data)) == "source_id" and "code_point" in line_data: + return LogFlow( + source_id=line_data.get("source_id"), + source_name=line_data.get("source_name"), + code_point=line_data.get("code_point"), + code_point_id=line_data.get("code_point_id"), + info=line_data.get("info"), + timestamp=line_data.get("timestamp"), + thread_id=line_data.get("thread_id"), + ) + + # Check for event data + if next(iter(line_data)) == "source_id" and "source_name" in line_data: + return LogEvent( + source_id=line_data.get("source_id"), + source_name=line_data.get("source_name"), + event_name=line_data.get("event_name"), + agent_module=line_data.get("agent_module"), + agent_class=line_data.get("agent_class"), + json_state=line_data.get("json_state"), + timestamp=line_data.get("timestamp"), + thread_id=line_data.get("thread_id"), + ) + + # Check for invocation data + if "invocation_id" in line_data: + return LogInvocation( + invocation_id=line_data.get("invocation_id"), + client_id=line_data.get("client_id"), + wrapper_id=line_data.get("wrapper_id"), + request=line_data.get("request"), + response=line_data.get("response"), + is_cached=line_data.get("is_cached"), + cost=line_data.get("cost"), + start_time=line_data.get("start_time"), + end_time=line_data.get("end_time"), + thread_id=line_data.get("thread_id"), + source_name=line_data.get("source_name"), + ) + + return None + + +# Add Log_____ objects +def add_log_client(client: LogClient, log_clients: Dict[int, LogClient]) -> bool: + if client.client_id not in log_clients: + log_clients[client.client_id] = client + # print(f"Client {client.client_id} added - {client.class_name}.") + return True + else: + # print(f"Client {client.client_id} already exists. No duplicate added.") + return False + + +def add_log_agent(agent: LogAgent, log_agents: Dict[int, LogAgent]) -> bool: + if agent.id not in log_agents: + log_agents[agent.id] = agent + # print(f"Agent {agent.id} added - {agent.agent_name} ({agent.agent_type}).") + return True + else: + log_agents[agent.id] = agent + # print(f"Agent {agent.id} already exists. Updating.") + return False + + +# Function to add a new LogEvent to the dictionary +def add_log_event(event: LogEvent, log_events: Dict[float, LogEvent]) -> bool: + if event.timestamp not in log_events: + log_events[event.timestamp] = event + # print(f"Event at {event.timestamp} added - {event.source_name}, {event.event_name}.") + return True + else: + # print(f"Event at {event.timestamp} already exists. No duplicate added.") + return False + + +# Function to add a new LogFlow to the dictionary +def add_log_flow(flow: LogFlow, log_flows: Dict[float, LogFlow]) -> bool: + if flow.timestamp not in log_flows: + log_flows[flow.timestamp] = flow + # print(f"Flow at {flow.timestamp} added - {flow.code_point}, {flow.code_point_id}.") + return True + else: + # print(f"Flow at {flow.timestamp} already exists. No duplicate added.") + return False + + +def add_log_invocation(invocation: LogInvocation, log_invocations: Dict[int, LogInvocation]) -> bool: + if invocation.invocation_id not in log_invocations: + log_invocations[invocation.invocation_id] = invocation + # print(f"Invocation {invocation.invocation_id} added.") + return True + else: + # print(f"Invocation {invocation.invocation_id} already exists. No duplicate added.") + return False + + +def load_log_file( + file_and_path: str, + clients: Dict[int, LogClient], + wrapper_clients: Dict[int, List], + agents: Dict[int, LogAgent], + events: Dict[float, LogEvent], + flows: Dict[float, LogFlow], + invocations: Dict[str, LogInvocation], + all_ordered: List[LogClient | LogAgent | LogEvent | LogFlow | LogInvocation | LogSession], +) -> str: + """Loads an AutoGen log file into respective dictionaries and an ordered list, returning the session id""" + + # Read the log file + with open(file_and_path, "r") as file: + log_lines = file.readlines() + + # Iterate over each line in the log + for i, line in enumerate(log_lines): + # Extract the session ID + if line.startswith("Started new session with Session ID:"): + session_id = line.split(":")[-1].strip() + elif line.startswith("[file_logger]"): + # Ignore file logging errors + pass + else: + # Can it be converted to JSON + try: + line_data = json.loads(line) + + line_object = parse_log_line(line_data=line_data) + + new_object = False + + if isinstance(line_object, LogClient): + new_object = add_log_client(line_object, clients) + + if line_object.wrapper_id in wrapper_clients: + wrapper_clients[line_object.wrapper_id].append(line_object.client_id) + else: + wrapper_clients[line_object.wrapper_id] = [line_object.client_id] + + elif isinstance(line_object, LogAgent): + new_object = add_log_agent(line_object, agents) + elif isinstance(line_object, LogEvent): + new_object = add_log_event(line_object, events) + elif isinstance(line_object, LogFlow): + new_object = add_log_flow(line_object, flows) + elif isinstance(line_object, LogInvocation): + new_object = add_log_invocation(line_object, invocations) + else: + if "wrapper_id" not in line_data: + print(f"Unknown object for the line #{i+1} (ignoring): {line_data}") + + if new_object: + all_ordered.append(line_object) + + except json.JSONDecodeError: + print("Note - can't decode line.") + + return session_id diff --git a/setup.py b/setup.py index f8a1753d2888..284007692cc2 100644 --- a/setup.py +++ b/setup.py @@ -101,6 +101,7 @@ "cohere": ["cohere>=5.5.8"], "ollama": ["ollama>=0.3.2", "fix_busted_json>=0.0.18"], "bedrock": ["boto3>=1.34.149"], + "visualize": ["graphviz>=0.20.3"], } diff --git a/test/test_logging.py b/test/test_logging.py index 531cda1181dc..351567bb8ab5 100644 --- a/test/test_logging.py +++ b/test/test_logging.py @@ -152,6 +152,31 @@ def test_log_function_use(db_connection): assert row["returns"] == json.dumps(returns) +def test_log_flow(db_connection): + cur = db_connection.cursor() + + source = autogen.AssistantAgent(name="TestAgent", code_execution_config=False) + info = {"foo": "bar"} + + autogen.runtime_logging.log_flow( + source=source, + code_point="_summary_from_nested_chat start", + code_point_id="1bb623ba-f2d2-406a-844e-5103a500d038", + **info + ) + + query = """ + SELECT source_id, source_name, code_point, code_point_id, info, timestamp + FROM flows + """ + + for row in cur.execute(query): + assert row["source_name"] == "TestAgent" + assert row["code_point"] == "_summary_from_nested_chat start" + assert row["code_point_id"] == "1bb623ba-f2d2-406a-844e-5103a500d038" + assert row["info"] == json.dumps(info) + + def test_log_new_agent(db_connection): from autogen import AssistantAgent diff --git a/website/docs/installation/Optional-Dependencies.md b/website/docs/installation/Optional-Dependencies.md index 9af76c9ce21f..5685c5c36d5f 100644 --- a/website/docs/installation/Optional-Dependencies.md +++ b/website/docs/installation/Optional-Dependencies.md @@ -131,3 +131,11 @@ AutoGen includes support for handling long textual contexts by leveraging the LL ```bash pip install "autogen[long-context]" ``` + +## Visualize + +To generate diagrams from AutoGen workflow logs please install AutoGen with the `[visualize]` option: + +```bash +pip install "autogen[visualize]" +```