Coverage for python / lsst / display / ds9 / ds9.py: 20%

262 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:27 +0000

1# This file is part of display_ds9. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["Ds9Error", "getXpaAccessPoint", "ds9Version", "Buffer", 

23 "selectFrame", "ds9Cmd", "initDS9", "Ds9Event", "DisplayImpl"] 

24 

25import os 

26import re 

27import shutil 

28import subprocess 

29import sys 

30import time 

31 

32import numpy as np 

33 

34import lsst.afw.display.interface as interface 

35import lsst.afw.display.virtualDevice as virtualDevice 

36import lsst.afw.display.ds9Regions as ds9Regions 

37 

38try: 

39 from . import xpa as xpa 

40except ImportError as e: 

41 print(f"Cannot import xpa: {e}", file=sys.stderr) 

42 

43import lsst.afw.display as afwDisplay 

44import lsst.afw.math as afwMath 

45 

# Bind ``needShow`` only when the name is not already defined in this module's
# namespace, so that an existing value is left untouched.
if "needShow" not in globals():
    needShow = True  # Used to avoid a bug in ds9 5.4

50 

51 

class Ds9Error(OSError):
    """Error raised when communication with DS9 fails.

    Notes
    -----
    ``IOError`` is an alias of ``OSError`` in Python 3, so handlers written
    against either name catch this exception.
    """

55 

56 

# Bind ``_maskTransparency`` only when the name is not already defined in this
# module's namespace, so that an existing value is left untouched.
if "_maskTransparency" not in globals():
    _maskTransparency = None

61 

62 

def getXpaAccessPoint():
    """Return the XPA access point to which DS9 commands should be sent.

    Returns
    -------
    xpaAccessPoint : `str`
        ``"127.0.0.1:<port>"``, built from the first port number found in the
        ``XPA_PORT`` environment variable, when that variable is set and
        starts with ``DS9:ds9 <port1> <port2>``; otherwise the string
        ``"ds9"``.

    Notes
    -----
    An ``XPA_PORT`` value that is set but cannot be parsed is reported on
    standard error before falling back to ``"ds9"``.
    """
    portSpec = os.environ.get("XPA_PORT")
    if not portSpec:
        return "ds9"

    parsed = re.search(r"^DS9:ds9\s+(\d+)\s+(\d+)", portSpec)
    if parsed is None:
        print(f"Failed to parse XPA_PORT={portSpec}", file=sys.stderr)
        return "ds9"

    # Two ports are matched, but only the first is used for the access point.
    return f"127.0.0.1:{parsed.group(1)}"

89 

90 

def ds9Version():
    """Report the version of the DS9 that answers XPA requests.

    Returns
    -------
    version : `str`
        The second whitespace-separated word on the second line of DS9's
        reply to ``about``, or ``"0.0.0"`` if the reply could not be obtained
        or does not have that shape.

    Notes
    -----
    Any exception raised while querying or parsing is reported on standard
    error and converted into the ``"0.0.0"`` fallback.
    """
    try:
        aboutText = ds9Cmd("about", get=True)
        secondLine = aboutText.splitlines()[1]
        version = secondLine.split()[1]
    except Exception as e:
        print(f"Error reading version: {e}", file=sys.stderr)
        version = "0.0.0"

    return version

105 

106 

try:
    cmdBuffer
