Coverage for python / lsst / source / injection / utils / _ingest_injection_catalog.py: 26%
23 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:22 +0000
1# This file is part of source_injection.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["ingest_injection_catalog"]
26import logging
28import esutil
29from astropy.table import Table, vstack
31from lsst.daf.butler import Butler, CollectionType, DatasetRef, DatasetType
34def ingest_injection_catalog(
35 writeable_butler: Butler,
36 table: Table | list[Table],
37 band: str,
38 output_collection: str,
39 dataset_type_name: str = "injection_catalog",
40 log_level: int = logging.INFO,
41) -> list[DatasetRef]:
42 """Ingest a source table into the butler.
44 This function ingests either a single astropy Table or list of Tables into
45 the Butler. If a list of Tables is provided, these will be vertically
46 stacked together into one single Table for ingestion. Input source tables
47 are expected to contain the columns ``ra`` and ``dec``, with data in units
48 of degrees. This spatial information will be used to shard up the source
49 table on-disk using a depth 7 Hierarchical Triangular Mesh (HTM7) format.
50 HTM7 trixels have an area of ~0.315 square degrees.
52 Parameters
53 ----------
54 writeable_butler : `lsst.daf.butler.Butler`
55 An instantiated writeable butler.
56 table : `astropy.table.Table` or `list` [`astropy.table.Table`]
57 Input source table(s). Requires columns ``ra`` and ``dec`` in degrees.
58 band : `str`
59 Band associated with the input source table(s).
60 output_collection : `str`
61 Name of the output collection to ingest the consolidated table into.
62 dataset_type_name : `str`, optional
63 Dataset type name for the ingested consolidated table.
64 log_level : `int`, optional
65 The log level to use for logging.
67 Returns
68 -------
69 dataset_refs : `list` [`lsst.daf.butler.DatasetRef`]
70 List containing the dataset refs for the ingested source table.
71 """
72 # Instantiate logger.
73 logger = logging.getLogger(__name__)
74 logger.setLevel(log_level)
76 # Concatenate inputs to a single Table if required.
77 if isinstance(table, list):
78 table = vstack(table)
80 # Register the dataset type and collection.
81 table_dataset_type = DatasetType(
82 dataset_type_name,
83 ("htm7", "band"),
84 "ArrowAstropy",
85 universe=writeable_butler.dimensions,
86 )
87 writeable_butler.registry.registerDatasetType(table_dataset_type)
88 writeable_butler.registry.registerCollection(
89 name=output_collection,
90 type=CollectionType.RUN,
91 )
93 # Generate HTM trixel indices.
94 htm = esutil.htm.HTM(7)
95 htm_indices = htm.lookup_id(table["ra"], table["dec"]) # type: ignore
97 # Loop over all unique trixel indices and ingest.
98 dataset_refs: list[DatasetRef] = []
99 for htm_index in set(htm_indices):
100 table_subset = table[htm_indices == htm_index]
101 dataset_ref = writeable_butler.put(
102 table_subset,
103 table_dataset_type,
104 {"htm7": htm_index, "band": band},
105 run=output_collection,
106 )
107 dataset_refs.append(dataset_ref)
109 # Log results and return.
110 logger.info(
111 "Ingested %d %s-band %s DatasetRef%s into: %s",
112 len(dataset_refs),
113 band,
114 dataset_type_name,
115 "" if len(dataset_refs) == 1 else "s",
116 output_collection,
117 )
118 return dataset_refs