Skip to content

Commit 3669f62

Browse files
committed
Add the ability to generate measurement bundles
Add infrastructure to enable creating MetricMeasurementBundles when producing the consolidated resources.
1 parent fa081d1 commit 3669f62

File tree

1 file changed

+166
-1
lines changed

1 file changed

+166
-1
lines changed

python/lsst/analysis/tools/tasks/gatherResourceUsage.py

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from collections.abc import Iterable, Sequence
3838
from typing import Any
3939

40+
import astropy.units as apu
4041
import numpy as np
4142
import pandas as pd
4243
from lsst.daf.butler import Butler, DatasetRef, DatasetType
@@ -54,6 +55,11 @@
5455
from lsst.pipe.base.pipeline_graph import PipelineGraph
5556
from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
5657
from lsst.pipe.base.quantum_graph_skeleton import DatasetKey, QuantumGraphSkeleton
58+
from lsst.verify import Measurement
59+
60+
from ..interfaces import MetricMeasurementBundle
61+
62+
from ..interfaces._task import _timestampValidator
5763

5864
# It's not great to be importing a private symbol, but this is a temporary
5965
# workaround for the fact that prior to w.2022.10, the units for memory values
@@ -64,6 +70,58 @@
6470
_LOG = logging.getLogger(__name__)
6571

6672

73+
def _resource_table_to_bundle(
    table: pd.DataFrame, dataset_identifier: str, reference_package: str, timestamp_version: str
) -> MetricMeasurementBundle:
    """Convert a resource usage table into a `MetricMeasurementBundle`.

    See `lsst.analysis.tools.interfaces.AnalysisPipelineTask` for more
    information on each of the following options.

    Parameters
    ----------
    table : `DataFrame`
        Resource usage in the form of a DataFrame. Must contain a ``task``
        column; every other column name must contain one of the known unit
        stubs (``quanta``, ``_hrs``, ``_GB``, ``_s``).
    dataset_identifier : `str`
        The name of the data processing to associate with this metric bundle.
    reference_package : `str`
        The reference package to use if the timestamp version is set to a
        package version.
    timestamp_version : `str`
        The type of timestamp to associate with the bundle.

    Returns
    -------
    bundle : `MetricMeasurementBundle`
        Bundle with one entry per task row; each entry is a list of
        `~lsst.verify.Measurement`, one per resource-usage column.

    Raises
    ------
    ValueError
        Raised if a unit cannot be determined from a column name.
    """
    bundle = MetricMeasurementBundle(
        dataset_identifier=dataset_identifier,
        reference_package=reference_package,
        timestamp_version=timestamp_version,
    )
    # Determine all the columns in the table; these will become measurements.
    column_keys = set(table.keys())
    # Discard the task column, as it plays the role of the AnalysisTool name
    # in the bundle.
    column_keys.remove("task")
    # Measurements need units; map a column-name stub to its unit type.
    unit_mapping = (
        ("quanta", apu.Unit("count")),
        ("_hrs", apu.Unit("hour")),
        ("_GB", apu.Unit("Gbyte")),
        ("_s", apu.Unit("s")),
    )
    # For each row, grab the task name and create a list of measurements.
    for _, row in table.iterrows():
        task_name = f"{row['task']}_memrun"
        task_data = []
        for key in column_keys:
            unit = None
            # Scan all stubs; if more than one matches, the last one wins
            # (preserves the original mapping order semantics).
            for stub, value in unit_mapping:
                if stub in key:
                    unit = value
            if unit is None:
                # Name the offending column, not just the task, so the
                # failure is actionable.
                raise ValueError(
                    f"Could not determine units for column {key!r} of task {row['task']}"
                )
            task_data.append(Measurement(key, row[key] * unit))
        bundle[task_name] = task_data
    return bundle
123+
124+
67125
class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
68126
"""Connection definitions for `ConsolidateResourceUsageTask`."""
69127

@@ -74,6 +132,16 @@ class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()
74132
doc="Consolidated table of resource usage statistics. One row per task label",
75133
)
76134

135+
output_metrics = cT.Output(
136+
name="ResourceUsageSummary_metrics",
137+
storageClass="MetricMeasurementBundle",
138+
dimensions=(),
139+
doc=(
140+
"MetricMeasurementBundle with the same information as the ResourceUsageSummary in the form "
141+
"required for Sasquatch dispatch"
142+
),
143+
)
144+
77145
def __init__(self, *, config):
78146
super().__init__(config=config)
79147
for name in self.config.input_names:
@@ -88,6 +156,8 @@ def __init__(self, *, config):
88156
),
89157
)
90158
self.inputs.add(name)
159+
if not self.config.do_make_metrics:
160+
self.outputs.remove("output_metrics")
91161

92162

93163
class ConsolidateResourceUsageConfig(
@@ -99,6 +169,25 @@ class ConsolidateResourceUsageConfig(
99169
doc="Input resource usage dataset type names",
100170
default=[],
101171
)
172+
do_make_metrics = Field[bool](doc="Make metric bundle in addition to DataFrame", default=False)
173+
dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True)
174+
reference_package = Field[str](
175+
doc="A package who's version, at the time of metric upload to a "
176+
"time series database, will be converted to a timestamp of when "
177+
"that version was produced",
178+
default="lsst_distrib",
179+
optional=True,
180+
)
181+
timestamp_version = Field[str](
182+
doc="Which time stamp should be used as the reference timestamp for a "
183+
"metric in a time series database, valid values are; "
184+
"reference_package_timestamp, run_timestamp, current_timestamp, "
185+
"dataset_timestamp and explicit_timestamp:datetime where datetime is "
186+
"given in the form %Y%m%dT%H%M%S%z",
187+
default="run_timestamp",
188+
check=_timestampValidator,
189+
optional=True,
190+
)
102191

