Coverage for python / lsst / ctrl / mpexec / cli / butler_factory.py: 20%
168 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:48 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = [
31 "ButlerFactory",
32 "OutputChainedCollectionInfo",
33 "OutputRunCollectionInfo",
34]
36import atexit
37import shutil
38from collections.abc import Iterable, Sequence
40from lsst.daf.butler import Butler, CollectionType
41from lsst.daf.butler.datastore.cache_manager import DatastoreCacheManager
42from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
43from lsst.daf.butler.registry.wildcards import CollectionWildcard
44from lsst.pipe.base import Instrument, PipelineGraph
45from lsst.pipe.base.pipeline_graph import NodeType
46from lsst.resources import ResourcePathExpression
47from lsst.utils.logging import getLogger
49_LOG = getLogger(__name__)
class OutputChainedCollectionInfo:
    """Helper describing the output `~lsst.daf.butler.CollectionType.CHAINED`
    collection named on the command line.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler that collections will be added to and/or queried from.
    name : `str`
        Name of the collection given on the command line.
    """

    def __init__(self, butler: Butler, name: str):
        self.name = name
        try:
            info = butler.collections.get_info(name)
        except MissingCollectionError:
            # Collection is not (yet) registered in this repository.
            self.chain = ()
            self.exists = False
        else:
            self.chain = tuple(info.children)
            self.exists = True

    def __str__(self) -> str:
        return self.name

    name: str
    """Name of the collection provided on the command line (`str`).
    """

    exists: bool
    """Whether this collection already exists in the butler (`bool`).
    """

    chain: tuple[str, ...]
    """The definition of the collection, if it already exists (`tuple`[`str`]).

    Empty if the collection does not already exist.
    """
class OutputRunCollectionInfo:
    """Helper describing the output `~lsst.daf.butler.CollectionType.RUN`
    collection named on the command line.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler that collections will be added to and/or queried from.
    name : `str`
        Name of the collection given on the command line.
    """

    def __init__(self, butler: Butler, name: str):
        self.name = name
        try:
            actual_type = butler.collections.get_info(name).type
        except MissingCollectionError:
            # Not registered yet; it will be created later if needed.
            self.exists = False
        else:
            # A pre-existing collection with any other type cannot be used
            # as an output run.
            if actual_type is not CollectionType.RUN:
                raise TypeError(f"Collection '{name}' exists but has type {actual_type.name}, not RUN.")
            self.exists = True

    name: str
    """Name of the collection provided on the command line (`str`).
    """

    exists: bool
    """Whether this collection already exists in the butler (`bool`).
    """
class ButlerFactory:
    """A helper class for processing command-line arguments related to input
    and output collections.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler that collections will be added to and/or queried from.
    output : `str` or `None`
        The name of a `~lsst.daf.butler.CollectionType.CHAINED` input/output
        collection.
    output_run : `str` or `None`
        The name of a `~lsst.daf.butler.CollectionType.RUN` input/output
        collection.
    inputs : `str` or `~collections.abc.Iterable` [`str`]
        Input collection name or iterable of collection names.
    extend_run : `bool`
        A boolean indicating whether ``output_run`` should already exist and be
        extended.
    rebase : `bool`
        A boolean indicating whether to force the ``output`` collection to be
        consistent with ``inputs`` and ``output`` run such that the ``output``
        collection has output run collections first (i.e. those that start with
        the same prefix), then the new inputs, then any original inputs not
        included in the new inputs.
    writeable : `bool`
        If `True`, a `~lsst.daf.butler.Butler` is being initialized in a
        context where actual writes should happen, and hence an output run
        is necessary.

    Raises
    ------
    ValueError
        Raised if ``writeable`` is `True` but there are no output collections.
    """
