Coverage for python / lsst / dax / apdb / cassandra / apdbCassandraAdmin.py: 13%

267 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["ApdbCassandraAdmin"] 

25 

26import dataclasses 

27import itertools 

28import logging 

29import warnings 

30from collections import defaultdict 

31from collections.abc import Iterable, Mapping 

32from typing import TYPE_CHECKING, Protocol 

33 

34import astropy.time 

35 

36from lsst.sphgeom import LonLat, UnitVector3d 

37from lsst.utils.iteration import chunk_iterable 

38 

39try: 

40 import cassandra 

41except ImportError: 

42 pass 

43 

44from ..apdbAdmin import ApdbAdmin, DiaForcedSourceLocator, DiaObjectLocator, DiaSourceLocator 

45from ..apdbSchema import ApdbTables 

46from ..monitor import MonAgent 

47from ..timer import Timer 

48from .cassandra_utils import execute_concurrent, quote_id 

49from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange 

50from .sessionFactory import SessionContext 

51 

52if TYPE_CHECKING: 

53 from .apdbCassandra import ApdbCassandra 

54 from .partitioner import Partitioner 

55 

# Module-level logger, named after this module; passed to every `Timer`
# created by `ApdbCassandraAdmin._timer`.
_LOG = logging.getLogger(__name__)

# Monitoring agent registered under this module's name; passed to every
# `Timer` created by `ApdbCassandraAdmin._timer`.
_MON = MonAgent(__name__)

59 

60 

class ConfirmDeletePartitions(Protocol):
    """Structural type of a callback that approves deletion of partitions.

    The callback is invoked with keyword arguments only and returns a
    `bool`; `ApdbCassandraAdmin.delete_time_partitions` drops tables only
    when the returned value is true.
    """

    def __call__(
        self,
        *,
        partitions: list[int],
        tables: list[str],
        partitioner: Partitioner,
    ) -> bool:
        # Protocol member, implementations provide the actual logic.
        ...

65 

66 

@dataclasses.dataclass
class DatabaseInfo:
    """Summary of a single APDB database (Cassandra keyspace)."""

    name: str
    """Name of the keyspace."""

    permissions: dict[str, set[str]] | None = dataclasses.field(default=None)
    """Mapping of role name to the set of permissions that role has on the
    database.

    The value is `None` when authentication information cannot be read
    because of system table permissions. With anonymous access enabled the
    dictionary is empty, but it is not `None`.
    """

81 

82 

