Coverage for python / lsst / obs / base / defineVisits.py: 23%
482 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:28 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
# Public API of this module, kept in alphabetical order.
__all__ = [
    "ComputeVisitRegionsConfig",
    "ComputeVisitRegionsTask",
    "DefineVisitsConfig",
    "DefineVisitsTask",
    "GroupExposuresConfig",
    "GroupExposuresTask",
    "VisitDefinitionData",
    "VisitSystem",
]
35import cmath
36import dataclasses
37import enum
38import math
39import operator
40from abc import ABCMeta, abstractmethod
41from collections import defaultdict
42from collections.abc import Callable, Iterable, Iterator, Sequence
43from contextlib import contextmanager
44from typing import Any, ClassVar, TypeVar, cast
46import lsst.geom
47from lsst.afw.cameraGeom import FOCAL_PLANE, PIXELS
48from lsst.daf.butler import Butler, DataCoordinate, DataId, DimensionRecord, Progress, Timespan
49from lsst.daf.butler.queries import Query
50from lsst.daf.butler.registry import ConflictingDefinitionError
51from lsst.geom import Box2D
52from lsst.pex.config import Config, Field, makeRegistry, registerConfigurable
53from lsst.pipe.base import Struct, Task
54from lsst.sphgeom import ConvexPolygon, Region, UnitVector3d
55from lsst.utils.introspection import get_full_type_name
57from ._instrument import Instrument, loadCamera
class VisitSystem(enum.Enum):
    """Enumeration used to label different visit systems."""

    ONE_TO_ONE = 0
    """Each exposure is assigned to its own visit."""

    BY_GROUP_METADATA = 1
    """Visit membership is defined by the value of the group dimension or, for
    older dimension universes, exposure.group_id."""

    BY_SEQ_START_END = 2
    """Visit membership is defined by the values of the ``exposure.day_obs``,
    ``exposure.seq_start``, and ``exposure.seq_end`` values.
    """

    @classmethod
    def all(cls) -> frozenset[VisitSystem]:
        """Return a `frozenset` containing all members."""
        # Iterating the enum class yields every (non-alias) member.
        return frozenset(cls)

    @classmethod
    def from_name(cls, external_name: str) -> VisitSystem:
        """Construct the enumeration from given name."""
        # External names are case-insensitive and use "-" where member
        # names use "_".
        key = external_name.upper().replace("-", "_")
        member = cls.__members__.get(key)
        if member is None:
            raise KeyError(f"Visit system named '{external_name}' not known.")
        return member

    @classmethod
    def from_names(cls, names: Iterable[str] | None) -> frozenset[VisitSystem]:
        """Return a `frozenset` of all the visit systems matching the supplied
        names.

        Parameters
        ----------
        names : iterable of `str`, or `None`
            Names of visit systems. Case insensitive. If `None` or empty, all
            the visit systems are returned.

        Returns
        -------
        systems : `frozenset` of `VisitSystem`
            The matching visit systems.
        """
        if names:
            return frozenset(cls.from_name(name) for name in names)
        # None or an empty iterable selects everything.
        return cls.all()

    def __str__(self) -> str:
        # The external form is lower case with "-" separators.
        return self.name.lower().replace("_", "-")
@dataclasses.dataclass
class VisitDefinitionData:
    """Struct representing a group of exposures that will be used to define a
    visit.

    Produced by `GroupExposuresTask.group` and consumed by
    `ComputeVisitRegionsTask.compute` and `DefineVisitsTask`.
    """

    instrument: str
    """Name of the instrument this visit will be associated with.
    """

    id: int
    """Integer ID of the visit.

    This must be unique across all visit systems for the instrument.
    """

    name: str
    """String name for the visit.

    This must be unique across all visit systems for the instrument.
    """

    visit_systems: set[VisitSystem]
    """All the visit systems associated with this visit."""

    exposures: list[DimensionRecord] = dataclasses.field(default_factory=list)
    """Dimension records for the exposures that are part of this visit.
    """
@dataclasses.dataclass
class _VisitRecords:
    """Struct containing the dimension records associated with a visit.

    Built by `DefineVisitsTask._buildVisitRecords` and inserted into the
    butler registry by `DefineVisitsTask.run`.
    """

    visit: DimensionRecord
    """Record for the 'visit' dimension itself.
    """

    visit_definition: list[DimensionRecord]
    """Records for 'visit_definition', which relates 'visit' to 'exposure'.
    """

    visit_detector_region: list[DimensionRecord]
    """Records for 'visit_detector_region', which associates the combination
    of a 'visit' and a 'detector' with a region on the sky.
    """

    visit_system_membership: list[DimensionRecord]
    """Records relating visits to an associated visit system."""
class GroupExposuresConfig(Config):
    """Configure exposure grouping.

    Serves as the base config type for algorithms registered in
    `GroupExposuresTask.registry`.
    """
class GroupExposuresTask(Task, metaclass=ABCMeta):
    """Abstract base class for the subtask of `DefineVisitsTask` that is
    responsible for grouping exposures into visits.

    Subclasses should be registered with `GroupExposuresTask.registry` to
    enable use by `DefineVisitsTask`, and should generally correspond to a
    particular 'visit_system' dimension value. They are also responsible for
    defining visit IDs and names that are unique across all visit systems in
    use by an instrument.

    Parameters
    ----------
    config : `GroupExposuresConfig`
        Configuration information.
    **kwargs
        Additional keyword arguments forwarded to the `lsst.pipe.base.Task`
        constructor.
    """

    def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
        Task.__init__(self, config=config, **kwargs)

    ConfigClass = GroupExposuresConfig

    _DefaultName = "groupExposures"

    # Plugin registry; concrete grouping algorithms register themselves here
    # and are selected via the ``groupExposures`` field of
    # `DefineVisitsConfig`.
    registry = makeRegistry(
        doc="Registry of algorithms for grouping exposures into visits.",
        configBaseType=GroupExposuresConfig,
    )

    @abstractmethod
    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        """Determine, if possible, which exposures might be missing.

        Parameters
        ----------
        exposures : `list` of `lsst.daf.butler.DimensionRecord`
            The exposure records to analyze.
        registry : `lsst.daf.butler.Registry`
            A butler registry that contains these exposure records.

        Returns
        -------
        missing : `list` of `lsst.daf.butler.DimensionRecord`
            Any exposure records present in registry that were related to
            the given exposures but were missing from that list and deemed
            to be relevant.

        Notes
        -----
        Only some grouping schemes are able to find missing exposures. It
        is acceptable to return an empty list.
        """
        raise NotImplementedError()

    @abstractmethod
    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        """Group the exposures in a way most natural for this visit definition.

        Parameters
        ----------
        exposures : `list` of `lsst.daf.butler.DimensionRecord`
            The exposure records to group.

        Returns
        -------
        groups : `dict` [Any, `list` [ `lsst.daf.butler.DimensionRecord` ] ]
            Groupings of exposure records. The key type is relevant to the
            specific visit definition and could be a string or a tuple.
        """
        raise NotImplementedError()

    @abstractmethod
    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        """Group the given exposures into visits.

        Parameters
        ----------
        exposures : `list` [ `lsst.daf.butler.DimensionRecord` ]
            DimensionRecords (for the 'exposure' dimension) describing the
            exposures to group.
        instrument : `~lsst.obs.base.Instrument`
            Instrument specification that can be used to optionally support
            some visit ID definitions.

        Returns
        -------
        visits : `~collections.abc.Iterable` [ `VisitDefinitionData` ]
            Structs identifying the visits and the exposures associated with
            them. This may be an iterator or a container.
        """
        raise NotImplementedError()

    # NOTE: not decorated @abstractmethod, but the base implementation always
    # raises, so concrete subclasses must override it.
    def getVisitSystems(self) -> set[VisitSystem]:
        """Return identifiers for the 'visit_system' dimension this
        algorithm implements.

        Returns
        -------
        visit_systems : `set` [`VisitSystem`]
            The visit systems used by this algorithm.
        """
        raise NotImplementedError()
