lsst.pipe.tasks gcf790cdeb6+f6e4da7c1c
Loading...
Searching...
No Matches
lsst.pipe.tasks.postprocess Namespace Reference

Classes

class  ConsolidateObjectTableConfig
 
class  ConsolidateObjectTableConnections
 
class  ConsolidateObjectTableTask
 
class  ConsolidateParentTractConfig
 
class  ConsolidateParentTractConnections
 
class  ConsolidateParentTractTask
 
class  ConsolidateSourceTableConfig
 
class  ConsolidateSourceTableConnections
 
class  ConsolidateSourceTableTask
 
class  ConsolidateTractConfig
 
class  ConsolidateTractConnections
 
class  ConsolidateTractTask
 
class  ConsolidateVisitSummaryConfig
 
class  ConsolidateVisitSummaryConnections
 
class  ConsolidateVisitSummaryTask
 
class  for
 
class  MakeCcdVisitTableConfig
 
class  MakeCcdVisitTableConnections
 
class  MakeCcdVisitTableTask
 
class  MakeVisitTableConfig
 
class  MakeVisitTableConnections
 
class  MakeVisitTableTask
 
class  TransformForcedSourceTableConfig
 
class  TransformForcedSourceTableConnections
 
class  TransformForcedSourceTableTask
 
class  TransformObjectCatalogConnections
 
class  TransformSourceTableConfig
 
class  TransformSourceTableConnections
 
class  TransformSourceTableTask
 
class  will
 
class  WriteForcedSourceTableConfig
 
class  WriteForcedSourceTableConnections
 
class  WriteForcedSourceTableTask
 
class  WriteObjectTableConnections
 

Functions

 flattenFilters (df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None)
 
 __init__ (self, *, config=None)
 

Variables

 log = logging.getLogger(__name__)
 
 catalogs : `dict`
 
str catalog : `pandas.DataFrame`
 
 result : `~lsst.pipe.base.Struct`
 
 exposure : `lsst.afw.image.exposure.Exposure`
 
 detectorId : `int`
 
 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
 
 newCat : `lsst.afw.table.SourceCatalog`
 
 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
 
 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
 
 filt : `str`, optional
 
 flags : `list`, optional
 
 refFlags : `list`, optional
 
 forcedFlags : `list`, optional
 
 funcs : `~lsst.pipe.tasks.functors.Functor`
 
 inplace
 
 True
 
 drop
 
 primaryKey
 
 df
 
 analysis
 
tuple dimensions = ("instrument", "visit", "detector")
 
 inputCatalog
 
 outputCatalog
 
 camera
 
 calexp
 
 visitSummarySchema
 
 visit_geometry
 
 inputCatalogs
 
 visitSummaryRefs
 
 visitSummaries
 

Function Documentation

◆ __init__()

lsst.pipe.tasks.postprocess.__init__ ( self,
* ,
config = None )

Definition at line 1445 of file postprocess.py.

◆ flattenFilters()

lsst.pipe.tasks.postprocess.flattenFilters ( df,
noDupCols = ["coord_ra", "coord_dec"],
camelCase = False,
inputBands = None )
Flattens a dataframe with multilevel column index.

Definition at line 75 of file postprocess.py.

Variable Documentation

◆ analysis

lsst.pipe.tasks.postprocess.analysis

Definition at line 793 of file postprocess.py.

◆ calexp

lsst.pipe.tasks.postprocess.calexp
Initial value:
1= connectionTypes.Input(
2 doc="Processed exposures used for metadata",
3 name="calexp",
4 storageClass="ExposureF",
5 dimensions=("instrument", "visit", "detector"),
6 deferLoad=True,
7 multiple=True,
8 )

Definition at line 1414 of file postprocess.py.

◆ camera

lsst.pipe.tasks.postprocess.camera
Initial value:
1= connectionTypes.PrerequisiteInput(
2 doc="Camera geometry.",
3 name="camera",
4 dimensions=("instrument",),
5 storageClass="Camera",
6 isCalibration=True,
7 )

Definition at line 1407 of file postprocess.py.

◆ catalog

lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame`
dfs = []
for filt, tableDict in catalogs.items():
    for dataset, table in tableDict.items():
        # Convert afwTable to pandas DataFrame if needed
        if isinstance(table, pd.DataFrame):
            df = table
        elif isinstance(table, afwTable.SourceCatalog):
            df = table.asAstropy().to_pandas()
        elif isinstance(table, astropy.table.Table):
            df = table.to_pandas()
        else:
            raise ValueError(f"{dataset=} has unsupported {type(table)=}")
        df.set_index("id", drop=True, inplace=True)

        # Sort columns by name, to ensure matching schema among patches
        df = df.reindex(sorted(df.columns), axis=1)
        df = df.assign(tractId=tract, patchId=patch)

        # Make columns a 3-level MultiIndex
        df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                               names=("dataset", "band", "column"))
        dfs.append(df)

# We do this dance and not `pd.concat(dfs)` because the pandas
# concatenation uses infinite memory.
catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
return catalog


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    """Connections for WriteSourceTableTask.

    Takes a per-detector ``src`` SourceCatalog as input and produces a
    per-detector source table in Astropy/Parquet format with unchanged
    columns.
    """

    # Per-detector source catalog produced upstream by CalibrateTask.
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    # Same rows/columns re-persisted in a tabular storage class.
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector")
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    """Configuration for WriteSourceTableTask.

    No options beyond those supplied by the connections class.
    """

    pass


# NOTE(review): this block is a doc-generator extract; the class-body
# indentation has been flattened, so it is not valid Python as shown.
# Confirm against postprocess.py before relying on any of it.
class WriteSourceTableTask(pipeBase.PipelineTask):
# Task that converts a per-detector `src` SourceCatalog into a tabular
# source table (see WriteSourceTableConnections/Config above).
_DefaultName = "writeSourceTable"
ConfigClass = WriteSourceTableConfig

# Butler entry point: fetch the inputs, inject the visit and detector ids
# from the quantum data ID, run the task, and persist result.table as
# the `outputCatalog` connection.
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)
    inputs["visit"] = butlerQC.quantum.dataId["visit"]
    inputs["detector"] = butlerQC.quantum.dataId["detector"]
    result = self.run(**inputs)
    outputs = pipeBase.Struct(outputCatalog=result.table)
    butlerQC.put(outputs, outputRefs)

# NOTE(review): the body below does NOT match this signature -- it uses
# `visitSummary`, `detectorId`, and `exposure`, none of which are
# parameters of `run`, and it returns an exposure rather than a table.
# It appears to be the body of `prepareCalibratedExposure` (whose bodiless
# signature appears later in this page) spliced here by the doc extractor.
def run(self, catalog, visit, detector, **kwargs):
if visitSummary is not None:
    # Look up this detector's row in the visit summary; no row means the
    # detector has no final calibrations, so there is no work to do.
    row = visitSummary.find(detectorId)
    if row is None:
        raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
    # A None photoCalib/skyWcs in the summary is propagated to the
    # exposure (clearing it) rather than treated as an error, with a
    # warning so the skipped reevaluation is visible in logs.
    if (photoCalib := row.getPhotoCalib()) is None:
        self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                         "skipping reevaluation of photoCalib.", detectorId)
        exposure.setPhotoCalib(None)
    else:
        exposure.setPhotoCalib(photoCalib)
    if (skyWcs := row.getWcs()) is None:
        self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                         "skipping reevaluation of skyWcs.", detectorId)
        exposure.setWcs(None)
    else:
        exposure.setWcs(skyWcs)

return exposure

# NOTE(review): bodiless stub in this extract -- the implementation was not
# captured by the doc generator.
def addCalibColumns(self, catalog, exposure, **kwargs):

Definition at line 179 of file postprocess.py.

◆ catalogs

lsst.pipe.tasks.postprocess.catalogs : `dict`
_DefaultName = "writeObjectTable"
ConfigClass = WriteObjectTableConfig

# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = "obj"

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    catalogs = defaultdict(dict)
    for dataset, connection in (
        ("meas", "inputCatalogMeas"),
        ("forced_src", "inputCatalogForcedSrc"),
        ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
    ):
        for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
            catalogs[ref.dataId["band"]][dataset] = cat

    dataId = butlerQC.quantum.dataId
    df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
    outputs = pipeBase.Struct(outputCatalog=df)
    butlerQC.put(outputs, outputRefs)

def run(self, catalogs, tract, patch):

Definition at line 170 of file postprocess.py.

◆ detectorId

lsst.pipe.tasks.postprocess.detectorId : `int`

Definition at line 351 of file postprocess.py.

◆ df

lsst.pipe.tasks.postprocess.df

Definition at line 792 of file postprocess.py.

◆ dimensions

tuple lsst.pipe.tasks.postprocess.dimensions = ("instrument", "visit", "detector")

Definition at line 1369 of file postprocess.py.

◆ drop

lsst.pipe.tasks.postprocess.drop

Definition at line 788 of file postprocess.py.

◆ exposure

lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure`
self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
tbl = catalog.asAstropy()
tbl["visit"] = visit
# int16 instead of uint8 because databases don't like unsigned bytes.
tbl["detector"] = np.int16(detector)

