Coverage for python / lsst / pipe / tasks / measurementDriver.py: 0%

447 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:38 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

# Public API of this module: paired Config/Task classes for the
# single-band, multi-band, and forced measurement drivers.
__all__ = [
    "SingleBandMeasurementDriverConfig",
    "SingleBandMeasurementDriverTask",
    "MultiBandMeasurementDriverConfig",
    "MultiBandMeasurementDriverTask",
    "ForcedMeasurementDriverConfig",
    "ForcedMeasurementDriverTask",
]

30 

31import copy 

32import logging 

33from abc import ABCMeta, abstractmethod 

34 

35import astropy 

36import lsst.afw.detection as afwDetection 

37import lsst.afw.geom as afwGeom 

38import lsst.afw.image as afwImage 

39import lsst.afw.math as afwMath 

40import lsst.afw.table as afwTable 

41import lsst.geom 

42import lsst.meas.algorithms as measAlgorithms 

43import lsst.meas.base as measBase 

44import lsst.meas.deblender as measDeblender 

45import lsst.meas.extensions.scarlet as scarlet 

46import lsst.pipe.base as pipeBase 

47import numpy as np 

48from lsst.meas.extensions.scarlet.deconvolveExposureTask import DeconvolveExposureTask 

49from lsst.pex.config import Config, ConfigurableField, Field 

50 

# NOTE(review): calling basicConfig at import time configures the *root*
# logger as a module side effect, which can override the host application's
# logging setup. Libraries conventionally leave basicConfig to the
# application entry point — confirm this global configuration is intended.
logging.basicConfig(level=logging.INFO)

52 

53 

class MeasurementDriverBaseConfig(Config):
    """Base configuration for measurement driver tasks.

    This class provides foundational configuration for its subclasses to handle
    single-band and multi-band data. It defines variance scaling, detection,
    deblending, measurement, aperture correction, and catalog calculation
    subtasks, which are intended to be executed in sequence by the driver task.
    """

    doScaleVariance = Field[bool](doc="Scale variance plane using empirical noise?", default=False)

    scaleVariance = ConfigurableField(
        doc="Subtask to rescale variance plane", target=measAlgorithms.ScaleVarianceTask
    )

    doDetect = Field[bool](doc="Run the source detection algorithm?", default=True)

    detection = ConfigurableField(
        doc="Subtask to detect sources in the image", target=measAlgorithms.SourceDetectionTask
    )

    doDeblend = Field[bool](doc="Run the source deblending algorithm?", default=True)
    # N.B. The 'deblend' configurable field should be defined in subclasses.

    doMeasure = Field[bool](doc="Run the source measurement algorithm?", default=True)

    measurement = ConfigurableField(
        doc="Subtask to measure sources and populate the output catalog",
        target=measBase.SingleFrameMeasurementTask,
    )

    psfCache = Field[int](
        doc="Maximum number of PSFs to cache, preventing repeated PSF evaluations at the same "
        "point across different measurement plugins. Defaults to -1, which auto-sizes the cache "
        "based on the plugin count.",
        default=-1,
    )

    checkUnitsParseStrict = Field[str](
        doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
        default="raise",
    )

    doApCorr = Field[bool](
        doc="Apply aperture corrections? If yes, your image must have an aperture correction map",
        default=False,
    )

    applyApCorr = ConfigurableField(
        doc="Subtask to apply aperture corrections",
        target=measBase.ApplyApCorrTask,
    )

    doRunCatalogCalculation = Field[bool](doc="Run catalogCalculation task?", default=False)

    catalogCalculation = ConfigurableField(
        doc="Subtask to run catalogCalculation plugins on catalog", target=measBase.CatalogCalculationTask
    )

    # Names of every boolean toggle above; iterated by validate() and by the
    # driver tasks when deciding which subtasks to construct.
    doOptions = [
        "doScaleVariance",
        "doDetect",
        "doDeblend",
        "doMeasure",
        "doApCorr",
        "doRunCatalogCalculation",
    ]

    def validate(self):
        """Ensure that at least one processing step is enabled.

        Raises
        ------
        ValueError
            If every processing step is disabled, or if aperture correction
            or catalog calculation is requested without measurement.
        """
        super().validate()

        if not any(getattr(self, opt) for opt in self.doOptions):
            raise ValueError(f"At least one of these options must be enabled: {self.doOptions}")

        if self.doApCorr and not self.doMeasure:
            raise ValueError("Aperture correction requires measurement to be enabled.")

        if self.doRunCatalogCalculation and not self.doMeasure:
            raise ValueError("Catalog calculation requires measurement to be enabled.")

134 

135 

class MeasurementDriverBaseTask(pipeBase.Task, metaclass=ABCMeta):
    """Base class for the mid-level driver running variance scaling, detection,
    deblending, measurement, aperture correction, and catalog calculation in
    one go.

    Users don't need to Butlerize their input data, which is a significant
    advantage for quick data exploration and testing. This driver simplifies
    the process of applying measurement algorithms to images by abstracting
    away low-level implementation details such as Schema and table boilerplate.
    It's a convenient way to process images into catalogs with a user-friendly
    interface for non-developers while allowing extensive configuration and
    integration into unit tests for developers. It also considerably improves
    how demos and workflows are showcased in Jupyter notebooks.

    Parameters
    ----------
    schema :
        Schema used to create the output `~lsst.afw.table.SourceCatalog`,
        modified in place with fields that will be written by this task.
    peakSchema :
        Schema of Footprint Peaks that will be passed to the deblender.
    **kwargs :
        Additional kwargs to pass to lsst.pipe.base.Task.__init__()

    Notes
    -----
    Subclasses (e.g., single-band vs. multi-band) share most methods and config
    options but differ in handling and validating inputs by overriding the base
    config class and any methods that require their own logic.
    """

    ConfigClass = MeasurementDriverBaseConfig
    _DefaultName = "measurementDriverBase"
    # Human-readable deblender package name; overridden by subclasses and
    # used only in log messages.
    _Deblender = ""

    def __init__(self, schema: afwTable.Schema = None, peakSchema: afwTable.Schema = None, **kwargs: dict):
        super().__init__(**kwargs)

        # Schema for the output catalog.
        self.schema = schema

        # Schema for deblender peaks.
        self.peakSchema = peakSchema

        # Placeholders for subclasses to populate.
        self.mapper: afwTable.SchemaMapper
        self.scaleVariance: measAlgorithms.ScaleVarianceTask
        self.detection: measAlgorithms.SourceDetectionTask
        self.deblend: measDeblender.SourceDeblendTask | scarlet.ScarletDeblendTask
        self.measurement: measBase.SingleFrameMeasurementTask | measBase.ForcedMeasurementTask
        self.applyApCorr: measBase.ApplyApCorrTask
        self.catalogCalculation: measBase.CatalogCalculationTask

        # Store the initial Schema to use for reinitialization if necessary.
        self.initSchema: afwTable.Schema
        # To safeguard against user tampering and ensure predictable behavior,
        # the following attribute can only be modified within the class using
        # a controlled setter. Note: super().__setattr__ deliberately bypasses
        # the guard implemented in this class's own __setattr__.
        super().__setattr__("initSchema", copy.deepcopy(schema))

195 

196 def __setattr__(self, name, value): 

197 """Prevent external modifications of the initial Schema.""" 

198 if name == "initSchema": 

199 raise AttributeError(f"Cannot modify {name} directly") 

200 super().__setattr__(name, value) 

201 

    @abstractmethod
    def run(self, *args, **kwargs) -> pipeBase.Struct:
        """Run the measurement driver task. Subclasses must implement this
        method using their own logic to handle single-band or multi-band data.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results of the driver run; exact contents are subclass-specific.
        """
        raise NotImplementedError("This is not implemented on the base class")

208 

209 def _ensureValidInputs( 

210 self, 

211 catalog: afwTable.SourceCatalog | None, 

212 ): 

213 """Perform validation and adjustments of inputs without heavy 

214 computation. 

215 

216 Parameters 

217 ---------- 

218 catalog : 

219 Catalog to be extended by the driver task. 

220 """ 

221 # Validate the configuration before proceeding. 

222 self.config.validate() 

223 

224 if self.config.doDetect: 

225 if catalog is not None: 

226 raise RuntimeError( 

227 "An input catalog was given to bypass detection, but 'doDetect' is still on" 

228 ) 

229 else: 

230 if catalog is None: 

231 raise RuntimeError("Cannot run without detection if no 'catalog' is provided") 

