Coverage for python / lsst / rucio / register / rucio_interface.py: 14%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:42 +0000

1# This file is part of rucio_register 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import hashlib 

23import logging 

24import random 

25import time 

26import zlib 

27 

28import rucio.common.exception 

29from rucio.client.didclient import DIDClient 

30from rucio.client.replicaclient import ReplicaClient 

31 

32import lsst.daf.butler 

33from lsst.daf.butler import DatasetRef 

34from lsst.resources import ResourcePath 

35from lsst.rucio.register.resource_bundle import ResourceBundle 

36from lsst.rucio.register.rubin_meta import RubinMeta 

37from lsst.rucio.register.rucio_did import RucioDID 

38 

39__all__ = ["RucioInterface"] 

40 

41logger = logging.getLogger(__name__) 

42 

43 

class RucioInterface:
    """Add files as replicas in Rucio, along with metadata,
    and attach them to datasets.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler we're operating upon
    rucio_rse : `str`
        Name of the RSE that the files live in.
    scope : `str`
        Rucio scope to register the files in.
    rse_root : `str`
        Full path to root directory of RSE directory structure
    dtn_url : `str`
        Base URL of the data transfer node for the Rucio physical filename.
    rubin_butler_type : `str`
        The type registered in "rubin_butler" metadata for Rucio.
    """

    def __init__(
        self,
        butler: lsst.daf.butler.Butler,
        rucio_rse: str,
        scope: str,
        rse_root: str,
        dtn_url: str,
        rubin_butler_type: str,
    ):
        self.butler = butler
        self.rse = rucio_rse
        self.scope = scope
        self.rse_root = rse_root
        self.dtn_url = dtn_url
        # Base used to build physical file names (PFNs); currently the DTN
        # URL verbatim (was a pointless f-string wrapper).
        self.pfn_base = dtn_url
        self.replica_client = ReplicaClient()
        self.did_client = DIDClient()
        self.rubin_butler_type = rubin_butler_type

82 

83 def _make_dataset_ref_bundle(self, dataset_id: str, dataset_ref: DatasetRef) -> ResourceBundle: 

84 """Make a ResourceBundle 

85 

86 Parameters 

87 ---------- 

88 dataset_id : `str` 

89 Rucio dataset name 

90 dataset_ref : `DatasetRef` 

91 Butler DatasetRef 

92 

93 Returns 

94 ------- 

95 rb : `ResourceBundle` 

96 ResourceBundle consolidating dataset id and DatasetRef 

97 """ 

98 logging.debug("%s", dataset_ref.to_json()) 

99 did = self._make_did(self.butler.getURI(dataset_ref), dataset_ref.to_json()) 

100 rb = ResourceBundle(dataset_id=dataset_id, did=did) 

101 return rb 

102 

103 def _make_zip_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle: 

104 """Make a ResourceBundle 

105 

106 Parameters 

107 ---------- 

108 dataset_id : `str` 

109 Rucio dataset name 

110 resouce_path : `ResourcePath` 

111 ResourcePath to a file 

112 

113 Returns 

114 ------- 

115 rb: ResourceBundle 

116 ResourceBundle consolidating dataset id and ResourcePath 

117 """ 

118 did = self._make_did(resource_path) 

119 rb = ResourceBundle(dataset_id=dataset_id, did=did) 

120 return rb 

121 

122 def _make_dim_bundle(self, dataset_id: str, resource_path: ResourcePath) -> ResourceBundle: 

123 """Make a ResourceBundle 

124 

125 Parameters 

126 ---------- 

127 dataset_id : `str` 

128 Rucio dataset name 

129 resouce_path : `lsst.resource.ResourcePath` 

130 ResourcePath to a file 

131 

132 Returns 

133 ------- 

134 rb: `lsst.rucio.register.rucio_bundle.ResourceBundle` 

135 ResourceBundle consolidating dataset id and ResourcePath 

136 """ 

137 did = self._make_did(resource_path) 

138 rb = ResourceBundle(dataset_id=dataset_id, did=did) 

139 return rb 

140 

141 def compute_hashes(self, resource_path: ResourcePath) -> tuple[int, str, str]: 

142 """Compute the length, MD5, and Adler32 hashes for a file. 

143 

144 Parameters 

145 ---------- 

146 path: `lsst.resources.ResourcePath` 

147 Path to the file. 

148 

149 Returns 

150 ------- 

151 hashes: `tuple` [ `int`, `str`, `str` ] 

152 Size in bytes, MD5 hex, and Adler32 hex hashes. 

153 """ 

154 size = 0 

155 md5 = hashlib.md5() 

156 adler32 = zlib.adler32(b"") 

157 buffer_size = 10 * 1024 * 1024 

158 with resource_path.open("rb") as f: 

159 while buffer := f.read(buffer_size): 

160 size += len(buffer) 

161 md5.update(buffer) 

162 adler32 = zlib.adler32(buffer, adler32) 

163 md5_digest = md5.hexdigest() 

