lsst.pipe.tasks gcf790cdeb6+e07a3617c0
Loading...
Searching...
No Matches
felis.py
Go to the documentation of this file.
import argparse
import sys
from textwrap import wrap
from typing import Mapping, Any

import numpy as np
import yaml
8
# Tables processed when no table names are given on the command line.
DEFAULT_TABLES = [
    "SSObject",
    "SSSource",
    "mpc_orbits",
    "current_identifications",
    "numbered_identifications",
]
11
12
# ----------------------------------------------------------------------
# Helper: timestamp precision → numpy time unit
# ----------------------------------------------------------------------
def _timestamp_precision_to_unit(prec: int) -> str:
    """
    Map a Felis timestamp precision to a numpy datetime64 unit.

    Parameters
    ----------
    prec : int
        Number of decimal places of seconds to retain.

    Returns
    -------
    str
        ``"s"`` when ``prec <= 0``, ``"ms"`` for 1-2, ``"us"`` for 3-5 and
        ``"ns"`` for anything larger. The chosen unit always carries at
        least ``prec`` decimal places for ``prec <= 9``.
    """
    if prec <= 0:
        return "s"
    if prec <= 2:
        return "ms"
    if prec <= 5:
        return "us"
    # Nanoseconds is the finest unit this mapping hands out. NumPy itself
    # offers finer units, but they are not used here.
    return "ns"
30
31
# ----------------------------------------------------------------------
# Column → NumPy dtype
# ----------------------------------------------------------------------
# Felis datatype name (lower-cased) -> NumPy scalar type, for every datatype
# that needs no per-column parameters.
_FELIS_SCALAR_DTYPES = {
    "int8": np.int8,
    "int16": np.int16,
    "short": np.int16,
    "int32": np.int32,
    "int": np.int32,
    "int64": np.int64,
    "long": np.int64,
    "bigint": np.int64,
    "uint8": np.uint8,
    "uint16": np.uint16,
    "uint32": np.uint32,
    "uint64": np.uint64,
    "float32": np.float32,
    "float": np.float32,
    "float64": np.float64,
    "double": np.float64,
    "bool": np.bool_,
    "boolean": np.bool_,
}


def _felis_column_to_numpy_dtype(col: Mapping[str, Any]) -> tuple[str, Any]:
    """
    Translate one Felis column definition into a ``(name, dtype)`` field spec.

    Parameters
    ----------
    col : Mapping[str, Any]
        Column definition. ``"name"`` and ``"datatype"`` are required;
        ``"precision"`` is read for timestamps and ``"length"`` for strings.

    Returns
    -------
    tuple[str, Any]
        The column name and the NumPy type for it. Datatypes that are not
        recognised (lists, arrays, anything else) map to ``object``.

    Raises
    ------
    ValueError
        If the column has no datatype, or a timestamp column has a
        non-integer precision.
    """
    name = col["name"]

    dt = col.get("datatype")
    if dt is None:
        raise ValueError(f"Column {name!r} has no datatype")

    # Datatype names are matched case-insensitively.
    dt = dt.lower()

    # ---------- numeric / boolean --------------------------------------------
    scalar = _FELIS_SCALAR_DTYPES.get(dt)
    if scalar is not None:
        return name, scalar

    # ---------- timestamps ---------------------------------------------------
    if dt == "timestamp":
        prec = col.get("precision", 0)
        if not isinstance(prec, int):
            raise ValueError(f"Timestamp field {name!r} has non-integer precision")
        unit = _timestamp_precision_to_unit(prec)
        return name, np.dtype(f"datetime64[{unit}]")

    # ---------- strings ------------------------------------------------------
    if dt in ("string", "unicode", "str", "char"):
        length = col.get("length")
        if isinstance(length, int):
            return name, np.dtype(f"U{length}")
        # NOTE(review): without a length this is the unsized unicode dtype;
        # confirm that is what callers want for unbounded string columns.
        return name, np.dtype("U")

    # ---------- lists / arrays / unknown -> object ---------------------------
    return name, object
