Coverage for python / lsst / pipe / base / prerequisite_helpers.py: 32%

201 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Helper classes for finding prerequisite input datasets during 

29QuantumGraph generation. 

30""" 

31 

32from __future__ import annotations 

33 

34__all__ = ( 

35 "PrerequisiteBounds", 

36 "PrerequisiteFinder", 

37 "PrerequisiteInfo", 

38 "SkyPixBoundsBuilder", 

39 "TimespanBuilder", 

40) 

41 

42import dataclasses 

43from abc import ABC, abstractmethod 

44from collections.abc import Callable, Iterable, Mapping, Sequence 

45from typing import cast 

46 

47from lsst.daf.butler import ( 

48 Butler, 

49 DataCoordinate, 

50 DatasetRef, 

51 DatasetType, 

52 DimensionElement, 

53 Registry, 

54 SkyPixDimension, 

55 Timespan, 

56) 

57from lsst.daf.butler.registry import MissingDatasetTypeError 

58from lsst.sphgeom import RangeSet, Region 

59 

60from .pipeline_graph import DatasetTypeNode, PipelineGraph, ReadEdge, TaskNode 

61 

62 

@dataclasses.dataclass
class PrerequisiteInfo:
    """A QuantumGraph-generation helper class that manages the searches for
    all prerequisite input connections for a task.

    Parameters
    ----------
    task_node : `TaskNode`
        The relevant node.
    pipeline_graph : `PipelineGraph`
        The pipeline graph.
    """

    bounds: PrerequisiteBounds
    """Another helper object that manages the spatial/temporal bounds of the
    task's quanta.
    """

    finders: dict[str, PrerequisiteFinder]
    """Mapping of helper objects responsible for a single prerequisite input
    connection.

    Keys are connection names.  Elements of this dictionary should be removed
    by implementations of `QuantumGraphBuilder.process_subgraph` to take
    responsibility for finding them away from the `QuantumGraphBuilder`
    base class.
    """

    def __init__(self, task_node: TaskNode, pipeline_graph: PipelineGraph):
        # A single bounds helper is shared by every finder for this task.
        self.bounds = PrerequisiteBounds(task_node)
        self.finders = {}
        for edge in task_node.prerequisite_inputs.values():
            self.finders[edge.connection_name] = PrerequisiteFinder(edge, self.bounds, pipeline_graph)

    def update_bounds(self) -> None:
        """Inspect the current state of `finders` and update `bounds` to
        reflect the needs of only the finders that remain.
        """
        # Recompute from scratch so finders removed since the last call no
        # longer contribute to the bounds.
        remaining_skypix: dict[str, SkyPixDimension] = {}
        needs_timespan = False
        for finder in self.finders.values():
            remaining_skypix.update(finder.dataset_skypix)
            needs_timespan = needs_timespan or finder.dataset_has_timespan
        # Mutate the existing dict in place to preserve its identity for any
        # holders of a reference to it.
        self.bounds.all_dataset_skypix.clear()
        self.bounds.all_dataset_skypix.update(remaining_skypix)
        self.bounds.any_dataset_has_timespan = needs_timespan

109 

110 

class PrerequisiteFinder:
    """A QuantumGraph-generation helper class that manages the searches for a
    prerequisite input connection.

    Parameters
    ----------
    edge : `pipeline_graph.ReadEdge`
        A `~pipeline_graph.PipelineGraph` edge that represents a single
        prerequisite input connection.
    bounds : `PrerequisiteBounds`
        Another helper object that manages the spatial/temporal bounds of the
        task's quanta, shared by all prerequisite inputs for that task.
    pipeline_graph : `pipeline_graph.PipelineGraph`
        Graph representation of the pipeline.

    Notes
    -----
    `PrerequisiteFinder` instances are usually constructed by a
    `PrerequisiteInfo` instance, which is in turn constructed by and attached
    to the base `QuantumGraphBuilder` when a new builder is constructed. During
    the `QuantumGraphBuilder.process_subgraph` hook implemented by a builder
    subclass, prerequisite inputs may be found in other ways (e.g. via bulk
    queries), as long as the results are consistent with the finder's
    attributes, and this is indicated to the base `QuantumGraphBuilder` by
    removing those finder instances after those prerequisites have been found
    and added to a `QuantumGraphSkeleton`. Finder instances that remain in the
    builder are used by calling `PrerequisiteFinder.find` on each quantum
    later in `QuantumGraphBuilder.build`.
    """

    def __init__(
        self,
        edge: ReadEdge,
        bounds: PrerequisiteBounds,
        pipeline_graph: PipelineGraph,
    ):
        self.edge = edge
        self._bounds = bounds
        self.dataset_type_node = pipeline_graph.dataset_types[edge.parent_dataset_type_name]
        self.lookup_function = self.task_node.get_lookup_function(edge.connection_name)
        # Defaults below describe "no extra spatial/temporal structure"; they
        # are only refined when there is no task-provided lookup function,
        # because a lookup function takes full responsibility for the search.
        self.dataset_skypix = {}
        self.dataset_other_spatial = {}
        self.dataset_has_timespan = False
        self.constraint_dimensions = self.task_node.dimensions
        if self.lookup_function is None:
            # Classify each spatial dimension family the dataset type has
            # beyond the task's own: skypix pixelizations vs. other spatial
            # dimension elements.
            for family in self.dataset_type_node.dimensions.spatial - self.task_node.dimensions.spatial:
                best_spatial_element = family.choose(self.dataset_type_node.dimensions)
                if isinstance(best_spatial_element, SkyPixDimension):
                    self.dataset_skypix[best_spatial_element.name] = best_spatial_element
                else:
                    self.dataset_other_spatial[best_spatial_element.name] = cast(
                        DimensionElement, best_spatial_element
                    )
            self.dataset_has_timespan = bool(
                # If the task dimensions has a temporal family that isn't in
                # the dataset type (i.e. "observation_timespans", like visit
                # or exposure)...
                self.task_node.dimensions.temporal - self.dataset_type_node.dimensions.temporal
            ) and (
                # ...and the dataset type has a temporal family that isn't in
                # the task dimensions, or is a calibration, the prerequisite
                # search needs a temporal join. Note that the default
                # dimension universe only has one temporal dimension family, so
                # in practice this just means "calibration lookups when visit
                # or exposure is in the task dimensions".
                self.dataset_type_node.is_calibration
                or bool(self.dataset_type_node.dimensions.temporal - self.task_node.dimensions.temporal)
            )
            # Restrict the data ID used to constrain searches to task
            # dimensions that are either shared with the dataset type or are
            # neither spatial nor temporal; spatial/temporal overlaps are
            # instead handled via explicit bounds in `find`.
            new_constraint_dimensions = set()
            universe = self.task_node.dimensions.universe
            for dimension_name in self.task_node.dimensions.names:
                if dimension_name in self.dataset_type_node.dimensions.names:
                    new_constraint_dimensions.add(dimension_name)
                else:
                    dimension = universe[dimension_name]
                    if not (dimension.spatial or dimension.temporal):
                        new_constraint_dimensions.add(dimension_name)
            self.constraint_dimensions = universe.conform(new_constraint_dimensions)

    edge: ReadEdge
    """The `~pipeline_graph.PipelineGraph` edge that represents the
    prerequisite input connection.
    """

    dataset_type_node: DatasetTypeNode
    """The `~pipeline_graph.PipelineGraph` node that represents the dataset
    type of this connection.

    This always uses the registry storage class and is never a component
    dataset type.
    """

    lookup_function: (
        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
    )
    """A task-provided callback for finding these datasets.

    If this is not `None`, it must be used to ensure correct behavior.
    """

    dataset_skypix: dict[str, SkyPixDimension]
    """Dimensions representing a pixelization of the sky used by the dataset
    type for this connection that are also not part of the task's dimensions.

    Keys are dimension names. It is at least extremely rare for this
    dictionary to have more than one element.
    """

    dataset_other_spatial: dict[str, DimensionElement]
    """Spatial dimensions other than sky pixelizations used by the dataset type
    for this connection that are also not part of the task's dimensions.
    """

    dataset_has_timespan: bool
    """Whether the dataset has a timespan that should be used in the lookup,
    either because it is a calibration dataset or because it has temporal
    dimensions that are not part of the task's dimensions.
    """

    @property
    def task_node(self) -> TaskNode:
        """The `~pipeline_graph.PipelineGraph` node that represents the task
        for this connection.
        """
        return self._bounds.task_node

    def find(
        self,
        butler: Butler,
        input_collections: Sequence[str],
        data_id: DataCoordinate,
        skypix_bounds: Mapping[str, RangeSet],
        timespan: Timespan | None,
    ) -> list[DatasetRef]:
        """Find prerequisite input datasets for a single quantum.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler client to use for queries.
        input_collections : `~collections.abc.Sequence` [ `str` ]
            Sequence of collections to search, in order.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the quantum.
        skypix_bounds : `~collections.abc.Mapping` \
                [ `str`, `lsst.sphgeom.RangeSet` ]
            The spatial bounds of this quantum in various skypix dimensions.
            Keys are skypix dimension names (a superset of those in
            `dataset_skypix`) and values are sets of integer pixel ID ranges.
        timespan : `lsst.daf.butler.Timespan` or `None`
            The temporal bounds of this quantum.  Guaranteed to not be `None`
            if `dataset_has_timespan` is `True`.

        Returns
        -------
        refs : `list` [ `lsst.daf.butler.DatasetRef` ]
            Dataset references.  These use
            ``self.dataset_type_node.dataset_type``, which may differ from the
            connection's dataset type in storage class or [lack of] component.

        Raises
        ------
        NotImplementedError
            Raised for certain relationships between task and dataset type
            dimensions that are possible to define but not believed to be
            useful in practice.  These errors occur late rather than early in
            order to allow a `QuantumGraphBuilder` subclass to handle them
            first, in case an unusual task's needs must be met by a custom
            builder class anyway.
        """
        if self.lookup_function:
            # If there is a lookup function, just use it; nothing else matters.
            return [
                self.dataset_type_node.generalize_ref(ref)
                for ref in self.lookup_function(
                    self.edge.adapt_dataset_type(self.dataset_type_node.dataset_type),
                    butler.registry,
                    data_id,
                    input_collections,
                )
                if ref is not None
            ]
        if self.dataset_type_node.dimensions <= self.constraint_dimensions:
            # If this is a calibration dataset and the dataset doesn't have
            # any dimensions that aren't constrained by the quantum data
            # ID, we know there'll only be one result, and that means we
            # can call Butler.find_dataset, which takes a timespan. Note
            # that the AllDimensionsQuantumGraphBuilder subclass will
            # intercept this case in order to optimize it when:
            #
            # - PipelineTaskConnections.getTemporalBoundsConnections is
            #   empty;
            #
            # - the quantum data IDs have temporal dimensions;
            #
            # and when that happens PrerequisiteFinder.find never gets
            # called.
            try:
                ref = butler.find_dataset(
                    self.dataset_type_node.dataset_type,
                    data_id.subset(self.constraint_dimensions),
                    collections=input_collections,
                    timespan=timespan,
                )
            except MissingDatasetTypeError:
                # A dataset type never registered cannot have datasets; treat
                # the same as "not found".
                ref = None
            return [ref] if ref is not None else []
        elif self.dataset_has_timespan:
            extra_dimensions = self.dataset_type_node.dimensions.names - self.constraint_dimensions.names
            raise NotImplementedError(
                f"No support for calibration lookup {self.task_node.label}.{self.edge.connection_name} "
                f"with dimension(s) {extra_dimensions} not fully constrained by the task. "
                "Please create a feature-request ticket and use a lookup function in the meantime."
            )
        if self.dataset_skypix:
            if not self.dataset_has_timespan and not self.dataset_other_spatial:
                # If the dataset has skypix dimensions but is not otherwise
                # spatial or temporal (this describes reference catalogs and
                # things like them), we can stuff the skypix IDs we want into
                # the query via bind parameters and call queryDatasets. Once
                # again AllDimensionsQuantumGraphBuilder will often intercept
                # this case in order to optimize it, when:
                #
                # - PipelineTaskConnections.getSpatialBoundsConnections is
                #   empty;
                #
                # - the quantum data IDs have spatial dimensions;
                #
                # and when that happens PrerequisiteFinder.find never gets
                # called.
                where_terms: list[str] = []
                bind: dict[str, list[int]] = {}
                for name in self.dataset_skypix:
                    where_terms.append(f"{name} IN (:{name}_pixels)")
                    pixels: list[int] = []
                    # RangeSet iteration yields half-open [begin, end) pairs,
                    # matching range() semantics.
                    for begin, end in skypix_bounds[name]:
                        pixels.extend(range(begin, end))
                    bind[f"{name}_pixels"] = pixels
                try:
                    return butler.query_datasets(
                        self.dataset_type_node.dataset_type,
                        collections=input_collections,
                        data_id=data_id.subset(self.constraint_dimensions),
                        where=" AND ".join(where_terms),
                        bind=bind,
                        with_dimension_records=True,
                        limit=None,
                        explain=False,
                    )
                except MissingDatasetTypeError:
                    return []
            else:
                raise NotImplementedError(
                    f"No support for skypix lookup {self.task_node.label}.{self.edge.connection_name} "
                    "that requires additional spatial and/or temporal constraints. "
                    "Please create a feature-request ticket and use a lookup function in the meantime."
                )
        if self._bounds.spatial_connections or self._bounds.temporal_connections:
            raise NotImplementedError(
                f"No support for prerequisite lookup {self.task_node.label}.{self.edge.connection_name} "
                "that requires other connections to determine spatial or temporal bounds but does not "
                "fit into one of our standard cases. "
                "Please create a feature-request ticket and use a lookup function in the meantime."
            )
        # If the spatial/temporal bounds are not customized, and the dataset
        # doesn't have any skypix dimensions, a vanilla query_datasets call
        # should work.  This case should always be optimized by
        # AllDimensionsQuantumGraphBuilder as well.  Note that we use the
        # original quantum data ID here, not those with constraint_dimensions
        # that strips out the spatial/temporal stuff, because here we want the
        # butler query system to handle the spatial/temporal stuff like it
        # normally would.
        try:
            return butler.query_datasets(
                self.dataset_type_node.dataset_type,
                collections=input_collections,
                data_id=data_id,
                with_dimension_records=True,
                limit=None,
                explain=False,
            )
        except MissingDatasetTypeError:
            return []

394 

395 

@dataclasses.dataclass
class PrerequisiteBounds:
    """A QuantumGraph-generation helper class that manages the spatial and
    temporal bounds of a task's quanta, for the purpose of finding
    prerequisite inputs.
    """

    task_node: TaskNode
    """The `~pipeline_graph.PipelineGraph` node that represents the task."""

    spatial_connections: frozenset[str] = dataclasses.field(init=False)
    """Regular input or output connections whose (assumed spatial) data IDs
    should be used to define the spatial bounds of this task's quanta.

    See Also
    --------
    PipelineTaskConnections.getSpatialBoundsConnections
    """

    temporal_connections: frozenset[str] = dataclasses.field(init=False)
    """Regular input or output connections whose (assumed temporal) data IDs
    should be used to define the temporal bounds of this task's quanta.

    See Also
    --------
    PipelineTaskConnections.getTemporalBoundsConnections
    """

    all_dataset_skypix: dict[str, SkyPixDimension] = dataclasses.field(default_factory=dict)
    """The union of all `PrerequisiteFinder.dataset_skypix` attributes for all
    (remaining) prerequisite finders for this task.
    """

    any_dataset_has_timespan: bool = dataclasses.field(default=False)
    """Whether any `PrerequisiteFinder.dataset_has_timespan` attribute is true
    for any (remaining) prerequisite finder for this task.
    """

    def __post_init__(self) -> None:
        # These come from the task's connections class and never change after
        # construction, so freeze them up front.
        task = self.task_node
        self.spatial_connections = frozenset(task.get_spatial_bounds_connections())
        self.temporal_connections = frozenset(task.get_temporal_bounds_connections())

    def make_skypix_bounds_builder(self, quantum_data_id: DataCoordinate) -> SkyPixBoundsBuilder:
        """Return an object that accumulates the appropriate spatial bounds for
        a quantum.

        Parameters
        ----------
        quantum_data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum.

        Returns
        -------
        builder : `SkyPixBoundsBuilder`
            Object that accumulates the appropriate spatial bounds for a
            quantum.  If the spatial bounds are not needed, this object will do
            nothing.
        """
        # Dispatch on how the bounds must be derived, from most specific to
        # least: no bounds needed, bounds from designated connections, bounds
        # from the quantum's own region, or the entire sky.
        if not self.all_dataset_skypix:
            return _TrivialSkyPixBoundsBuilder()
        skypix_dimensions = self.all_dataset_skypix.values()
        if self.spatial_connections:
            return _ConnectionSkyPixBoundsBuilder(
                self.task_node, self.spatial_connections, skypix_dimensions, quantum_data_id
            )
        if self.task_node.dimensions.spatial:
            return _QuantumOnlySkyPixBoundsBuilder(skypix_dimensions, quantum_data_id)
        return _UnboundedSkyPixBoundsBuilder(skypix_dimensions)

    def make_timespan_builder(self, quantum_data_id: DataCoordinate) -> TimespanBuilder:
        """Return an object that accumulates the appropriate timespan for
        a quantum.

        Parameters
        ----------
        quantum_data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum.

        Returns
        -------
        builder : `TimespanBuilder`
            Object that accumulates the appropriate timespan bounds for a
            quantum.  If a timespan is not needed, this object will do nothing.
        """
        # Same dispatch pattern as make_skypix_bounds_builder, for time.
        if not self.any_dataset_has_timespan:
            return _TrivialTimespanBuilder()
        if self.temporal_connections:
            return _ConnectionTimespanBuilder(self.task_node, self.temporal_connections, quantum_data_id)
        if self.task_node.dimensions.temporal:
            return _QuantumOnlyTimespanBuilder(quantum_data_id)
        return _UnboundedTimespanBuilder()

488 

489 

class SkyPixBoundsBuilder(ABC):
    """Abstract interface for objects that accumulate the spatial bounds of a
    single quantum.
    """

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        """Handle the skeleton graph node for a regular input/output connection
        for this quantum, including its data ID in the bounds if appropriate.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the dataset type.  Never a component dataset type name.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the dataset.

        Notes
        -----
        The base-class implementation ignores the dataset entirely.
        """
        return None

    @abstractmethod
    def finish(self) -> dict[str, RangeSet]:
        """Finish building the spatial bounds and return them.

        Returns
        -------
        bounds : `dict` [ `str`, `lsst.sphgeom.RangeSet` ]
            The spatial bounds of this quantum in various skypix dimensions.
            Keys are skypix dimension names and values are sets of integer
            pixel ID ranges.
        """
        raise NotImplementedError()

520 

521 

class TimespanBuilder(ABC):
    """Abstract interface for objects that accumulate the timespan of a
    single quantum.
    """

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        """Handle the skeleton graph node for a regular input/output connection
        for this quantum, including its data ID in the bounds if appropriate.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the dataset type.  Never a component dataset type name.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the dataset.

        Notes
        -----
        The base-class implementation ignores the dataset entirely.
        """
        return None

    @abstractmethod
    def finish(self) -> Timespan | None:
        """Finish building the timespan and return it.

        Returns
        -------
        timespan : `lsst.daf.butler.Timespan` or `None`
            The timespan of this quantum, or `None` if it is known to not be
            needed.
        """
        raise NotImplementedError()

551 

552 

class _TrivialSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """No-op implementation of `SkyPixBoundsBuilder`, used when no skypix
    bounds are needed for any prerequisite input.
    """

    def finish(self) -> dict[str, RangeSet]:
        # Nothing was accumulated; report empty bounds.
        return dict()

560 

561 

class _TrivialTimespanBuilder(TimespanBuilder):
    """No-op implementation of `TimespanBuilder`, used when no timespan is
    needed for any prerequisite input.
    """

    def finish(self) -> None:
        # `None` tells the caller no timespan is needed.
        return None

567 

568 

class _QuantumOnlySkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` that derives the bounds from
    the quantum data ID's own region alone.
    """

    def __init__(self, dimensions: Iterable[SkyPixDimension], quantum_data_id: DataCoordinate) -> None:
        self._quantum_region = quantum_data_id.region
        self._skypix_dimensions = dimensions

    def finish(self) -> dict[str, RangeSet]:
        # Envelope the single quantum region in each requested pixelization.
        result: dict[str, RangeSet] = {}
        for skypix in self._skypix_dimensions:
            result[skypix.name] = skypix.pixelization.envelope(self._quantum_region)
        return result

