Coverage for python / lsst / obs / lsst / _packer.py: 21%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:09 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("RubinDimensionPacker",) 

25 

26import datetime 

27import math 

28 

29from lsst.daf.butler import DataCoordinate, DimensionPacker 

30from lsst.pex.config import Config, Field, DictField 

31from lsst.pipe.base import observation_packer_registry 

32from .translators.lsst import CONTROLLERS, EXPOSURE_ID_MAXDIGITS, LsstBaseTranslator 

33 

34 

def convert_day_obs_to_ordinal(day_obs: int) -> int:
    """Convert a YYYYMMDD decimal-digit integer date to a day ordinal.

    Parameters
    ----------
    day_obs : `int`
        A YYYYMMDD decimal-digit integer.

    Returns
    -------
    day_ordinal : `int`
        An integer that counts days directly, with absolute offset
        unspecified.

    Raises
    ------
    ValueError
        Raised by `datetime.date` if the decoded year, month, and day do not
        form a valid calendar date.
    """
    # Peel the two-digit fields off the right-hand side: first DD, then MM,
    # leaving the year as whatever remains.
    year_month, day = divmod(day_obs, 100)
    year, month = divmod(year_month, 100)
    return datetime.date(year, month, day).toordinal()

52 

53 

def convert_ordinal_to_day_obs(day_ordinal: int) -> int:
    """Convert a day ordinal to a YYYYMMDD decimal-digit integer date.

    Parameters
    ----------
    day_ordinal : `int`
        An integer that counts days directly, with absolute offset
        unspecified.

    Returns
    -------
    day_obs : `int`
        A YYYYMMDD decimal-digit integer.

    Raises
    ------
    ValueError
        Raised by `datetime.date.fromordinal` if ``day_ordinal`` is outside
        the range of dates that `datetime.date` can represent.
    """
    day = datetime.date.fromordinal(day_ordinal)
    # Shift year and month left by two decimal digits each to build YYYYMMDD.
    return (day.year * 100 + day.month) * 100 + day.day

70 

71 

def _is_positive(x: int) -> bool:
    """Return `True` if ``x`` is strictly greater than zero.

    Intended for use as a config ``check`` argument.
    """
    return x > 0

76 

77 

class RubinDimensionPackerConfig(Config):
    """Configuration for `RubinDimensionPacker`.

    Each ``n_*`` field reserves a range of values for one component of the
    packed integer; the packer multiplies these sizes together to determine
    how many bits a packed ID needs.  Changing any of them changes the
    packed values, so the same configuration must be used for packing and
    unpacking.
    """

    controllers = DictField(
        "Mapping from controller code to integer.",
        keytype=str,
        itemtype=int,
        # Default is to assume all data we're using a dimension packer on is
        # OCS, since the main use of the packer is for source/object IDs.
        default={"O": 0},
    )

    n_controllers = Field(
        "Reserved number of controller codes.  May be larger than `len(controllers)`.",
        dtype=int,
        check=_is_positive,
        default=1,
    )

    n_visit_definitions = Field(
        "Reserved number of visit definitions a single exposure may belong to.",
        dtype=int,
        check=_is_positive,
        default=2,
        # We need one bit for one-to-one visits that contain only the first
        # exposure in a sequence that was originally observed as a multi-snap
        # sequence.
    )

    n_days = Field(
        "Reserved number of distinct valid-date day_obs values, starting from `day_obs_begin`.",
        dtype=int,
        check=_is_positive,
        default=16384,
        # Default of 16384 is about 45 years, which with day_obs_begin is
        # roughly consistent with (and a bit bigger than) the bounds
        # permitted by the translator.
    )

    n_seq_nums = Field(
        "Reserved number of seq_num values, starting from 0.",
        dtype=int,
        check=_is_positive,
        default=32768,
        # Default is one exposure every 2.63s for a full day, which is really
        # close to the hardware limit of one every 2.3s, and far from what
        # anyone would actually do in practice.
    )

    n_detectors = Field(
        "Reserved number of detectors, starting from 0.",
        dtype=int,
        check=_is_positive,
        default=256,
        # Default is the number of actual detectors (201, including corner
        # rafts) rounded up to a power of 2.
    )

    day_obs_begin = Field(
        "Inclusive lower bound on day_obs.",
        dtype=int,
        default=20100101,
        # Default is just a nice round date that (with n_days) puts the end
        # point just after the 2050 bound in the translators.
    )

    def use_controllers(self) -> None:
        """Configure this packer to include all known controllers, instead
        of eliminating that field to save bit space.

        This still does not make the packing of controller, day_obs, and
        seq_num here the same as what is done in the translator class
        calculations of exposure_id, because that translator calculation is
        day_obs dependent.
        """
        self.controllers = {c: i for i, c in enumerate(CONTROLLERS)}
        # Fixed reservation rather than len(CONTROLLERS): changing it would
        # change every packed value.  `validate` reports the problem if the
        # list of known controllers ever outgrows it.
        self.n_controllers = 8

    def validate(self) -> None:
        """Check that every controller index fits in the reserved range.

        Raises
        ------
        ValueError
            Raised if any value in ``controllers`` is negative or is not
            less than ``n_controllers``.
        """
        super().validate()
        for c, i in self.controllers.items():
            # A negative index is as damaging to the packing as one that is
            # too large, so reject both ends of the range.
            if not 0 <= i < self.n_controllers:
                raise ValueError(
                    f"Controller code {c!r} has index {i}, which is out of bounds "
                    f"for n_controllers={self.n_controllers}."
                )

