Coverage for python / lsst / summit / utils / consdbClient.py: 18%
174 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 09:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 09:04 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import logging
23import os
24from collections.abc import Mapping
25from dataclasses import dataclass
26from typing import Any
27from urllib.parse import quote, urlparse
29import numpy as np
30import requests
31from astropy.table import Column, Table
33__all__ = ["ConsDbClient", "FlexibleMetadataInfo", "getCcdVisitTableForDay", "getWideQuicklookTableForDay"]
36logger = logging.getLogger(__name__)
39def _urljoin(*args: str) -> str:
40 """Join parts of a URL with slashes.
42 Does not do any quoting. Mostly to remove a level of list-making.
44 Parameters
45 ----------
46 *args : `str`
47 Each parameter is a URL part.
49 Returns
50 -------
51 url : `str`
52 The joined URL.
53 """
54 return "/".join(args)
def _check_status(r: requests.Response) -> None:
    """Check the status of an HTTP response and raise if an error.

    When the response carries a JSON body, that body (and its ``message``
    entry, if the body is a JSON object containing one) is attached to the
    exception as notes so that the server-side explanation is visible in
    the traceback.

    Parameters
    ----------
    r : `requests.Response`
        The response to check.

    Raises
    ------
    requests.HTTPError
        Raised if a non-successful status is returned.
    """
    try:
        r.raise_for_status()
    except requests.HTTPError as e:
        try:
            json_data = e.response.json()
        except requests.JSONDecodeError:
            # The error body is not JSON; there is nothing extra to attach.
            pass
        else:
            e.add_note(str(json_data))
            # Only a JSON object can carry a "message" entry. A JSON string,
            # number, list or null would make the lookup below raise
            # TypeError and hide the HTTP error we are trying to report.
            if isinstance(json_data, dict) and "message" in json_data:
                e.add_note(f"\n\n{json_data['message']}")
        # Bare raise re-raises the annotated exception with its traceback.
        raise
def clean_url(resp: requests.Response, *args, **kwargs) -> requests.Response:
    """Shorten any credentials embedded in a response's URL.

    If the network-location part of ``resp.url`` contains user information
    (``user@`` or ``user:password@``), the username and password are each
    reduced to their first two characters followed by ``***``. The host and
    port are kept exactly as given. URLs without user information are left
    untouched.

    The signature accepts extra positional and keyword arguments so the
    function can be installed as a ``requests`` response hook.

    Parameters
    ----------
    resp : `requests.Response`
        The response that could contain a URL with tokens.
    *args
        Ignored.
    **kwargs
        Ignored.

    Returns
    -------
    resp : `requests.Response`
        The same response object, with ``url`` rewritten if needed.
    """
    url = urlparse(resp.url)
    # Split on the last "@" so the host[:port] part is preserved verbatim.
    userinfo, at_sign, hostinfo = url.netloc.rpartition("@")
    if not at_sign:
        # No credentials in the URL: nothing to hide, and we must not
        # invent an "@" or drop the port.
        return resp

    username, colon, password = userinfo.partition(":")
    short_user = f"{username[:2]}***"
    short_pass = f":{password[:2]}***" if colon else ""
    netloc = f"{short_user}{short_pass}@{hostinfo}"
    resp.url = url._replace(netloc=netloc).geturl()
    return resp
@dataclass
class FlexibleMetadataInfo:
    """Descriptor for one flexible metadata key.

    Parameters
    ----------
    dtype : `str`
        Data type of the flexible metadata value; one of ``bool``,
        ``int``, ``float``, or ``str``.
    doc : `str`
        Human-readable documentation for the key.
    unit : `str`, optional
        Unit in which the value is expressed.
    ucd : `str`, optional
        IVOA Unified Content Descriptor
        (https://www.ivoa.net/documents/UCD1+/).
    """

    # Required fields come first so instances can be built positionally.
    dtype: str
    doc: str
    # Optional descriptors; `None` means "not specified".
    unit: str | None = None
    ucd: str | None = None