232 

    def _initializeSchema(self, catalog: afwTable.SourceCatalog = None):
        """Initialize the Schema to be used for constructing the subtasks.

        Though it may seem clunky, this workaround is necessary to ensure
        Schema consistency across all subtasks.

        Parameters
        ----------
        catalog :
            Catalog from which to extract the Schema. If not provided, the
            user-provided Schema and if that is also not provided during
            initialization, a minimal Schema will be used.

        Notes
        -----
        ``self.mapper`` is only created in the branch where a catalog is
        given; `_updateCatalogSchema`, which uses the mapper, is likewise
        only reached when a catalog was given.
        """
        # If the Schema has been modified (either by subtasks or externally by
        # the user), reset it to the initial state before creating subtasks.
        # This would be necessary when running the same driver task multiple
        # times with different configs/inputs.
        if self.schema != self.initSchema:
            self.schema = copy.deepcopy(self.initSchema)

        if catalog is None:
            if self.schema is None:
                # Create a minimal Schema that will be extended by tasks.
                self.schema = afwTable.SourceTable.makeMinimalSchema()

            # Add coordinate error fields to avoid missing field issues.
            self._addCoordErrorFieldsIfMissing(self.schema)
        else:
            if self.schema is not None:
                self.log.warning(
                    "Both a catalog and a Schema were provided; using the Schema from the catalog only"
                )

            # Since a catalog is provided, use its Schema as the base.
            catalogSchema = catalog.schema

            # Create a SchemaMapper that maps from catalogSchema to a new one
            # it will create.
            self.mapper = afwTable.SchemaMapper(catalogSchema)

            # Ensure coordinate error fields exist in the schema.
            # This must be done after initializing the SchemaMapper to avoid
            # unequal schemas between input record and mapper during the
            # _updateCatalogSchema call.
            self._addCoordErrorFieldsIfMissing(catalogSchema)

            # Add everything from catalogSchema to output Schema.
            self.mapper.addMinimalSchema(catalogSchema, True)

            # Get the output Schema from the SchemaMapper and assign it as the
            # Schema to be used for constructing the subtasks.
            self.schema = self.mapper.getOutputSchema()

        if isinstance(self, ForcedMeasurementDriverTask):
            # A trick also used in https://github.com/lsst/ap_pipe/blob/
            # a221d4e43e2abac44b1cbed0533b9e220c5a67f4/python/lsst/ap/pipe/
            # matchSourceInjected.py#L161
            self.schema.addField("deblend_nChild", "I", "Needed for minimal forced photometry schema")

291 

292 def _addCoordErrorFieldsIfMissing(self, schema: afwTable.Schema): 

293 """Add coordinate error fields to the schema in-place if they are not 

294 already present. 

295 

296 Parameters 

297 ---------- 

298 schema : 

299 Schema to be checked for coordinate error fields. 

300 """ 

301 if not any( 

302 errorField in schema.getNames() 

303 for errorField in ("coord_raErr", "coord_decErr", "coord_ra_dec_Cov") 

304 ): 

305 afwTable.CoordKey.addErrorFields(schema) 

306 

    def _makeSubtasks(self):
        """Construct subtasks based on the configuration and the Schema.

        Raises
        ------
        RuntimeError
            If any Schema-dependent subtask is requested while no Schema is
            available.
        """
        # Every subtask except variance scaling needs a Schema at
        # construction time.
        if self.schema is None and any(
            getattr(self.config, attr) for attr in self.config.doOptions if attr != "doScaleVariance"
        ):
            raise RuntimeError(
                "Cannot create requested subtasks without a Schema; "
                "ensure one is provided explicitly or via a catalog"
            )

        if self.config.doScaleVariance:
            self.makeSubtask("scaleVariance")

        if isinstance(self, ForcedMeasurementDriverTask):
            # Always True for forced measurement.
            if self.config.doMeasure:
                # Forced measurement takes the Schema as a *reference* schema.
                self.makeSubtask("measurement", refSchema=self.schema)

            # In forced measurement, where the measurement catalog is built
            # internally, we need to initialize applyApCorr with the full
            # schema after measurement plugins have added their fields;
            # otherwise, it won't see them and will silently skip applying
            # aperture corrections.
            # A related example can be found in this reference:
            # https://github.com/lsst/drp_tasks/blob/
            # b565995b995cd5f0e40196f8d3c89cafb89aa515/python/lsst/drp/tasks/
            # forcedPhotCoadd.py#L203
            if self.config.doApCorr:
                self.makeSubtask("applyApCorr", schema=self.measurement.schema)

            # Same reference as above uses `measurement.schema` to make the
            # catalogCalculation subtask, so we do the same here.
            if self.config.doRunCatalogCalculation:
                self.makeSubtask("catalogCalculation", schema=self.measurement.schema)
        else:
            if self.config.doDetect:
                self.makeSubtask("detection", schema=self.schema)

            if self.config.doDeblend:
                self.makeSubtask("deblend", schema=self.schema, peakSchema=self.peakSchema)

            if self.config.doMeasure:
                self.makeSubtask("measurement", schema=self.schema)

            if self.config.doApCorr:
                self.makeSubtask("applyApCorr", schema=self.measurement.schema)

            if self.config.doRunCatalogCalculation:
                self.makeSubtask("catalogCalculation", schema=self.schema)

356 

357 def _prepareSchemaAndSubtasks( 

358 self, catalog: afwTable.SourceCatalog | None 

359 ) -> afwTable.SourceCatalog | None: 

360 """Ensure subtasks are properly initialized according to the 

361 configuration and the provided catalog. 

362 

363 Parameters 

364 ---------- 

365 catalog : 

366 Optional catalog to be used for initializing the Schema and the 

367 subtasks. 

368 

369 Returns 

370 ------- 

371 catalog : 

372 Updated catalog to be passed to the subtasks, if it was provided. 

373 """ 

374 # Set up the Schema before creating subtasks. 

375 self._initializeSchema(catalog) 

376 

377 # Create subtasks, passing the same Schema to each subtask's 

378 # constructor that requires it. 

379 self._makeSubtasks() 

380 

381 # Check that all units in the Schema are valid Astropy unit strings. 

382 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict) 

383 

384 # Adjust the catalog Schema to align with changes made by the subtasks. 

385 if catalog: 

386 catalog = self._updateCatalogSchema(catalog) 

387 

388 return catalog 

389 

390 def _scaleVariance(self, exposure: afwImage.Exposure, band: str = "a single"): 

391 """Scale the variance plane of an exposure to match the observed 

392 variance. 

393 

394 Parameters 

395 ---------- 

396 exposure : 

397 Exposure on which to run the variance scaling algorithm. 

398 band : 

399 Band associated with the exposure. Used for logging. 

400 """ 

401 self.log.info(f"Scaling variance plane for {band} band") 

402 varScale = self.scaleVariance.run(exposure.maskedImage) 

403 exposure.getMetadata().add("VARIANCE_SCALE", varScale) 

404 

405 def _toContiguous( 

406 self, catalog: afwTable.SourceCatalog | dict[str, afwTable.SourceCatalog] 

407 ) -> afwTable.SourceCatalog | dict[str, afwTable.SourceCatalog]: 

408 """Make a catalog or catalogs contiguous if they are not already. 

409 

410 Parameters 

411 ---------- 

412 catalog : 

413 Catalog or dictionary of catalogs with bands as keys to be made 

414 contiguous. 

415 

416 Returns 

417 ------- 

418 catalog : 

419 Contiguous catalog or dictionary of contiguous catalogs. 

420 """ 

421 if isinstance(catalog, dict): 

422 for band, cat in catalog.items(): 

423 if not cat.isContiguous(): 

424 self.log.info(f"{band}-band catalog is not contiguous; making it contiguous") 

425 catalog[band] = cat.copy(deep=True) 

426 else: 

427 if not catalog.isContiguous(): 

428 self.log.info("Catalog is not contiguous; making it contiguous") 

429 catalog = catalog.copy(deep=True) 

430 return catalog 

431 

    def _updateCatalogSchema(self, catalog: afwTable.SourceCatalog) -> afwTable.SourceCatalog:
        """Update the Schema of the provided catalog to incorporate changes
        made by the configured subtasks.

        Parameters
        ----------
        catalog :
            Catalog to be updated with the Schema changes.

        Returns
        -------
        updatedCatalog :
            Catalog with the updated Schema.

        Notes
        -----
        Relies on ``self.mapper`` having been created by `_initializeSchema`,
        which only happens when a catalog was passed to it.
        """
        # Create an empty catalog with the Schema required by the subtasks that
        # are configured to run.
        updatedCatalog = afwTable.SourceCatalog(self.schema)

        # Transfer all records from the original catalog to the new catalog,
        # using the SchemaMapper to copy values.
        updatedCatalog.extend(catalog, mapper=self.mapper)

        # Return the updated catalog, preserving the records while applying the
        # updated Schema.
        return updatedCatalog

