22"""Pipeline for running DiaSource association in a DRP context.
25__all__ = [
"DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
31import astropy.table
as tb
39import lsst.pipe.base
as pipeBase
43from .coaddBase
import makeSkyInfo
44from .schemaUtils
import convertDataFrameToSdmSchema, readSdmSchemaFile
45from .simpleAssociation
import SimpleAssociationTask
49 dimensions=(
"tract",
"patch",
"skymap"),
50 defaultTemplates={
"coaddName":
"deep",
53 diaSourceTables = pipeBase.connectionTypes.Input(
54 doc=
"Set of catalogs of calibrated DiaSources.",
55 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
56 storageClass=
"ArrowAstropy",
57 dimensions=(
"instrument",
"visit",
"detector"),
61 ssObjectTableRefs = pipeBase.connectionTypes.Input(
62 doc=
"Reference to catalogs of SolarSolarSystem objects expected to be "
63 "observable in each (visit, detector).",
64 name=
"preloaded_ss_object_visit",
65 storageClass=
"ArrowAstropy",
66 dimensions=(
"instrument",
"visit"),
71 skyMap = pipeBase.connectionTypes.Input(
72 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
74 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
75 storageClass=
"SkyMap",
76 dimensions=(
"skymap", ),
78 finalVisitSummaryRefs = pipeBase.connectionTypes.Input(
79 doc=
"Reference to finalVisitSummary of each exposure, containing "
80 "visitInfo, bbox, and wcs.",
81 name=
"finalVisitSummary",
82 storageClass=
"ExposureCatalog",
83 dimensions=(
"instrument",
"visit"),
87 assocDiaSourceTable = pipeBase.connectionTypes.Output(
88 doc=
"Catalog of DiaSources covering the patch and associated with a "
90 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
91 storageClass=
"DataFrame",
92 dimensions=(
"tract",
"patch"),
94 associatedSsSources = pipeBase.connectionTypes.Output(
95 doc=
"Optional output storing ssSource data computed during association.",
96 name=
"{fakesType}{coaddName}Diff_assocSsSrcTable",
97 storageClass=
"ArrowAstropy",
98 dimensions=(
"tract",
"patch"),
100 unassociatedSsObjects = pipeBase.connectionTypes.Output(
101 doc=
"Expected locations of ssObjects with no associated source.",
102 name=
"{fakesType}{coaddName}Diff_unassocSsObjTable",
103 storageClass=
"ArrowAstropy",
104 dimensions=(
"tract",
"patch"),
106 diaObjectTable = pipeBase.connectionTypes.Output(
107 doc=
"Catalog of DiaObjects created from spatially associating "
109 name=
"{fakesType}{coaddName}Diff_diaObjTable",
110 storageClass=
"DataFrame",
111 dimensions=(
"tract",
"patch"),
114 def __init__(self, *, config=None):
115 super().__init__(config=config)
117 if not config.doSolarSystemAssociation:
118 del self.ssObjectTableRefs
119 del self.associatedSsSources
120 del self.unassociatedSsObjects
121 del self.finalVisitSummaryRefs
124class DrpAssociationPipeConfig(
125 pipeBase.PipelineTaskConfig,
126 pipelineConnections=DrpAssociationPipeConnections):
127 associator = pexConfig.ConfigurableField(
128 target=SimpleAssociationTask,
129 doc=
"Task used to associate DiaSources with DiaObjects.",
131 solarSystemAssociator = pexConfig.ConfigurableField(
132 target=SolarSystemAssociationTask,
133 doc=
"Task used to associate DiaSources with SolarSystemObjects.",
135 doAddDiaObjectCoords = pexConfig.Field(
138 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
139 "Duplicates information, but needed for bulk ingest into qserv."
141 doWriteEmptyTables = pexConfig.Field(
144 doc=
"If True, construct and write out empty diaSource and diaObject "
145 "tables. If False, raise NoWorkFound"
147 doSolarSystemAssociation = pexConfig.Field(
150 doc=
"Process SolarSystem objects through the pipeline.",
152 doUseSchema = pexConfig.Field(
155 doc=
"Use an existing schema to coerce the data types of the output columns."
157 schemaDir = pexConfig.Field(
159 doc=
"Path to the directory containing schema definitions.",
160 default=os.path.join(
"${SDM_SCHEMAS_DIR}",
163 schemaFile = pexConfig.Field(
165 doc=
"Yaml file specifying the schema of the output catalog.",
166 default=
"lsstcam.yaml",
168 idGenerator = SkyMapIdGeneratorConfig.make_field()
171class DrpAssociationPipeTask(pipeBase.PipelineTask):
172 """Driver pipeline for loading DiaSource catalogs in a patch/tract
173 region and associating them.
175 ConfigClass = DrpAssociationPipeConfig
176 _DefaultName =
"drpAssociation"
178 def __init__(self, **kwargs):
179 super().__init__(**kwargs)
180 self.makeSubtask(
'associator')
182 if self.config.doUseSchema:
183 schemaFile = os.path.join(self.config.schemaDir, self.config.schemaFile)
184 self.schema = readSdmSchemaFile(schemaFile)
187 if self.config.doSolarSystemAssociation:
188 self.makeSubtask(
"solarSystemAssociator")
190 def runQuantum(self, butlerQC, inputRefs, outputRefs):
191 inputs = butlerQC.get(inputRefs)
193 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
194 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
195 inputs[
"idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
196 if not self.config.doSolarSystemAssociation:
197 inputs[
"ssObjectTableRefs"] = []
198 inputs[
"finalVisitSummaryRefs"] = []
199 outputs = self.run(**inputs)
200 butlerQC.put(outputs, outputRefs)
206 finalVisitSummaryRefs,
210 """Trim DiaSources to the current Patch and run association.
212 Takes in the set of DiaSource catalogs that covers the current patch,
213 trims them to the dimensions of the patch, and [TODO: eventually]
214 runs association on the concatenated DiaSource Catalog.
218 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
219 Set of DiaSource catalogs potentially covering this patch/tract.
220 ssObjectTableRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
221 Set of known SSO ephemerides potentially covering this patch/tract.
222 skyMap : `lsst.skymap.BaseSkyMap`
223 SkyMap defining the patch/tract
224 finalVisitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
225 Reference to finalVisitSummary of each exposure potentially
226 covering this patch/tract, which contain visitInfo, bbox, and wcs
228 Id of current tract being processed.
230 Id of current patch being processed.
231 idGenerator : `lsst.meas.base.IdGenerator`, optional
232 Object that generates Object IDs and random number generator seeds.
236 output : `lsst.pipe.base.Struct`
237 Results struct with attributes:
239 ``assocDiaSourceTable``
240 Table of DiaSources with updated value for diaObjectId.
243 Table of DiaObjects from matching DiaSources
244 (`pandas.DataFrame`).
246 self.log.info(
"Running DPR Association on patch %i, tract %i...",
249 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
252 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
253 outerPatchBox =
geom.Box2D(skyInfo.patchInfo.getOuterBBox())
254 innerTractSkyRegion = skyInfo.tractInfo.getInnerSkyRegion()
259 finalVisitSummaryIdDict =
prepareCatalogDict(finalVisitSummaryRefs, useVisitDetector=
False)
264 diaSourceHistory, ssDiaSourceHistory, ssSourceHistory, unassociatedSsObjectHistory = [], [], [], []
265 nSsSrc, nSsObj = 0, 0
266 visits = set([v
for v, _
in diaIdDict.keys()])
270 visitSummary = finalVisitSummaryIdDict[visit].get()
if visit
in finalVisitSummaryIdDict
else None
271 ssCat = ssObjectIdDict[visit].get()
if visit
in ssObjectIdDict
else None
272 detectors = [det
for (v, det)
in diaIdDict.keys()
if v == visit]
273 for detector
in detectors:
274 diaCat = diaIdDict[(visit, detector)].get()
275 nDiaSrcIn = len(diaCat)
276 if (ssCat
is not None)
and (visitSummary
is not None):
277 ssoAssocResult = self.runSolarSystemAssociation(diaCat,
279 visitSummary=visitSummary,
280 patchBbox=innerPatchBox,
281 patchWcs=skyInfo.wcs,
282 innerTractSkyRegion=innerTractSkyRegion,
287 nSsSrc = len(ssoAssocResult.associatedSsSources)
288 nSsObj = len(ssoAssocResult.unassociatedSsObjects)
292 diaCat = ssoAssocResult.unassociatedDiaSources
294 nSsSrc, nSsObj = 0, 0
300 diaInPatch = self._trimToPatch(diaCat.to_pandas(), outerPatchBox, skyInfo.wcs)
302 nDiaSrc = diaInPatch.sum()
305 "Read DiaSource catalog of length %i from visit %i, "
306 "detector %i. Found %i sources within the patch/tract "
307 "footprint, including %i associated with SSOs.",
308 nDiaSrcIn, visit, detector, nDiaSrc + nSsSrc, nSsSrc)
311 diaSourceHistory.append(diaCat[diaInPatch])
313 ssSourceHistory.append(ssoAssocResult.associatedSsSources)
314 ssDiaSourceHistory.append(ssoAssocResult.associatedSsDiaSources)
317 unassociatedSsObjectHistory.append(ssoAssocResult.unassociatedSsObjects)
321 diaSourceHistoryCat = self._stackCatalogs(diaSourceHistory)
322 ssDiaSourceHistoryCat = self._stackCatalogs(ssDiaSourceHistory)
323 if self.config.doSolarSystemAssociation:
324 ssSourceHistoryCat = self._stackCatalogs(ssSourceHistory, remove_columns=[
'ra',
'dec'])
325 nSsSrcTotal = len(ssSourceHistoryCat)
if ssSourceHistoryCat
else 0
326 unassociatedSsObjectHistoryCat = self._stackCatalogs(unassociatedSsObjectHistory)
327 nSsObjTotal = len(unassociatedSsObjectHistoryCat)
if unassociatedSsObjectHistoryCat
else 0
328 self.log.info(
"Found %i ssSources and %i missing ssObjects in patch %i, tract %i",
329 nSsSrcTotal, nSsObjTotal, patchId, tractId)
331 ssSourceHistoryCat =
None
332 unassociatedSsObjectHistoryCat =
None
334 if (
not diaSourceHistory)
and (
not ssSourceHistory):
335 if not self.config.doWriteEmptyTables:
336 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
338 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
339 len(diaSourceHistoryCat), patchId, tractId)
341 diaSourceTable = diaSourceHistoryCat.to_pandas()
342 diaSourceTable.set_index(
"diaSourceId", drop=
False)
344 assocResult = self.associator.run(diaSourceTable, idGenerator=idGenerator)
348 objectsInTractPatch = self._trimToPatch(assocResult.diaObjects,
351 innerTractSkyRegion=innerTractSkyRegion)
352 diaObjects = assocResult.diaObjects[objectsInTractPatch]
357 assocDiaSources = self.dropDiaSourceByDiaObjectId(assocResult.diaObjects[~objectsInTractPatch].index,
358 assocResult.assocDiaSources)
360 self.log.info(
"Associated DiaSources into %i DiaObjects", len(diaObjects))
362 if self.config.doAddDiaObjectCoords:
363 assocDiaSources = self._addDiaObjectCoords(diaObjects, assocDiaSources)
364 if ssDiaSourceHistoryCat:
365 ssDiaSourceHistoryCat = ssDiaSourceHistoryCat.to_pandas().set_index(
"diaSourceId", drop=
True)
366 assocDiaSources = pd.concat([assocDiaSources, ssDiaSourceHistoryCat])
367 if self.config.doUseSchema:
368 diaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName=
"DiaObject")
369 assocDiaSources = convertDataFrameToSdmSchema(self.schema, assocDiaSources, tableName=
"DiaSource")
370 return pipeBase.Struct(
371 diaObjectTable=diaObjects,
372 assocDiaSourceTable=assocDiaSources,
373 associatedSsSources=ssSourceHistoryCat,
374 unassociatedSsObjects=unassociatedSsObjectHistoryCat,
378 """Stack a list of catalogs.
382 catalogs : `list` of `astropy.table.Table`
383 Input catalogs with the same columns to be combined.
384 remove_columns : `list` of `str` or None, optional
385 List of column names to drop from the tables before stacking.
389 `astropy.table.Table`
390 The combined catalog.
393 sourceHistory = tb.vstack(catalogs)
394 if remove_columns
is not None:
395 sourceHistory.remove_columns(remove_columns)
408 """Run Solar System object association and filter the results.
412 diaCat : `pandas.DataFrame`
413 Catalog of detected diaSources on the image difference.
414 ssCat : `astropy.table.Table`
415 Catalog of predicted coordinates of known Solar System objects.
416 visitSummary : `lsst.afw.table.ExposureCatalog`
417 Table of calibration and metadata for all detectors in a visit.
418 patchBbox : `lsst.geom.Box2D`
419 Bounding box of the patch.
420 patchWcs : `lsst.geom.SkyWcs`
421 Wcs of the tract containing the patch.
422 innerTractSkyRegion : `lsst.sphgeom.Box`
423 Region defining the inner non-overlapping part of a tract.
425 Detector number of the science exposure.
427 Visit number of the science exposure.
431 ssoAssocResult : `lsst.pipe.base.Struct`
432 Results struct with attributes:
434 ``associatedSsSources``
435 Table of DiaSources associated with Solar System objects.
436 (`astropy.table.Table`)
437 ``associatedSsDiaSources``
438 Table of Solar System objects associated with DiaSources.
439 (`astropy.table.Table`).
440 ``unassociatedSsObjects``
441 Table of Solar System objects in the patch not associated with
442 any DiaSource (`astropy.table.Table`).
443 ``unassociatedDiaSources``
444 Table of DiaSources not associated with any Solar System object
445 (`astropy.table.Table`).
448 ssoAssocResult = self.solarSystemAssociator.run(
451 visitInfo=visitSummary.find(detector).visitInfo,
452 bbox=visitSummary.find(detector).getBBox(),
453 wcs=visitSummary.find(detector).wcs,
456 ssInTractPatch = self._trimToPatch(ssoAssocResult.associatedSsSources.to_pandas(),
459 innerTractSkyRegion=innerTractSkyRegion)
460 associatedSsSources = ssoAssocResult.associatedSsSources[ssInTractPatch].copy()
461 assocDiaSrcIds = set(associatedSsSources[
'diaSourceId'])
462 diaSrcMask = [diaId
in assocDiaSrcIds
for diaId
in ssoAssocResult.ssoAssocDiaSources[
'diaSourceId']]
463 associatedSsDiaSources = ssoAssocResult.ssoAssocDiaSources[np.array(diaSrcMask)]
465 ssObjInTractPatch = self._trimToPatch(ssoAssocResult.unassociatedSsObjects.to_pandas(),
468 innerTractSkyRegion=innerTractSkyRegion)
469 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects[ssObjInTractPatch].copy()
472 if len(unassociatedSsObjects) > 0:
473 unassociatedSsObjects[
'visit'] = visit
474 unassociatedSsObjects[
'detector'] = detector
476 return pipeBase.Struct(
477 associatedSsSources=associatedSsSources,
478 associatedSsDiaSources=associatedSsDiaSources,
479 unassociatedSsObjects=unassociatedSsObjects,
480 unassociatedDiaSources=ssoAssocResult.unAssocDiaSources
484 obj = objects[[
'ra',
'dec']].rename(columns={
"ra":
"coord_ra",
"dec":
"coord_dec"})
485 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
486 how=
'inner').set_index(
'diaSourceId')
490 """Create generator testing if a set of DiaSources are in the
495 cat : `pandas.DataFrame`
496 Catalog of DiaSources to test within patch/tract.
497 patchBox : `lsst.geom.Box2D`
498 Bounding box of the patch.
499 wcs : `lsst.geom.SkyWcs`
501 innerTractSkyRegion : `lsst.sphgeom.Box`, optional
502 Region defining the inner non-overlapping part of a tract.
506 isInPatch : `numpy.ndarray`, (N,)
507 Booleans representing if the DiaSources are contained within the
508 current patch and tract.
510 isInPatch = np.zeros(len(cat), dtype=bool)
512 for idx, row
in cat.reset_index().iterrows():
514 pxCoord = wcs.skyToPixel(spPoint)
515 ra_rad = np.deg2rad(row[
"ra"])
516 dec_rad = np.deg2rad(row[
"dec"])
517 isInPatch[idx] = patchBox.contains(pxCoord)
519 if innerTractSkyRegion
is not None:
520 isInPatch[idx] &= innerTractSkyRegion.contains(ra_rad, dec_rad)
525 """Drop diaSources with diaObject IDs in the supplied list.
529 droppedDiaObjectIds : `pandas.DataFrame`
530 DiaObjectIds to match and drop from the list of diaSources.
531 diaSources : `pandas.DataFrame`
532 Catalog of diaSources to check and filter.
536 filteredDiaSources : `pandas.DataFrame`
537 The input diaSources with any rows matching the listed diaObjectIds
540 toDrop = diaSources[
'diaObjectId'].isin(droppedDiaObjectIds)
543 return diaSources.loc[~toDrop].copy()
547 """Prepare lookup tables of the data references.
551 dataRefList : `list` of `lsst.daf.butler.DeferredDatasetHandle`
552 The data references to make a lookup table for.
553 useVisitDetector : `bool`, optional
554 Use both visit and detector in the dict key? If False, use only visit.
558 `dict` of `lsst.daf.butler.DeferredDatasetHandle`
559 Lookup table of the data references by visit (and optionally detector)
564 for dataRef
in dataRefList:
565 dataDict[(dataRef.dataId[
"visit"], dataRef.dataId[
"detector"])] = dataRef
567 for dataRef
in dataRefList:
568 dataDict[dataRef.dataId[
"visit"]] = dataRef
_addDiaObjectCoords(self, objects, sources)
_stackCatalogs(self, catalogs, remove_columns=None, empty=None)
_trimToPatch(self, cat, patchBox, wcs, innerTractSkyRegion=None)
prepareCatalogDict(dataRefList, useVisitDetector=True)
runSolarSystemAssociation(self, diaCat, ssCat, visitSummary, patchBbox, patchWcs, innerTractSkyRegion, detector, visit)
dropDiaSourceByDiaObjectId(self, droppedDiaObjectIds, diaSources)