class ComputeVisitRegionsConfig(Config):
    """Configure visit region calculations.

    Serves as the base config type for algorithms registered in
    `ComputeVisitRegionsTask.registry`.
    """

    # Padding (in pixels) applied to detector bounding boxes before they are
    # projected onto the sky, making the stored regions conservatively large.
    padding: Field[int] = Field(
        dtype=int,
        default=250,
        doc=(
            "Pad raw image bounding boxes with specified number of pixels "
            "when calculating their (conservatively large) region on the "
            "sky. Note that the config value for pixelMargin of the "
            "reference object loaders in meas_algorithms should be <= "
            "the value set here."
        ),
    )
class ComputeVisitRegionsTask(Task, metaclass=ABCMeta):
    """Abstract base class for the subtask of `DefineVisitsTask` that is
    responsible for extracting spatial regions for visits and visit+detector
    combinations.

    Subclasses should be registered with `ComputeVisitRegionsTask.registry` to
    enable use by `DefineVisitsTask`.

    Parameters
    ----------
    config : `ComputeVisitRegionsConfig`
        Configuration information.
    butler : `lsst.daf.butler.Butler`
        The butler to use.
    **kwargs
        Additional keyword arguments forwarded to the `~lsst.pipe.base.Task`
        constructor.
    """

    def __init__(self, config: ComputeVisitRegionsConfig, *, butler: Butler, **kwargs: Any):
        Task.__init__(self, config=config, **kwargs)
        self.butler = butler
        # Cache of instrument name -> Instrument, populated lazily by
        # `getInstrument`.
        self.instrumentMap: dict[str, Instrument] = {}

    ConfigClass = ComputeVisitRegionsConfig

    _DefaultName = "computeVisitRegions"

    # Plugin registry; concrete region algorithms register themselves here
    # and are selected via the ``computeVisitRegions`` field of
    # `DefineVisitsConfig`.
    registry = makeRegistry(
        doc="Registry of algorithms for computing on-sky regions for visits and visit+detector combinations.",
        configBaseType=ComputeVisitRegionsConfig,
    )

    def getInstrument(self, instrumentName: str) -> Instrument:
        """Retrieve an `~lsst.obs.base.Instrument` associated with this
        instrument name.

        Parameters
        ----------
        instrumentName : `str`
            The name of the instrument.

        Returns
        -------
        instrument : `~lsst.obs.base.Instrument`
            The associated instrument object.

        Notes
        -----
        The result is cached.
        """
        instrument = self.instrumentMap.get(instrumentName)
        if instrument is None:
            # Cache miss: look the instrument up in the registry and remember
            # it for subsequent calls.
            instrument = Instrument.fromName(instrumentName, self.butler.registry)
            self.instrumentMap[instrumentName] = instrument
        return instrument

    @abstractmethod
    def compute(
        self,
        visit: VisitDefinitionData,
        *,
        collections: Sequence[str] | str | None = None,
    ) -> tuple[Region, dict[int, Region]]:
        """Compute regions for the given visit and all detectors in that visit.

        Parameters
        ----------
        visit : `VisitDefinitionData`
            Struct describing the visit and the exposures associated with it.
        collections : `collections.abc.Sequence` [ `str` ] or `str` or `None`
            Collections to be searched for camera geometry, overriding
            ``self.butler.collections.defaults``. Can be any of the types
            supported by the ``collections`` argument to butler construction.

        Returns
        -------
        visitRegion : `lsst.sphgeom.Region`
            Region for the full visit.
        visitDetectorRegions : `dict` [ `int`, `lsst.sphgeom.Region` ]
            Dictionary mapping detector ID to the region for that detector.
            Should include all detectors in the visit.
        """
        raise NotImplementedError()
class DefineVisitsConfig(Config):
    """Configure visit definition.

    Selects the grouping and region-computation algorithms used by
    `DefineVisitsTask` and controls how non-science exposures and the
    obscore table are handled.
    """

    groupExposures = GroupExposuresTask.registry.makeField(
        doc="Algorithm for grouping exposures into visits.",
        default="one-to-one",
    )
    computeVisitRegions = ComputeVisitRegionsTask.registry.makeField(
        # Fixed typo in user-visible doc: "from computing" -> "for computing".
        doc="Algorithm for computing visit and visit+detector regions.",
        default="single-raw-wcs",
    )
    ignoreNonScienceExposures: Field[bool] = Field(
        doc=(
            "If True, silently ignore input exposures that do not have "
            "observation_type=SCIENCE. If False, raise an exception if one "
            "is encountered."
        ),
        dtype=bool,
        optional=False,
        default=True,
    )
    updateObsCoreTable: Field[bool] = Field(
        doc=(
            "If True, update exposure regions in obscore table after visits "
            "are defined. If False, do not update obscore table."
        ),
        dtype=bool,
        default=True,
    )
