Coverage for python / lsst / analysis / tools / tasks / gatherResourceUsage.py: 19%

239 statements  

coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = (
    "ConsolidateResourceUsageConfig",
    "ConsolidateResourceUsageConnections",
    "ConsolidateResourceUsageTask",
    "GatherResourceUsageConfig",
    "GatherResourceUsageConnections",
    "GatherResourceUsageTask",
    "ResourceUsageQuantumGraphBuilder",
)

import argparse
import dataclasses
import datetime
import logging
import re
from collections.abc import Iterable, Sequence
from typing import Any

import numpy as np
import pandas as pd
from astropy.time import Time
from lsst.daf.butler import Butler, DatasetRef, DatasetType
from lsst.pex.config import Field, ListField
from lsst.pipe.base import (
    Instrument,
    PipelineTask,
    PipelineTaskConfig,
    PipelineTaskConnections,
    QuantumGraph,
    Struct,
)
from lsst.pipe.base import connectionTypes as cT
from lsst.pipe.base.pipeline_graph import PipelineGraph
from lsst.pipe.base.quantum_graph_builder import QuantumGraphBuilder
from lsst.pipe.base.quantum_graph_skeleton import DatasetKey, QuantumGraphSkeleton

# It's not great to be importing a private symbol, but this is a temporary
# workaround for the fact that prior to w.2022.10, the units for memory values
# written in task metadata were platform-dependent. Once we no longer care
# about older runs, this import and the code that uses it can be removed.
from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER

_LOG = logging.getLogger(__name__)

class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
    """Connection definitions for `ConsolidateResourceUsageTask`."""

    output_table = cT.Output(
        name="ResourceUsageSummary",
        storageClass="DataFrame",
        dimensions=(),
        doc="Consolidated table of resource usage statistics. One row per task label.",
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        for name in self.config.input_names:
            setattr(
                self,
                name,
                cT.Input(
                    name,
                    storageClass="DataFrame",
                    dimensions=(),
                    doc="Resource usage statistics for a task.",
                ),
            )
            self.inputs.add(name)

class ConsolidateResourceUsageConfig(
    PipelineTaskConfig, pipelineConnections=ConsolidateResourceUsageConnections
):
    """Configuration definitions for `ConsolidateResourceUsageTask`."""

    input_names = ListField[str](
        doc="Input resource usage dataset type names.",
        default=[],
    )

class ConsolidateResourceUsageTask(PipelineTask):
    """A `PipelineTask` that summarizes task resource usage into a single
    table with per-task rows.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions, and its quanta are generally built via a custom
    quantum-graph builder defined in the same module.
    """

    ConfigClass = ConsolidateResourceUsageConfig
    _DefaultName = "consolidateResourceUsage"

    def run(self, **kwargs: Any) -> Struct:
        quantiles = []
        for input_name, ru_table in kwargs.items():
            if not input_name.endswith("resource_usage"):
                continue
            df = ru_table.quantile(
                [0.0, 0.01, 0.05, 0.32, 0.50, 0.68, 0.95, 0.99, 1.0],
                numeric_only=True,
            ).reset_index()
            df["task"] = input_name.replace("_resource_usage", "")
            df["quanta"] = len(ru_table)
            df["integrated_runtime"] = ru_table["run_time"].sum()

            quantiles.append(
                df[
                    [
                        "index",
                        "quanta",
                        "task",
                        "memory",
                        "init_time",
                        "run_time",
                        "wall_time",
                        "integrated_runtime",
                    ]
                ]
            )

        full_quantiles = pd.concat(quantiles)
        full_quantiles["percentile"] = (full_quantiles["index"] * 100).astype(int)
        full_quantiles["percentile_name"] = "p" + full_quantiles["percentile"].astype(str).str.zfill(3)
        full_quantiles["memoryGB"] = full_quantiles["memory"] / 1024 / 1024 / 1024
        full_quantiles["integrated_runtime_hrs"] = full_quantiles["integrated_runtime"] / 3600.0
        memoryGB = pd.pivot_table(
            full_quantiles, values="memoryGB", columns=["percentile_name"], index=["task"]
        ).add_prefix("mem_GB_")
        runtime = pd.pivot_table(
            full_quantiles, values="run_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("runtime_s_")
        walltime = pd.pivot_table(
            full_quantiles, values="wall_time", columns=["percentile_name"], index=["task"]
        ).add_prefix("walltime_s_")
        memrun = pd.merge(
            pd.merge(
                memoryGB.reset_index(),
                runtime.reset_index(),
                left_on="task",
                right_on="task",
            ),
            walltime.reset_index(),
            left_on="task",
            right_on="task",
        )
        memrun = pd.merge(
            full_quantiles[["task", "quanta", "integrated_runtime_hrs"]]
            .drop_duplicates()
            .sort_values("task"),
            memrun,
        )

        return Struct(output_table=memrun)
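
# The consolidated summary is wide: each percentile becomes a column, so a
# task's row carries names like ``mem_GB_p050`` or ``runtime_s_p095``. A
# minimal, self-contained sketch of that naming scheme (hypothetical numbers,
# plain pandas; not part of the task itself):
#
#     import pandas as pd
#
#     q = pd.DataFrame(
#         {"index": [0.05, 0.50, 0.95], "memoryGB": [1.2, 1.9, 3.4], "task": "calibrate"}
#     )
#     q["percentile_name"] = "p" + (q["index"] * 100).astype(int).astype(str).str.zfill(3)
#     wide = pd.pivot_table(
#         q, values="memoryGB", columns=["percentile_name"], index=["task"]
#     ).add_prefix("mem_GB_")
#     list(wide.columns)  # -> ["mem_GB_p005", "mem_GB_p050", "mem_GB_p095"]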

class GatherResourceUsageConnections(
    PipelineTaskConnections, dimensions=(), defaultTemplates={"input_task_label": "PLACEHOLDER"}
):
    """Connection definitions for `GatherResourceUsageTask`."""

    output_table = cT.Output(
        "{input_task_label}_resource_statistics",  # Should always be overridden.
        storageClass="DataFrame",
        dimensions=(),
        doc=(
            "Table that aggregates memory and CPU usage statistics from one "
            "or more tasks. "
            "This will have one row for each data ID, with columns for each "
            "task or method's memory usage and runtime."
        ),
    )
    input_metadata = cT.Input(
        "{input_task_label}_metadata",  # Should always be overridden.
        storageClass="TaskMetadata",
        dimensions=(),  # Actually set in __init__, according to configuration.
        doc="Metadata dataset for another task to gather resource usage from.",
        multiple=True,
        deferLoad=True,
    )

    def __init__(self, *, config):
        super().__init__(config=config)
        if "PLACEHOLDER" in self.output_table.name:
            raise ValueError("Connection configuration for output_table must be overridden.")
        if "PLACEHOLDER" in self.input_metadata.name:
            raise ValueError("Connection configuration for input_metadata must be overridden.")
        # Override the empty dimension set the connection was defined with,
        # using the dimensions the task was configured with.
        self.input_metadata = dataclasses.replace(
            self.input_metadata,
            dimensions=list(self.config.dimensions),
        )

class GatherResourceUsageConfig(PipelineTaskConfig, pipelineConnections=GatherResourceUsageConnections):
    """Configuration definitions for `GatherResourceUsageTask`."""

    dimensions = ListField[str](
        doc=(
            "The quantum dimensions for the input metadata connection, and "
            "the columns (after expansion to include implied dimensions) used "
            "to identify rows in the output table."
        ),
    )
    memory = Field[bool](
        doc=(
            "Whether to extract peak memory usage (maximum resident set size) "
            "for this task. "
            "Note that memory usage cannot be further subdivided because only "
            "a per-process peak is available (and hence if multiple quanta "
            "are run in one process, even per-quantum values may be "
            "misleading)."
        ),
        default=True,
    )
    prep_time = Field[bool](
        doc=(
            "Whether to extract the CPU time duration for the work the "
            "middleware does prior to initializing the task (mostly checking "
            "for input dataset existence)."
        ),
        default=False,
    )
    init_time = Field[bool](
        doc="Whether to extract the CPU time duration for actually constructing the task.",
        default=True,
    )
    run_time = Field[bool](
        doc="Whether to extract the CPU time duration for actually executing the task.",
        default=True,
    )
    wall_time = Field[bool](
        doc="Whether to extract the wall-clock time duration for actually executing the task.",
        default=True,
    )
    method_times = ListField[str](
        doc=(
            "Names of @lsst.utils.timer.timeMethod-decorated methods for "
            "which CPU time durations should also be extracted. Use '.' "
            "separators to refer to subtask methods at arbitrary depth."
        ),
        optional=False,
        default=[],
    )
    input_task_label = Field[str](
        doc=(
            "Label for the top-level task whose metadata is being processed "
            "within its own metadata file, if this differs from the prefix of "
            "connections.input_metadata."
        ),
        default=None,
        optional=True,
    )
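
# A minimal configuration sketch for gathering usage from a hypothetical
# per-detector task labeled ``calibrate`` (the dimension names and the timed
# method are illustrative assumptions, not defaults):
#
#     config = GatherResourceUsageConfig()
#     config.dimensions = ["instrument", "visit", "detector"]
#     config.connections.input_metadata = "calibrate_metadata"
#     config.connections.output_table = "calibrate_resource_usage"
#     config.method_times = ["astrometry.run"]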

class GatherResourceUsageTask(PipelineTask):
    """A `PipelineTask` that gathers resource usage statistics from task
    metadata.

    Notes
    -----
    This is an unusual `PipelineTask` in that its input connection has
    dynamic dimensions.

    Its output table has columns for each of the dimensions of the input
    metadata's data ID, as well as (subject to configuration):

    - ``memory``: the maximum resident set size for the entire quantum
      (in bytes);
    - ``prep_time``: the time spent in the pre-initialization step in
      which the middleware checks which of the quantum's inputs are
      available;
    - ``init_time``: the time spent in task construction;
    - ``run_time``: the time spent executing the task's runQuantum
      method;
    - ``wall_time``: elapsed time in the pre-initialization step, in task
      construction, and in executing the task's runQuantum method.
      Specifically, this is the difference between ``prepUtc``, which
      triggers as soon as execution of a single quantum has begun (but can
      include some checks and running ``updatedQuantumInputs``), and
      ``endUtc``, which triggers immediately after ``runQuantum``;
    - ``{method}``: the time spent in a particular task or subtask
      method decorated with `lsst.utils.timer.timeMethod`.

    All time durations are in seconds (CPU times, except for ``wall_time``,
    which is wall-clock time), and all columns are 64-bit floating point.
    Methods or steps that did not run are given a duration of zero.

    It is expected that this task will be configured to run multiple times in
    most pipelines, often once for each other task in the pipeline.
    """

    ConfigClass = GatherResourceUsageConfig
    _DefaultName = "gatherResourceUsage"

    def runQuantum(
        self,
        butlerQC,
        inputRefs,
        outputRefs,
    ):
        # Docstring inherited.
        # This override exists just so we can pass the butler registry's
        # DimensionUniverse to run in order to standardize the dimensions.
        inputs = butlerQC.get(inputRefs)
        outputs = self.run(butlerQC.dimensions, **inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, universe, input_metadata):
        """Gather resource usage statistics from per-quantum metadata.

        Parameters
        ----------
        universe : `DimensionUniverse`
            Object managing all dimensions recognized by the butler; used to
            standardize and expand `GatherResourceUsageConfig.dimensions`.
        input_metadata : `list` [ `DeferredDatasetHandle` ]
            List of `lsst.daf.butler.DeferredDatasetHandle` that can be used
            to load all input metadata datasets.

        Returns
        -------
        result : `Struct`
            Structure with a single element:

            - ``output_table``: a `pandas.DataFrame` that aggregates the
              configured resource usage statistics.
        """
        dimensions = universe.conform(self.config.dimensions)
        # Transform input list into a dict keyed by data ID.
        handles_by_data_id = {}
        for handle in input_metadata:
            handles_by_data_id[handle.dataId] = handle
        n_rows = len(handles_by_data_id)
        # Create a dict of empty column arrays that we'll ultimately make
        # into a table.
        columns = {
            d: np.zeros(n_rows, dtype=_dtype_from_field_spec(universe.dimensions[d].primaryKey))
            for d in dimensions.names
        }
        for attr_name in ("memory", "prep_time", "init_time", "run_time", "wall_time"):
            if getattr(self.config, attr_name):
                columns[attr_name] = np.zeros(n_rows, dtype=float)
        for method_name in self.config.method_times:
            columns[method_name] = np.zeros(n_rows, dtype=float)
        # Populate the table, one row at a time.
        warned_about_metadata_version = False
        for index, (data_id, handle) in enumerate(handles_by_data_id.items()):
            # Fill in the data ID columns.
            for k, v in data_id.mapping.items():
                columns[k][index] = v
            # Load the metadata dataset and fill in the columns derived from
            # it.
            metadata = handle.get()
            try:
                quantum_metadata = metadata["quantum"]
            except KeyError:
                self.log.warning(
                    "Metadata dataset %s @ %s has no 'quantum' key.",
                    handle.ref.datasetType.name,
                    handle.dataId,
                )
            else:
                if self.config.memory:
                    columns["memory"][index], warned_about_metadata_version = self._extract_memory(
                        quantum_metadata,
                        handle,
                        warned_about_metadata_version,
                    )
                for key, value in self._extract_quantum_timing(quantum_metadata).items():
                    columns[key][index] = value
                for key, value in self._extract_method_timing(metadata, handle).items():
                    columns[key][index] = value
        return Struct(output_table=pd.DataFrame(columns, copy=False))

    def _extract_memory(self, quantum_metadata, handle, warned_about_metadata_version):
        """Extract maximum memory usage from quantum metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to identify the
            metadata in diagnostic messages only.
        warned_about_metadata_version : `bool`
            Whether we have already emitted at least one warning about old
            metadata versions.

        Returns
        -------
        memory : `float`
            Maximum memory usage in bytes.
        warned_about_metadata_version : `bool`
            Whether we have now emitted at least one warning about old
            metadata versions.
        """
        # Attempt to work around memory units being platform-dependent for
        # metadata written prior to w.2022.10.
        memory_multiplier = 1
        if quantum_metadata.get("__version__", 0) < 1:
            memory_multiplier = _RUSAGE_MEMORY_MULTIPLIER
            msg = (
                "Metadata dataset %s @ %s is too old; guessing memory units by "
                "assuming the platform has not changed"
            )
            if not warned_about_metadata_version:
                self.log.warning(msg, handle.ref.datasetType.name, handle.dataId)
                self.log.warning(
                    "Warnings about memory units for other inputs will be emitted only at DEBUG level."
                )
                warned_about_metadata_version = True
            else:
                self.log.debug(msg, handle.ref.datasetType.name, handle.dataId)
        return (
            quantum_metadata["endMaxResidentSetSize"] * memory_multiplier,
            warned_about_metadata_version,
        )
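
    # Why the multiplier matters: ``getrusage`` reports ``ru_maxrss`` in
    # kibibytes on Linux but in bytes on macOS, so metadata written before
    # the units were standardized needs a platform-dependent correction. A
    # worked example with assumed values (a Linux run, where the multiplier
    # is expected to be 1024):
    #
    #     # endMaxResidentSetSize = 2_048_000  (KiB, pre-w.2022.10)
    #     # 2_048_000 * 1024 = 2_097_152_000 bytes, i.e. roughly 1.95 GiB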

    def _extract_quantum_timing(self, quantum_metadata):
        """Extract timing for standard PipelineTask quantum-execution steps
        from metadata.

        Parameters
        ----------
        quantum_metadata : `lsst.pipe.base.TaskMetadata`
            The nested metadata associated with the label "quantum" inside a
            PipelineTask's metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all stages enabled in configuration.
        """
        end_time = quantum_metadata["endCpuTime"]
        times = [
            quantum_metadata["prepCpuTime"],
            quantum_metadata.get("initCpuTime", end_time),
            quantum_metadata.get("startCpuTime", end_time),
            end_time,
        ]

        quantum_timing = {
            attr_name: end - begin
            for attr_name, begin, end in zip(
                ["prep_time", "init_time", "run_time"],
                times[:-1],
                times[1:],
            )
            if getattr(self.config, attr_name)
        }
        if self.config.wall_time:
            start_wall_time = Time(quantum_metadata["prepUtc"].split("+")[0])
            end_wall_time = Time(quantum_metadata["endUtc"].split("+")[0])
            quantum_timing["wall_time"] = (end_wall_time - start_wall_time).sec

        return quantum_timing
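
    # Each stage duration is the difference between consecutive checkpoints,
    # so with assumed checkpoint values (illustrative numbers only):
    #
    #     # prepCpuTime=1.0, initCpuTime=3.5, startCpuTime=4.0, endCpuTime=10.0
    #     # -> prep_time = 3.5 - 1.0 = 2.5
    #     # -> init_time = 4.0 - 3.5 = 0.5
    #     # -> run_time  = 10.0 - 4.0 = 6.0
    #
    # When ``initCpuTime`` or ``startCpuTime`` is missing, it defaults to
    # ``endCpuTime``, so stages that never ran come out as zero (with the
    # elapsed time attributed to the preceding stage) instead of raising.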

    def _extract_method_timing(self, metadata, handle):
        """Extract timing for `lsst.utils.timer.timeMethod`-decorated task
        and subtask methods from metadata.

        Parameters
        ----------
        metadata : `lsst.pipe.base.TaskMetadata`
            The full metadata for a PipelineTask.
        handle : `lsst.daf.butler.DeferredDatasetHandle`
            Butler handle for the metadata dataset; used to infer the prefix
            used for method names within the metadata.

        Returns
        -------
        timing : `dict` [ `str`, `float` ]
            CPU times in seconds, for all methods enabled in configuration,
            keyed by the configured method name.
        """
        if self.config.input_task_label is not None:
            task_label = self.config.input_task_label
        else:
            task_label = handle.ref.datasetType.name[: -len("_metadata")]
        result = {}
        for method_name in self.config.method_times:
            terms = [task_label] + list(method_name.split("."))
            metadata_method_name = ":".join(terms[:-1]) + "." + terms[-1]
            try:
                method_start_time = metadata[f"{metadata_method_name}StartCpuTime"]
                method_end_time = metadata[f"{metadata_method_name}EndCpuTime"]
            except KeyError:
                # A method missing from the metadata is not a problem;
                # it's reasonable for configuration or even runtime
                # logic to result in a method not being called. When
                # that happens, we just let the times stay zero.
                pass
            else:
                # Key by the configured method name so the result matches the
                # columns created for it in `run`.
                result[method_name] = method_end_time - method_start_time
        return result
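
# For a configured method name, the metadata key is built by joining all but
# the last name component with ":" and appending the last with ".". An
# illustrative mapping (hypothetical task label and method):
#
#     # task_label = "calibrate", method_name = "astrometry.run"
#     # -> terms = ["calibrate", "astrometry", "run"]
#     # -> metadata_method_name = "calibrate:astrometry.run"
#     # -> reads "calibrate:astrometry.runStartCpuTime" and
#     #    "calibrate:astrometry.runEndCpuTime"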

def _dtype_from_field_spec(field_spec):
    """Return the `np.dtype` that can be used to hold the values of a butler
    dimension field.

    Parameters
    ----------
    field_spec : `lsst.daf.butler.core.ddl.FieldSpec`
        Object describing the field in a SQL-friendly sense.

    Returns
    -------
    dtype : `np.dtype`
        Numpy data type description.
    """
    python_type = field_spec.getPythonType()
    if python_type is str:
        return np.dtype((str, field_spec.length))
    else:
        return np.dtype(python_type)
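
# For example (plain numpy semantics): a string field with ``length=64`` maps
# to ``np.dtype((str, 64))``, i.e. the fixed-width unicode dtype ``<U64``,
# while an integer primary key maps to ``np.dtype(int)``.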

class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
    """Custom quantum graph generator and pipeline builder for resource
    usage summary tasks.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler client to query for inputs and dataset types.
    dataset_type_names : `~collections.abc.Iterable` [ `str` ], optional
        Iterable of dataset type names or shell-style glob patterns for the
        metadata datasets to be used as input. Default is all datasets ending
        with ``_metadata`` (other than the resource-usage summary tasks' own
        metadata outputs, which are always ignored). A gather-resource task
        with a single quantum is created for each matching metadata dataset.
    where : `str`, optional
        Data ID expression that constrains the input metadata datasets.
    input_collections : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for inputs. If not provided,
        ``butler.collections`` is used and must not be empty.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. If not
        provided, ``butler.run`` is used and must not be `None`.
    skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
        Sequence of collections to search for outputs, allowing quanta whose
        outputs exist to be skipped.
    clobber : `bool`, optional
        Whether *execution* of this quantum graph will permit clobbering. If
        `False` (default), existing outputs in ``output_run`` are an error
        unless ``skip_existing_in`` will cause those quanta to be skipped.

    Notes
    -----
    The resource usage summary tasks cannot easily be added to a regular
    pipeline, as it's much more natural to have the gather tasks run
    automatically on all *other* tasks. And we can generate a quantum graph
    for these particular tasks much more efficiently than the general-purpose
    algorithm could.
    """

    def __init__(
        self,
        butler: Butler,
        *,
        dataset_type_names: Iterable[str] | None = None,
        where: str = "",
        input_collections: Sequence[str] | None = None,
        output_run: str | None = None,
        skip_existing_in: Sequence[str] = (),
        clobber: bool = False,
    ):
        # Start by querying for metadata datasets, since we'll need to know
        # which dataset types exist in the input collections in order to
        # build the pipeline.
        input_dataset_types: Any
        if not dataset_type_names:
            input_dataset_types = "*_metadata"
        else:
            input_dataset_types = dataset_type_names
        pipeline_graph = PipelineGraph()
        metadata_refs: dict[str, set[DatasetRef]] = {}
        consolidate_config = ConsolidateResourceUsageConfig()
        for results in butler.registry.queryDatasets(
            input_dataset_types,
            where=where,
            findFirst=True,
            collections=input_collections,
        ).byParentDatasetType():
            input_metadata_dataset_type = results.parentDatasetType
            refs_for_type = set(results)
            if refs_for_type:
                gather_task_label, gather_dataset_type_name = self._add_gather_task(
                    pipeline_graph, input_metadata_dataset_type
                )
                if gather_task_label is None or gather_dataset_type_name is None:
                    continue
                metadata_refs[gather_task_label] = refs_for_type
                consolidate_config.input_names.append(gather_dataset_type_name)
        pipeline_graph.add_task(
            task_class=ConsolidateResourceUsageTask,
            config=consolidate_config,
            label=ConsolidateResourceUsageTask._DefaultName,
        )
        # Now that we have the pipeline graph, we can delegate to super.
        super().__init__(
            pipeline_graph,
            butler,
            input_collections=input_collections,
            output_run=output_run,
            skip_existing_in=skip_existing_in,
            clobber=clobber,
        )
        # We've already queried for all of our input datasets, so we don't
        # want to do that again in process_subgraph, even though that's where
        # most QG builders do their queries.
        self.gather_inputs: dict[str, list[DatasetKey]] = {}
        self.existing_inputs: list[DatasetRef] = []
        for gather_task_label, gather_input_refs in metadata_refs.items():
            gather_inputs_for_task: list[DatasetKey] = []
            for ref in gather_input_refs:
                dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
                self.existing_inputs.append(ref)
                gather_inputs_for_task.append(dataset_key)
            self.gather_inputs[gather_task_label] = gather_inputs_for_task

    @classmethod
    def _add_gather_task(
        cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
    ) -> tuple[str | None, str | None]:
        """Add a single configuration of `GatherResourceUsageTask` to a
        pipeline graph.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Pipeline graph to modify in-place.
        input_metadata_dataset_type : `lsst.daf.butler.DatasetType`
            Dataset type for the task's input dataset, which is the metadata
            output of the task whose resource usage information is being
            extracted.

        Returns
        -------
        gather_task_label : `str` or `None`
            Label of the new task in the pipeline, or `None` if the given
            dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        gather_dataset_type_name : `str` or `None`
            Name of the task's output table dataset type, or `None` if the
            given dataset type is not a metadata dataset type or is itself a
            gatherResourceUsage metadata dataset type.
        """
        if (m := re.fullmatch(r"(\w+)_metadata", input_metadata_dataset_type.name)) is None:
            return None, None
        elif "gatherResourceUsage" in input_metadata_dataset_type.name:
            return None, None
        else:
            input_task_label = m.group(1)
            gather_task_label = f"{input_task_label}_gatherResourceUsage"
            gather_dataset_type_name = f"{input_task_label}_resource_usage"
            gather_config = GatherResourceUsageConfig()
            gather_config.dimensions = input_metadata_dataset_type.dimensions.names
            gather_config.connections.input_metadata = input_metadata_dataset_type.name
            gather_config.connections.output_table = gather_dataset_type_name
            pipeline_graph.add_task(
                label=gather_task_label,
                task_class=GatherResourceUsageTask,
                config=gather_config,
            )
            return gather_task_label, gather_dataset_type_name
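
    # The naming convention this produces, for a hypothetical input task
    # labeled "calibrate":
    #
    #     # "calibrate_metadata"  (input metadata dataset type)
    #     # -> gather task label:         "calibrate_gatherResourceUsage"
    #     # -> output dataset type name:  "calibrate_resource_usage"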

    def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
        skeleton = QuantumGraphSkeleton(subgraph.tasks.keys())
        for ref in self.existing_inputs:
            skeleton.add_dataset_node(ref.datasetType.name, ref.dataId)
            skeleton.set_dataset_ref(ref)
        consolidate_inputs = []
        for task_node in subgraph.tasks.values():
            if task_node.task_class is GatherResourceUsageTask:
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, self.gather_inputs[task_node.label])
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    gather_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, gather_output_key)
                    if write_edge.connection_name in task_node.outputs:
                        # Not a special output like metadata or log.
                        consolidate_inputs.append(gather_output_key)
            else:
                assert task_node.task_class is ConsolidateResourceUsageTask
                quantum_key = skeleton.add_quantum_node(task_node.label, self.empty_data_id)
                skeleton.add_input_edges(quantum_key, consolidate_inputs)
                for write_edge in task_node.iter_all_outputs():
                    output_node = subgraph.dataset_types[write_edge.parent_dataset_type_name]
                    assert (
                        output_node.dimensions == self.universe.empty
                    ), "All outputs should have empty dimensions."
                    consolidate_output_key = skeleton.add_dataset_node(
                        write_edge.parent_dataset_type_name, self.empty_data_id
                    )
                    skeleton.add_output_edge(quantum_key, consolidate_output_key)
        # We don't need to do any follow-up searches for output datasets,
        # because the outputs all have empty dimensions and the base
        # QuantumGraphBuilder takes care of those.
        return skeleton

    @classmethod
    def make_argument_parser(cls) -> argparse.ArgumentParser:
        """Make the argument parser for the command-line interface."""
        parser = argparse.ArgumentParser(
            description=(
                "Build a QuantumGraph that gathers and consolidates "
                "resource usage tables from existing metadata datasets."
            ),
        )
        parser.add_argument("repo", type=str, help="Path to data repository or butler configuration.")
        parser.add_argument("filename", type=str, help="Output filename for QuantumGraph.")
        parser.add_argument(
            "collections",
            type=str,
            nargs="+",
            help="Collection(s) to search for input metadata.",
        )
        parser.add_argument(
            "--dataset-types",
            type=str,
            action="extend",
            # nargs is needed with action="extend"; without it a single
            # string value would be extended into the list character by
            # character.
            nargs="+",
            help="Glob-style patterns for input metadata dataset types.",
        )
        parser.add_argument(
            "--where",
            type=str,
            default="",
            help="Data ID expression used when querying for input metadata datasets.",
        )
        parser.add_argument(
            "--output",
            type=str,
            help=(
                "Name of the output CHAINED collection. If this option is specified and "
                "--output-run is not, then a new RUN collection will be created by appending "
                "a timestamp to the value of this option."
            ),
            default=None,
            metavar="COLL",
        )
        parser.add_argument(
            "--output-run",
            type=str,
            help=(
                "Output RUN collection to write output datasets to. If not provided "
                "then --output must be provided and a new RUN collection will be created "
                "by appending a timestamp to the value passed with --output."
            ),
            default=None,
            metavar="RUN",
        )
        return parser
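
    # A sketch of the command line this parser accepts (repository path,
    # collection, and file names are placeholders):
    #
    #     build-gather-resource-usage-qg REPO resource_usage.qgraph \
    #         u/user/processing-run --output-run u/user/resource-usage-run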

    @classmethod
    def main(cls) -> None:
        """Run the command-line interface for this quantum-graph builder.

        This function provides the implementation for the
        ``build-gather-resource-usage-qg`` script.
        """
        parser = cls.make_argument_parser()
        args = parser.parse_args()
        # Figure out collection names.
        if args.output_run is None:
            if args.output is None:
                raise ValueError("At least one of --output or --output-run options is required.")
            args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp())

        butler = Butler(args.repo, collections=args.collections)
        builder = cls(
            butler,
            dataset_type_names=args.dataset_types,
            where=args.where,
            input_collections=args.collections,
            output_run=args.output_run,
        )
        qg: QuantumGraph = builder.build(
            # Metadata includes a subset of attributes defined in CmdLineFwk.
            metadata={
                "input": args.collections,
                "butler_argument": args.repo,
                "output": args.output,
                "output_run": args.output_run,
                "data_query": args.where,
                "time": f"{datetime.datetime.now()}",
            }
        )
        qg.saveUri(args.filename)
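
# A minimal programmatic usage sketch, mirroring what ``main`` does (the repo
# path and collection names below are placeholders, not defaults):
#
#     butler = Butler("REPO", collections=["u/user/processing-run"])
#     builder = ResourceUsageQuantumGraphBuilder(
#         butler,
#         input_collections=["u/user/processing-run"],
#         output_run="u/user/resource-usage-run",
#     )
#     qg = builder.build(metadata={"output_run": "u/user/resource-usage-run"})
#     qg.saveUri("resource_usage.qgraph")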