Coverage for python / lsst / obs / base / defineVisits.py: 23%

482 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:47 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

# Public API of this module: configs, tasks, and the supporting structs and
# enum used by visit definition.
__all__ = [
    "ComputeVisitRegionsConfig",
    "ComputeVisitRegionsTask",
    "DefineVisitsConfig",
    "DefineVisitsTask",
    "GroupExposuresConfig",
    "GroupExposuresTask",
    "VisitDefinitionData",
    "VisitSystem",
]

34 

35import cmath 

36import dataclasses 

37import enum 

38import math 

39import operator 

40from abc import ABCMeta, abstractmethod 

41from collections import defaultdict 

42from collections.abc import Callable, Iterable, Iterator, Sequence 

43from contextlib import contextmanager 

44from typing import Any, ClassVar, TypeVar, cast 

45 

46import lsst.geom 

47from lsst.afw.cameraGeom import FOCAL_PLANE, PIXELS 

48from lsst.daf.butler import Butler, DataCoordinate, DataId, DimensionRecord, Progress, Timespan 

49from lsst.daf.butler.queries import Query 

50from lsst.daf.butler.registry import ConflictingDefinitionError 

51from lsst.geom import Box2D 

52from lsst.pex.config import Config, Field, makeRegistry, registerConfigurable 

53from lsst.pipe.base import Struct, Task 

54from lsst.sphgeom import ConvexPolygon, Region, UnitVector3d 

55from lsst.utils.introspection import get_full_type_name 

56 

57from ._instrument import Instrument, loadCamera 

58 

59 

class VisitSystem(enum.Enum):
    """Enumeration used to label different visit systems."""

    ONE_TO_ONE = 0
    """Each exposure is assigned to its own visit."""

    BY_GROUP_METADATA = 1
    """Visit membership is defined by the value of the group dimension or, for
    older dimension universes, exposure.group_id."""

    BY_SEQ_START_END = 2
    """Visit membership is defined by the values of the ``exposure.day_obs``,
    ``exposure.seq_start``, and ``exposure.seq_end`` values.
    """

    @classmethod
    def all(cls) -> frozenset[VisitSystem]:
        """Return a `frozenset` containing all members."""
        return frozenset(cls)

    @classmethod
    def from_name(cls, external_name: str) -> VisitSystem:
        """Construct the enumeration from given name."""
        # External names are case-insensitive and may use hyphens in place
        # of the underscores used by the member names.
        key = external_name.replace("-", "_").upper()
        try:
            return cls.__members__[key]
        except KeyError:
            raise KeyError(f"Visit system named '{external_name}' not known.") from None

    @classmethod
    def from_names(cls, names: Iterable[str] | None) -> frozenset[VisitSystem]:
        """Return a `frozenset` of all the visit systems matching the supplied
        names.

        Parameters
        ----------
        names : iterable of `str`, or `None`
            Names of visit systems. Case insensitive. If `None` or empty, all
            the visit systems are returned.

        Returns
        -------
        systems : `frozenset` of `VisitSystem`
            The matching visit systems.
        """
        if names:
            return frozenset(cls.from_name(item) for item in names)
        return cls.all()

    def __str__(self) -> str:
        # Present the member as a lower-case, hyphenated name, the inverse
        # of the normalization applied by from_name.
        return self.name.replace("_", "-").lower()

115 

116 

@dataclasses.dataclass
class VisitDefinitionData:
    """Struct representing a group of exposures that will be used to define a
    visit.

    Produced by `GroupExposuresTask.group` and consumed by
    `ComputeVisitRegionsTask.compute` and `DefineVisitsTask`.
    """

    instrument: str
    """Name of the instrument this visit will be associated with.
    """

    id: int
    """Integer ID of the visit.

    This must be unique across all visit systems for the instrument.
    """

    name: str
    """String name for the visit.

    This must be unique across all visit systems for the instrument.
    """

    visit_systems: set[VisitSystem]
    """All the visit systems associated with this visit."""

    exposures: list[DimensionRecord] = dataclasses.field(default_factory=list)
    """Dimension records for the exposures that are part of this visit.
    """

145 

146 

@dataclasses.dataclass
class _VisitRecords:
    """Struct containing the dimension records associated with a visit.

    Built by `DefineVisitsTask._buildVisitRecords` and inserted into the
    registry one visit at a time.
    """

    visit: DimensionRecord
    """Record for the 'visit' dimension itself.
    """

    visit_definition: list[DimensionRecord]
    """Records for 'visit_definition', which relates 'visit' to 'exposure'.
    """

    visit_detector_region: list[DimensionRecord]
    """Records for 'visit_detector_region', which associates the combination
    of a 'visit' and a 'detector' with a region on the sky.
    """

    visit_system_membership: list[DimensionRecord]
    """Records relating visits to an associated visit system."""

166 

167 

class GroupExposuresConfig(Config):
    """Configure exposure grouping.

    This base config defines no options of its own; it is the
    ``configBaseType`` for `GroupExposuresTask.registry`, so concrete
    grouping algorithms can extend it with their own settings.
    """

170 

171 

class GroupExposuresTask(Task, metaclass=ABCMeta):
    """Abstract base class for the subtask of `DefineVisitsTask` that is
    responsible for grouping exposures into visits.

    Subclasses should be registered with `GroupExposuresTask.registry` to
    enable use by `DefineVisitsTask`, and should generally correspond to a
    particular 'visit_system' dimension value. They are also responsible for
    defining visit IDs and names that are unique across all visit systems in
    use by an instrument.

    Parameters
    ----------
    config : `GroupExposuresConfig`
        Configuration information.
    **kwargs
        Additional keyword arguments forwarded to the `lsst.pipe.base.Task`
        constructor.
    """

    def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
        Task.__init__(self, config=config, **kwargs)

    ConfigClass = GroupExposuresConfig

    _DefaultName = "groupExposures"

    registry = makeRegistry(
        doc="Registry of algorithms for grouping exposures into visits.",
        configBaseType=GroupExposuresConfig,
    )

    @abstractmethod
    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        """Determine, if possible, which exposures might be missing.

        Parameters
        ----------
        exposures : `list` of `lsst.daf.butler.DimensionRecord`
            The exposure records to analyze.
        registry : `lsst.daf.butler.Registry`
            A butler registry that contains these exposure records.

        Returns
        -------
        missing : `list` of `lsst.daf.butler.DimensionRecord`
            Any exposure records present in registry that were related to
            the given exposures but were missing from that list and deemed
            to be relevant.

        Notes
        -----
        Only some grouping schemes are able to find missing exposures. It
        is acceptable to return an empty list.
        """
        raise NotImplementedError()

    @abstractmethod
    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        """Group the exposures in a way most natural for this visit definition.

        Parameters
        ----------
        exposures : `list` of `lsst.daf.butler.DimensionRecord`
            The exposure records to group.

        Returns
        -------
        groups : `dict` [Any, `list` [ `lsst.daf.butler.DimensionRecord` ] ]
            Groupings of exposure records. The key type is relevant to the
            specific visit definition and could be a string or a tuple.
        """
        raise NotImplementedError()

    @abstractmethod
    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        """Group the given exposures into visits.

        Parameters
        ----------
        exposures : `list` [ `lsst.daf.butler.DimensionRecord` ]
            DimensionRecords (for the 'exposure' dimension) describing the
            exposures to group.
        instrument : `~lsst.obs.base.Instrument`
            Instrument specification that can be used to optionally support
            some visit ID definitions.

        Returns
        -------
        visits : `~collections.abc.Iterable` [ `VisitDefinitionData` ]
            Structs identifying the visits and the exposures associated with
            them. This may be an iterator or a container.
        """
        raise NotImplementedError()

    # NOTE(review): not decorated with @abstractmethod, but the base
    # implementation raises NotImplementedError, so subclasses must
    # effectively override it anyway.
    def getVisitSystems(self) -> set[VisitSystem]:
        """Return identifiers for the 'visit_system' dimension this
        algorithm implements.

        Returns
        -------
        visit_systems : `set` [`VisitSystem`]
            The visit systems used by this algorithm.
        """
        raise NotImplementedError()

