Coverage for python / lsst / rucio / register / script.py: 0%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:53 +0000
1# This file is part of rucio_register
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
23import itertools
24import logging
25import os
26from typing import Any
28import click
30from lsst.daf.butler import Butler
31from lsst.daf.butler.cli.opt import (
32 log_level_option,
33 options_file_option,
34 query_datasets_options,
35)
36from lsst.daf.butler.script.queryDatasets import QueryDatasets
37from lsst.resources import ResourcePath
38from lsst.rucio.register.data_type import DataType
39from lsst.rucio.register.rucio_interface import RucioInterface
40from lsst.rucio.register.rucio_register_config import RucioRegisterConfig
42logger = logging.getLogger(__name__)
43_FORMAT = (
44 "%(levelname) -10s %(asctime)s.%(msecs)03dZ %(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
45)
47RUCIO_REGISTER_CONFIG = "RUCIO_REGISTER_CONFIG"
48_MSG = "environment variable not set, and no configuration was specified on the command line"
def chunks(refs, chunk_size):
    """Split an iterable into consecutive batches of at most ``chunk_size`` items.

    Each batch is materialized as a list, so it supports ``len()``, can be
    iterated more than once, and does not depend on the caller exhausting it
    before requesting the next batch.

    Parameters
    ----------
    refs : iterable
        Items to split; consumed lazily, one batch at a time.
    chunk_size : int or None
        Maximum number of items per batch. ``None`` puts everything into a
        single batch.

    Yields
    ------
    list
        The next non-empty batch, in input order.

    Raises
    ------
    ValueError
        When iteration starts, if ``chunk_size`` is less than 1. A size of 0
        would otherwise yield nothing at all, silently skipping every item.
    """
    if chunk_size is not None and chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")
    it = iter(refs)
    while True:
        batch = list(itertools.islice(it, chunk_size))
        if not batch:
            return
        yield batch
def _getRucioInterface(repo, rucio_register_config, rubin_butler_type):
    """Build the RucioInterface (and optional Butler) used for registration.

    The configuration file comes from the ``RUCIO_REGISTER_CONFIG``
    environment variable when it is set to a non-empty value; otherwise the
    ``rucio_register_config`` argument (the command-line option) is used.

    Parameters
    ----------
    repo : str or None
        Butler repository; when falsy no Butler is created.
    rucio_register_config : str or None
        Configuration file path given on the command line.
    rubin_butler_type : DataType
        Kind of files being registered; forwarded to RucioInterface.

    Returns
    -------
    tuple
        ``(RucioInterface, Butler or None)``.

    Raises
    ------
    RuntimeError
        If neither the environment variable nor the argument names a
        configuration file.
    """
    # The environment variable takes precedence over the command line. An
    # exported-but-empty variable is treated as unset, so it cannot mask a
    # path given with --rucio-register-config.
    config_file = os.environ.get(RUCIO_REGISTER_CONFIG) or rucio_register_config
    if not config_file:
        raise RuntimeError(f"{RUCIO_REGISTER_CONFIG} {_MSG}")
    logger.debug("using registration configuration %s", config_file)

    config = RucioRegisterConfig(config_file)

    rucio_rse = config.rucio_rse
    scope = config.scope
    rse_root = config.rse_root
    dtn_url = config.dtn_url

    butler = Butler(repo) if repo else None

    # create RucioInterface object used to register replicas into datasets
    ri = RucioInterface(
        butler=butler,
        rucio_rse=rucio_rse,
        scope=scope,
        rse_root=rse_root,
        dtn_url=dtn_url,
        rubin_butler_type=rubin_butler_type,
    )
    return ri, butler
def _register(ri, dataset_refs, chunk_size, rucio_dataset):
    """Register butler dataset refs as replicas in ``rucio_dataset``, batch by batch.

    ``dataset_refs`` is split by ``chunks`` into batches of ``chunk_size``.
    Each batch is handed to ``ri.register_as_replicas``, and the count it
    returns is logged at DEBUG level.
    """
    batches = chunks(dataset_refs, chunk_size)
    for batch in batches:
        registered = ri.register_as_replicas(rucio_dataset, batch)
        logger.debug("%d butler datasets registered", registered)
def _register_zips(ri, zip_files, chunk_size, rucio_dataset):
    """Register zip files into ``rucio_dataset``, one file per Rucio call.

    Each entry of ``zip_files`` is wrapped in a ``ResourcePath`` and passed on
    its own to ``ri.register_zips``; the returned count is logged at DEBUG
    level. ``chunk_size`` is accepted for signature parity with ``_register``
    but is not used.
    """
    for path in map(ResourcePath, zip_files):
        n_registered = ri.register_zips(rucio_dataset, [path])
        logger.debug("%d zips registered", n_registered)
def _register_dims(ri, dim_files, chunk_size, rucio_dataset):
    """Register dimension files into ``rucio_dataset``, one file per Rucio call.

    Each entry of ``dim_files`` is wrapped in a ``ResourcePath`` and passed on
    its own to ``ri.register_dims``; the returned count is logged at DEBUG
    level. ``chunk_size`` is accepted for signature parity with ``_register``
    but is not used.
    """
    for path in map(ResourcePath, dim_files):
        n_registered = ri.register_dims(rucio_dataset, [path])
        logger.debug("%d dimension files registered", n_registered)
