From 3117fa787cfceeb775f855d3da0ed72ac10e4a4e Mon Sep 17 00:00:00 2001
From: Erfan Nourbakhsh
Date: Thu, 8 May 2025 05:23:21 -0400
Subject: [PATCH 1/3] Add exception diagnostics table to quantum provenance

---
 .../pipe/base/quantum_provenance_graph.py | 159 +++++++++++++++++-
 1 file changed, 157 insertions(+), 2 deletions(-)

diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py
index 5541326a6..c29230831 100644
--- a/python/lsst/pipe/base/quantum_provenance_graph.py
+++ b/python/lsst/pipe/base/quantum_provenance_graph.py
@@ -42,12 +42,16 @@
 import concurrent.futures
 import dataclasses
 import datetime
+import io
 import itertools
 import logging
+import sys
 import textwrap
 import threading
 import uuid
+from collections import defaultdict
 from collections.abc import Callable, Iterator, Mapping, Sequence, Set
+from contextlib import contextmanager
 from enum import Enum
 from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast

@@ -934,7 +938,33 @@ def aggregate(cls, summaries: Sequence[Summary]) -> Summary:
             result_dataset_summary._add_data_id_group(dataset_type_summary)
         return result

-    def pprint(self, brief: bool = False, datasets: bool = True) -> None:
+    @contextmanager
+    def tty_buffer(self) -> Iterator[io.StringIO]:
+        """Context manager that temporarily redirects sys.stdout to a
+        teletypewriter-like buffer. Useful for capturing output that formats
+        differently when writing to a TTY.
+        """
+
+        class MockTTY(io.StringIO):
+            # Pretend to be a terminal to capture full TTY output.
+            def isatty(self) -> bool:
+                return True
+
+        orig = sys.stdout
+        buf = MockTTY()
+        sys.stdout = buf
+        try:
+            yield buf  # Use buffer inside `with` block.
+        finally:
+            sys.stdout = orig  # Restore original stdout.
+
+    def pprint(
+        self,
+        brief: bool = False,
+        datasets: bool = True,
+        show_exception_diagnostics: bool = False,
+        butler: Butler | None = None,
+    ) -> None:
         """Print this summary to stdout, as a series of tables.

         Parameters
@@ -948,6 +978,9 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None:
             includes a summary table of dataset counts for various status
             and (if ``brief`` is `True`) a table with per-data ID information
             for each unsuccessful or cursed dataset.
+        butler : `lsst.daf.butler.Butler`, optional
+            The butler used to create this summary. This is only used to get
+            exposure dimension records for the exception diagnostics.
         """
         self.make_quantum_table().pprint_all()
         print("")
@@ -958,6 +991,24 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None:
         if exception_table := self.make_exception_table():
             exception_table.pprint_all()
             print("")
+        if show_exception_diagnostics:
+            exception_diagnostics_table = self.make_exception_diagnostics_table(
+                butler, max_message_width=45, shorten_type_name=True
+            )
+            with self.tty_buffer() as buffer:
+                # Use pprint() to trim long tables; pprint_all() may flood the
+                # screen in those cases.
+                exception_diagnostics_table.pprint()
+            last_line = buffer.getvalue().splitlines()[-1]
+            # Print the table from the buffer.
+            print(buffer.getvalue())
+            if "Length =" in last_line:
+                # The table was too long to print, so we had to truncate it.
+                print(
+                    "▲ Note: The exception diagnostics table above is truncated. "
+                    "Use --exception-diagnostics-filename to save the complete table."
+                )
+            print("")
         if datasets:
             self.make_dataset_table().pprint_all()
             print("")
@@ -1075,6 +1126,110 @@ def make_exception_table(self) -> astropy.table.Table:
                 rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)})
         return astropy.table.Table(rows)

+    def make_exception_diagnostics_table(
+        self,
+        butler: Butler | None = None,
+        add_exception_msg: bool = True,
+        max_message_width: int | None = None,
+        shorten_type_name: bool = False,
+    ) -> astropy.table.Table:
+        """Construct an `astropy.table.Table` showing exceptions grouped by
+        data ID.
+
+        Each row represents one data ID that encountered an exception, along
+        with the exception type under the column named after the task that
+        raised it. If a Butler is provided, the table will also include a
+        subset of exposure-related metadata pulled from the exposure dimension
+        records. The exception message can optionally be included in the
+        table.
+
+        Parameters
+        ----------
+        butler : `lsst.daf.butler.Butler`, optional
+            Butler instance used to fetch exposure records. If not provided,
+            exposure dimension records will not be included in the table.
+        add_exception_msg : `bool`, optional
+            If `True`, include the exception message in the table.
+        max_message_width : `int`, optional
+            Maximum width for storing exception messages in the output table.
+            Longer messages will be truncated. If not provided, messages will
+            be included in full without truncation.
+        shorten_type_name : `bool`, optional
+            If `True`, shorten the exception type name by removing the
+            package name. This is useful for making the table more readable
+            when the package name is long or not relevant to the user.
+
+        Returns
+        -------
+        table : `astropy.table.Table`
+            Table with one row per data ID and columns for exception types (by
+            task), and optionally, exposure dimension records and exception
+            messages.
+        """
+        add_exposure_records = True
+        needed_exposure_records = ["day_obs", "physical_filter", "exposure_time", "target_name"]
+
+        # Preload all exposure dimension records up front for faster O(1)
+        # lookup later. Querying per data ID in the loop is painfully slow.
+        if butler:
+            exposure_record_lookup = {
+                d.dataId["exposure"]: d for d in butler.query_dimension_records("exposure", explain=False)
+            }
+        else:
+            exposure_record_lookup = {}
+            add_exposure_records = False
+
+        if butler and not exposure_record_lookup:
+            _LOG.warning("No exposure records found in the butler; they will not be included in the table.")
+            add_exposure_records = False
+
+        rows: defaultdict[tuple, defaultdict[str, str]] = defaultdict(lambda: defaultdict(str))
+
+        # Loop over all tasks and exceptions, and associate them with data IDs.
+        for task_label, task_summary in self.tasks.items():
+            for type_name, exceptions in task_summary.exceptions.items():
+                for exception in exceptions:
+                    data_id = exception.data_id
+                    key = tuple(sorted(data_id.items()))  # Hashable and stable
+                    assert len(rows[key]) == 0, f"Multiple exceptions for one data ID: {key}"
+                    assert rows[key]["Exception"] == "", f"Duplicate entry for data ID {key} in {task_label}"
+                    if shorten_type_name:
+                        # Trim off the package name from the exception type for
+                        # brevity.
+                        type_name = type_name.rsplit(".", maxsplit=1)[-1]
+                    rows[key]["Task"] = task_label
+                    rows[key]["Exception"] = type_name
+                    if add_exception_msg:
+                        msg = exception.exception.message
+                        if max_message_width and len(msg) > max_message_width:
+                            msg = textwrap.shorten(msg, max_message_width)
+                        rows[key]["Exception Message"] = msg
+                    if add_exposure_records:
+                        exposure_record = exposure_record_lookup[data_id["exposure"]]
+                        for k in needed_exposure_records:
+                            rows[key][k] = getattr(exposure_record, k)
+
+        # Extract all unique columns.
+        all_columns = {col for r in rows.values() for col in r}
+        table_rows = []
+
+        # Loop over all rows and add them to the table.
+        for key, col_counts in rows.items():
+            # Add data ID values as columns at the start of the row.
+            row = dict(key)
+            # Add exposure records next, if requested.
+            if add_exposure_records:
+                for col in needed_exposure_records:
+                    row[col] = col_counts.get(col, "-")
+            # Add all other columns last.
+            for col in all_columns - set(needed_exposure_records) - {"Exception Message"}:
+                row[col] = col_counts.get(col, "-")
+            # Add the exception message if requested.
+            if add_exception_msg:
+                row["Exception Message"] = col_counts.get("Exception Message", "-")
+            table_rows.append(row)
+
+        return astropy.table.Table(table_rows)
+
     def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]:
         """Construct an `astropy.table.Table` with per-data-ID information
         about failed, wonky, and partial-outputs-error quanta.
@@ -1295,7 +1450,7 @@ def to_summary(
         ----------
         butler : `lsst.daf.butler.Butler`, optional
             Ignored; accepted for backwards compatibility.
-        do_store_logs : `bool`
+        do_store_logs : `bool`, optional
             Store the logs in the summary dictionary.
         n_cores : `int`, optional

From 0c7c857ef3ed02286b6ff4c2522bd86721108887 Mon Sep 17 00:00:00 2001
From: Erfan Nourbakhsh
Date: Mon, 12 May 2025 16:14:10 -0400
Subject: [PATCH 2/3] Add release note for exception diagnostics

---
 doc/changes/DM-50550.feature.md | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 doc/changes/DM-50550.feature.md

diff --git a/doc/changes/DM-50550.feature.md b/doc/changes/DM-50550.feature.md
new file mode 100644
index 000000000..3e68ac182
--- /dev/null
+++ b/doc/changes/DM-50550.feature.md
@@ -0,0 +1 @@
+Add support for generating exception diagnostics with data ID grouping by combining provenance and dimension records.
\ No newline at end of file

From adb8ea2225c69be93e71ed786efd6da56eb968d9 Mon Sep 17 00:00:00 2001
From: Erfan Nourbakhsh
Date: Tue, 13 May 2025 16:15:49 -0400
Subject: [PATCH 3/3] Refactor dimension records logic

---
 .../pipe/base/quantum_provenance_graph.py | 183 ++++++++++++------
 1 file changed, 126 insertions(+), 57 deletions(-)

diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py
index c29230831..3f9f21e5a 100644
--- a/python/lsst/pipe/base/quantum_provenance_graph.py
+++ b/python/lsst/pipe/base/quantum_provenance_graph.py
@@ -73,6 +73,7 @@
     MissingDatasetTypeError,
     QuantumBackedButler,
 )
+from lsst.daf.butler.registry.queries import DimensionRecordQueryResults
 from lsst.resources import ResourcePathExpression
 from lsst.utils.logging import PeriodicLogger, getLogger

@@ -659,7 +660,7 @@ def _add_quantum_info(
             self.recovered_quanta.append(dict(info["data_id"].required))
         if final_quantum_run is not None and final_quantum_run.caveats:
             code = final_quantum_run.caveats.concise()
-            self.caveats.setdefault(code, []).append(dict(info["data_id"].required))
+            self.caveats.setdefault(code, []).append(dict(info["data_id"].mapping))
             if final_quantum_run.caveats & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR:
                 if final_quantum_run.exception is not None:
                     self.exceptions.setdefault(final_quantum_run.exception.type_name, []).append(
@@ -964,7 +965,8 @@ def pprint(
         datasets: bool = True,
         show_exception_diagnostics: bool = False,
         butler: Butler | None = None,
-    ) -> None:
+        return_exception_diagnostics_table: bool = False,
+    ) -> astropy.table.Table | None:
         """Print this summary to stdout, as a series of tables.

         Parameters
@@ -978,9 +980,21 @@ def pprint(
             includes a summary table of dataset counts for various status
             and (if ``brief`` is `True`) a table with per-data ID information
             for each unsuccessful or cursed dataset.
+        show_exception_diagnostics : `bool`, optional
+            If `True`, include a table of exception diagnostics in the output.
         butler : `lsst.daf.butler.Butler`, optional
             The butler used to create this summary. This is only used to get
             exposure dimension records for the exception diagnostics.
+        return_exception_diagnostics_table : `bool`, optional
+            If `True`, return the exception diagnostics table in addition to
+            printing it. Only supported if ``show_exception_diagnostics`` is
+            `True`.
+
+        Returns
+        -------
+        exception_diagnostics_table : `astropy.table.Table` or `None`
+            A table of exception diagnostics, if requested and available.
+            Otherwise, `None`.
         """
         self.make_quantum_table().pprint_all()
         print("")
@@ -991,24 +1005,47 @@ def pprint(
         if exception_table := self.make_exception_table():
             exception_table.pprint_all()
             print("")
+        exception_diagnostics_table = None
         if show_exception_diagnostics:
-            exception_diagnostics_table = self.make_exception_diagnostics_table(
-                butler, max_message_width=45, shorten_type_name=True
+            if return_exception_diagnostics_table:
+                # Keep an original copy of the table to be returned.
+                exception_diagnostics_table = self.make_exception_diagnostics_table(butler)
+                exception_diagnostics_table_view = exception_diagnostics_table.copy()
+            else:
+                exception_diagnostics_table_view = self.make_exception_diagnostics_table(butler)
+            if exception_diagnostics_table_view:
+                # Shorten the exception type name by trimming the module name.
+                exception_diagnostics_table_view["Exception"] = [
+                    val.rsplit(".", maxsplit=1)[-1] if val is not None else val
+                    for val in exception_diagnostics_table_view["Exception"]
+                ]
+                # Shorten the exception message to a maximum width.
+                max_message_width = 45
+                exception_diagnostics_table_view["Exception Message"] = [
+                    textwrap.shorten(msg, width=max_message_width, placeholder="...")
+                    if msg and isinstance(msg, str) and len(msg) > max_message_width
+                    else msg
+                    for msg in exception_diagnostics_table_view["Exception Message"]
+                ]
+                with self.tty_buffer() as buffer:
+                    # Use pprint() to trim long tables; pprint_all() may flood
+                    # the screen in those cases.
+                    exception_diagnostics_table_view.pprint()
+                last_line = buffer.getvalue().splitlines()[-1]
+                # Print the table from the buffer.
+                print(buffer.getvalue())
+                if "Length =" in last_line:
+                    # The table was too long to print, so we had to truncate it.
+                    print(
+                        "▲ Note: The exception diagnostics table above is truncated. "
+                        "Use --exception-diagnostics-filename to save the complete table."
+                    )
+                print("")
+        elif return_exception_diagnostics_table:
+            raise ValueError(
+                "The exception diagnostics table was requested to be returned, "
+                "but `show_exception_diagnostics` is False."
+            )
-            )
-            with self.tty_buffer() as buffer:
-                # Use pprint() to trim long tables; pprint_all() may flood the
-                # screen in those cases.
-                exception_diagnostics_table.pprint()
-            last_line = buffer.getvalue().splitlines()[-1]
-            # Print the table from the buffer.
-            print(buffer.getvalue())
-            if "Length =" in last_line:
-                # The table was too long to print, so we had to truncate it.
-                print(
-                    "▲ Note: The exception diagnostics table above is truncated. "
-                    "Use --exception-diagnostics-filename to save the complete table."
-                )
-            print("")
         if datasets:
             self.make_dataset_table().pprint_all()
             print("")
@@ -1022,6 +1059,7 @@ def pprint(
             print(f"{dataset_type_name} errors:")
             bad_dataset_table.pprint_all()
             print("")
+        return exception_diagnostics_table

     def make_quantum_table(self) -> astropy.table.Table:
         """Construct an `astropy.table.Table` with a tabular summary of the
@@ -1129,6 +1167,7 @@
     def make_exception_diagnostics_table(
         self,
         butler: Butler | None = None,
+        add_dimension_records: bool = True,
         add_exception_msg: bool = True,
         max_message_width: int | None = None,
         shorten_type_name: bool = False,
@@ -1145,8 +1184,10 @@
         Parameters
         ----------
         butler : `lsst.daf.butler.Butler`, optional
-            Butler instance used to fetch exposure records. If not provided,
-            exposure dimension records will not be included in the table.
+            Butler instance used to fetch dimension records.
+        add_dimension_records : `bool`, optional
+            If `True`, include visit and exposure dimension records in the
+            table, if available. This requires ``butler`` to be provided.
         add_exception_msg : `bool`, optional
             If `True`, include the exception message in the table.
         max_message_width : `int`, optional
@@ -1165,31 +1206,32 @@
             messages.
         """
-        add_exposure_records = True
-        needed_exposure_records = ["day_obs", "physical_filter", "exposure_time", "target_name"]
-
-        # Preload all exposure dimension records up front for faster O(1)
-        # lookup later. Querying per data ID in the loop is painfully slow.
-        if butler:
-            exposure_record_lookup = {
-                d.dataId["exposure"]: d for d in butler.query_dimension_records("exposure", explain=False)
-            }
-        else:
-            exposure_record_lookup = {}
-            add_exposure_records = False
+        if add_dimension_records and butler is None:
+            raise ValueError("Butler is required to fetch dimension records.")

-        if butler and not exposure_record_lookup:
-            _LOG.warning("No exposure records found in the butler; they will not be included in the table.")
-            add_exposure_records = False
+        # The additional columns for visit and exposure records to add to the
+        # output table, if available. Note that 'band', 'day_obs', and
+        # 'physical_filter' already exist in `exception.data_id` below.
+        needed_visit_records = ["exposure_time", "target_name"]
+        needed_exposure_records = ["exposure_time", "target_name"]

         rows: defaultdict[tuple, defaultdict[str, str]] = defaultdict(lambda: defaultdict(str))
+        exposure_data_ids: list[dict] = []
+        visit_data_ids: list[dict] = []
+        dimension_record_lookup: dict[str, DimensionRecordQueryResults] = {}

         # Loop over all tasks and exceptions, and associate them with data IDs.
         for task_label, task_summary in self.tasks.items():
             for type_name, exceptions in task_summary.exceptions.items():
                 for exception in exceptions:
-                    data_id = exception.data_id
-                    key = tuple(sorted(data_id.items()))  # Hashable and stable
+                    data_id = DataCoordinate.standardize(exception.data_id, universe=butler.dimensions)
+                    if add_dimension_records:
+                        if "visit" in data_id:
+                            visit_data_ids.append(data_id)
+                        elif "exposure" in data_id:
+                            exposure_data_ids.append(data_id)
+                    # Define a hashable and stable tuple of data ID values.
+                    key = tuple(sorted(data_id.mapping.items()))
                     assert len(rows[key]) == 0, f"Multiple exceptions for one data ID: {key}"
                     assert rows[key]["Exception"] == "", f"Duplicate entry for data ID {key} in {task_label}"
                     if shorten_type_name:
@@ -1203,32 +1245,59 @@
                         rows[key]["Exception Message"] = msg
-                    if add_exposure_records:
-                        exposure_record = exposure_record_lookup[data_id["exposure"]]
-                        for k in needed_exposure_records:
-                            rows[key][k] = getattr(exposure_record, k)
-
-        # Extract all unique columns.
-        all_columns = {col for r in rows.values() for col in r}
-        table_rows = []
+        if add_dimension_records and (visit_data_ids or exposure_data_ids):
+            # Preload all the dimension records up front for faster O(1) lookup
+            # later. Querying per data ID in the loop is painfully slow. These
+            # data IDs are limited to the ones that have exceptions.
+            with butler.query() as query:
+                query = query.join_data_coordinates(visit_data_ids + exposure_data_ids)
+                for element in ["visit", "exposure"]:
+                    dimension_record_lookup |= {
+                        f"{element}:{d.dataId[element]}": d for d in query.dimension_records(element)
+                    }
+
+            # Loop over the data IDs and fill in the dimension records.
+            for element, data_ids, needed_records in zip(
+                ["visit", "exposure"],
+                [visit_data_ids, exposure_data_ids],
+                [needed_visit_records, needed_exposure_records],
+            ):
+                for data_id in data_ids:
+                    key = tuple(sorted(data_id.mapping.items()))
+                    for k in needed_records:
+                        rows[key][k] = getattr(
+                            dimension_record_lookup[f"{element}:{data_id[element]}"], k, None
+                        )
+
+        # Extract all unique data ID keys from the rows for the table header.
+        all_key_columns = {k for key in rows for k, _ in key}

         # Loop over all rows and add them to the table.
-        for key, col_counts in rows.items():
-            # Add data ID values as columns at the start of the row.
-            row = dict(key)
-            # Add exposure records next, if requested.
-            if add_exposure_records:
-                for col in needed_exposure_records:
-                    row[col] = col_counts.get(col, "-")
-            # Add all other columns last.
-            for col in all_columns - set(needed_exposure_records) - {"Exception Message"}:
-                row[col] = col_counts.get(col, "-")
-            # Add the exception message if requested.
+        table_rows = []
+        for key, values in rows.items():
+            # Create a new row with all key columns initialized to None,
+            # allowing missing values to be properly masked when `masked=True`.
+            row = {col: None for col in all_key_columns}
+            # Fill in data ID fields from the key.
+            row.update(dict(key))
+            # Add dimension records next, if requested and available.
+            if add_dimension_records:
+                if visit_data_ids:
+                    for col in needed_visit_records:
+                        row[col] = values.get(col, None)
+                if exposure_data_ids:
+                    for col in needed_exposure_records:
+                        row[col] = values.get(col, None)
+            # Add task label and exception type.
+            for col in ("Task", "Exception"):
+                row[col] = values.get(col, None)
+            # Add the exception message, if requested.
             if add_exception_msg:
-                row["Exception Message"] = col_counts.get("Exception Message", "-")
+                row["Exception Message"] = values.get("Exception Message", None)
             table_rows.append(row)

-        return astropy.table.Table(table_rows)
+        return astropy.table.Table(table_rows, masked=True)

     def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]:
         """Construct an `astropy.table.Table` with per-data-ID information
         about failed, wonky, and partial-outputs-error quanta.
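
Note on the TTY-capture pattern used by tty_buffer() and pprint() above:
astropy's Table.pprint() sizes its output to the terminal when stdout reports
itself as a TTY, and appends a "Length = N rows" footer whenever rows were
omitted, which is what the "Length =" check in the patches relies on. Below is
a minimal standalone sketch of the same pattern, assuming only astropy; the
MockTTY class name and the 500-row toy table are illustrative, not part of the
patches.

import io
import sys
from contextlib import contextmanager

from astropy.table import Table


@contextmanager
def tty_buffer():
    """Temporarily redirect sys.stdout to a buffer that claims to be a TTY."""

    class MockTTY(io.StringIO):
        def isatty(self) -> bool:
            return True  # Make callers format output as if on a terminal.

    orig = sys.stdout
    sys.stdout = buf = MockTTY()
    try:
        yield buf
    finally:
        sys.stdout = orig  # Always restore the real stdout.


table = Table({"x": range(500)})  # Toy table longer than one screen.
with tty_buffer() as buf:
    table.pprint()  # Trims rows to the (mock) terminal height.
print(buf.getvalue())
# Astropy appends a "Length = N rows" footer when rows were omitted.
if "Length =" in buf.getvalue().splitlines()[-1]:
    print("Table was truncated; write it to a file for the full contents.")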
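
And a hedged end-to-end sketch of the API added by these patches: the
repository path and output filename are placeholders, and `summary` is assumed
to be a Summary built elsewhere (e.g. by QuantumProvenanceGraph.to_summary).
Writing the returned table to ECSV preserves the complete table that the
on-screen view may have truncated.

from lsst.daf.butler import Butler

butler = Butler("repo")  # Placeholder repo path.
diagnostics = summary.pprint(
    show_exception_diagnostics=True,
    butler=butler,
    return_exception_diagnostics_table=True,
)
if diagnostics is not None:
    # Save the full, untruncated table for offline inspection.
    diagnostics.write("exception_diagnostics.ecsv", overwrite=True)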