class ConsDbClient:
    """A client library for accessing the Consolidated Database.

    This library provides a basic interface for using flexible metadata
    (key/value pairs associated with observation ids from an observation
    type table), determining the schema of ConsDB tables, querying the
    ConsDB using a general SQL SELECT statement, and inserting into
    ConsDB tables.

    Parameters
    ----------
    url : `str`, optional
        Base URL of the Web service, defaults to the value of environment
        variable ``LSST_CONSDB_PQ_URL`` (the location of the publish/query
        service).
    token : `str`, optional
        Authentication token for the RSP. The token must begin with "gt-".

    Raises
    ------
    ValueError
        Raised if ``token`` is given but does not start with ``gt-``.
    KeyError
        Raised if ``url`` is not given and ``LSST_CONSDB_PQ_URL`` is not
        set in the environment.

    Notes
    -----
    This client is a thin layer over the publish/query Web service, which
    avoids having a dependency on database drivers.

    It enforces the return of query results as Astropy Tables.
    """

    def __init__(self, url: str | None = None, token: str | None = None):
        self.session = requests.Session()
        # Every response passes through clean_url before it is returned.
        self.session.hooks["response"].append(clean_url)

        if token is not None:
            if not token.startswith("gt-"):
                raise ValueError("token must start with `gt-`.")

            self.session.headers.update({"Authorization": f"Bearer {token}"})

        if url is None:
            self.url = os.environ["LSST_CONSDB_PQ_URL"]
        else:
            self.url = url
        # URL parts are joined with "/" later, so drop any trailing slash.
        self.url = self.url.rstrip("/")

    def _handle_get(self, url: str, query: dict[str, str | list[str]] | None = None) -> Any:
        """Submit GET requests to the server.

        Parameters
        ----------
        url : `str`
            URL to GET.
        query : `dict` [`str`, `str` | `list` [`str`]], optional
            Query parameters to attach to the URL.

        Returns
        -------
        result : `Any`
            Result of decoding the Web service result content as JSON.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        requests.JSONDecodeError
            Raised if the result does not decode as JSON.
        """
        logger.debug(f"GET {url}")
        response = self.session.get(url, params=query)
        _check_status(response)
        return response.json()

    def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response:
        """Submit POST requests to the server.

        Parameters
        ----------
        url : `str`
            URL to POST.
        data : `dict` [`str`, `Any`]
            Key/value pairs of data to POST; sent as the JSON body.

        Returns
        -------
        result : `requests.Response`
            The raw Web service result object.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        logger.debug(f"POST {url}: {data}")
        response = self.session.post(url, json=data)
        _check_status(response)
        return response

    @staticmethod
    def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str:
        """Compute the name of a flexible metadata table.

        Each instrument and observation type made with that instrument can
        have a flexible metadata table. This function is useful when
        issuing SQL queries, and it avoids a round-trip to the server.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).

        Returns
        -------
        table_name : `str`
            Name of the appropriate flexible metadata table.
        """
        return f"cdb_{instrument}.{obs_type}_flexdata"

    @staticmethod
    def compute_fixed_metadata_namespace(instrument: str) -> str:
        """Compute the namespace for a fixed metadata table.

        Each instrument has its own namespace in the ConsDB.
        This function is useful when issuing SQL queries, and it avoids a
        round-trip to the server.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).

        Returns
        -------
        namespace_name : `str`
            Name of the appropriate namespace.
        """
        return f"cdb_{instrument}"

    def add_flexible_metadata_key(
        self,
        instrument: str,
        obs_type: str,
        key: str,
        dtype: str,
        doc: str,
        unit: str | None = None,
        ucd: str | None = None,
    ) -> requests.Response:
        """Add a key to a flexible metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        key : `str`
            Name of the key to be added (must not already exist).
        dtype : `str`
            One of ``bool``, ``int``, ``float``, or ``str``.
        doc : `str`
            Documentation string for the key.
        unit : `str`, optional
            Unit for the value. Should be from the IVOA
            (https://www.ivoa.net/documents/VOUnits/) or astropy.
        ucd : `str`, optional
            IVOA Unified Content Descriptor
            (https://www.ivoa.net/documents/UCD1+/).

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        data = {"key": key, "dtype": dtype, "doc": doc}
        # Optional descriptors are only sent when the caller supplied them.
        if unit is not None:
            data["unit"] = unit
        if ucd is not None:
            data["ucd"] = ucd
        url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey")
        return self._handle_post(url, data)

    def get_flexible_metadata_keys(self, instrument: str, obs_type: str) -> dict[str, FlexibleMetadataInfo]:
        """Retrieve the valid keys for a flexible metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).

        Returns
        -------
        key_info : `dict` [ `str`, `FlexibleMetadataInfo` ]
            Dict of keys and information values.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema")
        result = self._handle_get(url)
        # Each value is unpacked positionally into FlexibleMetadataInfo.
        return {key: FlexibleMetadataInfo(*value) for key, value in result.items()}

    def get_flexible_metadata(
        self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None
    ) -> dict[str, Any]:
        """Get the flexible metadata for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        keys : `list` [ `str` ], optional
            List of keys to be retrieved; all if not specified.

        Returns
        -------
        result_dict : `dict` [ `str`, `Any` ]
            Dictionary of key/value pairs for the observation.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(
            self.url,
            "flex",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        # An empty or missing key list sends no "k" query parameter.
        return self._handle_get(url, {"k": keys} if keys else None)

    def get_all_metadata(
        self, instrument: str, obs_type: str, obs_id: int, flex: bool = False
    ) -> dict[str, Any]:
        """Get all metadata for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        flex : `bool`
            Include flexible metadata.

        Returns
        -------
        result_dict : `dict` [ `str`, `Any` ]
            Dictionary of key/value pairs for the observation.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(
            self.url,
            "query",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        return self._handle_get(url, {"flex": "1"} if flex else None)

    def insert_flexible_metadata(
        self,
        instrument: str,
        obs_type: str,
        obs_id: int,
        values: dict[str, Any] | None = None,
        *,
        allow_update: bool = False,
        **kwargs,
    ) -> requests.Response:
        """Set flexible metadata values for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        values : `dict` [ `str`, `Any` ], optional
            Dictionary of key/value pairs to add for the observation.
            The dictionary is not modified.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing keys.
        **kwargs : `dict`
            Additional key/value pairs, overriding ``values``.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``values`` or kwargs.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        # Build a new merged dict to avoid mutating the caller's ``values``.
        merged_values: dict[str, Any] = {**(values or {}), **kwargs}
        if not merged_values:
            raise ValueError(f"No values to set for {instrument} {obs_type} {obs_id}")
        data = {"values": merged_values}
        url = _urljoin(
            self.url,
            "flex",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def insert(
        self,
        instrument: str,
        table: str,
        obs_id: tuple[int, int] | int,
        values: Mapping[str, Any],
        *,
        allow_update: bool = False,
        **kwargs,
    ) -> requests.Response:
        """Insert values into a single ConsDB fixed metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        table : `str`
            Name of the table to insert into.
        obs_id : `tuple` [ `int`, `int`] or `int`
            Unique observation id or day_obs and seq_num.
        values : `Mapping` [ `str`, `Any` ]
            Dictionary-like mapping of column/value pairs to add for the
            observation.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing columns.
        **kwargs : `dict`
            Additional column/value pairs, overriding ``values``.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``values`` or kwargs.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        # Build a new merged dict to avoid mutating the incoming Mapping.
        merged_values: dict[str, Any] = {**(dict(values) if values else {}), **kwargs}
        if not merged_values:
            raise ValueError(f"No values to insert for {instrument} {table} {obs_id}")

        data: dict[str, Any]
        if isinstance(obs_id, tuple):
            # A tuple is addressed through the "by_seq_num" endpoint, with
            # both elements placed in the URL path.
            data = {"table": table, "values": merged_values}
            url = _urljoin(
                self.url,
                "insert",
                quote(instrument),
                quote(table),
                "by_seq_num",
                quote(str(obs_id[0])),
                quote(str(obs_id[1])),
            )
        else:
            data = {"table": table, "obs_id": obs_id, "values": merged_values}
            url = _urljoin(
                self.url,
                "insert",
                quote(instrument),
                quote(table),
                "obs",
                quote(str(obs_id)),
            )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def insert_multiple(
        self,
        instrument: str,
        table: str,
        obs_dict: dict[int, dict[str, Any]],
        *,
        allow_update: bool = False,
    ) -> requests.Response:
        """Insert values for several observations into a single ConsDB
        fixed metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        table : `str`
            Name of the table to insert into.
        obs_dict : `dict` [ `int`, `dict` [ `str`, `Any` ] ]
            Dictionary of observation ids, each with a dictionary of
            column/value pairs to add for each observation.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing columns.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``obs_dict``.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        if not obs_dict:
            raise ValueError(f"No values to insert for {instrument} {table}")
        data = {"table": table, "obs_dict": obs_dict}
        url = _urljoin(
            self.url,
            "insert",
            quote(instrument),
            quote(table),
        )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def query(self, query: str) -> Table:
        """Query the ConsDB database.

        Parameters
        ----------
        query : `str`
            A SQL query (currently) to the database.

        Returns
        -------
        result : `Table`
            An ``astropy.Table`` containing the query results.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.

        Notes
        -----
        This is a very general query interface because it is expected that
        a wide variety of types of queries will be needed. If some types prove
        to be common, syntactic sugar could be added to make them simpler.
        """
        url = _urljoin(self.url, "query")
        data = {"query": query}
        result = self._handle_post(url, data).json()

        columns = result.get("columns", [])
        if not columns:
            # No result columns
            return Table(rows=[])

        rows = result.get("data", [])
        if not rows:
            # No result rows: return an empty table that still has the
            # column names.
            return Table(names=columns)

        return Table(rows=rows, names=columns)

    def schema(
        self, instrument: str | None = None, table: str | None = None
    ) -> dict[str, tuple[str, str]] | list[str]:
        """Retrieve information about ConsDB.

        If ``instrument`` and ``table`` are given, return the schema of a
        fixed metadata table in ConsDB.

        If only ``instrument`` is given, return the names of all tables
        for that instrument.

        If no arguments are given, return the names of all instruments.

        Parameters
        ----------
        instrument : `str`, optional
            Name of the instrument (e.g. ``LATISS``).
        table : `str`, optional
            Name of the table whose schema should be returned.

        Returns
        -------
        info : `list` [ `str` ] or `dict` [ `str`, `tuple` [ `str`, `str` ] ]
            A list of instrument strings or table names, or else a dict of
            columns with values that are tuples containing a data type string
            and a documentation string.

        Raises
        ------
        ValueError
            Raised if only ``table`` is given.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.

        Notes
        -----
        Fixed metadata data types may use the full database vocabulary,
        unlike flexible metadata data types.
        """
        if instrument is None:
            if table is not None:
                raise ValueError("Must specify instrument if table is given")
            url = _urljoin(self.url, "schema")
        elif table is None:
            url = _urljoin(self.url, "schema", quote(instrument))
        else:
            url = _urljoin(self.url, "schema", quote(instrument), quote(table))
        result = self._handle_get(url)
        if instrument is not None and table is not None:
            # Table schema: the first two elements of each value are used.
            return {key: (str(value[0]), str(value[1])) for key, value in result.items()}
        else:
            return [str(value) for value in result]
def getCcdVisitTableForDay(
    client: ConsDbClient,
    dayObs: int,
    visitTableItems: list[str] | None = None,
    detectors: list[int] | None = None,
    withZeropoint: bool = False,
) -> Table:
    """Fetch the ccdvisit1_quicklook rows for one dayObs.

    The query joins ``ccdvisit1_quicklook``, ``ccdvisit1`` and ``visit1``
    in the ``cdb_LSSTCam`` namespace.

    Parameters
    ----------
    client : `ConsDbClient`
        The ConsDbClient to use.
    dayObs : `int`
        The dayObs to query for.
    visitTableItems : `list` of `str`, optional
        Additional items from the visit1 table to include.
    detectors : `list` of `int`, optional
        If given, only return rows for these detectors.
    withZeropoint : `bool`, optional
        If ``True``, only return rows with a non-null zeropoint.

    Returns
    -------
    table : `astropy.table.Table`
        The resulting table.
    """
    # Columns always selected, followed by any caller-requested visit1 ones.
    selected = [
        "cvq.*",
        "cv.detector",
        "cv.visit_id",
        "v.band",
        "v.exp_time",
        "v.seq_num",
        "v.day_obs",
        "v.img_type",
    ]
    if visitTableItems:
        selected.extend(f"v.{item}" for item in visitTableItems)

    sources = [
        "cdb_LSSTCam.ccdvisit1_quicklook as cvq",
        "cdb_LSSTCam.ccdvisit1 as cv",
        "cdb_LSSTCam.visit1 as v",
    ]

    # Join conditions first, then the optional filters.
    conditions = [
        "cvq.ccdvisit_id=cv.ccdvisit_id",
        "cv.visit_id=v.visit_id",
        f"v.day_obs={dayObs}",
    ]
    if detectors:
        conditions.append(f"detector in ({','.join(map(str, detectors))})")
    if withZeropoint:
        conditions.append("cvq.zero_point is not null")

    sql = f"SELECT {', '.join(selected)} FROM {', '.join(sources)} WHERE {' and '.join(conditions)}"
    return client.query(sql)
def columnsEqual(a: Column, b: Column) -> bool:
    """Compare two columns for equality, honouring masks.

    Columns are equal when they have the same shape, are masked in exactly
    the same positions, and agree on every unmasked value. An input without
    a ``mask`` attribute is treated as having nothing masked.

    Parameters
    ----------
    a : `Column`
        First column to compare.
    b : `Column`
        Second column to compare.

    Returns
    -------
    equal : `bool`
        True if the columns are equal, False otherwise.
    """
    left = np.asanyarray(a)
    right = np.asanyarray(b)
    if left.shape != right.shape:
        return False

    leftMask = getattr(a, "mask", None)
    rightMask = getattr(b, "mask", None)

    # Neither input is masked: a plain element-wise comparison suffices.
    if leftMask is None and rightMask is None:
        return bool(np.all(left == right))

    # Give the unmasked side an all-False mask so both can be compared.
    leftMaskArr = np.asanyarray(np.zeros(left.shape, dtype=bool) if leftMask is None else leftMask)
    rightMaskArr = np.asanyarray(np.zeros(right.shape, dtype=bool) if rightMask is None else rightMask)

    # A position masked on one side only means the columns differ.
    if np.any(leftMaskArr ^ rightMaskArr):
        return False

    # Masks agree, so only the unmasked values need to match.
    unmasked = ~leftMaskArr
    return bool(np.all(left[unmasked] == right[unmasked]))
def getWideQuicklookTableForDay(client: ConsDbClient, dayObs: int) -> Table:
    """Build a wide quicklook table for one dayObs.

    Every column of ``visit1_quicklook`` is returned, together with each
    ``visit1`` column whose name does not already appear in
    ``visit1_quicklook``. Note that the visit1 table already contains all
    the columns from the exposure table, and is just keyed by the
    exposure_id instead of the visit_id.

    Parameters
    ----------
    client : `ConsDbClient`
        The ConsDbClient to use.
    dayObs : `int`
        The dayObs to query for.

    Returns
    -------
    table : `astropy.table.Table`
        The resulting wide quicklook table.
    """
    # Zero-row queries are used only to discover the column names.
    quicklookNames = set(client.query("SELECT * FROM cdb_LSSTCam.visit1_quicklook LIMIT 0").colnames)
    visitNames = set(client.query("SELECT * FROM cdb_LSSTCam.visit1 LIMIT 0").colnames)

    # Drop visit_id and any other name shared with the quicklook table, so
    # the joined result has no duplicate columns.
    extraNames = sorted(visitNames.difference(quicklookNames))
    selectList = ", ".join(["vq.*", *(f"v.{name}" for name in extraNames)])

    sql = f"""
        SELECT {selectList}
        FROM cdb_LSSTCam.visit1_quicklook vq
        INNER JOIN cdb_LSSTCam.visit1 v USING (visit_id)
        WHERE vq.day_obs = {dayObs}
    """
    return client.query(sql)