Coverage for python / lsst / obs / lsst / _ingest_guider.py: 10%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:47 +0000

1# This file is part of obs_lsst. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ingest_guider"] 

25 

26import contextlib 

27import json 

28import logging 

29from collections import defaultdict 

30from collections.abc import Callable 

31from typing import Any 

32 

33import astropy.io.fits 

34import lsst.afw.fits 

35import lsst.obs.lsst.translators # Force translators to import # noqa: F401 

36from astro_metadata_translator import ObservationInfo 

37from lsst.daf.butler import (Butler, DataCoordinate, DatasetIdGenEnum, 

38 DatasetRef, DatasetType, FileDataset, 

39 MissingDatasetTypeError) 

40from lsst.obs.base.formatters.fitsGeneric import FitsGenericFormatter 

41from lsst.resources import ResourcePath, ResourcePathExpression 

42 

43_LOG = logging.getLogger(__name__) 

44_DATASET_TYPE_NAME = "guider_raw" 

45DEFAULT_GUIDER_REGEX = r".*SG.*_guider\.fits$" 

46_DEFAULT_RUN_FORMAT = "{}/raw/guider" 

47 

48 

49def _do_nothing(*args: Any, **kwargs: Any) -> None: 

50 """Do nothing. 

51 

52 This is a function that accepts anything and does nothing. 

53 For use as a default in callback arguments. 

54 """ 

55 pass 

56 

57 

