Coverage for python/lsst/obs/base/ingest.py: 14%

516 statements  

coverage.py v7.13.5, created at 2026-04-18 08:46 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "RawExposureData", 

26 "RawFileData", 

27 "RawFileDatasetInfo", 

28 "RawIngestConfig", 

29 "RawIngestTask", 

30 "makeTransferChoiceField", 

31) 

32 

33import concurrent.futures 

34import contextlib 

35import json 

36import logging 

37import re 

38import warnings 

39import zipfile 

40from collections import defaultdict 

41from collections.abc import Callable, Iterable, Iterator, MutableMapping, Sequence, Sized 

42from contextlib import contextmanager 

43from dataclasses import InitVar, dataclass 

44from typing import Any, ClassVar, cast 

45 

46from astro_metadata_translator import MetadataTranslator, ObservationInfo, merge_headers 

47from astro_metadata_translator.indexing import process_index_data, process_sidecar_data 

48from pydantic import BaseModel 

49 

50from lsst.afw.fits import readMetadata 

51from lsst.daf.butler import ( 

52 Butler, 

53 CollectionType, 

54 DataCoordinate, 

55 DatasetIdGenEnum, 

56 DatasetRef, 

57 DatasetType, 

58 DimensionRecord, 

59 DimensionUniverse, 

60 FileDataset, 

61 Formatter, 

62 FormatterV2, 

63 Progress, 

64 Timespan, 

65) 

66from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex 

67from lsst.pex.config import ChoiceField, Config, Field 

68from lsst.pipe.base import Instrument, Task 

69from lsst.resources import ResourcePath, ResourcePathExpression 

70from lsst.utils.logging import LsstLoggers 

71from lsst.utils.timer import time_this, timeMethod 

72 

73from ._instrument import makeExposureRecordFromObsInfo 

74 

75 

76def _do_nothing(*args: Any, **kwargs: Any) -> None: 

77 """Do nothing. 

78 

79 This is a function that accepts anything and does nothing. 

80 For use as a default in callback arguments. 

81 """ 

82 pass 

83 

84 

85def _log_msg_counter(noun: int | Sized) -> tuple[int, str]: 

86 """Count the iterable and return the count and plural modifier. 

87 

88 Parameters 

89 ---------- 

90 noun : `Sized` or `int` 

91 Thing to count. If given an integer it is assumed to be the count 

92 to use to calculate modifier. 

93 

94 Returns 

95 ------- 

96 num : `int` 

97 Number of items found in ``noun``. 

98 modifier : `str` 

99 Character to add to the end of a string referring to these items 

100 to indicate whether it was a single item or not. Returns empty 

101 string if there is one item or "s" otherwise. 

102 

103 Examples 

104 -------- 

105 .. code-block:: python 

106 

107 log.warning("Found %d file%s", *_log_msg_counter(nfiles)) 

108 """ 

109 if isinstance(noun, int): 

110 num = noun 

111 else: 

112 num = len(noun) 

113 return num, "" if num == 1 else "s" 

114 

115 

116class IngestMetrics(BaseModel): 

117 """Metrics collected during raw ingest.""" 

118 

119 time_for_metadata: float = 0.0 

120 """Wall-clock time, in seconds, spent gathering file metadata.""" 

121 

122 time_for_records: float = 0.0 

123 """Wall-clock time, in seconds, spent writing dimension records.""" 

124 

125 time_for_ingest: float = 0.0 

126 """Wall-clock time, in seconds, spent calling butler ingest.""" 

127 

128 time_for_callbacks: float = 0.0 

129 """Wall-clock time, in seconds, processing user-supplied callbacks.""" 

130 

131 def reset(self) -> None: 

132 """Reset all metrics to initial values.""" 

133 self.time_for_ingest = 0.0 

134 self.time_for_records = 0.0 

135 self.time_for_metadata = 0.0 

136 self.time_for_callbacks = 0.0 

137 

138 @contextmanager 

139 def collect_metric( 

140 self, 

141 property: str, 

142 log: LsstLoggers | None = None, 

143 msg: str | None = None, 

144 args: tuple[Any, ...] = (), 

145 ) -> Iterator[None]: 
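        """Time the enclosed block and accumulate the elapsed seconds into
        the named metric attribute.

        Parameters
        ----------
        property : `str`
            Name of the metric attribute on this object to add the elapsed
            time to (e.g. ``"time_for_ingest"``).
        log : `~lsst.utils.logging.LsstLoggers` or `None`, optional
            Logger to which the timing message should be reported.
        msg : `str` or `None`, optional
            Message template forwarded to `lsst.utils.timer.time_this`.
        args : `tuple`, optional
            Positional arguments to interpolate into ``msg``.
        """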

146 with time_this(log=log, msg=msg, args=args, level=logging.INFO) as timer: 

147 yield 

148 setattr(self, property, getattr(self, property) + timer.duration) 

149 

150 

151@dataclass 

152class RawFileDatasetInfo: 

153 """Information about a single dataset within a raw file.""" 

154 

155 dataId: DataCoordinate 

156 """Data ID for this file (`lsst.daf.butler.DataCoordinate`).""" 

157 

158 obsInfo: ObservationInfo 

159 """Standardized observation metadata extracted directly from the file 

160 headers (`astro_metadata_translator.ObservationInfo`). 

161 """ 

162 

163 

164@dataclass 

165class RawFileData: 

166 """Information about a single raw file, used during ingest.""" 

167 

168 datasets: list[RawFileDatasetInfo] 

169 """The information describing each dataset within this raw file. 

170 (`list` of `RawFileDatasetInfo`) 

171 """ 

172 

173 filename: ResourcePath 

174 """URI of the file this information was extracted from (`str`). 

175 

176 This is the path prior to ingest, not the path after ingest. 

177 """ 

178 

179 FormatterClass: type[Formatter] 

180 """Formatter class that should be used to ingest this file (`type`; as 

181 subclass of `~lsst.daf.butler.Formatter`). 

182 """ 

183 

184 instrument: Instrument | None 

185 """The `Instrument` instance associated with this file. Can be `None` 

186 if ``datasets`` is an empty list.""" 

187 

188 

189@dataclass 

190class RawExposureData: 

191 """Information about a complete raw exposure, used during ingest.""" 

192 

193 dataId: DataCoordinate 

194 """Data ID for this exposure (`lsst.daf.butler.DataCoordinate`). 

195 """ 

196 

197 files: list[RawFileData] 

198 """List of structures containing file-level information. 

199 """ 

200 

201 universe: InitVar[DimensionUniverse] 

202 """Set of all known dimensions. 

203 """ 

204 

205 record: DimensionRecord 

206 """The exposure `~lsst.daf.butler.DimensionRecord` that must be inserted 

207 into the `~lsst.daf.butler.Registry` prior to file-level ingest 

208 (`~lsst.daf.butler.DimensionRecord`). 

209 """ 

210 

211 dependencyRecords: dict[str, DimensionRecord] 

212 """Additional records that must be inserted into the 

213 `~lsst.daf.butler.Registry` prior to ingesting the exposure ``record`` 

214 (e.g., to satisfy foreign key constraints), indexed by the dimension name. 

215 """ 

216 

217 

218def makeTransferChoiceField( 

219 doc: str = "How to transfer files (None for no transfer).", default: str = "auto" 

220) -> ChoiceField: 

221 """Create a Config field with options for transferring data between repos. 

222 

223 The allowed options for the field are exactly those supported by 

224 `lsst.daf.butler.Datastore.ingest`. 

225 

226 Parameters 

227 ---------- 

228 doc : `str` 

229 Documentation for the configuration field. 

230 default : `str`, optional 

231 Default transfer mode for the field. 

232 

233 Returns 

234 ------- 

235 field : `lsst.pex.config.ChoiceField` 

236 Configuration field. 
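
    Examples
    --------
    A minimal sketch of using this helper inside a config class, mirroring
    how `RawIngestConfig` uses it (``MyIngestConfig`` is an illustrative
    name):

    .. code-block:: python

        from lsst.pex.config import Config


        class MyIngestConfig(Config):
            transfer = makeTransferChoiceField(default="symlink")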

237 """ 

238 return ChoiceField( 

239 doc=doc, 

240 dtype=str, 

241 allowed={ 

242 "move": "move", 

243 "copy": "copy", 

244 "auto": "choice will depend on datastore", 

245 "direct": "use URI to ingested file directly in datastore", 

246 "link": "hard link falling back to symbolic link", 

247 "hardlink": "hard link", 

248 "symlink": "symbolic (soft) link", 

249 "relsymlink": "relative symbolic link", 

250 }, 

251 optional=True, 

252 default=default, 

253 ) 

254 

255 

256class RawIngestConfig(Config): 

257 """Configuration class for RawIngestTask.""" 

258 

259 transfer = makeTransferChoiceField() 

260 failFast: Field[bool] = Field( 

261 dtype=bool, 

262 default=False, 

263 doc="If True, stop ingest as soon as any problem is encountered with any file. " 

264 "Otherwise problem files will be skipped and logged and a report issued at completion.", 

265 ) 

266 

267 

268class RawIngestTask(Task): 

269 """Driver Task for ingesting raw data into Gen3 Butler repositories. 

270 

271 Parameters 

272 ---------- 

273 config : `RawIngestConfig` 

274 Configuration for the task. 

275 butler : `~lsst.daf.butler.Butler` 

276 Writeable butler instance, with ``butler.run`` set to the appropriate 

277 `~lsst.daf.butler.CollectionType.RUN` collection for these raw 

278 datasets. 

279 on_success : `collections.abc.Callable`, optional 

280 A callback invoked when all of the raws associated with an exposure 

281 are ingested. Will be passed a list of `~lsst.daf.butler.FileDataset` 

282 objects, each containing one or more resolved 

283 `~lsst.daf.butler.DatasetRef` objects. If this callback raises it will 

284 interrupt the entire ingest process, even if `RawIngestConfig.failFast` 

285 is `False`. 

286 on_metadata_failure : `collections.abc.Callable`, optional 

287 A callback invoked when a failure occurs trying to translate the 

288 metadata for a file. Will be passed the URI and the exception, in 

289 that order, as positional arguments. Guaranteed to be called in an 

290 ``except`` block, allowing the callback to re-raise or replace (with 

291 ``raise ... from``) to override the task's usual error handling (before 

292 `RawIngestConfig.failFast` logic occurs). This callback can be called 

293 from within a worker thread if multiple workers have been requested. 

294 Ensure that any code within the callback is thread-safe. 

295 on_ingest_failure : `collections.abc.Callable`, optional 

296 A callback invoked when dimension record or dataset insertion into the 

297 database fails for an exposure. Will be passed a `RawExposureData` 

298 instance and the exception, in that order, as positional arguments. 

299 Guaranteed to be called in an ``except`` block, allowing the callback 

300 to re-raise or replace (with ``raise ... from``) to override the task's 

301 usual error handling (before `RawIngestConfig.failFast` logic occurs). 

302 on_exposure_record : `collections.abc.Callable`, optional 

303 A callback invoked when an exposure dimension record has been created 

304 or modified. Will not be called if the record already existed. Will 

305 be called with the exposure record. 

306 **kwargs 

307 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task` 

308 constructor. 

309 

310 Notes 

311 ----- 

312 Each instance of `RawIngestTask` writes to the same Butler. Each 

313 invocation of `RawIngestTask.run` ingests a list of files. 
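
    Examples
    --------
    A minimal sketch of constructing the task and ingesting a list of raw
    files (``REPO``, ``RUN``, and ``raw_files`` are placeholders for a
    repository URI, a RUN collection name, and an iterable of file URIs):

    .. code-block:: python

        from lsst.daf.butler import Butler
        from lsst.obs.base import RawIngestConfig, RawIngestTask

        butler = Butler(REPO, run=RUN, writeable=True)
        config = RawIngestConfig()
        config.transfer = "direct"
        task = RawIngestTask(config=config, butler=butler)
        task.run(raw_files)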

314 """ 

315 

316 ConfigClass: ClassVar[type[Config]] = RawIngestConfig 

317 

318 _DefaultName: ClassVar[str] = "ingest" 

319 

320 def getDatasetType(self) -> DatasetType: 

321 """Return the default DatasetType of the datasets ingested by this 

322 Task. 

323 

324 Returns 

325 ------- 

326 datasetType : `lsst.daf.butler.DatasetType` 

327 The default dataset type to use for the data being ingested. This 

328 is only used if the relevant `~lsst.pipe.base.Instrument` does not 

329 define an override. 

330 """ 

331 return DatasetType( 

332 "raw", 

333 ("instrument", "detector", "exposure"), 

334 "Exposure", 

335 universe=self.butler.dimensions, 

336 ) 

337 

338 # Mypy cannot determine that the config passed to super() is this type. 

339 config: RawIngestConfig 

340 

341 def __init__( 

342 self, 

343 config: RawIngestConfig, 

344 *, 

345 butler: Butler, 

346 on_success: Callable[[list[FileDataset]], Any] = _do_nothing, 

347 on_metadata_failure: Callable[[ResourcePath, Exception], Any] = _do_nothing, 

348 on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing, 

349 on_exposure_record: Callable[[DimensionRecord], Any] = _do_nothing, 

350 **kwargs: Any, 

351 ): 

352 config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here. 

353 super().__init__(config, **kwargs) 

354 self.butler = butler 

355 self.universe = self.butler.dimensions 

356 self.datasetType = self.getDatasetType() 

357 self._on_success = on_success 

358 self._on_exposure_record = on_exposure_record 

359 self._on_metadata_failure = on_metadata_failure 

360 self._on_ingest_failure = on_ingest_failure 

361 self.progress = Progress("obs.base.RawIngestTask") 

362 

363 # Import all the instrument classes so that we ensure that we 

364 # have all the relevant metadata translators loaded. 

365 self.instruments = Instrument.importAll(self.butler.registry) 

366 

367 # Read all the instrument records into a cache since they will be 

368 # needed later to calculate day_obs timespans, if appropriate. 

369 self._instrument_records = { 

370 rec.name: rec for rec in butler.registry.queryDimensionRecords("instrument") 

371 } 

372 # Create empty metrics. 

373 self.metrics = IngestMetrics() 

374 

375 def _reduce_kwargs(self) -> dict[str, Any]: 

376 # Add extra parameters to pickle. 

377 return dict( 

378 **super()._reduce_kwargs(), 

379 butler=self.butler, 

380 on_success=self._on_success, 

381 on_metadata_failure=self._on_metadata_failure, 

382 on_ingest_failure=self._on_ingest_failure, 

383 on_exposure_record=self._on_exposure_record, 

384 ) 

385 

386 def _determine_instrument_formatter( 

387 self, dataId: DataCoordinate, filename: ResourcePath 

388 ) -> tuple[Instrument | None, type[Formatter | FormatterV2]]: 

389 """Determine the instrument and formatter class. 

390 

391 Parameters 

392 ---------- 

393 dataId : `lsst.daf.butler.DataCoordinate` 

394 The dataId associated with this dataset. 

395 filename : `lsst.resources.ResourcePath` 

396 URI of file used for error reporting. 

397 

398 Returns 

399 ------- 

400 instrument : `Instrument` or `None` 

401 Instance of the `Instrument` associated with this dataset. `None` 

402 indicates that the instrument could not be determined. 

403 formatterClass : `type` 

404 Class to be used as the formatter for this dataset. 

405 

406 Notes 

407 ----- 

408 Does not access butler registry since it may be called from threads. 

409 """ 

410 # The data model currently assumes that whilst multiple datasets 

411 # can be associated with a single file, they must all share the 

412 # same formatter. 

413 FormatterClass: type[Formatter | FormatterV2] = Formatter 

414 try: 

415 instrument_name = cast(str, dataId["instrument"]) 

416 instrument = self.instruments[instrument_name]() 

417 except LookupError as e: 

418 self._on_metadata_failure(filename, e) 

419 self.log.warning( 

420 "Instrument %s for file %s not known to registry", dataId["instrument"], filename 

421 ) 

422 if self.config.failFast: 

423 raise RuntimeError( 

424 f"Instrument {dataId['instrument']} for file {filename} not known to registry" 

425 ) from e 

426 # Indicate that we could not work out the instrument. 

427 instrument = None 

428 else: 

429 assert instrument is not None, "Should be guaranteed by the instrument lookup succeeding." 

430 FormatterClass = instrument.getRawFormatter(dataId) 

431 return instrument, FormatterClass 

432 

433 def get_raw_datasetType( 

434 self, instrument: Instrument, cache: dict[str, DatasetType] | None = None 

435 ) -> DatasetType: 

436 """Get the raw dataset type associated with this ingest. 

437 

438 Parameters 

439 ---------- 

440 instrument : `Instrument` 

441 Class that might specify an override of the default raw dataset 

442 type. If no override is specified the task default will be used. 

443 cache : `dict` [`str`, `lsst.daf.butler.DatasetType`] \ 

444 or `None`, optional 

445 An optional cache that can be used to return a pre-existing 

446 dataset type. Is not updated. 

447 

448 Returns 

449 ------- 

450 lsst.daf.butler.DatasetType 

451 The dataset type to use for raw ingest of this instrument. 

452 """ 

453 if cache is None: 

454 cache = {} 

455 if raw_definition := getattr(instrument, "raw_definition", None): 

456 datasetTypeName, dimensions, storageClass = raw_definition 

457 if not (datasetType := cache.get(datasetTypeName)): 

458 datasetType = DatasetType( 

459 datasetTypeName, dimensions, storageClass, universe=self.butler.dimensions 

460 ) 

461 else: 

462 datasetType = self.datasetType 

463 return datasetType 

464 

465 def extractMetadata(self, filename: ResourcePath) -> RawFileData: 

466 """Extract and process metadata from a single raw file. 

467 

468 Parameters 

469 ---------- 

470 filename : `lsst.resources.ResourcePath` 

471 URI to the file. 

472 

473 Returns 

474 ------- 

475 data : `RawFileData` 

476 A structure containing the metadata extracted from the file, 

477 as well as the original filename. All fields will be populated, 

478 but the `RawFileDatasetInfo.dataId` attribute will be a minimal 

479 (unexpanded) `~lsst.daf.butler.DataCoordinate` instance. The 

480 ``instrument`` field will be `None` if there is a problem 

481 with metadata extraction. 

482 

483 Notes 

484 ----- 

485 Assumes that there is a single dataset associated with the given 

486 file. Instruments using a single file to store multiple datasets 

487 must implement their own version of this method. 

488 

489 By default the method will catch all exceptions unless the 

490 `RawIngestConfig.failFast` configuration item is `True`. If an error 

491 is encountered the supplied ``on_metadata_failure()`` 

492 method will be called. If no exception propagates but an error was 

493 encountered, the returned object will have a `None` instrument and 

494 no datasets. 

495 

496 This method supports sidecar JSON files which can be used to 

497 extract metadata without having to read the data file itself. 

498 The sidecar file is always used if found. 

499 """ 

500 formatterClass: type[Formatter | FormatterV2] 

501 sidecar_fail_msg = "" # Requires prepended space when set. 

502 try: 

503 sidecar_file = filename.updatedExtension(".json") 

504 headers = [] 

505 with contextlib.suppress(Exception): 

506 # Try to read directly, bypassing existence check. 

507 content = json.loads(sidecar_file.read()) 

508 headers = [process_sidecar_data(content)] 

509 sidecar_fail_msg = " (via sidecar)" 

510 if not headers: 

511 # Read the metadata from the data file itself. 

512 

513 # For remote files download the entire file to get the 

514 # header. This is very inefficient and it would be better 

515 # to have some way of knowing where in the file the headers 

516 # are and to only download those parts of the file. 

517 with filename.as_local() as local_file: 

518 # Read the primary. This might be sufficient. 

519 header = readMetadata(local_file.ospath, 0) 

520 translator_class = None 

521 

522 try: 

523 # Try to work out a translator class early. 

524 translator_class = MetadataTranslator.determine_translator( 

525 header, filename=str(filename) 

526 ) 

527 except ValueError: 

528 # Primary header was not sufficient (maybe this file 

529 # has been compressed or is a MEF with minimal 

530 # primary). Read second header and merge with primary. 

531 header = merge_headers([header, readMetadata(local_file.ospath, 1)], mode="overwrite") 

532 

533 # Try again to work out a translator class, letting this 

534 # fail. 

535 if translator_class is None: 

536 translator_class = MetadataTranslator.determine_translator( 

537 header, filename=str(filename) 

538 ) 

539 

540 # Request the headers to use for ingest 

541 headers = list(translator_class.determine_translatable_headers(local_file.ospath, header)) 

542 

543 # Add each header to the dataset list 

544 datasets = [self._calculate_dataset_info(h, filename) for h in headers] 

545 

546 except Exception as e: 

547 self.log.debug("Problem extracting metadata from %s%s: %s", filename, sidecar_fail_msg, e) 

548 # Indicate to the caller that we failed to read. 

549 datasets = [] 

550 formatterClass = Formatter 

551 instrument = None 

552 self._on_metadata_failure(filename, e) 

553 if self.config.failFast: 

554 raise RuntimeError( 

555 f"Problem extracting metadata for file {filename}{sidecar_fail_msg}" 

556 ) from e 

557 else: 

558 self.log.debug("Extracted metadata for file %s%s", filename, sidecar_fail_msg) 

559 # The data model currently assumes that whilst multiple datasets 

560 # can be associated with a single file, they must all share the 

561 # same formatter. 

562 instrument, formatterClass = self._determine_instrument_formatter(datasets[0].dataId, filename) 

563 if instrument is None: 

564 datasets = [] 

565 

566 return RawFileData( 

567 datasets=datasets, 

568 filename=filename, 

569 # MyPy wants this to be a non-abstract class, which is not true 

570 # for the error case where instrument is None and datasets=[]. 

571 FormatterClass=formatterClass, # type: ignore 

572 instrument=instrument, 

573 ) 

574 

575 @classmethod 

576 def getObservationInfoSubsets(cls) -> tuple[set, set]: 

577 """Return subsets of fields in the 

578 `~astro_metadata_translator.ObservationInfo` that we care about. 

579 

580 These fields will be used in constructing an exposure record. 

581 

582 Returns 

583 ------- 

584 required : `set` 

585 Set of `~astro_metadata_translator.ObservationInfo` field names 

586 that are required. 

587 optional : `set` 

588 Set of `~astro_metadata_translator.ObservationInfo` field names 

589 we will use if they are available. 
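
        Examples
        --------
        A sketch of a hypothetical subclass requesting one extra optional
        property (``MyIngestTask`` is an illustrative name; ``focus_z``
        must be a valid `~astro_metadata_translator.ObservationInfo`
        property):

        .. code-block:: python

            class MyIngestTask(RawIngestTask):
                @classmethod
                def getObservationInfoSubsets(cls) -> tuple[set, set]:
                    required, optional = super().getObservationInfoSubsets()
                    optional.add("focus_z")
                    return required, optional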

590 """ 

591 # Marking the new properties "group_counter_*" and 

592 # "has_simulated_content" as required, assumes that we either 

593 # recreate any existing index/sidecar files that include translated 

594 # values, or else allow astro_metadata_translator to fill in 

595 # defaults. 

596 required = { 

597 "datetime_begin", 

598 "datetime_end", 

599 "detector_num", 

600 "exposure_group", 

601 "exposure_id", 

602 "exposure_time_requested", 

603 "group_counter_end", 

604 "group_counter_start", 

605 "has_simulated_content", 

606 "instrument", 

607 "observation_id", 

608 "observation_type", 

609 "observing_day", 

610 "physical_filter", 

611 } 

612 optional = { 

613 "altaz_begin", 

614 "boresight_rotation_coord", 

615 "boresight_rotation_angle", 

616 "dark_time", 

617 "tracking_radec", 

618 "object", 

619 "observation_counter", 

620 "observation_reason", 

621 "observing_day_offset", 

622 "science_program", 

623 "visit_id", 

624 "can_see_sky", 

625 } 

626 return required, optional 

627 

628 def _calculate_dataset_info( 

629 self, header: MutableMapping[str, Any] | ObservationInfo, filename: ResourcePath 

630 ) -> RawFileDatasetInfo: 

631 """Calculate a RawFileDatasetInfo from the supplied information. 

632 

633 Parameters 

634 ---------- 

635 header : Mapping or `astro_metadata_translator.ObservationInfo` 

636 Header from the dataset or previously-translated content. 

637 filename : `lsst.resources.ResourcePath` 

638 Filename to use for error messages. 

639 

640 Returns 

641 ------- 

642 dataset : `RawFileDatasetInfo` 

643 The dataId, and observation information associated with this 

644 dataset. 

645 """ 

646 required, optional = self.getObservationInfoSubsets() 

647 if isinstance(header, ObservationInfo): 

648 obsInfo = header 

649 missing = [] 

650 # Need to check the required properties are present. 

651 for property in required: 

652 # getattr does not need to be protected because it is using 

653 # the defined list above containing properties that must exist. 

654 value = getattr(obsInfo, property) 

655 if value is None: 

656 missing.append(property) 

657 if missing: 

658 raise ValueError( 

659 f"Requested required properties are missing from file {filename}: {missing} (via JSON)" 

660 ) 

661 

662 else: 

663 obsInfo = ObservationInfo( 

664 header, 

665 pedantic=False, 

666 filename=str(filename), 

667 required=required, 

668 subset=required | optional, 

669 ) 

670 

671 dataId = DataCoordinate.standardize( 

672 instrument=obsInfo.instrument, 

673 exposure=obsInfo.exposure_id, 

674 detector=obsInfo.detector_num, 

675 universe=self.universe, 

676 ) 

677 return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId) 

678 

679 def readZipIndexFiles( 

680 self, files: Iterable[ResourcePath] 

681 ) -> tuple[list[RawFileData], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

682 """Given a list of files, filter out zip files and look for index files 

683 inside. 

684 

685 Parameters 

686 ---------- 

687 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

688 URIs to the files to be ingested. 

689 

690 Returns 

691 ------- 

692 file_data : `list` [ `RawFileData` ] 

693 Processed metadata for the datasets described by the index files 

694 found inside the zip files. Each filename refers to the data file 

695 within the zip using the butler fragment convention of ``zip-path=PATH``. 

696 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

697 Updated list of the input files with zip entries removed. 

698 good_index_files : `set` [ `lsst.resources.ResourcePath` ] 

699 Zip files that contained index information. 

700 bad_zip_files : `set` [ `lsst.resources.ResourcePath` ] 

701 Zip files that contained no index information. 

702 """ 

703 zip_metadata_index = "_metadata_index.json" 

704 

705 # Files that weren't zip files. 

706 updated_files: list[ResourcePath] = [] 

707 

708 # Index files we failed to read. 

709 bad_index_files: set[ResourcePath] = set() 

710 

711 # Any good index files that were found and used. 

712 good_index_files: set[ResourcePath] = set() 

713 

714 # Processed content from any zip files. 

715 indexFileData: list[RawFileData] = [] 

716 

717 for file in files: 

718 if file.getExtension() != ".zip": 

719 updated_files.append(file) 

720 continue 

721 

722 zip_info: dict[str, zipfile.ZipInfo] = {} 

723 try: 

724 with file.open("rb") as fd, zipfile.ZipFile(fd) as zf: 

725 zip_info = {info.filename: info for info in zf.infolist()} 

726 content = json.loads(zf.read(zip_metadata_index)) 

727 index = process_index_data(content, force_dict=True) 

728 assert isinstance(index, MutableMapping) 

729 

730 # Try to read the ZipIndex. 

731 zip_index = ZipIndex.from_open_zip(zf) 

732 except Exception as e: 

733 if self.config.failFast: 

734 raise RuntimeError(f"Problem reading index file from zip file at {file}") from e 

735 bad_index_files.add(file) 

736 continue 

737 self.log.debug("Extracted index metadata from zip file %s", str(file)) 

738 good_index_files.add(file) 

739 

740 # All the metadata read from this index file with keys of full 

741 # path. 

742 index_entries: dict[ResourcePath, Any] = {} 

743 

744 # In theory we could scan for JSON sidecar files associated with 

745 # any files not found in the metadata index, but that is not meant 

746 # to be possible. Guider data is another issue not handled by 

747 # this code. 

748 for path_in_zip in index: 

749 if path_in_zip not in zip_info: 

750 # Index entry exists but no file for it. 

751 self.log.info( 

752 "File %s is in zip index but not in zip file %s. Ignoring.", path_in_zip, file 

753 ) 

754 continue 

755 file_to_ingest = file.replace(fragment=f"zip-path={path_in_zip}") 

756 index_entries[file_to_ingest] = index[path_in_zip] 

757 

758 file_data = self.processIndexEntries(index_entries) 

759 

760 # Validate that the index entries we have read match the 

761 # values in the butler zip index. 

762 data_ids_from_index: dict[str, tuple[DataCoordinate, ...]] = {} 

763 for f in file_data: 

764 _, path_in_zip = f.filename.fragment.split("=") 

765 data_ids_from_index[path_in_zip] = tuple(d.dataId for d in f.datasets) 

766 

767 data_ids_from_butler_index: dict[str, tuple[DataCoordinate, ...]] = {} 

768 # Refs indexed by UUID. 

769 refs = zip_index.refs.to_refs(universe=self.universe) 

770 id_to_ref = {ref.id: ref for ref in refs} 

771 for path_in_zip, index_info in zip_index.artifact_map.items(): 

772 data_ids_from_butler_index[path_in_zip] = tuple( 

773 id_to_ref[id_].dataId for id_ in index_info.ids 

774 ) 

775 

776 if data_ids_from_butler_index != data_ids_from_index: 

777 self.log.warning( 

778 "Recalculating the Data IDs for zip file %s (which may include new metadata corrections) " 

779 "results in a difference to the Data IDs recorded in the butler index in that zip. " 

780 "Consider remaking the zipped raws.", 

781 file, 

782 ) 

783 

784 indexFileData.extend(file_data) 

785 

786 return indexFileData, updated_files, good_index_files, bad_index_files 

787 

788 def locateAndReadIndexFiles( 

789 self, files: Iterable[ResourcePath] 

790 ) -> tuple[dict[ResourcePath, Any], list[ResourcePath], set[ResourcePath], set[ResourcePath]]: 

791 """Given a list of files, look for index files and read them. 

792 

793 Index files can either be explicitly in the list of files to 

794 ingest, or else located in the same directory as a file to ingest. 

795 Index entries are always used if present. 

796 

797 Parameters 

798 ---------- 

799 files : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ] 

800 URIs to the files to be ingested. 

801 

802 Returns 

803 ------- 

804 index : `dict` [ `lsst.resources.ResourcePath`, `typing.Any` ] 

805 Merged contents of all relevant index files found. These can 

806 be explicitly specified index files or ones found in the 

807 directory alongside a data file to be ingested. 

808 updated_files : `list` [ `lsst.resources.ResourcePath` ] 

809 Updated list of the input files with entries removed that were 

810 found listed in an index file. Order is not guaranteed to 

811 match the order of the files given to this routine. 

812 good_index_files : `set` [ `lsst.resources.ResourcePath` ] 

813 Index files that were successfully read. 

814 bad_index_files : `set` [ `lsst.resources.ResourcePath` ] 

815 Files that looked like index files but failed to read properly. 
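
        Notes
        -----
        Index files are expected to be named ``_index.json``. For each
        directory containing a file to be ingested, a file of that name is
        looked for and, if present, its entries are used in place of
        reading the data files themselves.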

816 """ 

817 # Convert the paths to absolute for easy comparison with index content. 

818 # Do not convert to real paths since we have to assume that index 

819 # files are in this location and not the location which it links to. 

820 files = tuple(f.abspath() for f in files) 

821 

822 # Index files must be named this. 

823 index_root_file = "_index.json" 

824 

825 # Group the files by directory. 

826 files_by_directory: dict[ResourcePath, set[str]] = defaultdict(set) 

827 

828 for path in files: 

829 directory, file_in_dir = path.split() 

830 files_by_directory[directory].add(file_in_dir) 

831 

832 # All the metadata read from index files with keys of full path. 

833 index_entries: dict[ResourcePath, Any] = {} 

834 

835 # Index files we failed to read. 

836 bad_index_files: set[ResourcePath] = set() 

837 

838 # Any good index files that were found and used. 

839 good_index_files: set[ResourcePath] = set() 

840 

841 # Look for index files in those directories. 

842 for directory, files_in_directory in files_by_directory.items(): 

843 possible_index_file = directory.join(index_root_file) 

844 if possible_index_file.exists(): 

845 # If we are explicitly requesting an index file the 

846 # messages should be different. 

847 index_msg = "inferred" 

848 is_implied = True 

849 if index_root_file in files_in_directory: 

850 index_msg = "explicit" 

851 is_implied = False 

852 

853 # Try to read the index file and catch and report any 

854 # problems. 

855 try: 

856 content = json.loads(possible_index_file.read()) 

857 index = process_index_data(content, force_dict=True) 

858 # mypy should in theory know that this is a mapping 

859 # from the overload type annotation of process_index_data. 

860 assert isinstance(index, MutableMapping) 

861 except Exception as e: 

862 # Only trigger the callback if the index file 

863 # was asked for explicitly. Triggering on implied file 

864 # might be surprising. 

865 if not is_implied: 

866 self._on_metadata_failure(possible_index_file, e) 

867 if self.config.failFast: 

868 raise RuntimeError( 

869 f"Problem reading index file from {index_msg} location {possible_index_file}" 

870 ) from e 

871 bad_index_files.add(possible_index_file) 

872 continue 

873 

874 self.log.debug("Extracted index metadata from %s file %s", index_msg, possible_index_file) 

875 good_index_files.add(possible_index_file) 

876 

877 # Go through the index adding entries for files. 

878 # If we have non-index files in this directory marked for 

879 # ingest we should only get index information for those. 

880 # If the index file was explicit we use all entries. 

881 if is_implied: 

882 files_to_ingest = files_in_directory 

883 else: 

884 files_to_ingest = set(index) 

885 

886 # Copy relevant metadata into a single dict for all index 

887 # entries. 

888 for file_in_dir in files_to_ingest: 

889 # Skip an explicitly specified index file. 

890 # This should never happen because an explicit index 

891 # file will force ingest of all files in the index 

892 # and not use the explicit file list. If somehow 

893 # this is not true we continue. Raising an exception 

894 # seems like the wrong thing to do since this is harmless. 

895 if file_in_dir == index_root_file: 

896 self.log.info( 

897 "Logic error found scanning directory %s. Please file ticket.", directory 

898 ) 

899 continue 

900 if file_in_dir in index: 

901 file = directory.join(file_in_dir) 

902 if file in index_entries: 

903 # ObservationInfo overrides raw metadata 

904 if isinstance(index[file_in_dir], ObservationInfo) and not isinstance( 

905 index_entries[file], ObservationInfo 

906 ): 

907 self.log.warning( 

908 "File %s already specified in an index file but overriding" 

909 " with ObservationInfo content from %s", 

910 file, 

911 possible_index_file, 

912 ) 

913 else: 

914 self.log.warning( 

915 "File %s already specified in an index file, ignoring content from %s", 

916 file, 

917 possible_index_file, 

918 ) 

919 # Do nothing in this case 

920 continue 

921 

922 index_entries[file] = index[file_in_dir] 

923 

924 # Remove files from list that have index entries and also 

925 # any files that we determined to be explicit index files 

926 # or any index files that we failed to read. 

927 filtered = set(files) - set(index_entries) - good_index_files - bad_index_files 

928 

929 # The filtered list loses the initial order. Retaining the order 

930 # is good for testing but does have a cost if there are many 

931 # files when copying the good values out. A dict would have faster 

932 # lookups (using the files as keys) but use more memory. 

933 ordered: list[ResourcePath] = [] 

934 seen: set[ResourcePath] = set() 

935 for f in files: 

936 if f in filtered and f not in seen: 

937 ordered.append(f) 

938 seen.add(f) 

939 

940 return index_entries, ordered, good_index_files, bad_index_files 

941 

942 def processIndexEntries(self, index_entries: dict[ResourcePath, Any]) -> list[RawFileData]: 

943 """Convert index entries to RawFileData. 

944 

945 Parameters 

946 ---------- 

947 index_entries : `dict` [`lsst.resources.ResourcePath`, `typing.Any`] 

948 Dict indexed by name of file to ingest and with values either 

949 raw metadata or translated 

950 `~astro_metadata_translator.ObservationInfo`. 

951 

952 Returns 

953 ------- 

954 data : `list` [ `RawFileData` ] 

955 Structures containing the metadata extracted from the file, 

956 as well as the original filename. All fields will be populated, 

957 but the `RawFileDatasetInfo.dataId` attributes will be minimal 

958 (unexpanded) `~lsst.daf.butler.DataCoordinate` instances. 

959 """ 

960 formatterClass: type[Formatter | FormatterV2] 

961 fileData = [] 

962 for filename, metadata in index_entries.items(): 

963 try: 

964 datasets = [self._calculate_dataset_info(metadata, filename)] 

965 except Exception as e: 

966 self.log.debug("Problem extracting metadata for file %s found in index file: %s", filename, e) 

967 datasets = [] 

968 formatterClass = Formatter 

969 instrument = None 

970 self._on_metadata_failure(filename, e) 

971 if self.config.failFast: 

972 raise RuntimeError( 

973 f"Problem extracting metadata for file {filename} found in index file" 

974 ) from e 

975 else: 

976 instrument, formatterClass = self._determine_instrument_formatter( 

977 datasets[0].dataId, filename 

978 ) 

979 if instrument is None: 

980 datasets = [] 

981 fileData.append( 

982 RawFileData( 

983 datasets=datasets, 

984 filename=filename, 

985 # MyPy wants this to be a non-abstract class, which is not 

986 # true for the error case where instrument is None and 

987 # datasets=[]. 

988 FormatterClass=formatterClass, # type: ignore 

989 instrument=instrument, 

990 ) 

991 ) 

992 return fileData 

993 

994 def groupByExposure(self, files: Iterable[RawFileData]) -> list[RawExposureData]: 

995 """Group an iterable of `RawFileData` by exposure. 

996 

997 Parameters 

998 ---------- 

999 files : iterable of `RawFileData` 

1000 File-level information to group. 

1001 

1002 Returns 

1003 ------- 

1004 exposures : `list` of `RawExposureData` 

1005 A list of structures that group the file-level information by 

1006 exposure. All fields will be populated. The 

1007 `RawExposureData.dataId` attributes will be minimal (unexpanded) 

1008 `~lsst.daf.butler.DataCoordinate` instances. 

1009 """ 

1010 exposureDimensions = self.universe["exposure"].minimal_group 

1011 byExposure = defaultdict(list) 

1012 for f in files: 

1013 # Assume that the first dataset is representative for the file. 

1014 byExposure[f.datasets[0].dataId.subset(exposureDimensions)].append(f) 

1015 

1016 return [ 

1017 RawExposureData( 

1018 dataId=dataId, 

1019 files=exposureFiles, 

1020 universe=self.universe, 

1021 record=self.makeExposureRecord(exposureFiles[0].datasets[0].obsInfo, self.universe), 

1022 dependencyRecords=self.makeDependencyRecords( 

1023 exposureFiles[0].datasets[0].obsInfo, self.universe 

1024 ), 

1025 ) 

1026 for dataId, exposureFiles in byExposure.items() 

1027 ] 

1028 

1029 def makeExposureRecord( 

1030 self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any 

1031 ) -> DimensionRecord: 

1032 """Construct a registry record for an exposure. 

1033 

1034 This is a method that subclasses will often want to customize. This can 

1035 often be done by calling this base class implementation with additional 

1036 ``kwargs``. 

1037 

1038 Parameters 

1039 ---------- 

1040 obsInfo : `~astro_metadata_translator.ObservationInfo` 

1041 Observation details for (one of the components of) the exposure. 

1042 universe : `lsst.daf.butler.DimensionUniverse` 

1043 Set of all known dimensions. 

1044 **kwargs 

1045 Additional field values for this record. 

1046 

1047 Returns 

1048 ------- 

1049 record : `lsst.daf.butler.DimensionRecord` 

1050 The exposure record that must be inserted into the 

1051 `~lsst.daf.butler.Registry` prior to file-level ingest. 

1052 """ 

1053 return makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs) 

1054 

1055 def makeDependencyRecords( 

1056 self, obsInfo: ObservationInfo, universe: DimensionUniverse 

1057 ) -> dict[str, DimensionRecord]: 

1058 """Construct dependency records. 

1059 

1060 These dependency records will be inserted into the 

1061 `~lsst.daf.butler.Registry` before the exposure records, because they 

1062 are dependencies of the exposure. This allows an opportunity to satisfy 

1063 foreign key constraints that exist because of dimensions related to the 

1064 exposure. 

1065 

1066 This is a method that subclasses may want to customize, if they've 

1067 added dimensions that relate to an exposure. 

1068 

1069 Parameters 

1070 ---------- 

1071 obsInfo : `~astro_metadata_translator.ObservationInfo` 

1072 Observation details for (one of the components of) the exposure. 

1073 universe : `lsst.daf.butler.DimensionUniverse` 

1074 Set of all known dimensions. 

1075 

1076 Returns 

1077 ------- 

1078 records : `dict` [`str`, `lsst.daf.butler.DimensionRecord`] 

1079 The records to insert, indexed by dimension name. 

1080 """ 

1081 records: dict[str, DimensionRecord] = {} 

1082 if "exposure" not in universe: 

1083 return records 

1084 exposure = universe["exposure"] 

1085 if "group" in exposure.implied: 

1086 records["group"] = universe["group"].RecordClass( 

1087 name=obsInfo.exposure_group, 

1088 instrument=obsInfo.instrument, 

1089 ) 

1090 if "day_obs" in exposure.implied: 

1091 if (offset := getattr(obsInfo, "observing_day_offset")) is not None: 

1092 offset_int = round(offset.to_value("s")) 

1093 assert obsInfo.observing_day is not None 

1094 timespan = Timespan.from_day_obs(obsInfo.observing_day, offset_int) 

1095 else: 

1096 timespan = None 

1097 records["day_obs"] = universe["day_obs"].RecordClass( 

1098 instrument=obsInfo.instrument, 

1099 id=obsInfo.observing_day, 

1100 timespan=timespan, 

1101 ) 

1102 return records 

1103 

1104 def expandDataIds(self, data: RawExposureData) -> RawExposureData: 

1105 """Expand the data IDs associated with a raw exposure. 

1106 

1107 This adds the metadata records. 

1108 

1109 Parameters 

1110 ---------- 

1111 data : `RawExposureData` 

1112 A structure containing information about the exposure to be 

1113 ingested. Must have `RawExposureData.record` populated. Should 

1114 be considered consumed upon return. 

1115 

1116 Returns 

1117 ------- 

1118 exposure : `RawExposureData` 

1119 An updated version of the input structure, with 

1120 `RawExposureData.dataId` and nested `RawFileDatasetInfo.dataId` 

1121 attributes updated to data IDs for which 

1122 `~lsst.daf.butler.DataCoordinate.hasRecords` returns `True`. 

1123 """ 

1124 # We start by expanding the exposure-level data ID; we won't use that 

1125 # directly in file ingest, but this lets us do some database lookups 

1126 # once per exposure instead of once per file later. 

1127 data.dataId = self.butler.registry.expandDataId( 

1128 data.dataId, 

1129 # We pass in the records we'll be inserting shortly so they aren't 

1130 # looked up from the database. We do expect instrument and filter 

1131 # records to be retrieved from the database here (though the 

1132 # Registry may cache them so there isn't a lookup every time). 

1133 records={"exposure": data.record, **data.dependencyRecords}, 

1134 ) 

1135 # Now we expand the per-file (exposure+detector) data IDs. This time 

1136 # we pass in the records we just retrieved from the exposure data ID 

1137 # expansion. 

1138 for file in data.files: 

1139 for dataset in file.datasets: 

1140 dataset.dataId = self.butler.registry.expandDataId( 

1141 dataset.dataId, 

1142 records={k: data.dataId.records[k] for k in data.dataId.dimensions.elements}, 

1143 ) 

1144 return data 

1145 

1146 def prep( 

1147 self, 

1148 files: Iterable[ResourcePath], 

1149 *, 

1150 pool: concurrent.futures.ThreadPoolExecutor | None = None, 

1151 search_indexes: bool = True, 

1152 ) -> tuple[Iterator[RawExposureData], list[ResourcePath]]: 

1153 """Perform all non-database-updating ingest preprocessing steps. 

1154 

1155 Parameters 

1156 ---------- 

1157 files : iterable of `lsst.resources.ResourcePath` 

1158 Paths to the files to be ingested. Will be made absolute 

1159 if they are not already. 

1160 pool : `concurrent.futures.ThreadPoolExecutor`, optional 

1161 If not `None`, a thread pool with which to parallelize some 

1162 operations. 

1163 search_indexes : `bool`, optional 

1164 If `True` the code will search for index JSON files in given 

1165 directories. If you know for a fact that index files do not exist 

1166 set this to `False` for a slight speed up in metadata gathering. 

1167 

1168 Returns 

1169 ------- 

1170 exposures : `~collections.abc.Iterator` [ `RawExposureData` ] 

1171 Data structures containing dimension records, filenames, and data 

1172 IDs to be ingested (one structure for each exposure). 

1173 bad_files : `list` of `lsst.resources.ResourcePath` 

1174 List of all the files that could not have metadata extracted. 

1175 """ 

1176 

1177 def _partition_good_bad( 

1178 file_data: Iterable[RawFileData], 

1179 ) -> tuple[list[RawFileData], list[ResourcePath]]: 

1180 """Filter out bad files and return good with list of bad.""" 

1181 good_files = [] 

1182 bad_files = [] 

1183 for fileDatum in self.progress.wrap(file_data, desc="Reading image metadata"): 

1184 if not fileDatum.datasets: 

1185 bad_files.append(fileDatum.filename) 

1186 else: 

1187 good_files.append(fileDatum) 

1188 return good_files, bad_files 

1189 

1190 # Look for zip files. 

1191 zip_file_data, files, good_zip_files, bad_zip_files = self.readZipIndexFiles(files) 

1192 if bad_zip_files: 

1193 self.log.info("Failed to extract index metadata from the following zip files:") 

1194 for bad in sorted(bad_zip_files): 

1195 self.log.info("- %s", bad) 

1196 

1197 # Look for index files and read them. 

1198 # There should be far fewer index files than data files. 

1199 index_entries: dict[ResourcePath, Any] = {} 

1200 if search_indexes: 

1201 index_entries, files, good_index_files, bad_index_files = self.locateAndReadIndexFiles(files) 

1202 if bad_index_files: 

1203 self.log.info("Failed to read the following explicitly requested index files:") 

1204 for bad in sorted(bad_index_files): 

1205 self.log.info("- %s", bad) 

1206 else: 

1207 # We have been told explicitly there are no indexes. 

1208 index_entries = {} 

1209 good_index_files = set() 

1210 bad_index_files = set() 

1211 

1212 # Merge information from zips and standalone index files. 

1213 good_index_files.update(good_zip_files) 

1214 bad_index_files.update(bad_zip_files) 

1215 

1216 # Now convert all the index file entries to standard form for ingest. 

1217 processed_bad_index_files: list[ResourcePath] = [] 

1218 indexFileData = self.processIndexEntries(index_entries) 

1219 indexFileData.extend(zip_file_data) 

1220 if indexFileData: 

1221 indexFileData, processed_bad_index_files = _partition_good_bad(indexFileData) 

1222 self.log.info( 

1223 "Successfully extracted metadata for %d file%s found in %d index file%s with %d failure%s", 

1224 *_log_msg_counter(indexFileData), 

1225 *_log_msg_counter(good_index_files), 

1226 *_log_msg_counter(processed_bad_index_files), 

1227 ) 

1228 

1229 # Extract metadata and build per-detector regions. 

1230 # This could run in threads or a subprocess so collect all output 

1231 # before looking at failures. 

1232 fileData: Iterator[RawFileData] 

1233 if pool is None: 

1234 fileData = map(self.extractMetadata, files) 

1235 else: 

1236 fileData = pool.map(self.extractMetadata, files) 

1237 

1238 # Filter out all the failed reads and store them for later 

1239 # reporting. 

1240 good_file_data, bad_files = _partition_good_bad(fileData) 

1241 # Only report if we looked at any standalone files at all. 

1242 if files: 

1243 self.log.info( 

1244 "Successfully extracted metadata from %d file%s with %d failure%s", 

1245 *_log_msg_counter(good_file_data), 

1246 *_log_msg_counter(bad_files), 

1247 ) 

1248 

1249 # Combine with data from index files. 

1250 good_file_data.extend(indexFileData) 

1251 bad_files.extend(processed_bad_index_files) 

1252 bad_files.extend(bad_index_files) 

1253 

1254 # Use that metadata to group files (and extracted metadata) by 

1255 # exposure. Never parallelized because it's intrinsically a gather 

1256 # step. 

1257 exposureData: list[RawExposureData] = self.groupByExposure(good_file_data) 

1258 

1259 # The next operation operates on RawExposureData instances (one at 

1260 # a time) in-place and then returns the modified instance. We call it 

1261 # as a pass-through instead of relying on the arguments we pass in to 

1262 # have been modified because in the parallel case those arguments are 

1263 # going to be pickled and unpickled, and I'm not certain 

1264 # multiprocessing is careful enough with that for output arguments to 

1265 # work. 

1266 

1267 # Expand the data IDs to include all dimension metadata; we need this 

1268 # because we may need to generate path templates that rely on that 

1269 # metadata. 

1270 # This is the first step that involves actual database calls (but just 

1271 # SELECTs), so if there's going to be a problem with connections vs. 

1272 # multiple processes, or lock contention (in SQLite) slowing things 

1273 # down, it'll happen here. 

1274 return map(self.expandDataIds, exposureData), bad_files 

1275 

1276 def ingestExposureDatasets( 

1277 self, 

1278 exposure: RawExposureData, 

1279 datasetType: DatasetType, 

1280 *, 

1281 run: str, 

1282 skip_existing_exposures: bool = False, 

1283 track_file_attrs: bool = True, 

1284 ) -> list[FileDataset]: 

1285 """Ingest all raw files in one exposure. 

1286 

1287 Parameters 

1288 ---------- 

1289 exposure : `RawExposureData` 

1290 A structure containing information about the exposure to be 

1291 ingested. Must have `RawExposureData.record` populated and all 

1292 data ID attributes expanded. 

1293 datasetType : `lsst.daf.butler.DatasetType` 

1294 The dataset type associated with this exposure. 

1295 run : `str` 

1296 Name of a RUN-type collection to write to. 

1297 skip_existing_exposures : `bool`, optional 

1298 If `True` (`False` is default), skip raws that have already been 

1299 ingested (i.e. raws for which we already have a dataset with the 

1300 same data ID in the target collection, even if from another file). 

1301 Note that this is much slower than just not passing 

1302 already-ingested files as inputs, because we still need to read and 

1303 process metadata to identify which exposures to search for. 

1304 track_file_attrs : `bool`, optional 

1305 Control whether file attributes such as the size or checksum should 

1306 be tracked by the datastore. Whether this parameter is honored 

1307 depends on the specific datastore implementation. 

1308 

1309 Returns 

1310 ------- 

1311 datasets : `list` of `lsst.daf.butler.FileDataset` 

1312 Per-file structures identifying the files ingested and their 

1313 dataset representation in the data repository. 

1314 """ 

1315 # Raw files are preferentially ingested using a UUID derived from 

1316 # the collection name and dataId. 

1317 if self.butler.registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN): 

1318 mode = DatasetIdGenEnum.DATAID_TYPE_RUN 

1319 else: 

1320 mode = DatasetIdGenEnum.UNIQUE 

1321 

1322 # The datasets for this exposure could all be from a single zip 

1323 # or be distinct files. Need to pull out the zip files. 

1324 zips: dict[ResourcePath, list[RawFileData]] = defaultdict(list) 

1325 datasets: list[FileDataset] = [] 

1326 for file in exposure.files: 

1327 if file.filename.getExtension() == ".zip": 

1328 zips[file.filename.replace(fragment="")].append(file) 

1329 continue 

1330 

1331 refs = [ 

1332 DatasetRef(datasetType, d.dataId, run=run, id_generation_mode=mode) for d in file.datasets 

1333 ] 

1334 if refs: 

1335 datasets.append( 

1336 FileDataset(path=file.filename.abspath(), refs=refs, formatter=file.FormatterClass) 

1337 ) 

1338 

1339 if datasets: 

1340 with self.butler.record_metrics() as butler_metrics: 

1341 self.butler.ingest( 

1342 *datasets, 

1343 transfer=self.config.transfer, 

1344 record_validation_info=track_file_attrs, 

1345 skip_existing=skip_existing_exposures, 

1346 ) 

1347 self.metrics.time_for_ingest += butler_metrics.time_in_ingest 

1348 

1349 # In theory it is possible for the new Data IDs to differ from the Data 

1350 # IDs stored in the Zip index. That could happen if there is a metadata 

1351 # correction that changes the exposure or detector numbers. We have to 

1352 # assume that by the time the zip has been made that this correction 

1353 # has been applied. If we don't assume that then we have to 

1354 # regenerate the index but we cannot change the contents of the zip. 

1355 # We would also need the ability for butler.ingest_zip to take an 

1356 # override ZipIndex object. 

1357 # The Dataset ref IDs will only change if the data IDs change. 

1358 for zip, files in zips.items(): 

1359 zip_datasets: list[FileDataset] = [] # Needed for return value. 

1360 for file in files: 

1361 refs = [ 

1362 DatasetRef(datasetType, d.dataId, run=run, id_generation_mode=mode) for d in file.datasets 

1363 ] 

1364 if refs: 

1365 # Assumes the guiders are not included in the metadata 

1366 # index. 

1367 zip_datasets.append( 

1368 FileDataset(path=file.filename.abspath(), refs=refs, formatter=file.FormatterClass) 

1369 ) 

1370 with self.butler.record_metrics() as butler_metrics: 

1371 self.butler.ingest_zip( 

1372 zip, transfer=self.config.transfer, skip_existing=skip_existing_exposures 

1373 ) 

1374 datasets.extend(zip_datasets) 

1375 self.metrics.time_for_ingest += butler_metrics.time_in_ingest 

1376 

1377 return datasets 

1378 

1379 def ingestFiles( 

1380 self, 

1381 files: Sequence[ResourcePath], 

1382 *, 

1383 pool: concurrent.futures.ThreadPoolExecutor | None = None, 

1384 num_workers: int = 1, 

1385 run: str | None = None, 

1386 skip_existing_exposures: bool = False, 

1387 update_exposure_records: bool = False, 

1388 track_file_attrs: bool = True, 

1389 search_indexes: bool = True, 

1390 skip_ingest: bool = False, 

1391 ) -> tuple[list[DatasetRef], list[ResourcePath], int, int, int]: 

1392 """Ingest files into a Butler data repository. 

1393 

1394 This creates any new exposure or visit Dimension entries needed to 

1395 identify the ingested files, creates new Dataset entries in the 

1396 Registry and finally ingests the files themselves into the Datastore. 

1397 Any needed instrument, detector, and physical_filter Dimension entries 

1398 must exist in the Registry before `run` is called. 

1399 

1400 Parameters 

1401 ---------- 

1402 files : iterable over `lsst.resources.ResourcePath` 

1403 URIs to the files to be ingested. 

1404 pool : `concurrent.futures.ThreadPoolExecutor`, optional 

1405 If not `None`, a thread pool with which to parallelize some 

1406 operations. 

1407 num_workers : `int`, optional 

1408 The number of workers to use. Ignored if ``pool`` is not `None`. 

1409 run : `str`, optional 

1410 Name of a RUN-type collection to write to, overriding 

1411 the default derived from the instrument name. 

1412 skip_existing_exposures : `bool`, optional 

1413 If `True` (`False` is default), skip raws that have already been 

1414 ingested (i.e. raws for which we already have a dataset with the 

1415 same data ID in the target collection, even if from another file). 

1416 Note that this is much slower than just not passing 

1417 already-ingested files as inputs, because we still need to read and 

1418 process metadata to identify which exposures to search for. It 

1419 also will not work reliably if multiple processes are attempting to 

1420 ingest raws from the same exposure concurrently, in that different 

1421 processes may still attempt to ingest the same raw and conflict, 

1422 causing a failure that prevents other raws from the same exposure 

1423 from being ingested. 

1424 update_exposure_records : `bool`, optional 

1425 If `True` (`False` is default), update existing exposure records 

1426 that conflict with the new ones instead of rejecting them. THIS IS 

1427 AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS 

1428 KNOWN TO BE BAD. This should usually be combined with 

1429 ``skip_existing_exposures=True``. 

1430 track_file_attrs : `bool`, optional 

1431 Control whether file attributes such as the size or checksum should 

1432 be tracked by the datastore. Whether this parameter is honored 

1433 depends on the specific datastore implementation. 

1434 search_indexes : `bool`, optional 

1435 If `True` the code will search for index JSON files in given 

1436 directories. If you know for a fact that index files do not exist 

1437 set this to `False` for a slight speed up in metadata gathering. 

1438 skip_ingest : `bool`, optional 

1439 Set this to `True` to do metadata extraction and dimension record 

1440 updates without attempting to re-ingest. This can be useful if 

1441 there has been a metadata correction associated with an exposure. 

1442 

1443 Returns 

1444 ------- 

1445 refs : `list` of `lsst.daf.butler.DatasetRef` 

1446 Dataset references for ingested raws. 

1447 bad_files : `list` of `lsst.resources.ResourcePath` 

1448 Given paths that could not be ingested. 

1449 n_exposures : `int` 

1450 Number of exposures successfully ingested. 

1451 n_exposures_failed : `int` 

1452 Number of exposures that failed when inserting dimension data. 

1453 n_ingests_failed : `int` 

1454 Number of exposures that failed when ingesting raw datasets. 
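
        Examples
        --------
        A minimal sketch of calling this method directly; the repository
        path and raw file location are placeholders, and a writeable butler
        with the instrument already registered is assumed::

            from lsst.daf.butler import Butler
            from lsst.obs.base import RawIngestConfig, RawIngestTask
            from lsst.resources import ResourcePath

            butler = Butler("/repo", writeable=True)
            task = RawIngestTask(config=RawIngestConfig(), butler=butler)
            files = [ResourcePath("file:///data/raw_0001.fits")]
            refs, bad, n_ok, n_dim_fail, n_ingest_fail = task.ingestFiles(files)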

1455 """ 

1456 created_pool = False 

1457 if pool is None and num_workers > 1: 

1458 pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) 

1459 created_pool = True 

1460 

1461 try: 

1462 with self.metrics.collect_metric( 

1463 "time_for_metadata", 

1464 self.log, 

1465 msg="Reading metadata from %d file%s", 

1466 args=(*_log_msg_counter(files),), 

1467 ): 

1468 exposureData, bad_files = self.prep(files, pool=pool, search_indexes=search_indexes) 

1469 finally: 

1470 if created_pool and pool: 

1471 # The pool is no longer needed, so shut it down if we created 

1472 # it, to ensure resources are cleaned up. 

1473 pool.shutdown(wait=True) 

1474 

1475 # Up to this point, we haven't modified the data repository at all. 

1476 # Now we finally do that, with one transaction per exposure. This is 

1477 # not parallelized at present because the performance of this step is 

1478 # limited by the database server. That may or may not change in the 

1479 # future once we increase our usage of bulk inserts and reduce our 

1480 # usage of savepoints; we've tried to get everything but the database 

1481 # operations done in advance to reduce the time spent inside 

1482 # transactions. 

1483 refs = [] 

1484 runs = set() 

1485 datasetTypes: dict[str, DatasetType] = {} 

1486 n_exposures = 0 

1487 n_exposures_failed = 0 

1488 n_ingests_failed = 0 

1489 for exposure in self.progress.wrap(exposureData, desc="Ingesting raw exposures"): 

1490 assert exposure.record is not None, "Should be guaranteed by prep()" 

1491 self.log.debug( 

1492 "Attempting to ingest %d file%s from exposure %s:%s", 

1493 *_log_msg_counter(exposure.files), 

1494 exposure.record.instrument, 

1495 exposure.record.obs_id, 

1496 ) 

1497 

1498 try: 

1499 with self.metrics.collect_metric( 

1500 "time_for_records", 

1501 self.log, 

1502 msg="Creating dimension records for instrument %s, exposure %s", 

1503 args=( 

1504 str(exposure.record.instrument), 

1505 str(exposure.record.id), 

1506 ), 

1507 ): 

1508 for name, record in exposure.dependencyRecords.items(): 

1509 self.butler.registry.syncDimensionData(name, record, update=update_exposure_records) 

1510 inserted_or_updated = self.butler.registry.syncDimensionData( 

1511 "exposure", 

1512 exposure.record, 

1513 update=update_exposure_records, 

1514 ) 
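                    # syncDimensionData returns False when a matching record
                    # already existed unchanged; any other value means it was
                    # inserted or updated, so call the exposure-record handler.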

1515 if inserted_or_updated is not False: 

1516 with self.metrics.collect_metric( 

1517 "time_for_callbacks", log=self.log, msg="Exposure record updated. Calling handler" 

1518 ): 

1519 self._on_exposure_record(exposure.record) 

1520 except Exception as e: 

1521 self._on_ingest_failure(exposure, e) 

1522 n_exposures_failed += 1 

1523 self.log.warning( 

1524 "Exposure %s:%s could not be registered: %s", 

1525 exposure.record.instrument, 

1526 exposure.record.obs_id, 

1527 e, 

1528 ) 

1529 if self.config.failFast: 

1530 raise e 

1531 continue 

1532 

1533 if isinstance(inserted_or_updated, dict): 

1534 # Exposure is in the registry and we updated it, so 

1535 # syncDimensionData returned a dict. 

1536 columns_updated = list(inserted_or_updated.keys()) 

1537 s_col = "s" if len(columns_updated) != 1 else "" 

1538 w_col = "were" if len(columns_updated) != 1 else "was" 

1539 self.log.info( 

1540 "Exposure %s:%s was already present, but column%s %s %s updated.", 

1541 exposure.record.instrument, 

1542 exposure.record.obs_id, 

1543 s_col, 

1544 ", ".join(repr(c) for c in columns_updated), 

1545 w_col, 

1546 ) 

1547 

1548 if skip_ingest: 

1549 continue 

1550 

1551 # Determine the instrument so we can work out the dataset type. 

1552 instrument = exposure.files[0].instrument 

1553 assert instrument is not None, ( 

1554 "file should have been removed from this list by prep if instrument could not be found" 

1555 ) 

1556 

1557 datasetType = self.get_raw_datasetType(instrument, datasetTypes) 

1558 if datasetType.name not in datasetTypes: 

1559 self.butler.registry.registerDatasetType(datasetType) 

1560 datasetTypes[datasetType.name] = datasetType 

1561 

1562 # Use the instrument's default raw run unless one was specified explicitly. 

1563 if run is None: 

1564 this_run = instrument.makeDefaultRawIngestRunName() 

1565 else: 

1566 this_run = run 
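            # Register the RUN collection only the first time this run name is
            # encountered in this call.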

1567 if this_run not in runs: 

1568 self.butler.registry.registerCollection(this_run, type=CollectionType.RUN) 

1569 runs.add(this_run) 

1570 try: 

1571 datasets_for_exposure = self.ingestExposureDatasets( 

1572 exposure, 

1573 datasetType=datasetType, 

1574 run=this_run, 

1575 skip_existing_exposures=skip_existing_exposures, 

1576 track_file_attrs=track_file_attrs, 

1577 ) 

1578 except Exception as e: 

1579 self._on_ingest_failure(exposure, e) 

1580 n_ingests_failed += 1 

1581 self.log.warning("Failed to ingest the following for reason: %s", e) 

1582 for f in exposure.files: 

1583 self.log.warning("- %s", f.filename) 

1584 if self.config.failFast: 

1585 raise e 

1586 continue 

1587 else: 

1588 with self.metrics.collect_metric("time_for_callbacks", self.log, msg="Calling on_success"): 

1589 self._on_success(datasets_for_exposure) 

1590 for dataset in datasets_for_exposure: 

1591 refs.extend(dataset.refs) 

1592 

1593 # Success for this exposure. 

1594 n_exposures += 1 

1595 self.log.info( 

1596 "Exposure %s:%s ingested successfully", exposure.record.instrument, exposure.record.obs_id 

1597 ) 

1598 

1599 return refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed 

1600 

1601 @timeMethod 

1602 def run( 

1603 self, 

1604 files: Iterable[ResourcePathExpression], 

1605 *, 

1606 pool: concurrent.futures.ThreadPoolExecutor | None = None, 

1607 processes: int | None = None, # Deprecated. Use num_workers. 

1608 run: str | None = None, 

1609 file_filter: str | re.Pattern = r"\.fit[s]?\b", 

1610 group_files: bool = True, 

1611 skip_existing_exposures: bool = False, 

1612 update_exposure_records: bool = False, 

1613 track_file_attrs: bool = True, 

1614 search_indexes: bool = True, 

1615 num_workers: int = 1, 

1616 skip_ingest: bool = False, 

1617 ) -> list[DatasetRef]: 

1618 """Ingest files into a Butler data repository. 

1619 

1620 This creates any new exposure or visit Dimension entries needed to 

1621 identify the ingested files, creates new Dataset entries in the 

1622 Registry and finally ingests the files themselves into the Datastore. 

1623 Any needed instrument, detector, and physical_filter Dimension entries 

1624 must exist in the Registry before `run` is called. 

1625 

1626 Parameters 

1627 ---------- 

1628 files : iterable of `lsst.resources.ResourcePath`, `str` or path-like 

1629 Paths to the files to be ingested. Can refer to directories. 

1630 Will be made absolute if they are not already. 

1631 pool : `concurrent.futures.ThreadPoolExecutor`, optional 

1632 If not `None`, a thread pool with which to parallelize some 

1633 operations. This parameter previously accepted a `multiprocessing.Pool`, 

1634 but that option is no longer supported because it is slow compared 

1635 to using futures. 

1636 processes : `int`, optional 

1637 The number of processes to use. Ignored if ``pool`` is not `None`. 

1638 Deprecated. Please use the ``num_workers`` parameter instead. 

1639 run : `str`, optional 

1640 Name of a RUN-type collection to write to, overriding 

1641 the default derived from the instrument name. 

1642 file_filter : `str` or `re.Pattern`, optional 

1643 Pattern to use to discover files to ingest within directories. 

1644 The default is to search for FITS files. The regex applies to 

1645 files within the directory. 

1646 group_files : `bool`, optional 

1647 Group files by directory if they have been discovered in 

1648 directories. Will not affect files explicitly provided. 

1649 skip_existing_exposures : `bool`, optional 

1650 If `True` (`False` is default), skip raws that have already been 

1651 ingested (i.e. raws for which we already have a dataset with the 

1652 same data ID in the target collection, even if from another file). 

1653 Note that this is much slower than just not passing 

1654 already-ingested files as inputs, because we still need to read and 

1655 process metadata to identify which exposures to search for. It 

1656 also will not work reliably if multiple processes are attempting to 

1657 ingest raws from the same exposure concurrently, in that different 

1658 processes may still attempt to ingest the same raw and conflict, 

1659 causing a failure that prevents other raws from the same exposure 

1660 from being ingested. 

1661 update_exposure_records : `bool`, optional 

1662 If `True` (`False` is default), update existing exposure records 

1663 that conflict with the new ones instead of rejecting them. THIS IS 

1664 AN ADVANCED OPTION THAT SHOULD ONLY BE USED TO FIX METADATA THAT IS 

1665 KNOWN TO BE BAD. This should usually be combined with 

1666 ``skip_existing_exposures=True``. 

1667 track_file_attrs : `bool`, optional 

1668 Control whether file attributes such as the size or checksum should 

1669 be tracked by the datastore. Whether this parameter is honored 

1670 depends on the specific datastore implementation. 

1671 search_indexes : `bool`, optional 

1672 If `True`, the code will search for index JSON files in the given 

1673 directories. If you know for a fact that index files do not exist, 

1674 set this to `False` for a slight speed-up in metadata gathering. 

1675 num_workers : `int`, optional 

1676 The number of workers to use. Ignored if the ``pool`` parameter is 

1677 given. 

1678 skip_ingest : `bool`, optional 

1679 Set this to `True` to do metadata extraction and dimension record 

1680 updates without attempting to re-ingest. This can be useful if 

1681 there has been a metadata correction associated with an exposure. 

1682 

1683 Returns 

1684 ------- 

1685 refs : `list` of `lsst.daf.butler.DatasetRef` 

1686 Dataset references for ingested raws. 

1687 

1688 Notes 

1689 ----- 

1690 This method inserts all datasets for an exposure within a transaction, 

1691 guaranteeing that partial exposures are never ingested. The exposure 

1692 dimension record is inserted with 

1693 `lsst.daf.butler.Registry.syncDimensionData` first (in its own 

1694 transaction), which inserts only if a record with the same 

1695 primary key does not already exist. This allows different files within 

1696 the same exposure to be ingested in different runs. 
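
        Examples
        --------
        A typical invocation, sketched with placeholder paths and assuming a
        writeable repository whose instrument, detector, and physical_filter
        records have already been registered::

            from lsst.daf.butler import Butler
            from lsst.obs.base import RawIngestConfig, RawIngestTask

            butler = Butler("/repo", writeable=True)
            config = RawIngestConfig()
            config.transfer = "symlink"
            task = RawIngestTask(config=config, butler=butler)
            refs = task.run(["/data/raws/night1/"])

        To apply a metadata correction without re-ingesting any files, the
        same call can be made with ``skip_ingest=True`` together with
        ``update_exposure_records=True`` and ``skip_existing_exposures=True``.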

1697 """ 

1698 if pool and not isinstance(pool, concurrent.futures.ThreadPoolExecutor): 

1699 raise ValueError(f"The 'pool' argument must now be a ThreadPoolExecutor but was given {pool}.") 

1700 

1701 if processes is not None: 

1702 warnings.warn( 

1703 "Processes parameter is deprecated. Please use num_workers parameter.", 

1704 FutureWarning, 

1705 stacklevel=3, # Jump above the timeMethod wrapper. 

1706 ) 

1707 num_workers = processes 

1708 

1709 refs = [] 

1710 bad_files = [] 

1711 n_exposures = 0 

1712 n_exposures_failed = 0 

1713 n_ingests_failed = 0 

1714 self.metrics.reset() # Clear previous metrics. 

1715 ingest_duration = 0.0 

1716 if group_files: 

1717 with time_this(log=self.log, msg="Processing ingest groups") as timer: 

1718 for group in ResourcePath.findFileResources(files, file_filter, group_files): 

1719 new_refs, bad, n_exp, n_exp_fail, n_ingest_fail = self.ingestFiles( 

1720 tuple(group), 

1721 pool=pool, 

1722 num_workers=num_workers, 

1723 run=run, 

1724 skip_existing_exposures=skip_existing_exposures, 

1725 update_exposure_records=update_exposure_records, 

1726 track_file_attrs=track_file_attrs, 

1727 search_indexes=search_indexes, 

1728 skip_ingest=skip_ingest, 

1729 ) 

1730 refs.extend(new_refs) 

1731 bad_files.extend(bad) 

1732 n_exposures += n_exp 

1733 n_exposures_failed += n_exp_fail 

1734 n_ingests_failed += n_ingest_fail 

1735 ingest_duration = timer.duration 

1736 else: 

1737 with time_this(log=self.log, msg="Ingesting all files in one batch") as timer: 

1738 refs, bad_files, n_exposures, n_exposures_failed, n_ingests_failed = self.ingestFiles( 

1739 tuple(ResourcePath.findFileResources(files, file_filter, group_files)), 

1740 pool=pool, 

1741 num_workers=num_workers, 

1742 run=run, 

1743 skip_existing_exposures=skip_existing_exposures, 

1744 update_exposure_records=update_exposure_records, 

1745 track_file_attrs=track_file_attrs, 

1746 search_indexes=search_indexes, 

1747 skip_ingest=skip_ingest, 

1748 ) 

1749 ingest_duration = timer.duration 

1750 

1751 had_failure = False 

1752 

1753 if bad_files: 

1754 had_failure = True 

1755 self.log.warning("Could not extract observation metadata from the following:") 

1756 for f in bad_files: 

1757 self.log.warning("- %s", f) 

1758 

1759 if skip_ingest: 

1760 ingest_text = "" 

1761 else: 

1762 ingest_text = f" - time in butler ingest: {self.metrics.time_for_ingest} s\n" 

1763 

1764 self.log.info( 

1765 "Successfully processed data from %d exposure%s with %d failure%s from exposure" 

1766 " registration and %d failure%s from file ingest.\n" 

1767 "Timing breakdown:\n" 

1768 " - time in metadata gathering: %f s\n" 

1769 " - time in dimension record writing: %f s\n" 

1770 "%s" 

1771 " - time in user-supplied callbacks: %f s\n", 

1772 *_log_msg_counter(n_exposures), 

1773 *_log_msg_counter(n_exposures_failed), 

1774 *_log_msg_counter(n_ingests_failed), 

1775 self.metrics.time_for_metadata, 

1776 self.metrics.time_for_records, 

1777 ingest_text, 

1778 self.metrics.time_for_callbacks, 

1779 ) 

1780 if n_exposures_failed > 0 or n_ingests_failed > 0: 

1781 had_failure = True 

1782 if not skip_ingest: 

1783 self.log.info( 

1784 "Ingested %d distinct Butler dataset%s in %f sec", *_log_msg_counter(refs), ingest_duration 

1785 ) 

1786 

1787 if had_failure: 

1788 raise RuntimeError("Some failures encountered during ingestion") 

1789 

1790 return refs