Coverage for python / lsst / drp / pipe / tests / correspondence.py: 12%

350 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:28 +0000

1# This file is part of drp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Tooling for relating one pipeline to another. 

23 

24This is intended for cases where the tasks are largely the same, but task 

25labels and dataset type names have changed substantially. 

26""" 

27 

28from __future__ import annotations 

29 

30__all__ = ("Correspondence",) 

31 

32import csv 

33import dataclasses 

34import logging 

35import os.path 

36from collections.abc import Iterable, Set 

37 

38from lsst.pex.config import compareConfigs 

39import pydantic 

40 

41from lsst.pipe.base.pipeline_graph import PipelineGraph, NodeType, TaskNode 

42 

43_LOG = logging.getLogger(__name__) 

44 

45 

def without_automatic_connections(names: Iterable[str]) -> list[str]:
    """Filter out dataset type names produced by automatic connections.

    Parameters
    ----------
    names : `~collections.abc.Iterable` [ `str` ]
        Dataset type names to filter.

    Returns
    -------
    filtered : `list` [ `str` ]
        Names that are not automatic ``_config``, ``_log``, or ``_metadata``
        dataset types, in the original iteration order.
    """
    # str.endswith accepts a tuple of suffixes, so one call replaces the
    # original chain of three ``and not name.endswith(...)`` clauses.
    return [
        name
        for name in names
        if not name.endswith(("_config", "_log", "_metadata"))
    ]

54 

55 

class Correspondence(pydantic.BaseModel):
    """A serializable mapping from one pipeline to another.

    This is intended for cases where the tasks are largely the same, but
    task labels and dataset type names have changed substantially.

    All mappings are stored keyed by the *new* pipeline's names, with the
    old pipeline's names as values; inversions are computed on demand.
    """

    # Keys are new task labels; values are the corresponding old labels.
    tasks_new_to_old: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Mapping from new task label to old task label.",
    )
    # Keys are new dataset type names; values are the old names.
    dataset_types_new_to_old: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Mapping from new dataset type name to old dataset type name.",
    )
    # "Unmappable" dicts record entries with no counterpart on the other
    # side; the value is a human-readable reason (may be empty).
    unmappable_new_tasks: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Tasks that exist only in the new pipeline, with values providing reasons why.",
    )
    unmappable_old_tasks: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Tasks that exist only in the old pipeline, with values providing reasons why.",
    )
    unmappable_new_dataset_types: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Dataset types that exist only in the new pipeline, with values providing reasons why.",
    )
    unmappable_old_dataset_types: dict[str, str] = pydantic.Field(
        default_factory=dict,
        description="Dataset types that exist only in the old pipeline, with values providing reasons why.",
    )
    # Used by diff_task_configs to suppress expected config differences.
    config_ignores: dict[str, list[str]] = pydantic.Field(
        default_factory=dict,
        description=(
            "Configuration lines to ignore in the diff. "
            "Keys are new task labels, values are config file line prefixes."
        ),
    )

90 

91 @classmethod 

92 def read(cls, filename: str) -> Correspondence: 

93 """Read the correspondence from a JSON file.""" 

94 with open(filename, "r") as stream: 

95 return Correspondence.model_validate_json(stream.read()) 

96 

97 def write(self, filename: str) -> None: 

98 """Write the correspondence to a JSON file.""" 

99 with open(filename, "w") as stream: 

100 stream.write(self.model_dump_json(indent=2)) 

101 

102 def sorted(self, new: PipelineGraph, old: PipelineGraph) -> Correspondence: 

103 """Sort the correspondence by the pipeline graph order. 

104 

105 Parameters 

106 ---------- 

107 new : `lsst.pipe.base.PipelineGraph` 

108 New pipeline graph. 

109 old : `lsst.pipe.base.PipelineGraph` 

110 Old pipeline graph. 

111 

112 Returns 

113 ------- 

114 correspondence : `Correspondence` 

115 New sorted mapping between the two pipelines. 

