Coverage for python / lsst / ip / isr / calibType.py: 12%
369 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:07 +0000
1# This file is part of ip_isr.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21__all__ = ["IsrCalib", "IsrProvenance"]
23import abc
24import datetime
25import logging
26import os.path
27import warnings
28import yaml
29import json
30import numpy as np
32from astro_metadata_translator import merge_headers
33from astropy.table import Table, Column
34from astropy.io import fits
36from lsst.daf.base import PropertyList
37from lsst.utils.introspection import get_full_type_name
38from lsst.utils import doImport
class IsrCalib(abc.ABC):
    """Generic calibration type.

    Subclasses must implement the ``toDict``, ``fromDict``, ``toTable``
    and ``fromTable`` methods that allow the calibration information to
    be converted to and from dictionaries and lists of tables (the
    readers in this class hand `astropy.table.Table` objects to
    ``fromTable``).  This allows the calibration to be persisted using
    the base class read/write methods.

    The ``validate`` method is intended to provide a common way to check
    that the calibration is valid (internally consistent) and
    appropriate (usable with the intended data).  The ``apply`` method
    is intended to allow the calibration to be applied in a consistent
    manner.

    Parameters
    ----------
    camera : `lsst.afw.cameraGeom.Camera`, optional
        Camera to extract metadata from.
    detector : `lsst.afw.cameraGeom.Detector`, optional
        Detector to extract metadata from.
    log : `logging.Logger`, optional
        Log for messages.
    **kwargs
        Additional ``key=value`` pairs; they are scanned by
        `calibInfoFromDict` for the common calibration keywords.
    """

    _OBSTYPE = "generic"
    _SCHEMA = "NO SCHEMA"
    _VERSION = 0

    def __init__(self, camera=None, detector=None, log=None, **kwargs):
        self._instrument = None
        self._raftName = None
        self._slotName = None
        self._detectorName = None
        self._detectorSerial = None
        self._detectorId = None
        self._filter = None
        self._calibId = None
        self._seqfile = None
        self._seqname = None
        self._seqcksum = None
        self._metadata = PropertyList()
        self.setMetadata(PropertyList())
        self.calibInfoFromDict(kwargs)

        # Define the required attributes for this calibration.  These
        # are entries that are automatically filled and propagated
        # from the metadata.  The existence of the attribute is
        # required (they're checked for equivalence), but they do not
        # necessarily need to have a value (None == None in this
        # case).
        self.requiredAttributes = set(["_OBSTYPE", "_SCHEMA", "_VERSION"])
        self.requiredAttributes.update(["_instrument", "_raftName", "_slotName",
                                        "_detectorName", "_detectorSerial", "_detectorId",
                                        "_filter", "_calibId", "_seqfile", "_seqname", "_seqcksum",
                                        "_metadata"])

        self.log = log if log else logging.getLogger(__name__)

        if detector:
            self.fromDetector(detector)
        self.updateMetadata(camera=camera, detector=detector)

    def __str__(self):
        return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, detector={self._detectorName}, )"

    @staticmethod
    def _valuesMatch(first, second):
        """Compare two values that may be scalars, sequences or arrays.

        Parameters
        ----------
        first, second : `object`
            Values to compare.

        Returns
        -------
        match : `bool`
            `True` if the values are equal.  Numeric content is compared
            with `numpy.allclose` (NaNs compare equal); string content
            and anything else numpy cannot treat as numbers is compared
            with the values' own (in)equality operator.
        """
        try:
            firstIsString = isinstance(first[0], (str, np.str_, np.bytes_))
        except (IndexError, KeyError, TypeError):
            # Empty, zero-dimensional, or not indexable: not a string
            # array as far as this comparison is concerned.
            firstIsString = False

        try:
            if firstIsString:
                return bool(np.all(first == second))
            return bool(np.allclose(first, second, equal_nan=True))
        except TypeError:
            # If it is not something numpy can handle (it's not a
            # number or array of numbers), then it needs to have its
            # own equivalence operator.  A single differing element is
            # enough to make the values unequal.
            try:
                return not np.any(first != second)
            except ValueError:
                return False
        except ValueError:
            # Shapes that cannot be broadcast against each other.
            return False

    def __eq__(self, other):
        """Calibration equivalence.

        Every attribute named in ``requiredAttributes`` is compared
        between the two calibrations.

        Running ``calib.log.setLevel(0)`` enables debug statements to
        identify problematic fields.

        Parameters
        ----------
        other : `object`
            Object to compare against.

        Returns
        -------
        equal : `bool`
            `True` if ``other`` is an instance of this calibration's
            class and all required attributes match.
        """
        if not isinstance(other, self.__class__):
            self.log.debug("Incorrect class type: %s %s", self.__class__, other.__class__)
            return False

        for attr in self._requiredAttributes:
            attrSelf = getattr(self, attr)
            attrOther = getattr(other, attr)

            if isinstance(attrSelf, dict):
                # Dictionary of arrays.
                if not isinstance(attrOther, dict):
                    self.log.debug("Type Failure: %s %s %s %s %s", attr, type(attrSelf), type(attrOther),
                                   attrSelf, attrOther)
                    return False
                if attrSelf.keys() != attrOther.keys():
                    self.log.debug("Dict Key Failure: %s %s %s", attr, attrSelf.keys(), attrOther.keys())
                    return False
                for key in attrSelf:
                    if not self._valuesMatch(attrSelf[key], attrOther[key]):
                        self.log.debug("Array Failure: %s %s %s", key, attrSelf[key], attrOther[key])
                        return False
            elif (isinstance(attrSelf, (np.ndarray, Column))
                    or isinstance(attrOther, (np.ndarray, Column))):
                # Bare array.
                if not self._valuesMatch(attrSelf, attrOther):
                    self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
                    return False
            elif type(attrSelf) is not type(attrOther):
                # Fits converts None to "", but None is not "".  The
                # explicit test avoids hashing the values, which may be
                # unhashable (lists, sets, ...).
                if ((attrSelf is None and attrOther == "")
                        or (attrOther is None and attrSelf == "")):
                    continue
                self.log.debug("Type Failure: %s %s %s %s %s", attr, type(attrSelf), type(attrOther),
                               attrSelf, attrOther)
                return False
            else:
                if attrSelf != attrOther:
                    self.log.debug("Value Failure: %s %s %s", attr, attrSelf, attrOther)
                    return False

        return True

    @property
    def requiredAttributes(self):
        """Names of the attributes compared by ``__eq__`` (`set` [`str`])."""
        return self._requiredAttributes

    @requiredAttributes.setter
    def requiredAttributes(self, value):
        self._requiredAttributes = value

    # Property accessor associated with getMetadata().
    @property
    def metadata(self):
        """Metadata associated with this calibration
        (`lsst.daf.base.PropertyList`).
        """
        return self._metadata

    def getMetadata(self):
        """Retrieve metadata associated with this calibration.

        Returns
        -------
        meta : `lsst.daf.base.PropertyList`
            Metadata.  This is the calibration's own metadata object,
            not a copy, so changes made by the caller are seen by the
            calibration and end up in anything written from it.
        """
        return self._metadata

    def setMetadata(self, metadata):
        """Merge the supplied metadata into this calibration's metadata.

        The ``OBSTYPE``, schema and version keywords are always reset to
        this class's values afterwards, and the common calibration
        keywords are re-read from the supplied metadata.

        Parameters
        ----------
        metadata : `lsst.daf.base.PropertyList` or `dict`, optional
            Metadata to associate with the calibration.  Its entries
            are applied on top of the existing metadata with
            ``update``.  `None` leaves the existing entries untouched.
        """
        if metadata is not None:
            self._metadata.update(metadata)

        # Ensure that we have the obs type required by calibration ingest
        self._metadata["OBSTYPE"] = self._OBSTYPE
        self._metadata[self._OBSTYPE + "_SCHEMA"] = self._SCHEMA
        self._metadata[self._OBSTYPE + "_VERSION"] = self._VERSION

        if isinstance(metadata, dict):
            self.calibInfoFromDict(metadata)
        elif isinstance(metadata, PropertyList):
            self.calibInfoFromDict(metadata.toDict())

    def updateMetadata(self, camera=None, detector=None, filterName=None,
                       setCalibId=False, setCalibInfo=False, setDate=False,
                       **kwargs):
        """Update metadata keywords with new values.

        Parameters
        ----------
        camera : `lsst.afw.cameraGeom.Camera`, optional
            Reference camera to use to set ``_instrument`` field.
        detector : `lsst.afw.cameraGeom.Detector`, optional
            Reference detector to use to set ``_detector*`` fields.
        filterName : `str`, optional
            Filter name to assign to this calibration.
        setCalibId : `bool`, optional
            Construct the ``_calibId`` field from other fields.
        setCalibInfo : `bool`, optional
            Set calibration parameters from metadata.
        setDate : `bool`, optional
            Ensure the metadata ``CALIBDATE`` fields are set to the current
            datetime.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of ``key=value`` pairs to assign to the metadata.
            Undefined FITS card values are stored as `None`.
        """
        mdOriginal = self.getMetadata()
        mdSupplemental = dict()

        # An undefined FITS card has no usable value; record it as None.
        kwargs = {key: (None if isinstance(value, fits.card.Undefined) else value)
                  for key, value in kwargs.items()}

        if setCalibInfo:
            self.calibInfoFromDict(kwargs)

        if camera:
            self._instrument = camera.getName()

        if detector:
            self._detectorName = detector.getName()
            self._detectorSerial = detector.getSerial()
            self._detectorId = detector.getId()
            if "_" in self._detectorName:
                (self._raftName, self._slotName) = self._detectorName.split("_")

        if filterName:
            # TODO DM-28093: I think this whole comment can go away, if we
            # always use physicalLabel everywhere in ip_isr.
            # If set via:
            # exposure.getInfo().getFilter().getName()
            # then this will hold the abstract filter.
            self._filter = filterName

        if setDate:
            date = datetime.datetime.now()
            mdSupplemental["CALIBDATE"] = date.isoformat()
            mdSupplemental["CALIB_CREATION_DATE"] = date.date().isoformat()
            mdSupplemental["CALIB_CREATION_TIME"] = date.time().isoformat()

        if setCalibId:
            values = []
            if self._instrument:
                values.append(f"instrument={self._instrument}")
            if self._raftName:
                values.append(f"raftName={self._raftName}")
            if self._detectorName:
                values.append(f"detectorName={self._detectorName}")
            # Compare against None rather than truthiness: a detector
            # id of 0 must still appear in the calibration id.
            if self._detectorId is not None:
                values.append(f"detector={self._detectorId}")
            if self._filter:
                values.append(f"filter={self._filter}")

            calibDate = mdOriginal.get("CALIBDATE", mdSupplemental.get("CALIBDATE", None))
            if calibDate:
                values.append(f"calibDate={calibDate}")

            self._calibId = " ".join(values)

        self._metadata["INSTRUME"] = self._instrument if self._instrument else None
        self._metadata["RAFTNAME"] = self._raftName if self._raftName else None
        self._metadata["SLOTNAME"] = self._slotName if self._slotName else None
        # Written unconditionally so that a detector id of 0 survives.
        self._metadata["DETECTOR"] = self._detectorId
        self._metadata["DET_NAME"] = self._detectorName if self._detectorName else None
        self._metadata["DET_SER"] = self._detectorSerial if self._detectorSerial else None
        self._metadata["FILTER"] = self._filter if self._filter else None
        self._metadata["CALIB_ID"] = self._calibId if self._calibId else None
        self._metadata["SEQFILE"] = self._seqfile if self._seqfile else None
        self._metadata["SEQNAME"] = self._seqname if self._seqname else None
        self._metadata["SEQCKSUM"] = self._seqcksum if self._seqcksum else None
        self._metadata["CALIBCLS"] = get_full_type_name(self)

        # Caller-supplied keywords take precedence over the date fields.
        mdSupplemental.update(kwargs)
        mdOriginal.update(mdSupplemental)

    def updateMetadataFromExposures(self, exposures):
        """Extract and unify metadata information.

        Inputs without a ``getMetadata`` method are skipped.  The first
        value seen for each keyword is kept; later, differing values
        only produce a warning.

        Parameters
        ----------
        exposures : `list`
            Exposures or other calibrations to scan.
        """
        # This list of keywords is the set of header entries that
        # should be checked and propagated.  Not having an entry is
        # not a failure, as they may not be defined for the exposures
        # being used.
        keywords = ["SEQNAME", "SEQFILE", "SEQCKSUM", "ODP", "AP0_RC"]
        metadata = {}

        for exp in exposures:
            try:
                expMeta = exp.getMetadata()
            except AttributeError:
                continue
            for key in keywords:
                if key not in expMeta:
                    continue
                if key not in metadata:
                    metadata[key] = expMeta[key]
                elif metadata[key] != expMeta[key]:
                    self.log.warning("Metadata mismatch! Have: %s Found %s",
                                     metadata[key], expMeta[key])

        self.updateMetadata(**metadata, setCalibInfo=True)

    def calibInfoFromDict(self, dictionary):
        """Handle common keywords.

        This isn't an ideal solution, but until all calibrations
        expect to find everything in the metadata, they still need to
        search through dictionaries.

        Only keywords that are present with a value other than `None`
        or the empty string change the corresponding attribute.

        Parameters
        ----------
        dictionary : `dict` or `lsst.daf.base.PropertyList`
            Source for the common keywords.

        Raises
        ------
        RuntimeError
            Raised if the dictionary does not match the expected OBSTYPE.
        ValueError
            Raised if alternative spellings of one keyword carry
            different values.
        """

        def search(haystack, needles):
            """Search dictionary 'haystack' for an entry in 'needles'
            """
            found = set(value for value in (haystack.get(name) for name in needles)
                        if value is not None)
            if len(found) == 0:
                # Nothing at this level; look in a nested metadata
                # block if there is one.
                if "metadata" in haystack:
                    return search(haystack["metadata"], needles)
                return None
            if len(found) > 1:
                raise ValueError(f"Too many values found: {len(found)} {found} {needles}")
            value = list(found)[0]
            return None if value == "" else value

        if "metadata" in dictionary:
            metadata = dictionary["metadata"]

            if self._OBSTYPE != metadata["OBSTYPE"]:
                raise RuntimeError(f"Incorrect calibration supplied.  Expected {self._OBSTYPE}, "
                                   f"found {metadata['OBSTYPE']}")

        # Attribute to fill, and the keywords it may be stored under.
        # The raft and slot names are read back from the same keywords
        # that updateMetadata writes them to.
        keywordMap = (
            ("_instrument", ["INSTRUME", "instrument"]),
            ("_raftName", ["RAFTNAME"]),
            ("_slotName", ["SLOTNAME"]),
            ("_detectorId", ["DETECTOR", "detectorId"]),
            ("_detectorName", ["DET_NAME", "DETECTOR_NAME", "detectorName"]),
            ("_detectorSerial", ["DET_SER", "DETECTOR_SERIAL", "detectorSerial"]),
            ("_filter", ["FILTER", "filterName"]),
            ("_calibId", ["CALIB_ID"]),
            ("_seqfile", ["SEQFILE"]),
            ("_seqname", ["SEQNAME"]),
            ("_seqcksum", ["SEQCKSUM"]),
        )
        for attribute, needles in keywordMap:
            if (value := search(dictionary, needles)) is not None:
                setattr(self, attribute, value)

    @classmethod
    def determineCalibClass(cls, metadata, message):
        """Attempt to find calibration class in metadata.

        The ``CALIBCLS`` entry is used if present, then ``fileType``;
        if neither is set the class the method was called on is used.

        Parameters
        ----------
        metadata : `dict` or `lsst.daf.base.PropertyList`
            Metadata possibly containing a calibration class entry.
        message : `str`
            Message to include in any errors.

        Returns
        -------
        calibClass : `object`
            The class to use to read the file contents.  Should be an
            `lsst.ip.isr.IsrCalib` subclass.

        Raises
        ------
        ValueError
            Raised if the resulting calibClass is the base
            `lsst.ip.isr.IsrCalib` (which does not implement the
            content methods).
        """
        calibClassName = metadata.get("CALIBCLS")
        if calibClassName is None:
            calibClassName = metadata.get("fileType")
            # This file type is recorded under a short name rather
            # than an importable class path.
            if calibClassName == "shutterMotionProfile":
                calibClassName = "lsst.ip.isr.ShutterMotionProfile"
        calibClass = doImport(calibClassName) if calibClassName is not None else cls
        if calibClass is IsrCalib:
            raise ValueError(f"Cannot use base class to read calibration data: {message}")
        return calibClass

    @classmethod
    def readText(cls, filename, **kwargs):
        """Read calibration representation from a yaml/ecsv/json file.

        The file type is chosen from the filename extension, ignoring
        case.

        Parameters
        ----------
        filename : `str`
            Name of the file containing the calibration definition.
        kwargs : `dict` or collections.abc.Mapping`, optional
            Set of key=value pairs to pass to the ``fromDict`` or
            ``fromTable`` methods.

        Returns
        -------
        calib : `~lsst.ip.isr.IsrCalibType`
            Calibration class.

        Raises
        ------
        RuntimeError
            Raised if the filename does not end in ".ecsv", ".yaml" or
            ".json".
        """
        lowered = filename.lower()
        if lowered.endswith(".ecsv"):
            data = Table.read(filename, format="ascii.ecsv")
            calibClass = cls.determineCalibClass(data.meta, "readText/ECSV")
            return calibClass.fromTable([data], **kwargs)
        elif lowered.endswith(".yaml"):
            # Prefer the libyaml-backed loader, but do not fail when
            # PyYAML was built without it.
            # NOTE(review): these are PyYAML's unrestricted loaders;
            # they should only be pointed at trusted files - confirm
            # that calibration files always are.
            loader = getattr(yaml, "CLoader", yaml.Loader)
            with open(filename, "r") as f:
                data = yaml.load(f, Loader=loader)
            calibClass = cls.determineCalibClass(data["metadata"], "readText/YAML")
            return calibClass.fromDict(data, **kwargs)
        elif lowered.endswith(".json"):
            with open(filename, "r") as f:
                data = json.load(f)
            # JSON files are identified from their top-level entries,
            # not from a nested metadata block.
            calibClass = cls.determineCalibClass(data, "readText/JSON")
            return calibClass.fromDict(data, **kwargs)
        else:
            raise RuntimeError(f"Unknown filename extension: {filename}")

    def writeText(self, filename, format="auto"):
        """Write the calibration data to a text file.

        Parameters
        ----------
        filename : `str`
            Name of the file to write.
        format : `str`
            Format to write the file as.  Supported values are:
                ``"auto"`` : Determine filetype from filename.
                ``"yaml"`` : Write as yaml.
                ``"ecsv"`` : Write as ecsv.

        Returns
        -------
        used : `str`
            The name of the file used to write the data.  This may
            differ from the input if the format is explicitly chosen,
            as the extension is replaced to match the format.

        Raises
        ------
        RuntimeError
            Raised if filename does not end in a known extension, or
            if all information cannot be written.

        Notes
        -----
        The file is written to YAML/ECSV format and will include any
        associated metadata.
        """
        lowered = filename.lower()
        if format == "yaml" or (format == "auto" and lowered.endswith(".yaml")):
            outDict = self.toDict()
            path, ext = os.path.splitext(filename)
            filename = path + ".yaml"
            with open(filename, "w") as f:
                yaml.dump(outDict, f)
        elif format == "ecsv" or (format == "auto" and lowered.endswith(".ecsv")):
            tableList = self.toTable()
            if len(tableList) > 1:
                # ECSV doesn't support multiple tables per file, so we
                # can only write the first table.
                raise RuntimeError(f"Unable to persist {len(tableList)} tables in ECSV format.")

            table = tableList[0]
            path, ext = os.path.splitext(filename)
            filename = path + ".ecsv"
            table.write(filename, format="ascii.ecsv")
        else:
            raise RuntimeError(f"Attempt to write to a file {filename} "
                               "that does not end in '.yaml' or '.ecsv'")

        return filename

    @classmethod
    def readFits(cls, filename, **kwargs):
        """Read calibration data from a FITS file.

        Parameters
        ----------
        filename : `str`
            Filename to read data from.
        kwargs : `dict` or collections.abc.Mapping`, optional
            Set of key=value pairs to pass to the ``fromTable``
            method.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            Calibration contained within the file.
        """
        tableList = [Table.read(filename, hdu=1, mask_invalid=False)]
        extNum = 2  # Fits indices start at 1, we've read one already.

        # Keep reading extensions until one cannot be read.  Warnings
        # are promoted to errors so that they end the scan as well;
        # the broad except is deliberate.
        while True:
            with warnings.catch_warnings():
                warnings.simplefilter("error")
                try:
                    newTable = Table.read(filename, hdu=extNum, mask_invalid=False)
                except Exception:
                    break
            tableList.append(newTable)
            extNum += 1

        calibClass = cls.determineCalibClass(tableList[0].meta, "readFits")
        for table in tableList:
            if calibClass._OBSTYPE == "BF_DISTORTION_MATRIX":
                table.convert_bytestring_to_unicode()
            for k, v in table.meta.items():
                if isinstance(v, fits.card.Undefined):
                    table.meta[k] = None

        if calibClass._OBSTYPE in ("PHOTODIODE", ):
            # Merge primary header, as these types store information
            # there.
            with fits.open(filename) as hdul:
                primaryHeader = hdul[0].header
                tableList[0].meta = merge_headers([tableList[0].meta, primaryHeader], mode="first")

        return calibClass.fromTable(tableList, **kwargs)

    def writeFits(self, filename):
        """Write calibration data to a FITS file.

        An empty primary HDU is written first, followed by one table
        HDU per table returned by ``toTable``.  An existing file is
        overwritten.

        Parameters
        ----------
        filename : `str`
            Filename to write data to.

        Returns
        -------
        used : `str`
            The name of the file used to write the data.
        """
        tableList = self.toTable()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=Warning, module="astropy.io")
            astropyList = [fits.PrimaryHDU()]
            astropyList.extend(fits.table_to_hdu(table) for table in tableList)

            writer = fits.HDUList(astropyList)
            writer.writeto(filename, overwrite=True)
        return filename

    def fromDetector(self, detector):
        """Modify the calibration parameters to match the supplied detector.

        Parameters
        ----------
        detector : `lsst.afw.cameraGeom.Detector`
            Detector to use to set parameters from.

        Raises
        ------
        NotImplementedError
            Raised if not implemented by a subclass.
            This needs to be implemented by subclasses for each
            calibration type.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    @classmethod
    def fromDict(cls, dictionary, **kwargs):
        """Construct a calibration from a dictionary of properties.

        Must be implemented by the specific calibration subclasses.

        Parameters
        ----------
        dictionary : `dict`
            Dictionary of properties.
        kwargs : `dict` or collections.abc.Mapping`, optional
            Set of key=value options.

        Returns
        -------
        calib : `lsst.ip.isr.CalibType`
            Constructed calibration.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def toDict(self):
        """Return a dictionary containing the calibration properties.

        The dictionary should be able to be round-tripped through
        `fromDict`.

        Returns
        -------
        dictionary : `dict`
            Dictionary of properties.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    @classmethod
    def fromTable(cls, tableList, **kwargs):
        """Construct a calibration from a list of tables.

        Must be implemented by the specific calibration subclasses.

        Parameters
        ----------
        tableList : `list` [`astropy.table.Table`]
            List of tables of properties.
        kwargs : `dict` or collections.abc.Mapping`, optional
            Set of key=value options.

        Returns
        -------
        calib : `lsst.ip.isr.CalibType`
            Constructed calibration.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def toTable(self):
        """Return a list of tables containing the calibration properties.

        The table list should be able to be round-tripped through
        `fromTable`.

        Returns
        -------
        tableList : `list` [`astropy.table.Table`]
            List of tables of properties.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")

    def validate(self, other=None):
        """Validate that this calibration is defined and can be used.

        The base implementation performs no checks and always reports
        the calibration as not valid.

        Parameters
        ----------
        other : `object`, optional
            Thing to validate against.

        Returns
        -------
        valid : `bool`
            Returns true if the calibration is valid and appropriate.
        """
        return False

    def apply(self, target):
        """Method to apply the calibration to the target object.

        Parameters
        ----------
        target : `object`
            Thing to apply the calibration to.

        Returns
        -------
        valid : `bool`
            Returns true if the calibration was applied correctly.

        Raises
        ------
        NotImplementedError
            Raised if not implemented.
        """
        raise NotImplementedError("Must be implemented by subclass.")
class IsrProvenance(IsrCalib):
    """Class for the provenance of data used to construct calibration.

    Provenance is not really a calibration, but we would like to
    record this when constructing the calibration, and it provides an
    example of the base calibration class.

    Parameters
    ----------
    instrument : `str`, optional
        Name of the instrument the data was taken with.
    calibType : `str`, optional
        Type of calibration this provenance was generated for.
    detectorName : `str`, optional
        Name of the detector this calibration is for.
    detectorSerial : `str`, optional
        Identifier for the detector.
    **kwargs
        All keywords other than ``calibType`` are passed on to the
        `IsrCalib` constructor.
    """

    _OBSTYPE = "IsrProvenance"

    def __init__(self, calibType="unknown",
                 **kwargs):
        # These must exist before the base constructor runs, as it
        # calls updateMetadata, which reads calibType.
        self.calibType = calibType
        self.dimensions = set()
        self.dataIdList = list()

        super().__init__(**kwargs)

        self.requiredAttributes.update(["calibType", "dimensions", "dataIdList"])

    def __str__(self):
        return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, calibType={self.calibType}, )"

    def __eq__(self, other):
        """Provenance equivalence; see `IsrCalib.__eq__`."""
        return super().__eq__(other)

    def updateMetadata(self, setDate=False, **kwargs):
        """Update calibration metadata.

        The current ``calibType`` is always written to the metadata,
        replacing any ``calibType`` passed in ``kwargs``.

        Parameters
        ----------
        setDate : `bool`, optional
            Update the ``CALIBDATE`` fields in the metadata to the current
            time. Defaults to False.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Other keyword parameters to set in the metadata.
        """
        kwargs["calibType"] = self.calibType
        super().updateMetadata(setDate=setDate, **kwargs)

    def fromDataIds(self, dataIdList):
        """Update provenance from dataId List.

        Parameters
        ----------
        dataIdList : `list` [`lsst.daf.butler.DataId`]
            List of dataIds used in generating this calibration.
        """
        for dataId in dataIdList:
            # Iterating a dataId yields its keys; a set ignores the
            # ones it already holds.
            self.dimensions.update(dataId)
            self.dataIdList.append(dataId)

    @classmethod
    def fromTable(cls, tableList, **kwargs):
        """Construct provenance from table list.

        Only the first table is used.  Its column names, lower-cased,
        become the dimensions, and each row becomes one dataId.

        Parameters
        ----------
        tableList : `list` [`astropy.table.Table`]
            List of tables to construct the provenance from.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options, passed on to `fromDict`.  Accepted
            so that the base class readers can forward their options.

        Returns
        -------
        provenance : `lsst.ip.isr.IsrProvenance`
            The provenance defined in the tables.
        """
        table = tableList[0]
        metadata = table.meta

        # Map each lower-cased dimension name back to its column name.
        schema = {colName.lower(): colName for colName in table.columns}
        dimensions = sorted(schema)

        inDict = dict()
        inDict["metadata"] = metadata
        inDict["calibType"] = metadata["calibType"]
        inDict["dimensions"] = dimensions
        inDict["dataIdList"] = [{dim: row[schema[dim]] for dim in dimensions}
                                for row in table]

        return cls.fromDict(inDict, **kwargs)

    @classmethod
    def fromDict(cls, dictionary, **kwargs):
        """Construct provenance from a dictionary.

        Parameters
        ----------
        dictionary : `dict`
            Dictionary of provenance parameters.  Must contain the
            ``metadata``, ``calibType``, ``dimensions`` and
            ``dataIdList`` entries.
        kwargs : `dict` or `collections.abc.Mapping`, optional
            Set of key=value options.  Accepted so that the base class
            readers can forward their options; currently unused.

        Returns
        -------
        provenance : `lsst.ip.isr.IsrProvenance`
            The provenance defined in the dictionary.

        Raises
        ------
        RuntimeError
            Raised if the metadata ``OBSTYPE`` does not match this
            class.
        """
        calib = cls()
        if calib._OBSTYPE != dictionary["metadata"]["OBSTYPE"]:
            raise RuntimeError(f"Incorrect calibration supplied.  Expected {calib._OBSTYPE}, "
                               f"found {dictionary['metadata']['OBSTYPE']}")
        calib.updateMetadata(setDate=False, setCalibInfo=True, **dictionary["metadata"])

        # These properties are read from the dictionary itself, not
        # from its metadata.
        calib.calibType = dictionary["calibType"]
        calib.dimensions = set(dictionary["dimensions"])
        calib.dataIdList = dictionary["dataIdList"]

        # Refresh the metadata so it records the calibType just set.
        calib.updateMetadata()
        return calib

    def toDict(self):
        """Return a dictionary containing the provenance information.

        Returns
        -------
        dictionary : `dict`
            Dictionary of provenance.
        """
        self.updateMetadata()

        return {
            "metadata": self.getMetadata(),
            "detectorName": self._detectorName,
            "detectorSerial": self._detectorSerial,
            "detectorId": self._detectorId,
            "instrument": self._instrument,
            "calibType": self.calibType,
            "dimensions": list(self.dimensions),
            "dataIdList": self.dataIdList,
        }

    def toTable(self):
        """Return a list of tables containing the provenance.

        This seems inefficient and slow, so this may not be the best
        way to store the data.

        Returns
        -------
        tableList : `list` [`astropy.table.Table`]
            List of tables containing the provenance information
        """
        self.updateMetadata(setDate=True, setCalibInfo=True)

        # The dimensions are held in a set; sort them so the column
        # order is the same on every call.
        catalog = Table(rows=self.dataIdList,
                        names=sorted(self.dimensions))
        # Entries without a value are left out of the table metadata.
        catalog.meta = {k: v for k, v in self.getMetadata().toDict().items() if v is not None}
        return [catalog]