-
Notifications
You must be signed in to change notification settings - Fork 12
DM-50550: Add exception diagnostics table to quantum provenance #495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add support for generating exception diagnostics with Data ID grouping by combining provenance and dimension records. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,12 +42,16 @@ | |
import concurrent.futures | ||
import dataclasses | ||
import datetime | ||
import io | ||
import itertools | ||
import logging | ||
import sys | ||
import textwrap | ||
import threading | ||
import uuid | ||
from collections import defaultdict | ||
from collections.abc import Callable, Iterator, Mapping, Sequence, Set | ||
from contextlib import contextmanager | ||
from enum import Enum | ||
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast | ||
|
||
|
@@ -69,6 +73,7 @@ | |
MissingDatasetTypeError, | ||
QuantumBackedButler, | ||
) | ||
from lsst.daf.butler.registry.queries import DimensionRecordQueryResults | ||
from lsst.resources import ResourcePathExpression | ||
from lsst.utils.logging import PeriodicLogger, getLogger | ||
|
||
|
@@ -655,7 +660,7 @@ | |
self.recovered_quanta.append(dict(info["data_id"].required)) | ||
if final_quantum_run is not None and final_quantum_run.caveats: | ||
code = final_quantum_run.caveats.concise() | ||
self.caveats.setdefault(code, []).append(dict(info["data_id"].required)) | ||
self.caveats.setdefault(code, []).append(dict(info["data_id"].mapping)) | ||
if final_quantum_run.caveats & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR: | ||
if final_quantum_run.exception is not None: | ||
self.exceptions.setdefault(final_quantum_run.exception.type_name, []).append( | ||
|
@@ -934,7 +939,34 @@ | |
result_dataset_summary._add_data_id_group(dataset_type_summary) | ||
return result | ||
|
||
def pprint(self, brief: bool = False, datasets: bool = True) -> None: | ||
@contextmanager | ||
def tty_buffer(self) -> Iterator[io.StringIO]: | ||
"""Context manager that temporarily redirects sys.stdout to a | ||
teletypewriter-like buffer. Useful for capturing output that formats | ||
differently when writing to a TTY. | ||
""" | ||
|
||
class MockTTY(io.StringIO): | ||
# Pretend to be a terminal to capture full TTY output. | ||
def isatty(self) -> bool: | ||
return True | ||
|
||
orig = sys.stdout | ||
buf = MockTTY() | ||
sys.stdout = buf | ||
try: | ||
yield buf # Use buffer inside `with` block. | ||
finally: | ||
sys.stdout = orig # Restore original stdout. | ||
|
||
def pprint( | ||
self, | ||
brief: bool = False, | ||
datasets: bool = True, | ||
show_exception_diagnostics: bool = False, | ||
butler: Butler | None = None, | ||
return_exception_diagnostics_table: bool = False, | ||
) -> astropy.table.Table | None: | ||
"""Print this summary to stdout, as a series of tables. | ||
|
||
Parameters | ||
|
@@ -948,6 +980,21 @@ | |
includes a summary table of dataset counts for various status and | ||
(if ``brief`` is `True`) a table with per-data ID information for | ||
each unsuccessful or cursed dataset. | ||
show_exception_diagnostics : `bool`, optional | ||
If `True`, include a table of exception diagnostics in the output. | ||
butler : `lsst.daf.butler.Butler`, optional | ||
The butler used to create this summary. This is only used to get | ||
exposure dimension records for the exception diagnostics. | ||
return_exception_diagnostics_table : `bool`, optional | ||
If `True`, return the exception diagnostics table in addition to | ||
printing it. Only supported if ``show_exception_diagnostics`` is | ||
`True`. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think there's any advantage to having this option. If |
||
|
||
Returns | ||
------- | ||
exception_diagnostics_table : `astropy.table.Table` or `None` | ||
A table of exception diagnostics, if requested and available. | ||
Otherwise, `None`. | ||
""" | ||
self.make_quantum_table().pprint_all() | ||
print("") | ||
|
@@ -958,6 +1005,47 @@ | |
if exception_table := self.make_exception_table(): | ||
exception_table.pprint_all() | ||
print("") | ||
exception_diagnostics_table = None | ||
if show_exception_diagnostics: | ||
if return_exception_diagnostics_table: | ||
# Keep an original copy of the table to be returned. | ||
exception_diagnostics_table = self.make_exception_diagnostics_table(butler) | ||
exception_diagnostics_table_view = exception_diagnostics_table.copy() | ||
else: | ||
exception_diagnostics_table_view = self.make_exception_diagnostics_table(butler) | ||
if exception_diagnostics_table_view: | ||
# Shorten the exception type name by trimming the module name. | ||
exception_diagnostics_table_view["Exception"] = [ | ||
val.rsplit(".", maxsplit=1)[-1] if val is not None else val | ||
for val in exception_diagnostics_table_view["Exception"] | ||
] | ||
# Shorten the exception message to a maximum width. | ||
max_message_width = 45 | ||
exception_diagnostics_table_view["Exception Message"] = [ | ||
textwrap.shorten(msg, width=max_message_width, placeholder="...") | ||
if msg and isinstance(msg, str) and len(msg) > max_message_width | ||
else msg | ||
for msg in exception_diagnostics_table_view["Exception Message"] | ||
] | ||
with self.tty_buffer() as buffer: | ||
# Use pprint() to trim long tables; pprint_all() may flood | ||
# the screen in those cases. | ||
exception_diagnostics_table_view.pprint() | ||
last_line = buffer.getvalue().splitlines()[-1] | ||
# Print the table from the buffer. | ||
print(buffer.getvalue()) | ||
if "Length =" in last_line: | ||
# The table was too long to print, we had to truncate it. | ||
print( | ||
"▲ Note: The exception diagnostics table above is truncated. " | ||
"Use --exception-diagnostics-filename to save the complete table." | ||
) | ||
print("") | ||
elif return_exception_diagnostics_table: | ||
raise ValueError( | ||
"The exception diagnostics table was requested to be returned, " | ||
"but `show_exception_diagnostics` is False." | ||
) | ||
if datasets: | ||
self.make_dataset_table().pprint_all() | ||
print("") | ||
|
@@ -971,6 +1059,7 @@ | |
print(f"{dataset_type_name} errors:") | ||
bad_dataset_table.pprint_all() | ||
print("") | ||
return exception_diagnostics_table | ||
|
||
def make_quantum_table(self) -> astropy.table.Table: | ||
"""Construct an `astropy.table.Table` with a tabular summary of the | ||
|
@@ -1075,6 +1164,141 @@ | |
rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)}) | ||
return astropy.table.Table(rows) | ||
|
||
def make_exception_diagnostics_table( | ||
self, | ||
butler: Butler | None = None, | ||
add_dimension_records: bool = True, | ||
add_exception_msg: bool = True, | ||
max_message_width: int | None = None, | ||
shorten_type_name: bool = False, | ||
) -> astropy.table.Table: | ||
"""Construct an `astropy.table.Table` showing exceptions grouped by | ||
data ID. | ||
|
||
Each row represents one data ID that encountered an exception, along | ||
with the exception type under the column named after the task that | ||
raised it. If a Butler is provided, the table will also include a | ||
subset of exposure-related metadata pulled from the exposure dimension | ||
records. The exception message can optionally be included in the table. | ||
|
||
Parameters | ||
---------- | ||
butler : `lsst.daf.butler.Butler`, optional | ||
Butler instance used to fetch dimension records. | ||
add_dimension_records : `bool`, optional | ||
If `True`, include visit and exposure dimension records in the | ||
table, if available. This requires ``butler`` to be provided. | ||
add_exception_msg : `bool`, optional | ||
If `True`, include the exception message in the table. | ||
max_message_width : `int`, optional | ||
Maximum width for storing exception messages in the output table. | ||
Longer messages will be truncated. If not provided, messages will | ||
be included in full without truncation. | ||
shorten_type_name : `bool`, optional | ||
If `True`, shorten the exception type name by removing the | ||
package name. This is useful for making the table more readable | ||
when the package name is long or not relevant to the user. | ||
|
||
Returns | ||
------- | ||
table : `astropy.table.Table` | ||
Table with one row per data ID and columns for exception types (by | ||
task), and optionally, exposure dimension records and exception | ||
messages. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be clearer here to write "columns for exception type and, optionally, dimension records and exception message." |
||
""" | ||
if add_dimension_records and butler is None: | ||
raise ValueError("Butler is required to fetch dimension records.") | ||
|
||
# The additional columns for visit and exposure records to add to the | ||
# output table, if available. Note that 'band', 'day_obs', and | ||
# 'physical_filter' already exist in `exception.data_id` below. | ||
needed_visit_records = ["exposure_time", "target_name"] | ||
needed_exposure_records = ["exposure_time", "target_name"] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These sorts of strings should be passed in by the caller, not included in the code; I'm imagining something like an argument that's a list of extra quantities to put in the table. You might then be able to use: If you can initialize that with some dimensions (i.e. from the data IDs that have a particular exception), you can give it strings like "exposure_time" or "target_name" and it will figure out which dimension they belong to (of the ones it has been given, since those kinds of columns aren't always unique). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We modified the code so that here it calls an entry point that returns a dict of dimensions with list values of the metadata fields to be included. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @enourbakhsh I don't see the new code we worked on here. Did you forget to push it? It was a big change. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @timj by the time I had it polished, @cmsaunders was already in the middle of the first review round. I didn’t want to interfere, so I’m planning to push those changes in the second round. |
||
|
||
rows: defaultdict[tuple, defaultdict[str, str]] = defaultdict(lambda: defaultdict(str)) | ||
exposure_data_ids: list[dict] = [] | ||
visit_data_ids: list[dict] = [] | ||
dimension_record_lookup: dict[str, DimensionRecordQueryResults] = {} | ||
|
||
# Loop over all tasks and exceptions, and associate them with data IDs. | ||
for task_label, task_summary in self.tasks.items(): | ||
for type_name, exceptions in task_summary.exceptions.items(): | ||
for exception in exceptions: | ||
data_id = DataCoordinate.standardize(exception.data_id, universe=butler.dimensions) | ||
if add_dimension_records: | ||
if "visit" in data_id: | ||
visit_data_ids.append(data_id) | ||
elif "exposure" in data_id: | ||
exposure_data_ids.append(data_id) | ||
# Define a hashable and stable tuple of data ID values. | ||
key = tuple(sorted(data_id.mapping.items())) | ||
assert len(rows[key]) == 0, f"Multiple exceptions for one data ID: {key}" | ||
assert rows[key]["Exception"] == "", f"Duplicate entry for data ID {key} in {task_label}" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't understand how the previous line could pass and then this line fail. If |
||
if shorten_type_name: | ||
# Trim off the package name from the exception type for | ||
# brevity. | ||
type_name = type_name.rsplit(".", maxsplit=1)[-1] | ||
rows[key]["Task"] = task_label | ||
rows[key]["Exception"] = type_name | ||
if add_exception_msg: | ||
msg = exception.exception.message | ||
if max_message_width and len(msg) > max_message_width: | ||
msg = textwrap.shorten(msg, max_message_width) | ||
rows[key]["Exception Message"] = msg | ||
|
||
if add_dimension_records and (visit_data_ids or exposure_data_ids): | ||
# Preload all the dimension records up front for faster O(1) lookup | ||
# later. Querying per data ID in the loop is painfully slow. These | ||
# data IDs are limited to the ones that have exceptions. | ||
with butler.query() as query: | ||
query = query.join_data_coordinates(visit_data_ids + exposure_data_ids) | ||
for element in ["visit", "exposure"]: | ||
dimension_record_lookup |= { | ||
f"{element}:{d.dataId[element]}": d for d in query.dimension_records(element) | ||
} | ||
|
||
# Loop over the data IDs and fill in the dimension records. | ||
for element, data_ids, needed_records in zip( | ||
["visit", "exposure"], | ||
[visit_data_ids, exposure_data_ids], | ||
[needed_visit_records, needed_exposure_records], | ||
): | ||
for data_id in data_ids: | ||
key = tuple(sorted(data_id.mapping.items())) | ||
for k in needed_records: | ||
rows[key][k] = getattr( | ||
dimension_record_lookup[f"{element}:{data_id[element]}"], k, None | ||
) | ||
|
||
# Extract all unique data ID keys from the rows for the table header. | ||
all_key_columns = {k for key in rows for k, _ in key} | ||
|
||
# Loop over all rows and add them to the table. | ||
table_rows = [] | ||
for key, values in rows.items(): | ||
# Create a new row with all key columns initialized to None, | ||
# allowing missing values to be properly masked when `masked=True`. | ||
row = {col: None for col in all_key_columns} | ||
# Fill in data ID fields from the key. | ||
row.update(dict(key)) | ||
# Add dimension records next, if requested and available. | ||
if add_dimension_records: | ||
if visit_data_ids: | ||
for col in needed_visit_records: | ||
row[col] = values.get(col, None) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't you need to check whether the data_id for this row is in visit_data_ids (or in exposure_data_ids at line 1289)? |
||
if exposure_data_ids: | ||
for col in needed_exposure_records: | ||
row[col] = values.get(col, None) | ||
# Add task label and exception type. | ||
for col in ("Task", "Exception"): | ||
row[col] = values.get(col, None) | ||
# Add the exception message, if requested. | ||
if add_exception_msg: | ||
row["Exception Message"] = values.get("Exception Message", None) | ||
table_rows.append(row) | ||
|
||
return astropy.table.Table(table_rows, masked=True) | ||
|
||
def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]: | ||
"""Construct an `astropy.table.Table` with per-data-ID information | ||
about failed, wonky, and partial-outputs-error quanta. | ||
|
@@ -1295,7 +1519,7 @@ | |
---------- | ||
butler : `lsst.daf.butler.Butler`, optional | ||
Ignored; accepted for backwards compatibility. | ||
do_store_logs : `bool` | ||
do_store_logs : `bool`, optional | ||
Store the logs in the summary dictionary. | ||
n_cores : `int`, optional | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you need to change this?