415class DefineVisitsTask(Task):
416 """Driver Task for defining visits (and their spatial regions) in Gen3
417 Butler repositories.
419 Parameters
420 ----------
421 config : `DefineVisitsConfig`
422 Configuration for the task.
423 butler : `~lsst.daf.butler.Butler`
424 Writeable butler instance. Will be used to read ``camera`` datasets
425 and insert/sync dimension data.
426 **kwargs
427 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
428 constructor.
430 Notes
431 -----
432 Each instance of `DefineVisitsTask` reads from / writes to the same Butler.
433 Each invocation of `DefineVisitsTask.run` processes an independent group of
434 exposures into one or more new visits, all belonging to the same visit
435 system and instrument.
437 The actual work of grouping exposures and computing regions is delegated to
438 pluggable subtasks (`GroupExposuresTask` and `ComputeVisitRegionsTask`),
439 respectively. The defaults are to create one visit for every exposure, and
440 to use exactly one (arbitrary) detector-level raw dataset's WCS along with
441 camera geometry to compute regions for all detectors, but the raw WCS is
442 recomputed from the ``exposure`` dimension record's rotation angle and
443 boresight rather than by loading the ``raw.wcs`` dataset directly. Other
444 implementations can be created and configured for instruments for which
445 these choices are unsuitable (e.g. because visits and exposures are not
446 one-to-one, or because ``raw.wcs`` datasets for different detectors may not
447 be consistent with camera geometry).
449 Defining the same visit the same way multiple times (e.g. via multiple
450 invocations of this task on the same exposures, with the same
451 configuration) is safe, but it may be inefficient, as most of the work must
452 be done before new visits can be compared to existing visits.
453 """
    def __init__(self, config: DefineVisitsConfig, *, butler: Butler, **kwargs: Any):
        """Initialize the task, validating the config and creating subtasks.

        Parameters
        ----------
        config : `DefineVisitsConfig`
            Configuration for the task.
        butler : `lsst.daf.butler.Butler`
            Writeable butler used for reads and dimension-data inserts.
        **kwargs
            Forwarded to the `lsst.pipe.base.Task` constructor.
        """
        config.validate()  # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
        super().__init__(config, **kwargs)
        self.butler = butler
        self.universe = self.butler.dimensions
        self.progress = Progress("obs.base.DefineVisitsTask")
        # Instantiate the pluggable grouping and region-computation subtasks
        # selected by the config's registry fields.
        self.makeSubtask("groupExposures")
        self.makeSubtask("computeVisitRegions", butler=self.butler)
464 def _reduce_kwargs(self) -> dict:
465 # Add extra parameters to pickle
466 return dict(**super()._reduce_kwargs(), butler=self.butler)
    # Task-framework class attributes.
    ConfigClass: ClassVar[type[Config]] = DefineVisitsConfig

    _DefaultName: ClassVar[str] = "defineVisits"

    # Type declarations for attributes set in __init__ (the subtask
    # attributes are created by makeSubtask).
    config: DefineVisitsConfig
    groupExposures: GroupExposuresTask
    computeVisitRegions: ComputeVisitRegionsTask
    def _buildVisitRecords(
        self, definition: VisitDefinitionData, *, collections: Sequence[str] | str | None = None
    ) -> _VisitRecords:
        """Build the DimensionRecords associated with a visit.

        Parameters
        ----------
        definition : `VisitDefinitionData`
            Struct with identifiers for the visit and records for its
            constituent exposures.
        collections : `collections.abc.Sequence` [ `str` ] or `str` or `None`
            Collections to be searched for camera geometry, overriding
            ``self.butler.collections.defaults``. Can be any of the types
            supported by the ``collections`` argument to butler construction.

        Returns
        -------
        records : `_VisitRecords`
            Struct containing DimensionRecords for the visit, including
            associated dimension elements.
        """
        dimension = self.universe["visit"]

        # Some registries support additional items.
        supported = {meta.name for meta in dimension.metadata}

        # Compute all regions.
        visitRegion, visitDetectorRegions = self.computeVisitRegions.compute(
            definition, collections=collections
        )
        # Aggregate other exposure quantities.
        # Visit timespan covers the earliest begin to the latest end of the
        # constituent exposures; _reduceOrNone propagates None if any
        # exposure lacks a value.
        timespan = Timespan(
            begin=_reduceOrNone(min, (e.timespan.begin for e in definition.exposures)),
            end=_reduceOrNone(max, (e.timespan.end for e in definition.exposures)),
        )
        exposure_time = _reduceOrNone(operator.add, (e.exposure_time for e in definition.exposures))
        # _value_if_equal keeps the value only if it is identical across all
        # exposures; otherwise the aggregate is None.
        physical_filter = _reduceOrNone(_value_if_equal, (e.physical_filter for e in definition.exposures))
        target_name = _reduceOrNone(_value_if_equal, (e.target_name for e in definition.exposures))
        science_program = _reduceOrNone(_value_if_equal, (e.science_program for e in definition.exposures))

        # observing day for a visit is defined by the earliest observation
        # of the visit
        observing_day = _reduceOrNone(min, (e.day_obs for e in definition.exposures))
        observation_reason = _reduceOrNone(
            _value_if_equal, (e.observation_reason for e in definition.exposures)
        )
        if observation_reason is None:
            # Be explicit about there being multiple reasons
            observation_reason = "various"

        # Use the mean zenith angle as an approximation
        zenith_angle = _reduceOrNone(operator.add, (e.zenith_angle for e in definition.exposures))
        if zenith_angle is not None:
            zenith_angle /= len(definition.exposures)

        # New records that may not be supported.
        extras: dict[str, Any] = {}
        if "seq_num" in supported:
            extras["seq_num"] = _reduceOrNone(min, (e.seq_num for e in definition.exposures))
        if "azimuth" in supported:
            # Must take into account 0/360 problem.
            extras["azimuth"] = _calc_mean_angle([e.azimuth for e in definition.exposures])

        # visit_system handling changed. This is the logic for visit/exposure
        # that has support for seq_start/seq_end. NOTE(review): presence of
        # "seq_num" in the visit metadata is used as the proxy for the newer
        # dimension universe here -- confirm this matches the universe
        # versioning conventions.
        if "seq_num" in supported:
            # Map visit to exposure.
            visit_definition = [
                self.universe["visit_definition"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    exposure=exposure.id,
                )
                for exposure in definition.exposures
            ]

            # Map visit to visit system, restricted to the systems this
            # visit actually belongs to.
            visit_system_membership = []
            for visit_system in self.groupExposures.getVisitSystems():
                if visit_system in definition.visit_systems:
                    record = self.universe["visit_system_membership"].RecordClass(
                        instrument=definition.instrument,
                        visit=definition.id,
                        visit_system=visit_system.value,
                    )
                    visit_system_membership.append(record)

        else:
            # The old approach can only handle one visit system at a time.
            # If we have been configured with multiple options, prefer the
            # one-to-one.
            visit_systems = self.groupExposures.getVisitSystems()
            if len(visit_systems) > 1:
                one_to_one = VisitSystem.from_name("one-to-one")
                if one_to_one not in visit_systems:
                    raise ValueError(
                        f"Multiple visit systems specified ({visit_systems}) for use with old"
                        " dimension universe but unable to find one-to-one."
                    )
                visit_system = one_to_one
            else:
                # NOTE: pop() mutates the set returned by getVisitSystems().
                visit_system = visit_systems.pop()

            extras["visit_system"] = visit_system.value

            # The old visit_definition included visit system.
            visit_definition = [
                self.universe["visit_definition"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    exposure=exposure.id,
                    visit_system=visit_system.value,
                )
                for exposure in definition.exposures
            ]

            # This concept does not exist in old schema.
            visit_system_membership = []

        # Construct the actual DimensionRecords.
        return _VisitRecords(
            visit=dimension.RecordClass(
                instrument=definition.instrument,
                id=definition.id,
                name=definition.name,
                physical_filter=physical_filter,
                target_name=target_name,
                science_program=science_program,
                observation_reason=observation_reason,
                day_obs=observing_day,
                zenith_angle=zenith_angle,
                exposure_time=exposure_time,
                timespan=timespan,
                region=visitRegion,
                # TODO: no seeing value in exposure dimension records, so we
                # can't set that here. But there are many other columns that
                # both dimensions should probably have as well.
                **extras,
            ),
            visit_definition=visit_definition,
            visit_system_membership=visit_system_membership,
            visit_detector_region=[
                self.universe["visit_detector_region"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    detector=detectorId,
                    region=detectorRegion,
                )
                for detectorId, detectorRegion in visitDetectorRegions.items()
            ],
        )
