# Coverage report artifact (coverage.py v7.13.5, 2026-05-06): python/lsst/pipe/base/prerequisite_helpers.py — 32% of 201 statements covered.
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Helper classes for finding prerequisite input datasets during
29QuantumGraph generation.
30"""
32from __future__ import annotations
34__all__ = (
35 "PrerequisiteBounds",
36 "PrerequisiteFinder",
37 "PrerequisiteInfo",
38 "SkyPixBoundsBuilder",
39 "TimespanBuilder",
40)
42import dataclasses
43from abc import ABC, abstractmethod
44from collections.abc import Callable, Iterable, Mapping, Sequence
45from typing import cast
47from lsst.daf.butler import (
48 Butler,
49 DataCoordinate,
50 DatasetRef,
51 DatasetType,
52 DimensionElement,
53 Registry,
54 SkyPixDimension,
55 Timespan,
56)
57from lsst.daf.butler.registry import MissingDatasetTypeError
58from lsst.sphgeom import RangeSet, Region
60from .pipeline_graph import DatasetTypeNode, PipelineGraph, ReadEdge, TaskNode
@dataclasses.dataclass
class PrerequisiteInfo:
    """A QuantumGraph-generation helper class that manages the searches for all
    prerequisite input connections for a task.

    Parameters
    ----------
    task_node : `TaskNode`
        The relevant node.
    pipeline_graph : `PipelineGraph`
        The pipeline graph.
    """

    bounds: PrerequisiteBounds
    """Another helper object that manages the spatial/temporal bounds of the
    task's quanta.
    """

    finders: dict[str, PrerequisiteFinder]
    """Mapping of helper objects responsible for a single prerequisite input
    connection.

    Keys are connection names.  Elements of this dictionary should be removed
    by implementations of `QuantumGraphBuilder.process_subgraph` to take
    responsibility for finding them away from the the `QuantumGraphBuilder`
    base class.
    """

    def __init__(self, task_node: TaskNode, pipeline_graph: PipelineGraph):
        # One shared bounds helper for the task, and one finder per
        # prerequisite input connection, all referencing that helper.
        self.bounds = PrerequisiteBounds(task_node)
        self.finders = {}
        for edge in task_node.prerequisite_inputs.values():
            self.finders[edge.connection_name] = PrerequisiteFinder(edge, self.bounds, pipeline_graph)

    def update_bounds(self) -> None:
        """Inspect the current state of `finders` and update `bounds` to
        reflect the needs of only the finders that remain.
        """
        remaining = list(self.finders.values())
        # Rebuild the aggregate skypix set and timespan flag from scratch so
        # that finders removed by a subclass no longer contribute.
        self.bounds.all_dataset_skypix.clear()
        for finder in remaining:
            self.bounds.all_dataset_skypix.update(finder.dataset_skypix)
        self.bounds.any_dataset_has_timespan = any(
            finder.dataset_has_timespan for finder in remaining
        )
class PrerequisiteFinder:
    """A QuantumGraph-generation helper class that manages the searches for a
    prerequisite input connection.

    Parameters
    ----------
    edge : `pipeline_graph.ReadEdge`
        A `~pipeline_graph.PipelineGraph` edge that represents a single
        prerequisite input connection.
    bounds : `PrerequisiteBounds`
        Another helper object that manages the spatial/temporal bounds of the
        task's quanta, shared by all prerequisite inputs for that task.
    pipeline_graph : `pipeline_graph.PipelineGraph`
        Graph representation of the pipeline.

    Notes
    -----
    `PrerequisiteFinder` instances are usually constructed by a
    `PrerequisiteInfo` instance, which is in turn constructed by and attached
    to the base `QuantumGraphBuilder` when a new builder is constructed. During
    the `QuantumGraphBuilder.process_subgraph` hook implemented by a builder
    subclass, prerequisite inputs may be found in other ways (e.g. via bulk
    queries), as long as the results are consistent with the finder's
    attributes, and this is indicated to the base `QuantumGraphBuilder` by
    removing those finder instances after those prerequisites have been found
    and added to a `QuantumGraphSkeleton`. Finder instances that remain in the
    builder are used by calling `PrerequisiteFinder.find` on each quantum
    later in `QuantumGraphBuilder.build`.
    """

    def __init__(
        self,
        edge: ReadEdge,
        bounds: PrerequisiteBounds,
        pipeline_graph: PipelineGraph,
    ):
        self.edge = edge
        self._bounds = bounds
        self.dataset_type_node = pipeline_graph.dataset_types[edge.parent_dataset_type_name]
        self.lookup_function = self.task_node.get_lookup_function(edge.connection_name)
        self.dataset_skypix: dict[str, SkyPixDimension] = {}
        self.dataset_other_spatial: dict[str, DimensionElement] = {}
        self.dataset_has_timespan = False
        self.constraint_dimensions = self.task_node.dimensions
        if self.lookup_function is None:
            # No task-provided lookup function, so precompute which spatial
            # and temporal joins the standard query strategies in `find` will
            # need.  For each spatial family the dataset type has beyond the
            # task's dimensions, record its best element, split by whether it
            # is a skypix pixelization.
            for family in self.dataset_type_node.dimensions.spatial - self.task_node.dimensions.spatial:
                best_spatial_element = family.choose(self.dataset_type_node.dimensions)
                if isinstance(best_spatial_element, SkyPixDimension):
                    self.dataset_skypix[best_spatial_element.name] = best_spatial_element
                else:
                    self.dataset_other_spatial[best_spatial_element.name] = cast(
                        DimensionElement, best_spatial_element
                    )
            self.dataset_has_timespan = bool(
                # If the task dimensions has a temporal family that isn't in
                # the dataset type (i.e. "observation_timespans", like visit
                # or exposure)...
                self.task_node.dimensions.temporal - self.dataset_type_node.dimensions.temporal
            ) and (
                # ...and the dataset type has a temporal family that isn't in
                # the task dimensions, or is a calibration, the prerequisite
                # search needs a temporal join. Note that the default
                # dimension universe only has one temporal dimension family, so
                # in practice this just means "calibration lookups when visit
                # or exposure is in the task dimensions".
                self.dataset_type_node.is_calibration
                or bool(self.dataset_type_node.dimensions.temporal - self.task_node.dimensions.temporal)
            )
            # Constraint dimensions are the task dimensions minus any purely
            # spatial/temporal dimensions not shared with the dataset type;
            # those are handled via region/timespan joins instead of data-ID
            # equality constraints.
            new_constraint_dimensions = set()
            universe = self.task_node.dimensions.universe
            for dimension_name in self.task_node.dimensions.names:
                if dimension_name in self.dataset_type_node.dimensions.names:
                    new_constraint_dimensions.add(dimension_name)
                else:
                    dimension = universe[dimension_name]
                    if not (dimension.spatial or dimension.temporal):
                        new_constraint_dimensions.add(dimension_name)
            self.constraint_dimensions = universe.conform(new_constraint_dimensions)

    edge: ReadEdge
    """The `~pipeline_graph.PipelineGraph` edge that represents the
    prerequisite input connection.
    """

    dataset_type_node: DatasetTypeNode
    """The `~pipeline_graph.PipelineGraph` node that represents the dataset
    type of this connection.

    This always uses the registry storage class and is never a component
    dataset type.
    """

    lookup_function: (
        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
    )
    """A task-provided callback for finding these datasets.

    If this is not `None`, it must be used to ensure correct behavior.
    """

    dataset_skypix: dict[str, SkyPixDimension]
    """Dimensions representing a pixelization of the sky used by the dataset
    type for this connection that are also not part of the task's dimensions.

    Keys are dimension names.  It is at least extremely rare for this
    dictionary to have more than one element.
    """

    dataset_other_spatial: dict[str, DimensionElement]
    """Spatial dimensions other than sky pixelizations used by the dataset type
    for this connection that are also not part of the task's dimensions.
    """

    dataset_has_timespan: bool
    """Whether the dataset has a timespan that should be used in the lookup,
    either because it is a calibration dataset or because it has temporal
    dimensions that are not part of the tasks's dimensions.
    """

    @property
    def task_node(self) -> TaskNode:
        """The `~pipeline_graph.PipelineGraph` node that represents the task
        for this connection.
        """
        return self._bounds.task_node

    def find(
        self,
        butler: Butler,
        input_collections: Sequence[str],
        data_id: DataCoordinate,
        skypix_bounds: Mapping[str, RangeSet],
        timespan: Timespan | None,
    ) -> list[DatasetRef]:
        """Find prerequisite input datasets for a single quantum.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler client to use for queries.
        input_collections : `~collections.abc.Sequence` [ `str` ]
            Sequence of collections to search, in order.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the quantum.
        skypix_bounds : `~collections.abc.Mapping` \
                [ `str`, `lsst.sphgeom.RangeSet` ]
            The spatial bounds of this quantum in various skypix dimensions.
            Keys are skypix dimension names (a superset of those in
            `dataset_skypix`) and values are sets of integer pixel ID ranges.
        timespan : `lsst.daf.butler.Timespan` or `None`
            The temporal bounds of this quantum.  Guaranteed to not be `None`
            if `dataset_has_timespan` is `True`.

        Returns
        -------
        refs : `list` [ `lsst.daf.butler.DatasetRef` ]
            Dataset references.  These use
            ``self.dataset_type_node.dataset_type``, which may differ from the
            connection's dataset type in storage class or [lack of] component.

        Raises
        ------
        NotImplementedError
            Raised for certain relationships between task and dataset type
            dimensions that are possible to define but not believed to be
            useful in practice.  These errors occur late rather than early in
            order to allow a `QuantumGraphBuilder` subclass to handle them
            first, in case an unusual task's needs must be met by a custom
            builder class anyway.
        """
        if self.lookup_function:
            # If there is a lookup function, just use it; nothing else matters.
            return [
                self.dataset_type_node.generalize_ref(ref)
                for ref in self.lookup_function(
                    self.edge.adapt_dataset_type(self.dataset_type_node.dataset_type),
                    butler.registry,
                    data_id,
                    input_collections,
                )
                if ref is not None
            ]
        if self.dataset_type_node.dimensions <= self.constraint_dimensions:
            # If this is a calibration dataset and the dataset doesn't have
            # any dimensions that aren't constrained by the quantum data
            # ID, we know there'll only be one result, and that means we
            # can call Butler.find_dataset, which takes a timespan. Note
            # that the AllDimensionsQuantumGraphBuilder subclass will
            # intercept this case in order to optimize it when:
            #
            # - PipelineTaskConnections.getTemporalBoundsConnections is
            #   empty;
            #
            # - the quantum data IDs have temporal dimensions;
            #
            # and when that happens PrerequisiteFinder.find never gets
            # called.
            try:
                ref = butler.find_dataset(
                    self.dataset_type_node.dataset_type,
                    data_id.subset(self.constraint_dimensions),
                    collections=input_collections,
                    timespan=timespan,
                )
            except MissingDatasetTypeError:
                # A dataset type never registered cannot have any datasets;
                # treat the same as "not found".
                ref = None
            return [ref] if ref is not None else []
        elif self.dataset_has_timespan:
            extra_dimensions = self.dataset_type_node.dimensions.names - self.constraint_dimensions.names
            raise NotImplementedError(
                f"No support for calibration lookup {self.task_node.label}.{self.edge.connection_name} "
                f"with dimension(s) {extra_dimensions} not fully constrained by the task. "
                "Please create a feature-request ticket and use a lookup function in the meantime."
            )
        if self.dataset_skypix:
            if not self.dataset_has_timespan and not self.dataset_other_spatial:
                # If the dataset has skypix dimensions but is not otherwise
                # spatial or temporal (this describes reference catalogs and
                # things like them), we can stuff the skypix IDs we want into
                # the query via bind parameters and call queryDatasets. Once
                # again AllDimensionsQuantumGraphBuilder will often intercept
                # this case in order to optimize it, when:
                #
                # - PipelineTaskConnections.getSpatialBoundsConnections is
                #   empty;
                #
                # - the quantum data IDs have spatial dimensions;
                #
                # and when that happens PrerequisiteFinder.find never gets
                # called.
                where_terms: list[str] = []
                bind: dict[str, list[int]] = {}
                for name in self.dataset_skypix:
                    where_terms.append(f"{name} IN (:{name}_pixels)")
                    # RangeSet iteration yields half-open (begin, end) pairs,
                    # so range() expands them to explicit pixel IDs.
                    pixels: list[int] = []
                    for begin, end in skypix_bounds[name]:
                        pixels.extend(range(begin, end))
                    bind[f"{name}_pixels"] = pixels
                try:
                    return butler.query_datasets(
                        self.dataset_type_node.dataset_type,
                        collections=input_collections,
                        data_id=data_id.subset(self.constraint_dimensions),
                        where=" AND ".join(where_terms),
                        bind=bind,
                        with_dimension_records=True,
                        limit=None,
                        explain=False,
                    )
                except MissingDatasetTypeError:
                    return []
            else:
                raise NotImplementedError(
                    f"No support for skypix lookup {self.task_node.label}.{self.edge.connection_name} "
                    "that requires additional spatial and/or temporal constraints. "
                    "Please create a feature-request ticket and use a lookup function in the meantime."
                )
        if self._bounds.spatial_connections or self._bounds.temporal_connections:
            raise NotImplementedError(
                f"No support for prerequisite lookup {self.task_node.label}.{self.edge.connection_name} "
                "that requires other connections to determine spatial or temporal bounds but does not "
                "fit into one of our standard cases. "
                "Please create a feature-request ticket and use a lookup function in the meantime."
            )
        # If the spatial/temporal bounds are not customized, and the dataset
        # doesn't have any skypix dimensions, a vanilla query_datasets call
        # should work.  This case should always be optimized by
        # AllDimensionsQuantumGraphBuilder as well.  Note that we use the
        # original quantum data ID here, not those with constraint_dimensions
        # that strips out the spatial/temporal stuff, because here we want the
        # butler query system to handle the spatial/temporal stuff like it
        # normally would.
        try:
            return butler.query_datasets(
                self.dataset_type_node.dataset_type,
                collections=input_collections,
                data_id=data_id,
                with_dimension_records=True,
                limit=None,
                explain=False,
            )
        except MissingDatasetTypeError:
            return []
@dataclasses.dataclass
class PrerequisiteBounds:
    """A QuantumGraph-generation helper class that manages the spatial and
    temporal bounds of a tasks' quanta, for the purpose of finding
    prerequisite inputs.
    """

    task_node: TaskNode
    """The `~pipeline_graph.PipelineGraph` node that represents the task."""

    spatial_connections: frozenset[str] = dataclasses.field(init=False)
    """Regular input or output connections whose (assumed spatial) data IDs
    should be used to define the spatial bounds of this task's quanta.

    See Also
    --------
    PipelineTaskConnections.getSpatialBoundsConnections
    """

    temporal_connections: frozenset[str] = dataclasses.field(init=False)
    """Regular input or output connections whose (assumed temporal) data IDs
    should be used to define the temporal bounds of this task's quanta.

    See Also
    --------
    PipelineTaskConnections.getTemporalBoundsConnections
    """

    all_dataset_skypix: dict[str, SkyPixDimension] = dataclasses.field(default_factory=dict)
    """The union of all `PrerequisiteFinder.dataset_skypix` attributes for all
    (remaining) prerequisite finders for this task.
    """

    any_dataset_has_timespan: bool = dataclasses.field(default=False)
    """Whether any `PrerequisiteFinder.dataset_has_timespan` attribute is true
    for any (remaining) prerequisite finder for this task.
    """

    def __post_init__(self) -> None:
        # Snapshot the task's declared bounds connections as immutable sets.
        self.spatial_connections = frozenset(self.task_node.get_spatial_bounds_connections())
        self.temporal_connections = frozenset(self.task_node.get_temporal_bounds_connections())

    def make_skypix_bounds_builder(self, quantum_data_id: DataCoordinate) -> SkyPixBoundsBuilder:
        """Return an object that accumulates the appropriate spatial bounds for
        a quantum.

        Parameters
        ----------
        quantum_data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum.

        Returns
        -------
        builder : `SkyPixBoundsBuilder`
            Object that accumulates the appropriate spatial bounds for a
            quantum.  If the spatial bounds are not needed, this object will do
            nothing.
        """
        skypix_dimensions = self.all_dataset_skypix
        if not skypix_dimensions:
            # No remaining finder needs skypix bounds at all.
            return _TrivialSkyPixBoundsBuilder()
        if self.spatial_connections:
            # Other connections' data IDs contribute to the bounds.
            return _ConnectionSkyPixBoundsBuilder(
                self.task_node, self.spatial_connections, skypix_dimensions.values(), quantum_data_id
            )
        if self.task_node.dimensions.spatial:
            # Only the quantum's own region contributes.
            return _QuantumOnlySkyPixBoundsBuilder(skypix_dimensions.values(), quantum_data_id)
        # Nothing constrains the bounds; fall back to the full sky.
        return _UnboundedSkyPixBoundsBuilder(skypix_dimensions.values())

    def make_timespan_builder(self, quantum_data_id: DataCoordinate) -> TimespanBuilder:
        """Return an object that accumulates the appropriate timespan for
        a quantum.

        Parameters
        ----------
        quantum_data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum.

        Returns
        -------
        builder : `TimespanBuilder`
            Object that accumulates the appropriate timespan bounds for a
            quantum.  If a timespan is not needed, this object will do nothing.
        """
        if not self.any_dataset_has_timespan:
            # No remaining finder needs a timespan at all.
            return _TrivialTimespanBuilder()
        if self.temporal_connections:
            # Other connections' data IDs contribute to the timespan.
            return _ConnectionTimespanBuilder(self.task_node, self.temporal_connections, quantum_data_id)
        if self.task_node.dimensions.temporal:
            # Only the quantum's own timespan contributes.
            return _QuantumOnlyTimespanBuilder(quantum_data_id)
        # Nothing constrains the timespan; fall back to all time.
        return _UnboundedTimespanBuilder()
class SkyPixBoundsBuilder(ABC):
    """A base class for objects that accumulate the appropriate spatial bounds
    for a quantum.
    """

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        """Handle the skeleton graph node for a regular input/output connection
        for this quantum, including its data ID in the bounds if appropriate.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the dataset type.  Never a component dataset type name.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the dataset.
        """
        # Default implementation ignores regular datasets; subclasses that
        # derive bounds from other connections override this.
        return None

    @abstractmethod
    def finish(self) -> dict[str, RangeSet]:
        """Finish building the spatial bounds and return them.

        Returns
        -------
        bounds : `dict` [ `str`, `lsst.sphgeom.RangeSet` ]
            The spatial bounds of this quantum in various skypix dimensions.
            Keys are skypix dimension names and values are sets of integer
            pixel ID ranges.
        """
        raise NotImplementedError()
class TimespanBuilder(ABC):
    """A base class for objects that accumulate the appropriate timespan
    for a quantum.
    """

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        """Handle the skeleton graph node for a regular input/output connection
        for this quantum, including its data ID in the bounds if appropriate.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the dataset type.  Never a component dataset type name.
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for the dataset.
        """
        # Default implementation ignores regular datasets; subclasses that
        # derive the timespan from other connections override this.
        return None

    @abstractmethod
    def finish(self) -> Timespan | None:
        """Finish building the timespan and return it.

        Returns
        -------
        timespan : `lsst.daf.butler.Timespan` or `None`
            The timespan of this quantum, or `None` if it is known to not be
            needed.
        """
        raise NotImplementedError()
class _TrivialSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """No-op implementation of `SkyPixBoundsBuilder` for when no skypix bounds
    are needed.
    """

    def finish(self) -> dict[str, RangeSet]:
        # No skypix dimensions were requested, so there is nothing to report.
        return dict()
class _TrivialTimespanBuilder(TimespanBuilder):
    """No-op implementation of `TimespanBuilder` for when no timespan is
    needed.
    """

    def finish(self) -> None:
        # A timespan is known to be unnecessary for this quantum.
        return None
class _QuantumOnlySkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` for when the quantum data IDs
    provide the only relevant spatial regions.
    """

    def __init__(self, dimensions: Iterable[SkyPixDimension], quantum_data_id: DataCoordinate) -> None:
        self._dimensions = dimensions
        # The quantum data ID is assumed spatial here, so its region exists.
        self._region = quantum_data_id.region

    def finish(self) -> dict[str, RangeSet]:
        bounds: dict[str, RangeSet] = {}
        for dimension in self._dimensions:
            bounds[dimension.name] = dimension.pixelization.envelope(self._region)
        return bounds
