Coverage for python / astro_metadata_translator / file_helpers.py: 14%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:38 +0000
1# This file is part of astro_metadata_translator.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the LICENSE file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Support functions for script implementations.
14These functions should not be treated as part of the public API.
15"""
17from __future__ import annotations
19__all__ = ("find_files", "read_basic_metadata_from_file", "read_file_info")
21import json
22import logging
23import re
24import traceback
25from collections.abc import Iterable, MutableMapping
26from typing import IO, TYPE_CHECKING, Any
28from astropy.io import fits
29from lsst.resources import ResourcePath
31from .headers import merge_headers
32from .observationInfo import ObservationInfo
33from .tests import read_test_file
35if TYPE_CHECKING:
36 from lsst.resources import ResourcePathExpression
38log = logging.getLogger(__name__)
40# Prefer afw over Astropy
try:
    import lsst.daf.base  # noqa: F401 need PropertyBase for readMetadata
    from lsst.afw.fits import FitsError, readMetadata
except ImportError:
    # afw is optional; callers consult ``have_afw`` before using the
    # afw-based reader.
    have_afw = False
else:
    have_afw = True

    def _read_fits_metadata_afw(
        file: str, hdu: int, can_raise: bool = False
    ) -> MutableMapping[str, Any] | None:
        """Read a FITS header using afw.

        Only works with local files.

        Parameters
        ----------
        file : `str`
            Path of the file to read.
        hdu : `int`
            The header number to read.
        can_raise : `bool`, optional
            If `True` a ``FitsError`` from the read is propagated, and one
            whose message contains ``(104)`` is converted to
            `FileNotFoundError`. If `False` (the default) `None` is
            returned when a ``FitsError`` occurs.

        Returns
        -------
        md : `~collections.abc.MutableMapping` or `None`
            The value returned by ``readMetadata``. `None` if a
            ``FitsError`` occurred and ``can_raise`` is `False`.

        Raises
        ------
        FileNotFoundError
            Raised if ``can_raise`` is `True` and the ``FitsError`` message
            contains ``(104)``.
        """
        try:
            metadata = readMetadata(file, hdu=hdu)
        except FitsError as err:
            if not can_raise:
                return None
            # Try to convert a basic fits error code.
            if "(104)" in str(err):
                raise FileNotFoundError(f"No such file or directory: {file}") from err
            raise err
        return metadata
def _read_fits_metadata_astropy(
    file: ResourcePathExpression, hdu: int, can_raise: bool = False
) -> MutableMapping[str, Any] | None:
    """Read a FITS header using astropy.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file to read.
    hdu : `int`
        The header number to read.
    can_raise : `bool`, optional
        If `True` any exception encountered while opening or reading the
        file is re-raised. If `False` (the default) such exceptions are
        suppressed and `None` is returned. Construction of the
        `~lsst.resources.ResourcePath` happens outside the guarded region
        and can raise regardless of this flag.

    Returns
    -------
    md : `~collections.abc.MutableMapping` or `None`
        A copy of the requested header. `None` if it could not be read and
        ``can_raise`` is `False`.
    """
    resource = ResourcePath(file, forceDirectory=False)
    result = None
    try:
        filesystem, fs_path = resource.to_fsspec()
        with filesystem.open(fs_path) as handle:
            with fits.open(handle) as hdu_list:
                try:
                    # Copy forces a download of the remote resource.
                    result = hdu_list[hdu].header.copy()
                except IndexError as err:
                    # The requested HDU does not exist.
                    if can_raise:
                        raise err
    except Exception as err:
        if can_raise:
            raise err
    return result
def _read_fits_metadata(
    file: ResourcePathExpression, hdu: int, can_raise: bool = False
) -> MutableMapping[str, Any] | None:
    """Read a FITS header, dispatching to the afw or astropy reader.

    The afw reader is used when afw was importable and the resource is
    local; otherwise the astropy reader is used.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file to read.
    hdu : `int`
        The header number to read.
    can_raise : `bool`, optional
        Forwarded unchanged to the selected reader. If `True` the reader
        may raise on error; if `False` (the default) the reader returns
        `None` on error. Can still raise if an unexpected error is
        encountered.

    Returns
    -------
    md : `~collections.abc.MutableMapping` or `None`
        The requested header. `None` if it could not be read and
        ``can_raise`` is `False`.
    """
    uri = ResourcePath(file, forceAbsolute=False)
    use_afw = have_afw and uri.isLocal
    if not use_afw:
        return _read_fits_metadata_astropy(uri, hdu, can_raise=can_raise)
    # The afw reader takes an OS path rather than a URI.
    return _read_fits_metadata_afw(uri.ospath, hdu, can_raise=can_raise)
def find_files(files: Iterable[ResourcePathExpression], regex: str) -> list[ResourcePath]:
    """Collect the files to process from a mix of files and directories.

    Parameters
    ----------
    files : iterable of `lsst.resources.ResourcePathExpression`
        The files or directories from which the headers are to be read.
    regex : `str`
        Regular expression string used to filter files when a directory is
        scanned. It is not applied to entries that are not directories.

    Returns
    -------
    found_files : `list` of `lsst.resources.ResourcePath`
        The files that were found, in the order the inputs were given.
    """
    pattern = re.compile(regex)
    matches: list[ResourcePath] = []

    for entry in files:
        resource = ResourcePath(entry, forceAbsolute=False)
        if not resource.isdir():
            # Anything that is not a directory is accepted as given.
            matches.append(resource)
            continue
        matches.extend(ResourcePath.findFileResources([resource], file_filter=pattern, grouped=False))

    return matches
def read_basic_metadata_from_file(
    file: ResourcePathExpression, hdrnum: int, can_raise: bool = True
) -> MutableMapping[str, Any] | None:
    """Read a raw header from a file, merging headers if requested.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        Name of file to read. Files with a ``.yaml`` or ``.json`` extension
        are read with ``read_test_file``; anything else is read as FITS.
    hdrnum : `int`
        Header number to read. Only relevant for FITS; it is treated as 0
        for YAML and JSON files. If greater than 0 that header is read and
        merged with the primary header. If negative, header 1 is merged
        with the primary header when the primary header has a true
        ``EXTEND`` value; otherwise a negative number behaves identically
        to specifying 0.
    can_raise : `bool`, optional
        Indicate whether the function can raise an exception (default)
        or should return `None` when the primary read fails. Reading of
        the additional header never raises for a read failure. Can still
        raise if an unexpected error is encountered.

    Returns
    -------
    header : `~collections.abc.MutableMapping` or `None`
        The header. `None` if there was a problem reading the file.
    """
    uri = ResourcePath(file, forceAbsolute=False)
    is_fits = uri.getExtension() not in (".yaml", ".json")
    if is_fits:
        primary = _read_fits_metadata(uri, 0, can_raise=can_raise)
    else:
        try:
            primary = read_test_file(uri)
        except Exception as err:
            if can_raise:
                raise err
            primary = None
        # YAML can't have HDUs so skip merging below.
        hdrnum = 0

    if primary is None:
        log.warning("Unable to open file %s", file)
        return None

    # A negative request means "merge the first extension if there is one".
    if hdrnum < 0 and "EXTEND" in primary and primary["EXTEND"]:
        hdrnum = 1

    if hdrnum <= 0:
        return primary

    # Allow this to fail.
    extension = _read_fits_metadata(uri, int(hdrnum), can_raise=False)
    if extension is None:
        log.warning("HDU %d was not found in file %s. Ignoring request.", hdrnum, uri)
        return primary

    # Astropy does not allow append mode since it does not
    # convert lists to multiple cards. Overwrite for now.
    return merge_headers([primary, extension], mode="overwrite")
def read_file_info(
    file: ResourcePathExpression,
    hdrnum: int,
    print_trace: bool | None = None,
    content_mode: str = "translated",
    content_type: str = "simple",
    outstream: IO | None = None,
) -> str | MutableMapping[str, Any] | ObservationInfo | None:
    """Read header information from a file, optionally translating it.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file from which the header is to be read.
    hdrnum : `int`
        The HDU number to read, passed to
        ``read_basic_metadata_from_file``.
    print_trace : `bool` or `None`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False`
        prints a one line summary of the error condition. If `None` the
        exception will be allowed to propagate.
    content_mode : `str`
        Content returned. This can be: ``metadata`` to return the unfixed
        metadata headers; ``translated`` to return the output from metadata
        translation.
    content_type : `str`, optional
        Form of content to be returned. Can be either ``json`` to return a
        JSON string, ``simple`` to return the output of
        `~astro_metadata_translator.ObservationInfo.to_simple` (or the
        header itself for ``metadata``), or ``native`` to return either
        the header (for ``metadata``) or
        `~astro_metadata_translator.ObservationInfo` for ``translated``.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for error reports. Defaults to `None` which
        uses the default output stream.

    Returns
    -------
    info : `str`, `dict`, `~astro_metadata_translator.ObservationInfo`, or `None`
        The requested content. `None` if the header could not be read, or
        if there was a problem and ``print_trace`` is not `None`.

    Raises
    ------
    ValueError
        Raised if ``content_mode`` or ``content_type`` is not one of the
        recognized values.
    """
    if content_mode not in ("metadata", "translated"):
        raise ValueError(f"Unrecognized content mode request: {content_mode}")

    if content_type not in ("native", "simple", "json"):
        raise ValueError(f"Unrecognized content type request {content_type}")

    uri = ResourcePath(file, forceAbsolute=False)
    try:
        # Only let the reader raise when the caller wants exceptions to
        # propagate.
        header = read_basic_metadata_from_file(uri, hdrnum, can_raise=print_trace is None)
        if header is None:
            return None

        if content_mode == "metadata":
            # Do not fix the header.
            if content_type != "json":
                return header
            # Add a key to tell the reader whether this is md or translated.
            header["__CONTENT__"] = content_mode
            try:
                return json.dumps(header)
            except TypeError:
                # Cast to dict and try again -- PropertyList is a problem.
                return json.dumps(dict(header))

        obs_info = ObservationInfo(header, pedantic=True, filename=str(uri))
        if content_type == "native":
            return obs_info

        simplified = obs_info.to_simple()
        if content_type == "simple":
            return simplified
        if content_type == "json":
            # Add a key to tell the reader if this is metadata or translated.
            simplified["__CONTENT__"] = content_mode
            return json.dumps(simplified)

        raise RuntimeError(f"Logic error. Unrecognized mode for reading file: {content_mode}/{content_type}")
    except Exception as err:
        if print_trace is None:
            raise err
        if print_trace:
            traceback.print_exc(file=outstream)
        else:
            print(repr(err), file=outstream)
    return None