Coverage for python / lsst / obs / base / utils.py: 16%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:52 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Public API of this module, kept sorted alphabetically (exceptions and
# classes first by convention of the surrounding codebase).
__all__ = (
    "InitialSkyWcsError",
    "TableVStack",
    "add_provenance_to_fits_header",
    "bboxFromIraf",
    "createInitialSkyWcs",
    "createInitialSkyWcsFromBoresight",
    "iso_date_to_curated_calib_file_root",
    "strip_provenance_from_fits_header",
)
33import datetime
34import re
35from collections.abc import Iterable, MutableMapping
36from typing import TYPE_CHECKING, Any, Self
38import astropy.table
39import numpy as np
40from astro_metadata_translator.headers import merge_headers
41from numpy.typing import ArrayLike
43import lsst.geom as geom
44import lsst.pex.exceptions
45from lsst.afw.cameraGeom import FIELD_ANGLE, PIXELS
46from lsst.afw.geom.skyWcs import makeSkyWcs
47from lsst.afw.image import RotType
48from lsst.daf.base import PropertyList
49from lsst.daf.butler import DatasetProvenance, DatasetRef
51if TYPE_CHECKING:
52 import lsst.afw.image
53 import lsst.geom
_PROVENANCE_PREFIX = "LSST BUTLER"
"""The prefix used in FITS headers for butler provenance."""

# Alias for the iterable-of-deferred-handles arguments accepted by
# TableVStack.from_handles and TableVStack.vstack_handles.
DeferredHandles = Iterable[lsst.daf.butler.DeferredDatasetHandle]
class InitialSkyWcsError(Exception):
    """Error raised when a SkyWcs cannot be built from a camera geometry
    and a boresight.

    Usually raised as a chained exception wrapping the lower-level failure.
    """
def createInitialSkyWcs(
    visitInfo: lsst.afw.image.VisitInfo, detector: lsst.afw.cameraGeom.Detector, flipX: bool | None = None
) -> lsst.afw.geom.SkyWcs:
    """Build a SkyWcs from visit information plus the detector geometry.

    This is typically used to construct the initial WCS for a newly-read
    raw exposure.

    Parameters
    ----------
    visitInfo : `lsst.afw.image.VisitInfo`
        Source of the telescope boresight and rotator angle.
    detector : `lsst.afw.cameraGeom.Detector`
        Source of the camera geometry.
    flipX : `bool` or `None`, optional
        If `False`, +X is along W, if `True` +X is along E. If `None`, the
        focal plane parity in the camera geometry is assumed to be correct,
        which may not be true for old camera datasets.

    Returns
    -------
    skyWcs : `lsst.afw.geom.SkyWcs`
        The new composed WCS.

    Raises
    ------
    InitialSkyWcsError
        Raised if there is an error generating the SkyWcs, chained from the
        lower-level exception if available.
    """
    rotType = visitInfo.getRotType()
    # Only a SKY-type rotator angle can be composed into a WCS here.
    if rotType != RotType.SKY:
        raise InitialSkyWcsError(
            "Cannot create SkyWcs from camera geometry: rotator angle defined using "
            f"RotType={rotType} instead of SKY."
        )
    return createInitialSkyWcsFromBoresight(
        visitInfo.getBoresightRaDec(), visitInfo.getBoresightRotAngle(), detector, flipX
    )
def createInitialSkyWcsFromBoresight(
    boresight: lsst.geom.SpherePoint,
    orientation: lsst.geom.Angle,
    detector: lsst.afw.cameraGeom.Detector,
    flipX: bool | None = None,
) -> lsst.afw.geom.SkyWcs:
    """Build a SkyWcs from a telescope boresight plus detector geometry.

    This is typically used to construct the initial WCS for a newly-read
    raw exposure.

    Parameters
    ----------
    boresight : `lsst.geom.SpherePoint`
        The ICRS boresight RA/Dec.
    orientation : `lsst.geom.Angle`
        The rotation angle of the focal plane on the sky.
    detector : `lsst.afw.cameraGeom.Detector`
        Source of the camera geometry.
    flipX : `bool` or `None`, optional
        If `False`, +X is along W, if `True` +X is along E. If `None`, the
        focal plane parity in the camera geometry is assumed to be correct,
        which may not be true for old camera datasets.

    Returns
    -------
    skyWcs : `lsst.afw.geom.SkyWcs`
        The new composed WCS.

    Raises
    ------
    InitialSkyWcsError
        Raised if there is an error generating the SkyWcs, chained from the
        lower-level exception if available.
    """
    hasParityFlip = detector.getTransformMap().getFocalPlaneParity()
    applyFlipX = False
    if flipX is None:
        # Trust the parity recorded in the camera geometry as-is.
        pass
    elif flipX and not hasParityFlip:
        # This is probably an old camera definition from before those could
        # hold parity flips. We need to work around this since data
        # repositories aren't necessarily updated in sync with code.
        applyFlipX = True
    elif not flipX and hasParityFlip:
        raise InitialSkyWcsError(
            "Camera geometry reports a parity flip between FOCAL_PLANE and FIELD_ANGLE, but this "
            "was not expected by caller for this instrument."
        )
    try:
        pixelsToFieldAngle = detector.getTransform(
            detector.makeCameraSys(PIXELS), detector.makeCameraSys(FIELD_ANGLE)
        )
    except lsst.pex.exceptions.InvalidParameterError as e:
        raise InitialSkyWcsError("Cannot compute PIXELS to FIELD_ANGLE Transform.") from e
    return makeSkyWcs(pixelsToFieldAngle, orientation, applyFlipX, boresight)
def bboxFromIraf(irafBBoxStr: str) -> lsst.geom.Box2I:
    """Convert an IRAF-style BBOX string into a Box2I.

    The expected form is ``[x0:x1,y0:y1]``, where ``x0`` and ``x1`` are the
    one-indexed start and end columns, and ``y0`` and ``y1`` are the
    one-indexed start and end rows.
    """
    match = re.search(r"^\[([-\d]+):([-\d]+),([-\d]+):([-\d]+)\]$", irafBBoxStr)
    if match is None:
        raise RuntimeError(f'Unable to parse IRAF-style bbox "{irafBBoxStr}"')
    x0, x1, y0, y1 = map(int, match.groups())
    # Convert one-indexed IRAF coordinates to zero-indexed points.
    return geom.Box2I(geom.PointI(x0 - 1, y0 - 1), geom.PointI(x1 - 1, y1 - 1))
def _store_str_header(
    hdr: PropertyList, key: str, value: str, comment: str | None = None, allow_long_headers: bool = True
) -> None:
    """Store a string value in the given header, eliding it if required.

    Parameters
    ----------
    hdr : `lsst.daf.base.PropertyList`
        Header to update in place.
    key : `str`
        The key to use in the FITS header (without HIERARCH).
    value : `str`
        The value that is to be stored in the header.
    comment : `str` or `None`, optional
        Possible comment to add. Dropped if the value had to be elided.
    allow_long_headers : `bool`, optional
        If `True` the value will be used unchanged. If `False` the value
        could have some content elided and a modified version used that will
        fit in a FITS header. If `False` the maximum length of ``key`` is
        58 characters to prevent inconsistency where a very short string will
        fit in 80 characters with ``HIERARCH`` but a very long string will
        not.
    """
    if not allow_long_headers:
        # Declare that we do not allow keys longer than a fixed number of
        # characters in this mode. This is enough to allow
        #   HIERARCH {key} = 'abc     '
        # to fit into 80 characters.
        max_card_length = 80
        if len(key) > 58:
            raise ValueError(
                f"Given keyword {key} is too long when requiring {max_card_length}-character header cards"
            )

        # FITS pads strings to 8 characters, so format the candidate card
        # with at least that much value width before measuring it.
        min_len = 8
        formatted = f"HIERARCH {key} = '{value:{min_len}s}'"
        if (n_over := len(formatted) - max_card_length) > 0:
            # Elide some content from the middle of the string.
            # Remove the over run + 3 for the ... and 1 for rounding.
            n_remove = n_over + 1 + 3
            middle = len(value) // 2
            half_remove = n_remove // 2
            value = f"{value[: middle - half_remove]}...{value[middle + half_remove :]}"

            # Do not forward comment if we have elided, since the card is
            # already at (or near) the maximum length.
            comment = None

    hdr.set(key, value, comment)
def add_provenance_to_fits_header(
    hdr: PropertyList | MutableMapping | None,
    ref: DatasetRef,
    provenance: DatasetProvenance | None = None,
    allow_long_headers: bool = True,
) -> None:
    """Modify the given header to include provenance headers.

    Parameters
    ----------
    hdr : `lsst.daf.base.PropertyList` or `collections.abc.MutableMapping`
        The FITS header to modify. Assumes ``HIERARCH`` will be handled
        implicitly by the writer.
    ref : `lsst.daf.butler.DatasetRef`
        The butler dataset associated with this FITS file.
    provenance : `lsst.daf.butler.DatasetProvenance` or `None`, optional
        Provenance for this dataset. If `None`, an empty provenance record
        is used so that at least the dataset's own headers are written.
    allow_long_headers : `bool`, optional
        If `True` it is assumed that there is no limit to the length of the
        values being stored. If `False`, assumes that FITS header cards must
        be kept within 80 characters including quoting, ``HIERARCH``, and
        ``=``.
    """
    # Some datasets do not define a header.
    if hdr is None:
        return

    if provenance is None:
        provenance = DatasetProvenance()

    # Get the flat dict in form suitable for FITS. Protect against abnormally
    # large number of inputs. Use the module-level prefix constant so that
    # strip_provenance_from_fits_header() is guaranteed to match these keys.
    prov_dict = provenance.to_flat_dict(
        ref, prefix=_PROVENANCE_PREFIX, sep=" ", simple_types=True, max_inputs=3_000
    )

    # Copy keys into a PropertyList so we have the option of including
    # comments.
    extras = PropertyList()

    for k, v in prov_dict.items():
        if not allow_long_headers and isinstance(v, str):
            # String values may need eliding to fit in an 80-character card.
            _store_str_header(extras, k, v, allow_long_headers=allow_long_headers)
        else:
            extras.set(k, v)
        # Only add comments if we think it's safe (comments lengthen cards).
        if allow_long_headers:
            comment = ""
            if re.search(r"\bRUN\b", k):
                comment = "Run collection"
            elif re.search(r"\bID\b", k):
                comment = "Dataset ID"
            elif re.search(r"\bDATAID\b", k):
                comment = "Data identifier"
            elif re.search(r"\bDATASETTYPE\b", k):
                comment = "Dataset type"
            if comment:
                extras.setComment(k, comment)

    # Purge old headers from metadata (important for data ID and input headers
    # and to prevent headers accumulating in a PropertyList).
    strip_provenance_from_fits_header(hdr)

    # Update the header.
    hdr.update(extras)
def strip_provenance_from_fits_header(hdr: MutableMapping | PropertyList) -> None:
    """Remove all FITS headers relating to butler provenance.

    Parameters
    ----------
    hdr : `lsst.daf.base.PropertyList` or `collections.abc.MutableMapping`
        The FITS header to modify. Assumes ``HIERARCH`` will be handled
        implicitly by the writer.

    Notes
    -----
    These headers will have been added by, for example, the butler formatter
    via a call to `add_provenance_to_fits_header`.
    """
    # Collect matching keys first so the mapping is not mutated while
    # it is being iterated.
    provenance_keys = [key for key in hdr if key.startswith(_PROVENANCE_PREFIX)]
    for key in provenance_keys:
        del hdr[key]
def iso_date_to_curated_calib_file_root(valid_start: str) -> str:
    """Convert an ISO time (e.g. a calibration validity start) into the
    compact form used in curated calibration file names.

    Parameters
    ----------
    valid_start : `str`
        Validity start time in ISOT format.

    Returns
    -------
    date_str : `str`
        Form suitable for use in a curated calibration file name in a
        data package: YYYYMMDDTHHMMSS.
    """
    # Round-trip through datetime to validate the string and normalize it,
    # guarding against hand-edited valid_start values.
    parsed = datetime.datetime.fromisoformat(valid_start)
    # timespec="seconds" drops any fractional seconds; stripping the
    # non-word characters removes the "-" and ":" separators.
    normalized = parsed.isoformat(timespec="seconds")
    return re.sub(r"\W", "", normalized)
class TableVStack:
    """A helper class for stacking astropy tables without having them all in
    memory at once.

    Parameters
    ----------
    capacity : `int`
        Full size of the final table.

    Notes
    -----
    Unlike `astropy.table.vstack`, this class requires all tables to have the
    exact same columns (it's slightly more strict than even the
    ``join_type="exact"`` argument to `astropy.table.vstack`).
    """

    def __init__(self, capacity: int) -> None:
        # Row offset at which the next table will be written.
        self.index: int = 0
        # Total number of rows the final stacked table will hold.
        self.capacity = capacity
        # Output table; allocated lazily on the first call to extend().
        self.result: astropy.table.Table | None = None

    @classmethod
    def set_extra_values(
        cls,
        table: astropy.table.Table,
        key: str,
        values: Any,
        capacity: int,
        slicer: slice | None = None,
        validate: bool = True,
    ) -> None:
        """Set extra column values in a slice of a table.

        Parameters
        ----------
        table : `astropy.table.Table`
            The table to set values for.
        key : `str`
            The column key.
        values : `typing.Any`
            The value(s) to set. Can be a scalar.
        capacity : `int`
            The size to initialize the column with, if it doesn't exist yet.
        slicer : `slice` or `None`, optional
            A slice to select values to update.
        validate : `bool`, optional
            If True and the column already exists, will raise if the new
            values do not match the existing ones.

        Raises
        ------
        RuntimeError
            Raised if ``validate`` is True, the column exists, and the
            existing values differ from ``values``.
        """
        if key in table.colnames:
            column = table[key]
            if validate and not np.all((table[key] if (slicer is None) else table[key][slicer]) == values):
                raise RuntimeError(
                    f"table already contains {column=} with {key=} but values differ from {values=}"
                )
            # NOTE(review): with slicer=None this indexes with None
            # (numpy newaxis); the assignment then broadcasts over the
            # whole column, which matches the intent of a full update.
            table[key][slicer] = values
        else:
            if slicer is None:
                table[key] = values
            else:
                # Scalars have no dtype attribute; derive one from the
                # Python type instead.
                try:
                    dtype = values.dtype
                except AttributeError:
                    dtype = np.dtype(type(values))
                table[key] = np.empty(capacity, dtype=dtype)
                table[key][slicer] = values

    @classmethod
    def from_handles(cls, handles: DeferredHandles) -> Self:
        """Construct from an iterable of
        `lsst.daf.butler.DeferredDatasetHandle`.

        Parameters
        ----------
        handles : `~collections.abc.Iterable` [ \
                `lsst.daf.butler.DeferredDatasetHandle` ]
            Iterable of handles. Must have a storage class that supports the
            "rowcount" component, which is all that will be fetched.

        Returns
        -------
        vstack : `TableVStack`
            An instance of this class, initialized with capacity equal to the
            sum of the rowcounts of all the given table handles.
        """
        capacity = sum(handle.get(component="rowcount") for handle in handles)
        return cls(capacity=capacity)

    def extend(self, table: astropy.table.Table, extra_values: dict[str, ArrayLike] | None = None) -> None:
        """Add a single table to the stack.

        Parameters
        ----------
        table : `astropy.table.Table`
            An astropy table instance.
        extra_values : `dict`, optional
            Dict keyed by column name with an array-like of values to set
            for this table only.

        Raises
        ------
        TypeError
            Raised if a column's dtype differs from the dtype of the same
            column in the first table added.
        """
        if extra_values is None:
            extra_values = {}
        if self.result is None:
            # First table: allocate columns at full capacity with the same
            # column classes as the input, then fill the leading rows.
            self.result = astropy.table.Table()
            slicer = slice(None, len(table))
            for name in table.colnames:
                column = table[name]
                column_cls = type(column)
                self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
                self.result[name][: len(table)] = column
            for name, values in extra_values.items():
                self.set_extra_values(
                    table=self.result,
                    key=name,
                    values=values,
                    capacity=self.capacity,
                    slicer=slicer,
                )
            self.index = len(table)
            self.result.meta = table.meta.copy()
        else:
            next_index = self.index + len(table)
            slicer = slice(self.index, next_index)
            for name in table.colnames:
                out_col = self.result[name]
                in_col = table[name]
                if out_col.dtype != in_col.dtype:
                    raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
                self.result[name][slicer] = table[name]
            for name, values in extra_values.items():
                self.set_extra_values(
                    table=self.result,
                    key=name,
                    values=values,
                    capacity=self.capacity,
                    slicer=slicer,
                    validate=False,
                )
            self.index = next_index
            # Butler provenance should be stripped on merge. It will be
            # added by butler on write. No attempt is made here to combine
            # provenance from multiple input tables.
            self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
            strip_provenance_from_fits_header(self.result.meta)

    @classmethod
    def vstack_handles(
        cls,
        handles: DeferredHandles,
        extra_values: dict[int, dict[str, ArrayLike]] | None = None,
        kwargs_get: dict[str, Any] | None = None,
    ) -> astropy.table.Table:
        """Vertically stack tables represented by deferred dataset handles.

        Parameters
        ----------
        handles : `~collections.abc.Iterable` [ \
                `lsst.daf.butler.DeferredDatasetHandle` ]
            Iterable of handles. Must have the "ArrowAstropy" storage class
            and identical columns.
        extra_values : `dict`, optional
            Dictionary keyed by index of handle (in the order the caller
            supplied them, counting zero-length catalogs) of additional
            values to pass to extend.
        kwargs_get : `dict` [`str`, `Any`], optional
            Keyword argument-value pairs to pass to handle.get().

        Returns
        -------
        table : `astropy.table.Table`
            Concatenated table with the same columns as each input table and
            the rows of all of them, or an empty table if there are no
            handles.
        """
        if not handles:
            return astropy.table.Table()
        if extra_values is None:
            extra_values = {}
        if kwargs_get is None:
            kwargs_get = {}
        handles = tuple(handles)  # guard against single-pass iterators
        rowcounts = tuple(handle.get(component="rowcount") for handle in handles)

        vstack = cls(capacity=np.sum(rowcounts))
        # Skip zero-length catalogs, but keep enumeration aligned with the
        # caller-supplied handle order so extra_values indices stay valid.
        # (Previously the index was computed after filtering, which
        # misaligned extra_values whenever a catalog was empty.)
        for idx, (handle, rowcount) in enumerate(zip(handles, rowcounts)):
            if rowcount == 0:
                continue
            vstack.extend(handle.get(**kwargs_get), extra_values=extra_values.get(idx))
        return astropy.table.Table() if vstack.result is None else vstack.result