Coverage for python / lsst / daf / butler / datastore / stored_file_info.py: 40%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("SerializedStoredFileInfo", "StoredDatastoreItemInfo", "StoredFileInfo")
32import inspect
33from collections.abc import Iterable, Mapping
34from dataclasses import dataclass
35from typing import TYPE_CHECKING, Any
37import pydantic
39from lsst.resources import ResourcePath
40from lsst.utils import doImportType
41from lsst.utils.introspection import get_full_type_name
43from .._formatter import Formatter, FormatterParameter, FormatterV2
44from .._location import Location, LocationFactory
45from .._storage_class import StorageClass, StorageClassFactory
47if TYPE_CHECKING:
48 from .._dataset_ref import DatasetRef
# Sentinel string stored in database records in place of a Python `None`
# (used by StoredFileInfo.to_record for the ``component`` column, which
# participates in a primary key and so cannot hold a real NULL).
NULLSTR = "__NULL_STRING__"
class StoredDatastoreItemInfo:
    """Internal information associated with a stored dataset in a `Datastore`.

    This is an empty base class. Datastore implementations are expected to
    write their own subclasses.
    """

    __slots__ = ()

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        raise NotImplementedError("The base class does not know how to locate an item in a datastore.")

    @classmethod
    def from_record(cls: type[StoredDatastoreItemInfo], record: Mapping[str, Any]) -> StoredDatastoreItemInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredDatastoreItemInfo`
            The newly-constructed item corresponding to the record.
        """
        raise NotImplementedError()

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert record contents to a dictionary.

        Parameters
        ----------
        **kwargs
            Additional items to add to returned record.
        """
        raise NotImplementedError()

    def update(self, **kwargs: Any) -> StoredDatastoreItemInfo:
        """Create a new class with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override.

        Returns
        -------
        updated : `StoredDatastoreItemInfo`
            A new instance of the object with updated values.
        """
        raise NotImplementedError()

    @classmethod
    def to_records(
        cls, records: Iterable[StoredDatastoreItemInfo], **kwargs: Any
    ) -> tuple[str, Iterable[Mapping[str, Any]]]:
        """Convert a collection of records to dictionaries.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `StoredDatastoreItemInfo` ]
            A collection of records, all records must be of the same type.
        **kwargs
            Additional items to add to each returned record.

        Returns
        -------
        class_name : `str`
            Name of the record class. Empty string if no records were given.
        records : `list` [ `dict` ]
            Records in their dictionary representation.
        """
        # Materialize the iterable up front: a generator would defeat both
        # the emptiness test below (a generator is always truthy) and the
        # two separate iteration passes (class check, then conversion) --
        # the second pass would silently see an exhausted iterator.
        records = list(records)
        if not records:
            return "", []
        classes = {record.__class__ for record in records}
        assert len(classes) == 1, f"Records have to be of the same class: {classes}"
        return get_full_type_name(classes.pop()), [record.to_record(**kwargs) for record in records]

    @classmethod
    def from_records(
        cls, class_name: str, records: Iterable[Mapping[str, Any]]
    ) -> list[StoredDatastoreItemInfo]:
        """Convert collection of dictionaries to records.

        Parameters
        ----------
        class_name : `str`
            Name of the record class.
        records : `~collections.abc.Iterable` [ `dict` ]
            Records in their dictionary representation.

        Returns
        -------
        infos : `list` [`StoredDatastoreItemInfo`]
            Sequence of records converted to typed representation.

        Raises
        ------
        TypeError
            Raised if ``class_name`` is not a sub-class of
            `StoredDatastoreItemInfo`.
        """
        try:
            klass = doImportType(class_name)
        except ImportError:
            # Prior to DM-41043 we were embedding a lsst.daf.butler.core
            # path in the serialized form, which we never wanted; fix this
            # one case.
            if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo":
                klass = StoredFileInfo
            else:
                raise
        if not issubclass(klass, StoredDatastoreItemInfo):
            raise TypeError(f"Class {class_name} is not a subclass of StoredDatastoreItemInfo")
        return [klass.from_record(record) for record in records]
@dataclass(frozen=True, slots=True)
class StoredFileInfo(StoredDatastoreItemInfo):
    """Datastore-private metadata associated with a Datastore file.

    Parameters
    ----------
    formatter : `Formatter` or `FormatterV2` or `str`
        The formatter to use for this dataset.
    path : `str`
        Path to the artifact associated with this dataset.
    storageClass : `StorageClass` or `None`
        The storage class associated with this dataset. If `None`,
        ``storage_class_name`` must be provided as a keyword argument.
    component : `str` or `None`, optional
        The component if disassembled.
    checksum : `str` or `None`, optional
        The checksum of the artifact.
    file_size : `int`
        The size of the file in bytes. -1 indicates the size is not known.
    storage_class_name : `str`, optional
        Name of the storage class. This may be passed instead of
        ``storageClass`` to defer loading storage class definitions (e.g. if a
        butler configuration may not have been loaded yet). Note that
        ``storageClass=None`` must be passed explicitly (for backward
        compatibility, it remains a positional argument with no default).
    """

    # NOTE: ``@dataclass`` does not replace this hand-written ``__init__``;
    # the decorator is used here for the frozen/slots machinery and the field
    # declarations further down.
    def __init__(
        self,
        formatter: FormatterParameter,
        path: str,
        storageClass: StorageClass | None,
        component: str | None,
        checksum: str | None,
        file_size: int,
        *,
        storage_class_name: str | None = None,
    ):
        # Use these shenanigans to allow us to use a frozen dataclass:
        # plain attribute assignment would raise on a frozen instance.
        object.__setattr__(self, "path", path)
        if storageClass is not None:
            # Only the name is retained; the full StorageClass is looked up
            # lazily via the ``storageClass`` property when needed.
            object.__setattr__(self, "storage_class_name", storageClass.name)
        else:
            if storage_class_name is None:
                raise TypeError("At least one of 'storageClass' and 'storage_class_name' must be provided.")
            object.__setattr__(self, "storage_class_name", storage_class_name)
        object.__setattr__(self, "component", component)
        object.__setattr__(self, "checksum", checksum)
        object.__setattr__(self, "file_size", file_size)

        if isinstance(formatter, str):
            # We trust that this string refers to a Formatter
            formatterStr = formatter
        elif isinstance(formatter, Formatter | FormatterV2) or (
            inspect.isclass(formatter) and issubclass(formatter, Formatter | FormatterV2)
        ):
            # Accept a Formatter/FormatterV2 instance or class; store only
            # its fully-qualified name.
            formatterStr = formatter.name()
        else:
            raise TypeError(f"Supplied formatter '{formatter}' is not a Formatter")
        object.__setattr__(self, "formatter", formatterStr)

    # Field declarations for the dataclass machinery. Values are assigned in
    # ``__init__`` above via ``object.__setattr__``.
    formatter: str
    """Fully-qualified name of Formatter. If a Formatter class or instance
    is given the name will be extracted."""

    path: str
    """Path to dataset within Datastore."""

    storage_class_name: str
    """Name of the storage class associated with this dataset."""

    component: str | None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""

    @property
    def storageClass(self) -> StorageClass:
        """Storage class associated with this dataset."""
        # Resolved lazily from the stored name so that record construction
        # does not require storage class definitions to be loaded.
        return StorageClassFactory().getStorageClass(self.storage_class_name)

    def rebase(self, ref: DatasetRef) -> StoredFileInfo:
        """Return a copy of the record suitable for a specified reference.

        Parameters
        ----------
        ref : `DatasetRef`
            DatasetRef which provides component name and dataset ID for the
            new returned record.

        Returns
        -------
        record : `StoredFileInfo`
            New record instance.
        """
        # take component from the ref, rest comes from self
        component = ref.datasetType.component()
        if component is None:
            component = self.component
        return self.update(component=component)

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert the supplied ref to a database record.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Additional information to be added to the record.

        Returns
        -------
        record : `dict` [`str`, `typing.Any`]
            Dictionary form of this record; a `None` component is replaced
            by the ``NULLSTR`` sentinel.
        """
        component = self.component
        if component is None:
            # Use empty string since we want this to be part of the
            # primary key.
            component = NULLSTR
        return dict(
            formatter=self.formatter,
            path=self.path,
            storage_class=self.storage_class_name,
            component=component,
            checksum=self.checksum,
            file_size=self.file_size,
            **kwargs,
        )

    def to_simple(self) -> SerializedStoredFileInfo:
        """Return the serialized (pydantic) form of this record."""
        record = self.to_record()
        # We allow None on the model but the record contains a "null string"
        # instead
        record["component"] = self.component
        return SerializedStoredFileInfo.model_validate(record)

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        uriInStore = ResourcePath(self.path, forceAbsolute=False, forceDirectory=False)
        if uriInStore.isabs():
            # Absolute URIs live outside the datastore root.
            location = Location(None, uriInStore)
        else:
            location = factory.from_uri(uriInStore, trusted_path=True)
        return location

    @classmethod
    def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredFileInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item corresponding to the record.
        """
        # Map the NULLSTR sentinel (or an empty/missing value) back to None
        # for the component; the storage class is kept as a name and is not
        # instantiated here.
        component = record["component"] if (record["component"] and record["component"] != NULLSTR) else None
        info = cls(
            formatter=record["formatter"],
            path=record["path"],
            storageClass=None,
            storage_class_name=record["storage_class"],
            component=component,
            checksum=record["checksum"],
            file_size=record["file_size"],
        )
        return info

    @classmethod
    def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
        """Construct a record from its serialized (pydantic) form."""
        return cls.from_record(dict(model))

    def update(self, **kwargs: Any) -> StoredFileInfo:
        """Return a copy of this record with the given fields replaced.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Field values to override; unknown names raise `ValueError`.

        Returns
        -------
        updated : `StoredFileInfo`
            New instance with the requested values replaced.
        """
        new_args: dict[str, Any] = {"storageClass": None}  # so `storage_class_name` can be passed.
        for k in self.__slots__:
            if k in kwargs:
                new_args[k] = kwargs.pop(k)
            else:
                new_args[k] = getattr(self, k)
        if kwargs:
            raise ValueError(f"Unexpected keyword arguments for update: {', '.join(kwargs)}")
        return type(self)(**new_args)

    def __reduce__(self) -> str | tuple[Any, ...]:
        # Pickle via the database-record round trip, which captures all
        # slot values.
        return (self.from_record, (self.to_record(),))

    @property
    def artifact_path(self) -> str:
        """Path to dataset as stored in Datastore with fragments removed."""
        if "#" in self.path:
            # Strip from the last '#' onwards (fragments can mark special
            # handling such as zip membership).
            return self.path[: self.path.rfind("#")]
        return self.path
class SerializedStoredFileInfo(pydantic.BaseModel):
    """Serialized representation of `StoredFileInfo` properties.

    Field names match the keys produced by `StoredFileInfo.to_record`,
    except that ``component`` holds a real `None` rather than the
    ``NULLSTR`` sentinel used in database records.
    """

    formatter: str
    """Fully-qualified name of Formatter."""

    path: str
    """Path to dataset within Datastore."""

    storage_class: str
    """Name of the StorageClass associated with Dataset."""

    component: str | None = None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None = None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""
def make_datastore_path_relative(path: str) -> str:
    """Normalize a path from a `StoredFileInfo` object so
    that it is always relative.

    Parameters
    ----------
    path : `str`
        The file path from a `StoredFileInfo`.

    Returns
    -------
    normalized_path : `str`
        The original path, if it was relative. Otherwise, a version of it that
        was converted to a relative path, stripping URI scheme and netloc from
        it.
    """
    # Force the datastore file path sent to the client to be relative, since
    # absolute URLs in the server will generally not be reachable by the
    # client. If an absolute URL is sent, it (or a portion of it) can end up
    # baked into the FileDatastore that is the target of the transfer in some
    # cases.
    resource = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
    if not resource.isabs():
        # Already relative; nothing to strip.
        return path
    stripped = resource.relativeToPathRoot
    # Preserve the fragment, since this used to indicate special
    # processing like zip extraction.
    return f"{stripped}#{resource.fragment}" if resource.fragment else stripped