Coverage for python / lsst / obs / base / ingest.py: 14%

515 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:50 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "RawExposureData", 

26 "RawFileData", 

27 "RawFileDatasetInfo", 

28 "RawIngestConfig", 

29 "RawIngestTask", 

30 "makeTransferChoiceField", 

31) 

32 

33import concurrent.futures 

34import contextlib 

35import json 

36import logging 

37import re 

38import warnings 

39import zipfile 

40from collections import defaultdict 

41from collections.abc import Callable, Iterable, Iterator, MutableMapping, Sequence, Sized 

42from contextlib import contextmanager 

43from dataclasses import InitVar, dataclass 

44from typing import Any, ClassVar, cast 

45 

46from astro_metadata_translator import MetadataTranslator, ObservationInfo, merge_headers 

47from astro_metadata_translator.indexing import process_index_data, process_sidecar_data 

48from pydantic import BaseModel 

49 

50from lsst.afw.fits import readMetadata 

51from lsst.daf.butler import ( 

52 Butler, 

53 CollectionType, 

54 DataCoordinate, 

55 DatasetIdGenEnum, 

56 DatasetRef, 

57 DatasetType, 

58 DimensionRecord, 

59 DimensionUniverse, 

60 FileDataset, 

61 Formatter, 

62 FormatterV2, 

63 Progress, 

64 Timespan, 

65) 

66from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex 

67from lsst.pex.config import ChoiceField, Config, Field 

68from lsst.pipe.base import Instrument, Task 

69from lsst.resources import ResourcePath, ResourcePathExpression 

70from lsst.utils.logging import LsstLoggers 

71from lsst.utils.timer import time_this, timeMethod 

72 

73from ._instrument import makeExposureRecordFromObsInfo 

74 

75 

76def _do_nothing(*args: Any, **kwargs: Any) -> None: 

77 """Do nothing. 

78 

79 This is a function that accepts anything and does nothing. 

80 For use as a default in callback arguments. 

81 """ 

82 pass 

83 

84 

85def _log_msg_counter(noun: int | Sized) -> tuple[int, str]: 

86 """Count the iterable and return the count and plural modifier. 

87 

88 Parameters 

89 ---------- 

90 noun : `Sized` or `int` 

91 Thing to count. If given an integer it is assumed to be the count 

92 to use to calculate modifier. 

93 

94 Returns 

95 ------- 

96 num : `int` 

97 Number of items found in ``noun``. 

98 modifier : `str` 

99 Character to add to the end of a string referring to these items 

100 to indicate whether it was a single item or not. Returns empty 

101 string if there is one item or "s" otherwise. 

102 

103 Examples 

104 -------- 

105 .. code-block:: python 

106 

107 log.warning("Found %d file%s", *_log_msg_counter(nfiles)) 

108 """ 

109 if isinstance(noun, int): 

110 num = noun 

111 else: 

112 num = len(noun) 

113 return num, "" if num == 1 else "s" 

114 

115 

class IngestMetrics(BaseModel):
    """Timing metrics accumulated while ingesting raw data.

    Each attribute is a running total in seconds. Use `collect_metric` to
    time a block of code and add its duration to one of the totals, and
    `reset` to set all totals back to zero.
    """

    time_for_metadata: float = 0.0
    """Wall-clock time, in seconds, spent gathering file metadata."""

    time_for_records: float = 0.0
    """Wall-clock time, in seconds, spent writing dimension records."""

    time_for_ingest: float = 0.0
    """Wall-clock time, in seconds, spent calling butler ingest."""

    time_for_callbacks: float = 0.0
    """Wall-clock time, in seconds, processing user-supplied callbacks."""

    def reset(self) -> None:
        """Set every accumulated timing back to zero."""
        for metric_name in (
            "time_for_ingest",
            "time_for_records",
            "time_for_metadata",
            "time_for_callbacks",
        ):
            setattr(self, metric_name, 0.0)

    @contextmanager
    def collect_metric(
        self,
        property: str,
        log: LsstLoggers | None = None,
        msg: str | None = None,
        args: tuple[Any, ...] = (),
    ) -> Iterator[None]:
        """Time the enclosed block and add its duration to a metric.

        Parameters
        ----------
        property : `str`
            Name of the metric attribute to increment, e.g.
            ``"time_for_ingest"``.
        log : `lsst.utils.logging.LsstLoggers` or `None`, optional
            Logger passed to `lsst.utils.timer.time_this`; the timing
            message is emitted at INFO level.
        msg : `str` or `None`, optional
            Message passed to `lsst.utils.timer.time_this`.
        args : `tuple`, optional
            Arguments used to format ``msg``.

        Notes
        -----
        The metric is only incremented when the enclosed block completes
        normally; if it raises, the exception propagates and the metric is
        left unchanged.
        """
        with time_this(log=log, msg=msg, args=args, level=logging.INFO) as timer:
            yield
        elapsed = timer.duration
        setattr(self, property, getattr(self, property) + elapsed)

149 

150 

@dataclass
class RawFileDatasetInfo:
    """Information about a single dataset within a raw file.

    A raw file may contain several datasets; each one is described by its
    own instance of this class.
    """

    dataId: DataCoordinate
    """Data ID for this dataset (`lsst.daf.butler.DataCoordinate`)."""

    obsInfo: ObservationInfo
    """Standardized observation metadata extracted directly from the file
    headers (`astro_metadata_translator.ObservationInfo`).
    """

162 

163 

@dataclass
class RawFileData:
    """Information about a single raw file, used during ingest."""

    datasets: list[RawFileDatasetInfo]
    """The information describing each dataset within this raw file.
    (`list` of `RawFileDatasetInfo`)
    """

    filename: ResourcePath
    """URI of the file this information was extracted from
    (`lsst.resources.ResourcePath`).

    This is the path prior to ingest, not the path after ingest.
    """

    FormatterClass: type[Formatter | FormatterV2]
    """Formatter class that should be used to ingest this file (`type`; a
    subclass of `~lsst.daf.butler.Formatter` or
    `~lsst.daf.butler.FormatterV2`).
    """

    instrument: Instrument | None
    """The `Instrument` instance associated with this file. Can be `None`
    if ``datasets`` is an empty list."""

187 

188 

@dataclass
class RawExposureData:
    """Information about a complete raw exposure, used during ingest."""

    dataId: DataCoordinate
    """Data ID for this exposure (`lsst.daf.butler.DataCoordinate`).
    """

    files: list[RawFileData]
    """List of structures containing file-level information
    (`list` of `RawFileData`).
    """

    universe: InitVar[DimensionUniverse]
    """Set of all known dimensions.

    This is an init-only argument: it must be supplied to the constructor,
    but this class defines no ``__post_init__`` so the value is not used
    or stored.
    """

    record: DimensionRecord
    """The exposure `~lsst.daf.butler.DimensionRecord` that must be inserted
    into the `~lsst.daf.butler.Registry` prior to file-level ingest.
    """

    dependencyRecords: dict[str, DimensionRecord]
    """Additional records that must be inserted into the
    `~lsst.daf.butler.Registry` prior to ingesting the exposure ``record``
    (e.g., to satisfy foreign key constraints), indexed by the dimension name.
    """

216 

217 

def makeTransferChoiceField(
    doc: str = "How to transfer files (None for no transfer).", default: str = "auto"
) -> ChoiceField:
    """Build a config field for choosing how files are moved between repos.

    The permitted values mirror the transfer modes understood by
    `lsst.daf.butler.Datastore.ingest`. The field is optional, so `None`
    is also accepted.

    Parameters
    ----------
    doc : `str`
        Documentation string attached to the field.
    default : `str`, optional
        Transfer mode used when the config does not set one.

    Returns
    -------
    field : `lsst.pex.config.ChoiceField`
        The configured choice field.
    """
    transfer_modes = {
        "move": "move",
        "copy": "copy",
        "auto": "choice will depend on datastore",
        "direct": "use URI to ingested file directly in datastore",
        "link": "hard link falling back to symbolic link",
        "hardlink": "hard link",
        "symlink": "symbolic (soft) link",
        "relsymlink": "relative symbolic link",
    }
    return ChoiceField(
        doc=doc,
        dtype=str,
        allowed=transfer_modes,
        optional=True,
        default=default,
    )