91
92
# ----------------------------------------------------------------------
# Table → NumPy dtype with metadata
# ----------------------------------------------------------------------
def felis_table_to_numpy_dtype(table: Mapping[str, Any]) -> np.dtype:
    """
    Convert a Felis table definition (YAML → dict) into a NumPy dtype.

    Parameters
    ----------
    table : Mapping[str, Any]
        Table definition. ``"columns"`` is required; ``"description"`` is
        optional. Each column may carry ``"description"`` and ``"ivoa:unit"``.

    Returns
    -------
    np.dtype
        Structured dtype with one field per column. When there is anything
        to record, metadata is attached as:

        dtype.metadata["description"] = table description
        dtype.metadata["columns"] = {name: "[unit] description"}

        A column with only a unit is recorded as ``"[unit]"``, a column with
        only a description as the bare description, and a column with
        neither is left out of ``"columns"``.

    Raises
    ------
    ValueError
        If the table definition has no ``"columns"`` key.
    """
    cols = table.get("columns")
    if cols is None:
        raise ValueError("Table definition has no 'columns' key")

    # Field dtypes
    fields = [_felis_column_to_numpy_dtype(c) for c in cols]

    # Column metadata with optional unit prepended
    colmeta = {}
    for c in cols:
        desc = c.get("description")
        unit = c.get("ivoa:unit")

        if unit is not None:
            full = f"[{unit}] {desc}" if desc else f"[{unit}]"
        else:
            full = desc

        if full is not None:
            colmeta[c["name"]] = full

    metadata = {}
    table_desc = table.get("description")
    if table_desc is not None:
        metadata["description"] = table_desc
    if colmeta:
        metadata["columns"] = colmeta

    # Only pass metadata when there is some, so an undocumented table
    # yields a plain dtype.
    if metadata:
        return np.dtype(fields, metadata=metadata)
    return np.dtype(fields)
141
142
def _wrap_field_comment(comment: str, width: int) -> list[str]:
    """Greedily wrap ``comment`` to ``width`` characters, keeping at most two lines.

    Words are never split, so a single word longer than ``width`` produces an
    over-long segment. When more than two lines would be needed, the second
    line is cut back if necessary and terminated with ``"..."``.
    """
    segments: list[str] = []
    current = ""
    for word in comment.split():
        if not current:
            current = word
        elif len(current) + 1 + len(word) <= width:
            current += " " + word
        else:
            segments.append(current)
            current = word
    if current:
        segments.append(current)

    if len(segments) > 2:
        segments = segments[:2]
        # Make room for the ellipsis on the last kept line.
        if len(segments[-1]) + 3 > width:
            segments[-1] = segments[-1][: width - 3].rstrip()
        segments[-1] += "..."
    return segments


def pretty_print_dtype(
    dtype: np.dtype,
    table_name: str,
    target_comment_col: int = 36,
    max_line_length: int = 110,
) -> str:
    """
    Pretty-print a structured NumPy dtype (with Felis-derived metadata)
    as readable Python code:

        # <table_name>: wrapped table description...
        <table_name>Dtype = np.dtype([
         ('field1', '<i8'),       # comment...
         ('very_long_field', ...), # comment juts out; following long
                                   # fields align to the same column
        ])

    Parameters
    ----------
    dtype : np.dtype
        Structured dtype with metadata fields:
            metadata["description"] : table-level description (optional)
            metadata["columns"] : {col_name: per-column description}
    table_name : str
        Name used for the assignment, e.g. <table_name>Dtype.
    target_comment_col : int, default=36
        Preferred starting column for comments when the field fits before it.
        If the field text is longer than this, a "juttering group" alignment
        logic kicks in to prevent jagged right edges.
    max_line_length : int, default=110
        Maximum line length for wrapping table descriptions and comments.
        Lines are wrapped to be strictly shorter than this length.

    Behavior
    --------
    * The table description, prefixed with ``<table_name>: ``, is wrapped to
      < max_line_length chars and emitted as ``#`` comment lines directly
      above the dtype assignment.
    * Field comments:
        - If the field length <= target_comment_col - 1 → comment starts at
          target_comment_col, and the "juttering group" resets.
        - If the field length >= target_comment_col → the comment "juts out".
            + First such field sets the group's jutter column.
            + Next juttering fields use max(previous_jut_col, natural_jut_col).
            + This avoids jaggedness.
    * Comment lines stay below max_line_length as long as every word fits
      in the available comment width; the width never drops below 10 chars.
    * Per-field comments wrap into at most 2 lines, with "..." if needed.
    * dtype.metadata is NOT emitted; only used for comments.

    Returns
    -------
    str
        Pretty Python code string.

    Raises
    ------
    TypeError
        If ``dtype`` is not a structured ``numpy.dtype`` with fields.
    """
    if not isinstance(dtype, np.dtype) or not dtype.fields:
        raise TypeError("Expected a structured numpy.dtype with fields")

    md = dtype.metadata or {}
    table_desc = md.get("description")
    col_descs = md.get("columns", {})

    lines: list[str] = []

    # ---- Table description: wrap to < max_line_length chars -----------------
    if table_desc:
        txt = f"{table_name}: {table_desc}"
        # 3 = the 2 characters of "# " plus 1 to stay strictly below the limit.
        lines.extend(f"# {w}" for w in wrap(txt, width=max_line_length - 3))

    # ---- Begin dtype assignment ---------------------------------------------
    lines.append(f"{table_name}Dtype = np.dtype([")

    last_jut_comment_col = None  # comment column of the current jutter group

    # Walk dtype.names rather than dtype.fields.items(): the values in
    # dtype.fields can be 3-tuples when a field has a title.
    for name in dtype.names:
        ftype = dtype.fields[name][0]
        # NOTE(review): the leading indent inside this literal may have been
        # collapsed when the listing was extracted; confirm the intended width.
        base = f" ({name!r}, {ftype.str!r}),"
        base_len = len(base)

        comment = col_descs.get(name)
        if comment:
            comment = " ".join(str(comment).split())  # normalize whitespace

        fits = base_len <= target_comment_col - 1
        if fits:
            # Any field that fits ends the current jutter group.
            last_jut_comment_col = None

        if not comment:
            lines.append(base)
            continue

        if fits:
            comment_col = target_comment_col
        else:
            natural_col = base_len + 1  # one space after field
            # The group column only ever grows, which keeps consecutive
            # long fields aligned with each other.
            if last_jut_comment_col is None or natural_col > last_jut_comment_col:
                last_jut_comment_col = natural_col
            comment_col = last_jut_comment_col

        # Room left for comment text so the whole line is < max_line_length.
        max_comment_width = max(10, (max_line_length - 1) - (comment_col + 2))  # 2 for "# "

        segments = _wrap_field_comment(comment, max_comment_width)

        # First line carries the field itself; continuations are comment-only.
        pad = " " * (comment_col - base_len)
        lines.append(f"{base}{pad}# {segments[0]}")

        cont_prefix = " " * comment_col + "# "
        lines.extend(f"{cont_prefix}{seg}" for seg in segments[1:])

    lines.append("])")
    return "\n".join(lines)
