Coverage for python / lsst / obs / lsst / _ingestAuxCalibs.py: 18%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:49 +0000

1# This file is part of obs_lsst. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

# Public API of this module: the concrete ingest configs and tasks.
__all__ = (
    "PhotodiodeIngestConfig",
    "PhotodiodeIngestTask",
    "ShutterMotionOpenIngestConfig",
    "ShutterMotionOpenIngestTask",
    "ShutterMotionCloseIngestConfig",
    "ShutterMotionCloseIngestTask",
)

24 

25 

26from lsst.daf.butler import ( 

27 CollectionType, 

28 DataCoordinate, 

29 DatasetIdGenEnum, 

30 DatasetRef, 

31 DatasetType, 

32 FileDataset, 

33 Progress, 

34) 

35from lsst.ip.isr import PhotodiodeCalib, ShutterMotionProfile 

36from lsst.obs.base import makeTransferChoiceField 

37from lsst.obs.base.formatters.fitsGeneric import FitsGenericFormatter 

38from lsst.pex.config import Config, Field 

39from lsst.pipe.base import Task 

40from lsst.resources import ResourcePath 

41 

42 

# Default regular expressions used as the ``file_filter`` argument to
# ``ResourcePath.findFileResources`` when selecting files to ingest.  The
# dot before each file extension is escaped so it matches only a literal
# ".", e.g. "x_photodiode.ecsv" matches but "x_photodiodeXecsv" does not.
DEFAULT_PHOTODIODE_REGEX = r"Photodiode_Readings.*\.txt$|_photodiode\.ecsv$|Electrometer.*\.fits$|EM.*\.fits$"
DEFAULT_SHUTTER_OPEN_REGEX = r".*shutterMotionProfileOpen\.json$"
DEFAULT_SHUTTER_CLOSE_REGEX = r".*shutterMotionProfileClose\.json$"

46 

47 

48# Base class begin. 

class IsrCalibIngestConfig(Config):
    """Configuration for the generic IsrCalib ingestion task."""

    transfer = makeTransferChoiceField(default="copy")

    forceCopyOnly = Field(
        dtype=bool,
        default=True,
        doc="Should this ingest force transfer to be copy, to ensure the calib is rewritten?",
    )
    doRaiseOnMissingExposure = Field(
        dtype=bool,
        default=True,
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
    )

    def validate(self):
        """Check the configuration for consistency.

        Raises
        ------
        ValueError
            Raised if ``forceCopyOnly`` is set while ``transfer`` is
            anything other than ``"copy"``.
        """
        super().validate()
        copyRequired = self.forceCopyOnly
        isCopy = self.transfer == "copy"
        if copyRequired and not isCopy:
            raise ValueError(f"Transfer must be 'copy' for this data: {self.transfer}")

68 

69 