280 

281 

class ComputeVisitRegionsConfig(Config):
    """Configure visit region calculations.

    Base config for algorithms registered with
    `ComputeVisitRegionsTask.registry`.
    """

    padding: Field[int] = Field(
        dtype=int,
        default=250,
        doc=(
            "Pad raw image bounding boxes with specified number of pixels "
            "when calculating their (conservatively large) region on the "
            "sky. Note that the config value for pixelMargin of the "
            "reference object loaders in meas_algorithms should be <= "
            "the value set here."
        ),
    )

296 

297 

class ComputeVisitRegionsTask(Task, metaclass=ABCMeta):
    """Abstract base class for the subtask of `DefineVisitsTask` that is
    responsible for extracting spatial regions for visits and visit+detector
    combinations.

    Concrete implementations must be registered with
    `ComputeVisitRegionsTask.registry` before `DefineVisitsTask` can use
    them.

    Parameters
    ----------
    config : `ComputeVisitRegionsConfig`
        Configuration information.
    butler : `lsst.daf.butler.Butler`
        The butler to use.
    **kwargs
        Additional keyword arguments forwarded to the `~lsst.pipe.base.Task`
        constructor.
    """

    ConfigClass = ComputeVisitRegionsConfig

    _DefaultName = "computeVisitRegions"

    registry = makeRegistry(
        doc="Registry of algorithms for computing on-sky regions for visits and visit+detector combinations.",
        configBaseType=ComputeVisitRegionsConfig,
    )

    def __init__(self, config: ComputeVisitRegionsConfig, *, butler: Butler, **kwargs: Any):
        Task.__init__(self, config=config, **kwargs)
        self.butler = butler
        # Lazily-populated cache mapping instrument name to its Instrument
        # instance; filled by getInstrument.
        self.instrumentMap: dict[str, Instrument] = {}

    def getInstrument(self, instrumentName: str) -> Instrument:
        """Retrieve an `~lsst.obs.base.Instrument` associated with this
        instrument name.

        Parameters
        ----------
        instrumentName : `str`
            The name of the instrument.

        Returns
        -------
        instrument : `~lsst.obs.base.Instrument`
            The associated instrument object.

        Notes
        -----
        The result is cached.
        """
        try:
            return self.instrumentMap[instrumentName]
        except KeyError:
            # First request for this instrument: look it up via the butler
            # registry and remember it for subsequent calls.
            instrument = Instrument.fromName(instrumentName, self.butler.registry)
            self.instrumentMap[instrumentName] = instrument
            return instrument

    @abstractmethod
    def compute(
        self,
        visit: VisitDefinitionData,
        *,
        collections: Sequence[str] | str | None = None,
    ) -> tuple[Region, dict[int, Region]]:
        """Compute regions for the given visit and all detectors in that visit.

        Parameters
        ----------
        visit : `VisitDefinitionData`
            Struct describing the visit and the exposures associated with it.
        collections : `collections.abc.Sequence` [ `str` ] or `str` or `None`
            Collections to be searched for camera geometry, overriding
            ``self.butler.collections.defaults``. Can be any of the types
            supported by the ``collections`` argument to butler construction.

        Returns
        -------
        visitRegion : `lsst.sphgeom.Region`
            Region for the full visit.
        visitDetectorRegions : `dict` [ `int`, `lsst.sphgeom.Region` ]
            Dictionary mapping detector ID to the region for that detector.
            Should include all detectors in the visit.
        """
        raise NotImplementedError()

382 

383 

class DefineVisitsConfig(Config):
    """Configure visit definition.

    Selects the exposure-grouping and region-computation algorithms used by
    `DefineVisitsTask` and controls handling of non-science exposures and
    obscore-table updates.
    """

    groupExposures = GroupExposuresTask.registry.makeField(
        doc="Algorithm for grouping exposures into visits.",
        default="one-to-one",
    )
    # Fixed typo in user-facing help string: "Algorithm from computing" ->
    # "Algorithm for computing".
    computeVisitRegions = ComputeVisitRegionsTask.registry.makeField(
        doc="Algorithm for computing visit and visit+detector regions.",
        default="single-raw-wcs",
    )
    # Fixed grammar in user-facing help string: "if one encountered" ->
    # "if one is encountered".
    ignoreNonScienceExposures: Field[bool] = Field(
        doc=(
            "If True, silently ignore input exposures that do not have "
            "observation_type=SCIENCE. If False, raise an exception if one "
            "is encountered."
        ),
        dtype=bool,
        optional=False,
        default=True,
    )
    updateObsCoreTable: Field[bool] = Field(
        doc=(
            "If True, update exposure regions in obscore table after visits "
            "are defined. If False, do not update obscore table."
        ),
        dtype=bool,
        default=True,
    )

413 

414 

415class DefineVisitsTask(Task): 

416 """Driver Task for defining visits (and their spatial regions) in Gen3 

417 Butler repositories. 

418 

419 Parameters 

420 ---------- 

421 config : `DefineVisitsConfig` 

422 Configuration for the task. 

423 butler : `~lsst.daf.butler.Butler` 

424 Writeable butler instance. Will be used to read ``camera`` datasets 

425 and insert/sync dimension data. 

426 **kwargs 

427 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task` 

428 constructor. 

429 

430 Notes 

431 ----- 

432 Each instance of `DefineVisitsTask` reads from / writes to the same Butler. 

433 Each invocation of `DefineVisitsTask.run` processes an independent group of 

434 exposures into one or more new visits, all belonging to the same visit 

435 system and instrument. 

436 

437 The actual work of grouping exposures and computing regions is delegated to 

438 pluggable subtasks (`GroupExposuresTask` and `ComputeVisitRegionsTask`), 

439 respectively. The defaults are to create one visit for every exposure, and 

440 to use exactly one (arbitrary) detector-level raw dataset's WCS along with 

441 camera geometry to compute regions for all detectors, but the raw WCS is 

442 recomputed from the ``exposure`` dimension record's rotation angle and 

443 boresight rather than by loading the ``raw.wcs`` dataset directly. Other 

444 implementations can be created and configured for instruments for which 

445 these choices are unsuitable (e.g. because visits and exposures are not 

446 one-to-one, or because ``raw.wcs`` datasets for different detectors may not 

447 be consistent with camera geometry). 

448 

449 Defining the same visit the same way multiple times (e.g. via multiple 

450 invocations of this task on the same exposures, with the same 

451 configuration) is safe, but it may be inefficient, as most of the work must 

452 be done before new visits can be compared to existing visits. 

453 """ 

