Coverage for python / lsst / summit / utils / consdbClient.py: 20%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 19:02 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import logging 

23import os 

24from collections.abc import Mapping 

25from dataclasses import dataclass 

26from typing import Any 

27from urllib.parse import quote, urlparse 

28 

29import requests 

30from astropy.table import Table 

31 

# Public API of this module: the names exported by a star-import.
__all__ = ["ConsDbClient", "FlexibleMetadataInfo"]


# Module-level logger, named after this module so that it can be configured
# through the standard logging hierarchy.
logger = logging.getLogger(__name__)

36 

37 

38def _urljoin(*args: str) -> str: 

39 """Join parts of a URL with slashes. 

40 

41 Does not do any quoting. Mostly to remove a level of list-making. 

42 

43 Parameters 

44 ---------- 

45 *args : `str` 

46 Each parameter is a URL part. 

47 

48 Returns 

49 ------- 

50 url : `str` 

51 The joined URL. 

52 """ 

53 return "/".join(args) 

54 

55 

def _check_status(r: requests.Response) -> None:
    """Check the status of an HTTP response and raise if it is an error.

    When the error response carries a JSON body, that body is attached to
    the exception as a note, followed by a second note containing its
    ``message`` entry if the body is a JSON object that has one.

    Parameters
    ----------
    r : `requests.Response`
        The response to check.

    Raises
    ------
    requests.HTTPError
        Raised if a non-successful status is returned.
    """
    try:
        r.raise_for_status()
    except requests.HTTPError as e:
        try:
            json_data = e.response.json()
        except requests.JSONDecodeError:
            # The error body is not JSON; there is nothing extra to attach.
            pass
        else:
            e.add_note(str(json_data))
            # A JSON body need not be an object (it may be null, a number,
            # a string or a list). Only look up "message" on a dict so that
            # a TypeError here can never mask the HTTPError itself.
            if isinstance(json_data, dict) and "message" in json_data:
                e.add_note(f"\n\n{json_data['message']}")
        # Bare raise re-raises the annotated exception with its traceback.
        raise

82 

83 

def clean_url(resp: requests.Response, *args, **kwargs) -> requests.Response:
    """Mask any credentials embedded in the URL of a response.

    The username and password in the URL (if present) are each truncated to
    their first two characters followed by ``***``. The host and any
    explicit port are kept exactly as they appear in the original URL. A
    URL that carries no credentials is left untouched.

    Parameters
    ----------
    resp : `requests.Response`
        The response whose ``url`` could contain credentials or tokens.
    *args
        Ignored; accepted so the function can be called with extra
        positional arguments.
    **kwargs
        Ignored; accepted so the function can be called with extra
        keyword arguments.

    Returns
    -------
    resp : `requests.Response`
        The same response object; its ``url`` attribute is rewritten in
        place when it contained credentials.
    """
    url = urlparse(resp.url)
    if url.username is None and url.password is None:
        # No userinfo component: rewriting the netloc would only add a
        # stray "@" and risk losing the port, so leave the URL alone.
        return resp

    short_user = f"{url.username[:2]}***" if url.username is not None else ""
    short_pass = f":{url.password[:2]}***" if url.password is not None else ""
    # Everything after the last "@" is host[:port]. Reusing that text keeps
    # the port, the host's case and any IPv6 brackets intact.
    _, _, host_port = url.netloc.rpartition("@")
    resp.url = url._replace(netloc=f"{short_user}{short_pass}@{host_port}").geturl()
    return resp

100 

101 

102@dataclass 

103class FlexibleMetadataInfo: 

104 """Description of a flexible metadata value. 

105 

106 Parameters 

107 ---------- 

108 dtype : `str` 

109 Data type of the flexible metadata value. 

110 One of ``bool``, ``int``, ``float``, or ``str``. 

111 doc : `str` 

112 Documentation string. 

113 unit : `str`, optional 

114 Unit of the value. 

115 ucd : `str`, optional 

116 IVOA Unified Content Descriptor 

117 (https://www.ivoa.net/documents/UCD1+/). 

118 """ 

119 

120 dtype: str 

121 doc: str 

122 unit: str | None = None 

123 ucd: str | None = None 

124 

125 