296
297
def main():
    """Command-line entry point: print NumPy dtype definitions for Felis tables.

    Reads the YAML schema named on the command line, converts each requested
    table (``DEFAULT_TABLES`` when none are named) to a structured dtype and
    prints the resulting Python source to standard output.

    A table name that is not present in the schema is reported through
    ``argparse`` before anything is printed, so no partial output is written.
    """
    parser = argparse.ArgumentParser(description="Generate NumPy dtypes from Felis YAML schema")
    parser.add_argument("felis_yaml_file", help="Path to the YAML schema file")
    parser.add_argument(
        "table_names",
        nargs="*",
        default=DEFAULT_TABLES,
        help=(f"Names of tables to process (default: {', '.join(DEFAULT_TABLES)})"),
    )
    args = parser.parse_args()

    # Decode explicitly so the result does not depend on the locale encoding.
    with open(args.felis_yaml_file, encoding="utf-8") as fp:
        schema = yaml.safe_load(fp)

    table_schemas = {t["name"]: t for t in schema["tables"]}

    # Fail up front with a usage error rather than a KeyError mid-output.
    missing = [name for name in args.table_names if name not in table_schemas]
    if missing:
        parser.error(f"table(s) not found in {args.felis_yaml_file}: {', '.join(missing)}")

    # Print header
    print("# ***** GENERATED FILE, DO NOT EDIT BY HAND *****")
    print("# ruff: noqa: W505")
    print(f"# generated with {' '.join(sys.argv)} # noqa: E501")
    print()
    print("import numpy as np")
    print()

    last = len(args.table_names) - 1
    for i, table in enumerate(args.table_names):
        dtype = felis_table_to_numpy_dtype(table_schemas[table])
        print(pretty_print_dtype(dtype, table))
        if i < last:
            print()  # blank line between tables, none after the final one
327
328
# Run the generator only when executed as a script, not on import.
if __name__ == "__main__":
    main()
tuple[str, Any] _felis_column_to_numpy_dtype(Mapping[str, Any] col)
Definition felis.py:35
str _timestamp_precision_to_unit(int prec)
Definition felis.py:16
str pretty_print_dtype(np.dtype dtype, str table_name, int target_comment_col=36, int max_line_length=110)
Definition felis.py:148
np.dtype felis_table_to_numpy_dtype(Mapping[str, Any] table)
Definition felis.py:96