454 

    def __init__(self, config: DefineVisitsConfig, *, butler: Butler, **kwargs: Any):
        """Construct the task, validating the config and creating subtasks."""
        config.validate()  # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
        super().__init__(config, **kwargs)
        self.butler = butler
        # Dimension universe from the butler; used to build all dimension
        # records this task creates.
        self.universe = self.butler.dimensions
        self.progress = Progress("obs.base.DefineVisitsTask")
        # Pluggable subtasks: which classes are used is chosen by the
        # registry fields in DefineVisitsConfig.
        self.makeSubtask("groupExposures")
        self.makeSubtask("computeVisitRegions", butler=self.butler)

463 

464 def _reduce_kwargs(self) -> dict: 

465 # Add extra parameters to pickle 

466 return dict(**super()._reduce_kwargs(), butler=self.butler) 

467 

    # Class attributes consumed by the Task framework.
    ConfigClass: ClassVar[type[Config]] = DefineVisitsConfig

    _DefaultName: ClassVar[str] = "defineVisits"

    # Instance attribute type declarations. ``groupExposures`` and
    # ``computeVisitRegions`` are created by the makeSubtask calls in
    # __init__; ``config`` is presumably assigned by the Task base class
    # constructor — TODO confirm against lsst.pipe.base.Task.
    config: DefineVisitsConfig
    groupExposures: GroupExposuresTask
    computeVisitRegions: ComputeVisitRegionsTask

475 

    def _buildVisitRecords(
        self, definition: VisitDefinitionData, *, collections: Sequence[str] | str | None = None
    ) -> _VisitRecords:
        """Build the DimensionRecords associated with a visit.

        Parameters
        ----------
        definition : `VisitDefinitionData`
            Struct with identifiers for the visit and records for its
            constituent exposures.
        collections : `collections.abc.Sequence` [ `str` ] or `str` or `None`
            Collections to be searched for camera geometry, overriding
            ``self.butler.collections.defaults``. Can be any of the types
            supported by the ``collections`` argument to butler construction.

        Returns
        -------
        records : `_VisitRecords`
            Struct containing DimensionRecords for the visit, including
            associated dimension elements.
        """
        dimension = self.universe["visit"]

        # Some registries support additional items. ``supported`` holds the
        # metadata column names the 'visit' dimension has in this universe;
        # it is used below to decide which optional fields to populate.
        supported = {meta.name for meta in dimension.metadata}

        # Compute all regions (full visit plus per-detector), delegating to
        # the configured subtask.
        visitRegion, visitDetectorRegions = self.computeVisitRegions.compute(
            definition, collections=collections
        )
        # Aggregate other exposure quantities. The visit timespan covers
        # the earliest begin to the latest end over all exposures.
        timespan = Timespan(
            begin=_reduceOrNone(min, (e.timespan.begin for e in definition.exposures)),
            end=_reduceOrNone(max, (e.timespan.end for e in definition.exposures)),
        )
        # Total exposure time is the sum over all constituent exposures.
        exposure_time = _reduceOrNone(operator.add, (e.exposure_time for e in definition.exposures))
        # These fields only propagate when every exposure agrees; otherwise
        # _value_if_equal presumably yields None — TODO confirm against its
        # definition elsewhere in this file.
        physical_filter = _reduceOrNone(_value_if_equal, (e.physical_filter for e in definition.exposures))
        target_name = _reduceOrNone(_value_if_equal, (e.target_name for e in definition.exposures))
        science_program = _reduceOrNone(_value_if_equal, (e.science_program for e in definition.exposures))

        # observing day for a visit is defined by the earliest observation
        # of the visit
        observing_day = _reduceOrNone(min, (e.day_obs for e in definition.exposures))
        observation_reason = _reduceOrNone(
            _value_if_equal, (e.observation_reason for e in definition.exposures)
        )
        if observation_reason is None:
            # Be explicit about there being multiple reasons
            observation_reason = "various"

        # Use the mean zenith angle as an approximation
        zenith_angle = _reduceOrNone(operator.add, (e.zenith_angle for e in definition.exposures))
        if zenith_angle is not None:
            zenith_angle /= len(definition.exposures)

        # New records that may not be supported by older universes.
        extras: dict[str, Any] = {}
        if "seq_num" in supported:
            # Visit seq_num is the smallest seq_num of its exposures.
            extras["seq_num"] = _reduceOrNone(min, (e.seq_num for e in definition.exposures))
        if "azimuth" in supported:
            # Must take into account 0/360 problem.
            extras["azimuth"] = _calc_mean_angle([e.azimuth for e in definition.exposures])

        # visit_system handling changed. This is the logic for visit/exposure
        # that has support for seq_start/seq_end. The presence of "seq_num"
        # in the visit metadata is used here as the marker for the newer
        # universe schema.
        if "seq_num" in supported:
            # Map visit to exposure.
            visit_definition = [
                self.universe["visit_definition"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    exposure=exposure.id,
                )
                for exposure in definition.exposures
            ]

            # Map visit to visit system, restricted to the systems this
            # visit actually belongs to.
            visit_system_membership = []
            for visit_system in self.groupExposures.getVisitSystems():
                if visit_system in definition.visit_systems:
                    record = self.universe["visit_system_membership"].RecordClass(
                        instrument=definition.instrument,
                        visit=definition.id,
                        visit_system=visit_system.value,
                    )
                    visit_system_membership.append(record)

        else:
            # The old approach can only handle one visit system at a time.
            # If we have been configured with multiple options, prefer the
            # one-to-one.
            visit_systems = self.groupExposures.getVisitSystems()
            if len(visit_systems) > 1:
                one_to_one = VisitSystem.from_name("one-to-one")
                if one_to_one not in visit_systems:
                    raise ValueError(
                        f"Multiple visit systems specified ({visit_systems}) for use with old"
                        " dimension universe but unable to find one-to-one."
                    )
                visit_system = one_to_one
            else:
                # NOTE: set.pop() mutates the set returned by
                # getVisitSystems here.
                visit_system = visit_systems.pop()

            # In the old schema the visit system is a column on the visit
            # record itself.
            extras["visit_system"] = visit_system.value

            # The old visit_definition included visit system.
            visit_definition = [
                self.universe["visit_definition"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    exposure=exposure.id,
                    visit_system=visit_system.value,
                )
                for exposure in definition.exposures
            ]

            # This concept does not exist in old schema.
            visit_system_membership = []

        # Construct the actual DimensionRecords.
        return _VisitRecords(
            visit=dimension.RecordClass(
                instrument=definition.instrument,
                id=definition.id,
                name=definition.name,
                physical_filter=physical_filter,
                target_name=target_name,
                science_program=science_program,
                observation_reason=observation_reason,
                day_obs=observing_day,
                zenith_angle=zenith_angle,
                exposure_time=exposure_time,
                timespan=timespan,
                region=visitRegion,
                # TODO: no seeing value in exposure dimension records, so we
                # can't set that here. But there are many other columns that
                # both dimensions should probably have as well.
                **extras,
            ),
            visit_definition=visit_definition,
            visit_system_membership=visit_system_membership,
            visit_detector_region=[
                self.universe["visit_detector_region"].RecordClass(
                    instrument=definition.instrument,
                    visit=definition.id,
                    detector=detectorId,
                    region=detectorRegion,
                )
                for detectorId, detectorRegion in visitDetectorRegions.items()
            ],
        )