116 """ 

117 result = Correspondence() 

118 for new_label in new.tasks.keys(): 

119 if (old_label := self.tasks_new_to_old.get(new_label)) is not None: 

120 result.tasks_new_to_old[new_label] = old_label 

121 elif (reason := self.unmappable_new_tasks.get(new_label)) is not None: 

122 result.unmappable_new_tasks[new_label] = reason 

123 if new_label in self.config_ignores: 

124 result.config_ignores[new_label] = self.config_ignores[new_label].copy() 

125 for old_label in old.tasks.keys(): 

126 if (reason := self.unmappable_old_tasks.get(old_label)) is not None: 

127 result.unmappable_old_tasks[old_label] = reason 

128 for new_name in new.dataset_types.keys(): 

129 if (old_name := self.dataset_types_new_to_old.get(new_name)) is not None: 

130 result.dataset_types_new_to_old[new_name] = old_name 

131 elif ( 

132 reason := self.unmappable_new_dataset_types.get(new_name) 

133 ) is not None: 

134 result.unmappable_new_dataset_types[new_name] = reason 

135 for old_name in old.dataset_types.keys(): 

136 if (reason := self.unmappable_old_dataset_types.get(old_name)) is not None: 

137 result.unmappable_old_dataset_types[old_name] = reason 

138 assert result.config_ignores.keys() == self.config_ignores.keys() 

139 return result 

140 

141 def check( 

142 self, 

143 new: PipelineGraph, 

144 old: PipelineGraph, 

145 new_name: str, 

146 old_name: str, 

147 ignore_edges: Set[tuple[str, str]] = frozenset(), 

148 ) -> list[str]: 

149 """Check the correspondence for consistency with the given pipeline 

150 graphs. 

151 

152 Parameters 

153 ---------- 

154 new : `lsst.pipe.base.PipelineGraph` 

155 New pipeline graph. 

156 old : `lsst.pipe.base.PipelineGraph` 

157 Old pipeline graph. 

158 new_name : `str` 

159 Name of the new pipeline for use in messages. 

160 old_name : `str` 

161 Name of the old pipeline for use in messages. 

162 ignore_edges : `~collections.abc.Set` [ `tuple` [ `str`, `str` ] ], \ 

163 optional 

164 Connections that are not expected to match, even though both the 

165 task and dataset type are mapped, as a set of 

166 ``(new task label, connection name)`` tuples. 

167 

168 Returns 

169 ------- 

170 messages : `list` [ `str` ] 

171 List of messages reporting problems. If empty, there are no 

172 problems. 

