Coverage for python / lsst / obs / lsst / _ingestAuxCalibs.py: 18%
217 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:58 +0000
1# This file is part of obs_lsst.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21__all__ = ("PhotodiodeIngestConfig", "PhotodiodeIngestTask",
22 "ShutterMotionOpenIngestConfig", "ShutterMotionOpenIngestTask",
23 "ShutterMotionCloseIngestConfig", "ShutterMotionCloseIngestTask")
26from lsst.daf.butler import (
27 CollectionType,
28 DataCoordinate,
29 DatasetIdGenEnum,
30 DatasetRef,
31 DatasetType,
32 FileDataset,
33 Progress,
34)
35from lsst.ip.isr import PhotodiodeCalib, ShutterMotionProfile
36from lsst.obs.base import makeTransferChoiceField
37from lsst.obs.base.formatters.fitsGeneric import FitsGenericFormatter
38from lsst.pex.config import Config, Field
39from lsst.pipe.base import Task
40from lsst.resources import ResourcePath
43DEFAULT_PHOTODIODE_REGEX = r"Photodiode_Readings.*txt$|_photodiode.ecsv$|Electrometer.*fits$|EM.*fits$"
44DEFAULT_SHUTTER_OPEN_REGEX = r".*shutterMotionProfileOpen.json$"
45DEFAULT_SHUTTER_CLOSE_REGEX = r".*shutterMotionProfileClose.json$"
48# Base class begin.
class IsrCalibIngestConfig(Config):
    """Options shared by all IsrCalib ingestion tasks."""

    transfer = makeTransferChoiceField(default="copy")

    forceCopyOnly = Field(
        doc="Should this ingest force transfer to be copy, to ensure the calib is rewritten?",
        dtype=bool,
        default=True,
    )

    doRaiseOnMissingExposure = Field(
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
        dtype=bool,
        default=True,
    )

    def validate(self):
        """Check the configuration for consistency.

        Raises
        ------
        ValueError
            Raised if ``forceCopyOnly`` is set while ``transfer`` is
            anything other than ``"copy"``.
        """
        super().validate()
        usesCopy = self.transfer == "copy"
        if not usesCopy and self.forceCopyOnly:
            raise ValueError(f"Transfer must be 'copy' for this data: {self.transfer}")
