Coverage for python / lsst / analysis / tools / tasks / gatherResourceUsage.py: 19%
239 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22__all__ = (
23 "ConsolidateResourceUsageConfig",
24 "ConsolidateResourceUsageConnections",
25 "ConsolidateResourceUsageTask",
26 "GatherResourceUsageConfig",
27 "GatherResourceUsageConnections",
28 "GatherResourceUsageTask",
29 "ResourceUsageQuantumGraphBuilder",
30)
32import argparse
33import dataclasses
34import datetime
35import logging
36import re
37from collections.abc import Iterable, Sequence
38from typing import Any
40import numpy as np
41import pandas as pd
42from astropy.time import Time
44from lsst.daf.butler import Butler, DatasetRef, DatasetType
45from lsst.pex.config import Field, ListField
46from lsst.pipe.base import (
47 Instrument,
48 PipelineTask,
49 PipelineTaskConfig,
50 PipelineTaskConnections,
51 QuantumGraph,
52 Struct,
53)
54from lsst.pipe.base import connectionTypes as cT
55from lsst.pipe.base.pipeline_graph import PipelineGraph
56from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
57from lsst.pipe.base.quantum_graph_skeleton import DatasetKey, QuantumGraphSkeleton
59# It's not great to be importing a private symbol, but this is a temporary
60# workaround for the fact that prior to w.2022.10, the units for memory values
61# written in task metadata were platform-dependent. Once we no longer care
62# about older runs, this import and the code that uses it can be removed.
63from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER
65_LOG = logging.getLogger(__name__)
class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
    """Connection definitions for `ConsolidateResourceUsageTask`."""

    output_table = cT.Output(
        name="ResourceUsageSummary",
        storageClass="DataFrame",
        dimensions=(),
        doc="Consolidated table of resource usage statistics. One row per task label",
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        # Inputs are dynamic: one DataFrame connection per configured
        # resource-usage dataset type name.
        for input_name in self.config.input_names:
            connection = cT.Input(
                input_name,
                storageClass="DataFrame",
                dimensions=(),
                doc="Resource usage statistics for a task.",
            )
            setattr(self, input_name, connection)
            self.inputs.add(input_name)
class ConsolidateResourceUsageConfig(
    PipelineTaskConfig, pipelineConnections=ConsolidateResourceUsageConnections
):
    """Configuration definitions for `ConsolidateResourceUsageTask`."""

    # Each name listed here becomes a dynamic input connection on
    # ConsolidateResourceUsageConnections (one per-task resource usage table).
    input_names = ListField[str](
        doc="Input resource usage dataset type names",
        default=[],
    )
class ConsolidateResourceUsageTask(PipelineTask):
    """A `PipelineTask` that summarizes task resource usage into a single
    table with per-task rows.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions, and its quanta are generally built via a custom
    quantum-graph builder defined in the same module.
    """

    ConfigClass = ConsolidateResourceUsageConfig
    _DefaultName = "consolidateResourceUsage"

    def run(self, **kwargs: Any) -> Struct:
        """Consolidate per-task resource usage tables into one summary table.

        Parameters
        ----------
        **kwargs
            Input tables keyed by connection name.  Only those whose names
            end with ``resource_usage`` are processed; each is expected to be
            a `pandas.DataFrame` with ``memory``, ``init_time``, ``run_time``,
            and ``wall_time`` columns.

        Returns
        -------
        result : `Struct`
            Structure with a single ``output_table`` element: a
            `pandas.DataFrame` with one row per task, containing per-task
            quantum counts, integrated runtimes, and percentile columns for
            memory, runtime, and wall time.
        """
        quantiles = []
        for input_name, ru_table in kwargs.items():
            # Skip any connection that is not a per-task resource usage table.
            if not input_name.endswith("resource_usage"):
                continue
            # Compute the fixed set of percentiles for all numeric columns;
            # reset_index turns the quantile levels into an "index" column.
            df = ru_table.quantile(
                [0.0, 0.01, 0.05, 0.32, 0.50, 0.68, 0.95, 0.99, 1.0],
                numeric_only=True,
            ).reset_index()
            df["task"] = input_name.replace("_resource_usage", "")
            df["quanta"] = len(ru_table)
            df["integrated_runtime"] = ru_table["run_time"].sum()
            quantiles.append(
                df[
                    [
                        "index",
                        "quanta",
                        "task",
                        "memory",
                        "init_time",
                        "run_time",
                        "wall_time",
                        "integrated_runtime",
                    ]
                ]
            )
        full_quantiles = pd.concat(quantiles)
        full_quantiles["percentile"] = (full_quantiles["index"] * 100).astype(int)
        full_quantiles["percentile_name"] = "p" + full_quantiles["percentile"].astype(str).str.zfill(3)
        full_quantiles["memoryGB"] = full_quantiles["memory"] / 1024 / 1024 / 1024
        full_quantiles["integrated_runtime_hrs"] = full_quantiles["integrated_runtime"] / 3600.0
        # Pivot each statistic into one column per percentile, one row per
        # task, with a prefix identifying the statistic and its units.
        memoryGB = pd.pivot_table(
            full_quantiles, values="memoryGB", columns=["percentile_name"], index=["task"]
        ).add_prefix("mem_GB_")
        runtime = pd.pivot_table(
            full_quantiles, values="run_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("runtime_s_")
        walltime = pd.pivot_table(
            full_quantiles, values="wall_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("walltime_s_")
        memrun = pd.merge(
            pd.merge(
                memoryGB.reset_index(),
                runtime.reset_index(),
                left_on="task",
                right_on="task",
            ),
            walltime.reset_index(),
            left_on="task",
            right_on="task",
        )
        # Attach the per-task scalars (quantum count, integrated runtime),
        # which are constant across percentiles and hence deduplicated.
        memrun = pd.merge(
            full_quantiles[["task", "quanta", "integrated_runtime_hrs"]]
            .drop_duplicates()
            .sort_values("task"),
            memrun,
        )

        return Struct(output_table=memrun)
class GatherResourceUsageConnections(
    PipelineTaskConnections, dimensions=(), defaultTemplates={"input_task_label": "PLACEHOLDER"}
):
    """Connection definitions for `GatherResourceUsageTask`."""

    output_table = cT.Output(
        "{input_task_label}_resource_statistics",  # Should always be overridden.
        storageClass="DataFrame",
        dimensions=(),
        doc=(
            "Table that aggregates memory and CPU usage statistics from one "
            "or more tasks. "
            "This will have one row for each data ID, with columns for each "
            "task or method's memory usage and runtime."
        ),
    )
    input_metadata = cT.Input(
        "{input_task_label}_metadata",  # Should always be overridden.
        storageClass="TaskMetadata",
        dimensions=(),  # Actually set in __init__, according to configuration.
        doc="Metadata dataset for another task to gather resource usage from.",
        multiple=True,
        deferLoad=True,
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        # Both connection names are placeholders by default; refuse to run
        # unless configuration has overridden them.
        for connection_name in ("output_table", "input_metadata"):
            if "PLACEHOLDER" in getattr(self, connection_name).name:
                raise ValueError(f"Connection configuration for {connection_name} must be overridden.")
        # Swap the empty dimension set the input connection was declared with
        # for the dimensions the task was configured with.
        self.input_metadata = dataclasses.replace(
            self.input_metadata,
            dimensions=list(self.config.dimensions),
        )
class GatherResourceUsageConfig(PipelineTaskConfig, pipelineConnections=GatherResourceUsageConnections):
    """Configuration definitions for `GatherResourceUsageTask`."""

    # Quantum dimensions for the input metadata connection; these also become
    # the data-ID columns of the output table (see the connections class).
    dimensions = ListField[str](
        doc=(
            "The quantum dimensions for the input metadata connection, and "
            "the columns (after expansion to include implied dimensions) used "
            "to identify rows in the output table."
        ),
    )
    # Toggles for which statistics are extracted into output-table columns.
    memory = Field[bool](
        doc=(
            "Whether to extract peak memory usage (maximum resident set size) "
            "for this task. "
            "Note that memory usage cannot be further subdivided because only "
            "a per-process peak is available (and hence if multiple quanta "
            "are run in one quantum, even per-quantum values may be "
            "misleading)."
        ),
        default=True,
    )
    prep_time = Field[bool](
        doc=(
            "Whether to extract the CPU time duration for the work the "
            "middleware does prior to initializing the task (mostly checking "
            "for input dataset existence)."
        ),
        default=False,
    )
    init_time = Field[bool](
        doc=("Whether to extract the CPU time duration for actually constructing the task."),
        default=True,
    )
    run_time = Field[bool](
        doc=("Whether to extract the CPU time duration for actually executing the task."),
        default=True,
    )
    wall_time = Field[bool](
        doc=("Whether to extract the wall_time duration for actually executing the task."),
        default=True,
    )
    # Dotted method paths produce one additional timing column each.
    method_times = ListField[str](
        doc=(
            "Names of @lsst.utils.timer.timeMethod-decorated methods for "
            "which CPU time durations should also be extracted. Use '.' "
            "separators to refer to subtask methods at arbitrary depth."
        ),
        optional=False,
        default=[],
    )
    input_task_label = Field[str](
        doc=(
            "Label for the top-level task whose metadata is being processed "
            "within its own metadata file, if this differs from the prefix of "
            "connections.input_metadata."
        ),
        default=None,
        optional=True,
    )
class GatherResourceUsageTask(PipelineTask):
    """A `PipelineTask` that gathers resource usage statistics from task
    metadata.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions.

    Its output table has columns for each of the dimensions of the input
    metadata's data ID, as well as (subject to configuration):

    - ``memory``: the maximum resident set size for the entire quantum
      (in bytes);
    - ``prep_time``: the time spent in the pre-initialization step in
      which the middleware checks which of the quantum's inputs are available;
    - ``init_time``: the time spent in task construction;
    - ``run_time``: the time spent executing the task's runQuantum
      method.
    - ``wall_time`` : elapsed time in the pre-initialization step, in task
      construction, and in executing the task's runQuantum method.
      Specifically, this is the difference between `prepUtc`, which triggers
      as soon as single quantum execution has begun (but can include some
      checks and running `updatedQuantumInputs`), and `endUtc`, which triggers
      immediately after `runQuantum`.
    - ``{method}``: the time spent in a particular task or subtask
      method decorated with `lsst.utils.timer.timeMethod`.

    All time durations are CPU times in seconds, and all columns are 64-bit
    floating point. Methods or steps that did not run are given a duration of
    zero.

    It is expected that this task will be configured to run multiple times in
    most pipelines, often once for each other task in the pipeline.
    """

    ConfigClass = GatherResourceUsageConfig
    _DefaultName = "gatherResourceUsage"

    def runQuantum(
        self,
        butlerQC,
        inputRefs,
        outputRefs,
    ):
        # Docstring inherited.
        # This override exists just so we can pass the butler registry's
        # DimensionUniverse to run in order to standardize the dimensions.
        inputs = butlerQC.get(inputRefs)
        outputs = self.run(butlerQC.dimensions, **inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, universe, input_metadata):
        """Gather resource usage statistics from per-quantum metadata.

        Parameters
        ----------
        universe : `DimensionUniverse`
            Object managing all dimensions recognized by the butler; used to
            standardize and expand `GatherResourceUsageConfig.dimensions`.
        input_metadata : `list` [ `DeferredDatasetHandle` ]
            List of `lsst.daf.butler.DeferredDatasetHandle` that can be used to
            load all input metadata datasets.

        Returns
        -------
        result : `Struct`
            Structure with a single element:

            - ``output_table``: a `pandas.DataFrame` that aggregates the
              configured resource usage statistics.
        """
        dimensions = universe.conform(self.config.dimensions)
        # Transform input list into a dict keyed by data ID.
        handles_by_data_id = {}
        for handle in input_metadata:
            handles_by_data_id[handle.dataId] = handle
        n_rows = len(handles_by_data_id)
        # Create a dict of empty column arrays that we'll ultimately make into
        # a table.
        columns = {
            d: np.zeros(n_rows, dtype=_dtype_from_field_spec(universe.dimensions[d].primaryKey))
            for d in dimensions.names
        }
        for attr_name in ("memory", "prep_time", "init_time", "run_time", "wall_time"):
            if getattr(self.config, attr_name):
                columns[attr_name] = np.zeros(n_rows, dtype=float)
        for method_name in self.config.method_times:
            columns[method_name] = np.zeros(n_rows, dtype=float)
        # Populate the table, one row at a time.
        warned_about_metadata_version = False
        for index, (data_id, handle) in enumerate(handles_by_data_id.items()):
            # Fill in the data ID columns.
            for k, v in data_id.mapping.items():
                columns[k][index] = v
            # Load the metadata dataset and fill in the columns derived from
            # it.
            metadata = handle.get()
            try:
                quantum_metadata = metadata["quantum"]
            except KeyError:
                # Warn but keep going: the row keeps its zero-filled defaults.
                self.log.warning(
                    "Metadata dataset %s @ %s has no 'quantum' key.",
                    handle.ref.datasetType.name,
                    handle.dataId,
                )
            else:
                if self.config.memory:
                    columns["memory"][index], warned_about_metadata_version = self._extract_memory(
                        quantum_metadata,
                        handle,
                        warned_about_metadata_version,
                    )
                for key, value in self._extract_quantum_timing(quantum_metadata).items():
                    columns[key][index] = value
                for key, value in self._extract_method_timing(metadata, handle).items():
                    columns[key][index] = value
        return Struct(output_table=pd.DataFrame(columns, copy=False))

    def _extract_memory(self, quantum_metadata, handle, warned_about_metadata_version):
        """Extract maximum memory usage from quantum metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to identify the
            metadata in diagnostic messages only.
        warned_about_metadata_version : `bool`
            Whether we have already emitted at least one warning about old
            metadata versions.

        Returns
        -------
        memory : `float`
            Maximum memory usage in bytes.
        warned_about_metadata_version : `bool`
            Whether we have now emitted at least one warning about old
            metadata versions.
        """
        # Attempt to work around memory units being
        # platform-dependent for metadata written prior to
        # w.2022.10.
        memory_multiplier = 1
        if quantum_metadata.get("__version__", 0) < 1:
            memory_multiplier = _RUSAGE_MEMORY_MULTIPLIER
            msg = (
                "Metadata dataset %s @ %s is too old; guessing memory units by "
                "assuming the platform has not changed"
            )
            # Warn loudly only once; subsequent occurrences go to DEBUG.
            if not warned_about_metadata_version:
                self.log.warning(msg, handle.ref.datasetType.name, handle.dataId)
                self.log.warning(
                    "Warnings about memory units for other inputs " "will be emitted only at DEBUG level."
                )
                warned_about_metadata_version = True
            else:
                self.log.debug(msg, handle.ref.datasetType.name, handle.dataId)
        return (
            quantum_metadata["endMaxResidentSetSize"] * memory_multiplier,
            warned_about_metadata_version,
        )

    def _extract_quantum_timing(self, quantum_metadata):
        """Extract timing for standard PipelineTask quantum-execution steps
        from metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all stages enabled in configuration.
        """
        end_time = quantum_metadata["endCpuTime"]
        # Timestamps absent from older metadata fall back to the end time.
        times = [
            quantum_metadata["prepCpuTime"],
            quantum_metadata.get("initCpuTime", end_time),
            quantum_metadata.get("startCpuTime", end_time),
            end_time,
        ]

        # Each stage's duration is the difference between consecutive
        # timestamps; only configured stages are kept.
        quantum_timing = {
            attr_name: end - begin
            for attr_name, begin, end in zip(
                ["prep_time", "init_time", "run_time"],
                times[:-1],
                times[1:],
            )
            if getattr(self.config, attr_name)
        }
        if self.config.wall_time:
            # Strip any "+..." UTC-offset suffix before parsing.
            start_wall_time = Time(quantum_metadata["prepUtc"].split("+")[0])
            end_wall_time = Time(quantum_metadata["endUtc"].split("+")[0])
            quantum_timing["wall_time"] = (end_wall_time - start_wall_time).sec

        return quantum_timing

    def _extract_method_timing(self, metadata, handle):
        """Extract timing for configured task and subtask methods from
        metadata.

        Parameters
        ----------
        metadata : `lsst.pipe.base.TaskMetadata`
            The full metadata for a PipelineTask's quantum.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to infer the prefix
            used for method names within the metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all methods enabled in configuration.
        """
        if self.config.input_task_label is not None:
            task_label = self.config.input_task_label
        else:
            # Derive the task label by stripping the "_metadata" suffix from
            # the dataset type name.
            task_label = handle.ref.datasetType.name[: -len("_metadata")]
        result = {}
        for method_name in self.config.method_times:
            terms = [task_label] + list(method_name.split("."))
            # Metadata keys join task/subtask labels with ':' and use '.'
            # before the final method name.
            metadata_method_name = ":".join(terms[:-1]) + "." + terms[-1]
            try:
                method_start_time = metadata[f"{metadata_method_name}StartCpuTime"]
                method_end_time = metadata[f"{metadata_method_name}EndCpuTime"]
            except KeyError:
                # A method missing from the metadata is not a problem;
                # it's reasonable for configuration or even runtime
                # logic to result in a method not being called. When
                # that happens, we just let the times stay zero.
                pass
            else:
                result[f"{task_label}.{method_name}"] = method_end_time - method_start_time
        return result
527def _dtype_from_field_spec(field_spec):
528 """Return the `np.dtype` that can be used to hold the values of a butler
529 dimension field.
531 Parameters
532 ----------
533 field_spec : `lsst.daf.butler.core.ddl.FieldSpec`
534 Object describing the field in a SQL-friendly sense.
536 Returns
537 -------
538 dtype : `np.dtype`
539 Numpy data type description.
540 """
541 python_type = field_spec.getPythonType()
542 if python_type is str:
543 return np.dtype((str, field_spec.length))
544 else:
545 return np.dtype(python_type)
class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
    """Custom quantum graph generator and pipeline builder for resource
    usage summary tasks.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler client to query for inputs and dataset types.
    dataset_type_names : `~collections.abc.Iterable` [ `str` ], optional
        Iterable of dataset type names or shell-style glob patterns for the
        metadata datasets to be used as input. Default is all datasets ending
        with ``_metadata`` (other than the resource-usage summary tasks' own
        metadata outputs, which are always ignored). A gather-resource task
        with a single quantum is created for each matching metadata dataset.
    where : `str`, optional
        Data ID expression that constrains the input metadata datasets.
    input_collections : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for inputs. If not provided,
        ``butler.collections`` is used and must not be empty.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. If not
        provided, ``butler.run`` is used and must not be `None`.
    skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for outputs, allowing quanta whose
        outputs exist to be skipped.
    clobber : `bool`, optional
        Whether *execution* of this quantum graph will permit clobbering. If
        `False` (default), existing outputs in ``output_run`` are an error
        unless ``skip_existing_in`` will cause those quanta to be skipped.

    Notes
    -----
    The resource usage summary tasks cannot easily be added to a regular
    pipeline, as it's much more natural to have the gather tasks run
    automatically on all *other* tasks. And we can generate a quantum graph
    for these particular tasks much more efficiently than the general-purpose
    algorithm could.
    """

    def __init__(
        self,
        butler: Butler,
        *,
        dataset_type_names: Iterable[str] | None = None,
        where: str = "",
        input_collections: Sequence[str] | None = None,
        output_run: str | None = None,
        skip_existing_in: Sequence[str] = (),
        clobber: bool = False,
    ):
        # Start by querying for metadata datasets, since we'll need to know
        # which dataset types exist in the input collections in order to
        # build the pipeline.
        input_dataset_types: Any
        if not dataset_type_names:
            input_dataset_types = "*_metadata"
        else:
            input_dataset_types = dataset_type_names
        pipeline_graph = PipelineGraph()
        metadata_refs: dict[str, set[DatasetRef]] = {}
        consolidate_config = ConsolidateResourceUsageConfig()
        for results in butler.registry.queryDatasets(
            input_dataset_types,
            where=where,
            findFirst=True,
            collections=input_collections,
        ).byParentDatasetType():
            input_metadata_dataset_type = results.parentDatasetType
            refs_for_type = set(results)
            if refs_for_type:
                # One gather task (and one single-quantum node, later) per
                # metadata dataset type that actually has datasets.
                gather_task_label, gather_dataset_type_name = self._add_gather_task(
                    pipeline_graph, input_metadata_dataset_type
                )
                if gather_task_label is None or gather_dataset_type_name is None:
                    continue
                metadata_refs[gather_task_label] = refs_for_type
                consolidate_config.input_names.append(gather_dataset_type_name)
        pipeline_graph.add_task(
            task_class=ConsolidateResourceUsageTask,
            config=consolidate_config,
            label=ConsolidateResourceUsageTask._DefaultName,
        )
        # Now that we have the pipeline graph, we can delegate to super.
        super().__init__(
            pipeline_graph,
            butler,
            input_collections=input_collections,
            output_run=output_run,
            skip_existing_in=skip_existing_in,
            clobber=clobber,
        )
        # We've already queried for all of our input datasets, so we don't want
        # to do that again in process_subgraph, even though that's where most
        # QG builders do their queries.
        self.gather_inputs: dict[str, list[DatasetKey]] = {}
        self.existing_inputs: list[DatasetRef] = []
        for gather_task_label, gather_input_refs in metadata_refs.items():
            gather_inputs_for_task: list[DatasetKey] = []
            for ref in gather_input_refs:
                dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
                self.existing_inputs.append(ref)
                gather_inputs_for_task.append(dataset_key)
            self.gather_inputs[gather_task_label] = gather_inputs_for_task

    @classmethod
    def _add_gather_task(
        cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
    ) -> tuple[str | None, str | None]:
        """Add a single configuration of `GatherResourceUsageTask` to a
        pipeline graph.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Pipeline graph to modify in-place.
        input_metadata_dataset_type : `lsst.daf.butler.DatasetType`
            Dataset type for the task's input dataset, which is the metadata
            output of the task whose resource usage information is being
            extracted.

        Returns
        -------
        gather_task_label : `str` or `None`
            Label of the new task in the pipeline, or `None` if the given
            dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        gather_dataset_type_name : `str` or `None`
            Name of the task's output table dataset type, or `None` if the
            given dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        """
        if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
            return None, None
        elif "gatherResourceUsage" in input_metadata_dataset_type.name:
            # Never gather resource usage from the gather tasks themselves.
            return None, None
        else:
            input_task_label = m.group(1)
            gather_task_label = f"{input_task_label}_gatherResourceUsage"
            gather_dataset_type_name = f"{input_task_label}_resource_usage"
            gather_config = GatherResourceUsageConfig()
            gather_config.dimensions = input_metadata_dataset_type.dimensions.names
            gather_config.connections.input_metadata = input_metadata_dataset_type.name
            gather_config.connections.output_table = gather_dataset_type_name
            pipeline_graph.add_task(
                label=gather_task_label,
                task_class=GatherResourceUsageTask,
                config=gather_config,
            )
            return gather_task_label, gather_dataset_type_name

    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        # Docstring inherited.
        skeleton = QuantumGraphSkeleton(subgraph.tasks.keys())
        # Register all previously-queried metadata inputs up front.
        for ref in self.existing_inputs:
            skeleton.add_dataset_node(ref.datasetType.name, ref.dataId)
            skeleton.set_dataset_ref(ref)
        consolidate_inputs = []
        for task_node in subgraph.tasks.values():
            if task_node.task_class is GatherResourceUsageTask:
                # Each gather task gets exactly one quantum fed by all of the
                # metadata datasets for its input dataset type.
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, self.gather_inputs[task_node.label])
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    gather_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, gather_output_key)
                    if write_edge.connection_name in task_node.outputs:
                        # Not a special output like metadata or log.
                        consolidate_inputs.append(gather_output_key)
            else:
                assert task_node.task_class is ConsolidateResourceUsageTask
                # The single consolidate quantum consumes every gather output.
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, consolidate_inputs)
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    consolidate_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, consolidate_output_key)
        # We don't need to do any follow-up searches for output datasets,
        # because the outputs all have empty dimensions and the base
        # QuantumGraphBuilder takes care of those.
        return skeleton

    @classmethod
    def make_argument_parser(cls) -> argparse.ArgumentParser:
        """Make the argument parser for the command-line interface."""
        parser = argparse.ArgumentParser(
            description=(
                "Build a QuantumGraph that gathers and consolidates "
                "resource usage tables from existing metadata datasets."
            ),
        )
        parser.add_argument("repo", type=str, help="Path to data repository or butler configuration.")
        parser.add_argument("filename", type=str, help="Output filename for QuantumGraph.")
        parser.add_argument(
            "collections",
            type=str,
            nargs="+",
            help="Collection(s) to search for input metadata.",
        )
        parser.add_argument(
            "--dataset-types",
            type=str,
            action="extend",
            # nargs is required with action="extend"; without it a single
            # string value would be extended character-by-character.
            nargs="+",
            help="Glob-style patterns for input metadata dataset types.",
        )
        parser.add_argument(
            "--where",
            type=str,
            default="",
            help="Data ID expression used when querying for input metadata datasets.",
        )
        parser.add_argument(
            "--output",
            type=str,
            help=(
                "Name of the output CHAINED collection. If this option is specified and "
                "--output-run is not, then a new RUN collection will be created by appending "
                "a timestamp to the value of this option."
            ),
            default=None,
            metavar="COLL",
        )
        parser.add_argument(
            "--output-run",
            type=str,
            help=(
                "Output RUN collection to write resulting datasets. If not provided "
                "then --output must be provided and a new RUN collection will be created "
                "by appending a timestamp to the value passed with --output."
            ),
            default=None,
            metavar="RUN",
        )
        return parser

    @classmethod
    def main(cls) -> None:
        """Run the command-line interface for this quantum-graph builder.

        This function provides the implementation for the
        ``build-gather-resource-usage-qg`` script.
        """
        parser = cls.make_argument_parser()
        args = parser.parse_args()
        # Figure out collection names
        if args.output_run is None:
            if args.output is None:
                raise ValueError("At least one of --output or --output-run options is required.")
            args.output_run = f"{args.output}/{Instrument.makeCollectionTimestamp()}"

        butler = Butler(args.repo, collections=args.collections)
        builder = cls(
            butler,
            dataset_type_names=args.dataset_types,
            where=args.where,
            input_collections=args.collections,
            output_run=args.output_run,
        )
        qg: QuantumGraph = builder.build(
            # Metadata includes a subset of attributes defined in CmdLineFwk.
            metadata={
                "input": args.collections,
                "butler_argument": args.repo,
                "output": args.output,
                "output_run": args.output_run,
                "data_query": args.where,
                "time": f"{datetime.datetime.now()}",
            }
        )
        qg.saveUri(args.filename)