457 

    def _detectSources(
        self, exposure: afwImage.Exposure | afwImage.MultibandExposure, idGenerator: measBase.IdGenerator
    ) -> tuple[afwTable.SourceCatalog, afwMath.BackgroundList]:
        """Run the detection subtask to identify sources in the image.

        Parameters
        ----------
        exposure :
            Exposure on which to run the detection algorithm.
        idGenerator :
            Generator for unique source IDs.

        Returns
        -------
        catalog :
            A catalog containing detected sources.
        backgroundList :
            A list of background models obtained from the detection process,
            if available.
        """
        self.log.info(f"Running detection on a {exposure.width}x{exposure.height} pixel exposure")

        # Create an empty source table with the known Schema into which
        # detected sources will be placed next.
        table = afwTable.SourceTable.make(self.schema, idGenerator.make_table_id_factory())

        # Run the detection task on the exposure and make a source catalog.
        detections = self.detection.run(table, exposure)
        catalog = detections.sources
        backgroundList = afwMath.BackgroundList()

        # Get the background model from the detection task, if available.
        # The hasattr guard covers detection configurations that do not
        # produce a background attribute on the result struct.
        if hasattr(detections, "background") and detections.background:
            for bg in detections.background:
                backgroundList.append(bg)

        return catalog, backgroundList

495 

    @abstractmethod
    def _deblendSources(self, *args, **kwargs):
        """Run the deblending subtask to separate blended sources. Subclasses
        must implement this method to handle task-specific deblending logic.

        Raises
        ------
        NotImplementedError
            Always, on the base class.
        """
        raise NotImplementedError("This is not implemented on the base class")

502 

503 def _measureSources( 

504 self, 

505 exposure: afwImage.Exposure, 

506 catalog: afwTable.SourceCatalog, 

507 idGenerator: measBase.IdGenerator, 

508 refCat: afwTable.SourceCatalog | None = None, 

509 ): 

510 """Run the measurement subtask to compute properties of sources. 

511 

512 Parameters 

513 ---------- 

514 exposure : 

515 Exposure on which to run the measurement algorithm. 

516 catalog : 

517 Catalog containing sources on which to run the measurement subtask. 

518 idGenerator : 

519 Generator for unique source IDs. 

520 refCat : 

521 Reference catalog to be used for forced measurements, if any. 

522 If not provided, the measurement will be run on the sources in the 

523 catalog in a standard manner without reference. 

524 """ 

525 if refCat: 

526 # Note that refCat does not have a WCS, so we need to 

527 # extract the WCS from the exposure. 

528 refWcs = exposure.getWcs() 

529 # Run forced measurement since a reference catalog is provided. 

530 self.measurement.run( 

531 measCat=catalog, 

532 exposure=exposure, 

533 refCat=refCat, 

534 refWcs=refWcs, 

535 exposureId=idGenerator.catalog_id, 

536 ) 

537 else: 

538 # Run standard measurement if no reference catalog is provided. 

539 self.measurement.run(measCat=catalog, exposure=exposure, exposureId=idGenerator.catalog_id) 

540 

541 def _applyApCorr( 

542 self, exposure: afwImage.Exposure, catalog: afwTable.SourceCatalog, idGenerator: measBase.IdGenerator 

543 ): 

544 """Apply aperture corrections to the catalog. 

545 

546 Parameters 

547 ---------- 

548 exposure : 

549 Exposure on which to apply aperture corrections. 

550 catalog : 

551 Catalog to be corrected using the aperture correction map from 

552 the exposure. 

553 idGenerator : 

554 Generator for unique source IDs. 

555 """ 

556 apCorrMap = exposure.getInfo().getApCorrMap() 

557 if apCorrMap is None: 

558 self.log.warning( 

559 "Image does not have valid aperture correction map for catalog id " 

560 f"{idGenerator.catalog_id}; skipping aperture correction" 

561 ) 

562 else: 

563 self.applyApCorr.run(catalog=catalog, apCorrMap=apCorrMap) 

564 

    def _runCatalogCalculation(self, catalog: afwTable.SourceCatalog):
        """Run the catalog calculation plugins on the catalog.

        Parameters
        ----------
        catalog :
            Catalog to be processed by the catalog calculation subtask.
        """
        self.catalogCalculation.run(catalog)

574 

575 def _processCatalog( 

576 self, 

577 exposure: afwImage.Exposure, 

578 catalog: afwTable.SourceCatalog, 

579 idGenerator: measBase.IdGenerator, 

580 band: str = "a single", 

581 refCat: afwTable.SourceCatalog | None = None, 

582 ) -> afwTable.SourceCatalog: 

583 """Process a catalog through measurement, aperture correction, and 

584 catalog calculation subtasks. 

585 

586 Parameters 

587 ---------- 

588 exposure : 

589 Exposure associated with the catalog. 

590 catalog : 

591 Catalog to be processed by the subtasks. 

592 idGenerator : 

593 Generator for unique source IDs. 

594 band : 

595 Band associated with the exposure and catalog. Used for logging. 

596 refCat : 

597 Reference catalog for forced measurements. If not provided, the 

598 measurement will be run on the sources in the catalog in a standard 

599 manner without reference. 

600 

601 Returns 

602 ------- 

603 catalog : 

604 Catalog after processing through the configured subtasks. 

605 """ 

606 # Set the PSF cache capacity to cache repeated PSF evaluations at the 

607 # same point coming from different measurement plugins. 

608 if self.config.psfCache > 0: 

609 # Set a hard limit on the number of PSFs to cache. 

610 exposure.psf.setCacheCapacity(self.config.psfCache) 

611 else: 

612 # Auto-size the cache based on the number of measurement 

613 # plugins. We assume each algorithm tries to evaluate the PSF 

614 # twice, which is more than enough since many don't evaluate it 

615 # at all, and there's no *good* reason for any algorithm to 

616 # evaluate it more than once. 

617 # (Adopted from drp_tasks/ForcedPhotCoaddTask) 

618 exposure.psf.setCacheCapacity(2 * len(self.config.measurement.plugins.names)) 

619 

620 # Measure properties of sources in the catalog. 

621 if self.config.doMeasure: 

622 self.log.info( 

623 f"Measuring {len(catalog)} sources in {band} band " 

624 f"using '{self.measurement.__class__.__name__}'" 

625 ) 

626 self._measureSources(exposure, catalog, idGenerator, refCat=refCat) 

627 

628 # Ensure contiguity again. 

629 catalog = self._toContiguous(catalog) 

630 

631 # Apply aperture corrections to the catalog. 

632 if self.config.doApCorr: 

633 self.log.info(f"Applying aperture corrections to {band} band") 

634 self._applyApCorr(exposure, catalog, idGenerator) 

635 

636 # Run catalogCalculation on the catalog. 

637 if self.config.doRunCatalogCalculation: 

638 self.log.info(f"Running catalog calculation on {band} band") 

639 self._runCatalogCalculation(catalog) 

640 

641 self.log.info( 

642 f"Finished processing for {band} band; output catalog has {catalog.schema.getFieldCount()} " 

643 f"fields and {len(catalog)} records" 

644 ) 

645 

646 return catalog 

647 

648 

class SingleBandMeasurementDriverConfig(MeasurementDriverBaseConfig):
    """Configuration for the single-band measurement driver task."""

    # Fills the 'deblend' slot deliberately left undefined by the base config.
    deblend = ConfigurableField(target=measDeblender.SourceDeblendTask, doc="Deblender for single-band data.")

653 

654 