627 

628 def run( 

629 self, 

630 dataIds_or_records: Iterable[DataId | DimensionRecord], 

631 *, 

632 collections: Sequence[str] | str | None = None, 

633 update_records: bool = False, 

634 incremental: bool = False, 

635 skip_conflicting: bool = False, 

636 prefilter: bool = False, 

637 check_detector_regions: bool = False, 

638 ) -> Struct: 

639 """Add visit definitions to the registry for the given exposures. 

640 

641 Parameters 

642 ---------- 

643 dataIds_or_records : `~collections.abc.Iterable` [ `dict` or \ 

644 `~lsst.daf.butler.DataCoordinate` or \ 

645 `~lsst.daf.butler.DimensionRecord` ] 

646 Exposure-level data IDs or explicit exposure records. These must 

647 all correspond to the same instrument, and are expected to be 

648 on-sky science exposures. 

649 collections : `~collections.abc.Sequence` [ `str` ] or `str` or `None` 

650 Collections to be searched for camera geometry, overriding 

651 ``self.butler.collections.defaults``. Can be any of the types 

652 supported by the ``collections`` argument to butler construction. 

653 update_records : `bool`, optional 

654 If `True` (`False` is default), update existing ``visit`` records 

655 and ``visit_detector_region`` records. THIS IS AN ADVANCED OPTION 

656 THAT SHOULD ONLY BE USED TO FIX REGIONS AND/OR METADATA THAT ARE 

657 KNOWN TO BE BAD, AND IT CANNOT BE USED TO REMOVE EXPOSURES OR 

658 DETECTORS FROM A VISIT. 

659 incremental : `bool`, optional 

660 If `True` indicate that exposures are being ingested incrementally 

661 and visit definition will be run on partial visits. This will 

662 allow the ``visit`` record to be updated if it already exists, but 

663 (unlike ``update_records=True``) it will only update the 

664 ``visit_detector_region`` records if the ``visit`` record's region 

665 changes. If there is any risk that files are being ingested 

666 incrementally it is critical that this parameter is set to `True` 

667 and not to rely on ``update_records``. 

668 skip_conflicting : `bool`, optional 

669 If `True` do not raise an error if there is a change in an existing 

670 visit definition. This can be used if you solely want to define 

671 visits that were somehow missed previously. It has no effect if 

672 ``update_records`` is `True` or incremental mode is enabled. 

673 prefilter : `bool`, optional 

674 If `True`, filter out exposures that already have a visit defined 

675 by querying in advance. This is not compatible with 

676 ``incremental=True`` or ``update_records=True``. Note that for 

677 exposures that may be associated with multiple visits, the 

678 presence of any existing visit will cause that exposure to be 

679 skipped entirely. 

680 check_detector_regions : `bool`, optional 

681 If `True`, check existing visits to see if they have a 

682 ``visit_detector_region`` record for every detector. The default 

683 is to assume that if a ``visit`` record is present, the 

684 ``visit_detector_region`` records are as well. 

685 

686 Returns 

687 ------- 

688 result : `lsst.pipe.base.Struct` 

689 Structure with the following attributes (all `int`): 

690 

691 - n_visits: total number of visits defined 

692 - n_skipped: number of visits that were already present left alone 

693 - n_new: number of new visits inserted 

694 - n_fully_updated: number of existing visits fully updated 

695 - n_partially_updated: number of visits with non-geometry updates 

696 - n_filtered: number of exposures dropped because they were 

697 already associated with visits. 

698 

699 Raises 

700 ------ 

701 lsst.daf.butler.registry.ConflictingDefinitionError 

702 Raised if a visit ID conflict is detected and the existing visit 

703 differs from the new one. 

704 """ 

705 if prefilter and (update_records or incremental): 

706 raise TypeError("prefilter=True cannot be used with update_records=True or incremental=True") 

707 

708 # Normalize, expand, and deduplicate data IDs. 

709 self.log.info("Preprocessing data IDs.") 

710 dimensions = self.universe.conform(["exposure"]) 

711 

712 exposure_records: dict[DataCoordinate, DimensionRecord] = {} 

713 instruments: set[str] = set() 

714 instrument_cls_name: str | None = None 

715 instrument_record: DimensionRecord | None = None 

716 

717 # Go through the supplied dataset extracting records. 

718 # Check that only a single instrument is being used. 

719 for external in dataIds_or_records: 

720 if isinstance(external, DimensionRecord): 

721 record = external 

722 if str(record.definition) != "exposure": 

723 raise ValueError(f"Can only define visits from exposure records, not {record}.") 

724 else: 

725 data_id = self.butler.registry.expandDataId(external, dimensions=dimensions) 

726 exp_record = data_id.records["exposure"] 

727 assert exp_record is not None, "Guaranteed by expandDataIds call earlier." 

728 record = exp_record 

729 instrument_record = data_id.records["instrument"] 

730 

731 # LSSTCam data can assign ra/dec to flats, and dome-closed 

732 # engineering tests. Do not assign a visit if we know that 

733 # can_see_sky is False. Treat None as True for this test. 

734 can_see_sky = getattr(record, "can_see_sky", True) 

735 if ( 

736 record.tracking_ra is None 

737 or record.tracking_dec is None 

738 or record.sky_angle is None 

739 or can_see_sky is False 

740 ): 

741 if self.config.ignoreNonScienceExposures: 

742 continue 

743 else: 

744 raise RuntimeError( 

745 f"Input exposure {external} has observation_type " 

746 f"{record.observation_type}, but is not on sky." 

747 ) 

748 instrument_name = record.instrument 

749 instruments.add(instrument_name) 

750 exposure_records[record.dataId] = record 

751 

752 n_filtered = 0 

753 visits_with_complete_regions: set[DataCoordinate] | None = None 

754 if prefilter or check_detector_regions: 

755 with self._query_exposures(exposure_records) as query: 

756 if check_detector_regions: 

757 visits_with_complete_regions = self._find_visits_with_all_detector_regions(query) 

758 if prefilter: 

759 n_filtered = self._prefilter(exposure_records, query, visits_with_complete_regions) 

760 

761 # Downstream APIs expect a list of records, not a dict. 

762 exposures = list(exposure_records.values()) 

763 if not exposures: 

764 self.log.info("No on-sky exposures found after filtering.") 

765 return Struct( 

766 n_visits=0, 

767 n_skipped=0, 

768 n_new=0, 

769 n_partially_updated=0, 

770 n_fully_updated=0, 

771 n_filtered=n_filtered, 

772 ) 

773 if len(instruments) > 1: 