164 adler32_digest = f"{adler32:08x}" 

165 return (size, md5_digest, adler32_digest) 

166 

167 def _make_did(self, resource_path: ResourcePath, metadata: str = None) -> RucioDID: 

168 """Make a Rucio data identifier dictionary from a resource. 

169 

170 Parameters 

171 ---------- 

172 resource_path: ResourcePath 

173 ResourcePath object 

174 

175 metadata: `str` 

176 String containing Rubin dataset specific metadata 

177 

178 Returns 

179 ------- 

180 did : `dict` [`str`, `str`|`int`] 

181 Rucio data identifier including physical and logical names, 

182 byte length, adler32 and MD5 checksums, meta, and scope. 

183 """ 

184 

185 size, md5, adler32 = self.compute_hashes(resource_path) 

186 path = resource_path.unquoted_path.removeprefix(self.rse_root) 

187 pfn = self.pfn_base + path 

188 logging.debug("pfn=%s", pfn) 

189 name = path.removeprefix("/" + self.scope + "/") 

190 logging.debug("name=%s", name) 

191 logging.debug("path=%s", path) 

192 

193 if metadata: 

194 meta = RubinMeta(rubin_butler=self.rubin_butler_type, rubin_sidecar=metadata) 

195 else: 

196 meta = RubinMeta(rubin_butler=self.rubin_butler_type, rubin_sidecar="") 

197 d = RucioDID( 

198 pfn=pfn, 

199 bytes=size, 

200 adler32=adler32, 

201 md5=md5, 

202 name=name, 

203 scope=self.scope, 

204 meta=meta, 

205 ) 

206 

207 return d 

208 

209 def _add_replicas(self, bundles: list[ResourceBundle]) -> None: 

210 """Call the Rucio method add_replica for a list of DIDs 

211 

212 Parameters 

213 ---------- 

214 bundles : `list` [`ResourceBundle`] 

215 A list of ResourceBundles 

216 """ 

217 dids = [bundle.get_did() for bundle in bundles] 

218 retries = 0 

219 max_retries = 5 

220 while True: 

221 try: 

222 self.replica_client.add_replicas(rse=self.rse, files=dids) 

223 break 

224 except rucio.common.exception.RucioException: 

225 retries += 1 

226 if retries < max_retries: 

227 seconds = random.randint(10, 20) 

228 logger.debug("failed to add_replicas; sleeping %d seconds", seconds) 

229 time.sleep(seconds) 

230 self.replica_client = ReplicaClient() # XXX not sure we need to do this. 

231 else: 

232 raise Exception(f"Tried {max_retries} times and couldn't add_replicas") 

233 

234 def _add_file_to_dataset_with_retries(self, dataset_id, did): 

235 retries = 0 

236 max_retries = 5 

237 while True: 

238 try: 

239 self.did_client.add_files_to_dataset( 

240 scope=self.scope, name=dataset_id, files=[did], rse=self.rse 

241 ) 

242 break 

243 except rucio.common.exception.FileAlreadyExists: 

244 if "pfn" in did: 

245 logger.debug("file %s already registered in dataset %s", did["pfn"], dataset_id) 

246 return # we can return, because it's already in the dataset 

247 except rucio.common.exception.RucioException: 

248 retries += 1 

249 if retries < max_retries: 

250 seconds = random.randint(10, 20) 

251 logger.debug("failed to register one did to %s; sleeping %d seconds", dataset_id, seconds) 

252 time.sleep(seconds) 

253 self.did_client = DIDClient() # XXX not sure we need to do this. 

254 else: 

255 # we tried max_retries times, and failed, so we'll bail out 

256 raise Exception(f"Couldn't add {did['pfn']} to dataset {dataset_id}") 

257 

258 def _add_files_to_dataset(self, dataset_id: str, dids: list[dict]) -> None: 

259 """Attach a list of files specified by Rucio DIDs to a Rucio dataset. 

260 

261 Ignores already-attached files for idempotency. 

262 

263 Parameters 

264 ---------- 

265 dataset_id : `str` 

266 Logical name of the Rucio dataset. 

267 dids : `list` [`dict` [`str`, `str`|`int`] ] 

268 List of Rucio data identifiers. 

269 """ 

270 retries = 0 

271 max_retries = 5 

272 while True: 

273 try: 

274 self.did_client.add_files_to_dataset( 

275 scope=self.scope, 

276 name=dataset_id, 

277 files=dids, 

278 rse=self.rse, 

279 ) 

280 return 

281 except rucio.common.exception.FileAlreadyExists: 

282 # At least one already is in the dataset. 

283 # This shouldn't happen, but if it does, 

284 # we have to retry each individually. 

285 for did in dids: 

286 self._add_file_to_dataset_with_retries( 

287 dataset_id=dataset_id, 

288 did=did, 

289 ) 

290 return 

291 except rucio.common.exception.DataIdentifierNotFound as e: 

292 raise e 

293 except rucio.common.exception.RucioException: 

294 retries += 1 

295 if retries < max_retries: 

