Coverage for python / astro_metadata_translator / bin / translate.py: 13%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:43 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Implementation of the ``astrometadata translate`` command-line. 

13 

14Read file metadata from the specified files and report the translated content. 

15""" 

16 

17from __future__ import annotations 

18 

19__all__ = ("translate_or_dump_headers",) 

20 

21import logging 

22import math 

23import traceback 

24from collections import defaultdict 

25from collections.abc import Sequence 

26from typing import IO, TYPE_CHECKING, Any 

27 

28import astropy.time 

29import astropy.units as u 

30import yaml 

31from astropy.table import Column, MaskedColumn, QTable 

32from lsst.resources import ResourcePath 

33 

34from astro_metadata_translator import MetadataTranslator, ObservationInfo, fix_header 

35 

36from ..file_helpers import find_files, read_basic_metadata_from_file 

37from ..properties import PROPERTIES 

38 

39if TYPE_CHECKING: 

40 from lsst.resources import ResourcePathExpression 

41 

42log = logging.getLogger(__name__) 

43 

# Allowed values for the ``output_mode`` argument of `read_file`.
OUTPUT_MODES = ("auto", "verbose", "table", "yaml", "fixed", "yamlnative", "fixednative", "none")

# Threshold number of accumulated rows at which a table page is flushed.
# A page can exceed this because a single file may contribute multiple rows
# (e.g. DECam data files can include multiple translatable headers).
MAX_TABLE_PAGE_SIZE = 500

# Definitions for table columns: ``attr`` is the `ObservationInfo` property
# to read, ``label`` the column heading, ``format`` an optional astropy
# column format string, and ``bad`` an optional fill value used when the
# property is `None` (defaults to "-" in `_dump_columns`).
TABLE_COLUMNS = (
    {"format": "<", "attr": "observation_id", "label": "ObsId"},
    {
        "format": "<",
        "attr": "observation_type",
        "label": "ImgType",
    },
    {
        "format": "<",
        "attr": "object",
        "label": "Object",
    },
    {
        "format": "<",
        "attr": "physical_filter",
        "label": "Filter",
    },
    {"format": ">8.8s", "attr": "detector_unique_name", "label": "Detector"},
    {
        "format": None,
        "attr": "exposure_time",
        "label": "ExpTime",
        "bad": math.nan * u.s,
    },
    {"attr": "datetime_begin", "label": "Observation Date", "bad": astropy.time.Time(0.0, format="jd")},
)

78 

79 

def read_file(
    file: ResourcePathExpression,
    hdrnum: int,
    print_trace: bool,
    output_columns: defaultdict[str, list],
    outstream: IO | None = None,
    output_mode: str = "verbose",
    write_heading: bool = False,
    translator_name: str | None = None,
) -> bool:
    """Read the specified file and process it.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file from which the header is to be read.
    hdrnum : `int`
        The HDU number to read. The primary header is always read and
        merged with the header from this HDU.
    print_trace : `bool`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False` prints
        a one line summary of the error condition.
    output_columns : `collections.defaultdict` [ `str`, `list` ]
        For table mode, a place to store the columns.
    outstream : `io.StringIO`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    output_mode : `str`, optional
        Output mode to use. Must be one of "verbose", "none", "table",
        "yaml", or "fixed". "yaml" and "fixed" can be modified with a
        "native" suffix to indicate that the output should be a representation
        of the native object type representing the header (which can be
        PropertyList or an Astropy header). Without this modifier headers
        will be dumped in simple `dict` form.
        "auto" is used to indicate that a single file has been specified
        but the output will depend on whether the file is a multi-extension
        FITS file or not.
    write_heading : `bool`, optional
        Retained for interface compatibility; not used by the current
        implementation (table headings are produced by `_dump_columns`).
    translator_name : `str` or `None`, optional
        Name of a specific translator to use. Must have been pre-loaded.

    Returns
    -------
    success : `bool`
        `True` if the file was handled successfully, `False` if the file
        could not be processed.
    """
    if output_mode not in OUTPUT_MODES:
        raise ValueError(f"Output mode of '{output_mode}' is not understood.")

    # Per-file announcements get in the way in tabular mode.
    if output_mode != "table":
        log.info("Analyzing %s...", file)

    uri = ResourcePath(file, forceDirectory=False)
    try:
        md = read_basic_metadata_from_file(uri, hdrnum, can_raise=True)
        if md is None:
            raise RuntimeError(f"Failed to read file {uri} HDU={hdrnum}")

        if output_mode.endswith("native"):
            # Strip the modifier but keep md in its native header type.
            output_mode = output_mode[: -len("native")]
        else:
            # Rewrite md as a simple dict for output.
            md = dict(md.items())

        if output_mode in ("yaml", "fixed"):
            if output_mode == "fixed":
                fix_header(md, filename=str(uri))

            # The header should be written out in the insertion order.
            print(yaml.dump(md, sort_keys=False), file=outstream)
            return True

        # Try to work out a translator class.
        if translator_name:
            translator_class = MetadataTranslator.get_translator_by_name(translator_name)
            if not translator_class:
                raise RuntimeError(f"Translator {translator_name} not known to translation system.")
        else:
            translator_class = MetadataTranslator.determine_translator(md, filename=str(uri))

        # Work out which headers to translate, assuming the default if
        # we have a YAML test file.
        if uri.getExtension() == ".yaml":
            headers = [md]
        else:
            headers = list(translator_class.determine_translatable_headers(uri, md))
        if output_mode == "auto":
            output_mode = "table" if len(headers) > 1 else "verbose"

        if output_mode == "table" and output_columns is None:
            raise ValueError("Table mode requires output columns")

        for md in headers:
            obs_info = ObservationInfo(
                md, pedantic=False, filename=str(uri), translator_class=translator_class
            )
            if output_mode == "table":
                # Gather the complete row before touching output_columns so
                # that a failure while reading an attribute cannot leave the
                # shared columns with uneven lengths (which would break
                # _dump_columns later).
                row = [getattr(obs_info, c["attr"]) for c in TABLE_COLUMNS]
                for c, value in zip(TABLE_COLUMNS, row):
                    output_columns[c["label"]].append(value)
            elif output_mode == "verbose":
                print(f"{obs_info}", file=outstream)
            elif output_mode == "none":
                pass
            else:
                raise RuntimeError(f"Output mode of '{output_mode}' not recognized but should be known.")

    except Exception as e:
        if print_trace:
            traceback.print_exc(file=outstream)
        else:
            print(f"Failure processing {uri}: {e}", file=outstream)
        return False
    return True

199 

200 

201def _fill_bad_values(value: Any, fillvalue: Any) -> Any: 

202 """Convert None values to the fill value. 

203 

204 Parameters 

205 ---------- 

206 value : `~typing.Any` 

207 Value to check. 

208 fillvalue : `~typing.Any` 

209 Value to use if `None`. 

210 

211 Returns 

212 ------- 

213 filled : `~typing.Any` 

214 Original value or the fill value. 

215 """ 

216 return value if value is not None else fillvalue 

217 

218 

def _dump_columns(output_columns: dict[str, list], outstream: IO | None = None) -> None:
    """Format the accumulated columns as a table and print it.

    Parameters
    ----------
    output_columns : `dict` [`str`, `list`]
        The columns to be written, indexed by column name.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    """
    if not output_columns:
        return

    table = QTable()
    for column_def in TABLE_COLUMNS:
        label = column_def["label"]
        values = output_columns[label]

        # An empty column means nothing was collected at all, so there is
        # no table to write.
        if not values:
            return

        # Record which entries are missing; any None forces a masked column.
        mask = [v is None for v in values]
        has_missing = any(mask)
        col_format = column_def.get("format")

        if has_missing:
            fill = column_def.get("bad", "-")
            values = [fill if v is None else v for v in values]

        # Quantities must be merged into a single Quantity before storage;
        # times are rendered as ISO strings.
        property_type = PROPERTIES[column_def["attr"]].py_type
        if issubclass(property_type, u.Quantity):
            values = u.Quantity(values)
        elif issubclass(property_type, astropy.time.Time):
            values = astropy.time.Time(values).isot

        if has_missing:
            column = MaskedColumn(name=label, data=values, mask=mask, format=col_format)
        else:
            column = Column(data=values, name=label, format=col_format)
        table[label] = column

    print("\n".join(table.pformat(max_lines=-1, max_width=-1)), file=outstream)

269 

270 

def translate_or_dump_headers(
    files: Sequence[str],
    regex: str,
    hdrnum: int,
    print_trace: bool,
    outstream: IO | None = None,
    output_mode: str = "auto",
    translator_name: str | None = None,
) -> tuple[list[str], list[str]]:
    """Read and translate metadata from the specified files.

    Parameters
    ----------
    files : iterable of `str`
        The files or directories from which the headers are to be read.
    regex : `str`
        Regular expression string used to filter files when a directory is
        scanned.
    hdrnum : `int`
        The HDU number to read. The primary header is always read and
        merged with the header from this HDU.
    print_trace : `bool`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False` prints
        a one line summary of the error condition.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    output_mode : `str`, optional
        Output mode to use for the translated information.
        "auto" switches based on how many files are found.
    translator_name : `str` or `None`, optional
        Name of a specific translator to use. Must have been pre-loaded.

    Returns
    -------
    okay : `list` of `str`
        All the files that were processed successfully.
    failed : `list` of `str`
        All the files that could not be processed.
    """
    found_files = find_files(files, regex)

    # Multiple files force table mode immediately; a single file keeps
    # "auto" so read_file can decide based on the number of headers.
    if output_mode == "auto" and len(found_files) > 1:
        output_mode = "table"

    okay: list[str] = []
    failed: list[str] = []
    first_file = True
    columns: defaultdict[str, list] = defaultdict(list)
    page_label = TABLE_COLUMNS[0]["label"]

    for path in sorted(found_files):
        processed_ok = read_file(
            path,
            hdrnum,
            print_trace,
            columns,
            outstream,
            output_mode,
            first_file,
            translator_name=translator_name,
        )
        first_file = False
        (okay if processed_ok else failed).append(str(path))

        # Flush the table once a page's worth of rows has accumulated.
        if output_mode == "table" and len(columns[page_label]) >= MAX_TABLE_PAGE_SIZE:
            _dump_columns(columns, outstream)
            columns = defaultdict(list)

    # Write out any remaining partial page.
    if columns:
        _dump_columns(columns, outstream)

    return okay, failed