774 raise RuntimeError( 

775 "All data IDs passed to DefineVisitsTask.run must be " 

776 f"from the same instrument; got {instruments}." 

777 ) 

778 (instrument,) = instruments 

779 

780 # Might need the instrument class for later depending on universe 

781 # and grouping scheme. 

782 if instrument_cls_name is None: 

783 if instrument_record is None: 

784 # We were given a DimensionRecord instead of a DataCoordinate. 

785 

786 instrument_records = self.butler.query_dimension_records( 

787 "instrument", instrument=instrument, limit=1 

788 ) 

789 if len(instrument_records) != 1: 

790 raise RuntimeError( 

791 f"Instrument {instrument} found in dimension record but unknown to butler." 

792 ) 

793 instrument_record = instrument_records[0] 

794 instrument_cls_name = instrument_record.class_name 

795 assert instrument_cls_name is not None, "Instrument must be defined by this point" 

796 instrument_helper = Instrument.from_string(instrument_cls_name) 

797 

798 # Ensure the visit_system our grouping algorithm uses is in the 

799 # registry, if it wasn't already. 

800 visitSystems = self.groupExposures.getVisitSystems() 

801 for visitSystem in visitSystems: 

802 self.log.info("Registering visit_system %d: %s.", visitSystem.value, visitSystem) 

803 self.butler.registry.syncDimensionData( 

804 "visit_system", 

805 {"instrument": instrument, "id": visitSystem.value, "name": str(visitSystem)}, 

806 ) 

807 

808 # In true incremental we will be given the second snap on its 

809 # own on the assumption that the previous snap was already handled. 

810 # For correct grouping we need access to the other exposures in the 

811 # visit. 

812 if incremental: 

813 exposures.extend(self.groupExposures.find_missing(exposures, self.butler.registry)) 

814 

815 # Group exposures into visits, delegating to subtask. 

816 self.log.info("Grouping %d exposure(s) into visits.", len(exposures)) 

817 definitions = list(self.groupExposures.group(exposures, instrument_helper)) 

818 # Iterate over visits, compute regions, and insert dimension data, one 

819 # transaction per visit. If a visit already exists, we skip all other 

820 # inserts. 

821 self.log.info("Computing regions and other metadata for %d visit(s).", len(definitions)) 

822 n_skipped: int = 0 

823 n_new: int = 0 

824 n_fully_updated: int = 0 

825 n_partially_updated: int = 0 

826 for visitDefinition in self.progress.wrap( 

827 definitions, total=len(definitions), desc="Computing regions and inserting visits" 

828 ): 

829 visitRecords = self._buildVisitRecords(visitDefinition, collections=collections) 

830 with self.butler.registry.transaction(): 

831 try: 

832 inserted_or_updated = self.butler.registry.syncDimensionData( 

833 "visit", 

834 visitRecords.visit, 

835 update=(update_records or incremental), 

836 ) 

837 except ConflictingDefinitionError as err: 

838 if not skip_conflicting: 

839 err.add_note(f"on visit {visitDefinition.id}") 

840 raise 

841 inserted_or_updated = False 

842 self.log.verbose("Skipping visit %d due to conflict.", visitRecords.visit.id) 

843 needs_visit_detector_region_inserts = ( 

844 visits_with_complete_regions is not None 

845 and visitRecords.visit.dataId not in visits_with_complete_regions 

846 ) 

847 if inserted_or_updated or update_records or needs_visit_detector_region_inserts: 

848 if inserted_or_updated is True: 

849 # This is a new visit, not an update to an existing 

850 # one, so insert visit definition. 

851 # We don't allow visit definitions to change even when 

852 # asked to update, because we'd have to delete the old 

853 # visit_definitions first and also worry about what 

854 # this does to datasets that already use the visit. 

855 self.butler.registry.insertDimensionData( 

856 "visit_definition", *visitRecords.visit_definition 

857 ) 

858 if visitRecords.visit_system_membership: 

859 self.butler.registry.insertDimensionData( 

860 "visit_system_membership", *visitRecords.visit_system_membership 

861 ) 

862 elif incremental and len(visitRecords.visit_definition) > 1: 

863 # The visit record was modified. This could happen 

864 # if a multi-snap visit was redefined with an 

865 # additional snap so play it safe and allow for the 

866 # visit definition to be updated. We use update=False 

867 # here since there should not be any rows updated, 

868 # just additional rows added. update=True does not work 

869 # correctly with multiple records. In incremental mode 

870 # we assume that the caller wants the visit definition 

871 # to be updated and has no worries about provenance 

872 # with the previous definition. 

873 for definition in visitRecords.visit_definition: 

874 self.butler.registry.syncDimensionData("visit_definition", definition) 

875 if inserted_or_updated is True: 

876 # Insert visit-detector regions if the visit is new. 

877 self.butler.registry.insertDimensionData( 

878 "visit_detector_region", 

879 *visitRecords.visit_detector_region, 

880 replace=False, 

881 ) 

882 self.log.verbose( 

883 "Inserted %s visit_detector_region records for new visit %s.", 

884 len(visitRecords.visit_detector_region), 

885 visitRecords.visit.id, 

886 ) 

887 n_new += 1 

888 # Cast below is because MyPy can't determine that 

889 # inserted_or_updated can only be False if update_records 

890 # is True. 

891 elif ( 

892 update_records 

893 or needs_visit_detector_region_inserts 

894 or "region" in cast(dict, inserted_or_updated) 

895 ): 

896 if needs_visit_detector_region_inserts: 

897 self.log.info( 

898 "Visit %d was missing detector region records.", visitRecords.visit.id 

899 ) 

900 # Replace visit-detector regions if we were told to 

901 # update records explicitly, or if the visit region 

902 # changed in an incremental=True update, or if this 

903 # visit didn't have complete detector regions. 

904 self.butler.registry.insertDimensionData( 

905 "visit_detector_region", 

906 *visitRecords.visit_detector_region, 

907 replace=True, 

908 ) 

909 self.log.verbose( 

910 "Re-inserted %s visit_detector_region records for updated visit %s.", 

911 len(visitRecords.visit_detector_region), 

912 visitRecords.visit.id, 

913 ) 

914 n_fully_updated += 1 

915 else: 

916 self.log.verbose( 

917 "Updated visit %s without modifying visit_detector_region records.", 

918 visitRecords.visit.id, 

919 ) 

920 n_partially_updated += 1 

921 

922 # Update obscore exposure records with region information 

923 # from corresponding visits. 

924 if self.config.updateObsCoreTable: 

925 if obscore_manager := self.butler.registry.obsCoreTableManager: 

926 obscore_updates: list[tuple[int, int, Region]] = [] 

927 exposure_ids = [rec.exposure for rec in visitRecords.visit_definition] 

928 for record in visitRecords.visit_detector_region: 

929 obscore_updates += [ 

930 (exposure, record.detector, record.region) for exposure in exposure_ids 

931 ] 

932 if obscore_updates: 

933 obscore_manager.update_exposure_regions(instrument, obscore_updates) 

934 else: 

935 self.log.verbose("Skipped already-existing visit %s.", visitRecords.visit.id) 

936 n_skipped += 1 

