Coverage for python / lsst / rucio / register / script.py: 0%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:56 +0000

1# This file is part of rucio_register 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22 

23import itertools 

24import logging 

25import os 

26from typing import Any 

27 

28import click 

29 

30from lsst.daf.butler import Butler 

31from lsst.daf.butler.cli.opt import ( 

32 log_level_option, 

33 options_file_option, 

34 query_datasets_options, 

35) 

36from lsst.daf.butler.script.queryDatasets import QueryDatasets 

37from lsst.resources import ResourcePath 

38from lsst.rucio.register.data_type import DataType 

39from lsst.rucio.register.rucio_interface import RucioInterface 

40from lsst.rucio.register.rucio_register_config import RucioRegisterConfig 

41 

logger = logging.getLogger(__name__)

# Log record layout applied by _set_log_level(): padded level name, timestamp
# with milliseconds and a literal "Z" suffix, padded logger name, padded
# function name, line number, then the message itself.
_FORMAT = (
    "%(levelname) -10s "
    "%(asctime)s.%(msecs)03dZ "
    "%(name) -30s "
    "%(funcName) -35s "
    "%(lineno) -5d: "
    "%(message)s"
)

# Environment variable that may name the registration configuration file.
RUCIO_REGISTER_CONFIG = "RUCIO_REGISTER_CONFIG"
# Tail of the error message raised when no configuration file can be located.
_MSG = "environment variable not set, and no configuration was specified on the command line"

49 

50 

def chunks(refs, chunk_size):
    """Split an iterable into successive chunks of at most ``chunk_size`` items.

    Parameters
    ----------
    refs : iterable
        Items to split.  The iterable is consumed lazily and only once.
    chunk_size : `int` or `None`
        Maximum number of items per chunk.  ``None`` yields all items as a
        single chunk (``itertools.islice`` treats a ``None`` stop as
        unbounded).

    Returns
    -------
    chunks : iterator of iterators
        Each chunk is a lazy view over the shared underlying iterator, so a
        chunk must be fully consumed before the next one is requested.

    Raises
    ------
    ValueError
        Raised immediately if ``chunk_size`` is less than 1.  A size of 0
        would otherwise produce no chunks at all and silently drop every item.
    """
    if chunk_size is not None and chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    it = iter(refs)

    def _generate():
        while True:
            chunk = itertools.islice(it, chunk_size)
            try:
                # Peek one item so that an exhausted source ends the loop
                # instead of yielding an empty chunk.
                first = next(chunk)
            except StopIteration:
                return
            yield itertools.chain((first,), chunk)

    return _generate()

60 

61 

def _getRucioInterface(repo, rucio_register_config, rubin_butler_type):
    """Build the RucioInterface (and optional Butler) used for registration.

    Parameters
    ----------
    repo : `str` or `None`
        Butler repository to open.  When falsy, no Butler is created and
        ``None`` is passed to `RucioInterface`.
    rucio_register_config : `str` or `None`
        Configuration file given on the command line.  Used only when the
        ``RUCIO_REGISTER_CONFIG`` environment variable is unset or empty.
    rubin_butler_type : `DataType`
        Kind of data being registered; forwarded to `RucioInterface`.

    Returns
    -------
    ri : `RucioInterface`
        Interface used to register replicas into Rucio datasets.
    butler : `Butler` or `None`
        The Butler opened for ``repo``, or `None` when no repo was given.

    Raises
    ------
    RuntimeError
        Raised if neither the environment variable nor the command-line
        option supplies a configuration file.
    """
    # The environment variable takes precedence over the command-line option.
    # An empty value is treated as unset so that an exported-but-blank
    # variable neither masks the command-line option nor reaches
    # RucioRegisterConfig as "".
    # NOTE(review): an explicit CLI flag usually overrides the environment;
    # confirm the env-first order is really intended.
    config_file = os.environ.get(RUCIO_REGISTER_CONFIG) or rucio_register_config
    if not config_file:
        raise RuntimeError(f"{RUCIO_REGISTER_CONFIG} {_MSG}")

    config = RucioRegisterConfig(config_file)

    # Read every setting before opening the Butler, as before, so that a bad
    # configuration fails without touching the repository.
    rucio_rse = config.rucio_rse
    scope = config.scope
    rse_root = config.rse_root
    dtn_url = config.dtn_url

    butler = Butler(repo) if repo else None

    # RucioInterface object used to register replicas into datasets.
    ri = RucioInterface(
        butler=butler,
        rucio_rse=rucio_rse,
        scope=scope,
        rse_root=rse_root,
        dtn_url=dtn_url,
        rubin_butler_type=rubin_butler_type,
    )
    return ri, butler

