Coverage for python / lsst / obs / lsst / _ingest_guider.py: 10%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:01 +0000
1# This file is part of obs_lsst.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ingest_guider"]
26import contextlib
27import json
28import logging
29from collections import defaultdict
30from collections.abc import Callable
31from typing import Any
33import astropy.io.fits
34import lsst.afw.fits
35import lsst.obs.lsst.translators # Force translators to import # noqa: F401
36from astro_metadata_translator import ObservationInfo
37from lsst.daf.butler import (Butler, DataCoordinate, DatasetIdGenEnum,
38 DatasetRef, DatasetType, FileDataset,
39 MissingDatasetTypeError)
40from lsst.obs.base.formatters.fitsGeneric import FitsGenericFormatter
41from lsst.resources import ResourcePath, ResourcePathExpression
43_LOG = logging.getLogger(__name__)
44_DATASET_TYPE_NAME = "guider_raw"
45DEFAULT_GUIDER_REGEX = r".*SG.*_guider\.fits$"
46_DEFAULT_RUN_FORMAT = "{}/raw/guider"
49def _do_nothing(*args: Any, **kwargs: Any) -> None:
50 """Do nothing.
52 This is a function that accepts anything and does nothing.
53 For use as a default in callback arguments.
54 """
55 pass
def ingest_guider(
    butler: Butler,
    locations: list[ResourcePathExpression],
    *,
    file_filter: str = DEFAULT_GUIDER_REGEX,
    group_files: bool = True,
    run: str | None = None,
    transfer: str = "auto",
    register_dataset_type: bool = False,
    track_file_attrs: bool = True,
    on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
    on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
    on_undefined_exposure: Callable[[ResourcePath, str], Any] = _do_nothing,
    on_ingest_failure: Callable[[list[FileDataset], Exception], Any] = _do_nothing,
    fail_fast: bool = False,
) -> list[DatasetRef]:
    """Ingest the given files into the butler repository.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler in which to ingest the guider files.
    locations : `list` [ `lsst.resources.ResourcePathExpression` ]
        Guider files to ingest or directories of guider files.
    file_filter : `str`, optional
        The file filter to use if directories are to be searched.
    group_files : `bool`, optional
        If `True` files are ingested in groups based on the directories
        they are found in. If `False` directories are searched and all files
        are ingested together. If explicit files are given they are treated
        as their own group.
    run : `str` or `None`, optional
        The name of the run that will be receiving these datasets. By default,
        if `None`, a value of `<INSTRUMENT>/raw/guider` is used.
    transfer : `str`, optional
        Transfer mode to use for ingest. Default is "auto". If "direct"
        mode is used ingestion of a file that is already present in the
        butler repo will not be an error.
    register_dataset_type : `bool`, optional
        If `True` the dataset type will be created if it is not defined.
        Default is `False`.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    on_success : `Callable`, optional
        A callback invoked when all of the raws associated with a group
        are ingested. Will be passed a list of `FileDataset` objects, each
        containing one or more resolved `DatasetRef` objects. If this callback
        raises it will interrupt the entire ingest process, even if
        ``fail_fast`` is `False`.
    on_metadata_failure : `Callable`, optional
        A callback invoked when a failure occurs trying to translate the
        metadata for a file. Will be passed the URI and the exception, in
        that order, as positional arguments. Guaranteed to be called in an
        ``except`` block, allowing the callback to re-raise or replace (with
        ``raise ... from``) default exception handling.
    on_undefined_exposure : `Callable`, optional
        A callback invoked when a guider file can't be ingested because the
        corresponding exposure dimension record does not yet exist. Will be
        passed the URI and the OBSID.
    on_ingest_failure : `Callable`, optional
        A callback invoked when dimension record or dataset insertion into the
        database fails for an exposure. Will be passed a list of
        `~lsst.daf.butler.FileDataset` objects corresponding to the files
        being ingested, and the exception as positional arguments.
        Guaranteed to be called in an ``except`` block, allowing the callback
        to re-raise or replace (with ``raise ... from``) to override the
        usual error handling (before ``fail_fast`` logic occurs).
    fail_fast : `bool`, optional
        If `True` ingest will abort if there is any issue with any files. If
        `False`, an attempt will be made to ingest every file, with all
        failures reported at the end by raising `RuntimeError`.

    Returns
    -------
    refs : `list` [ `DatasetRef` ]
        Butler datasets that were ingested.

    Raises
    ------
    MissingDatasetTypeError
        Raised if the dataset type is not registered and
        ``register_dataset_type`` is `False`.
    RuntimeError
        Raised if the registered dataset type is incompatible with the
        required definition, or if any files could not be ingested.

    Notes
    -----
    Always uses a dataset type named "guider_raw" with dimensions instrument,
    detector, exposure. The exposure must already be defined.
    """
    # The dataset type definition that guider ingest requires; the registry
    # copy must be compatible with this.
    dataset_type = DatasetType(
        _DATASET_TYPE_NAME,
        {"instrument", "detector", "exposure"},
        "Stamps",
        universe=butler.dimensions,
    )

    # Validate or register the dataset type before touching any files.
    if register_dataset_type:
        butler.registry.registerDatasetType(dataset_type)
    else:
        try:
            registry_dataset_type = butler.get_dataset_type(_DATASET_TYPE_NAME)
        except MissingDatasetTypeError as e:
            e.add_note(
                f"Can not ingest guider data without registering the {_DATASET_TYPE_NAME} dataset type. "
                "Consider re-running with 'register_dataset_type' option."
            )
            raise
        if not registry_dataset_type.is_compatible_with(dataset_type):
            raise RuntimeError(
                f"Registry dataset type {registry_dataset_type} is incompatible with "
                f"definition required for guider ingest ({dataset_type})"
            )

    ingested_refs = []
    missing_files = set()
    if group_files:
        # Ingest each directory's worth of files as its own group,
        # accumulating successes and failures across groups.
        for group in ResourcePath.findFileResources(
            locations, file_filter, grouped=True
        ):
            files = list(group)
            _LOG.info(
                "Found group containing %d file%s in directory %s",
                len(files),
                "" if len(files) == 1 else "s",
                files[0].dirname(),
            )
            ingested, missing = _ingest_group(
                butler,
                dataset_type,
                files,
                run=run,
                transfer=transfer,
                track_file_attrs=track_file_attrs,
                on_ingest_failure=on_ingest_failure,
                on_metadata_failure=on_metadata_failure,
                on_undefined_exposure=on_undefined_exposure,
                on_success=on_success,
                fail_fast=fail_fast,
            )
            ingested_refs.extend(ingested)
            missing_files.update(missing)
    else:
        # Flatten everything found into a single ingest group.
        files = list(
            ResourcePath.findFileResources(locations, file_filter, grouped=False)
        )
        _LOG.info("Ingesting %d file%s", len(files), "" if len(files) == 1 else "s")
        ingested_refs, missing_files = _ingest_group(
            butler,
            dataset_type,
            files,
            run=run,
            transfer=transfer,
            track_file_attrs=track_file_attrs,
            on_ingest_failure=on_ingest_failure,
            on_metadata_failure=on_metadata_failure,
            on_undefined_exposure=on_undefined_exposure,
            on_success=on_success,
            fail_fast=fail_fast,
        )

    # Any file that failed for any reason ends up in missing_files; report
    # them all at once rather than aborting mid-run (unless fail_fast already
    # raised inside _ingest_group).
    if n_missed := len(missing_files):
        msg = "\n".join(f" - {f}" for f in missing_files)
        _LOG.warning("Failed to ingest the following:\n%s", msg)
        raise RuntimeError(
            f"Failed to ingest {n_missed} file{'' if n_missed == 1 else 's'}."
        )

    return ingested_refs