254 

255 

class RawIngestConfig(Config):
    """Configuration options controlling `RawIngestTask`."""

    # How files are transferred into the datastore; see
    # makeTransferChoiceField for the permitted modes.
    transfer = makeTransferChoiceField()

    failFast: Field[bool] = Field(
        doc=(
            "If True, stop ingest as soon as any problem is encountered with any file. "
            "Otherwise problem files will be skipped and logged and a report issued at completion."
        ),
        dtype=bool,
        default=False,
    )

266 

267 

268class RawIngestTask(Task): 

269 """Driver Task for ingesting raw data into Gen3 Butler repositories. 

270 

271 Parameters 

272 ---------- 

273 config : `RawIngestConfig` 

274 Configuration for the task. 

275 butler : `~lsst.daf.butler.Butler` 

276 Writeable butler instance, with ``butler.run`` set to the appropriate 

277 `~lsst.daf.butler.CollectionType.RUN` collection for these raw 

278 datasets. 

279 on_success : `collections.abc.Callable`, optional 

280 A callback invoked when all of the raws associated with an exposure 

281 are ingested. Will be passed a list of `~lsst.daf.butler.FileDataset` 

282 objects, each containing one or more resolved 

283 `~lsst.daf.butler.DatasetRef` objects. If this callback raises it will 

284 interrupt the entire ingest process, even if `RawIngestConfig.failFast` 

285 is `False`. 

286 on_metadata_failure : `collections.abc.Callable`, optional 

287 A callback invoked when a failure occurs trying to translate the 

288 metadata for a file. Will be passed the URI and the exception, in 

289 that order, as positional arguments. Guaranteed to be called in an 

290 ``except`` block, allowing the callback to re-raise or replace (with 

291 ``raise ... from``) to override the task's usual error handling (before 

292 `RawIngestConfig.failFast` logic occurs). This callback can be called 

293 from within a worker thread if multiple workers have been requested. 

294 Ensure that any code within the call back is thread-safe. 

295 on_ingest_failure : `collections.abc.Callable`, optional 

296 A callback invoked when dimension record or dataset insertion into the 

297 database fails for an exposure. Will be passed a `RawExposureData` 

298 instance and the exception, in that order, as positional arguments. 

299 Guaranteed to be called in an ``except`` block, allowing the callback 

300 to re-raise or replace (with ``raise ... from``) to override the task's 

301 usual error handling (before `RawIngestConfig.failFast` logic occurs). 

302 on_exposure_record : `collections.abc.Callable`, optional 

303 A callback invoked when an exposure dimension record has been created 

304 or modified. Will not be called if the record already existed. Will 

305 be called with the exposure record. 

306 **kwargs 

307 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task` 

308 constructor. 

309 

310 Notes 

311 ----- 

312 Each instance of `RawIngestTask` writes to the same Butler. Each 

313 invocation of `RawIngestTask.run` ingests a list of files. 

314 """ 

315 

316 ConfigClass: ClassVar[type[Config]] = RawIngestConfig 

317 

318 _DefaultName: ClassVar[str] = "ingest" 

319 

320 def getDatasetType(self) -> DatasetType: 

321 """Return the default DatasetType of the datasets ingested by this 

322 Task. 

323 

324 Returns 

325 ------- 

326 datasetType : `lsst.daf.butler.DatasetType` 

327 The default dataset type to use for the data being ingested. This 

328 is only used if the relevant `~lsst.pipe.base.Instrument` does not 

329 define an override. 

330 """ 

331 return DatasetType( 

332 "raw", 

333 ("instrument", "detector", "exposure"), 

334 "Exposure", 

335 universe=self.butler.dimensions, 

336 ) 

337 

338 # Mypy can not determine that the config passed to super() is this type. 

339 config: RawIngestConfig 

340 

341 def __init__( 

342 self, 

343 config: RawIngestConfig, 

344 *, 

345 butler: Butler, 

346 on_success: Callable[[list[FileDataset]], Any] = _do_nothing, 

347 on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing, 

348 on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing, 

349 on_exposure_record: Callable[[DimensionRecord], Any] = _do_nothing, 

350 **kwargs: Any, 

351 ): 

352 config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here. 

353 super().__init__(config, **kwargs) 

354 self.butler = butler 

355 self.universe = self.butler.dimensions 

356 self.datasetType = self.getDatasetType() 

357 self._on_success = on_success 

358 self._on_exposure_record = on_exposure_record 

359 self._on_metadata_failure = on_metadata_failure 

360 self._on_ingest_failure = on_ingest_failure 

361 self.progress = Progress("obs.base.RawIngestTask") 

362 

363 # Import all the instrument classes so that we ensure that we 

364 # have all the relevant metadata translators loaded. 

365 self.instruments = Instrument.importAll(self.butler.registry) 

366 

367 # Read all the instrument records into a cache since they will be 

368 # needed later to calculate day_obs timespans, if appropriate. 

369 self._instrument_records = { 

370 rec.name: rec for rec in butler.registry.queryDimensionRecords("instrument") 

371 } 

372 # Create empty metrics. 

373 self.metrics = IngestMetrics() 

374 

375 def _reduce_kwargs(self) -> dict[str, Any]: 

376 # Add extra parameters to pickle. 

377 return dict( 

378 **super()._reduce_kwargs(), 

379 butler=self.butler, 

380 on_success=self._on_success, 

381 on_metadata_failure=self._on_metadata_failure, 

382 on_ingest_failure=self._on_ingest_failure, 

383 on_exposure_record=self._on_exposure_record, 

384 ) 

385 

386 def _determine_instrument_formatter( 

387 self, dataId: DataCoordinate, filename: ResourcePath 

388 ) -> tuple[Instrument | None, type[Formatter | FormatterV2]]: 

389 """Determine the instrument and formatter class. 

390 

391 Parameters 

392 ---------- 

393 dataId : `lsst.daf.butler.DataCoordinate` 

394 The dataId associated with this dataset. 

395 filename : `lsst.resources.ResourcePath` 

396 URI of file used for error reporting. 

397 

398 Returns 

399 ------- 

400 instrument : `Instrument` or `None` 

401 Instance of the `Instrument` associated with this dataset. `None` 

402 indicates that the instrument could not be determined. 

403 formatterClass : `type` 

404 Class to be used as the formatter for this dataset. 

405 

406 Notes 

407 ----- 

408 Does not access butler registry since it may be called from threads. 

409 """ 

410 # The data model currently assumes that whilst multiple datasets 

411 # can be associated with a single file, they must all share the 

412 # same formatter. 

413 FormatterClass: type[Formatter | FormatterV2] = Formatter 

414 try: 

415 instrument_name = cast(str, dataId["instrument"]) 

416 instrument = self.instruments[instrument_name]() 

417 except LookupError as e: 

418 self._on_metadata_failure(filename, e) 

419 self.log.warning( 

420 "Instrument %s for file %s not known to registry", dataId["instrument"], filename 

421 ) 

422 if self.config.failFast: 

423 raise RuntimeError( 

424 f"Instrument {dataId['instrument']} for file {filename} not known to registry" 

425 ) from e 

426 # Indicate that we could not work out the instrument. 

427 instrument = None 

428 else: 

429 assert instrument is not None, "Should be guaranted by fromName succeeding." 

430 FormatterClass = instrument.getRawFormatter(dataId) 

431 return instrument, FormatterClass 

432 

433 def get_raw_datasetType( 

434 self, instrument: Instrument, cache: dict[str, DatasetType] | None = None 

435 ) -> DatasetType: 

