Coverage for python / lsst / pipe / tasks / ssp / felis.py: 0%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:40 +0000

# Convert Felis YAML table definitions into NumPy structured dtypes and emit
# them as generated Python source (see main() below).
import numpy as np

from typing import Mapping, Any

from textwrap import wrap

import argparse
import sys
import yaml

# Default tables to process if none specified
DEFAULT_TABLES = ["SSObject", "SSSource", "mpc_orbits", "current_identifications", "numbered_identifications"]

11 

12 

13# ---------------------------------------------------------------------- 

14# Helper: timestamp precision → numpy time unit 

15# ---------------------------------------------------------------------- 

16def _timestamp_precision_to_unit(prec: int) -> str: 

17 """ 

18 Map a Felis timestamp precision to a numpy datetime64 unit. 

19 

20 prec = number of decimal places of seconds to retain. 

21 """ 

22 if prec <= 0: 

23 return "s" 

24 elif prec <= 2: 

25 return "ms" 

26 elif prec <= 5: 

27 return "us" 

28 else: 

29 return "ns" # max precision numpy supports 

30 

31 

32# ---------------------------------------------------------------------- 

33# Column → NumPy dtype 

34# ---------------------------------------------------------------------- 

35def _felis_column_to_numpy_dtype(col: Mapping[str, Any]) -> tuple[str, Any]: 

36 name = col["name"] 

37 

38 dt = col.get("datatype") 

39 if dt is None: 

40 raise ValueError(f"Column {name!r} has no datatype") 

41 

42 dt = dt.lower() 

43 

44 # ---------- numeric ------------------------------------------------------ 

45 if dt == "int8": 

46 return name, np.int8 

47 if dt in ("int16", "short"): 

48 return name, np.int16 

49 if dt in ("int32", "int"): 

50 return name, np.int32 

51 if dt in ("int64", "long", "bigint"): 

52 return name, np.int64 

53 

54 if dt == "uint8": 

55 return name, np.uint8 

56 if dt == "uint16": 

57 return name, np.uint16 

58 if dt == "uint32": 

59 return name, np.uint32 

60 if dt == "uint64": 

61 return name, np.uint64 

62 

63 if dt in ("float32", "float"): 

64 return name, np.float32 

65 if dt in ("float64", "double"): 

66 return name, np.float64 

67 

68 if dt in ("bool", "boolean"): 

69 return name, np.bool_ 

70 

71 # ---------- timestamps --------------------------------------------------- 

72 if dt == "timestamp": 

73 prec = col.get("precision", 0) 

74 if not isinstance(prec, int): 

75 raise ValueError(f"Timestamp field {name!r} has non-integer precision") 

76 unit = _timestamp_precision_to_unit(prec) 

77 return name, np.dtype(f"datetime64[{unit}]") 

78 

79 # ---------- fixed-size binary via length? -------------------------------- 

80 # If you want to support `datatype: binary` later, we can add that here. 

81 

82 # ---------- strings ------------------------------------------------------ 

83 if dt in ("string", "unicode", "str", "char"): 

84 L = col.get("length") 

85 if isinstance(L, int): 

86 return name, np.dtype(f"U{L}") 

87 return name, np.dtype("U") 

88 

89 # ---------- lists / arrays / unknown → object ---------------------------- 

90 return name, object 

91 

92 

# ----------------------------------------------------------------------
# Table → NumPy dtype with metadata
# ----------------------------------------------------------------------
def felis_table_to_numpy_dtype(table: Mapping[str, Any]) -> np.dtype:
    """
    Convert a Felis table definition (YAML → dict) into a NumPy dtype.

    The resulting dtype carries documentation in its metadata:

        dtype.metadata["description"] = table description
        dtype.metadata["columns"]     = {name: "[unit] description"}

    Raises ValueError when the definition lacks a 'columns' key.
    """
    columns = table.get("columns")
    if columns is None:
        raise ValueError("Table definition has no 'columns' key")

    # Per-field (name, dtype) specs.
    fields = [_felis_column_to_numpy_dtype(column) for column in columns]

    # Per-column descriptions, with the IVOA unit prefixed when present.
    colmeta = {}
    for column in columns:
        desc = column.get("description")
        unit = column.get("ivoa:unit")
        if unit is None:
            annotated = desc
        elif desc:
            annotated = f"[{unit}] {desc}"
        else:
            annotated = f"[{unit}]"
        if annotated is not None:
            colmeta[column["name"]] = annotated

    metadata = {}
    table_desc = table.get("description")
    if table_desc is not None:
        metadata["description"] = table_desc
    if colmeta:
        metadata["columns"] = colmeta

    # Attach metadata only when there is something to attach.
    return np.dtype(fields, metadata=metadata) if metadata else np.dtype(fields)

141 

142 