def _ingest_group(
    butler: Butler,
    dataset_type: DatasetType,
    files: list[ResourcePath],
    *,
    run: str | None = None,
    transfer: str = "auto",
    track_file_attrs: bool = True,
    on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
    on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
    on_undefined_exposure: Callable[[ResourcePath, str], Any] = _do_nothing,
    on_ingest_failure: Callable[[list[FileDataset], Exception], Any] = _do_nothing,
    fail_fast: bool = False,
) -> tuple[list[DatasetRef], set[ResourcePath]]:
    """Ingest a single group of guider files.

    Parameters mirror those of `ingest_guider`; see that function for
    detailed descriptions.

    Returns
    -------
    refs : `list` [ `lsst.daf.butler.DatasetRef` ]
        Datasets that were successfully ingested.
    missing : `set` [ `lsst.resources.ResourcePath` ]
        Files from ``files`` that were not ingested for any reason.
    """
    # Map filenames to initial data ID (using obs ID rather than exposure ID).
    raw_data_id: dict[ResourcePath, ObservationInfo] = {}

    # Map instrument name to observation IDs.
    obs_ids: dict[str, set[str]] = defaultdict(set)

    # Retain information about failed metadata extraction.
    # Mapping of file name to error message.
    failed_metadata: dict[ResourcePath, str] = {}

    # The GUIDE detectors for each instrument.
    guide_detectors: dict[str, set[int]] = {}

    # Since there may be multiple guider files for a single exposure,
    # accumulate the exposure information before converting obs_id to exposure
    # id.
    for file in files:
        metadata_path = file.updatedExtension(".json")
        if metadata_path == file:
            # Attempting to ingest the sidecar file.
            # Raise-and-catch so that on_metadata_failure is invoked inside
            # an ``except`` block, as its contract requires.
            try:
                raise RuntimeError(
                    f"Can not ingest sidecar file as GUIDER file (attempting {metadata_path})"
                )
            except RuntimeError as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise
                continue

        # Prefer the JSON sidecar; fall back to reading the FITS primary
        # header if the sidecar is absent or unreadable.
        metadata = None
        if metadata_path.exists():
            with contextlib.suppress(Exception):
                metadata = json.loads(metadata_path.read().decode())
        if metadata is None:
            # Could not find sidecar file or it was corrupt. Read from the
            # FITS file itself.
            # Allow direct remote read from S3.
            try:
                fs, fspath = file.to_fsspec()
                with fs.open(fspath) as f, astropy.io.fits.open(f) as fits_obj:
                    metadata = fits_obj[0].header
            except Exception as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise RuntimeError(
                        f"Problem extracting metadata for file {file}"
                    ) from e
                continue

        # Do not run fix_header since we only need the OBSID and the detector
        # number.
        required = {"instrument", "detector_num", "observation_id"}
        try:
            info = ObservationInfo(
                metadata, filename=file, subset=required, pedantic=True
            )
        except KeyError as e:
            failed_metadata[file] = str(e)
            on_metadata_failure(file, e)
            if fail_fast:
                raise RuntimeError(f"Problem parsing metadata for file {file}") from e
            continue

        # Populate detector lookup table.
        # Query the registry once per instrument for its GUIDER detectors.
        if info.instrument not in guide_detectors:
            guide_detectors[info.instrument] = set(
                rec.id
                for rec in butler.query_dimension_records(
                    "detector", instrument=info.instrument
                )
                if rec.purpose == "GUIDER"
            )

        if info.detector_num not in guide_detectors[info.instrument]:
            # The callbacks are documented to be called within an exception.
            try:
                raise ValueError(f"File {file} is not a GUIDER observation.")
            except ValueError as e:
                failed_metadata[file] = str(e)
                on_metadata_failure(file, e)
                if fail_fast:
                    raise
                continue

        raw_data_id[file] = info
        obs_ids[info.instrument].add(info.observation_id)

    if run is not None and len(obs_ids) > 1:
        # We do not want to ingest files from different instruments into
        # the same run so only allow this if we are defining the RUN
        # internally.
        raise RuntimeError(
            f"Can only ingest data from a single instrument into a single RUN but got {obs_ids.keys()}"
        )

    if failed_metadata:
        msg = "\n".join(f" - {f}" for f in failed_metadata)
        _LOG.warning("Failed to extract usable GUIDER metadata for:\n%s", msg)

    # Map obs_id to a tuple of instrument and exposure ID in case we are
    # ingesting guiders from multiple instruments.
    obs_id_to_exposure_ids: dict[str, tuple[str, int]] = {}

    requested_obs_ids: set[str] = set()
    found_obs_ids: set[str] = set()

    with butler.query() as query:
        for instrument, observation_ids in obs_ids.items():
            requested_obs_ids.update(observation_ids)
            # NOTE(review): ``query`` is rebound each iteration, so the
            # ``where`` constraints accumulate across instruments; with more
            # than one instrument the combined constraint looks like it could
            # exclude the later instruments' records — confirm the intended
            # multi-instrument behavior.
            query = query.where(
                "exposure.obs_id in (OBSID)",
                bind={"OBSID": observation_ids},
                instrument=instrument,
            )

            exposures = {
                e.obs_id: (e.instrument, e.id)
                for e in query.dimension_records("exposure")
            }
            obs_id_to_exposure_ids.update(exposures)
            found_obs_ids.update(exposures.keys())

    if missing_obs_ids := requested_obs_ids - found_obs_ids:
        missing_str = "\n".join(f" - {obs}" for obs in missing_obs_ids)
        _LOG.warning(
            "Failed to find exposure records for the following observation IDs:\n%s",
            missing_str,
        )

    # Now there is enough information to create the ingest datasets.
    output_runs: set[str] = set()
    failed_exposure_metadata: list[ResourcePath] = []
    datasets: list[FileDataset] = []
    for file, info in raw_data_id.items():
        if info.observation_id not in obs_id_to_exposure_ids:
            # The exposure dimension record does not exist yet; the file
            # cannot be ingested until it does.
            failed_exposure_metadata.append(file)
            on_undefined_exposure(file, info.observation_id)
            if fail_fast:
                raise RuntimeError(
                    f"Exposure metadata not yet defined for OBSID {info.observation_id} "
                    f"but required for file {file}."
                )
            continue

        if run is None:
            output_run = _DEFAULT_RUN_FORMAT.format(info.instrument)
        else:
            output_run = run

        if output_run not in output_runs:
            # Always try to create on first pass.
            butler.collections.register(output_run)
            output_runs.add(output_run)

        data_id = DataCoordinate.standardize(
            instrument=info.instrument,
            detector=info.detector_num,
            exposure=obs_id_to_exposure_ids[info.observation_id][1],
            universe=butler.dimensions,
        )
        # These are "raw" type so we want to use a predictable UUID.
        ref = DatasetRef(
            dataset_type,
            data_id,
            output_run,
            id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN,
        )
        datasets.append(
            FileDataset(path=file, refs=[ref], formatter=FitsGenericFormatter)
        )

    if failed_exposure_metadata:
        failed_exposure_msg = "\n".join(f" - {f}" for f in failed_exposure_metadata)
        _LOG.warning(
            "Failed to match the following files to exposure IDs:\n%s",
            failed_exposure_msg,
        )

    # Now ingest the files.
    if datasets:
        try:
            butler.ingest(
                *datasets, transfer=transfer, record_validation_info=track_file_attrs
            )
        except Exception as e:
            on_ingest_failure(datasets, e)
            datasets = []  # Effectively nothing was ingested.
            if fail_fast:
                raise
        else:
            on_success(datasets)

    # Anything given to us that did not end up in an ingested FileDataset is
    # reported back to the caller as missing.
    missing_files: set[ResourcePath] = set()
    if len(datasets) != len(files):
        ingested_files = {d.path for d in datasets}
        given_files = set(files)
        missing_files = given_files - ingested_files

    return [d.refs[0] for d in datasets], missing_files