436 """Get the raw dataset type associated with this ingest. 

437 

438 Parameters 

439 ---------- 

440 instrument : `Instrument` 

441 Class that might specify an override of the default raw dataset 

442 type. If no override is specified the task default will be used. 

443 cache : `dict` [`str`, `lsst.daf.butler.DatasetType`] \ 

444 or `None`, optional 

445 An optional cache that can be used to return a pre-existing 

446 dataset type. Is not updated. 

447 

448 Returns 

449 ------- 

450 lsst.daf.butler.DatasetType 

451 The dataset type to use for raw ingest of this instrument. 

452 """ 

453 if cache is None: 

454 cache = {} 

455 if raw_definition := getattr(instrument, "raw_definition", None): 

456 datasetTypeName, dimensions, storageClass = raw_definition 

457 if not (datasetType := cache.get(datasetTypeName)): 

458 datasetType = DatasetType( 

459 datasetTypeName, dimensions, storageClass, universe=self.butler.dimensions 

460 ) 

461 else: 

462 datasetType = self.datasetType 

463 return datasetType 

464 

465 def extractMetadata(self, filename: ResourcePath) -> RawFileData: 

466 """Extract and process metadata from a single raw file. 

467 

468 Parameters 

469 ---------- 

470 filename : `lsst.resources.ResourcePath` 

471 URI to the file. 

472 

473 Returns 

474 ------- 

475 data : `RawFileData` 

476 A structure containing the metadata extracted from the file, 

477 as well as the original filename. All fields will be populated, 

478 but the `RawFileDatasetInfo.dataId` attribute will be a minimal 

479 (unexpanded) `~lsst.daf.butler.DataCoordinate` instance. The 

480 ``instrument`` field will be `None` if there is a problem 

481 with metadata extraction. 

482 

483 Notes 

484 ----- 

485 Assumes that there is a single dataset associated with the given 

486 file. Instruments using a single file to store multiple datasets 

487 must implement their own version of this method. 

488 

489 By default the method will catch all exceptions unless the 

490 `RawIngestConfig.failFast` configuration item is `True`. If an error 

491 is encountered the supplied ``on_metadata_failure()`` 

492 method will be called. If no exceptions result and an error was 

493 encountered the returned object will have a null-instrument class and 

494 no datasets. 

495 

496 This method supports sidecar JSON files which can be used to 

497 extract metadata without having to read the data file itself. 

498 The sidecar file is always used if found. 

499 """ 

500 formatterClass: type[Formatter | FormatterV2] 

501 sidecar_fail_msg = "" # Requires prepended space when set. 

502 try: 

503 sidecar_file = filename.updatedExtension(".json") 

504 headers = [] 

505 with contextlib.suppress(Exception): 

506 # Try to read directly, bypassing existence check. 

507 content = json.loads(sidecar_file.read()) 

508 headers = [process_sidecar_data(content)] 

509 sidecar_fail_msg = " (via sidecar)" 

510 if not headers: 

511 # Read the metadata from the data file itself. 

512 

513 # For remote files download the entire file to get the 

514 # header. This is very inefficient and it would be better 

515 # to have some way of knowing where in the file the headers 

516 # are and to only download those parts of the file. 

517 with filename.as_local() as local_file: 

518 # Read the primary. This might be sufficient. 

519 header = readMetadata(local_file.ospath, 0) 

520 translator_class = None 

521 

522 try: 

523 # Try to work out a translator class early. 

524 translator_class = MetadataTranslator.determine_translator( 

525 header, filename=str(filename) 

526 ) 

527 except ValueError: 

528 # Primary header was not sufficient (maybe this file 

529 # has been compressed or is a MEF with minimal 

530 # primary). Read second header and merge with primary. 

531 header = merge_headers([header, readMetadata(local_file.ospath, 1)], mode="overwrite") 

532 

533 # Try again to work out a translator class, letting this 

534 # fail. 

535 if translator_class is None: 

536 translator_class = MetadataTranslator.determine_translator( 

537 header, filename=str(filename) 

538 ) 

539 

540 # Request the headers to use for ingest 

541 headers = list(translator_class.determine_translatable_headers(local_file.ospath, header)) 

542 

543 # Add each header to the dataset list 

544 datasets = [self._calculate_dataset_info(h, filename) for h in headers] 

545 

546 except Exception as e: 

547 self.log.debug("Problem extracting metadata from %s%s: %s", filename, sidecar_fail_msg, e) 

548 # Indicate to the caller that we failed to read. 

549 datasets = [] 

550 formatterClass = Formatter 

551 instrument = None 

552 self._on_metadata_failure(filename, e) 

553 if self.config.failFast: 

554 raise RuntimeError( 

555 f"Problem extracting metadata for file {filename}{sidecar_fail_msg}" 

556 ) from e 

557 else: 

558 self.log.debug("Extracted metadata for file %s%s", filename, sidecar_fail_msg) 

559 # The data model currently assumes that whilst multiple datasets 

560 # can be associated with a single file, they must all share the 

561 # same formatter. 

562 instrument, formatterClass = self._determine_instrument_formatter(datasets[0].dataId, filename) 

563 if instrument is None: 

564 datasets = [] 

565 

566 return RawFileData( 

567 datasets=datasets, 

568 filename=filename, 

569 # MyPy wants this to be a non-abstract class, which is not true 

570 # for the error case where instrument is None and datasets=[]. 

571 FormatterClass=formatterClass, # type: ignore 

572 instrument=instrument, 

573 ) 

574 

575 @classmethod 

576 def getObservationInfoSubsets(cls) -> tuple[set, set]: 

577 """Return subsets of fields in the 

578 `~astro_metadata_translator.ObservationInfo` that we care about. 

579 

580 These fields will be used in constructing an exposure record. 

581 

582 Returns 

583 ------- 

584 required : `set` 

585 Set of `~astro_metadata_translator.ObservationInfo` field names 

586 that are required. 

587 optional : `set` 

588 Set of `~astro_metadata_translator.ObservationInfo` field names 

589 we will use if they are available. 

590 """ 

591 # Marking the new properties "group_counter_*" and 

592 # "has_simulated_content" as required, assumes that we either 

593 # recreate any existing index/sidecar files that include translated 

594 # values, or else allow astro_metadata_translator to fill in 

595 # defaults. 

596 required = { 

597 "datetime_begin", 

598 "datetime_end", 

599 "detector_num", 

600 "exposure_group", 

601 "exposure_id", 

602 "exposure_time_requested", 

603 "group_counter_end", 

604 "group_counter_start", 

605 "has_simulated_content", 

606 "instrument", 

607 "observation_id", 

608 "observation_type", 

609 "observing_day", 

610 "physical_filter", 

611 } 

612 optional = { 

613 "altaz_begin", 

614 "boresight_rotation_coord", 

615 "boresight_rotation_angle", 

616 "dark_time", 

617 "tracking_radec", 

618 "object", 

619 "observation_counter", 

620 "observation_reason", 

621 "observing_day_offset", 

622 "science_program", 

623 "visit_id", 

624 "can_see_sky", 

625 } 

626 return required, optional 

627 

628 def _calculate_dataset_info( 

629 self, header: MutableMapping[str, Any] | ObservationInfo, filename: ResourcePath 

630 ) -> RawFileDatasetInfo: 

631 """Calculate a RawFileDatasetInfo from the supplied information. 

632 

633 Parameters 

634 ---------- 

635 header : Mapping or `astro_metadata_translator.ObservationInfo` 

636 Header from the dataset or previously-translated content. 

637 filename : `lsst.resources.ResourcePath` 

638 Filename to use for error messages. 

639 

640 Returns 

641 ------- 

642 dataset : `RawFileDatasetInfo` 

643 The dataId, and observation information associated with this 

644 dataset. 

645 """ 

646 required, optional = self.getObservationInfoSubsets() 

647 if isinstance(header, ObservationInfo): 

648 obsInfo = header 

649 missing = [] 

650 # Need to check the required properties are present. 

651 for property in required: 

652 # getattr does not need to be protected because it is using 

653 # the defined list above containing properties that must exist. 

654 value = getattr(obsInfo, property) 

