Coverage for python / lsst / summit / utils / consdbClient.py: 20%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:33 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:33 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import logging
23import os
24from collections.abc import Mapping
25from dataclasses import dataclass
26from typing import Any
27from urllib.parse import quote, urlparse
29import requests
30from astropy.table import Table
32__all__ = ["ConsDbClient", "FlexibleMetadataInfo"]
35logger = logging.getLogger(__name__)
38def _urljoin(*args: str) -> str:
39 """Join parts of a URL with slashes.
41 Does not do any quoting. Mostly to remove a level of list-making.
43 Parameters
44 ----------
45 *args : `str`
46 Each parameter is a URL part.
48 Returns
49 -------
50 url : `str`
51 The joined URL.
52 """
53 return "/".join(args)
56def _check_status(r: requests.Response) -> None:
57 """Check the status of an HTTP response and raise if an error.
59 Adds additional response information to the raise_for_status exception.
61 Parameters
62 ----------
63 r : `requests.Response`
64 The response to check.
66 Raises
67 ------
68 requests.HTTPError
69 Raised if a non-successful status is returned.
70 """
71 try:
72 r.raise_for_status()
73 except requests.HTTPError as e:
74 try:
75 json_data = e.response.json()
76 e.add_note(str(json_data))
77 if "message" in json_data:
78 e.add_note(f"\n\n{json_data['message']}")
79 except requests.JSONDecodeError:
80 pass
81 raise e
84def clean_url(resp: requests.Response, *args, **kwargs) -> requests.Response:
85 """Parse url from response and remove netloc portion.
87 Set new url in response and return response
89 Parameters
90 ----------
91 resp : `requests.Response`
92 The response that could contain a URL with tokens
93 """
94 url = urlparse(resp.url)
95 short_user = f"{url.username[:2]}***" if url.username is not None else ""
96 short_pass = f":{url.password[:2]}***" if url.password is not None else ""
97 netloc = f"{short_user}{short_pass}@{url.hostname}"
98 resp.url = url._replace(netloc=netloc).geturl()
99 return resp
102@dataclass
103class FlexibleMetadataInfo:
104 """Description of a flexible metadata value.
106 Parameters
107 ----------
108 dtype : `str`
109 Data type of the flexible metadata value.
110 One of ``bool``, ``int``, ``float``, or ``str``.
111 doc : `str`
112 Documentation string.
113 unit : `str`, optional
114 Unit of the value.
115 ucd : `str`, optional
116 IVOA Unified Content Descriptor
117 (https://www.ivoa.net/documents/UCD1+/).
118 """
120 dtype: str
121 doc: str
122 unit: str | None = None
123 ucd: str | None = None
126class ConsDbClient:
127 """A client library for accessing the Consolidated Database.
129 This library provides a basic interface for using flexible metadata
130 (key/value pairs associated with observation ids from an observation
131 type table), determining the schema of ConsDB tables, querying the
132 ConsDB using a general SQL SELECT statement, and inserting into
133 ConsDB tables.
135 Parameters
136 ----------
137 url : `str`, optional
138 Base URL of the Web service, defaults to the value of environment
139 variable ``LSST_CONSDB_PQ_URL`` (the location of the publish/query
140 service).
141 token : `str`, optional
142 Authentication token for the RSP. The token must begin with "gt-".
144 Notes
145 -----
146 This client is a thin layer over the publish/query Web service, which
147 avoids having a dependency on database drivers.
149 It enforces the return of query results as Astropy Tables.
150 """
152 def __init__(self, url: str | None = None, token: str | None = None):
153 self.session = requests.Session()
154 self.session.hooks["response"].append(clean_url)
156 if token is not None:
157 if not token.startswith("gt-"):
158 raise ValueError("token must start with `gt-`.")
160 self.session.headers.update({"Authorization": f"Bearer {token}"})
162 if url is None:
163 self.url = os.environ["LSST_CONSDB_PQ_URL"]
164 else:
165 self.url = url
166 self.url = self.url.rstrip("/")
168 def _handle_get(self, url: str, query: dict[str, str | list[str]] | None = None) -> Any:
169 """Submit GET requests to the server.
171 Parameters
172 ----------
173 url : `str`
174 URL to GET.
175 query : `dict` [`str`, `str` | `list` [`str`]], optional
176 Query parameters to attach to the URL.
178 Raises
179 ------
180 requests.RequestException
181 Raised if any kind of connection error occurs.
182 requests.HTTPError
183 Raised if a non-successful status is returned.
184 requests.JSONDecodeError
185 Raised if the result does not decode as JSON.
187 Returns
188 -------
189 result : `Any`
190 Result of decoding the Web service result content as JSON.
191 """
192 logger.debug(f"GET {url}")
193 response = self.session.get(url, params=query)
194 _check_status(response)
195 return response.json()
197 def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response:
198 """Submit POST requests to the server.
200 Parameters
201 ----------
202 url : `str`
203 URL to POST.
204 data : `dict` [`str`, `Any`]
205 Key/value pairs of data to POST.
207 Raises
208 ------
209 requests.RequestException
210 Raised if any kind of connection error occurs.
211 requests.HTTPError
212 Raised if a non-successful status is returned.
214 Returns
215 -------
216 result : `requests.Response`
217 The raw Web service result object.
218 """
219 logger.debug(f"POST {url}: {data}")
220 response = self.session.post(url, json=data)
221 _check_status(response)
222 return response
224 @staticmethod
225 def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str:
226 """Compute the name of a flexible metadata table.
228 Each instrument and observation type made with that instrument can
229 have a flexible metadata table. This function is useful when
230 issuing SQL queries, and it avoids a round-trip to the server.
232 Parameters
233 ----------
234 instrument : `str`
235 Name of the instrument (e.g. ``LATISS``).
236 obs_type : `str`
237 Name of the observation type (e.g. ``Exposure``).
239 Returns
240 -------
241 table_name : `str`
242 Name of the appropriate flexible metadata table.
243 """
244 return f"cdb_{instrument}.{obs_type}_flexdata"
246 @staticmethod
247 def compute_fixed_metadata_namespace(instrument: str) -> str:
248 """Compute the namespace for a fixed metadata table.
250 Each instrument has its own namespace in the ConsDB.
251 This function is useful when issuing SQL queries, and it avoids a
252 round-trip to the server.
254 Parameters
255 ----------
256 instrument : `str`
257 Name of the instrument (e.g. ``LATISS``).
259 Returns
260 -------
261 namespace_name : `str`
262 Name of the appropriate namespace
263 """
264 return f"cdb_{instrument}"
266 def add_flexible_metadata_key(
267 self,
268 instrument: str,
269 obs_type: str,
270 key: str,
271 dtype: str,
272 doc: str,
273 unit: str | None = None,
274 ucd: str | None = None,
275 ) -> requests.Response:
276 """Add a key to a flexible metadata table.
278 Parameters
279 ----------
280 instrument : `str`
281 Name of the instrument (e.g. ``LATISS``).
282 obs_type : `str`
283 Name of the observation type (e.g. ``Exposure``).
284 key : `str`
285 Name of the key to be added (must not already exist).
286 dtype : `str`
287 One of ``bool``, ``int``, ``float``, or ``str``.
288 doc : `str`
289 Documentation string for the key.
290 unit : `str`, optional
291 Unit for the value. Should be from the IVOA
292 (https://www.ivoa.net/documents/VOUnits/) or astropy.
293 ucd : `str`, optional
294 IVOA Unified Content Descriptor
295 (https://www.ivoa.net/documents/UCD1+/).
297 Returns
298 -------
299 response : `requests.Response`
300 HTTP response from the server, with 200 status for success.
302 Raises
303 ------
304 requests.RequestException
305 Raised if any kind of connection error occurs.
306 requests.HTTPError
307 Raised if a non-successful status is returned.
308 """
309 data = {"key": key, "dtype": dtype, "doc": doc}
310 if unit is not None:
311 data["unit"] = unit
312 if ucd is not None:
313 data["ucd"] = ucd
314 url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey")
315 return self._handle_post(url, data)
317 def get_flexible_metadata_keys(self, instrument: str, obs_type: str) -> dict[str, FlexibleMetadataInfo]:
318 """Retrieve the valid keys for a flexible metadata table.
320 Parameters
321 ----------
322 instrument : `str`
323 Name of the instrument (e.g. ``LATISS``).
324 obs_type : `str`
325 Name of the observation type (e.g. ``Exposure``).
327 Returns
328 -------
329 key_info : `dict` [ `str`, `FlexibleMetadataInfo` ]
330 Dict of keys and information values.
332 Raises
333 ------
334 requests.RequestException
335 Raised if any kind of connection error occurs.
336 requests.HTTPError
337 Raised if a non-successful status is returned.
338 """
339 url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema")
340 result = self._handle_get(url)
341 return {key: FlexibleMetadataInfo(*value) for key, value in result.items()}
343 def get_flexible_metadata(
344 self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None
345 ) -> dict[str, Any]:
346 """Get the flexible metadata for an observation.
348 Parameters
349 ----------
350 instrument : `str`
351 Name of the instrument (e.g. ``LATISS``).
352 obs_type : `str`
353 Name of the observation type (e.g. ``Exposure``).
354 obs_id : `int`
355 Unique observation id.
356 keys : `list` [ `str` ], optional
357 List of keys to be retrieved; all if not specified.
359 Returns
360 -------
361 result_dict : `dict` [ `str`, `Any` ]
362 Dictionary of key/value pairs for the observation.
364 Raises
365 ------
366 requests.RequestException
367 Raised if any kind of connection error occurs.
368 requests.HTTPError
369 Raised if a non-successful status is returned.
370 """
371 url = _urljoin(
372 self.url,
373 "flex",
374 quote(instrument),
375 quote(obs_type),
376 "obs",
377 quote(str(obs_id)),
378 )
379 return self._handle_get(url, {"k": keys} if keys else None)
381 def get_all_metadata(
382 self, instrument: str, obs_type: str, obs_id: int, flex: bool = False
383 ) -> dict[str, Any]:
384 """Get all metadata for an observation.
386 Parameters
387 ----------
388 instrument : `str`
389 Name of the instrument (e.g. ``LATISS``).
390 obs_type : `str`
391 Name of the observation type (e.g. ``Exposure``).
392 obs_id : `int`
393 Unique observation id.
394 flex : `bool`
395 Include flexible metadata.
397 Returns
398 -------
399 result_dict : `dict` [ `str`, `Any` ]
400 Dictionary of key/value pairs for the observation.
402 Raises
403 ------
404 requests.RequestException
405 Raised if any kind of connection error occurs.
406 requests.HTTPError
407 Raised if a non-successful status is returned.
408 """
409 url = _urljoin(
410 self.url,
411 "query",
412 quote(instrument),
413 quote(obs_type),
414 "obs",
415 quote(str(obs_id)),
416 )
417 return self._handle_get(url, {"flex": "1"} if flex else None)
419 def insert_flexible_metadata(
420 self,
421 instrument: str,
422 obs_type: str,
423 obs_id: int,
424 values: dict[str, Any] | None = None,
425 *,
426 allow_update: bool = False,
427 **kwargs,
428 ) -> requests.Response:
429 """Set flexible metadata values for an observation.
431 Parameters
432 ----------
433 instrument : `str`
434 Name of the instrument (e.g. ``LATISS``).
435 obs_type : `str`
436 Name of the observation type (e.g. ``Exposure``).
437 obs_id : `int`
438 Unique observation id.
439 values : `dict` [ `str`, `Any` ], optional
440 Dictionary of key/value pairs to add for the observation.
441 allow_update : `bool`, optional
442 If ``True``, allow replacement of values of existing keys.
443 **kwargs : `dict`
444 Additional key/value pairs, overriding ``values``.
446 Returns
447 -------
448 response : `requests.Response`
449 HTTP response from the server, with 200 status for success.
451 Raises
452 ------
453 ValueError
454 Raised if no values are provided in ``values`` or kwargs.
455 requests.RequestException
456 Raised if any kind of connection error occurs.
457 requests.HTTPError
458 Raised if a non-successful status is returned.
459 """
460 if values:
461 values.update(kwargs)
462 else:
463 values = kwargs
464 if not values:
465 raise ValueError(f"No values to set for {instrument} {obs_type} {obs_id}")
466 data = {"values": values}
467 url = _urljoin(
468 self.url,
469 "flex",
470 quote(instrument),
471 quote(obs_type),
472 "obs",
473 quote(str(obs_id)),
474 )
475 if allow_update:
476 url += "?u=1"
477 return self._handle_post(url, data)
479 def insert(
480 self,
481 instrument: str,
482 table: str,
483 obs_id: tuple[int, int] | int,
484 values: Mapping[str, Any],
485 *,
486 allow_update: bool = False,
487 **kwargs,
488 ) -> requests.Response:
489 """Insert values into a single ConsDB fixed metadata table.
491 Parameters
492 ----------
493 instrument : `str`
494 Name of the instrument (e.g. ``LATISS``).
495 table : `str`
496 Name of the table to insert into.
497 obs_id : `tuple` [ `int`, `int`] or `int`
498 Unique observation id or day_obs and seq_num.
499 values : `Mapping` [ `str`, `Any` ]
500 Dictionary-like mapping of column/value pairs to add for the
501 observation.
502 allow_update : `bool`, optional
503 If ``True``, allow replacement of values of existing columns.
504 **kwargs : `dict`
505 Additional column/value pairs, overriding ``values``.
507 Returns
508 -------
509 response : `requests.Response`
510 HTTP response from the server, with 200 status for success.
512 Raises
513 ------
514 ValueError
515 Raised if no values are provided in ``values`` or kwargs.
516 requests.RequestException
517 Raised if any kind of connection error occurs.
518 requests.HTTPError
519 Raised if a non-successful status is returned.
520 """
521 # Build a new merged dict to avoid mutating the incoming Mapping.
522 merged_values: dict[str, Any] = {**(dict(values) if values else {}), **kwargs}
523 if not merged_values:
524 raise ValueError(f"No values to insert for {instrument} {table} {obs_id}")
526 data: dict[str, Any]
527 if isinstance(obs_id, tuple):
528 data = {"table": table, "values": merged_values}
529 url = _urljoin(
530 self.url,
531 "insert",
532 quote(instrument),
533 quote(table),
534 "by_seq_num",
535 quote(str(obs_id[0])),
536 quote(str(obs_id[1])),
537 )
538 else:
539 data = {"table": table, "obs_id": obs_id, "values": merged_values}
540 url = _urljoin(
541 self.url,
542 "insert",
543 quote(instrument),
544 quote(table),
545 "obs",
546 quote(str(obs_id)),
547 )
548 if allow_update:
549 url += "?u=1"
550 return self._handle_post(url, data)
552 def insert_multiple(
553 self,
554 instrument: str,
555 table: str,
556 obs_dict: dict[int, dict[str, Any]],
557 *,
558 allow_update=False,
559 ) -> requests.Response:
560 """Insert values into a single ConsDB fixed metadata table.
562 Parameters
563 ----------
564 instrument : `str`
565 Name of the instrument (e.g. ``LATISS``).
566 table : `str`
567 Name of the table to insert into.
568 obs_dict : `dict` [ `int`, `dict` [ `str`, `Any` ] ]
569 Dictionary of observation ids, each with a dictionary of
570 column/value pairs to add for each observation.
571 allow_update : `bool`, optional
572 If ``True``, allow replacement of values of existing columns.
574 Returns
575 -------
576 response : `requests.Response`
577 HTTP response from the server, with 200 status for success.
579 Raises
580 ------
581 ValueError
582 Raised if no values are provided in ``obs_dict``.
583 requests.RequestException
584 Raised if any kind of connection error occurs.
585 requests.HTTPError
586 Raised if a non-successful status is returned.
587 """
588 if not obs_dict:
589 raise ValueError(f"No values to insert for {instrument} {table}")
590 data = {"table": table, "obs_dict": obs_dict}
591 url = _urljoin(
592 self.url,
593 "insert",
594 quote(instrument),
595 quote(table),
596 )
597 if allow_update:
598 url += "?u=1"
599 return self._handle_post(url, data)
601 def query(self, query: str) -> Table:
602 """Query the ConsDB database.
604 Parameters
605 ----------
606 query : `str`
607 A SQL query (currently) to the database.
609 Returns
610 -------
611 result : `Table`
612 An ``astropy.Table`` containing the query results.
614 Raises
615 ------
616 requests.RequestException
617 Raised if any kind of connection error occurs.
618 requests.HTTPError
619 Raised if a non-successful status is returned.
621 Notes
622 -----
623 This is a very general query interface because it is expected that
624 a wide variety of types of queries will be needed. If some types prove
625 to be common, syntactic sugar could be added to make them simpler.
626 """
627 url = _urljoin(self.url, "query")
628 data = {"query": query}
629 result = self._handle_post(url, data).json()
631 columns = result.get("columns", [])
632 if not columns:
633 # No result columns
634 return Table(rows=[])
636 rows = result.get("data", [])
637 if not rows:
638 # No result rows
639 return Table(names=columns)
641 return Table(rows=rows, names=columns)
643 def schema(
644 self, instrument: str | None = None, table: str | None = None
645 ) -> dict[str, tuple[str, str]] | list[str]:
646 """Retrieve information about ConsDB.
648 If ``instrument`` and ``table`` are given, return the schema of a
649 fixed metadata table in ConsDB.
651 If only ``instrument`` is given, return the names of all tables
652 for that instrument.
654 If no arguments are given, return the names of all instruments.
656 Parameters
657 ----------
658 instrument : `str`, optional
659 Name of the instrument (e.g. ``LATISS``).
660 table : `str`, optional
661 Name of the table to insert into.
663 Returns
664 -------
665 info : `list` [ `str` ] or `dict` [ `str`, `tuple` [ `str`, `str` ] ]
666 A list of instrument strings or table names, or else a dict of
667 columns with values that are tuples containing a data type string
668 and a documentation string.
670 Raises
671 ------
672 ValueError
673 Raised if only ``table`` is given.
674 requests.RequestException
675 Raised if any kind of connection error occurs.
676 requests.HTTPError
677 Raised if a non-successful status is returned.
679 Notes
680 -----
681 Fixed metadata data types may use the full database vocabulary,
682 unlike flexible metadata data types.
683 """
684 if instrument is None:
685 if table is not None:
686 raise ValueError("Must specify instrument if table is given")
687 url = _urljoin(self.url, "schema")
688 elif table is None:
689 url = _urljoin(self.url, "schema", quote(instrument))
690 else:
691 url = _urljoin(self.url, "schema", quote(instrument), quote(table))
692 result = self._handle_get(url)
693 if instrument is not None and table is not None:
694 return {key: (str(value[0]), str(value[1])) for key, value in result.items()}
695 else:
696 return [str(value) for value in result]