Coverage for python / lsst / cp / verify / mergeResults.py: 29%

289 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:56 +0000

1# This file is part of cp_verify. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21from astropy.table import vstack, Table, TableMergeError 

22 

23import lsst.pipe.base as pipeBase 

24import lsst.pipe.base.connectionTypes as cT 

25import lsst.pex.config as pexConfig 

26 

27 

# Public API of this module: the config and task class of every merge step.
__all__ = [
    "CpVerifyExpMergeConfig",
    "CpVerifyExpMergeTask",
    "CpVerifyRunMergeConfig",
    "CpVerifyRunMergeTask",
    "CpVerifyExpMergeByFilterConfig",
    "CpVerifyExpMergeByFilterTask",
    "CpVerifyRunMergeByFilterConfig",
    "CpVerifyRunMergeByFilterTask",
    "CpVerifyVisitExpMergeConfig",
    "CpVerifyVisitExpMergeTask",
    "CpVerifyVisitRunMergeConfig",
    "CpVerifyVisitRunMergeTask",
    "CpVerifyCalibMergeConfig",
    "CpVerifyCalibMergeTask",
]

35 

36 

class CpVerifyExpMergeConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument", "exposure"},
    defaultTemplates={},
):
    """Connections for merging per-detector datasets into one per exposure.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="detectorStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure", "detector"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results to merge.",
        name="detectorResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "detector"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix to merge.",
        name="detectorMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "detector"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure"],
    )
    outputResults = cT.Output(
        doc="Output results.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure"],
    )
    outputMatrix = cT.Output(
        doc="Output matrix.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

98 

99 

class CpVerifyExpMergeConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=CpVerifyExpMergeConnections,
):
    """Configuration parameters for exposure stats merging.

    The three boolean flags are read by the connections class to decide
    which optional connections exist; ``mergeDimension`` selects the data
    ID key that the task uses to label each input.
    """

    statKeywords = pexConfig.DictField(
        doc=(
            "Dictionary of statistics to run on the set of detector values. The key should be the test "
            "name to record in the output, and the value should be the `lsst.afw.math` statistic name string."
        ),
        keytype=str,
        itemtype=str,
        default={},
    )
    hasMatrixCatalog = pexConfig.Field(
        doc="Is there matrix catalog to merge?",
        dtype=bool,
        default=False,
    )
    hasInputResults = pexConfig.Field(
        doc="Are there results tables to merge?",
        dtype=bool,
        default=False,
    )
    doDropStats = pexConfig.Field(
        doc="Drop to-be-deprecated internal stats files to avoid butler dimension mismatch?",
        dtype=bool,
        default=False,
    )

    mergeDimension = pexConfig.Field(
        doc="Dimension name that these inputs will be merged over.",
        dtype=str,
        default="detector",
    )
    stageName = pexConfig.Field(
        doc="Stage name to use in any further analysis.",
        dtype=str,
        default="stageName",
    )

137 

138 

class CpVerifyExpMergeTask(pipeBase.PipelineTask):
    """Merge statistics from detectors together.

    The base class records, for every input, whether it succeeded and
    which tests failed, and concatenates the flat result/matrix tables.
    The `calcStatistics`, `verify`, and `pack` hooks raise
    `NotImplementedError` here; `calcStatistics` and `verify` are only
    called when ``config.statKeywords`` is non-empty.
    """
    ConfigClass = CpVerifyExpMergeConfig
    _DefaultName = 'cpVerifyExpMerge'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, attach their data IDs, run, and store outputs.

        Parameters
        ----------
        butlerQC : quantum context
            Object used to ``get`` the inputs and ``put`` the outputs.
        inputRefs : input references
            References to the input datasets; ``inputRefs.inputStats`` is
            iterated to recover one data ID per statistics input.
        outputRefs : output references
            References to the output datasets.
        """
        inputs = butlerQC.get(inputRefs)

        # `run` needs the data ID of each inputStats element to name it.
        inputs['inputDims'] = [dict(ref.dataId.required) for ref in inputRefs.inputStats]

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputStats, inputDims, camera=None, inputResults=None, inputMatrix=None,):
        """Merge statistics.

        Parameters
        ----------
        inputStats : `list` [`dict`]
            Nested dictionaries of measured statistics and verification
            results, one per input.  Each must contain a ``SUCCESS`` key.
            Failed entries are modified in place: their ``SUCCESS`` keys
            are popped while the failures are collected.
        inputDims : `list` [`dict`]
            The input dimensions for each element of the inputStats.
        camera : `lsst.afw.cameraGeom.Camera`, optional
            The camera definition, used for identifying amplifier and
            detector names.  It is indexed by detector id whenever
            ``config.mergeDimension`` is ``"detector"``, so it must be
            supplied in that case.
        inputResults : `list` [`astropy.table.Table`], optional
            The statistics information, formatted into flat tables.
        inputMatrix : `list` [`astropy.table.Table`], optional
            Tables of results that represent the elements of matrices
            of values.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Struct with the following attributes:

            ``outputStats``
                A nested dictionary of merged statistics and verification
                results (`dict`).
            ``outputResults``
                Flat table containing the merged results from all
                inputResults (`astropy.table.Table`).
            ``outputMatrix``
                Table containing the merged results from all inputMatrix
                (`astropy.table.Table`), or `None` if no inputMatrix was
                given.

        See Also
        --------
        lsst.cp.verify.CpVerifyStatsTask
        """
        mergeDimension = self.config.mergeDimension
        mergeByDetector = mergeDimension == 'detector'

        outputStats = {}  # This contains failure information
        mergedStats = {}  # This contains the merged set of subcomponent stats.
        success = True

        for inStats, dimensions in zip(inputStats, inputDims):
            thisId = dimensions[mergeDimension]
            # Detectors are labelled by name; anything else by its id.
            thisName = camera[thisId].getName() if mergeByDetector else thisId

            mergedStats[thisName] = inStats

            if inStats['SUCCESS'] is True:
                calcStats = {'SUCCESS': True}
            else:
                success = False
                calcStats = {'SUCCESS': False,
                             'FAILURES': self._collectFailures(inStats)}

            outputStats[thisName] = calcStats

        outKey = 'EXP' if mergeByDetector else 'RUN'

        groupSuccess = True
        if len(self.config.statKeywords):
            outputStats[outKey] = self.calcStatistics(mergedStats)
            outputStats['VERIFY'], groupSuccess = self.verify(mergedStats, outputStats)

        # Logical (not bitwise) combination, forced to a plain `bool` so
        # the stored value does not depend on what `verify` returned.
        outputStats['SUCCESS'] = bool(success and groupSuccess)

        additionalResults = None
        if outKey in outputStats:
            # This is the only new information generated here.
            additionalResults, _ = self.pack(outputStats, inputDims, outKey)

        outputResults = self.mergeTable(inputResults, additionalResults)
        outputMatrix = self.mergeTable(inputMatrix) if inputMatrix is not None else None

        return pipeBase.Struct(
            outputStats=outputStats,
            outputResults=outputResults,
            outputMatrix=outputMatrix,
        )

    @staticmethod
    def _collectFailures(inStats):
        """List the failed tests recorded in one failed input.

        Parameters
        ----------
        inStats : `dict`
            Statistics dictionary of an input whose ``SUCCESS`` entry was
            not `True`.  ``SUCCESS`` keys are popped from the sections
            that are scanned, so this dictionary is modified in place.

        Returns
        -------
        failures : `list` [`str`]
            Names of the tests whose result is exactly `False`.
            Amplifier tests are prefixed with the amplifier name, and
            partially accumulated entries with their top-level key.
        """
        failures = []

        if 'VERIFY' not in inStats:
            # No VERIFY info?  This must be partially accumulated.
            # But we know there are failures somewhere.
            # Drop any "SUCCESS" keys
            inStats.pop("SUCCESS", None)
            for statKey, statDict in inStats.items():
                if 'SUCCESS' in statDict and not statDict['SUCCESS']:
                    failures.extend(f"{statKey} {failure}"
                                    for failure in statDict.get('FAILURES', ()))
            return failures

        verifyStats = inStats['VERIFY']

        # See if an exposure or a detector failed.
        for level in ('EXP', 'DET'):
            levelStats = verifyStats.get(level)
            if levelStats and not levelStats.pop('SUCCESS', False):
                failures.extend(testName for testName, testResult in levelStats.items()
                                if testResult is False)

        # See if an amplifier failed.  A missing SUCCESS key is treated as
        # a failure, matching the EXP/DET handling above.
        for ampName, ampStats in (verifyStats.get('AMP') or {}).items():
            if not ampStats.pop('SUCCESS', False):
                failures.extend(f"{ampName} {testName}"
                                for testName, testResult in ampStats.items()
                                if testResult is False)

        # See if a catalog failed
        failures.extend(testName
                        for testName, testResult in (verifyStats.get('CATALOG') or {}).items()
                        if testResult is False)

        return failures

    @staticmethod
    def mergeTable(inputResults, newStats=None):
        """Merge input tables.

        Parameters
        ----------
        inputResults : `list` [`astropy.table.Table`] or `None`
            Input tables to merge.  If stacking fails because a column
            has different trailing shapes in different tables, those
            columns are padded with NaN at the end, in place, and the
            stack is retried.
        newStats : `astropy.table.Table` or `list` [`dict`], optional
            Additional rows to merge; anything accepted by the
            `astropy.table.Table` constructor.

        Returns
        -------
        merged : `astropy.table.Table`
            "Outer-join" merged table.  An empty table is returned when
            ``inputResults`` is `None` (``newStats`` is ignored in that
            case).
        """
        if inputResults is None:
            return Table()

        if len(inputResults) > 0:
            try:
                outputResults = vstack(inputResults)
            except TableMergeError:
                # astropy requires everything in a column to have the
                # same shape.  Enforce that here.
                import numpy as np

                # Scan the data we're merging to find the max sizes.
                max_shapes = {}
                for result in inputResults:
                    for column in result.columns:
                        shape = result[column].shape
                        if column in max_shapes:
                            max_shapes[column] = np.max([max_shapes[column], shape], axis=0)
                        else:
                            max_shapes[column] = shape

                # Decide what needs fixing.  The first dimension is the
                # column length, which does not matter for vstacking.
                columns_to_fix = set()
                for result in inputResults:
                    for column in result.columns:
                        if column in columns_to_fix:
                            continue
                        shape = result[column].shape
                        if any(lh != rh for lh, rh in zip(shape[1:], max_shapes[column][1:])):
                            columns_to_fix.add(column)

                # Do the fix.
                for result in inputResults:
                    for column in result.columns:
                        if column not in columns_to_fix:
                            continue

                        to_pad = np.subtract(max_shapes[column], result[column].shape)
                        if len(to_pad) <= 1:
                            continue
                        # Never pad along the row axis.
                        to_pad[0] = 0
                        # Only pad at the end.
                        padding = [[0, int(pp)] for pp in to_pad]
                        result[column] = np.pad(result[column], padding,
                                                mode='constant',
                                                constant_values=np.nan)

                # Will it work now?  If not, this will raise
                outputResults = vstack(inputResults)
        else:
            # Nothing to stack: still hand back a Table, not the raw list.
            outputResults = Table()

        if not newStats:
            return outputResults

        newTable = Table(newStats)
        if len(outputResults.colnames) == 0:
            # No existing columns to join against.
            return newTable
        return vstack([outputResults, newTable])

    def calcStatistics(self, statisticsDict):
        """Calculate exposure level statistics based on the existing
        per-amplifier and per-detector measurements.

        Parameters
        ----------
        statisticsDict : `dict` [`str`, `dict` [`str`, scalar]]
            Dictionary of measured statistics.  The top level
            dictionary is keyed on the detector names, and contains
            the measured statistics from the per-detector
            measurements.

        Returns
        -------
        outputStatistics : `dict` [`str`, scalar]
            A dictionary of the statistics measured and their values.

        Raises
        ------
        NotImplementedError
            This method must be implemented by the calibration-type
            subclass.
        """
        raise NotImplementedError("Subclasses must implement statistics calculation.")

    def verify(self, detectorStatistics, statisticsDictionary):
        """Verify if the measured statistics meet the verification criteria.

        Parameters
        ----------
        detectorStatistics : `dict` [`str`, `dict` [`str`, scalar]]
            Merged set of input detector level statistics.
        statisticsDictionary : `dict` [`str`, `dict` [`str`, scalar]]
            Dictionary of measured statistics.  The inner dictionary
            should have keys that are statistic names (`str`) with
            values that are some sort of scalar (`int` or `float` are
            the mostly likely types).

        Returns
        -------
        outputStatistics : `dict` [`str`, `dict` [`str`, `bool`]]
            A dictionary indexed by the amplifier name, containing
            dictionaries of the verification criteria.
        success : `bool`
            A boolean indicating if all tests have passed.

        Raises
        ------
        NotImplementedError
            This method must be implemented by the calibration-type
            subclass.
        """
        raise NotImplementedError("Subclasses must implement verification criteria.")

    def pack(self, statisticsDict, dimensions, outKey):
        """Repack information into flat tables.

        This method should be redefined in subclasses, if new
        statistics are measured.

        Parameters
        ----------
        statisticsDict : `dict` [`str`, `dict` [`str`, scalar]]
            Dictionary of measured statistics.  The inner dictionary
            should have keys that are statistic names (`str`) with
            values that are some sort of scalar (`int` or `float` are
            the mostly likely types).
        dimensions : `list` [`dict`]
            Input dimensions, one dictionary per merged input.
        outKey : `str`
            Key to use to access the data to pack.

        Returns
        -------
        outputResults : `list` [`dict`]
            A list of rows to add to the output table.
        outputMatrix : `list` [`dict`]
            A list of rows to add to the output matrix.

        Raises
        ------
        NotImplementedError
            This method must be implemented by the calibration-type
            subclass.
        """
        raise NotImplementedError("Subclasses must implement results packing.")