937 self.log.info( 

938 "Finished writing database records for %d visit(s): %s left unchanged, %s new, " 

939 "%s updated with new detector regions, %s updated without new detector regions.", 

940 len(definitions), 

941 n_skipped, 

942 n_new, 

943 n_fully_updated, 

944 n_partially_updated, 

945 ) 

946 return Struct( 

947 n_visits=len(definitions), 

948 n_skipped=n_skipped, 

949 n_new=n_new, 

950 n_fully_updated=n_fully_updated, 

951 n_partially_updated=n_partially_updated, 

952 n_filtered=n_filtered, 

953 ) 

954 

    @contextmanager
    def _query_exposures(self, exposure_records: dict[DataCoordinate, DimensionRecord]) -> Iterator[Query]:
        """Context manager yielding a butler query constrained to the given
        exposures.

        Parameters
        ----------
        exposure_records : `dict` [`DataCoordinate`, `DimensionRecord`]
            Mapping whose *keys* (exposure data IDs) constrain the query; the
            record values are not used here.

        Yields
        ------
        query : `Query`
            Butler query joined against the given exposure data coordinates,
            valid only inside the ``with`` block.
        """
        with self.butler.query() as query:
            yield query.join_data_coordinates(exposure_records.keys())

959 

960 def _prefilter( 

961 self, 

962 exposure_records: dict[DataCoordinate, DimensionRecord], 

963 query: Query, 

964 visits_with_complete_regions: set[DataCoordinate] | None, 

965 ) -> int: 

966 n_filtered: int = 0 

967 exposure_dimension_group = self.butler.dimensions["exposure"].minimal_group 

968 visit_dimension_group = self.butler.dimensions["visit"].minimal_group 

969 for dual_data_id in query.data_ids(["exposure", "visit"]): 

970 if ( 

971 visits_with_complete_regions is None 

972 or dual_data_id.subset(visit_dimension_group) in visits_with_complete_regions 

973 ): 

974 if exposure_records.pop(dual_data_id.subset(exposure_dimension_group), None) is not None: 

975 n_filtered += 1 

976 if n_filtered: 

977 self.log.info( 

978 "Filtered out %d on-sky exposure(s) (of %d) that were already associated with a visit.", 

979 n_filtered, 

980 n_filtered + len(exposure_records), 

981 ) 

982 return n_filtered 

983 

984 def _find_visits_with_all_detector_regions(self, query: Query) -> set[DataCoordinate]: 

985 all_detectors: dict[str, set[int]] = {} 

986 for detector_data_id in query.data_ids(["detector"]): 

987 all_detectors.setdefault(cast(str, detector_data_id["instrument"]), set()).add( 

988 cast(int, detector_data_id["detector"]) 

989 ) 

990 visit_dimension_group = self.butler.dimensions["visit"].minimal_group 

991 detectors_with_regions_by_visit: dict[DataCoordinate, set[int]] = {} 

992 for vdr in query.dimension_records("visit_detector_region"): 

993 detectors_with_regions_by_visit.setdefault(vdr.dataId.subset(visit_dimension_group), set()).add( 

994 vdr.detector 

995 ) 

996 result: set[DataCoordinate] = set() 

997 for visit_data_id, detectors_for_visit in detectors_with_regions_by_visit.items(): 

998 if len(detectors_for_visit) == len(all_detectors[cast(str, visit_data_id["instrument"])]): 

999 result.add(visit_data_id) 

1000 return result 

1001 

1002 

# Generic type variable shared by the reduction helpers below.
_T = TypeVar("_T")

1004 

1005 

1006def _reduceOrNone(func: Callable[[_T, _T], _T | None], iterable: Iterable[_T | None]) -> _T | None: 

1007 """Apply a binary function to pairs of elements in an iterable until a 

1008 single value is returned, but return `None` if any element is `None` or 

1009 there are no elements. 

1010 """ 

1011 r: _T | None = None 

1012 for v in iterable: 

1013 if v is None: 

1014 return None 

1015 if r is None: 

1016 r = v 

1017 else: 

1018 r = func(r, v) 

1019 return r 

1020 

1021 

1022def _value_if_equal(a: _T, b: _T) -> _T | None: 

1023 """Return either argument if they are equal, or `None` if they are not.""" 

1024 return a if a == b else None 

1025 

1026 

1027def _calc_mean_angle(angles: list[float]) -> float: 

1028 """Calculate the mean angle, taking into account 0/360 wrapping. 

1029 

1030 Parameters 

1031 ---------- 

1032 angles : `list` [`float`] 

1033 Angles to average together, in degrees. 

1034 

1035 Returns 

1036 ------- 

1037 average : `float` 

1038 Average angle in degrees. 

1039 """ 

1040 # Save on all the math if we only have one value. 

1041 if len(angles) == 1: 

1042 return angles[0] 

1043 

1044 # Convert polar coordinates of unit circle to complex values. 

1045 # Average the complex values. 

1046 # Convert back to a phase angle. 

1047 return math.degrees(cmath.phase(sum(cmath.rect(1.0, math.radians(d)) for d in angles) / len(angles))) 

1048 

1049 

class _GroupExposuresOneToOneConfig(GroupExposuresConfig):
    """Configuration for the one-to-one exposure grouping algorithm.

    Both fields are deprecated: visit systems are now identified by the
    ``VisitSystem`` enum rather than by configurable ID/name pairs.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=0,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="one-to-one",
        deprecated="No longer used. Replaced by enum.",
    )

1063 

1064 

@registerConfigurable("one-to-one", GroupExposuresTask.registry)
class _GroupExposuresOneToOneTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that simply defines one visit for each
    exposure, reusing the exposures identifiers for the visit.
    """

    ConfigClass = _GroupExposuresOneToOneConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        # Every visit consists of exactly one exposure, so by construction
        # no exposure can ever be missing from a group.
        return []

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        # Each exposure forms its own singleton group, keyed by exposure ID.
        return {exp.id: [exp] for exp in exposures}

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        # Build the visit-system set once; every definition shares it.
        one_to_one = {VisitSystem.from_name("one-to-one")}
        for exp in exposures:
            yield VisitDefinitionData(
                instrument=exp.instrument,
                id=exp.id,
                name=exp.obs_id,
                exposures=[exp],
                visit_systems=one_to_one,
            )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        return set(VisitSystem.from_names(["one-to-one"]))

1100 

1101 