class SingleBandMeasurementDriverTask(MeasurementDriverBaseTask):
    """Mid-level driver for processing single-band data.

    Offers a helper method for direct handling of raw image data in addition to
    the standard single-band exposure.

    Examples
    --------
    Here is an example of how to use this class to run variance scaling,
    detection, deblending, and measurement on a single-band exposure:

    >>> from lsst.pipe.tasks.measurementDriver import (
    ...     SingleBandMeasurementDriverConfig,
    ...     SingleBandMeasurementDriverTask,
    ... )
    >>> import lsst.meas.extensions.shapeHSM  # To register its plugins
    >>> config = SingleBandMeasurementDriverConfig()
    >>> config.doScaleVariance = True
    >>> config.doDetect = True
    >>> config.doDeblend = True
    >>> config.doMeasure = True
    >>> config.scaleVariance.background.binSize = 64
    >>> config.detection.thresholdValue = 5.5
    >>> config.deblend.tinyFootprintSize = 3
    >>> config.measurement.plugins.names |= [
    ...     "base_SdssCentroid",
    ...     "base_SdssShape",
    ...     "ext_shapeHSM_HsmSourceMoments",
    ... ]
    >>> config.measurement.slots.psfFlux = None
    >>> config.measurement.doReplaceWithNoise = False
    >>> exposure = butler.get("deepCoadd", dataId=...)
    >>> driver = SingleBandMeasurementDriverTask(config=config)
    >>> results = driver.run(exposure)
    >>> results.catalog.writeFits("meas_catalog.fits")

    Alternatively, if an exposure is not available, the driver can also process
    raw image data:

    >>> image = ...
    >>> mask = ...
    >>> variance = ...
    >>> wcs = ...
    >>> psf = ...
    >>> photoCalib = ...
    >>> results = driver.runFromImage(
    ...     image, mask, variance, wcs, psf, photoCalib
    ... )
    >>> results.catalog.writeFits("meas_catalog.fits")
    """

    ConfigClass = SingleBandMeasurementDriverConfig
    _DefaultName = "singleBandMeasurementDriver"
    _Deblender = "meas_deblender"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Narrow the base-class placeholder annotations to the single-band
        # subtask types actually used here.
        self.deblend: measDeblender.SourceDeblendTask
        self.measurement: measBase.SingleFrameMeasurementTask

715 

    def run(
        self,
        exposure: afwImage.Exposure,
        catalog: afwTable.SourceCatalog | None = None,
        idGenerator: measBase.IdGenerator | None = None,
    ) -> pipeBase.Struct:
        """Process a single-band exposure through the configured subtasks and
        return the results as a struct.

        Parameters
        ----------
        exposure :
            The exposure on which to run the driver task.
        catalog :
            Catalog to be extended by the driver task. If not provided, an
            empty catalog will be created and populated.
        idGenerator :
            Object that generates source IDs and provides random seeds.

        Returns
        -------
        result :
            Results as a struct with attributes:

            ``catalog``
                Catalog containing the measured sources
                (`~lsst.afw.table.SourceCatalog`).
            ``backgroundList``
                List of backgrounds (`list[~lsst.afw.math.Background]`). Only
                populated if detection is enabled.
        """

        # Validate inputs before proceeding.
        self._ensureValidInputs(catalog)

        # Prepare the Schema and subtasks for processing.
        catalog = self._prepareSchemaAndSubtasks(catalog)

        # Generate catalog IDs consistently across subtasks.
        if idGenerator is None:
            idGenerator = measBase.IdGenerator()

        # Scale the variance plane. If enabled, this should be done before
        # detection.
        if self.config.doScaleVariance:
            self._scaleVariance(exposure)

        # Detect sources in the image and populate the catalog.
        if self.config.doDetect:
            catalog, backgroundList = self._detectSources(exposure, idGenerator)
        else:
            self.log.info("Skipping detection; using detections from the provided catalog")
            backgroundList = None

        # Deblend detected sources and update the catalog.
        if self.config.doDeblend:
            catalog = self._deblendSources(exposure, catalog)
        else:
            self.log.info("Skipping deblending")

        # Process catalog through measurement, aperture correction, and catalog
        # calculation subtasks.
        catalog = self._processCatalog(exposure, catalog, idGenerator)

        return pipeBase.Struct(catalog=catalog, backgroundList=backgroundList)

781 

782 def runFromImage( 

783 self, 

784 image: afwImage.MaskedImage | afwImage.Image | np.ndarray, 

785 mask: afwImage.Mask | np.ndarray = None, 

786 variance: afwImage.Image | np.ndarray = None, 

787 wcs: afwGeom.SkyWcs = None, 

788 psf: afwDetection.Psf | np.ndarray = None, 

789 photoCalib: afwImage.PhotoCalib = None, 

790 catalog: afwTable.SourceCatalog = None, 

791 idGenerator: measBase.IdGenerator = None, 

792 ) -> pipeBase.Struct: 

793 """Convert image data to an `Exposure`, then run it through the 

794 configured subtasks. 

795 

796 Parameters 

797 ---------- 

798 image : 

799 Input image data. Will be converted into an `Exposure` before 

800 processing. 

801 mask : 

802 Mask data for the image. Used if ``image`` is a bare `array` or 

803 `Image`. 

804 variance : 

805 Variance plane data for the image. 

806 wcs : 

807 World Coordinate System to associate with the exposure that will 

808 be created from ``image``. 

809 psf : 

810 PSF model for the exposure. 

811 photoCalib : 

812 Photometric calibration model for the exposure. 

813 catalog : 

814 Catalog to be extended by the driver task. If not provided, a new 

815 catalog will be created during detection and populated. 

816 idGenerator : 

817 Generator for unique source IDs. 

818 

819 Returns 

820 ------- 

821 result : 

822 Results as a struct with attributes: 

823 

824 ``catalog`` 

825 Catalog containing the measured sources 

826 (`~lsst.afw.table.SourceCatalog`). 

827 ``backgroundList`` 

828 List of backgrounds (`list[~lsst.afw.math.Background]`). 

829 """ 

830 # Convert raw image data into an Exposure. 

831 if isinstance(image, np.ndarray): 

832 image = afwImage.makeImageFromArray(image) 

833 if isinstance(mask, np.ndarray): 

834 mask = afwImage.makeMaskFromArray(mask) 

835 if isinstance(variance, np.ndarray): 

836 variance = afwImage.makeImageFromArray(variance) 

837 if isinstance(image, afwImage.Image): 

838 image = afwImage.makeMaskedImage(image, mask, variance) 

839 

840 # By now, the input should already be - or have been converted to - a 

841 # MaskedImage. 

842 if isinstance(image, afwImage.MaskedImage): 

843 exposure = afwImage.makeExposure(image, wcs) 

844 else: 

845 raise TypeError(f"Unsupported 'image' type: {type(image)}") 

846 

847 if psf is not None: 

848 if isinstance(psf, np.ndarray): 

849 # Create a FixedKernel using the array. 

850 psf /= psf.sum() 

851 kernel = afwMath.FixedKernel(afwImage.makeImageFromArray(psf)) 

852 # Create a KernelPsf using the kernel. 

853 psf = afwDetection.KernelPsf(kernel) 

854 elif not isinstance(psf, afwDetection.Psf): 

855 raise TypeError(f"Unsupported 'psf' type: {type(psf)}") 

856 exposure.setPsf(psf) 

857 

858 if photoCalib is not None: 

859 exposure.setPhotoCalib(photoCalib) 

860 

861 return self.run(exposure, catalog=catalog, idGenerator=idGenerator) 

862 

863 def _deblendSources( 

864 self, exposure: afwImage.Exposure, catalog: afwTable.SourceCatalog 

865 ) -> afwTable.SourceCatalog: 

866 """Run single-band deblending given an exposure and a catalog. 

867 

868 Parameters 

869 ---------- 

870 exposure : 

871 Exposure on which to run the deblending algorithm. 

872 catalog : 

873 Catalog containing sources to be deblended. 

874 

875 Returns 

876 ------- 

877 catalog : 

878 Catalog after deblending, with sources separated into their 

879 individual components if they were deblended. 

880 """ 

881 self.log.info(f"Deblending using '{self._Deblender}' on {len(catalog)} detection footprints") 

882 self.deblend.run(exposure=exposure, sources=catalog) 

883 # The deblender may not produce contiguous catalogs; ensure 

884 # contiguity for the subsequent subtasks. 

885 return self._toContiguous(catalog) 

886 

887 

