Coverage for python / lsst / obs / base / ingest.py: 14%
516 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:33 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "RawExposureData",
26 "RawFileData",
27 "RawFileDatasetInfo",
28 "RawIngestConfig",
29 "RawIngestTask",
30 "makeTransferChoiceField",
31)
33import concurrent.futures
34import contextlib
35import json
36import logging
37import re
38import warnings
39import zipfile
40from collections import defaultdict
41from collections.abc import Callable, Iterable, Iterator, MutableMapping, Sequence, Sized
42from contextlib import contextmanager
43from dataclasses import InitVar, dataclass
44from typing import Any, ClassVar, cast
46from astro_metadata_translator import MetadataTranslator, ObservationInfo, merge_headers
47from astro_metadata_translator.indexing import process_index_data, process_sidecar_data
48from pydantic import BaseModel
50from lsst.afw.fits import readMetadata
51from lsst.daf.butler import (
52 Butler,
53 CollectionType,
54 DataCoordinate,
55 DatasetIdGenEnum,
56 DatasetRef,
57 DatasetType,
58 DimensionRecord,
59 DimensionUniverse,
60 FileDataset,
61 Formatter,
62 FormatterV2,
63 Progress,
64 Timespan,
65)
66from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
67from lsst.pex.config import ChoiceField, Config, Field
68from lsst.pipe.base import Instrument, Task
69from lsst.resources import ResourcePath, ResourcePathExpression
70from lsst.utils.logging import LsstLoggers
71from lsst.utils.timer import time_this, timeMethod
73from ._instrument import makeExposureRecordFromObsInfo
76def _do_nothing(*args: Any, **kwargs: Any) -> None:
77 """Do nothing.
79 This is a function that accepts anything and does nothing.
80 For use as a default in callback arguments.
81 """
82 pass
85def _log_msg_counter(noun: int | Sized) -> tuple[int, str]:
86 """Count the iterable and return the count and plural modifier.
88 Parameters
89 ----------
90 noun : `Sized` or `int`
91 Thing to count. If given an integer it is assumed to be the count
92 to use to calculate modifier.
94 Returns
95 -------
96 num : `int`
97 Number of items found in ``noun``.
98 modifier : `str`
99 Character to add to the end of a string referring to these items
100 to indicate whether it was a single item or not. Returns empty
101 string if there is one item or "s" otherwise.
103 Examples
104 --------
105 .. code-block:: python
107 log.warning("Found %d file%s", *_log_msg_counter(nfiles))
108 """
109 if isinstance(noun, int):
110 num = noun
111 else:
112 num = len(noun)
113 return num, "" if num == 1 else "s"
class IngestMetrics(BaseModel):
    """Timing metrics accumulated while ingesting raw files."""

    time_for_metadata: float = 0.0
    """Wall-clock seconds spent gathering file metadata."""

    time_for_records: float = 0.0
    """Wall-clock seconds spent writing dimension records."""

    time_for_ingest: float = 0.0
    """Wall-clock seconds spent inside butler ingest calls."""

    time_for_callbacks: float = 0.0
    """Wall-clock seconds spent running user-supplied callbacks."""

    def reset(self) -> None:
        """Set every metric back to zero."""
        metric_names = (
            "time_for_ingest",
            "time_for_records",
            "time_for_metadata",
            "time_for_callbacks",
        )
        for metric_name in metric_names:
            setattr(self, metric_name, 0.0)

    @contextmanager
    def collect_metric(
        self,
        property: str,
        log: LsstLoggers | None = None,
        msg: str | None = None,
        args: tuple[Any, ...] = (),
    ) -> Iterator[None]:
        """Time the enclosed block and add the duration to a metric.

        Parameters
        ----------
        property : `str`
            Name of the attribute on this object to be incremented.
        log : `lsst.utils.logging.LsstLoggers` or `None`, optional
            Logger forwarded to `lsst.utils.timer.time_this`.
        msg : `str` or `None`, optional
            Message forwarded to `lsst.utils.timer.time_this`.
        args : `tuple`, optional
            Message arguments forwarded to `lsst.utils.timer.time_this`.

        Notes
        -----
        The metric is updated only after the timed block has exited, so an
        exception propagating out of the block leaves the metric unchanged.
        """
        with time_this(log=log, msg=msg, args=args, level=logging.INFO) as timer:
            yield
        accumulated = getattr(self, property)
        setattr(self, property, accumulated + timer.duration)
@dataclass
class RawFileDatasetInfo:
    """Description of one dataset contained in a raw file."""

    dataId: DataCoordinate
    """Data ID of this dataset (`lsst.daf.butler.DataCoordinate`)."""

    obsInfo: ObservationInfo
    """Standardized observation metadata describing this dataset
    (`astro_metadata_translator.ObservationInfo`).
    """
@dataclass
class RawFileData:
    """Everything known about one raw file while it is being ingested."""

    datasets: list[RawFileDatasetInfo]
    """Descriptions of the datasets stored in this raw file
    (`list` of `RawFileDatasetInfo`).
    """

    filename: ResourcePath
    """URI of the file the information came from
    (`lsst.resources.ResourcePath`).

    This is the location before ingest, not the location after ingest.
    """

    FormatterClass: type[Formatter]
    """Formatter class to use when ingesting this file (`type`; a
    subclass of `~lsst.daf.butler.Formatter`).
    """

    instrument: Instrument | None
    """The `Instrument` instance this file belongs to. May be `None`
    when ``datasets`` is an empty list."""
@dataclass
class RawExposureData:
    """Everything known about one complete raw exposure during ingest."""

    dataId: DataCoordinate
    """Data ID of the exposure (`lsst.daf.butler.DataCoordinate`)."""

    files: list[RawFileData]
    """File-level information for each file of the exposure
    (`list` of `RawFileData`).
    """

    universe: InitVar[DimensionUniverse]
    """The set of all known dimensions. Accepted by the constructor only;
    it is not stored as a field.
    """

    record: DimensionRecord
    """Exposure `~lsst.daf.butler.DimensionRecord` that has to be inserted
    into the `~lsst.daf.butler.Registry` before the files are ingested
    (`~lsst.daf.butler.DimensionRecord`).
    """

    dependencyRecords: dict[str, DimensionRecord]
    """Further records, keyed by dimension name, that have to be inserted
    into the `~lsst.daf.butler.Registry` before the exposure ``record``
    (e.g., to satisfy foreign key constraints).
    """
def makeTransferChoiceField(
    doc: str = "How to transfer files (None for no transfer).", default: str = "auto"
) -> ChoiceField:
    """Build a Config field listing the supported file transfer modes.

    The permitted values are exactly those supported by
    `lsst.daf.butler.Datastore.ingest`.

    Parameters
    ----------
    doc : `str`
        Documentation string attached to the configuration field.
    default : `str`, optional
        Transfer mode the field takes when not set explicitly.

    Returns
    -------
    field : `lsst.pex.config.ChoiceField`
        The configuration field.
    """
    # Transfer mode -> description shown to the user.
    transfer_modes = {
        "move": "move",
        "copy": "copy",
        "auto": "choice will depend on datastore",
        "direct": "use URI to ingested file directly in datastore",
        "link": "hard link falling back to symbolic link",
        "hardlink": "hard link",
        "symlink": "symbolic (soft) link",
        "relsymlink": "relative symbolic link",
    }
    # optional=True lets the field also be set to None.
    return ChoiceField(doc=doc, dtype=str, allowed=transfer_modes, optional=True, default=default)
