Coverage for python / lsst / obs / base / script / updateExposures.py: 14%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:28 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import logging 

24 

25from lsst.daf.butler import Butler, DimensionRecord 

26from lsst.daf.butler.script.queryDatasets import QueryDatasets 

27from lsst.obs.base import Instrument 

28from lsst.obs.base.ingest import RawIngestTask 

29from lsst.pipe.base.configOverrides import ConfigOverrides 

30from lsst.utils import doImportType 

31 

# Logger for this module; named after the module so output can be filtered.
_LOG = logging.getLogger(name=__name__)

33 

34 

35def updateExposures( 

36 repo: str, 

37 instrument: str, 

38 where: str, 

39 collections: list[str] | None, 

40 fail_fast: bool = False, 

41 config: dict[str, str] | None = None, 

42 config_file: str | None = None, 

43 processes: int = 1, 

44 ingest_task: str = "lsst.obs.base.RawIngestTask", 

45) -> None: 

46 """Ingest raw frames into the butler registry. 

47 

48 Parameters 

49 ---------- 

50 repo : `str` 

51 URI to the repository. 

52 instrument : `str` 

53 Butler name of instrument. This is used to determine the default 

54 collection and the raw dataset type. 

55 where : `str` 

56 The query to use to discover the datasets. Does not need to include 

57 the instrument name. Must be specified to reduce the number of 

58 datasets being processed. 

59 collections : `list` [`str`] or `None` 

60 Override the default raw collection. 

61 fail_fast : `bool`, optional 

62 If True, stop ingest as soon as any problem is encountered with any 

63 file. Otherwise problem files will be skipped and logged and a report 

64 issued at completion. 

65 config : `dict` [`str`, `str`] or `None`, optional 

66 Key-value pairs to apply as overrides to the ingest config. 

67 config_file : `str` or `None`, optional 

68 Path to a config file that contains overrides to the ingest config. 

69 processes : `int`, optional 

70 Number of workers to use for ingest. 

71 ingest_task : `str`, optional 

72 The fully qualified class name of the ingest task to use by default 

73 lsst.obs.base.RawIngestTask. 

74 

75 Raises 

76 ------ 

77 Exception 

78 Raised if operations on configuration object fail. 

79 """ 

80 if not where.strip(): 

81 raise ValueError("A WHERE query string must be defined.") 

82 

83 TaskClass = doImportType(ingest_task) 

84 assert issubclass(TaskClass, RawIngestTask) 

85 ingestConfig = TaskClass.ConfigClass() 

86 configOverrides = ConfigOverrides() 

87 if config_file is not None: 

88 configOverrides.addFileOverride(config_file) 

89 if config is not None: 

90 for name, value in config.items(): 

91 configOverrides.addValueOverride(name, value) 

92 if fail_fast: 

93 configOverrides.addValueOverride("failFast", True) 

94 configOverrides.applyTo(ingestConfig) 

95 

96 records_updated: list[DimensionRecord] = [] 

97 

98 def on_metadata_updates(record: DimensionRecord) -> None: 

99 records_updated.append(record) 

100 

101 with Butler.from_config(repo, writeable=True) as butler: 

102 ingester = TaskClass( 

103 config=ingestConfig, # type: ignore[arg-type] 

104 butler=butler, 

105 on_exposure_record=on_metadata_updates, 

106 ) 

107 instr_ = Instrument.from_string(instrument, registry=butler.registry) 

108 if not collections: 

109 collections = [instr_.makeDefaultRawIngestRunName()] 

110 raw_datasetType = ingester.get_raw_datasetType(instr_) 

111 

112 # Constrain the query by instrument. The QueryDatasets class does 

113 # not allow kwargs overrides. 

114 where = where + f" AND instrument = '{instr_.getName()}'" 

115 

116 # Query for datasets. 

117 query = QueryDatasets( 

118 butler=butler, 

119 glob=[raw_datasetType.name], 

120 collections=collections, 

121 where=where, 

122 find_first=False, 

123 with_dimension_records=False, 

124 show_uri=False, 

125 ) 

126 refs = set(itertools.chain(*query.getDatasets())) 

127 

128 # Since we are updating exposure records we only need one ref per 

129 # exposure. We can not expect people to add a detector constraint 

130 # themselves and we do not want to gather metadata from 200x more files 

131 # than we need for LSSTCam in the embargo rack. 

132 n_refs = len(refs) 

133 refs_by_exposure = {ref.dataId["exposure"]: ref for ref in refs} 

134 refs = set(refs_by_exposure.values()) 

135 if n_refs != (n_filtered := len(refs)): 

136 _LOG.info("Selecting one dataset per exposure. Filtering %d down to %d.", n_refs, n_filtered) 

137 

138 # The ingest code wants the raw file locations and we want the zips 

139 # without fragments. 

140 uris = butler.get_many_uris(refs) 

141 locations = { 

142 uri.replace(fragment="") 

143 for uri in itertools.chain.from_iterable(res.iter_all() for res in uris.values()) 

144 } 

145 

146 n_refs = len(refs) 

147 s_refs = "s" if n_refs != 1 else "" 

148 n_files = len(locations) 

149 s_files = "s" if n_files != 1 else "" 

150 _LOG.info( 

151 "Recalculating exposure records for %d dataset%s from %d file%s.", 

152 n_refs, 

153 s_refs, 

154 n_files, 

155 s_files, 

156 ) 

157 

158 ingester.run( 

159 locations, 

160 run=None, # Not writing to a RUN collection. 

161 num_workers=processes, 

162 update_exposure_records=True, 

163 skip_ingest=True, 

164 ) 

165 

166 if records_updated: 

167 n_updated = len(records_updated) 

168 s_updated = "s" if n_updated != 1 else "" 

169 _LOG.info("Changed %d exposure record%s", n_updated, s_updated) 

170 else: 

171 _LOG.info("No exposure records were changed.")