Coverage for python / lsst / daf / butler / remote_butler / _remote_butler_collections.py: 0%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("RemoteButlerCollections",)
32from collections.abc import Iterable, Sequence, Set
34from lsst.utils.iteration import ensure_iterable
36from .._butler_collections import ButlerCollections, CollectionInfo
37from .._collection_type import CollectionType
38from .._dataset_type import DatasetType
39from ..utils import has_globs
40from ._collection_args import convert_collection_arg_to_glob_string_list
41from ._defaults import DefaultsHolder
42from ._http_connection import RemoteButlerHttpConnection, parse_model
43from ._ref_utils import normalize_dataset_type_name
44from .server_models import QueryCollectionInfoRequestModel, QueryCollectionInfoResponseModel
47class RemoteButlerCollections(ButlerCollections):
48 """Implementation of ButlerCollections for RemoteButler.
50 Parameters
51 ----------
52 defaults : `DefaultsHolder`
53 Registry object used to look up default collections.
54 connection : `RemoteButlerHttpConnection`
55 HTTP connection to Butler server.
56 """
58 def __init__(self, defaults: DefaultsHolder, connection: RemoteButlerHttpConnection):
59 self._defaults = defaults
60 self._connection = connection
62 @property
63 def defaults(self) -> Sequence[str]:
64 return self._defaults.get().collections
66 def extend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
67 raise NotImplementedError("Not yet available")
69 def prepend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
70 raise NotImplementedError("Not yet available")
72 def redefine_chain(
73 self, parent_collection_name: str, child_collection_names: str | Iterable[str]
74 ) -> None:
75 raise NotImplementedError("Not yet available")
77 def remove_from_chain(
78 self, parent_collection_name: str, child_collection_names: str | Iterable[str]
79 ) -> None:
80 raise NotImplementedError("Not yet available")
82 def query_info(
83 self,
84 expression: str | Iterable[str],
85 collection_types: Set[CollectionType] | CollectionType | None = None,
86 flatten_chains: bool = False,
87 include_chains: bool | None = None,
88 include_parents: bool = False,
89 include_summary: bool = False,
90 include_doc: bool = False,
91 summary_datasets: Iterable[DatasetType] | Iterable[str] | None = None,
92 ) -> Sequence[CollectionInfo]:
93 if collection_types is None:
94 types = list(CollectionType.all())
95 else:
96 types = list(ensure_iterable(collection_types))
98 if include_chains is None:
99 include_chains = not flatten_chains
101 if summary_datasets is None:
102 dataset_types = None
103 else:
104 dataset_types = [normalize_dataset_type_name(t) for t in summary_datasets]
106 request = QueryCollectionInfoRequestModel(
107 expression=convert_collection_arg_to_glob_string_list(expression),
108 collection_types=types,
109 flatten_chains=flatten_chains,
110 include_chains=include_chains,
111 include_parents=include_parents,
112 include_summary=include_summary,
113 include_doc=include_doc,
114 summary_datasets=dataset_types,
115 )
116 response = self._connection.post("query_collection_info", request)
117 model = parse_model(response, QueryCollectionInfoResponseModel)
119 return model.collections
121 def get_info(
122 self, name: str, include_parents: bool = False, include_summary: bool = False
123 ) -> CollectionInfo:
124 if has_globs(name):
125 raise ValueError("Search expressions are not allowed in 'name' parameter to get_info")
126 results = self.query_info(
127 name, include_parents=include_parents, include_summary=include_summary, include_doc=True
128 )
129 assert len(results) == 1, "Only one result should be returned for get_info."
130 return results[0]
132 def register(self, name: str, type: CollectionType = CollectionType.RUN, doc: str | None = None) -> bool:
133 raise NotImplementedError("Not yet available.")
135 def x_remove(self, name: str) -> None:
136 raise NotImplementedError("Not yet available.")