Coverage for python / astro_metadata_translator / file_helpers.py: 14%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:38 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Support functions for script implementations. 

13 

14These functions should not be treated as part of the public API. 

15""" 

16 

17from __future__ import annotations 

18 

19__all__ = ("find_files", "read_basic_metadata_from_file", "read_file_info") 

20 

21import json 

22import logging 

23import re 

24import traceback 

25from collections.abc import Iterable, MutableMapping 

26from typing import IO, TYPE_CHECKING, Any 

27 

28from astropy.io import fits 

29from lsst.resources import ResourcePath 

30 

31from .headers import merge_headers 

32from .observationInfo import ObservationInfo 

33from .tests import read_test_file 

34 

35if TYPE_CHECKING: 

36 from lsst.resources import ResourcePathExpression 

37 

38log = logging.getLogger(__name__) 

39 

40# Prefer afw over Astropy 

# Probe for the optional afw FITS reader; ``have_afw`` records the outcome so
# that callers can choose between the afw and astropy implementations.
try:
    import lsst.daf.base  # noqa: F401 need PropertyBase for readMetadata
    from lsst.afw.fits import FitsError, readMetadata
except ImportError:
    have_afw = False
else:
    have_afw = True

    def _read_fits_metadata_afw(
        file: str, hdu: int, can_raise: bool = False
    ) -> MutableMapping[str, Any] | None:
        # afw-backed header reader. Only works with local files.
        # The parameters and return value mirror those documented on
        # _read_fits_metadata.
        try:
            return readMetadata(file, hdu=hdu)
        except FitsError as e:
            if not can_raise:
                # Caller asked for failures to be reported as None.
                return None
            # A FitsError mentioning code 104 is converted to the
            # more familiar FileNotFoundError.
            if "(104)" in str(e):
                raise FileNotFoundError(f"No such file or directory: {file}") from e
            raise e

66 

67 

def _read_fits_metadata_astropy(
    file: ResourcePathExpression, hdu: int, can_raise: bool = False
) -> MutableMapping[str, Any] | None:
    """Read a FITS header using astropy.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file to read.
    hdu : `int`
        The header number to read.
    can_raise : `bool`, optional
        If `True`, any exception raised while opening the file or accessing
        the requested HDU is propagated to the caller. If `False` (the
        default) such exceptions are suppressed and `None` is returned.

    Returns
    -------
    md : `dict`
        A copy of the requested header. `None` if it could not be read and
        ``can_raise`` is `False`.
    """
    header = None
    uri = ResourcePath(file, forceDirectory=False)
    try:
        fs, fspath = uri.to_fsspec()
        with fs.open(fspath) as f, fits.open(f) as fits_file:
            # Copy forces a download of the remote resource.
            # A missing HDU raises IndexError here; it is handled by the
            # same policy as every other failure below.
            header = fits_file[hdu].header.copy()
    except Exception:
        if can_raise:
            raise
    return header

105 

106 

def _read_fits_metadata(
    file: ResourcePathExpression, hdu: int, can_raise: bool = False
) -> MutableMapping[str, Any] | None:
    """Read a FITS header, dispatching to the afw or astropy reader.

    The afw reader is used when it is available and the file is local;
    every other case is handled by astropy.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file to read.
    hdu : `int`
        The header number to read.
    can_raise : `bool`, optional
        Forwarded to the selected reader. `True` lets read errors
        propagate; `False` (the default) asks the reader to return `None`
        on error instead.

    Returns
    -------
    md : `dict`
        The requested header. `None` if it could not be read and
        ``can_raise`` is `False`.
    """
    uri = ResourcePath(file, forceAbsolute=False)
    use_afw = have_afw and uri.isLocal
    if not use_afw:
        return _read_fits_metadata_astropy(uri, hdu, can_raise=can_raise)
    # The afw reader takes a local filesystem path rather than a URI.
    return _read_fits_metadata_afw(uri.ospath, hdu, can_raise=can_raise)

135 

136 

def find_files(files: Iterable[ResourcePathExpression], regex: str) -> list[ResourcePath]:
    """Collect the files that should be processed.

    Parameters
    ----------
    files : iterable of `lsst.resources.ResourcePathExpression`
        The files or directories from which the headers are to be read.
    regex : `str`
        Regular expression string used to filter files when a directory is
        scanned. It is not applied to entries that are not directories.

    Returns
    -------
    found_files : `list` of `lsst.resources.ResourcePath`
        The files that were found, in the order the inputs were given.
    """
    pattern = re.compile(regex)
    collected: list[ResourcePath] = []

    for entry in files:
        location = ResourcePath(entry, forceAbsolute=False)
        if not location.isdir():
            # Anything that is not a directory is accepted as given.
            collected.append(location)
            continue
        # Directories are scanned and filtered by the compiled pattern.
        matches = ResourcePath.findFileResources([location], file_filter=pattern, grouped=False)
        collected.extend(matches)

    return collected

165 

166 

def read_basic_metadata_from_file(
    file: ResourcePathExpression, hdrnum: int, can_raise: bool = True
) -> MutableMapping[str, Any] | None:
    """Read a raw header from a file, merging if necessary.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        Name of file to read. Can be FITS, YAML or JSON. YAML or JSON must be
        a simple top-level dict.
    hdrnum : `int`
        Header number to read. Only relevant for FITS; it is ignored for
        YAML and JSON files. If greater than 0 that header will be merged
        with the primary header. If a negative number is given the second
        header, if present, will be merged with the primary header. If
        there is only a primary header a negative number behaves
        identically to specifying 0 for the HDU number.
    can_raise : `bool`, optional
        Indicate whether the function can raise an exception (default)
        or should return `None` on error when reading the primary header.
        A failure to read the additional header requested by ``hdrnum``
        never raises; a warning is logged instead.

    Returns
    -------
    header : `dict`
        The header as a dict. Can be `None` if there was a problem reading
        the file.
    """
    uri = ResourcePath(file, forceAbsolute=False)
    if uri.getExtension() in (".yaml", ".json"):
        try:
            md = read_test_file(uri)
        except Exception:
            if can_raise:
                raise
            md = None
        # YAML and JSON can't have HDUs so skip merging below.
        hdrnum = 0
    else:
        md = _read_fits_metadata(uri, 0, can_raise=can_raise)

    if md is None:
        log.warning("Unable to open file %s", file)
        return None

    if hdrnum < 0:
        # A negative request means "merge the second header if the primary
        # header indicates that extensions exist".
        if "EXTEND" in md and md["EXTEND"]:
            hdrnum = 1

    if hdrnum > 0:
        # Allow this to fail
        mdn = _read_fits_metadata(uri, int(hdrnum), can_raise=False)
        # Astropy does not allow append mode since it does not
        # convert lists to multiple cards. Overwrite for now
        if mdn is not None:
            md = merge_headers([md, mdn], mode="overwrite")
        else:
            log.warning("HDU %d was not found in file %s. Ignoring request.", hdrnum, uri)

    return md

227 

228 

def read_file_info(
    file: ResourcePathExpression,
    hdrnum: int,
    print_trace: bool | None = None,
    content_mode: str = "translated",
    content_type: str = "simple",
    outstream: IO | None = None,
) -> str | MutableMapping[str, Any] | ObservationInfo | None:
    """Read information from file.

    Parameters
    ----------
    file : `str` or `lsst.resources.ResourcePathExpression`
        The file from which the header is to be read.
    hdrnum : `int`
        The HDU number to read. The primary header is always read and
        merged with the header from this HDU.
    print_trace : `bool` or `None`
        If there is an error reading the file and this parameter is `True`,
        a full traceback of the exception will be reported. If `False` prints
        a one line summary of the error condition. If `None` the exception
        will be allowed to propagate.
    content_mode : `str`
        Content returned. This can be: ``metadata`` to return the unfixed
        metadata headers; ``translated`` to return the output from metadata
        translation.
    content_type : `str`, optional
        Form of content to be returned. Can be either ``json`` to return a
        JSON string, ``simple`` to always return a `dict`, or ``native`` to
        return either a `dict` (for ``metadata``) or
        `~astro_metadata_translator.ObservationInfo` for ``translated``.
    outstream : `io.StringIO` or `None`, optional
        Output stream to use for standard messages. Defaults to `None` which
        uses the default output stream.

    Returns
    -------
    info : `str`, `dict` or `~astro_metadata_translator.ObservationInfo`
        For ``translated`` content this is the
        `~astro_metadata_translator.ObservationInfo` itself (``native``),
        the return value of
        `~astro_metadata_translator.ObservationInfo.to_simple`
        (``simple``), or that value serialized to a JSON string
        (``json``). For ``metadata`` content this is the header that was
        read, or a JSON string of it (``json``). JSON output includes an
        additional ``__CONTENT__`` key holding the content mode. Returns
        `None` if the header could not be read, or if there was a problem
        and ``print_trace`` is not `None`.

    Raises
    ------
    ValueError
        Raised if ``content_mode`` or ``content_type`` is not one of the
        recognized values.
    """
    if content_mode not in ("metadata", "translated"):
        raise ValueError(f"Unrecognized content mode request: {content_mode}")

    if content_type not in ("native", "simple", "json"):
        raise ValueError(f"Unrecognized content type request {content_type}")

    uri = ResourcePath(file, forceAbsolute=False)
    try:
        # Only let the reader raise when the caller wants exceptions to
        # propagate; otherwise problems are reported via a `None` return.
        md = read_basic_metadata_from_file(uri, hdrnum, can_raise=print_trace is None)
        if md is None:
            return None
        if content_mode == "metadata":
            # Do not fix the header
            if content_type == "json":
                # Add a key to tell the reader whether this is md or translated
                md["__CONTENT__"] = content_mode
                try:
                    json_str = json.dumps(md)
                except TypeError:
                    # Cast to dict and try again -- PropertyList is a problem
                    json_str = json.dumps(dict(md))
                return json_str
            return md
        obs_info = ObservationInfo(md, pedantic=True, filename=str(uri))
        if content_type == "native":
            return obs_info
        simple = obs_info.to_simple()
        if content_type == "simple":
            return simple
        if content_type == "json":
            # Add a key to tell the reader if this is metadata or translated
            simple["__CONTENT__"] = content_mode
            return json.dumps(simple)
        # Defensive guard: the validation above should make this unreachable.
        raise RuntimeError(f"Logic error. Unrecognized mode for reading file: {content_mode}/{content_type}")
    except Exception as e:
        if print_trace is None:
            raise
        if print_trace:
            traceback.print_exc(file=outstream)
        else:
            print(repr(e), file=outstream)
    return None