diff --git a/doc/changes/DM-50244.misc.md b/doc/changes/DM-50244.misc.md new file mode 100644 index 000000000..e667865d6 --- /dev/null +++ b/doc/changes/DM-50244.misc.md @@ -0,0 +1 @@ +Make quantum graph generation more permissive about missing init-outputs in `--skip-existing-in` collections. diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index 072b9b813..786a87545 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -346,8 +346,16 @@ def build( # Loop over tasks. The pipeline graph must be topologically # sorted, so a quantum is only processed after any quantum that # provides its inputs has been processed. + doomed_tasks: list[str] = [] for task_node in self._pipeline_graph.tasks.values(): - self._resolve_task_quanta(task_node, full_skeleton) + doomed_tasks.extend(self._resolve_task_quanta(task_node, full_skeleton)) + for task_label in doomed_tasks: + if full_skeleton.has_task(task_label): + raise InitInputMissingError( + f"Task {task_label} requires an init-output dataset that was not present " + f"in {self.skip_existing_in} and cannot be produced by this graph. " + "(see warnings logged earlier for details)." + ) # Add global init-outputs to the skeleton. for dataset_type in self._global_init_output_types.values(): dataset_key = full_skeleton.add_dataset_node( @@ -427,7 +435,7 @@ class can add them later, albeit possibly less efficiently). @final @timeMethod - def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkeleton) -> None: + def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkeleton) -> list[str]: """Process the quanta for one task in a skeleton graph to skip those that have already completed and adjust those that request it. 
@@ -438,6 +446,14 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet skeleton : `quantum_graph_skeleton.QuantumGraphSkeleton` Preliminary quantum graph, to be modified in-place. + Returns + ------- + doomed_tasks : `list` [ `str` ] + Labels of tasks that consume an init-output that will not be + produced by this quantum graph, because the producing task is being + dropped due to skip-existing-in and the output was not present in + the skip-existing-in collection. + Notes ----- This method modifies ``skeleton`` in-place in several ways: @@ -507,6 +523,7 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet # raise if one of the check conditions is not met, which is the # intended behavior. helper = AdjustQuantumHelper(inputs=adjusted_inputs, outputs=adjusted_outputs) + quantum_data_id = skeleton[quantum_key]["data_id"] try: helper.adjust_in_place(task_node.get_connections(), task_node.label, quantum_data_id) except NoWorkFound as err: @@ -561,7 +578,7 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet for no_work_quantum in no_work_quanta: skeleton.remove_quantum_node(no_work_quantum, remove_outputs=True) remaining_quanta = skeleton.get_quanta(task_node.label) - self._resolve_task_init(task_node, skeleton, bool(skipped_quanta)) + doomed_tasks = self._resolve_task_init(task_node, skeleton, bool(skipped_quanta)) message_terms = [] if no_work_quanta: message_terms.append(f"{len(no_work_quanta)} had no work to do") @@ -582,6 +599,7 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet "Dropping task %s because no quanta remain%s.", task_node.label, message_parenthetical ) skeleton.remove_task(task_node.label) + return doomed_tasks def _skip_quantum_if_metadata_exists( self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton @@ -837,7 +855,7 @@ class and components defined by the task's own connections. 
@final def _resolve_task_init( self, task_node: TaskNode, skeleton: QuantumGraphSkeleton, has_skipped_quanta: bool - ) -> None: + ) -> list[str]: """Add init-input and init-output dataset nodes and edges for a task to the skeleton. @@ -850,9 +868,18 @@ def _resolve_task_init( has_skipped_quanta : `bool` Whether any of this task's quanta were skipped because they had already succeeded. + + Returns + ------- + doomed_tasks : `list` [ `str` ] + Labels of tasks that consume an init-output that will not be + produced by this quantum graph, because the producing task is being + dropped due to skip-existing-in and the output was not present in + the skip-existing-in collection. """ quanta = skeleton.get_quanta(task_node.label) task_init_key = TaskInitKey(task_node.label) + doomed_tasks: list[str] = [] if quanta: adapted_inputs: NamedKeyDict[DatasetType, DatasetRef] = NamedKeyDict() # Process init-inputs. @@ -910,11 +937,23 @@ def _resolve_task_init( write_edge.parent_dataset_type_name, self.empty_data_id ) if (ref := self.empty_dimensions_datasets.outputs_for_skip.get(dataset_key)) is None: - raise InitInputMissingError( - f"Init-output dataset {write_edge.parent_dataset_type_name!r} of skipped task " - f"{task_node.label!r} not found in skip-existing-in collection(s) " - f"{self.skip_existing_in}." - ) from None + # init-outputs really shouldn't be missing, but because + # they might be in important contexts (rapid analysis) we want + # to try really hard to just warn and proceed without them. + # First we see if any tasks are going to need those as + # inputs, and later once we've dropped all the tasks we're + # going to, we'll see if any remain that can't proceed. + consuming_tasks = self._pipeline_graph.consumers_of(write_edge.parent_dataset_type_name) + doomed_tasks.extend([n.label for n in consuming_tasks]) + self.log.warning( + "Init-output dataset %r of skipped task %r not found in skip-existing-in " + "collection(s) %s. 
Proceeding unless this is needed as an input by a retained task.", + write_edge.parent_dataset_type_name, + task_node.label, + self.skip_existing_in, + ) + skeleton.remove_dataset_nodes([dataset_key]) + continue skeleton.set_dataset_ref(ref, dataset_key) # If this dataset was "in the way" (i.e. already in the output # run), it isn't anymore. @@ -924,6 +963,7 @@ def _resolve_task_init( # dooms all downstream quanta to the same fate, so we don't bother # doing anything with the task's init-outputs, since nothing is # going to consume them. + return doomed_tasks @final @timeMethod