class MultiBandMeasurementDriverConfig(MeasurementDriverBaseConfig):
    """Configuration for the multi-band measurement driver task."""

    # Scarlet is the deblender used for multi-band processing; it consumes
    # all available bands jointly (see `MultiBandMeasurementDriverTask`).
    deblend = ConfigurableField(
        target=scarlet.ScarletDeblendTask, doc="Scarlet deblender for multi-band data"
    )

    # Controls whether measurements see flux-conserved footprints (scarlet
    # models used only as templates for redistributing the observed flux)
    # or the raw model footprints themselves.
    doConserveFlux = Field[bool](
        doc="Whether to use the deblender models as templates to re-distribute the flux from "
        "the 'exposure' (True), or to perform measurements on the deblender model footprints.",
        default=False,
    )

    # When True, only one output catalog (the reference band's) is produced;
    # deblending itself always uses every band regardless.
    measureOnlyInRefBand = Field[bool](
        doc="If True, all measurements downstream of deblending run only in the reference band that "
        "was used for detection; otherwise, they are performed in all available bands, generating a "
        "catalog for each. Regardless of this setting, deblending still uses all available bands.",
        default=False,
    )

    # Memory/completeness trade-off: dropping blend data frees memory but can
    # leave non-reference-band catalogs without footprints.
    removeScarletData = Field[bool](
        doc="Whether or not to remove `ScarletBlendData` for each blend in order to save memory. "
        "If set to True, some sources may end up with missing footprints in catalogs other than the "
        "reference-band catalog, leading to failures in subsequent measurements that require footprints. "
        "For example, keep this False if `measureOnlyInRefBand` is set to False and "
        "`measurement.doReplaceWithNoise` to True, in order to make the footprints available in "
        "non-reference bands in addition to the reference band.",
        default=False,
    )

    # Must be disabled when the input catalog schema lacks the `deblend_*`
    # columns that this option would update.
    updateFluxColumns = Field[bool](
        doc="Whether or not to update the `deblend_*` columns in the catalog. This should only be "
        "True when the input catalog schema already contains those columns.",
        default=True,
    )

924 