class _QuantumOnlyTimespanBuilder(TimespanBuilder):
    """Implementation of `TimespanBuilder` for when the quantum data IDs
    provide the only relevant timespans.
    """

    def __init__(self, quantum_data_id: DataCoordinate) -> None:
        # The quantum data ID is assumed temporal here, so the cast is safe.
        self._quantum_timespan = cast(Timespan, quantum_data_id.timespan)

    def finish(self) -> Timespan:
        return self._quantum_timespan
class _UnboundedSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` for when the bounds cover the
    full sky.
    """

    def __init__(self, dimensions: Iterable[SkyPixDimension]):
        self._dimensions = dimensions

    def finish(self) -> dict[str, RangeSet]:
        # With nothing to constrain the region, every pixel of every
        # pixelization is in bounds.
        bounds: dict[str, RangeSet] = {}
        for dimension in self._dimensions:
            bounds[dimension.name] = dimension.pixelization.universe()
        return bounds
class _UnboundedTimespanBuilder(TimespanBuilder):
    """Implementation of `TimespanBuilder` for when the timespan covers all
    time.
    """

    def finish(self) -> Timespan:
        # (None, None) is the unbounded-on-both-sides timespan.
        unbounded = Timespan(None, None)
        return unbounded
class _ConnectionSkyPixBoundsBuilder(SkyPixBoundsBuilder):
    """Implementation of `SkyPixBoundsBuilder` for when other input or output
    connections contribute to the spatial bounds.
    """

    def __init__(
        self,
        task_node: TaskNode,
        bounds_connections: frozenset[str],
        dimensions: Iterable[SkyPixDimension],
        quantum_data_id: DataCoordinate,
    ) -> None:
        self._dimensions = dimensions
        self._regions: list[Region] = []
        if task_node.dimensions.spatial:
            # The quantum's own region always contributes when the task has
            # spatial dimensions.
            self._regions.append(quantum_data_id.region)
        self._dataset_type_names: set[str] = set()
        for connection_name in bounds_connections:
            edge = task_node.inputs.get(connection_name)
            if not edge:
                # Note that we end up raising if the input is a prerequisite
                # (and hence not in task_node.inputs or task_node.outputs);
                # this justifies the cast in `handle_dataset`.
                edge = task_node.outputs[connection_name]
            self._dataset_type_names.add(edge.parent_dataset_type_name)

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        if parent_dataset_type_name not in self._dataset_type_names:
            return
        self._regions.append(data_id.region)

    def finish(self) -> dict[str, RangeSet]:
        result: dict[str, RangeSet] = {}
        for dimension in self._dimensions:
            # Union the pixel envelope of every contributing region.
            pixel_ranges = RangeSet()
            for region in self._regions:
                pixel_ranges = pixel_ranges | dimension.pixelization.envelope(region)
            result[dimension.name] = pixel_ranges
        return result
class _ConnectionTimespanBuilder(TimespanBuilder):
    """Implementation of `TimespanBuilder` for when other input or output
    connections contribute to the timespan.
    """

    def __init__(
        self,
        task_node: TaskNode,
        bounds_connections: frozenset[str],
        quantum_data_id: DataCoordinate,
    ) -> None:
        # Seed the accumulated interval with the quantum's own timespan when
        # the task is temporal, and with an empty interval otherwise.
        if task_node.dimensions.temporal:
            initial = cast(Timespan, quantum_data_id.timespan)
        else:
            initial = Timespan.makeEmpty()
        self._begin_nsec, self._end_nsec = initial.nsec
        self._dataset_type_names = set()
        for connection_name in bounds_connections:
            edge = task_node.inputs.get(connection_name)
            if not edge:
                # Note that we end up raising if the input is a prerequisite
                # (and hence not in task_node.inputs or task_node.outputs);
                # this justifies the cast in `handle_dataset`.
                edge = task_node.outputs[connection_name]
            self._dataset_type_names.add(edge.parent_dataset_type_name)

    def handle_dataset(self, parent_dataset_type_name: str, data_id: DataCoordinate) -> None:
        if parent_dataset_type_name not in self._dataset_type_names:
            return
        begin, end = cast(Timespan, data_id.timespan).nsec
        # Widen the accumulated interval to include this dataset's timespan.
        if begin < self._begin_nsec:
            self._begin_nsec = begin
        if end > self._end_nsec:
            self._end_nsec = end

    def finish(self) -> Timespan:
        return Timespan(None, None, _nsec=(self._begin_nsec, self._end_nsec))