91 

92 

def _register(ri, dataset_refs, chunk_size, rucio_dataset):
    """Register Butler dataset refs as replicas in ``rucio_dataset``.

    The refs are handed to ``ri.register_as_replicas`` in groups of at most
    ``chunk_size``; the count returned for each group is logged at DEBUG.
    """
    batches = chunks(dataset_refs, chunk_size)
    for batch in batches:
        registered = ri.register_as_replicas(rucio_dataset, batch)
        logger.debug("%d butler datasets registered", registered)

98 

99 

def _register_zips(ri, zip_files, chunk_size, rucio_dataset):
    """Register zip files with Rucio into ``rucio_dataset``.

    Each zip file is wrapped in a `ResourcePath` and passed to
    ``ri.register_zips`` on its own.  The count returned by each call is
    logged at DEBUG.

    Parameters
    ----------
    ri : `RucioInterface`
        Interface that performs the registration.
    zip_files : iterable of `str`
        Locations of the zip files to register.
    chunk_size : `int`
        NOTE(review): accepted for signature parity with ``_register`` but
        not used, so ``--chunk-size`` has no effect on zip registration.
    rucio_dataset : `str`
        Rucio dataset the zip files are registered into.
    """
    for zip_file in zip_files:
        cnt = ri.register_zips(rucio_dataset, [ResourcePath(zip_file)])
        logger.debug("%d zips registered", cnt)

106 

107 

def _register_dims(ri, dim_files, chunk_size, rucio_dataset):
    """Register dimension files with Rucio into ``rucio_dataset``.

    Each dimension file is wrapped in a `ResourcePath` and passed to
    ``ri.register_dims`` on its own.  The count returned by each call is
    logged at DEBUG.

    Parameters
    ----------
    ri : `RucioInterface`
        Interface that performs the registration.
    dim_files : iterable of `str`
        Locations of the dimension files to register.
    chunk_size : `int`
        NOTE(review): accepted for signature parity with ``_register`` but
        not used, so ``--chunk-size`` has no effect on dimension-file
        registration.
    rucio_dataset : `str`
        Rucio dataset the dimension files are registered into.
    """
    for dim_file in dim_files:
        cnt = ri.register_dims(rucio_dataset, [ResourcePath(dim_file)])
        logger.debug("%d dimension files registered", cnt)

114 

115 

