Coverage for python / lsst / obs / base / visit_geometry.py: 39%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:02 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("VisitGeometry",) 

25 

26import itertools 

27from typing import Any 

28 

29import pydantic 

30 

31from lsst.daf.butler import Butler, DimensionRecord 

32from lsst.daf.butler.pydantic_utils import SerializableRegion 

33from lsst.utils.logging import getLogger 

34 

35_LOG = getLogger(__name__) 

36 

37 

class VisitGeometry(pydantic.BaseModel):
    """A serializable struct that holds geometry information for a visit.

    This is intended to be used as a butler output dataset intermediary between
    tasks that fit coordinate mappings from stars to reference catalogs and
    tools that update butler dimension record regions.
    """

    boresight_ra: float
    """Re-fit boresight right ascension in degrees."""

    boresight_dec: float
    """Re-fit boresight declination in degrees."""

    orientation: float
    """Re-fit rotation angle in degrees."""

    visit_region: SerializableRegion
    """Updated region for the visit."""

    detector_regions: dict[int, SerializableRegion] = pydantic.Field(default_factory=dict)
    """Updated region for each detector in this visit."""

    @classmethod
    def update_dimension_records(
        cls,
        butler: Butler,
        instrument: str,
        *where: Any,
        dataset_type: str = "visit_geometry",
        batch_size: int = 100,
        **kwargs: Any,
    ) -> int:
        """Update the dimension records in a data repository by querying for
        and reading visit geometry datasets.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler client.  Must be writable and have its default collections
            set to the location of the visit geometry datasets.
        instrument : `str`
            Name of the instrument whose records will be updated.
        *where
            Positional argument query constraint terms, forwarded to
            `lsst.daf.butler.queries.Query.where` (e.g. an expression string or
            data ID).
        dataset_type : `str`, optional
            Dataset type name for the visit geometry datasets.
        batch_size : `int`, optional
            Number of visits to process at once.
        **kwargs
            Keyword-only query constraint terms, forwarded to
            `lsst.daf.butler.queries.Query.where` (e.g. data ID key-value
            pairs).

        Returns
        -------
        n_visits : `int`
            The number of visits updated.

        Raises
        ------
        KeyError
            Raised if the visit or exposure records returned by the query do
            not include an entry whose ID equals the visit ID of one of the
            matched datasets.
        ValueError
            Raised by `itertools.batched` if ``batch_size`` is less than one
            and at least one dataset was found.

        Notes
        -----
        The exposure, visit, and visit_detector_region records for each batch
        of visits are written together inside a single butler transaction, so
        there is one transaction per batch rather than one for the whole call.

        The exposure record to update is looked up using the visit ID, i.e.
        this method relies on there being an exposure whose ID is the same as
        the visit ID.
        """
        with butler.query() as query:
            query = query.join_dataset_search(dataset_type)
            # Positional terms are unpacked before any keyword argument; this
            # binds exactly as before but avoids star-arg-after-keyword.
            query = query.where(*where, instrument=instrument, **kwargs)
            _LOG.verbose("Querying for %s datasets.", dataset_type)
            all_refs = list(query.datasets(dataset_type))
            # Sort by visit ID so the first/last IDs logged for each batch
            # bracket all visits in that batch.
            all_refs.sort(key=lambda ref: ref.dataId["visit"])
            _LOG.verbose("Querying for visit records (%d expected).", len(all_refs))
            old_visit_records = {r.id: r for r in query.dimension_records("visit")}
            _LOG.verbose("Querying for exposure records (%d expected).", len(all_refs))
            old_exposure_records = {r.id: r for r in query.dimension_records("exposure")}
        vdr_record_cls = butler.dimensions.elements["visit_detector_region"].RecordClass
        for ref_batch in itertools.batched(all_refs, batch_size):
            _LOG.verbose(
                "Updating records for %d visits with IDs between %d and %d.",
                len(ref_batch),
                ref_batch[0].dataId["visit"],
                ref_batch[-1].dataId["visit"],
            )
            new_exposure_records: list[DimensionRecord] = []
            new_visit_records: list[DimensionRecord] = []
            new_vdr_records: list[DimensionRecord] = []
            for ref in ref_batch:
                visit_id = ref.dataId["visit"]
                visit_geometry: VisitGeometry = butler.get(ref)
                # The exposure record is keyed by the visit ID here (see the
                # Notes section of the docstring).
                new_exposure_records.append(
                    cls._make_updated_dimension_record(
                        old_exposure_records[visit_id],
                        tracking_ra=visit_geometry.boresight_ra,
                        tracking_dec=visit_geometry.boresight_dec,
                        sky_angle=visit_geometry.orientation,
                    )
                )
                new_visit_records.append(
                    cls._make_updated_dimension_record(
                        old_visit_records[visit_id], region=visit_geometry.visit_region
                    )
                )
                # Detector regions are built from scratch rather than copied
                # from existing records; only the fields below are set.
                for detector_id, vdr_region in visit_geometry.detector_regions.items():
                    new_vdr_records.append(
                        vdr_record_cls(
                            instrument=instrument, visit=visit_id, detector=detector_id, region=vdr_region
                        )
                    )
            # Write all three record types for this batch in one transaction.
            with butler.transaction():
                butler.registry.insertDimensionData("exposure", *new_exposure_records, replace=True)
                butler.registry.insertDimensionData("visit", *new_visit_records, replace=True)
                butler.registry.insertDimensionData("visit_detector_region", *new_vdr_records, replace=True)
        _LOG.info("Records for %d visits updated.", len(all_refs))
        return len(all_refs)

    @staticmethod
    def _make_updated_dimension_record(original: DimensionRecord, **kwargs: Any) -> DimensionRecord:
        """Return a copy of a dimension record with some fields replaced.

        Parameters
        ----------
        original : `lsst.daf.butler.DimensionRecord`
            Record to copy.  It is not modified by this method's own code; a
            new record of the same type is constructed instead.
        **kwargs
            Field values that override (or add to) those obtained from
            ``original.toDict()``.

        Returns
        -------
        updated : `lsst.daf.butler.DimensionRecord`
            New record of the same type as ``original``.
        """
        as_dict = original.toDict()
        as_dict.update(**kwargs)
        return type(original)(**as_dict)