173 """ 

174 messages: list[str] = [] 

175 tasks_old_to_new = { 

176 old_label: new_label 

177 for new_label, old_label in self.tasks_new_to_old.items() 

178 } 

179 if len(tasks_old_to_new) != len(self.tasks_new_to_old): 

180 for new_label, old_label in self.tasks_new_to_old.items(): 

181 if tasks_old_to_new[old_label] != new_label: 

182 messages.append( 

183 f"Task {old_label} in {old_name} is mapped to both {new_label} and " 

184 f"{tasks_old_to_new[old_label]} in {new_name}." 

185 ) 

186 for label in sorted(tasks_old_to_new.keys() - old.tasks.keys()): 

187 messages.append(f"Task {label!r} is mapped but is not part of {old_name}.") 

188 for label in sorted(self.tasks_new_to_old.keys() - new.tasks.keys()): 

189 messages.append(f"Task {label!r} is mapped but is not part of {new_name}.") 

190 for label in sorted(self.unmappable_old_tasks.keys() - old.tasks.keys()): 

191 messages.append( 

192 f"Task {label!r} is marked as unmappable but is not part of {old_name}." 

193 ) 

194 for label in sorted(self.unmappable_new_tasks.keys() - new.tasks.keys()): 

195 messages.append( 

196 f"Task {label!r} is marked as unmappable but is not part of {new_name}." 

197 ) 

198 missing_old_tasks = set(old.tasks.keys()) 

199 missing_old_tasks.difference_update(tasks_old_to_new.keys()) 

200 missing_old_tasks.difference_update(self.unmappable_old_tasks) 

201 for label in sorted(missing_old_tasks): 

202 messages.append( 

203 f"Task {label!r} in {old_name} is missing from the " 

204 "correspondence; it needs to be mapped or marked as unmappable." 

205 ) 

206 missing_new_tasks = set(new.tasks.keys()) 

207 missing_new_tasks.difference_update(self.tasks_new_to_old.keys()) 

208 missing_new_tasks.difference_update(self.unmappable_new_tasks) 

209 for label in sorted(missing_new_tasks): 

210 messages.append( 

211 f"Task {label!r} in {new_name} is missing from the " 

212 "correspondence; it needs to be mapped or marked as unmappable." 

213 ) 

214 dataset_types_old_to_new = { 

215 old_label: new_label 

216 for new_label, old_label in self.dataset_types_new_to_old.items() 

217 } 

218 old_dataset_types = set(without_automatic_connections(old.dataset_types.keys())) 

219 new_dataset_types = set(without_automatic_connections(new.dataset_types.keys())) 

220 if len(dataset_types_old_to_new) != len(self.dataset_types_new_to_old): 

221 for new_label, old_label in self.dataset_types_new_to_old.items(): 

222 if dataset_types_old_to_new[old_label] != new_label: 

223 messages.append( 

224 f"Dataset type {old_label} in {old_name} is mapped to both {new_label} and " 

225 f"{dataset_types_old_to_new[old_label]} in {new_name}." 

226 ) 

227 for name in sorted(dataset_types_old_to_new.keys() - old_dataset_types): 

228 messages.append( 

229 f"Dataset type {name!r} is mapped by the correspondence but is not part of {old_name}." 

230 ) 

231 for name in sorted(self.dataset_types_new_to_old.keys() - new_dataset_types): 

232 messages.append( 

233 f"Dataset type {name!r} is mapped by the correspondence but is not part of {new_name}." 

234 ) 

235 for name in sorted( 

236 self.unmappable_old_dataset_types.keys() - old_dataset_types 

237 ): 

238 messages.append( 

239 f"Dataset type {name!r} is marked as unmappable but is not part of {old_name}." 

240 ) 

241 for name in sorted( 

242 self.unmappable_new_dataset_types.keys() - new_dataset_types 

243 ): 

244 messages.append( 

245 f"Dataset type {name!r} is marked as unmappable but is not part of {new_name}." 

246 ) 

247 missing_old_dataset_types = set( 

248 without_automatic_connections(old.dataset_types.keys()) 

249 ) 

250 missing_old_dataset_types.difference_update(dataset_types_old_to_new.keys()) 

251 missing_old_dataset_types.difference_update(self.unmappable_old_dataset_types) 

252 for name in sorted(missing_old_dataset_types): 

253 messages.append( 

254 f"Dataset type {name!r} in {old_name} is missing from the " 

255 "correspondence; it needs to be mapped or marked as unmappable." 

256 ) 

257 missing_new_dataset_types = set( 

258 without_automatic_connections(new_dataset_types) 

259 ) 

260 missing_new_dataset_types.difference_update( 

261 self.dataset_types_new_to_old.keys() 

262 ) 

263 missing_new_dataset_types.difference_update(self.unmappable_new_dataset_types) 

264 for name in sorted(missing_new_dataset_types): 

265 messages.append( 

266 f"Dataset type {name!r} in {new_name} is missing from the " 

267 "correspondence; it needs to be mapped or marked as unmappable." 

268 ) 

269 for new_task_label, old_task_label in self.tasks_new_to_old.items(): 

270 new_task_node = new.tasks[new_task_label] 

271 old_task_node = old.tasks[old_task_label] 

272 for new_edge_map, old_edge_map in [ 

273 (new_task_node.inputs, old_task_node.inputs), 

274 (new_task_node.prerequisite_inputs, old_task_node.prerequisite_inputs), 

275 (new_task_node.outputs, old_task_node.outputs), 

276 ]: 

277 for connection_name, new_edge in new_edge_map.items(): 

278 if (new_task_label, connection_name) in ignore_edges: 

279 continue 

280 new_name = new_edge.parent_dataset_type_name 

281 old_collection_name = connection_name 

282 if mapped_old_name := self.dataset_types_new_to_old.get(new_name): 

283 if not (old_edge := old_edge_map.get(connection_name)): 

284 # Some analysis_tools tasks change their connection 

285 # name when configured differently, but use the 

286 # dataset type name as the connection. 

287 old_connection_name = mapped_old_name 

288 if not (old_edge := old_edge_map.get(old_connection_name)): 

289 # This is some other kind of dynamic connection 

290 # that we just can't check. 

291 continue 

292 old_name = old_edge.parent_dataset_type_name 

293 if old_name != mapped_old_name: 

294 messages.append( 

295 f"Connection {new_task_label}.{connection_name}={new_name} is mapped to " 

296 f"{mapped_old_name}, but {old_task_label}.{old_collection_name}={old_name}." 

297 ) 

298 return messages 

299 

300 def find_matches(self, new: PipelineGraph, old: PipelineGraph) -> Correspondence: 

301 """Return a new `Correspondence` that includes new mappings inferred 

302 from the graph structure. 

303 

304 Parameters 

305 ---------- 

306 new : `lsst.pipe.base.PipelineGraph` 

307 New pipeline graph. 

308 old : `lsst.pipe.base.PipelineGraph` 

309 Old pipeline graph. 

310 

311 Returns 

312 ------- 

