Coverage for python / lsst / dax / apdb / cassandra / queries.py: 28%

244 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:35 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ColumnExpr", "QExpr", "Query", "Select"] 

25 

26import abc 

27import itertools 

28from collections.abc import Generator, Iterable 

29from typing import Any, overload 

30 

31 

class ColumnExpr(str):
    """String subclass marking text that is an expression, not a plain name.

    Use it where a column name is expected but the text is really an
    expression (for example a function call). Strings of this type are
    rendered in a query verbatim, without quoting.
    """

39 

40 

def CSP(n: int) -> str:
    """Return ``n`` comma-separated ``{}`` placeholders (empty string for 0)."""
    placeholders = ("{}" for _ in range(n))
    return ",".join(placeholders)

44 

45 

46def _quote_id(columnName: str) -> str: 

47 """Smart quoting for column names. Lower-case names are not quoted.""" 

48 if columnName != "*": 

49 if not isinstance(columnName, ColumnExpr): 

50 if not columnName.islower(): 

51 columnName = '"' + columnName + '"' 

52 return columnName 

53 

54 

class Query(abc.ABC):
    """Common interface implemented by every query type."""

    @property
    @abc.abstractmethod
    def can_prepare(self) -> bool:
        """Whether the query may be prepared; `False` means it should not."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def parameters(self) -> tuple:
        """Query parameters, ordered as their placeholders appear in the text."""
        raise NotImplementedError

    @abc.abstractmethod
    def render(self, placeholder: str | None = None) -> str:
        """Produce the query text with placeholders standing in for parameters.

        Parameters
        ----------
        placeholder : `str`, optional
            Replacement for every ``{}`` in the query text (e.g. "?" or
            "%s"). When omitted the text is returned with ``{}`` left in
            place.
        """
        raise NotImplementedError

    def __str__(self) -> str:
        """Return the query text as produced by `render` with no arguments."""
        return self.render()

85 

86 

87class QExpr: 

88 """Class representing a part of query string, such as WHEREexpression or 

89 column list. 

90 

91 Parameters 

92 ---------- 

93 expression : `str` 

94 An expression, arbitrary text with placeholders for parameters 

95 represented by ``{}``. If expression contains "{*}" string it will be 

96 replaced by comma-separated placeholders matching the number of 

97 parameters. 

98 parameters : `tuple`, optional 

99 Parameter values, order must match placeholders in expression. 

100 can_prepare : `bool`, optional 

101 If `False` then the statement which includes this expression should not 

102 be prepared. If not specified as True or False then it is determined by 

103 the presence of parameters, assumed to be False if no parameters are 

104 present. 

105 """ 

106 

107 def __init__(self, expression: str, parameters: Iterable = (), *, can_prepare: bool | None = None): 

108 # Check that number of placeholders matches number of parameters. 

109 self.parameters = tuple(parameters) 

110 if "{*}" in expression: 

111 expression = expression.replace("{*}", CSP(len(self.parameters))) 

112 if expression.count("{}") != len(self.parameters): 

113 raise ValueError( 

114 f"Number of placeholders in expression ({expression}) " 

115 f"does not match number of parameters {parameters}" 

116 ) 

117 self.expression = expression 

118 self._can_prepare = can_prepare 

119 

120 @property 

121 def can_prepare(self) -> bool: 

122 """If `False` then this query should not be prepared.""" 

123 # If there are no parameters then it's likely to contain rendered 

124 # values and we do not want to prepare it. 

125 if self._can_prepare is None: 

126 return bool(self.parameters) 

127 return self._can_prepare 

128 

129 def join(self, other: QExpr, separator: str) -> QExpr: 

130 """Combine two clauses using given separator.""" 

131 return QExpr( 

132 expression=f"{self.expression}{separator}{other.expression}", 

133 parameters=self.parameters + other.parameters, 

134 can_prepare=self.can_prepare and other.can_prepare, 

135 ) 

136 

137 def __and__(self, other: object) -> QExpr: 

138 """Combine two clauses using AND expression.""" 

139 if not isinstance(other, QExpr): 

140 return NotImplemented 

141 return self.join(other, " AND ") 

142 

143 def __eq__(self, other: object) -> bool: 

144 """Compare two clauses for equality.""" 

145 if not isinstance(other, QExpr): 

146 return NotImplemented 

147 return ( 

148 self.expression == other.expression 

149 and self.parameters == other.parameters 

150 and self.can_prepare == other.can_prepare 

151 ) 

152 

153 def __str__(self) -> str: 

154 return f"QExpr({self.expression!r}, {self.parameters}, {self._can_prepare})" 

155 

156 def __repr__(self) -> str: 

157 return str(self) 

158 

159 @staticmethod 

160 def combine(*products: list[QExpr], extra: QExpr | None = None) -> Generator[QExpr]: 

161 """Combine multiple clauses using AND expression. 