628 def run(
629 self,
630 dataIds_or_records: Iterable[DataId | DimensionRecord],
631 *,
632 collections: Sequence[str] | str | None = None,
633 update_records: bool = False,
634 incremental: bool = False,
635 skip_conflicting: bool = False,
636 prefilter: bool = False,
637 check_detector_regions: bool = False,
638 ) -> Struct:
639 """Add visit definitions to the registry for the given exposures.
641 Parameters
642 ----------
643 dataIds_or_records : `~collections.abc.Iterable` [ `dict` or \
644 `~lsst.daf.butler.DataCoordinate` or \
645 `~lsst.daf.butler.DimensionRecord` ]
646 Exposure-level data IDs or explicit exposure records. These must
647 all correspond to the same instrument, and are expected to be
648 on-sky science exposures.
649 collections : `~collections.abc.Sequence` [ `str` ] or `str` or `None`
650 Collections to be searched for camera geometry, overriding
651 ``self.butler.collections.defaults``. Can be any of the types
652 supported by the ``collections`` argument to butler construction.
653 update_records : `bool`, optional
654 If `True` (`False` is default), update existing ``visit`` records
655 and ``visit_detector_region`` records. THIS IS AN ADVANCED OPTION
656 THAT SHOULD ONLY BE USED TO FIX REGIONS AND/OR METADATA THAT ARE
657 KNOWN TO BE BAD, AND IT CANNOT BE USED TO REMOVE EXPOSURES OR
658 DETECTORS FROM A VISIT.
659 incremental : `bool`, optional
660 If `True` indicate that exposures are being ingested incrementally
661 and visit definition will be run on partial visits. This will
662 allow the ``visit`` record to be updated if it already exists, but
663 (unlike ``update_records=True``) it will only update the
664 ``visit_detector_region`` records if the ``visit`` record's region
665 changes. If there is any risk that files are being ingested
666 incrementally it is critical that this parameter is set to `True`
667 and not to rely on ``update_records``.
668 skip_conflicting : `bool`, optional
669 If `True` do not raise an error if there is a change in an existing
670 visit definition. This can be used if you solely want to define
671 visits that were somehow missed previously. It has no effect if
672 ``update_records`` is `True` or incremental mode is enabled.
673 prefilter : `bool`, optional
674 If `True`, filter out exposures that already have a visit defined
675 by querying in advance. This is not compatible with
676 ``incremental=True`` or ``update_records=True``. Note that for
677 exposures that may be associated with multiple visits, the
678 presence of any existing visit will cause that exposure to be
679 skipped entirely.
680 check_detector_regions : `bool`, optional
681 If `True`, check existing visits to see if they have a
682 ``visit_detector_region`` record for every detector. The default
683 is to assume that if a ``visit`` record is present, the
684 ``visit_detector_region`` records are as well.
686 Returns
687 -------
688 result : `lsst.pipe.base.Struct`
689 Structure with the following attributes (all `int`):
691 - n_visits: total number of visits defined
692 - n_skipped: number of visits that were already present left alone
693 - n_new: number of new visits inserted
694 - n_fully_updated: number of existing visits fully updated
695 - n_partially_updated: number of visits with non-geometry updates
696 - n_filtered: number of exposures dropped because they were
697 already associated with visits.
699 Raises
700 ------
701 lsst.daf.butler.registry.ConflictingDefinitionError
702 Raised if a visit ID conflict is detected and the existing visit
703 differs from the new one.
704 """
705 if prefilter and (update_records or incremental):
706 raise TypeError("prefilter=True cannot be used with update_records=True or incremental=True")
708 # Normalize, expand, and deduplicate data IDs.
709 self.log.info("Preprocessing data IDs.")
710 dimensions = self.universe.conform(["exposure"])
712 exposure_records: dict[DataCoordinate, DimensionRecord] = {}
713 instruments: set[str] = set()
714 instrument_cls_name: str | None = None
715 instrument_record: DimensionRecord | None = None
717 # Go through the supplied dataset extracting records.
718 # Check that only a single instrument is being used.
719 for external in dataIds_or_records:
720 if isinstance(external, DimensionRecord):
721 record = external
722 if str(record.definition) != "exposure":
723 raise ValueError(f"Can only define visits from exposure records, not {record}.")
724 else:
725 data_id = self.butler.registry.expandDataId(external, dimensions=dimensions)
726 exp_record = data_id.records["exposure"]
727 assert exp_record is not None, "Guaranteed by expandDataIds call earlier."
728 record = exp_record
729 instrument_record = data_id.records["instrument"]
731 # LSSTCam data can assign ra/dec to flats, and dome-closed
732 # engineering tests. Do not assign a visit if we know that
733 # can_see_sky is False. Treat None as True for this test.
734 can_see_sky = getattr(record, "can_see_sky", True)
735 if (
736 record.tracking_ra is None
737 or record.tracking_dec is None
738 or record.sky_angle is None
739 or can_see_sky is False
740 ):
741 if self.config.ignoreNonScienceExposures:
742 continue
743 else:
744 raise RuntimeError(
745 f"Input exposure {external} has observation_type "
746 f"{record.observation_type}, but is not on sky."
747 )
748 instrument_name = record.instrument
749 instruments.add(instrument_name)
750 exposure_records[record.dataId] = record
752 n_filtered = 0
753 visits_with_complete_regions: set[DataCoordinate] | None = None
754 if prefilter or check_detector_regions:
755 with self._query_exposures(exposure_records) as query:
756 if check_detector_regions:
757 visits_with_complete_regions = self._find_visits_with_all_detector_regions(query)
758 if prefilter:
759 n_filtered = self._prefilter(exposure_records, query, visits_with_complete_regions)
761 # Downstream APIs expect a list of records, not a dict.
762 exposures = list(exposure_records.values())
763 if not exposures:
764 self.log.info("No on-sky exposures found after filtering.")
765 return Struct(
766 n_visits=0,
767 n_skipped=0,
768 n_new=0,
769 n_partially_updated=0,
770 n_fully_updated=0,
771 n_filtered=n_filtered,
772 )
773 if len(instruments) > 1:
774 raise RuntimeError(
775 "All data IDs passed to DefineVisitsTask.run must be "
776 f"from the same instrument; got {instruments}."
777 )
778 (instrument,) = instruments
780 # Might need the instrument class for later depending on universe
781 # and grouping scheme.
782 if instrument_cls_name is None:
783 if instrument_record is None:
784 # We were given a DimensionRecord instead of a DataCoordinate.
786 instrument_records = self.butler.query_dimension_records(
787 "instrument", instrument=instrument, limit=1
788 )
789 if len(instrument_records) != 1:
790 raise RuntimeError(
791 f"Instrument {instrument} found in dimension record but unknown to butler."
792 )
793 instrument_record = instrument_records[0]
794 instrument_cls_name = instrument_record.class_name
795 assert instrument_cls_name is not None, "Instrument must be defined by this point"
796 instrument_helper = Instrument.from_string(instrument_cls_name)
798 # Ensure the visit_system our grouping algorithm uses is in the
799 # registry, if it wasn't already.
800 visitSystems = self.groupExposures.getVisitSystems()
801 for visitSystem in visitSystems:
802 self.log.info("Registering visit_system %d: %s.", visitSystem.value, visitSystem)
803 self.butler.registry.syncDimensionData(
804 "visit_system",
805 {"instrument": instrument, "id": visitSystem.value, "name": str(visitSystem)},
806 )
808 # In true incremental we will be given the second snap on its
809 # own on the assumption that the previous snap was already handled.
810 # For correct grouping we need access to the other exposures in the
811 # visit.
812 if incremental:
813 exposures.extend(self.groupExposures.find_missing(exposures, self.butler.registry))
815 # Group exposures into visits, delegating to subtask.
816 self.log.info("Grouping %d exposure(s) into visits.", len(exposures))
817 definitions = list(self.groupExposures.group(exposures, instrument_helper))
818 # Iterate over visits, compute regions, and insert dimension data, one
819 # transaction per visit. If a visit already exists, we skip all other
820 # inserts.
821 self.log.info("Computing regions and other metadata for %d visit(s).", len(definitions))
822 n_skipped: int = 0
823 n_new: int = 0
824 n_fully_updated: int = 0
825 n_partially_updated: int = 0
826 for visitDefinition in self.progress.wrap(
827 definitions, total=len(definitions), desc="Computing regions and inserting visits"
828 ):
829 visitRecords = self._buildVisitRecords(visitDefinition, collections=collections)
830 with self.butler.registry.transaction():
831 try:
832 inserted_or_updated = self.butler.registry.syncDimensionData(
833 "visit",
834 visitRecords.visit,
835 update=(update_records or incremental),
836 )
837 except ConflictingDefinitionError as err:
838 if not skip_conflicting:
839 err.add_note(f"on visit {visitDefinition.id}")
840 raise
841 inserted_or_updated = False
842 self.log.verbose("Skipping visit %d due to conflict.", visitRecords.visit.id)
843 needs_visit_detector_region_inserts = (
844 visits_with_complete_regions is not None
845 and visitRecords.visit.dataId not in visits_with_complete_regions
846 )
847 if inserted_or_updated or update_records or needs_visit_detector_region_inserts:
848 if inserted_or_updated is True:
849 # This is a new visit, not an update to an existing
850 # one, so insert visit definition.
851 # We don't allow visit definitions to change even when
852 # asked to update, because we'd have to delete the old
853 # visit_definitions first and also worry about what
854 # this does to datasets that already use the visit.
855 self.butler.registry.insertDimensionData(
856 "visit_definition", *visitRecords.visit_definition
857 )
858 if visitRecords.visit_system_membership:
859 self.butler.registry.insertDimensionData(
860 "visit_system_membership", *visitRecords.visit_system_membership
861 )
862 elif incremental and len(visitRecords.visit_definition) > 1:
863 # The visit record was modified. This could happen
864 # if a multi-snap visit was redefined with an
865 # additional snap so play it safe and allow for the
866 # visit definition to be updated. We use update=False
867 # here since there should not be any rows updated,
868 # just additional rows added. update=True does not work
869 # correctly with multiple records. In incremental mode
870 # we assume that the caller wants the visit definition
871 # to be updated and has no worries about provenance
872 # with the previous definition.
873 for definition in visitRecords.visit_definition:
874 self.butler.registry.syncDimensionData("visit_definition", definition)
875 if inserted_or_updated is True:
876 # Insert visit-detector regions if the visit is new.
877 self.butler.registry.insertDimensionData(
878 "visit_detector_region",
879 *visitRecords.visit_detector_region,
880 replace=False,
881 )
882 self.log.verbose(
883 "Inserted %s visit_detector_region records for new visit %s.",
884 len(visitRecords.visit_detector_region),
885 visitRecords.visit.id,
886 )
887 n_new += 1
888 # Cast below is because MyPy can't determine that
889 # inserted_or_updated can only be False if update_records
890 # is True.
891 elif (
892 update_records
893 or needs_visit_detector_region_inserts
894 or "region" in cast(dict, inserted_or_updated)
895 ):
896 if needs_visit_detector_region_inserts:
897 self.log.info(
898 "Visit %d was missing detector region records.", visitRecords.visit.id
899 )
900 # Replace visit-detector regions if we were told to
901 # update records explicitly, or if the visit region
902 # changed in an incremental=True update, or if this
903 # visit didn't have complete detector regions.
904 self.butler.registry.insertDimensionData(
905 "visit_detector_region",
906 *visitRecords.visit_detector_region,
907 replace=True,
908 )
909 self.log.verbose(
910 "Re-inserted %s visit_detector_region records for updated visit %s.",
911 len(visitRecords.visit_detector_region),
912 visitRecords.visit.id,
913 )
914 n_fully_updated += 1
915 else:
916 self.log.verbose(
917 "Updated visit %s without modifying visit_detector_region records.",
918 visitRecords.visit.id,
919 )
920 n_partially_updated += 1
922 # Update obscore exposure records with region information
923 # from corresponding visits.
924 if self.config.updateObsCoreTable:
925 if obscore_manager := self.butler.registry.obsCoreTableManager:
926 obscore_updates: list[tuple[int, int, Region]] = []
927 exposure_ids = [rec.exposure for rec in visitRecords.visit_definition]
928 for record in visitRecords.visit_detector_region:
929 obscore_updates += [
930 (exposure, record.detector, record.region) for exposure in exposure_ids
931 ]
932 if obscore_updates:
933 obscore_manager.update_exposure_regions(instrument, obscore_updates)
934 else:
935 self.log.verbose("Skipped already-existing visit %s.", visitRecords.visit.id)
936 n_skipped += 1
937 self.log.info(
938 "Finished writing database records for %d visit(s): %s left unchanged, %s new, "
939 "%s updated with new detector regions, %s updated without new detector regions.",
940 len(definitions),
941 n_skipped,
942 n_new,
943 n_fully_updated,
944 n_partially_updated,
945 )
946 return Struct(
947 n_visits=len(definitions),
948 n_skipped=n_skipped,
949 n_new=n_new,
950 n_fully_updated=n_fully_updated,
951 n_partially_updated=n_partially_updated,
952 n_filtered=n_filtered,
953 )
955 @contextmanager
956 def _query_exposures(self, exposure_records: dict[DataCoordinate, DimensionRecord]) -> Iterator[Query]:
957 with self.butler.query() as query:
958 yield query.join_data_coordinates(exposure_records.keys())
960 def _prefilter(
961 self,
962 exposure_records: dict[DataCoordinate, DimensionRecord],
963 query: Query,
964 visits_with_complete_regions: set[DataCoordinate] | None,
965 ) -> int:
966 n_filtered: int = 0
967 exposure_dimension_group = self.butler.dimensions["exposure"].minimal_group
968 visit_dimension_group = self.butler.dimensions["visit"].minimal_group
969 for dual_data_id in query.data_ids(["exposure", "visit"]):
970 if (
971 visits_with_complete_regions is None
972 or dual_data_id.subset(visit_dimension_group) in visits_with_complete_regions
973 ):
974 if exposure_records.pop(dual_data_id.subset(exposure_dimension_group), None) is not None:
975 n_filtered += 1
976 if n_filtered:
977 self.log.info(
978 "Filtered out %d on-sky exposure(s) (of %d) that were already associated with a visit.",
979 n_filtered,
980 n_filtered + len(exposure_records),
981 )
982 return n_filtered
984 def _find_visits_with_all_detector_regions(self, query: Query) -> set[DataCoordinate]:
985 all_detectors: dict[str, set[int]] = {}
986 for detector_data_id in query.data_ids(["detector"]):
987 all_detectors.setdefault(cast(str, detector_data_id["instrument"]), set()).add(
988 cast(int, detector_data_id["detector"])
989 )
990 visit_dimension_group = self.butler.dimensions["visit"].minimal_group
991 detectors_with_regions_by_visit: dict[DataCoordinate, set[int]] = {}
992 for vdr in query.dimension_records("visit_detector_region"):
993 detectors_with_regions_by_visit.setdefault(vdr.dataId.subset(visit_dimension_group), set()).add(
994 vdr.detector
995 )
996 result: set[DataCoordinate] = set()
997 for visit_data_id, detectors_for_visit in detectors_with_regions_by_visit.items():
998 if len(detectors_for_visit) == len(all_detectors[cast(str, visit_data_id["instrument"])]):
999 result.add(visit_data_id)
1000 return result
# Generic type parameter shared by the module-level helper functions below.
_T = TypeVar("_T")
1006def _reduceOrNone(func: Callable[[_T, _T], _T | None], iterable: Iterable[_T | None]) -> _T | None:
1007 """Apply a binary function to pairs of elements in an iterable until a
1008 single value is returned, but return `None` if any element is `None` or
1009 there are no elements.
1010 """
1011 r: _T | None = None
1012 for v in iterable:
1013 if v is None:
1014 return None
1015 if r is None:
1016 r = v
1017 else:
1018 r = func(r, v)
1019 return r
1022def _value_if_equal(a: _T, b: _T) -> _T | None:
1023 """Return either argument if they are equal, or `None` if they are not."""
1024 return a if a == b else None
1027def _calc_mean_angle(angles: list[float]) -> float:
1028 """Calculate the mean angle, taking into account 0/360 wrapping.
1030 Parameters
1031 ----------
1032 angles : `list` [`float`]
1033 Angles to average together, in degrees.
1035 Returns
1036 -------
1037 average : `float`
1038 Average angle in degrees.
1039 """
1040 # Save on all the math if we only have one value.
1041 if len(angles) == 1:
1042 return angles[0]
1044 # Convert polar coordinates of unit circle to complex values.
1045 # Average the complex values.
1046 # Convert back to a phase angle.
1047 return math.degrees(cmath.phase(sum(cmath.rect(1.0, math.radians(d)) for d in angles) / len(angles)))
class _GroupExposuresOneToOneConfig(GroupExposuresConfig):
    """Configuration for the one-to-one exposure grouping algorithm.

    Both fields are deprecated and no longer consulted; visit systems are
    now identified via the `VisitSystem` enum.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=0,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="one-to-one",
        deprecated="No longer used. Replaced by enum.",
    )
@registerConfigurable("one-to-one", GroupExposuresTask.registry)
class _GroupExposuresOneToOneTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that simply defines one visit for each
    exposure, reusing the exposures identifiers for the visit.
    """

    ConfigClass = _GroupExposuresOneToOneConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        # With a strict one-to-one mapping every exposure is a complete
        # visit on its own, so nothing can ever be missing.
        return []

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        # Trivial grouping: each exposure is a singleton group keyed by its
        # own ID.
        singleton_groups: dict[Any, list[DimensionRecord]] = {}
        for record in exposures:
            singleton_groups[record.id] = [record]
        return singleton_groups

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        one_to_one = {VisitSystem.from_name("one-to-one")}
        for record in exposures:
            # The visit simply reuses the exposure's ID and observation ID.
            yield VisitDefinitionData(
                instrument=record.instrument,
                id=record.id,
                name=record.obs_id,
                exposures=[record],
                visit_systems=one_to_one,
            )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        return set(VisitSystem.from_names(["one-to-one"]))
