Coverage for python / lsst / summit / extras / imageSorter.py: 14%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 08:54 +0000

1# This file is part of summit_extras. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import pickle 

24import re 

25from os import system 

26 

27import matplotlib.pyplot as plt 

28from PIL import Image 

29 

# Legend of the single-character tags a user may type for each image. Tags
# are upper-cased when annotations are loaded back, so case does not matter
# when typing them.
TAGS = """
 - (Blank/no annotation) - nominally good, i.e. nothing notable in the image
Q - bad main star location (denoted by cross-hair on image sorter)
F - Obviously very poor focus (worse than just seeing, does NOT include donuts)
D - Donut image
O - Occlusion (dome or door)
V - No back bias suspected
P - Non-standard PSF (rotator/mount issues/tracking error, etc)
S - Satellite or plane crossing image
! - Something interesting/crazy - see notes on image
"""

# Full help text printed once by ``ImageSorter.sortImages`` before the
# annotation loop starts: the tag legend above plus the two input shortcuts
# ("=" to repeat the previous image's annotation, and a leading space to
# record notes without any tags).
INSTRUCTIONS = (
    TAGS
    + "\n"
    + """
 = - apply the same annotations as the previous image
 To enter no tags but some notes, just start with a space
 """
)

50 

51 

52class ImageSorter: 

53 """Interactively tag and annotate a list of PNG images. 

54 

55 Intended to be used on images produced by 

56 `lsst.summit.extras.animation.Animator`. The user is shown each image 

57 in turn and types tag characters and/or notes; the results are 

58 written to a pickle file that can be reloaded with 

59 `loadAnnotations`. 

60 

61 Parameters 

62 ---------- 

63 fileList : `list` [`str`] 

64 List of paths to PNG images to sort, in display order. 

65 outputFilename : `str` 

66 Path to the pickle file in which annotations are persisted. The 

67 file is rewritten after every image so partial progress survives 

68 a crash. 

69 """ 

70 

71 def __init__(self, fileList: list[str], outputFilename: str): 

72 self.fileList = fileList 

73 self.outputFilename = outputFilename 

74 

75 @staticmethod 

76 def _getDataIdFromFilename(filename: str) -> tuple[str, int]: 

77 """Extract the dataId from an animator PNG filename. 

78 

79 Parameters 

80 ---------- 

81 filename : `str` 

82 Path to a file whose basename is of the form 

83 ``YYYY-MM-DD-<seqNum>-<product>.png``. 

84 

85 Returns 

86 ------- 

87 dataId : `tuple` [`str`, `int`] 

88 The ``(dayObs, seqNum)`` dataId extracted from the filename. 

89 

90 Raises 

91 ------ 

92 RuntimeError 

93 Raised if the filename does not match the expected pattern. 

94 """ 

95 # filename of the form 2021-02-18-705-quickLookExp.png 

96 filename = os.path.basename(filename) 

97 mat = re.match(r"^(\d{4}-\d{2}-\d{2})-(\d*)-.*$", filename) 

98 if not mat: 

99 raise RuntimeError(f"Failed to extract dayObs/seqNum from {filename}") 

100 dayObs = mat.group(1) # type: str 

101 seqNum = int(mat.group(2)) # type: int 

102 return (dayObs, seqNum) 

103 

104 def getPreviousAnnotation(self, info: dict[tuple[str, int], str], imNum: int) -> str: 

105 """Return the annotation for the image displayed before ``imNum``. 

106 

107 Parameters 

108 ---------- 

109 info : `dict` [`tuple` [`str`, `int`], `str`] 

110 The annotation dictionary keyed by dataId. 

111 imNum : `int` 

112 Index of the current image in ``self.fileList``. Must be 

113 greater than zero. 

114 

115 Returns 

116 ------- 

117 annotation : `str` 

118 The annotation string from the previous image. 

119 

120 Raises 

121 ------ 

122 RuntimeError 

123 Raised if ``imNum`` is zero, since there is no previous 

124 image. 

125 """ 

126 if imNum == 0: 

127 raise RuntimeError("There is no previous annotation for the first image.") 

128 

129 previousFilename = self.fileList[imNum - 1] 

130 previousDataId = self._getDataIdFromFilename(previousFilename) 

131 previousAnnotation = info[previousDataId] 

132 return previousAnnotation 

133 

134 def addData( 

135 self, dataId: tuple[str, int], info: dict[tuple[str, int], str], answer: str, mode: str, imNum: int 

136 ) -> None: 

