Coverage for python / lsst / rucio / register / script.py: 0%
120 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:42 +0000
1# This file is part of rucio_register
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
23import itertools
24import logging
25import os
26from typing import Any
28import click
30from lsst.daf.butler import Butler
31from lsst.daf.butler.cli.opt import (
32 log_level_option,
33 options_file_option,
34 query_datasets_options,
35)
36from lsst.daf.butler.script.queryDatasets import QueryDatasets
37from lsst.resources import ResourcePath
38from lsst.rucio.register.data_type import DataType
39from lsst.rucio.register.rucio_interface import RucioInterface
40from lsst.rucio.register.rucio_register_config import RucioRegisterConfig
logger = logging.getLogger(__name__)

# Layout of every log record; a literal "Z" follows the millisecond field.
_FORMAT = (
    "%(levelname) -10s %(asctime)s.%(msecs)03dZ "
    "%(name) -30s %(funcName) -35s %(lineno) -5d: %(message)s"
)

# Name of the environment variable that may point at the registration config file.
RUCIO_REGISTER_CONFIG = "RUCIO_REGISTER_CONFIG"
# Tail of the error message raised when no configuration file can be located.
_MSG = "environment variable not set, and no configuration was specified on the command line"
def chunks(refs, chunk_size):
    """Split an iterable into consecutive chunks of at most ``chunk_size`` items.

    Parameters
    ----------
    refs : iterable
        Items to split.  The input is consumed lazily, one chunk at a time.
    chunk_size : `int` or `None`
        Maximum number of items per chunk; must be at least 1.  `None`
        places every item in a single chunk.

    Yields
    ------
    chunk : `list`
        The next group of items.  Every chunk except possibly the last holds
        exactly ``chunk_size`` items.  Nothing is yielded for empty input.

    Raises
    ------
    ValueError
        If ``chunk_size`` is less than 1 (raised on first iteration).
    """
    # A chunk size of 0 would otherwise silently yield nothing, dropping
    # every item without any error.
    if chunk_size is not None and chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
    it = iter(refs)
    while True:
        # Materialize the chunk so it no longer shares the underlying
        # iterator: callers may keep, re-iterate or len() it safely.
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            return
        yield chunk
def _getRucioInterface(repo, rucio_register_config, rubin_butler_type):
    """Create the RucioInterface used for registration, plus its Butler.

    Parameters
    ----------
    repo : `str` or `None`
        Butler repository.  When falsy, no `Butler` is created and `None` is
        passed to `RucioInterface`.
    rucio_register_config : `str` or `None`
        Configuration file given on the command line.  It is only used when
        the ``RUCIO_REGISTER_CONFIG`` environment variable is unset or empty.
    rubin_butler_type : `DataType`
        Kind of data being registered; forwarded to `RucioInterface`.

    Returns
    -------
    ri : `RucioInterface`
        Interface configured from the registration configuration file.
    butler : `Butler` or `None`
        The Butler created for ``repo``, or `None`.

    Raises
    ------
    RuntimeError
        If neither the environment variable nor the command line names a
        configuration file.
    """
    # The environment variable takes precedence over the command line.  An
    # empty value counts as unset, so it cannot mask an explicit
    # command-line path (or be handed to RucioRegisterConfig as "").
    config_file = os.environ.get(RUCIO_REGISTER_CONFIG) or rucio_register_config
    if not config_file:
        raise RuntimeError(f"{RUCIO_REGISTER_CONFIG} {_MSG}")

    config = RucioRegisterConfig(config_file)

    rucio_rse = config.rucio_rse
    scope = config.scope
    rse_root = config.rse_root
    dtn_url = config.dtn_url

    butler = Butler(repo) if repo else None

    # create RucioInterface object used to register replicas into datasets
    ri = RucioInterface(
        butler=butler,
        rucio_rse=rucio_rse,
        scope=scope,
        rse_root=rse_root,
        dtn_url=dtn_url,
        rubin_butler_type=rubin_butler_type,
    )
    return ri, butler
def _register(ri, dataset_refs, chunk_size, rucio_dataset):
    """Register butler dataset refs with Rucio, ``chunk_size`` refs per request.

    Each chunk from `chunks` is passed to ``ri.register_as_replicas`` along
    with ``rucio_dataset``.  The count it returns is logged at DEBUG level.
    """
    for batch in chunks(dataset_refs, chunk_size):
        registered = ri.register_as_replicas(rucio_dataset, batch)
        logger.debug("%d butler datasets registered", registered)
def _register_zips(ri, zip_files, chunk_size, rucio_dataset):
    """Register zip files with Rucio into ``rucio_dataset``, one file per call.

    ``chunk_size`` is accepted but not used (presumably kept so the signature
    matches `_register`).  Each file is wrapped in a `ResourcePath` and passed
    on its own to ``ri.register_zips``, and the returned count is logged.
    """
    for path in map(ResourcePath, zip_files):
        registered = ri.register_zips(rucio_dataset, [path])
        logger.debug("%d zips registered", registered)
def _register_dims(ri, dim_files, chunk_size, rucio_dataset):
    """Register dimension files with Rucio into ``rucio_dataset``, one per call.

    ``chunk_size`` is accepted but not used (presumably kept so the signature
    matches `_register`).  Each file is wrapped in a `ResourcePath` and passed
    on its own to ``ri.register_dims``, and the returned count is logged.
    """
    for path in map(ResourcePath, dim_files):
        registered = ri.register_dims(rucio_dataset, [path])
        logger.debug("%d dimension files registered", registered)