103192

104193
class ConsolidateResourceUsageTask(PipelineTask):
@@ -113,6 +202,7 @@ class ConsolidateResourceUsageTask(PipelineTask):
113202
"""
114203

115204
ConfigClass = ConsolidateResourceUsageConfig
205+
config: ConsolidateResourceUsageConfig
116206
_DefaultName = "consolidateResourceUsage"
117207

118208
def run(self, **kwargs: Any) -> Struct:
@@ -166,8 +256,18 @@ def run(self, **kwargs: Any) -> Struct:
166256
.sort_values("task"),
167257
memrun,
168258
)
259+
results = Struct(output_table=memrun)
260+
261+
if self.config.do_make_metrics:
262+
bundle = _resource_table_to_bundle(
263+
memrun,
264+
self.config.dataset_identifier,
265+
self.config.reference_package,
266+
self.config.timestamp_version,
267+
)
268+
results.output_metrics = bundle
169269

170-
return Struct(output_table=memrun)
270+
return results
171271

172272

173273
class GatherResourceUsageConnections(
@@ -547,6 +647,18 @@ class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
547647
Whether *execution* of this quantum graph will permit clobbering. If
548648
`False` (default), existing outputs in ``output_run`` are an error
549649
unless ``skip_existing_in`` will cause those quanta to be skipped.
650+
make_metric : `bool`, optional
651+
Produce a metric measurement bundle when processing the output
652+
table.
653+
timestamp_version : `str`, optional
654+
The type of timestamp used when creating a `MetricMeasurementBundle`,
655+
see there for more details.
656+
dataset_identifier: `str`, optional
657+
A processing identifier that is associated with the processing of this
658+
data, for instance RC2 subset for the nightly reprocessings.
659+
reference_package : `str`, optional
660+
The package whose version is used as the reference timestamp when timestamp_version is set to
661+
reference_package_timestamp.
550662
551663
Notes
552664
-----
@@ -567,6 +679,10 @@ def __init__(
567679
output_run: str | None = None,
568680
skip_existing_in: Sequence[str] = (),
569681
clobber: bool = False,
682+
make_metric=False,
683+
timestamp_version=None,
684+
dataset_identifier=None,
685+
reference_package=None,
570686
):
571687
# Start by querying for metadata datasets, since we'll need to know
572688
# which dataset types exist in the input collections in order to
@@ -580,6 +696,11 @@ def __init__(
580696
pipeline_graph = PipelineGraph()
581697
metadata_refs: dict[str, set[DatasetRef]] = {}
582698
consolidate_config = ConsolidateResourceUsageConfig()
699+
if make_metric:
700+
consolidate_config.do_make_metrics = True
701+
consolidate_config.dataset_identifier = dataset_identifier
702+
consolidate_config.timestamp_version = timestamp_version
703+
consolidate_config.reference_package = reference_package
583704
for results in butler.registry.queryDatasets(
584705
input_dataset_types,
585706
where=where,
@@ -753,6 +874,37 @@ def make_argument_parser(cls) -> argparse.ArgumentParser:
753874
default=None,
754875
metavar="RUN",
755876
)
877+
parser.add_argument(
878+
"--make-metric",
879+
type=bool,
880+
help=(
881+
"Turn the output resource usage table into a metric measurement bundle format compatible "
882+
"with Sasquatch."
883+
),
884+
default=True,
885+
metavar="DO_MAKE_METRIC",
886+
)
887+
parser.add_argument(
888+
"--dataset-identifier",
889+
type=str,
890+
help="Set the dataset these results are associated with.",
891+
default=None,
892+
metavar="DATASET_IDENTIFIER",
893+
)
894+
parser.add_argument(
895+
"--reference-package",
896+
type=str,
897+
help="Reference package to use when selecting reference timestamp",
898+
default=None,
899+
metavar="DATASET_IDENTIFIER",
900+
)
901+
parser.add_argument(
902+
"--timestamp-version",
903+
type=str,
904+
help="Set the dataset these results are associated with.",
905+
default=None,
906+
metavar="DATASET_IDENTIFIER",
907+
)
756908
return parser
757909

758910
@classmethod
@@ -770,13 +922,26 @@ def main(cls) -> None:
770922
raise ValueError("At least one of --output or --output-run options is required.")
771923
args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp())
772924

925+
extra_args = {}
926+
if args.make_metric:
927+
if args.dataset_identifier is None or args.timestamp_version is None:
928+
raise ValueError(
929+
"If metrics are going to be created, --dataset-identifier and --timestamp-version "
930+
"must be specified."
931+
)
932+
extra_args["make_metric"] = True
933+
extra_args["timestamp_version"] = args.timestamp_version
934+
extra_args["dataset_identifier"] = args.dataset_identifier
935+
extra_args["reference_package"] = args.reference_package
936+
773937
butler = Butler(args.repo, collections=args.collections)
774938
builder = cls(
775939
butler,
776940
dataset_type_names=args.dataset_types,
777941
where=args.where,
778942
input_collections=args.collections,
779943
output_run=args.output_run,
944+
**extra_args,
780945
)
781946
qg: QuantumGraph = builder.build(
782947
# Metadata includes a subset of attributes defined in CmdLineFwk.

0 commit comments

Comments
 (0)