158 def __init__(
159 self,
160 butler: Butler,
161 *,
162 output: str | None,
163 output_run: str | None,
164 inputs: str | Iterable[str],
165 extend_run: bool = False,
166 rebase: bool = False,
167 writeable: bool,
168 ):
169 if output is not None:
170 self.output = OutputChainedCollectionInfo(butler, output)
171 else:
172 self.output = None
173 if output_run is not None:
174 if rebase and self.output and not output_run.startswith(self.output.name):
175 raise ValueError("Cannot rebase if output run does not start with output collection name.")
176 self.output_run = OutputRunCollectionInfo(butler, output_run)
177 elif self.output is not None:
178 if extend_run:
179 if not self.output.chain:
180 raise ValueError("Cannot use --extend-run option with non-existing or empty output chain")
181 run_name = self.output.chain[0]
182 else:
183 run_name = f"{self.output}/{Instrument.makeCollectionTimestamp()}"
184 self.output_run = OutputRunCollectionInfo(butler, run_name)
185 elif not writeable:
186 # If we're not writing yet, ok to have no output run.
187 self.output_run = None
188 else:
189 raise ValueError("Cannot write without at least one of (--output, --output-run).")
190 # Recursively flatten any input CHAINED collections. We do this up
191 # front so we can tell if the user passes the same inputs on subsequent
192 # calls, even though we also flatten when we define the output CHAINED
193 # collection.
194 self.inputs = tuple(butler.collections.query(inputs, flatten_chains=True)) if inputs else ()
196 # If things are inconsistent and user has asked for a rebase then
197 # construct the new output chain.
198 if rebase and self._check_output_input_consistency():
199 assert self.output is not None
200 newOutputChain = [item for item in self.output.chain if item.startswith(self.output.name)]
201 newOutputChain.extend([item for item in self.inputs if item not in newOutputChain])
202 newOutputChain.extend([item for item in self.output.chain if item not in newOutputChain])
203 self.output.chain = tuple(newOutputChain)
205 def check(self, *, extend_run: bool, replace_run: bool, prune_replaced: str | None = None) -> None:
206 """Check command-line options for consistency with each other and the
207 data repository.
209 Parameters
210 ----------
211 extend_run : `bool`
212 Whether the ``output_run`` should already exist and be extended.
213 replace_run : `bool`
214 Whether the ``output_run`` should be replaced in the ``output``
215 chain.
216 prune_replaced : `str` or `None`
217 If ``replace_run=True``, whether/how datasets in the old run should
218 be removed. Options are ``"purge"``, ``"unstore"``, and `None`.
219 """
220 assert not (extend_run and replace_run), "In mutually-exclusive group in ArgumentParser."
221 if consistencyError := self._check_output_input_consistency():
222 raise ValueError(consistencyError)
224 if extend_run:
225 if self.output_run is None:
226 raise ValueError("Cannot --extend-run when no output collection is given.")
227 elif not self.output_run.exists:
228 raise ValueError(
229 f"Cannot --extend-run; output collection '{self.output_run.name}' does not exist."
230 )
231 if not extend_run and self.output_run is not None and self.output_run.exists:
232 raise ValueError(
233 f"Output run '{self.output_run.name}' already exists, but --extend-run was not given."
234 )
235 if prune_replaced and not replace_run:
236 raise ValueError("--prune-replaced requires --replace-run.")
237 if replace_run and (self.output is None or not self.output.exists):
238 raise ValueError("--output must point to an existing CHAINED collection for --replace-run.")
240 def _check_output_input_consistency(self) -> str | None:
241 if self.inputs and self.output is not None and self.output.exists:
242 # Passing the same inputs that were used to initialize the output
243 # collection is allowed; this means the inputs must appear as a
244 # contiguous subsequence of outputs (normally they're also at the
245 # end, but --rebase will in general put them in the middle).
246 for n in reversed(range(1 + len(self.output.chain) - len(self.inputs))):
247 if self.inputs == self.output.chain[n : n + len(self.inputs)]:
248 return None
249 return (
250 f"Output CHAINED collection {self.output.name!r} exists and does not include the "
251 f"same sequence of (flattened) input collections {self.inputs} as a contiguous "
252 "subsequence. "
253 "Use --rebase to ignore this problem and reset the output collection, but note that "
254 "this may obfuscate what inputs were actually used to produce these outputs."
255 )
256 return None
258 @classmethod
259 def _make_read_parts(
260 cls,
261 butler_config: ResourcePathExpression,
262 *,
263 output: str | None,
264 output_run: str | None,
265 inputs: str | Iterable[str],
266 extend_run: bool = False,
267 rebase: bool = False,
268 replace_run: bool,
269 prune_replaced: str | None = None,
270 ) -> tuple[Butler, Sequence[str], ButlerFactory]:
271 """Parse arguments to support implementations of `make_read_butler` and
272 `make_butler_and_collections`.
274 Parameters
275 ----------
276 butler_config : convertible to `lsst.resources.ResourcePath`
277 Path to configuration for the butler.
278 output : `str` or `None`
279 The name of a `~lsst.daf.butler.CollectionType.CHAINED`
280 input/output collection.
281 output_run : `str` or `None`
282 The name of a `~lsst.daf.butler.CollectionType.RUN` input/output
283 collection.
284 inputs : `str` or `~collections.abc.Iterable` [`str`]
285 Input collection name or iterable of collection names.
286 extend_run : `bool`
287 A boolean indicating whether ``output_run`` should already exist
288 and be extended.
289 rebase : `bool`
290 A boolean indicating whether to force the ``output`` collection to
291 be consistent with ``inputs`` and ``output`` run such that the
292 ``output`` collection has output run collections first (i.e. those
293 that start with the same prefix), then the new inputs, then any
294 original inputs not included in the new inputs.
295 replace_run : `bool`
296 Whether the ``output_run`` should be replaced in the ``output``
297 chain.
298 prune_replaced : `str` or `None`
299 If ``replace_run=True``, whether/how datasets in the old run should
300 be removed. Options are ``"purge"``, ``"unstore"``, and `None`.
302 Returns
303 -------
304 butler : `lsst.daf.butler.Butler`
305 A read-only butler constructed from the repo at
306 ``args.butler_config``, but with no default collections.
307 inputs : `~collections.abc.Sequence` [ `str` ]
308 A collection search path constructed according to ``args``.
309 self : `ButlerFactory`
310 A new `ButlerFactory` instance representing the processed version
311 of ``args``.
312 """
313 butler = Butler.from_config(butler_config, writeable=False)
314 try:
315 self = cls(
316 butler,
317 output=output,
318 output_run=output_run,
319 inputs=inputs,
320 extend_run=extend_run,
321 rebase=rebase,
322 writeable=False,
323 )
324 self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced)
325 if self.output and self.output.exists:
326 if replace_run:
327 replaced = self.output.chain[0]
328 inputs = list(self.output.chain[1:])
329 _LOG.debug(
330 "Simulating collection search in '%s' after removing '%s'.",
331 self.output.name,
332 replaced,
333 )
334 else:
335 inputs = [self.output.name]
336 else:
337 inputs = list(self.inputs)
338 if extend_run:
339 assert self.output_run is not None, "Output collection has to be specified."
340 inputs.insert(0, self.output_run.name)
341 collSearch = CollectionWildcard.from_expression(inputs).require_ordered()
342 except Exception:
343 butler.close()
344 raise
345 return butler, collSearch, self
347 @classmethod
348 def make_butler_and_collections(
349 cls,
350 butler_config: ResourcePathExpression,
351 *,
352 output: str | None,
353 output_run: str | None,
354 inputs: str | Iterable[str],
355 extend_run: bool = False,
356 rebase: bool = False,
357 replace_run: bool,
358 prune_replaced: str | None = None,
359 ) -> tuple[Butler, Sequence[str], str | None]:
360 """Return a read-only butler, a collection search path, and the name
361 of the run to be used for future writes.
363 Parameters
364 ----------
365 butler_config : convertible to `lsst.resources.ResourcePath`
366 Path to configuration for the butler.
367 output : `str` or `None`
368 The name of a `~lsst.daf.butler.CollectionType.CHAINED`
369 input/output collection.
370 output_run : `str` or `None`
371 The name of a `~lsst.daf.butler.CollectionType.RUN` input/output
372 collection.
373 inputs : `str` or `~collections.abc.Iterable` [`str`]
374 Input collection name or iterable of collection names.
375 extend_run : `bool`
376 A boolean indicating whether ``output_run`` should already exist
377 and be extended.
378 rebase : `bool`
379 A boolean indicating whether to force the ``output`` collection to
380 be consistent with ``inputs`` and ``output`` run such that the
381 ``output`` collection has output run collections first (i.e. those
382 that start with the same prefix), then the new inputs, then any
383 original inputs not included in the new inputs.
384 replace_run : `bool`
385 Whether the ``output_run`` should be replaced in the ``output``
386 chain.
387 prune_replaced : `str` or `None`
388 If ``replace_run=True``, whether/how datasets in the old run should
389 be removed. Options are ``"purge"``, ``"unstore"``, and `None`.
391 Returns
392 -------
393 butler : `lsst.daf.butler.Butler`
394 A read-only butler that collections will be added to and/or queried
395 from.
396 inputs : `~collections.abc.Sequence` [ `str` ]
397 Collections to search for datasets.
398 run : `str` or `None`
399 Name of the output `~lsst.daf.butler.CollectionType.RUN` collection
400 if it already exists, or `None` if it does not.
401 """
402 butler, inputs, self = cls._make_read_parts(
403 butler_config,
404 output=output,
405 output_run=output_run,
406 inputs=inputs,
407 extend_run=extend_run,
408 rebase=rebase,
409 replace_run=replace_run,
410 prune_replaced=prune_replaced,
411 )
412 run: str | None = None
413 if extend_run:
414 assert self.output_run is not None, "Output collection has to be specified."
415 if self.output_run is not None:
416 run = self.output_run.name
417 _LOG.debug("Preparing butler to read from %s and expect future writes to '%s'.", inputs, run)
418 return butler, inputs, run
420 @staticmethod
421 def define_datastore_cache() -> None:
422 """Define where datastore cache directories should be found.
424 Notes
425 -----
426 All the jobs should share a datastore cache if applicable. This
427 method asks for a shared fallback cache to be defined and then
428 configures an exit handler to clean it up.
429 """
430 defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
431 if defined:
432 atexit.register(shutil.rmtree, cache_dir, ignore_errors=True)
433 _LOG.debug("Defining shared datastore cache directory to %s", cache_dir)
435 @classmethod
436 def make_write_butler(
437 cls,
438 butler_config: ResourcePathExpression,
439 pipeline_graph: PipelineGraph,
440 *,
441 output: str | None,
442 output_run: str | None,
443 inputs: str | Iterable[str],
444 extend_run: bool = False,
445 rebase: bool = False,
446 replace_run: bool,
447 prune_replaced: str | None = None,
448 ) -> Butler:
449 """Return a read-write butler initialized to write to and read from
450 the collections specified by the given command-line arguments.
452 Parameters
453 ----------
454 butler_config : convertible to `lsst.resources.ResourcePath`
455 Path to configuration for the butler.
456 pipeline_graph : `lsst.pipe.base.PipelineGraph`
457 Definitions for tasks in a pipeline.
458 output : `str` or `None`
459 The name of a `~lsst.daf.butler.CollectionType.CHAINED`
460 input/output collection.
461 output_run : `str` or `None`
462 The name of a `~lsst.daf.butler.CollectionType.RUN` input/output
463 collection.
464 inputs : `str` or `~collections.abc.Iterable` [`str`]
465 Input collection name or iterable of collection names.
466 extend_run : `bool`
467 A boolean indicating whether ``output_run`` should already exist
468 and be extended.
469 rebase : `bool`
470 A boolean indicating whether to force the ``output`` collection to
471 be consistent with ``inputs`` and ``output`` run such that the
472 ``output`` collection has output run collections first (i.e. those
473 that start with the same prefix), then the new inputs, then any
474 original inputs not included in the new inputs.
475 replace_run : `bool`
476 Whether the ``output_run`` should be replaced in the ``output``
477 chain.
478 prune_replaced : `str` or `None`
479 If ``replace_run=True``, whether/how datasets in the old run should
480 be removed. Options are ``"purge"``, ``"unstore"``, and `None`.
482 Returns
483 -------
484 butler : `lsst.daf.butler.Butler`
485 A read-write butler initialized according to the given arguments.
486 """
487 cls.define_datastore_cache() # Ensure that this butler can use a shared cache.
488 butler = Butler.from_config(butler_config, writeable=True)
489 self = cls(
490 butler,
491 output=output,
492 output_run=output_run,
493 inputs=inputs,
494 extend_run=extend_run,
495 rebase=rebase,
496 writeable=True,
497 )
498 self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced)
499 assert self.output_run is not None, "Output collection has to be specified." # for mypy
500 if self.output is not None:
501 chain_definition = list(
502 butler.collections.query(
503 self.output.chain if self.output.exists else self.inputs,
504 flatten_chains=True,
505 include_chains=False,
506 )
507 )
508 if replace_run:
509 replaced = chain_definition.pop(0)
510 if prune_replaced == "unstore":
511 # Remove datasets from datastore
512 with butler.transaction():
513 # we want to remove regular outputs from this pipeline,
514 # but keep initOutputs, configs, and versions.
515 refs = [
516 ref
517 for ref in butler.registry.queryDatasets(..., collections=replaced)
518 if (
519 (producer := pipeline_graph.producer_of(ref.datasetType.name)) is not None
520 and producer.key.node_type is NodeType.TASK # i.e. not TASK_INIT
521 )
522 ]
523 butler.pruneDatasets(refs, unstore=True, disassociate=False)
524 elif prune_replaced == "purge":
525 # Erase entire collection and all datasets, need to remove
526 # collection from its chain collection first.
527 with butler.transaction():
528 butler.collections.redefine_chain(self.output.name, chain_definition)
529 butler.removeRuns([replaced])
530 elif prune_replaced is not None:
531 raise NotImplementedError(f"Unsupported --prune-replaced option '{prune_replaced}'.")
532 if not self.output.exists:
533 butler.collections.register(self.output.name, CollectionType.CHAINED)
534 if not extend_run:
535 butler.collections.register(self.output_run.name, CollectionType.RUN)
536 chain_definition.insert(0, self.output_run.name)
537 butler.collections.redefine_chain(self.output.name, chain_definition)
538 _LOG.debug(
539 "Preparing butler to write to '%s' and read from '%s'=%s",
540 self.output_run.name,
541 self.output.name,
542 chain_definition,
543 )
544 butler.registry.defaults = RegistryDefaults(
545 run=self.output_run.name, collections=self.output.name
546 )
547 else:
548 inputs = (self.output_run.name,) + self.inputs
549 _LOG.debug("Preparing butler to write to '%s' and read from %s.", self.output_run.name, inputs)
550 butler.registry.defaults = RegistryDefaults(run=self.output_run.name, collections=inputs)
551 return butler
    output: OutputChainedCollectionInfo | None
    """Information about the output chained collection, if there is or will be
    one (`OutputChainedCollectionInfo` or `None`).
    """

    output_run: OutputRunCollectionInfo | None
    """Information about the output run collection, if there is or will be
    one (`OutputRunCollectionInfo` or `None`).
    """

    inputs: tuple[str, ...]
    """Input collections provided by the user, recursively flattened so that
    no CHAINED collections remain (`tuple` [ `str` ]).
    """