Coverage for python / lsst / source / injection / bin / ingest_injection_catalog.py: 19%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:38 +0000

1# This file is part of source_injection. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import logging 

25import time 

26from argparse import SUPPRESS, ArgumentParser 

27 

28from astropy.table import Table 

29 

30from lsst.daf.butler import Butler 

31from lsst.daf.butler.formatters.parquet import ParquetFormatter, arrow_to_astropy, pq 

32 

33from ..utils import ingest_injection_catalog 

34from .source_injection_help_formatter import SourceInjectionHelpFormatter 

35 

36 

def _is_parquet(filename: str):
    """Report whether a filename ends with a recognised parquet extension.

    Parameters
    ----------
    filename : `str`
        Name or path of the file to check. Only the trailing characters of
        the string are inspected; the file itself is never opened.

    Returns
    -------
    is_parquet : `bool`
        `True` if ``filename`` ends with ``.parquet``, ``.parq`` or the
        default extension of ``ParquetFormatter`` (when that is not `None`),
        `False` otherwise.

    Notes
    -----
    This could be replaced with astropy.io.misc.parquet_identify, which has
    additional functionality to open the file and validate the first set of
    bytes.
    """
    suffixes = {".parquet", ".parq", ParquetFormatter.default_extension}
    # A default extension of None must not reach str.endswith, which only
    # accepts strings inside its tuple argument.
    suffixes.discard(None)
    return filename.endswith(tuple(suffixes))

52 

53 

def build_argparser():
    """Build an argument parser for this script.

    Returns
    -------
    parser : `argparse.ArgumentParser`
        Parser accepting the butler config, one or more injection catalog
        specifications (a file followed by its bands), the output
        collection, the dataset type name, an optional input format and a
        help flag.

    Notes
    -----
    The parser is created with ``argument_default=SUPPRESS``, so any option
    without an explicit default that is not supplied on the command line is
    absent from the parsed namespace rather than being set to `None`.
    """
    description = """Ingest a source injection catalog into the butler.

This script reads an on-disk input catalog or multiple per-band input catalogs
and ingests these data into the butler. Catalogs are read in using the astropy
Table API. Parquet filenames will be read through daf_butler functions, whereas
other file types will attempt to use astropy.table.Table.read. See DM-44159 for
details.

An attempt at auto-identification of the input catalog file format type will be
made for each input. A manually specified format can instead be specified for
all input catalogs using the ``--format`` option.

Each injection catalog must be associated with at least one band, specified
immediately after the path to the input catalog. Multiple space-separated bands
can be provided for a single input catalog. The injection catalog option may
also be called multiple times to ingest multiple per-band catalogs.
"""
    # The default help action is disabled here so that -h/--help can be
    # registered explicitly as the last option below.
    parser = ArgumentParser(
        description=description,
        epilog="More information is available at https://pipelines.lsst.io.",
        formatter_class=SourceInjectionHelpFormatter,
        argument_default=SUPPRESS,
        add_help=False,
    )

    # Required options.
    parser.add_argument(
        "-b",
        "--butler-config",
        metavar="TEXT",
        type=str,
        required=True,
        help="Location of the butler/registry config file.",
    )
    # Each use of this option collects its own list (file first, then bands);
    # repeated uses are appended, giving a list of lists.
    parser.add_argument(
        "-i",
        "--injection-catalog",
        metavar=("FILE BAND", "BAND"),
        type=str,
        nargs="+",
        action="append",
        required=True,
        help="Location of the input source injection catalog and all associated bands.",
    )
    parser.add_argument(
        "-o",
        "--output-collection",
        metavar="COLL",
        type=str,
        required=True,
        help="Name of the output collection to ingest the injection catalog into.",
    )

    # Optional settings.
    parser.add_argument(
        "-t",
        "--dataset-type-name",
        metavar="TEXT",
        type=str,
        default="injection_catalog",
        help="Output dataset type name for the ingested source injection catalog.",
    )
    parser.add_argument(
        "--format",
        metavar="TEXT",
        type=str,
        help="Format of the input injection catalog(s), overriding auto-identification.",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        help="Show this help message and exit.",
    )
    return parser

126 

127 

def main():
    """Use this as the main entry point when calling from the command line.

    Parses the command line, reads every requested injection catalog from
    disk and hands the resulting tables to ``ingest_injection_catalog``, one
    call per band.

    Raises
    ------
    RuntimeError
        Raised if an ``--injection-catalog`` option is given a file without
        at least one accompanying band.
    """
    # Set up logging; the output of strftime("%z") is spliced into the
    # format string directly after the millisecond field.
    utc_offset = time.strftime("%z")
    log_format = "%(levelname)s %(asctime)s.%(msecs)03d" + utc_offset + " - %(message)s"
    logging.basicConfig(format=log_format, datefmt="%Y-%m-%dT%H:%M:%S")
    logging.getLogger(__name__).setLevel(logging.DEBUG)

    args = build_argparser().parse_args()

    # Flatten every "FILE BAND [BAND ...]" option into (file, band) pairs.
    catalog_band_pairs = []
    for file_and_bands in args.injection_catalog:
        if len(file_and_bands) < 2:
            raise RuntimeError("Each injection catalog must be associated with at least one band.")
        catalog_file, *bands = file_and_bands
        catalog_band_pairs.extend((catalog_file, band) for band in bands)

    writeable_butler = Butler.from_config(args.butler_config, writeable=True)
    # --format may be missing from the namespace entirely when not supplied.
    read_format = getattr(args, "format", None)

    # Everything not consumed by this script is forwarded to the ingest call.
    consumed_here = {"butler_config", "injection_catalog", "format"}
    forwarded_kwargs = {key: value for key, value in vars(args).items() if key not in consumed_here}

    # Group the (file, band) pairs so that each band is ingested exactly once
    # with all of the catalogs that were listed for it.
    pairs_table = Table(rows=catalog_band_pairs, names=("injection_catalog", "band"))
    for band_group in pairs_table.group_by("band").groups:
        band = str(band_group["band"][0])

        tables_for_band = []
        for catalog_path in band_group["injection_catalog"]:
            if isinstance(catalog_path, str) and _is_parquet(catalog_path):
                # Parquet files bypass Table.read (and the --format override).
                catalog = arrow_to_astropy(pq.read_table(catalog_path, use_threads=False))
            else:
                # The character_as_bytes=False option is preferred, if
                # possible; on a TypeError the read is retried without it.
                try:
                    catalog = Table.read(catalog_path, format=read_format, character_as_bytes=False)
                except TypeError:
                    catalog = Table.read(catalog_path, format=read_format)
            tables_for_band.append(catalog)

        ingest_injection_catalog(
            writeable_butler=writeable_butler,
            table=tables_for_band,
            band=band,
            **forwarded_kwargs,
        )