Coverage for python / lsst / cp / verify / mergeResults.py: 29%
289 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:08 +0000
1# This file is part of cp_verify.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21from astropy.table import vstack, Table, TableMergeError
23import lsst.pipe.base as pipeBase
24import lsst.pipe.base.connectionTypes as cT
25import lsst.pex.config as pexConfig
28__all__ = ['CpVerifyExpMergeConfig', 'CpVerifyExpMergeTask',
29 'CpVerifyRunMergeConfig', 'CpVerifyRunMergeTask',
30 'CpVerifyExpMergeByFilterConfig', 'CpVerifyExpMergeByFilterTask',
31 'CpVerifyRunMergeByFilterConfig', 'CpVerifyRunMergeByFilterTask',
32 'CpVerifyVisitExpMergeConfig', 'CpVerifyVisitExpMergeTask',
33 'CpVerifyVisitRunMergeConfig', 'CpVerifyVisitRunMergeTask',
34 'CpVerifyCalibMergeConfig', 'CpVerifyCalibMergeTask']
37class CpVerifyExpMergeConnections(pipeBase.PipelineTaskConnections,
38 dimensions={"instrument", "exposure"},
39 defaultTemplates={}):
40 inputStats = cT.Input(
41 name="detectorStats",
42 doc="Input statistics to merge.",
43 storageClass="StructuredDataDict",
44 dimensions=["instrument", "exposure", "detector"],
45 multiple=True,
46 )
47 inputResults = cT.Input(
48 name="detectorResults",
49 doc="Input results to merge.",
50 storageClass="ArrowAstropy",
51 dimensions=["instrument", "exposure", "detector"],
52 multiple=True,
53 )
54 inputMatrix = cT.Input(
55 name="detectorMatrix",
56 doc="Input matrix to merge.",
57 storageClass="ArrowAstropy",
58 dimensions=["instrument", "exposure", "detector"],
59 multiple=True,
60 )
61 camera = cT.PrerequisiteInput(
62 name="camera",
63 storageClass="Camera",
64 doc="Input camera.",
65 dimensions=["instrument", ],
66 isCalibration=True,
67 )
69 outputStats = cT.Output(
70 name="exposureStats",
71 doc="Output statistics.",
72 storageClass="StructuredDataDict",
73 dimensions=["instrument", "exposure"],
74 )
75 outputResults = cT.Output(
76 name="exposureResults",
77 doc="Output results.",
78 storageClass="ArrowAstropy",
79 dimensions=["instrument", "exposure"],
80 )
81 outputMatrix = cT.Output(
82 name="exposureMatrix",
83 doc="Output matrix.",
84 storageClass="ArrowAstropy",
85 dimensions=["instrument", "exposure"],
86 )
88 def __init__(self, *, config=None):
89 super().__init__(config=config)
91 if not self.config.hasMatrixCatalog:
92 self.inputs.remove("inputMatrix")
93 self.outputs.remove("outputMatrix")
94 if not self.config.hasInputResults:
95 self.inputs.remove("inputResults")
96 if self.config.doDropStats:
97 self.outputs.remove("outputStats")
100class CpVerifyExpMergeConfig(pipeBase.PipelineTaskConfig,
101 pipelineConnections=CpVerifyExpMergeConnections):
102 """Configuration parameters for exposure stats merging.
103 """
104 statKeywords = pexConfig.DictField(
105 keytype=str,
106 itemtype=str,
107 doc="Dictionary of statistics to run on the set of detector values. The key should be the test "
108 "name to record in the output, and the value should be the `lsst.afw.math` statistic name string.",
109 default={},
110 )
111 hasMatrixCatalog = pexConfig.Field(
112 dtype=bool,
113 doc="Is there matrix catalog to merge?",
114 default=False,
115 )
116 hasInputResults = pexConfig.Field(
117 dtype=bool,
118 doc="Are there results tables to merge?",
119 default=False,
120 )
121 doDropStats = pexConfig.Field(
122 dtype=bool,
123 doc="Drop to-be-deprecated internal stats files to avoid butler dimension mismatch?",
124 default=False,
125 )
127 mergeDimension = pexConfig.Field(
128 dtype=str,
129 doc="Dimension name that these inputs will be merged over.",
130 default="detector",
131 )
132 stageName = pexConfig.Field(
133 dtype=str,
134 doc="Stage name to use in any further analysis.",
135 default="stageName",
136 )
139class CpVerifyExpMergeTask(pipeBase.PipelineTask):
140 """Merge statistics from detectors together.
141 """
142 ConfigClass = CpVerifyExpMergeConfig
143 _DefaultName = 'cpVerifyExpMerge'
145 def runQuantum(self, butlerQC, inputRefs, outputRefs):
146 inputs = butlerQC.get(inputRefs)
148 dimensions = [dict(exp.dataId.required) for exp in inputRefs.inputStats]
149 inputs['inputDims'] = dimensions
151 outputs = self.run(**inputs)
152 butlerQC.put(outputs, outputRefs)
154 def run(self, inputStats, inputDims, camera=None, inputResults=None, inputMatrix=None,):
155 """Merge statistics.
157 Parameters
158 ----------
159 inputStats : `dict`
160 A nested dictionary of measured statistics and
161 verification results.
162 inputDims : `dict`
163 The input dimensions for each element of the inputStats.
164 camera : `lsst.afw.cameraGeom.Camera`, optional
165 The camera definition, used for identifying amplifier and
166 detector names.
167 inputResults : `astropy.Table`, optional
168 The statistics information, formatted into a flat table.
169 inputMatrix : `astropy.Table`, optional
170 A table of results that represent the elements of matrices
171 of values.
173 Returns
174 -------
175 outputStats : `dict`
176 A nested dictionary of merged statistics and verification
177 results.
178 outputResults : `astropy.Table`
179 Flat table containing the merged results from all
180 inputResults.
181 outputMatrix : `astropy.Table`
182 Table containing the merge results from all inputMatrix.
184 See Also
185 --------
186 lsst.cp.verify.CpVerifyStatsTask
187 """
188 outputStats = {} # This contains failure information
189 success = True
191 mergedStats = {} # This contains the merged set of subcomponent stats.
192 for inStats, dimensions in zip(inputStats, inputDims):
193 thisId = dimensions[self.config.mergeDimension]
194 thisName = thisId
196 if self.config.mergeDimension == 'detector':
197 thisName = camera[thisId].getName()
199 calcStats = {}
201 mergedStats[thisName] = inStats
203 if inStats['SUCCESS'] is True:
204 calcStats['SUCCESS'] = True
205 else:
206 calcStats['SUCCESS'] = False
207 calcStats['FAILURES'] = list()
208 success = False
210 # See if we have verify information to check:
211 if 'VERIFY' in inStats:
212 # See if an exposure failed
213 if 'EXP' in inStats['VERIFY'] and len(inStats['VERIFY']['EXP']) > 0:
214 expSuccess = inStats['VERIFY']['EXP'].pop('SUCCESS', False)
215 if not expSuccess:
216 for testName, testResult in inStats['VERIFY']['EXP'].items():
217 if testResult is False:
218 calcStats['FAILURES'].append(testName)
220 # See if a detector failed
221 if 'DET' in inStats['VERIFY'] and len(inStats['VERIFY']['DET']) > 0:
222 detSuccess = inStats['VERIFY']['DET'].pop('SUCCESS', False)
223 if not detSuccess:
224 for testName, testResult in inStats['VERIFY']['DET'].items():
225 if testResult is False:
226 calcStats['FAILURES'].append(testName)
228 # See if an amplifier failed
229 if 'AMP' in inStats['VERIFY'] and len(inStats['VERIFY']['AMP']) > 0:
230 for ampName, ampStats in inStats['VERIFY']['AMP'].items():
231 ampSuccess = ampStats.pop('SUCCESS')
232 if not ampSuccess:
233 for testName, testResult in ampStats.items():
234 if testResult is False:
235 calcStats['FAILURES'].append(ampName + " " + testName)
237 # See if a catalog failed
238 if 'CATALOG' in inStats['VERIFY'] and len(inStats['VERIFY']['CATALOG']) > 0:
239 for testName, testResult in inStats['VERIFY']['CATALOG'].items():
240 if testResult is False:
241 calcStats['FAILURES'].append(testName)
242 else:
243 # No VERIFY info? This must be partially accumulated.
244 # But we know there are failures somewhere.
245 # Drop any "SUCCESS" keys
246 _ = inStats.pop("SUCCESS", False)
247 for statKey, statDict in inStats.items():
248 if 'SUCCESS' in statDict and not statDict['SUCCESS']:
249 for failure in statDict['FAILURES']:
250 calcStats['FAILURES'].append(f"{statKey} {failure}")
252 outputStats[thisName] = calcStats
254 if self.config.mergeDimension == 'detector':
255 outKey = 'EXP'
256 else:
257 outKey = 'RUN'
259 groupSuccess = True
260 if len(self.config.statKeywords):
261 outputStats[outKey] = self.calcStatistics(mergedStats)
262 outputStats['VERIFY'], groupSuccess = self.verify(mergedStats, outputStats)
264 outputStats['SUCCESS'] = success & groupSuccess
266 additionalResults = None
267 if outKey in outputStats:
268 # This is the only new information generated here.
269 additionalResults, _ = self.pack(outputStats, inputDims, outKey)
271 outputResults = self.mergeTable(inputResults, additionalResults)
272 if inputMatrix is not None:
273 outputMatrix = self.mergeTable(inputMatrix)
274 else:
275 outputMatrix = None
277 return pipeBase.Struct(
278 outputStats=outputStats,
279 outputResults=outputResults,
280 outputMatrix=outputMatrix,
281 )
283 @staticmethod
284 def mergeTable(inputResults, newStats=None):
285 """Merge input tables.
287 Parameters
288 ----------
289 inputResults : `list` [`astropy.table.Table`]
290 Input tables to merge.
291 newStats : `astropy.table.Table`
292 Additional table to merge.
294 Returns
295 -------
296 merged : `astropy.table.Table`
297 "Outer-join" merged table.
298 """
299 if inputResults is None:
300 return Table()
302 if len(inputResults) > 0:
303 try:
304 outputResults = vstack(inputResults)
305 except TableMergeError:
306 # astropy requires everything in a column to have the
307 # same shape. Enforce that here.
308 import numpy as np
309 max_shapes = {}
310 columns_to_fix = set()
312 # Scan the data we're merging to find the max sizes.
313 for result in inputResults:
314 for column in result.columns:
315 shape = result[column].shape
316 if column in max_shapes:
317 max_shapes[column] = np.max([max_shapes[column], shape], axis=0)
318 else:
319 max_shapes[column] = shape
321 # Decide what needs fixing.
322 for result in inputResults:
323 for column in result.columns:
324 shape = result[column].shape
325 if column not in columns_to_fix:
326 need_to_fix = False
328 for counter, (lh, rh) in enumerate(zip(shape, max_shapes[column])):
329 if counter == 0:
330 continue
331 if lh != rh:
332 need_to_fix = True
333 if need_to_fix:
334 columns_to_fix.add(column)
336 # Do the fix.
337 for result in inputResults:
338 for column in result.columns:
339 if column not in columns_to_fix:
340 continue
342 # The first dimension of these shapes is the
343 # column length, which we can ignore in
344 # vstacking.
345 to_pad = np.subtract(max_shapes[column], result[column].shape)
346 if len(to_pad) <= 1:
347 continue
348 to_pad[0] = 0
349 # Only pad at the end.
350 padding = [[0, int(pp)] for pp in to_pad]
351 fixed_column = np.pad(result[column], padding,
352 mode='constant',
353 constant_values=np.nan)
354 result[column] = fixed_column
356 # Will it work now? If not, this will raise
357 outputResults = vstack(inputResults)
358 else:
359 outputResults = inputResults
361 if newStats:
362 return vstack([outputResults, Table(newStats)])
363 else:
364 return outputResults
366 def calcStatistics(self, statisticsDict):
367 """Calculate exposure level statistics based on the existing
368 per-amplifier and per-detector measurements.
370 Parameters
371 ----------
372 statisticsDictionary : `dict [`str`, `dict` [`str`, scalar]],
373 Dictionary of measured statistics. The top level
374 dictionary is keyed on the detector names, and contains
375 the measured statistics from the per-detector
376 measurements.
378 Returns
379 -------
380 outputStatistics : `dict` [`str, scalar]
381 A dictionary of the statistics measured and their values.
382 """
383 raise NotImplementedError("Subclasses must implement verification criteria.")
385 def verify(self, detectorStatistics, statisticsDictionary):
387 """Verify if the measured statistics meet the verification criteria.
389 Parameters
390 ----------
391 detectorStatistics : `dict` [`str`, `dict` [`str`, scalar]]
392 Merged set of input detector level statistics.
393 statisticsDictionary : `dict` [`str`, `dict` [`str`, scalar]]
394 Dictionary of measured statistics. The inner dictionary
395 should have keys that are statistic names (`str`) with
396 values that are some sort of scalar (`int` or `float` are
397 the mostly likely types).
399 Returns
400 -------
401 outputStatistics : `dict` [`str`, `dict` [`str`, `bool`]]
402 A dictionary indexed by the amplifier name, containing
403 dictionaries of the verification criteria.
404 success : `bool`
405 A boolean indicating if all tests have passed.
407 Raises
408 ------
409 NotImplementedError :
410 This method must be implemented by the calibration-type
411 subclass.
412 """
413 raise NotImplementedError("Subclasses must implement verification criteria.")
415 def pack(self, statisticsDict, dimensions, outKey):
416 """Repack information into flat tables.
418 This method should be redefined in subclasses, if new
419 statistics are measured.
421 Parameters
422 ----------
423 statisticsDictionary : `dict` [`str`, `dict` [`str`, scalar]],
424 Dictionary of measured statistics. The inner dictionary
425 should have keys that are statistic names (`str`) with
426 values that are some sort of scalar (`int` or `float` are
427 the mostly likely types).
428 dimensions : `dict`
429 Dictionary of input dimensions.
430 outKey : `str`
431 Key to use to access the data to pack.
433 Returns
434 -------
435 outputResults : `list` [`dict`]
436 A list of rows to add to the output table.
437 outputMatrix : `list` [`dict`]
438 A list of rows to add to the output matrix.
440 Raises
441 ------
442 NotImplementedError :
443 This method must be implemented by the calibration-type
444 subclass.
446 """
447 raise NotImplementedError("Subclasses must implement verification criteria.")
450class CpVerifyRunMergeConnections(pipeBase.PipelineTaskConnections,
451 dimensions={"instrument", },
452 defaultTemplates={}):
453 inputStats = cT.Input(
454 name="exposureStats",
455 doc="Input statistics to merge.",
456 storageClass="StructuredDataDict",
457 dimensions=["instrument", "exposure"],
458 multiple=True,
459 )
460 inputResults = cT.Input(
461 name="exposureResults",
462 doc="Input results table to merge.",
463 storageClass="ArrowAstropy",
464 dimensions=["instrument", "exposure"],
465 multiple=True,
466 )
467 inputMatrix = cT.Input(
468 name="exposureMatrix",
469 doc="Input matrix table to merge.",
470 storageClass="ArrowAstropy",
471 dimensions=["instrument", "exposure"],
472 multiple=True,
473 )
474 camera = cT.PrerequisiteInput(
475 name="camera",
476 storageClass="Camera",
477 doc="Input camera.",
478 dimensions=["instrument", ],
479 isCalibration=True,
480 )
482 outputStats = cT.Output(
483 name="runStats",
484 doc="Output statistics.",
485 storageClass="StructuredDataDict",
486 dimensions=["instrument", ],
487 )
488 outputResults = cT.Output(
489 name="runResults",
490 doc="Output merged results table.",
491 storageClass="ArrowAstropy",
492 dimensions=["instrument",],
493 )
494 outputMatrix = cT.Output(
495 name="runMatrix",
496 doc="Output merged matrix table.",
497 storageClass="ArrowAstropy",
498 dimensions=["instrument",],
499 )
501 def __init__(self, *, config=None):
502 super().__init__(config=config)
504 if not self.config.hasMatrixCatalog:
505 self.inputs.remove("inputMatrix")
506 self.outputs.remove("outputMatrix")
507 if not self.config.hasInputResults:
508 self.inputs.remove("inputResults")
509 if self.config.doDropStats:
510 self.outputs.remove("outputStats")
513class CpVerifyRunMergeConfig(CpVerifyExpMergeConfig,
514 pipelineConnections=CpVerifyRunMergeConnections):
515 """Configuration paramters for exposure stats merging.
516 """
517 mergeDimension = pexConfig.Field(
518 dtype=str,
519 doc="Dimension name for this input.",
520 default="exposure",
521 )
524class CpVerifyRunMergeTask(CpVerifyExpMergeTask):
525 """Merge statistics from detectors together.
526 """
527 ConfigClass = CpVerifyRunMergeConfig
528 _DefaultName = 'cpVerifyRunMerge'
530 pass
531# End ExpMerge/RunMerge
534# Begin ExpMergeByFilter/RunMergeByFilter
535class CpVerifyExpMergeByFilterConnections(pipeBase.PipelineTaskConnections,
536 dimensions={"instrument", "exposure", "physical_filter"},
537 defaultTemplates={}):
538 inputStats = cT.Input(
539 name="exposureStats",
540 doc="Input statistics to merge.",
541 storageClass="StructuredDataDict",
542 dimensions=["instrument", "exposure", "detector", "physical_filter"],
543 multiple=True,
544 )
545 inputResults = cT.Input(
546 name="exposureResults",
547 doc="Input results table to merge.",
548 storageClass="ArrowAstropy",
549 dimensions=["instrument", "exposure", "detector", "physical_filter"],
550 multiple=True,
551 )
552 inputMatrix = cT.Input(
553 name="exposureMatrix",
554 doc="Input matrix table to merge.",
555 storageClass="ArrowAstropy",
556 dimensions=["instrument", "exposure", "detector", "physical_filter"],
557 multiple=True,
558 )
559 camera = cT.PrerequisiteInput(
560 name="camera",
561 storageClass="Camera",
562 doc="Input camera.",
563 dimensions=["instrument", ],
564 isCalibration=True,
565 )
567 outputStats = cT.Output(
568 name="runStats",
569 doc="Output statistics.",
570 storageClass="StructuredDataDict",
571 dimensions=["instrument", "exposure", "physical_filter"],
572 )
573 outputResults = cT.Output(
574 name="runResults",
575 doc="Output merged results table.",
576 storageClass="ArrowAstropy",
577 dimensions=["instrument", "exposure", "physical_filter"],
578 )
579 outputMatrix = cT.Output(
580 name="runMatrix",
581 doc="Output merged matrix table.",
582 storageClass="ArrowAstropy",
583 dimensions=["instrument", "exposure", "physical_filter"],
584 )
586 def __init__(self, *, config=None):
587 super().__init__(config=config)
589 if not self.config.hasMatrixCatalog:
590 self.inputs.remove("inputMatrix")
591 self.outputs.remove("outputMatrix")
592 if not self.config.hasInputResults:
593 self.inputs.remove("inputResults")
594 if self.config.doDropStats:
595 self.outputs.remove("outputStats")
598class CpVerifyExpMergeByFilterConfig(CpVerifyExpMergeConfig,
599 pipelineConnections=CpVerifyExpMergeByFilterConnections):
600 """Configuration paramters for exposure stats merging.
601 """
602 mergeDimension = pexConfig.Field(
603 dtype=str,
604 doc="Dimension name for this input.",
605 default="detector",
606 )
609class CpVerifyExpMergeByFilterTask(CpVerifyExpMergeTask):
610 """Merge statistics from detectors together.
611 """
612 ConfigClass = CpVerifyRunMergeConfig
613 _DefaultName = 'cpVerifyRunMerge'
615 pass
618class CpVerifyRunMergeByFilterConnections(pipeBase.PipelineTaskConnections,
619 dimensions={"instrument", "physical_filter"},
620 defaultTemplates={}):
621 inputStats = cT.Input(
622 name="exposureStats",
623 doc="Input statistics to merge.",
624 storageClass="StructuredDataDict",
625 dimensions=["instrument", "exposure", "physical_filter"],
626 multiple=True,
627 )
628 inputResults = cT.Input(
629 name="exposureResults",
630 doc="Input results table to merge.",
631 storageClass="ArrowAstropy",
632 dimensions=["instrument", "exposure", "physical_filter"],
633 multiple=True,
634 )
635 inputMatrix = cT.Input(
636 name="exposureMatrix",
637 doc="Input matrix table to merge.",
638 storageClass="ArrowAstropy",
639 dimensions=["instrument", "exposure", "physical_filter"],
640 multiple=True,
641 )
642 camera = cT.PrerequisiteInput(
643 name="camera",
644 storageClass="Camera",
645 doc="Input camera.",
646 dimensions=["instrument", ],
647 isCalibration=True,
648 )
650 outputStats = cT.Output(
651 name="runStats",
652 doc="Output statistics.",
653 storageClass="StructuredDataDict",
654 dimensions=["instrument", "physical_filter"],
655 )
656 outputResults = cT.Output(
657 name="runResults",
658 doc="Output merged results table.",
659 storageClass="ArrowAstropy",
660 dimensions=["instrument", "physical_filter"],
661 )
662 outputMatrix = cT.Output(
663 name="runMatrix",
664 doc="Output merged matrix table.",
665 storageClass="ArrowAstropy",
666 dimensions=["instrument", "physical_filter"],
667 )
669 def __init__(self, *, config=None):
670 super().__init__(config=config)
672 if not self.config.hasMatrixCatalog:
673 self.inputs.remove("inputMatrix")
674 self.outputs.remove("outputMatrix")
675 if not self.config.hasInputResults:
676 self.inputs.remove("inputResults")
677 if self.config.doDropStats:
678 self.outputs.remove("outputStats")
681class CpVerifyRunMergeByFilterConfig(CpVerifyRunMergeConfig,
682 pipelineConnections=CpVerifyRunMergeByFilterConnections):
683 """Configuration paramters for exposure stats merging.
684 """
685 pass
688class CpVerifyRunMergeByFilterTask(CpVerifyExpMergeTask):
689 """Merge statistics from detectors together.
690 """
691 ConfigClass = CpVerifyRunMergeByFilterConfig
692 _DefaultName = 'cpVerifyRunMergeByFilter'
694 pass
695# End ExpMergeByFilter/RunMergeByFilter
698# Begin ExpMergeByVisit/RunMergeByVisit
699class CpVerifyVisitExpMergeConnections(pipeBase.PipelineTaskConnections,
700 dimensions={"instrument", "visit"},
701 defaultTemplates={}):
702 inputStats = cT.Input(
703 name="detectorStats",
704 doc="Input statistics to merge.",
705 storageClass="StructuredDataDict",
706 dimensions=["instrument", "exposure", "detector"],
707 multiple=True,
708 )
709 inputResults = cT.Input(
710 name="detectorResults",
711 doc="Input results to merge.",
712 storageClass="ArrowAstropy",
713 dimensions=["instrument", "exposure", "detector"],
714 multiple=True,
715 )
716 inputMatrix = cT.Input(
717 name="detectorMatrix",
718 doc="Input matrix to merge.",
719 storageClass="ArrowAstropy",
720 dimensions=["instrument", "visit", "detector"],
721 multiple=True,
722 )
723 camera = cT.PrerequisiteInput(
724 name="camera",
725 storageClass="Camera",
726 doc="Input camera.",
727 dimensions=["instrument", ],
728 isCalibration=True,
729 )
731 outputStats = cT.Output(
732 name="exposureStats",
733 doc="Output statistics.",
734 storageClass="StructuredDataDict",
735 dimensions=["instrument", "visit"],
736 )
737 outputResults = cT.Output(
738 name="exposureResults",
739 doc="Output results.",
740 storageClass="ArrowAstropy",
741 dimensions=["instrument", "visit"],
742 )
743 outputMatrix = cT.Output(
744 name="exposureMatrix",
745 doc="Output matrix.",
746 storageClass="ArrowAstropy",
747 dimensions=["instrument", "visit"],
748 )
750 def __init__(self, *, config=None):
751 super().__init__(config=config)
753 if not self.config.hasMatrixCatalog:
754 self.inputs.remove("inputMatrix")
755 self.outputs.remove("outputMatrix")
756 if not self.config.hasInputResults:
757 self.inputs.remove("inputResults")
758 if self.config.doDropStats:
759 self.outputs.remove("outputStats")
762class CpVerifyVisitExpMergeConfig(CpVerifyExpMergeConfig,
763 pipelineConnections=CpVerifyVisitExpMergeConnections):
764 pass
767class CpVerifyVisitExpMergeTask(CpVerifyExpMergeTask):
768 """Merge visit based data."""
770 ConfigClass = CpVerifyVisitExpMergeConfig
771 _DefaultName = 'cpVerifyVisitExpMerge'
773 pass
776class CpVerifyVisitRunMergeConnections(pipeBase.PipelineTaskConnections,
777 dimensions={"instrument"},
778 defaultTemplates={}):
779 inputStats = cT.Input(
780 name="exposureStats",
781 doc="Input statistics to merge.",
782 storageClass="StructuredDataDict",
783 dimensions=["instrument", "visit"],
784 multiple=True,
785 )
786 inputResults = cT.Input(
787 name="exposureResults",
788 doc="Input results table to merge.",
789 storageClass="ArrowAstropy",
790 dimensions=["instrument", "visit"],
791 multiple=True,
792 )
793 inputMatrix = cT.Input(
794 name="exposureMatrix",
795 doc="Input matrix table to merge.",
796 storageClass="ArrowAstropy",
797 dimensions=["instrument", "visit"],
798 multiple=True,
799 )
800 camera = cT.PrerequisiteInput(
801 name="camera",
802 storageClass="Camera",
803 doc="Input camera.",
804 dimensions=["instrument", ],
805 isCalibration=True,
806 )
808 outputStats = cT.Output(
809 name="runStats",
810 doc="Output statistics.",
811 storageClass="StructuredDataDict",
812 dimensions=["instrument"],
813 )
814 outputResults = cT.Output(
815 name="runResults",
816 doc="Output merged results table.",
817 storageClass="ArrowAstropy",
818 dimensions=["instrument", ],
819 )
820 outputMatrix = cT.Output(
821 name="runMatrix",
822 doc="Output merged matrix table.",
823 storageClass="ArrowAstropy",
824 dimensions=["instrument", ],
825 )
827 def __init__(self, *, config=None):
828 super().__init__(config=config)
830 if not self.config.hasMatrixCatalog:
831 self.inputs.remove("inputMatrix")
832 self.outputs.remove("outputMatrix")
833 if not self.config.hasInputResults:
834 self.inputs.remove("inputResults")
835 if self.config.doDropStats:
836 self.outputs.remove("outputStats")
839class CpVerifyVisitRunMergeConfig(CpVerifyRunMergeConfig,
840 pipelineConnections=CpVerifyVisitRunMergeConnections):
841 pass
844class CpVerifyVisitRunMergeTask(CpVerifyRunMergeTask):
845 """Merge visit based data."""
847 ConfigClass = CpVerifyVisitRunMergeConfig
848 _DefaultName = 'cpVerifyVisitRunMerge'
850 pass
851# End ExpMergeByVisit/RunMergeByVisit
854# Begin CalibMerge (this is a one-step)
855class CpVerifyCalibMergeConnections(pipeBase.PipelineTaskConnections,
856 dimensions={"instrument"},
857 defaultTemplates={}):
858 inputStats = cT.Input(
859 name="exposureStats",
860 doc="Input statistics to merge.",
861 storageClass="StructuredDataDict",
862 dimensions=["instrument", "detector"],
863 multiple=True,
864 )
865 inputResults = cT.Input(
866 name="exposureResults",
867 doc="Input results table to merge.",
868 storageClass="ArrowAstropy",
869 dimensions=["instrument", "detector"],
870 multiple=True,
871 )
872 inputMatrix = cT.Input(
873 name="exposureMatrix",
874 doc="Input matrix table to merge.",
875 storageClass="ArrowAstropy",
876 dimensions=["instrument", "detector"],
877 multiple=True,
878 )
879 camera = cT.PrerequisiteInput(
880 name="camera",
881 storageClass="Camera",
882 doc="Input camera.",
883 dimensions=["instrument", ],
884 isCalibration=True,
885 )
887 outputStats = cT.Output(
888 name="exposureStats",
889 doc="Output statistics.",
890 storageClass="StructuredDataDict",
891 dimensions=["instrument"],
892 )
893 outputResults = cT.Output(
894 name="runResults",
895 doc="Output merged results table.",
896 storageClass="ArrowAstropy",
897 dimensions=["instrument", ],
898 )
899 outputMatrix = cT.Output(
900 name="runMatrix",
901 doc="Output merged matrix table.",
902 storageClass="ArrowAstropy",
903 dimensions=["instrument", ],
904 )
906 def __init__(self, *, config=None):
907 super().__init__(config=config)
909 if not self.config.hasMatrixCatalog:
910 self.inputs.remove("inputMatrix")
911 self.outputs.remove("outputMatrix")
912 if not self.config.hasInputResults:
913 self.inputs.remove("inputResults")
914 if self.config.doDropStats:
915 self.outputs.remove("outputStats")
918class CpVerifyCalibMergeConfig(CpVerifyRunMergeConfig,
919 pipelineConnections=CpVerifyCalibMergeConnections):
920 """Configuration paramters for exposure stats merging.
921 """
922 pass
925class CpVerifyCalibMergeTask(CpVerifyRunMergeTask):
926 """Merge statistics from detectors together.
927 """
928 ConfigClass = CpVerifyCalibMergeConfig
929 _DefaultName = 'cpVerifyCalibMerge'
931 pass
932# End CalibMerge