except NameError:
    # internal buffersize in xpa. Sigh; esp. as the 100 is some needed slop
    XPA_SZ_LINE = 4096 - 100

    class Buffer:
        """Buffer to control sending commands to DS9.

        Pending commands are accumulated in a single string; a stack of
        buffer sizes decides when `ds9Cmd` sends them.

        Notes
        -----
        The usual usage pattern is:

        >>> with ds9.Buffering():
        ...     # bunches of ds9.{dot,line} commands
        ...     ds9.flush()
        ...     # bunches more ds9.{dot,line} commands
        """

        def __init__(self, size=0):
            self._commands = ""  # pending commands, each preceded by ";"
            self._lenCommands = len(self._commands)
            # Stack of bufsizes.  It is seeded directly instead of through
            # self.set(), because set() flushes via ds9Cmd, which isn't
            # defined yet when the module-level instance is created.
            self._bufsize = [size]

        def set(self, size, silent=True):
            """Set the ds9 buffer size to size.

            Parameters
            ----------
            size : `int`
                Size of buffer. Requesting a negative size provides the
                largest possible buffer given bugs in xpa.
            silent : `bool`, optional
                Do not print error messages (default `True`).

            Notes
            -----
            A size larger than ``XPA_SZ_LINE`` is reported on standard error
            and replaced by the largest usable size.  The pending commands
            are flushed once the new size is in place.
            """
            if size < 0:
                size = XPA_SZ_LINE - 5

            if size > XPA_SZ_LINE:
                print("xpa silently hardcodes a limit of %d for buffer sizes (you asked for %d) " %
                      (XPA_SZ_LINE, size), file=sys.stderr)
                # Use the max buffersize, keeping the caller's ``silent``
                # choice for the flush that follows.
                self.set(-1, silent=silent)
                return

            if self._bufsize:
                self._bufsize[-1] = size  # change current value
            else:
                self._bufsize.append(size)  # there is no current value; set one

            self.flush(silent=silent)

        def _getSize(self):
            """Get the current DS9 buffer size.

            Returns
            -------
            size : `int`
                Size of buffer (the top of the size stack).
            """
            return self._bufsize[-1]

        def pushSize(self, size=-1):
            """Push a new DS9 command buffer size, keeping the previous one.

            Parameters
            ----------
            size : `int`, optional
                Size of buffer. A negative value sets the largest possible
                buffer.

            Notes
            -----
            Pending commands are flushed before the size changes.
            See also `popSize`.
            """
            self.flush(silent=True)
            # Push a placeholder; set() then overwrites the top of the stack.
            self._bufsize.append(0)
            self.set(size, silent=True)

        def popSize(self):
            """Switch back to the previous command buffer size.

            Notes
            -----
            Pending commands are flushed first.  The last remaining size is
            never popped, so there is always a current size.
            See also `pushSize`.
            """
            self.flush(silent=True)

            if len(self._bufsize) > 1:
                self._bufsize.pop()

        def flush(self, silent=True):
            """Flush the pending commands.

            Parameters
            ----------
            silent : `bool`, optional
                Do not print error messages.
            """
            ds9Cmd(flush=True, silent=silent)

    cmdBuffer = Buffer(0)

210 

211 

def selectFrame(frame):
    """Build the DS9 command that selects a frame.

    Parameters
    ----------
    frame : `int`
        Frame number.

    Returns
    -------
    frameString : `str`
        The text ``"frame <frame>"``.  Nothing is sent to DS9; the caller is
        responsible for issuing the command.
    """
    return "frame {}".format(frame)

225 

226 

def ds9Cmd(cmd=None, trap=True, flush=False, silent=True, frame=None, get=False):
    """Issue a DS9 command, raising errors as appropriate.

    Parameters
    ----------
    cmd : `str`, optional
        Command to execute.  If empty or `None`, nothing is queued and only
        the flush logic runs.
    trap : `bool`, optional
        Trap errors.  If `False`, an xpa failure is raised as `Ds9Error`.
    flush : `bool`, optional
        Send the buffered commands now, whatever the buffer size.
    silent : `bool`, optional
        Do not print trapped error messages.
    frame : `int`, optional
        Frame number on which to execute command.
    get : `bool`, optional
        Return xpa response.  Only honoured when ``cmd`` is non-empty.

    Returns
    -------
    response : `str` or `None`
        The stripped xpa reply when ``get`` is set and ``cmd`` is non-empty;
        otherwise `None`.

    Raises
    ------
    Ds9Error
        Raised if ``trap`` is `False` and sending the commands fails with an
        `OSError` or a true return value from ``xpa.set``.
    """
    if cmd:
        if frame is not None:
            cmd = f"{selectFrame(frame)};{cmd}"

        if get:
            # Queries are not buffered; they go to xpa immediately.
            return xpa.get(None, getXpaAccessPoint(), cmd, "").strip()

        # Work around xpa's habit of silently truncating long lines; the value
        # ``5`` provides some margin to handle new lines and the like.
        wouldOverflow = cmdBuffer._lenCommands + len(cmd) > XPA_SZ_LINE - 5
        if wouldOverflow:
            ds9Cmd(flush=True, silent=silent)

        cmdBuffer._commands += ";" + cmd
        cmdBuffer._lenCommands += 1 + len(cmd)

    # Keep buffering unless a flush was requested or the buffer has reached
    # its current size.
    if not flush and cmdBuffer._lenCommands < cmdBuffer._getSize():
        return

    pending = cmdBuffer._commands + "\n"
    cmdBuffer._commands = ""
    cmdBuffer._lenCommands = 0

    pending = pending.rstrip()
    if not pending:
        return

    try:
        status = xpa.set(None, getXpaAccessPoint(), pending, "", "", 0)
        if status:
            raise OSError(status)
    except OSError as e:
        if not trap:
            raise Ds9Error(f"XPA: {e}, ({pending})")
        if not silent:
            print(f"Caught ds9 exception processing command \"{pending}\": {e}", file=sys.stderr)