return pipeBase.Struct(table=tbl)


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    """Connections for WriteRecalibratedSourceTableTask.

    Extends the plain source-table connections with a visit-summary input
    carrying the updated (final) calibration objects.
    """

    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    # NOTE(review): the extracted fragment shows no super().__init__ call
    # here -- confirm against postprocess.py.
    def __init__(self, config):
        # We don't want the input catalog here to be an initial existence
        # constraint in QG generation, because that can unfortunately limit the
        # set of data IDs of inputs to other tasks, even those that run earlier
        # (e.g. updateVisitSummary), when the input 'src' catalog is not
        # produced.  It's safer to just use 'visitSummary' existence as an
        # initial constraint, and then let the graph prune out the detectors
        # that don't have a 'src' for this task only.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    """Configuration for WriteRecalibratedSourceTableTask."""

    # Toggle recomputation of the local photometric-calibration columns.
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    # Toggle recomputation of the local WCS columns and the sky coordinates.
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


# NOTE(review): this block is a doc-generator extract; the class-body
# indentation has been flattened, so it is not valid Python as shown.
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
# Variant of WriteSourceTableTask that first re-applies the final
# per-visit calibrations (photoCalib/WCS) before writing the table.
_DefaultName = "writeRecalibratedSourceTable"
ConfigClass = WriteRecalibratedSourceTableConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    # Inject visit/detector ids from the quantum data ID, as in the base
    # class's runQuantum.
    inputs["visit"] = butlerQC.quantum.dataId["visit"]
    inputs["detector"] = butlerQC.quantum.dataId["detector"]

    # Only build a calibrated exposure and recompute the calibration
    # columns when at least one reevaluation option is enabled.
    if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
        exposure = ExposureF()
        inputs["exposure"] = self.prepareCalibratedExposure(
            exposure=exposure,
            visitSummary=inputs["visitSummary"],
            detectorId=butlerQC.quantum.dataId["detector"]
        )
        # Replace the input catalog with one carrying the recomputed
        # calibration columns before running the base conversion.
        inputs["catalog"] = self.addCalibColumns(**inputs)

    result = self.run(**inputs)
    outputs = pipeBase.Struct(outputCatalog=result.table)
    butlerQC.put(outputs, outputRefs)

# NOTE(review): bodiless stub in this extract; its implementation appears
# to have been spliced under WriteSourceTableTask.run earlier in this page
# by the doc generator -- confirm against postprocess.py.
def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):

Definition at line 349 of file postprocess.py.

◆ filt

lsst.pipe.tasks.postprocess.filt : `str`, optional

Definition at line 501 of file postprocess.py.

◆ flags

lsst.pipe.tasks.postprocess.flags : `list`, optional

Definition at line 506 of file postprocess.py.

◆ forcedFlags

lsst.pipe.tasks.postprocess.forcedFlags : `list`, optional

Definition at line 513 of file postprocess.py.

◆ funcs

lsst.pipe.tasks.postprocess.funcs : `~lsst.pipe.tasks.functors.Functor`

Definition at line 737 of file postprocess.py.

◆ functors