class _GroupExposuresByGroupMetadataConfig(GroupExposuresConfig):
    """Configuration for the by-group-metadata exposure grouping algorithm.

    Both fields are deprecated: visit systems are now identified by the
    ``VisitSystem`` enum rather than by configurable ID/name pairs.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=1,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="by-group-metadata",
        deprecated="No longer used. Replaced by enum.",
    )

1115 

1116 

@registerConfigurable("by-group-metadata", GroupExposuresTask.registry)
class _GroupExposuresByGroupMetadataTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that uses the exposure group.

    This algorithm uses the ``group`` dimension for modern universes and the
    ``exposure.group_id`` for older universes.

    This algorithm *assumes* group ID (generally populated from
    `astro_metadata_translator.ObservationInfo.visit_id`) is not just unique,
    but disjoint from all `ObservationInfo.exposure_id` values - if it isn't,
    it will be impossible to ever use both this grouping algorithm and the
    one-to-one algorithm for a particular camera in the same data repository.
    """

    ConfigClass = _GroupExposuresByGroupMetadataConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        # Modern universes imply a "group" dimension on exposure; older ones
        # store the group name directly on the exposure record.
        if "group" in registry.dimensions["exposure"].implied:
            group_key = "group"
        else:
            group_key = "group_name"
        missing: list[DimensionRecord] = []
        for members in self.group_exposures(exposures).values():
            # The expected number of exposures in a visit is unknown, so a
            # registry query is required for every group.
            first = members[0]
            found = set(
                registry.queryDimensionRecords(
                    "exposure",
                    where=f"exposure.{group_key} = groupnam",
                    bind={"groupnam": getattr(first, group_key)},
                    instrument=first.instrument,
                )
            )
            # Keep only the records we were not already given.
            found.difference_update(set(members))
            missing.extend(list(found))
        return missing

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        # Older universes expose the group name via a "group_name" attribute;
        # sniff the first record to decide which attribute to group on.
        key = "group_name" if exposures and hasattr(exposures[0], "group_name") else "group"
        grouped: dict[Any, list[DimensionRecord]] = defaultdict(list)
        for exp in exposures:
            grouped[getattr(exp, key)].append(exp)
        return grouped

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        visit_systems = {VisitSystem.from_name("by-group-metadata")}
        has_group_dimension: bool | None = None
        for visit_name, members in self.group_exposures(exposures).items():
            instrument_name = members[0].instrument
            assert instrument_name == instrument.getName(), "Inconsistency in instrument name"
            # Decide once (from the first group seen) whether records carry
            # the modern "group" attribute or the legacy "group_id".
            if has_group_dimension is None:
                has_group_dimension = hasattr(members[0], "group")
            if has_group_dimension:
                visit_ids = {instrument.group_name_to_group_id(e.group) for e in members}
            else:
                visit_ids = {e.group_id for e in members}
            assert len(visit_ids) == 1, "Grouping by exposure group does not yield consistent group IDs"
            yield VisitDefinitionData(
                instrument=instrument_name,
                id=visit_ids.pop(),
                name=visit_name,
                exposures=members,
                visit_systems=visit_systems,
            )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        return set(VisitSystem.from_names(["by-group-metadata"]))

1197 

1198 

class _GroupExposuresByCounterAndExposuresConfig(GroupExposuresConfig):
    """Configuration for the counter-and-exposures grouping algorithm.

    Both fields are deprecated: visit systems are now identified by the
    ``VisitSystem`` enum rather than by configurable ID/name pairs.
    """

    visitSystemId: Field[int] = Field(
        doc="Integer ID of the visit_system implemented by this grouping algorithm.",
        dtype=int,
        default=2,
        deprecated="No longer used. Replaced by enum.",
    )
    visitSystemName: Field[str] = Field(
        doc="String name of the visit_system implemented by this grouping algorithm.",
        dtype=str,
        default="by-counter-and-exposures",
        deprecated="No longer used. Replaced by enum.",
    )

1212 

1213 

@registerConfigurable("one-to-one-and-by-counter", GroupExposuresTask.registry)
class _GroupExposuresByCounterAndExposuresTask(GroupExposuresTask, metaclass=ABCMeta):
    """An exposure grouping algorithm that uses the sequence start and
    sequence end metadata to create multi-exposure visits, but also
    creates one-to-one visits.

    This algorithm uses the exposure.seq_start and
    exposure.seq_end fields to collect related snaps.
    It also groups single exposures.
    """

    ConfigClass = _GroupExposuresByCounterAndExposuresConfig

    def find_missing(
        self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
    ) -> list[DimensionRecord]:
        """Analyze the exposures and return relevant exposures known to
        registry.
        """
        groups = self.group_exposures(exposures)
        missing_exposures: list[DimensionRecord] = []
        for exposures_in_group in groups.values():
            # Sort by sequence counter so the first element has the lowest
            # seq_num in the group.
            sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
            first = sorted_exposures[0]

            # Only need to look for the seq_nums that we don't already have.
            seq_nums = set(range(first.seq_start, first.seq_end + 1))
            seq_nums.difference_update({exp.seq_num for exp in sorted_exposures})

            if seq_nums:
                # Missing something. Check registry.
                records = list(
                    registry.queryDimensionRecords(
                        "exposure",
                        where="exposure.seq_start = seq_start AND exposure.seq_end = seq_end AND "
                        "exposure.seq_num IN (seq_nums)",
                        bind={"seq_start": first.seq_start, "seq_end": first.seq_end, "seq_nums": seq_nums},
                        instrument=first.instrument,
                    )
                )
                missing_exposures.extend(records)

        return missing_exposures

    def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
        """Group exposures by their (day_obs, seq_start, seq_end) counter
        values, so all snaps sharing those counters land in one group.
        """
        groups = defaultdict(list)
        for exposure in exposures:
            groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
        return groups

    def group(
        self, exposures: list[DimensionRecord], instrument: Instrument
    ) -> Iterable[VisitDefinitionData]:
        # Docstring inherited from GroupExposuresTask.
        system_one_to_one = VisitSystem.from_name("one-to-one")
        system_seq_start_end = VisitSystem.from_name("by-seq-start-end")

        groups = self.group_exposures(exposures)
        for visit_key, exposures_in_group in groups.items():
            instrument_name = exposures_in_group[0].instrument

            # It is possible that the first exposure in a visit has not
            # been ingested. This can be determined and if that is the case
            # we can not reliably define the multi-exposure visit.
            skip_multi = False
            sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
            first = sorted_exposures.pop(0)
            if first.seq_num != first.seq_start:
                # Special case seq_num == 0 since that implies that the
                # instrument has no counters and therefore no multi-exposure
                # visits.
                if first.seq_num != 0:
                    self.log.warning(
                        "First exposure for visit %s is not present. Skipping the multi-snap definition.",
                        visit_key,
                    )
                    skip_multi = True

            multi_exposure = False
            if first.seq_start != first.seq_end:
                # This is a multi-exposure visit regardless of the number
                # of exposures present.
                multi_exposure = True

            # Define the one-to-one visits.
            for exposure in exposures_in_group:
                # Default is to use the exposure ID and name unless
                # this is the first exposure in a multi-exposure visit.
                visit_name = exposure.obs_id
                visit_id = exposure.id
                visit_systems = {system_one_to_one}

                if not multi_exposure:
                    # This is also a by-counter visit.
                    # It will use the same visit_name and visit_id.
                    visit_systems.add(system_seq_start_end)

                elif not skip_multi and exposure == first:
                    # This is the first legitimate exposure in a multi-exposure
                    # visit. It therefore needs a modified visit name and ID
                    # so it does not clash with the multi-exposure visit
                    # definition.
                    visit_name = f"{visit_name}_first"
                    visit_id = int(f"9{visit_id}")

                yield VisitDefinitionData(
                    instrument=instrument_name,
                    id=visit_id,
                    name=visit_name,
                    exposures=[exposure],
                    visit_systems=visit_systems,
                )

            # Multi-exposure visit.
            if not skip_multi and multi_exposure:
                # Define the visit using the first exposure
                visit_name = first.obs_id
                visit_id = first.id

                yield VisitDefinitionData(
                    instrument=instrument_name,
                    id=visit_id,
                    name=visit_name,
                    exposures=exposures_in_group,
                    visit_systems={system_seq_start_end},
                )

    def getVisitSystems(self) -> set[VisitSystem]:
        # Docstring inherited from GroupExposuresTask.
        # Using a Config for this is difficult because what this grouping
        # algorithm is doing is using two visit systems.
        # One is using metadata (but not by-group) and the other is the
        # one-to-one. For now hard-code in class.
        return set(VisitSystem.from_names(["one-to-one", "by-seq-start-end"]))