313 correspondence : `Correspondence` 

314 New mapping between the two pipelines. 

315 """ 

316 # Switch to a new data structure that's duplicative but more symmetric 

317 # for matching. At the same time, we trim out tasks and dataset types 

318 # that aren't present in their respective pipelines so we have a chance 

319 # at automatically recovering after updates and renames. 

320 new_side = _CorrespondenceFinderSide(new) 

321 tasks_new_to_old = { 

322 new_label: old_label 

323 for new_label, old_label in self.tasks_new_to_old.items() 

324 if new_label in new.tasks.keys() and old_label in old.tasks.keys() 

325 } 

326 dataset_types_new_to_old = { 

327 new_name: old_name 

328 for new_name, old_name in self.dataset_types_new_to_old.items() 

329 if new_name in new.dataset_types.keys() 

330 and old_name in old.dataset_types.keys() 

331 } 

332 new_side.map_tasks.update(tasks_new_to_old) 

333 new_side.map_dataset_types.update(dataset_types_new_to_old) 

334 new_side.unmappable_tasks.update( 

335 self.unmappable_new_tasks.keys() & new.tasks.keys() 

336 ) 

337 new_side.unmappable_dataset_types.update( 

338 self.unmappable_new_dataset_types.keys() & new.dataset_types.keys() 

339 ) 

340 old_side = _CorrespondenceFinderSide(old) 

341 old_side.map_tasks.update({old: new for new, old in tasks_new_to_old.items()}) 

342 old_side.map_dataset_types.update( 

343 {old: new for new, old in dataset_types_new_to_old.items()} 

344 ) 

345 old_side.unmappable_tasks.update( 

346 self.unmappable_old_tasks.keys() & old.tasks.keys() 

347 ) 

348 old_side.unmappable_dataset_types.update( 

349 self.unmappable_old_dataset_types.keys() & old.dataset_types.keys() 

350 ) 

351 # Start by assuming identical names should correspond, as long as they 

352 # haven't explicitly been matched to something else already. 

353 for task_label in new_side.unmatched_tasks & old_side.unmatched_tasks: 

354 if new_side.task_nodes_match(task_label, task_label, old_side): 

355 new_side.relate_tasks(task_label, task_label, old_side) 

356 for dataset_type_name in ( 

357 new_side.unmatched_dataset_types & old_side.unmatched_dataset_types 

358 ): 

359 if new_side.dataset_type_nodes_match( 

360 dataset_type_name, dataset_type_name, old_side 

361 ): 

362 new_side.relate_dataset_types( 

363 dataset_type_name, dataset_type_name, old_side 

364 ) 

365 _LOG.info( 

366 f"{len(new_side.map_tasks)} tasks, {len(new_side.map_dataset_types)} dataset types " 

367 "mapped after direct-name matching." 

368 ) 

369 # Attempt to incrementally improve the correspondence by looking at 

370 # matching bits of graph structure. 

371 successes = True 

372 n_iterations = 0 

373 while successes: 

374 successes = 0 

375 for new_task_label in new_side.unmatched_tasks: 

376 successes += new_side.match_task_via_output_producers( 

377 new_task_label, old_side, "<-" 

378 ) 

379 for old_task_label in old_side.unmatched_tasks: 

380 successes += old_side.match_task_via_output_producers( 

381 old_task_label, new_side, "->" 

382 ) 

383 for new_dataset_type_name in new_side.unmatched_dataset_types: 

384 successes += new_side.match_dataset_type_via_producer_outputs( 

385 new_dataset_type_name, old_side, "<-" 

386 ) 

387 for old_dataset_type_name in old_side.unmatched_tasks: 

388 successes += old_side.match_dataset_type_via_producer_outputs( 

389 old_dataset_type_name, new_side, "->" 

390 ) 

391 for new_task_label in new_side.unmatched_tasks: 

392 successes += new_side.match_task_via_input_consumers( 

393 new_task_label, old_side, "<-" 

394 ) 

395 for old_task_label in old_side.unmatched_tasks: 

396 successes += old_side.match_task_via_input_consumers( 

397 old_task_label, new_side, "->" 

398 ) 

399 for new_dataset_type_name in new_side.unmatched_dataset_types: 

400 successes += new_side.match_dataset_type_via_consumer_inputs( 

401 new_dataset_type_name, old_side, "<-" 

402 ) 

403 for old_dataset_type_name in old_side.unmatched_tasks: 

404 successes += old_side.match_dataset_type_via_consumer_inputs( 

405 old_dataset_type_name, new_side, "->" 

406 ) 

407 n_iterations += 1 

408 _LOG.info( 

409 f"{len(new_side.map_tasks)} tasks, {len(new_side.map_dataset_types)} dataset types " 

410 f"after iteration {n_iterations}." 

411 ) 

412 result = Correspondence() 

413 result.tasks_new_to_old.update(new_side.map_tasks.items()) 

414 result.dataset_types_new_to_old.update(new_side.map_dataset_types.items()) 

415 result.unmappable_new_tasks.update( 

416 { 

417 label: self.unmappable_new_tasks.get(label, "") 

418 for label in new_side.unmappable_tasks 

419 } 

420 ) 

421 result.unmappable_old_tasks.update( 

422 { 

423 label: self.unmappable_old_tasks.get(label, "") 

424 for label in old_side.unmappable_tasks 

425 } 

426 ) 

427 result.unmappable_new_dataset_types.update( 

428 { 

429 name: self.unmappable_new_dataset_types.get(name, "") 

430 for name in new_side.unmappable_dataset_types 

431 } 

432 ) 

433 result.unmappable_old_dataset_types.update( 

434 { 

435 name: self.unmappable_old_dataset_types.get(name, "") 

436 for name in old_side.unmappable_dataset_types 

437 } 

438 ) 

439 result.config_ignores = { 

440 new_label: ignore_lines.copy() 

441 for new_label, ignore_lines in self.config_ignores.items() 

442 } 

443 return result 

444 

445 def diff_task_configs(self, new: TaskNode, old: TaskNode) -> list[str]: 

446 # We'll do a diff of the config strings (in config-override-file form), 

447 # but excise all of the config.connections lines that we know will have 

448 # differences, as well as all of the comments and blank lines. 

449 ignore_prefixes = ["connections.", "idGenerator.release_id", "id_generator.release_id"] 

450 ignore_prefixes.extend(self.config_ignores.get(new.label, [])) 

451 messages: list[str] = [] 

452 

453 def output(msg: str) -> None: 

454 if not any(msg.startswith(prefix) for prefix in ignore_prefixes): 

455 messages.append(msg) 

456 

457 compareConfigs(new.label, new.config, old.config, output=output) 

458 return messages 

459 

460 def write_task_csv( 

461 self, filename: str, new: PipelineGraph, old: PipelineGraph 

462 ) -> None: 

463 """Write the mapping between tasks to a CSV file. 