137 """Record the user's answer for a dataId into the info dict. 

138 

139 Parameters 

140 ---------- 

141 dataId : `tuple` [`str`, `int`] 

142 The ``(dayObs, seqNum)`` dataId being annotated. 

143 info : `dict` [`tuple` [`str`, `int`], `str`] 

144 The annotation dictionary to update in place. 

145 answer : `str` 

146 The user-typed annotation. If it contains ``=``, the 

147 previous image's annotation is substituted in. 

148 mode : `str` 

149 One of ``"O"`` (overwrite existing entries), ``"A"`` 

150 (append), or ``"B"`` (append, acting only on blank 

151 entries). ``"S"`` (skip) is handled upstream and not passed 

152 here. 

153 imNum : `int` 

154 Index of the current image in ``self.fileList``. 

155 

156 Raises 

157 ------ 

158 RuntimeError 

159 Raised if ``mode`` is not one of the recognized values. 

160 """ 

161 if "=" in answer: 

162 answer = self.getPreviousAnnotation(info, imNum) 

163 

164 if dataId not in info: 

165 info[dataId] = answer 

166 return 

167 

168 if mode == "O": 

169 info[dataId] = answer 

170 elif mode in ["B", "A"]: 

171 oldAnswer = info[dataId] 

172 answer = "".join([oldAnswer, answer]) 

173 info[dataId] = answer 

174 else: 

175 raise RuntimeError(f"Unrecognised mode {mode} - should be impossible") 

176 return 

177 

178 @classmethod 

179 def loadAnnotations(cls, pickleFilename: str) -> tuple[dict, dict]: 

180 """Load an annotations pickle and split it into tags and notes. 

181 

182 Anything after the first space in each raw annotation is treated 

183 as a free-form note; everything before the space is treated as 

184 the tag string (upper-cased). If the annotation starts with a 

185 space, only a note is recorded and the tag is empty. 

186 

187 Parameters 

188 ---------- 

189 pickleFilename : `str` 

190 Path to the pickle file written by `sortImages`. 

191 

192 Returns 

193 ------- 

194 tags : `dict` [`tuple` [`str`, `int`], `str`] 

195 Mapping from dataId to uppercase tag string. 

196 notes : `dict` [`tuple` [`str`, `int`], `str`] 

197 Mapping from dataId to note string. Only dataIds that have 

198 notes appear as keys. 

199 

200 Examples 

201 -------- 

202 >>> from lsst.summit.extras import ImageSorter 

203 >>> tags, notes = ImageSorter.loadAnnotations(pickleFilename) 

204 """ 

205 loaded = cls._load(pickleFilename) 

206 

207 tags, notes = {}, {} 

208 

209 for dataId, answerFull in loaded.items(): 

210 answer = answerFull.lower() 

211 if answerFull.startswith(" "): # notes only case 

212 tags[dataId] = "" 

213 notes[dataId] = answerFull.strip() 

214 continue 

215 

216 if " " in answer: 

217 answer = answerFull.split()[0] 

218 notes[dataId] = " ".join([_ for _ in answerFull.split()[1:]]) 

219 tags[dataId] = answer.upper() 

220 

221 return tags, notes 

222 

223 @staticmethod 

224 def _load(filename: str) -> dict: 

225 """Load the raw annotation pickle. 

226 

227 This returns the unprocessed ``{dataId: rawAnswer}`` dict and is 

228 intended for internal use. End users should call 

229 `loadAnnotations` instead, which splits tags from notes. 

230 

231 Parameters 

232 ---------- 

233 filename : `str` 

234 Path to the pickle file. 

235 

236 Returns 

237 ------- 

238 info : `dict` 

239 Raw annotation dictionary as written to disk. 

240 """ 

241 with open(filename, "rb") as pickleFile: 

242 info = pickle.load(pickleFile) 

243 return info 

244 

245 @staticmethod 

246 def _save(info: dict, filename: str) -> None: 

247 """Write the annotation dict to disk as a pickle. 

248 

249 Parameters 

250 ---------- 

251 info : `dict` 

252 Annotation dictionary to save. 

253 filename : `str` 

254 Path to the pickle file. 

255 """ 

256 with open(filename, "wb") as dumpFile: 

257 pickle.dump(info, dumpFile) 

258 

259 def sortImages(self) -> dict | None: 