281 

282 

def initDS9(execDs9=True):
    """Initialize DS9.

    Parameters
    ----------
    execDs9 : `bool`, optional
        If DS9 is not running, attempt to execute it.

    Raises
    ------
    Ds9Error
        Raised if the initial DS9 commands fail.  This includes the case
        where ``ds9`` was launched by this call (see Notes).
    NameError
        Raised if ``ds9`` has to be launched but is not on the path.
    RuntimeError
        Raised if ``ds9`` has to be launched but ``DISPLAY`` is not set.

    Notes
    -----
    When the initial commands fail and ``execDs9`` is true, ``ds9`` is
    started in the background and polled for up to ten half-second
    intervals.  `Ds9Error` is then raised whether or not DS9 answered, so
    the caller has to call `initDS9` again to run the initialization
    commands.
    """
    global needShow

    try:
        xpa.reset()
        ds9Cmd("iconify no; raise", False)
        ds9Cmd("wcs wcsa", False)  # include the pixel coordinates WCS (WCSA)

        # ``needShow`` is only required for DS9 5.0 to 5.4.  A version string
        # that cannot be split into two integers leaves it disabled instead
        # of letting the parse error escape.
        needShow = False
        try:
            major, minor = (int(part) for part in ds9Version().split('.')[0:2])
        except ValueError:
            pass
        else:
            needShow = (major == 5 and minor <= 4)
    except Ds9Error as e:
        # An unset PATH is treated like a PATH that does not mention xpa.
        if not re.search('xpa', os.environ.get('PATH', '')):
            raise Ds9Error('You need the xpa binaries in your path to use ds9 with python')

        if not execDs9:
            raise Ds9Error(f"Initial DS9 commands failed ({e})") from e

        if not shutil.which("ds9"):
            raise NameError("ds9 doesn't appear to be on your path")
        if "DISPLAY" not in os.environ:
            raise RuntimeError("$DISPLAY isn't set, so I won't be able to start ds9 for you")

        print(f"ds9 doesn't appear to be running ({e}), I'll try to exec it for you")

        os.system('ds9 &')
        waiting = "waiting for ds9..."
        for attempt in range(10):
            try:
                ds9Cmd(selectFrame(1), False)
            except Ds9Error:
                print(f"{waiting}\r", end="")
                sys.stdout.flush()
                time.sleep(0.5)
            else:
                # DS9 answered.  Blank out the progress message if one was
                # printed.  (This must not sit behind a ``break`` inside the
                # ``try`` suite, which would skip the ``else`` clause.)
                if attempt > 0:
                    print(" " * len(waiting) + "\r", end="")
                break

        sys.stdout.flush()

        # NOTE(review): raised unconditionally, as before.  ``_mtv`` retries
        # after a Ds9Error, and it is that second call which runs the
        # initialization commands above -- confirm this is intended.
        raise Ds9Error(f"Initial DS9 commands failed ({e}); an attempt was made to start ds9") from e

334 

335 

class Ds9Event(interface.Event):
    """Event produced by a key press or mouse click on the DS9 display.

    Parameters
    ----------
    k
        Key that was pressed; forwarded unchanged to the base class.
    x, y
        Position associated with the event; forwarded unchanged to the base
        class.
    """

    def __init__(self, k, x, y):
        super().__init__(k, x, y)

342 

343 

