# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = (
    "ConsolidateResourceUsageConfig",
    "ConsolidateResourceUsageConnections",
    "ConsolidateResourceUsageTask",
    "GatherResourceUsageConfig",
    "GatherResourceUsageConnections",
    "GatherResourceUsageTask",
    "ResourceUsageQuantumGraphBuilder",
)

import argparse
import dataclasses
import datetime
import logging
import re
from collections.abc import Iterable, Sequence
from typing import Any

import numpy as np
import pandas as pd
from astropy.time import Time

from lsst.daf.butler import Butler, DatasetRef, DatasetType
from lsst.pex.config import Field, ListField
from lsst.pipe.base import (
    Instrument,
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    QuantumGraph,
    Struct,
)
from lsst.pipe.base import connectionTypes as cT
from lsst.pipe.base.pipeline_graph import PipelineGraph
from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
from lsst.pipe.base.quantum_graph_skeleton import DatasetKey, QuantumGraphSkeleton

# It's not great to be importing a private symbol, but this is a temporary
# workaround for the fact that prior to w.2022.10, the units for memory values
# written in task metadata were platform-dependent. Once we no longer care
# about older runs, this import and the code that uses it can be removed.
from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER

_LOG = logging.getLogger(__name__)


class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
    """Connection definitions for `ConsolidateResourceUsageTask`."""

    output_table = cT.Output(
        name="ResourceUsageSummary",
        storageClass="DataFrame",
        dimensions=(),
        doc="Consolidated table of resource usage statistics. One row per task label.",
    )

    def __init__(self, *, config):
        super().__init__(config=config)
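        # Connections are normally declared as class attributes, but here the
        # input tables are only known from configuration, so we create one
        # dynamic Input connection per configured dataset type name.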
        for name in self.config.input_names:
            setattr(
                self,
                name,
                cT.Input(
                    name,
                    storageClass="DataFrame",
                    dimensions=(),
                    doc="Resource usage statistics for a task.",
                ),
            )
            self.inputs.add(name)


class ConsolidateResourceUsageConfig(
    PipelineTaskConfig, pipelineConnections=ConsolidateResourceUsageConnections
):
    """Configuration definitions for `ConsolidateResourceUsageTask`."""

    input_names = ListField[str](
        doc="Input resource usage dataset type names",
        default=[],
    )


class ConsolidateResourceUsageTask(PipelineTask):
    """A `PipelineTask` that summarizes task resource usage into a single
    table with per-task rows.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions, and its quanta are generally built via a custom
    quantum-graph builder defined in the same module.
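
    Examples
    --------
    After a resource-usage pipeline has run, the consolidated summary can be
    fetched like any other dataset (the repository path and collection name
    below are placeholders, not part of this module)::

        from lsst.daf.butler import Butler

        butler = Butler("/repo/main", collections=["u/someone/resource-usage"])
        summary = butler.get("ResourceUsageSummary")
        # One row per task label, with quantile columns such as
        # mem_GB_p095 and runtime_s_p050.
        print(summary[["task", "quanta", "integrated_runtime_hrs"]])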
    """

    ConfigClass = ConsolidateResourceUsageConfig
    _DefaultName = "consolidateResourceUsage"

    def run(self, **kwargs: Any) -> Struct:
        quantiles = []
        for input_name, ru_table in kwargs.items():
            if not input_name.endswith("resource_usage"):
                continue
            df = ru_table.quantile(
                [0.0, 0.01, 0.05, 0.32, 0.50, 0.68, 0.95, 0.99, 1.0],
                numeric_only=True,
            ).reset_index()
            df["task"] = input_name.replace("_resource_usage", "")
            df["quanta"] = len(ru_table)
            df["integrated_runtime"] = ru_table["run_time"].sum()

            quantiles.append(
                df[
                    [
                        "index",
                        "quanta",
                        "task",
                        "memory",
                        "init_time",
                        "run_time",
                        "wall_time",
                        "integrated_runtime",
                    ]
                ]
            )

        full_quantiles = pd.concat(quantiles)
        full_quantiles["percentile"] = (full_quantiles["index"] * 100).astype(int)
        full_quantiles["percentile_name"] = "p" + full_quantiles["percentile"].astype(str).str.zfill(3)
        full_quantiles["memoryGB"] = full_quantiles["memory"] / 1024 / 1024 / 1024
        full_quantiles["integrated_runtime_hrs"] = full_quantiles["integrated_runtime"] / 3600.0
        memoryGB = pd.pivot_table(
            full_quantiles, values="memoryGB", columns=["percentile_name"], index=["task"]
        ).add_prefix("mem_GB_")
        runtime = pd.pivot_table(
            full_quantiles, values="run_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("runtime_s_")
        walltime = pd.pivot_table(
            full_quantiles, values="wall_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("walltime_s_")
        memrun = pd.merge(
            pd.merge(
                memoryGB.reset_index(),
                runtime.reset_index(),
                left_on="task",
                right_on="task",
            ),
            walltime.reset_index(),
            left_on="task",
            right_on="task",
        )
        memrun = pd.merge(
            full_quantiles[["task", "quanta", "integrated_runtime_hrs"]]
            .drop_duplicates()
            .sort_values("task"),
            memrun,
        )

        return Struct(output_table=memrun)


class GatherResourceUsageConnections(
    PipelineTaskConnections, dimensions=(), defaultTemplates={"input_task_label": "PLACEHOLDER"}
):
    """Connection definitions for `GatherResourceUsageTask`."""

    output_table = cT.Output(
        "{input_task_label}_resource_statistics",  # Should always be overridden.
        storageClass="DataFrame",
        dimensions=(),
        doc=(
            "Table that aggregates memory and CPU usage statistics from one "
            "or more tasks. "
            "This will have one row for each data ID, with columns for each "
            "task or method's memory usage and runtime."
        ),
    )
    input_metadata = cT.Input(
        "{input_task_label}_metadata",  # Should always be overridden.
        storageClass="TaskMetadata",
        dimensions=(),  # Actually set in __init__, according to configuration.
        doc="Metadata dataset for another task to gather resource usage from.",
        multiple=True,
        deferLoad=True,
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        if "PLACEHOLDER" in self.output_table.name:
            raise ValueError("Connection configuration for output_table must be overridden.")
        if "PLACEHOLDER" in self.input_metadata.name:
            raise ValueError("Connection configuration for input_metadata must be overridden.")
        # Replace the empty dimension set the connection was defined with by
        # the dimensions the task was configured with.
        self.input_metadata = dataclasses.replace(
            self.input_metadata,
            dimensions=list(self.config.dimensions),
        )


class GatherResourceUsageConfig(PipelineTaskConfig, pipelineConnections=GatherResourceUsageConnections):
    """Configuration definitions for `GatherResourceUsageTask`."""

    dimensions = ListField[str](
        doc=(
            "The quantum dimensions for the input metadata connection, and "
            "the columns (after expansion to include implied dimensions) used "
            "to identify rows in the output table."
        ),
    )
    memory = Field[bool](
        doc=(
            "Whether to extract peak memory usage (maximum resident set size) "
            "for this task. "
            "Note that memory usage cannot be further subdivided because only "
            "a per-process peak is available (and hence if multiple quanta "
            "are run in one process, even per-quantum values may be "
            "misleading)."
        ),
        default=True,
    )
    prep_time = Field[bool](
        doc=(
            "Whether to extract the CPU time duration for the work the "
            "middleware does prior to initializing the task (mostly checking "
            "for input dataset existence)."
        ),
        default=False,
    )
    init_time = Field[bool](
        doc="Whether to extract the CPU time duration for actually constructing the task.",
        default=True,
    )
    run_time = Field[bool](
        doc="Whether to extract the CPU time duration for actually executing the task.",
        default=True,
    )
    wall_time = Field[bool](
        doc="Whether to extract the wall-clock duration for actually executing the task.",
        default=True,
    )
    method_times = ListField[str](
        doc=(
            "Names of @lsst.utils.timer.timeMethod-decorated methods for "
            "which CPU time durations should also be extracted. Use '.' "
            "separators to refer to subtask methods at arbitrary depth."
        ),
        optional=False,
        default=[],
    )
    input_task_label = Field[str](
        doc=(
            "Label for the top-level task whose metadata is being processed "
            "within its own metadata file, if this differs from the prefix of "
            "connections.input_metadata."
        ),
        default=None,
        optional=True,
    )


class GatherResourceUsageTask(PipelineTask):
    """A `PipelineTask` that gathers resource usage statistics from task
    metadata.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions.

    Its output table has columns for each of the dimensions of the input
    metadata's data ID, as well as (subject to configuration):

    - ``memory``: the maximum resident set size for the entire quantum
      (in bytes);
    - ``prep_time``: the time spent in the pre-initialization step in
      which the middleware checks which of the quantum's inputs are
      available;
    - ``init_time``: the time spent in task construction;
    - ``run_time``: the time spent executing the task's ``runQuantum``
      method;
    - ``wall_time``: elapsed time in the pre-initialization step, in task
      construction, and in executing the task's ``runQuantum`` method.
      Specifically, this is the difference between ``prepUtc``, which is
      recorded as soon as execution of a single quantum has begun (but can
      include some checks and running ``updatedQuantumInputs``), and
      ``endUtc``, which is recorded immediately after ``runQuantum``;
    - ``{method}``: the time spent in a particular task or subtask
      method decorated with `lsst.utils.timer.timeMethod`.

    All time durations are CPU times in seconds, and all columns are 64-bit
    floating point. Methods or steps that did not run are given a duration
    of zero.

    It is expected that this task will be configured to run multiple times in
    most pipelines, often once for each other task in the pipeline.
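
    Examples
    --------
    Configuration is normally generated by `ResourceUsageQuantumGraphBuilder`,
    but an equivalent setup can be written by hand; the task label,
    dimensions, and method name below are illustrative::

        config = GatherResourceUsageConfig()
        config.dimensions = ["instrument", "visit", "detector"]
        config.connections.input_metadata = "calibrate_metadata"
        config.connections.output_table = "calibrate_resource_usage"
        # Optionally also time @timeMethod-decorated (sub)task methods,
        # using '.' to refer to methods nested below the top-level task.
        config.method_times = ["detection.run"]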
    """

    ConfigClass = GatherResourceUsageConfig
    _DefaultName = "gatherResourceUsage"

    def runQuantum(
        self,
        butlerQC,
        inputRefs,
        outputRefs,
    ):
        # Docstring inherited.
        # This override exists just so we can pass the butler registry's
        # DimensionUniverse to run in order to standardize the dimensions.
        inputs = butlerQC.get(inputRefs)
        outputs = self.run(butlerQC.dimensions, **inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, universe, input_metadata):
        """Gather resource usage statistics from per-quantum metadata.

        Parameters
        ----------
        universe : `DimensionUniverse`
            Object managing all dimensions recognized by the butler; used to
            standardize and expand `GatherResourceUsageConfig.dimensions`.
        input_metadata : `list` [ `DeferredDatasetHandle` ]
            List of `lsst.daf.butler.DeferredDatasetHandle` that can be used
            to load all input metadata datasets.

        Returns
        -------
        result : `Struct`
            Structure with a single element:

            - ``output_table``: a `pandas.DataFrame` that aggregates the
              configured resource usage statistics.
        """
        dimensions = universe.conform(self.config.dimensions)
        # Transform input list into a dict keyed by data ID.
        handles_by_data_id = {}
        for handle in input_metadata:
            handles_by_data_id[handle.dataId] = handle
        n_rows = len(handles_by_data_id)
        # Create a dict of zero-filled column arrays that we'll ultimately
        # make into a table.
        columns = {
            d: np.zeros(n_rows, dtype=_dtype_from_field_spec(universe.dimensions[d].primaryKey))
            for d in dimensions.names
        }
        for attr_name in ("memory", "prep_time", "init_time", "run_time", "wall_time"):
            if getattr(self.config, attr_name):
                columns[attr_name] = np.zeros(n_rows, dtype=float)
        for method_name in self.config.method_times:
            columns[method_name] = np.zeros(n_rows, dtype=float)
        # Populate the table, one row at a time.
        warned_about_metadata_version = False
        for index, (data_id, handle) in enumerate(handles_by_data_id.items()):
            # Fill in the data ID columns.
            for k, v in data_id.mapping.items():
                columns[k][index] = v
            # Load the metadata dataset and fill in the columns derived
            # from it.
            metadata = handle.get()
            try:
                quantum_metadata = metadata["quantum"]
            except KeyError:
                self.log.warning(
                    "Metadata dataset %s @ %s has no 'quantum' key.",
                    handle.ref.datasetType.name,
                    handle.dataId,
                )
            else:
                if self.config.memory:
                    columns["memory"][index], warned_about_metadata_version = self._extract_memory(
                        quantum_metadata,
                        handle,
                        warned_about_metadata_version,
                    )
                for key, value in self._extract_quantum_timing(quantum_metadata).items():
                    columns[key][index] = value
                for key, value in self._extract_method_timing(metadata, handle).items():
                    columns[key][index] = value
        return Struct(output_table=pd.DataFrame(columns, copy=False))

    def _extract_memory(self, quantum_metadata, handle, warned_about_metadata_version):
        """Extract maximum memory usage from quantum metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to identify the
            metadata in diagnostic messages only.
        warned_about_metadata_version : `bool`
            Whether we have already emitted at least one warning about old
            metadata versions.

        Returns
        -------
        memory : `float`
            Maximum memory usage in bytes.
        warned_about_metadata_version : `bool`
            Whether we have now emitted at least one warning about old
            metadata versions.
        """
        # Attempt to work around memory units being platform-dependent for
        # metadata written prior to w.2022.10.
        memory_multiplier = 1
        if quantum_metadata.get("__version__", 0) < 1:
            memory_multiplier = _RUSAGE_MEMORY_MULTIPLIER
            msg = (
                "Metadata dataset %s @ %s is too old; guessing memory units by "
                "assuming the platform has not changed"
            )
            if not warned_about_metadata_version:
                self.log.warning(msg, handle.ref.datasetType.name, handle.dataId)
                self.log.warning(
                    "Warnings about memory units for other inputs will be emitted only at DEBUG level."
                )
                warned_about_metadata_version = True
            else:
                self.log.debug(msg, handle.ref.datasetType.name, handle.dataId)
        return (
            quantum_metadata["endMaxResidentSetSize"] * memory_multiplier,
            warned_about_metadata_version,
        )

    def _extract_quantum_timing(self, quantum_metadata):
        """Extract timing for standard PipelineTask quantum-execution steps
        from metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all stages enabled in configuration.
        """
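        # Checkpoint CPU times are ordered prep -> init -> start -> end;
        # checkpoints for stages that never ran fall back to end_time, so the
        # consecutive differences below give those stages zero duration.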
        end_time = quantum_metadata["endCpuTime"]
        times = [
            quantum_metadata["prepCpuTime"],
            quantum_metadata.get("initCpuTime", end_time),
            quantum_metadata.get("startCpuTime", end_time),
            end_time,
        ]

        quantum_timing = {
            attr_name: end - begin
            for attr_name, begin, end in zip(
                ["prep_time", "init_time", "run_time"],
                times[:-1],
                times[1:],
            )
            if getattr(self.config, attr_name)
        }
        if self.config.wall_time:
            start_wall_time = Time(quantum_metadata["prepUtc"].split("+")[0])
            end_wall_time = Time(quantum_metadata["endUtc"].split("+")[0])
            quantum_timing["wall_time"] = (end_wall_time - start_wall_time).sec

        return quantum_timing

    def _extract_method_timing(self, metadata, handle):
        """Extract timing for ``timeMethod``-decorated task and subtask
        methods from metadata.

        Parameters
        ----------
        metadata : `lsst.pipe.base.TaskMetadata`
            The full metadata for a PipelineTask's quantum.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to infer the prefix
            used for method names within the metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all methods enabled in configuration.
        """
        if self.config.input_task_label is not None:
            task_label = self.config.input_task_label
        else:
            task_label = handle.ref.datasetType.name[: -len("_metadata")]
        result = {}
        for method_name in self.config.method_times:
            terms = [task_label] + list(method_name.split("."))
            metadata_method_name = ":".join(terms[:-1]) + "." + terms[-1]
            try:
                method_start_time = metadata[f"{metadata_method_name}StartCpuTime"]
                method_end_time = metadata[f"{metadata_method_name}EndCpuTime"]
            except KeyError:
                # A method missing from the metadata is not a problem; it's
                # reasonable for configuration or even runtime logic to
                # result in a method not being called. When that happens, we
                # just let the times stay zero.
                pass
            else:
                # The key must match the column name created in run(), which
                # is the configured method name itself.
                result[method_name] = method_end_time - method_start_time
        return result


def _dtype_from_field_spec(field_spec):
    """Return the `np.dtype` that can be used to hold the values of a butler
    dimension field.

    Parameters
    ----------
    field_spec : `lsst.daf.butler.core.ddl.FieldSpec`
        Object describing the field in a SQL-friendly sense.

    Returns
    -------
    dtype : `np.dtype`
        Numpy data type description.
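
    Examples
    --------
    String fields map to fixed-width unicode dtypes sized by the field
    length; other fields use whatever `numpy` infers from the Python type::

        >>> np.dtype((str, 32))
        dtype('<U32')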
    """
    python_type = field_spec.getPythonType()
    if python_type is str:
        return np.dtype((str, field_spec.length))
    else:
        return np.dtype(python_type)


class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
    """Custom quantum graph generator and pipeline builder for resource
    usage summary tasks.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler client to query for inputs and dataset types.
    dataset_type_names : `~collections.abc.Iterable` [ `str` ], optional
        Iterable of dataset type names or shell-style glob patterns for the
        metadata datasets to be used as input. Default is all datasets ending
        with ``_metadata`` (other than the resource-usage summary tasks' own
        metadata outputs, which are always ignored). A gather-resource task
        with a single quantum is created for each matching metadata dataset
        type.
    where : `str`, optional
        Data ID expression that constrains the input metadata datasets.
    input_collections : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for inputs. If not provided,
        ``butler.collections`` is used and must not be empty.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. If not
        provided, ``butler.run`` is used and must not be `None`.
    skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for outputs, allowing quanta whose
        outputs exist to be skipped.
    clobber : `bool`, optional
        Whether *execution* of this quantum graph will permit clobbering. If
        `False` (default), existing outputs in ``output_run`` are an error
        unless ``skip_existing_in`` will cause those quanta to be skipped.

    Notes
    -----
    The resource usage summary tasks cannot easily be added to a regular
    pipeline, as it's much more natural to have the gather tasks run
    automatically on all *other* tasks. And we can generate a quantum graph
    for these particular tasks much more efficiently than the general-purpose
    algorithm could.
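
    Examples
    --------
    The ``build-gather-resource-usage-qg`` script wraps this class; the same
    steps can be run directly in Python (the repository path and collection
    names below are placeholders)::

        butler = Butler("/repo/main")
        builder = ResourceUsageQuantumGraphBuilder(
            butler,
            input_collections=["u/someone/processing"],
            output_run="u/someone/resource-usage/run1",
        )
        qg = builder.build()  # main() also passes provenance metadata here.
        qg.saveUri("resource_usage.qgraph")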
    """

    def __init__(
        self,
        butler: Butler,
        *,
        dataset_type_names: Iterable[str] | None = None,
        where: str = "",
        input_collections: Sequence[str] | None = None,
        output_run: str | None = None,
        skip_existing_in: Sequence[str] = (),
        clobber: bool = False,
    ):
        # Start by querying for metadata datasets, since we'll need to know
        # which dataset types exist in the input collections in order to
        # build the pipeline.
        input_dataset_types: Any
        if not dataset_type_names:
            input_dataset_types = "*_metadata"
        else:
            input_dataset_types = dataset_type_names
        pipeline_graph = PipelineGraph()
        metadata_refs: dict[str, set[DatasetRef]] = {}
        consolidate_config = ConsolidateResourceUsageConfig()
        for results in butler.registry.queryDatasets(
            input_dataset_types,
            where=where,
            findFirst=True,
            collections=input_collections,
        ).byParentDatasetType():
            input_metadata_dataset_type = results.parentDatasetType
            refs_for_type = set(results)
            if refs_for_type:
                gather_task_label, gather_dataset_type_name = self._add_gather_task(
                    pipeline_graph, input_metadata_dataset_type
                )
                if gather_task_label is None or gather_dataset_type_name is None:
                    continue
                metadata_refs[gather_task_label] = refs_for_type
                consolidate_config.input_names.append(gather_dataset_type_name)
        pipeline_graph.add_task(
            task_class=ConsolidateResourceUsageTask,
            config=consolidate_config,
            label=ConsolidateResourceUsageTask._DefaultName,
        )
        # Now that we have the pipeline graph, we can delegate to super.
        super().__init__(
            pipeline_graph,
            butler,
            input_collections=input_collections,
            output_run=output_run,
            skip_existing_in=skip_existing_in,
            clobber=clobber,
        )
        # We've already queried for all of our input datasets, so we don't
        # want to do that again in process_subgraph, even though that's where
        # most QG builders do their queries.
        self.gather_inputs: dict[str, list[DatasetKey]] = {}
        self.existing_inputs: list[DatasetRef] = []
        for gather_task_label, gather_input_refs in metadata_refs.items():
            gather_inputs_for_task: list[DatasetKey] = []
            for ref in gather_input_refs:
                dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
                self.existing_inputs.append(ref)
                gather_inputs_for_task.append(dataset_key)
            self.gather_inputs[gather_task_label] = gather_inputs_for_task

    @classmethod
    def _add_gather_task(
        cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
    ) -> tuple[str | None, str | None]:
        """Add a single configuration of `GatherResourceUsageTask` to a
        pipeline graph.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Pipeline graph to modify in-place.
        input_metadata_dataset_type : `lsst.daf.butler.DatasetType`
            Dataset type for the task's input dataset, which is the metadata
            output of the task whose resource usage information is being
            extracted.

        Returns
        -------
        gather_task_label : `str` or `None`
            Label of the new task in the pipeline, or `None` if the given
            dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        gather_dataset_type_name : `str` or `None`
            Name of the task's output table dataset type, or `None` if the
            given dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
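
        Notes
        -----
        For an input dataset type named ``calibrate_metadata`` (an
        illustrative name), the new task label would be
        ``calibrate_gatherResourceUsage`` and the output dataset type name
        would be ``calibrate_resource_usage``.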
        """
        if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
            return None, None
        elif "gatherResourceUsage" in input_metadata_dataset_type.name:
            return None, None
        else:
            input_task_label = m.group(1)
            gather_task_label = f"{input_task_label}_gatherResourceUsage"
            gather_dataset_type_name = f"{input_task_label}_resource_usage"
            gather_config = GatherResourceUsageConfig()
            gather_config.dimensions = input_metadata_dataset_type.dimensions.names
            gather_config.connections.input_metadata = input_metadata_dataset_type.name
            gather_config.connections.output_table = gather_dataset_type_name
            pipeline_graph.add_task(
                label=gather_task_label,
                task_class=GatherResourceUsageTask,
                config=gather_config,
            )
            return gather_task_label, gather_dataset_type_name

    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        skeleton = QuantumGraphSkeleton(subgraph.tasks.keys())
        for ref in self.existing_inputs:
            skeleton.add_dataset_node(ref.datasetType.name, ref.dataId)
            skeleton.set_dataset_ref(ref)
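        # Each gather task gets a single quantum whose inputs are the
        # metadata refs found in __init__; the lone consolidate quantum then
        # takes every gather task's output table as input.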
        consolidate_inputs = []
        for task_node in subgraph.tasks.values():
            if task_node.task_class is GatherResourceUsageTask:
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, self.gather_inputs[task_node.label])
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    gather_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, gather_output_key)
                    if write_edge.connection_name in task_node.outputs:
                        # Not a special output like metadata or log.
                        consolidate_inputs.append(gather_output_key)
            else:
                assert task_node.task_class is ConsolidateResourceUsageTask
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, consolidate_inputs)
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    consolidate_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, consolidate_output_key)
        # We don't need to do any follow-up searches for output datasets,
        # because the outputs all have empty dimensions and the base
        # QuantumGraphBuilder takes care of those.
        return skeleton

    @classmethod
    def make_argument_parser(cls) -> argparse.ArgumentParser:
        """Make the argument parser for the command-line interface."""
        parser = argparse.ArgumentParser(
            description=(
                "Build a QuantumGraph that gathers and consolidates "
                "resource usage tables from existing metadata datasets."
            ),
        )
        parser.add_argument("repo", type=str, help="Path to data repository or butler configuration.")
        parser.add_argument("filename", type=str, help="Output filename for QuantumGraph.")
        parser.add_argument(
            "collections",
            type=str,
            nargs="+",
            help="Collection(s) to search for input metadata.",
        )
        parser.add_argument(
            "--dataset-types",
            type=str,
            action="extend",
            # nargs is needed so action="extend" appends whole strings rather
            # than iterating over the characters of a single value.
            nargs="+",
            help="Glob-style patterns for input metadata dataset types.",
        )
        parser.add_argument(
            "--where",
            type=str,
            default="",
            help="Data ID expression used when querying for input metadata datasets.",
        )
        parser.add_argument(
            "--output",
            type=str,
            help=(
                "Name of the output CHAINED collection. If this option is specified and "
                "--output-run is not, then a new RUN collection will be created by appending "
                "a timestamp to the value of this option."
            ),
            default=None,
            metavar="COLL",
        )
        parser.add_argument(
            "--output-run",
            type=str,
            help=(
                "Output RUN collection to write the resulting tables to. If not provided, "
                "then --output must be provided and a new RUN collection will be created "
                "by appending a timestamp to the value passed with --output."
            ),
            default=None,
            metavar="RUN",
        )
        return parser

    @classmethod
    def main(cls) -> None:
        """Run the command-line interface for this quantum-graph builder.

        This function provides the implementation for the
        ``build-gather-resource-usage-qg`` script.
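
        Example invocation (repository path and collection names are
        placeholders)::

            build-gather-resource-usage-qg /repo/main rusage.qgraph u/someone/proc --output u/someone/rusage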
        """
        parser = cls.make_argument_parser()
        args = parser.parse_args()
        # Figure out collection names.
        if args.output_run is None:
            if args.output is None:
                raise ValueError("At least one of --output or --output-run options is required.")
            args.output_run = f"{args.output}/{Instrument.makeCollectionTimestamp()}"

        butler = Butler(args.repo, collections=args.collections)
        builder = cls(
            butler,
            dataset_type_names=args.dataset_types,
            where=args.where,
            input_collections=args.collections,
            output_run=args.output_run,
        )
        qg: QuantumGraph = builder.build(
            # Metadata includes a subset of attributes defined in CmdLineFwk.
            metadata={
                "input": args.collections,
                "butler_argument": args.repo,
                "output": args.output,
                "output_run": args.output_run,
                "data_query": args.where,
                "time": f"{datetime.datetime.now()}",
            }
        )
        qg.saveUri(args.filename)