Coverage for python / lsst / ip / isr / calibType.py: 12%

369 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:28 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21__all__ = ["IsrCalib", "IsrProvenance"] 

22 

23import abc 

24import datetime 

25import logging 

26import os.path 

27import warnings 

28import yaml 

29import json 

30import numpy as np 

31 

32from astro_metadata_translator import merge_headers 

33from astropy.table import Table, Column 

34from astropy.io import fits 

35 

36from lsst.daf.base import PropertyList 

37from lsst.utils.introspection import get_full_type_name 

38from lsst.utils import doImport 

39 

40 

class IsrCalib(abc.ABC):
    """Generic calibration type.

    Subclasses must implement the toDict, fromDict, toTable, fromTable
    methods that allow the calibration information to be converted
    to and from dictionaries and astropy tables.  This will allow the
    calibration to be persisted using the base class read/write
    methods.

    The validate method is intended to provide a common way to check
    that the calibration is valid (internally consistent) and
    appropriate (usable with the intended data).  The apply method is
    intended to allow the calibration to be applied in a consistent
    manner.

    Parameters
    ----------
    camera : `lsst.afw.cameraGeom.Camera`, optional
        Camera to extract metadata from.
    detector : `lsst.afw.cameraGeom.Detector`, optional
        Detector to extract metadata from.
    log : `logging.Logger`, optional
        Log for messages.
    **kwargs
        Additional ``key=value`` pairs that are scanned by
        `calibInfoFromDict` for the common calibration keywords.
    """
    _OBSTYPE = "generic"
    _SCHEMA = "NO SCHEMA"
    _VERSION = 0

    def __init__(self, camera=None, detector=None, log=None, **kwargs):
        self._instrument = None
        self._raftName = None
        self._slotName = None
        self._detectorName = None
        self._detectorSerial = None
        self._detectorId = None
        self._filter = None
        self._calibId = None
        self._seqfile = None
        self._seqname = None
        self._seqcksum = None
        self._metadata = PropertyList()
        # Populates the OBSTYPE/SCHEMA/VERSION entries of the metadata.
        self.setMetadata(PropertyList())
        self.calibInfoFromDict(kwargs)

        # Define the required attributes for this calibration.  These
        # are entries that are automatically filled and propagated
        # from the metadata.  The existence of the attribute is
        # required (they're checked for equivalence), but they do not
        # necessarily need to have a value (None == None in this
        # case).
        self.requiredAttributes = {"_OBSTYPE", "_SCHEMA", "_VERSION"}
        self.requiredAttributes.update(["_instrument", "_raftName", "_slotName",
                                        "_detectorName", "_detectorSerial", "_detectorId",
                                        "_filter", "_calibId", "_seqfile", "_seqname", "_seqcksum",
                                        "_metadata"])

        self.log = log if log else logging.getLogger(__name__)

        if detector:
            self.fromDetector(detector)
        self.updateMetadata(camera=camera, detector=detector)

    def __str__(self):
        return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, detector={self._detectorName}, )"

    def __eq__(self, other):
        """Calibration equivalence.

        Every attribute named in ``requiredAttributes`` is compared.
        Dictionaries are compared key by key, arrays element-wise
        (numerically with ``numpy.allclose`` and NaN == NaN), and
        everything else with ``!=``.  ``None`` and the empty string are
        treated as equivalent.

        Running ``calib.log.setLevel(0)`` enables debug statements to
        identify problematic fields.
        """
        if not isinstance(other, self.__class__):
            self.log.debug("Incorrect class type: %s %s", self.__class__, other.__class__)
            return False

        for attr in self._requiredAttributes:
            attrSelf = getattr(self, attr)
            attrOther = getattr(other, attr)

            if isinstance(attrSelf, dict):
                # Dictionary of arrays.
                if not isinstance(attrOther, dict):
                    self.log.debug("Type Failure: %s %s %s %s %s", attr, type(attrSelf), type(attrOther),
                                   attrSelf, attrOther)
                    return False
                if attrSelf.keys() != attrOther.keys():
                    self.log.debug("Dict Key Failure: %s %s %s", attr, attrSelf.keys(), attrOther.keys())
                    return False
                for key in attrSelf:
                    try:
                        if not np.allclose(attrSelf[key], attrOther[key], equal_nan=True):
                            self.log.debug("Array Failure: %s %s %s", key, attrSelf[key], attrOther[key])
                            return False
                    except TypeError:
                        # If it is not something numpy can handle
                        # (it's not a number or array of numbers),
                        # then it needs to have its own equivalence
                        # operator.  Any differing element makes the
                        # entries unequal.
                        if np.any(attrSelf[key] != attrOther[key]):
                            self.log.debug("Array Failure: %s %s %s", key, attrSelf[key], attrOther[key])
                            return False
            elif (isinstance(attrSelf, np.ndarray) or isinstance(attrSelf, Column)
                  or isinstance(attrOther, np.ndarray) or isinstance(attrOther, Column)):
                # Bare array.
                if attrSelf is None or attrOther is None:
                    # Only one side holds an array.
                    self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
                    return False

                sizeSelf = np.size(attrSelf)
                sizeOther = np.size(attrOther)
                if sizeSelf == 0 or sizeOther == 0:
                    # There is no first element to inspect.  Two empty
                    # arrays are equal; an empty and a non-empty array
                    # are not.
                    if sizeSelf != sizeOther:
                        self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
                        return False
                    continue

                # The first element decides between a string comparison
                # and a numerical one.
                if isinstance(np.asarray(attrSelf).flat[0], (str, np.str_, np.bytes_)):
                    if not np.all(attrSelf == attrOther):
                        self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
                        return False
                else:
                    if not np.allclose(attrSelf, attrOther, equal_nan=True):
                        self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
                        return False
            elif type(attrSelf) is not type(attrOther):
                # Fits converts None to "", but None is not "".
                noneVersusEmpty = (
                    (attrSelf is None and isinstance(attrOther, str) and attrOther == "")
                    or (attrOther is None and isinstance(attrSelf, str) and attrSelf == "")
                )
                if noneVersusEmpty:
                    continue
                self.log.debug("Type Failure: %s %s %s %s %s", attr, type(attrSelf), type(attrOther),
                               attrSelf, attrOther)
                return False
            else:
                if attrSelf != attrOther:
                    self.log.debug("Value Failure: %s %s %s", attr, attrSelf, attrOther)
                    return False

        return True

    @property
    def requiredAttributes(self):
        # Names of the attributes compared by ``__eq__``.
        return self._requiredAttributes

    @requiredAttributes.setter
    def requiredAttributes(self, value):
        self._requiredAttributes = value

    # Property accessor associated with getMetadata().
    @property
    def metadata(self):
        return self._metadata

    def getMetadata(self):
        """Retrieve metadata associated with this calibration.

        Returns
        -------
        meta : `lsst.daf.base.PropertyList`
            Metadata.  This is the internal object, not a copy, so the
            returned `~lsst.daf.base.PropertyList` can be modified by
            the caller and the changes will be written to external
            files.
        """
        return self._metadata

    def setMetadata(self, metadata):
        """Merge the supplied metadata into this calibration.

        Parameters
        ----------
        metadata : `lsst.daf.base.PropertyList` or `dict`
            Metadata to associate with the calibration.  The entries
            are merged into the existing metadata (same-named keys are
            replaced, other existing keys are kept), and the common
            calibration keywords are read from it.
        """
        if metadata is not None:
            self._metadata.update(metadata)

        # Ensure that we have the obs type required by calibration ingest
        self._metadata["OBSTYPE"] = self._OBSTYPE
        self._metadata[self._OBSTYPE + "_SCHEMA"] = self._SCHEMA
        self._metadata[self._OBSTYPE + "_VERSION"] = self._VERSION

        if isinstance(metadata, dict):
            self.calibInfoFromDict(metadata)
        elif isinstance(metadata, PropertyList):
            self.calibInfoFromDict(metadata.toDict())

    def updateMetadata(self, camera=None, detector=None, filterName=None,
                       setCalibId=False, setCalibInfo=False, setDate=False,
                       **kwargs):
        """Update metadata keywords with new values.

        Parameters
        ----------
        camera : `lsst.afw.cameraGeom.Camera`, optional
            Reference camera to use to set ``_instrument`` field.
        detector : `lsst.afw.cameraGeom.Detector`, optional
            Reference detector to use to set ``_detector*`` fields.
        filterName : `str`, optional
            Filter name to assign to this calibration.
        setCalibId : `bool`, optional
            Construct the ``_calibId`` field from other fields.
        setCalibInfo : `bool`, optional
            Set calibration parameters from metadata.
        setDate : `bool`, optional
            Ensure the metadata ``CALIBDATE`` fields are set to the current
            datetime.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of ``key=value`` pairs to assign to the metadata.  These
            are applied last, so they take precedence over the standard
            keywords written by this method.
        """
        mdOriginal = self.getMetadata()
        mdSupplemental = dict()

        # Undefined FITS header values carry no information; store
        # them as None.
        kwargs = {k: (None if isinstance(v, fits.card.Undefined) else v) for k, v in kwargs.items()}

        if setCalibInfo:
            self.calibInfoFromDict(kwargs)

        if camera:
            self._instrument = camera.getName()

        if detector:
            self._detectorName = detector.getName()
            self._detectorSerial = detector.getSerial()
            self._detectorId = detector.getId()
            if "_" in self._detectorName:
                # Split on the first underscore only, so a name with
                # more than one underscore cannot break the unpacking.
                self._raftName, self._slotName = self._detectorName.split("_", 1)

        if filterName:
            # TODO DM-28093: I think this whole comment can go away, if we
            # always use physicalLabel everywhere in ip_isr.
            # If set via:
            # exposure.getInfo().getFilter().getName()
            # then this will hold the abstract filter.
            self._filter = filterName

        if setDate:
            date = datetime.datetime.now()
            mdSupplemental["CALIBDATE"] = date.isoformat()
            mdSupplemental["CALIB_CREATION_DATE"] = date.date().isoformat()
            mdSupplemental["CALIB_CREATION_TIME"] = date.time().isoformat()

        if setCalibId:
            values = []
            if self._instrument:
                values.append(f"instrument={self._instrument}")
            if self._raftName:
                values.append(f"raftName={self._raftName}")
            if self._detectorName:
                values.append(f"detectorName={self._detectorName}")
            # Zero is a legitimate detector id, so test against None
            # rather than truthiness.
            if self._detectorId is not None:
                values.append(f"detector={self._detectorId}")
            if self._filter:
                values.append(f"filter={self._filter}")

            calibDate = mdOriginal.get("CALIBDATE", mdSupplemental.get("CALIBDATE", None))
            if calibDate:
                values.append(f"calibDate={calibDate}")

            self._calibId = " ".join(values)

        # Empty values are stored as None.
        self._metadata["INSTRUME"] = self._instrument or None
        self._metadata["RAFTNAME"] = self._raftName or None
        self._metadata["SLOTNAME"] = self._slotName or None
        self._metadata["DETECTOR"] = self._detectorId
        self._metadata["DET_NAME"] = self._detectorName or None
        self._metadata["DET_SER"] = self._detectorSerial or None
        self._metadata["FILTER"] = self._filter or None
        self._metadata["CALIB_ID"] = self._calibId or None
        self._metadata["SEQFILE"] = self._seqfile or None
        self._metadata["SEQNAME"] = self._seqname or None
        self._metadata["SEQCKSUM"] = self._seqcksum or None
        self._metadata["CALIBCLS"] = get_full_type_name(self)

        mdSupplemental.update(kwargs)
        mdOriginal.update(mdSupplemental)

    def updateMetadataFromExposures(self, exposures):
        """Extract and unify metadata information.

        The first value found for each keyword is kept; later exposures
        with a different value only produce a warning.

        Parameters
        ----------
        exposures : `list`
            Exposures or other calibrations to scan.  Entries without
            a ``getMetadata`` method are skipped.
        """
        # This list of keywords is the set of header entries that
        # should be checked and propagated.  Not having an entry is
        # not a failure, as they may not be defined for the exposures
        # being used.
        keywords = ["SEQNAME", "SEQFILE", "SEQCKSUM", "ODP", "AP0_RC"]
        metadata = {}

        for exp in exposures:
            try:
                expMeta = exp.getMetadata()
            except AttributeError:
                continue
            for key in keywords:
                if key not in expMeta:
                    continue
                if key not in metadata:
                    metadata[key] = expMeta[key]
                elif metadata[key] != expMeta[key]:
                    self.log.warning("Metadata mismatch! Have: %s Found %s",
                                     metadata[key], expMeta[key])

        self.updateMetadata(**metadata, setCalibInfo=True)

    def calibInfoFromDict(self, dictionary):
        """Handle common keywords.

        This isn't an ideal solution, but until all calibrations
        expect to find everything in the metadata, they still need to
        search through dictionaries.

        Parameters
        ----------
        dictionary : `dict` or `lsst.daf.base.PropertyList`
            Source for the common keywords.  If none of the aliases of
            a keyword are found at the top level, a ``"metadata"``
            entry (when present) is searched as well.

        Raises
        ------
        RuntimeError
            Raised if the dictionary does not match the expected OBSTYPE.
        ValueError
            Raised if the aliases of one keyword hold different values.
        """

        def search(haystack, needles):
            """Search dictionary 'haystack' for an entry in 'needles'
            """
            found = {haystack.get(needle) for needle in needles}
            found.discard(None)
            if len(found) == 0:
                if "metadata" in haystack:
                    return search(haystack["metadata"], needles)
                return None
            elif len(found) == 1:
                value = next(iter(found))
                # An empty string means "not set".
                return None if value == "" else value
            else:
                raise ValueError(f"Too many values found: {len(found)} {found} {needles}")

        if "metadata" in dictionary:
            metadata = dictionary["metadata"]

            if self._OBSTYPE != metadata["OBSTYPE"]:
                raise RuntimeError(f"Incorrect calibration supplied. Expected {self._OBSTYPE}, "
                                   f"found {metadata['OBSTYPE']}")

        # Attribute name -> accepted keyword aliases.  The keywords
        # mirror the ones written by ``updateMetadata``.
        keywordAliases = (
            ("_instrument", ["INSTRUME", "instrument"]),
            ("_raftName", ["RAFTNAME"]),
            ("_slotName", ["SLOTNAME"]),
            ("_detectorId", ["DETECTOR", "detectorId"]),
            ("_detectorName", ["DET_NAME", "DETECTOR_NAME", "detectorName"]),
            ("_detectorSerial", ["DET_SER", "DETECTOR_SERIAL", "detectorSerial"]),
            ("_filter", ["FILTER", "filterName"]),
            ("_calibId", ["CALIB_ID"]),
            ("_seqfile", ["SEQFILE"]),
            ("_seqname", ["SEQNAME"]),
            ("_seqcksum", ["SEQCKSUM"]),
        )
        for attrName, aliases in keywordAliases:
            if (value := search(dictionary, aliases)) is not None:
                setattr(self, attrName, value)

    @classmethod
    def determineCalibClass(cls, metadata, message):
        """Attempt to find calibration class in metadata.

        Parameters
        ----------
        metadata : `dict` or `lsst.daf.base.PropertyList`
            Metadata possibly containing a calibration class entry.
        message : `str`
            Message to include in any errors.

        Returns
        -------
        calibClass : `object`
            The class to use to read the file contents.  Should be an
            `lsst.ip.isr.IsrCalib` subclass.

        Raises
        ------
        ValueError
            Raised if the resulting calibClass is the base
            `lsst.ip.isr.IsrCalib` (which does not implement the
            content methods).
        """
        calibClassName = metadata.get("CALIBCLS")
        if calibClassName is None:
            # Fall back to the ``fileType`` entry and translate the
            # one known value to its class name.
            calibClassName = metadata.get("fileType")
            if calibClassName == "shutterMotionProfile":
                calibClassName = "lsst.ip.isr.ShutterMotionProfile"
        calibClass = doImport(calibClassName) if calibClassName is not None else cls
        if calibClass is IsrCalib:
            raise ValueError(f"Cannot use base class to read calibration data: {message}")
        return calibClass

    @classmethod
    def readText(cls, filename, **kwargs):
        """Read calibration representation from a yaml/ecsv/json file.

        Parameters
        ----------
        filename : `str`
            Name of the file containing the calibration definition.
            The extension is matched case-insensitively.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value pairs to pass to the ``fromDict`` or
            ``fromTable`` methods.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            Calibration read from the file, constructed by the class
            that `determineCalibClass` selects.

        Raises
        ------
        RuntimeError
            Raised if the filename does not end in ".ecsv", ".yaml",
            or ".json".
        """
        lowerName = filename.lower()
        if lowerName.endswith(".ecsv"):
            data = Table.read(filename, format="ascii.ecsv")
            calibClass = cls.determineCalibClass(data.meta, "readText/ECSV")
            return calibClass.fromTable([data], **kwargs)
        elif lowerName.endswith(".yaml"):
            # NOTE(review): this is the full (non-safe) YAML loader, so
            # only trusted calibration files should be read this way.
            # Fall back to the pure-python loader if the libyaml
            # binding is not available.
            loader = getattr(yaml, "CLoader", yaml.Loader)
            with open(filename, "r") as f:
                data = yaml.load(f, Loader=loader)
            calibClass = cls.determineCalibClass(data["metadata"], "readText/YAML")
            return calibClass.fromDict(data, **kwargs)
        elif lowerName.endswith(".json"):
            with open(filename, "r") as f:
                data = json.load(f)
            # The class is looked up in the top level of the JSON
            # document, not in a "metadata" entry.
            calibClass = cls.determineCalibClass(data, "readText/JSON")
            return calibClass.fromDict(data, **kwargs)
        else:
            raise RuntimeError(f"Unknown filename extension: {filename}")

    def writeText(self, filename, format="auto"):
        """Write the calibration data to a text file.

        Parameters
        ----------
        filename : `str`
            Name of the file to write.
        format : `str`
            Format to write the file as.  Supported values are:
                ``"auto"`` : Determine filetype from filename.
                ``"yaml"`` : Write as yaml.
                ``"ecsv"`` : Write as ecsv.

        Returns
        -------
        used : `str`
            The name of the file used to write the data.  The
            extension is replaced by ".yaml" or ".ecsv", so this may
            differ from the input if the format is explicitly chosen.

        Raises
        ------
        RuntimeError
            Raised if filename does not end in a known extension, or
            if all information cannot be written.

        Notes
        -----
        The file is written to YAML/ECSV format and will include any
        associated metadata.
        """
        lowerName = filename.lower()
        if format == "yaml" or (format == "auto" and lowerName.endswith(".yaml")):
            outDict = self.toDict()
            path, _ = os.path.splitext(filename)
            filename = path + ".yaml"
            with open(filename, "w") as f:
                yaml.dump(outDict, f)
        elif format == "ecsv" or (format == "auto" and lowerName.endswith(".ecsv")):
            tableList = self.toTable()
            if len(tableList) > 1:
                # ECSV doesn't support multiple tables per file, so we
                # can only write the first table.
                raise RuntimeError(f"Unable to persist {len(tableList)} tables in ECSV format.")

            table = tableList[0]
            path, _ = os.path.splitext(filename)
            filename = path + ".ecsv"
            table.write(filename, format="ascii.ecsv")
        else:
            raise RuntimeError(f"Attempt to write to a file {filename} "
                               "that does not end in '.yaml' or '.ecsv'")

        return filename

    @classmethod
    def readFits(cls, filename, **kwargs):
        """Read calibration data from a FITS file.

        Parameters
        ----------
        filename : `str`
            Filename to read data from.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value pairs to pass to the ``fromTable``
            method.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            Calibration contained within the file.
        """
        tableList = [Table.read(filename, hdu=1, mask_invalid=False)]
        extNum = 2  # Fits indices start at 1, we've read one already.

        # Keep reading extensions until one cannot be read as a table.
        # Warnings are promoted to errors so that they end the scan
        # as well.
        while True:
            with warnings.catch_warnings():
                warnings.simplefilter("error")
                try:
                    newTable = Table.read(filename, hdu=extNum, mask_invalid=False)
                except Exception:
                    break
            tableList.append(newTable)
            extNum += 1

        calibClass = cls.determineCalibClass(tableList[0].meta, "readFits")
        convertStrings = calibClass._OBSTYPE == "BF_DISTORTION_MATRIX"
        for table in tableList:
            if convertStrings:
                table.convert_bytestring_to_unicode()
            # Replace undefined header values by None.  Iterate over a
            # snapshot because the mapping is assigned to in the loop.
            for k, v in list(table.meta.items()):
                if isinstance(v, fits.card.Undefined):
                    table.meta[k] = None

        if calibClass._OBSTYPE in ("PHOTODIODE", ):
            # Merge primary header, as these types store information
            # there.
            with fits.open(filename) as hdul:
                primaryHeader = hdul[0].header
                tableList[0].meta = merge_headers([tableList[0].meta, primaryHeader], mode="first")

        return calibClass.fromTable(tableList, **kwargs)

    def writeFits(self, filename):
        """Write calibration data to a FITS file.

        Each table returned by ``toTable`` becomes one extension
        following an empty primary HDU.  An existing file is
        overwritten.

        Parameters
        ----------
        filename : `str`
            Filename to write data to.

        Returns
        -------
        used : `str`
            The name of the file used to write the data.
        """
        tableList = self.toTable()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=Warning, module="astropy.io")
            astropyList = [fits.table_to_hdu(table) for table in tableList]
            astropyList.insert(0, fits.PrimaryHDU())

            writer = fits.HDUList(astropyList)
            writer.writeto(filename, overwrite=True)
        return filename

    def fromDetector(self, detector):
        """Modify the calibration parameters to match the supplied detector.

        Parameters
        ----------
        detector : `lsst.afw.cameraGeom.Detector`
            Detector to use to set parameters from.

        Raises
        ------
        NotImplementedError
            Raised if not implemented by a subclass.
            This needs to be implemented by subclasses for each
            calibration type.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    @classmethod
    def fromDict(cls, dictionary, **kwargs):
        """Construct a calibration from a dictionary of properties.

        Must be implemented by the specific calibration subclasses.

        Parameters
        ----------
        dictionary : `dict`
            Dictionary of properties.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            Constructed calibration.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def toDict(self):
        """Return a dictionary containing the calibration properties.

        The dictionary should be able to be round-tripped through
        `fromDict`.

        Returns
        -------
        dictionary : `dict`
            Dictionary of properties.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    @classmethod
    def fromTable(cls, tableList, **kwargs):
        """Construct a calibration from a list of tables.

        Must be implemented by the specific calibration subclasses.

        Parameters
        ----------
        tableList : `list` [`astropy.table.Table`]
            List of tables of properties.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            Constructed calibration.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def toTable(self):
        """Return a list of tables containing the calibration properties.

        The table list should be able to be round-tripped through
        `fromTable`.

        Returns
        -------
        tableList : `list` [`astropy.table.Table`]
            List of tables of properties.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def validate(self, other=None):
        """Validate that this calibration is defined and can be used.

        The base implementation performs no checks and always reports
        the calibration as not valid.

        Parameters
        ----------
        other : `object`, optional
            Thing to validate against.

        Returns
        -------
        valid : `bool`
            Returns true if the calibration is valid and appropriate.
        """
        return False

    def apply(self, target):
        """Method to apply the calibration to the target object.

        Parameters
        ----------
        target : `object`
            Thing to apply the calibration to.

        Returns
        -------
        valid : `bool`
            Returns true if the calibration was applied correctly.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

