Coverage for python / lsst / dax / apdb / scripts / replication.py: 14%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:43 +0000
1# This file is part of dax_ppdb
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["replication_delete_chunks", "replication_list_chunks"]
26import sys
27from collections.abc import Collection
29from ..apdbReplica import ApdbReplica, ReplicaChunk
32def replication_list_chunks(apdb_config: str) -> None:
33 """Print full list of replica chunks existing in APDB.
35 Parameters
36 ----------
37 apdb_config : `str`
38 URL for APDB configuration file.
39 """
40 apdb = ApdbReplica.from_uri(apdb_config)
41 chunks = apdb.getReplicaChunks()
42 if chunks is not None:
43 chunks = sorted(chunks, key=lambda chunk: chunk.id)
44 _print_chunks(chunks)
45 else:
46 print("APDB instance does not support replication")
49def replication_delete_chunks(apdb_config: str, chunk_id: int, force: bool, print_only: bool) -> int:
50 """Delete replication chunks from APDB.
52 Parameters
53 ----------
54 apdb_config : `str`
55 URL for APDB configuration file.
56 chunk_id : `int`
57 Chunk to delete, all earlier chunks are deleted as well.
58 force : `bool`
59 If `True` do not ask confirmation.
60 print_only : `bool`
61 If `True` print the list of chunks, but do not delete anything.
62 """
63 apdb = ApdbReplica.from_uri(apdb_config)
64 chunks = apdb.getReplicaChunks()
65 if chunks is None:
66 print("APDB instance does not support replications")
67 else:
68 chunks = sorted(chunks, key=lambda chunk: chunk.id)
69 # Chunks to delete.
70 chunks = [chunk for chunk in chunks if chunk.id <= chunk_id]
71 # Check that given chunk ID actually exists.
72 if not chunks or chunks[-1].id != chunk_id:
73 print(f"ERROR: Replication chunk with ID={chunk_id} does not exist", file=sys.stderr)
74 return 1
76 if print_only:
77 print("Following chunks will be deleted:")
78 _print_chunks(chunks)
79 return 0
81 if not force:
82 try:
83 response = input(f"{len(chunks)} chunks will be removed, y[n]? ")
84 except EOFError:
85 response = ""
86 if response not in ("y", "Y"):
87 return 0
89 apdb.deleteReplicaChunks(chunk.id for chunk in chunks)
91 return 0
94def _print_chunks(chunks: Collection[ReplicaChunk]) -> None:
95 """Print the list of chunks.
97 Parameters
98 ----------
99 chunks : `~collections.abc.Collection` [`ReplicaChunk`]
100 Chunks to print.
101 """
102 print(" Chunk Id Update time Unique Id")
103 sep = "-" * 77
104 print(sep)
105 for chunk in chunks:
106 insert_time = chunk.last_update_time
107 print(f"{chunk.id:10d} {insert_time.tai.isot}/tai {chunk.unique_id}")
108 print(sep)
109 print(f"Total: {len(chunks)}")