def _set_log_level(log_level):
    """Configure root logging from the parsed ``--log-level`` option.

    Parameters
    ----------
    log_level : `dict` or `None`
        Value produced by ``log_level_option``.  Only the entry stored under
        the `None` key is applied, to the root logger.  Other entries are
        ignored here.  `None` or an empty mapping selects ``WARNING``.

    Notes
    -----
    A missing or unrecognized level name falls back to ``WARNING`` instead
    of raising.  Timestamps are rendered in UTC so that the literal ``Z`` in
    ``_FORMAT`` is truthful.  As with `logging.basicConfig`, nothing happens
    if the root logger already has handlers.
    """
    import time

    # .get() rather than [None]: a value holding only COMPONENT=LEVEL
    # entries has no None key and must not crash.
    level_name = log_level.get(None) if log_level else None
    logging_num_level = getattr(logging, level_name.upper(), None) if level_name else None
    if not isinstance(logging_num_level, int):
        logging_num_level = logging.WARNING

    formatter = logging.Formatter(_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
    # _FORMAT marks timestamps with "Z" (UTC); the default converter is
    # time.localtime, so switch to time.gmtime to make the label accurate.
    formatter.converter = time.gmtime
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logging.basicConfig(level=logging_num_level, handlers=[handler])
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def main():
    """Register butler datasets, raw files, zip files and dimension files with Rucio."""
@main.command()
@click.option("-r", "--repo", required=True, type=str, help="butler repository")
@click.option("-c", "--collections", required=True, type=str, help="collections for lookup")
@click.option("-t", "--dataset-type", required=True, type=str, help="dataset type for lookup")
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
def data_products(
    repo, collections, dataset_type, rucio_dataset, rucio_register_config, chunk_size, log_level
):
    """Register butler datasets of one dataset type with a Rucio dataset.

    Datasets of --dataset-type found in --collections of the butler --repo
    are registered as replicas into --rucio-dataset, --chunk-size at a time.
    """
    # @main.command() is the outermost decorator, as for the other commands,
    # so click lists the options in --help in the order they are declared.
    _set_log_level(log_level)

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.DATA_PRODUCT)

    # Query the butler for the datasets specified on the command line.
    dataset_refs = butler.registry.queryDatasets(dataset_type, collections=collections)

    _register(ri, dataset_refs, chunk_size, rucio_dataset)
160def _get_and_delete(kwargs, key):
161 x = kwargs.get(key, None)
162 if x is None:
163 return x
164 del kwargs[key]
165 return x
@main.command()
@click.option("-r", "--repo", required=True, type=str, help="butler repository")
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@log_level_option()
@options_file_option()
@query_datasets_options(repo=False, showUri=True)
def raws(**kwargs: Any) -> None:
    """Register raw files found by a butler dataset query with a Rucio dataset.

    The butler query options select the datasets in --repo.  They are
    registered as replicas into --rucio-dataset, --chunk-size at a time.
    """
    # --log-level is declared exactly once above; declaring it twice would
    # register a duplicate click parameter with the same name.

    # Get and delete this command's own options from kwargs, because
    # QueryDatasets doesn't like extra args.  "repo" stays: QueryDatasets
    # uses it too.
    log_level = _get_and_delete(kwargs, "log_level")
    _set_log_level(log_level)

    rucio_register_config = _get_and_delete(kwargs, "rucio_register_config")
    rucio_dataset = _get_and_delete(kwargs, "rucio_dataset")
    chunk_size = _get_and_delete(kwargs, "chunk_size")

    repo = kwargs["repo"]

    ri, butler = _getRucioInterface(repo, rucio_register_config, DataType.RAW_FILE)

    # chain is needed to flatten the list of lists returned by getDatasets()
    dataset_refs = itertools.chain.from_iterable(QueryDatasets(**kwargs).getDatasets())

    _register(ri, dataset_refs, chunk_size, rucio_dataset)
@main.command()
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("-z", "--zip-file", required=True, help="zip file to register")
@log_level_option()
def zips(rucio_dataset, rucio_register_config, chunk_size, zip_file, log_level):
    """Register a zip file with a Rucio dataset.

    No butler repository is used.  The file given by --zip-file is registered
    into --rucio-dataset.
    """
    _set_log_level(log_level)

    # No repo is given, so no Butler is created; only the interface is needed.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.ZIP_FILE)

    _register_zips(ri, [zip_file], chunk_size, rucio_dataset)
@main.command()
@click.option("-d", "--rucio-dataset", required=True, type=str, help="rucio dataset to register files to")
@click.option(
    "-C", "--rucio-register-config", required=False, type=str, help="configuration file used for registration"
)
@click.option(
    "-s",
    "--chunk-size",
    required=False,
    type=int,
    default=30,
    help="number of replica requests to make at once",
)
@click.option("-D", "--dimension-file", required=True, help="dimension file to register")
@log_level_option()
def dimensions(rucio_dataset, rucio_register_config, chunk_size, dimension_file, log_level):
    """Register a dimension file with a Rucio dataset.

    No butler repository is used.  The file given by --dimension-file is
    registered into --rucio-dataset.
    """
    _set_log_level(log_level)

    # No repo is given, so no Butler is created; only the interface is needed.
    ri, _ = _getRucioInterface(None, rucio_register_config, DataType.DIM_FILE)

    _register_dims(ri, [dimension_file], chunk_size, rucio_dataset)