class _GroupExposuresByGroupMetadataConfig(GroupExposuresConfig):
    """Configuration for the by-group-metadata exposure grouping algorithm.

    Both fields are deprecated and no longer consulted; visit systems are
    now identified via the `VisitSystem` enum.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=1,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="by-group-metadata",
        deprecated="No longer used. Replaced by enum.",
    )
@registerConfigurable("by-group-metadata", GroupExposuresTask.registry)
class _GroupExposuresByGroupMetadataTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that uses the exposure group.

    This algorithm uses the ``group`` dimension for modern universes and the
    ``exposure.group_id`` for older universes.

    This algorithm *assumes* group ID (generally populated from
    `astro_metadata_translator.ObservationInfo.visit_id`) is not just unique,
    but disjoint from all `ObservationInfo.exposure_id` values - if it isn't,
    it will be impossible to ever use both this grouping algorithm and the
    one-to-one algorithm for a particular camera in the same data repository.
    """

    ConfigClass = _GroupExposuresByGroupMetadataConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        """Return exposures known to the registry that belong to the same
        groups as the given exposures but are absent from them.
        """
        groups = self.group_exposures(exposures)
        # Determine which group implementation we are using: modern
        # universes imply a "group" dimension on exposure; older ones store
        # the name in exposure.group_name.
        if "group" in registry.dimensions["exposure"].implied:
            group_key = "group"
        else:
            group_key = "group_name"
        missing_exposures: list[DimensionRecord] = []
        for exposures_in_group in groups.values():
            # We can not tell how many exposures are expected to be in the
            # visit so we have to query every time.
            first = exposures_in_group[0]
            records = set(
                registry.queryDimensionRecords(
                    "exposure",
                    where=f"exposure.{group_key} = groupnam",
                    bind={"groupnam": getattr(first, group_key)},
                    instrument=first.instrument,
                )
            )
            # Whatever the registry knows about that we were not handed is
            # considered missing.
            records.difference_update(set(exposures_in_group))
            missing_exposures.extend(list(records))
        return missing_exposures

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        """Group the exposures by their group name."""
        groups = defaultdict(list)
        # Probe the first record to pick the right attribute for this
        # universe: "group_name" (older) vs "group" (modern).
        group_key = "group"
        if exposures and hasattr(exposures[0], "group_name"):
            group_key = "group_name"
        for exposure in exposures:
            groups[getattr(exposure, group_key)].append(exposure)
        return groups

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        visit_systems = {VisitSystem.from_name("by-group-metadata")}
        groups = self.group_exposures(exposures)
        # The universe check is done once, on the first record seen, and
        # assumed consistent for the rest.
        has_group_dimension: bool | None = None
        for visitName, exposuresInGroup in groups.items():
            instrument_name = exposuresInGroup[0].instrument
            assert instrument_name == instrument.getName(), "Inconsistency in instrument name"
            visit_ids: set[int] = set()
            if has_group_dimension is None:
                has_group_dimension = hasattr(exposuresInGroup[0], "group")
            if has_group_dimension:
                # Modern universe: derive the numeric visit ID from the
                # group name via the instrument class.
                visit_ids = {instrument.group_name_to_group_id(e.group) for e in exposuresInGroup}
            else:
                # Older universe: the exposure record carries group_id.
                visit_ids = {e.group_id for e in exposuresInGroup}
            assert len(visit_ids) == 1, "Grouping by exposure group does not yield consistent group IDs"
            yield VisitDefinitionData(
                instrument=instrument_name,
                id=visit_ids.pop(),
                name=visitName,
                exposures=exposuresInGroup,
                visit_systems=visit_systems,
            )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        return set(VisitSystem.from_names(["by-group-metadata"]))