class ApdbCassandraAdmin(ApdbAdmin):
    """Implementation of `ApdbAdmin` for Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        APDB implementation.
    """

    def __init__(self, apdb: ApdbCassandra):
        self._apdb = apdb

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name.

        Parameters
        ----------
        name : `str`
            Name of the timer.
        tags : `~collections.abc.Mapping`, optional
            Tags forwarded to the `Timer` constructor.

        Returns
        -------
        timer : `Timer`
            Timer bound to this module's monitoring agent and logger.
        """
        return Timer(name, _MON, _LOG, tags=tags)

    @classmethod
    def list_databases(cls, host: str) -> Iterable[DatabaseInfo]:
        """Return the list of keyspaces with APDB databases.

        A keyspace is reported if ``system_schema.tables`` lists a table
        with the DiaSource table name in it.

        Parameters
        ----------
        host : `str`
            Name of one of the hosts in Cassandra cluster.

        Returns
        -------
        databases : `~collections.abc.Iterable` [`DatabaseInfo`]
            Information about databases that contain APDB instance. Empty
            if no matching keyspace is found.

        Warns
        -----
        UserWarning
            Issued when reading ``system_auth.role_permissions`` raises
            ``cassandra.Unauthorized``; returned `DatabaseInfo` instances
            then have ``permissions`` set to `None`.
        """
        # For DbAuth we need to use database name "*" to try to match any
        # database.
        config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
        with SessionContext(config) as session:
            # Get names of all keyspaces containing DiaSource table
            table_name = ApdbTables.DiaSource.table_name()
            query = "select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
            result = session.execute(query, (table_name,))
            keyspaces = [row[0] for row in result.all()]

            if not keyspaces:
                return []

            # Retrieve roles for each keyspace.
            template = ", ".join(["%s"] * len(keyspaces))
            query = (
                "SELECT resource, role, permissions FROM system_auth.role_permissions "
                f"WHERE resource IN ({template}) ALLOW FILTERING"
            )
            resources = [f"data/{keyspace}" for keyspace in keyspaces]
            try:
                result = session.execute(query, resources)
                # If anonymous access is enabled then result will be empty,
                # set infos to have empty permissions dict in that case.
                infos = {keyspace: DatabaseInfo(name=keyspace, permissions={}) for keyspace in keyspaces}
                for row in result:
                    # Resource has the form "data/<keyspace>", keep the part
                    # after the first slash.
                    _, _, keyspace = row[0].partition("/")
                    role: str = row[1]
                    role_permissions: set[str] = set(row[2])
                    infos[keyspace].permissions[role] = role_permissions  # type: ignore[index]
            except cassandra.Unauthorized as exc:
                # Likely that access to role_permissions is not granted for
                # current user.
                warnings.warn(
                    f"Authentication information is not accessible to current user - {exc}", stacklevel=2
                )
                infos = {keyspace: DatabaseInfo(name=keyspace) for keyspace in keyspaces}

            # Would be nice to get size estimate, but this is not available
            # via CQL queries.
            return infos.values()

    @classmethod
    def delete_database(cls, host: str, keyspace: str, *, timeout: int = 3600) -> None:
        """Delete APDB database by dropping its keyspace.

        Parameters
        ----------
        host : `str`
            Name of one of the hosts in Cassandra cluster.
        keyspace : `str`
            Name of keyspace to delete.
        timeout : `int`, optional
            Timeout for delete operation in seconds. Dropping a large keyspace
            can be a long operation, but this default value of one hour should
            be sufficient for most or all cases.
        """
        # For DbAuth we need to use database name "*" to try to match any
        # database.
        config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
        with SessionContext(config) as session:
            query = f"DROP KEYSPACE {quote_id(keyspace)}"
            session.execute(query, timeout=timeout)

    @property
    def partitioner(self) -> Partitioner:
        """Partitioner used by this APDB instance (`Partitioner`)."""
        context = self._apdb._context
        return context.partitioner

    def apdb_part(self, ra: float, dec: float) -> int:
        # docstring is inherited from a base class
        uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
        return self.partitioner.pixel(uv3d)

    def apdb_time_part(self, midpointMjdTai: float) -> int:
        # docstring is inherited from a base class
        return self.partitioner.time_partition(midpointMjdTai)

    def delete_records(
        self,
        objects: Iterable[DiaObjectLocator],
        sources: Iterable[DiaSourceLocator],
        forced_sources: Iterable[DiaForcedSourceLocator],
    ) -> None:
        # docstring is inherited from a base class
        #
        # Implementation notes: only sources and forced sources whose
        # diaObjectId appears in ``objects`` are deleted, the rest are
        # ignored. All DELETE statements are built first and then executed
        # in the order forced sources, sources, objects.
        context = self._apdb._context
        config = context.config
        keyspace = self._apdb._keyspace
        has_dia_object_table = not (config.enable_replica and config.replica_skips_diaobjects)

        # Maximum number of keys listed in the IN clause of one statement.
        chunk_size = 1000

        # Group objects by partition.
        partitions = defaultdict(list)
        for obj in objects:
            apdb_part = self.apdb_part(obj.ra, obj.dec)
            partitions[apdb_part].append(obj.diaObjectId)
        object_ids = set(itertools.chain.from_iterable(partitions.values()))

        # Group sources by associated object ID.
        source_groups = defaultdict(list)
        for source in sources:
            if source.diaObjectId in object_ids:
                source_groups[source.diaObjectId].append(source)

        object_deletes = []
        object_count = 0
        # Delete from DiaObjectLast table.
        for apdb_part, oids in partitions.items():
            oids = sorted(oids)
            object_count += len(oids)
            for oid_chunk in chunk_iterable(oids, chunk_size):
                oids_str = ",".join(str(oid) for oid in oid_chunk)
                object_deletes.append(
                    (
                        f'DELETE FROM "{keyspace}"."DiaObjectLast" '
                        f'WHERE apdb_part = {apdb_part} and "diaObjectId" IN ({oids_str});',
                        (),
                    )
                )

        # If DiaObject is in use then delete from that too.
        if has_dia_object_table:
            # Need temporal partitions for DiaObject, the only source for that
            # is the timestamp of the associated DiaSource. Problem here is
            # that DiaObject temporal partitioning is based on validityStart,
            # which is "visit_time", but DiaSource does not record visit_time,
            # it is partitioned on midpointMjdTai. There is time_processed
            # defined for DiaSource but it does not match "visit_time" though
            # it is close. I use midpointMjdTai as approximation for
            # validityStart, this may skip some DiaObjects, but in production
            # we are not going to have DiaObjects table at all. There is also
            # a chance that DiaObject moves from one spatial partition to
            # another with the same consequences, which we also ignore.
            oids_by_partition: dict[tuple[int, int], list[int]] = defaultdict(list)
            for apdb_part, oids in partitions.items():
                for oid in oids:
                    temporal_partitions = {
                        self.apdb_time_part(src.midpointMjdTai) for src in source_groups.get(oid, [])
                    }
                    for time_part in temporal_partitions:
                        oids_by_partition[(apdb_part, time_part)].append(oid)
            for (apdb_part, time_part), oids in oids_by_partition.items():
                for oid_chunk in chunk_iterable(oids, chunk_size):
                    oids_str = ",".join(str(oid) for oid in oid_chunk)
                    if config.partitioning.time_partition_tables:
                        # Time partition is encoded in the table name.
                        table_name = context.schema.tableName(ApdbTables.DiaObject, time_part)
                        object_deletes.append(
                            (
                                f'DELETE FROM "{keyspace}"."{table_name}" '
                                f'WHERE apdb_part = {apdb_part} AND "diaObjectId" IN ({oids_str})',
                                (),
                            )
                        )
                    else:
                        # Single table, time partition is a column.
                        table_name = context.schema.tableName(ApdbTables.DiaObject)
                        object_deletes.append(
                            (
                                f'DELETE FROM "{keyspace}"."{table_name}" '
                                f"WHERE apdb_part = {apdb_part} AND apdb_time_part = {time_part} "
                                f'AND "diaObjectId" IN ({oids_str})',
                                (),
                            )
                        )

        # Delete from DiaObjectLastToPartition table.
        for oid_chunk in chunk_iterable(sorted(object_ids), chunk_size):
            oids_str = ",".join(str(oid) for oid in oid_chunk)
            object_deletes.append(
                (
                    f'DELETE FROM "{keyspace}"."DiaObjectLastToPartition" '
                    f'WHERE "diaObjectId" IN ({oids_str})',
                    (),
                )
            )

        # Group sources by partition.
        source_partitions = defaultdict(list)
        for source in itertools.chain.from_iterable(source_groups.values()):
            apdb_part = self.apdb_part(source.ra, source.dec)
            apdb_time_part = self.apdb_time_part(source.midpointMjdTai)
            source_partitions[(apdb_part, apdb_time_part)].append(source)

        source_deletes = []
        source_count = 0
        for (apdb_part, apdb_time_part), source_list in source_partitions.items():
            source_ids = sorted(source.diaSourceId for source in source_list)
            source_count += len(source_ids)
            for id_chunk in chunk_iterable(source_ids, chunk_size):
                ids_str = ",".join(str(source_id) for source_id in id_chunk)
                if config.partitioning.time_partition_tables:
                    table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
                    source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f'WHERE apdb_part = {apdb_part} and "diaSourceId" IN ({ids_str})',
                            (),
                        )
                    )
                else:
                    table_name = context.schema.tableName(ApdbTables.DiaSource)
                    source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f"WHERE apdb_part = {apdb_part} AND apdb_time_part = {apdb_time_part} "
                            f'AND "diaSourceId" IN ({ids_str})',
                            (),
                        )
                    )

        # Group forced sources by partition.
        forced_source_partitions = defaultdict(list)
        for forced_source in forced_sources:
            if forced_source.diaObjectId in object_ids:
                apdb_part = self.apdb_part(forced_source.ra, forced_source.dec)
                apdb_time_part = self.apdb_time_part(forced_source.midpointMjdTai)
                forced_source_partitions[(apdb_part, apdb_time_part)].append(forced_source)

        forced_source_deletes = []
        forced_source_count = 0
        for (apdb_part, apdb_time_part), forced_source_list in forced_source_partitions.items():
            clustering_keys = sorted(
                (fsource.diaObjectId, fsource.visit, fsource.detector) for fsource in forced_source_list
            )
            forced_source_count += len(clustering_keys)
            for key_chunk in chunk_iterable(clustering_keys, chunk_size):
                cl_str = ",".join(f"({oid}, {v}, {d})" for oid, v, d in key_chunk)
                if config.partitioning.time_partition_tables:
                    table_name = context.schema.tableName(ApdbTables.DiaForcedSource, apdb_time_part)
                    forced_source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            # Trailing space is required to separate the
                            # partition value from the following AND.
                            f"WHERE apdb_part = {apdb_part} "
                            f'AND ("diaObjectId", visit, detector) IN ({cl_str})',
                            (),
                        )
                    )
                else:
                    table_name = context.schema.tableName(ApdbTables.DiaForcedSource)
                    forced_source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f"WHERE apdb_part = {apdb_part} "
                            f"AND apdb_time_part = {apdb_time_part} "
                            f'AND ("diaObjectId", visit, detector) IN ({cl_str})',
                            (),
                        )
                    )

        _LOG.info(
            "Deleting %d objects, %d sources, and %d forced sources",
            object_count,
            source_count,
            forced_source_count,
        )

        # Now run all queries.
        with self._timer("delete_forced_sources"):
            execute_concurrent(context.session, forced_source_deletes)
        with self._timer("delete_sources"):
            execute_concurrent(context.session, source_deletes)
        with self._timer("delete_objects"):
            execute_concurrent(context.session, object_deletes)

    def time_partitions(self) -> ApdbCassandraTimePartitionRange:
        """Return range of existing time partitions.

        Returns
        -------
        range : `ApdbCassandraTimePartitionRange`
            Time partition range.

        Raises
        ------
        TypeError
            Raised if APDB instance does not use time-partition tables.
        """
        context = self._apdb._context
        part_range = context.time_partitions_range
        if not part_range:
            raise TypeError("This APDB instance does not use time-partitioned tables.")
        return part_range

    def extend_time_partitions(
        self,
        time: astropy.time.Time,
        forward: bool = True,
        max_delta: astropy.time.TimeDelta | None = None,
    ) -> list[int]:
        """Extend set of time-partitioned tables to include specified time.

        New tables are created from the DDL of the existing table for the
        last partition in the current range, with the table name replaced.

        Parameters
        ----------
        time : `astropy.time.Time`
            Time to which to extend partitions.
        forward : `bool`, optional
            If `True` then extend partitions into the future, time should be
            later than the end time of the last existing partition. If `False`
            then extend partitions into the past, time should be earlier than
            the start time of the first existing partition.
        max_delta : `astropy.time.TimeDelta`, optional
            Maximum possible extension of the partitions, default is 365 days.

        Returns
        -------
        partitions : `list` [`int`]
            List of partitions added to the database, empty list returned if
            ``time`` is already in the existing partition range.

        Raises
        ------
        TypeError
            Raised if APDB instance does not use time-partition tables.
        ValueError
            Raised if extension request exceeds time limit of ``max_delta``,
            or if any of the tables to be created already exists.
        LookupError
            Raised if schema of an existing table cannot be read.
        """
        if max_delta is None:
            max_delta = astropy.time.TimeDelta(365, format="jd")

        context = self._apdb._context

        # Get current partitions.
        part_range = context.time_partitions_range
        if not part_range:
            raise TypeError("This APDB instance does not use time-partitioned tables.")

        # Partitions that we need to create.
        partitions = self._partitions_to_add(time, forward, max_delta)
        if not partitions:
            return []

        _LOG.debug("New partitions to create: %s", partitions)

        # Tables that are time-partitioned.
        keyspace = self._apdb._keyspace
        tables = context.schema.time_partitioned_tables()

        # Easiest way to create new tables is to take DDL from existing one
        # and update table name.
        table_name_token = "%TABLE_NAME%"
        table_schemas = {}
        for table in tables:
            existing_table_name = context.schema.tableName(table, part_range.end)
            query = f'DESCRIBE TABLE "{keyspace}"."{existing_table_name}"'
            result = context.session.execute(query).one()
            if not result:
                raise LookupError(f'Failed to read schema for table "{keyspace}"."{existing_table_name}"')
            schema: str = result.create_statement
            schema = schema.replace(existing_table_name, table_name_token)
            table_schemas[table] = schema

        # Be paranoid and check that none of the new tables exist.
        existing_tables = context.schema.existing_tables(*tables)
        for table in tables:
            new_tables = {context.schema.tableName(table, partition) for partition in partitions}
            old_tables = new_tables.intersection(existing_tables[table])
            if old_tables:
                raise ValueError(f"Some to be created tables already exist: {old_tables}")

        # Now can create all of them.
        for table, schema in table_schemas.items():
            for partition in partitions:
                new_table_name = context.schema.tableName(table, partition)
                _LOG.debug("Creating table %s", new_table_name)
                new_ddl = schema.replace(table_name_token, new_table_name)
                context.session.execute(new_ddl)

        # Update metadata.
        if context.has_time_partition_meta:
            if forward:
                part_range.end = max(partitions)
            else:
                part_range.start = min(partitions)
            part_range.save_to_meta(context.metadata)

        return partitions

    def _partitions_to_add(
        self,
        time: astropy.time.Time,
        forward: bool,
        max_delta: astropy.time.TimeDelta,
    ) -> list[int]:
        """Make the list of time partitions to add to current range.

        Parameters
        ----------
        time : `astropy.time.Time`
            Time to which to extend partitions.
        forward : `bool`
            If `True` partitions are added after the end of the current
            range, otherwise before its start.
        max_delta : `astropy.time.TimeDelta`
            Maximum allowed distance between ``time`` and the boundary of the
            current range.

        Returns
        -------
        partitions : `list` [`int`]
            Partitions to add in increasing order, empty if the partition
            for ``time`` does not lie beyond the corresponding boundary.

        Raises
        ------
        ValueError
            Raised if extension request exceeds ``max_delta``.
        """
        context = self._apdb._context
        part_range = context.time_partitions_range
        assert part_range is not None

        new_partition = context.partitioner.time_partition(time)
        if forward:
            if new_partition <= part_range.end:
                _LOG.debug(
                    "Partition for time=%s (%d) is below existing end (%d)",
                    time,
                    new_partition,
                    part_range.end,
                )
                return []
            # Limit is checked against the end time of the last partition.
            _, end = context.partitioner.partition_period(part_range.end)
            if time - end > max_delta:
                raise ValueError(
                    f"Extension exceeds limit: current end time = {end.isot}, new end time = {time.isot}, "
                    f"limit = {max_delta.jd} days"
                )
            partitions = list(range(part_range.end + 1, new_partition + 1))
        else:
            if new_partition >= part_range.start:
                _LOG.debug(
                    "Partition for time=%s (%d) is above existing start (%d)",
                    time,
                    new_partition,
                    part_range.start,
                )
                return []
            # Limit is checked against the start time of the first partition.
            start, _ = context.partitioner.partition_period(part_range.start)
            if start - time > max_delta:
                raise ValueError(
                    f"Extension exceeds limit: current start time = {start.isot}, "
                    f"new start time = {time.isot}, "
                    f"limit = {max_delta.jd} days"
                )
            partitions = list(range(new_partition, part_range.start))

        return partitions

    def delete_time_partitions(
        self, time: astropy.time.Time, after: bool = False, *, confirm: ConfirmDeletePartitions | None = None
    ) -> list[int]:
        """Delete time-partitioned tables before or after specified time.

        Parameters
        ----------
        time : `astropy.time.Time`
            Time before or after which to remove partitions. Partition that
            includes this time is not deleted.
        after : `bool`, optional
            If `True` then delete partitions after the specified time. Default
            is to delete partitions before this time.
        confirm : `~collections.abc.Callable`, optional
            A callable that will be called to confirm deletion of the
            partitions. The callable needs to accept three keyword arguments:

            - ``partitions`` - a list of partition numbers to be deleted,
            - ``tables`` - a list of table names to be deleted,
            - ``partitioner`` - a `Partitioner` instance.

            Partitions are deleted only if callable returns `True`.

        Returns
        -------
        partitions : `list` [`int`]
            List of partitions deleted from the database, empty list returned
            if nothing is deleted.

        Raises
        ------
        TypeError
            Raised if APDB instance does not use time-partition tables.
        ValueError
            Raised if requested to delete all partitions.
        """
        context = self._apdb._context

        # Get current partitions.
        part_range = context.time_partitions_range
        if not part_range:
            raise TypeError("This APDB instance does not use time-partitioned tables.")

        partitions = self._partitions_to_delete(time, after)
        if not partitions:
            return []

        # Cannot delete all partitions.
        if min(partitions) == part_range.start and max(partitions) == part_range.end:
            raise ValueError("Cannot delete all partitions.")

        # Tables that are time-partitioned.
        keyspace = self._apdb._keyspace
        tables = context.schema.time_partitioned_tables()

        table_names = [context.schema.tableName(table, partition) for table in tables for partition in partitions]

        if confirm is not None:
            # It can raise an exception, but at this point it's completely
            # harmless.
            answer = confirm(partitions=partitions, tables=table_names, partitioner=context.partitioner)
            if not answer:
                return []

        for table_name in table_names:
            _LOG.debug("Dropping table %s", table_name)
            # Use IF EXISTS just in case.
            query = f'DROP TABLE IF EXISTS "{keyspace}"."{table_name}"'
            context.session.execute(query)

        # Update metadata.
        if context.has_time_partition_meta:
            if after:
                part_range.end = min(partitions) - 1
            else:
                part_range.start = max(partitions) + 1
            part_range.save_to_meta(context.metadata)

        return partitions

    def _partitions_to_delete(
        self,
        time: astropy.time.Time,
        after: bool = False,
    ) -> list[int]:
        """Make the list of time partitions to delete.

        Parameters
        ----------
        time : `astropy.time.Time`
            Time before or after which to remove partitions, the partition
            that includes this time is never in the returned list.
        after : `bool`, optional
            If `True` select partitions after the one containing ``time``,
            otherwise select partitions before it.

        Returns
        -------
        partitions : `list` [`int`]
            Partitions to delete in increasing order, clipped to the current
            partition range; may be empty.
        """
        context = self._apdb._context
        part_range = context.time_partitions_range
        assert part_range is not None

        partition = context.partitioner.time_partition(time)
        if after:
            return list(range(max(partition + 1, part_range.start), part_range.end + 1))
        else:
            return list(range(part_range.start, min(partition, part_range.end + 1)))