36from lsst.geom import Angle, Box2I, Extent2I, Point2I, SpherePoint, degrees
41def writeFits(filename, stamps, metadata, type_name, write_mask, write_variance, write_archive=False):
42 """Write a single FITS file containing all stamps.
47 A string indicating the output filename
48 stamps : iterable of `BaseStamp`
49 An iterable of Stamp objects
50 metadata : `PropertyList`
51 A collection of key, value metadata pairs to be
52 written to the primary header
54 Python type name of the StampsBase subclass to use
56 Write the mask data to the output file?
57 write_variance : `bool`
58 Write the variance data to the output file?
59 write_archive : `bool`, optional
60 Write an archive to store Persistables along with each stamp?
63 metadata[
"HAS_MASK"] = write_mask
64 metadata[
"HAS_VARIANCE"] = write_variance
65 metadata[
"HAS_ARCHIVE"] = write_archive
66 metadata[
"N_STAMPS"] = len(stamps)
67 metadata[
"STAMPCLS"] = type_name
69 metadata[
"VERSION"] = 1
71 fitsFile =
Fits(filename,
"w")
72 fitsFile.createEmpty()
76 archive_ids = [oa.put(stamp.archive_element)
for stamp
in stamps]
77 metadata[
"ARCHIVE_IDS"] = archive_ids
78 fitsFile.writeMetadata(metadata)
79 oa.writeFits(fitsFile)
81 fitsFile.writeMetadata(metadata)
84 for i, stamp
in enumerate(stamps):
87 metadata.update({
"EXTVER": i + 1,
"EXTNAME":
"IMAGE"})
88 stamp.stamp_im.getImage().
writeFits(filename, metadata=metadata, mode=
"a")
91 metadata.update({
"EXTVER": i + 1,
"EXTNAME":
"MASK"})
92 stamp.stamp_im.getMask().
writeFits(filename, metadata=metadata, mode=
"a")
95 metadata.update({
"EXTVER": i + 1,
"EXTNAME":
"VARIANCE"})
96 stamp.stamp_im.getVariance().
writeFits(filename, metadata=metadata, mode=
"a")
101 """Read stamps from FITS file, allowing for only a subregion of the stamps
107 A string indicating the file to read
108 stamp_factory : classmethod
109 A factory function defined on a dataclass for constructing
110 stamp objects a la `~lsst.meas.algorithm.Stamp`
111 options : `PropertyList` or `dict`
112 A collection of parameters. If it contains a bounding box
113 (``bbox`` key), or if certain other keys (``llcX``, ``llcY``,
114 ``width``, ``height``) are available for one to be constructed,
115 the bounding box is passed to the ``FitsReader`` in order to
120 stamps : `list` of dataclass objects like `Stamp`, PropertyList
121 A tuple of a list of `Stamp`-like objects
122 metadata : `PropertyList`
127 The data are read using the data type expected by the
128 `~lsst.afw.image.MaskedImage` class attached to the `AbstractStamp`
129 dataclass associated with the factory method.
132 metadata = readMetadata(filename, hdu=0)
133 nStamps = metadata[
"N_STAMPS"]
134 has_archive = metadata[
"HAS_ARCHIVE"]
136 archive_ids = metadata.getArray(
"ARCHIVE_IDS")
137 with Fits(filename,
"r")
as f:
138 nExtensions = f.countHdus()
143 if "bbox" in options.keys():
144 kwargs[
"bbox"] = options[
"bbox"]
146 elif "llcX" in options.keys():
147 llcX = options[
"llcX"]
148 llcY = options[
"llcY"]
149 width = options[
"width"]
150 height = options[
"height"]
152 kwargs[
"bbox"] = bbox
158 masked_image_cls =
None
159 for stamp_field
in fields(stamp_factory.__self__):
160 if issubclass(stamp_field.type, MaskedImage):
161 masked_image_cls = stamp_field.type
164 raise RuntimeError(
"Stamp factory does not use MaskedImage.")
165 default_dtype = np.dtype(masked_image_cls.dtype)
166 variance_dtype = np.dtype(np.float32)
170 for idx
in range(nExtensions - 1):
172 _metadata = copy.copy(metadata)
173 md = readMetadata(filename, hdu=idx + 1)
175 stamp_metadata.append(_metadata)
177 if md[
"XTENSION"] ==
"BINTABLE" and not (
"ZIMAGE" in md
and md[
"ZIMAGE"]):
178 if md[
"EXTNAME"] !=
"ARCHIVE_INDEX":
180 if md[
"EXTNAME"]
in (
"IMAGE",
"VARIANCE"):
182 if md[
"EXTNAME"] ==
"VARIANCE":
183 dtype = variance_dtype
185 dtype = default_dtype
186 elif md[
"EXTNAME"] ==
"MASK":
188 elif md[
"EXTNAME"] ==
"ARCHIVE_INDEX":
190 archive = InputArchive.readFits(f)
192 elif md[
"EXTTYPE"] ==
"ARCHIVE_DATA":
195 raise ValueError(f
"Unknown extension type: {md['EXTNAME']}")
196 stamp_parts.setdefault(md[
"EXTVER"], {})[md[
"EXTNAME"].lower()] = reader.read(dtype=dtype,
198 if len(stamp_parts) != nStamps:
200 f
"Number of stamps read ({len(stamp_parts)}) does not agree with the "
201 f
"number of stamps recorded in the metadata ({nStamps})."
205 for k
in range(nStamps):
207 maskedImage = masked_image_cls(**stamp_parts[k + 1])
208 archive_element = archive.get(archive_ids[k])
if has_archive
else None
209 stamps.append(stamp_factory(maskedImage, stamp_metadata[k], k, archive_element))
211 return stamps, metadata