class DisplayImpl(virtualDevice.DisplayImpl):
    """Virtual device display implementation that talks to DS9.

    Every operation is translated into DS9 commands and handed to `ds9Cmd`.

    Parameters
    ----------
    display
        The display object this implementation serves; its ``frame``
        attribute selects the DS9 frame that commands act on.
    verbose : `bool`, optional
        Passed through to the base class.
    *args
        Ignored.
    **kwargs
        Ignored.
    """

    def __init__(self, display, verbose=False, *args, **kwargs):
        virtualDevice.DisplayImpl.__init__(self, display, verbose)

    def _close(self):
        """Called when the device is closed.

        Nothing needs to be released, so this does nothing.
        """
        pass

    def _setMaskTransparency(self, transparency, maskplane):
        """Specify DS9's mask transparency.

        Parameters
        ----------
        transparency : `int`
            Percent transparency.
        maskplane : `NoneType`
            If `None`, the transparency is applied.  Any other value is
            reported on standard error and the call does nothing, because
            the transparency cannot be set per mask plane.
        """
        if maskplane is not None:
            print(f"ds9 is unable to set transparency for individual maskplanes ({maskplane})",
                  file=sys.stderr)
            return
        ds9Cmd(f"mask transparency {transparency}", frame=self.display.frame)

    def _getMaskTransparency(self, maskplane):
        """Return the current DS9's mask transparency.

        Parameters
        ----------
        maskplane : unused
            This parameter does nothing.

        Returns
        -------
        transparency : `float`
            DS9's reply to ``mask transparency``, converted to a float.
        """
        # selectFrame() only builds the command text; it has to be sent (and
        # flushed) for the query below to be made against this display's
        # frame.
        ds9Cmd(selectFrame(self.display.frame), flush=True)
        return float(ds9Cmd("mask transparency", get=True))

    def _show(self):
        """Uniconify and raise DS9.

        Notes
        -----
        Raises if ``self.display.frame`` doesn't exist.  Errors are not
        trapped (``trap=False``), so a failure surfaces as `Ds9Error`.
        """
        ds9Cmd("raise", trap=False, frame=self.display.frame)

    def _mtv(self, image, mask=None, wcs=None, title="", metadata=None):
        """Display an Image and/or Mask on a DS9 display.

        Parameters
        ----------
        image : subclass of `lsst.afw.image.Image`
            Image to display.
        mask : subclass of `lsst.afw.image.Mask`, optional
            Mask.
        wcs : `lsst.afw.geom.SkyWcs`, optional
            WCS of data
        title : `str`, optional
            Title of image.
        metadata : `lsst.daf.base`, optional
            Additional metadata.

        Notes
        -----
        DS9 is initialized first, with up to three attempts; only the first
        attempt is allowed to launch DS9.  Each mask plane that has pixels
        set is sent as a separate single-plane mask in its own colour.
        Planes whose configured colour is ``"ignore"`` are skipped.
        """
        waiting = "waiting for ds9..."
        for i in range(3):
            try:
                initDS9(i == 0)
            except Ds9Error:
                print(f"{waiting}\r", end="")
                sys.stdout.flush()
                time.sleep(0.5)
            else:
                if i > 0:
                    # Blank out the progress message printed above.
                    print(" " * len(waiting) + "\r", end="")
                    sys.stdout.flush()
                break

        ds9Cmd(selectFrame(self.display.frame))
        ds9Cmd("smooth no")
        self._erase()

        if image:
            _i_mtv(image, wcs, title, False, metadata=metadata)

        if mask:
            maskPlanes = mask.getMaskPlaneDict()
            # ``default=-1`` turns an empty plane dictionary into an empty
            # loop instead of a ValueError from max().
            nMaskPlanes = max(maskPlanes.values(), default=-1) + 1

            # Inverse dictionary: bit index -> plane name.
            planes = {maskPlanes[key]: key for key in maskPlanes}

            # NOTE(review): SUM over a mask presumably yields the OR of all
            # pixel values, i.e. the set of bits in use -- confirm.
            usedPlanes = int(afwMath.makeStatistics(mask, afwMath.SUM).getValue())
            mask1 = mask.Factory(mask.getBBox())  # Mask containing just one bitplane

            colorGenerator = self.display.maskColorGenerator(omitBW=True)
            for p in range(nMaskPlanes):
                if not ((1 << p) & usedPlanes):  # no pixels have this bitplane set
                    continue

                mask1[:] = mask
                mask1 &= (1 << p)

                # A bit index without a plane name has no configured colour.
                # Give it a generated colour instead of reusing the previous
                # plane's name (or failing on an unbound name for the first).
                pname = planes.get(p)
                color = self.display.getMaskPlaneColor(pname) if pname else None

                if not color:  # none was specified
                    color = next(colorGenerator)
                elif color.lower() == "ignore":
                    continue

                ds9Cmd(f"mask color {color}")
                _i_mtv(mask1, wcs, title, True, metadata=metadata)
    #
    # Graphics commands
    #

    def _buffer(self, enable=True):
        """Push and pop buffer size.

        Parameters
        ----------
        enable : `bool`, optional
            If `True` (default), push size; else pop it.
        """
        if enable:
            cmdBuffer.pushSize()
        else:
            cmdBuffer.popSize()

    def _flush(self):
        """Flush buffer.
        """
        cmdBuffer.flush()

    def _erase(self):
        """Erase all regions in current frame.
        """
        ds9Cmd("regions delete all", flush=True, frame=self.display.frame)

    def _dot(self, symb, c, r, size, ctype, fontFamily="helvetica", textAngle=None):
        """Draw a symbol onto the specified DS9 frame.

        Parameters
        ----------
        symb : `str`, or subclass of `lsst.afw.geom.ellipses.BaseCore`
            Symbol to be drawn. Possible values are:

            - ``"+"``: Draw a "+"
            - ``"x"``: Draw an "x"
            - ``"*"``: Draw a "*"
            - ``"o"``: Draw a circle
            - ``"@:Mxx,Mxy,Myy"``: Draw an ellipse with moments (Mxx, Mxy,
              Myy);(the ``size`` parameter is ignored)
            - An object derived from `lsst.afw.geom.ellipses.BaseCore`: Draw
              the ellipse (argument size is ignored)

            Any other value is interpreted as a string to be drawn.
        c : `int`
            Column to draw symbol [0-based coordinates].
        r : `int`
            Row to draw symbol [0-based coordinates].
        size : `float`
            Size of symbol.
        ctype : `str`
            the name of a colour (e.g. ``"red"``)
        fontFamily : `str`, optional
            String font. May be extended with other characteristics,
            e.g. ``"times bold italic"``.
        textAngle: `float`, optional
            Text will be drawn rotated by ``textAngle``.

        Notes
        -----
        Objects derived from `lsst.afw.geom.ellipses.BaseCore` include
        `~lsst.afw.geom.ellipses.Axes` and `lsst.afw.geom.ellipses.Quadrupole`.
        """
        regionCmds = "".join(
            f'regions command {{{region}}}; '
            for region in ds9Regions.dot(symb, c, r, size, ctype, fontFamily, textAngle)
        )
        cmd = selectFrame(self.display.frame) + "; " + regionCmds

        ds9Cmd(cmd, silent=True)

    def _drawLines(self, points, ctype):
        """Connect the points.

        Parameters
        -----------
        points : `list` of (`int`, `int`)
            A list of points specified as (col, row).
        ctype : `str`
            The name of a colour (e.g. ``"red"``).
        """
        regionCmds = "".join(
            f'regions command {{{region}}}; '
            for region in ds9Regions.drawLines(points, ctype)
        )
        cmd = selectFrame(self.display.frame) + "; " + regionCmds

        ds9Cmd(cmd)

    def _scale(self, algorithm, min, max, unit, *args, **kwargs):
        """Set image color scale.

        Parameters
        ----------
        algorithm : {``"linear"``, ``"log"``, ``"pow"``, ``"sqrt"``, ``"squared"``, ``"asinh"``, ``"sinh"``, ``"histequ"``}  # noqa: E501
            Scaling algorithm. May be any value supported by DS9.  A false
            value leaves the algorithm unchanged.
        min : `float` or `str`
            Minimum value for scale, or one of the scale modes ``"minmax"``
            and ``"zscale"``.
        max : `float`
            Maximum value for scale.  Unused when ``min`` names a mode.
        unit : `str`
            Ignored (a notice is printed if it is set and ``min`` is not a
            mode name).
        *args
            Ignored.
        **kwargs
            Ignored
        """
        if algorithm:
            ds9Cmd(f"scale {algorithm}", frame=self.display.frame)

        if min in ("minmax", "zscale"):
            # Address this display's frame explicitly, like the other scale
            # commands; otherwise, when no algorithm is given, the mode would
            # be applied to whichever frame DS9 currently has selected.
            ds9Cmd(f"scale mode {min}", frame=self.display.frame)
        else:
            if unit:
                print(f"ds9: ignoring scale unit {unit}")

            ds9Cmd(f"scale limits {min:g} {max:g}", frame=self.display.frame)
    #
    # Zoom and Pan
    #

    def _zoom(self, zoomfac):
        """Zoom frame by specified amount.

        Parameters
        ----------
        zoomfac : `int`
            DS9 zoom factor.
        """
        cmd = selectFrame(self.display.frame) + "; "
        cmd += f"zoom to {zoomfac}; "

        ds9Cmd(cmd, flush=True)

    def _pan(self, colc, rowc):
        """Pan frame.

        Parameters
        ----------
        colc : `int`
            Physical column to which to pan.
        rowc : `int`
            Physical row to which to pan.
        """
        cmd = selectFrame(self.display.frame) + "; "
        # ds9 is 1-indexed. Grrr
        cmd += f"pan to {colc + 1:g} {rowc + 1:g} physical; "

        ds9Cmd(cmd, flush=True)

    def _getEvent(self):
        """Listen for a key press on a frame in DS9 and return an event.

        Returns
        -------
        event : `Ds9Event` or `None`
            Event with (key, x, y).  The coordinates are NaN if they cannot
            be parsed from the reply.  `None` is returned if DS9 reports an
            error or sends an empty reply.
        """
        vals = ds9Cmd("imexam key coordinate", get=True).split()
        if not vals:
            # Nothing to interpret; treat an empty reply like an error reply.
            print("Empty return from imexam", file=sys.stderr)
            return None

        if vals[0] == "XPA$ERROR":
            if vals[1:4] == ['unknown', 'option', '"-state"']:
                pass  # a ds9 bug --- you get this by hitting TAB
            else:
                print("Error return from imexam:", " ".join(vals), file=sys.stderr)
            return None

        k = vals.pop(0)
        try:
            x = float(vals[0])
            y = float(vals[1])
        except (IndexError, ValueError):
            # Coordinates missing or not numeric.
            x = float("NaN")
            y = float("NaN")

        return Ds9Event(k, x, y)