class RawIngestConfig(Config):
    """Configuration options for `RawIngestTask`."""

    # How files are transferred into the datastore.
    transfer = makeTransferChoiceField()

    # Whether the first problem aborts the ingest.
    failFast: Field[bool] = Field(
        doc=(
            "If True, stop ingest as soon as any problem is encountered with any file. "
            "Otherwise problem files will be skipped and logged and a report issued at completion."
        ),
        dtype=bool,
        default=False,
    )
268class RawIngestTask(Task):
269 """Driver Task for ingesting raw data into Gen3 Butler repositories.
271 Parameters
272 ----------
273 config : `RawIngestConfig`
274 Configuration for the task.
275 butler : `~lsst.daf.butler.Butler`
276 Writeable butler instance, with ``butler.run`` set to the appropriate
277 `~lsst.daf.butler.CollectionType.RUN` collection for these raw
278 datasets.
279 on_success : `collections.abc.Callable`, optional
280 A callback invoked when all of the raws associated with an exposure
281 are ingested. Will be passed a list of `~lsst.daf.butler.FileDataset`
282 objects, each containing one or more resolved
283 `~lsst.daf.butler.DatasetRef` objects. If this callback raises it will
284 interrupt the entire ingest process, even if `RawIngestConfig.failFast`
285 is `False`.
286 on_metadata_failure : `collections.abc.Callable`, optional
287 A callback invoked when a failure occurs trying to translate the
288 metadata for a file. Will be passed the URI and the exception, in
289 that order, as positional arguments. Guaranteed to be called in an
290 ``except`` block, allowing the callback to re-raise or replace (with
291 ``raise ... from``) to override the task's usual error handling (before
292 `RawIngestConfig.failFast` logic occurs). This callback can be called
293 from within a worker thread if multiple workers have been requested.
294 Ensure that any code within the call back is thread-safe.
295 on_ingest_failure : `collections.abc.Callable`, optional
296 A callback invoked when dimension record or dataset insertion into the
297 database fails for an exposure. Will be passed a `RawExposureData`
298 instance and the exception, in that order, as positional arguments.
299 Guaranteed to be called in an ``except`` block, allowing the callback
300 to re-raise or replace (with ``raise ... from``) to override the task's
301 usual error handling (before `RawIngestConfig.failFast` logic occurs).
302 on_exposure_record : `collections.abc.Callable`, optional
303 A callback invoked when an exposure dimension record has been created
304 or modified. Will not be called if the record already existed. Will
305 be called with the exposure record.
306 **kwargs
307 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
308 constructor.
310 Notes
311 -----
312 Each instance of `RawIngestTask` writes to the same Butler. Each
313 invocation of `RawIngestTask.run` ingests a list of files.
314 """
316 ConfigClass: ClassVar[type[Config]] = RawIngestConfig
318 _DefaultName: ClassVar[str] = "ingest"
def getDatasetType(self) -> DatasetType:
    """Construct the default dataset type for datasets ingested by this
    Task.

    Returns
    -------
    datasetType : `lsst.daf.butler.DatasetType`
        The default dataset type for ingested data. It applies only when
        the relevant `~lsst.pipe.base.Instrument` does not define an
        override.
    """
    dimension_names = ("instrument", "detector", "exposure")
    return DatasetType("raw", dimension_names, "Exposure", universe=self.butler.dimensions)
338 # Mypy can not determine that the config passed to super() is this type.
339 config: RawIngestConfig
def __init__(
    self,
    config: RawIngestConfig,
    *,
    butler: Butler,
    on_success: Callable[[list[FileDataset]], Any] = _do_nothing,
    on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing,
    on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing,
    on_exposure_record: Callable[[DimensionRecord], Any] = _do_nothing,
    **kwargs: Any,
):
    # This is neither a CmdlineTask nor a PipelineTask, so the
    # configuration has to be validated explicitly here.
    config.validate()
    super().__init__(config, **kwargs)

    self.butler = butler
    self.universe = self.butler.dimensions
    self.datasetType = self.getDatasetType()

    # User-supplied hooks; each defaults to a no-op.
    self._on_success = on_success
    self._on_metadata_failure = on_metadata_failure
    self._on_ingest_failure = on_ingest_failure
    self._on_exposure_record = on_exposure_record

    self.progress = Progress("obs.base.RawIngestTask")

    # Importing every instrument class ensures that all the relevant
    # metadata translators have been loaded.
    self.instruments = Instrument.importAll(self.butler.registry)

    # Cache the instrument records up front; they are needed later to
    # calculate day_obs timespans, if appropriate.
    instrument_records = butler.registry.queryDimensionRecords("instrument")
    self._instrument_records = {record.name: record for record in instrument_records}

    # Start with empty metrics.
    self.metrics = IngestMetrics()
def _reduce_kwargs(self) -> dict[str, Any]:
    """Return the keyword arguments used when pickling this task.

    Returns
    -------
    kwargs : `dict` [`str`, `typing.Any`]
        The base-class arguments extended with the butler and the four
        callbacks held by this task.
    """
    inherited = super()._reduce_kwargs()
    additions = dict(
        butler=self.butler,
        on_success=self._on_success,
        on_metadata_failure=self._on_metadata_failure,
        on_ingest_failure=self._on_ingest_failure,
        on_exposure_record=self._on_exposure_record,
    )
    return dict(**inherited, **additions)
def _determine_instrument_formatter(
    self, dataId: DataCoordinate, filename: ResourcePath
) -> tuple[Instrument | None, type[Formatter | FormatterV2]]:
    """Work out the instrument and formatter class for a dataset.

    Parameters
    ----------
    dataId : `lsst.daf.butler.DataCoordinate`
        Data ID of the dataset; its ``instrument`` value selects the
        instrument class.
    filename : `lsst.resources.ResourcePath`
        URI of the file, used when reporting errors.

    Returns
    -------
    instrument : `Instrument` or `None`
        Instance of the `Instrument` for this dataset, or `None` if the
        instrument could not be determined.
    formatterClass : `type`
        Formatter class to use for this dataset. This is the base
        `~lsst.daf.butler.Formatter` when the instrument is unknown.

    Raises
    ------
    RuntimeError
        Raised if the instrument lookup fails and
        `RawIngestConfig.failFast` is `True`.

    Notes
    -----
    Does not access butler registry since it may be called from threads.
    """
    # The data model currently assumes that whilst multiple datasets
    # can be associated with a single file, they must all share the
    # same formatter.
    try:
        name = cast(str, dataId["instrument"])
        instrument = self.instruments[name]()
    except LookupError as exc:
        self._on_metadata_failure(filename, exc)
        self.log.warning(
            "Instrument %s for file %s not known to registry", dataId["instrument"], filename
        )
        if self.config.failFast:
            raise RuntimeError(
                f"Instrument {dataId['instrument']} for file {filename} not known to registry"
            ) from exc
        # Signal that the instrument could not be worked out.
        return None, Formatter

    assert instrument is not None, "Should be guaranted by fromName succeeding."
    return instrument, instrument.getRawFormatter(dataId)
def get_raw_datasetType(
    self, instrument: Instrument, cache: dict[str, DatasetType] | None = None
) -> DatasetType:
    """Return the raw dataset type to use for an instrument.

    Parameters
    ----------
    instrument : `Instrument`
        Instrument that may override the default raw dataset type through
        a ``raw_definition`` attribute. Without an override the task
        default is used.
    cache : `dict` [`str`, `lsst.daf.butler.DatasetType`] \
            or `None`, optional
        Optional mapping of dataset type name to a pre-existing dataset
        type. It is consulted but never modified.

    Returns
    -------
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type to use for raw ingest of this instrument.
    """
    definition = getattr(instrument, "raw_definition", None)
    if not definition:
        # No override so fall back to the task default.
        return self.datasetType

    name, dimensions, storage_class = definition
    known_types = {} if cache is None else cache
    cached = known_types.get(name)
    if cached:
        return cached
    return DatasetType(name, dimensions, storage_class, universe=self.butler.dimensions)
