Coverage for python / lsst / rucio / register / rucio_interface.py: 14%
185 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:43 +0000
1# This file is part of rucio_register
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import hashlib
23import logging
24import random
25import time
26import zlib
28import rucio.common.exception
29from rucio.client.didclient import DIDClient
30from rucio.client.replicaclient import ReplicaClient
32import lsst.daf.butler
33from lsst.daf.butler import DatasetRef
34from lsst.resources import ResourcePath
35from lsst.rucio.register.resource_bundle import ResourceBundle
36from lsst.rucio.register.rubin_meta import RubinMeta
37from lsst.rucio.register.rucio_did import RucioDID
39__all__ = ["RucioInterface"]
41logger = logging.getLogger(__name__)
class RucioInterface:
    """Add files as replicas in Rucio, along with metadata,
    and attach them to datasets.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler we're operating upon
    rucio_rse : `str`
        Name of the RSE that the files live in.
    scope : `str`
        Rucio scope to register the files in.
    rse_root : `str`
        Full path to root directory of RSE directory structure
    dtn_url : `str`
        Base URL of the data transfer node for the Rucio physical filename.
    rubin_butler_type : `str`
        the type registered in "rubin_butler" metadata for rucio
    """

    # Total number of attempts made for each Rucio call before giving up.
    _MAX_RETRIES = 5
    # Inclusive (low, high) bounds, in seconds, of the randomized pause
    # taken between two attempts.
    _RETRY_SLEEP_SECONDS = (10, 20)

    def __init__(
        self,
        butler: lsst.daf.butler.Butler,
        rucio_rse: str,
        scope: str,
        rse_root: str,
        dtn_url: str,
        rubin_butler_type: str,
    ):
        self.butler = butler
        self.rse = rucio_rse
        self.scope = scope
        self.rse_root = rse_root
        self.dtn_url = dtn_url
        self.pfn_base = f"{dtn_url}"
        self.replica_client = ReplicaClient()
        self.did_client = DIDClient()
        self.rubin_butler_type = rubin_butler_type

    def _make_dataset_ref_bundle(self, dataset_id: str, dataset_ref: DatasetRef) -> ResourceBundle:
        """Make a ResourceBundle from a Butler DatasetRef.

        The JSON serialization of the DatasetRef is stored as the
        sidecar metadata of the resulting DID.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name
        dataset_ref : `DatasetRef`
            Butler DatasetRef

        Returns
        -------
        rb : `ResourceBundle`
            ResourceBundle consolidating dataset id and DatasetRef
        """
        # Serialize once; the same string is logged and used as metadata.
        sidecar = dataset_ref.to_json()
        logger.debug("%s", sidecar)
        did = self._make_did(self.butler.getURI(dataset_ref), sidecar)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_zip_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle:
        """Make a ResourceBundle for a zip file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name
        resource_path : `ResourcePath`
            ResourcePath to a file

        Returns
        -------
        rb : `ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def _make_dim_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle:
        """Make a ResourceBundle for a dimension file.

        Parameters
        ----------
        dataset_id : `str`
            Rucio dataset name
        resource_path : `lsst.resources.ResourcePath`
            ResourcePath to a file

        Returns
        -------
        rb : `lsst.rucio.register.resource_bundle.ResourceBundle`
            ResourceBundle consolidating dataset id and ResourcePath
        """
        did = self._make_did(resource_path)
        return ResourceBundle(dataset_id=dataset_id, did=did)

    def compute_hashes(self, resource_path: ResourcePath) -> tuple[int, str, str]:
        """Compute the length, MD5, and Adler32 hashes for a file.

        The file is read in 10 MiB chunks so that it never has to be
        held in memory in its entirety.

        Parameters
        ----------
        resource_path : `lsst.resources.ResourcePath`
            Path to the file.

        Returns
        -------
        hashes : `tuple` [ `int`, `str`, `str` ]
            Size in bytes, MD5 hex, and Adler32 hex hashes.  The Adler32
            value is zero-padded to eight hex digits.
        """
        size = 0
        md5 = hashlib.md5()
        adler32 = zlib.adler32(b"")
        buffer_size = 10 * 1024 * 1024
        with resource_path.open("rb") as f:
            while buffer := f.read(buffer_size):
                size += len(buffer)
                md5.update(buffer)
                # Feed the running value back in to checksum incrementally.
                adler32 = zlib.adler32(buffer, adler32)
        return (size, md5.hexdigest(), f"{adler32:08x}")

    def _make_did(self, resource_path: ResourcePath, metadata: str | None = None) -> RucioDID:
        """Make a Rucio data identifier from a resource.

        Parameters
        ----------
        resource_path : `ResourcePath`
            ResourcePath object
        metadata : `str`, optional
            String containing Rubin dataset specific metadata.  When
            omitted or empty, an empty sidecar string is recorded.

        Returns
        -------
        did : `RucioDID`
            Rucio data identifier including physical and logical names,
            byte length, adler32 and MD5 checksums, meta, and scope.
        """
        size, md5, adler32 = self.compute_hashes(resource_path)

        # Physical name: the path relative to the RSE root, appended to
        # the transfer node base URL.
        path = resource_path.unquoted_path.removeprefix(self.rse_root)
        pfn = self.pfn_base + path
        logger.debug("pfn=%s", pfn)
        # Logical name: the same relative path without the leading
        # "/<scope>/" component.
        name = path.removeprefix("/" + self.scope + "/")
        logger.debug("name=%s", name)
        logger.debug("path=%s", path)

        meta = RubinMeta(rubin_butler=self.rubin_butler_type, rubin_sidecar=metadata or "")
        return RucioDID(
            pfn=pfn,
            bytes=size,
            adler32=adler32,
            md5=md5,
            name=name,
            scope=self.scope,
            meta=meta,
        )

    def _add_replicas(self, bundles: list[ResourceBundle]) -> None:
        """Call the Rucio method add_replicas for a list of DIDs

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            A list of ResourceBundles

        Raises
        ------
        Exception
            Raised if every attempt failed with a Rucio exception; the
            last Rucio exception is chained as the cause.
        """
        dids = [bundle.get_did() for bundle in bundles]
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.replica_client.add_replicas(rse=self.rse, files=dids)
                return
            except rucio.common.exception.RucioException as e:
                if attempt == self._MAX_RETRIES:
                    raise Exception(f"Tried {self._MAX_RETRIES} times and couldn't add_replicas") from e
                seconds = random.randint(*self._RETRY_SLEEP_SECONDS)
                logger.debug("failed to add_replicas; sleeping %d seconds", seconds)
                time.sleep(seconds)
                self.replica_client = ReplicaClient()  # XXX not sure we need to do this.

    def _add_file_to_dataset_with_retries(self, dataset_id: str, did: dict) -> None:
        """Attach a single file to a Rucio dataset, retrying on failure.

        A file that is already attached is treated as success.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        did : `dict` [`str`, `str`|`int`]
            Rucio data identifier of the file.

        Raises
        ------
        Exception
            Raised if every attempt failed with a Rucio exception; the
            last Rucio exception is chained as the cause.
        """
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope, name=dataset_id, files=[did], rse=self.rse
                )
                return
            except rucio.common.exception.FileAlreadyExists:
                if "pfn" in did:
                    logger.debug("file %s already registered in dataset %s", did["pfn"], dataset_id)
                return  # we can return, because it's already in the dataset
            except rucio.common.exception.RucioException as e:
                if attempt == self._MAX_RETRIES:
                    # we tried the maximum number of times, and failed, so
                    # we'll bail out; tolerate a DID without "pfn" so the
                    # real failure is not masked by a KeyError.
                    pfn = did.get("pfn", "<unknown pfn>")
                    raise Exception(f"Couldn't add {pfn} to dataset {dataset_id}") from e
                seconds = random.randint(*self._RETRY_SLEEP_SECONDS)
                logger.debug("failed to register one did to %s; sleeping %d seconds", dataset_id, seconds)
                time.sleep(seconds)
                self.did_client = DIDClient()  # XXX not sure we need to do this.

    def _add_files_to_dataset(self, dataset_id: str, dids: list[dict]) -> None:
        """Attach a list of files specified by Rucio DIDs to a Rucio dataset.

        Ignores already-attached files for idempotency.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset.
        dids : `list` [`dict` [`str`, `str`|`int`] ]
            List of Rucio data identifiers.

        Raises
        ------
        rucio.common.exception.DataIdentifierNotFound
            Propagated unchanged, without retrying, so that the caller
            can react to it.
        Exception
            Raised if every attempt failed with another Rucio exception;
            the last Rucio exception is chained as the cause.
        """
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.did_client.add_files_to_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    files=dids,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.FileAlreadyExists:
                # At least one already is in the dataset.
                # This shouldn't happen, but if it does,
                # we have to retry each individually.
                for did in dids:
                    self._add_file_to_dataset_with_retries(
                        dataset_id=dataset_id,
                        did=did,
                    )
                return
            except rucio.common.exception.DataIdentifierNotFound:
                # Must be listed before RucioException so it is never
                # swallowed by the retry handler below.
                raise
            except rucio.common.exception.RucioException as e:
                if attempt == self._MAX_RETRIES:
                    raise Exception(f"Couldn't add files to dataset {dataset_id}") from e
                seconds = random.randint(*self._RETRY_SLEEP_SECONDS)
                logger.debug("failed to register dids to %s; sleeping %d", dataset_id, seconds)
                time.sleep(seconds)

    def _add_dataset_with_retries(self, dataset_id: str, statuses: dict) -> None:
        """Create a Rucio dataset, retrying on failure.

        Parameters
        ----------
        dataset_id : `str`
            Logical name of the Rucio dataset to create.
        statuses : `dict`
            Passed through as the ``statuses`` argument of the Rucio
            ``add_dataset`` call.

        Raises
        ------
        rucio.common.exception.DataIdentifierAlreadyExists
            Propagated unchanged, without retrying, if the dataset
            already exists.
        Exception
            Raised if every attempt failed with another Rucio exception;
            the last Rucio exception is chained as the cause.
        """
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.did_client.add_dataset(
                    scope=self.scope,
                    name=dataset_id,
                    statuses=statuses,
                    rse=self.rse,
                )
                return
            except rucio.common.exception.DataIdentifierAlreadyExists:
                # If someone else created it in the meantime
                raise
            except rucio.common.exception.RucioException as e:
                if attempt == self._MAX_RETRIES:
                    raise Exception(
                        f"Tried {self._MAX_RETRIES} times and couldn't add dataset {dataset_id}"
                    ) from e
                seconds = random.randint(*self._RETRY_SLEEP_SECONDS)
                logger.debug("couldn't register dids to %s; waiting %d", dataset_id, seconds)
                time.sleep(seconds)

    def register_to_dataset(self, bundles) -> None:
        """Register a list of files in Rucio.

        Bundles are grouped by their dataset id and each group is
        attached to its dataset with one call.  A dataset that does not
        exist yet is created and the attachment is then repeated.

        Parameters
        ----------
        bundles : `list` [`ResourceBundle`]
            List of resource bundles
        """
        logger.debug("register to dataset")

        datasets = dict()
        for bundle in bundles:
            datasets.setdefault(bundle.dataset_id, []).append(bundle)

        for dataset_id, group in datasets.items():
            # Build the DID list before the try block so that it is
            # always bound when the handler below needs it again.
            dids = [rb.get_did() for rb in group]
            names = [did["pfn"] for did in dids]
            logger.info("Registering %s in dataset %s, RSE %s", names, dataset_id, self.rse)
            try:
                self._add_files_to_dataset(dataset_id, dids)
            except rucio.common.exception.DataIdentifierNotFound:
                # No such dataset, so create it
                try:
                    logger.info("Creating Rucio dataset %s", dataset_id)
                    self._add_dataset_with_retries(
                        dataset_id=dataset_id,
                        statuses={"monotonic": True},
                    )
                except rucio.common.exception.DataIdentifierAlreadyExists:
                    # If someone else created it in the meantime
                    pass
                # And then retry adding DIDs
                self._add_files_to_dataset(dataset_id, dids)

            logger.debug("Done with Rucio for %s", group)

    def register_as_replicas(self, dataset_id, dataset_refs) -> int:
        """Register a list of DatasetRefs to a Rucio dataset

        Parameters
        ----------
        dataset_id : `str`
            RUCIO dataset id
        dataset_refs : `list` [`DatasetRef`]
            list of Butler DatasetRefs; an element may itself be a list
            of DatasetRefs, which is flattened one level.

        Returns
        -------
        num : `int`
            number of DatasetRefs registered; 0 if there were none, in
            which case Rucio is not contacted.
        """
        bundles = []
        for dataset_ref in dataset_refs:
            if isinstance(dataset_ref, list):
                bundles.extend(self._make_dataset_ref_bundle(dataset_id, dsr) for dsr in dataset_ref)
            else:
                bundles.append(self._make_dataset_ref_bundle(dataset_id, dataset_ref))
        if not bundles:
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)

    def register_zips(self, dataset_id: str, zip_files: list) -> int:
        """Register a list of zips to a Rucio Dataset

        Parameters
        ----------
        dataset_id : `str`
            RUCIO dataset id
        zip_files : `list` [`ResourcePath`]
            list of ResourcePath

        Returns
        -------
        num : `int`
            number of zip files ingested; 0 if there were none, in which
            case Rucio is not contacted.
        """
        bundles = [self._make_zip_bundle(dataset_id, zip_file) for zip_file in zip_files]
        if not bundles:
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)

    def register_dims(self, dataset_id: str, dim_files: list) -> int:
        """Register a list of dimension files to a Rucio Dataset

        Parameters
        ----------
        dataset_id : `str`
            RUCIO dataset id
        dim_files : `list` [`lsst.resources.ResourcePath`]
            list of ResourcePath

        Returns
        -------
        num : `int`
            number of dimension files ingested; 0 if there were none, in
            which case Rucio is not contacted.
        """
        bundles = [self._make_dim_bundle(dataset_id, dim_file) for dim_file in dim_files]
        if not bundles:
            return 0
        self._add_replicas(bundles)
        self.register_to_dataset(bundles)
        return len(bundles)