Coverage for python / astro_metadata_translator / bin / translate.py: 13%
116 statements
« prev ^ index » next    coverage.py v7.13.5, created at 2026-04-28 08:38 +0000
1# This file is part of astro_metadata_translator.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the LICENSE file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Implementation of the ``astrometadata translate`` command-line.
14Read file metadata from the specified files and report the translated content.
15"""
17from __future__ import annotations
19__all__ = ("translate_or_dump_headers",)
21import logging
22import math
23import traceback
24from collections import defaultdict
25from collections.abc import Sequence
26from typing import IO, TYPE_CHECKING, Any
28import astropy.time
29import astropy.units as u
30import yaml
31from astropy.table import Column, MaskedColumn, QTable
32from lsst.resources import ResourcePath
34from astro_metadata_translator import MetadataTranslator, ObservationInfo, fix_header
36from ..file_helpers import find_files, read_basic_metadata_from_file
37from ..properties import PROPERTIES
39if TYPE_CHECKING:
40 from lsst.resources import ResourcePathExpression
42log = logging.getLogger(__name__)
# Output mode choices accepted by read_file(); "auto" resolves to "table"
# or "verbose" depending on how many files/headers are being processed.
OUTPUT_MODES = ("auto", "verbose", "table", "yaml", "fixed", "yamlnative", "fixednative", "none")

# Number of rows per table page.
# This is a minimum given that DECam data files can include multiple headers.
MAX_TABLE_PAGE_SIZE = 500

# Definitions for table columns.
# Each entry describes one output column:
#   "attr"   -- name of the ObservationInfo attribute providing the values.
#   "label"  -- column heading used in the rendered table.
#   "format" -- astropy column format specifier (optional; absent or None
#               means the default formatting is used).
#   "bad"    -- optional fill value substituted for missing (None) entries;
#               columns without "bad" fall back to "-" (see _dump_columns).
TABLE_COLUMNS = (
    {"format": "<", "attr": "observation_id", "label": "ObsId"},
    {
        "format": "<",
        "attr": "observation_type",
        "label": "ImgType",
    },
    {
        "format": "<",
        "attr": "object",
        "label": "Object",
    },
    {
        "format": "<",
        "attr": "physical_filter",
        "label": "Filter",
    },
    {"format": ">8.8s", "attr": "detector_unique_name", "label": "Detector"},
    {
        "format": None,
        "attr": "exposure_time",
        "label": "ExpTime",
        "bad": math.nan * u.s,
    },
    {"attr": "datetime_begin", "label": "Observation Date", "bad": astropy.time.Time(0.0, format="jd")},
)
def read_file(
    file: ResourcePathExpression,
    hdrnum: int,
    print_trace: bool,
    output_columns: defaultdict[str, list],
    outstream: IO | None = None,
    output_mode: str = "verbose",
    write_heading: bool = False,
    translator_name: str | None = None,
) -> bool:
    """Read the specified file and process it.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file from which the header is to be read.
    hdrnum : `int`
        The HDU number to read. The primary header is always read and
        merged with the header from this HDU.
    print_trace : `bool`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False` prints
        a one line summary of the error condition.
    output_columns : `collections.defaultdict` [ `str`, `list` ]
        For table mode, a place to store the columns.
    outstream : `io.StringIO`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    output_mode : `str`, optional
        Output mode to use. Must be one of "verbose", "none", "table",
        "yaml", or "fixed". "yaml" and "fixed" can be modified with a
        "native" suffix to indicate that the output should be a representation
        of the native object type representing the header (which can be
        PropertyList or an Astropy header). Without this modifier headers
        will be dumped as simple `dict` form.
        "auto" is used to indicate that a single file has been specified
        but the output will depend on whether the file is a multi-extension
        FITS file or not.
    write_heading : `bool`, optional
        If `True` and in table mode, write a table heading out before writing
        the content.

        .. note:: NOTE(review): this parameter is never referenced in the
           body below; table rows are only accumulated here and rendered
           later by ``_dump_columns``. Confirm whether it can be retired.
    translator_name : `str` or `None`, optional
        Name of a specific translator to use. Must have been pre-loaded.

    Returns
    -------
    success : `bool`
        `True` if the file was handled successfully, `False` if the file
        could not be processed.
    """
    if output_mode not in OUTPUT_MODES:
        raise ValueError(f"Output mode of '{output_mode}' is not understood.")

    # This gets in the way in tabular mode
    if output_mode != "table":
        log.info("Analyzing %s...", file)

    uri = ResourcePath(file, forceDirectory=False)
    try:
        md = read_basic_metadata_from_file(uri, hdrnum, can_raise=True)
        if md is None:
            raise RuntimeError(f"Failed to read file {uri} HDU={hdrnum}")

        if output_mode.endswith("native"):
            # Strip native and don't change type of md
            output_mode = output_mode[: -len("native")]
        else:
            # Rewrite md as simple dict for output
            md = dict(md.items())

        if output_mode in ("yaml", "fixed"):
            if output_mode == "fixed":
                fix_header(md, filename=str(uri))

            # The header should be written out in the insertion order
            print(yaml.dump(md, sort_keys=False), file=outstream)
            return True

        # Try to work out a translator class.
        if translator_name:
            translator_class = MetadataTranslator.get_translator_by_name(translator_name)
            if not translator_class:
                raise RuntimeError(f"Translator {translator_name} not known to translation system.")
        else:
            translator_class = MetadataTranslator.determine_translator(md, filename=str(uri))

        # Work out which headers to translate, assuming the default if
        # we have a YAML test file.
        if uri.getExtension() == ".yaml":
            headers = [md]
        else:
            headers = list(translator_class.determine_translatable_headers(uri, md))

        if output_mode == "auto":
            # More than one translatable header implies tabular output.
            output_mode = "table" if len(headers) > 1 else "verbose"

        if output_mode == "table" and output_columns is None:
            raise ValueError("Table mode requires output columns")

        for md in headers:
            obs_info = ObservationInfo(
                md, pedantic=False, filename=str(uri), translator_class=translator_class
            )
            if output_mode == "table":
                # Accumulate rows only; the caller renders the table later
                # via _dump_columns.
                for c in TABLE_COLUMNS:
                    output_columns[c["label"]].append(getattr(obs_info, c["attr"]))
            elif output_mode == "verbose":
                print(f"{obs_info}", file=outstream)
            elif output_mode == "none":
                pass
            else:
                raise RuntimeError(f"Output mode of '{output_mode}' not recognized but should be known.")
    except Exception as e:
        # Report (or trace) the failure and signal it via the return value
        # instead of propagating, so remaining files can still be processed.
        if print_trace:
            traceback.print_exc(file=outstream)
        else:
            print(f"Failure processing {uri}: {e}", file=outstream)
        return False
    return True
201def _fill_bad_values(value: Any, fillvalue: Any) -> Any:
202 """Convert None values to the fill value.
204 Parameters
205 ----------
206 value : `~typing.Any`
207 Value to check.
208 fillvalue : `~typing.Any`
209 Value to use if `None`.
211 Returns
212 -------
213 filled : `~typing.Any`
214 Original value or the fill value.
215 """
216 return value if value is not None else fillvalue
def _dump_columns(output_columns: dict[str, list], outstream: IO | None = None) -> None:
    """Render the accumulated columns as a formatted table.

    Parameters
    ----------
    output_columns : `dict` [`str`, `list`]
        The columns to be written, indexed by column name.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    """
    if not output_columns:
        return

    table = QTable()
    for spec in TABLE_COLUMNS:
        label = spec["label"]
        values = output_columns[label]

        # An empty column implies no content was collected for any column,
        # so there is nothing to tabulate.
        if not len(values):
            return

        # Note which positions are missing; they will be masked out.
        mask = [v is None for v in values]
        has_missing = any(mask)
        col_format = spec.get("format")

        if has_missing:
            values = [_fill_bad_values(v, spec.get("bad", "-")) for v in values]

        # Quantities have to be handled specially since they must be merged
        # into a single entity before they can be stored in a column.
        prop_type = PROPERTIES[spec["attr"]].py_type
        if issubclass(prop_type, u.Quantity):
            values = u.Quantity(values)
        elif issubclass(prop_type, astropy.time.Time):
            # Force to ISO string.
            values = astropy.time.Time(values).isot

        if has_missing:
            column = MaskedColumn(name=label, data=values, mask=mask, format=col_format)
        else:
            column = Column(data=values, name=label, format=col_format)
        table[label] = column

    print("\n".join(table.pformat(max_lines=-1, max_width=-1)), file=outstream)
def translate_or_dump_headers(
    files: Sequence[str],
    regex: str,
    hdrnum: int,
    print_trace: bool,
    outstream: IO | None = None,
    output_mode: str = "auto",
    translator_name: str | None = None,
) -> tuple[list[str], list[str]]:
    """Read and translate metadata from the specified files.

    Parameters
    ----------
    files : iterable of `str`
        The files or directories from which the headers are to be read.
    regex : `str`
        Regular expression string used to filter files when a directory is
        scanned.
    hdrnum : `int`
        The HDU number to read. The primary header is always read and
        merged with the header from this HDU.
    print_trace : `bool`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False` prints
        a one line summary of the error condition.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.
    output_mode : `str`, optional
        Output mode to use for the translated information.
        "auto" switches based on how many files are found.
    translator_name : `str` or `None`, optional
        Name of a specific translator to use. Must have been pre-loaded.

    Returns
    -------
    okay : `list` of `str`
        All the files that were processed successfully.
    failed : `list` of `str`
        All the files that could not be processed.
    """
    found_files = find_files(files, regex)

    # With more than one file "auto" means tabular output; for a single
    # file it stays "auto" so read_file can decide based on header count.
    if output_mode == "auto" and len(found_files) > 1:
        output_mode = "table"

    okay: list[str] = []
    failed: list[str] = []
    first_file = True
    output_columns: defaultdict[str, list] = defaultdict(list)
    for path in sorted(found_files):
        success = read_file(
            path,
            hdrnum,
            print_trace,
            output_columns,
            outstream,
            output_mode,
            first_file,
            translator_name=translator_name,
        )
        first_file = False
        (okay if success else failed).append(str(path))

        # Flush the accumulated rows once a table page fills up.
        if output_mode == "table" and len(output_columns[TABLE_COLUMNS[0]["label"]]) >= MAX_TABLE_PAGE_SIZE:
            _dump_columns(output_columns, outstream)
            output_columns = defaultdict(list)

    if output_columns:
        _dump_columns(output_columns, outstream)

    return okay, failed