726 

class IsrProvenance(IsrCalib):
    """Class for the provenance of data used to construct calibration.

    Provenance is not really a calibration, but we would like to
    record this when constructing the calibration, and it provides an
    example of the base calibration class.

    Parameters
    ----------
    instrument : `str`, optional
        Name of the instrument the data was taken with.
    calibType : `str`, optional
        Type of calibration this provenance was generated for.
    detectorName : `str`, optional
        Name of the detector this calibration is for.
    detectorSerial : `str`, optional
        Identifier for the detector.
    **kwargs
        All keywords other than ``calibType`` are forwarded to the
        `IsrCalib` constructor.
    """
    _OBSTYPE = "IsrProvenance"

    def __init__(self, calibType="unknown",
                 **kwargs):
        # These must exist before the base constructor runs, because
        # it calls ``updateMetadata``, which reads ``calibType``.
        self.calibType = calibType
        self.dimensions = set()
        self.dataIdList = list()

        super().__init__(**kwargs)

        self.requiredAttributes.update(["calibType", "dimensions", "dataIdList"])

    def __str__(self):
        return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, calibType={self.calibType}, )"

    def __eq__(self, other):
        return super().__eq__(other)

    def updateMetadata(self, setDate=False, **kwargs):
        """Update calibration metadata.

        The current ``calibType`` is always written to the metadata,
        replacing any ``calibType`` keyword supplied by the caller.

        Parameters
        ----------
        setDate : `bool`, optional
            Update the ``CALIBDATE`` fields in the metadata to the current
            time. Defaults to False.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Other keyword parameters to set in the metadata.
        """
        kwargs["calibType"] = self.calibType
        super().updateMetadata(setDate=setDate, **kwargs)

    def fromDataIds(self, dataIdList):
        """Update provenance from dataId List.

        Parameters
        ----------
        dataIdList : `list` [`lsst.daf.butler.DataId`]
            List of dataIds used in generating this calibration.
            Every key of every dataId is recorded as a dimension.
        """
        for dataId in dataIdList:
            for key in dataId:
                self.dimensions.add(key)
            self.dataIdList.append(dataId)

    @classmethod
    def fromTable(cls, tableList, **kwargs):
        """Construct provenance from table list.

        Parameters
        ----------
        tableList : `list` [`astropy.table.Table`]
            List of tables to construct the provenance from.  Only the
            first table is used.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options, passed on to `fromDict`.

        Returns
        -------
        provenance : `lsst.ip.isr.IsrProvenance`
            The provenance defined in the tables.
        """
        table = tableList[0]
        metadata = table.meta

        # Dimension names are the lower-cased column names; remember
        # the original spelling to index the rows.
        schema = {colName.lower(): colName for colName in table.columns}
        dimensions = sorted(schema)

        inDict = dict()
        inDict["metadata"] = metadata
        inDict["calibType"] = metadata["calibType"]
        inDict["dimensions"] = dimensions
        inDict["dataIdList"] = [{dim: row[schema[dim]] for dim in dimensions} for row in table]

        return cls.fromDict(inDict, **kwargs)

    @classmethod
    def fromDict(cls, dictionary, **kwargs):
        """Construct provenance from a dictionary.

        Parameters
        ----------
        dictionary : `dict`
            Dictionary of provenance parameters.  Must contain the
            ``metadata``, ``calibType``, ``dimensions`` and
            ``dataIdList`` entries.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options.  Accepted so that the signature
            matches the base class; currently unused.

        Returns
        -------
        provenance : `lsst.ip.isr.IsrProvenance`
            The provenance defined in the dictionary.

        Raises
        ------
        RuntimeError
            Raised if the dictionary does not match the expected OBSTYPE.
        """
        calib = cls()
        if calib._OBSTYPE != dictionary["metadata"]["OBSTYPE"]:
            raise RuntimeError(f"Incorrect calibration supplied. Expected {calib._OBSTYPE}, "
                               f"found {dictionary['metadata']['OBSTYPE']}")
        calib.updateMetadata(setDate=False, setCalibInfo=True, **dictionary["metadata"])

        # These properties are read from the top level of the
        # dictionary; the metadata is not consulted for them.
        calib.calibType = dictionary["calibType"]
        calib.dimensions = set(dictionary["dimensions"])
        calib.dataIdList = dictionary["dataIdList"]

        # Write the calibType read above into the metadata.
        calib.updateMetadata()
        return calib

    def toDict(self):
        """Return a dictionary containing the provenance information.

        Returns
        -------
        dictionary : `dict`
            Dictionary of provenance.
        """
        self.updateMetadata()

        outDict = {
            "metadata": self.getMetadata(),
            "detectorName": self._detectorName,
            "detectorSerial": self._detectorSerial,
            "detectorId": self._detectorId,
            "instrument": self._instrument,
            "calibType": self.calibType,
            "dimensions": list(self.dimensions),
            "dataIdList": self.dataIdList,
        }

        return outDict

    def toTable(self):
        """Return a list of tables containing the provenance.

        This seems inefficient and slow, so this may not be the best
        way to store the data.

        Returns
        -------
        tableList : `list` [`astropy.table.Table`]
            List containing a single table with one row per dataId and
            one column per dimension, in sorted dimension order.
        """
        self.updateMetadata(setDate=True, setCalibInfo=True)

        # A set has no defined order; sort the names so the column
        # order is reproducible.
        catalog = Table(rows=self.dataIdList,
                        names=sorted(self.dimensions))
        # Entries without a value are left out of the table metadata.
        catalog.meta = {k: v for k, v in self.getMetadata().toDict().items() if v is not None}

        return [catalog]