lsst.pipe.tasks.postprocess.functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`

Definition at line 494 of file postprocess.py.

◆ handles

lsst.pipe.tasks.postprocess.handles : `~lsst.daf.butler.DeferredDatasetHandle` or
measureConfig = SingleFrameMeasurementTask.ConfigClass()
measureConfig.doReplaceWithNoise = False

# Clear all slots, because we aren't running the relevant plugins.
for slot in measureConfig.slots:
    setattr(measureConfig.slots, slot, None)

measureConfig.plugins.names = []
if self.config.doReevaluateSkyWcs:
    measureConfig.plugins.names.add("base_LocalWcs")
    self.log.info("Re-evaluating base_LocalWcs plugin")
if self.config.doReevaluatePhotoCalib:
    measureConfig.plugins.names.add("base_LocalPhotoCalib")
    self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
pluginsNotToCopy = tuple(measureConfig.plugins.names)

# Create a new schema and catalog
# Copy all columns from original except for the ones to reevaluate
aliasMap = catalog.schema.getAliasMap()
mapper = afwTable.SchemaMapper(catalog.schema)
for item in catalog.schema:
    if not item.field.getName().startswith(pluginsNotToCopy):
        mapper.addMapping(item.key)

schema = mapper.getOutputSchema()
measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
schema.setAliasMap(aliasMap)
newCat = afwTable.SourceCatalog(schema)
newCat.extend(catalog, mapper=mapper)

# Fluxes in sourceCatalogs are in counts, so there are no fluxes to
# update here. LocalPhotoCalibs are applied during transform tasks.
# Update coord_ra/coord_dec, which are expected to be positions on the
# sky and are used as such in sdm tables without transform
if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
    afwTable.updateSourceCoords(exposure.wcs, newCat)
    wcsPlugin = measurement.plugins["base_LocalWcs"]
else:
    wcsPlugin = None

if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
    pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
else:
    pcPlugin = None

for row in newCat:
    if wcsPlugin is not None:
        wcsPlugin.measure(row, exposure)
    if pcPlugin is not None:
        pcPlugin.measure(row, exposure)

return newCat


class PostprocessAnalysis(object):

Definition at line 490 of file postprocess.py.

◆ inplace

lsst.pipe.tasks.postprocess.inplace

Definition at line 788 of file postprocess.py.

◆ inputCatalog

lsst.pipe.tasks.postprocess.inputCatalog
Initial value:
1= connectionTypes.Input(
2 doc="Wide input catalog of sources produced by WriteSourceTableTask",
3 name="{catalogType}source",
4 storageClass="DataFrame",
5 dimensions=("instrument", "visit", "detector"),
6 deferLoad=True
7 )

Definition at line 1371 of file postprocess.py.

◆ inputCatalogs

lsst.pipe.tasks.postprocess.inputCatalogs
Initial value:
1= connectionTypes.Input(
2 doc="Input per-detector Source Tables",
3 name="{catalogType}sourceTable",
4 storageClass="ArrowAstropy",
5 dimensions=("instrument", "visit", "detector"),
6 multiple=True,
7 deferLoad=True,
8 )

Definition at line 1638 of file postprocess.py.

◆ log

lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__)

Definition at line 72 of file postprocess.py.

◆ newCat

lsst.pipe.tasks.postprocess.newCat : `lsst.afw.table.SourceCatalog`

Definition at line 401 of file postprocess.py.

◆ outputCatalog

lsst.pipe.tasks.postprocess.outputCatalog
Initial value:
1= connectionTypes.Output(
2 doc="Narrower, per-detector Source Table transformed and converted per a "
3 "specified set of functors",
4 name="{catalogType}sourceTable",
5 storageClass="ArrowAstropy",
6 dimensions=("instrument", "visit", "detector")
7 )

Definition at line 1378 of file postprocess.py.

◆ primaryKey

lsst.pipe.tasks.postprocess.primaryKey

Definition at line 789 of file postprocess.py.

◆ refFlags

lsst.pipe.tasks.postprocess.refFlags : `list`, optional

Definition at line 510 of file postprocess.py.

◆ result

lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct`

Definition at line 269 of file postprocess.py.

◆ True

lsst.pipe.tasks.postprocess.True

Definition at line 788 of file postprocess.py.

◆ visit_geometry

lsst.pipe.tasks.postprocess.visit_geometry
Initial value:
1= connectionTypes.Output(
2 doc="Updated visit[, detector] regions that can be used to update butler dimensions records.",
3 name="visit_geometry",
4 dimensions=("instrument", "visit"),
5 storageClass="VisitGeometry",
6 )

Definition at line 1438 of file postprocess.py.

◆ visitSummaries

lsst.pipe.tasks.postprocess.visitSummaries
Initial value:
1= connectionTypes.Input(
2 doc="Per-visit consolidated exposure metadata",
3 name="finalVisitSummary",
4 storageClass="ExposureCatalog",
5 dimensions=("instrument", "visit",),
6 multiple=True,
7 deferLoad=True,
8 )

Definition at line 1831 of file postprocess.py.

◆ visitSummary

lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional

Definition at line 353 of file postprocess.py.

◆ visitSummaryRefs

lsst.pipe.tasks.postprocess.visitSummaryRefs
Initial value:
1= connectionTypes.Input(
2 doc="Data references for per-visit consolidated exposure metadata",
3 name="finalVisitSummary",
4 storageClass="ExposureCatalog",
5 dimensions=("instrument", "visit"),
6 multiple=True,
7 deferLoad=True,
8 )

Definition at line 1683 of file postprocess.py.

◆ visitSummarySchema

lsst.pipe.tasks.postprocess.visitSummarySchema
Initial value:
1= connectionTypes.InitOutput(
2 doc="Schema of the visitSummary catalog",
3 name="visitSummary_schema",
4 storageClass="ExposureCatalog",
5 )

Definition at line 1430 of file postprocess.py.