def extractMetadata(self, filename: ResourcePath) -> RawFileData:
    """Extract and process metadata from a single raw file.

    Parameters
    ----------
    filename : `lsst.resources.ResourcePath`
        URI to the file.

    Returns
    -------
    data : `RawFileData`
        A structure containing the metadata extracted from the file,
        as well as the original filename. All fields will be populated,
        but the `RawFileDatasetInfo.dataId` attribute will be a minimal
        (unexpanded) `~lsst.daf.butler.DataCoordinate` instance. The
        ``instrument`` field will be `None` if there is a problem
        with metadata extraction.

    Raises
    ------
    RuntimeError
        Raised if metadata extraction fails and
        `RawIngestConfig.failFast` is `True`.

    Notes
    -----
    Assumes that there is a single dataset associated with the given
    file. Instruments using a single file to store multiple datasets
    must implement their own version of this method.

    By default the method will catch all exceptions unless the
    `RawIngestConfig.failFast` configuration item is `True`. If an error
    is encountered the supplied ``on_metadata_failure()``
    method will be called. If no exceptions result and an error was
    encountered the returned object will have a null-instrument class and
    no datasets. A file for which no translatable headers are found is
    treated as such an error.

    This method supports sidecar JSON files which can be used to
    extract metadata without having to read the data file itself.
    The sidecar file is always used if found.
    """
    formatterClass: type[Formatter | FormatterV2]
    sidecar_fail_msg = ""  # Requires prepended space when set.
    try:
        sidecar_file = filename.updatedExtension(".json")
        headers = []
        # Deliberately best-effort: any problem with the sidecar file
        # (missing, unreadable, unparsable) means we fall back to reading
        # the data file itself.
        with contextlib.suppress(Exception):
            # Try to read directly, bypassing existence check.
            content = json.loads(sidecar_file.read())
            headers = [process_sidecar_data(content)]
            sidecar_fail_msg = " (via sidecar)"
        if not headers:
            # Read the metadata from the data file itself.

            # For remote files download the entire file to get the
            # header. This is very inefficient and it would be better
            # to have some way of knowing where in the file the headers
            # are and to only download those parts of the file.
            with filename.as_local() as local_file:
                # Read the primary. This might be sufficient.
                header = readMetadata(local_file.ospath, 0)
                translator_class = None

                try:
                    # Try to work out a translator class early.
                    translator_class = MetadataTranslator.determine_translator(
                        header, filename=str(filename)
                    )
                except ValueError:
                    # Primary header was not sufficient (maybe this file
                    # has been compressed or is a MEF with minimal
                    # primary). Read second header and merge with primary.
                    header = merge_headers([header, readMetadata(local_file.ospath, 1)], mode="overwrite")

                # Try again to work out a translator class, letting this
                # fail.
                if translator_class is None:
                    translator_class = MetadataTranslator.determine_translator(
                        header, filename=str(filename)
                    )

                # Request the headers to use for ingest. This must be
                # evaluated while the local copy of the file still exists.
                headers = list(translator_class.determine_translatable_headers(local_file.ospath, header))

        # Add each header to the dataset list
        datasets = [self._calculate_dataset_info(h, filename) for h in headers]

        # Without this check an empty list would trigger an IndexError in
        # the success branch below, outside the reach of the callback and
        # failFast handling.
        if not datasets:
            raise ValueError(f"No translatable headers found in file {filename}")

    except Exception as e:
        self.log.debug("Problem extracting metadata from %s%s: %s", filename, sidecar_fail_msg, e)
        # Indicate to the caller that we failed to read.
        datasets = []
        formatterClass = Formatter
        instrument = None
        self._on_metadata_failure(filename, e)
        if self.config.failFast:
            raise RuntimeError(
                f"Problem extracting metadata for file {filename}{sidecar_fail_msg}"
            ) from e
    else:
        self.log.debug("Extracted metadata for file %s%s", filename, sidecar_fail_msg)
        # The data model currently assumes that whilst multiple datasets
        # can be associated with a single file, they must all share the
        # same formatter.
        instrument, formatterClass = self._determine_instrument_formatter(datasets[0].dataId, filename)
        if instrument is None:
            datasets = []

    return RawFileData(
        datasets=datasets,
        filename=filename,
        # MyPy wants this to be a non-abstract class, which is not true
        # for the error case where instrument is None and datasets=[].
        FormatterClass=formatterClass,  # type: ignore
        instrument=instrument,
    )
@classmethod
def getObservationInfoSubsets(cls) -> tuple[set, set]:
    """Return the `~astro_metadata_translator.ObservationInfo` fields
    that are of interest to ingest.

    These fields will be used in constructing an exposure record.

    Returns
    -------
    required : `set`
        Names of the `~astro_metadata_translator.ObservationInfo` fields
        that must be present.
    optional : `set`
        Names of the `~astro_metadata_translator.ObservationInfo` fields
        that will be used if they are available.
    """
    # Treating the newer "group_counter_*" and "has_simulated_content"
    # properties as required assumes that existing index/sidecar files
    # holding translated values are recreated, or else that
    # astro_metadata_translator is allowed to fill in defaults.
    required_fields = {
        "instrument",
        "exposure_id",
        "detector_num",
        "observation_id",
        "observation_type",
        "observing_day",
        "physical_filter",
        "exposure_group",
        "exposure_time_requested",
        "datetime_begin",
        "datetime_end",
        "group_counter_start",
        "group_counter_end",
        "has_simulated_content",
    }
    optional_fields = {
        "altaz_begin",
        "boresight_rotation_angle",
        "boresight_rotation_coord",
        "can_see_sky",
        "dark_time",
        "object",
        "observation_counter",
        "observation_reason",
        "observing_day_offset",
        "science_program",
        "tracking_radec",
        "visit_id",
    }
    return required_fields, optional_fields
def _calculate_dataset_info(
    self, header: MutableMapping[str, Any] | ObservationInfo, filename: ResourcePath
) -> RawFileDatasetInfo:
    """Build a `RawFileDatasetInfo` from a header or translated metadata.

    Parameters
    ----------
    header : Mapping or `astro_metadata_translator.ObservationInfo`
        Header from the dataset or previously-translated content.
    filename : `lsst.resources.ResourcePath`
        Filename to use for error messages.

    Returns
    -------
    dataset : `RawFileDatasetInfo`
        The dataId, and observation information associated with this
        dataset.

    Raises
    ------
    ValueError
        Raised if ``header`` is already an
        `~astro_metadata_translator.ObservationInfo` and one or more of
        the required properties is `None`.
    """
    required, optional = self.getObservationInfoSubsets()
    if not isinstance(header, ObservationInfo):
        # Raw header: translate it, requesting only the properties needed.
        obsInfo = ObservationInfo(
            header,
            pedantic=False,
            filename=str(filename),
            required=required,
            subset=required | optional,
        )
    else:
        obsInfo = header
        # Previously-translated content still has to provide a value for
        # every required property. getattr needs no protection because
        # the names come from the fixed set of required properties.
        missing = [name for name in required if getattr(obsInfo, name) is None]
        if missing:
            raise ValueError(
                f"Requested required properties are missing from file {filename}: {missing} (via JSON)"
            )

    dataId = DataCoordinate.standardize(
        instrument=obsInfo.instrument,
        exposure=obsInfo.exposure_id,
        detector=obsInfo.detector_num,
        universe=self.universe,
    )
    return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId)
