python/lsst/pipe/tasks/postprocess.py

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask", 

23 "WriteSourceTableConfig", "WriteSourceTableTask", 

24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask", 

25 "PostprocessAnalysis", 

26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask", 

27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask", 

28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask", 

29 "TransformSourceTableConfig", "TransformSourceTableTask", 

30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask", 

31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask", 

32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask", 

33 "MakeVisitTableConfig", "MakeVisitTableTask", 

34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask", 

35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask", 

36 "ConsolidateTractConfig", "ConsolidateTractTask", 

37 "ComputeColumnsAction", "ModelExtendednessColumnAction", 

38 ] 

39 

40from collections import defaultdict 

41import dataclasses 

42from deprecated.sphinx import deprecated 

43import functools 

44import logging 

45import numbers 

46import os 

47from typing import Iterable 

48 

49import numpy as np 

50import pandas as pd 

51import astropy.table 

52import astropy.units 

53from numpy.typing import NDArray 

54 

55import lsst.geom 

56import lsst.pex.config as pexConfig 

57import lsst.pipe.base as pipeBase 

58import lsst.daf.base as dafBase 

59from lsst.daf.butler.formatters.parquet import pandas_to_astropy 

60from lsst.pex.config.configurableActions import ConfigurableAction, ConfigurableActionStructField 

61from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes 

62import lsst.afw.table as afwTable 

63from lsst.afw.image import ExposureSummaryStats, ExposureF 

64from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig 

65from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack 

66from lsst.meas.astrom.refit_pointing import RefitPointingTask 

67 

68from .coaddBase import reorderRefs 

69from .functors import CompositeFunctor, Column 

70from .schemaUtils import convertDataFrameToSdmSchema, readSdmSchemaFile 

71 

72log = logging.getLogger(__name__) 

73 

74 

75def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None): 

76 """Flattens a dataframe with multilevel column index. 

77 """ 

78 newDf = pd.DataFrame() 

79 # band is the level 0 index 

80 dfBands = df.columns.unique(level=0).values 

81 for band in dfBands: 

82 subdf = df[band] 

83 columnFormat = "{0}{1}" if camelCase else "{0}_{1}" 

84 newColumns = {c: columnFormat.format(band, c) 

85 for c in subdf.columns if c not in noDupCols} 

86 cols = list(newColumns.keys()) 

87 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1) 

88 

89 # Band must be present in the input and output or else column is all NaN: 

90 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands)) 

91 # Get the unexploded columns from any present band's partition 

92 noDupDf = df[presentBands[0]][noDupCols] 

93 newDf = pd.concat([noDupDf, newDf], axis=1) 

94 return newDf 

95 

96 

97class WriteObjectTableConnections(pipeBase.PipelineTaskConnections, 

98 defaultTemplates={"coaddName": "deep"}, 

99 dimensions=("tract", "patch", "skymap")): 

100 inputCatalogMeas = connectionTypes.Input( 

101 doc="Catalog of source measurements on the deepCoadd.", 

102 dimensions=("tract", "patch", "band", "skymap"), 

103 storageClass="SourceCatalog", 

104 name="{coaddName}Coadd_meas", 

105 multiple=True 

106 ) 

107 inputCatalogForcedSrc = connectionTypes.Input( 

108 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.", 

109 dimensions=("tract", "patch", "band", "skymap"), 

110 storageClass="SourceCatalog", 

111 name="{coaddName}Coadd_forced_src", 

112 multiple=True 

113 ) 

114 inputCatalogPsfsMultiprofit = connectionTypes.Input( 

115 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.", 

116 dimensions=("tract", "patch", "band", "skymap"), 

117 storageClass="ArrowAstropy", 

118 name="{coaddName}Coadd_psfs_multiprofit", 

119 multiple=True, 

120 ) 

121 outputCatalog = connectionTypes.Output( 

122 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, " 

123 "stored as a DataFrame with a multi-level column index per-patch.", 

124 dimensions=("tract", "patch", "skymap"), 

125 storageClass="DataFrame", 

126 name="{coaddName}Coadd_obj" 

127 ) 

128 

129 

130class WriteObjectTableConfig(pipeBase.PipelineTaskConfig, 

131 pipelineConnections=WriteObjectTableConnections): 

132 coaddName = pexConfig.Field( 

133 dtype=str, 

134 default="deep", 

135 doc="Name of coadd" 

136 ) 

137 

138 

139class WriteObjectTableTask(pipeBase.PipelineTask): 

140 """Write filter-merged object tables as a DataFrame in parquet format. 

141 """ 

142 _DefaultName = "writeObjectTable" 

143 ConfigClass = WriteObjectTableConfig 

144 

145 # Tag of output dataset written by `MergeSourcesTask.write` 

146 outputDataset = "obj" 

147 

148 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

149 inputs = butlerQC.get(inputRefs) 

150 

151 catalogs = defaultdict(dict) 

152 for dataset, connection in ( 

153 ("meas", "inputCatalogMeas"), 

154 ("forced_src", "inputCatalogForcedSrc"), 

155 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"), 

156 ): 

157 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]): 

158 catalogs[ref.dataId["band"]][dataset] = cat 

159 

160 dataId = butlerQC.quantum.dataId 

161 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"]) 

162 outputs = pipeBase.Struct(outputCatalog=df) 

163 butlerQC.put(outputs, outputRefs) 

164 

165 def run(self, catalogs, tract, patch): 

166 """Merge multiple catalogs. 

167 

168 Parameters 

169 ---------- 

170 catalogs : `dict` 

171 Mapping from filter names to dict of catalogs. 

172 tract : int 

173 tractId to use for the tractId column. 

174 patch : str 

175 patchId to use for the patchId column. 

176 

177 Returns 

178 ------- 

179 catalog : `pandas.DataFrame` 

180 Merged dataframe. 

181 

182 Raises 

183 ------ 

184 ValueError 

185 Raised if any of the catalogs is of an unsupported type. 
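
        Notes
        -----
        ``catalogs`` is expected to be structured as in the following sketch
        (the band names and catalog objects are illustrative)::

            {"g": {"meas": measCat, "forced_src": forcedCat,
                   "psfs_multiprofit": psfCat},
             "r": {...}}

        The output DataFrame carries a three-level column ``MultiIndex`` of
        ``(dataset, band, column)``.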

186 """ 

187 dfs = [] 

188 for filt, tableDict in catalogs.items(): 

189 for dataset, table in tableDict.items(): 

190 # Convert afwTable to pandas DataFrame if needed 

191 if isinstance(table, pd.DataFrame): 

192 df = table 

193 elif isinstance(table, afwTable.SourceCatalog): 

194 df = table.asAstropy().to_pandas() 

195 elif isinstance(table, astropy.table.Table): 

196 df = table.to_pandas() 

197 else: 

198 raise ValueError(f"{dataset=} has unsupported {type(table)=}") 

199 df.set_index("id", drop=True, inplace=True) 

200 

201 # Sort columns by name, to ensure matching schema among patches 

202 df = df.reindex(sorted(df.columns), axis=1) 

203 df = df.assign(tractId=tract, patchId=patch) 

204 

205 # Make columns a 3-level MultiIndex 

206 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns], 

207 names=("dataset", "band", "column")) 

208 dfs.append(df) 

209 

210 # We do this dance and not `pd.concat(dfs)` because the pandas 

211 # concatenation uses infinite memory. 

212 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs) 

213 return catalog 

214 

215 

216class WriteSourceTableConnections(pipeBase.PipelineTaskConnections, 

217 defaultTemplates={"catalogType": ""}, 

218 dimensions=("instrument", "visit", "detector")): 

219 

220 catalog = connectionTypes.Input( 

221 doc="Input full-depth catalog of sources produced by CalibrateTask", 

222 name="{catalogType}src", 

223 storageClass="SourceCatalog", 

224 dimensions=("instrument", "visit", "detector") 

225 ) 

226 outputCatalog = connectionTypes.Output( 

227 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.", 

228 name="{catalogType}source", 

229 storageClass="ArrowAstropy", 

230 dimensions=("instrument", "visit", "detector") 

231 ) 

232 

233 

234class WriteSourceTableConfig(pipeBase.PipelineTaskConfig, 

235 pipelineConnections=WriteSourceTableConnections): 

236 pass 

237 

238 

239class WriteSourceTableTask(pipeBase.PipelineTask): 

240 """Write source table to DataFrame Parquet format. 

241 """ 

242 _DefaultName = "writeSourceTable" 

243 ConfigClass = WriteSourceTableConfig 

244 

245 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

246 inputs = butlerQC.get(inputRefs) 

247 inputs["visit"] = butlerQC.quantum.dataId["visit"] 

248 inputs["detector"] = butlerQC.quantum.dataId["detector"] 

249 result = self.run(**inputs) 

250 outputs = pipeBase.Struct(outputCatalog=result.table) 

251 butlerQC.put(outputs, outputRefs) 

252 

253 def run(self, catalog, visit, detector, **kwargs): 

254 """Convert `src` catalog to an Astropy table. 

255 

256 Parameters 

257 ---------- 

258 catalog : `lsst.afw.table.SourceCatalog`

259 Catalog to be converted.

260 visit, detector : `int`

261 Visit and detector ids to be added as columns. 

262 **kwargs 

263 Additional keyword arguments are ignored as a convenience for 

264 subclasses that pass the same arguments to several different 

265 methods. 

266 

267 Returns 

268 ------- 

269 result : `~lsst.pipe.base.Struct` 

270 ``table`` 

271 `astropy.table.Table` version of the input catalog 

272 """ 

273 self.log.info("Generating astropy table from src catalog visit,detector=%i,%i", visit, detector)

274 tbl = catalog.asAstropy() 

275 tbl["visit"] = visit 

276 # int16 instead of uint8 because databases don't like unsigned bytes. 

277 tbl["detector"] = np.int16(detector) 

278 

279 return pipeBase.Struct(table=tbl) 

280 

281 

282class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections, 

283 defaultTemplates={"catalogType": ""}, 

284 dimensions=("instrument", "visit", "detector", "skymap")): 

285 visitSummary = connectionTypes.Input( 

286 doc="Input visit-summary catalog with updated calibration objects.", 

287 name="finalVisitSummary", 

288 storageClass="ExposureCatalog", 

289 dimensions=("instrument", "visit",), 

290 ) 

291 

292 def __init__(self, config): 

293 # We don't want the input catalog here to be an initial existence 

294 # constraint in QG generation, because that can unfortunately limit the 

295 # set of data IDs of inputs to other tasks, even those that run earlier 

296 # (e.g. updateVisitSummary), when the input 'src' catalog is not 

297 # produced. It's safer to just use 'visitSummary' existence as an 

298 # initial constraint, and then let the graph prune out the detectors 

299 # that don't have a 'src' for this task only. 

300 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True) 

301 

302 

303class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig, 

304 pipelineConnections=WriteRecalibratedSourceTableConnections): 

305 

306 doReevaluatePhotoCalib = pexConfig.Field( 

307 dtype=bool, 

308 default=True, 

309 doc=("Add or replace local photoCalib columns"), 

310 ) 

311 doReevaluateSkyWcs = pexConfig.Field( 

312 dtype=bool, 

313 default=True, 

314 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"), 

315 ) 

316 

317 

318class WriteRecalibratedSourceTableTask(WriteSourceTableTask): 

319 """Write source table to DataFrame Parquet format. 

320 """ 

321 _DefaultName = "writeRecalibratedSourceTable" 

322 ConfigClass = WriteRecalibratedSourceTableConfig 

323 

324 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

325 inputs = butlerQC.get(inputRefs) 

326 

327 inputs["visit"] = butlerQC.quantum.dataId["visit"] 

328 inputs["detector"] = butlerQC.quantum.dataId["detector"] 

329 

330 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs: 

331 exposure = ExposureF() 

332 inputs["exposure"] = self.prepareCalibratedExposure( 

333 exposure=exposure, 

334 visitSummary=inputs["visitSummary"], 

335 detectorId=butlerQC.quantum.dataId["detector"] 

336 ) 

337 inputs["catalog"] = self.addCalibColumns(**inputs) 

338 

339 result = self.run(**inputs) 

340 outputs = pipeBase.Struct(outputCatalog=result.table) 

341 butlerQC.put(outputs, outputRefs) 

342 

343 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None): 

344 """Prepare a calibrated exposure and apply external calibrations 

345 if so configured. 

346 

347 Parameters 

348 ---------- 

349 exposure : `lsst.afw.image.exposure.Exposure` 

350 Input exposure to adjust calibrations. May be an empty Exposure. 

351 detectorId : `int` 

352 Detector ID associated with the exposure. 

353 visitSummary : `lsst.afw.table.ExposureCatalog`, optional 

354 Exposure catalog with all calibration objects. WCS and PhotoCalib 

355 are always applied if ``visitSummary`` is provided and those 

356 components are not `None`. 

357 

358 Returns 

359 ------- 

360 exposure : `lsst.afw.image.exposure.Exposure` 

361 Exposure with adjusted calibrations. 

362 """ 

363 if visitSummary is not None: 

364 row = visitSummary.find(detectorId) 

365 if row is None: 

366 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.") 

367 if (photoCalib := row.getPhotoCalib()) is None: 

368 self.log.warning("Detector id %s has None for photoCalib in visit summary; " 

369 "skipping reevaluation of photoCalib.", detectorId) 

370 exposure.setPhotoCalib(None) 

371 else: 

372 exposure.setPhotoCalib(photoCalib) 

373 if (skyWcs := row.getWcs()) is None: 

374 self.log.warning("Detector id %s has None for skyWcs in visit summary; " 

375 "skipping reevaluation of skyWcs.", detectorId) 

376 exposure.setWcs(None) 

377 else: 

378 exposure.setWcs(skyWcs) 

379 

380 return exposure 

381 

382 def addCalibColumns(self, catalog, exposure, **kwargs): 

383 """Add replace columns with calibs evaluated at each centroid 

384 

385 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in 

386 a source catalog, by rerunning the plugins. 

387 

388 Parameters 

389 ---------- 

390 catalog : `lsst.afw.table.SourceCatalog` 

391 catalog to which calib columns will be added 

392 exposure : `lsst.afw.image.exposure.Exposure` 

393 Exposure with attached PhotoCalibs and SkyWcs attributes to be 

394 reevaluated at local centroids. Pixels are not required. 

395 **kwargs 

396 Additional keyword arguments are ignored to facilitate passing the 

397 same arguments to several methods. 

398 

399 Returns 

400 ------- 

401 newCat : `lsst.afw.table.SourceCatalog`

402 Source Catalog with requested local calib columns 

403 """ 

404 measureConfig = SingleFrameMeasurementTask.ConfigClass() 

405 measureConfig.doReplaceWithNoise = False 

406 

407 # Clear all slots, because we aren't running the relevant plugins. 

408 for slot in measureConfig.slots: 

409 setattr(measureConfig.slots, slot, None) 

410 

411 measureConfig.plugins.names = [] 

412 if self.config.doReevaluateSkyWcs: 

413 measureConfig.plugins.names.add("base_LocalWcs") 

414 self.log.info("Re-evaluating base_LocalWcs plugin") 

415 if self.config.doReevaluatePhotoCalib: 

416 measureConfig.plugins.names.add("base_LocalPhotoCalib") 

417 self.log.info("Re-evaluating base_LocalPhotoCalib plugin") 

418 pluginsNotToCopy = tuple(measureConfig.plugins.names) 

419 

420 # Create a new schema and catalog 

421 # Copy all columns from original except for the ones to reevaluate 

422 aliasMap = catalog.schema.getAliasMap() 

423 mapper = afwTable.SchemaMapper(catalog.schema) 

424 for item in catalog.schema: 

425 if not item.field.getName().startswith(pluginsNotToCopy): 

426 mapper.addMapping(item.key) 

427 

428 schema = mapper.getOutputSchema() 

429 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema) 

430 schema.setAliasMap(aliasMap) 

431 newCat = afwTable.SourceCatalog(schema) 

432 newCat.extend(catalog, mapper=mapper) 

433 

434 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to 

435 # update here. LocalPhotoCalibs are applied during transform tasks. 

436 # Update coord_ra/coord_dec, which are expected to be positions on the 

437 # sky and are used as such in sdm tables without transform 

438 if self.config.doReevaluateSkyWcs and exposure.wcs is not None: 

439 afwTable.updateSourceCoords(exposure.wcs, newCat) 

440 wcsPlugin = measurement.plugins["base_LocalWcs"] 

441 else: 

442 wcsPlugin = None 

443 

444 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None: 

445 pcPlugin = measurement.plugins["base_LocalPhotoCalib"] 

446 else: 

447 pcPlugin = None 

448 

449 for row in newCat: 

450 if wcsPlugin is not None: 

451 wcsPlugin.measure(row, exposure) 

452 if pcPlugin is not None: 

453 pcPlugin.measure(row, exposure) 

454 

455 return newCat 

456 

457 

458class PostprocessAnalysis(object): 

459 """Calculate columns from DataFrames or handles storing DataFrames. 

460 

461 This object manages and organizes an arbitrary set of computations 

462 on a catalog. The catalog is defined by a 

463 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object 

464 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the 

465 computations are defined by a collection of 

466 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a 

467 ``CompositeFunctor``). 

468 

469 After the object is initialized, accessing the ``.df`` attribute (which 

470 holds the `pandas.DataFrame` containing the results of the calculations) 

471 triggers computation of said dataframe. 

472 

473 One of the conveniences of using this object is the ability to define a 

474 desired common filter for all functors. This enables the same functor 

475 collection to be passed to several different `PostprocessAnalysis` objects 

476 without having to change the original functor collection, since the ``filt`` 

477 keyword argument of this object triggers an overwrite of the ``filt`` 

478 property for all functors in the collection. 

479 

480 This object also allows a list of refFlags to be passed, and defines a set 

481 of default refFlags that are always included even if not requested. 

482 

483 If a list of DataFrames or Handles is passed, rather than a single one, 

484 then the calculations will be mapped over all the input catalogs. In 

485 principle, it should be straightforward to parallelize this activity, but 

486 initial tests have failed (see TODO in code comments). 

487 

488 Parameters 

489 ---------- 

490 handles : `~lsst.daf.butler.DeferredDatasetHandle` or 

491 `~lsst.pipe.base.InMemoryDatasetHandle` or 

492 list of these. 

493 Source catalog(s) for computation. 

494 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor` 

495 Computations to do (functors that act on ``handles``). 

496 If a dict, the output 

497 DataFrame will have columns keyed accordingly. 

498 If a list, the column keys will come from the 

499 ``.shortname`` attribute of each functor. 

500 

501 filt : `str`, optional 

502 Filter in which to calculate. If provided, 

503 this will overwrite any existing ``.filt`` attribute 

504 of the provided functors. 

505 

506 flags : `list`, optional 

507 List of flags (per-band) to include in output table. 

508 Taken from the ``meas`` dataset if applied to a multilevel Object Table. 

509 

510 refFlags : `list`, optional 

511 List of refFlags (only reference band) to include in output table. 

512 

513 forcedFlags : `list`, optional 

514 List of flags (per-band) to include in output table. 

515 Taken from the ``forced_src`` dataset if applied to a 

516 multilevel Object Table. Intended for flags from measurement plugins 

517 only run during multi-band forced-photometry. 
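
    Notes
    -----
    A minimal usage sketch, assuming ``handle`` is a
    `~lsst.daf.butler.DeferredDatasetHandle` for a source table (the functor
    and flag choices below are illustrative)::

        from lsst.pipe.tasks.functors import Column

        funcs = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
        analysis = PostprocessAnalysis(handle, funcs, filt="r",
                                       flags=["base_PixelFlags_flag"])
        df = analysis.df  # accessing ``.df`` triggers the computation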

518 """ 

519 _defaultRefFlags = [] 

520 _defaultFuncs = () 

521 

522 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None): 

523 self.handles = handles 

524 self.functors = functors 

525 

526 self.filt = filt 

527 self.flags = list(flags) if flags is not None else [] 

528 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else [] 

529 self.refFlags = list(self._defaultRefFlags) 

530 if refFlags is not None: 

531 self.refFlags += list(refFlags) 

532 

533 self._df = None 

534 

535 @property 

536 def defaultFuncs(self): 

537 funcs = dict(self._defaultFuncs) 

538 return funcs 

539 

540 @property 

541 def func(self): 

542 additionalFuncs = self.defaultFuncs 

543 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags}) 

544 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags}) 

545 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags}) 

546 

547 if isinstance(self.functors, CompositeFunctor): 

548 func = self.functors 

549 else: 

550 func = CompositeFunctor(self.functors) 

551 

552 func.funcDict.update(additionalFuncs) 

553 func.filt = self.filt 

554 

555 return func 

556 

557 @property 

558 def noDupCols(self): 

559 return [name for name, func in self.func.funcDict.items() if func.noDup] 

560 

561 @property 

562 def df(self): 

563 if self._df is None: 

564 self.compute() 

565 return self._df 

566 

567 def compute(self, dropna=False, pool=None): 

568 # map over multiple handles 

569 if type(self.handles) in (list, tuple): 

570 if pool is None: 

571 dflist = [self.func(handle, dropna=dropna) for handle in self.handles] 

572 else: 

573 # TODO: Figure out why this doesn't work (pyarrow pickling 

574 # issues?) 

575 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles) 

576 self._df = pd.concat(dflist) 

577 else: 

578 self._df = self.func(self.handles, dropna=dropna) 

579 

580 return self._df 

581 

582 

583class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections, 

584 dimensions=()): 

585 """Expected Connections for subclasses of TransformCatalogBaseTask. 

586 

587 Must be subclassed. 

588 """ 

589 inputCatalog = connectionTypes.Input( 

590 name="", 

591 storageClass="DataFrame", 

592 ) 

593 outputCatalog = connectionTypes.Output( 

594 name="", 

595 storageClass="ArrowAstropy", 

596 ) 

597 

598 

599class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig, 

600 pipelineConnections=TransformCatalogBaseConnections): 

601 functorFile = pexConfig.Field( 

602 dtype=str, 

603 doc="Path to YAML file specifying Science Data Model functors to use " 

604 "when copying columns and computing calibrated values.", 

605 default=None, 

606 optional=True 

607 ) 

608 primaryKey = pexConfig.Field( 

609 dtype=str, 

610 doc="Name of column to be set as the DataFrame index. If None, the index" 

611 "will be named `id`", 

612 default=None, 

613 optional=True 

614 ) 

615 columnsFromDataId = pexConfig.ListField( 

616 dtype=str, 

617 default=None, 

618 optional=True, 

619 doc="Columns to extract from the dataId", 

620 ) 

621 

622 

623class TransformCatalogBaseTask(pipeBase.PipelineTask): 

624 """Base class for transforming/standardizing a catalog by applying functors 

625 that convert units and apply calibrations. 

626 

627 The purpose of this task is to perform a set of computations on an input 

628 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a 

629 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to 

630 a new dataset (which needs to be declared in an ``outputDataset`` 

631 attribute). 

632 

633 The calculations to be performed are defined in a YAML file that specifies 

634 a set of functors to be computed, provided as a ``--functorFile`` config 

635 parameter. An example of such a YAML file is the following: 

636 

637 funcs: 

638 sourceId: 

639 functor: Index 

640 x: 

641 functor: Column 

642 args: slot_Centroid_x 

643 y: 

644 functor: Column 

645 args: slot_Centroid_y 

646 psfFlux: 

647 functor: LocalNanojansky 

648 args: 

649 - slot_PsfFlux_instFlux 

650 - slot_PsfFlux_instFluxErr 

651 - base_LocalPhotoCalib 

652 - base_LocalPhotoCalibErr 

653 psfFluxErr: 

654 functor: LocalNanojanskyErr 

655 args: 

656 - slot_PsfFlux_instFlux 

657 - slot_PsfFlux_instFluxErr 

658 - base_LocalPhotoCalib 

659 - base_LocalPhotoCalibErr 

660 flags: 

661 - detect_isPrimary 

662 

663 The names for each entry under "func" will become the names of columns in 

664 the output dataset. All the functors referenced are defined in 

665 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each 

666 functor are in the `args` list, and any additional entries for each column 

667 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are 

668 treated as keyword arguments to be passed to the functor initialization. 

669 

670 The "flags" entry is the default shortcut for `Column` functors. 

671 All columns listed under "flags" will be copied to the output table 

672 untransformed. They can be of any datatype. 

673 In the special case of transforming a multi-level object table with

674 band and dataset indices (deepCoadd_obj), these will be taken from the

675 ``meas`` dataset and exploded out per band.

676 

677 There are two special shortcuts that only apply when transforming 

678 multi-level Object (deepCoadd_obj) tables: 

679 - The "refFlags" entry is shortcut for `Column` functor 

680 taken from the ``ref`` dataset if transforming an ObjectTable. 

681 - The "forcedFlags" entry is shortcut for `Column` functors. 

682 taken from the ``forced_src`` dataset if transforming an ObjectTable. 

683 These are expanded out per band. 

684 

685 

686 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object 

687 to organize and execute the calculations.
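
    As a rough illustration, the YAML above corresponds to building the
    following functor collection in Python (a sketch only; the ``args``
    entries are assumed to be passed positionally to each functor)::

        from lsst.pipe.tasks.functors import (
            Column, CompositeFunctor, Index, LocalNanojansky, LocalNanojanskyErr,
        )

        funcs = CompositeFunctor({
            "sourceId": Index(),
            "x": Column("slot_Centroid_x"),
            "y": Column("slot_Centroid_y"),
            "psfFlux": LocalNanojansky("slot_PsfFlux_instFlux", "slot_PsfFlux_instFluxErr",
                                       "base_LocalPhotoCalib", "base_LocalPhotoCalibErr"),
            "psfFluxErr": LocalNanojanskyErr("slot_PsfFlux_instFlux", "slot_PsfFlux_instFluxErr",
                                             "base_LocalPhotoCalib", "base_LocalPhotoCalibErr"),
            "detect_isPrimary": Column("detect_isPrimary"),
        })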

688 """ 

689 @property 

690 def _DefaultName(self): 

691 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.") 

692 

693 @property 

694 def outputDataset(self): 

695 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.") 

696 

697 @property 

698 def inputDataset(self): 

699 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.") 

700 

701 @property 

702 def ConfigClass(self): 

703 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.") 

704 

705 def __init__(self, *args, **kwargs): 

706 super().__init__(*args, **kwargs) 

707 if self.config.functorFile: 

708 self.log.info("Loading tranform functor definitions from %s", 

709 self.config.functorFile) 

710 self.funcs = CompositeFunctor.from_file(self.config.functorFile) 

711 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs)) 

712 else: 

713 self.funcs = None 

714 

715 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

716 inputs = butlerQC.get(inputRefs) 

717 if self.funcs is None: 

718 raise ValueError("config.functorFile is None. " 

719 "Must be a valid path to yaml in order to run Task as a PipelineTask.") 

720 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs, 

721 dataId=dict(outputRefs.outputCatalog.dataId.mapping)) 

722 butlerQC.put(result, outputRefs) 

723 

724 def run(self, handle, funcs=None, dataId=None, band=None): 

725 """Do postprocessing calculations 

726 

727 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or 

728 ``DataFrame`` object and dataId, 

729 returns a dataframe with results of postprocessing calculations. 

730 

731 Parameters 

732 ---------- 

733 handle : `~lsst.daf.butler.DeferredDatasetHandle` or

734 `~lsst.pipe.base.InMemoryDatasetHandle` or 

735 `~pandas.DataFrame`, or list of these. 

736 DataFrames from which calculations are done. 

737 funcs : `~lsst.pipe.tasks.functors.Functor` 

738 Functors to apply to the table's columns 

739 dataId : dict, optional 

740 Used to add a `patchId` column to the output dataframe. 

741 band : `str`, optional 

742 Filter band that is being processed. 

743 

744 Returns 

745 ------- 

746 result : `lsst.pipe.base.Struct` 

747 Result struct, with a single ``outputCatalog`` attribute holding 

748 the transformed catalog. 

749 """ 

750 self.log.info("Transforming/standardizing the source table dataId: %s", dataId) 

751 

752 df = self.transform(band, handle, funcs, dataId).df 

753 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df)) 

754 

755 if len(df) == 0: 

756 raise UpstreamFailureNoWorkFound( 

757 "Input catalog is empty, so there is nothing to transform/standardize", 

758 ) 

759 

760 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df)) 

761 return result 

762 

763 def getFunctors(self): 

764 return self.funcs 

765 

766 def getAnalysis(self, handles, funcs=None, band=None): 

767 if funcs is None: 

768 funcs = self.funcs 

769 analysis = PostprocessAnalysis(handles, funcs, filt=band) 

770 return analysis 

771 

772 def transform(self, band, handles, funcs, dataId): 

773 analysis = self.getAnalysis(handles, funcs=funcs, band=band) 

774 df = analysis.df 

775 if dataId and self.config.columnsFromDataId: 

776 for key in self.config.columnsFromDataId: 

777 if key in dataId: 

778 if key == "detector": 

779 # int16 instead of uint8 because databases don't like unsigned bytes. 

780 df[key] = np.int16(dataId[key]) 

781 else: 

782 df[key] = dataId[key] 

783 else: 

784 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}") 

785 

786 if self.config.primaryKey: 

787 if df.index.name != self.config.primaryKey and self.config.primaryKey in df: 

788 df.reset_index(inplace=True, drop=True) 

789 df.set_index(self.config.primaryKey, inplace=True) 

790 

791 return pipeBase.Struct( 

792 df=df, 

793 analysis=analysis 

794 ) 

795 

796 

797class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections, 

798 defaultTemplates={"coaddName": "deep"}, 

799 dimensions=("tract", "patch", "skymap")): 

800 inputCatalog = connectionTypes.Input( 

801 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, " 

802 "stored as a DataFrame with a multi-level column index per-patch.", 

803 dimensions=("tract", "patch", "skymap"), 

804 storageClass="DataFrame", 

805 name="{coaddName}Coadd_obj", 

806 deferLoad=True, 

807 ) 

808 inputCatalogRef = connectionTypes.Input( 

809 doc="Catalog marking the primary detection (which band provides a good shape and position)" 

810 "for each detection in deepCoadd_mergeDet.", 

811 dimensions=("tract", "patch", "skymap"), 

812 storageClass="SourceCatalog", 

813 name="{coaddName}Coadd_ref", 

814 deferLoad=True, 

815 ) 

816 inputCatalogExpMultiprofit = connectionTypes.Input( 

817 doc="Catalog of multiband Exponential fits.", 

818 dimensions=("tract", "patch", "skymap"), 

819 storageClass="ArrowAstropy", 

820 name="{coaddName}Coadd_Exp_multiprofit", 

821 deferLoad=True, 

822 ) 

823 inputCatalogSersicMultiprofit = connectionTypes.Input( 

824 doc="Catalog of multiband Sersic fits.", 

825 dimensions=("tract", "patch", "skymap"), 

826 storageClass="ArrowAstropy", 

827 name="{coaddName}Coadd_Sersic_multiprofit", 

828 deferLoad=True, 

829 ) 

830 inputCatalogEpoch = connectionTypes.Input( 

831 doc="Catalog of mean epochs for each object per band.", 

832 dimensions=("tract", "patch", "skymap"), 

833 storageClass="ArrowAstropy", 

834 name="object_epoch", 

835 deferLoad=True, 

836 ) 

837 outputCatalog = connectionTypes.Output( 

838 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard " 

839 "data model.", 

840 dimensions=("tract", "patch", "skymap"), 

841 storageClass="ArrowAstropy", 

842 name="objectTable" 

843 ) 

844 

845 def __init__(self, *, config=None): 

846 super().__init__(config=config) 

847 if config.multilevelOutput: 

848 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame") 

849 

850 

851class TransformObjectCatalogConfig(TransformCatalogBaseConfig, 

852 pipelineConnections=TransformObjectCatalogConnections): 

853 coaddName = pexConfig.Field( 

854 dtype=str, 

855 default="deep", 

856 doc="Name of coadd" 

857 ) 

858 outputBands = pexConfig.ListField( 

859 dtype=str, 

860 default=None, 

861 optional=True, 

862 doc=("These bands and only these bands will appear in the output," 

863 " NaN-filled if the input does not include them." 

864 " If None, then use all bands found in the input.") 

865 ) 

866 camelCase = pexConfig.Field( 

867 dtype=bool, 

868 default=False, 

869 doc=("Write per-band columns names with camelCase, else underscore " 

870 "For example: gPsFlux instead of g_PsFlux.") 

871 ) 

872 multilevelOutput = pexConfig.Field( 

873 dtype=bool, 

874 default=False, 

875 doc=("Whether results dataframe should have a multilevel column index (True) or be flat " 

876 "and name-munged (False). If True, the output storage class will be " 

877 "set to DataFrame, since astropy tables do not support multi-level indexing."), 

878 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.", 

879 ) 

880 goodFlags = pexConfig.ListField( 

881 dtype=str, 

882 default=[], 

883 doc=("List of 'good' flags that should be set False when populating empty tables. " 

884 "All other flags are considered to be 'bad' flags and will be set to True.") 

885 ) 

886 floatFillValue = pexConfig.Field( 

887 dtype=float, 

888 default=np.nan, 

889 doc="Fill value for float fields when populating empty tables." 

890 ) 

891 integerFillValue = pexConfig.Field( 

892 dtype=int, 

893 default=-1, 

894 doc="Fill value for integer fields when populating empty tables." 

895 ) 

896 

897 def setDefaults(self): 

898 super().setDefaults() 

899 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml") 

900 self.primaryKey = "objectId" 

901 self.columnsFromDataId = ["tract", "patch"] 

902 self.goodFlags = ["calib_astrometry_used", 

903 "calib_photometry_reserved", 

904 "calib_photometry_used", 

905 "calib_psf_candidate", 

906 "calib_psf_reserved", 

907 "calib_psf_used"] 

908 
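# A camera- or pipeline-specific override might restrict the output bands and
# the column naming convention, e.g. (illustrative values, not defaults):
#
#     config.outputBands = ["g", "r", "i"]
#     config.camelCase = True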

909 

910class TransformObjectCatalogTask(TransformCatalogBaseTask): 

911 """Produce a flattened Object Table to match the format specified in 

912 sdm_schemas. 

913 

914 Do the same set of postprocessing calculations on all bands. 

915 

916 This is identical to `TransformCatalogBaseTask`, except for that it does 

917 the specified functor calculations for all filters present in the 

918 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified 

919 by the YAML file will be superseded.

920 """ 

921 _DefaultName = "transformObjectCatalog" 

922 ConfigClass = TransformObjectCatalogConfig 

923 

924 # ref must go first because other datasets may need columns from it 

925 datasets_multiband = ("ref", "epoch", "Exp_multiprofit", "Sersic_multiprofit") 

926 

927 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

928 inputs = butlerQC.get(inputRefs) 

929 if self.funcs is None: 

930 raise ValueError("config.functorFile is None. " 

931 "Must be a valid path to yaml in order to run Task as a PipelineTask.") 

932 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs, 

933 dataId=dict(outputRefs.outputCatalog.dataId.mapping), 

934 handle_ref=inputs["inputCatalogRef"], 

935 handle_epoch=inputs["inputCatalogEpoch"], 

936 handle_Exp_multiprofit=inputs["inputCatalogExpMultiprofit"], 

937 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"], 

938 ) 

939 butlerQC.put(result, outputRefs) 

940 

941 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs): 

942 # NOTE: band kwarg is ignored here. 

943 # TODO: Document and improve funcs argument usage in DM-48895 

944 # self.getAnalysis only supports list, dict and CompositeFunctor 

945 if isinstance(funcs, CompositeFunctor): 

946 funcDict_in = funcs.funcDict 

947 elif isinstance(funcs, dict): 

948 funcDict_in = funcs 

949 elif isinstance(funcs, list): 

950 funcDict_in = {idx: v for idx, v in enumerate(funcs)} 

951 else: 

952 raise TypeError(f"Unsupported {type(funcs)=}") 

953 

954 handles_multi = {} 

955 funcDicts_multiband = {} 

956 for dataset in self.datasets_multiband: 

957 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None: 

958 raise RuntimeError(f"Missing required handle_{dataset} kwarg") 

959 handles_multi[dataset] = handle_multi 

960 funcDicts_multiband[dataset] = {} 

961 

962 dfDict = {} 

963 analysisDict = {} 

964 templateDf = pd.DataFrame() 

965 

966 columns = handle.get(component="columns") 

967 inputBands = columns.unique(level=1).values 

968 

969 outputBands = self.config.outputBands if self.config.outputBands else inputBands 

970 

971 # Split up funcs for per-band and multiband tables 

972 funcDict_band = {} 

973 # Check if dataset needs to add any meas table columns 

974 columns_add_from_ref = defaultdict(list) 

975 

976 for name, func in funcDict_in.items(): 

977 if func.dataset in funcDicts_multiband: 

978 # This is something like a MultibandColumn 

979 if band := getattr(func, "band_to_check", None): 

980 if band not in outputBands: 

981 continue 

982 # This is something like a ReferenceBand that has configurable bands 

983 elif hasattr(func, "bands"): 

984 # TODO: Determine if this can be avoided DM-48895 

985 # This will work fine if the init doesn't manipulate bands 

986 # If it does, then one would need to make a new functor 

987 # Determining the (kw)args is tricky in that case 

988 func.bands = tuple(inputBands) 

989 if hasattr(func, "columns_ref"): 

990 columns_add_from_ref[func.dataset].extend(func.columns_ref) 

991 

992 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band) 

993 funcDict[name] = func 

994 

995 funcs_band = CompositeFunctor(funcDict_band) 

996 

997 # Perform transform for data of filters that exist in the handle dataframe. 

998 for inputBand in inputBands: 

999 if inputBand not in outputBands: 

1000 self.log.info("Ignoring %s band data in the input", inputBand) 

1001 continue 

1002 self.log.info("Transforming the catalog of band %s", inputBand) 

1003 result = self.transform(inputBand, handle, funcs_band, dataId) 

1004 dfDict[inputBand] = result.df 

1005 analysisDict[inputBand] = result.analysis 

1006 if templateDf.empty: 

1007 templateDf = result.df 

1008 

1009 # Put filler values in columns of other wanted bands 

1010 for filt in outputBands: 

1011 if filt not in dfDict: 

1012 self.log.info("Adding empty columns for band %s", filt) 

1013 dfTemp = templateDf.copy() 

1014 for col in dfTemp.columns: 

1015 testValue = dfTemp[col].values[0] 

1016 if isinstance(testValue, (np.bool_, pd.BooleanDtype)): 

1017 # Boolean flag type, check if it is a "good" flag 

1018 if col in self.config.goodFlags: 

1019 fillValue = False 

1020 else: 

1021 fillValue = True 

1022 elif isinstance(testValue, numbers.Integral): 

1023 # Checking numbers.Integral catches all flavors 

1024 # of python, numpy, pandas, etc. integers. 

1025 # We must ensure this is not an unsigned integer. 

1026 if isinstance(testValue, np.unsignedinteger): 

1027 raise ValueError("Parquet tables may not have unsigned integer columns.") 

1028 else: 

1029 fillValue = self.config.integerFillValue 

1030 else: 

1031 fillValue = self.config.floatFillValue 

1032 dfTemp[col].values[:] = fillValue 

1033 dfDict[filt] = dfTemp 

1034 

1035 # This makes a multilevel column index, with band as first level 

1036 df = pd.concat(dfDict, axis=1, names=["band", "column"]) 

1037 name_index = df.index.name 

1038 

1039 # TODO: Remove in DM-48895 

1040 if not self.config.multilevelOutput: 

1041 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()])) 

1042 if self.config.primaryKey in noDupCols: 

1043 noDupCols.remove(self.config.primaryKey) 

1044 if dataId and self.config.columnsFromDataId: 

1045 noDupCols += self.config.columnsFromDataId 

1046 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase, 

1047 inputBands=inputBands) 

1048 

1049 # This will be assigned by running ref first 

1050 df_ref = None 

1051 

1052 # Apply per-dataset functors to each multiband dataset in turn 

1053 for dataset, funcDict in funcDicts_multiband.items(): 

1054 handle_multiband = handles_multi[dataset] 

1055 df_dataset = handle_multiband.get() 

1056 if isinstance(df_dataset, astropy.table.Table): 

1057 # Allow astropy table inputs to already have the output index 

1058 if name_index not in df_dataset.colnames: 

1059 if self.config.primaryKey in df_dataset.colnames: 

1060 name_index_ap = self.config.primaryKey 

1061 else: 

1062 raise RuntimeError( 

1063 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in" 

1064 f" {df_dataset.colnames=} for {dataset=}" 

1065 ) 

1066 else: 

1067 name_index_ap = name_index 

1068 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False) 

1069 elif isinstance(df_dataset, afwTable.SourceCatalog): 

1070 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False) 

1071 

1072 if dataset == "ref": 

1073 df_ref = df_dataset 

1074 else: 

1075 if df_ref is None: 

1076 raise RuntimeError(f"ref must be the first dataset, not {dataset}") 

1077 # Add columns as if they're a sorted set 

1078 for column in {key: None for key in columns_add_from_ref.get(dataset, [])}: 

1079 df_dataset[column] = df_ref[column] 

1080 

1081 # TODO: should funcDict have noDup funcs removed? 

1082 # noDup was intended for per-band tables. 

1083 result = self.transform( 

1084 None, 

1085 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"), 

1086 CompositeFunctor(funcDict), 

1087 dataId, 

1088 ) 

1089 result.df.index.name = name_index 

1090 # Drop columns from dataId if present (patch, tract) 

1091 if self.config.columnsFromDataId: 

1092 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df] 

1093 if columns_drop: 

1094 result.df.drop(columns_drop, axis=1, inplace=True) 

1095 # Make the same multi-index for the multiband table if needed 

1096 # This might end up making copies, one of several reasons to avoid 

1097 # using multilevel indexes, or DataFrames at all 

1098 to_concat = pd.concat( 

1099 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"] 

1100 ) if self.config.multilevelOutput else result.df 

1101 df = pd.concat([df, to_concat], axis=1) 

1102 analysisDict[dataset] = result.analysis 

1103 del result 

1104 

1105 df.index.name = self.config.primaryKey 

1106 

1107 if not self.config.multilevelOutput: 

1108 tbl = pandas_to_astropy(df) 

1109 else: 

1110 tbl = df 

1111 

1112 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl)) 

1113 

1114 return pipeBase.Struct(outputCatalog=tbl) 

1115 

1116 

1117class ComputeColumnsAction(ConfigurableAction): 

1118 """An action that computes multiple vectors from an input. 

1119 

1120 This class is meant to be compatible with analysis_tools' 

1121 AnalysisAction class, which cannot be a dependency of pipe_tasks.""" 

1122 

1123 def getInputSchema(self) -> Iterable[tuple[str, type[NDArray]]]:

1124 """Return the required inputs for this action. 

1125 

1126 This method is meant to be compatible with the corresponding analysis_tools action interface.

1127 """ 

1128 raise NotImplementedError("This method must be overloaded in subclasses") 

1129 

1130 def __call__(self, table: astropy.table.Table) -> dict[str, NDArray]: 

1131 """This method must return a dict of computed columns.""" 

1132 raise NotImplementedError("This method must be overloaded in subclasses") 

1133 

1134 

1135class ExtendednessColumnActionBase(ComputeColumnsAction): 

1136 bands = pexConfig.ListField[str]( 

1137 doc="The bands to make single-band outputs for.", 

1138 default=["u", "g", "r", "i", "z", "y"] 

1139 ) 

1140 bands_combined = pexConfig.DictField[str, str]( 

1141 doc="Multiband classification column specialization. Keys specify the" 

1142 " name of the column and values are a comma-separated list of" 

1143 " bands, all of which must be contained in the bands listed.", 

1144 default={"griz": "g,r,i,z"}, 

1145 itemCheck=lambda x: (len(y := x.split(",")) > 1) & (len(set(y)) == len(y)), 

1146 ) 

1147 model_column_flux = pexConfig.Field[str]( 

1148 doc="The model flux column to use for computing the difference to" 

1149 " to the S/N flux. Must contain the {band} and {model} templates.", 

1150 default="{band}_{model}Flux", 

1151 check=lambda x: ("{band}" in x) and ("{model}" in x),

1152 ) 

1153 model_column_flux_err = pexConfig.Field[str]( 

1154 doc="The model flux error column to use for computing the difference" 

1155 " to the S/N flux. Must contain the {band} and {model} templates.", 

1156 default="{band}_{model}FluxErr", 

1157 check=lambda x: ("{band}" in x) and ("{model}" in x),

1158 ) 

1159 model_flux_name = pexConfig.Field[str]( 

1160 doc="The extended object model to use to compared to PSF model fluxes", 

1161 default="sersic", 

1162 ) 

1163 output_column = pexConfig.Field[str]( 

1164 doc="Name of the output column. Must contain the {band} template", 

1165 default="{band}_model_extendedness", 

1166 check=lambda x: "{band}" in x, 

1167 ) 

1168 psf_column_flux = pexConfig.Field[str]( 

1169 doc="The name of the PSF flux column. Must contain the {band} template.", 

1170 default="{band}_psfFlux", 

1171 check=lambda x: "{band}" in x, 

1172 ) 

1173 psf_column_flux_err = pexConfig.Field[str]( 

1174 doc="The name of the PSF flux error column. Must contain the {band} template.", 

1175 default="{band}_psfFluxErr", 

1176 check=lambda x: "{band}" in x, 

1177 ) 

1178 size_column = pexConfig.Field[str]( 

1179 doc="The column to use for applying size cuts. Must contain the {axis} template.", 

1180 default="exponential_reff_{axis}", 

1181 ) 

1182 

1183 def getInputSchema(self) -> Iterable[tuple[str, type[NDArray]]]: 

1184 size_column = self.size_column 

1185 schema = [ 

1186 (size_column.format(axis=axis), NDArray[float]) for axis in ("x", "y") 

1187 ] 

1188 model = self.model_flux_name 

1189 for column in ( 

1190 self.psf_column_flux, self.psf_column_flux_err, 

1191 self.model_column_flux, self.model_column_flux_err, 

1192 ): 

1193 schema.extend([ 

1194 (column.format(band=band, model=model), NDArray[float]) for band in self.bands 

1195 ]) 

1196 

1197 return schema 

1198 

1199 def validate(self): 

1200 super().validate() 

1201 errors = [] 

1202 for name, band_combined in self.bands_combined.items(): 

1203 bands = band_combined.split(",") 

1204 bands_missing = [band for band in bands if band not in self.bands] 

1205 if bands_missing: 

1206 errors.append( 

1207 f"self.bands_combined[{name}] contains bands={bands_missing} not in {self.bands=}" 

1208 ) 

1209 if errors: 

1210 raise ValueError(f"Validation failed due to errors: {'; '.join(errors)}") 

1211 

1212 def _get_fluxes(self, table, band: str): 

1213 model = self.model_flux_name 

1214 flux_psf, fluxerr_psf, flux_model, fluxerr_model = ( 

1215 np.array(table[column.format(band=band, model=model)]) 

1216 for column in ( 

1217 self.psf_column_flux, self.psf_column_flux_err, 

1218 self.model_column_flux, self.model_column_flux_err, 

1219 ) 

1220 ) 

1221 return flux_psf, fluxerr_psf, flux_model, fluxerr_model 

1222 

1223 

1224class ModelExtendednessColumnAction(ExtendednessColumnActionBase): 

1225 fluxerr_coefficent = pexConfig.Field[float]( 

1226 doc="The coefficient to multiply the flux error by when adding to the model flux.", 

1227 default=0.5, 

1228 check=lambda x: x >= 0, 

1229 ) 

1230 fluxerr_stretch = pexConfig.Field[float]( 

1231 doc="The factor to multiply flux error-scaled ratios by to derive extendedness.", 

1232 default=5.0, 

1233 check=lambda x: x > 0, 

1234 ) 

1235 good_sn_min = pexConfig.Field[float]( 

1236 doc="Minimum PSF S/N to include objects if" 

1237 " min_n_good_to_shift_flux_ratio is > 0; ignored otherwise.", 

1238 default=10., 

1239 ) 

1240 max_reff_compact = pexConfig.Field[float]( 

1241 doc="The maximum effective radius in pixels below which an object is" 

1242 " classified as not extended, regardless of other parameter values.", 

1243 default=0.25, 

1244 ) 

1245 min_n_good_to_shift_flux_ratio = pexConfig.Field[int]( 

1246 doc="Minimum number of objects with PSF S/N > good_sn_min and with " 

1247 " size larger than max_reff_compact to use to compute the median " 

1248 " PSF-to-model flux ratio, which is assumed to be 1 otherwise." 

1249 " If this value is not >0, the median flux ratio will be kept 1.", 

1250 default=0, 

1251 ) 

1252 
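    # Schematic summary of the computation performed in __call__ below
    # (this mirrors the code; it is not an independent definition):
    #
    #     r       = flux_psf / flux_model  (optionally renormalized so the median
    #                                       over compact, high-S/N objects is 1)
    #     sigma_r = sqrt((fluxerr_psf/flux_model)**2
    #                    + (fluxerr_model*fluxerr_psf/flux_model**2)**2)
    #     e       = ((1 - r) + fluxerr_coefficent*sigma_r) * sqrt(size_model/max_reff_compact)
    #     e       = max(e, 0) for finite e
    #     extendedness = clip((s + 1)/s * s*e/(1 + s*e), 0, 1), with s = fluxerr_stretch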

1253 def __call__(self, table: astropy.table.Table) -> dict[str, NDArray]: 

1254 size_column = self.size_column 

1255 size_model = np.sqrt( 

1256 0.5*(table[size_column.format(axis='x')]**2 + table[size_column.format(axis='y')]**2) 

1257 ) 

1258 small = size_model < self.max_reff_compact 

1259 n_obj = len(table) 

1260 band_mappings_to_process = {band: [band] for band in self.bands} 

1261 band_mappings_to_process.update({k: v.split(",") for k, v in self.bands_combined.items()}) 

1262 output = {} 

1263 for output_band, input_bands in band_mappings_to_process.items(): 

1264 if len(input_bands) > 1: 

1265 flux_psf, fluxerr_psf_sq, flux_model, fluxerr_model_sq = ( 

1266 np.zeros(n_obj, dtype=float) for _ in range(4)) 

1267 for input_band in input_bands: 

1268 flux_psf_b, fluxerr_psf_b, flux_model_b, fluxerr_model_b = self._get_fluxes( 

1269 table, band=input_band) 

1270 # There's no point adding S/N < 0 fluxes 

1271 good = np.isfinite(flux_psf_b) & np.isfinite(flux_model_b) & ( 

1272 flux_psf_b > 0) & (flux_model_b > 0) & (fluxerr_psf_b > 0) & (fluxerr_model_b > 0) 

1273 flux_psf[good] += flux_psf_b[good] 

1274 flux_model[good] += flux_model_b[good] 

1275 fluxerr_psf_sq[good] += fluxerr_psf_b[good]**2 

1276 fluxerr_model_sq[good] += fluxerr_model_b[good]**2 

1277 fluxerr_psf = np.sqrt(fluxerr_psf_sq) 

1278 fluxerr_model = np.sqrt(fluxerr_model_sq) 

1279 fluxerr_psf[fluxerr_psf == 0] = np.inf 

1280 fluxerr_model[fluxerr_model == 0] = np.inf 

1281 else: 

1282 flux_psf, fluxerr_psf, flux_model, fluxerr_model = self._get_fluxes( 

1283 table, band=input_bands[0]) 

1284 

1285 psf_sn = flux_psf/fluxerr_psf 

1286 flux_ratio = np.array(flux_psf / flux_model) 

1287 

1288 if self.min_n_good_to_shift_flux_ratio > 0: 

1289 good = small & (psf_sn > self.good_sn_min) 

1290 # Attempt to correct any flux-independent systematic offset 

1291 # Might need to be a function of S/N 

1292 if np.sum(good == True) > self.min_n_good_to_shift_flux_ratio: # noqa: E712 

1293 flux_ratio *= 1./np.nanmedian(flux_ratio[good]) 

1294 

1295 flux_ratio_err = np.sqrt( 

1296 (fluxerr_psf/flux_model)**2 + (fluxerr_model*fluxerr_psf/flux_model**2)**2 

1297 ) 

1298 extendedness = (1 - flux_ratio) + self.fluxerr_coefficent*flux_ratio_err 

1299 extendedness *= np.sqrt(size_model/self.max_reff_compact) 

1300 extendedness[(extendedness < 0) & (extendedness > -np.inf)] = 0 

1301 # Make it sigmoid-like with a stretch 

1302 stretch = self.fluxerr_stretch 

1303 extendedness *= stretch 

1304 extendedness = np.clip((stretch + 1)/stretch*extendedness/(1 + extendedness), 0, 1) 

1305 

1306 column_out = self.output_column.format(band=output_band) 

1307 output[column_out] = extendedness 

1308 return output 

1309 

1310 

1311class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections, 

1312 dimensions=("tract", "skymap")): 

1313 inputCatalogs = connectionTypes.Input( 

1314 doc="Per-Patch objectTables conforming to the standard data model.", 

1315 name="objectTable", 

1316 storageClass="ArrowAstropy", 

1317 dimensions=("tract", "patch", "skymap"), 

1318 multiple=True, 

1319 deferLoad=True, 

1320 ) 

1321 outputCatalog = connectionTypes.Output( 

1322 doc="Pre-tract horizontal concatenation of the input objectTables", 

1323 name="objectTable_tract", 

1324 storageClass="ArrowAstropy", 

1325 dimensions=("tract", "skymap"), 

1326 ) 

1327 

1328 

1329class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig, 

1330 pipelineConnections=ConsolidateObjectTableConnections): 

1331 actions = ConfigurableActionStructField[ComputeColumnsAction]( 

1332 doc="Actions to add columns to the final object table", 

1333 ) 

1334 coaddName = pexConfig.Field[str]( 

1335 default="deep", 

1336 doc="Name of coadd" 

1337 ) 

1338 

1339 def setDefaults(self): 

1340 super().setDefaults() 

1341 self.actions.extendedness = ModelExtendednessColumnAction() 

1342 

1343 
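# Additional column actions can be attached or tuned via config overrides,
# e.g. (illustrative; ``extendedness`` is the action registered in setDefaults):
#
#     config.actions.extendedness.bands = ["g", "r", "i", "z"]
#     config.actions.extendedness.bands_combined = {"riz": "r,i,z"}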

1344class ConsolidateObjectTableTask(pipeBase.PipelineTask): 

1345 """Write patch-merged source tables to a tract-level DataFrame Parquet file. 

1346 

1347 Concatenates `objectTable` list into a per-visit `objectTable_tract`. 

1348 """ 

1349 _DefaultName = "consolidateObjectTable" 

1350 ConfigClass = ConsolidateObjectTableConfig 

1351 

1352 inputDataset = "objectTable" 

1353 outputDataset = "objectTable_tract" 

1354 

1355 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1356 inputs = butlerQC.get(inputRefs) 

1357 self.log.info("Concatenating %s per-patch Object Tables", 

1358 len(inputs["inputCatalogs"])) 

1359 table = TableVStack.vstack_handles(inputs["inputCatalogs"]) 

1360 for action in self.config.actions: 

1361 computed = action(table) 

1362 for key, values in computed.items(): 
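# Downcast float64 action outputs to float32 (presumably to limit the size
# of the consolidated table); other dtypes are stored as returned.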

1363 table[key] = values.astype(np.float32) if values.dtype == np.float64 else values 

1364 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs) 

1365 

1366 

1367class TransformSourceTableConnections(pipeBase.PipelineTaskConnections, 

1368 defaultTemplates={"catalogType": ""}, 

1369 dimensions=("instrument", "visit", "detector")): 

1370 

1371 inputCatalog = connectionTypes.Input( 

1372 doc="Wide input catalog of sources produced by WriteSourceTableTask", 

1373 name="{catalogType}source", 

1374 storageClass="DataFrame", 

1375 dimensions=("instrument", "visit", "detector"), 

1376 deferLoad=True 

1377 ) 

1378 outputCatalog = connectionTypes.Output( 

1379 doc="Narrower, per-detector Source Table transformed and converted per a " 

1380 "specified set of functors", 

1381 name="{catalogType}sourceTable", 

1382 storageClass="ArrowAstropy", 

1383 dimensions=("instrument", "visit", "detector") 

1384 ) 

1385 

1386 

1387class TransformSourceTableConfig(TransformCatalogBaseConfig, 

1388 pipelineConnections=TransformSourceTableConnections): 

1389 

1390 def setDefaults(self): 

1391 super().setDefaults() 

1392 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml") 

1393 self.primaryKey = "sourceId" 

1394 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"] 

1395 

1396 

1397class TransformSourceTableTask(TransformCatalogBaseTask): 

1398 """Transform/standardize a source catalog 

1399 """ 

1400 _DefaultName = "transformSourceTable" 

1401 ConfigClass = TransformSourceTableConfig 

1402 

1403 

1404class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections, 

1405 dimensions=("instrument", "visit",), 

1406 defaultTemplates={"calexpType": ""}): 

1407 camera = connectionTypes.PrerequisiteInput( 

1408 doc="Camera geometry.", 

1409 name="camera", 

1410 dimensions=("instrument",), 

1411 storageClass="Camera", 

1412 isCalibration=True, 

1413 ) 

1414 calexp = connectionTypes.Input( 

1415 doc="Processed exposures used for metadata", 

1416 name="calexp", 

1417 storageClass="ExposureF", 

1418 dimensions=("instrument", "visit", "detector"), 

1419 deferLoad=True, 

1420 multiple=True, 

1421 ) 

1422 visitSummary = connectionTypes.Output( 

1423 doc=("Per-visit consolidated exposure metadata. These catalogs use " 

1424 "detector id for the id and are sorted for fast lookups of a " 

1425 "detector."), 

1426 name="visitSummary", 

1427 storageClass="ExposureCatalog", 

1428 dimensions=("instrument", "visit"), 

1429 ) 

1430 visitSummarySchema = connectionTypes.InitOutput( 

1431 doc="Schema of the visitSummary catalog", 

1432 name="visitSummary_schema", 

1433 storageClass="ExposureCatalog", 

1434 ) 

1435 # RefitPointingTask options and connections use snake_case for consistency 

1436 # with options of the same name in UpdateVisitSummaryTask, which seems more 

1437 # important than consistently using camelCase within this task. 

1438 visit_geometry = connectionTypes.Output( 

1439 doc="Updated visit[, detector] regions that can be used to update butler dimensions records.", 

1440 name="visit_geometry", 

1441 dimensions=("instrument", "visit"), 

1442 storageClass="VisitGeometry", 

1443 ) 

1444 

1445 def __init__(self, *, config=None): 

1446 if self.config.do_refit_pointing: 

1447 self.camera = dataclasses.replace(self.camera, dimensions=self.config.cameraDimensions) 

1448 else: 

1449 del self.camera 

1450 if not self.config.do_write_visit_geometry: 

1451 del self.visit_geometry 

1452 

1453 

1454class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig, 

1455 pipelineConnections=ConsolidateVisitSummaryConnections): 

1456 """Config for ConsolidateVisitSummaryTask""" 

1457 

1458 full = pexConfig.Field( 

1459 "Whether to propate all exposure components. " 

1460 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file " 

1461 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable" 

1462 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.", 

1463 dtype=bool, 

1464 default=False, 

1465 ) 

1466 refitPointing = pexConfig.ConfigurableField( 

1467 "A subtask for refitting the boresight pointing and orientation, " 

1468 "and using those to produce new regions for butler dimensions.", 

1469 target=RefitPointingTask, 

1470 ) 

1471 cameraDimensions = pexConfig.ListField( 

1472 "The dimensions of the 'camera' prerequisite input connection.", 

1473 dtype=str, 

1474 default=["instrument"], 

1475 ) 

1476 do_refit_pointing = pexConfig.Field( 

1477 "Whether to re-fit the pointing model.", 

1478 dtype=bool, 

1479 default=True, 

1480 ) 

1481 do_write_visit_geometry = pexConfig.Field( 

1482 "Whether to write refit-pointing regions that can be used to update butler dimension records.", 

1483 dtype=bool, 

1484 default=True, 

1485 ) 

1486 

1487 def validate(self): 

1488 super().validate() 

1489 if self.do_write_visit_geometry and not self.do_refit_pointing: 

1490 raise ValueError("Cannot write visit_geometry without refitting the pointing.") 

1491 

1492 def setDefaults(self): 

1493 super().setDefaults() 

1494 # This prevents a conflict with the fields added by the 

1495 # UpdateVisitSummaryTask invocation of RefitPointingTask. 

1496 self.refitPointing.schema_prefix = "preliminary_" 

1497 

1498 

1499class ConsolidateVisitSummaryTask(pipeBase.PipelineTask): 

1500 """Task to consolidate per-detector visit metadata. 

1501 

1502 This task aggregates the following metadata from all the detectors in a 

1503 single visit into an exposure catalog: 

1504 - The visitInfo. 

1505 - The wcs. 

1506 - The photoCalib. 

1507 - The physical_filter and band (if available). 

1508 - The PSF model. 

1509 - The aperture correction map. 

1510 - The transmission curve. 

1511 - The psf size, shape, and effective area at the center of the detector. 

1512 - The corners of the bounding box in right ascension/declination. 

1513 

1514 Tests for this task are performed in ci_hsc_gen3. 

1515 """ 

1516 _DefaultName = "consolidateVisitSummary" 

1517 ConfigClass = ConsolidateVisitSummaryConfig 

1518 

1519 def __init__(self, **kwargs): 

1520 super().__init__(**kwargs) 

1521 self.schema = afwTable.ExposureTable.makeMinimalSchema() 

1522 self.schema.addField("visit", type="L", doc="Visit number") 

1523 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter") 

1524 self.schema.addField("band", type="String", size=32, doc="Name of band") 

1525 ExposureSummaryStats.update_schema(self.schema) 

1526 if self.config.do_refit_pointing: 

1527 self.makeSubtask("refitPointing", schema=self.schema) 

1528 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema) 

1529 

1530 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1531 handles = butlerQC.get(inputRefs.calexp) 

1532 visit = handles[0].dataId["visit"] 

1533 

1534 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)", 

1535 len(handles), visit) 

1536 

1537 camera = butlerQC.get(inputRefs.camera) if self.config.do_refit_pointing else None 

1538 

1539 result = pipeBase.Struct() 

1540 try: 

1541 self.run(visit=visit, handles=handles, camera=camera, result=result) 

1542 except pipeBase.AlgorithmError as e: 

1543 error = pipeBase.AnnotatedPartialOutputsError.annotate( 

1544 e, self, result.visitSummary, log=self.log 

1545 ) 

1546 butlerQC.put(result, outputRefs) 

1547 raise error from e 

1548 

1549 butlerQC.put(result, outputRefs) 

1550 

1551 def run(self, *, visit, handles, camera=None, result=None): 

1552 """Make a combined exposure catalog from a list of handles. 

1553 These handles must point to exposures with wcs, summaryStats, 

1554 and other visit metadata. 

1555 

1556 Parameters 

1557 ---------- 

1558 visit : `int` 

1559 Visit identification number. 

1560 handles : `list` of `lsst.daf.butler.DeferredDatasetHandle` 

1561 List of handles in visit. 

1562 camera : `lsst.afw.cameraGeom.Camera`, optional 

1563 Camera geometry. Required if and only if 

1564 ``do_refit_pointing=True``. 

1565 result : `lsst.pipe.base.Struct`, optional 

1566 Output struct to modify in-place. 

1567 

1568 Returns 

1569 ------- 

1570 result : `lsst.pipe.base.Struct` 

1571 Struct with the following attributes: 

1572 

1573 - ``visitSummary`` (`lsst.afw.table.ExposureCatalog`): an Exposure 

1574 catalog with per-detector summary information. 

1575 - ``visit_geometry`` (`lsst.obs.base.visit_geometry.VisitGeometry`): 

1576 Regions that can be used to update butler dimension regions for 

1577 this visit. Only present if ``do_refit_pointing=True``. 

1578 """ 

1579 cat = afwTable.ExposureCatalog(self.schema) 

1580 cat.resize(len(handles)) 

1581 

1582 cat["visit"] = visit 

1583 

1584 for i, dataRef in enumerate(handles): 

1585 visitInfo = dataRef.get(component="visitInfo") 

1586 filterLabel = dataRef.get(component="filter") 

1587 summaryStats = dataRef.get(component="summaryStats") 

1588 detector = dataRef.get(component="detector") 

1589 wcs = dataRef.get(component="wcs") 

1590 photoCalib = dataRef.get(component="photoCalib") 

1591 bbox = dataRef.get(component="bbox") 

1592 validPolygon = dataRef.get(component="validPolygon") 

1593 

1594 rec = cat[i] 

1595 rec.setBBox(bbox) 

1596 rec.setVisitInfo(visitInfo) 

1597 rec.setWcs(wcs) 

1598 rec.setPhotoCalib(photoCalib) 

1599 rec.setValidPolygon(validPolygon) 

1600 

1601 if self.config.full: 

1602 rec.setPsf(dataRef.get(component="psf")) 

1603 rec.setApCorrMap(dataRef.get(component="apCorrMap")) 

1604 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve")) 

1605 

1606 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else "" 

1607 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else "" 

1608 rec.setId(detector.getId()) 

1609 summaryStats.update_record(rec) 

1610 

1611 if not cat: 

1612 raise pipeBase.NoWorkFound( 

1613 "No detectors had sufficient information to make a visit summary row." 

1614 ) 

1615 

1616 metadata = dafBase.PropertyList() 

1617 metadata.add("COMMENT", "Catalog id is detector id, sorted.") 

1618 # We are looping over existing handles, so the following is true 

1619 metadata.add("COMMENT", "Only detectors with data have entries.") 

1620 cat.setMetadata(metadata) 

1621 

1622 cat.sort() 

1623 

1624 if result is None: 

1625 result = pipeBase.Struct() 

1626 result.visitSummary = cat 

1627 

1628 if self.config.do_refit_pointing: 

1629 refitPointingResult = self.refitPointing.run(catalog=cat, camera=camera) 

1630 result.visit_geometry = refitPointingResult.regions 

1631 

1632 return result 

1633 

1634 
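# --- Illustrative sketch (not part of the source file) ----------------------
# The visitSummary written above is an ExposureCatalog whose record id is the
# detector id and which is sorted for fast per-detector lookups.  A minimal
# sketch of how a downstream consumer might read it; the repo path,
# collection, dataId values, and detector number are placeholders.
def _exampleReadVisitSummary():
    from lsst.daf.butler import Butler

    butler = Butler("/path/to/repo", collections=["u/someone/example-collection"])
    summary = butler.get("visitSummary", instrument="HSC", visit=1228)
    rec = summary.find(42)  # one row per detector; find() works on the sorted catalog
    return rec.getWcs(), rec.getVisitInfo(), rec["psfSigma"], rec["band"]
# -----------------------------------------------------------------------------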

1635class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections, 

1636 defaultTemplates={"catalogType": ""}, 

1637 dimensions=("instrument", "visit")): 

1638 inputCatalogs = connectionTypes.Input( 

1639 doc="Input per-detector Source Tables", 

1640 name="{catalogType}sourceTable", 

1641 storageClass="ArrowAstropy", 

1642 dimensions=("instrument", "visit", "detector"), 

1643 multiple=True, 

1644 deferLoad=True, 

1645 ) 

1646 outputCatalog = connectionTypes.Output( 

1647 doc="Per-visit concatenation of Source Table", 

1648 name="{catalogType}sourceTable_visit", 

1649 storageClass="ArrowAstropy", 

1650 dimensions=("instrument", "visit") 

1651 ) 

1652 

1653 

1654class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig, 

1655 pipelineConnections=ConsolidateSourceTableConnections): 

1656 pass 

1657 

1658 

1659class ConsolidateSourceTableTask(pipeBase.PipelineTask): 

1660 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit` 

1661 """ 

1662 _DefaultName = "consolidateSourceTable" 

1663 ConfigClass = ConsolidateSourceTableConfig 

1664 

1665 inputDataset = "sourceTable" 

1666 outputDataset = "sourceTable_visit" 

1667 

1668 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1669 # Docstring inherited. 

1670 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs] 

1671 detectorOrder.sort() 

1672 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector") 

1673 inputs = butlerQC.get(inputRefs) 

1674 self.log.info("Concatenating %s per-detector Source Tables", 

1675 len(inputs["inputCatalogs"])) 

1676 table = TableVStack.vstack_handles(inputs["inputCatalogs"]) 

1677 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs) 

1678 

1679 

1680class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections, 

1681 dimensions=("instrument",), 

1682 defaultTemplates={"calexpType": ""}): 

1683 visitSummaryRefs = connectionTypes.Input( 

1684 doc="Data references for per-visit consolidated exposure metadata", 

1685 name="finalVisitSummary", 

1686 storageClass="ExposureCatalog", 

1687 dimensions=("instrument", "visit"), 

1688 multiple=True, 

1689 deferLoad=True, 

1690 ) 

1691 outputCatalog = connectionTypes.Output( 

1692 doc="CCD and Visit metadata table", 

1693 name="ccdVisitTable", 

1694 storageClass="ArrowAstropy", 

1695 dimensions=("instrument",) 

1696 ) 

1697 

1698 

1699class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig, 

1700 pipelineConnections=MakeCcdVisitTableConnections): 

1701 idGenerator = DetectorVisitIdGeneratorConfig.make_field() 

1702 

1703 

1704class MakeCcdVisitTableTask(pipeBase.PipelineTask): 

1705 """Produce a `ccdVisitTable` from the visit summary exposure catalogs. 

1706 """ 

1707 _DefaultName = "makeCcdVisitTable" 

1708 ConfigClass = MakeCcdVisitTableConfig 

1709 

1710 def run(self, visitSummaryRefs): 

1711 """Make a table of ccd information from the visit summary catalogs. 

1712 

1713 Parameters 

1714 ---------- 

1715 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` 

1716 List of DeferredDatasetHandles pointing to exposure catalogs with 

1717 per-detector summary information. 

1718 

1719 Returns 

1720 ------- 

1721 result : `~lsst.pipe.base.Struct` 

1722 Results struct with attribute: 

1723 

1724 ``outputCatalog`` 

1725 Catalog of ccd and visit information. 

1726 """ 

1727 ccdEntries = [] 

1728 for visitSummaryRef in visitSummaryRefs: 

1729 visitSummary = visitSummaryRef.get() 

1730 if not visitSummary: 

1731 continue 

1732 visitInfo = visitSummary[0].getVisitInfo() 

1733 

1734 # Strip provenance to prevent merge confusion. 

1735 strip_provenance_from_fits_header(visitSummary.metadata) 

1736 

1737 ccdEntry = {} 

1738 summaryTable = visitSummary.asAstropy() 

1739 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec", 

1740 "pixelScale", "zenithDistance", 

1741 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise", 

1742 "astromOffsetMean", "astromOffsetStd", "nPsfStar", 

1743 "psfStarDeltaE1Median", "psfStarDeltaE2Median", 

1744 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter", 

1745 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter", 

1746 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta", 

1747 "psfApFluxDelta", "psfApCorrSigmaScaledDelta", 

1748 "maxDistToNearestPsf", "starEMedian", "starUnNormalizedEMedian", 

1749 "starComa1Median", "starComa2Median", 

1750 "starTrefoil1Median", "starTrefoil2Median", 

1751 "starKurtosisMedian", "starE41Median", "starE42Median", 

1752 "effTime", "effTimePsfSigmaScale", 

1753 "effTimeSkyBgScale", "effTimeZeroPointScale", 

1754 "magLim"] 

1755 ccdEntry = summaryTable[selectColumns] 

1756 # 'visit' is the human readable visit number. 

1757 # 'visitId' is the key to the visit table. They are the same. 

1758 # Technically you should join to get the visit from the visit 

1759 # table. 

1760 ccdEntry.rename_column("visit", "visitId") 

1761 ccdEntry.rename_column("id", "detectorId") 

1762 

1763 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards 

1764 # compatibility. To be removed after September 2023. 

1765 ccdEntry["decl"] = ccdEntry["dec"] 

1766 

1767 ccdEntry["ccdVisitId"] = [ 

1768 self.config.idGenerator.apply( 

1769 visitSummaryRef.dataId, 

1770 detector=detector_id, 

1771 is_exposure=False, 

1772 ).catalog_id # The "catalog ID" here is the ccdVisit ID 

1773 # because it's usually the ID for a whole catalog 

1774 # with a {visit, detector}, and that's the main 

1775 # use case for IdGenerator. This usage for a 

1776 # summary table is rare. 

1777 for detector_id in summaryTable["id"] 

1778 ] 

1779 ccdEntry["detector"] = summaryTable["id"] 

1780 ccdEntry["seeing"] = ( 

1781 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2)) 

1782 ) 

1783 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees() 

1784 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns") 

1785 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) 

1786 expTime = visitInfo.getExposureTime() 

1787 ccdEntry["obsStart"] = ( 

1788 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns") 

1789 ) 

1790 expTime_days = expTime / (60*60*24) 

1791 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days 

1792 ccdEntry["darkTime"] = visitInfo.getDarkTime() 

1793 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"] 

1794 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"] 

1795 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0] 

1796 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0] 

1797 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1] 

1798 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1] 

1799 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2] 

1800 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2] 

1801 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3] 

1802 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3] 

1803 # These columns are only added on certain updateVisitSummary 

1804 # configurations, and we want to rename them to camelCase and 

1805 # convert to arcseconds here. 

1806 for inName, outName in { 

1807 "wcs_corner_max_offset": "wcsCornerMaxOffset", 

1808 "wcs_detector_pointing_residual": "wcsDetectorPointingResidual", 

1809 "wcs_visit_pointing_residual": "wcsVisitPointingResidual", 

1810 "preliminary_wcs_detector_pointing_residual": "wcsPreliminaryDetectorPointingResidual", 

1811 "preliminary_wcs_visit_pointing_residual": "wcsPreliminaryVisitPointingResidual", 

1812 }.items(): 

1813 if inName in summaryTable.columns: 

1814 inCol = summaryTable[inName] 

1815 ccdEntry[outName] = astropy.table.Column( 

1816 (np.asarray(inCol) * inCol.unit).to_value(astropy.units.arcsec), 

1817 unit=astropy.units.arcsec 

1818 ) 

1819 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, 

1820 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. 

1821 # values are actually wanted. 

1822 ccdEntries.append(ccdEntry) 

1823 

1824 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact") 

1825 return pipeBase.Struct(outputCatalog=outputCatalog) 

1826 

1827 
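# --- Illustrative sketch (not part of the source file) ----------------------
# The "seeing" column above converts the per-detector Gaussian psfSigma (in
# pixels) to a FWHM in arcseconds: FWHM = sigma * sqrt(8 ln 2) ~ 2.355 sigma,
# scaled by the pixel scale in arcsec/pixel.  The same conversion, stand-alone:
def _examplePsfSigmaToSeeing(psfSigmaPixels, pixelScaleArcsec):
    """Return the PSF FWHM in arcseconds given a Gaussian sigma in pixels."""
    return psfSigmaPixels * pixelScaleArcsec * np.sqrt(8 * np.log(2))
# -----------------------------------------------------------------------------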

1828class MakeVisitTableConnections(pipeBase.PipelineTaskConnections, 

1829 dimensions=("instrument",), 

1830 defaultTemplates={"calexpType": ""}): 

1831 visitSummaries = connectionTypes.Input( 

1832 doc="Per-visit consolidated exposure metadata", 

1833 name="finalVisitSummary", 

1834 storageClass="ExposureCatalog", 

1835 dimensions=("instrument", "visit",), 

1836 multiple=True, 

1837 deferLoad=True, 

1838 ) 

1839 outputCatalog = connectionTypes.Output( 

1840 doc="Visit metadata table", 

1841 name="visitTable", 

1842 storageClass="ArrowAstropy", 

1843 dimensions=("instrument",) 

1844 ) 

1845 

1846 

1847class MakeVisitTableConfig(pipeBase.PipelineTaskConfig, 

1848 pipelineConnections=MakeVisitTableConnections): 

1849 pass 

1850 

1851 

1852class MakeVisitTableTask(pipeBase.PipelineTask): 

1853 """Produce a `visitTable` from the visit summary exposure catalogs. 

1854 """ 

1855 _DefaultName = "makeVisitTable" 

1856 ConfigClass = MakeVisitTableConfig 

1857 

1858 def run(self, visitSummaries): 

1859 """Make a table of visit information from the visit summary catalogs. 

1860 

1861 Parameters 

1862 ---------- 

1863 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog` 

1864 List of exposure catalogs with per-detector summary information. 

1865 Returns 

1866 ------- 

1867 result : `~lsst.pipe.base.Struct` 

1868 Results struct with attribute: 

1869 

1870 ``outputCatalog`` 

1871 Catalog of visit information. 

1872 """ 

1873 visitEntries = [] 

1874 for visitSummary in visitSummaries: 

1875 visitSummary = visitSummary.get() 

1876 if not visitSummary: 

1877 continue 

1878 visitRow = visitSummary[0] 

1879 visitInfo = visitRow.getVisitInfo() 

1880 

1881 visitEntry = {} 

1882 visitEntry["visitId"] = visitRow["visit"] 

1883 visitEntry["visit"] = visitRow["visit"] 

1884 visitEntry["physical_filter"] = visitRow["physical_filter"] 

1885 visitEntry["band"] = visitRow["band"] 

1886 raDec = visitInfo.getBoresightRaDec() 

1887 visitEntry["ra"] = raDec.getRa().asDegrees() 

1888 visitEntry["dec"] = raDec.getDec().asDegrees() 

1889 

1890 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards 

1891 # compatibility. To be removed after September 2023. 

1892 visitEntry["decl"] = visitEntry["dec"] 

1893 

1894 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees() 

1895 azAlt = visitInfo.getBoresightAzAlt() 

1896 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees() 

1897 visitEntry["altitude"] = azAlt.getLatitude().asDegrees() 

1898 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees() 

1899 visitEntry["airmass"] = visitInfo.getBoresightAirmass() 

1900 expTime = visitInfo.getExposureTime() 

1901 visitEntry["expTime"] = expTime 

1902 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns") 

1903 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) 

1904 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns") 

1905 expTime_days = expTime / (60*60*24) 

1906 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days 

1907 visitEntries.append(visitEntry) 

1908 

1909 # TODO: DM-30623, Add programId, exposureType, cameraTemp, 

1910 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp, 

1911 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures. 

1912 

1913 outputCatalog = astropy.table.Table(rows=visitEntries) 

1914 return pipeBase.Struct(outputCatalog=outputCatalog) 

1915 

1916 

1917@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. " 

1918 "This task will be removed after v30.", 

1919 version="v29.0", category=FutureWarning) 

1920class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections, 

1921 dimensions=("instrument", "visit", "detector", "skymap", "tract")): 

1922 

1923 inputCatalog = connectionTypes.Input( 

1924 doc="Primary per-detector, single-epoch forced-photometry catalog. " 

1925 "By default, it is the output of ForcedPhotCcdTask on calexps", 

1926 name="forced_src", 

1927 storageClass="SourceCatalog", 

1928 dimensions=("instrument", "visit", "detector", "skymap", "tract") 

1929 ) 

1930 inputCatalogDiff = connectionTypes.Input( 

1931 doc="Secondary multi-epoch, per-detector, forced photometry catalog. " 

1932 "By default, it is the output of ForcedPhotCcdTask run on image differences.", 

1933 name="forced_diff", 

1934 storageClass="SourceCatalog", 

1935 dimensions=("instrument", "visit", "detector", "skymap", "tract") 

1936 ) 

1937 outputCatalog = connectionTypes.Output( 

1938 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format", 

1939 name="mergedForcedSource", 

1940 storageClass="DataFrame", 

1941 dimensions=("instrument", "visit", "detector", "skymap", "tract") 

1942 ) 

1943 

1944 

1945@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. " 

1946 "This task will be removed after v30.", 

1947 version="v29.0", category=FutureWarning) 

1948class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig, 

1949 pipelineConnections=WriteForcedSourceTableConnections): 

1950 key = lsst.pex.config.Field( 

1951 doc="Column on which to join the two input tables on and make the primary key of the output", 

1952 dtype=str, 

1953 default="objectId", 

1954 ) 

1955 

1956 

1957@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. " 

1958 "This task will be removed after v30.", 

1959 version="v29.0", category=FutureWarning) 

1960class WriteForcedSourceTableTask(pipeBase.PipelineTask): 

1961 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format. 

1962 

1963 Because the predecessor ForcedPhotCcdTask operates per-detector, 

1964 per-tract (i.e., it has tract in its dimensions), detectors 

1965 on the tract boundary may have multiple forced source catalogs. 

1966 

1967 The successor task TransformForcedSourceTable runs per-patch 

1968 and temporally aggregates the overlapping mergedForcedSource catalogs from all 

1969 available epochs. 

1970 """ 

1971 _DefaultName = "writeForcedSourceTable" 

1972 ConfigClass = WriteForcedSourceTableConfig 

1973 

1974 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1975 inputs = butlerQC.get(inputRefs) 

1976 inputs["visit"] = butlerQC.quantum.dataId["visit"] 

1977 inputs["detector"] = butlerQC.quantum.dataId["detector"] 

1978 inputs["band"] = butlerQC.quantum.dataId["band"] 

1979 outputs = self.run(**inputs) 

1980 butlerQC.put(outputs, outputRefs) 

1981 

1982 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None): 

1983 dfs = [] 

1984 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")): 

1985 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False) 

1986 df = df.reindex(sorted(df.columns), axis=1) 

1987 df["visit"] = visit 

1988 # int16 instead of uint8 because databases don't like unsigned bytes. 

1989 df["detector"] = np.int16(detector) 

1990 df["band"] = band if band else pd.NA 

1991 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns], 

1992 names=("dataset", "column")) 

1993 

1994 dfs.append(df) 

1995 

1996 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs) 

1997 return pipeBase.Struct(outputCatalog=outputCatalog) 

1998 

1999 
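# --- Illustrative sketch (not part of the source file) ----------------------
# The mergedForcedSource DataFrame produced above has two-level columns keyed
# by ("dataset", "column"), with dataset in {"calexp", "diff"}, and is indexed
# by the join key (objectId by default).  A minimal sketch of slicing it; the
# dataId values and the "base_PsfFlux_instFlux" column name are placeholders.
def _exampleReadMergedForcedSource(butler):
    df = butler.get("mergedForcedSource", instrument="HSC", visit=1228, detector=42,
                    tract=9813, skymap="hsc_rings_v1")
    calexpFlux = df[("calexp", "base_PsfFlux_instFlux")]  # one direct-image column
    diffColumns = df["diff"]  # all difference-image columns as a plain DataFrame
    return calexpFlux, diffColumns
# -----------------------------------------------------------------------------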

2000class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections, 

2001 dimensions=("instrument", "skymap", "patch", "tract")): 

2002 

2003 inputCatalogs = connectionTypes.Input( 

2004 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask", 

2005 name="mergedForcedSource", 

2006 storageClass="DataFrame", 

2007 dimensions=("instrument", "visit", "detector", "skymap", "tract"), 

2008 multiple=True, 

2009 deferLoad=True 

2010 ) 

2011 referenceCatalog = connectionTypes.Input( 

2012 doc="Reference catalog which was used to seed the forcedPhot. Columns " 

2013 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner " 

2014 "are expected.", 

2015 name="objectTable", 

2016 storageClass="DataFrame", 

2017 dimensions=("tract", "patch", "skymap"), 

2018 deferLoad=True 

2019 ) 

2020 outputCatalog = connectionTypes.Output( 

2021 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a " 

2022 "specified set of functors", 

2023 name="forcedSourceTable", 

2024 storageClass="ArrowAstropy", 

2025 dimensions=("tract", "patch", "skymap") 

2026 ) 

2027 

2028 

2029class TransformForcedSourceTableConfig(TransformCatalogBaseConfig, 

2030 pipelineConnections=TransformForcedSourceTableConnections): 

2031 referenceColumns = pexConfig.ListField( 

2032 dtype=str, 

2033 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"], 

2034 optional=True, 

2035 doc="Columns to pull from reference catalog", 

2036 ) 

2037 keyRef = lsst.pex.config.Field( 

2038 doc="Column on which to join the two input tables on and make the primary key of the output", 

2039 dtype=str, 

2040 default="objectId", 

2041 ) 

2042 key = lsst.pex.config.Field( 

2043 doc="Rename the output DataFrame index to this name", 

2044 dtype=str, 

2045 default="forcedSourceId", 

2046 ) 

2047 

2048 def setDefaults(self): 

2049 super().setDefaults() 

2050 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml") 

2051 self.columnsFromDataId = ["tract", "patch"] 

2052 

2053 

2054class TransformForcedSourceTableTask(TransformCatalogBaseTask): 

2055 """Transform/standardize a ForcedSource catalog 

2056 

2057 Transforms each wide, per-detector forcedSource DataFrame per the 

2058 specification file (per-camera defaults found in ForcedSource.yaml). 

2059 All epochs that overlap the patch are aggregated into one per-patch 

2060 narrow-DataFrame file. 

2061 

2062 No de-duplication of rows is performed. Duplicate-resolution flags are 

2063 pulled in from the referenceCatalog: `detect_isPrimary`, 

2064 `detect_isTractInner`, `detect_isPatchInner`, so that the user may de-duplicate 

2065 for analysis or compare duplicates for QA. 

2066 

2067 The resulting table includes multiple bands. Epochs (MJDs) and other useful 

2068 per-visit quantities can be retrieved by joining with the CcdVisitTable on 

2069 ccdVisitId. 

2070 """ 

2071 _DefaultName = "transformForcedSourceTable" 

2072 ConfigClass = TransformForcedSourceTableConfig 

2073 

2074 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

2075 inputs = butlerQC.get(inputRefs) 

2076 if self.funcs is None: 

2077 raise ValueError("config.functorFile is None. " 

2078 "Must be a valid path to yaml in order to run Task as a PipelineTask.") 

2079 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs, 

2080 dataId=dict(outputRefs.outputCatalog.dataId.mapping)) 

2081 

2082 butlerQC.put(outputs, outputRefs) 

2083 

2084 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None): 

2085 dfs = [] 

2086 refColumns = list(self.config.referenceColumns) 

2087 refColumns.append(self.config.keyRef) 

2088 try: 

2089 ref = referenceCatalog.get(parameters={"columns": refColumns}) 

2090 except ValueError: 

2091 raise NoWorkFound(f"No objects to run forced photometry for {dataId}.") 

2092 if ref.index.name != self.config.keyRef: 

2093 # If the DataFrame we loaded was originally written as some other 

2094 # Parquet type, it probably doesn't have the index set. If it was 

2095 # written as a DataFrame, the index should already be set and 

2096 # trying to set it again would be an error, since it doesn't exist 

2097 # as a regular column anymore. 

2098 ref.set_index(self.config.keyRef, inplace=True) 

2099 self.log.info("Aggregating %s input catalogs", len(inputCatalogs)) 

2100 for handle in inputCatalogs: 

2101 result = self.transform(None, handle, funcs, dataId) 

2102 # Filter for only rows that were detected on (overlap) the patch 

2103 dfs.append(result.df.join(ref, how="inner")) 

2104 

2105 outputCatalog = pd.concat(dfs) 

2106 

2107 if outputCatalog.empty: 

2108 raise NoWorkFound(f"No forced photometry rows for {dataId}.") 

2109 

2110 # Now that we are done joining on config.keyRef, restore that name on 

2111 # the index before moving it back into the regular columns. 

2112 outputCatalog.index.rename(self.config.keyRef, inplace=True) 

2113 # Move config.keyRef back into the regular columns. 

2114 outputCatalog.reset_index(inplace=True) 

2115 

2116 if "forcedSourceId" in outputCatalog.columns: 

2117 # Set forcedSourceId as the index, as specified in 

2118 # ForcedSource.yaml. 

2119 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True) 

2120 # Rename it to the config.key 

2121 outputCatalog.index.rename(self.config.key, inplace=True) 

2122 

2123 self.log.info("Made a table of %d columns and %d rows", 

2124 len(outputCatalog.columns), len(outputCatalog)) 

2125 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog)) 

2126 

2127 
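# --- Illustrative sketch (not part of the source file) ----------------------
# As the class docstring notes, per-visit quantities such as the exposure
# midpoint MJD live in the ccdVisitTable and can be attached to the per-patch
# forcedSourceTable by joining on ccdVisitId.  A minimal sketch; the dataId
# values are placeholders and an inner join is assumed.
def _exampleJoinForcedSourcesWithCcdVisits(butler):
    forced = butler.get("forcedSourceTable", skymap="hsc_rings_v1", tract=9813, patch=40)
    ccdVisits = butler.get("ccdVisitTable", instrument="HSC")
    return astropy.table.join(forced, ccdVisits, keys="ccdVisitId", join_type="inner")
# -----------------------------------------------------------------------------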

2128class ConsolidateTractConnections(pipeBase.PipelineTaskConnections, 

2129 defaultTemplates={"catalogType": ""}, 

2130 dimensions=("instrument", "tract")): 

2131 inputCatalogs = connectionTypes.Input( 

2132 doc="Input per-patch DataFrame Tables to be concatenated", 

2133 name="{catalogType}ForcedSourceTable", 

2134 storageClass="DataFrame", 

2135 dimensions=("tract", "patch", "skymap"), 

2136 multiple=True, 

2137 ) 

2138 

2139 outputCatalog = connectionTypes.Output( 

2140 doc="Output per-tract concatenation of DataFrame Tables", 

2141 name="{catalogType}ForcedSourceTable_tract", 

2142 storageClass="DataFrame", 

2143 dimensions=("tract", "skymap"), 

2144 ) 

2145 

2146 

2147class ConsolidateTractConfig(pipeBase.PipelineTaskConfig, 

2148 pipelineConnections=ConsolidateTractConnections): 

2149 

2150 doUseSchema = pexConfig.Field( 

2151 dtype=bool, 

2152 default=False, 

2153 doc="Use an existing schema to coerce the data types of the output columns." 

2154 ) 

2155 schemaDir = pexConfig.Field( 

2156 dtype=str, 

2157 doc="Path to the directory containing schema definitions.", 

2158 default=os.path.join("${SDM_SCHEMAS_DIR}", 

2159 "yml"), 

2160 optional=True, 

2161 ) 

2162 schemaFile = pexConfig.Field( 

2163 dtype=str, 

2164 doc="Yaml file specifying the schema of the output catalog.", 

2165 optional=True, 

2166 ) 

2167 tableName = pexConfig.Field( 

2168 dtype=str, 

2169 doc="Name of the table in the schema file to read.", 

2170 optional=True, 

2171 ) 

2172 

2173 

2174class ConsolidateTractTask(pipeBase.PipelineTask): 

2175 """Concatenate any per-patch, dataframe list into a single 

2176 per-tract DataFrame. 

2177 """ 

2178 _DefaultName = "ConsolidateTract" 

2179 ConfigClass = ConsolidateTractConfig 

2180 

2181 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

2182 inputs = butlerQC.get(inputRefs) 

2183 # Not checking at least one inputCatalog exists because that'd be an 

2184 # empty QG. 

2185 self.log.info("Concatenating %s per-patch %s Tables", 

2186 len(inputs["inputCatalogs"]), 

2187 inputRefs.inputCatalogs[0].datasetType.name) 

2188 df = pd.concat(inputs["inputCatalogs"]) 

2189 if self.config.doUseSchema: 

2190 schemaFile = os.path.join(self.config.schemaDir, self.config.schemaFile) 

2191 schema = readSdmSchemaFile(schemaFile) 

2192 df = convertDataFrameToSdmSchema(schema, df, tableName=self.config.tableName) 

2193 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) 

2194 

2195 
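# --- Illustrative sketch (not part of the source file) ----------------------
# When doUseSchema is enabled, the concatenated DataFrame is coerced to the
# column names and dtypes of a table defined in an SDM schema yaml file found
# under config.schemaDir.  A minimal config-override sketch; the schema file
# name and table name below are placeholders and must exist in that directory.
def _exampleConfigureConsolidateTract(config):
    """Apply hypothetical schema-coercion settings to a ConsolidateTractConfig."""
    config.doUseSchema = True
    config.schemaFile = "example_schema.yaml"  # placeholder yaml file name
    config.tableName = "ForcedSource"          # placeholder table name within that file
    return config
# -----------------------------------------------------------------------------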

2196class ConsolidateParentTractConnections( 

2197 pipeBase.PipelineTaskConnections, 

2198 dimensions=("instrument", "tract") 

2199): 

2200 inputCatalogs = connectionTypes.Input( 

2201 doc="Parents of the deblended objects", 

2202 name="object_parent_patch", 

2203 storageClass="SourceCatalog", 

2204 dimensions=("tract", "patch", "skymap"), 

2205 multiple=True, 

2206 ) 

2207 

2208 outputCatalog = connectionTypes.Output( 

2209 doc="Output per-tract concatenation of DataFrame Tables", 

2210 name="object_parent", 

2211 storageClass="ArrowAstropy", 

2212 dimensions=("tract", "skymap"), 

2213 ) 

2214 

2215 

2216class ConsolidateParentTractConfig( 

2217 pipeBase.PipelineTaskConfig, 

2218 pipelineConnections=ConsolidateParentTractConnections, 

2219): 

2220 pass 

2221 

2222 

2223class ConsolidateParentTractTask(pipeBase.PipelineTask): 

2224 """Concatenate any per-patch, dataframe list into a single 

2225 per-tract DataFrame. 

2226 """ 

2227 _DefaultName = "ConsolidateParentTract" 

2228 ConfigClass = ConsolidateParentTractConfig 

2229 

2230 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

2231 self.log.info("Concatenating %s per-patch %s Tables", 

2232 len(inputRefs.inputCatalogs), 

2233 inputRefs.inputCatalogs[0].datasetType.name) 

2234 

2235 tables = [] 

2236 for ref in inputRefs.inputCatalogs: 

2237 catalog = butlerQC.get(ref) 

2238 table = catalog.asAstropy() 

2239 

2240 # Rename columns 

2241 table.rename_column("id", "objectId") 

2242 table.rename_column("parent", "parentObjectId") 

2243 table.rename_column("merge_peak_sky", "sky_object") 

2244 

2245 # Add tract and patch columns 

2246 table["tract"] = ref.dataId["tract"] 

2247 table["patch"] = ref.dataId["patch"] 

2248 

2249 tables.append(table) 

2250 outputTable = astropy.table.vstack(tables, join_type="exact") 

2251 butlerQC.put(pipeBase.Struct(outputCatalog=outputTable), outputRefs)