class IsrCalibIngestTask(Task):
    """Base task to ingest data convertible to an IsrCalib into a butler
    repository.

    Subclasses must implement `getDatasetType`, `getDestinationCollection`,
    `readCalibFromFile` and `getAssociationInfo`; `run` drives the ingest.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance.  Datasets are written to the RUN
        collection passed to `run`, or to the collection returned by
        `getDestinationCollection` if none is passed.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these datasets are from.
    config : `IsrCalibIngestConfig`, optional
        Configuration for the task.  A default ``ConfigClass`` instance is
        used if `None`.
    **kwargs
        Additional keyword arguments passed to `lsst.pipe.base.Task`.
    """

    ConfigClass = IsrCalibIngestConfig
    _DefaultName = "genericIsrIngest"

    def __init__(self, butler, instrument, config=None, **kwargs):
        # Build the default config here so that ``config=None`` can be
        # validated instead of failing with an AttributeError.
        if config is None:
            config = self.ConfigClass()
        config.validate()
        super().__init__(config, **kwargs)
        self.butler = butler
        # getDatasetType() implementations use self.universe, so it must be
        # set before the dataset type is built.
        self.universe = self.butler.dimensions
        self.datasetType = self.getDatasetType()
        self.progress = Progress(self.log.name)
        self.instrument = instrument
        self.camera = self.instrument.getCamera()

    def getDatasetType(self):
        """Return the DatasetType to be ingested.

        Returns
        -------
        datasetType : `lsst.daf.butler.DatasetType`
            The datasetType for the ingested data.
        """
        raise NotImplementedError(
            "Subclasses must define their datasetType."
        )

    def getDestinationCollection(self):
        """Return the collection that these datasets will be ingested to.

        Returns
        -------
        collectionName : `str`
            The collection the data will be ingested to.
        """
        raise NotImplementedError(
            "Subclasses must define their target collection."
        )

    def readCalibFromFile(self, inputFile):
        """Read the inputFile, and determine its calibration type and read it.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to be read to check ingestibility.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib` or `None`
            The appropriately subclassed implementation for this
            calibration type.
        calibType : `str`
            The calibration type/version name.
        """
        raise NotImplementedError(
            "Subclasses must define how to read their datasets."
        )

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Determine the information needed to create a dataId for this
        dataset.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file containing the dataset. Used for log messages.
        calib : `lsst.ip.isr.IsrCalib`
            The calibration dataset to study.
        calibType : `str`
            The calibration type/version name.

        Returns
        -------
        instrumentName : `str`
            Instrument this dataset belongs to.
        whereClause : `str` or `None`
            Butler query "where" that will find the exposure with
            matching dataId.  `None` means the file cannot be associated
            and will be skipped.
        binding : `dict` [`str`: `str`]
            Binding values for ``whereClause``.
        logId : `str`
            A string (or dataset convertible to string) to be used in
            downstream logs.
        """
        raise NotImplementedError(
            "Subclasses must define how to associate their datasets."
        )

    def run(self, locations, run=None,
            file_filter=DEFAULT_PHOTODIODE_REGEX,
            track_file_attrs=None):
        """Ingest calibration data into a Butler data repository.

        Parameters
        ----------
        locations : iterable over `lsst.resources.ResourcePath` or `str`
            Candidate files and/or directories, passed to
            `lsst.resources.ResourcePath.findFileResources`.
        run : `str`, optional
            Name of the RUN-type collection to write to, overriding the
            default returned by `getDestinationCollection`.
        file_filter : `str`, optional
            Regular expression used by ``findFileResources`` to select
            files.  Defaults to ``DEFAULT_PHOTODIODE_REGEX``; tasks that
            ingest other data should pass a matching filter.
        track_file_attrs : `bool`, optional
            Control whether file attributes such as the size or
            checksum should be tracked by the datastore. Whether
            this parameter is honored depends on the specific
            datastore implementation.

        Returns
        -------
        refs : `list` [`lsst.daf.butler.DatasetRef`]
            Dataset references for the datasets ingested by this call.

        Raises
        ------
        RuntimeError
            Raised after all files have been processed if any file matched
            more than one exposure, or if any file matched no exposure and
            ``config.doRaiseOnMissingExposure`` is `True`.  Files that were
            ingested successfully remain ingested.
        """
        files = ResourcePath.findFileResources(locations, file_filter)

        registry = self.butler.registry
        registry.registerDatasetType(self.datasetType)

        # Find and register the run that we will ingest to.
        if run is None:
            run = self.getDestinationCollection()
        registry.registerCollection(run, type=CollectionType.RUN)

        # Prefer dataset IDs derived from the dataset type, data ID and run
        # when the registry supports them.
        if registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN):
            mode = DatasetIdGenEnum.DATAID_TYPE_RUN
        else:
            mode = DatasetIdGenEnum.UNIQUE

        refs = []
        numExisting = 0
        numFailed = 0
        numSoftFailed = 0
        for inputFile in files:
            # Convert the file into the right class.
            calib, calibType = self.readCalibFromFile(inputFile)

            # Get the information needed to look up the matching exposure.
            instrumentName, whereClause, binding, logId = self.getAssociationInfo(
                inputFile, calib, calibType
            )
            if whereClause is None:
                self.log.warning("Skipping input file %s of unknown type.", inputFile)
                continue

            exposureRecords = list(
                registry.queryDimensionRecords(
                    "exposure", instrument=instrumentName, where=whereClause, bind=binding
                )
            )
            if not exposureRecords:
                numSoftFailed += 1
                self.log.warning("Skipping instrument %s and identifiers %s: no exposures found.",
                                 instrumentName, logId)
                continue
            if len(exposureRecords) > 1:
                numFailed += 1
                self.log.warning("Multiple exposure entries found for instrument %s and "
                                 "identifiers %s.", instrumentName, logId)
                continue

            exposureId = exposureRecords[0].id
            calib.updateMetadata(camera=self.camera, exposure=exposureId)

            # Generate the dataId for this file.
            dataId = DataCoordinate.standardize(
                instrument=self.instrument.getName(),
                exposure=exposureId,
                universe=self.universe,
            )

            # If this already exists in the run, skip it and continue.
            existing = list(registry.queryDatasets(self.datasetType, collections=[run], dataId=dataId))
            if existing:
                self.log.debug("Skipping instrument %s and identifiers %s: already exists in run %s.",
                               instrumentName, logId, run)
                numExisting += 1
                continue

            ref = DatasetRef(self.datasetType, dataId, run=run, id_generation_mode=mode)
            transfer = self.config.transfer
            if transfer == "copy":
                # Ingest must work from a file, but the original cannot be
                # used: metadata has been added and the data may have been
                # reformatted.  Write the updated calib to a temporary FITS
                # file and ingest (copy) that instead.
                with ResourcePath.temporary_uri(suffix=".fits") as tempFile:
                    calib.writeFits(tempFile.ospath)
                    dataset = FileDataset(path=tempFile, refs=ref, formatter=FitsGenericFormatter)
                    # No try, as if this fails, we should stop.
                    self.butler.ingest(dataset, transfer=transfer,
                                       record_validation_info=track_file_attrs)
            elif transfer == "direct":
                if self.config.forceCopyOnly:
                    # validate() rejects this combination, so this should
                    # be unreachable.
                    raise RuntimeError("Transfer mode 'direct' is incompatible with forceCopyOnly=True.")
                # NOTE(review): the original file is ingested unchanged, so
                # the metadata added by updateMetadata() above is not
                # persisted, and non-FITS inputs are still declared with
                # FitsGenericFormatter -- confirm this is intended.
                dataset = FileDataset(path=inputFile, refs=ref, formatter=FitsGenericFormatter)
                self.butler.ingest(dataset, transfer=transfer,
                                   record_validation_info=track_file_attrs)
            else:
                # Previously such files were skipped without any message.
                self.log.warning("Skipping input file %s: transfer mode %r is not supported.",
                                 inputFile, transfer)
                continue

            self.log.info("Dataset %s:%d (%s) ingested successfully", instrumentName, exposureId,
                          logId)
            refs.append(ref)

        if numExisting:
            self.log.warning("Skipped %d entries that already existed in run %s", numExisting, run)

        # Report every failure category in a single exception so that no
        # count is hidden by an earlier raise.
        problems = []
        if numSoftFailed:
            if self.config.doRaiseOnMissingExposure:
                problems.append(f"{numSoftFailed} entries due to missing exposure information")
            else:
                self.log.warning("Skipped %d entries that had no associated exposure", numSoftFailed)
        if numFailed:
            problems.append(f"{numFailed} entries due to multiple matching exposures")
        if problems:
            raise RuntimeError("Failed to ingest " + " and ".join(problems) + ".")

        return refs

326 

327 

328# Photodiode implementation begin. 

class PhotodiodeIngestConfig(IsrCalibIngestConfig):
    """Configuration for `PhotodiodeIngestTask`."""

    # Restated explicitly (same default as the base class) so that the
    # photodiode behavior can be contrasted with the shutter-motion config,
    # which defaults this to False.
    doRaiseOnMissingExposure = Field(
        dtype=bool,
        default=True,
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
    )

336 

337 

class PhotodiodeIngestTask(IsrCalibIngestTask):
    """Task to ingest photodiode data into a butler repository.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these photodiode datasets are from.
    config : `PhotodiodeIngestConfig`, optional
        Configuration for the task.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = PhotodiodeIngestConfig
    _DefaultName = "photodiodeIngest"

    def getDatasetType(self):
        """Return the ``photodiode`` dataset type (instrument, exposure)."""
        return DatasetType(
            "photodiode",
            ("instrument", "exposure"),
            "IsrCalib",
            universe=self.universe,
        )

    def getDestinationCollection(self):
        """Return the instrument's ``calib/photodiode`` run collection name."""
        return self.instrument.makeCollectionName("calib", "photodiode")

    @staticmethod
    def _readFits(path):
        # FITS is the 2025 standard; record the format version so that
        # getAssociationInfo can select the right parsing.
        calib = PhotodiodeCalib.readFits(path)
        fitsVersion = int(calib.getMetadata().get("FORMAT_V", 1))
        return calib, f"fits-v{fitsVersion:d}"

    @staticmethod
    def _readFullText(path):
        # "full" text files contain everything needed to build the calib.
        return PhotodiodeCalib.readText(path), "full"

    @staticmethod
    def _readTwoColumn(path):
        # The older two-column format.
        return PhotodiodeCalib.readTwoColumnPhotodiodeData(path), "two-column"

    def readCalibFromFile(self, inputFile):
        """Read a photodiode file, trying each supported format in turn.

        Formats are tried in the order FITS, full text, two-column text;
        the first one that reads successfully wins.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to read.

        Returns
        -------
        calib : `lsst.ip.isr.PhotodiodeCalib` or `None`
            The calibration, or `None` if no reader succeeded.
        calibType : `str`
            ``"fits-v<N>"``, ``"full"``, ``"two-column"``, or ``"Unknown"``.
        """
        for reader in (self._readFits, self._readFullText, self._readTwoColumn):
            try:
                with inputFile.as_local() as localFile:
                    return reader(localFile.ospath)
            except Exception as e:
                # Failing one format is expected; fall through to the next.
                self.log.debug("Could not read %s with %s: %s", inputFile, reader.__name__, e)
        return None, "Unknown"

    @staticmethod
    def _isValidGroupId(groupId):
        # A valid group is "{initial_group}#{unique identifier}" with
        # neither part blank.
        if not isinstance(groupId, str):
            return False
        parts = groupId.split("#")
        return len(parts) == 2 and all(parts)

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Build the exposure query for a photodiode calib.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file, used for log messages.
        calib : `lsst.ip.isr.PhotodiodeCalib`
            The calibration read by `readCalibFromFile`.
        calibType : `str`
            The type returned by `readCalibFromFile`.

        Returns
        -------
        instrumentName, whereClause, binding, logId
            See `IsrCalibIngestTask.getAssociationInfo`.  ``whereClause`` is
            `None` when the file cannot be associated with an exposure.
        """
        if calibType == "fits-v1":
            # The INSTRUME header is populated by the calib class, so it
            # cannot be used as-is; always use the task's instrument.
            instrumentName = self.instrument.getName()

            # This format uses GROUPID to match the exposure's group.
            groupId = calib.getMetadata().get("GROUPID")
            if not self._isValidGroupId(groupId):
                self.log.warning("Skipping input file %s with malformed group %s.",
                                 inputFile, groupId)
                return None, None, None, groupId
            return instrumentName, "exposure.group=groupId", {"groupId": groupId}, groupId

        if calibType == "full":
            metadata = calib.getMetadata()
            instrumentName = metadata.get("INSTRUME")
            if instrumentName is None:
                instrumentName = self.instrument.getName()

            # This format uses the obsId to match the exposure.
            obsId = metadata.get("obsId")
            if obsId is None:
                self.log.warning("Skipping input file %s with missing obsId.", inputFile)
                return None, None, None, None
            return instrumentName, "exposure.obs_id=obsId", {"obsId": obsId}, obsId

        if calibType == "two-column":
            metadata = calib.getMetadata()
            dayObs = metadata.get("day_obs")
            seqNum = metadata.get("seq_num")
            if dayObs is None or seqNum is None:
                self.log.warning("Skipping input file %s with missing day_obs/seq_num.", inputFile)
                return None, None, None, None

            # This format uses dayObs and seqNum to match the exposure.
            whereClause = "exposure.day_obs=dayObs and exposure.seq_num=seqNum"
            binding = {"dayObs": dayObs, "seqNum": seqNum}
            return self.instrument.getName(), whereClause, binding, (dayObs, seqNum)

        # Unknown or unreadable calib type.
        return None, None, None, None

467 

468 

469# Shutter Motion Open / Base Class begin: 

class ShutterMotionOpenIngestConfig(IsrCalibIngestConfig):
    """Configuration for `ShutterMotionOpenIngestTask`."""

    # Unlike the base/photodiode configs, a profile without a matching
    # exposure is only a warning by default.
    doRaiseOnMissingExposure = Field(
        dtype=bool,
        default=False,
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
    )

477 

478 

class ShutterMotionOpenIngestTask(IsrCalibIngestTask):
    """Task to ingest shutter motion profiles into a butler repository.

    This task specifically works on the "open" profile.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these datasets are from.
    config : `ShutterMotionOpenIngestConfig`, optional
        Configuration for the task.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = ShutterMotionOpenIngestConfig
    _DefaultName = "shutterMotionOpenIngest"

    def getDatasetType(self):
        """Return the ``shutterMotionProfileOpen`` dataset type."""
        return DatasetType(
            "shutterMotionProfileOpen",
            ("instrument", "exposure"),
            "IsrCalib",
            universe=self.universe,
        )

    def getDestinationCollection(self):
        """Return the instrument's ``calib/shutterMotion`` run collection name."""
        return self.instrument.makeCollectionName("calib", "shutterMotion")

    def readCalibFromFile(self, inputFile):
        """Read a shutter motion profile (JSON text) file.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to read.

        Returns
        -------
        calib : `lsst.ip.isr.ShutterMotionProfile` or `None`
            The profile, or `None` if it could not be read.
        calibType : `str`
            ``"text-v<N>"`` using the ``FORMAT_V`` metadata (default 1), or
            ``"Unknown"`` on failure.
        """
        try:
            with inputFile.as_local() as localFile:
                calib = ShutterMotionProfile.readText(localFile.ospath)
                formatVersion = int(calib.getMetadata().get("FORMAT_V", 1))
        except Exception as e:
            self.log.debug("Could not read %s as a shutter motion profile: %s", inputFile, e)
            return None, "Unknown"
        return calib, f"text-v{formatVersion:d}"

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Build the exposure query for a shutter motion profile.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file, used for log messages.
        calib : `lsst.ip.isr.ShutterMotionProfile`
            The profile read by `readCalibFromFile`.
        calibType : `str`
            The type returned by `readCalibFromFile`.

        Returns
        -------
        instrumentName, whereClause, binding, logId
            See `IsrCalibIngestTask.getAssociationInfo`.  ``whereClause`` is
            `None` when the file cannot be associated with an exposure.
        """
        if calibType not in ("text-v1", "text-v2"):
            # Unknown or unreadable calib type.
            return None, None, None, None

        # The INSTRUME header is populated by the calib class, so it cannot
        # be used as-is; always use the task's instrument.
        instrumentName = self.instrument.getName()

        # These profiles are matched to the exposure by obsId.
        obsId = calib.getMetadata().get("obsId")
        if obsId is None:
            self.log.warning("Skipping input file %s with missing obsId.", inputFile)
            return None, None, None, obsId

        return instrumentName, "exposure.obs_id=obsId", {"obsId": obsId}, obsId

559 

560 

561# Shutter Motion Close implementation begin (subclasses the Open classes):

class ShutterMotionCloseIngestConfig(ShutterMotionOpenIngestConfig):
    """Configuration for `ShutterMotionCloseIngestTask`.

    Identical to `ShutterMotionOpenIngestConfig`; it exists so the close
    task has its own config class.
    """

565 

566 

class ShutterMotionCloseIngestTask(ShutterMotionOpenIngestTask):
    """Task to ingest "close" shutter motion profiles into a butler
    repository.

    File reading, exposure association and the destination collection are
    inherited from `ShutterMotionOpenIngestTask`; only the dataset type
    differs.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these profiles are from.
    config : `ShutterMotionCloseIngestConfig`, optional
        Configuration for the task.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = ShutterMotionCloseIngestConfig
    _DefaultName = "shutterMotionCloseIngest"

    def getDatasetType(self):
        """Return the ``shutterMotionProfileClose`` dataset type."""
        dimensions = ("instrument", "exposure")
        return DatasetType("shutterMotionProfileClose", dimensions, "IsrCalib", universe=self.universe)