Coverage for tests / test_cliCmdQueryCollections.py: 24%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Unit tests for daf_butler CLI query-collections command.""" 

29 

30import unittest 

31 

32from astropy.table import Table 

33from numpy import array 

34 

35from lsst.daf.butler import Butler, CollectionType 

36from lsst.daf.butler.cli.butler import cli 

37from lsst.daf.butler.cli.cmd import query_collections 

38from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg 

39from lsst.daf.butler.script import queryCollections 

40from lsst.daf.butler.tests import CliCmdTestBase, DatastoreMock 

41from lsst.daf.butler.tests.utils import ButlerTestHelper, readTable 

42 

43 

class QueryCollectionsCmdTest(CliCmdTestBase, unittest.TestCase):
    """Test the query-collections command-line.

    Uses the ``CliCmdTestBase`` machinery: the script function named by
    ``mockFuncName`` is mocked out, and each test checks the keyword
    arguments the CLI layer forwards to it.
    """

    # Dataset-type exclusion patterns the CLI applies by default when the
    # user does not pass --exclude-dataset-types explicitly.
    _DEFAULT_EXCLUDES = ("*_config", "*_log", "*_metadata", "packages")

    mockFuncName = "lsst.daf.butler.cli.cmd.commands.script.queryCollections"

    @staticmethod
    def defaultExpected():
        """Return the baseline keyword arguments expected to reach the
        mocked script function; individual tests override via makeExpected.
        """
        return dict(
            repo=None, collection_type=tuple(CollectionType.__members__.values()), chains="TABLE", glob=()
        )

    @staticmethod
    def command():
        """Return the click command under test."""
        return query_collections

    def test_minimal(self):
        """Test only required parameters, and omit optional parameters."""
        self.run_test(
            ["query-collections", "here", "--chains", "TABLE"],
            self.makeExpected(
                repo="here",
                show_dataset_types=False,
                exclude_dataset_types=self._DEFAULT_EXCLUDES,
            ),
        )

    def test_all(self):
        """Test all parameters."""
        self.run_test(
            [
                "query-collections",
                "here",
                "foo*",
                "--collection-type",
                "TAGGED",
                "--collection-type",
                "RUN",
                "--chains",
                "TABLE",
            ],
            self.makeExpected(
                repo="here",
                glob=("foo*",),
                collection_type=(CollectionType.TAGGED, CollectionType.RUN),
                chains="TABLE",
                show_dataset_types=False,
                exclude_dataset_types=self._DEFAULT_EXCLUDES,
            ),
        )

    def test_show_dataset_types(self):
        """Test the --show-dataset-types option with each chain display
        mode that supports it.
        """
        for chains in ("TABLE", "TREE", "FLATTEN"):
            with self.subTest(chains=chains):
                self.run_test(
                    ["query-collections", "here", "--chains", chains, "--show-dataset-types"],
                    self.makeExpected(
                        repo="here",
                        chains=chains,
                        show_dataset_types=True,
                        exclude_dataset_types=self._DEFAULT_EXCLUDES,
                    ),
                )

    def test_exclude_dataset_types(self):
        """Test that --exclude-dataset-types replaces the default
        exclusion patterns.
        """
        self.run_test(
            [
                "query-collections",
                "here",
                "--chains",
                "TABLE",
                "--show-dataset-types",
                "--exclude-dataset-types",
                "flat",
            ],
            self.makeExpected(
                repo="here",
                show_dataset_types=True,
                exclude_dataset_types=("flat",),
            ),
        )

142 

143 

class QueryCollectionsScriptTest(ButlerTestHelper, unittest.TestCase):
    """Test the query-collections script interface."""

    def setUp(self):
        self.runner = LogCliRunner()

    def testGetCollections(self):
        """Query collections in a fresh repo: all, by name glob, and by
        collection type.
        """
        run = "ingest/run"
        tag = "tag"
        with self.runner.isolated_filesystem():
            butlerCfg = Butler.makeRepo("here")
            # The purpose of this call is to create some collections.
            butler = Butler.from_config(butlerCfg, run=run, collections=[tag], writeable=True)
            self.enterContext(butler)
            butler.registry.registerCollection(tag, CollectionType.TAGGED)

            # Verify collections that were created are found by
            # query-collections.
            result = self.runner.invoke(cli, ["query-collections", "here"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table((("ingest/run", "tag"), ("RUN", "TAGGED")), names=("Name", "Type"))
            self.assertAstropyTablesEqual(readTable(result.output), expected)

            # Verify that with a glob argument, only collections whose
            # name matches the specified pattern are returned.
            result = self.runner.invoke(cli, ["query-collections", "here", "t*"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table((("tag",), ("TAGGED",)), names=("Name", "Type"))
            self.assertAstropyTablesEqual(readTable(result.output), expected)

            # Verify that with a collection type argument, only collections
            # of that type are returned.
            result = self.runner.invoke(cli, ["query-collections", "here", "--collection-type", "RUN"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table((("ingest/run",), ("RUN",)), names=("Name", "Type"))
            self.assertAstropyTablesEqual(readTable(result.output), expected)

180 

181 

class ChainedCollectionsTest(ButlerTestHelper, unittest.TestCase):
    """Test the collection-chain command-line interface."""

    def setUp(self):
        self.runner = LogCliRunner()

    def assertChain(self, args: list[str], expected: str):
        """Run collection-chain with ``args`` and check that it exits
        cleanly and prints ``expected``.
        """
        result = self.runner.invoke(cli, ["collection-chain", "here", *args])
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))
        self.assertEqual(result.output.strip(), expected, clickResultMsg(result))

    def testChained(self):
        """Exercise chain creation, every query-collections chain display
        mode, and every collection-chain edit mode.
        """
        with self.runner.isolated_filesystem():
            # Create a butler and add some chained collections:
            butlerCfg = Butler.makeRepo("here")

            butler1 = Butler.from_config(butlerCfg, writeable=True)
            self.enterContext(butler1)

            # Replace datastore functions with mocks:
            DatastoreMock.apply(butler1)

            butler1.import_(filename="resource://lsst.daf.butler/tests/registry_data/base.yaml")
            butler1.import_(filename="resource://lsst.daf.butler/tests/registry_data/datasets.yaml")
            registry1 = butler1.registry
            registry1.registerRun("run1")
            registry1.registerCollection("tag1", CollectionType.TAGGED)
            registry1.registerCollection("calibration1", CollectionType.CALIBRATION)

            # Create the collection chain
            self.assertChain(["chain2", "calibration1", "run1"], "[calibration1, run1]")
            self.assertChain(
                ["--mode", "redefine", "chain1", "tag1", "run1", "chain2"], "[tag1, run1, chain2]"
            )

            # Use the script function to test the query-collections TREE
            # option, because the astropy.table.Table.read method, which we
            # are using for verification elsewhere in this file, seems to
            # strip leading whitespace from columns. This makes it
            # impossible to test the nested TREE output of the
            # query-collections subcommand from the command line interface.
            table = queryCollections("here", glob=(), collection_type=CollectionType.all(), chains="TREE")

            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION"),
                        ("chain1", "CHAINED"),
                        ("  tag1", "TAGGED"),
                        ("  run1", "RUN"),
                        ("  chain2", "CHAINED"),
                        ("    calibration1", "CALIBRATION"),
                        ("    run1", "RUN"),
                        ("chain2", "CHAINED"),
                        ("  calibration1", "CALIBRATION"),
                        ("  run1", "RUN"),
                        ("imported_g", "RUN"),
                        ("imported_r", "RUN"),
                        ("run1", "RUN"),
                        ("tag1", "TAGGED"),
                    )
                ),
                names=("Name", "Type"),
            )
            self.assertAstropyTablesEqual(table, expected)

            # Test table with inverse == True
            table = queryCollections(
                "here",
                glob=(),
                collection_type=CollectionType.all(),
                chains="INVERSE-TREE",
            )
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION"),
                        ("  chain2", "CHAINED"),
                        ("    chain1", "CHAINED"),
                        ("chain1", "CHAINED"),
                        ("chain2", "CHAINED"),
                        ("  chain1", "CHAINED"),
                        ("imported_g", "RUN"),
                        ("imported_r", "RUN"),
                        ("run1", "RUN"),
                        ("  chain1", "CHAINED"),
                        ("  chain2", "CHAINED"),
                        ("    chain1", "CHAINED"),
                        ("tag1", "TAGGED"),
                        ("  chain1", "CHAINED"),
                    )
                ),
                names=("Name", "Type"),
            )
            self.assertAstropyTablesEqual(table, expected)

            # Test table with show_dataset_types == True
            table = queryCollections(
                "here",
                glob=(),
                collection_type=CollectionType.all(),
                chains="TREE",
                show_dataset_types=True,
            )
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION", ""),
                        ("chain1", "CHAINED", ""),
                        ("  tag1", "TAGGED", ""),
                        ("  run1", "RUN", ""),
                        ("  chain2", "CHAINED", ""),
                        ("    calibration1", "CALIBRATION", ""),
                        ("    run1", "RUN", ""),
                        ("chain2", "CHAINED", ""),
                        ("  calibration1", "CALIBRATION", ""),
                        ("  run1", "RUN", ""),
                        ("imported_g", "RUN", "bias"),
                        ("", "", "flat"),
                        ("imported_r", "RUN", "bias"),
                        ("", "", "flat"),
                        ("run1", "RUN", ""),
                        ("tag1", "TAGGED", ""),
                    )
                ),
                names=("Name", "Type", "Dataset Types"),
            )
            self.assertAstropyTablesEqual(table, expected)

            # Test table with show_dataset_types, excluding some dataset
            # types.
            table = queryCollections(
                "here",
                glob=(),
                collection_type=CollectionType.all(),
                chains="TREE",
                show_dataset_types=True,
                exclude_dataset_types=("bias",),
            )
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION", ""),
                        ("chain1", "CHAINED", ""),
                        ("  tag1", "TAGGED", ""),
                        ("  run1", "RUN", ""),
                        ("  chain2", "CHAINED", ""),
                        ("    calibration1", "CALIBRATION", ""),
                        ("    run1", "RUN", ""),
                        ("chain2", "CHAINED", ""),
                        ("  calibration1", "CALIBRATION", ""),
                        ("  run1", "RUN", ""),
                        ("imported_g", "RUN", "flat"),
                        ("imported_r", "RUN", "flat"),
                        ("run1", "RUN", ""),
                        ("tag1", "TAGGED", ""),
                    )
                ),
                names=("Name", "Type", "Dataset Types"),
            )
            self.assertAstropyTablesEqual(table, expected)

            result = self.runner.invoke(cli, ["query-collections", "here", "--chains", "TABLE"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION", ""),
                        ("chain1", "CHAINED", "tag1"),
                        ("", "", "run1"),
                        ("", "", "chain2"),
                        ("chain2", "CHAINED", "calibration1"),
                        ("", "", "run1"),
                        ("imported_g", "RUN", ""),
                        ("imported_r", "RUN", ""),
                        ("run1", "RUN", ""),
                        ("tag1", "TAGGED", ""),
                    )
                ),
                names=("Name", "Type", "Children"),
            )
            table = readTable(result.output)
            self.assertAstropyTablesEqual(table, expected)

            result = self.runner.invoke(cli, ["query-collections", "here", "--chains", "INVERSE-TABLE"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION", "chain2"),
                        ("chain1", "CHAINED", ""),
                        ("chain2", "CHAINED", "chain1"),
                        ("imported_g", "RUN", ""),
                        ("imported_r", "RUN", ""),
                        ("run1", "RUN", "chain1"),
                        ("", "", "chain2"),
                        ("tag1", "TAGGED", "chain1"),
                    )
                ),
                names=("Name", "Type", "Parents"),
            )
            table = readTable(result.output)
            self.assertAstropyTablesEqual(table, expected)

            result = self.runner.invoke(cli, ["query-collections", "here", "--chains", "FLATTEN"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION"),
                        ("imported_g", "RUN"),
                        ("imported_r", "RUN"),
                        ("run1", "RUN"),
                        ("tag1", "TAGGED"),
                    )
                ),
                names=("Name", "Type"),
            )
            self.assertAstropyTablesEqual(readTable(result.output), expected, unorderedRows=True)

            result = self.runner.invoke(cli, ["query-collections", "here", "--chains", "NO-CHILDREN"])
            self.assertEqual(result.exit_code, 0, clickResultMsg(result))
            expected = Table(
                array(
                    (
                        ("calibration1", "CALIBRATION"),
                        ("chain1", "CHAINED"),
                        ("chain2", "CHAINED"),
                        ("imported_g", "RUN"),
                        ("imported_r", "RUN"),
                        ("run1", "RUN"),
                        ("tag1", "TAGGED"),
                    )
                ),
                names=("Name", "Type"),
            )
            self.assertAstropyTablesEqual(readTable(result.output), expected, unorderedRows=True)

            # Add a couple more run collections for chain testing
            registry1.registerRun("run2")
            registry1.registerRun("run3")
            registry1.registerRun("run4")

            self.assertChain(["--mode", "pop", "chain1"], "[run1, chain2]")

            self.assertChain(["--mode", "extend", "chain1", "run2", "run3"], "[run1, chain2, run2, run3]")

            self.assertChain(["--mode", "remove", "chain1", "chain2", "run2"], "[run1, run3]")

            self.assertChain(["--mode", "prepend", "chain1", "chain2", "run2"], "[chain2, run2, run1, run3]")

            self.assertChain(["--mode", "pop", "chain1", "1", "3"], "[chain2, run1]")

            self.assertChain(
                ["--mode", "redefine", "chain1", "chain2", "run2", "run3,run4", "--flatten"],
                "[calibration1, run1, run2, run3, run4]",
            )

            self.assertChain(["--mode", "pop", "chain1", "--", "-1", "-3"], "[calibration1, run1, run3]")

            # Out-of-bounds index should fail with a non-zero exit code.
            result = self.runner.invoke(cli, ["collection-chain", "here", "--mode", "pop", "chain1", "10"])
            self.assertEqual(result.exit_code, 1)

445 

446 

if __name__ == "__main__":
    unittest.main()