def pretty_print_dtype(
    dtype: np.dtype,
    table_name: str,
    target_comment_col: int = 36,
    max_line_length: int = 110,
) -> str:
    """
    Pretty-print a structured NumPy dtype (with Felis-derived metadata)
    as valid, readable Python code:

        # Wrapped table description...
        #
        <table_name>Dtype = np.dtype([
            ('field1', '<i8'),  # comment...
            ('very_long_field', ...  # comment juts, next long field aligned
                                     # to same jutter column...
        ])

    Parameters
    ----------
    dtype : np.dtype
        Structured dtype with metadata fields:
            metadata["description"] : table-level description (optional)
            metadata["columns"] : {col_name: per-column description}
    table_name : str
        Name used for the assignment, e.g. <table_name>Dtype.
    target_comment_col : int, default=36
        Preferred starting column for comments when the field fits before it.
        If the field text is longer than this, a "juttering group" alignment
        logic kicks in to prevent jagged right edges.
    max_line_length : int, default=110
        Maximum line length for wrapping table descriptions and comments.
        Lines will be wrapped to be strictly less than this length.

    Behavior
    --------
    * Table description is wrapped to < max_line_length chars, placed above
      dtype assignment, followed by a blank line.
    * Field comments:
        - If the field length <= target_comment_col - 1 → comment starts at
          target_comment_col, and the "juttering group" resets.
        - If the field length >= target_comment_col → the comment "juts out".
          + First such field sets the group's jutter column.
          + Next juttering fields use max(previous_jut_col, natural_jut_col).
          + This avoids jaggedness.
    * Final lines never exceed max_line_length - 1 chars.
    * Per-field comments wrap into at most 2 lines, with "..." if needed.
    * dtype.metadata is NOT emitted; only used for comments.

    Returns
    -------
    str
        Pretty Python code string.
    """
    if not isinstance(dtype, np.dtype) or not dtype.fields:
        raise TypeError("Expected a structured numpy.dtype with fields")

    md = dtype.metadata or {}
    table_desc = md.get("description")
    col_descs = md.get("columns", {})

    lines: list[str] = []

    # ---- Table description: wrap to < max_line_length chars -----------
    if table_desc:
        txt = f"{table_name}: {table_desc}"
        # width leaves room for the "# " prefix so total stays < max_line_length
        for w in wrap(txt, width=max_line_length - 3):
            lines.append(f"# {w}")

    # ---- Begin dtype assignment ---------------------------------------------
    lines.append(f"{table_name}Dtype = np.dtype([")

    # Build base field specs
    field_entries = []
    # NOTE(review): unpacking assumes dtype.fields values are 2-tuples
    # (dtype, offset); numpy yields 3-tuples when field titles are set —
    # presumably Felis-derived dtypes never carry titles, verify if reused.
    for name, (ftype, _) in dtype.fields.items():
        base = f"    ({name!r}, {ftype.str!r}),"
        comment = col_descs.get(name)
        if comment:
            comment = " ".join(str(comment).split())  # normalize whitespace
        field_entries.append((base, comment))

    last_jut_comment_col = None  # track juttering group alignment

    # ---- Process each field with smoothed jutter alignment ------------------
    for base, comment in field_entries:
        base_len = len(base)

        if not comment:
            lines.append(base)
            # Reset jutter group if this field doesn't jut
            if base_len <= target_comment_col - 1:
                last_jut_comment_col = None
            continue

        # Determine comment_col for this field
        if base_len <= target_comment_col - 1:
            # field fits → align to target column, reset jutter group
            comment_col = target_comment_col
            last_jut_comment_col = None
        else:
            # field juts
            natural_col = base_len + 1  # one space after field

            if last_jut_comment_col is None:
                # first jutter field
                comment_col = natural_col
                last_jut_comment_col = comment_col
            else:
                # subsequent jutter fields (apply smoothing)
                if natural_col <= last_jut_comment_col:
                    # would jut less; align with previous jutter column
                    comment_col = last_jut_comment_col
                else:
                    # juts further; update group
                    comment_col = natural_col
                    last_jut_comment_col = comment_col

        # Compute max allowed comment length for < max_line_length total chars
        max_comment_width = max(10, (max_line_length - 1) - (comment_col + 2))  # 2 for "# "

        # Wrap comment into segments (greedy word wrap)
        words = comment.split()
        segments = []
        cur = ""
        for w in words:
            if not cur:
                cur = w
            elif len(cur) + 1 + len(w) <= max_comment_width:
                cur += " " + w
            else:
                segments.append(cur)
                cur = w
        if cur:
            segments.append(cur)

        # Ellipsize to 2 lines max
        if len(segments) > 2:
            segments = segments[:2]
            # trim so the "..." suffix still fits within the width budget
            if len(segments[-1]) + 3 > max_comment_width:
                segments[-1] = segments[-1][: max_comment_width - 3].rstrip()
            segments[-1] += "..."

        # Emit first line: field spec padded out to the comment column
        pad = " " * (comment_col - base_len)
        lines.append(f"{base}{pad}# {segments[0]}")

        # Continuations: comment-only lines aligned to the same column
        cont_prefix = " " * comment_col + "# "
        for seg in segments[1:]:
            lines.append(f"{cont_prefix}{seg}")

    lines.append("])")
    return "\n".join(lines)

296 

297 

def main():
    """
    Command-line entry point: read a Felis YAML schema, convert the
    requested tables to NumPy dtypes, and print them as generated
    Python source on stdout.
    """
    cli = argparse.ArgumentParser(description="Generate NumPy dtypes from Felis YAML schema")
    cli.add_argument("felis_yaml_file", help="Path to the YAML schema file")
    cli.add_argument(
        "table_names",
        nargs="*",
        default=DEFAULT_TABLES,
        help=f"Names of tables to process (default: {', '.join(DEFAULT_TABLES)})",
    )
    opts = cli.parse_args()

    with open(opts.felis_yaml_file) as stream:
        schema = yaml.safe_load(stream)

    # Index table definitions by name for direct lookup.
    schemas_by_name = {tbl["name"]: tbl for tbl in schema["tables"]}

    # Emit the generated-file header.
    print("# ***** GENERATED FILE, DO NOT EDIT BY HAND *****")
    print("# ruff: noqa: W505")
    print(f"# generated with {' '.join(sys.argv)} # noqa: E501")
    print()
    print("import numpy as np")
    print()

    # One dtype block per requested table, blank-line separated.
    first = True
    for table in opts.table_names:
        if not first:
            print()
        first = False
        dtype = felis_table_to_numpy_dtype(schemas_by_name[table])
        print(pretty_print_dtype(dtype, table))


if __name__ == "__main__":
    main()