def readZipIndexFiles(
    self, files: Iterable[ResourcePath]
) -> tuple[list[RawFileData], list[ResourcePath], set[ResourcePath], set[ResourcePath]]:
    """Given a list of files, filter out zip files and look for index files
    inside.

    Parameters
    ----------
    files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
        URIs to the files to be ingested.

    Returns
    -------
    index_file_data : `list` [ `RawFileData` ]
        File information derived from the metadata index found in each
        zip file. The file names include the path to the data file
        within the zip using the butler fragment convention of
        ``zip-path=PATH``.
    updated_files : `list` [ `lsst.resources.ResourcePath` ]
        The input files with zip entries removed, in their original order.
    good_index_files : `set` [ `lsst.resources.ResourcePath` ]
        Zip files from which index information was read.
    bad_index_files : `set` [ `lsst.resources.ResourcePath` ]
        Zip files from which index information could not be read.

    Raises
    ------
    RuntimeError
        Raised if the index information can not be read from a zip file
        and `RawIngestConfig.failFast` is `True`.
    """
    zip_metadata_index = "_metadata_index.json"

    # Files that weren't zip files.
    updated_files: list[ResourcePath] = []

    # Index files we failed to read.
    bad_index_files: set[ResourcePath] = set()

    # Any good index files that were found and used.
    good_index_files: set[ResourcePath] = set()

    # Processed content from any zip files.
    indexFileData: list[RawFileData] = []

    for file in files:
        if file.getExtension() != ".zip":
            updated_files.append(file)
            continue

        zip_info: dict[str, zipfile.ZipInfo] = {}
        try:
            with file.open("rb") as fd, zipfile.ZipFile(fd) as zf:
                zip_info = {info.filename: info for info in zf.infolist()}
                content = json.loads(zf.read(zip_metadata_index))
                index = process_index_data(content, force_dict=True)
                assert isinstance(index, MutableMapping)

                # Try to read the ZipIndex.
                zip_index = ZipIndex.from_open_zip(zf)
        except Exception as e:
            if self.config.failFast:
                raise RuntimeError(f"Problem reading index file from zip file at {file}") from e
            bad_index_files.add(file)
            continue
        self.log.debug("Extracted index metadata from zip file %s", str(file))
        good_index_files.add(file)

        # All the metadata read from this index file with keys of full
        # path.
        index_entries: dict[ResourcePath, Any] = {}

        # In theory we could scan for JSON sidecar files associated with
        # any files not found in the metadata index, but that is not meant
        # to be possible. Guider data is another issue not handled by
        # this code.
        for path_in_zip in index:
            if path_in_zip not in zip_info:
                # Index entry exists but no file for it.
                self.log.info(
                    "File %s is in zip index but not in zip file %s. Ignoring.", path_in_zip, file
                )
                continue
            file_to_ingest = file.replace(fragment=f"zip-path={path_in_zip}")
            index_entries[file_to_ingest] = index[path_in_zip]

        file_data = self.processIndexEntries(index_entries)

        # Validate that the index entries we have read match the
        # values in the butler zip index.
        data_ids_from_index: dict[str, tuple[DataCoordinate, ...]] = {}
        for f in file_data:
            # Split on the first "=" only: the fragment is
            # "zip-path=PATH" and PATH may itself contain "=".
            _, member_path = f.filename.fragment.split("=", 1)
            data_ids_from_index[member_path] = tuple(d.dataId for d in f.datasets)

        data_ids_from_butler_index: dict[str, tuple[DataCoordinate, ...]] = {}
        # Refs indexed by UUID.
        refs = zip_index.refs.to_refs(universe=self.universe)
        id_to_ref = {ref.id: ref for ref in refs}
        for member_path, index_info in zip_index.artifact_map.items():
            data_ids_from_butler_index[member_path] = tuple(
                id_to_ref[id_].dataId for id_ in index_info.ids
            )

        if data_ids_from_butler_index != data_ids_from_index:
            self.log.warning(
                "Recalculating the Data IDs for zip file %s (which may include new metadata corrections) "
                "results in a difference to the Data IDs recorded in the butler index in that zip. "
                "Consider remaking the zipped raws.",
                file,
            )

        indexFileData.extend(file_data)

    return indexFileData, updated_files, good_index_files, bad_index_files
def locateAndReadIndexFiles(
    self, files: Iterable[ResourcePath]
) -> tuple[dict[ResourcePath, Any], list[ResourcePath], set[ResourcePath], set[ResourcePath]]:
    """Find and read the index files relevant to a list of files.

    An index file is used either because it is itself in the list of
    files to ingest, or because it sits in the same directory as a file
    to ingest. Index entries are always used if present.

    Parameters
    ----------
    files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
        URIs to the files to be ingested.

    Returns
    -------
    index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ]
        Merged contents of all relevant index files found, keyed by the
        full path of the data file. The index files can be explicitly
        specified ones or ones found in the directory alongside a data
        file to be ingested.
    updated_files : `list` [ `lsst.resources.ResourcePath` ]
        The input files, as absolute paths, without those that have an
        index entry and without the index files themselves. First
        occurrences keep their input order and duplicates are dropped.
    good_index_files : `set` [ `lsst.resources.ResourcePath` ]
        Index files that were successfully read.
    bad_index_files : `set` [ `lsst.resources.ResourcePath` ]
        Files that looked like index files but failed to read properly.

    Raises
    ------
    RuntimeError
        Raised if an index file can not be read and
        `RawIngestConfig.failFast` is `True`.
    """
    # Use absolute paths so that they can be compared with index content.
    # Real paths are deliberately not used: the index file has to be
    # assumed to be in this location, not wherever a link points to.
    files = tuple(f.abspath() for f in files)

    # The name every index file must have.
    index_root_file = "_index.json"

    # Names of the requested files in each directory.
    files_by_directory: dict[ResourcePath, set[str]] = defaultdict(set)
    for path in files:
        parent, name = path.split()
        files_by_directory[parent].add(name)

    # Metadata read from all index files, keyed by full path.
    index_entries: dict[ResourcePath, Any] = {}

    # Index files that were read and used, and those that failed to read.
    good_index_files: set[ResourcePath] = set()
    bad_index_files: set[ResourcePath] = set()

    for directory, files_in_directory in files_by_directory.items():
        possible_index_file = directory.join(index_root_file)
        if not possible_index_file.exists():
            continue

        # An index file that was explicitly requested is reported
        # differently from one that was merely found in the directory.
        is_implied = index_root_file not in files_in_directory
        index_msg = "inferred" if is_implied else "explicit"

        # Read the index file, catching and reporting any problems.
        try:
            content = json.loads(possible_index_file.read())
            index = process_index_data(content, force_dict=True)
            # mypy should in theory know that this is a mapping
            # from the overload type annotation of process_index_data.
            assert isinstance(index, MutableMapping)
        except Exception as exc:
            # The callback is triggered only for an index file that was
            # asked for explicitly; triggering it for an implied file
            # might be surprising.
            if not is_implied:
                self._on_metadata_failure(possible_index_file, exc)
            if self.config.failFast:
                raise RuntimeError(
                    f"Problem reading index file from {index_msg} location {possible_index_file}"
                ) from exc
            bad_index_files.add(possible_index_file)
            continue

        self.log.debug("Extracted index metadata from %s file %s", index_msg, possible_index_file)
        good_index_files.add(possible_index_file)

        # An implied index only supplies information for the files that
        # were requested from this directory. An explicit index supplies
        # all of its entries.
        files_to_ingest = files_in_directory if is_implied else set(index)

        # Merge the relevant metadata into the single dict of entries.
        for file_in_dir in files_to_ingest:
            # An explicitly specified index file should never turn up
            # here because an explicit index forces ingest of the files
            # in the index rather than the explicit file list. Should it
            # happen anyway it is harmless, so report it and carry on
            # instead of raising.
            if file_in_dir == index_root_file:
                self.log.info(
                    "Logic error found scanning directory %s. Please file ticket.", directory
                )
                continue
            if file_in_dir not in index:
                continue

            file = directory.join(file_in_dir)
            if file in index_entries:
                new_is_obsinfo = isinstance(index[file_in_dir], ObservationInfo)
                # ObservationInfo overrides raw metadata
                if new_is_obsinfo and not isinstance(index_entries[file], ObservationInfo):
                    self.log.warning(
                        "File %s already specified in an index file but overriding"
                        " with ObservationInfo content from %s",
                        file,
                        possible_index_file,
                    )
                else:
                    self.log.warning(
                        "File %s already specified in an index file, ignoring content from %s",
                        file,
                        possible_index_file,
                    )
                    # Keep the entry that is already recorded.
                    continue

            index_entries[file] = index[file_in_dir]

    # Drop the files that have index entries, the explicit index files
    # and any index files that failed to read.
    remaining = set(files) - set(index_entries) - good_index_files - bad_index_files

    # The set difference loses the input order, so walk the de-duplicated
    # inputs in order and keep the ones that remain. Retaining the order
    # is good for testing.
    ordered: list[ResourcePath] = [f for f in dict.fromkeys(files) if f in remaining]

    return index_entries, ordered, good_index_files, bad_index_files
