Coverage for python / lsst / pipe / tasks / associationUtils.py: 9%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:38 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Utilities for interfacing with hpgeom. Originally implemented in
23http://github.com/LSSTDESC/dia_pipe and then translated to hpgeom.
24"""
26__all__ = ["toIndex", "toRaDec", "eq2xyz", "eq2xyzVec", "convert_spherical",
27 "convert_spherical_array", "query_disc", "obj_id_to_ss_object_id", "ss_object_id_to_obj_id"]
29from astropy.time import Time
30import hpgeom as hpg
31import numpy as np
32import numpy.typing as npt
35def toIndex(nside, ra, dec):
36 """Return healpix index given ra, dec in degrees
38 Parameters
39 ----------
40 nside : `int`
41 Power of 2 nside healpix resolution.
42 ra : `float`
43 RA in degrees.
44 dec : `float`
45 Declination in degrees
47 Returns
48 -------
49 index : `int`
50 Unique healpix pixel ID containing point RA, DEC at resolution nside.
51 """
52 return hpg.angle_to_pixel(nside, ra, dec, nest=False)
55def toRaDec(nside, index):
56 """Convert from healpix index to ra,dec in degrees
58 Parameters
59 ----------
60 nside : `int`
61 Resolution of healpixel "grid".
62 index : `int`
63 Index of the healpix pixel we want to find the location of.
65 Returns
66 -------
67 pos : `numpy.ndarray`, (2,)
68 RA and DEC of healpix pixel location in degrees.
69 """
70 ra, dec = hpg.pixel_to_angle(nside, index, nest=False)
71 return np.dstack((ra, dec))[0]
74def eq2xyz(ra, dec):
75 """Convert from equatorial ra,dec in degrees to x,y,z on unit sphere.
77 Parameters
78 ----------
79 ra : `float`
80 RA in degrees.
81 dec : `float`
82 Declination in degrees
84 Returns
85 -------
86 xyz : `numpy.ndarray`, (3,)
87 Float xyz positions on the unit sphere.
88 """
89 phi = np.deg2rad(ra)
90 theta = np.pi/2 - np.deg2rad(dec)
91 sintheta = np.sin(theta)
92 x = sintheta*np.cos(phi)
93 y = sintheta*np.sin(phi)
94 z = np.cos(theta)
95 return np.array([x, y, z])
98def eq2xyzVec(ra, dec):
99 """Convert equatorial ra,dec in degrees to x,y,z on the unit sphere
100 parameters
102 Vectorized version of ``eq2xyz``
104 Parameters
105 ----------
106 ra : array_like, (N,)
107 Array of RA in degrees.
108 dec : array_like, (N,)
109 Declination in degrees
111 Returns
112 -------
113 vec : `numpy.ndarray`, (N,3)
114 Array of unitsphere 3-vectors.
115 """
116 ra = np.array(ra, dtype='f8', ndmin=1, copy=False)
117 dec = np.array(dec, dtype='f8', ndmin=1, copy=False)
118 if ra.size != dec.size:
119 raise ValueError("ra,dec not same size: %s,%s" % (ra.size, dec.size))
121 vec = eq2xyz(ra, dec)
123 return vec
126def convert_spherical(ra, dec):
127 """Convert from ra,dec to spherical coordinates.
129 Used in query_disc.
131 Parameters
132 ----------
133 ra : `float`
134 RA in radians.
135 dec : `float`
136 Declination in radians
137 """
138 return np.dstack([np.cos(dec*np.pi/180)*np.cos(ra*np.pi/180),
139 np.cos(dec*np.pi/180)*np.sin(ra*np.pi/180),
140 np.sin(dec*np.pi/180)])[0]
143def convert_spherical_array(array):
144 """Convert from and a array ra,dec to spherical coordinates.
146 Used in query_disc
148 Parameters
149 ----------
150 array : `numpy.ndarray`, (N, 2)
151 (N, 2) Array of RA, DEC values.
153 Returns
154 -------
155 vecs : `numpy.ndarray`, (N, 3)
156 Vectors on the unit sphere
157 """
158 ra = array[:, 0]
159 dec = array[:, 1]
160 return convert_spherical(ra, dec)
163def query_disc(nside, ra, dec, max_rad, min_rad=0):
164 """Get the list of healpix indices within max_rad, min_rad given in radians
165 around ra,dec given in degrees
167 Parameters
168 ----------
169 nside : `int`
170 Resolution of the healpixels to search/return.
171 ra : `float`
172 RA in degrees.
173 dec : `float`
174 Declination in degrees
175 max_rad : `float`
176 Max distance in radians to search nearby healpixels.
177 min_rad : `float`, optional
178 Minimum distance in radians to search healpixels. Default = 0.
179 """
180 ra = np.atleast_1d(ra)
181 dec = np.atleast_1d(dec)
183 max_rad_deg = np.rad2deg(max_rad)
185 pixels = np.unique(
186 [hpg.query_circle(nside, a, b, max_rad_deg, nest=False)
187 for (a, b) in zip(ra, dec)])
189 if min_rad > 0 and len(pixels) > 0:
190 vec0 = convert_spherical(ra, dec)
191 min_rad2 = min_rad**2
192 vecs = convert_spherical_array(toRaDec(nside, pixels))
193 dsq = np.sum((vecs - vec0)**2, axis=1)
194 match = dsq > min_rad2
195 pixels = pixels[match]
197 return pixels
200def obj_id_to_ss_object_id(objID):
201 if ' ' in objID:
202 objID = pack_mpc_designation(objID)
203 return packed_obj_id_to_ss_object_id(objID)
206def packed_obj_id_to_ss_object_id(objID):
207 """Convert from Minor Planet Center packed provisional object ID to
208 Rubin ssObjectID.
210 Parameters
211 ----------
212 objID : `str`
213 Minor Planet Center packed provisional designation for a small solar
214 system object. Must be fewer than eight characters.
216 Returns
217 -------
218 ssObjectID : `int`
219 Rubin ssObjectID
221 Raises
222 ------
223 ValueError
224 Raised if either objID is shorter than 7 or longer than 8 characters or contains
225 illegal objID characters
226 """
227 if len(objID) > 8:
228 raise ValueError(f'objID longer than 8 characters: "{objID}"')
229 if len(objID) < 7:
230 raise ValueError(f'objID shorter than 7 characters: "{objID}"')
231 if any([ord(c) > 255 for c in objID]):
232 raise ValueError(f'{[c for c in objID if ord(c) > 255]} not legal objID characters (ascii [1, 255])')
234 ssObjectID = ord(objID[0])
235 for character in objID[1:]:
236 ssObjectID <<= 8
237 ssObjectID += ord(character)
238 return ssObjectID
241def ss_object_id_to_obj_id(ssObjectID, packed=False):
242 """Convert from Rubin ssObjectID to Minor Planet Center packed provisional
243 object ID.
245 Parameters
246 ----------
247 ssObjectID : `int`
248 Rubin ssObjectID
250 Returns
251 -------
252 objID : `str`
253 Minor Planet Center packed provisional designation.
255 Raises
256 ------
257 """
258 if ssObjectID < 0 or ssObjectID >= (1 << 64):
259 raise ValueError(f'ssObjectID ({ssObjectID}) outside [0, 2^64 - 1].')
261 objID = ''.join([chr((ssObjectID >> (8 * i)) % 256) for i in reversed(range(0, 8))])
262 objID = objID.replace('\x00', '')
263 if packed:
264 return objID
265 else:
266 return unpack_mpc_designation(objID)
268# All the below designation-related code are copied from B612's adam_core
269# adam_core should eventually be added as an external dependency, and this
270# should be replaced with imports on DM-53907
273def pack_mpc_designation(designation: str) -> str:
274 """
275 B612 code to pack a unpacked MPC designation. For example,
276 provisional designation 1998 SS162 will be packed to J98SG2S.
277 Permanent designation 323 will be packed to 00323.
279 TODO: add support for comet and natural satellite designations
281 Parameters
282 ----------
283 designation : str
284 MPC unpacked designation.
286 Returns
287 -------
288 designation_pf : str
289 MPC packed form designation.
291 Raises
292 ------
293 ValueError : If designation cannot be packed.
294 """
295 # Lets see if its a numbered object
296 try:
297 return pack_numbered_designation(designation)
298 except ValueError:
299 pass
301 # If its not numbered, maybe its a provisional designation
302 try:
303 return pack_provisional_designation(designation)
304 except ValueError:
305 pass
307 # If its a survey designation, deal with it
308 try:
309 return pack_survey_designation(designation)
310 except ValueError:
311 pass
313 err = (
314 "Unpacked designation '{}' could not be packed.\n"
315 "It could not be recognized as any of the following:\n"
316 " - a numbered object (e.g. '3202', '203289', '3140113')\n"
317 " - a provisional designation (e.g. '1998 SV127', '2008 AA360')\n"
318 " - a survey designation (e.g. '2040 P-L', '3138 T-1')"
319 )
320 raise ValueError(err.format(designation))
323BASE62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
324BASE62_MAP = {BASE62[i]: i for i in range(len(BASE62))}
327def _unpack_mpc_date(epoch_pf: str) -> Time:
328 # Taken from Lynne Jones' SSO TOOLS.
329 # See https://minorplanetcenter.net/iau/info/PackedDates.html
330 # for MPC documentation on packed dates.
331 # Examples:
332 # 1998 Jan. 18.73 = J981I73
333 # 2001 Oct. 22.138303 = K01AM138303
334 epoch_pf = str(epoch_pf)
335 year = int(epoch_pf[0], base=32) * 100 + int(epoch_pf[1:3])
336 month = int(epoch_pf[3], base=32)
337 day = int(epoch_pf[4], base=32)
338 isot_string = "{:d}-{:02d}-{:02d}".format(year, month, day)
340 if len(epoch_pf) > 5:
341 fractional_day = float("." + epoch_pf[5:])
342 hours = int((24 * fractional_day))
343 minutes = int(60 * ((24 * fractional_day) - hours))
344 seconds = 3600 * (24 * fractional_day - hours - minutes / 60)
345 isot_string += "T{:02d}:{:02d}:{:09.6f}".format(hours, minutes, seconds)
347 return Time(isot_string, format="isot", scale="tt")
350def convert_mpc_packed_dates(pf_tt: npt.ArrayLike) -> Time:
351 """
352 Convert MPC packed form dates (in the TT time scale) to
353 MJDs in TT. See: https://minorplanetcenter.net/iau/info/PackedDates.html
354 for details on the packed date format.
356 Parameters
357 ----------
358 pf_tt : `~numpy.ndarray` (N)
359 MPC-style packed form epochs in the TT time scale.
361 Returns
362 -------
363 mjd_tt : `~astropy.time.core.Time` (N)
364 Epochs in TT MJDs.
365 """
366 isot_tt = []
367 for epoch in pf_tt:
368 isot_tt.append(_unpack_mpc_date(epoch))
370 return Time(isot_tt)
373def pack_numbered_designation(designation: str) -> str:
374 """
375 Pack a numbered MPC designation.
377 Examples of numbered designations:
378 Numbered Packed
379 3202 03202
380 50000 50000
381 100345 A0345
382 360017 a0017
383 203289 K3289
384 620000 ~0000
385 620061 ~000z
386 3140113 ~AZaz
387 15396335 ~zzzz
389 Parameters
390 ----------
391 designation : str
392 MPC numbered designation.
394 Returns
395 -------
396 designation_pf : str
397 MPC packed numbered designation.
399 Raises
400 ------
401 ValueError : If the numbered designation cannot be packed.
402 If the numbered designation is larger than 15396335.
403 """
404 number = int(designation)
405 if number > 15396335:
406 raise ValueError(
407 "Numbered designation is too large. Maximum supported is 15396335."
408 )
410 if number <= 99999:
411 return "{:05}".format(number)
412 elif (number >= 100000) and (number <= 619999):
413 bigpart, remainder = divmod(number, 10000)
414 return f"{BASE62[bigpart]}{remainder:04}"
415 else:
416 x = number - 620000
417 number_pf = []
418 while x:
419 number_pf.append(BASE62[int(x % 62)])
420 x //= 62
422 number_pf.reverse()
423 return "~{}".format("".join(number_pf).zfill(4))
426def pack_provisional_designation(designation: str) -> str:
427 """
428 Pack a provisional MPC designation.
430 Examples of provisional designations:
431 Provisional Packed
432 1995 XA J95X00A
433 1995 XL1 J95X01L
434 1995 FB13 J95F13B
435 1998 SQ108 J98SA8Q
436 1998 SV127 J98SC7V
437 1998 SS162 J98SG2S
438 2099 AZ193 K99AJ3Z
439 2008 AA360 K08Aa0A
440 2007 TA418 K07Tf8A
442 Parameters
443 ----------
444 designation : str
445 MPC provisional designation.
447 Returns
448 -------
449 designation_pf : str
450 MPC packed provisional designation.
452 Raises
453 ------
454 ValueError : If the provisional designation cannot be packed.
455 The provisional designations is not at least 6 characters long.
456 The first 4 characters of the provisional designation are not a year.
457 The 5th character of the provisional designation is not a space.
458 The provisional designation contains a hyphen.
459 The half-month letter is I or Z.
460 """
461 if len(designation) < 6:
462 raise ValueError(
463 "Provisional designations should be at least 6 characters long."
464 )
465 if not designation[:3].isdecimal():
466 raise ValueError(
467 "Expected the first 4 characters of the provisional designation to be a year."
468 )
469 if designation[4] != " ":
470 raise ValueError(
471 "Expected the 5th character of the provisional designation to be a space."
472 )
473 if "-" in designation:
474 raise ValueError("Provisional designations cannot contain a hyphen.")
476 year = BASE62[int(designation[0:2])] + designation[2:4]
477 letter1 = designation[5]
478 letter2 = designation[6]
479 cycle = designation[7:]
481 if letter1 in {"I", "Z"}:
482 raise ValueError("Half-month letters cannot be I or Z.")
483 if letter1.isdecimal() or letter2.isdecimal():
484 raise ValueError("Invalid provisional designation.")
486 cycle_pf = "00"
487 if len(cycle) > 0:
488 cycle_int = int(cycle)
489 if cycle_int <= 99:
490 cycle_pf = str(cycle_int).zfill(2)
491 else:
492 cycle_pf = BASE62[cycle_int // 10] + str(cycle_int % 10)
494 designation_pf = "{}{}{}{}".format(year, letter1, cycle_pf, letter2)
495 return designation_pf
498def pack_survey_designation(designation: str) -> str:
499 """
500 Pack a survey MPC designation.
502 Examples of survey designations:
503 Survey Packed
504 2040 P-L PLS2040
505 3138 T-1 T1S3138
506 1010 T-2 T2S1010
507 4101 T-3 T3S4101
509 Parameters
510 ----------
511 designation : str
512 MPC survey designation.
514 Returns
515 -------
516 designation_pf : str
517 MPC packed survey designation.
519 Raises
520 ------
521 ValueError : If the survey designation cannot be packed.
522 The survey designation does not start with P-L, T-1, T-2, or T-3.
523 """
524 number = designation[0:4]
525 survey = designation[5:]
527 if survey == "P-L":
528 survey_pf = "PLS"
530 elif survey[0:2] == "T-" and survey[2] in {"1", "2", "3"}:
531 survey_pf = "T{}S".format(survey[2])
533 else:
534 raise ValueError("Survey designations must start with P-L, T-1, T-2, T-3.")
536 designation_pf = "{}{}".format(survey_pf, number.zfill(4))
537 return designation_pf
540def unpack_numbered_designation(designation_pf: str) -> str:
541 """
542 Unpack a numbered MPC designation.
544 Examples of numbered designations:
545 Numbered Unpacked
546 03202 3202
547 50000 50000
548 A0345 100345
549 a0017 360017
550 K3289 203289
551 ~0000 620000
552 ~000z 620061
553 ~AZaz 3140113
554 ~zzzz 15396335
556 Parameters
557 ----------
558 designation_pf : str
559 MPC packed numbered designation.
561 Returns
562 -------
563 designation : str
564 MPC unpacked numbered designation.
566 Raises
567 ------
568 ValueError : If the numbered designation cannot be unpacked.
569 The packed numbered designation is not at least 4 characters long.
570 """
571 number = None
572 # Numbered objects (1 - 99999)
573 if designation_pf.isdecimal():
574 number = int(designation_pf)
576 # Numbered objects (620000+)
577 elif designation_pf[0] == "~":
578 number = 620000
579 number_pf = designation_pf[1:]
580 for i, c in enumerate(number_pf):
581 power = len(number_pf) - (i + 1)
582 number += BASE62_MAP[c] * (62**power)
584 # Numbered objects (100000 - 619999)
585 else:
586 number = BASE62_MAP[designation_pf[0]] * 10000 + int(designation_pf[1:])
588 if number is None:
589 raise ValueError("Packed numbered designation could not be unpacked.")
590 else:
591 designation = str(number)
593 return designation
596def unpack_provisional_designation(designation_pf: str) -> str:
597 """
598 Unpack a provisional MPC designation.
600 Examples of provisional designations:
601 Provisional Unpacked
602 J95X00A 1995 XA
603 J95X01L 1995 XL1
604 J95F13B 1995 FB13
605 J98SA8Q 1998 SQ108
606 J98SC7V 1998 SV127
607 J98SG2S 1998 SS162
608 K99AJ3Z 2099 AZ193
609 K08Aa0A 2008 AA360
610 K07Tf8A 2007 TA418
612 Parameters
613 ----------
614 designation_pf : str
615 MPC packed provisional designation.
617 Returns
618 -------
619 designation : str
620 MPC unpacked provisional designation.
622 Raises
623 ------
624 ValueError : If the provisional designation cannot be unpacked.
625 The packed provisional designation is not 7 characters long.
626 The packed provisional designation does not have a year.
627 """
628 if len(designation_pf) != 7:
629 raise ValueError(f"Provisional designation must be 7 characters long: '{designation_pf}'")
630 if not designation_pf[1].isdecimal() or not designation_pf[2].isdecimal():
631 raise ValueError(f"Provisional designation must have a year: '{designation_pf}'")
632 year = str(BASE62_MAP[designation_pf[0]] * 100 + int(designation_pf[1:3]))
633 letter1 = designation_pf[3]
634 letter2 = designation_pf[6]
635 if letter1.isdecimal() or letter2.isdecimal():
636 raise ValueError(f"fourth or seventh char is decimal: '{designation_pf}'")
637 cycle1 = designation_pf[4]
638 cycle2 = designation_pf[5]
640 number = int(BASE62_MAP[cycle1]) * 10 + BASE62_MAP[cycle2]
641 if number == 0:
642 number_str = ""
643 else:
644 number_str = str(number)
646 designation = "{} {}{}{}".format(year, letter1, letter2, number_str)
648 return designation
651def unpack_survey_designation(designation_pf: str) -> str:
652 """
653 Unpack a survey MPC designation.
655 Examples of survey designations:
656 Survey Packed
657 PLS2040 2040 P-L
658 T1S3138 3138 T-1
659 T2S1010 1010 T-2
660 T3S4101 4101 T-3
662 Parameters
663 ----------
664 designation_pf : str
665 MPC packed survey designation.
667 Returns
668 -------
669 designation : str
670 MPC unpacked survey designation.
672 Raises
673 ------
674 ValueError : If the survey designation cannot be unpacked.
675 The packed survey designation does not start with PLS, T1S, T2S, or T3S.
676 """
677 number = int(designation_pf[3:8])
678 survey_pf = designation_pf[0:3]
679 if survey_pf not in {"PLS", "T1S", "T2S", "T3S"}:
680 raise ValueError(
681 "Packed survey designation must start with PLS, T1S, T2S, or T3S."
682 )
684 if survey_pf == "PLS":
685 survey = "P-L"
687 if survey_pf[0] == "T" and survey_pf[2] == "S":
688 survey = "T-{}".format(survey_pf[1])
690 designation = "{} {}".format(number, survey)
691 return designation
694def unpack_mpc_designation(designation_pf: str) -> str:
695 """
696 Unpack a packed MPC designation. For example, provisional
697 designation J98SG2S will be unpacked to 1998 SS162. Permanent
698 designation 00323 will be unpacked to 323.
700 TODO: add support for comet and natural satellite designations
702 Parameters
703 ----------
704 designation_pf : str
705 MPC packed form designation.
707 Returns
708 -------
709 designation : str
710 MPC unpacked designation.
712 Raises
713 ------
714 ValueError : If designation_pf cannot be unpacked.
715 """
716 # Lets see if its a numbered object
717 try:
718 return unpack_numbered_designation(designation_pf)
719 except ValueError:
720 pass
722 # Lets see if its a provisional designation
723 try:
724 return unpack_provisional_designation(designation_pf)
725 except ValueError:
726 pass
728 # Lets see if its a survey designation
729 try:
730 return unpack_survey_designation(designation_pf)
731 except ValueError:
732 pass
734 # At this point we haven't had any success so lets raise an error
735 err = (
736 "Packed form designation '{}' could not be unpacked.\n"
737 "It could not be recognized as any of the following:\n"
738 " - a numbered object (e.g. '03202', 'K3289', '~AZaz')\n"
739 " - a provisional designation (e.g. 'J98SC7V', 'K08Aa0A')\n"
740 " - a survey designation (e.g. 'PLS2040', 'T1S3138')"
741 )
742 raise ValueError(err.format(designation_pf))