464 

465 Parameters 

466 ---------- 

467 filename : `str` 

468 Name of the file. 

469 new : `lsst.pipe.base.PipelineGraph` 

470 New pipeline graph. 

471 old : `lsst.pipe.base.PipelineGraph` 

472 Old pipeline graph. 

473 """ 

474 with open(filename, "w") as stream: 

475 writer = csv.writer(stream, delimiter=";") 

476 n = 0 

477 for new_label, new_task_node in new.tasks.items(): 

478 step = new.get_task_step(new_label) 

479 old_label = self.tasks_new_to_old.get(new_label, "") 

480 writer.writerow( 

481 [ 

482 n, 

483 step, 

484 new_label, 

485 old_label, 

486 new_task_node.task_class_name, 

487 ", ".join(new_task_node.dimensions.required), 

488 ] 

489 ) 

490 n += 1 

491 for old_label, old_task_node in old.tasks.items(): 

492 if old_label in self.unmappable_old_tasks: 

493 writer.writerow( 

494 [ 

495 n, 

496 "", 

497 "", 

498 old_label, 

499 old_task_node.task_class_name, 

500 ", ".join(old_task_node.dimensions.required), 

501 ] 

502 ) 

503 n += 1 

504 

505 def write_dataset_type_csv( 

506 self, filename: str, new: PipelineGraph, old: PipelineGraph 

507 ) -> None: 

508 """Write the mapping between dataset types to a CSV file. 

509 

510 Parameters 

511 ---------- 

512 filename : `str` 

513 Name of the file. 

514 new : `lsst.pipe.base.PipelineGraph` 

515 New pipeline graph. 

516 old : `lsst.pipe.base.PipelineGraph` 

517 Old pipeline graph. 