def processIndexEntries(self, index_entries: dict[ResourcePath, Any]) -> list[RawFileData]:
    """Turn the content read from index files into `RawFileData`.

    Parameters
    ----------
    index_entries : `dict` [`lsst.resources.ResourcePath`, `typing.Any`]
        Mapping from the file to ingest to the metadata found for it in
        an index. Each value is either raw metadata or a translated
        `~astro_metadata_translator.ObservationInfo`.

    Returns
    -------
    data : `list` [ `RawFileData` ]
        One structure per entry, in the iteration order of
        ``index_entries``. An entry whose metadata could not be
        processed, or for which no instrument could be determined, is
        returned with an empty ``datasets`` list. The
        `RawFileDatasetInfo.dataId` attributes are expected to be minimal
        (unexpanded) `~lsst.daf.butler.DataCoordinate` instances.

    Raises
    ------
    RuntimeError
        Raised if an entry can not be processed and ``config.failFast``
        is set.
    """
    formatter_type: type[Formatter | FormatterV2]
    converted = []
    for filename, index_content in index_entries.items():
        try:
            dataset_infos = [self._calculate_dataset_info(index_content, filename)]
        except Exception as err:
            self.log.debug("Problem extracting metadata for file %s found in index file: %s", filename, err)
            # Always notify the failure handler, even if we are about to
            # abort because of failFast.
            self._on_metadata_failure(filename, err)
            if self.config.failFast:
                raise RuntimeError(
                    f"Problem extracting metadata for file {filename} found in index file"
                ) from err
            # Record the file as a failure: no datasets, no instrument and
            # a placeholder formatter class.
            dataset_infos, formatter_type, found_instrument = [], Formatter, None
        else:
            found_instrument, formatter_type = self._determine_instrument_formatter(
                dataset_infos[0].dataId, filename
            )
            if found_instrument is None:
                # Without an instrument the file is treated as a failure.
                dataset_infos = []

        raw_file = RawFileData(
            datasets=dataset_infos,
            filename=filename,
            # MyPy wants this to be a non-abstract class, which is not
            # true for the failure case where the instrument is None and
            # there are no datasets.
            FormatterClass=formatter_type,  # type: ignore
            instrument=found_instrument,
        )
        converted.append(raw_file)
    return converted
def groupByExposure(self, files: Iterable[RawFileData]) -> list[RawExposureData]:
    """Collect per-file information into per-exposure structures.

    Parameters
    ----------
    files : iterable of `RawFileData`
        File-level information to group. Every entry must have at least
        one dataset because the first dataset is used to identify the
        exposure.

    Returns
    -------
    exposures : `list` of `RawExposureData`
        One structure per distinct exposure data ID, in the order in which
        each exposure was first encountered. The exposure record and the
        dependency records are built from the observation information of
        the first dataset of the first file in each group. The
        `RawExposureData.dataId` attributes are the exposure-level subset
        of the file data IDs and have not been expanded.
    """
    exposure_dims = self.universe["exposure"].minimal_group

    grouped: dict = {}
    for raw_file in files:
        # Assume that the first dataset is representative for the file.
        exposure_id = raw_file.datasets[0].dataId.subset(exposure_dims)
        grouped.setdefault(exposure_id, []).append(raw_file)

    exposures = []
    for exposure_id, members in grouped.items():
        # The first dataset of the first file stands in for the whole
        # exposure when building the dimension records.
        representative = members[0].datasets[0].obsInfo
        exposures.append(
            RawExposureData(
                dataId=exposure_id,
                files=members,
                universe=self.universe,
                record=self.makeExposureRecord(representative, self.universe),
                dependencyRecords=self.makeDependencyRecords(representative, self.universe),
            )
        )
    return exposures
def makeExposureRecord(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
) -> DimensionRecord:
    """Build the registry record describing an exposure.

    Subclasses will often want to customize this method. Frequently that
    only requires delegating to this base class implementation with
    additional ``kwargs``.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.
    **kwargs
        Additional field values for this record. They are forwarded
        unchanged to `makeExposureRecordFromObsInfo`.

    Returns
    -------
    record : `lsst.daf.butler.DimensionRecord`
        The exposure record that must be inserted into the
        `~lsst.daf.butler.Registry` prior to file-level ingest.
    """
    exposure_record = makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs)
    return exposure_record
def makeDependencyRecords(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse
) -> dict[str, DimensionRecord]:
    """Build the records that an exposure record depends on.

    The returned records are meant to be inserted into the
    `~lsst.daf.butler.Registry` ahead of the exposure record so that
    foreign key constraints arising from dimensions related to the
    exposure can be satisfied.

    Subclasses may want to customize this method if they have added
    dimensions that relate to an exposure.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.

    Returns
    -------
    records : `dict` [`str`, `lsst.daf.butler.DimensionRecord`]
        The records to insert, indexed by dimension name. Empty if the
        universe has no ``exposure`` dimension. A ``group`` and/or
        ``day_obs`` record is included only when that dimension is implied
        by ``exposure``.
    """
    dependencies: dict[str, DimensionRecord] = {}
    if "exposure" not in universe:
        return dependencies

    implied = universe["exposure"].implied

    if "group" in implied:
        group_record_class = universe["group"].RecordClass
        dependencies["group"] = group_record_class(
            name=obsInfo.exposure_group,
            instrument=obsInfo.instrument,
        )

    if "day_obs" in implied:
        # The timespan can only be calculated if the offset of the
        # observing day is known; otherwise it is left unset.
        day_span = None
        day_offset = getattr(obsInfo, "observing_day_offset")
        if day_offset is not None:
            offset_seconds = round(day_offset.to_value("s"))
            assert obsInfo.observing_day is not None
            day_span = Timespan.from_day_obs(obsInfo.observing_day, offset_seconds)
        day_obs_record_class = universe["day_obs"].RecordClass
        dependencies["day_obs"] = day_obs_record_class(
            instrument=obsInfo.instrument,
            id=obsInfo.observing_day,
            timespan=day_span,
        )

    return dependencies
