diff --git a/doc/changes/DM-50550.feature.md b/doc/changes/DM-50550.feature.md new file mode 100644 index 000000000..3e68ac182 --- /dev/null +++ b/doc/changes/DM-50550.feature.md @@ -0,0 +1 @@ +Add support for generating exception diagnostics with Data ID grouping by combining provenance and dimension records. \ No newline at end of file diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 5541326a6..3f9f21e5a 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -42,12 +42,16 @@ import concurrent.futures import dataclasses import datetime +import io import itertools import logging +import sys import textwrap import threading import uuid +from collections import defaultdict from collections.abc import Callable, Iterator, Mapping, Sequence, Set +from contextlib import contextmanager from enum import Enum from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast @@ -69,6 +73,7 @@ MissingDatasetTypeError, QuantumBackedButler, ) +from lsst.daf.butler.registry.queries import DimensionRecordQueryResults from lsst.resources import ResourcePathExpression from lsst.utils.logging import PeriodicLogger, getLogger @@ -655,7 +660,7 @@ def _add_quantum_info( self.recovered_quanta.append(dict(info["data_id"].required)) if final_quantum_run is not None and final_quantum_run.caveats: code = final_quantum_run.caveats.concise() - self.caveats.setdefault(code, []).append(dict(info["data_id"].required)) + self.caveats.setdefault(code, []).append(dict(info["data_id"].mapping)) if final_quantum_run.caveats & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR: if final_quantum_run.exception is not None: self.exceptions.setdefault(final_quantum_run.exception.type_name, []).append( @@ -934,7 +939,34 @@ def aggregate(cls, summaries: Sequence[Summary]) -> Summary: result_dataset_summary._add_data_id_group(dataset_type_summary) return result - def 
pprint(self, brief: bool = False, datasets: bool = True) -> None: + @contextmanager + def tty_buffer(self) -> Iterator[io.StringIO]: + """Context manager that temporarily redirects sys.stdout to a + teletypewriter-like buffer. Useful for capturing output that formats + differently when writing to a TTY. + """ + + class MockTTY(io.StringIO): + # Pretend to be a terminal to capture full TTY output. + def isatty(self) -> bool: + return True + + orig = sys.stdout + buf = MockTTY() + sys.stdout = buf + try: + yield buf # Use buffer inside `with` block. + finally: + sys.stdout = orig # Restore original stdout. + + def pprint( + self, + brief: bool = False, + datasets: bool = True, + show_exception_diagnostics: bool = False, + butler: Butler | None = None, + return_exception_diagnostics_table: bool = False, + ) -> astropy.table.Table | None: """Print this summary to stdout, as a series of tables. Parameters @@ -948,6 +980,21 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None: includes a summary table of dataset counts for various status and (if ``brief`` is `True`) a table with per-data ID information for each unsuccessful or cursed dataset. + show_exception_diagnostics : `bool`, optional + If `True`, include a table of exception diagnostics in the output. + butler : `lsst.daf.butler.Butler`, optional + The butler used to create this summary. This is only used to get + exposure dimension records for the exception diagnostics. + return_exception_diagnostics_table : `bool`, optional + If `True`, return the exception diagnostics table in addition to + printing it. Only supported if ``show_exception_diagnostics`` is + `True`. + + Returns + ------- + exception_diagnostics_table : `astropy.table.Table` or `None` + A table of exception diagnostics, if requested and available. + Otherwise, `None`. 
""" self.make_quantum_table().pprint_all() print("") @@ -958,6 +1005,47 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None: if exception_table := self.make_exception_table(): exception_table.pprint_all() print("") + exception_diagnostics_table = None + if show_exception_diagnostics: + if return_exception_diagnostics_table: + # Keep an original copy of the table to be returned. + exception_diagnostics_table = self.make_exception_diagnostics_table(butler) + exception_diagnostics_table_view = exception_diagnostics_table.copy() + else: + exception_diagnostics_table_view = self.make_exception_diagnostics_table(butler) + if exception_diagnostics_table_view: + # Shorten the exception type name by trimming the module name. + exception_diagnostics_table_view["Exception"] = [ + val.rsplit(".", maxsplit=1)[-1] if val is not None else val + for val in exception_diagnostics_table_view["Exception"] + ] + # Shorten the exception message to a maximum width. + max_message_width = 45 + exception_diagnostics_table_view["Exception Message"] = [ + textwrap.shorten(msg, width=max_message_width, placeholder="...") + if msg and isinstance(msg, str) and len(msg) > max_message_width + else msg + for msg in exception_diagnostics_table_view["Exception Message"] + ] + with self.tty_buffer() as buffer: + # Use pprint() to trim long tables; pprint_all() may flood + # the screen in those cases. + exception_diagnostics_table_view.pprint() + last_line = buffer.getvalue().splitlines()[-1] + # Print the table from the buffer. + print(buffer.getvalue()) + if "Length =" in last_line: + # The table was too long to print, we had to truncate it. + print( + "▲ Note: The exception diagnostics table above is truncated. " + "Use --exception-diagnostics-filename to save the complete table." + ) + print("") + elif return_exception_diagnostics_table: + raise ValueError( + "The exception diagnostics table was requested to be returned, " + "but `show_exception_diagnostics` is False." 
+ ) if datasets: + self.make_dataset_table().pprint_all() + print("") @@ -971,6 +1059,7 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None: print(f"{dataset_type_name} errors:") bad_dataset_table.pprint_all() print("") + return exception_diagnostics_table def make_quantum_table(self) -> astropy.table.Table: """Construct an `astropy.table.Table` with a tabular summary of the @@ -1075,6 +1164,141 @@ def make_exception_table(self) -> astropy.table.Table: rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)}) return astropy.table.Table(rows) + def make_exception_diagnostics_table( + self, + butler: Butler | None = None, + add_dimension_records: bool = True, + add_exception_msg: bool = True, + max_message_width: int | None = None, + shorten_type_name: bool = False, + ) -> astropy.table.Table: + """Construct an `astropy.table.Table` showing exceptions grouped by + data ID. + + Each row represents one data ID that encountered an exception, along + with the exception type under the column named after the task that + raised it. If a Butler is provided, the table will also include a + subset of exposure-related metadata pulled from the visit and exposure + dimension records. The exception message can optionally be included in the table. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler`, optional + Butler instance used to fetch dimension records. + add_dimension_records : `bool`, optional + If `True`, include visit and exposure dimension records in the + table, if available. This requires ``butler`` to be provided. + add_exception_msg : `bool`, optional + If `True`, include the exception message in the table. + max_message_width : `int`, optional + Maximum width for storing exception messages in the output table. + Longer messages will be truncated. If not provided, messages will + be included in full without truncation.
+ shorten_type_name : `bool`, optional + If `True`, shorten the exception type name by removing the + package name. This is useful for making the table more readable + when the package name is long or not relevant to the user. + + Returns + ------- + table : `astropy.table.Table` + Table with one row per data ID and columns for exception types (by + task), and optionally, exposure dimension records and exception + messages. + """ + if add_dimension_records and butler is None: + raise ValueError("Butler is required to fetch dimension records.") + + # The additional columns for visit and exposure records to add to the + # output table, if available. Note that 'band', 'day_obs', and + # 'physical_filter' already exist in `exception.data_id` below. + needed_visit_records = ["exposure_time", "target_name"] + needed_exposure_records = ["exposure_time", "target_name"] + + rows: defaultdict[tuple, defaultdict[str, str]] = defaultdict(lambda: defaultdict(str)) + exposure_data_ids: list[dict] = [] + visit_data_ids: list[dict] = [] + dimension_record_lookup: dict[str, DimensionRecordQueryResults] = {} + + # Loop over all tasks and exceptions, and associate them with data IDs. + for task_label, task_summary in self.tasks.items(): + for type_name, exceptions in task_summary.exceptions.items(): + for exception in exceptions: + data_id = DataCoordinate.standardize(exception.data_id, universe=butler.dimensions) + if add_dimension_records: + if "visit" in data_id: + visit_data_ids.append(data_id) + elif "exposure" in data_id: + exposure_data_ids.append(data_id) + # Define a hashable and stable tuple of data ID values. + key = tuple(sorted(data_id.mapping.items())) + assert len(rows[key]) == 0, f"Multiple exceptions for one data ID: {key}" + assert rows[key]["Exception"] == "", f"Duplicate entry for data ID {key} in {task_label}" + if shorten_type_name: + # Trim off the package name from the exception type for + # brevity. 
+ type_name = type_name.rsplit(".", maxsplit=1)[-1] + rows[key]["Task"] = task_label + rows[key]["Exception"] = type_name + if add_exception_msg: + msg = exception.exception.message + if max_message_width and len(msg) > max_message_width: + msg = textwrap.shorten(msg, max_message_width) + rows[key]["Exception Message"] = msg + + if add_dimension_records and (visit_data_ids or exposure_data_ids): + # Preload all the dimension records up front for faster O(1) lookup + # later. Querying per data ID in the loop is painfully slow. These + # data IDs are limited to the ones that have exceptions. + with butler.query() as query: + query = query.join_data_coordinates(visit_data_ids + exposure_data_ids) + for element in ["visit", "exposure"]: + dimension_record_lookup |= { + f"{element}:{d.dataId[element]}": d for d in query.dimension_records(element) + } + + # Loop over the data IDs and fill in the dimension records. + for element, data_ids, needed_records in zip( + ["visit", "exposure"], + [visit_data_ids, exposure_data_ids], + [needed_visit_records, needed_exposure_records], + ): + for data_id in data_ids: + key = tuple(sorted(data_id.mapping.items())) + for k in needed_records: + rows[key][k] = getattr( + dimension_record_lookup[f"{element}:{data_id[element]}"], k, None + ) + + # Extract all unique data ID keys from the rows for the table header. + all_key_columns = {k for key in rows for k, _ in key} + + # Loop over all rows and add them to the table. + table_rows = [] + for key, values in rows.items(): + # Create a new row with all key columns initialized to None, + # allowing missing values to be properly masked when `masked=True`. + row = {col: None for col in all_key_columns} + # Fill in data ID fields from the key. + row.update(dict(key)) + # Add dimension records next, if requested and available. 
+ if add_dimension_records: + if visit_data_ids: + for col in needed_visit_records: + row[col] = values.get(col, None) + if exposure_data_ids: + for col in needed_exposure_records: + row[col] = values.get(col, None) + # Add task label and exception type. + for col in ("Task", "Exception"): + row[col] = values.get(col, None) + # Add the exception message, if requested. + if add_exception_msg: + row["Exception Message"] = values.get("Exception Message", None) + table_rows.append(row) + + return astropy.table.Table(table_rows, masked=True) + def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]: """Construct an `astropy.table.Table` with per-data-ID information about failed, wonky, and partial-outputs-error quanta. @@ -1295,7 +1519,7 @@ def to_summary( ---------- butler : `lsst.daf.butler.Butler`, optional Ignored; accepted for backwards compatibility. - do_store_logs : `bool` + do_store_logs : `bool`, optional Store the logs in the summary dictionary. n_cores : `int`, optional