582 

583 

class _QuantumOnlyTimespanBuilder(TimespanBuilder):
    """Implementation of `TimespanBuilder` that uses the quantum data ID's
    own timespan alone.
    """

    def __init__(self, quantum_data_id: DataCoordinate) -> None:
        # The cast reflects the caller's guarantee that this builder is only
        # used when the quantum data ID has temporal dimensions, and hence a
        # non-None timespan.
        self._quantum_timespan = cast(Timespan, quantum_data_id.timespan)

    def finish(self) -> Timespan:
        return self._quantum_timespan

594 

595 

class _UnboundedSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` that reports the full sky for
    every pixelization.
    """

    def __init__(self, dimensions: Iterable[SkyPixDimension]):
        self._skypix_dimensions = dimensions

    def finish(self) -> dict[str, RangeSet]:
        result: dict[str, RangeSet] = {}
        for skypix in self._skypix_dimensions:
            # `universe()` is the RangeSet covering every pixel index in this
            # pixelization.
            result[skypix.name] = skypix.pixelization.universe()
        return result

606 

607 

class _UnboundedTimespanBuilder(TimespanBuilder):
    """Implementation of `TimespanBuilder` that reports a timespan covering
    all time.
    """

    def finish(self) -> Timespan:
        # (None, None) is the butler convention for an unbounded timespan.
        return Timespan(None, None)

615 

616 

class _ConnectionSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` for when the data IDs of other
    input or output connections contribute to the spatial bounds.
    """

    def __init__(
        self,
        task_node: TaskNode,
        bounds_connections: frozenset[str],
        dimensions: Iterable[SkyPixDimension],
        quantum_data_id: DataCoordinate,
    ) -> None:
        self._dimensions = dimensions
        self._regions: list[Region] = []
        # The quantum's own region contributes whenever the task itself is
        # spatial.
        if task_node.dimensions.spatial:
            self._regions.append(quantum_data_id.region)
        # Translate connection names to parent dataset type names so
        # handle_dataset can match skeleton nodes directly.
        self._dataset_type_names: set[str] = set()
        for connection_name in bounds_connections:
            edge = task_node.inputs.get(connection_name)
            if not edge:
                # A KeyError here (deliberately) rejects connections that are
                # neither regular inputs nor outputs — i.e. prerequisites —
                # which justifies the cast in `handle_dataset`.
                edge = task_node.outputs[connection_name]
            self._dataset_type_names.add(edge.parent_dataset_type_name)

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        if parent_dataset_type_name in self._dataset_type_names:
            self._regions.append(data_id.region)

    def finish(self) -> dict[str, RangeSet]:
        result = {}
        for dimension in self._dimensions:
            pixelization = dimension.pixelization
            envelope = RangeSet()
            for region in self._regions:
                envelope = envelope | pixelization.envelope(region)
            result[dimension.name] = envelope
        return result

655 

656 

657class _ConnectionTimespanBuilder(TimespanBuilder): 

658 """Implementation of `TimespanBuilder` for when other input or output 