162 

163 Parameters 

164 ---------- 

165 *products : `Iterable` [ `QExpr` ] 

166 One or more iterables of clauses to be combined. The result will 

167 include all combinations of clauses from these iterables. If any of 

168 the products is empty it is ignored. 

169 extra : `QExpr`, optional 

170 An extra clause to be added to each combination of clauses from 

171 ``products``. If `None` then no extra clause is added. 

172 

173 Yields 

174 ------ 

175 clause : `QExpr` 

176 Combined clause for one of the combinations of clauses from 

177 ``products`` and the extra clause if it is not `None`. 

178 """ 

179 for clauses in itertools.product(*[product for product in products if product]): 

180 if extra is not None: 

181 clauses = (*clauses, extra) 

182 result = None 

183 for clause in clauses: 

184 if result is None: 

185 result = clause 

186 else: 

187 result &= clause 

188 if result is not None: 

189 yield result 

190 

191 @classmethod 

192 def _from_args(cls, *args: Any, **kwargs: Any) -> QExpr: 

193 """Construct QExpr from a bunch of parameters that can be passed 

194 to Select.where(). 

195 """ 

196 match args: 

197 case (QExpr() as clause,): 

198 if kwargs: 

199 raise TypeError("No keyword arguments expected when QExpr is passed") 

200 return clause 

201 case (str() as expr,): 

202 params = () 

203 case (str() as expr, Iterable() as params): 

204 pass 

205 case _: 

206 raise TypeError(f"Unexpected arguments: {args}") 

207 

208 can_prepare = kwargs.pop("can_prepare", None) 

209 if kwargs: 

210 raise TypeError(f"Unexpected keyword arguments: {kwargs}") 

211 

212 return cls(expr, params, can_prepare=can_prepare) 

213 

214 

class Column:
    """A table column whose operators build `QExpr` expressions.

    Comparison operators do not return `bool`; each returns a `QExpr` with
    a single ``{}`` placeholder bound to the right-hand operand.
    """

    def __init__(self, column: str):
        self._column = column

    def __str__(self) -> str:
        return _quote_id(self._column)

    def _binary(self, operator: str, value: Any) -> QExpr:
        """Build ``<column> <operator> {}`` with ``value`` as its parameter."""
        return QExpr(f"{self} {operator} {{}}", [value])

    def __eq__(self, other: Any) -> QExpr:  # type: ignore[override]
        return self._binary("=", other)

    def __ne__(self, other: Any) -> QExpr:  # type: ignore[override]
        return self._binary("!=", other)

    def __lt__(self, other: Any) -> QExpr:
        return self._binary("<", other)

    def __le__(self, other: Any) -> QExpr:
        return self._binary("<=", other)

    def __gt__(self, other: Any) -> QExpr:
        return self._binary(">", other)

    def __ge__(self, other: Any) -> QExpr:
        return self._binary(">=", other)

    def in_(self, other: Iterable, *, can_prepare: bool | None = None) -> QExpr:
        """Build an ``IN (...)`` expression with one placeholder per value."""
        text = str(self) + " IN ({*})"
        return QExpr(text, other, can_prepare=can_prepare)

    def update(self, other: Any) -> QExpr:
        """Build a ``column = value`` assignment for an UPDATE statement."""
        return self._binary("=", other)

251 

252 

class Select(Query):
    """SELECT query builder.

    Instances are not modified by `where`; it returns a new `Select`.

    Parameters
    ----------
    keyspace : `str`
        Keyspace name.
    table : `str`
        Table name.
    columns : `~collections.abc.Iterable` [`str`]
        Names of the columns to return.
    where_clause : `QExpr`, optional
        WHERE clause to be added to the query.
    extra_clause : `str`, optional
        Extra clause appended after the WHERE clause, e.g. for GROUP BY or
        ORDER BY.
    can_prepare : `bool`, optional
        If `False` then the statement should not be prepared.
    """

    def __init__(
        self,
        keyspace: str,
        table: str,
        columns: Iterable[str],
        *,
        where_clause: QExpr | None = None,
        extra_clause: str | None = None,
        can_prepare: bool = True,
    ):
        self._keyspace = keyspace
        self._table = table
        self._columns = tuple(columns)
        self._where_clause = where_clause
        self._extra_clause = extra_clause
        self._can_prepare = can_prepare

    @property
    def can_prepare(self) -> bool:
        """If `False` then this query should not be prepared."""
        # Both the WHERE clause (when present) and the query itself must
        # allow preparation.
        if self._where_clause:
            return self._where_clause.can_prepare and self._can_prepare
        return self._can_prepare

    @property
    def parameters(self) -> tuple:
        """Parameters of the WHERE clause, empty if there is no WHERE clause."""
        where = self._where_clause
        return () if where is None else where.parameters

    @overload
    def where(self, where_clause: QExpr) -> Select: ...

    @overload
    def where(
        self, expression: str, parameters: Iterable = (), *, can_prepare: bool | None = None
    ) -> Select: ...

    def where(self, *args: Any, **kwargs: Any) -> Select:
        """Return a copy of this query with one more condition AND-ed in."""
        addition = QExpr._from_args(*args, **kwargs)
        combined = addition if self._where_clause is None else self._where_clause & addition
        return Select(
            self._keyspace,
            self._table,
            self._columns,
            where_clause=combined,
            extra_clause=self._extra_clause,
            can_prepare=self._can_prepare,
        )

    def render(self, placeholder: str | None = None) -> str:
        """Build the SELECT text; ``{}`` becomes ``placeholder`` if given."""
        column_list = ",".join(map(_quote_id, self._columns))
        pieces = [f"SELECT {column_list} FROM {_quote_id(self._keyspace)}.{_quote_id(self._table)}"]
        if self._where_clause is not None:
            pieces.append("WHERE " + self._where_clause.expression)
        if self._extra_clause is not None:
            pieces.append(self._extra_clause)
        text = " ".join(pieces)
        return text.replace("{}", placeholder) if placeholder else text

340 

341 

class Insert(Query):
    """INSERT query builder.

    The rendered statement has one placeholder per column; `parameters` is
    always empty.

    Parameters
    ----------
    keyspace : `str`
        Keyspace name.
    table : `str`
        Table name.
    columns : `~collections.abc.Iterable` [`str`]
        Names of the columns to insert values into.
    can_prepare : `bool`, optional
        If `False` then the statement should not be prepared.
    """

    def __init__(
        self,
        keyspace: str,
        table: str,
        columns: Iterable[str],
        *,
        can_prepare: bool = True,
    ):
        self._keyspace = keyspace
        self._table = table
        self._columns = tuple(columns)
        self._can_prepare = can_prepare

    @property
    def can_prepare(self) -> bool:
        """If `False` then this query should not be prepared."""
        return self._can_prepare

    @property
    def parameters(self) -> tuple:
        """Always an empty tuple; this class stores no parameter values."""
        return ()

    def render(self, placeholder: str | None = None) -> str:
        """Build the INSERT text; ``{}`` becomes ``placeholder`` if given."""
        names = ",".join(map(_quote_id, self._columns))
        target = f"{_quote_id(self._keyspace)}.{_quote_id(self._table)}"
        text = f"INSERT INTO {target} ({names}) VALUES ({CSP(len(self._columns))})"
        return text.replace("{}", placeholder) if placeholder else text

390 

391 

class Delete(Query):
    """DELETE query builder.

    Instances are not modified by `where`; it returns a new `Delete`.

    Parameters
    ----------
    keyspace : `str`
        Keyspace name.
    table : `str`
        Table name.
    where_clause : `QExpr`, optional
        WHERE clause to be added to the query. It is optional at
        construction but `render` refuses to produce a statement without it.
    can_prepare : `bool`, optional
        If `False` then the statement should not be prepared.
    """

    def __init__(
        self,
        keyspace: str,
        table: str,
        *,
        where_clause: QExpr | None = None,
        can_prepare: bool = True,
    ):
        self._keyspace = keyspace
        self._table = table
        self._where_clause = where_clause
        self._can_prepare = can_prepare

    @property
    def can_prepare(self) -> bool:
        """If `False` then this query should not be prepared."""
        # Both the WHERE clause (when present) and the query itself must
        # allow preparation.
        if self._where_clause:
            return self._where_clause.can_prepare and self._can_prepare
        return self._can_prepare

    @property
    def parameters(self) -> tuple:
        """Parameters of the WHERE clause, empty if there is no WHERE clause."""
        where = self._where_clause
        return () if where is None else where.parameters

    @overload
    def where(self, where_clause: QExpr) -> Delete: ...

    @overload
    def where(
        self, expression: str, parameters: Iterable = (), *, can_prepare: bool | None = None
    ) -> Delete: ...

    def where(self, *args: Any, **kwargs: Any) -> Delete:
        """Return a copy of this query with one more condition AND-ed in."""
        addition = QExpr._from_args(*args, **kwargs)
        combined = addition if self._where_clause is None else self._where_clause & addition
        return Delete(
            self._keyspace,
            self._table,
            where_clause=combined,
            can_prepare=self._can_prepare,
        )

    def render(self, placeholder: str | None = None) -> str:
        """Build the DELETE text; ``{}`` becomes ``placeholder`` if given.

        Raises
        ------
        RuntimeError
            Raised if the query has no WHERE clause.
        """
        where = self._where_clause
        # DELETE without WHERE is very likely an error.
        if where is None:
            raise RuntimeError("DELETE statement without WHERE clause is dangerous.")

        target = f"{_quote_id(self._keyspace)}.{_quote_id(self._table)}"
        text = f"DELETE FROM {target} WHERE {where.expression}"
        return text.replace("{}", placeholder) if placeholder else text

469 

470 

class Update(Query):
    """UPDATE query builder.

    Instances are not modified by `where`; it returns a new `Update`. The
    same holds for `values` whenever it is given at least one clause.

    Parameters
    ----------
    keyspace : `str`
        Keyspace name.
    table : `str`
        Table name.
    where_clause : `QExpr`, optional
        WHERE clause to be added to the query.
    values : `QExpr`, optional
        SET clause to be added to the query.
    can_prepare : `bool`, optional
        If `False` then the statement should not be prepared.
    """

    def __init__(
        self,
        keyspace: str,
        table: str,
        *,
        where_clause: QExpr | None = None,
        values: QExpr | None = None,
        can_prepare: bool = True,
    ):
        self._keyspace = keyspace
        self._table = table
        self._where_clause = where_clause
        self._values = values
        self._can_prepare = can_prepare

    @property
    def can_prepare(self) -> bool:
        """If `False` then this query should not be prepared."""
        # NOTE(review): only the WHERE clause is consulted; the SET clause's
        # own ``can_prepare`` is not taken into account -- confirm intended.
        if self._where_clause:
            return self._where_clause.can_prepare and self._can_prepare
        return self._can_prepare

    @property
    def parameters(self) -> tuple:
        """SET clause parameters followed by WHERE clause parameters."""
        collected: tuple = ()
        if self._values is not None:
            collected += self._values.parameters
        if self._where_clause is not None:
            collected += self._where_clause.parameters
        return collected

    @overload
    def where(self, where_clause: QExpr) -> Update: ...

    @overload
    def where(
        self, expression: str, parameters: Iterable = (), *, can_prepare: bool | None = None
    ) -> Update: ...

    def where(self, *args: Any, **kwargs: Any) -> Update:
        """Return a copy of this query with one more condition AND-ed in."""
        addition = QExpr._from_args(*args, **kwargs)
        combined = addition if self._where_clause is None else self._where_clause & addition
        return Update(
            self._keyspace,
            self._table,
            where_clause=combined,
            values=self._values,
            can_prepare=self._can_prepare,
        )

    def values(self, *updates: QExpr) -> Update:
        """Define SET clause.

        Parameters
        ----------
        updates : `QExpr`
            One or more SET clauses, e.g. "x = {}", "y = 10". They are
            appended, comma-separated, to any SET clause already present.

        Returns
        -------
        query : `Update`
            ``self`` when no clauses are given, otherwise a new query.
        """
        if not updates:
            return self
        pending = list(updates)
        # With no existing SET clause the first update seeds it.
        combined = self._values if self._values is not None else pending.pop(0)
        for clause in pending:
            combined = combined.join(clause, ", ")
        return Update(
            self._keyspace,
            self._table,
            where_clause=self._where_clause,
            values=combined,
            can_prepare=self._can_prepare,
        )

    def render(self, placeholder: str | None = None) -> str:
        """Build the UPDATE text; ``{}`` becomes ``placeholder`` if given.

        Raises
        ------
        RuntimeError
            Raised if the query has no WHERE clause or no SET clause.
        """
        where, assignments = self._where_clause, self._values
        # UPDATE without WHERE is very likely an error.
        if where is None:
            raise RuntimeError("UPDATE statement without WHERE clause is dangerous.")
        if assignments is None:
            raise RuntimeError("UPDATE statement without SET clause.")

        target = f"{_quote_id(self._keyspace)}.{_quote_id(self._table)}"
        text = f"UPDATE {target} SET {assignments.expression} WHERE {where.expression}"
        return text.replace("{}", placeholder) if placeholder else text