Coverage for python / lsst / dax / apdb / scripts / partition.py: 12%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:43 +0000

1# This file is part of dax_ppdb 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["partition_delete_temporal", "partition_extend_temporal", "partition_show_temporal"] 

25 

26import sys 

27 

28import astropy.time 

29 

30from ..apdb import Apdb 

31from ..cassandra import ApdbCassandra 

32from ..cassandra.partitioner import Partitioner 

33 

34 

35def partition_show_temporal(apdb_config: str) -> int: 

36 """Print range of temporal partitions. 

37 

38 Parameters 

39 ---------- 

40 apdb_config : `str` 

41 URL for APDB configuration file. 

42 """ 

43 apdb = Apdb.from_uri(apdb_config) 

44 if not isinstance(apdb, ApdbCassandra): 

45 print("ERROR: Non-Cassandra APDB does not use time-partitioned tables.", file=sys.stderr) 

46 return 1 

47 

48 admin = apdb.admin 

49 try: 

50 part_range = admin.time_partitions() 

51 except TypeError as exc: 

52 print(f"ERROR: {exc}", file=sys.stderr) 

53 return 1 

54 

55 start_time, _ = admin.partitioner.partition_period(part_range.start) 

56 _, end_time = admin.partitioner.partition_period(part_range.end) 

57 

58 print( 

59 f"Current time partition range: {part_range.start} - {part_range.end} " 

60 f"[{start_time.tai.isot}, {end_time.tai.isot})" 

61 ) 

62 

63 return 0 

64 

65 

66def partition_extend_temporal(apdb_config: str, time: str, past: bool, max_days: int) -> int: 

67 """Extend the range of temporal partitions. 

68 

69 Parameters 

70 ---------- 

71 apdb_config : `str` 

72 URL for APDB configuration file. 

73 time : `str` 

74 Timestamps in ISOT format and TAI scale. 

75 past : `bool` 

76 If `True` extend the range in the past. 

77 max_days : `int` 

78 Max. number of days for extension. 

79 """ 

80 try: 

81 astro_time = astropy.time.Time(time, format="isot", scale="tai") 

82 max_delta = astropy.time.TimeDelta(float(max_days), format="jd", scale="tai") 

83 except ValueError as exc: 

84 print(f"ERROR: {exc}", file=sys.stderr) 

85 return 1 

86 

87 apdb = Apdb.from_uri(apdb_config) 

88 if not isinstance(apdb, ApdbCassandra): 

89 print("ERROR: Non-Cassandra APDB does not use time-partitioned tables.", file=sys.stderr) 

90 return 1 

91 

92 admin = apdb.admin 

93 try: 

94 result = admin.extend_time_partitions(astro_time, forward=not past, max_delta=max_delta) 

95 except (TypeError, ValueError) as exc: 

96 print(f"ERROR: {exc}", file=sys.stderr) 

97 return 1 

98 

99 part_range = admin.time_partitions() 

100 start_time, _ = admin.partitioner.partition_period(part_range.start) 

101 _, end_time = admin.partitioner.partition_period(part_range.end) 

102 

103 if result: 

104 print( 

105 "Time partition range succesfully extended.\n" 

106 f"New time partition range: {part_range.start} - {part_range.end} " 

107 f"[{start_time.tai.isot}, {end_time.tai.isot})" 

108 ) 

109 else: 

110 print( 

111 "Time partition range was not extended.\n" 

112 f"Current time partition range: {part_range.start} - {part_range.end} " 

113 f"[{start_time.tai.isot}, {end_time.tai.isot})" 

114 ) 

115 

116 return 0 

117 

118 

119def partition_delete_temporal(apdb_config: str, time: str, after: bool, force: bool) -> int: 

120 """Delete some temporal partitions. 

121 

122 Parameters 

123 ---------- 

124 apdb_config : `str` 

125 URL for APDB configuration file. 

126 time : `str` 

127 Timestamps in ISOT format and TAI scale. Partition that includes this 

128 time is not deleted. 

129 after : `bool` 

130 If `True` then delete partitions after the specified time. Default is 

131 to delete partitions before this time. 

132 force : `bool` 

133 If `True` then do not ask confirmation. 

134 """ 

135 try: 

136 astro_time = astropy.time.Time(time, format="isot", scale="tai") 

137 except ValueError as exc: 

138 print(f"ERROR: {exc}", file=sys.stderr) 

139 return 1 

140 

141 apdb = Apdb.from_uri(apdb_config) 

142 if not isinstance(apdb, ApdbCassandra): 

143 print("ERROR: Non-Cassandra APDB does not use time-partitioned tables.", file=sys.stderr) 

144 return 1 

145 

146 admin = apdb.admin 

147 try: 

148 result = admin.delete_time_partitions( 

149 astro_time, after=after, confirm=None if force else _confirm_delete 

150 ) 

151 except (TypeError, ValueError) as exc: 

152 print(f"ERROR: {exc}", file=sys.stderr) 

153 return 1 

154 

155 part_range = admin.time_partitions() 

156 start_time, _ = admin.partitioner.partition_period(part_range.start) 

157 _, end_time = admin.partitioner.partition_period(part_range.end) 

158 

159 if result: 

160 print( 

161 "Time partitions succesfully deleted.\n" 

162 f"New time partition range: {part_range.start} - {part_range.end} " 

163 f"[{start_time.tai.isot}, {end_time.tai.isot})" 

164 ) 

165 else: 

166 print( 

167 "Time partitions were not deleted.\n" 

168 f"Current time partition range: {part_range.start} - {part_range.end} " 

169 f"[{start_time.tai.isot}, {end_time.tai.isot})" 

170 ) 

171 

172 return 0 

173 

174 

175def _confirm_delete(*, partitions: list[int], tables: list[str], partitioner: Partitioner) -> bool: 

176 print("Partitions to be deleted:") 

177 for part in partitions: 

178 start_time, end_time = partitioner.partition_period(part) 

179 print(f" {part}: [{start_time.tai.isot}, {end_time.tai.isot})") 

180 answer = input("Confirm deletion y/n: ") 

181 return answer.strip().lower() == "y"