def expandDataIds(self, data: RawExposureData) -> RawExposureData:
    """Expand every data ID held by a raw exposure structure.

    The exposure-level data ID and the data ID of every dataset in every
    file are replaced by the result of
    `lsst.daf.butler.Registry.expandDataId`.

    Parameters
    ----------
    data : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated. It is
        modified in place and should be considered consumed upon return.

    Returns
    -------
    exposure : `RawExposureData`
        The input structure, with `RawExposureData.dataId` and the nested
        `RawFileDatasetInfo.dataId` attributes replaced by their expanded
        forms (data IDs for which
        `~lsst.daf.butler.DataCoordinate.hasRecords` is expected to return
        `True`).
    """
    # Begin by expanding the exposure-level data ID. It is not used
    # directly for file ingest, but it means some database lookups happen
    # once per exposure rather than once per file.
    # The records about to be inserted are supplied so they are not looked
    # up in the database. Instrument and filter records are still expected
    # to come from the database (the Registry may cache them).
    exposure_level_records = {"exposure": data.record, **data.dependencyRecords}
    data.dataId = self.butler.registry.expandDataId(data.dataId, records=exposure_level_records)

    # Next expand the per-file (exposure+detector) data IDs, handing over
    # the records obtained from the exposure-level expansion above.
    for raw_file in data.files:
        for dataset_info in raw_file.datasets:
            known_records = {}
            for element in data.dataId.dimensions.elements:
                known_records[element] = data.dataId.records[element]
            dataset_info.dataId = self.butler.registry.expandDataId(
                dataset_info.dataId,
                records=known_records,
            )
    return data