518 """ 

519 with open(filename, "w") as stream: 

520 writer = csv.writer(stream, delimiter=";") 

521 n = 0 

522 for new_name in without_automatic_connections(new.dataset_types): 

523 new_dataset_type_node = new.dataset_types[new_name] 

524 old_name = self.dataset_types_new_to_old.get(new_name, "") 

525 writer.writerow( 

526 [ 

527 n, 

528 new_name, 

529 old_name, 

530 new_dataset_type_node.storage_class_name, 

531 ", ".join(new_dataset_type_node.dimensions.required), 

532 ] 

533 ) 

534 n += 1 

535 for old_name in without_automatic_connections(old.dataset_types): 

536 if old_name in self.unmappable_old_dataset_types: 

537 old_dataset_type_node = old.dataset_types[old_name] 

538 writer.writerow( 

539 [ 

540 n, 

541 "", 

542 old_name, 

543 old_dataset_type_node.storage_class_name, 

544 ", ".join(old_dataset_type_node.dimensions.required), 

545 ] 

546 ) 

547 n += 1 

548 

549 

@dataclasses.dataclass
class _CorrespondenceFinderSide:
    """An alternate data structure for pipeline-pipeline mapping, used in
    `Correspondence.find_matches`.

    One instance of this class is expected to be paired with another
    representing the reverse mapping; the ``relate_*`` methods update both
    instances so their maps stay mutually inverse.
    """

    pipeline_graph: PipelineGraph
    """Pipeline graph this side maps from."""

    map_tasks: dict[str, str] = dataclasses.field(default_factory=dict)
    """Mapping of tasks, from this side to the other."""

    map_dataset_types: dict[str, str] = dataclasses.field(default_factory=dict)
    """Mapping of dataset types, from this side to the other."""

    unmappable_tasks: set[str] = dataclasses.field(default_factory=set)
    """Tasks on this side that cannot be mapped."""

    unmappable_dataset_types: set[str] = dataclasses.field(default_factory=set)
    """Dataset types on this side that cannot be mapped."""

573 

574 @property 

575 def unmatched_tasks(self) -> set[str]: 

576 """Tasks on this side that could be mapped but have not yet been.""" 

577 result = set(self.pipeline_graph.tasks.keys()) 

578 result.difference_update(self.map_tasks.keys()) 

579 result.difference_update(self.unmappable_tasks) 

580 return result 

581 

582 @property 

583 def unmatched_dataset_types(self) -> set[str]: 

584 """Dataset types on this side that could be mapped but have not yet 

585 been. 

586 """ 

587 result = set( 

588 without_automatic_connections(self.pipeline_graph.dataset_types.keys()) 

589 ) 

590 result.difference_update(self.map_dataset_types.keys()) 

591 result.difference_update(self.unmappable_dataset_types) 

592 return result 

593 

594 def relate_tasks( 

595 self, label: str, other_label: str, other: _CorrespondenceFinderSide 

596 ) -> None: 

597 """Add a mapping between the given task labels.""" 

598 self.map_tasks[label] = other_label 

599 other.map_tasks[other_label] = label 

600 

601 def relate_dataset_types( 

602 self, name: str, other_name: str, other: _CorrespondenceFinderSide 

603 ) -> None: 

604 """Add a mapping between the given dataset type names.""" 

605 self.map_dataset_types[name] = other_name 

606 other.map_dataset_types[other_name] = name 

607 

608 def task_nodes_match( 

609 self, label: str, other_label: str, other: _CorrespondenceFinderSide 

610 ) -> bool: 

611 """Test whether two tasks can be matched. 

612 

613 This checks whether the tasks are still available to be matched and 

614 whether they have the same task class and dimensions. 

615 """ 

616 if label in self.map_tasks: 

617 return False 

618 if label in self.unmappable_tasks: 

619 return False 

620 if other_label in other.map_tasks: 

621 return False 

622 if other_label in other.unmappable_tasks: 

623 return False 

624 new_node = self.pipeline_graph.tasks[label] 

625 old_node = other.pipeline_graph.tasks[other_label] 

626 return ( 

627 new_node.dimensions == old_node.dimensions 

628 and new_node.task_class_name == old_node.task_class_name 

629 ) 

630 

631 def dataset_type_nodes_match( 

632 self, name: str, other_name: str, other: _CorrespondenceFinderSide 

633 ) -> bool: 

634 """Test whether two dataset types can be matched. 

635 

636 This checks whether the dataset types are still available to be matched 

637 and whether they have the same storage class and dimensions. 

638 """ 

639 if name in self.map_dataset_types: 

640 return False 

641 if name in self.unmappable_dataset_types: 

642 return False 

643 if other_name in other.map_dataset_types: 

644 return False 

645 if other_name in other.unmappable_dataset_types: 

646 return False 

647 new_node = self.pipeline_graph.dataset_types[name] 

648 old_node = other.pipeline_graph.dataset_types[other_name] 

649 return ( 

650 new_node.dimensions == old_node.dimensions 

651 and new_node.storage_class_name == old_node.storage_class_name 

652 ) 

653 

654 def match_task_via_output_producers( 

655 self, label: str, other: _CorrespondenceFinderSide, direction: str 

656 ) -> bool: 

657 """Look for a match for the given task by inspecting the mappings of 