260 """Display the image list and collect user annotations. 

261 

262 Runs an interactive loop: for each image the user is prompted 

263 for a tag/notes string, which is added to the annotation dict 

264 (respecting the mode chosen at startup) and the dict is 

265 re-pickled to disk. If an output file already exists, the user 

266 is first asked whether to append, overwrite, skip, or display. 

267 

268 Returns 

269 ------- 

270 info : `dict` or `None` 

271 The final annotation dictionary. Returns `None` if the user 

272 entered an unrecognized mode at the prompt (which causes a 

273 recursive restart). 

274 """ 

275 mode = "A" 

276 info = {} 

277 if os.path.exists(self.outputFilename): 

278 info = self._load(self.outputFilename) 

279 

280 print(f"Output file {self.outputFilename} exists with info on {len(info)} files:") 

281 print("Press A - view all images, appending info to existing entries") 

282 print("Press O - view all images, overwriting existing entries") 

283 print("Press S - skip all images with existing annotations, including blank annotations") 

284 print("Press B - skip all images with annotations that are not blank") 

285 print("Press D - just display existing data and exit") 

286 print("Press Q to quit") 

287 mode = input() 

288 mode = mode[0].upper() 

289 

290 if mode == "Q": 

291 exit() 

292 elif mode == "D": 

293 for dataId, value in info.items(): 

294 print(f"{dataId[0]} - {dataId[1]}: {value}") 

295 exit() 

296 elif mode in "AOSB": 

297 pass 

298 else: 

299 print("Unrecognised response - try again") 

300 self.sortImages() 

301 return None # don't run twice in this case! 

302 

303 # need to write file first, even if empty, because _load and _save 

304 # are inside the loop to ensure that annotations aren't lost even on 

305 # full crash 

306 print(INSTRUCTIONS) 

307 self._save(info, self.outputFilename) 

308 

309 plt.figure(figsize=(10, 10)) 

310 for imNum, filename in enumerate(self.fileList): 

311 info = self._load(self.outputFilename) 

312 

313 dataId = self._getDataIdFromFilename(filename) 

314 if dataId in info and mode in ["S", "B"]: # always skip if found for S and if not blank for B 

315 if (mode == "S") or (mode == "B" and info[dataId] != ""): 

316 continue 

317 

318 with Image.open(filename) as pilImage: 

319 pilImage = Image.open(filename) 

320 width, height = pilImage.size 

321 cropLR, cropUD = 100 - 50, 180 - 50 

322 cropped = pilImage.crop((cropLR, cropUD, width - cropLR, height - cropUD)) 

323 plt.clf() 

324 plt.imshow(cropped, interpolation="bicubic") 

325 plt.show(block=False) 

326 plt.draw() # without this you get the same image each time 

327 plt.tight_layout() 

328 osascriptCall = """/usr/bin/osascript -e 'tell app "Finder" to """ 

329 osascriptCall += """set frontmost of process "Terminal" to true' """ 

330 system(osascriptCall) 

331 

332 oldAnswer = None # just so we can display existing info with the dataId 

333 if dataId in info: 

334 oldAnswer = info[dataId] 

335 inputStr = f"{dataId[0]} - {dataId[1]}: %s" % ("" if oldAnswer is None else oldAnswer) 

336 answer = input(inputStr) 

337 if "exit" in answer: 

338 break # break don't exit so data is written! 

339 

340 self.addData(dataId, info, answer, mode, imNum) 

341 self._save(info, self.outputFilename) 

342 

343 print(f"Info written to {self.outputFilename}") 

344 

345 return info 

346 

347 

if __name__ == "__main__":
    # TODO: DM-34239 Remove this
    # Ad-hoc developer entry point: runs the sorter over a hard-coded list of
    # PNGs on one developer's machine.
    # NOTE(review): these basenames start with "dayObs-" and therefore do not
    # match the ``YYYY-MM-DD-<seqNum>-<product>.png`` pattern required by
    # ``ImageSorter._getDataIdFromFilename``; as written, ``sortImages`` would
    # raise RuntimeError when it reaches the first file. Confirm before
    # relying on this block.
    fileList = [
        "/Users/merlin/rsync/animatorOutput/pngs/dayObs-2020-02-17-seqNum-232-calexp.png",
        "/Users/merlin/rsync/animatorOutput/pngs/dayObs-2020-02-17-seqNum-233-calexp.png",
        "/Users/merlin/rsync/animatorOutput/pngs/dayObs-2020-02-17-seqNum-234-calexp.png",
        "/Users/merlin/rsync/animatorOutput/pngs/dayObs-2020-02-17-seqNum-235-calexp.png",
    ]

    # The output file is written as a pickle, despite the .txt extension.
    sorter = ImageSorter(fileList, "/Users/merlin/scratchfile.txt")
    sorter.sortImages()