class MultiBandMeasurementDriverTask(MeasurementDriverBaseTask):
    """Mid-level driver for processing multi-band data.

    The default behavior is to run detection on the reference band, use all
    available bands for deblending, and then process everything downstream
    separately for each band making per-band catalogs unless configured
    otherwise. This subclass provides functionality for handling a single-band
    exposure and a list of single-band exposures in addition to a standard
    multi-band exposure.

    Examples
    --------
    Here is an example of how to use this class to run variance scaling,
    detection, deblending, measurement, and aperture correction on a multi-band
    exposure:

    >>> from lsst.afw.image import MultibandExposure
    >>> from lsst.pipe.tasks.measurementDriver import (
    ...     MultiBandMeasurementDriverConfig,
    ...     MultiBandMeasurementDriverTask,
    ... )
    >>> import lsst.meas.extensions.shapeHSM  # To register its plugins
    >>> config = MultiBandMeasurementDriverConfig()
    >>> config.doScaleVariance = True
    >>> config.doDetect = True
    >>> config.doDeblend = True
    >>> config.doMeasure = True
    >>> config.doApCorr = True
    >>> config.scaleVariance.background.binSize = 64
    >>> config.detection.thresholdValue = 5.5
    >>> config.deblend.minSNR = 42.0
    >>> config.deblend.maxIter = 20
    >>> config.measurement.plugins.names |= [
    ...     "base_SdssCentroid",
    ...     "base_SdssShape",
    ...     "ext_shapeHSM_HsmSourceMoments",
    ... ]
    >>> config.measurement.slots.psfFlux = None
    >>> config.measurement.doReplaceWithNoise = False
    >>> config.applyApCorr.doFlagApCorrFailures = False
    >>> mExposure = MultibandExposure.fromButler(
    ...     butler, ["g", "r", "i"], "deepCoadd_calexp", ...
    ... )
    >>> driver = MultiBandMeasurementDriverTask(config=config)
    >>> results = driver.run(mExposure, "r")
    >>> for band, catalog in results.catalogs.items():
    ...     catalog.writeFits(f"meas_catalog_{band}.fits")
    """

    ConfigClass = MultiBandMeasurementDriverConfig
    _DefaultName = "multiBandMeasurementDriver"
    _Deblender = "scarlet"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Annotation only: the subtask is created by the base-class machinery.
        self.deblend: scarlet.ScarletDeblendTask

        # Placeholder for the model data produced by the deblender. Caching
        # this data has proven to be useful for debugging.
        self.modelData: scarlet.io.LsstScarletModelData

    def run(
        self,
        mExposure: afwImage.MultibandExposure | list[afwImage.Exposure] | afwImage.Exposure,
        mDeconvolved: afwImage.MultibandExposure | list[afwImage.Exposure] | afwImage.Exposure | None = None,
        refBand: str | None = None,
        bands: list[str] | None = None,
        catalog: afwTable.SourceCatalog = None,
        idGenerator: measBase.IdGenerator = None,
    ) -> pipeBase.Struct:
        """Process an exposure through the configured subtasks while using
        multi-band information for deblending.

        Parameters
        ----------
        mExposure :
            Multi-band data containing images of the same shape and region of
            the sky. May be a `MultibandExposure`, a single-band exposure
            (i.e., `Exposure`), or a list of single-band exposures associated
            with different bands in which case ``bands`` must be provided. If a
            single-band exposure is given, it will be treated as a
            `MultibandExposure` that contains only that one band whose name may
            be "unknown" unless either ``bands`` or ``refBand`` is provided.
        mDeconvolved :
            Multi-band deconvolved images of the same shape and region of the
            sky. Follows the same type conventions as ``mExposure``. If not
            provided, the deblender will run the deconvolution internally
            using the provided ``mExposure``.
        refBand :
            Reference band to use for detection. Not required for single-band
            exposures. If `measureOnlyInRefBand` is enabled while detection is
            disabled and a catalog of detected sources is provided, this
            should specify the band the sources were detected on (or the band
            you want to use to perform measurements on exclusively). If
            `measureOnlyInRefBand` is disabled instead in the latter scenario,
            ``refBand`` does not need to be provided.
        bands :
            List of bands associated with the exposures in ``mExposure``. Only
            required if ``mExposure`` is a list of single-band exposures. If
            provided for a multi-band exposure, it will be used to only process
            that subset of bands from the available ones in the exposure.
        catalog :
            Catalog to be extended by the driver task. If not provided, a new
            catalog will be created and populated.
        idGenerator :
            Generator for unique source IDs.

        Returns
        -------
        result :
            Results as a struct with attributes:

            ``catalogs``
                Dictionary of catalogs containing the measured sources with
                bands as keys (`dict[str, ~lsst.afw.table.SourceCatalog]`). If
                `measureOnlyInRefBand` is enabled or deblending is disabled,
                this will only contain the reference-band catalog; otherwise,
                it will contain a catalog for each band.
            ``backgroundList``
                List of backgrounds (`list[~lsst.afw.math.Background]`). Will
                be None if detection is disabled.
            ``modelData``
                Multiband scarlet models produced during deblending
                (`~scarlet.io.LsstScarletModelData`). Will be None if
                deblending is disabled.
        """

        # Validate inputs and adjust them as necessary.
        mExposure, mDeconvolved, refBand, bands = self._ensureValidInputs(
            mExposure, mDeconvolved, refBand, bands, catalog
        )

        # Prepare the Schema and subtasks for processing.
        catalog = self._prepareSchemaAndSubtasks(catalog)

        # Generate catalog IDs consistently across subtasks.
        if idGenerator is None:
            idGenerator = measBase.IdGenerator()

        # Scale the variance plane. If enabled, this should be done before
        # detection.
        if self.config.doScaleVariance:
            # Here, we iterate over references to the exposures, not copies.
            for band in mExposure.bands:
                self._scaleVariance(mExposure[band], band=f"'{band}'")

        # Detect sources in the reference band and populate the catalog.
        if self.config.doDetect:
            catalog, backgroundList = self._detectSources(mExposure[refBand], idGenerator)
        else:
            self.log.info("Skipping detection; using detections from provided catalog")
            backgroundList = None

        # Deblend detected sources and update the catalog(s).
        if self.config.doDeblend:
            catalogs, self.modelData = self._deblendSources(mExposure, mDeconvolved, catalog, refBand=refBand)
        else:
            self.log.warning(
                "Skipping deblending; proceeding with the provided catalog in the reference band"
            )
            catalogs = {refBand: catalog}
            self.modelData = None

        # Process catalog(s) through measurement, aperture correction, and
        # catalog calculation subtasks.
        for band, catalog in catalogs.items():
            exposure = mExposure[band]
            self._processCatalog(exposure, catalog, idGenerator, band=f"'{band}'")

        return pipeBase.Struct(catalogs=catalogs, backgroundList=backgroundList, modelData=self.modelData)

    def _ensureValidInputs(
        self,
        mExposure: afwImage.MultibandExposure | list[afwImage.Exposure] | afwImage.Exposure,
        mDeconvolved: afwImage.MultibandExposure | list[afwImage.Exposure] | afwImage.Exposure | None,
        refBand: str | None,
        bands: list[str] | None,
        catalog: afwTable.SourceCatalog | None = None,
    ) -> tuple[afwImage.MultibandExposure, afwImage.MultibandExposure, str, list[str] | None]:
        """Perform validation and adjustments of inputs without heavy
        computation.

        Parameters
        ----------
        mExposure :
            Multi-band data to be processed by the driver task.
        mDeconvolved :
            Multi-band deconvolved data to be processed by the driver task.
        refBand :
            Reference band to use for detection.
        bands :
            List of bands associated with the exposures in ``mExposure``.
        catalog :
            Catalog to be extended by the driver task.

        Returns
        -------
        mExposure :
            Multi-band exposure to be processed by the driver task. If the
            input was not already a `MultibandExposure` (optionally with the
            relevant ``bands``), it is converted into one and returned
            here; otherwise, the original input is returned unchanged.
        mDeconvolved :
            Multi-band deconvolved exposure to be processed by the driver task.
            Same adjustments apply as for ``mExposure`` except that it is
            optional and may be returned as None if not provided as input.
        refBand :
            Reference band to use for detection after potential adjustments.
            If not provided in the input, and only one band is set to be
            processed, ``refBand`` will be chosen to be the only existing band
            in the ``bands`` list, or `mExposure.bands`, and if neither is
            provided, it will be set to "unknown" for single-band exposures
            processed by this multi-band driver.
        bands :
            List of bands associated with the exposures in ``mExposure`` after
            potential adjustments. If not provided in the input, it will be set
            to a list containing only the provided (or inferred as "unknown")
            ``refBand`` value.
        """

        # Perform basic checks that are shared with all driver tasks.
        super()._ensureValidInputs(catalog)

        # Multi-band-specific validation and adjustments.

        if bands is not None:
            if not isinstance(bands, list):
                raise TypeError(f"Expected 'bands' to be a list, got {type(bands)}")
            if not all(isinstance(b, str) for b in bands):
                raise TypeError(f"All elements in 'bands' must be strings, got {[type(b) for b in bands]}")

        if refBand is not None:
            if not isinstance(refBand, str):
                raise TypeError(f"Reference band must be a string, got {type(refBand)}")

        # Validate mExposure.
        if isinstance(mExposure, afwImage.MultibandExposure):
            if bands is not None:
                if any(b not in mExposure.bands for b in bands):
                    raise ValueError(
                        f"Some of the provided {bands=} are not present in {mExposure.bands=}"
                    )
                self.log.info(
                    f"Using {bands=} out of the available {mExposure.bands=} in the multi-band exposures"
                )
        elif isinstance(mExposure, list):
            if bands is None:
                raise ValueError("The 'bands' list must be provided if 'mExposure' is a list")
            if len(bands) != len(mExposure):
                raise ValueError("Number of bands and exposures must match")
        elif isinstance(mExposure, afwImage.Exposure):
            if bands is not None:
                if len(bands) != 1:
                    raise ValueError(
                        f"{bands=}, if provided, must only contain a single band "
                        "if 'mExposure' is a single-band exposure"
                    )
        else:
            raise TypeError(f"Unsupported 'mExposure' type: {type(mExposure)}")

        # Validate mDeconvolved. Use an explicit None check rather than
        # truthiness so that e.g. an accidental empty list is validated (and
        # rejected) instead of being silently carried through, consistent
        # with the `is None` check in `_deblendSources`.
        if mDeconvolved is not None:
            if isinstance(mDeconvolved, afwImage.MultibandExposure):
                if bands is not None:
                    if any(b not in mDeconvolved.bands for b in bands):
                        raise ValueError(
                            f"Some of the provided {bands=} are not present in {mDeconvolved.bands=}"
                        )
            elif isinstance(mDeconvolved, list):
                if bands is None:
                    raise ValueError("The 'bands' list must be provided if 'mDeconvolved' is a list")
                if len(bands) != len(mDeconvolved):
                    raise ValueError("Number of bands and deconvolved exposures must match")
            elif isinstance(mDeconvolved, afwImage.Exposure):
                if bands is not None:
                    if len(bands) != 1:
                        raise ValueError(
                            f"{bands=}, if provided, must only contain a single band "
                            "if 'mDeconvolved' is a single-band exposure"
                        )
            else:
                raise TypeError(f"Unsupported {type(mDeconvolved)=}")

        if isinstance(mExposure, afwImage.Exposure) or isinstance(mDeconvolved, afwImage.Exposure):
            if bands is not None and len(bands) != 1:
                raise ValueError(
                    f"{bands=}, if provided, must only contain a single band "
                    "if one of 'mExposure' or 'mDeconvolved' is a single-band exposure"
                )
            # Fill in whichever of refBand/bands is missing for the
            # single-band case.
            if bands is None and refBand is None:
                refBand = "unknown"  # Placeholder for single-band deblending
                bands = [refBand]
            elif bands is None and refBand is not None:
                bands = [refBand]
            elif bands is not None and refBand is None:
                refBand = bands[0]

        # Convert or subset the exposures to a MultibandExposure with the
        # bands of interest.
        mExposure = self._buildMultibandExposure(mExposure, bands)

        if mDeconvolved is not None:
            mDeconvolved = self._buildMultibandExposure(mDeconvolved, bands)
            if mExposure.bands != mDeconvolved.bands:
                raise ValueError(
                    "The bands in 'mExposure' and 'mDeconvolved' must match; "
                    f"got {mExposure.bands} and {mDeconvolved.bands}"
                )

        if len(mExposure.bands) == 1:
            # N.B. Scarlet is designed to leverage multi-band information to
            # differentiate overlapping sources based on their spectral and
            # spatial profiles. However, it can also run on a single band and
            # often give better results than 'meas_deblender'.
            self.log.info(f"Running '{self._Deblender}' in single-band mode; make sure it was intended!")
            if refBand is None:
                refBand = mExposure.bands[0]
                self.log.info(
                    "No reference band provided for single-band data; "
                    f"using the only available band ('{refBand}') as the reference band"
                )
        else:
            if catalog is None:
                if self.config.measureOnlyInRefBand:
                    measInfo = "and everything downstream of deblending"
                else:
                    measInfo = (
                        "while subtasks downstream of deblending will be run in each of "
                        f"the {mExposure.bands} bands"
                    )
                self.log.info(f"Using '{refBand}' as the reference band for detection {measInfo}")

        # Final sanity checks after all the adjustments above.
        if refBand is None:
            raise ValueError("Reference band must be provided for multi-band data")

        if refBand not in mExposure.bands:
            raise ValueError(f"Requested {refBand=} is not in {mExposure.bands=}")

        if bands is not None and refBand not in bands:
            raise ValueError(f"Reference {refBand=} is not in {bands=}")

        return mExposure, mDeconvolved, refBand, bands

    def _deblendSources(
        self,
        mExposure: afwImage.MultibandExposure,
        mDeconvolved: afwImage.MultibandExposure | None,
        catalog: afwTable.SourceCatalog,
        refBand: str,
    ) -> tuple[dict[str, afwTable.SourceCatalog], scarlet.io.LsstScarletModelData]:
        """Run multi-band deblending given a multi-band exposure and a catalog.

        Parameters
        ----------
        mExposure :
            Multi-band exposure on which to run the deblending algorithm.
        mDeconvolved :
            Multi-band deconvolved exposure to use for deblending. If None,
            the deblender will create it internally using the provided
            ``mExposure``.
        catalog :
            Catalog containing sources to be deblended.
        refBand :
            Reference band used for detection or the band to use for
            measurements if `measureOnlyInRefBand` is enabled.

        Returns
        -------
        catalogs :
            Dictionary of catalogs containing the deblended sources. If
            `measureOnlyInRefBand` is enabled, this will only contain the
            reference-band catalog; otherwise, it will contain a catalog for
            each band.
        modelData :
            Multiband scarlet models produced during deblending.
        """
        self.log.info(f"Deblending using '{self._Deblender}' on {len(catalog)} detection footprints")

        if mDeconvolved is None:
            # Make a deconvolved version of the multi-band exposure.
            deconvolvedCoadds = []
            deconvolveTask = DeconvolveExposureTask()
            for coadd in mExposure:
                deconvolvedCoadd = deconvolveTask.run(coadd, catalog).deconvolved
                deconvolvedCoadds.append(deconvolvedCoadd)
            mDeconvolved = afwImage.MultibandExposure.fromExposures(mExposure.bands, deconvolvedCoadds)

        # Run the deblender on the multi-band exposure.
        result = self.deblend.run(mExposure, mDeconvolved, catalog)
        catalog = result.deblendedCatalog
        modelData = result.scarletModelData

        # Determine which bands to process post-deblending.
        bands = [refBand] if self.config.measureOnlyInRefBand else mExposure.bands

        catalogs = {band: catalog.copy(deep=True) for band in bands}
        for band in bands:
            # The footprints need to be updated for the subsequent measurement.
            imageForRedistribution = mExposure[band] if self.config.doConserveFlux else None
            scarlet.io.updateCatalogFootprints(
                modelData=modelData,
                catalog=catalogs[band],  # In-place modification
                band=band,
                imageForRedistribution=imageForRedistribution,
                removeScarletData=self.config.removeScarletData,
                updateFluxColumns=self.config.updateFluxColumns,
            )

        return self._toContiguous(catalogs), modelData

    def _buildMultibandExposure(
        self,
        mExposureData: afwImage.MultibandExposure | list[afwImage.Exposure] | afwImage.Exposure,
        bands: list[str] | None,
    ) -> afwImage.MultibandExposure:
        """Convert a single-band exposure or a list of single-band exposures to
        a `MultibandExposure` if not already of that type.

        No conversion will be done if ``mExposureData`` is already a
        `MultibandExposure` except it will be subsetted to the bands provided.

        Parameters
        ----------
        mExposureData :
            Input multi-band data.
        bands :
            List of bands associated with the exposures in ``mExposure``. Only
            required if ``mExposure`` is a list of single-band exposures. If
            provided while ``mExposureData`` is a ``MultibandExposure``, it
            will be used to select a specific subset of bands from the
            available ones.

        Returns
        -------
        mExposure :
            Converted multi-band exposure.
        """
        if isinstance(mExposureData, afwImage.MultibandExposure):
            if bands and not set(bands).issubset(mExposureData.bands):
                raise ValueError(
                    f"Requested bands {bands} are not a subset of available bands: {mExposureData.bands}"
                )
            # NOTE: a single-band subset is returned unsubsetted because
            # single-element indexing of a MultibandExposure yields a plain
            # Exposure rather than a MultibandExposure.
            return mExposureData[bands,] if bands and len(bands) > 1 else mExposureData
        elif isinstance(mExposureData, list):
            mExposure = afwImage.MultibandExposure.fromExposures(bands, mExposureData)
        elif isinstance(mExposureData, afwImage.Exposure):
            # We still need to build a multi-band exposure to satisfy scarlet
            # function's signature, even when using a single band. Normalize
            # the bare Exposure to a one-element list so that the WCS loop
            # below pairs the (single) band with the exposure itself rather
            # than attempting to iterate over the Exposure object.
            mExposureData = [mExposureData]
            mExposure = afwImage.MultibandExposure.fromExposures(bands, mExposureData)

        # Attach the WCS from each input exposure to the corresponding band of
        # the multi-band exposure; otherwise, their WCS will be None,
        # potentially causing issues downstream. Note that afwImage does not do
        # this when constructing a MultibandExposure from exposures.
        for band, exposure in zip(bands, mExposureData):
            mExposure[band].setWcs(exposure.getWcs())

        return mExposure