1348 

1349 

class _ComputeVisitRegionsFromSingleRawWcsConfig(ComputeVisitRegionsConfig):
    """Configuration for the single-raw-wcs visit region algorithm."""

    mergeExposures: Field[bool] = Field(
        doc=(
            "If True, merge per-detector regions over all exposures in a "
            "visit (via convex hull) instead of using the first exposure and "
            "assuming its regions are valid for all others."
        ),
        dtype=bool,
        default=False,
    )
    detectorId: Field[int | None] = Field(
        doc=(
            "Load the WCS for the detector with this ID. If None, use an "
            "arbitrary detector (the first found in a query of the data "
            "repository for each exposure (or all exposures, if "
            "mergeExposures is True)."
        ),
        dtype=int,
        optional=True,
        default=None,
    )
    requireVersionedCamera: Field[bool] = Field(
        doc=(
            "If True, raise LookupError if version camera geometry cannot be "
            "loaded for an exposure. If False, use the nominal camera from "
            "the Instrument class instead."
        ),
        dtype=bool,
        optional=False,
        default=False,
    )

1381 

1382 

@registerConfigurable("single-raw-wcs", ComputeVisitRegionsTask.registry)
class _ComputeVisitRegionsFromSingleRawWcsTask(ComputeVisitRegionsTask):
    """A visit region calculator that uses a single raw WCS (recomputed from
    the ``exposure`` dimension record) and a camera to project the bounding
    boxes of all detectors onto the sky, relating different detectors by their
    positions in focal plane coordinates.

    Notes
    -----
    Most instruments should have their raw WCSs determined from a combination
    of boresight angle, rotator angle, and camera geometry, and hence this
    algorithm should produce stable results regardless of which detector the
    raw corresponds to. If this is not the case (e.g. because a per-file FITS
    WCS is used instead), either the ID of the detector should be fixed (see
    the ``detectorId`` config parameter) or a different algorithm used.
    """

    ConfigClass = _ComputeVisitRegionsFromSingleRawWcsConfig
    config: _ComputeVisitRegionsFromSingleRawWcsConfig

    def computeExposureBounds(
        self, exposure: DimensionRecord, *, collections: Any = None
    ) -> dict[int, list[UnitVector3d]]:
        """Compute the lists of unit vectors on the sphere that correspond to
        the sky positions of detector corners.

        Parameters
        ----------
        exposure : `DimensionRecord`
            Dimension record for the exposure.
        collections : Any, optional
            Collections to be searched for raws and camera geometry, overriding
            ``self.butler.collections.defaults``.
            Can be any of the types supported by the ``collections`` argument
            to butler construction.

        Returns
        -------
        bounds : `dict`
            Dictionary mapping detector ID to a list of unit vectors on the
            sphere representing that detector's corners projected onto the sky.
        """
        if collections is None:
            collections = list(self.butler.collections.defaults)
        camera, versioned = loadCamera(self.butler, exposure.dataId, collections=collections)
        if not versioned and self.config.requireVersionedCamera:
            raise LookupError(f"No versioned camera found for exposure {exposure.dataId}.")

        # Boresight pointing and sky rotation angle from the exposure record.
        orientation = lsst.geom.Angle(exposure.sky_angle, lsst.geom.degrees)
        radec = lsst.geom.SpherePoint(
            lsst.geom.Angle(exposure.tracking_ra, lsst.geom.degrees),
            lsst.geom.Angle(exposure.tracking_dec, lsst.geom.degrees),
        )

        # Pick the reference detector used to build the single WCS: either
        # the configured one or an arbitrary (first) detector in the camera.
        if self.config.detectorId is None:
            detectorId = next(camera.getIdIter())
        else:
            detectorId = self.config.detectorId
        wcsDetector = camera[detectorId]

        # Ask the raw formatter to create the relevant WCS
        # This allows flips to be taken into account
        instrument = self.getInstrument(exposure.instrument)
        rawFormatter = instrument.getRawFormatter({"detector": detectorId})

        try:
            wcs = rawFormatter.makeRawSkyWcsFromBoresight(radec, orientation, wcsDetector)  # type: ignore
        except AttributeError:
            # Formatters without makeRawSkyWcsFromBoresight can't support
            # this algorithm; surface that as a TypeError (suppressing the
            # AttributeError chain, which would only add noise).
            raise TypeError(
                f"Raw formatter is {get_full_type_name(rawFormatter)} but visit"
                " definition requires it to support 'makeRawSkyWcsFromBoresight'"
            ) from None

        # Compose the reference detector's FOCAL_PLANE->PIXELS transform with
        # its pixel->sky WCS, giving a focal-plane->sky mapping that is then
        # reused for every detector in the camera below.
        fpToSky = wcsDetector.getTransform(FOCAL_PLANE, PIXELS).then(wcs.getTransform())
        bounds = {}
        for detector in camera:
            # Each detector's pixels -> focal plane -> sky.
            pixelsToSky = detector.getTransform(PIXELS, FOCAL_PLANE).then(fpToSky)
            # Pad the bounding box by the configured margin before projecting
            # the corners.
            pixCorners = Box2D(detector.getBBox().dilatedBy(self.config.padding)).getCorners()
            bounds[detector.getId()] = [
                skyCorner.getVector() for skyCorner in pixelsToSky.applyForward(pixCorners)
            ]
        return bounds

    def compute(
        self, visit: VisitDefinitionData, *, collections: Any = None
    ) -> tuple[Region, dict[int, Region]]:
        # Docstring inherited from ComputeVisitRegionsTask.
        if self.config.mergeExposures:
            # Accumulate the corner vectors of every exposure per detector;
            # the convex hull below then covers all exposures in the visit.
            detectorBounds: dict[int, list[UnitVector3d]] = defaultdict(list)
            for exposure in visit.exposures:
                exposureDetectorBounds = self.computeExposureBounds(exposure, collections=collections)
                for detectorId, bounds in exposureDetectorBounds.items():
                    detectorBounds[detectorId].extend(bounds)
        else:
            # Assume the first exposure's bounds are valid for the others.
            detectorBounds = self.computeExposureBounds(visit.exposures[0], collections=collections)
        # Per-detector hulls, plus one hull over all corners for the visit.
        visitBounds = []
        detectorRegions = {}
        for detectorId, bounds in detectorBounds.items():
            detectorRegions[detectorId] = ConvexPolygon.convexHull(bounds)
            visitBounds.extend(bounds)
        return ConvexPolygon.convexHull(visitBounds), detectorRegions