634 

635 

# Probe for a working ``gzip`` only when ``haveGzip`` is not already defined
# in this module's namespace.
if "haveGzip" not in globals():
    # does gzip work?  A zero exit status from the shell means yes.
    haveGzip = os.system("gzip < /dev/null > /dev/null 2>&1") == 0

641 

642 

def _i_mtv(data, wcs, title, isMask, metadata):
    """Send one image or mask to DS9 as FITS through an ``xpaset`` pipeline.

    Parameters
    ----------
    data : Subclass of `lsst.afw.image.Image` or `lsst.afw.image.Mask`
        Data to display.  A ``uint16`` mask is modified in place (see Notes).
    wcs : `lsst.afw.geom.SkyWcs`
        WCS of data.
    title : `str`
        Title of display.  A false value is sent as an empty string.
    isMask : `bool`
        Is ``data`` a mask?
    metadata : `lsst.daf.base.PropertySet`
        Additional metadata.

    Notes
    -----
    For ``uint16`` masks the top bit (``0x8000``) is set on ``data`` itself
    before writing, to work around ds9 mis-handling BZERO/BSCALE; callers
    are expected to pass a copy.  The FITS stream is piped through ``gzip``
    when ``haveGzip`` is true.  Buffered DS9 commands are flushed before the
    data are written.
    """
    title = "" if not title else str(title)

    accessPoint = getXpaAccessPoint()
    xpa_cmd = f"xpaset {accessPoint} fits mask" if isMask else f"xpaset {accessPoint} fits"

    # ds9 mis-handles BZERO/BSCALE in uint16 data; setting the top bit works
    # around this.  The caller is expected to have passed a copy.
    if isMask and data.getArray().dtype == np.uint16:
        data |= 0x8000

    if haveGzip:
        xpa_cmd = "gzip | " + xpa_cmd

    with subprocess.Popen(xpa_cmd, stdin=subprocess.PIPE, shell=True) as pfd:
        ds9Cmd(flush=True, silent=True)
        afwDisplay.writeFitsImage(pfd, data, wcs, title, metadata)

675 ds9Cmd(flush=True, silent=True) 

676 afwDisplay.writeFitsImage(pfd, data, wcs, title, metadata)