Coverage for python / lsst / rucio / register / script.py: 0%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:42 +0000

1# This file is part of rucio_register 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22 

23import itertools 

24import logging 

25import os 

26from typing import Any 

27 

28import click 

29 

30from lsst.daf.butler import Butler 

31from lsst.daf.butler.cli.opt import ( 

32 log_level_option, 

33 options_file_option, 

34 query_datasets_options, 

35) 

36from lsst.daf.butler.script.queryDatasets import QueryDatasets 

37from lsst.resources import ResourcePath 

38from lsst.rucio.register.data_type import DataType 

39from lsst.rucio.register.rucio_interface import RucioInterface 

40from lsst.rucio.register.rucio_register_config import RucioRegisterConfig 

41 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Log record layout handed to logging.basicConfig() by _set_log_level().
_FORMAT = (
    "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
)

# Name of the environment variable consulted for the configuration file.
RUCIO_REGISTER_CONFIG = "RUCIO_REGISTER_CONFIG"
# Tail of the error message raised by _getRucioInterface() when no
# configuration file is available; the variable name is prepended to it.
_MSG = "environment variable not set, and no configuration was specified on the command line"

49 

50 

def chunks(refs, chunk_size):
    """Split an iterable into consecutive batches of at most ``chunk_size``.

    Parameters
    ----------
    refs : iterable
        Items to batch. The input is consumed lazily, one batch at a time.
    chunk_size : `int`
        Maximum number of items per batch; must be at least 1.

    Yields
    ------
    batch : `list`
        The next batch of items, in input order. Only the final batch may
        hold fewer than ``chunk_size`` items.

    Raises
    ------
    ValueError
        Raised if ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    it = iter(refs)
    while True:
        # Materialise each batch: a lazy view over the shared iterator would
        # let a partially consumed batch shift the boundaries of every batch
        # that follows it.
        batch = list(itertools.islice(it, chunk_size))
        if not batch:
            return
        yield batch

60 

61 

def _getRucioInterface(repo, rucio_register_config, rubin_butler_type):
    """Create the RucioInterface, and the Butler if a repository was given.

    Parameters
    ----------
    repo : `str` or `None`
        Butler repository; when falsy no Butler is created.
    rucio_register_config : `str` or `None`
        Configuration file given on the command line. Only used when the
        ``RUCIO_REGISTER_CONFIG`` environment variable is not set.
    rubin_butler_type
        Value forwarded to RucioInterface as ``rubin_butler_type``.

    Returns
    -------
    ri, butler : `tuple`
        The RucioInterface and the Butler (`None` when ``repo`` is falsy).

    Raises
    ------
    RuntimeError
        Raised if neither the environment variable nor the command line
        supplied a configuration file.
    """
    # The environment variable takes precedence; the command-line value is
    # only the fallback.
    config_path = os.environ.get(RUCIO_REGISTER_CONFIG, rucio_register_config)
    if config_path is None:
        raise RuntimeError(f"{RUCIO_REGISTER_CONFIG} {_MSG}")

    settings = RucioRegisterConfig(config_path)

    # Read every setting before the Butler is opened.
    interface_settings = {
        "rucio_rse": settings.rucio_rse,
        "scope": settings.scope,
        "rse_root": settings.rse_root,
        "dtn_url": settings.dtn_url,
    }

    butler = Butler(repo) if repo else None

    # The interface is what registers replicas into Rucio datasets.
    interface = RucioInterface(
        butler=butler,
        rubin_butler_type=rubin_butler_type,
        **interface_settings,
    )
    return interface, butler

91 

92 

def _register(ri, dataset_refs, chunk_size, rucio_dataset):
    """Register butler dataset refs into a Rucio dataset, chunk by chunk.

    Parameters
    ----------
    ri
        Object providing ``register_as_replicas``.
    dataset_refs : iterable
        Dataset refs to register.
    chunk_size : `int`
        Number of refs handed to each ``register_as_replicas`` call.
    rucio_dataset : `str`
        Rucio dataset the refs are registered to.
    """
    batches = chunks(dataset_refs, chunk_size)
    for batch in batches:
        registered = ri.register_as_replicas(rucio_dataset, batch)
        logger.debug("%d butler datasets registered", registered)

98 

99 

def _register_zips(ri, zip_files, chunk_size, rucio_dataset):
    """Register zip files into a Rucio dataset, one file per call.

    Parameters
    ----------
    ri
        Object providing ``register_zips``.
    zip_files : iterable
        Locations of the zip files; each is wrapped in a ResourcePath.
    chunk_size : `int`
        Accepted but not used here: every file is registered in its own call.
    rucio_dataset : `str`
        Rucio dataset the zip files are registered to.
    """
    for resource in map(ResourcePath, zip_files):
        registered = ri.register_zips(rucio_dataset, [resource])
        logger.debug("%d zips registered", registered)

106 

107 

def _register_dims(ri, dim_files, chunk_size, rucio_dataset):
    """Register dimension files into a Rucio dataset, one file per call.

    Parameters
    ----------
    ri
        Object providing ``register_dims``.
    dim_files : iterable
        Locations of the dimension files; each is wrapped in a ResourcePath.
    chunk_size : `int`
        Accepted but not used here: every file is registered in its own call.
    rucio_dataset : `str`
        Rucio dataset the dimension files are registered to.
    """
    for resource in map(ResourcePath, dim_files):
        registered = ri.register_dims(rucio_dataset, [resource])
        logger.debug("%d dimension files registered", registered)

114 

115 

def _set_log_level(log_level):
    """Configure logging via ``logging.basicConfig`` from ``--log-level``.

    Parameters
    ----------
    log_level : mapping or `None`
        Parsed ``--log-level`` value. Only the entry stored under the `None`
        key is used (presumably the level given without a component name;
        confirm against ``log_level_option``).

    Notes
    -----
    The level falls back to ``logging.WARNING`` when ``log_level`` is empty
    or `None`, when it has no `None` key, or when the name is not an integer
    level constant of the `logging` module.
    """
    try:
        level_name = log_level[None]
    except (KeyError, TypeError):
        # No `None` entry (KeyError), or a value that cannot be indexed this
        # way, such as None itself (TypeError).
        level_name = None

    logging_num_level = logging.WARNING
    if level_name is not None:
        candidate = getattr(logging, level_name.upper(), None)
        # Guard against names that are unknown or that resolve to something
        # other than a numeric level (e.g. a function of the logging module).
        if isinstance(candidate, int):
            logging_num_level = candidate

    logging.basicConfig(level=logging_num_level, format=(_FORMAT), datefmt="%Y-%m-%d %H:%M:%S")

123 

124 

@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def main():
    """Register files to Rucio datasets."""
    # The group does no work itself; subcommands attach via @main.command().

128 

129 

# @main.command() is outermost, as on the other subcommands, so the options
# appear in --help in the order they are declared here.
@main.command()
@click.option("-r", "--repo", required=True, type=str, help="butler repository")
@click.option("-c", "--collections", required=True, type=str, help="collections for lookup")
@click.option("-t", "--dataset-type", required=True, type=str, help="dataset type for lookup")
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
def data_products(
    repo, collections, dataset_type, rucio_dataset, rucio_register_config, chunk_size, log_level
):
    """Register butler datasets of one dataset type to a Rucio dataset."""
    _set_log_level(log_level)

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.DATA_PRODUCT)

    # query the butler for the datasets specified on the command line
    dataset_refs = butler.registry.queryDatasets(dataset_type, collections=collections)

    _register(ri, dataset_refs, chunk_size, rucio_dataset)

158 

159 

160def _get_and_delete(kwargs, key): 

161 x = kwargs.get(key, None) 

162 if x is None: 

163 return x 

164 del kwargs[key] 

165 return x 

166 

167 

@main.command()
@click.option("-r", "--repo", required=True, type=str, help="butler repository")
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@options_file_option()
@query_datasets_options(repo=False, showUri=True)
@log_level_option()
def raws(**kwargs: Any) -> None:
    """Register raw files selected by a butler dataset query to a Rucio dataset."""
    # Strip the options that belong to this command before the remainder is
    # handed to QueryDatasets, which doesn't like extra args. pop() is used so
    # that options left unset (value None) are removed as well.
    log_level = kwargs.pop("log_level", None)
    _set_log_level(log_level)

    rucio_register_config = kwargs.pop("rucio_register_config", None)
    rucio_dataset = kwargs.pop("rucio_dataset", None)
    chunk_size = kwargs.pop("chunk_size", None)

    # repo stays in kwargs: it is also passed on to QueryDatasets
    repo = kwargs["repo"]

    ri, _ = _getRucioInterface(repo, rucio_register_config, DataType.RAW_FILE)

    # chain is needed to flatten the list of lists returned by getDatasets()
    dataset_refs = itertools.chain.from_iterable(QueryDatasets(**kwargs).getDatasets())

    _register(ri, dataset_refs, chunk_size, rucio_dataset)

203 

204 

@main.command()
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("-z", "--zip-file", required=True, help="zip file to register")
@log_level_option()
def zips(rucio_dataset, rucio_register_config, chunk_size, zip_file, log_level):
    """Register a zip file to a Rucio dataset."""
    _set_log_level(log_level)

    # No butler repository is involved here, so only the interface is kept.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.ZIP_FILE)

    _register_zips(ri, [zip_file], chunk_size, rucio_dataset)

226 

227 

@main.command()
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("-D", "--dimension-file", required=True, help="dimension file to register")
@log_level_option()
def dimensions(rucio_dataset, rucio_register_config, chunk_size, dimension_file, log_level):
    """Register a dimension file to a Rucio dataset."""
    _set_log_level(log_level)

    # No butler repository is involved here, so only the interface is kept.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.DIM_FILE)

    _register_dims(ri, [dimension_file], chunk_size, rucio_dataset)