lsst.pipe.tasks g540474b770+e939cf0e26
Loading...
Searching...
No Matches
associationUtils.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Utilities for interfacing with hpgeom. Originally implemented in
23http://github.com/LSSTDESC/dia_pipe and then translated to hpgeom.
24"""
25
26__all__ = ["toIndex", "toRaDec", "eq2xyz", "eq2xyzVec", "convert_spherical",
27 "convert_spherical_array", "query_disc", "obj_id_to_ss_object_id", "ss_object_id_to_obj_id"]
28
29from astropy.time import Time
30import hpgeom as hpg
31import numpy as np
32import numpy.typing as npt
33
34
35def toIndex(nside, ra, dec):
36 """Return healpix index given ra, dec in degrees
37
38 Parameters
39 ----------
40 nside : `int`
41 Power of 2 nside healpix resolution.
42 ra : `float`
43 RA in degrees.
44 dec : `float`
45 Declination in degrees
46
47 Returns
48 -------
49 index : `int`
50 Unique healpix pixel ID containing point RA, DEC at resolution nside.
51 """
52 return hpg.angle_to_pixel(nside, ra, dec, nest=False)
53
54
55def toRaDec(nside, index):
56 """Convert from healpix index to ra,dec in degrees
57
58 Parameters
59 ----------
60 nside : `int`
61 Resolution of healpixel "grid".
62 index : `int`
63 Index of the healpix pixel we want to find the location of.
64
65 Returns
66 -------
67 pos : `numpy.ndarray`, (2,)
68 RA and DEC of healpix pixel location in degrees.
69 """
70 ra, dec = hpg.pixel_to_angle(nside, index, nest=False)
71 return np.dstack((ra, dec))[0]
72
73
74def eq2xyz(ra, dec):
75 """Convert from equatorial ra,dec in degrees to x,y,z on unit sphere.
76
77 Parameters
78 ----------
79 ra : `float`
80 RA in degrees.
81 dec : `float`
82 Declination in degrees
83
84 Returns
85 -------
86 xyz : `numpy.ndarray`, (3,)
87 Float xyz positions on the unit sphere.
88 """
89 phi = np.deg2rad(ra)
90 theta = np.pi/2 - np.deg2rad(dec)
91 sintheta = np.sin(theta)
92 x = sintheta*np.cos(phi)
93 y = sintheta*np.sin(phi)
94 z = np.cos(theta)
95 return np.array([x, y, z])
96
97
98def eq2xyzVec(ra, dec):
99 """Convert equatorial ra,dec in degrees to x,y,z on the unit sphere
100 parameters
101
102 Vectorized version of ``eq2xyz``
103
104 Parameters
105 ----------
106 ra : array_like, (N,)
107 Array of RA in degrees.
108 dec : array_like, (N,)
109 Declination in degrees
110
111 Returns
112 -------
113 vec : `numpy.ndarray`, (N,3)
114 Array of unitsphere 3-vectors.
115 """
116 ra = np.array(ra, dtype='f8', ndmin=1, copy=False)
117 dec = np.array(dec, dtype='f8', ndmin=1, copy=False)
118 if ra.size != dec.size:
119 raise ValueError("ra,dec not same size: %s,%s" % (ra.size, dec.size))
120
121 vec = eq2xyz(ra, dec)
122
123 return vec
124
125
126def convert_spherical(ra, dec):
127 """Convert from ra,dec to spherical coordinates.
128
129 Used in query_disc.
130
131 Parameters
132 ----------
133 ra : `float`
134 RA in radians.
135 dec : `float`
136 Declination in radians
137 """
138 return np.dstack([np.cos(dec*np.pi/180)*np.cos(ra*np.pi/180),
139 np.cos(dec*np.pi/180)*np.sin(ra*np.pi/180),
140 np.sin(dec*np.pi/180)])[0]
141
142
144 """Convert from and a array ra,dec to spherical coordinates.
145
146 Used in query_disc
147
148 Parameters
149 ----------
150 array : `numpy.ndarray`, (N, 2)
151 (N, 2) Array of RA, DEC values.
152
153 Returns
154 -------
155 vecs : `numpy.ndarray`, (N, 3)
156 Vectors on the unit sphere
157 """
158 ra = array[:, 0]
159 dec = array[:, 1]
160 return convert_spherical(ra, dec)
161
162
163def query_disc(nside, ra, dec, max_rad, min_rad=0):
164 """Get the list of healpix indices within max_rad, min_rad given in radians
165 around ra,dec given in degrees
166
167 Parameters
168 ----------
169 nside : `int`
170 Resolution of the healpixels to search/return.
171 ra : `float`
172 RA in degrees.
173 dec : `float`
174 Declination in degrees
175 max_rad : `float`
176 Max distance in radians to search nearby healpixels.
177 min_rad : `float`, optional
178 Minimum distance in radians to search healpixels. Default = 0.
179 """
180 ra = np.atleast_1d(ra)
181 dec = np.atleast_1d(dec)
182
183 max_rad_deg = np.rad2deg(max_rad)
184
185 pixels = np.unique(
186 [hpg.query_circle(nside, a, b, max_rad_deg, nest=False)
187 for (a, b) in zip(ra, dec)])
188
189 if min_rad > 0 and len(pixels) > 0:
190 vec0 = convert_spherical(ra, dec)
191 min_rad2 = min_rad**2
192 vecs = convert_spherical_array(toRaDec(nside, pixels))
193 dsq = np.sum((vecs - vec0)**2, axis=1)
194 match = dsq > min_rad2
195 pixels = pixels[match]
196
197 return pixels
198
199
201 if ' ' in objID:
202 objID = pack_mpc_designation(objID)
204
205
207 """Convert from Minor Planet Center packed provisional object ID to
208 Rubin ssObjectID.
209
210 Parameters
211 ----------
212 objID : `str`
213 Minor Planet Center packed provisional designation for a small solar
214 system object. Must be fewer than eight characters.
215
216 Returns
217 -------
218 ssObjectID : `int`
219 Rubin ssObjectID
220
221 Raises
222 ------
223 ValueError
224 Raised if either objID is shorter than 7 or longer than 8 characters or contains
225 illegal objID characters
226 """
227 if len(objID) > 8:
228 raise ValueError(f'objID longer than 8 characters: "{objID}"')
229 if len(objID) < 7:
230 raise ValueError(f'objID shorter than 7 characters: "{objID}"')
231 if any([ord(c) > 255 for c in objID]):
232 raise ValueError(f'{[c for c in objID if ord(c) > 255]} not legal objID characters (ascii [1, 255])')
233
234 ssObjectID = ord(objID[0])
235 for character in objID[1:]:
236 ssObjectID <<= 8
237 ssObjectID += ord(character)
238 return ssObjectID
239
240
241def ss_object_id_to_obj_id(ssObjectID, packed=False):
242 """Convert from Rubin ssObjectID to Minor Planet Center packed provisional
243 object ID.
244
245 Parameters
246 ----------
247 ssObjectID : `int`
248 Rubin ssObjectID
249
250 Returns
251 -------
252 objID : `str`
253 Minor Planet Center packed provisional designation.
254
255 Raises
256 ------
257 """
258 if ssObjectID < 0 or ssObjectID >= (1 << 64):
259 raise ValueError(f'ssObjectID ({ssObjectID}) outside [0, 2^64 - 1].')
260
261 objID = ''.join([chr((ssObjectID >> (8 * i)) % 256) for i in reversed(range(0, 8))])
262 objID = objID.replace('\x00', '')
263 if packed:
264 return objID
265 else:
266 return unpack_mpc_designation(objID)
267
268# All the below designation-related code are copied from B612's adam_core
269# adam_core should eventually be added as an external dependency, and this
270# should be replaced with imports on DM-53907
271
272
273def pack_mpc_designation(designation: str) -> str:
274 """
275 B612 code to pack a unpacked MPC designation. For example,
276 provisional designation 1998 SS162 will be packed to J98SG2S.
277 Permanent designation 323 will be packed to 00323.
278
279 TODO: add support for comet and natural satellite designations
280
281 Parameters
282 ----------
283 designation : str
284 MPC unpacked designation.
285
286 Returns
287 -------
288 designation_pf : str
289 MPC packed form designation.
290
291 Raises
292 ------
293 ValueError : If designation cannot be packed.
294 """
295 # Lets see if its a numbered object
296 try:
297 return pack_numbered_designation(designation)
298 except ValueError:
299 pass
300
301 # If its not numbered, maybe its a provisional designation
302 try:
303 return pack_provisional_designation(designation)
304 except ValueError:
305 pass
306
307 # If its a survey designation, deal with it
308 try:
309 return pack_survey_designation(designation)
310 except ValueError:
311 pass
312
313 err = (
314 "Unpacked designation '{}' could not be packed.\n"
315 "It could not be recognized as any of the following:\n"
316 " - a numbered object (e.g. '3202', '203289', '3140113')\n"
317 " - a provisional designation (e.g. '1998 SV127', '2008 AA360')\n"
318 " - a survey designation (e.g. '2040 P-L', '3138 T-1')"
319 )
320 raise ValueError(err.format(designation))
321
322
323BASE62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
324BASE62_MAP = {BASE62[i]: i for i in range(len(BASE62))}
325
326
327def _unpack_mpc_date(epoch_pf: str) -> Time:
328 # Taken from Lynne Jones' SSO TOOLS.
329 # See https://minorplanetcenter.net/iau/info/PackedDates.html
330 # for MPC documentation on packed dates.
331 # Examples:
332 # 1998 Jan. 18.73 = J981I73
333 # 2001 Oct. 22.138303 = K01AM138303
334 epoch_pf = str(epoch_pf)
335 year = int(epoch_pf[0], base=32) * 100 + int(epoch_pf[1:3])
336 month = int(epoch_pf[3], base=32)
337 day = int(epoch_pf[4], base=32)
338 isot_string = "{:d}-{:02d}-{:02d}".format(year, month, day)
339
340 if len(epoch_pf) > 5:
341 fractional_day = float("." + epoch_pf[5:])
342 hours = int((24 * fractional_day))
343 minutes = int(60 * ((24 * fractional_day) - hours))
344 seconds = 3600 * (24 * fractional_day - hours - minutes / 60)
345 isot_string += "T{:02d}:{:02d}:{:09.6f}".format(hours, minutes, seconds)
346
347 return Time(isot_string, format="isot", scale="tt")
348
349
350def convert_mpc_packed_dates(pf_tt: npt.ArrayLike) -> Time:
351 """
352 Convert MPC packed form dates (in the TT time scale) to
353 MJDs in TT. See: https://minorplanetcenter.net/iau/info/PackedDates.html
354 for details on the packed date format.
355
356 Parameters
357 ----------
358 pf_tt : `~numpy.ndarray` (N)
359 MPC-style packed form epochs in the TT time scale.
360
361 Returns
362 -------
363 mjd_tt : `~astropy.time.core.Time` (N)
364 Epochs in TT MJDs.
365 """
366 isot_tt = []
367 for epoch in pf_tt:
368 isot_tt.append(_unpack_mpc_date(epoch))
369
370 return Time(isot_tt)
371
372
373def pack_numbered_designation(designation: str) -> str:
374 """
375 Pack a numbered MPC designation.
376
377 Examples of numbered designations:
378 Numbered Packed
379 3202 03202
380 50000 50000
381 100345 A0345
382 360017 a0017
383 203289 K3289
384 620000 ~0000
385 620061 ~000z
386 3140113 ~AZaz
387 15396335 ~zzzz
388
389 Parameters
390 ----------
391 designation : str
392 MPC numbered designation.
393
394 Returns
395 -------
396 designation_pf : str
397 MPC packed numbered designation.
398
399 Raises
400 ------
401 ValueError : If the numbered designation cannot be packed.
402 If the numbered designation is larger than 15396335.
403 """
404 number = int(designation)
405 if number > 15396335:
406 raise ValueError(
407 "Numbered designation is too large. Maximum supported is 15396335."
408 )
409
410 if number <= 99999:
411 return "{:05}".format(number)
412 elif (number >= 100000) and (number <= 619999):
413 bigpart, remainder = divmod(number, 10000)
414 return f"{BASE62[bigpart]}{remainder:04}"
415 else:
416 x = number - 620000
417 number_pf = []
418 while x:
419 number_pf.append(BASE62[int(x % 62)])
420 x //= 62
421
422 number_pf.reverse()
423 return "~{}".format("".join(number_pf).zfill(4))
424
425
426def pack_provisional_designation(designation: str) -> str:
427 """
428 Pack a provisional MPC designation.
429
430 Examples of provisional designations:
431 Provisional Packed
432 1995 XA J95X00A
433 1995 XL1 J95X01L
434 1995 FB13 J95F13B
435 1998 SQ108 J98SA8Q
436 1998 SV127 J98SC7V
437 1998 SS162 J98SG2S
438 2099 AZ193 K99AJ3Z
439 2008 AA360 K08Aa0A
440 2007 TA418 K07Tf8A
441
442 Parameters
443 ----------
444 designation : str
445 MPC provisional designation.
446
447 Returns
448 -------
449 designation_pf : str
450 MPC packed provisional designation.
451
452 Raises
453 ------
454 ValueError : If the provisional designation cannot be packed.
455 The provisional designations is not at least 6 characters long.
456 The first 4 characters of the provisional designation are not a year.
457 The 5th character of the provisional designation is not a space.
458 The provisional designation contains a hyphen.
459 The half-month letter is I or Z.
460 """
461 if len(designation) < 6:
462 raise ValueError(
463 "Provisional designations should be at least 6 characters long."
464 )
465 if not designation[:3].isdecimal():
466 raise ValueError(
467 "Expected the first 4 characters of the provisional designation to be a year."
468 )
469 if designation[4] != " ":
470 raise ValueError(
471 "Expected the 5th character of the provisional designation to be a space."
472 )
473 if "-" in designation:
474 raise ValueError("Provisional designations cannot contain a hyphen.")
475
476 year = BASE62[int(designation[0:2])] + designation[2:4]
477 letter1 = designation[5]
478 letter2 = designation[6]
479 cycle = designation[7:]
480
481 if letter1 in {"I", "Z"}:
482 raise ValueError("Half-month letters cannot be I or Z.")
483 if letter1.isdecimal() or letter2.isdecimal():
484 raise ValueError("Invalid provisional designation.")
485
486 cycle_pf = "00"
487 if len(cycle) > 0:
488 cycle_int = int(cycle)
489 if cycle_int <= 99:
490 cycle_pf = str(cycle_int).zfill(2)
491 else:
492 cycle_pf = BASE62[cycle_int // 10] + str(cycle_int % 10)
493
494 designation_pf = "{}{}{}{}".format(year, letter1, cycle_pf, letter2)
495 return designation_pf
496
497
498def pack_survey_designation(designation: str) -> str:
499 """
500 Pack a survey MPC designation.
501
502 Examples of survey designations:
503 Survey Packed
504 2040 P-L PLS2040
505 3138 T-1 T1S3138
506 1010 T-2 T2S1010
507 4101 T-3 T3S4101
508
509 Parameters
510 ----------
511 designation : str
512 MPC survey designation.
513
514 Returns
515 -------
516 designation_pf : str
517 MPC packed survey designation.
518
519 Raises
520 ------
521 ValueError : If the survey designation cannot be packed.
522 The survey designation does not start with P-L, T-1, T-2, or T-3.
523 """
524 number = designation[0:4]
525 survey = designation[5:]
526
527 if survey == "P-L":
528 survey_pf = "PLS"
529
530 elif survey[0:2] == "T-" and survey[2] in {"1", "2", "3"}:
531 survey_pf = "T{}S".format(survey[2])
532
533 else:
534 raise ValueError("Survey designations must start with P-L, T-1, T-2, T-3.")
535
536 designation_pf = "{}{}".format(survey_pf, number.zfill(4))
537 return designation_pf
538
539
540def unpack_numbered_designation(designation_pf: str) -> str:
541 """
542 Unpack a numbered MPC designation.
543
544 Examples of numbered designations:
545 Numbered Unpacked
546 03202 3202
547 50000 50000
548 A0345 100345
549 a0017 360017
550 K3289 203289
551 ~0000 620000
552 ~000z 620061
553 ~AZaz 3140113
554 ~zzzz 15396335
555
556 Parameters
557 ----------
558 designation_pf : str
559 MPC packed numbered designation.
560
561 Returns
562 -------
563 designation : str
564 MPC unpacked numbered designation.
565
566 Raises
567 ------
568 ValueError : If the numbered designation cannot be unpacked.
569 The packed numbered designation is not at least 4 characters long.
570 """
571 number = None
572 # Numbered objects (1 - 99999)
573 if designation_pf.isdecimal():
574 number = int(designation_pf)
575
576 # Numbered objects (620000+)
577 elif designation_pf[0] == "~":
578 number = 620000
579 number_pf = designation_pf[1:]
580 for i, c in enumerate(number_pf):
581 power = len(number_pf) - (i + 1)
582 number += BASE62_MAP[c] * (62**power)
583
584 # Numbered objects (100000 - 619999)
585 else:
586 number = BASE62_MAP[designation_pf[0]] * 10000 + int(designation_pf[1:])
587
588 if number is None:
589 raise ValueError("Packed numbered designation could not be unpacked.")
590 else:
591 designation = str(number)
592
593 return designation
594
595
596def unpack_provisional_designation(designation_pf: str) -> str:
597 """
598 Unpack a provisional MPC designation.
599
600 Examples of provisional designations:
601 Provisional Unpacked
602 J95X00A 1995 XA
603 J95X01L 1995 XL1
604 J95F13B 1995 FB13
605 J98SA8Q 1998 SQ108
606 J98SC7V 1998 SV127
607 J98SG2S 1998 SS162
608 K99AJ3Z 2099 AZ193
609 K08Aa0A 2008 AA360
610 K07Tf8A 2007 TA418
611
612 Parameters
613 ----------
614 designation_pf : str
615 MPC packed provisional designation.
616
617 Returns
618 -------
619 designation : str
620 MPC unpacked provisional designation.
621
622 Raises
623 ------
624 ValueError : If the provisional designation cannot be unpacked.
625 The packed provisional designation is not 7 characters long.
626 The packed provisional designation does not have a year.
627 """
628 if len(designation_pf) != 7:
629 raise ValueError(f"Provisional designation must be 7 characters long: '{designation_pf}'")
630 if not designation_pf[1].isdecimal() or not designation_pf[2].isdecimal():
631 raise ValueError(f"Provisional designation must have a year: '{designation_pf}'")
632 year = str(BASE62_MAP[designation_pf[0]] * 100 + int(designation_pf[1:3]))
633 letter1 = designation_pf[3]
634 letter2 = designation_pf[6]
635 if letter1.isdecimal() or letter2.isdecimal():
636 raise ValueError(f"fourth or seventh char is decimal: '{designation_pf}'")
637 cycle1 = designation_pf[4]
638 cycle2 = designation_pf[5]
639
640 number = int(BASE62_MAP[cycle1]) * 10 + BASE62_MAP[cycle2]
641 if number == 0:
642 number_str = ""
643 else:
644 number_str = str(number)
645
646 designation = "{} {}{}{}".format(year, letter1, letter2, number_str)
647
648 return designation
649
650
651def unpack_survey_designation(designation_pf: str) -> str:
652 """
653 Unpack a survey MPC designation.
654
655 Examples of survey designations:
656 Survey Packed
657 PLS2040 2040 P-L
658 T1S3138 3138 T-1
659 T2S1010 1010 T-2
660 T3S4101 4101 T-3
661
662 Parameters
663 ----------
664 designation_pf : str
665 MPC packed survey designation.
666
667 Returns
668 -------
669 designation : str
670 MPC unpacked survey designation.
671
672 Raises
673 ------
674 ValueError : If the survey designation cannot be unpacked.
675 The packed survey designation does not start with PLS, T1S, T2S, or T3S.
676 """
677 number = int(designation_pf[3:8])
678 survey_pf = designation_pf[0:3]
679 if survey_pf not in {"PLS", "T1S", "T2S", "T3S"}:
680 raise ValueError(
681 "Packed survey designation must start with PLS, T1S, T2S, or T3S."
682 )
683
684 if survey_pf == "PLS":
685 survey = "P-L"
686
687 if survey_pf[0] == "T" and survey_pf[2] == "S":
688 survey = "T-{}".format(survey_pf[1])
689
690 designation = "{} {}".format(number, survey)
691 return designation
692
693
694def unpack_mpc_designation(designation_pf: str) -> str:
695 """
696 Unpack a packed MPC designation. For example, provisional
697 designation J98SG2S will be unpacked to 1998 SS162. Permanent
698 designation 00323 will be unpacked to 323.
699
700 TODO: add support for comet and natural satellite designations
701
702 Parameters
703 ----------
704 designation_pf : str
705 MPC packed form designation.
706
707 Returns
708 -------
709 designation : str
710 MPC unpacked designation.
711
712 Raises
713 ------
714 ValueError : If designation_pf cannot be unpacked.
715 """
716 # Lets see if its a numbered object
717 try:
718 return unpack_numbered_designation(designation_pf)
719 except ValueError:
720 pass
721
722 # Lets see if its a provisional designation
723 try:
724 return unpack_provisional_designation(designation_pf)
725 except ValueError:
726 pass
727
728 # Lets see if its a survey designation
729 try:
730 return unpack_survey_designation(designation_pf)
731 except ValueError:
732 pass
733
734 # At this point we haven't had any success so lets raise an error
735 err = (
736 "Packed form designation '{}' could not be unpacked.\n"
737 "It could not be recognized as any of the following:\n"
738 " - a numbered object (e.g. '03202', 'K3289', '~AZaz')\n"
739 " - a provisional designation (e.g. 'J98SC7V', 'K08Aa0A')\n"
740 " - a survey designation (e.g. 'PLS2040', 'T1S3138')"
741 )
742 raise ValueError(err.format(designation_pf))
str unpack_numbered_designation(str designation_pf)
str unpack_mpc_designation(str designation_pf)
str unpack_survey_designation(str designation_pf)
ss_object_id_to_obj_id(ssObjectID, packed=False)
str unpack_provisional_designation(str designation_pf)
str pack_mpc_designation(str designation)
str pack_provisional_designation(str designation)
Time convert_mpc_packed_dates(npt.ArrayLike pf_tt)
query_disc(nside, ra, dec, max_rad, min_rad=0)
str pack_survey_designation(str designation)
str pack_numbered_designation(str designation)