296 seconds = random.randint(10, 20) 

297 logger.debug("failed to register dids to %s; sleeping %d", dataset_id, seconds) 

298 time.sleep(seconds) 

299 continue 

300 else: 

301 raise Exception(f"Couldn't add files to dataset {dataset_id}") 

302 

303 def _add_dataset_with_retries(self, dataset_id: str, statuses: dict) -> None: 

304 retries = 0 

305 max_retries = 5 

306 while True: 

307 try: 

308 self.did_client.add_dataset( 

309 scope=self.scope, 

310 name=dataset_id, 

311 statuses=statuses, 

312 rse=self.rse, 

313 ) 

314 return 

315 except rucio.common.exception.DataIdentifierAlreadyExists as e: 

316 # If someone else created it in the meantime 

317 raise e 

318 except rucio.common.exception.RucioException: 

319 retries += 1 

320 if retries < max_retries: 

321 seconds = random.randint(10, 20) 

322 logger.debug("couldn't register dids to %s; waiting %d", dataset_id, seconds) 

323 time.sleep(seconds) 

324 continue 

325 else: 

326 raise Exception(f"Tried {max_retries} times and couldn't add dataset {dataset_id}") 

327 

328 def register_to_dataset(self, bundles) -> None: 

329 """Register a list of files in Rucio. 

330 

331 Parameters 

332 ---------- 

333 bundles : `list` [`ResourceBundle`] 

334 List of resource bundles 

335 """ 

336 logger.debug("register to dataset") 

337 

338 datasets = dict() 

339 for bundle in bundles: 

340 dataset_id = bundle.dataset_id 

341 datasets.setdefault(dataset_id, []).append(bundle) 

342 

343 for dataset_id, bundles in datasets.items(): 

344 try: 

345 dids = [rb.get_did() for rb in bundles] 

346 names = [did["pfn"] for did in dids] 

347 logger.info("Registering %s in dataset %s, RSE %s", names, dataset_id, self.rse) 

348 self._add_files_to_dataset(dataset_id, dids) 

349 except rucio.common.exception.DataIdentifierNotFound: 

350 # No such dataset, so create it 

351 try: 

352 logger.info("Creating Rucio dataset %s", dataset_id) 

353 self._add_dataset_with_retries( 

354 dataset_id=dataset_id, 

355 statuses={"monotonic": True}, 

356 ) 

357 except rucio.common.exception.DataIdentifierAlreadyExists: 

358 # If someone else created it in the meantime 

359 pass 

360 # And then retry adding DIDs 

361 self._add_files_to_dataset(dataset_id, dids) 

362 

363 logger.debug("Done with Rucio for %s", bundles) 

364 

365 def register_as_replicas(self, dataset_id, dataset_refs) -> None: 

366 """Register a list of DatasetRefs to a Rucio dataset 

367 

368 Parameters 

369 ---------- 

370 dataset_id : `str` 

371 RUCIO dataset id 

372 dataset_refs : `list` [`DatasetRef`] 

373 list of Butler DatasetRefs 

374 """ 

375 bundles = [] 

376 for dataset_ref in dataset_refs: 

377 if type(dataset_ref) is list: 

378 for dsr in dataset_ref: 

379 bundles.append(self._make_dataset_ref_bundle(dataset_id, dsr)) 

380 else: 

381 bundles.append(self._make_dataset_ref_bundle(dataset_id, dataset_ref)) 

382 if len(bundles) == 0: 

383 return 0 

384 self._add_replicas(bundles) 

385 self.register_to_dataset(bundles) 

386 return len(bundles) 

387 

388 def register_zips(self, dataset_id: str, zip_files: list) -> int: 

389 """Register a list of zips to a Rucio Dataset 

390 

391 Parameters 

392 ---------- 

393 dataset_id : `str` 

394 RUCIO dataset id 

395 zip_files : `list` [`ResourcePath`] 

396 list of ResourcePath 

397 

398 Returns 

399 ------- 

400 num : `int` 

401 number of zip files ingested 

402 """ 

403 bundles = [] 

404 for zip_file in zip_files: 

405 bundles.append(self._make_zip_bundle(dataset_id, zip_file)) 

406 self._add_replicas(bundles) 

407 self.register_to_dataset(bundles) 

408 return len(bundles) 

409 

410 def register_dims(self, dataset_id: str, dim_files: list) -> int: 

411 """Register a list of dimension files to a Rucio Dataset 

412 

413 Parameters 

414 ---------- 

415 dataset_id : `str` 

416 RUCIO dataset id 

417 dim_files : `list` [`lsst.resource.ResourcePath`] 

418 list of ResourcePath 

419 

420 Returns 

421 ------- 

422 num : `int` 

423 number of dimension files ingested 

424 """ 

425 bundles = [] 

426 for dim_file in dim_files: 

427 bundles.append(self._make_dim_bundle(dataset_id, dim_file)) 

428 self._add_replicas(bundles) 

429 self.register_to_dataset(bundles) 

430 return len(bundles)