class IsrCalibIngestTask(Task):
    """Base task to ingest data convertible to an IsrCalib into a butler
    repository.

    Subclasses supply the dataset type, the destination collection, the
    file reader, and the rule that associates a calibration with an
    exposure.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance, with ``butler.run`` set to the
        appropriate `~lsst.daf.butler.CollectionType.RUN` collection
        for these datasets.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these datasets are from.
    config : `IsrCalibIngestConfig`, optional
        Configuration for the task.  If `None`, a default instance of
        ``ConfigClass`` is created.
    **kwargs
        Additional keyword arguments, forwarded to the parent constructor.
    """

    ConfigClass = IsrCalibIngestConfig
    _DefaultName = "genericIsrIngest"

    def __init__(self, butler, instrument, config=None, **kwargs):
        # ``config`` is optional in the signature, so build the default
        # before validating instead of failing on ``None.validate()``.
        if config is None:
            config = self.ConfigClass()
        config.validate()
        super().__init__(config, **kwargs)
        self.butler = butler
        self.universe = self.butler.dimensions
        self.datasetType = self.getDatasetType()
        self.progress = Progress(self.log.name)
        self.instrument = instrument
        self.camera = self.instrument.getCamera()

    def getDatasetType(self):
        """Return the DatasetType to be ingested.

        Returns
        -------
        datasetType : `lsst.daf.butler.DatasetType`
            The datasetType for the ingested data.
        """
        raise NotImplementedError(
            "Subclasses must define their datasetType."
        )

    def getDestinationCollection(self):
        """Return the collection that these datasets will be ingested to.

        Returns
        -------
        collectionName : `str`
            The collection the data will be ingested to.
        """
        raise NotImplementedError(
            "Subclasses must define their target collection."
        )

    def readCalibFromFile(self, inputFile):
        """Read the inputFile, and determine its calibration type and read it.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to be read to check ingestibility.

        Returns
        -------
        calib : `lsst.ip.isr.IsrCalib`
            The appropriately subclassed implementation for this
            calibration type.
        calibType : `str`
            The calibration type/version name.
        """
        raise NotImplementedError(
            "Subclasses must define how to read their datasets."
        )

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Determine the information needed to create a dataId for this
        dataset.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file containing the dataset.  Used for log messages.
        calib : `lsst.ip.isr.IsrCalib`
            The calibration dataset to study.
        calibType : `str`
            The calibration type/version name.

        Returns
        -------
        instrumentName : `str`
            Instrument this dataset belongs to.
        whereClause : `str`
            Butler query "where" that will find the exposure with
            matching dataId.  `run` skips the file when this is `None`.
        binding : `dict` [`str`: `str`]
            Binding values for ``whereClause``.
        logId : `str`
            A string (or dataset convertible to string) to be used in
            downstream logs.
        """
        raise NotImplementedError(
            "Subclasses must define how to associate their datasets."
        )

    def _ingestFile(self, path, ref, track_file_attrs):
        """Ingest one file for ``ref`` using the configured transfer mode."""
        dataset = FileDataset(path=path, refs=ref, formatter=FitsGenericFormatter)
        # No try: if this fails, we should stop.
        self.butler.ingest(dataset, transfer=self.config.transfer,
                           record_validation_info=track_file_attrs)

    def run(self, locations, run=None,
            file_filter=DEFAULT_PHOTODIODE_REGEX,
            track_file_attrs=None):
        """Ingest calibration data into a Butler data repository.

        Parameters
        ----------
        locations : iterable
            Files or directories to search for calibration files; passed
            unchanged to `lsst.resources.ResourcePath.findFileResources`.
        run : `str`, optional
            Name of the RUN-type collection to write to, overriding
            the default returned by `getDestinationCollection`.
        file_filter : `str`, optional
            Regular expression used to select files found in
            ``locations``.  The default is the photodiode pattern for
            every subclass, so callers ingesting other calibration
            types should pass a matching pattern.
        track_file_attrs : `bool`, optional
            Control whether file attributes such as the size or
            checksum should be tracked by the datastore.  Whether
            this parameter is honored depends on the specific
            datastore implementation.

        Returns
        -------
        refs : `list` [`lsst.daf.butler.DatasetRef`]
            Dataset references for the newly ingested calibrations.

        Raises
        ------
        RuntimeError
            Raised after all files have been processed if any file
            matched more than one exposure, or if any file matched no
            exposure and ``config.doRaiseOnMissingExposure`` is set.
        """
        files = ResourcePath.findFileResources(locations, file_filter)

        registry = self.butler.registry
        datasetType = self.datasetType
        registry.registerDatasetType(datasetType)

        # Find and register run that we will ingest to.
        if run is None:
            run = self.getDestinationCollection()
        registry.registerCollection(run, type=CollectionType.RUN)

        # Use datasetIds that match the raw exposure data.
        if registry.supportsIdGenerationMode(DatasetIdGenEnum.DATAID_TYPE_RUN):
            mode = DatasetIdGenEnum.DATAID_TYPE_RUN
        else:
            mode = DatasetIdGenEnum.UNIQUE

        transfer = self.config.transfer
        refs = []
        numExisting = 0
        numAmbiguous = 0
        numMissing = 0
        for inputFile in files:
            # Convert the file into the right class.
            calib, calibType = self.readCalibFromFile(inputFile)

            # Get the information we'll need to look up which exposure
            # it matches.
            instrumentName, whereClause, binding, logId = self.getAssociationInfo(inputFile, calib,
                                                                                  calibType)
            if whereClause is None:
                self.log.warning("Skipping input file %s of unknown type.",
                                 inputFile)
                continue

            exposureRecords = list(registry.queryDimensionRecords("exposure",
                                                                  instrument=instrumentName,
                                                                  where=whereClause,
                                                                  bind=binding))
            nRecords = len(exposureRecords)
            if nRecords == 0:
                numMissing += 1
                self.log.warning("Skipping instrument %s and identifiers %s: no exposures found.",
                                 instrumentName, logId)
                continue
            if nRecords > 1:
                numAmbiguous += 1
                self.log.warning("Multiple exposure entries found for instrument %s and "
                                 "identifiers %s.", instrumentName, logId)
                continue

            exposureId = exposureRecords[0].id
            calib.updateMetadata(camera=self.camera, exposure=exposureId)

            # Generate the dataId for this file.
            dataId = DataCoordinate.standardize(
                instrument=self.instrument.getName(),
                exposure=exposureId,
                universe=self.universe,
            )

            # If this already exists, we should skip it and continue.
            existing = list(registry.queryDatasets(datasetType, collections=[run], dataId=dataId))
            if existing:
                self.log.debug("Skipping instrument %s and identifiers %s: already exists in run %s.",
                               instrumentName, logId, run)
                numExisting += 1
                continue

            ref = DatasetRef(datasetType, dataId, run=run, id_generation_mode=mode)
            if transfer == "copy":
                # Ingest must work from a file, but we can't use the
                # original, as we've added new metadata and reformatted
                # it.  Write it to a temp file that we can use to ingest;
                # the ingest must finish before the temp file is removed.
                with ResourcePath.temporary_uri(suffix=".fits") as tempFile:
                    calib.writeFits(tempFile.ospath)
                    self._ingestFile(tempFile, ref, track_file_attrs)
            elif transfer == "direct":
                if self.config.forceCopyOnly:
                    # validate() rejects this combination up front.
                    raise RuntimeError("I probably can never happen.")
                # NOTE(review): the original file is ingested with the
                # FITS formatter even though it may not be FITS; confirm.
                self._ingestFile(inputFile, ref, track_file_attrs)
            else:
                # Previously a silent no-op; make the skip visible.
                self.log.warning("Skipping input file %s: transfer mode %s is not supported by "
                                 "this task, nothing ingested.", inputFile, transfer)
                continue

            self.log.info("Dataset %s:%d (%s) ingested successfully", instrumentName, exposureId,
                          logId)
            refs.append(ref)

        if numExisting != 0:
            self.log.warning("Skipped %d entries that already existed in run %s", numExisting, run)

        if numMissing != 0:
            if self.config.doRaiseOnMissingExposure:
                raise RuntimeError(f"Failed to ingest {numMissing} entries due to "
                                   "missing exposure information.")
            self.log.warning("Skipped %d entries that had no associated exposure", numMissing)

        if numAmbiguous != 0:
            raise RuntimeError(f"Failed to ingest {numAmbiguous} entries due to "
                               "multiple matching exposures.")

        return refs
328# Photodiode implementation begin.
class PhotodiodeIngestConfig(IsrCalibIngestConfig):
    """Configuration for `PhotodiodeIngestTask`."""

    # Declared explicitly for this task; by default a photodiode file
    # with no matching exposure makes the ingest raise.
    doRaiseOnMissingExposure = Field(
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
        dtype=bool,
        default=True,
    )
class PhotodiodeIngestTask(IsrCalibIngestTask):
    """Task to ingest photodiode data into a butler repository.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance, with ``butler.run`` set to the
        appropriate `~lsst.daf.butler.CollectionType.RUN` collection
        for these datasets.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these photodiode datasets are from.
    config : `PhotodiodeIngestConfig`
        Configuration for the task.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = PhotodiodeIngestConfig
    _DefaultName = "photodiodeIngest"

    def getDatasetType(self):
        """Return the ``photodiode`` dataset type, keyed by exposure."""
        return DatasetType(
            "photodiode",
            ("instrument", "exposure"),
            "IsrCalib",
            universe=self.universe,
        )

    def getDestinationCollection(self):
        """Return the default RUN collection name for photodiode data."""
        return self.instrument.makeCollectionName("calib", "photodiode")

    def readCalibFromFile(self, inputFile):
        """Read a photodiode file, trying each known format in turn.

        The formats are tried in this order: FITS (the 2025 standard),
        the "full" text format, and the older two-column text format.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to read.

        Returns
        -------
        calib : `lsst.ip.isr.PhotodiodeCalib` or `None`
            The calibration, or `None` if no reader succeeded.
        calibType : `str`
            ``"fits-v{N}"`` (``N`` taken from the ``FORMAT_V`` metadata
            entry, default 1), ``"full"``, ``"two-column"``, or
            ``"Unknown"`` if no reader succeeded.
        """
        # (calibType label, PhotodiodeCalib reader name), in order of
        # preference.  Readers are looked up by name inside the try so a
        # missing reader is treated like any other read failure.
        readers = (
            ("fits", "readFits"),
            ("full", "readText"),
            ("two-column", "readTwoColumnPhotodiodeData"),
        )
        for label, readerName in readers:
            try:
                with inputFile.as_local() as localFile:
                    calib = getattr(PhotodiodeCalib, readerName)(localFile.ospath)
                if label == "fits":
                    # Include the format version so the association
                    # step can tell the FITS variants apart.
                    fitsVersion = int(calib.getMetadata().get("FORMAT_V", 1))
                    return calib, f"fits-v{fitsVersion:d}"
                return calib, label
            except Exception as exc:
                # A failure just means "not this format"; record it for
                # debugging and fall through to the next reader.
                self.log.debug("Could not read %s with PhotodiodeCalib.%s: %s",
                               inputFile, readerName, exc)
        return None, "Unknown"

    @staticmethod
    def _isValidGroupId(groupId):
        """Return `True` if ``groupId`` has the form
        ``{initial_group}#{unique identifier}`` with exactly one ``#``
        and neither part blank.
        """
        if not isinstance(groupId, str):
            return False
        initialGroup, separator, uniqueId = groupId.partition("#")
        return bool(separator and initialGroup and uniqueId and "#" not in uniqueId)

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Build the exposure query that matches a photodiode dataset.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file containing the dataset.  Used for log messages.
        calib : `lsst.ip.isr.PhotodiodeCalib`
            The calibration dataset to study.
        calibType : `str`
            The calibration type/version name from `readCalibFromFile`.

        Returns
        -------
        instrumentName : `str` or `None`
            Instrument this dataset belongs to.
        whereClause : `str` or `None`
            Butler query "where" that will find the matching exposure;
            `None` if the type is not handled or the group is malformed.
        binding : `dict` or `None`
            Binding values for ``whereClause``.
        logId : `str`, `tuple`, or `None`
            Identifier(s) to be used in downstream logs.
        """
        if calibType == "fits-v1":
            # The INSTRUME field is populated by the calib class, so a
            # missing or mismatched value is always replaced by this
            # task's instrument; the result is the instrument name.
            instrumentName = self.instrument.getName()

            # This format uses the GROUPID to match what is set in
            # the exposure.
            groupId = calib.metadata.get("GROUPID")
            if not self._isValidGroupId(groupId):
                self.log.warning("Skipping input file %s with malformed group %s.",
                                 inputFile, groupId)
                return None, None, None, groupId

            return instrumentName, "exposure.group=groupId", {"groupId": groupId}, groupId

        if calibType == "full":
            metadata = calib.getMetadata()
            instrumentName = metadata.get("INSTRUME")
            if instrumentName is None:
                # The field is populated by the calib class, so we
                # can't use defaults.
                instrumentName = self.instrument.getName()

            # This format uses the obsId to match what is set in
            # the exposure.
            obsId = metadata["obsId"]
            return instrumentName, "exposure.obs_id=obsId", {"obsId": obsId}, obsId

        if calibType == "two-column":
            # This format uses dayObs and seqNum to match what is
            # set in the exposure.
            metadata = calib.getMetadata()
            dayObs = metadata["day_obs"]
            seqNum = metadata["seq_num"]
            whereClause = "exposure.day_obs=dayObs and exposure.seq_num=seqNum"
            binding = {"dayObs": dayObs, "seqNum": seqNum}
            return self.instrument.getName(), whereClause, binding, (dayObs, seqNum)

        # Unreadable file or a format version this task does not handle.
        return None, None, None, None
469# Shutter Motion Open / Base Class begin:
class ShutterMotionOpenIngestConfig(IsrCalibIngestConfig):
    """Configuration for `ShutterMotionOpenIngestTask`."""

    # Unlike the base configuration, a missing exposure is only a
    # warning by default for shutter motion profiles.
    doRaiseOnMissingExposure = Field(
        doc="Should ingest raise if a calibration exists, but the matching exposure doesn't?",
        dtype=bool,
        default=False,
    )
class ShutterMotionOpenIngestTask(IsrCalibIngestTask):
    """Task to ingest shutter motion profiles into a butler repository.

    This task specifically works on the "open" profile.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance, with ``butler.run`` set to the
        appropriate `~lsst.daf.butler.CollectionType.RUN` collection
        for these datasets.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these datasets are from.
    config : `ShutterMotionOpenIngestConfig`
        Configuration for the task.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = ShutterMotionOpenIngestConfig
    _DefaultName = "shutterMotionOpenIngest"

    def getDatasetType(self):
        """Return the ``shutterMotionProfileOpen`` dataset type."""
        return DatasetType(
            "shutterMotionProfileOpen",
            ("instrument", "exposure"),
            "IsrCalib",
            universe=self.universe,
        )

    def getDestinationCollection(self):
        """Return the default RUN collection name for shutter motion data."""
        return self.instrument.makeCollectionName("calib", "shutterMotion")

    def readCalibFromFile(self, inputFile):
        """Read a shutter motion profile from a JSON text file.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            File to read.

        Returns
        -------
        calib : `lsst.ip.isr.ShutterMotionProfile` or `None`
            The profile, or `None` if the file could not be read.
        calibType : `str`
            ``"text-v{N}"`` (``N`` taken from the ``FORMAT_V`` metadata
            entry, default 1), or ``"Unknown"`` on failure.
        """
        try:
            # This is the 2025 standard, but make sure to include the
            # format version so the association step can check it.
            with inputFile.as_local() as localFile:
                calib = ShutterMotionProfile.readText(localFile.ospath)
            formatVersion = int(calib.getMetadata().get("FORMAT_V", 1))
        except Exception as exc:
            # An unreadable file is reported as an unknown type so the
            # caller can skip it; keep the reason for debugging.
            self.log.debug("Could not read %s as a shutter motion profile: %s", inputFile, exc)
            return None, "Unknown"
        return calib, f"text-v{formatVersion:d}"

    def getAssociationInfo(self, inputFile, calib, calibType):
        """Build the exposure query that matches a shutter motion profile.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePath`
            Original file containing the dataset.  Used for log messages.
        calib : `lsst.ip.isr.ShutterMotionProfile`
            The calibration dataset to study.
        calibType : `str`
            The calibration type/version name from `readCalibFromFile`.

        Returns
        -------
        instrumentName : `str` or `None`
            Instrument this dataset belongs to.
        whereClause : `str` or `None`
            Butler query "where" that will find the matching exposure;
            `None` if the type is not handled or the obsId is missing.
        binding : `dict` or `None`
            Binding values for ``whereClause``.
        logId : `str` or `None`
            Identifier to be used in downstream logs.
        """
        if calibType not in ("text-v1", "text-v2"):
            # Unreadable file or a format version this task does not handle.
            return None, None, None, None

        # The INSTRUME field is populated by the calib class, so a
        # missing or mismatched value is always replaced by this task's
        # instrument; the result is the instrument name.
        instrumentName = self.instrument.getName()

        # This format uses the obsId to match what is set in the
        # exposure; a profile without one cannot be associated.
        obsId = calib.metadata.get("obsId")
        if obsId is None:
            self.log.warning("Skipping input file %s with malformed obsId %s.",
                             inputFile, obsId)
            return None, None, None, obsId

        return instrumentName, "exposure.obs_id=obsId", {"obsId": obsId}, obsId