658 its outputs' producers. 

659 """ 

660 my_outputs = ( 

661 self.pipeline_graph.outputs_of(label).keys() - self.unmappable_tasks 

662 ) 

663 other_outputs = { 

664 other_output 

665 for my_output in my_outputs 

666 if (other_output := self.map_dataset_types.get(my_output)) is not None 

667 } 

668 other_output_producers = { 

669 other_producer_node.label 

670 for other_output in other_outputs 

671 if (other_producer_node := other.pipeline_graph.producer_of(other_output)) 

672 is not None 

673 and self.task_nodes_match(label, other_producer_node.label, other) 

674 } 

675 other_output_producers -= other.unmappable_tasks 

676 if len(other_output_producers) == 1: 

677 other_label = other_output_producers.pop() 

678 _LOG.debug( 

679 f"Successful output producer match {label} {direction} {other_label}." 

680 ) 

681 self.relate_tasks(label, other_label, other) 

682 return True 

683 else: 

684 _LOG.info( 

685 f"No unique output producer match for {label} {direction}: {other_output_producers}." 

686 ) 

687 return False 

688 

689 def match_task_via_input_consumers( 

690 self, label: str, other: _CorrespondenceFinderSide, direction: str 

691 ) -> bool: 

692 """Look for a match for the given task by inspecting the mappings of 

693 its inputs' consumers. 

694 """ 

695 my_inputs = ( 

696 self.pipeline_graph.inputs_of(label).keys() - self.unmappable_dataset_types 

697 ) 

698 other_inputs = { 

699 other_input 

700 for my_input in my_inputs 

701 if (other_input := self.map_dataset_types.get(my_input)) is not None 

702 } 

703 other_input_consumers = { 

704 other_input_consumer_node.label 

705 for other_input in other_inputs 

706 for other_input_consumer_node in other.pipeline_graph.consumers_of( 

707 other_input 

708 ) 

709 if self.task_nodes_match(label, other_input_consumer_node.label, other) 

710 } 

711 other_input_consumers -= other.unmappable_tasks 

712 if len(other_input_consumers) == 1: 

713 other_label = other_input_consumers.pop() 

714 _LOG.debug( 

715 f"Successful input consumer match {label} {direction} {other_label}." 

716 ) 

717 self.relate_tasks(label, other_label, other) 

718 return True 

719 else: 

720 _LOG.info( 

721 f"No unique input consumer match for {label} {direction}: {other_input_consumers}." 

722 ) 

723 return False 

724 

    def match_dataset_type_via_producer_outputs(
        self,
        name: str,
        other: _CorrespondenceFinderSide,
        direction: str,
    ) -> bool:
        """Look for a match for the given dataset type by inspecting the
        mappings of its producer's outputs.

        Parameters
        ----------
        name : `str`
            Dataset type name on this side to match.
        other : `_CorrespondenceFinderSide`
            The other side of the correspondence.
        direction : `str`
            Arrow string used only in log messages.

        Returns
        -------
        progressed : `bool`
            `True` if this call made progress: either a new mapping was
            recorded or ``name`` was newly marked unmappable.
        """
        if (my_producer_node := self.pipeline_graph.producer_of(name)) is None:
            # No producing task: this is an overall input, so there is no
            # producer structure to infer from.
            _LOG.info(
                f"No producer output match for {name} {direction}; it is an overall input."
            )
            return False
        if my_producer_node.label in self.unmappable_tasks:
            # Propagate unmappability from the producing task to its output.
            # Returning True counts this as progress for the caller's loop.
            _LOG.debug(
                f"No producer output match for {name} {direction}; its producer is unmappable."
            )
            self.unmappable_dataset_types.add(name)
            return True
        if (other_producer := self.map_tasks.get(my_producer_node.label)) is None:
            _LOG.info(
                f"No producer output match for {name} {direction}; its producer is not mapped yet."
            )
            return False
        # Candidate outputs of the corresponding producer on the other side
        # with matching dimensions and storage class; init-outputs are only
        # compared against init-outputs.
        other_producer_outputs = {
            other_producer_output
            for other_producer_output in other.pipeline_graph.outputs_of(
                other_producer,
                init=(my_producer_node.key.node_type is NodeType.TASK_INIT),
            )
            if self.dataset_type_nodes_match(name, other_producer_output, other)
        }
        other_producer_outputs -= other.unmappable_dataset_types
        if len(other_producer_outputs) == 1:
            other_name = other_producer_outputs.pop()
            _LOG.debug(f"Unique producer output match {name} {direction} {other_name}.")
            self.relate_dataset_types(name, other_name, other)
            return True
        elif len(other_producer_outputs) > 1:
            # Multiple candidates: prefer the one sharing the longest common
            # suffix with ``name``; a tie for first place means no confident
            # match and we fall through to failure.
            common_suffix_scores = [
                (self.compute_common_suffix_length(name, other_name), other_name)
                for other_name in other_producer_outputs
            ]
            common_suffix_scores.sort(reverse=True)
            best_score, other_name = common_suffix_scores[0]
            next_best_score, _ = common_suffix_scores[1]
            if best_score > next_best_score:  # no tie for first place
                _LOG.debug(
                    f"Scored producer output match {name} {direction} {other_name}."
                )
                self.relate_dataset_types(name, other_name, other)
                return True
            _LOG.info(
                f"No unique producer output match for {name} {direction}: {common_suffix_scores}."
            )
        else:
            _LOG.info(f"No producer output matches for {name} {direction}.")
        return False

784 

785 def match_dataset_type_via_consumer_inputs( 

786 self, 

787 name: str, 

788 other: _CorrespondenceFinderSide, 

789 direction: str, 

790 ) -> bool: 

791 """Look for a match for the given dataset type by inspecting the 