1385 

1386 

class ForcedMeasurementDriverConfig(SingleBandMeasurementDriverConfig):
    """Configuration for the forced measurement driver task."""

    # Override the base class's measurement subtask: forced measurement
    # evaluates plugins at externally supplied positions instead of at
    # detected peaks.
    measurement = ConfigurableField(
        target=measBase.ForcedMeasurementTask,
        doc="Measurement task for forced measurements. This should be a "
        "measurement task that does not perform detection.",
    )

    def setDefaults(self):
        """Set default values for the configuration.

        This method overrides the base class method to ensure that `doDetect`
        is set to `False` by default, as this task is intended for forced
        measurements where detection is not performed. Also, it sets some
        default measurement plugins by default.
        """
        super().setDefaults()
        # Forced photometry measures at supplied positions, so detection and
        # deblending are off while measurement stays on.
        self.doDetect = False
        self.doDeblend = False
        self.doMeasure = True
        # Minimal plugin set; centroids are taken from the reference
        # coordinates via base_TransformedCentroidFromCoord.
        self.measurement.plugins.names = [
            "base_PixelFlags",
            "base_TransformedCentroidFromCoord",
            "base_PsfFlux",
            "base_CircularApertureFlux",
        ]

    def _validate(self):
        """Validate the configuration.

        This method overrides the base class validation to ensure that
        `doDetect` is set to `False`, as this task is intended for forced
        measurements where detection is not performed.

        Raises
        ------
        ValueError
            If detection or deblending is enabled, or if measurement is
            disabled.
        """
        super()._validate()
        if self.doDetect or self.doDeblend:
            raise ValueError(
                "ForcedMeasurementDriverTask should not perform detection or "
                "deblending; set doDetect=False and doDeblend=False"
            )
        if not self.doMeasure:
            raise ValueError("ForcedMeasurementDriverTask must perform measurements; set doMeasure=True")

1430 

1431 

1432class ForcedMeasurementDriverTask(SingleBandMeasurementDriverTask): 

1433 """Forced measurement driver task for single-band data. 

1434 

1435 This task is the 'forced' version of the `SingleBandMeasurementDriverTask`, 

1436 intended as a convenience function for performing forced photometry on an 

1437 input image given a set of IDs and RA/Dec coordinates. It is designed as a 

1438 public-facing interface, allowing users to measure sources without 

1439 explicitly instantiating and running pipeline tasks. 

1440 

1441 Examples 

1442 -------- 

1443 Here is an example of how to use this class to run forced measurements on 

1444 an exposure using an Astropy table containing source IDs and RA/Dec 

1445 coordinates: 

1446 

1447 >>> from lsst.pipe.tasks.measurementDriver import ( 

1448 ... ForcedMeasurementDriverConfig, 

1449 ... ForcedMeasurementDriverTask, 

1450 ... ) 

1451 >>> import astropy.table 

1452 >>> import lsst.afw.image as afwImage 

1453 >>> config = ForcedMeasurementDriverConfig() 

1454 >>> config.doScaleVariance = True 

1455 >>> config.scaleVariance.background.binSize = 32 

1456 >>> config.doApCorr = True 

1457 >>> config.measurement.plugins.names = [ 

1458 ... "base_PixelFlags", 

1459 ... "base_TransformedCentroidFromCoord", 

1460 ... "base_PsfFlux", 

1461 ... "base_CircularApertureFlux", 

1462 ... ] 

1463 >>> config.measurement.slots.psfFlux = "base_PsfFlux" 

1464 >>> config.measurement.slots.centroid = "base_TransformedCentroidFromCoord" 

1465 >>> config.measurement.slots.shape = None 

1466 >>> config.measurement.doReplaceWithNoise = False 

1467 >>> calexp = butler.get("deepCoadd_calexp", dataId=...) 

1468 >>> objtable = butler.get( 

1469 ... "objectTable", dataId=..., storageClass="ArrowAstropy" 

1470 ... ) 

1471 >>> table = objtable[:5].copy()["objectId", "coord_ra", "coord_dec"] 

1472 >>> driver = ForcedMeasurementDriverTask(config=config) 

1473 >>> results = driver.runFromAstropy( 

1474 ... table, 

1475 ... calexp, 

1476 ... id_column_name="objectId", 

1477 ... ra_column_name="coord_ra", 

1478 ... dec_column_name="coord_dec", 

1479 ... psf_footprint_scaling=3.0, 

1480 ... ) 

1481 >>> results.writeFits("forced_meas_catalog.fits") 

1482 """ 

1483 

1484 ConfigClass = ForcedMeasurementDriverConfig 

1485 _DefaultName = "forcedMeasurementDriver" 

1486 

    def __init__(self, *args, **kwargs):
        """Initialize the forced measurement driver task."""
        super().__init__(*args, **kwargs)

        # Annotation only: the forced-measurement subtask is constructed
        # later by `_prepareSchemaAndSubtasks`, not in the constructor.
        self.measurement: measBase.ForcedMeasurementTask  # To be created!