163 

164 

class RubinDimensionPacker(DimensionPacker):
    """A data ID packer that converts Rubin visit+detector and
    exposure+detector data IDs to integers.

    Parameters
    ----------
    data_id : `lsst.daf.butler.DataCoordinate`
        Data ID identifying at least the instrument dimension.  Does not need
        to have dimension records attached.
    config : `RubinDimensionPackerConfig`, optional
        Configuration for this dimension packer.
    is_exposure : `bool`, optional
        If `False`, construct a packer for visit+detector data IDs.  If `True`,
        construct a packer for exposure+detector data IDs.  If `None`, this is
        determined based on whether ``visit`` or ``exposure`` is present in
        ``data_id``, with ``visit`` checked first and hence used if both are
        present.

    Raises
    ------
    ValueError
        Raised if ``is_exposure`` is `None` and ``data_id`` has neither a
        ``visit`` nor an ``exposure`` dimension.

    Notes
    -----
    The packing used by this class is considered stable and part of its public
    interface so it can be reimplemented in contexts where delegation to this
    code is impractical (e.g. SQL user-defined functions)::

        packed = (
            detector + config.n_detectors * (
                seq_num + config.n_seq_nums * (
                    convert_day_obs_to_ordinal(day_obs)
                    - convert_day_obs_to_ordinal(config.day_obs_begin)
                    + config.n_days * (
                        config.controllers[controller]
                        + config.n_controllers * is_one_to_one_reinterpretation
                    )
                )
            )
        )

    See `RubinDimensionPackerConfig` and `pack_decomposition` for definitions
    of the above variables.
    """

    ConfigClass = RubinDimensionPackerConfig

    def __init__(
        self,
        data_id: DataCoordinate,
        *,
        config: RubinDimensionPackerConfig | None = None,
        is_exposure: bool | None = None,
    ):
        if config is None:
            config = RubinDimensionPackerConfig()
        fixed = data_id.subset(data_id.universe.conform(["instrument"]))
        if is_exposure is None:
            # NOTE(review): `DataCoordinate.graph` is believed to be
            # deprecated in favor of `DataCoordinate.dimensions` in recent
            # daf_butler releases - confirm and migrate.
            if "visit" in data_id.graph.names:
                is_exposure = False
            elif "exposure" in data_id.graph.names:
                is_exposure = True
            else:
                raise ValueError(
                    "'is_exposure' was not provided and 'data_id' has no visit or exposure value."
                )
        if is_exposure:
            dimensions = fixed.universe.conform(["instrument", "exposure", "detector"])
        else:
            dimensions = fixed.universe.conform(["instrument", "visit", "detector"])
        super().__init__(fixed, dimensions)
        self.config = config
        self.is_exposure = is_exposure
        # The largest packed value is one less than the product of all of the
        # reserved field sizes; its bit length is the number of bits needed.
        self._max_bits = (
            math.prod(
                [
                    self.config.n_visit_definitions,
                    self.config.n_controllers,
                    self.config.n_days,
                    self.config.n_seq_nums,
                    self.config.n_detectors,
                ]
            )
            - 1
        ).bit_length()

    @property
    def maxBits(self) -> int:
        # Docstring inherited from DimensionPacker.maxBits
        return self._max_bits

    def _pack(self, dataId: DataCoordinate) -> int:
        # Docstring inherited from DimensionPacker._pack
        is_one_to_one_reinterpretation = False
        if not self.is_exposure:
            # Using a leading "9" as the indicator of a
            # one_to_one_reinterpretation visit is _slightly_ distasteful, as
            # it'd be better to delegate that to something in obs_base closer
            # to what puts the "9" there in the first place, but that class
            # doesn't have its own public interface where we could put such
            # things, and we don't have much choice but to assume which of the
            # visit system definitions we're using anyway.  Good news is that
            # this is all very strictly RFC-controlled stable stuff that is
            # not going to change out from under us without warning.
            nine_if_special, exposure_id = divmod(
                dataId["visit"], 10**EXPOSURE_ID_MAXDIGITS
            )
            if nine_if_special == 9:
                is_one_to_one_reinterpretation = True
            elif nine_if_special != 0:
                raise ValueError(f"Could not parse visit in {dataId}.")
        else:
            exposure_id = dataId["exposure"]
        # We unpack the exposure ID (which may really be [the remnant of] a
        # visit ID) instead of extracting these values from dimension records
        # because we really don't want to demand that the given data ID have
        # records attached.
        return self.pack_id_pair(
            exposure_id,
            dataId["detector"],
            is_one_to_one_reinterpretation,
            config=self.config,
        )

    def unpack(self, packedId: int) -> DataCoordinate:
        # Docstring inherited from DimensionPacker.unpack
        (
            exposure_id,
            detector,
            is_one_to_one_reinterpretation,
        ) = self.unpack_id_pair(packedId, config=self.config)
        if self.is_exposure:
            if is_one_to_one_reinterpretation:
                raise ValueError(
                    f"Packed data ID {packedId} may correspond to a valid visit ID, "
                    "but not a valid exposure ID."
                )
            return DataCoordinate.standardize(
                self.fixed, exposure=exposure_id, detector=detector
            )
        if is_one_to_one_reinterpretation:
            # Exact arithmetic inverse of the divmod in `_pack`.  Prepending
            # the character "9" is only equivalent when the exposure ID has
            # exactly EXPOSURE_ID_MAXDIGITS digits.
            visit_id = 9 * 10**EXPOSURE_ID_MAXDIGITS + exposure_id
        else:
            visit_id = exposure_id
        return DataCoordinate.standardize(
            self.fixed, visit=visit_id, detector=detector
        )

    @staticmethod
    def pack_id_pair(
        exposure_id: int,
        detector: int,
        is_one_to_one_reinterpretation: bool = False,
        config: RubinDimensionPackerConfig | None = None,
    ) -> int:
        """Pack data ID values passed as arguments.

        Parameters
        ----------
        exposure_id : `int`
            Integer that uniquely identifies an exposure.
        detector : `int`
            Integer that uniquely identifies a detector.
        is_one_to_one_reinterpretation : `bool`, optional
            If `True`, instead of packing the given ``exposure_id``, pack a
            visit ID that represents the alternate interpretation of that
            exposure (which must be the first snap in a multi-snap sequence) as
            a standalone visit.
        config : `RubinDimensionPackerConfig`, optional
            Configuration, including upper bounds on all arguments.  If
            `None`, a default-constructed configuration is used.

        Returns
        -------
        packed_id : `int`
            Integer that reversibly combines all of the given arguments.

        Raises
        ------
        ValueError
            Raised by `pack_decomposition` if any decomposed value is out of
            the bounds set by ``config``.

        Notes
        -----
        This is a `staticmethod` and hence does not respect the config passed
        in at construction when called on an instance.  This is to support
        usage in contexts where construction (which requires a
        `lsst.daf.butler.DimensionUniverse`) is inconvenient or impossible.
        """
        day_obs, seq_num, controller = LsstBaseTranslator.unpack_exposure_id(
            exposure_id
        )
        return RubinDimensionPacker.pack_decomposition(
            int(day_obs),
            seq_num,
            detector=detector,
            controller=controller,
            is_one_to_one_reinterpretation=is_one_to_one_reinterpretation,
            config=config,
        )

    @staticmethod
    def unpack_id_pair(
        packed_id: int, config: RubinDimensionPackerConfig | None = None
    ) -> tuple[int, int, bool]:
        """Unpack data ID values directly.

        Parameters
        ----------
        packed_id : `int`
            Integer produced by one of the methods of this class using the same
            configuration.
        config : `RubinDimensionPackerConfig`, optional
            Configuration, including upper bounds on all arguments.  If
            `None`, a default-constructed configuration is used.

        Returns
        -------
        exposure_id : `int`
            Integer that uniquely identifies an exposure.
        detector : `int`
            Integer that uniquely identifies a detector.
        is_one_to_one_reinterpretation : `bool`
            If `True`, the packed ID corresponds to the visit that represents
            the alternate interpretation of the first snap in a multi-snap
            sequence as a standalone visit, rather than to ``exposure_id``
            itself.

        Raises
        ------
        ValueError
            Raised by `unpack_decomposition` if ``packed_id`` cannot be
            decoded with ``config``.

        Notes
        -----
        This is a `staticmethod` and hence does not respect the config passed
        in at construction when called on an instance.  This is to support
        usage in contexts where construction (which requires a
        `lsst.daf.butler.DimensionUniverse`) is inconvenient or impossible.
        """
        (
            day_obs,
            seq_num,
            detector,
            controller,
            is_one_to_one_reinterpretation,
        ) = RubinDimensionPacker.unpack_decomposition(packed_id, config=config)
        return (
            LsstBaseTranslator.compute_exposure_id(str(day_obs), seq_num, controller),
            detector,
            is_one_to_one_reinterpretation,
        )

    @staticmethod
    def pack_decomposition(
        day_obs: int,
        seq_num: int,
        detector: int,
        controller: str = "O",
        is_one_to_one_reinterpretation: bool = False,
        config: RubinDimensionPackerConfig | None = None,
    ) -> int:
        """Pack Rubin-specific identifiers directly into an integer.

        Parameters
        ----------
        day_obs : `int`
            Day of observation as a YYYYMMDD decimal integer.
        seq_num : `int`
            Sequence number.
        detector : `int`
            Detector ID.
        controller : `str`, optional
            Single-character controller code defined in
            `RubinDimensionPackerConfig.controllers`.
        is_one_to_one_reinterpretation : `bool`, optional
            If `True`, this is a visit ID that differs from the exposure ID of
            its first snap because it is the alternate interpretation of that
            first snap as a standalone visit.
        config : `RubinDimensionPackerConfig`, optional
            Configuration, including upper bounds on all arguments.  If
            `None`, a default-constructed configuration is used.

        Returns
        -------
        packed_id : `int`
            Integer that reversibly combines all of the given arguments.

        Raises
        ------
        ValueError
            Raised if ``controller`` is not a key of ``config.controllers``
            or if any other argument falls outside the range reserved for it
            by ``config``.

        Notes
        -----
        This is a `staticmethod` and hence does not respect the config passed
        in at construction when called on an instance.  This is to support
        usage in contexts where construction (which requires a
        `lsst.daf.butler.DimensionUniverse`) is inconvenient or impossible.
        """
        if config is None:
            config = RubinDimensionPackerConfig()
        day_obs_ordinal_begin = convert_day_obs_to_ordinal(config.day_obs_begin)

        # Fields are accumulated from most significant to least significant:
        # multiply by the size of the next field, then add its value.
        result = int(is_one_to_one_reinterpretation)
        if result >= config.n_visit_definitions:
            # Only reachable when n_visit_definitions == 1; without this
            # check the flag would land outside the reserved range and the
            # result could not be unpacked.
            raise ValueError(
                "is_one_to_one_reinterpretation=True is out of bounds for "
                f"n_visit_definitions={config.n_visit_definitions}."
            )

        result *= config.n_controllers
        try:
            result += config.controllers[controller]
        except KeyError:
            raise ValueError(f"Unrecognized controller code {controller!r}.") from None

        day_obs_ordinal = convert_day_obs_to_ordinal(day_obs) - day_obs_ordinal_begin
        if day_obs_ordinal < 0:
            raise ValueError(
                f"day_obs {day_obs} is out of bounds; must be >= "
                f"{convert_ordinal_to_day_obs(day_obs_ordinal_begin)}."
            )
        # Valid offsets are 0 .. n_days - 1; an offset equal to n_days would
        # carry into the controller field.
        if day_obs_ordinal >= config.n_days:
            raise ValueError(
                f"day_obs {day_obs} is out of bounds; must be < "
                f"{convert_ordinal_to_day_obs(day_obs_ordinal_begin + config.n_days)}."
            )
        result *= config.n_days
        result += day_obs_ordinal

        if seq_num < 0:
            raise ValueError(f"seq_num {seq_num} is negative.")
        if seq_num >= config.n_seq_nums:
            raise ValueError(
                f"seq_num is out of bounds; must be < {config.n_seq_nums}."
            )
        result *= config.n_seq_nums
        result += seq_num

        if detector < 0:
            raise ValueError(f"detector {detector} is out of bounds; must be >= 0.")
        if detector >= config.n_detectors:
            raise ValueError(
                f"detector {detector} is out of bounds; must be < {config.n_detectors}."
            )
        result *= config.n_detectors
        result += detector
        return result

    @staticmethod
    def unpack_decomposition(
        packed_id: int, config: RubinDimensionPackerConfig | None = None
    ) -> tuple[int, int, int, str, bool]:
        """Unpack an integer into Rubin-specific identifiers.

        Parameters
        ----------
        packed_id : `int`
            Integer produced by one of the methods of this class using the same
            configuration.
        config : `RubinDimensionPackerConfig`, optional
            Configuration, including upper bounds on all arguments.  If
            `None`, a default-constructed configuration is used.

        Returns
        -------
        day_obs : `int`
            Day of observation as a YYYYMMDD decimal integer.
        seq_num : `int`
            Sequence number.
        detector : `int`
            Detector ID.
        controller : `str`
            Single-character controller code defined in
            `RubinDimensionPackerConfig.controllers`.
        is_one_to_one_reinterpretation : `bool`
            If `True`, this is a visit ID that differs from the exposure ID of
            its first snap because it is the alternate interpretation of that
            first snap as a standalone visit.

        Raises
        ------
        ValueError
            Raised if ``packed_id`` has content left over after all fields
            are extracted, or if the controller index it holds does not
            appear in ``config.controllers``.

        Notes
        -----
        This is a `staticmethod` and hence does not respect the config passed
        in at construction when called on an instance.  This is to support
        usage in contexts where construction (which requires a
        `lsst.daf.butler.DimensionUniverse`) is inconvenient or impossible.
        """
        if config is None:
            config = RubinDimensionPackerConfig()
        # Strip fields off in the reverse of the order used for packing:
        # least significant first.
        rest, detector = divmod(packed_id, config.n_detectors)
        rest, seq_num = divmod(rest, config.n_seq_nums)
        rest, day_obs_ordinal = divmod(rest, config.n_days)
        rest, controller_int = divmod(rest, config.n_controllers)
        rest, is_one_to_one_reinterpretation_int = divmod(
            rest, config.n_visit_definitions
        )
        if rest:
            raise ValueError(
                f"Unexpected overall factor {rest} in packed data ID {packed_id}."
            )
        # Reverse lookup of the controller code; the for/else raises when no
        # configured code maps to the extracted index.
        for controller_code, index in config.controllers.items():
            if index == controller_int:
                break
        else:
            raise ValueError(
                f"Unrecognized controller index {controller_int} in packed data ID {packed_id}."
            )
        day_obs_ordinal_begin = convert_day_obs_to_ordinal(config.day_obs_begin)
        return (
            convert_ordinal_to_day_obs(day_obs_ordinal + day_obs_ordinal_begin),
            seq_num,
            detector,
            controller_code,
            bool(is_one_to_one_reinterpretation_int),
        )

543 

544 

# The double-registration guard here would be unnecessary if not for
# pytest-flake8 and some horribleness it must be doing to circumvent Python's
# own guards against importing the same module twice in the same process.
if "rubin" not in observation_packer_registry:
    observation_packer_registry.register("rubin", RubinDimensionPacker)