class _GroupExposuresByCounterAndExposuresConfig(GroupExposuresConfig):
    """Configuration for the by-counter-and-exposures grouping algorithm.

    Both fields are deprecated and no longer consulted; visit systems are
    now identified via the `VisitSystem` enum.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=2,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="by-counter-and-exposures",
        deprecated="No longer used. Replaced by enum.",
    )
@registerConfigurable("one-to-one-and-by-counter", GroupExposuresTask.registry)
class _GroupExposuresByCounterAndExposuresTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that uses the sequence start and
    sequence end metadata to create multi-exposure visits, but also
    creates one-to-one visits.

    This algorithm uses the exposure.seq_start and
    exposure.seq_end fields to collect related snaps.
    It also groups single exposures.
    """

    ConfigClass = _GroupExposuresByCounterAndExposuresConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        """Analyze the exposures and return relevant exposures known to
        registry.
        """
        groups = self.group_exposures(exposures)
        missing_exposures: list[DimensionRecord] = []
        for exposures_in_group in groups.values():
            # Sort by sequence number so the first snap comes first.
            sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
            first = sorted_exposures[0]

            # Only need to look for the seq_nums that we don't already have.
            seq_nums = set(range(first.seq_start, first.seq_end + 1))
            seq_nums.difference_update({exp.seq_num for exp in sorted_exposures})

            if seq_nums:
                # Missing something. Check registry.
                records = list(
                    registry.queryDimensionRecords(
                        "exposure",
                        where="exposure.seq_start = seq_start AND exposure.seq_end = seq_end AND "
                        "exposure.seq_num IN (seq_nums)",
                        bind={"seq_start": first.seq_start, "seq_end": first.seq_end, "seq_nums": seq_nums},
                        instrument=first.instrument,
                    )
                )
                missing_exposures.extend(records)

        return missing_exposures

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        """Group exposures by their (day_obs, seq_start, seq_end) triple."""
        groups = defaultdict(list)
        for exposure in exposures:
            # day_obs is part of the key so counters that reset daily do
            # not collide across nights.
            groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
        return groups

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        system_one_to_one = VisitSystem.from_name("one-to-one")
        system_seq_start_end = VisitSystem.from_name("by-seq-start-end")

        groups = self.group_exposures(exposures)
        for visit_key, exposures_in_group in groups.items():
            instrument_name = exposures_in_group[0].instrument

            # It is possible that the first exposure in a visit has not
            # been ingested. This can be determined and if that is the case
            # we can not reliably define the multi-exposure visit.
            skip_multi = False
            sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
            first = sorted_exposures.pop(0)
            if first.seq_num != first.seq_start:
                # Special case seq_num == 0 since that implies that the
                # instrument has no counters and therefore no multi-exposure
                # visits.
                if first.seq_num != 0:
                    self.log.warning(
                        "First exposure for visit %s is not present. Skipping the multi-snap definition.",
                        visit_key,
                    )
                    skip_multi = True

            multi_exposure = False
            if first.seq_start != first.seq_end:
                # This is a multi-exposure visit regardless of the number
                # of exposures present.
                multi_exposure = True

            # Define the one-to-one visits.
            for exposure in exposures_in_group:
                # Default is to use the exposure ID and name unless
                # this is the first exposure in a multi-exposure visit.
                visit_name = exposure.obs_id
                visit_id = exposure.id
                visit_systems = {system_one_to_one}

                if not multi_exposure:
                    # This is also a by-counter visit.
                    # It will use the same visit_name and visit_id.
                    visit_systems.add(system_seq_start_end)

                elif not skip_multi and exposure == first:
                    # This is the first legitimate exposure in a multi-exposure
                    # visit. It therefore needs a modified visit name and ID
                    # so it does not clash with the multi-exposure visit
                    # definition.  Prepending the digit "9" to the decimal ID
                    # keeps it distinct from the multi-snap visit, which
                    # reuses the unmodified exposure ID below.
                    visit_name = f"{visit_name}_first"
                    visit_id = int(f"9{visit_id}")

                yield VisitDefinitionData(
                    instrument=instrument_name,
                    id=visit_id,
                    name=visit_name,
                    exposures=[exposure],
                    visit_systems=visit_systems,
                )

            # Multi-exposure visit.
            if not skip_multi and multi_exposure:
                # Define the visit using the first exposure
                visit_name = first.obs_id
                visit_id = first.id

                yield VisitDefinitionData(
                    instrument=instrument_name,
                    id=visit_id,
                    name=visit_name,
                    exposures=exposures_in_group,
                    visit_systems={system_seq_start_end},
                )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        # Using a Config for this is difficult because what this grouping
        # algorithm is doing is using two visit systems.
        # One is using metadata (but not by-group) and the other is the
        # one-to-one. For now hard-code in class.
        return set(VisitSystem.from_names(["one-to-one", "by-seq-start-end"]))
