# This file is part of drp_pipe.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Tooling for relating one pipeline to another.
24This is intended for cases where the tasks are largely the same, but task
25labels and dataset type names have changed substantially.
26"""
28from __future__ import annotations
30__all__ = ("Correspondence",)
32import csv
33import dataclasses
34import logging
35import os.path
36from collections.abc import Iterable, Set
38from lsst.pex.config import compareConfigs
39import pydantic
41from lsst.pipe.base.pipeline_graph import PipelineGraph, NodeType, TaskNode
43_LOG = logging.getLogger(__name__)


def without_automatic_connections(names: Iterable[str]) -> list[str]:
    """Filter out the automatic ``_config``, ``_log``, and ``_metadata``
    dataset types that accompany every task.
    """
    return [
        name
        for name in names
        if not name.endswith("_config")
        and not name.endswith("_log")
        and not name.endswith("_metadata")
    ]
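
# For example (hypothetical dataset type names):
#
#     without_automatic_connections(["calexp", "isr_config", "isr_log", "isr_metadata"])
#     # -> ["calexp"]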


class Correspondence(pydantic.BaseModel):
    """A serializable mapping from one pipeline to another."""

    tasks_new_to_old: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Mapping from new task label to old task label.",
    )
    dataset_types_new_to_old: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Mapping from new dataset type name to old dataset type name.",
    )
    unmappable_new_tasks: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Tasks that exist only in the new pipeline, with values providing reasons why.",
    )
    unmappable_old_tasks: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Tasks that exist only in the old pipeline, with values providing reasons why.",
    )
    unmappable_new_dataset_types: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Dataset types that exist only in the new pipeline, with values providing reasons why.",
    )
    unmappable_old_dataset_types: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Dataset types that exist only in the old pipeline, with values providing reasons why.",
    )
    config_ignores: dict[str, list[str]] = pydantic.Field(
        default_factory=dict,
        description=(
            "Configuration lines to ignore in the diff. "
            "Keys are new task labels, values are config file line prefixes."
        ),
    )

    @classmethod
    def read(cls, filename: str) -> Correspondence:
        """Read the correspondence from a JSON file."""
        with open(filename, "r") as stream:
            return cls.model_validate_json(stream.read())

    def write(self, filename: str) -> None:
        """Write the correspondence to a JSON file."""
        with open(filename, "w") as stream:
            stream.write(self.model_dump_json(indent=2))
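
    # A minimal round-trip sketch (hypothetical path and key); `read` and
    # `write` are plain JSON (de)serialization via pydantic:
    #
    #     corr = Correspondence.read("correspondence.json")
    #     corr.unmappable_new_tasks["someNewTask"] = "no old analog"
    #     corr.write("correspondence.json")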

    def sorted(self, new: PipelineGraph, old: PipelineGraph) -> Correspondence:
        """Sort the correspondence by the pipeline graph order.

        Parameters
        ----------
        new : `lsst.pipe.base.PipelineGraph`
            New pipeline graph.
        old : `lsst.pipe.base.PipelineGraph`
            Old pipeline graph.

        Returns
        -------
        correspondence : `Correspondence`
            New sorted mapping between the two pipelines.
        """
        result = Correspondence()
        for new_label in new.tasks.keys():
            if (old_label := self.tasks_new_to_old.get(new_label)) is not None:
                result.tasks_new_to_old[new_label] = old_label
            elif (reason := self.unmappable_new_tasks.get(new_label)) is not None:
                result.unmappable_new_tasks[new_label] = reason
            if new_label in self.config_ignores:
                result.config_ignores[new_label] = self.config_ignores[new_label].copy()
        for old_label in old.tasks.keys():
            if (reason := self.unmappable_old_tasks.get(old_label)) is not None:
                result.unmappable_old_tasks[old_label] = reason
        for new_name in new.dataset_types.keys():
            if (old_name := self.dataset_types_new_to_old.get(new_name)) is not None:
                result.dataset_types_new_to_old[new_name] = old_name
            elif (
                reason := self.unmappable_new_dataset_types.get(new_name)
            ) is not None:
                result.unmappable_new_dataset_types[new_name] = reason
        for old_name in old.dataset_types.keys():
            if (reason := self.unmappable_old_dataset_types.get(old_name)) is not None:
                result.unmappable_old_dataset_types[old_name] = reason
        assert result.config_ignores.keys() == self.config_ignores.keys()
        return result

    def check(
        self,
        new: PipelineGraph,
        old: PipelineGraph,
        new_name: str,
        old_name: str,
        ignore_edges: Set[tuple[str, str]] = frozenset(),
    ) -> list[str]:
        """Check the correspondence for consistency with the given pipeline
        graphs.

        Parameters
        ----------
        new : `lsst.pipe.base.PipelineGraph`
            New pipeline graph.
        old : `lsst.pipe.base.PipelineGraph`
            Old pipeline graph.
        new_name : `str`
            Name of the new pipeline for use in messages.
        old_name : `str`
            Name of the old pipeline for use in messages.
        ignore_edges : `~collections.abc.Set` [ `tuple` [ `str`, `str` ] ], \
                optional
            Connections that are not expected to match, even though both the
            task and dataset type are mapped, as a set of
            ``(new task label, connection name)`` tuples.

        Returns
        -------
        messages : `list` [ `str` ]
            List of messages reporting problems. If empty, there are no
            problems.
        """
        messages: list[str] = []
        tasks_old_to_new = {
            old_label: new_label
            for new_label, old_label in self.tasks_new_to_old.items()
        }
        if len(tasks_old_to_new) != len(self.tasks_new_to_old):
            for new_label, old_label in self.tasks_new_to_old.items():
                if tasks_old_to_new[old_label] != new_label:
                    messages.append(
                        f"Task {old_label} in {old_name} is mapped to both {new_label} and "
                        f"{tasks_old_to_new[old_label]} in {new_name}."
                    )
        for label in sorted(tasks_old_to_new.keys() - old.tasks.keys()):
            messages.append(f"Task {label!r} is mapped but is not part of {old_name}.")
        for label in sorted(self.tasks_new_to_old.keys() - new.tasks.keys()):
            messages.append(f"Task {label!r} is mapped but is not part of {new_name}.")
        for label in sorted(self.unmappable_old_tasks.keys() - old.tasks.keys()):
            messages.append(
                f"Task {label!r} is marked as unmappable but is not part of {old_name}."
            )
        for label in sorted(self.unmappable_new_tasks.keys() - new.tasks.keys()):
            messages.append(
                f"Task {label!r} is marked as unmappable but is not part of {new_name}."
            )
        missing_old_tasks = set(old.tasks.keys())
        missing_old_tasks.difference_update(tasks_old_to_new.keys())
        missing_old_tasks.difference_update(self.unmappable_old_tasks)
        for label in sorted(missing_old_tasks):
            messages.append(
                f"Task {label!r} in {old_name} is missing from the "
                "correspondence; it needs to be mapped or marked as unmappable."
            )
        missing_new_tasks = set(new.tasks.keys())
        missing_new_tasks.difference_update(self.tasks_new_to_old.keys())
        missing_new_tasks.difference_update(self.unmappable_new_tasks)
        for label in sorted(missing_new_tasks):
            messages.append(
                f"Task {label!r} in {new_name} is missing from the "
                "correspondence; it needs to be mapped or marked as unmappable."
            )
        dataset_types_old_to_new = {
            old_dt_name: new_dt_name
            for new_dt_name, old_dt_name in self.dataset_types_new_to_old.items()
        }
        old_dataset_types = set(without_automatic_connections(old.dataset_types.keys()))
        new_dataset_types = set(without_automatic_connections(new.dataset_types.keys()))
        if len(dataset_types_old_to_new) != len(self.dataset_types_new_to_old):
            for new_dt_name, old_dt_name in self.dataset_types_new_to_old.items():
                if dataset_types_old_to_new[old_dt_name] != new_dt_name:
                    messages.append(
                        f"Dataset type {old_dt_name} in {old_name} is mapped to both {new_dt_name} "
                        f"and {dataset_types_old_to_new[old_dt_name]} in {new_name}."
                    )
        for name in sorted(dataset_types_old_to_new.keys() - old_dataset_types):
            messages.append(
                f"Dataset type {name!r} is mapped by the correspondence but is not part of {old_name}."
            )
        for name in sorted(self.dataset_types_new_to_old.keys() - new_dataset_types):
            messages.append(
                f"Dataset type {name!r} is mapped by the correspondence but is not part of {new_name}."
            )
        for name in sorted(
            self.unmappable_old_dataset_types.keys() - old_dataset_types
        ):
            messages.append(
                f"Dataset type {name!r} is marked as unmappable but is not part of {old_name}."
            )
        for name in sorted(
            self.unmappable_new_dataset_types.keys() - new_dataset_types
        ):
            messages.append(
                f"Dataset type {name!r} is marked as unmappable but is not part of {new_name}."
            )
        missing_old_dataset_types = set(old_dataset_types)
        missing_old_dataset_types.difference_update(dataset_types_old_to_new.keys())
        missing_old_dataset_types.difference_update(self.unmappable_old_dataset_types)
        for name in sorted(missing_old_dataset_types):
            messages.append(
                f"Dataset type {name!r} in {old_name} is missing from the "
                "correspondence; it needs to be mapped or marked as unmappable."
            )
        missing_new_dataset_types = set(new_dataset_types)
        missing_new_dataset_types.difference_update(self.dataset_types_new_to_old.keys())
        missing_new_dataset_types.difference_update(self.unmappable_new_dataset_types)
        for name in sorted(missing_new_dataset_types):
            messages.append(
                f"Dataset type {name!r} in {new_name} is missing from the "
                "correspondence; it needs to be mapped or marked as unmappable."
            )
        for new_task_label, old_task_label in self.tasks_new_to_old.items():
            new_task_node = new.tasks[new_task_label]
            old_task_node = old.tasks[old_task_label]
            for new_edge_map, old_edge_map in [
                (new_task_node.inputs, old_task_node.inputs),
                (new_task_node.prerequisite_inputs, old_task_node.prerequisite_inputs),
                (new_task_node.outputs, old_task_node.outputs),
            ]:
                for connection_name, new_edge in new_edge_map.items():
                    if (new_task_label, connection_name) in ignore_edges:
                        continue
                    # Use distinct names here so we don't shadow the new_name
                    # and old_name pipeline-name arguments used in the
                    # messages above.
                    new_dt_name = new_edge.parent_dataset_type_name
                    old_connection_name = connection_name
                    if mapped_old_name := self.dataset_types_new_to_old.get(new_dt_name):
                        if not (old_edge := old_edge_map.get(old_connection_name)):
                            # Some analysis_tools tasks change their connection
                            # name when configured differently, but use the
                            # dataset type name as the connection.
                            old_connection_name = mapped_old_name
                            if not (old_edge := old_edge_map.get(old_connection_name)):
                                # This is some other kind of dynamic connection
                                # that we just can't check.
                                continue
                        old_dt_name = old_edge.parent_dataset_type_name
                        if old_dt_name != mapped_old_name:
                            messages.append(
                                f"Connection {new_task_label}.{connection_name}={new_dt_name} is mapped "
                                f"to {mapped_old_name}, but {old_task_label}.{old_connection_name}="
                                f"{old_dt_name}."
                            )
        return messages
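
    # A usage sketch (hypothetical file names), mirroring the `_main` entry
    # point below; `check` only reports problems, it never modifies the
    # mapping:
    #
    #     from lsst.pipe.base import Pipeline
    #
    #     new = Pipeline.from_uri("new.yaml").to_graph(visualization_only=True)
    #     old = Pipeline.from_uri("old.yaml").to_graph(visualization_only=True)
    #     corr = Correspondence.read("correspondence.json")
    #     for message in corr.check(new, old, "new", "old"):
    #         print(message)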

    def find_matches(self, new: PipelineGraph, old: PipelineGraph) -> Correspondence:
        """Return a new `Correspondence` that includes new mappings inferred
        from the graph structure.

        Parameters
        ----------
        new : `lsst.pipe.base.PipelineGraph`
            New pipeline graph.
        old : `lsst.pipe.base.PipelineGraph`
            Old pipeline graph.

        Returns
        -------
        correspondence : `Correspondence`
            New mapping between the two pipelines.
        """
        # Switch to a new data structure that's duplicative but more symmetric
        # for matching. At the same time, we trim out tasks and dataset types
        # that aren't present in their respective pipelines so we have a chance
        # at automatically recovering after updates and renames.
        new_side = _CorrespondenceFinderSide(new)
        tasks_new_to_old = {
            new_label: old_label
            for new_label, old_label in self.tasks_new_to_old.items()
            if new_label in new.tasks.keys() and old_label in old.tasks.keys()
        }
        dataset_types_new_to_old = {
            new_name: old_name
            for new_name, old_name in self.dataset_types_new_to_old.items()
            if new_name in new.dataset_types.keys()
            and old_name in old.dataset_types.keys()
        }
        new_side.map_tasks.update(tasks_new_to_old)
        new_side.map_dataset_types.update(dataset_types_new_to_old)
        new_side.unmappable_tasks.update(
            self.unmappable_new_tasks.keys() & new.tasks.keys()
        )
        new_side.unmappable_dataset_types.update(
            self.unmappable_new_dataset_types.keys() & new.dataset_types.keys()
        )
        old_side = _CorrespondenceFinderSide(old)
        old_side.map_tasks.update(
            {old_label: new_label for new_label, old_label in tasks_new_to_old.items()}
        )
        old_side.map_dataset_types.update(
            {old_name: new_name for new_name, old_name in dataset_types_new_to_old.items()}
        )
        old_side.unmappable_tasks.update(
            self.unmappable_old_tasks.keys() & old.tasks.keys()
        )
        old_side.unmappable_dataset_types.update(
            self.unmappable_old_dataset_types.keys() & old.dataset_types.keys()
        )
        # Start by assuming identical names should correspond, as long as they
        # haven't explicitly been matched to something else already.
        for task_label in new_side.unmatched_tasks & old_side.unmatched_tasks:
            if new_side.task_nodes_match(task_label, task_label, old_side):
                new_side.relate_tasks(task_label, task_label, old_side)
        for dataset_type_name in (
            new_side.unmatched_dataset_types & old_side.unmatched_dataset_types
        ):
            if new_side.dataset_type_nodes_match(
                dataset_type_name, dataset_type_name, old_side
            ):
                new_side.relate_dataset_types(
                    dataset_type_name, dataset_type_name, old_side
                )
        _LOG.info(
            f"{len(new_side.map_tasks)} tasks, {len(new_side.map_dataset_types)} dataset types "
            "mapped after direct-name matching."
        )
        # Attempt to incrementally improve the correspondence by looking at
        # matching bits of graph structure, until a full pass makes no new
        # matches.
        successes = 1
        n_iterations = 0
        while successes:
            successes = 0
            for new_task_label in new_side.unmatched_tasks:
                successes += new_side.match_task_via_output_producers(
                    new_task_label, old_side, "<-"
                )
            for old_task_label in old_side.unmatched_tasks:
                successes += old_side.match_task_via_output_producers(
                    old_task_label, new_side, "->"
                )
            for new_dataset_type_name in new_side.unmatched_dataset_types:
                successes += new_side.match_dataset_type_via_producer_outputs(
                    new_dataset_type_name, old_side, "<-"
                )
            for old_dataset_type_name in old_side.unmatched_dataset_types:
                successes += old_side.match_dataset_type_via_producer_outputs(
                    old_dataset_type_name, new_side, "->"
                )
            for new_task_label in new_side.unmatched_tasks:
                successes += new_side.match_task_via_input_consumers(
                    new_task_label, old_side, "<-"
                )
            for old_task_label in old_side.unmatched_tasks:
                successes += old_side.match_task_via_input_consumers(
                    old_task_label, new_side, "->"
                )
            for new_dataset_type_name in new_side.unmatched_dataset_types:
                successes += new_side.match_dataset_type_via_consumer_inputs(
                    new_dataset_type_name, old_side, "<-"
                )
            for old_dataset_type_name in old_side.unmatched_dataset_types:
                successes += old_side.match_dataset_type_via_consumer_inputs(
                    old_dataset_type_name, new_side, "->"
                )
            n_iterations += 1
            _LOG.info(
                f"{len(new_side.map_tasks)} tasks, {len(new_side.map_dataset_types)} dataset types "
                f"after iteration {n_iterations}."
            )
        result = Correspondence()
        result.tasks_new_to_old.update(new_side.map_tasks)
        result.dataset_types_new_to_old.update(new_side.map_dataset_types)
        result.unmappable_new_tasks.update(
            {
                label: self.unmappable_new_tasks.get(label, "")
                for label in new_side.unmappable_tasks
            }
        )
        result.unmappable_old_tasks.update(
            {
                label: self.unmappable_old_tasks.get(label, "")
                for label in old_side.unmappable_tasks
            }
        )
        result.unmappable_new_dataset_types.update(
            {
                name: self.unmappable_new_dataset_types.get(name, "")
                for name in new_side.unmappable_dataset_types
            }
        )
        result.unmappable_old_dataset_types.update(
            {
                name: self.unmappable_old_dataset_types.get(name, "")
                for name in old_side.unmappable_dataset_types
            }
        )
        result.config_ignores = {
            new_label: ignore_lines.copy()
            for new_label, ignore_lines in self.config_ignores.items()
        }
        return result

    def diff_task_configs(self, new: TaskNode, old: TaskNode) -> list[str]:
        """Return messages describing configuration differences between two
        mapped tasks.
        """
        # We'll do a diff of the config strings (in config-override-file form),
        # but excise all of the config.connections lines that we know will have
        # differences, as well as all of the comments and blank lines.
        ignore_prefixes = [
            "connections.",
            "idGenerator.release_id",
            "id_generator.release_id",
        ]
        ignore_prefixes.extend(self.config_ignores.get(new.label, []))
        messages: list[str] = []

        def output(msg: str) -> None:
            if not any(msg.startswith(prefix) for prefix in ignore_prefixes):
                messages.append(msg)

        compareConfigs(new.label, new.config, old.config, output=output)
        return messages
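
    # Typical use (hypothetical variable names): diff each mapped pair of
    # tasks after `check` comes back clean:
    #
    #     for new_label, old_label in corr.tasks_new_to_old.items():
    #         for line in corr.diff_task_configs(new.tasks[new_label], old.tasks[old_label]):
    #             print(f"{new_label}: {line}")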

    def write_task_csv(
        self, filename: str, new: PipelineGraph, old: PipelineGraph
    ) -> None:
        """Write the mapping between tasks to a CSV file.

        Parameters
        ----------
        filename : `str`
            Name of the file.
        new : `lsst.pipe.base.PipelineGraph`
            New pipeline graph.
        old : `lsst.pipe.base.PipelineGraph`
            Old pipeline graph.
        """
        # newline="" is required by the csv module for file objects.
        with open(filename, "w", newline="") as stream:
            writer = csv.writer(stream, delimiter=";")
            n = 0
            for new_label, new_task_node in new.tasks.items():
                step = new.get_task_step(new_label)
                old_label = self.tasks_new_to_old.get(new_label, "")
                writer.writerow(
                    [
                        n,
                        step,
                        new_label,
                        old_label,
                        new_task_node.task_class_name,
                        ", ".join(new_task_node.dimensions.required),
                    ]
                )
                n += 1
            for old_label, old_task_node in old.tasks.items():
                if old_label in self.unmappable_old_tasks:
                    writer.writerow(
                        [
                            n,
                            "",
                            "",
                            old_label,
                            old_task_node.task_class_name,
                            ", ".join(old_task_node.dimensions.required),
                        ]
                    )
                    n += 1

    def write_dataset_type_csv(
        self, filename: str, new: PipelineGraph, old: PipelineGraph
    ) -> None:
        """Write the mapping between dataset types to a CSV file.

        Parameters
        ----------
        filename : `str`
            Name of the file.
        new : `lsst.pipe.base.PipelineGraph`
            New pipeline graph.
        old : `lsst.pipe.base.PipelineGraph`
            Old pipeline graph.
        """
        with open(filename, "w", newline="") as stream:
            writer = csv.writer(stream, delimiter=";")
            n = 0
            for new_name in without_automatic_connections(new.dataset_types):
                new_dataset_type_node = new.dataset_types[new_name]
                old_name = self.dataset_types_new_to_old.get(new_name, "")
                writer.writerow(
                    [
                        n,
                        new_name,
                        old_name,
                        new_dataset_type_node.storage_class_name,
                        ", ".join(new_dataset_type_node.dimensions.required),
                    ]
                )
                n += 1
            for old_name in without_automatic_connections(old.dataset_types):
                if old_name in self.unmappable_old_dataset_types:
                    old_dataset_type_node = old.dataset_types[old_name]
                    writer.writerow(
                        [
                            n,
                            "",
                            old_name,
                            old_dataset_type_node.storage_class_name,
                            ", ".join(old_dataset_type_node.dimensions.required),
                        ]
                    )
                    n += 1


@dataclasses.dataclass
class _CorrespondenceFinderSide:
    """An alternate data structure for pipeline-pipeline mapping, used in
    `Correspondence.find_matches`.

    One instance of this class is expected to be paired with another
    representing the reverse mapping.
    """

    pipeline_graph: PipelineGraph
    """Pipeline graph this side maps from."""

    map_tasks: dict[str, str] = dataclasses.field(default_factory=dict)
    """Mapping of tasks, from this side to the other."""

    map_dataset_types: dict[str, str] = dataclasses.field(default_factory=dict)
    """Mapping of dataset types, from this side to the other."""

    unmappable_tasks: set[str] = dataclasses.field(default_factory=set)
    """Tasks on this side that cannot be mapped."""

    unmappable_dataset_types: set[str] = dataclasses.field(default_factory=set)
    """Dataset types on this side that cannot be mapped."""

    @property
    def unmatched_tasks(self) -> set[str]:
        """Tasks on this side that could be mapped but have not yet been."""
        result = set(self.pipeline_graph.tasks.keys())
        result.difference_update(self.map_tasks.keys())
        result.difference_update(self.unmappable_tasks)
        return result

    @property
    def unmatched_dataset_types(self) -> set[str]:
        """Dataset types on this side that could be mapped but have not yet
        been.
        """
        result = set(
            without_automatic_connections(self.pipeline_graph.dataset_types.keys())
        )
        result.difference_update(self.map_dataset_types.keys())
        result.difference_update(self.unmappable_dataset_types)
        return result

    def relate_tasks(
        self, label: str, other_label: str, other: _CorrespondenceFinderSide
    ) -> None:
        """Add a mapping between the given task labels."""
        self.map_tasks[label] = other_label
        other.map_tasks[other_label] = label

    def relate_dataset_types(
        self, name: str, other_name: str, other: _CorrespondenceFinderSide
    ) -> None:
        """Add a mapping between the given dataset type names."""
        self.map_dataset_types[name] = other_name
        other.map_dataset_types[other_name] = name

    def task_nodes_match(
        self, label: str, other_label: str, other: _CorrespondenceFinderSide
    ) -> bool:
        """Test whether two tasks can be matched.

        This checks whether the tasks are still available to be matched and
        whether they have the same task class and dimensions.
        """
        if label in self.map_tasks:
            return False
        if label in self.unmappable_tasks:
            return False
        if other_label in other.map_tasks:
            return False
        if other_label in other.unmappable_tasks:
            return False
        new_node = self.pipeline_graph.tasks[label]
        old_node = other.pipeline_graph.tasks[other_label]
        return (
            new_node.dimensions == old_node.dimensions
            and new_node.task_class_name == old_node.task_class_name
        )

    def dataset_type_nodes_match(
        self, name: str, other_name: str, other: _CorrespondenceFinderSide
    ) -> bool:
        """Test whether two dataset types can be matched.

        This checks whether the dataset types are still available to be
        matched and whether they have the same storage class and dimensions.
        """
        if name in self.map_dataset_types:
            return False
        if name in self.unmappable_dataset_types:
            return False
        if other_name in other.map_dataset_types:
            return False
        if other_name in other.unmappable_dataset_types:
            return False
        new_node = self.pipeline_graph.dataset_types[name]
        old_node = other.pipeline_graph.dataset_types[other_name]
        return (
            new_node.dimensions == old_node.dimensions
            and new_node.storage_class_name == old_node.storage_class_name
        )

    def match_task_via_output_producers(
        self, label: str, other: _CorrespondenceFinderSide, direction: str
    ) -> bool:
        """Look for a match for the given task by inspecting the mappings of
        its outputs' producers.
        """
        my_outputs = (
            self.pipeline_graph.outputs_of(label).keys()
            - self.unmappable_dataset_types
        )
        other_outputs = {
            other_output
            for my_output in my_outputs
            if (other_output := self.map_dataset_types.get(my_output)) is not None
        }
        other_output_producers = {
            other_producer_node.label
            for other_output in other_outputs
            if (other_producer_node := other.pipeline_graph.producer_of(other_output))
            is not None
            and self.task_nodes_match(label, other_producer_node.label, other)
        }
        other_output_producers -= other.unmappable_tasks
        if len(other_output_producers) == 1:
            other_label = other_output_producers.pop()
            _LOG.debug(
                f"Successful output producer match {label} {direction} {other_label}."
            )
            self.relate_tasks(label, other_label, other)
            return True
        else:
            _LOG.info(
                f"No unique output producer match for {label} {direction}: {other_output_producers}."
            )
            return False
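
    # Illustrative sketch of the heuristic (hypothetical labels): if this
    # side's task "assembleCoadd" has a mapped output "deepCoadd" -> "coadd",
    # and the other pipeline's producer of "coadd" is the sole compatible
    # candidate "makeCoadd" (same task class and dimensions), the two tasks
    # are related. Any ambiguity leaves the task unmatched for a later pass.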

    def match_task_via_input_consumers(
        self, label: str, other: _CorrespondenceFinderSide, direction: str
    ) -> bool:
        """Look for a match for the given task by inspecting the mappings of
        its inputs' consumers.
        """
        my_inputs = (
            self.pipeline_graph.inputs_of(label).keys() - self.unmappable_dataset_types
        )
        other_inputs = {
            other_input
            for my_input in my_inputs
            if (other_input := self.map_dataset_types.get(my_input)) is not None
        }
        other_input_consumers = {
            other_input_consumer_node.label
            for other_input in other_inputs
            for other_input_consumer_node in other.pipeline_graph.consumers_of(
                other_input
            )
            if self.task_nodes_match(label, other_input_consumer_node.label, other)
        }
        other_input_consumers -= other.unmappable_tasks
        if len(other_input_consumers) == 1:
            other_label = other_input_consumers.pop()
            _LOG.debug(
                f"Successful input consumer match {label} {direction} {other_label}."
            )
            self.relate_tasks(label, other_label, other)
            return True
        else:
            _LOG.info(
                f"No unique input consumer match for {label} {direction}: {other_input_consumers}."
            )
            return False

    def match_dataset_type_via_producer_outputs(
        self,
        name: str,
        other: _CorrespondenceFinderSide,
        direction: str,
    ) -> bool:
        """Look for a match for the given dataset type by inspecting the
        mappings of its producer's outputs.
        """
        if (my_producer_node := self.pipeline_graph.producer_of(name)) is None:
            _LOG.info(
                f"No producer output match for {name} {direction}; it is an overall input."
            )
            return False
        if my_producer_node.label in self.unmappable_tasks:
            _LOG.debug(
                f"No producer output match for {name} {direction}; its producer is unmappable."
            )
            self.unmappable_dataset_types.add(name)
            return True
        if (other_producer := self.map_tasks.get(my_producer_node.label)) is None:
            _LOG.info(
                f"No producer output match for {name} {direction}; its producer is not mapped yet."
            )
            return False
        other_producer_outputs = {
            other_producer_output
            for other_producer_output in other.pipeline_graph.outputs_of(
                other_producer,
                init=(my_producer_node.key.node_type is NodeType.TASK_INIT),
            )
            if self.dataset_type_nodes_match(name, other_producer_output, other)
        }
        other_producer_outputs -= other.unmappable_dataset_types
        if len(other_producer_outputs) == 1:
            other_name = other_producer_outputs.pop()
            _LOG.debug(f"Unique producer output match {name} {direction} {other_name}.")
            self.relate_dataset_types(name, other_name, other)
            return True
        elif len(other_producer_outputs) > 1:
            common_suffix_scores = [
                (self.compute_common_suffix_length(name, other_name), other_name)
                for other_name in other_producer_outputs
            ]
            common_suffix_scores.sort(reverse=True)
            best_score, other_name = common_suffix_scores[0]
            next_best_score, _ = common_suffix_scores[1]
            if best_score > next_best_score:  # no tie for first place
                _LOG.debug(
                    f"Scored producer output match {name} {direction} {other_name}."
                )
                self.relate_dataset_types(name, other_name, other)
                return True
            _LOG.info(
                f"No unique producer output match for {name} {direction}: {common_suffix_scores}."
            )
        else:
            _LOG.info(f"No producer output matches for {name} {direction}.")
        return False
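
    # Tie-break sketch (hypothetical names): matching "coadd_obj" against
    # producer outputs {"deepCoadd_obj", "deepCoadd_ref"} scores their common
    # suffixes 8 ("oadd_obj") and 0, so "deepCoadd_obj" wins; a tie for first
    # place leaves the dataset type unmatched.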

    def match_dataset_type_via_consumer_inputs(
        self,
        name: str,
        other: _CorrespondenceFinderSide,
        direction: str,
    ) -> bool:
        """Look for a match for the given dataset type by inspecting the
        mappings of its consumers' inputs.
        """
        my_consumers = {
            my_consumer_node.label
            for my_consumer_node in self.pipeline_graph.consumers_of(name)
        }
        my_consumers -= self.unmappable_tasks
        other_consumers = {
            other_consumer
            for my_consumer in my_consumers
            if (other_consumer := self.map_tasks.get(my_consumer)) is not None
        }
        other_consumer_inputs = {
            other_consumer_input
            for other_consumer in other_consumers
            for other_consumer_input in other.pipeline_graph.inputs_of(other_consumer)
            if self.dataset_type_nodes_match(name, other_consumer_input, other)
        }
        other_consumer_inputs -= other.unmappable_dataset_types
        if len(other_consumer_inputs) == 1:
            other_name = other_consumer_inputs.pop()
            _LOG.debug(
                f"Successful consumer input match {name} {direction} {other_name}."
            )
            self.relate_dataset_types(name, other_name, other)
            return True
        else:
            _LOG.info(
                f"No unique consumer input match for {name} {direction}: {other_consumer_inputs}."
            )
            return False

    @staticmethod
    def compute_common_suffix_length(name1: str, name2: str) -> int:
        """Count the number of consecutive characters the two strings have in
        common, starting from their ends.
        """
        rev1 = name1[::-1]
        rev2 = name2[::-1]
        return len(os.path.commonprefix([rev1, rev2]))
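
    # Worked example: "deepCoadd_obj" and "goodSeeingCoadd_obj" share the
    # 9-character suffix "Coadd_obj", so this returns 9; reversing both
    # strings turns the shared suffix into a shared prefix for
    # os.path.commonprefix.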


def _main() -> None:
    import argparse

    from lsst.pipe.base import Pipeline

    parser = argparse.ArgumentParser(
        description="Create CSV files from a pipeline correspondence."
    )
    parser.add_argument("new", help="New pipeline YAML filename.")
    parser.add_argument("old", help="Old pipeline YAML filename.")
    parser.add_argument("correspondence", help="Correspondence JSON file.")
    parser.add_argument("--tasks", default="tasks.csv", help="Filename for task CSV.")
    parser.add_argument(
        "--dataset-types",
        default="dataset-types.csv",
        help="Filename for dataset type CSV.",
    )
    args = parser.parse_args()
    new = Pipeline.from_uri(args.new).to_graph(visualization_only=True)
    old = Pipeline.from_uri(args.old).to_graph(visualization_only=True)
    correspondence = Correspondence.read(args.correspondence)
    if args.tasks:
        correspondence.write_task_csv(args.tasks, new, old)
    if args.dataset_types:
        correspondence.write_dataset_type_csv(args.dataset_types, new, old)


if __name__ == "__main__":
    _main()
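
# Example invocation (hypothetical paths; the module path is inferred from this
# file's location and may differ):
#
#     python -m lsst.drp.pipe.tests.correspondence new.yaml old.yaml \
#         correspondence.json --tasks tasks.csv --dataset-types dataset-types.csv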