def ingest_guider(
    butler: Butler,
    locations: list[ResourcePathExpression],
    *,
    file_filter: str = DEFAULT_GUIDER_REGEX,
    group_files: bool = True,
    run: str | None = None,
    transfer: str = "auto",
    register_dataset_type: bool = False,
    track_file_attrs: bool = True,
    on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
    on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
    on_undefined_exposure: Callable[[ResourcePath, str], Any] = _do_nothing,
    on_ingest_failure: Callable[[list[FileDataset], Exception], Any] = _do_nothing,
    fail_fast: bool = False,
) -> list[DatasetRef]:
    """Ingest the given files into the butler repository.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler in which to ingest the guider files.
    locations : `list` [ `lsst.resources.ResourcePathExpression` ]
        Guider files to ingest or directories of guider files.
    file_filter : `str`, optional
        The file filter to use if directories are to be searched.
    group_files : `bool`, optional
        If `True` files are ingested in groups based on the directories
        they are found in. If `False` directories are searched and all files
        are ingested together. If explicit files are given they are treated
        as their own group.
    run : `str` or `None`, optional
        The name of the run that will be receiving these datasets. By
        default, if `None`, a value of `<INSTRUMENT>/raw/guider` is used.
    transfer : `str`, optional
        Transfer mode to use for ingest. Default is "auto". If "direct"
        mode is used ingestion of a file that is already present in the
        butler repo will not be an error.
    register_dataset_type : `bool`, optional
        If `True` the dataset type will be created if it is not defined.
        Default is `False`.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    on_success : `Callable`, optional
        A callback invoked when all of the raws associated with a group
        are ingested. Will be passed a list of `FileDataset` objects, each
        containing one or more resolved `DatasetRef` objects. If this callback
        raises it will interrupt the entire ingest process, even if
        ``fail_fast`` is `False`.
    on_metadata_failure : `Callable`, optional
        A callback invoked when a failure occurs trying to translate the
        metadata for a file. Will be passed the URI and the exception, in
        that order, as positional arguments. Guaranteed to be called in an
        ``except`` block, allowing the callback to re-raise or replace (with
        ``raise ... from``) default exception handling.
    on_undefined_exposure : `Callable`, optional
        A callback invoked when a guider file can't be ingested because the
        corresponding exposure dimension record does not yet exist. Will be
        passed the URI and the OBSID.
    on_ingest_failure : `Callable`, optional
        A callback invoked when dimension record or dataset insertion into the
        database fails for an exposure. Will be passed a list of
        `~lsst.daf.butler.FileDataset` objects corresponding to the files
        being ingested, and the exception as positional arguments.
        Guaranteed to be called in an ``except`` block, allowing the callback
        to re-raise or replace (with ``raise ... from``) to override the
        usual error handling (before ``fail_fast`` logic occurs).
    fail_fast : `bool`, optional
        If `True` ingest will abort if there is any issue with any files. If
        `False`, an attempt will be made to ingest every usable file before
        the accumulated failures are reported by raising `RuntimeError`.

    Returns
    -------
    refs : `list` [ `DatasetRef` ]
        Butler datasets that were ingested.

    Raises
    ------
    RuntimeError
        Raised if any file failed to ingest, or if the dataset type found
        in the registry is incompatible with the one required here.

    Notes
    -----
    Always uses a dataset type named "guider_raw" with dimensions instrument,
    detector, exposure. The exposure must already be defined.
    """
    dataset_type = DatasetType(
        _DATASET_TYPE_NAME,
        {"instrument", "detector", "exposure"},
        "Stamps",
        universe=butler.dimensions,
    )

    if register_dataset_type:
        butler.registry.registerDatasetType(dataset_type)
    else:
        try:
            registry_dataset_type = butler.get_dataset_type(_DATASET_TYPE_NAME)
        except MissingDatasetTypeError as e:
            e.add_note(
                f"Can not ingest guider data without registering the {_DATASET_TYPE_NAME} dataset type. "
                "Consider re-running with 'register_dataset_type' option."
            )
            raise
        if not registry_dataset_type.is_compatible_with(dataset_type):
            raise RuntimeError(
                f"Registry dataset type {registry_dataset_type} is incompatible with "
                f"definition required for guider ingest ({dataset_type})"
            )

    # Both code paths below pass the same per-group keyword arguments to
    # _ingest_group, so build them once rather than duplicating the list.
    group_kwargs: dict[str, Any] = dict(
        run=run,
        transfer=transfer,
        track_file_attrs=track_file_attrs,
        on_ingest_failure=on_ingest_failure,
        on_metadata_failure=on_metadata_failure,
        on_undefined_exposure=on_undefined_exposure,
        on_success=on_success,
        fail_fast=fail_fast,
    )

    ingested_refs: list[DatasetRef] = []
    missing_files: set[ResourcePath] = set()
    if group_files:
        for group in ResourcePath.findFileResources(
            locations, file_filter, grouped=True
        ):
            files = list(group)
            _LOG.info(
                "Found group containing %d file%s in directory %s",
                len(files),
                "" if len(files) == 1 else "s",
                files[0].dirname(),
            )
            ingested, missing = _ingest_group(
                butler, dataset_type, files, **group_kwargs
            )
            ingested_refs.extend(ingested)
            missing_files.update(missing)
    else:
        files = list(
            ResourcePath.findFileResources(locations, file_filter, grouped=False)
        )
        _LOG.info("Ingesting %d file%s", len(files), "" if len(files) == 1 else "s")
        ingested_refs, missing_files = _ingest_group(
            butler, dataset_type, files, **group_kwargs
        )

    # Any file that was examined but not ingested is an overall failure,
    # even when fail_fast was disabled.
    if n_missed := len(missing_files):
        msg = "\n".join(f" - {f}" for f in missing_files)
        _LOG.warning("Failed to ingest the following:\n%s", msg)
        raise RuntimeError(
            f"Failed to ingest {n_missed} file{'' if n_missed == 1 else 's'}."
        )

    return ingested_refs

220 

221 