class ConsDbClient:
    """A client library for accessing the Consolidated Database.

    This library provides a basic interface for using flexible metadata
    (key/value pairs associated with observation ids from an observation
    type table), determining the schema of ConsDB tables, querying the
    ConsDB using a general SQL SELECT statement, and inserting into
    ConsDB tables.

    Parameters
    ----------
    url : `str`, optional
        Base URL of the Web service, defaults to the value of environment
        variable ``LSST_CONSDB_PQ_URL`` (the location of the publish/query
        service). Trailing slashes are removed.
    token : `str`, optional
        Authentication token for the RSP. The token must begin with "gt-".
        When given, it is sent as a ``Bearer`` authorization header on
        every request made through this client.

    Raises
    ------
    ValueError
        Raised if ``token`` is given but does not begin with "gt-".
    KeyError
        Raised if ``url`` is not given and ``LSST_CONSDB_PQ_URL`` is not
        set in the environment.

    Notes
    -----
    This client is a thin layer over the publish/query Web service, which
    avoids having a dependency on database drivers.

    It enforces the return of query results as Astropy Tables.
    """

    def __init__(self, url: str | None = None, token: str | None = None):
        self.session = requests.Session()
        # Rewrite the URL of every response so embedded credentials are
        # masked before the response is handed back to the caller.
        self.session.hooks["response"].append(clean_url)

        if token is not None:
            if not token.startswith("gt-"):
                raise ValueError("token must start with `gt-`.")

            self.session.headers.update({"Authorization": f"Bearer {token}"})

        if url is None:
            self.url = os.environ["LSST_CONSDB_PQ_URL"]
        else:
            self.url = url
        # URL parts are joined with "/" later, so drop any trailing slash.
        self.url = self.url.rstrip("/")

    def _handle_get(self, url: str, query: dict[str, str | list[str]] | None = None) -> Any:
        """Submit GET requests to the server.

        Parameters
        ----------
        url : `str`
            URL to GET.
        query : `dict` [`str`, `str` | `list` [`str`]], optional
            Query parameters to attach to the URL.

        Returns
        -------
        result : `Any`
            Result of decoding the Web service result content as JSON.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        requests.JSONDecodeError
            Raised if the result does not decode as JSON.
        """
        logger.debug("GET %s", url)
        response = self.session.get(url, params=query)
        _check_status(response)
        return response.json()

    def _handle_post(self, url: str, data: dict[str, Any]) -> requests.Response:
        """Submit POST requests to the server.

        Parameters
        ----------
        url : `str`
            URL to POST.
        data : `dict` [`str`, `Any`]
            Key/value pairs of data to POST; sent as the JSON request body.

        Returns
        -------
        result : `requests.Response`
            The raw Web service result object.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        logger.debug("POST %s: %s", url, data)
        response = self.session.post(url, json=data)
        _check_status(response)
        return response

    @staticmethod
    def compute_flexible_metadata_table_name(instrument: str, obs_type: str) -> str:
        """Compute the name of a flexible metadata table.

        Each instrument and observation type made with that instrument can
        have a flexible metadata table. This function is useful when
        issuing SQL queries, and it avoids a round-trip to the server.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).

        Returns
        -------
        table_name : `str`
            Name of the appropriate flexible metadata table.
        """
        return f"cdb_{instrument}.{obs_type}_flexdata"

    @staticmethod
    def compute_fixed_metadata_namespace(instrument: str) -> str:
        """Compute the namespace for a fixed metadata table.

        Each instrument has its own namespace in the ConsDB.
        This function is useful when issuing SQL queries, and it avoids a
        round-trip to the server.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).

        Returns
        -------
        namespace_name : `str`
            Name of the appropriate namespace.
        """
        return f"cdb_{instrument}"

    def add_flexible_metadata_key(
        self,
        instrument: str,
        obs_type: str,
        key: str,
        dtype: str,
        doc: str,
        unit: str | None = None,
        ucd: str | None = None,
    ) -> requests.Response:
        """Add a key to a flexible metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        key : `str`
            Name of the key to be added (must not already exist).
        dtype : `str`
            One of ``bool``, ``int``, ``float``, or ``str``.
        doc : `str`
            Documentation string for the key.
        unit : `str`, optional
            Unit for the value. Should be from the IVOA
            (https://www.ivoa.net/documents/VOUnits/) or astropy.
        ucd : `str`, optional
            IVOA Unified Content Descriptor
            (https://www.ivoa.net/documents/UCD1+/).

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        data = {"key": key, "dtype": dtype, "doc": doc}
        # Optional attributes are only sent when the caller supplied them.
        if unit is not None:
            data["unit"] = unit
        if ucd is not None:
            data["ucd"] = ucd
        url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "addkey")
        return self._handle_post(url, data)

    def get_flexible_metadata_keys(self, instrument: str, obs_type: str) -> dict[str, FlexibleMetadataInfo]:
        """Retrieve the valid keys for a flexible metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).

        Returns
        -------
        key_info : `dict` [ `str`, `FlexibleMetadataInfo` ]
            Dict of keys and information values.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(self.url, "flex", quote(instrument), quote(obs_type), "schema")
        result = self._handle_get(url)
        # Each value is unpacked positionally into FlexibleMetadataInfo.
        return {key: FlexibleMetadataInfo(*value) for key, value in result.items()}

    def get_flexible_metadata(
        self, instrument: str, obs_type: str, obs_id: int, keys: list[str] | None = None
    ) -> dict[str, Any]:
        """Get the flexible metadata for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        keys : `list` [ `str` ], optional
            List of keys to be retrieved; all if not specified (or empty).

        Returns
        -------
        result_dict : `dict` [ `str`, `Any` ]
            Dictionary of key/value pairs for the observation.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(
            self.url,
            "flex",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        return self._handle_get(url, {"k": keys} if keys else None)

    def get_all_metadata(
        self, instrument: str, obs_type: str, obs_id: int, flex: bool = False
    ) -> dict[str, Any]:
        """Get all metadata for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        flex : `bool`, optional
            Include flexible metadata.

        Returns
        -------
        result_dict : `dict` [ `str`, `Any` ]
            Dictionary of key/value pairs for the observation.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        url = _urljoin(
            self.url,
            "query",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        return self._handle_get(url, {"flex": "1"} if flex else None)

    def insert_flexible_metadata(
        self,
        instrument: str,
        obs_type: str,
        obs_id: int,
        values: dict[str, Any] | None = None,
        *,
        allow_update: bool = False,
        **kwargs,
    ) -> requests.Response:
        """Set flexible metadata values for an observation.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        obs_type : `str`
            Name of the observation type (e.g. ``Exposure``).
        obs_id : `int`
            Unique observation id.
        values : `dict` [ `str`, `Any` ], optional
            Dictionary of key/value pairs to add for the observation.
            It is not modified by this method.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing keys.
        **kwargs : `dict`
            Additional key/value pairs, overriding ``values``.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``values`` or kwargs.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        # Build a new merged dict to avoid mutating the caller's ``values``.
        merged_values: dict[str, Any] = {**(values or {}), **kwargs}
        if not merged_values:
            raise ValueError(f"No values to set for {instrument} {obs_type} {obs_id}")
        data = {"values": merged_values}
        url = _urljoin(
            self.url,
            "flex",
            quote(instrument),
            quote(obs_type),
            "obs",
            quote(str(obs_id)),
        )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def insert(
        self,
        instrument: str,
        table: str,
        obs_id: tuple[int, int] | int,
        values: Mapping[str, Any],
        *,
        allow_update: bool = False,
        **kwargs,
    ) -> requests.Response:
        """Insert values into a single ConsDB fixed metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        table : `str`
            Name of the table to insert into.
        obs_id : `tuple` [ `int`, `int`] or `int`
            Unique observation id or day_obs and seq_num.
        values : `Mapping` [ `str`, `Any` ]
            Dictionary-like mapping of column/value pairs to add for the
            observation. It is not modified by this method.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing columns.
        **kwargs : `dict`
            Additional column/value pairs, overriding ``values``.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``values`` or kwargs.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        # Build a new merged dict to avoid mutating the incoming Mapping.
        merged_values: dict[str, Any] = {**(dict(values) if values else {}), **kwargs}
        if not merged_values:
            raise ValueError(f"No values to insert for {instrument} {table} {obs_id}")

        data: dict[str, Any]
        if isinstance(obs_id, tuple):
            # A tuple is addressed by its two elements in the URL path and
            # is not repeated in the request body.
            data = {"table": table, "values": merged_values}
            url = _urljoin(
                self.url,
                "insert",
                quote(instrument),
                quote(table),
                "by_seq_num",
                quote(str(obs_id[0])),
                quote(str(obs_id[1])),
            )
        else:
            data = {"table": table, "obs_id": obs_id, "values": merged_values}
            url = _urljoin(
                self.url,
                "insert",
                quote(instrument),
                quote(table),
                "obs",
                quote(str(obs_id)),
            )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def insert_multiple(
        self,
        instrument: str,
        table: str,
        obs_dict: dict[int, dict[str, Any]],
        *,
        allow_update: bool = False,
    ) -> requests.Response:
        """Insert values for multiple observations into a single ConsDB
        fixed metadata table.

        Parameters
        ----------
        instrument : `str`
            Name of the instrument (e.g. ``LATISS``).
        table : `str`
            Name of the table to insert into.
        obs_dict : `dict` [ `int`, `dict` [ `str`, `Any` ] ]
            Dictionary of observation ids, each with a dictionary of
            column/value pairs to add for each observation.
        allow_update : `bool`, optional
            If ``True``, allow replacement of values of existing columns.

        Returns
        -------
        response : `requests.Response`
            HTTP response from the server, with 200 status for success.

        Raises
        ------
        ValueError
            Raised if no values are provided in ``obs_dict``.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.
        """
        if not obs_dict:
            raise ValueError(f"No values to insert for {instrument} {table}")
        data = {"table": table, "obs_dict": obs_dict}
        url = _urljoin(
            self.url,
            "insert",
            quote(instrument),
            quote(table),
        )
        if allow_update:
            url += "?u=1"
        return self._handle_post(url, data)

    def query(self, query: str) -> Table:
        """Query the ConsDB database.

        Parameters
        ----------
        query : `str`
            A SQL query (currently) to the database.

        Returns
        -------
        result : `Table`
            An ``astropy.Table`` containing the query results. The table
            is completely empty if the response has no columns, and has
            named columns but no rows if the response has no data.

        Raises
        ------
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.

        Notes
        -----
        This is a very general query interface because it is expected that
        a wide variety of types of queries will be needed. If some types prove
        to be common, syntactic sugar could be added to make them simpler.
        """
        url = _urljoin(self.url, "query")
        data = {"query": query}
        result = self._handle_post(url, data).json()

        columns = result.get("columns", [])
        if not columns:
            # No result columns
            return Table(rows=[])

        rows = result.get("data", [])
        if not rows:
            # No result rows
            return Table(names=columns)

        return Table(rows=rows, names=columns)

    def schema(
        self, instrument: str | None = None, table: str | None = None
    ) -> dict[str, tuple[str, str]] | list[str]:
        """Retrieve information about ConsDB.

        If ``instrument`` and ``table`` are given, return the schema of a
        fixed metadata table in ConsDB.

        If only ``instrument`` is given, return the names of all tables
        for that instrument.

        If no arguments are given, return the names of all instruments.

        Parameters
        ----------
        instrument : `str`, optional
            Name of the instrument (e.g. ``LATISS``).
        table : `str`, optional
            Name of the table whose schema is to be retrieved.

        Returns
        -------
        info : `list` [ `str` ] or `dict` [ `str`, `tuple` [ `str`, `str` ] ]
            A list of instrument strings or table names, or else a dict of
            columns with values that are tuples containing a data type string
            and a documentation string.

        Raises
        ------
        ValueError
            Raised if only ``table`` is given.
        requests.RequestException
            Raised if any kind of connection error occurs.
        requests.HTTPError
            Raised if a non-successful status is returned.

        Notes
        -----
        Fixed metadata data types may use the full database vocabulary,
        unlike flexible metadata data types.
        """
        if instrument is None:
            if table is not None:
                raise ValueError("Must specify instrument if table is given")
            url = _urljoin(self.url, "schema")
        elif table is None:
            url = _urljoin(self.url, "schema", quote(instrument))
        else:
            url = _urljoin(self.url, "schema", quote(instrument), quote(table))
        result = self._handle_get(url)
        if instrument is not None and table is not None:
            # Table schema: map each column to (data type, documentation).
            return {key: (str(value[0]), str(value[1])) for key, value in result.items()}
        else:
            # Instrument or table listing: a flat sequence of names.
            return [str(value) for value in result]