655 if value is None: 

656 missing.append(property) 

657 if missing: 

658 raise ValueError( 

659 f"Requested required properties are missing from file {filename}: {missing} (via JSON)" 

660 ) 

661 

662 else: 

663 obsInfo = ObservationInfo( 

664 header, 

665 pedantic=False, 

666 filename=str(filename), 

667 required=required, 

668 subset=required | optional, 

669 ) 

670 

671 dataId = DataCoordinate.standardize( 

672 instrument=obsInfo.instrument, 

673 exposure=obsInfo.exposure_id, 

674 detector=obsInfo.detector_num, 

675 universe=self.universe, 

676 ) 

677 return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId) 

678 

679 def readZipIndexFiles( 

680 self, files: Iterable[ResourcePath] 

681 ) -> tuple[list[RawFileData], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

682 """Given a list of files, filter out zip files and look for index files 

683 inside. 

684 

685 Parameters 

686 ---------- 

687 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

688 URIs to the files to be ingested. 

689 

690 Returns 

691 ------- 

692 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ] 

693 Merged contents of all relevant index files found in zip files. 

694 The keys include the path to the data file within the zip using 

695 the butler fragment convention of ``zip-path=PATH``. 

696 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

697 Updated list of the input files with zip entries removed. 

698 good_index_files: `set` [ `lsst.resources.ResourcePath` ] 

699 Zip files that contained index information. 

700 bad_zip_files: `set` [ `lsst.resources.ResourcePath` ] 

701 Zip files that contained no index information. 

702 """ 

703 zip_metadata_index = "_metadata_index.json" 

704 

705 # Files that weren't zip files. 

706 updated_files: list[ResourcePath] = [] 

707 

708 # Index files we failed to read. 

709 bad_index_files: set[ResourcePath] = set() 

710 

711 # Any good index files that were found and used. 

712 good_index_files: set[ResourcePath] = set() 

713 

714 # Processed content from any zip files. 

715 indexFileData: list[RawFileData] = [] 

716 

717 for file in files: 

718 if file.getExtension() != ".zip": 

719 updated_files.append(file) 

720 continue 

721 

722 zip_info: dict[str, zipfile.ZipInfo] = {} 

723 try: 

724 with file.open("rb") as fd, zipfile.ZipFile(fd) as zf: 

725 zip_info = {info.filename: info for info in zf.infolist()} 

726 content = json.loads(zf.read(zip_metadata_index)) 

727 index = process_index_data(content, force_dict=True) 

728 assert isinstance(index, MutableMapping) 

729 

730 # Try to read the ZipIndex. 

731 zip_index = ZipIndex.from_open_zip(zf) 

732 except Exception as e: 

733 if self.config.failFast: 

734 raise RuntimeError(f"Problem reading index file from zip file at {file}") from e 

735 bad_index_files.add(file) 

736 continue 

737 self.log.debug("Extracted index metadata from zip file %s", str(file)) 

738 good_index_files.add(file) 

739 

740 # All the metadata read from this index file with keys of full 

741 # path. 

742 index_entries: dict[ResourcePath, Any] = {} 

743 

744 # In theory we could scan for JSON sidecar files associated with 

745 # any files not found in the metadata index, but that is not meant 

746 # to be possible. Guider data is another issue not handled by 

747 # this code. 

748 for path_in_zip in index: 

749 if path_in_zip not in zip_info: 

750 # Index entry exists but no file for it. 

751 self.log.info( 

752 "File %s is in zip index but not in zip file %s. Ignoring.", path_in_zip, file 

753 ) 

754 continue 

755 file_to_ingest = file.replace(fragment=f"zip-path={path_in_zip}") 

756 index_entries[file_to_ingest] = index[path_in_zip] 

757 

758 file_data = self.processIndexEntries(index_entries) 

759 

760 # Validate that the index entries we have read match the 

761 # values in the butler zip index. 

762 data_ids_from_index: dict[str, tuple[DataCoordinate, ...]] = {} 

763 for f in file_data: 

764 _, path_in_zip = f.filename.fragment.split("=") 

765 data_ids_from_index[path_in_zip] = tuple(d.dataId for d in f.datasets) 

766 

767 data_ids_from_butler_index: dict[str, tuple[DataCoordinate, ...]] = {} 

768 # Refs indexed by UUID. 

769 refs = zip_index.refs.to_refs(universe=self.universe) 

770 id_to_ref = {ref.id: ref for ref in refs} 

771 for path_in_zip, index_info in zip_index.artifact_map.items(): 

772 data_ids_from_butler_index[path_in_zip] = tuple( 

773 id_to_ref[id_].dataId for id_ in index_info.ids 

774 ) 

775 

776 if data_ids_from_butler_index != data_ids_from_index: 

777 self.log.warning( 

778 "Recalculating the Data IDs for zip file %s (which may include new metadata corrections) " 

779 "results in a difference to the Data IDs recorded in the butler index in that zip. " 

780 "Consider remaking the zipped raws.", 

781 file, 

782 ) 

783 

784 indexFileData.extend(file_data) 

785 

786 return indexFileData, updated_files, good_index_files, bad_index_files 

787 

788 def locateAndReadIndexFiles( 

789 self, files: Iterable[ResourcePath] 

790 ) -> tuple[dict[ResourcePath, Any], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

791 """Given a list of files, look for index files and read them. 

792 

793 Index files can either be explicitly in the list of files to 

794 ingest, or else located in the same directory as a file to ingest. 

795 Index entries are always used if present. 

796 

797 Parameters 

798 ---------- 

799 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

800 URIs to the files to be ingested. 

801 

802 Returns 

803 ------- 

804 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ] 

805 Merged contents of all relevant index files found. These can 

806 be explicitly specified index files or ones found in the 

807 directory alongside a data file to be ingested. 

808 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

809 Updated list of the input files with entries removed that were 

810 found listed in an index file. Order is not guaranteed to 

811 match the order of the files given to this routine. 

812 good_index_files: `set` [ `lsst.resources.ResourcePath` ] 

813 Index files that were successfully read. 

814 bad_index_files: `set` [ `lsst.resources.ResourcePath` ] 

815 Files that looked like index files but failed to read properly. 

816 """ 

817 # Convert the paths to absolute for easy comparison with index content. 

818 # Do not convert to real paths since we have to assume that index 

819 # files are in this location and not the location which it links to. 

820 files = tuple(f.abspath() for f in files) 

821 

822 # Index files must be named this. 

823 index_root_file = "_index.json" 

824 

825 # Group the files by directory. 

826 files_by_directory: dict[ResourcePath, set[str]] = defaultdict(set) 

827 

828 for path in files: 

829 directory, file_in_dir = path.split() 

830 files_by_directory[directory].add(file_in_dir) 

831 

832 # All the metadata read from index files with keys of full path. 

833 index_entries: dict[ResourcePath, Any] = {} 

834 

835 # Index files we failed to read. 

836 bad_index_files: set[ResourcePath] = set() 

837 

838 # Any good index files that were found and used. 

839 good_index_files: set[ResourcePath] = set() 

840 

841 # Look for index files in those directories. 

842 for directory, files_in_directory in files_by_directory.items(): 

843 possible_index_file = directory.join(index_root_file) 

844 if possible_index_file.exists(): 

845 # If we are explicitly requesting an index file the 

846 # messages should be different. 

847 index_msg = "inferred" 

848 is_implied = True 

849 if index_root_file in files_in_directory: 

850 index_msg = "explicit" 

851 is_implied = False 

852 

853 # Try to read the index file and catch and report any 

854 # problems. 

855 try: 

856 content = json.loads(possible_index_file.read()) 

857 index = process_index_data(content, force_dict=True) 

858 # mypy should in theory know that this is a mapping 

859 # from the overload type annotation of process_index_data. 

860 assert isinstance(index, MutableMapping) 

861 except Exception as e: 

862 # Only trigger the callback if the index file 

863 # was asked for explicitly. Triggering on implied file 

864 # might be surprising. 

865 if not is_implied: 

866 self._on_metadata_failure(possible_index_file, e) 

867 if self.config.failFast: 

868 raise RuntimeError( 

869 f"Problem reading index file from {index_msg} location {possible_index_file}" 

870 ) from e 

871 bad_index_files.add(possible_index_file) 

872 continue 

873 

874 self.log.debug("Extracted index metadata from %s file %s", index_msg, possible_index_file) 

875 good_index_files.add(possible_index_file) 

876 

877 # Go through the index adding entries for files. 

878 # If we have non-index files in this directory marked for 

879 # ingest we should only get index information for those. 

880 # If the index file was explicit we use all entries. 

881 if is_implied: 

882 files_to_ingest = files_in_directory 

883 else: 

884 files_to_ingest = set(index) 

885 

886 # Copy relevant metadata into a single dict for all index 

887 # entries. 

888 for file_in_dir in files_to_ingest: 

889 # Skip an explicitly specified index file. 

890 # This should never happen because an explicit index 

891 # file will force ingest of all files in the index 

892 # and not use the explicit file list. If somehow 

893 # this is not true we continue. Raising an exception 

894 # seems like the wrong thing to do since this is harmless. 

895 if file_in_dir == index_root_file: 

896 self.log.info( 

897 "Logic error found scanning directory %s. Please file ticket.", directory 

898 ) 

899 continue 

900 if file_in_dir in index: 

901 file = directory.join(file_in_dir) 

902 if file in index_entries: 

903 # ObservationInfo overrides raw metadata 

904 if isinstance(index[file_in_dir], ObservationInfo) and not isinstance( 

905 index_entries[file], ObservationInfo 

906 ): 

907 self.log.warning( 

908 "File %s already specified in an index file but overriding" 

909 " with ObservationInfo content from %s", 

910 file, 

911 possible_index_file, 

912 ) 

913 else: 

914 self.log.warning( 

915 "File %s already specified in an index file, ignoring content from %s", 

916 file, 

917 possible_index_file, 

918 ) 

919 # Do nothing in this case 

920 continue 

921 

922 index_entries[file] = index[file_in_dir] 

923 

924 # Remove files from list that have index entries and also 

925 # any files that we determined to be explicit index files 

926 # or any index files that we failed to read. 

927 filtered = set(files) - set(index_entries) - good_index_files - bad_index_files 

928 

929 # The filtered list loses the initial order. Retaining the order 

930 # is good for testing but does have a cost if there are many 

931 # files when copying the good values out. A dict would have faster 

932 # lookups (using the files as keys) but use more memory. 

933 ordered: list[ResourcePath] = [] 

934 seen: set[ResourcePath] = set() 

935 for f in files: 

936 if f in filtered and f not in seen: 

937 ordered.append(f) 

938 seen.add(f) 

939 

940 return index_entries, ordered, good_index_files, bad_index_files 

941 

def processIndexEntries(self, index_entries: dict[ResourcePath, Any]) -> list[RawFileData]:
    """Convert index entries to RawFileData.

    Parameters
    ----------
    index_entries : `dict` [`lsst.resources.ResourcePath`, `typing.Any`]
        Mapping from the URI of each file to ingest to its metadata. Each
        value is either raw header metadata or an already translated
        `~astro_metadata_translator.ObservationInfo`.

    Returns
    -------
    data : `list` [ `RawFileData` ]
        One structure per entry, in the iteration order of
        ``index_entries``. A file whose metadata could not be converted,
        or whose instrument could not be determined, is returned with an
        empty ``datasets`` list so that the caller can report it as bad.
        For all other files the `RawFileDatasetInfo.dataId` attributes are
        minimal (unexpanded) `~lsst.daf.butler.DataCoordinate` instances.

    Raises
    ------
    RuntimeError
        Raised if metadata extraction fails for an entry and
        ``self.config.failFast`` is `True`.
    """
    converted: list[RawFileData] = []
    for uri, header in index_entries.items():
        try:
            dataset_info = self._calculate_dataset_info(header, uri)
        except Exception as exc:
            self.log.debug("Problem extracting metadata for file %s found in index file: %s", uri, exc)
            self._on_metadata_failure(uri, exc)
            if self.config.failFast:
                raise RuntimeError(f"Problem extracting metadata for file {uri} found in index file") from exc
            # Record the failure as a file with no datasets; the caller
            # separates such entries out as bad files.
            converted.append(
                RawFileData(
                    datasets=[],
                    filename=uri,
                    # MyPy wants a non-abstract class here, which cannot
                    # be provided for this error case.
                    FormatterClass=Formatter,  # type: ignore
                    instrument=None,
                )
            )
            continue

        instrument, formatter_class = self._determine_instrument_formatter(dataset_info.dataId, uri)
        converted.append(
            RawFileData(
                # Without an instrument the file cannot be ingested, so it
                # is also marked as a failure by dropping its datasets.
                datasets=[] if instrument is None else [dataset_info],
                filename=uri,
                FormatterClass=formatter_class,  # type: ignore
                instrument=instrument,
            )
        )
    return converted

993 

def groupByExposure(self, files: Iterable[RawFileData]) -> list[RawExposureData]:
    """Group an iterable of `RawFileData` by exposure.

    Parameters
    ----------
    files : iterable of `RawFileData`
        File-level information to group. Every file must have at least
        one dataset.

    Returns
    -------
    exposures : `list` of `RawExposureData`
        One structure per distinct exposure data ID, in order of first
        appearance. All fields are populated; the exposure and dependency
        records are built from the first dataset of the first file in the
        group. The `RawExposureData.dataId` attributes will be minimal
        (unexpanded) `~lsst.daf.butler.DataCoordinate` instances.
    """
    exposure_group = self.universe["exposure"].minimal_group
    grouped: defaultdict[Any, list[RawFileData]] = defaultdict(list)
    for file_data in files:
        # The first dataset of a file is taken to represent the whole file.
        exposure_key = file_data.datasets[0].dataId.subset(exposure_group)
        grouped[exposure_key].append(file_data)

    exposures: list[RawExposureData] = []
    for exposure_id, members in grouped.items():
        representative_info = members[0].datasets[0].obsInfo
        exposures.append(
            RawExposureData(
                dataId=exposure_id,
                files=members,
                universe=self.universe,
                record=self.makeExposureRecord(representative_info, self.universe),
                dependencyRecords=self.makeDependencyRecords(representative_info, self.universe),
            )
        )
    return exposures

1028 

def makeExposureRecord(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
) -> DimensionRecord:
    """Construct a registry record for an exposure.

    Subclasses frequently override this method. A common pattern is to
    call this base implementation and pass extra field values through
    ``kwargs``.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.
    **kwargs
        Additional field values for this record, forwarded unchanged to
        ``makeExposureRecordFromObsInfo``.

    Returns
    -------
    record : `lsst.daf.butler.DimensionRecord`
        The exposure record that must be inserted into the
        `~lsst.daf.butler.Registry` prior to file-level ingest.
    """
    exposure_record = makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs)
    return exposure_record

1054 

def makeDependencyRecords(
    self, obsInfo: ObservationInfo, universe: DimensionUniverse
) -> dict[str, DimensionRecord]:
    """Construct dependency records.

    These dependency records will be inserted into the
    `~lsst.daf.butler.Registry` before the exposure records, because they
    are dependencies of the exposure. This allows an opportunity to satisfy
    foreign key constraints that exist because of dimensions related to the
    exposure.

    This is a method that subclasses may want to customize, if they've
    added dimensions that relate to an exposure.

    Parameters
    ----------
    obsInfo : `~astro_metadata_translator.ObservationInfo`
        Observation details for (one of the components of) the exposure.
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions.

    Returns
    -------
    records : `dict` [`str`, `lsst.daf.butler.DimensionRecord`]
        The records to insert, indexed by dimension name. Empty if the
        universe has no ``exposure`` dimension. A ``group`` record and/or
        a ``day_obs`` record is included only when that dimension is
        implied by ``exposure``.
    """
    records: dict[str, DimensionRecord] = {}
    if "exposure" not in universe:
        return records
    exposure = universe["exposure"]
    if "group" in exposure.implied:
        records["group"] = universe["group"].RecordClass(
            name=obsInfo.exposure_group,
            instrument=obsInfo.instrument,
        )
    if "day_obs" in exposure.implied:
        # Use a default so that ObservationInfo objects without an
        # observing_day_offset attribute are treated like an unset offset
        # (no timespan) rather than raising AttributeError.
        offset = getattr(obsInfo, "observing_day_offset", None)
        if offset is not None:
            offset_int = round(offset.to_value("s"))
            timespan = Timespan.from_day_obs(obsInfo.observing_day, offset_int)
        else:
            timespan = None
        records["day_obs"] = universe["day_obs"].RecordClass(
            instrument=obsInfo.instrument,
            id=obsInfo.observing_day,
            timespan=timespan,
        )
    return records

1102 

def expandDataIds(self, data: RawExposureData) -> RawExposureData:
    """Expand the data IDs associated with a raw exposure.

    This adds the metadata records.

    Parameters
    ----------
    data : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated. Should
        be considered consumed upon return.

    Returns
    -------
    exposure : `RawExposureData`
        The same object as ``data``, updated in place so that
        `RawExposureData.dataId` and every nested
        `RawFileDatasetInfo.dataId` are data IDs for which
        `~lsst.daf.butler.DataCoordinate.hasRecords` returns `True`.
    """
    registry = self.butler.registry
    # We start by expanding the exposure-level data ID; we won't use that
    # directly in file ingest, but this lets us do some database lookups
    # once per exposure instead of once per file later.
    data.dataId = registry.expandDataId(
        data.dataId,
        # We pass in the records we'll be inserting shortly so they aren't
        # looked up from the database. We do expect instrument and filter
        # records to be retrieved from the database here (though the
        # Registry may cache them so there isn't a lookup every time).
        records={"exposure": data.record, **data.dependencyRecords},
    )
    # The records gathered by the exposure-level expansion are the same for
    # every file in the exposure, so collect them once rather than per
    # dataset.
    exposure_records = {k: data.dataId.records[k] for k in data.dataId.dimensions.elements}
    # Now we expand the per-file (exposure+detector) data IDs, passing in
    # the records we just retrieved.
    for file in data.files:
        for dataset in file.datasets:
            dataset.dataId = registry.expandDataId(dataset.dataId, records=exposure_records)
    return data

1144 

def prep(
    self,
    files: Iterable[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    search_indexes: bool = True,
) -> tuple[Iterator[RawExposureData], list[ResourcePath]]:
    """Perform all non-database-updating ingest preprocessing steps.

    Zip archives are examined first (``readZipIndexFiles``), then, if
    requested, JSON index files (``locateAndReadIndexFiles``). Each step
    returns an updated list of files, and only the files remaining after
    both steps have their metadata extracted individually.

    Parameters
    ----------
    files : iterable of `lsst.resources.ResourcePath`
        URIs of the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize metadata
        extraction.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.

    Returns
    -------
    exposures : `~collections.abc.Iterator` [ `RawExposureData` ]
        Data structures containing dimension records, filenames, and data
        IDs to be ingested (one structure for each exposure). Data ID
        expansion is performed lazily as this iterator is consumed.
    bad_files : `list` [ `lsst.resources.ResourcePath` ]
        All the files that could not have metadata extracted, together
        with any index or zip files that could not be read.
    """

    def _split_by_success(
        candidates: Iterable[RawFileData],
    ) -> tuple[list[RawFileData], list[ResourcePath]]:
        """Separate entries that produced datasets from those that did not."""
        succeeded: list[RawFileData] = []
        failed: list[ResourcePath] = []
        for candidate in self.progress.wrap(candidates, desc="Reading image metadata"):
            if candidate.datasets:
                succeeded.append(candidate)
            else:
                failed.append(candidate.filename)
        return succeeded, failed

    def _report_unreadable(header: str, unreadable: Iterable[ResourcePath]) -> None:
        """Log a header line followed by one sorted line per path."""
        self.log.info(header)
        for path in sorted(unreadable):
            self.log.info("- %s", path)

    # Zip files carry their own index and are dealt with first.
    zip_file_data, files, good_zips, bad_zips = self.readZipIndexFiles(files)
    if bad_zips:
        _report_unreadable("Failed to extract index metadata from the following zip files:", bad_zips)

    # Look for standalone index files. There should be far fewer index
    # files than data files.
    index_entries: dict[ResourcePath, Any]
    good_index_files: set[ResourcePath]
    bad_index_files: set[ResourcePath]
    if not search_indexes:
        # We have been told explicitly there are no indexes.
        index_entries, good_index_files, bad_index_files = {}, set(), set()
    else:
        index_entries, files, good_index_files, bad_index_files = self.locateAndReadIndexFiles(files)
        if bad_index_files:
            _report_unreadable(
                "Failed to read the following explicitly requested index files:", bad_index_files
            )

    # From here on, zip archives are treated like any other index source.
    good_index_files.update(good_zips)
    bad_index_files.update(bad_zips)

    # Convert every index entry (standalone and zip) to standard form.
    failed_index_entries: list[ResourcePath] = []
    index_file_data = self.processIndexEntries(index_entries)
    index_file_data.extend(zip_file_data)
    if index_file_data:
        index_file_data, failed_index_entries = _split_by_success(index_file_data)
        self.log.info(
            "Successfully extracted metadata for %d file%s found in %d index file%s with %d failure%s",
            *_log_msg_counter(index_file_data),
            *_log_msg_counter(good_index_files),
            *_log_msg_counter(failed_index_entries),
        )

    # Read metadata from the remaining standalone files, optionally in
    # threads. All results are collected before failures are examined.
    extractor = map if pool is None else pool.map
    extracted: Iterator[RawFileData] = extractor(self.extractMetadata, files)
    good_file_data, bad_files = _split_by_success(extracted)
    # Only report if we looked at any standalone files at all.
    if files:
        self.log.info(
            "Successfully extracted metadata from %d file%s with %d failure%s",
            *_log_msg_counter(good_file_data),
            *_log_msg_counter(bad_files),
        )

    # Merge the index-derived results into the standalone results.
    good_file_data += index_file_data
    bad_files += failed_index_entries
    bad_files.extend(bad_index_files)

    # Grouping is an intrinsic gather step, so it is never parallelized.
    exposure_data: list[RawExposureData] = self.groupByExposure(good_file_data)

    # expandDataIds is the first step that talks to the database (SELECTs
    # only); it is applied lazily, one exposure at a time, as the caller
    # iterates. It updates each RawExposureData in place and returns it.
    return map(self.expandDataIds, exposure_data), bad_files

1274 

def ingestExposureDatasets(
    self,
    exposure: RawExposureData,
    datasetType: DatasetType,
    *,
    run: str,
    skip_existing_exposures: bool = False,
    track_file_attrs: bool = True,
) -> list[FileDataset]:
    """Ingest all raw files in one exposure.

    Standalone files are ingested together with one ``butler.ingest``
    call. Files whose name has a ``.zip`` extension are grouped by archive
    (URI with the fragment removed), and each archive is ingested with
    ``butler.ingest_zip``.

    Parameters
    ----------
    exposure : `RawExposureData`
        A structure containing information about the exposure to be
        ingested. Must have `RawExposureData.record` populated and all
        data ID attributes expanded.
    datasetType : `lsst.daf.butler.DatasetType`
        The dataset type associated with this exposure.
    run : `str`
        Name of a RUN-type collection to write to.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation. It is not passed
        to ``ingest_zip``.

    Returns
    -------
    datasets : `list` of `lsst.daf.butler.FileDataset`
        Per-file structures identifying the files ingested and their
        dataset representation in the data repository. Standalone files
        come first, followed by zip members grouped by archive. Files with
        no datasets are omitted.
    """
    # Raw files are preferentially ingested using a UUID derived from
    # the collection name and dataId.
    if self.butler.registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN):
        mode = DatasetIdGenEnum.DATAID_TYPE_RUN
    else:
        mode = DatasetIdGenEnum.UNIQUE

    def _make_file_dataset(file: RawFileData) -> FileDataset | None:
        # Build the FileDataset for one file, or None if it has no datasets.
        refs = [
            DatasetRef(datasetType, d.dataId, run=run, id_generation_mode=mode) for d in file.datasets
        ]
        if not refs:
            return None
        return FileDataset(path=file.filename.abspath(), refs=refs, formatter=file.FormatterClass)

    # The datasets for this exposure could all be from a single zip or be
    # distinct files, so zip members are set aside keyed by the archive URI.
    zips: dict[ResourcePath, list[RawFileData]] = defaultdict(list)
    datasets: list[FileDataset] = []
    for file in exposure.files:
        if file.filename.getExtension() == ".zip":
            zips[file.filename.replace(fragment="")].append(file)
        elif (file_dataset := _make_file_dataset(file)) is not None:
            datasets.append(file_dataset)

    if datasets:
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest(
                *datasets,
                transfer=self.config.transfer,
                record_validation_info=track_file_attrs,
                skip_existing=skip_existing_exposures,
            )
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    # In theory it is possible for the new Data IDs to differ from the Data
    # IDs stored in the Zip index. That could happen if there is a metadata
    # correction that changes the exposure or detector numbers. We have to
    # assume that by the time the zip has been made that this correction
    # has been applied. If we don't assume that then we have to
    # regenerate the index but we cannot change the contents of the zip.
    # We would also need the ability for butler.ingest_zip to take an
    # override ZipIndex object.
    # The Dataset ref IDs will only change if the data IDs change.
    for zip_uri, zip_members in zips.items():
        # These FileDatasets are built only for the return value; the
        # archive itself is handed to ingest_zip as a whole. Assumes the
        # guiders are not included in the metadata index.
        # NOTE(review): with DatasetIdGenEnum.UNIQUE the refs generated here
        # are independent of whatever IDs ingest_zip registers — confirm
        # that callers only rely on these IDs in DATAID_TYPE_RUN mode.
        zip_datasets = [fd for fd in map(_make_file_dataset, zip_members) if fd is not None]
        with self.butler.record_metrics() as butler_metrics:
            self.butler.ingest_zip(
                zip_uri, transfer=self.config.transfer, skip_existing=skip_existing_exposures
            )
        datasets.extend(zip_datasets)
        self.metrics.time_for_ingest += butler_metrics.time_in_ingest

    return datasets

1377 

def ingestFiles(
    self,
    files: Sequence[ResourcePath],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    num_workers: int = 1,
    run: str | None = None,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    skip_ingest: bool = False,
) -> tuple[list[DatasetRef], list[ResourcePath], int, int, int]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Parameters
    ----------
    files : `~collections.abc.Sequence` [ `lsst.resources.ResourcePath` ]
        URIs to the files to be ingested.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` is not `None`.
        A temporary pool is created (and shut down after metadata reading)
        only when this is greater than 1.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.
    bad_files : `list` of `lsst.resources.ResourcePath`
        Given paths that could not be ingested.
    n_exposures : `int`
        Number of exposures successfully ingested.
    n_exposures_failed : `int`
        Number of exposures that failed when inserting dimension data.
    n_ingests_failed : `int`
        Number of exposures that failed when ingesting raw datasets.

    Raises
    ------
    Exception
        If ``self.config.failFast`` is `True`, the first exception raised
        while registering dimension records or ingesting datasets for an
        exposure is re-raised after the failure callback has run.
    """
    created_pool = False
    if pool is None and num_workers > 1:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        created_pool = True

    try:
        with self.metrics.collect_metric(
            "time_for_metadata",
            self.log,
            msg="Reading metadata from %d file%s",
            args=(*_log_msg_counter(files),),
        ):
            exposureData, bad_files = self.prep(files, pool=pool, search_indexes=search_indexes)
    finally:
        if created_pool and pool:
            # The pool is not needed any more so close it if we created
            # it to ensure we clean up resources.
            pool.shutdown(wait=True)

    # Up to this point, we haven't modified the data repository at all.
    # Now we finally do that, with one transaction per exposure. This is
    # not parallelized at present because the performance of this step is
    # limited by the database server. That may or may not change in the
    # future once we increase our usage of bulk inserts and reduce our
    # usage of savepoints; we've tried to get everything but the database
    # operations done in advance to reduce the time spent inside
    # transactions.
    refs: list[DatasetRef] = []
    runs: set[str] = set()
    datasetTypes: dict[str, DatasetType] = {}
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    for exposure in self.progress.wrap(exposureData, desc="Ingesting raw exposures"):
        assert exposure.record is not None, "Should be guaranteed by prep()"
        self.log.debug(
            "Attempting to ingest %d file%s from exposure %s:%s",
            *_log_msg_counter(exposure.files),
            exposure.record.instrument,
            exposure.record.obs_id,
        )

        try:
            with self.metrics.collect_metric(
                "time_for_records",
                self.log,
                msg="Creating dimension records for instrument %s, exposure %s",
                args=(
                    str(exposure.record.instrument),
                    str(exposure.record.id),
                ),
            ):
                # Dependency records (e.g. group, day_obs) first, so the
                # exposure record's foreign keys can be satisfied.
                for name, record in exposure.dependencyRecords.items():
                    self.butler.registry.syncDimensionData(name, record, update=update_exposure_records)
                inserted_or_updated = self.butler.registry.syncDimensionData(
                    "exposure",
                    exposure.record,
                    update=update_exposure_records,
                )
            if inserted_or_updated is not False:
                with self.metrics.collect_metric(
                    "time_for_callbacks", log=self.log, msg="Exposure record updated. Calling handler"
                ):
                    self._on_exposure_record(exposure.record)
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_exposures_failed += 1
            self.log.warning(
                "Exposure %s:%s could not be registered: %s",
                exposure.record.instrument,
                exposure.record.obs_id,
                e,
            )
            if self.config.failFast:
                raise
            continue

        if isinstance(inserted_or_updated, dict):
            # Exposure is in the registry and we updated it, so
            # syncDimensionData returned a dict.
            columns_updated = list(inserted_or_updated.keys())
            s_col = "s" if len(columns_updated) != 1 else ""
            w_col = "were" if len(columns_updated) != 1 else "was"
            self.log.info(
                "Exposure %s:%s was already present, but column%s %s %s updated.",
                exposure.record.instrument,
                exposure.record.obs_id,
                s_col,
                ", ".join(repr(c) for c in columns_updated),
                w_col,
            )

        if skip_ingest:
            continue

        # Determine the instrument so we can work out the dataset type.
        instrument = exposure.files[0].instrument
        assert instrument is not None, (
            "file should have been removed from this list by prep if instrument could not be found"
        )

        datasetType = self.get_raw_datasetType(instrument, datasetTypes)
        if datasetType.name not in datasetTypes:
            self.butler.registry.registerDatasetType(datasetType)
            datasetTypes[datasetType.name] = datasetType

        # Override default run if nothing specified explicitly.
        if run is None:
            this_run = instrument.makeDefaultRawIngestRunName()
        else:
            this_run = run
        if this_run not in runs:
            self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
            runs.add(this_run)
        try:
            datasets_for_exposure = self.ingestExposureDatasets(
                exposure,
                datasetType=datasetType,
                run=this_run,
                skip_existing_exposures=skip_existing_exposures,
                track_file_attrs=track_file_attrs,
            )
        except Exception as e:
            self._on_ingest_failure(exposure, e)
            n_ingests_failed += 1
            self.log.warning("Failed to ingest the following for reason: %s", e)
            for f in exposure.files:
                self.log.warning("- %s", f.filename)
            if self.config.failFast:
                raise
            continue
        else:
            with self.metrics.collect_metric("time_for_callbacks", self.log, msg="Calling on_success"):
                self._on_success(datasets_for_exposure)
            for dataset in datasets_for_exposure:
                refs.extend(dataset.refs)

        # Success for this exposure.
        n_exposures += 1
        self.log.info(
            "Exposure %s:%s ingested successfully", exposure.record.instrument, exposure.record.obs_id
        )

    return refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed

1599 

@timeMethod
def run(
    self,
    files: Iterable[ResourcePathExpression],
    *,
    pool: concurrent.futures.ThreadPoolExecutor | None = None,
    processes: int | None = None,  # Deprecated. Use num_workers.
    run: str | None = None,
    file_filter: str | re.Pattern = r"\.fit[s]?\b",
    group_files: bool = True,
    skip_existing_exposures: bool = False,
    update_exposure_records: bool = False,
    track_file_attrs: bool = True,
    search_indexes: bool = True,
    num_workers: int = 1,
    skip_ingest: bool = False,
) -> list[DatasetRef]:
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Parameters
    ----------
    files : iterable `lsst.resources.ResourcePath`, `str` or path-like
        Paths to the files to be ingested. Can refer to directories.
        Will be made absolute if they are not already.
    pool : `concurrent.futures.ThreadPoolExecutor`, optional
        If not `None`, a thread pool with which to parallelize some
        operations. This parameter was previously a `multiprocessing.Pool`
        but that option is no longer supported since it is slow compared
        to futures; passing anything other than a
        `~concurrent.futures.ThreadPoolExecutor` raises `ValueError`.
    processes : `int`, optional
        Deprecated alias for ``num_workers``. If given, a `FutureWarning`
        is emitted and its value replaces ``num_workers``. Please use
        ``num_workers`` instead.
    run : `str`, optional
        Name of a RUN-type collection to write to, overriding
        the default derived from the instrument name.
    file_filter : `str` or `re.Pattern`, optional
        Pattern to use to discover files to ingest within directories.
        The default is to search for FITS files. The regex applies to
        files within the directory.
    group_files : `bool`, optional
        Group files by directory if they have been discovered in
        directories. Will not affect files explicitly provided. When
        `True` each group is passed to ``ingestFiles`` separately;
        otherwise all discovered files are ingested in a single batch.
    skip_existing_exposures : `bool`, optional
        If `True` (`False` is default), skip raws that have already been
        ingested (i.e. raws for which we already have a dataset with the
        same data ID in the target collection, even if from another file).
        Note that this is much slower than just not passing
        already-ingested files as inputs, because we still need to read and
        process metadata to identify which exposures to search for. It
        also will not work reliably if multiple processes are attempting to
        ingest raws from the same exposure concurrently, in that different
        processes may still attempt to ingest the same raw and conflict,
        causing a failure that prevents other raws from the same exposure
        from being ingested.
    update_exposure_records : `bool`, optional
        If `True` (`False` is default), update existing exposure records
        that conflict with the new ones instead of rejecting them. THIS IS
        AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS
        KNOWN TO BE BAD. This should usually be combined with
        ``skip_existing_exposures=True``.
    track_file_attrs : `bool`, optional
        Control whether file attributes such as the size or checksum should
        be tracked by the datastore. Whether this parameter is honored
        depends on the specific datastore implementation.
    search_indexes : `bool`, optional
        If `True` the code will search for index JSON files in given
        directories. If you know for a fact that index files do not exist
        set this to `False` for a slight speed up in metadata gathering.
    num_workers : `int`, optional
        The number of workers to use. Ignored if ``pool`` parameter is
        given.
    skip_ingest : `bool`, optional
        Set this to `True` to do metadata extraction and dimension record
        updates without attempting to re-ingest. This can be useful if
        there has been a metadata correction associated with an exposure.

    Returns
    -------
    refs : `list` of `lsst.daf.butler.DatasetRef`
        Dataset references for ingested raws.

    Raises
    ------
    ValueError
        Raised if ``pool`` is given but is not a
        `concurrent.futures.ThreadPoolExecutor`.
    RuntimeError
        Raised if metadata could not be extracted from any file, or if any
        exposure failed registration or file ingest. This is only raised
        after all files have been processed and the summary has been
        logged, so datasets from successful exposures may already have
        been ingested.

    Notes
    -----
    This method inserts all datasets for an exposure within a transaction,
    guaranteeing that partial exposures are never ingested. The exposure
    dimension record is inserted with
    `lsst.daf.butler.Registry.syncDimensionData` first (in its own
    transaction), which inserts only if a record with the same
    primary key does not already exist. This allows different files within
    the same exposure to be ingested in different runs.
    """
    if pool and not isinstance(pool, concurrent.futures.ThreadPoolExecutor):
        raise ValueError(f"This parameter must now be a ThreadPoolExecutor but was given {pool}.")

    if processes is not None:
        warnings.warn(
            "Processes parameter is deprecated. Please use num_workers parameter.",
            FutureWarning,
            stacklevel=3,  # Jump above the timeMethod wrapper.
        )
        num_workers = processes

    # Options shared by every ingestFiles call, so the grouped and
    # single-batch code paths cannot drift apart.
    ingest_kwargs: dict[str, Any] = {
        "pool": pool,
        "num_workers": num_workers,
        "run": run,
        "skip_existing_exposures": skip_existing_exposures,
        "update_exposure_records": update_exposure_records,
        "track_file_attrs": track_file_attrs,
        "search_indexes": search_indexes,
        "skip_ingest": skip_ingest,
    }

    refs = []
    bad_files = []
    n_exposures = 0
    n_exposures_failed = 0
    n_ingests_failed = 0
    self.metrics.reset()  # Clear previous metrics.
    if group_files:
        with time_this(log=self.log, msg="Processing ingest groups") as timer:
            # Ingest each directory group separately and accumulate totals.
            for group in ResourcePath.findFileResources(files, file_filter, group_files):
                new_refs, bad, n_exp, n_exp_fail, n_ingest_fail = self.ingestFiles(
                    tuple(group), **ingest_kwargs
                )
                refs.extend(new_refs)
                bad_files.extend(bad)
                n_exposures += n_exp
                n_exposures_failed += n_exp_fail
                n_ingests_failed += n_ingest_fail
    else:
        with time_this(log=self.log, msg="Ingesting all files in one batch") as timer:
            refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed = self.ingestFiles(
                tuple(ResourcePath.findFileResources(files, file_filter, group_files)), **ingest_kwargs
            )
    # Read the elapsed time once the timed block has exited, so it covers
    # all of the ingest work in whichever branch ran.
    ingest_duration = timer.duration

    if bad_files:
        self.log.warning("Could not extract observation metadata from the following:")
        for f in bad_files:
            self.log.warning("- %s", f)

    # Butler ingest time is only meaningful when files were actually ingested.
    ingest_text = "" if skip_ingest else f" - time in butler ingest: {self.metrics.time_for_ingest} s\n"

    self.log.info(
        "Successfully processed data from %d exposure%s with %d failure%s from exposure"
        " registration and %d failure%s from file ingest.\n"
        "Timing breakdown:\n"
        " - time in metadata gathering: %f s\n"
        " - time in dimension record writing: %f s\n"
        "%s"
        " - time in user-supplied callbacks: %f s\n",
        *_log_msg_counter(n_exposures),
        *_log_msg_counter(n_exposures_failed),
        *_log_msg_counter(n_ingests_failed),
        self.metrics.time_for_metadata,
        self.metrics.time_for_records,
        ingest_text,
        self.metrics.time_for_callbacks,
    )
    if not skip_ingest:
        self.log.info(
            "Ingested %d distinct Butler dataset%s in %f sec", *_log_msg_counter(refs), ingest_duration
        )

    # Any unreadable file or failed exposure makes the whole call fail, but
    # only after everything has been attempted and reported above.
    if bad_files or n_exposures_failed > 0 or n_ingests_failed > 0:
        raise RuntimeError("Some failures encountered during ingestion")

    return refs