class _ComputeVisitRegionsFromSingleRawWcsConfig(ComputeVisitRegionsConfig):
    """Configuration for the single-raw-wcs visit region algorithm."""

    # If True, compute the convex hull over every exposure in the visit
    # instead of trusting the first exposure's regions.
    mergeExposures: Field[bool] = Field(
        doc=(
            "If True, merge per-detector regions over all exposures in a "
            "visit (via convex hull) instead of using the first exposure and "
            "assuming its regions are valid for all others."
        ),
        dtype=bool,
        default=False,
    )
    # Pin the WCS to a specific detector; None lets the task pick the first
    # detector reported by the camera.
    detectorId: Field[int | None] = Field(
        doc=(
            "Load the WCS for the detector with this ID. If None, use an "
            "arbitrary detector (the first found in a query of the data "
            "repository for each exposure (or all exposures, if "
            "mergeExposures is True)."
        ),
        dtype=int,
        optional=True,
        default=None,
    )
    # Controls whether falling back to the nominal camera is an error.
    requireVersionedCamera: Field[bool] = Field(
        doc=(
            "If True, raise LookupError if version camera geometry cannot be "
            "loaded for an exposure. If False, use the nominal camera from "
            "the Instrument class instead."
        ),
        dtype=bool,
        optional=False,
        default=False,
    )