# Shutter Motion Close begin:
class ShutterMotionCloseIngestConfig(ShutterMotionOpenIngestConfig):
    """Configuration for `ShutterMotionCloseIngestTask`.

    Adds no fields of its own; everything is inherited from
    `ShutterMotionOpenIngestConfig`.
    """
class ShutterMotionCloseIngestTask(ShutterMotionOpenIngestTask):
    """Task to ingest shutter motion profiles into a butler repository.

    This task specifically works on the "Close" profile; it differs
    from `ShutterMotionOpenIngestTask` in its configuration class,
    default name, and dataset type.

    Parameters
    ----------
    config : `ShutterMotionCloseIngestConfig`
        Configuration for the task.
    instrument : `~lsst.obs.base.Instrument`
        The instrument these profiles are from.
    butler : `~lsst.daf.butler.Butler`
        Writable butler instance, with ``butler.run`` set to the
        appropriate `~lsst.daf.butler.CollectionType.RUN` collection
        for these datasets.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = ShutterMotionCloseIngestConfig
    _DefaultName = "shutterMotionCloseIngest"

    def getDatasetType(self):
        """Return the ``shutterMotionProfileClose`` dataset type."""
        dimensionNames = ("instrument", "exposure")
        closeProfileType = DatasetType(
            "shutterMotionProfileClose",
            dimensionNames,
            "IsrCalib",
            universe=self.universe,
        )
        return closeProfileType