def prep(
    self,
    files: Iterable[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    search_indexes: bool = True,
) -> tuple[Iterator[RawExposureData], list[ResourcePath]]:
    """Run every ingest preprocessing step that does not update the
    database.

    Parameters
    ----------
    files : iterable over `lsst.resources.ResourcePath`
        The files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool used to run metadata extraction for
        standalone files.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.

    Returns
    -------
    exposures : `~collections.abc.Iterator` [ `RawExposureData` ]
        Data structures containing dimension records, filenames, and data
        IDs to be ingested (one structure for each exposure). Data ID
        expansion happens lazily as the iterator is consumed.
    bad_files : `list` of `lsst.resources.ResourcePath`
        All the files that could not have metadata extracted, including
        index and zip files that could not be read.
    """

    def _partition_good_bad(
        file_data: Iterable[RawFileData],
    ) -> tuple[list[RawFileData], list[ResourcePath]]:
        """Separate usable file data from the paths of failed files."""
        usable: list[RawFileData] = []
        failed: list[ResourcePath] = []
        for datum in self.progress.wrap(file_data, desc="Reading image metadata"):
            # A file without datasets is one whose metadata was unusable.
            if datum.datasets:
                usable.append(datum)
            else:
                failed.append(datum.filename)
        return usable, failed

    def _report(header: str, paths: Iterable[ResourcePath]) -> None:
        """Log a header followed by one line per path, sorted."""
        self.log.info(header)
        for path in sorted(paths):
            self.log.info("- %s", path)

    # Zip files come first; whatever is not a zip is handed back in
    # ``files``.
    zip_file_data, files, zip_ok, zip_failed = self.readZipIndexFiles(files)
    if zip_failed:
        _report("Failed to extract index metadata from the following zip files:", zip_failed)

    # Then standalone index files, unless the caller told us explicitly
    # that there are none. There should be far fewer index files than
    # data files.
    index_entries: dict[ResourcePath, Any] = {}
    index_ok: set[ResourcePath] = set()
    index_failed: set[ResourcePath] = set()
    if search_indexes:
        index_entries, files, index_ok, index_failed = self.locateAndReadIndexFiles(files)
        if index_failed:
            _report("Failed to read the following explicitly requested index files:", index_failed)

    # Treat zips and standalone index files as one population from here.
    index_ok.update(zip_ok)
    index_failed.update(zip_failed)

    # Convert everything that came from an index to the standard form.
    from_indexes = self.processIndexEntries(index_entries)
    from_indexes.extend(zip_file_data)
    index_rejects: list[ResourcePath] = []
    if from_indexes:
        from_indexes, index_rejects = _partition_good_bad(from_indexes)
        self.log.info(
            "Successfully extracted metadata for %d file%s found in %d index file%s with %d failure%s",
            *_log_msg_counter(from_indexes),
            *_log_msg_counter(index_ok),
            *_log_msg_counter(index_rejects),
        )

    # Extract metadata from the remaining standalone files, in the pool
    # if one was given. All output is collected before failures are
    # examined.
    run_map = map if pool is None else pool.map
    extracted: Iterator[RawFileData] = run_map(self.extractMetadata, files)

    # Set the failed reads aside for later reporting.
    good_file_data, bad_files = _partition_good_bad(extracted)
    # Only report if we looked at any standalone files at all.
    if files:
        self.log.info(
            "Successfully extracted metadata from %d file%s with %d failure%s",
            *_log_msg_counter(good_file_data),
            *_log_msg_counter(bad_files),
        )

    # Fold in everything that came from index files.
    good_file_data.extend(from_indexes)
    bad_files.extend(index_rejects)
    bad_files.extend(index_failed)

    # Group files (and their metadata) by exposure. This is intrinsically
    # a gather step so it is never parallelized.
    exposures: list[RawExposureData] = self.groupByExposure(good_file_data)

    # Expand the data IDs to include all dimension metadata, which may be
    # needed to generate path templates. expandDataIds modifies each
    # structure in place and returns it, so it is applied as a
    # pass-through. This is the first step that queries the registry, so
    # connection or lock contention problems would first show up here.
    return map(self.expandDataIds, exposures), bad_files
def ingestExposureDatasets(
    self,
    exposure: RawExposureData,
    datasetType: DatasetType,
    *,
    run: str,
    skip_existing_exposures: bool = False,
    track_file_attrs: bool = True,
) -> list[FileDataset]:
    """Ingest every raw file belonging to a single exposure.

    Parameters
    ----------
    exposure : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated and all
        data ID attributes expanded.
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type associated with this exposure.
    run : `str`
        Name of a RUN-type collection to write to.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation. It is only
        passed on for files that are not inside a zip.

    Returns
    -------
    datasets : `list` of `lsst.daf.butler.FileDataset`
        Per-file structures identifying the files ingested and their
        dataset representation in the data repository. Standalone files
        come first, followed by the content of each zip.
    """
    # Raw files are preferentially ingested using a UUID derived from
    # the collection name and dataId.
    deterministic_ids = self.butler.registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN)
    id_mode = DatasetIdGenEnum.DATAID_TYPE_RUN if deterministic_ids else DatasetIdGenEnum.UNIQUE

    def _as_file_dataset(raw_file: RawFileData) -> FileDataset | None:
        """Build the `FileDataset` for one file, or `None` if the file has
        no datasets.
        """
        refs = [
            DatasetRef(datasetType, info.dataId, run=run, id_generation_mode=id_mode)
            for info in raw_file.datasets
        ]
        if not refs:
            return None
        return FileDataset(path=raw_file.filename.abspath(), refs=refs, formatter=raw_file.FormatterClass)

    # The datasets for this exposure could all be from a single zip
    # or be distinct files. Zip members are collected per zip file (the
    # URI with its fragment removed) and handled separately below.
    members_by_zip: dict[ResourcePath, list[RawFileData]] = defaultdict(list)
    ingested: list[FileDataset] = []
    for raw_file in exposure.files:
        if raw_file.filename.getExtension() == ".zip":
            members_by_zip[raw_file.filename.replace(fragment="")].append(raw_file)
        elif (file_dataset := _as_file_dataset(raw_file)) is not None:
            ingested.append(file_dataset)

    if ingested:
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest(
                *ingested,
                transfer=self.config.transfer,
                record_validation_info=track_file_attrs,
                skip_existing=skip_existing_exposures,
            )
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    # In theory it is possible for the new Data IDs to differ from the Data
    # IDs stored in the Zip index. That could happen if there is a metadata
    # correction that changes the exposure or detector numbers. We have to
    # assume that by the time the zip has been made that this correction
    # has been applied. If we don't assume that then we have to
    # regenerate the index but we cannot change the contents of the zip.
    # We would also need the ability for butler.ingest_zip to take an
    # override ZipIndex object.
    # The Dataset ref IDs will only change if the data IDs change.
    for zip_path, members in members_by_zip.items():
        # These are only needed for the return value; the zip itself is
        # what gets ingested. Assumes the guiders are not included in the
        # metadata index.
        from_zip: list[FileDataset] = [
            file_dataset
            for file_dataset in map(_as_file_dataset, members)
            if file_dataset is not None
        ]
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest_zip(
                zip_path, transfer=self.config.transfer, skip_existing=skip_existing_exposures
            )
        ingested.extend(from_zip)
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    return ingested
def ingestFiles(
    self,
    files: Sequence[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    num_workers: int = 1,
    run: str | None = None,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    skip_ingest: bool = False,
) -> tuple[list[DatasetRef], list[ResourcePath], int, int, int]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before this method is called.

    Parameters
    ----------
    files : sequence of `lsst.resources.ResourcePath`
        URIs to the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations. A pool supplied by the caller is never shut down here.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` is not `None`.
        If greater than 1 and no pool was given, a temporary thread pool
        is created and shut down once metadata has been read.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.
    bad_files : `list` of `lsst.resources.ResourcePath`
        Given paths for which metadata could not be extracted, as reported
        by `prep`.
    n_exposures : `int`
        Number of exposures successfully ingested. Exposures processed
        with ``skip_ingest=True`` are not counted.
    n_exposures_failed : `int`
        Number of exposures that failed when inserting dimension data.
    n_ingests_failed : `int`
        Number of exposures that failed when ingesting raw datasets.

    Raises
    ------
    Exception
        If ``config.failFast`` is set, the first exception raised while
        registering dimension records or ingesting datasets is re-raised
        after the failure handler has been called. Exceptions raised
        while reading metadata in `prep` always propagate.
    """
    created_pool = False
    if pool is None and num_workers > 1:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        created_pool = True

    try:
        with self.metrics.collect_metric(
            "time_for_metadata",
            self.log,
            msg="Reading metadata from %d file%s",
            args=(*_log_msg_counter(files),),
        ):
            exposureData, bad_files = self.prep(files, pool=pool, search_indexes=search_indexes)
    finally:
        if created_pool and pool:
            # The pool is not needed any more so close it if we created
            # it to ensure we clean up resources.
            pool.shutdown(wait=True)

    # Up to this point, we haven't modified the data repository at all.
    # Now we finally do that, with one transaction per exposure. This is
    # not parallelized at present because the performance of this step is
    # limited by the database server. That may or may not change in the
    # future once we increase our usage of bulk inserts and reduce our
    # usage of savepoints; we've tried to get everything but the database
    # operations done in advance to reduce the time spent inside
    # transactions.
    refs = []
    # Run collections and dataset types already registered by this call,
    # so each is only registered once.
    runs = set()
    datasetTypes: dict[str, DatasetType] = {}
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    for exposure in self.progress.wrap(exposureData, desc="Ingesting raw exposures"):
        assert exposure.record is not None, "Should be guaranteed by prep()"
        self.log.debug(
            "Attempting to ingest %d file%s from exposure %s:%s",
            *_log_msg_counter(exposure.files),
            exposure.record.instrument,
            exposure.record.obs_id,
        )

        try:
            with self.metrics.collect_metric(
                "time_for_records",
                self.log,
                msg="Creating dimension records for instrument %s, exposure %s",
                args=(
                    str(exposure.record.instrument),
                    str(exposure.record.id),
                ),
            ):
                # Dependencies go in first so the exposure record can
                # refer to them.
                for name, record in exposure.dependencyRecords.items():
                    self.butler.registry.syncDimensionData(name, record, update=update_exposure_records)
                inserted_or_updated = self.butler.registry.syncDimensionData(
                    "exposure",
                    exposure.record,
                    update=update_exposure_records,
                )
            if inserted_or_updated is not False:
                with self.metrics.collect_metric(
                    "time_for_callbacks", log=self.log, msg="Exposure record updated. Calling handler"
                ):
                    self._on_exposure_record(exposure.record)
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_exposures_failed += 1
            self.log.warning(
                "Exposure %s:%s could not be registered: %s",
                exposure.record.instrument,
                exposure.record.obs_id,
                e,
            )
            if self.config.failFast:
                # Bare raise re-raises the active exception without
                # adding another entry to its traceback.
                raise
            continue

        if isinstance(inserted_or_updated, dict):
            # Exposure is in the registry and we updated it, so
            # syncDimensionData returned a dict.
            columns_updated = list(inserted_or_updated.keys())
            s_col = "s" if len(columns_updated) != 1 else ""
            w_col = "were" if len(columns_updated) != 1 else "was"
            self.log.info(
                "Exposure %s:%s was already present, but column%s %s %s updated.",
                exposure.record.instrument,
                exposure.record.obs_id,
                s_col,
                ", ".join(repr(c) for c in columns_updated),
                w_col,
            )

        if skip_ingest:
            # Only the dimension records were wanted for this call.
            continue

        # Determine the instrument so we can work out the dataset type.
        instrument = exposure.files[0].instrument
        assert instrument is not None, (
            "file should have been removed from this list by prep if instrument could not be found"
        )

        datasetType = self.get_raw_datasetType(instrument, datasetTypes)
        if datasetType.name not in datasetTypes:
            self.butler.registry.registerDatasetType(datasetType)
            datasetTypes[datasetType.name] = datasetType

        # Override default run if nothing specified explicitly.
        if run is None:
            this_run = instrument.makeDefaultRawIngestRunName()
        else:
            this_run = run
        if this_run not in runs:
            self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
            runs.add(this_run)
        try:
            datasets_for_exposure = self.ingestExposureDatasets(
                exposure,
                datasetType=datasetType,
                run=this_run,
                skip_existing_exposures=skip_existing_exposures,
                track_file_attrs=track_file_attrs,
            )
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_ingests_failed += 1
            self.log.warning("Failed to ingest the following for reason: %s", e)
            for f in exposure.files:
                self.log.warning("- %s", f.filename)
            if self.config.failFast:
                # Bare raise re-raises the active exception without
                # adding another entry to its traceback.
                raise
            continue
        else:
            with self.metrics.collect_metric("time_for_callbacks", self.log, msg="Calling on_success"):
                self._on_success(datasets_for_exposure)
            for dataset in datasets_for_exposure:
                refs.extend(dataset.refs)

        # Success for this exposure.
        n_exposures += 1
        self.log.info(
            "Exposure %s:%s ingested successfully", exposure.record.instrument, exposure.record.obs_id
        )

    return refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed
1601 @timeMethod
1602 def run(
1603 self,
1604 files: Iterable[ResourcePathExpression],
1605 *,
1606 pool: concurrent.futures.ThreadPoolExecutor | None = None,
1607 processes: int | None = None, # Deprecated. Use num_workers.
1608 run: str | None = None,
1609 file_filter: str | re.Pattern = r"\.fit[s]?\b",
1610 group_files: bool = True,
1611 skip_existing_exposures: bool = False,
1612 update_exposure_records: bool = False,
1613 track_file_attrs: bool = True,
1614 search_indexes: bool = True,
1615 num_workers: int = 1,
1616 skip_ingest: bool = False,
1617 ) -> list[DatasetRef]:
1618 """Ingest files into a Butler data repository.
1620 This creates any new exposure or visit Dimension entries needed to
1621 identify the ingested files, creates new Dataset entries in the
1622 Registry and finally ingests the files themselves into the Datastore.
1623 Any needed instrument, detector, and physical_filter Dimension entries
1624 must exist in the Registry before `run` is called.
1626 Parameters
1627 ----------
1628 files : iterable `lsst.resources.ResourcePath`, `str` or path-like
1629 Paths to the files to be ingested. Can refer to directories.
1630 Will be made absolute if they are not already.
1631 pool : `concurrent.futures.ThreadPoolExecutor`, optional
1632 If not `None`, a process pool with which to parallelize some
1633 operations. This parameter was previously a `multiprocessing.Pool`
1634 but that option is no longer supported since it is slow compared
1635 to futures.
1636 processes : `int`, optional
1637 The number of processes to use. Ignored if ``pool`` is not `None`.
1638 Deprecated. Please use ``num_workers`` parameter instead.
1639 run : `str`, optional
1640 Name of a RUN-type collection to write to, overriding
1641 the default derived from the instrument name.
1642 file_filter : `str` or `re.Pattern`, optional
1643 Pattern to use to discover files to ingest within directories.
1644 The default is to search for FITS files. The regex applies to
1645 files within the directory.
1646 group_files : `bool`, optional
1647 Group files by directory if they have been discovered in
1648 directories. Will not affect files explicitly provided.
1649 skip_existing_exposures : `bool`, optional
1650 If `True` (`False` is default), skip raws that have already been
1651 ingested (i.e. raws for which we already have a dataset with the
1652 same data ID in the target collection, even if from another file).
1653 Note that this is much slower than just not passing
1654 already-ingested files as inputs, because we still need to read and
1655 process metadata to identify which exposures to search for. It
1656 also will not work reliably if multiple processes are attempting to
1657 ingest raws from the same exposure concurrently, in that different
1658 processes may still attempt to ingest the same raw and conflict,
1659 causing a failure that prevents other raws from the same exposure
1660 from being ingested.
1661 update_exposure_records : `bool`, optional
1662 If `True` (`False` is default), update existing exposure records
1663 that conflict with the new ones instead of rejecting them. THIS IS
1664 AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
1665 KNOWN TO BE BAD. This should usually be combined with
1666 ``skip_existing_exposures=True``.
1667 track_file_attrs : `bool`, optional
1668 Control whether file attributes such as the size or checksum should
1669 be tracked by the datastore. Whether this parameter is honored
1670 depends on the specific datastore implementation.
1671 search_indexes : `bool`, optional
1672 If `True` the code will search for index JSON files in given
1673 directories. If you know for a fact that index files do not exist
1674 set this to `False` for a slight speed up in metadata gathering.
1675 num_workers : `int`, optional
1676 The number of workers to use. Ignored if ``pool`` parameter is
1677 given.
1678 skip_ingest : `bool`, optional
1679 Set this to `True` to do metadata extraction and dimension record
1680 updates without attempting to re-ingest. This can be useful if
1681 there has been a metadata correction associated with an exposure.
1683 Returns
1684 -------
1685 refs : `list` of `lsst.daf.butler.DatasetRef`
1686 Dataset references for ingested raws.
1688 Notes
1689 -----
1690 This method inserts all datasets for an exposure within a transaction,
1691 guaranteeing that partial exposures are never ingested. The exposure
1692 dimension record is inserted with
1693 `lsst.daf.butler.Registry.syncDimensionData` first (in its own
1694 transaction), which inserts only if a record with the same
1695 primary key does not already exist. This allows different files within
1696 the same exposure to be ingested in different runs.
1697 """
1698 if pool and not isinstance(pool, concurrent.futures.ThreadPoolExecutor):
1699 raise ValueError(f"This parameter must now be a ThreadPoolExecutor but was given {pool}.")
1701 if processes is not None:
1702 warnings.warn(
1703 "Processes parameter is deprecated. Please use num_workers parameter.",
1704 FutureWarning,
1705 stacklevel=3, # Jump above the timeMethod wrapper.
1706 )
1707 num_workers = processes
1709 refs = []
1710 bad_files = []
1711 n_exposures = 0
1712 n_exposures_failed = 0
1713 n_ingests_failed = 0
1714 self.metrics.reset() # Clear previous metrics.
1715 ingest_duration = 0.0
1716 if group_files:
1717 with time_this(log=self.log, msg="Processing ingest groups") as timer:
1718 for group in ResourcePath.findFileResources(files, file_filter, group_files):
1719 new_refs, bad, n_exp, n_exp_fail, n_ingest_fail = self.ingestFiles(
1720 tuple(group),
1721 pool=pool,
1722 num_workers=num_workers,
1723 run=run,
1724 skip_existing_exposures=skip_existing_exposures,
1725 update_exposure_records=update_exposure_records,
1726 track_file_attrs=track_file_attrs,
1727 search_indexes=search_indexes,
1728 skip_ingest=skip_ingest,
1729 )
1730 refs.extend(new_refs)
1731 bad_files.extend(bad)
1732 n_exposures += n_exp
1733 n_exposures_failed += n_exp_fail
1734 n_ingests_failed += n_ingest_fail
1735 ingest_duration = timer.duration
1736 else:
1737 with time_this(log=self.log, msg="Ingesting all files in one batch") as timer:
1738 refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed = self.ingestFiles(
1739 tuple(ResourcePath.findFileResources(files, file_filter, group_files)),
1740 pool=pool,
1741 num_workers=num_workers,
1742 run=run,
1743 skip_existing_exposures=skip_existing_exposures,
1744 update_exposure_records=update_exposure_records,
1745 track_file_attrs=track_file_attrs,
1746 search_indexes=search_indexes,
1747 skip_ingest=skip_ingest,
1748 )
1749 ingest_duration = timer.duration
1751 had_failure = False
1753 if bad_files:
1754 had_failure = True
1755 self.log.warning("Could not extract observation metadata from the following:")
1756 for f in bad_files:
1757 self.log.warning("- %s", f)
1759 if skip_ingest:
1760 ingest_text = ""
1761 else:
1762 ingest_text = f" - time in butler ingest: {self.metrics.time_for_ingest} s\n"
1764 self.log.info(
1765 "Successfully processed data from %d exposure%s with %d failure%s from exposure"
1766 " registration and %d failure%s from file ingest.\n"
1767 "Timing breakdown:\n"
1768 " - time in metadata gathering: %f s\n"
1769 " - time in dimension record writing: %f s\n"
1770 "%s"
1771 " - time in user-supplied callbacks: %f s\n",
1772 *_log_msg_counter(n_exposures),
1773 *_log_msg_counter(n_exposures_failed),
1774 *_log_msg_counter(n_ingests_failed),
1775 self.metrics.time_for_metadata,
1776 self.metrics.time_for_records,
1777 ingest_text,
1778 self.metrics.time_for_callbacks,
1779 )
1780 if n_exposures_failed > 0 or n_ingests_failed > 0:
1781 had_failure = True
1782 if not skip_ingest:
1783 self.log.info(
1784 "Ingested %d distinct Butler dataset%s in %f sec", *_log_msg_counter(refs), ingest_duration
1785 )
1787 if had_failure:
1788 raise RuntimeError("Some failures encountered during ingestion")
1790 return refs