792 mappings of its consumers' inputs. 

793 """ 

794 my_consumers = { 

795 my_consumer_node.label 

796 for my_consumer_node in self.pipeline_graph.consumers_of(name) 

797 } 

798 my_consumers -= self.unmappable_tasks 

799 other_consumers = { 

800 other_consumer 

801 for my_consumer in my_consumers 

802 if (other_consumer := self.map_tasks.get(my_consumer)) is not None 

803 } 

804 other_consumer_inputs = { 

805 other_consumer_input 

806 for other_consumer in other_consumers 

807 for other_consumer_input in other.pipeline_graph.inputs_of(other_consumer) 

808 if self.dataset_type_nodes_match(name, other_consumer_input, other) 

809 } 

810 other_consumer_inputs -= other.unmappable_dataset_types 

811 if len(other_consumer_inputs) == 1: 

812 other_name = other_consumer_inputs.pop() 

813 _LOG.info( 

814 f"Successful consumer input match {name} {direction} {other_name}." 

815 ) 

816 self.relate_dataset_types(name, other_name, other) 

817 return True 

818 else: 

819 _LOG.info( 

820 f"No unique consumer input match for {name} {direction}: {other_consumer_inputs}." 

821 ) 

822 return False 

823 

824 @staticmethod 

825 def compute_common_suffix_length(name1: str, name2: str) -> int: 

826 """Count the number of consecutive characters the two strings have in 

827 common, starting from their ends. 

828 """ 

829 rev1 = name1[::-1] 

830 rev2 = name2[::-1] 

831 return len(os.path.commonprefix([rev1, rev2])) 

832 

833 

def _main():
    """Command-line driver: write task and dataset-type CSV summaries for a
    pipeline correspondence.
    """
    import argparse

    from lsst.pipe.base import Pipeline

    parser = argparse.ArgumentParser("Create CSV files from a pipeline correspondence.")
    parser.add_argument("new", help="New pipeline YAML filename.")
    parser.add_argument("old", help="Old pipeline YAML filename.")
    parser.add_argument("correspondence", help="Correspondence JSON file.")
    parser.add_argument("--tasks", default="tasks.csv", help="Filename for task CSV.")
    parser.add_argument(
        "--dataset-types",
        default="dataset-types.csv",
        help="Filename for dataset type CSV.",
    )
    args = parser.parse_args()
    correspondence = Correspondence.read(args.correspondence)
    new_graph = Pipeline.from_uri(args.new).to_graph(visualization_only=True)
    old_graph = Pipeline.from_uri(args.old).to_graph(visualization_only=True)
    if args.tasks:
        correspondence.write_task_csv(args.tasks, new_graph, old_graph)
    if args.dataset_types:
        correspondence.write_dataset_type_csv(args.dataset_types, new_graph, old_graph)

856 

857 

# Allow this module to be executed directly as a script; see `_main`.
if __name__ == "__main__":
    _main()