def _ingest_group(
    butler: Butler,
    dataset_type: DatasetType,
    files: list[ResourcePath],
    *,
    run: str | None = None,
    transfer: str = "auto",
    track_file_attrs: bool = True,
    on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
    on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
    on_undefined_exposure: Callable[[ResourcePath, str], Any] = _do_nothing,
    on_ingest_failure: Callable[[list[FileDataset], Exception], Any] = _do_nothing,
    fail_fast: bool = False,
) -> tuple[list[DatasetRef], set[ResourcePath]]:
    """Ingest a single group of guider files.

    Parameters mirror those of `ingest_guider`; see that function for
    their descriptions.

    Returns
    -------
    refs : `list` [ `DatasetRef` ]
        Datasets that were successfully ingested.
    missing : `set` [ `lsst.resources.ResourcePath` ]
        The subset of ``files`` that was not ingested.
    """
    # Map files to initial data ID (using obs ID rather than exposure ID,
    # since the exposure ID is not known until queried below).
    raw_data_id: dict[ResourcePath, ObservationInfo] = {}

    # Map instrument name to observation IDs.
    obs_ids: dict[str, set[str]] = defaultdict(set)

    # Retain information about failed metadata extraction.
    # Mapping of file to error message.
    failed_metadata: dict[ResourcePath, str] = {}

    # The GUIDE detectors for each instrument.
    guide_detectors: dict[str, set[int]] = {}

    # Since there may be multiple guider files for a single exposure,
    # accumulate the exposure information before converting obs_id to exposure
    # id.
    for file in files:
        # Metadata is expected in a JSON sidecar alongside the data file.
        metadata_path = file.updatedExtension(".json")
        if metadata_path == file:
            # Attempting to ingest the sidecar file.
            # Raise-and-catch so that on_metadata_failure is invoked from
            # within an ``except`` block, as its contract documents.
            try:
                raise RuntimeError(
                    f"Can not ingest sidecar file as GUIDER file (attempting {metadata_path})"
                )
            except RuntimeError as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise
                continue

        metadata = None
        if metadata_path.exists():
            # Best effort: a missing or corrupt sidecar simply falls back
            # to reading the FITS header below.
            with contextlib.suppress(Exception):
                metadata = json.loads(metadata_path.read().decode())
        if metadata is None:
            # Could not find sidecar file or it was corrupt. Read from the
            # FITS file itself.
            # Allow direct remote read from S3.
            try:
                fs, fspath = file.to_fsspec()
                with fs.open(fspath) as f, astropy.io.fits.open(f) as fits_obj:
                    metadata = fits_obj[0].header
            except Exception as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise RuntimeError(
                        f"Problem extracting metadata for file {file}"
                    ) from e
                continue

        # Do not run fix_header since we only need the OBSID and the detector
        # number.
        required = {"instrument", "detector_num", "observation_id"}
        try:
            # pedantic=True makes missing required keys raise rather than
            # produce a partial ObservationInfo.
            info = ObservationInfo(
                metadata, filename=file, subset=required, pedantic=True
            )
        except KeyError as e:
            failed_metadata[file] = str(e)
            on_metadata_failure(file, e)
            if fail_fast:
                raise RuntimeError(f"Problem parsing metadata for file {file}") from e
            continue

        # Populate detector lookup table.
        # Queried once per instrument and cached for the rest of the group.
        if info.instrument not in guide_detectors:
            guide_detectors[info.instrument] = set(
                rec.id
                for rec in butler.query_dimension_records(
                    "detector", instrument=info.instrument
                )
                if rec.purpose == "GUIDER"
            )

        if info.detector_num not in guide_detectors[info.instrument]:
            # The callbacks are documented to be called within an exception.
            try:
                raise ValueError(f"File {file} is not a GUIDER observation.")
            except ValueError as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise
                continue

        raw_data_id[file] = info
        obs_ids[info.instrument].add(info.observation_id)

    if run is not None and len(obs_ids) > 1:
        # We do not want to ingest files from different instruments into
        # the same run so only allow this if we are defining the RUN
        # internally.
        raise RuntimeError(
            f"Can only ingest data from a single instrument into a single RUN but got {obs_ids.keys()}"
        )

    if failed_metadata:
        msg = "\n".join(f" - {f}" for f in failed_metadata)
        _LOG.warning("Failed to extract usable GUIDER metadata for:\n%s", msg)

    # Map obs_id to a tuple of instrument and exposure ID in case we are
    # ingesting guiders from multiple instruments.
    obs_id_to_exposure_ids: dict[str, tuple[str, int]] = {}

    requested_obs_ids: set[str] = set()
    found_obs_ids: set[str] = set()

    with butler.query() as query:
        for instrument, observation_ids in obs_ids.items():
            requested_obs_ids.update(observation_ids)
            # NOTE(review): ``query`` is rebound with each instrument's
            # constraint, so with more than one instrument later iterations
            # also carry the earlier constraints. Confirm this is intended
            # for the multi-instrument case.
            query = query.where(
                "exposure.obs_id in (OBSID)",
                bind={"OBSID": observation_ids},
                instrument=instrument,
            )

            exposures = {
                e.obs_id: (e.instrument, e.id)
                for e in query.dimension_records("exposure")
            }
            obs_id_to_exposure_ids.update(exposures)
            found_obs_ids.update(exposures.keys())

    # Exposures that were requested but have no dimension record cannot be
    # ingested; report them but carry on with the rest.
    if missing_obs_ids := requested_obs_ids - found_obs_ids:
        missing_str = "\n".join(f" - {obs}" for obs in missing_obs_ids)
        _LOG.warning(
            "Failed to find exposure records for the following observation IDs:\n%s",
            missing_str,
        )

    # Now there is enough information to create the ingest datasets.
    output_runs: set[str] = set()
    failed_exposure_metadata: list[ResourcePath] = []
    datasets: list[FileDataset] = []
    for file, info in raw_data_id.items():
        if info.observation_id not in obs_id_to_exposure_ids:
            failed_exposure_metadata.append(file)
            on_undefined_exposure(file, info.observation_id)
            if fail_fast:
                raise RuntimeError(
                    f"Exposure metadata not yet defined for OBSID {info.observation_id} "
                    f"but required for file {file}."
                )
            continue

        # Default RUN is derived from the instrument when not given.
        if run is None:
            output_run = _DEFAULT_RUN_FORMAT.format(info.instrument)
        else:
            output_run = run

        if output_run not in output_runs:
            # Always try to create on first pass.
            butler.collections.register(output_run)
            output_runs.add(output_run)

        data_id = DataCoordinate.standardize(
            instrument=info.instrument,
            detector=info.detector_num,
            exposure=obs_id_to_exposure_ids[info.observation_id][1],
            universe=butler.dimensions,
        )
        # These are "raw" type so we want to use a predictable UUID.
        ref = DatasetRef(
            dataset_type,
            data_id,
            output_run,
            id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN,
        )
        datasets.append(
            FileDataset(path=file, refs=[ref], formatter=FitsGenericFormatter)
        )

    if failed_exposure_metadata:
        failed_exposure_msg = "\n".join(f" - {f}" for f in failed_exposure_metadata)
        _LOG.warning(
            "Failed to match the following files to exposure IDs:\n%s",
            failed_exposure_msg,
        )

    # Now ingest the files.
    if datasets:
        try:
            butler.ingest(
                *datasets, transfer=transfer, record_validation_info=track_file_attrs
            )
        except Exception as e:
            # on_ingest_failure is invoked inside the except block so it may
            # re-raise or replace the exception (see ingest_guider docs).
            on_ingest_failure(datasets, e)
            datasets = []  # Effectively nothing was ingested.
            if fail_fast:
                raise
        else:
            # Only called when ingest succeeded; skipped if nothing to do.
            on_success(datasets)

    # Anything given to us that did not make it into ``datasets`` is
    # reported back to the caller as missing.
    missing_files: set[ResourcePath] = set()
    if len(datasets) != len(files):
        ingested_files = {d.path for d in datasets}
        given_files = set(files)
        missing_files = given_files - ingested_files

    return [d.refs[0] for d in datasets], missing_files

437 return [d.refs[0] for d in datasets], missing_files