Coverage for python / lsst / analysis / tools / tasks / gatherResourceUsage.py: 19%
239 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Public API of this module: the gather/consolidate resource-usage tasks,
# their configs/connections, and the custom quantum-graph builder.
__all__ = (
    "ConsolidateResourceUsageConfig",
    "ConsolidateResourceUsageConnections",
    "ConsolidateResourceUsageTask",
    "GatherResourceUsageConfig",
    "GatherResourceUsageConnections",
    "GatherResourceUsageTask",
    "ResourceUsageQuantumGraphBuilder",
)
32import argparse
33import dataclasses
34import datetime
35import logging
36import re
37from collections.abc import Iterable, Sequence
38from typing import Any
40import numpy as np
41import pandas as pd
42from astropy.time import Time
43from lsst.daf.butler import Butler, DatasetRef, DatasetType
44from lsst.pex.config import Field, ListField
45from lsst.pipe.base import (
46 Instrument,
47 PipelineTask,
48 PipelineTaskConfig,
49 PipelineTaskConnections,
50 QuantumGraph,
51 Struct,
52)
53from lsst.pipe.base import connectionTypes as cT
54from lsst.pipe.base.pipeline_graph import PipelineGraph
55from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
56from lsst.pipe.base.quantum_graph_skeleton import DatasetKey, QuantumGraphSkeleton
58# It's not great to be importing a private symbol, but this is a temporary
59# workaround for the fact that prior to w.2022.10, the units for memory values
60# written in task metadata were platform-dependent. Once we no longer care
61# about older runs, this import and the code that uses it can be removed.
62from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER
64_LOG = logging.getLogger(__name__)
class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
    """Connection definitions for `ConsolidateResourceUsageTask`.

    The input connections are created dynamically in ``__init__`` from the
    dataset type names listed in the task configuration.
    """

    output_table = cT.Output(
        name="ResourceUsageSummary",
        storageClass="DataFrame",
        dimensions=(),
        doc="Consolidated table of resource usage statistics. One row per task label",
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        # One dimensionless DataFrame input per configured per-task
        # resource-usage dataset type; register each with the connection set.
        for input_name in self.config.input_names:
            connection = cT.Input(
                input_name,
                storageClass="DataFrame",
                dimensions=(),
                doc="Resource usage statistics for a task.",
            )
            setattr(self, input_name, connection)
            self.inputs.add(input_name)
class ConsolidateResourceUsageConfig(
    PipelineTaskConfig, pipelineConnections=ConsolidateResourceUsageConnections
):
    """Configuration definitions for `ConsolidateResourceUsageTask`."""

    # Dataset type names of the per-task ``*_resource_usage`` tables to
    # consolidate; normally populated by the custom quantum-graph builder
    # rather than by hand.
    input_names = ListField[str](
        doc="Input resource usage dataset type names",
        default=[],
    )
class ConsolidateResourceUsageTask(PipelineTask):
    """A `PipelineTask` that summarizes task resource usage into a single
    table with per-task rows.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions, and its quanta are generally built via a custom
    quantum-graph builder defined in the same module.
    """

    ConfigClass = ConsolidateResourceUsageConfig
    _DefaultName = "consolidateResourceUsage"

    def run(self, **kwargs: Any) -> Struct:
        # Collect one per-task quantile frame for every resource-usage input.
        per_task_frames = []
        for input_name, ru_table in kwargs.items():
            # Guard clause: ignore any connection that is not a per-task
            # resource-usage table.
            if not input_name.endswith("resource_usage"):
                continue
            stats = ru_table.quantile(
                [0.0, 0.01, 0.05, 0.32, 0.50, 0.68, 0.95, 0.99, 1.0],
                numeric_only=True,
            ).reset_index()
            stats["task"] = input_name.replace("_resource_usage", "")
            stats["quanta"] = len(ru_table)
            stats["integrated_runtime"] = ru_table["run_time"].sum()
            per_task_frames.append(
                stats[
                    [
                        "index",
                        "quanta",
                        "task",
                        "memory",
                        "init_time",
                        "run_time",
                        "wall_time",
                        "integrated_runtime",
                    ]
                ]
            )
        full_quantiles = pd.concat(per_task_frames)
        # Human-readable percentile labels (e.g. ``p050``) plus unit
        # conversions for the summary columns.
        full_quantiles["percentile"] = (full_quantiles["index"] * 100).astype(int)
        full_quantiles["percentile_name"] = "p" + full_quantiles["percentile"].astype(str).str.zfill(3)
        full_quantiles["memoryGB"] = full_quantiles["memory"] / 1024 / 1024 / 1024
        full_quantiles["integrated_runtime_hrs"] = full_quantiles["integrated_runtime"] / 3600.0
        # Pivot each metric so there is one row per task and one column per
        # percentile, with a metric-specific column prefix.
        memoryGB = pd.pivot_table(
            full_quantiles, values="memoryGB", columns=["percentile_name"], index=["task"]
        ).add_prefix("mem_GB_")
        runtime = pd.pivot_table(
            full_quantiles, values="run_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("runtime_s_")
        walltime = pd.pivot_table(
            full_quantiles, values="wall_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("walltime_s_")
        # Join the three pivoted metric tables on the task label, one merge
        # at a time.
        memrun = pd.merge(
            memoryGB.reset_index(),
            runtime.reset_index(),
            left_on="task",
            right_on="task",
        )
        memrun = pd.merge(
            memrun,
            walltime.reset_index(),
            left_on="task",
            right_on="task",
        )
        # Prepend the per-task quantum counts and integrated runtimes.
        memrun = pd.merge(
            full_quantiles[["task", "quanta", "integrated_runtime_hrs"]]
            .drop_duplicates()
            .sort_values("task"),
            memrun,
        )
        return Struct(output_table=memrun)
class GatherResourceUsageConnections(
    PipelineTaskConnections, dimensions=(), defaultTemplates={"input_task_label": "PLACEHOLDER"}
):
    """Connection definitions for `GatherResourceUsageTask`.

    Both connection names are templated on the label of the task whose
    metadata is being gathered, so the defaults must always be overridden.
    """

    output_table = cT.Output(
        "{input_task_label}_resource_statistics",  # Should always be overridden.
        storageClass="DataFrame",
        dimensions=(),
        doc=(
            "Table that aggregates memory and CPU usage statistics from one "
            "or more tasks. "
            "This will have one row for each data ID, with columns for each "
            "task or method's memory usage and runtime."
        ),
    )
    input_metadata = cT.Input(
        "{input_task_label}_metadata",  # Should always be overridden.
        storageClass="TaskMetadata",
        dimensions=(),  # Actually set in __init__, according to configuration.
        doc="Metadata dataset for another task to gather resource usage from.",
        multiple=True,
        deferLoad=True,
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        # A name still containing the template default means the connection
        # configuration was never customized; refuse to proceed.
        for connection_name in ("output_table", "input_metadata"):
            if "PLACEHOLDER" in getattr(self, connection_name).name:
                raise ValueError(f"Connection configuration for {connection_name} must be overridden.")
        # The connection was declared with empty dimensions; swap in the
        # dimensions the task was actually configured with.
        self.input_metadata = dataclasses.replace(
            self.input_metadata,
            dimensions=list(self.config.dimensions),
        )
class GatherResourceUsageConfig(PipelineTaskConfig, pipelineConnections=GatherResourceUsageConnections):
    """Configuration definitions for `GatherResourceUsageTask`."""

    # Quantum dimensions of the task whose metadata is gathered; these become
    # identifying columns in the output table.
    dimensions = ListField[str](
        doc=(
            "The quantum dimensions for the input metadata connection, and "
            "the columns (after expansion to include implied dimensions) used "
            "to identify rows in the output table."
        ),
    )
    memory = Field[bool](
        doc=(
            "Whether to extract peak memory usage (maximum resident set size) "
            "for this task. "
            "Note that memory usage cannot be further subdivided because only "
            "a per-process peak is available (and hence if multiple quanta "
            "are run in one quantum, even per-quantum values may be "
            "misleading)."
        ),
        default=True,
    )
    prep_time = Field[bool](
        doc=(
            "Whether to extract the CPU time duration for the work the "
            "middleware does prior to initializing the task (mostly checking "
            "for input dataset existence)."
        ),
        default=False,
    )
    init_time = Field[bool](
        doc=("Whether to extract the CPU time duration for actually constructing the task."),
        default=True,
    )
    run_time = Field[bool](
        doc=("Whether to extract the CPU time duration for actually executing the task."),
        default=True,
    )
    wall_time = Field[bool](
        doc=("Whether to extract the wall_time duration for actually executing the task."),
        default=True,
    )
    # Each entry becomes an additional float column in the output table.
    method_times = ListField[str](
        doc=(
            "Names of @lsst.utils.timer.timeMethod-decorated methods for "
            "which CPU time durations should also be extracted. Use '.' "
            "separators to refer to subtask methods at arbitrary depth."
        ),
        optional=False,
        default=[],
    )
    # Only needed when the metadata dataset type name prefix differs from the
    # label used inside the metadata itself.
    input_task_label = Field[str](
        doc=(
            "Label for the top-level task whose metadata is being processed "
            "within its own metadata file, if this differs from the prefix of "
            "connections.input_metadata."
        ),
        default=None,
        optional=True,
    )
class GatherResourceUsageTask(PipelineTask):
    """A `PipelineTask` that gathers resource usage statistics from task
    metadata.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions.

    Its output table has columns for each of the dimensions of the input
    metadata's data ID, as well as (subject to configuration):

    - ``memory``: the maximum resident set size for the entire quantum
      (in bytes);
    - ``prep_time``: the time spent in the pre-initialization step in
      which the middleware checks which of the quantum's inputs are available;
    - ``init_time``: the time spent in task construction;
    - ``run_time``: the time spent executing the task's runQuantum
      method.
    - ``wall_time`` : elapsed time in the pre-initialization step, in task
      construction, and in executing the task's runQuantum method.
      Specifically, this is the difference between `prepUtc`, which triggers
      as soon as single quantum execution has begun (but can include some
      checks and running `updatedQuantumInputs`), and `endUtc`, which triggers
      immediately after `runQuantum`.
    - ``{method}``: the time spent in a particular task or subtask
      method decorated with `lsst.utils.timer.timeMethod`.

    All time durations are CPU times in seconds, and all columns are 64-bit
    floating point. Methods or steps that did not run are given a duration of
    zero.

    It is expected that this task will be configured to run multiple times in
    most pipelines, often once for each other task in the pipeline.
    """

    ConfigClass = GatherResourceUsageConfig
    _DefaultName = "gatherResourceUsage"

    def runQuantum(
        self,
        butlerQC,
        inputRefs,
        outputRefs,
    ):
        # Docstring inherited.
        # This override exists just so we can pass the butler registry's
        # DimensionUniverse to run in order to standardize the dimensions.
        inputs = butlerQC.get(inputRefs)
        outputs = self.run(butlerQC.dimensions, **inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, universe, input_metadata):
        """Gather resource usage statistics from per-quantum metadata.

        Parameters
        ----------
        universe : `DimensionUniverse`
            Object managing all dimensions recognized by the butler; used to
            standardize and expand `GatherResourceUsageConfig.dimensions`.
        input_metadata : `list` [ `DeferredDatasetHandle` ]
            List of `lsst.daf.butler.DeferredDatasetHandle` that can be used to
            load all input metadata datasets.

        Returns
        -------
        result : `Struct`
            Structure with a single element:

            - ``output_table``: a `pandas.DataFrame` that aggregates the
              configured resource usage statistics.
        """
        dimensions = universe.conform(self.config.dimensions)
        # Transform input list into a dict keyed by data ID; this also
        # deduplicates handles that share a data ID (last one wins).
        handles_by_data_id = {}
        for handle in input_metadata:
            handles_by_data_id[handle.dataId] = handle
        n_rows = len(handles_by_data_id)
        # Create a dict of empty column arrays that we'll ultimately make into
        # a table.
        columns = {
            d: np.zeros(n_rows, dtype=_dtype_from_field_spec(universe.dimensions[d].primaryKey))
            for d in dimensions.names
        }
        for attr_name in ("memory", "prep_time", "init_time", "run_time", "wall_time"):
            if getattr(self.config, attr_name):
                columns[attr_name] = np.zeros(n_rows, dtype=float)
        # Method-timing columns are keyed by the configured method name;
        # _extract_method_timing must return the same keys.
        for method_name in self.config.method_times:
            columns[method_name] = np.zeros(n_rows, dtype=float)
        # Populate the table, one row at a time.
        warned_about_metadata_version = False
        for index, (data_id, handle) in enumerate(handles_by_data_id.items()):
            # Fill in the data ID columns.
            for k, v in data_id.mapping.items():
                columns[k][index] = v
            # Load the metadata dataset and fill in the columns derived from
            # it.
            metadata = handle.get()
            try:
                quantum_metadata = metadata["quantum"]
            except KeyError:
                self.log.warning(
                    "Metadata dataset %s @ %s has no 'quantum' key.",
                    handle.ref.datasetType.name,
                    handle.dataId,
                )
            else:
                if self.config.memory:
                    columns["memory"][index], warned_about_metadata_version = self._extract_memory(
                        quantum_metadata,
                        handle,
                        warned_about_metadata_version,
                    )
                for key, value in self._extract_quantum_timing(quantum_metadata).items():
                    columns[key][index] = value
                for key, value in self._extract_method_timing(metadata, handle).items():
                    columns[key][index] = value
        return Struct(output_table=pd.DataFrame(columns, copy=False))

    def _extract_memory(self, quantum_metadata, handle, warned_about_metadata_version):
        """Extract maximum memory usage from quantum metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to identify the
            metadata in diagnostic messages only.
        warned_about_metadata_version : `bool`
            Whether we have already emitted at least one warning about old
            metadata versions.

        Returns
        -------
        memory : `float`
            Maximum memory usage in bytes.
        warned_about_metadata_version : `bool`
            Whether we have now emitted at least one warning about old
            metadata versions.
        """
        # Attempt to work around memory units being
        # platform-dependent for metadata written prior to
        # w.2022.10.
        memory_multiplier = 1
        if quantum_metadata.get("__version__", 0) < 1:
            memory_multiplier = _RUSAGE_MEMORY_MULTIPLIER
            msg = (
                "Metadata dataset %s @ %s is too old; guessing memory units by "
                "assuming the platform has not changed"
            )
            # Warn loudly only once per task invocation; subsequent old
            # datasets are logged at DEBUG to avoid flooding the log.
            if not warned_about_metadata_version:
                self.log.warning(msg, handle.ref.datasetType.name, handle.dataId)
                self.log.warning(
                    "Warnings about memory units for other inputs " "will be emitted only at DEBUG level."
                )
                warned_about_metadata_version = True
            else:
                self.log.debug(msg, handle.ref.datasetType.name, handle.dataId)
        return (
            quantum_metadata["endMaxResidentSetSize"] * memory_multiplier,
            warned_about_metadata_version,
        )

    def _extract_quantum_timing(self, quantum_metadata):
        """Extract timing for standard PipelineTask quantum-execution steps
        from metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all stages enabled in configuration.
        """
        end_time = quantum_metadata["endCpuTime"]
        # Checkpoints in execution order; missing checkpoints fall back to
        # the end time so the corresponding stage gets a zero duration.
        times = [
            quantum_metadata["prepCpuTime"],
            quantum_metadata.get("initCpuTime", end_time),
            quantum_metadata.get("startCpuTime", end_time),
            end_time,
        ]

        # Each stage duration is the gap between consecutive checkpoints.
        quantum_timing = {
            attr_name: end - begin
            for attr_name, begin, end in zip(
                ["prep_time", "init_time", "run_time"],
                times[:-1],
                times[1:],
            )
            if getattr(self.config, attr_name)
        }
        if self.config.wall_time:
            # Strip any "+HH:MM" UTC offset suffix before parsing.
            start_wall_time = Time(quantum_metadata["prepUtc"].split("+")[0])
            end_wall_time = Time(quantum_metadata["endUtc"].split("+")[0])
            quantum_timing["wall_time"] = (end_wall_time - start_wall_time).sec

        return quantum_timing

    def _extract_method_timing(self, metadata, handle):
        """Extract timing for individual ``timeMethod``-decorated task
        methods from metadata.

        Parameters
        ----------
        metadata : `lsst.pipe.base.TaskMetadata`
            The full metadata dataset for a PipelineTask quantum.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to infer the prefix
            used for method names within the metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, keyed by the configured method name, for
            all methods enabled in configuration.
        """
        if self.config.input_task_label is not None:
            task_label = self.config.input_task_label
        else:
            task_label = handle.ref.datasetType.name[: -len("_metadata")]
        result = {}
        for method_name in self.config.method_times:
            # Metadata nests subtask scopes with ':' and the final method
            # with '.', e.g. "task:subtask.method".
            terms = [task_label] + list(method_name.split("."))
            metadata_method_name = ":".join(terms[:-1]) + "." + terms[-1]
            try:
                method_start_time = metadata[f"{metadata_method_name}StartCpuTime"]
                method_end_time = metadata[f"{metadata_method_name}EndCpuTime"]
            except KeyError:
                # A method missing from the metadata is not a problem;
                # it's reasonable for configuration or even runtime
                # logic to result in a method not being called. When
                # that happens, we just let the times stay zero.
                pass
            else:
                # Key by the configured method name so the result matches the
                # column arrays allocated in `run` (which are keyed by the
                # entries of ``config.method_times``); the previous
                # ``{task_label}.{method_name}`` key raised KeyError there.
                result[method_name] = method_end_time - method_start_time
        return result
526def _dtype_from_field_spec(field_spec):
527 """Return the `np.dtype` that can be used to hold the values of a butler
528 dimension field.
530 Parameters
531 ----------
532 field_spec : `lsst.daf.butler.core.ddl.FieldSpec`
533 Object describing the field in a SQL-friendly sense.
535 Returns
536 -------
537 dtype : `np.dtype`
538 Numpy data type description.
539 """
540 python_type = field_spec.getPythonType()
541 if python_type is str:
542 return np.dtype((str, field_spec.length))
543 else:
544 return np.dtype(python_type)
class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
    """Custom quantum graph generator and pipeline builder for resource
    usage summary tasks.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler client to query for inputs and dataset types.
    dataset_type_names : `~collections.abc.Iterable` [ `str` ], optional
        Iterable of dataset type names or shell-style glob patterns for the
        metadata datasets to be used as input. Default is all datasets ending
        with ``_metadata`` (other than the resource-usage summary tasks' own
        metadata outputs, which are always ignored). A gather-resource task
        with a single quantum is created for each matching metadata dataset.
    where : `str`, optional
        Data ID expression that constrains the input metadata datasets.
    input_collections : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for inputs. If not provided,
        ``butler.collections`` is used and must not be empty.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. If not
        provided, ``butler.run`` is used and must not be `None`.
    skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for outputs, allowing quanta whose
        outputs exist to be skipped.
    clobber : `bool`, optional
        Whether *execution* of this quantum graph will permit clobbering. If
        `False` (default), existing outputs in ``output_run`` are an error
        unless ``skip_existing_in`` will cause those quanta to be skipped.

    Notes
    -----
    The resource usage summary tasks cannot easily be added to a regular
    pipeline, as it's much more natural to have the gather tasks run
    automatically on all *other* tasks. And we can generate a quantum graph
    for these particular tasks much more efficiently than the general-purpose
    algorithm could.
    """

    def __init__(
        self,
        butler: Butler,
        *,
        dataset_type_names: Iterable[str] | None = None,
        where: str = "",
        input_collections: Sequence[str] | None = None,
        output_run: str | None = None,
        skip_existing_in: Sequence[str] = (),
        clobber: bool = False,
    ):
        # Start by querying for metadata datasets, since we'll need to know
        # which dataset types exist in the input collections in order to
        # build the pipeline.
        input_dataset_types: Any
        if not dataset_type_names:
            input_dataset_types = "*_metadata"
        else:
            input_dataset_types = dataset_type_names
        pipeline_graph = PipelineGraph()
        metadata_refs: dict[str, set[DatasetRef]] = {}
        consolidate_config = ConsolidateResourceUsageConfig()
        for results in butler.registry.queryDatasets(
            input_dataset_types,
            where=where,
            findFirst=True,
            collections=input_collections,
        ).byParentDatasetType():
            input_metadata_dataset_type = results.parentDatasetType
            refs_for_type = set(results)
            if refs_for_type:
                # One gather task (and one single quantum, later) per
                # metadata dataset type with at least one dataset.
                gather_task_label, gather_dataset_type_name = self._add_gather_task(
                    pipeline_graph, input_metadata_dataset_type
                )
                if gather_task_label is None or gather_dataset_type_name is None:
                    continue
                metadata_refs[gather_task_label] = refs_for_type
                consolidate_config.input_names.append(gather_dataset_type_name)
        # A single consolidate task fans in all of the gather outputs.
        pipeline_graph.add_task(
            task_class=ConsolidateResourceUsageTask,
            config=consolidate_config,
            label=ConsolidateResourceUsageTask._DefaultName,
        )
        # Now that we have the pipeline graph, we can delegate to super.
        super().__init__(
            pipeline_graph,
            butler,
            input_collections=input_collections,
            output_run=output_run,
            skip_existing_in=skip_existing_in,
            clobber=clobber,
        )
        # We've already queried for all of our input datasets, so we don't want
        # to do that again in process_subgraph, even though that's where most
        # QG builders do their queries.
        self.gather_inputs: dict[str, list[DatasetKey]] = {}
        self.existing_inputs: list[DatasetRef] = []
        for gather_task_label, gather_input_refs in metadata_refs.items():
            gather_inputs_for_task: list[DatasetKey] = []
            for ref in gather_input_refs:
                dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
                self.existing_inputs.append(ref)
                gather_inputs_for_task.append(dataset_key)
            self.gather_inputs[gather_task_label] = gather_inputs_for_task

    @classmethod
    def _add_gather_task(
        cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
    ) -> tuple[str | None, str | None]:
        """Add a single configuration of `GatherResourceUsageTask` to a
        pipeline graph.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Pipeline graph to modify in-place.
        input_metadata_dataset_type : `lsst.daf.butler.DatasetType`
            Dataset type for the task's input dataset, which is the metadata
            output of the task whose resource usage information is being
            extracted.

        Returns
        -------
        gather_task_label : `str` or `None`
            Label of the new task in the pipeline, or `None` if the given
            dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        gather_dataset_type_name : `str or `None`
            Name of the task's output table dataset type, or `None` if the
            given dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        """
        if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
            return None, None
        elif "gatherResourceUsage" in input_metadata_dataset_type.name:
            # Never gather resource usage from the gather tasks themselves;
            # that would recurse on every rerun.
            return None, None
        else:
            input_task_label = m.group(1)
            gather_task_label = f"{input_task_label}_gatherResourceUsage"
            gather_dataset_type_name = f"{input_task_label}_resource_usage"
            gather_config = GatherResourceUsageConfig()
            gather_config.dimensions = input_metadata_dataset_type.dimensions.names
            gather_config.connections.input_metadata = input_metadata_dataset_type.name
            gather_config.connections.output_table = gather_dataset_type_name
            pipeline_graph.add_task(
                label=gather_task_label,
                task_class=GatherResourceUsageTask,
                config=gather_config,
            )
            return gather_task_label, gather_dataset_type_name

    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        # Docstring inherited.
        skeleton = QuantumGraphSkeleton(subgraph.tasks.keys())
        # Register the metadata datasets we already found in __init__.
        for ref in self.existing_inputs:
            skeleton.add_dataset_node(ref.datasetType.name, ref.dataId)
            skeleton.set_dataset_ref(ref)
        consolidate_inputs = []
        for task_node in subgraph.tasks.values():
            if task_node.task_class is GatherResourceUsageTask:
                # One quantum per gather task, consuming all metadata datasets
                # recorded for that task's label.
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, self.gather_inputs[task_node.label])
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    gather_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, gather_output_key)
                    if write_edge.connection_name in task_node.outputs:
                        # Not a special output like metadata or log.
                        consolidate_inputs.append(gather_output_key)
            else:
                assert task_node.task_class is ConsolidateResourceUsageTask
                # The single consolidate quantum consumes every gather output
                # collected above.
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, consolidate_inputs)
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    consolidate_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, consolidate_output_key)
        # We don't need to do any follow-up searches for output datasets,
        # because the outputs all have empty dimensions and the base
        # QuantumGraphBuilder takes care of those.
        return skeleton

    @classmethod
    def make_argument_parser(cls) -> argparse.ArgumentParser:
        """Make the argument parser for the command-line interface."""
        parser = argparse.ArgumentParser(
            description=(
                "Build a QuantumGraph that gathers and consolidates "
                "resource usage tables from existing metadata datasets."
            ),
        )
        parser.add_argument("repo", type=str, help="Path to data repository or butler configuration.")
        parser.add_argument("filename", type=str, help="Output filename for QuantumGraph.")
        parser.add_argument(
            "collections",
            type=str,
            nargs="+",
            help="Collection(s) to search for input metadata.",
        )
        parser.add_argument(
            "--dataset-types",
            type=str,
            # nargs is required with action="extend": without it, argparse
            # extends the list with the individual *characters* of a single
            # string value.
            nargs="+",
            action="extend",
            help="Glob-style patterns for input metadata dataset types.",
        )
        parser.add_argument(
            "--where",
            type=str,
            default="",
            help="Data ID expression used when querying for input metadata datasets.",
        )
        parser.add_argument(
            "--output",
            type=str,
            help=(
                "Name of the output CHAINED collection. If this option is specified and "
                "--output-run is not, then a new RUN collection will be created by appending "
                "a timestamp to the value of this option."
            ),
            default=None,
            metavar="COLL",
        )
        parser.add_argument(
            "--output-run",
            type=str,
            help=(
                "Output RUN collection to write the resulting tables. If not provided "
                "then --output must be provided and a new RUN collection will be created "
                "by appending a timestamp to the value passed with --output."
            ),
            default=None,
            metavar="RUN",
        )
        return parser

    @classmethod
    def main(cls) -> None:
        """Run the command-line interface for this quantum-graph builder.

        This function provides the implementation for the
        ``build-gather-resource-usage-qg`` script.
        """
        parser = cls.make_argument_parser()
        args = parser.parse_args()
        # Figure out collection names
        if args.output_run is None:
            if args.output is None:
                raise ValueError("At least one of --output or --output-run options is required.")
            args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp())
        butler = Butler(args.repo, collections=args.collections)
        builder = cls(
            butler,
            dataset_type_names=args.dataset_types,
            where=args.where,
            input_collections=args.collections,
            output_run=args.output_run,
        )
        qg: QuantumGraph = builder.build(
            # Metadata includes a subset of attributes defined in CmdLineFwk.
            metadata={
                "input": args.collections,
                "butler_argument": args.repo,
                "output": args.output,
                "output_run": args.output_run,
                "data_query": args.where,
                "time": f"{datetime.datetime.now()}",
            }
        )
        qg.saveUri(args.filename)