Coverage for python / lsst / source / injection / bin / ingest_injection_catalog.py: 19%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:38 +0000
1# This file is part of source_injection.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import logging
25import time
26from argparse import SUPPRESS, ArgumentParser
28from astropy.table import Table
30from lsst.daf.butler import Butler
31from lsst.daf.butler.formatters.parquet import ParquetFormatter, arrow_to_astropy, pq
33from ..utils import ingest_injection_catalog
34from .source_injection_help_formatter import SourceInjectionHelpFormatter
def _is_parquet(filename: str):
    """Report whether a filename ends with a recognised parquet extension.

    The accepted suffixes are ``.parquet``, ``.parq`` and the default
    extension declared by ``ParquetFormatter`` (skipped when it is `None`).
    Only the name is inspected; the file itself is never opened.

    Parameters
    ----------
    filename : `str`
        Name or path of the file to test.

    Returns
    -------
    is_parquet : `bool`
        `True` if ``filename`` ends with one of the accepted suffixes.

    Notes
    -----
    This could be replaced with astropy.io.misc.parquet_identify, which has
    additional functionality to open the file and validate the first set of
    bytes.
    """
    suffixes = {".parquet", ".parq", ParquetFormatter.default_extension}
    # A formatter without a default extension contributes nothing.
    suffixes.discard(None)
    # str.endswith needs a tuple (not a set) to test several suffixes.
    return filename.endswith(tuple(suffixes))
def build_argparser():
    """Construct the command line parser used by this script.

    Returns
    -------
    parser : `argparse.ArgumentParser`
        Parser exposing the ``--butler-config``, ``--injection-catalog``,
        ``--output-collection``, ``--dataset-type-name``, ``--format`` and
        ``--help`` options.

    Notes
    -----
    The parser is created with ``argument_default=SUPPRESS``, so an option
    without an explicit default (``--format``) is absent from the parsed
    namespace unless it is given on the command line.
    """
    description = """Ingest a source injection catalog into the butler.

This script reads an on-disk input catalog or multiple per-band input catalogs
and ingests these data into the butler. Catalogs are read in using the astropy
Table API. Parquet filenames will be read through daf_butler functions, whereas
other file types will attempt to use astropy.table.Table.read. See DM-44159 for
details.

An attempt at auto-identification of the input catalog file format type will be
made for each input. A manually specified format can instead be specified for
all input catalogs using the ``--format`` option.

Each injection catalog must be associated with at least one band, specified
immediately after the path to the input catalog. Multiple space-separated bands
can be provided for a single input catalog. The injection catalog option may
also be called multiple times to ingest multiple per-band catalogs.
"""
    parser = ArgumentParser(
        description=description,
        formatter_class=SourceInjectionHelpFormatter,
        epilog="More information is available at https://pipelines.lsst.io.",
        # The help option is registered explicitly below, with custom text.
        add_help=False,
        argument_default=SUPPRESS,
    )

    # (flags, keyword arguments) pairs, registered in the order listed.
    option_specs = (
        (
            ("-b", "--butler-config"),
            {
                "type": str,
                "help": "Location of the butler/registry config file.",
                "required": True,
                "metavar": "TEXT",
            },
        ),
        (
            ("-i", "--injection-catalog"),
            {
                "type": str,
                "help": "Location of the input source injection catalog and all associated bands.",
                "required": True,
                "metavar": ("FILE BAND", "BAND"),
                # Each use of the option yields one [file, band, ...] list;
                # "append" collects those lists across repeated uses.
                "nargs": "+",
                "action": "append",
            },
        ),
        (
            ("-o", "--output-collection"),
            {
                "type": str,
                "help": "Name of the output collection to ingest the injection catalog into.",
                "required": True,
                "metavar": "COLL",
            },
        ),
        (
            ("-t", "--dataset-type-name"),
            {
                "type": str,
                "help": "Output dataset type name for the ingested source injection catalog.",
                "metavar": "TEXT",
                "default": "injection_catalog",
            },
        ),
        (
            ("--format",),
            {
                "type": str,
                "help": "Format of the input injection catalog(s), overriding auto-identification.",
                "metavar": "TEXT",
            },
        ),
        (
            ("-h", "--help"),
            {
                "action": "help",
                "help": "Show this help message and exit.",
            },
        ),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)

    return parser
def main():
    """Run the catalog ingestion from the command line.

    Configures logging, parses the command line arguments, reads every
    requested input catalog and, for each band, passes the list of tables
    read for that band to ``ingest_injection_catalog`` together with a
    writeable butler.

    Raises
    ------
    RuntimeError
        Raised if an ``--injection-catalog`` entry names a file without at
        least one band following it.
    """
    # Set up logging; the local UTC offset is appended to each timestamp.
    utc_offset = time.strftime("%z")
    logging.basicConfig(
        format=f"%(levelname)s %(asctime)s.%(msecs)03d{utc_offset} - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    logging.getLogger(__name__).setLevel(logging.DEBUG)

    args = build_argparser().parse_args()

    # Flatten every [file, band, band, ...] entry into (file, band) pairs.
    catalog_band_pairs = []
    for entry in args.injection_catalog:
        if len(entry) < 2:
            raise RuntimeError("Each injection catalog must be associated with at least one band.")
        catalog_path, *bands = entry
        catalog_band_pairs.extend((catalog_path, band) for band in bands)

    writeable_butler = Butler.from_config(args.butler_config, writeable=True)
    # --format has no default and is suppressed when absent from the CLI.
    catalog_format = getattr(args, "format", None)

    # Remaining parsed options are forwarded to the ingest call unchanged.
    consumed_here = ("butler_config", "injection_catalog", "format")
    passthrough_kwargs = {key: value for key, value in vars(args).items() if key not in consumed_here}

    # Group the (file, band) pairs by band so each band is ingested once.
    pairs_table = Table(rows=catalog_band_pairs, names=("injection_catalog", "band"))
    for band_group in pairs_table.group_by("band").groups:
        band = str(band_group["band"][0])
        band_tables = [
            _read_injection_catalog(catalog_path, catalog_format)
            for catalog_path in band_group["injection_catalog"]
        ]
        ingest_injection_catalog(
            writeable_butler=writeable_butler,
            table=band_tables,
            band=band,
            **passthrough_kwargs,
        )


def _read_injection_catalog(catalog_path, catalog_format):
    """Read one on-disk injection catalog into a table.

    Parameters
    ----------
    catalog_path
        Location of the catalog file to read.
    catalog_format
        Format passed to ``Table.read`` for non-parquet inputs; `None`
        leaves format identification to astropy.

    Returns
    -------
    table
        The result of ``arrow_to_astropy`` for parquet filenames, otherwise
        the result of ``Table.read``.
    """
    if isinstance(catalog_path, str) and _is_parquet(catalog_path):
        return arrow_to_astropy(pq.read_table(catalog_path, use_threads=False))
    try:
        # The character_as_bytes=False option is preferred, if possible.
        return Table.read(catalog_path, format=catalog_format, character_as_bytes=False)
    except TypeError:
        # Retry without the keyword when the first attempt raises TypeError.
        return Table.read(catalog_path, format=catalog_format)