def _set_log_level(log_level):
    """Configure logging from the value of the ``--log-level`` option.

    Parameters
    ----------
    log_level : mapping or None
        Mapping of logger name to level name, as produced by
        ``log_level_option``. The ``None`` key holds the root level; other
        keys are treated as logger names (presumably the ``COMPONENT`` part
        of ``COMPONENT=LEVEL`` — confirm against daf_butler). An empty or
        missing mapping, or one without a ``None`` entry, uses INFO for the
        root level. Unknown level names leave the corresponding level unset.
    """
    levels = dict(log_level) if log_level else {}

    def _numeric(name):
        # Map a level name such as "debug" to logging.DEBUG; unknown names give None.
        value = getattr(logging, str(name).upper(), None)
        return value if isinstance(value, int) else None

    root_name = levels.pop(None, None)
    root_level = logging.INFO if root_name is None else _numeric(root_name)
    logging.basicConfig(level=root_level, format=_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")

    # Apply per-logger entries; basicConfig above only handles the root level.
    for component, name in levels.items():
        level = _numeric(name)
        if level is not None:
            logging.getLogger(component).setLevel(level)
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def main():
    """Register Butler datasets, zip files and dimension files with Rucio."""
130def _get_and_delete(kwargs, key):
131 x = kwargs.get(key, None)
132 if x is None:
133 return x
134 del kwargs[key]
135 return x
@main.command()
@click.option("--repo", required=True, type=str, help="butler repository")
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option("--rucio-register-config", required=False, type=str, help="registration configuration file")
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
@options_file_option()
@query_datasets_options(repo=False, showUri=True, useArguments=False)
def data_products(**kwargs: Any) -> None:
    """Query a Butler repository and register the matching datasets with Rucio.

    Datasets selected by the query options are registered as replicas in the
    Rucio dataset given by --rucio-dataset, in batches of --chunk-size.
    """
    _set_log_level(kwargs.get("log_level"))

    rucio_register_config = kwargs.get("rucio_register_config")
    rucio_dataset = kwargs.get("rucio_dataset")
    chunk_size = kwargs.get("chunk_size")
    repo = kwargs.get("repo")

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.DATA_PRODUCT)

    # Only the query-related options are forwarded explicitly, so the
    # registration-specific entries in kwargs never reach QueryDatasets
    # (which rejects unexpected arguments).
    query = QueryDatasets(
        butler=butler,
        glob=kwargs.get("dataset_type"),
        collections=kwargs.get("collections"),
        where=kwargs.get("where"),
        find_first=kwargs.get("find_first"),
        limit=kwargs.get("limit"),
        order_by=kwargs.get("order_by"),
        show_uri=False,
        with_dimension_records=True,
    )

    # getDatasets() produces lists of refs. Flatten them lazily;
    # chain(*...) would exhaust the outer iterable before registering anything.
    dataset_refs = itertools.chain.from_iterable(query.getDatasets())

    _register(ri, dataset_refs, chunk_size, rucio_dataset)
@main.command()
@click.option("--repo", required=True, type=str, help="butler repository")
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
@options_file_option()
@query_datasets_options(repo=False, showUri=True)
def raws(**kwargs: Any) -> None:
    # Pull this command's own options out of kwargs before the remainder
    # (including "repo") is forwarded to QueryDatasets, which rejects
    # unexpected keyword arguments.
    _set_log_level(_get_and_delete(kwargs, "log_level"))

    config_path = _get_and_delete(kwargs, "rucio_register_config")
    target_dataset = _get_and_delete(kwargs, "rucio_dataset")
    batch_size = _get_and_delete(kwargs, "chunk_size")

    ri, _ = _getRucioInterface(kwargs["repo"], config_path, DataType.RAW_FILE)

    # getDatasets() returns a list of lists of refs; flatten it lazily.
    query = QueryDatasets(**kwargs)
    dataset_refs = itertools.chain.from_iterable(query.getDatasets())

    _register(ri, dataset_refs, batch_size, target_dataset)
@main.command()
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("--zip-file", required=True, help="zip file to register")
@log_level_option()
def zips(rucio_dataset, rucio_register_config, chunk_size, zip_file, log_level):
    # Register the single --zip-file into the Rucio dataset. No Butler
    # repository is involved, so None is passed in place of a repo.
    _set_log_level(log_level)
    interface, _unused_butler = _getRucioInterface(None, rucio_register_config, DataType.ZIP_FILE)
    _register_zips(interface, [zip_file], chunk_size, rucio_dataset)
@main.command()
@click.option("--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("--dimension-file", required=True, help="dimension file to register")
@log_level_option()
def dimensions(rucio_dataset, rucio_register_config, chunk_size, dimension_file, log_level):
    # Register the single --dimension-file into the Rucio dataset. No Butler
    # repository is involved, so None is passed in place of a repo.
    _set_log_level(log_level)
    interface, _unused_butler = _getRucioInterface(None, rucio_register_config, DataType.DIM_FILE)
    _register_dims(interface, [dimension_file], chunk_size, rucio_dataset)