659 connections contribute to the timespan. 

660 """ 

661 

662 def __init__( 

663 self, 

664 task_node: TaskNode, 

665 bounds_connections: frozenset[str], 

666 quantum_data_id: DataCoordinate, 

667 ) -> None: 

668 timespan = ( 

669 cast(Timespan, quantum_data_id.timespan) 

670 if task_node.dimensions.temporal 

671 else Timespan.makeEmpty() 

672 ) 

673 self._begin_nsec = timespan.nsec[0] 

674 self._end_nsec = timespan.nsec[1] 

675 self._dataset_type_names = set() 

676 for connection_name in bounds_connections: 

677 if edge := task_node.inputs.get(connection_name): 

678 self._dataset_type_names.add(edge.parent_dataset_type_name) 

679 else: 

680 self._dataset_type_names.add(task_node.outputs[connection_name].parent_dataset_type_name) 

681 # Note that we end up raising if the input is a prerequisite (and 

682 # hence not in task_node.inputs or task_node.outputs); this 

683 # justifies the cast in `handle_dataset`. 

684 

685 def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None: 

686 if parent_dataset_type_name in self._dataset_type_names: 

687 nsec = cast(Timespan, data_id.timespan).nsec 

688 self._begin_nsec = min(self._begin_nsec, nsec[0]) 

689 self._end_nsec = max(self._end_nsec, nsec[1]) 

690 

691 def finish(self) -> Timespan: 

692 return Timespan(None, None, _nsec=(self._begin_nsec, self._end_nsec))