1492 

    def runFromAstropy(
        self,
        table: astropy.table.Table,
        exposure: afwImage.Exposure,
        *,
        id_column_name: str = "objectId",
        ra_column_name: str = "coord_ra",
        dec_column_name: str = "coord_dec",
        psf_footprint_scaling: float = 3.0,
        idGenerator: measBase.IdGenerator | None = None,
    ) -> astropy.table.Table:
        """Run forced measurements on an exposure using an Astropy table.

        Parameters
        ----------
        table :
            Astropy table containing source IDs and RA/Dec coordinates.
            Must contain columns with names specified by `id_column_name`,
            `ra_column_name`, and `dec_column_name`.
        exposure :
            Exposure on which to run the forced measurements.
        id_column_name :
            Name of the column containing source IDs in the table.
        ra_column_name :
            Name of the column containing RA coordinates in the table.
        dec_column_name :
            Name of the column containing Dec coordinates in the table.
        psf_footprint_scaling :
            Scaling factor to apply to the PSF second-moments ellipse in order
            to determine the footprint boundary.
        idGenerator :
            Object that generates source IDs and provides random seeds.
            If not provided, a new `IdGenerator` will be created.

        Returns
        -------
        result :
            Astropy table containing the measured sources with columns
            corresponding to the source IDs, RA, Dec, from the input table, and
            additional measurement columns defined in the configuration.
        """
        # Validate inputs before proceeding; also determines the unit of the
        # table's sky coordinates.
        coord_unit = self._ensureValidInputs(table, exposure, id_column_name, ra_column_name, dec_column_name)

        # Generate catalog IDs consistently across subtasks.
        if idGenerator is None:
            idGenerator = measBase.IdGenerator()

        # Get the WCS from the exposure and use it as the reference WCS.
        refWcs = exposure.getWcs()

        # Prepare the Schema and subtasks for processing. No catalog is
        # provided here, as we will generate it from the reference catalog.
        self._prepareSchemaAndSubtasks(catalog=None)

        # Convert the Astropy table to a minimal source catalog.
        # This must be done *after* `_prepareSchemaAndSubtasks`, or the schema
        # won't be set up correctly.
        refCat = self._makeMinimalSourceCatalogFromAstropy(
            table, columns=[id_column_name, ra_column_name, dec_column_name], coord_unit=coord_unit
        )

        # Check whether coords are within the image.
        # NOTE(review): `assert` is stripped under `python -O`; this is a
        # sanity check only, relying on generateMeasCat for the guarantee.
        bbox = exposure.getBBox()
        for record in refCat:
            localPoint = refWcs.skyToPixel(record.getCoord())
            localIntPoint = lsst.geom.Point2I(localPoint)
            assert bbox.contains(localIntPoint), (
                f"Center for record {record.getId()} is not in exposure; this should be guaranteed by "
                "generateMeasCat."
            )

        # Scale the variance plane.
        if self.config.doScaleVariance:
            self._scaleVariance(exposure)

        # Generate the measurement catalog from the reference catalog.
        # The `exposure` and `wcs` arguments will not actually be used by the
        # call below, but we need to pass it to satisfy the interface.
        catalog = self.measurement.generateMeasCat(
            exposure, refCat, refWcs, idFactory=idGenerator.make_table_id_factory()
        )

        # Forced measurement uses a provided catalog, so detection was skipped
        # and no footprints exist. We therefore resort to approximate
        # footprints by scaling the PSF's second-moments ellipse.
        self.measurement.attachPsfShapeFootprints(catalog, exposure, scaling=psf_footprint_scaling)

        # Process catalog through measurement, aperture correction, and catalog
        # calculation subtasks.
        catalog = self._processCatalog(exposure, catalog, idGenerator, refCat=refCat)

        # Convert the catalog back to an Astropy table.
        result = catalog.asAstropy()

        # Clean up: 'id' may confuse users since 'objectId' is the expected
        # identifier.
        del result["id"]

        return result

1593 

1594 def run(self, *args, **kwargs): 

1595 raise NotImplementedError( 

1596 "The run method is not implemented for `ForcedMeasurementDriverTask`. " 

1597 "Use `runFromAstropy` instead." 

1598 ) 

1599 

1600 def runFromImage(self, *args, **kwargs): 

1601 raise NotImplementedError( 

1602 "The `runFromImage` method is not implemented for `ForcedMeasurementDriverTask`. " 

1603 "Use `runFromAstropy` instead." 

1604 ) 

1605 

1606 def _ensureValidInputs( 

1607 self, 

1608 table: astropy.table.Table, 

1609 exposure: afwImage.Exposure, 

1610 id_column_name: str, 

1611 ra_column_name: str, 

1612 dec_column_name: str, 

1613 ) -> None: 

1614 """Validate the inputs for the forced measurement task. 

1615 

1616 Parameters 

1617 ---------- 

1618 table : 

1619 Astropy table containing source IDs and RA/Dec coordinates. 

1620 exposure : 

1621 Exposure on which to run the forced measurements. 

1622 id_column_name : 

1623 Name of the column containing source IDs in the table. 

1624 ra_column_name : 

1625 Name of the column containing RA coordinates in the table. 

1626 dec_column_name : 

1627 Name of the column containing Dec coordinates in the table. 

1628 

1629 Returns 

1630 ------- 

1631 coord_unit : `str` 

1632 Unit of the sky coordinates extracted from the table. 

1633 """ 

1634 if not isinstance(table, astropy.table.Table): 

1635 raise TypeError(f"Expected 'table' to be an astropy Table, got {type(table)}") 

1636 

1637 if table[ra_column_name].unit == table[dec_column_name].unit: 

1638 if table[ra_column_name].unit == astropy.units.deg: 

1639 coord_unit = "degrees" 

1640 elif table[ra_column_name].unit == astropy.units.rad: 

1641 coord_unit = "radians" 

1642 else: 

1643 # Fallback if it's something else. 

1644 coord_unit = str(table[ra_column_name].unit) 

1645 else: 

1646 raise ValueError("RA and Dec columns must have the same unit") 

1647 

1648 if not isinstance(exposure, afwImage.Exposure): 

1649 raise TypeError(f"Expected 'exposure' to be an Exposure, got {type(exposure)}") 

1650 

1651 for col in [id_column_name, ra_column_name, dec_column_name]: 

1652 if col not in table.colnames: 

1653 raise ValueError(f"Column '{col}' not found in the input table") 

1654 

1655 return coord_unit 

1656 

1657 def _makeMinimalSourceCatalogFromAstropy( 

1658 self, 

1659 table: astropy.table.Table, 

1660 columns: list[str] = ["id", "ra", "dec"], 

1661 coord_unit: str = "degrees", 

1662 ): 

1663 """Convert an Astropy Table to a minimal LSST SourceCatalog. 

1664 

1665 This is intended for use with the forced measurement subtask, which 

1666 expects a `SourceCatalog` input with a minimal schema containing `id`, 

1667 `ra`, and `dec`. 

1668 

1669 Parameters 

1670 ---------- 

1671 table : 

1672 Astropy Table containing source IDs and sky coordinates. 

1673 columns : 

1674 Names of the columns in the order [id, ra, dec], where `ra` and 

1675 `dec` are in degrees by default. If the coordinates are in radians, 

1676 set `coord_unit` to "radians". 

1677 coord_unit : `str` 

1678 Unit of the sky coordinates. Can be either "degrees" or "radians". 

1679 

1680 Returns 

1681 ------- 

1682 outputCatalog : `lsst.afw.table.SourceCatalog` 

1683 A SourceCatalog with minimal schema populated from the input table. 

1684 

1685 Raises 

1686 ------ 

1687 ValueError 

1688 If `coord_unit` is not "degrees" or "radians". 

1689 If `columns` does not contain exactly 3 items. 

1690 KeyError 

1691 If any of the specified columns are missing from the input table. 

1692 """ 

1693 # TODO: Open a meas_base ticket to make this function pay attention to 

1694 # the configs, and move this from being a Task method to a free 

1695 # function that takes column names as args. 

1696 

1697 if coord_unit not in ["degrees", "radians"]: 

1698 raise ValueError(f"Invalid coordinate unit '{coord_unit}'; must be 'degrees' or 'radians'") 

1699 

1700 if len(columns) != 3: 

1701 raise ValueError("`columns` must contain exactly three elements for [id, ra, dec]") 

1702 

1703 idCol, raCol, decCol = columns 

1704 

1705 for col in columns: 

1706 if col not in table.colnames: 

1707 raise KeyError(f"Missing required column: '{col}'") 

1708 

1709 outputCatalog = lsst.afw.table.SourceCatalog(self.schema) 

1710 outputCatalog.reserve(len(table)) 

1711 

1712 for row in table: 

1713 outputRecord = outputCatalog.addNew() 

1714 outputRecord.setId(row[idCol]) 

1715 outputRecord.setCoord( 

1716 lsst.geom.SpherePoint(row[raCol], row[decCol], getattr(lsst.geom, coord_unit)) 

1717 ) 

1718 

1719 return outputCatalog