def _set_log_level(log_level):
    """Configure root logging for the command-line tools.

    Parameters
    ----------
    log_level : mapping or `None`
        Parsed value of the ``--log-level`` option (presumably the mapping
        produced by daf_butler's option parsing; TODO confirm).  The level
        stored under the `None` key, as given by a bare
        ``--log-level LEVEL``, is applied; per-logger entries are ignored.
        INFO is used when the value is `None` or empty, has no `None` entry,
        or names no standard `logging` level.

    Notes
    -----
    Timestamps are rendered in UTC so they agree with the literal ``Z``
    suffix that ``_FORMAT`` appends to the time.  As with
    `logging.basicConfig`, nothing changes if the root logger already has
    handlers.
    """
    import time

    # Guard against None/empty values and a missing None key, all of which
    # previously raised TypeError/KeyError.
    level_name = log_level.get(None) if log_level else None
    logging_num_level = logging.INFO
    if level_name:
        resolved = getattr(logging, level_name.upper(), None)
        if isinstance(resolved, int):
            logging_num_level = resolved

    formatter = logging.Formatter(_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
    # Formatter uses local time by default; the "Z" in _FORMAT promises UTC.
    formatter.converter = time.gmtime
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logging.basicConfig(level=logging_num_level, handlers=[handler])

123 

124 

@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def main():
    """Register data with Rucio.

    Each subcommand registers one kind of data (Butler data products, raw
    files, zip files, or dimension files) into a Rucio dataset.
    """
    # The docstring above becomes the group's --help text; the group itself
    # does no work beyond dispatching to its subcommands.

128 

129 

def _get_and_delete(kwargs, key):
    """Remove ``key`` from ``kwargs`` and return its value.

    Parameters
    ----------
    kwargs : `dict`
        Mapping to modify in place.
    key : `str`
        Key to remove.

    Returns
    -------
    value : `object` or `None`
        The removed value, or `None` when ``key`` was absent.
    """
    # pop() also removes keys whose value is None.  Leaving those behind
    # forwarded e.g. rucio_register_config=None into QueryDatasets(**kwargs).
    return kwargs.pop(key, None)

136 

137 

@main.command()
@click.option("--repo", required=True, type=str, help="butler repository")
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option("--rucio-register-config", required=False, type=str, help="registration configuration file")
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
@options_file_option()
@query_datasets_options(repo=False, showUri=True, useArguments=False)
def data_products(**kwargs: Any) -> None:
    """Register Butler data products selected by a dataset query with Rucio.

    The datasets matching the query options are registered as replicas in
    the given Rucio dataset, a chunk at a time.
    """
    # Values are only read here, not removed: QueryDatasets is built below
    # from explicit keyword arguments, so leftover entries in kwargs are
    # harmless.
    _set_log_level(kwargs.get("log_level"))

    rucio_register_config = kwargs.get("rucio_register_config")
    rucio_dataset = kwargs.get("rucio_dataset")
    chunk_size = kwargs.get("chunk_size")

    repo = kwargs.get("repo")
    collections = kwargs.get("collections")
    where = kwargs.get("where")
    find_first = kwargs.get("find_first")
    limit = kwargs.get("limit")
    order_by = kwargs.get("order_by")
    dataset_type = kwargs.get("dataset_type")

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.DATA_PRODUCT)

    # NOTE(review): --show-uri is accepted (showUri=True above) but ignored;
    # URIs are never displayed by this command.
    query = QueryDatasets(
        butler=butler,
        glob=dataset_type,
        collections=collections,
        where=where,
        find_first=find_first,
        limit=limit,
        order_by=order_by,
        show_uri=False,
        with_dimension_records=True,
    )

    # Flatten the per-dataset-type groups lazily.  chain(*...) would first
    # collect every group before registration could start.
    dataset_refs = itertools.chain.from_iterable(query.getDatasets())

    _register(ri, dataset_refs, chunk_size, rucio_dataset)

186 

187 

@main.command()
@click.option("--repo", required=True, type=str, help="butler repository")
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
@options_file_option()
@query_datasets_options(repo=False, showUri=True)
def raws(**kwargs: Any) -> None:
    """Register raw files selected by a dataset query with Rucio.

    The datasets matching the query arguments are registered as replicas in
    the given Rucio dataset, a chunk at a time.
    """
    # Remove the options that are not query parameters: everything left in
    # kwargs is forwarded to QueryDatasets, which doesn't accept extra args.
    log_level = _get_and_delete(kwargs, "log_level")
    _set_log_level(log_level)

    rucio_register_config = _get_and_delete(kwargs, "rucio_register_config")
    rucio_dataset = _get_and_delete(kwargs, "rucio_dataset")
    chunk_size = _get_and_delete(kwargs, "chunk_size")

    # repo is consumed here too.  As data_products does, the Butler opened by
    # _getRucioInterface is handed to QueryDatasets instead of forwarding
    # repo, which presumably made QueryDatasets open the repository a second
    # time.
    repo = kwargs.pop("repo")

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.RAW_FILE)

    # chain is needed to flatten the list of lists returned by getDatasets()
    query = QueryDatasets(butler=butler, **kwargs)
    dataset_refs = itertools.chain.from_iterable(query.getDatasets())

    _register(ri, dataset_refs, chunk_size, rucio_dataset)

221 

222 

@main.command()
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("--zip-file", required=True, help="zip file to register")
@log_level_option()
def zips(rucio_dataset, rucio_register_config, chunk_size, zip_file, log_level):
    """Register a zip file with Rucio into the given Rucio dataset."""
    _set_log_level(log_level)

    # repo=None: no Butler is opened, and the returned (None) butler is unused.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.ZIP_FILE)

    _register_zips(ri, [zip_file], chunk_size, rucio_dataset)

243 

244 

@main.command()
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("--dimension-file", required=True, help="dimension file to register")
@log_level_option()
def dimensions(rucio_dataset, rucio_register_config, chunk_size, dimension_file, log_level):
    """Register a dimension file with Rucio into the given Rucio dataset."""
    _set_log_level(log_level)

    # repo=None: no Butler is opened, and the returned (None) butler is unused.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.DIM_FILE)

    _register_dims(ri, [dimension_file], chunk_size, rucio_dataset)