@registerConfigurable("single-raw-wcs", ComputeVisitRegionsTask.registry)
class _ComputeVisitRegionsFromSingleRawWcsTask(ComputeVisitRegionsTask):
    """A visit region calculator that uses a single raw WCS (recomputed from
    the ``exposure`` dimension record) and a camera to project the bounding
    boxes of all detectors onto the sky, relating different detectors by their
    positions in focal plane coordinates.

    Notes
    -----
    Most instruments should have their raw WCSs determined from a combination
    of boresight angle, rotator angle, and camera geometry, and hence this
    algorithm should produce stable results regardless of which detector the
    raw corresponds to. If this is not the case (e.g. because a per-file FITS
    WCS is used instead), either the ID of the detector should be fixed (see
    the ``detectorId`` config parameter) or a different algorithm used.
    """

    ConfigClass = _ComputeVisitRegionsFromSingleRawWcsConfig
    config: _ComputeVisitRegionsFromSingleRawWcsConfig

    def computeExposureBounds(
        self, exposure: DimensionRecord, *, collections: Any = None
    ) -> dict[int, list[UnitVector3d]]:
        """Compute the lists of unit vectors on the sphere that correspond to
        the sky positions of detector corners.

        Parameters
        ----------
        exposure : `DimensionRecord`
            Dimension record for the exposure.
        collections : Any, optional
            Collections to be searched for raws and camera geometry, overriding
            ``self.butler.collections.defaults``.
            Can be any of the types supported by the ``collections`` argument
            to butler construction.

        Returns
        -------
        bounds : `dict`
            Dictionary mapping detector ID to a list of unit vectors on the
            sphere representing that detector's corners projected onto the sky.
        """
        if collections is None:
            collections = list(self.butler.collections.defaults)
        camera, versioned = loadCamera(self.butler, exposure.dataId, collections=collections)
        if not versioned and self.config.requireVersionedCamera:
            raise LookupError(f"No versioned camera found for exposure {exposure.dataId}.")

        # Reconstruct the boresight pointing from the exposure record.
        # Assumes tracking_ra/tracking_dec/sky_angle are in degrees, per the
        # lsst.geom.degrees conversions below.
        orientation = lsst.geom.Angle(exposure.sky_angle, lsst.geom.degrees)
        radec = lsst.geom.SpherePoint(
            lsst.geom.Angle(exposure.tracking_ra, lsst.geom.degrees),
            lsst.geom.Angle(exposure.tracking_dec, lsst.geom.degrees),
        )

        # Pick the reference detector whose WCS anchors the whole focal
        # plane (see the class Notes on why the choice usually doesn't
        # matter).
        if self.config.detectorId is None:
            detectorId = next(camera.getIdIter())
        else:
            detectorId = self.config.detectorId
        wcsDetector = camera[detectorId]

        # Ask the raw formatter to create the relevant WCS
        # This allows flips to be taken into account
        instrument = self.getInstrument(exposure.instrument)
        rawFormatter = instrument.getRawFormatter({"detector": detectorId})

        try:
            wcs = rawFormatter.makeRawSkyWcsFromBoresight(radec, orientation, wcsDetector)  # type: ignore
        except AttributeError:
            raise TypeError(
                f"Raw formatter is {get_full_type_name(rawFormatter)} but visit"
                " definition requires it to support 'makeRawSkyWcsFromBoresight'"
            ) from None

        # Compose focal plane -> reference-detector pixels -> sky; chaining
        # each detector's pixels -> focal plane onto this gives pixels ->
        # sky for every detector in the camera.
        fpToSky = wcsDetector.getTransform(FOCAL_PLANE, PIXELS).then(wcs.getTransform())
        bounds = {}
        for detector in camera:
            pixelsToSky = detector.getTransform(PIXELS, FOCAL_PLANE).then(fpToSky)
            # Pad the bounding box (config.padding) before projecting so the
            # region errs on the side of being slightly too large.
            pixCorners = Box2D(detector.getBBox().dilatedBy(self.config.padding)).getCorners()
            bounds[detector.getId()] = [
                skyCorner.getVector() for skyCorner in pixelsToSky.applyForward(pixCorners)
            ]
        return bounds

    def compute(
        self, visit: VisitDefinitionData, *, collections: Any = None
    ) -> tuple[Region, dict[int, Region]]:
        # Docstring inherited from ComputeVisitRegionsTask.
        if self.config.mergeExposures:
            # Accumulate every exposure's corner vectors per detector; the
            # convex hull below then covers the whole visit.
            detectorBounds: dict[int, list[UnitVector3d]] = defaultdict(list)
            for exposure in visit.exposures:
                exposureDetectorBounds = self.computeExposureBounds(exposure, collections=collections)
                for detectorId, bounds in exposureDetectorBounds.items():
                    detectorBounds[detectorId].extend(bounds)
        else:
            # Trust the first exposure's geometry for the whole visit.
            detectorBounds = self.computeExposureBounds(visit.exposures[0], collections=collections)
        visitBounds = []
        detectorRegions = {}
        for detectorId, bounds in detectorBounds.items():
            detectorRegions[detectorId] = ConvexPolygon.convexHull(bounds)
            visitBounds.extend(bounds)
        # The visit region is the hull of all per-detector corner vectors.
        return ConvexPolygon.convexHull(visitBounds), detectorRegions