448 

449 

class CpVerifyRunMergeConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument"},
    defaultTemplates={},
):
    """Connections for merging per-exposure datasets into one per run.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results table to merge.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix table to merge.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="runStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument"],
    )
    outputResults = cT.Output(
        doc="Output merged results table.",
        name="runResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )
    outputMatrix = cT.Output(
        doc="Output merged matrix table.",
        name="runMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

511 

512 

class CpVerifyRunMergeConfig(
    CpVerifyExpMergeConfig,
    pipelineConnections=CpVerifyRunMergeConnections,
):
    """Configuration parameters for run-level stats merging.

    Identical to the exposure-level configuration except that the merge
    dimension defaults to ``"exposure"``.
    """

    mergeDimension = pexConfig.Field(
        doc="Dimension name for this input.",
        dtype=str,
        default="exposure",
    )

522 

523 

class CpVerifyRunMergeTask(CpVerifyExpMergeTask):
    """Merge statistics from exposures together.

    All behavior is inherited from `CpVerifyExpMergeTask`; only the
    configuration class and default task name differ.
    """

    _DefaultName = 'cpVerifyRunMerge'
    ConfigClass = CpVerifyRunMergeConfig

531# End ExpMerge/RunMerge 

532 

533 

534# Begin ExpMergeByFilter/RunMergeByFilter 

class CpVerifyExpMergeByFilterConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument", "exposure", "physical_filter"},
    defaultTemplates={},
):
    """Connections for merging per-detector datasets into one per
    exposure and physical filter.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure", "detector", "physical_filter"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results table to merge.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "detector", "physical_filter"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix table to merge.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "detector", "physical_filter"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="runStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure", "physical_filter"],
    )
    outputResults = cT.Output(
        doc="Output merged results table.",
        name="runResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "physical_filter"],
    )
    outputMatrix = cT.Output(
        doc="Output merged matrix table.",
        name="runMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "physical_filter"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

596 

597 

class CpVerifyExpMergeByFilterConfig(
    CpVerifyExpMergeConfig,
    pipelineConnections=CpVerifyExpMergeByFilterConnections,
):
    """Configuration parameters for by-filter exposure stats merging.

    The merge dimension is redeclared here with a default of
    ``"detector"``.
    """

    mergeDimension = pexConfig.Field(
        doc="Dimension name for this input.",
        dtype=str,
        default="detector",
    )

607 

608 

class CpVerifyExpMergeByFilterTask(CpVerifyExpMergeTask):
    """Merge statistics from detectors together, using the by-filter
    connections.

    All behavior is inherited from `CpVerifyExpMergeTask`.  The
    configuration class must be `CpVerifyExpMergeByFilterConfig` so that
    the task is built with `CpVerifyExpMergeByFilterConnections` and
    merges over ``"detector"``; it previously pointed at the run-level
    configuration.
    """
    ConfigClass = CpVerifyExpMergeByFilterConfig
    # Distinct from CpVerifyRunMergeTask's default name.
    _DefaultName = 'cpVerifyExpMergeByFilter'

616 

617 

class CpVerifyRunMergeByFilterConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument", "physical_filter"},
    defaultTemplates={},
):
    """Connections for merging per-exposure datasets into one per
    physical filter.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure", "physical_filter"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results table to merge.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "physical_filter"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix table to merge.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "physical_filter"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="runStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "physical_filter"],
    )
    outputResults = cT.Output(
        doc="Output merged results table.",
        name="runResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "physical_filter"],
    )
    outputMatrix = cT.Output(
        doc="Output merged matrix table.",
        name="runMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "physical_filter"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

679 

680 

class CpVerifyRunMergeByFilterConfig(
    CpVerifyRunMergeConfig,
    pipelineConnections=CpVerifyRunMergeByFilterConnections,
):
    """Configuration parameters for by-filter run-level stats merging.

    Adds no fields; it only binds the run-level configuration to the
    by-filter connections.
    """

686 

687 

class CpVerifyRunMergeByFilterTask(CpVerifyExpMergeTask):
    """Merge statistics from exposures together, per physical filter.

    All behavior is inherited from `CpVerifyExpMergeTask`; only the
    configuration class and default task name differ.
    """

    _DefaultName = 'cpVerifyRunMergeByFilter'
    ConfigClass = CpVerifyRunMergeByFilterConfig

695# End ExpMergeByFilter/RunMergeByFilter 

696 

697 

698# Begin ExpMergeByVisit/RunMergeByVisit 

class CpVerifyVisitExpMergeConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument", "visit"},
    defaultTemplates={},
):
    """Connections for merging per-detector datasets into one per visit.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    # NOTE(review): inputStats and inputResults are declared with the
    # "exposure" dimension while inputMatrix and every output use "visit"
    # -- confirm whether this mix is intentional.
    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="detectorStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "exposure", "detector"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results to merge.",
        name="detectorResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "exposure", "detector"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix to merge.",
        name="detectorMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "visit", "detector"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "visit"],
    )
    outputResults = cT.Output(
        doc="Output results.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "visit"],
    )
    outputMatrix = cT.Output(
        doc="Output matrix.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "visit"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

760 

761 

class CpVerifyVisitExpMergeConfig(
    CpVerifyExpMergeConfig,
    pipelineConnections=CpVerifyVisitExpMergeConnections,
):
    # Adds no fields; only binds the exposure-level configuration to the
    # visit-based connections.
    pass

765 

766 

class CpVerifyVisitExpMergeTask(CpVerifyExpMergeTask):
    """Merge visit based data."""

    # All behavior is inherited; only the config class and name differ.
    _DefaultName = 'cpVerifyVisitExpMerge'
    ConfigClass = CpVerifyVisitExpMergeConfig

774 

775 

class CpVerifyVisitRunMergeConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument"},
    defaultTemplates={},
):
    """Connections for merging per-visit datasets into one per run.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "visit"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results table to merge.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "visit"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix table to merge.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "visit"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    outputStats = cT.Output(
        doc="Output statistics.",
        name="runStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument"],
    )
    outputResults = cT.Output(
        doc="Output merged results table.",
        name="runResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )
    outputMatrix = cT.Output(
        doc="Output merged matrix table.",
        name="runMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

837 

838 

class CpVerifyVisitRunMergeConfig(
    CpVerifyRunMergeConfig,
    pipelineConnections=CpVerifyVisitRunMergeConnections,
):
    # Adds no fields; only binds the run-level configuration to the
    # visit-based connections.
    pass

842 

843 

class CpVerifyVisitRunMergeTask(CpVerifyRunMergeTask):
    """Merge visit based data."""

    # All behavior is inherited; only the config class and name differ.
    _DefaultName = 'cpVerifyVisitRunMerge'
    ConfigClass = CpVerifyVisitRunMergeConfig

851# End ExpMergeByVisit/RunMergeByVisit 

852 

853 

854# Begin CalibMerge (this is a one-step) 

class CpVerifyCalibMergeConnections(
    pipeBase.PipelineTaskConnections,
    dimensions={"instrument"},
    defaultTemplates={},
):
    """Connections for merging per-detector datasets into one per
    instrument in a single step.

    The matrix connections, the results input, and the stats output are
    optional; the constructor removes them according to the config flags
    ``hasMatrixCatalog``, ``hasInputResults``, and ``doDropStats``.
    """

    inputStats = cT.Input(
        doc="Input statistics to merge.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument", "detector"],
        multiple=True,
    )
    inputResults = cT.Input(
        doc="Input results table to merge.",
        name="exposureResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "detector"],
        multiple=True,
    )
    inputMatrix = cT.Input(
        doc="Input matrix table to merge.",
        name="exposureMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument", "detector"],
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        doc="Input camera.",
        name="camera",
        storageClass="Camera",
        dimensions=["instrument"],
        isCalibration=True,
    )

    # NOTE(review): this output reuses the dataset type name
    # "exposureStats" of inputStats but with different dimensions, unlike
    # the "run*" names of the other two outputs -- confirm whether
    # "runStats" was intended or whether doDropStats is expected here.
    outputStats = cT.Output(
        doc="Output statistics.",
        name="exposureStats",
        storageClass="StructuredDataDict",
        dimensions=["instrument"],
    )
    outputResults = cT.Output(
        doc="Output merged results table.",
        name="runResults",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )
    outputMatrix = cT.Output(
        doc="Output merged matrix table.",
        name="runMatrix",
        storageClass="ArrowAstropy",
        dimensions=["instrument"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        cfg = self.config
        # (drop this connection?, connection set, connection name)
        optionalConnections = (
            (not cfg.hasMatrixCatalog, self.inputs, "inputMatrix"),
            (not cfg.hasMatrixCatalog, self.outputs, "outputMatrix"),
            (not cfg.hasInputResults, self.inputs, "inputResults"),
            (cfg.doDropStats, self.outputs, "outputStats"),
        )
        for drop, connectionSet, connectionName in optionalConnections:
            if drop:
                connectionSet.remove(connectionName)

916 

917 

class CpVerifyCalibMergeConfig(
    CpVerifyRunMergeConfig,
    pipelineConnections=CpVerifyCalibMergeConnections,
):
    """Configuration parameters for one-step calibration stats merging.

    Adds no fields; it only binds the run-level configuration to the
    calibration merge connections.
    """

923 

924 

class CpVerifyCalibMergeTask(CpVerifyRunMergeTask):
    """Merge calibration statistics together in a single step.

    All behavior is inherited from `CpVerifyRunMergeTask`; only the
    configuration class and default task name differ.
    """

    _DefaultName = 'cpVerifyCalibMerge'
    ConfigClass = CpVerifyCalibMergeConfig

932# End CalibMerge