Coverage for python / lsst / summit / utils / blockUtils.py: 20%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:28 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23import logging 

24import re 

25import time 

26from dataclasses import dataclass 

27from typing import TYPE_CHECKING 

28 

29import numpy as np 

30import pandas as pd 

31from astropy.time import Time 

32 

33from .dateTime import efdTimestampToAstropy 

34from .efdUtils import getEfdData, makeEfdClient 

35from .enums import ScriptState 

36 

37if TYPE_CHECKING: 

38 from .tmaUtils import TMAEvent 

39 

40 try: 

41 from lsst_efd_client import EfdClient 

42 except ImportError: 

43 EfdClient = None # this is currently just for mypy 

44 

45__all__ = ("BlockParser", "BlockInfo", "ScriptStatePoint") 

46 

47 

48@dataclass(kw_only=True, frozen=True) 

49class BlockInfo: 

50 """Information about the execution of a "block". 

51 

52 Each BlockInfo instance contains information about a single block 

53 execution. This is identified by the block number and sequence number, 

54 which, when combined with the dayObs, define the block ID. 

55 

56 Each BlockInfo instance contains the following information: 

57 * The block ID - this is the primary identifier, as a string, for 

58 example "BL52_20230615_02", which is parsed into: 

59 * The block number, as an integer, for example 52, for "BLOCK-52". 

60 * The dayObs, as an integer, for example 20230615. 

61 * The seqNum - the execution number of that block on that day. 

62 * The begin and end times of the block execution, as astropy.time.Time 

63 * The SAL indices which were involved in the block execution, as a list 

64 * The SITCOM tickets which were involved in the block execution, as a 

65 list of strings, including the SITCOM- prefix. 

66 * The states of the script during the block execution, as a list of 

67 ``ScriptStatePoint`` instances. 

68 

69 Parameters 

70 ---------- 

71 blockNumber : `str` 

72 The block number, as a str - sometimes it'll be like 123 but others it 

73 will be like "T123" for test blocks. 

74 blockId : `str` 

75 The block ID, as a string. 

76 dayObs : `int` 

77 The dayObs the block was run on. 

78 seqNum : `int` 

79 The sequence number of the block. 

80 begin : `astropy.time.Time` 

81 The time the block execution began. 

82 end : `astropy.time.Time` 

83 The time the block execution ended. 

84 isTestCase : `bool` 

85 Whether this block is a test case type block, or a regular block. This 

86 is also reflected in the blockNumber. 

87 salIndices : `list` of `int` 

88 One or more SAL indices, relating to the block. 

89 tickets : `list` of `str` 

90 One or more SITCOM tickets, relating to the block. 

91 states : `list` of `lsst.summit.utils.blockUtils.ScriptStatePoint` 

92 The states of the script during the block. Each element is a 

93 ``ScriptStatePoint`` which contains: 

94 - the time, as an astropy.time.Time 

95 - the state, as a ``ScriptState`` enum 

96 - the reason for state change, as a string, if present 

97 """ 

98 

99 blockNumber: str 

100 blockId: str 

101 dayObs: int 

102 seqNum: int 

103 begin: Time 

104 end: Time 

105 salIndices: list 

106 tickets: list 

107 states: list 

108 isTestCase: bool 

109 

110 def __repr__(self) -> str: 

111 return ( 

112 f"BlockInfo(blockNumber={self.blockNumber}, blockId={self.blockId}, salIndices={self.salIndices}," 

113 f" tickets={self.tickets}, states={self.states!r}" 

114 ) 

115 

116 def _ipython_display_(self) -> None: 

117 """This is the function which runs when someone executes a cell in a 

118 notebook with just the class instance on its own, without calling 

119 print() or str() on it. 

120 """ 

121 print(self.__str__()) 

122 

123 def __str__(self) -> str: 

124 # no literal \n allowed inside {} portion of f-strings until python 

125 # 3.12, but it can go in via a variable 

126 newline = " \n" 

127 return ( 

128 f"dayObs: {self.dayObs}\n" 

129 f"seqNum: {self.seqNum}\n" 

130 f"blockNumber: {self.blockNumber}\n" 

131 f"blockId: {self.blockId}\n" 

132 f"begin: {self.begin.isot}\n" 

133 f"end: {self.end.isot}\n" 

134 f"salIndices: {self.salIndices}\n" 

135 f"tickets: {self.tickets}\n" 

136 f"states: \n{newline.join([str(state) for state in self.states])}" 

137 ) 

138 

139 

140@dataclass(kw_only=True, frozen=True) 

141class ScriptStatePoint: 

142 """The execution state of a script at a point in time. 

143 

144 Parameters 

145 ---------- 

146 time : `astropy.time.Time` 

147 The time of the state change. 

148 state : `lsst.summit.utils.enums.ScriptState` 

149 The state of the script at this point in time. 

150 reason : `str` 

151 The reason for the state change, if given. 

152 """ 

153 

154 time: Time 

155 state: ScriptState 

156 reason: str 

157 

158 def __repr__(self) -> str: 

159 return f"ScriptStatePoint(time={self.time!r}, state={self.state!r}, reason={self.reason!r})" 

160 

161 def _ipython_display_(self) -> None: 

162 """This is the function which runs when someone executes a cell in a 

163 notebook with just the class instance on its own, without calling 

164 print() or str() on it. 

165 """ 

166 print(self.__str__()) 

167 

168 def __str__(self) -> str: 

169 reasonStr = f" - {self.reason}" if self.reason else "" 

170 return f"{self.state.name:>10} @ {self.time.isot}{reasonStr}" 

171 

172 

class BlockParser:
    """A class to parse BLOCK data from the EFD.

    Information on executed blocks is stored in the EFD (Electronic Facilities
    Database) in the ``lsst.sal.Script.logevent_state`` topic. This class
    parses that topic and provides methods to get information on the blocks
    which were run on a given dayObs, and to get the events which occurred
    during a given block execution.

    Parameters
    ----------
    dayObs : `int`
        The dayObs to get the block data for.
    client : `lsst_efd_client.efd_client.EfdClient`, optional
        The EFD client to use. If not specified, a new one is created.
    """

    # Patterns are compiled once here as they are applied to every row.
    # "BLOCK-123" style text found in legacy lastCheckpoint values.
    _BLOCK_PATTERN = re.compile(r"BLOCK-(\d+)")
    # A block ID embedded in a legacy lastCheckpoint value.
    _BLOCK_ID_PATTERN = re.compile(r"B[LT]\d+(?:_\w+)+")
    # The block number digits at the start of a block ID, e.g. 365 in
    # BL365_O_20250420_000001.
    _BLOCK_NUMBER_PATTERN = re.compile(r"[A-Z]*([0-9]+)(?=_)")
    # SITCOM ticket numbers mentioned in a lastCheckpoint value.
    _SITCOM_PATTERN = re.compile(r"SITCOM-(\d+)")

    def __init__(self, dayObs: int, client: EfdClient | None = None) -> None:
        self.log = logging.getLogger("lsst.summit.utils.blockUtils.BlockParser")
        self.dayObs = dayObs

        # only make a new client when the caller didn't supply one
        self.client = client if client is not None else makeEfdClient()

        t0 = time.time()
        self.getDataForDayObs()
        self.log.debug(f"Getting data took {(time.time() - t0):.2f} seconds")
        t0 = time.time()
        self.augmentData()
        self.log.debug(f"Parsing data took {(time.time() - t0):.5f} seconds")

    def getDataForDayObs(self) -> None:
        """Retrieve the ``lsst.sal.Script.logevent_state`` data for the
        specified dayObs from the EFD, storing it as ``self.data``.
        """
        # Tiago thinks no individual block seqNums should take more than an
        # hour to run, so pad the end of the dayObs by 1.5 hours to make sure
        # we catch any blocks which might span the end of the day.
        padding = 1.5 * 60 * 60
        data = getEfdData(
            self.client,
            "lsst.sal.Script.logevent_state",
            dayObs=self.dayObs,
            postPadding=padding,
            raiseIfTopicNotInSchema=False,
        )
        self.data = data

    @classmethod
    def _parseBlockId(cls, blockIdStr: object) -> tuple[str, int, int, bool] | None:
        """Split a block ID such as ``BL365_O_20250420_000001`` into parts.

        Parameters
        ----------
        blockIdStr : `object`
            The candidate block ID. Non-string values (e.g. NaN for rows
            which are not part of a block) are accepted and rejected.

        Returns
        -------
        parts : `tuple` [`str`, `int`, `int`, `bool`] or `None`
            The block number digits (e.g. ``"365"``), the dayObs (third
            ``_``-separated field), the seqNum (fourth field), and whether it
            is a test case (``BT`` prefix). `None` if it can't be parsed.
        """
        if not isinstance(blockIdStr, str):
            return None
        match = cls._BLOCK_NUMBER_PATTERN.match(blockIdStr)
        if match is None:
            return None
        idStrSplit = blockIdStr.split("_")
        try:
            blockDayObs = int(idStrSplit[2])
            blockSeqNum = int(idStrSplit[3])
        except (IndexError, ValueError):
            return None
        return match.group(1), blockDayObs, blockSeqNum, idStrSplit[0].startswith("BT")

    def _parseLegacyCheckpoints(self, checkpoints) -> tuple[list, list]:
        """Extract block IDs and numbers from ``lastCheckpoint`` values, for
        data which predates the ``blockId`` column.

        Parameters
        ----------
        checkpoints : iterable
            The ``lastCheckpoint`` value of each row.

        Returns
        -------
        blockIds : `list`
            The block ID found in each row, or NaN if none was found.
        blockNumbers : `list` [`int` or `None`]
            The number from any ``BLOCK-<n>`` text in each row, else `None`.
        """
        blockIds = []
        blockNumbers = []
        for rowStr in checkpoints:
            if not isinstance(rowStr, str):  # e.g. NaN for a missing value
                blockIds.append(np.nan)
                blockNumbers.append(None)
                continue
            blockIdMatch = self._BLOCK_ID_PATTERN.search(rowStr)
            blockMatch = self._BLOCK_PATTERN.search(rowStr)
            blockIds.append(blockIdMatch.group(0) if blockIdMatch else np.nan)
            blockNumbers.append(int(blockMatch.group(1)) if blockMatch else None)
        return blockIds, blockNumbers

    def augmentData(self) -> None:
        """Parse the block information out of each row of ``self.data``.

        Adds (or overwrites) the ``blockNum``, ``blockDayObs``,
        ``blockSeqNum`` and ``isTestCase`` columns, and adds a ``blockId``
        column if the data does not already have one. Rows which are not part
        of a parseable block, or whose block ran on a different dayObs (i.e.
        rows in the padded region of the query), are left as NaN in the
        block columns.
        """
        data = self.data
        nRows = len(data)
        hasCheckpoint = "lastCheckpoint" in data.columns

        if not hasCheckpoint:
            self.log.warning(
                f"Found {nRows} rows of data and no 'lastCheckpoint' column was in the data,"
                " so block data cannot be parsed."
            )

        # at some point the blockId column was added to the data, so if it's
        # present, we can use it to extract the blockNum and other information,
        # but before that we have to parse it out of the lastCheckpoint column
        hasBlockId = "blockId" in data.columns
        if hasBlockId:
            blockIdStrs = list(data["blockId"])
            checkpointNums: list = [None] * nRows
        else:
            checkpoints = data["lastCheckpoint"] if hasCheckpoint else [np.nan] * nRows
            blockIdStrs, checkpointNums = self._parseLegacyCheckpoints(checkpoints)

        # Values are collected positionally and assigned as whole columns at
        # the end. This avoids per-cell ``.at`` writes, which misbehave when
        # the (timestamp) index contains duplicate labels.
        blockNums: list = [np.nan] * nRows
        blockDayObsValues: list = [np.nan] * nRows
        blockSeqNums: list = [np.nan] * nRows
        isTestCases: list = [np.nan] * nRows

        for i, (blockIdStr, checkpointNum) in enumerate(zip(blockIdStrs, checkpointNums)):
            parsed = self._parseBlockId(blockIdStr)
            if parsed is None:
                if isinstance(blockIdStr, str) and blockIdStr:
                    self.log.debug(f"Could not parse block ID {blockIdStr!r}, skipping row")
                continue
            blockNumber, blockDayObs, blockSeqNum, isTestCase = parsed
            if blockDayObs != self.dayObs:
                continue  # we're in the padded region

            if not hasBlockId:
                # legacy data: prefer the number from the "BLOCK-123" text,
                # falling back to the number in the block ID itself
                blockNumber = str(checkpointNum if checkpointNum is not None else int(blockNumber))

            blockNums[i] = f"{'T' if isTestCase else ''}{blockNumber}"
            blockDayObsValues[i] = blockDayObs
            blockSeqNums[i] = blockSeqNum
            isTestCases[i] = isTestCase

        index = data.index
        data["blockNum"] = pd.Series(blockNums, index=index, dtype=object)
        data["blockDayObs"] = pd.Series(blockDayObsValues, index=index, dtype=object)
        data["blockSeqNum"] = pd.Series(blockSeqNums, index=index, dtype=object)
        data["isTestCase"] = pd.Series(isTestCases, index=index, dtype=object)
        if not hasBlockId:
            data["blockId"] = pd.Series(blockIdStrs, index=index, dtype=object)

    def _listColumnValues(self, column: str, removeNone: bool = True) -> list[str]:
        """Get the sorted, unique values of the specified column.

        Parameters
        ----------
        column : `str`
            The column to get the values for.
        removeNone : `bool`, optional
            Retained for backwards compatibility. Null values (``None`` and
            NaN) are always removed, as they can't be sorted with real values.

        Returns
        -------
        values : `list`
            The values for the specified column.
        """
        return sorted(set(self.data[column].dropna()))

    def getBlockNums(self) -> list[str]:
        """Get the block numbers which were run on the specified dayObs.

        Returns
        -------
        blockNums : `list` of `str`
            The blocks which were run on the specified dayObs, e.g. "123", or
            "T123" for test-case blocks.
        """
        return self._listColumnValues("blockNum")

    def getSeqNums(self, block: int | str) -> list[int]:
        """Get the seqNums for the specified block.

        Parameters
        ----------
        block : `int` or `str`
            The block name or number to get the events for, e.g. 123 or T123.

        Returns
        -------
        seqNums : `list` of `int`
            The sequence numbers for the specified block.
        """
        if isinstance(block, int):
            block = str(block)

        seqNums = self.data[self.data["blockNum"] == block]["blockSeqNum"]
        # drop any nans defensively, as they don't relate to an actual run of
        # a block
        seqNums = seqNums.dropna()
        return sorted(set(seqNums))

    def getRows(self, block: str | int, seqNum: int | None = None) -> pd.DataFrame:
        """Get all rows of data which relate to the specified block.

        If the seqNum is specified, only the rows for that sequence number are
        returned, otherwise all the rows relating to any block execution that
        day are returned. If the specified seqNum doesn't occur on the current
        day, an empty dataframe is returned.

        Parameters
        ----------
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`, optional
            The sequence number, if specified, to get the row data for. If not
            specified, all data for the specified block is returned.

        Returns
        -------
        data : `pandas.DataFrame`
            The row data.
        """
        if isinstance(block, int):
            block = str(block)

        # Because we query for a whole dayObs, but BLOCKs can overlap the day
        # start/end, it's possible for the block's blockDayObs not to be the
        # same as self.dayObs around the beginning or end of the day, so also
        # filter on blockDayObs when getting the relevant rows.
        rowsForBlock = self.data[
            np.logical_and(self.data["blockNum"] == block, self.data["blockDayObs"] == self.dayObs)
        ]
        if rowsForBlock.empty:
            self.log.warning(f"No rows found for {block=} on dayObs={self.dayObs}")
        if seqNum is None:
            return rowsForBlock
        return rowsForBlock[rowsForBlock["blockSeqNum"] == seqNum]

    def printBlockEvolution(self, block: str | int, seqNum: int | None = None) -> None:
        """Display the evolution of the specified block.

        If the seqNum is specified, the evolution of that specific block
        execution is displayed, otherwise all executions of that block are
        printed.

        Parameters
        ----------
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`, optional
            The sequence number, if specified, to print the evolution of. If
            not specified, all sequence numbers for the block are printed.
        """
        if isinstance(block, int):
            block = str(block)

        seqNums = self.getSeqNums(block) if seqNum is None else [seqNum]
        print(f"Evolution of BLOCK {block} for dayObs={self.dayObs} {seqNum=}:")
        for thisSeqNum in seqNums:
            blockInfo = self.getBlockInfo(block, thisSeqNum)
            print(blockInfo, "\n")

    def getBlockInfo(self, block: int | str, seqNum: int) -> BlockInfo | None:
        """Get the block info for the specified block.

        Parses the rows relating to this block execution, and returns
        the information as a ``BlockInfo`` instance.

        Parameters
        ----------
        block : `int` or `str`
            The block number.
        seqNum : `int`
            The sequence number.

        Returns
        -------
        blockInfo : `lsst.summit.utils.blockUtils.BlockInfo` or `None`
            The block info, or `None` if there are no rows for this block
            execution on this dayObs.

        Raises
        ------
        RuntimeError
            Raised if the rows do not share exactly one blockId and one
            isTestCase value.
        """
        if isinstance(block, int):
            block = str(block)

        rows = self.getRows(block, seqNum=seqNum)
        if rows.empty:
            print(f"No {seqNum=} on dayObs={self.dayObs} for {block=}")
            return None

        blockIds = set()
        testCases = set()
        tickets = set()
        salIndices = set()
        statePoints = []

        for _, row in rows.iterrows():
            salIndices.add(row["salIndex"])
            blockIds.add(row["blockId"])
            testCases.add(row["isTestCase"])

            # lastCheckpoint may be absent or NaN, so only search real strings
            lastCheckpoint = row.get("lastCheckpoint")
            if isinstance(lastCheckpoint, str):
                tickets.update(self._SITCOM_PATTERN.findall(lastCheckpoint))

            # a missing reason comes through as NaN; normalise it to ""
            reason = row["reason"]
            statePoints.append(
                ScriptStatePoint(
                    time=efdTimestampToAstropy(row["private_efdStamp"]),
                    state=ScriptState(row["state"]),
                    reason=reason if isinstance(reason, str) else "",
                )
            )

        # check that blockIds and testCases each have exactly one value
        if any(len(s) != 1 for s in [blockIds, testCases]):
            raise RuntimeError(
                f"Expected exactly one unique value for blockIds and testCases, "
                f"but found: blockIds={blockIds}, testCases={testCases} "
                f"for {seqNum=}"
            )

        blockId = blockIds.pop()
        isTestCase = testCases.pop()

        blockInfo = BlockInfo(
            blockNumber=block,
            blockId=blockId,
            dayObs=self.dayObs,
            seqNum=seqNum,
            # state points were built in row order, so first/last are the
            # first and last rows' timestamps
            begin=statePoints[0].time,
            end=statePoints[-1].time,
            salIndices=sorted(salIndices),
            # sort numerically so that e.g. SITCOM-99 comes before SITCOM-100
            tickets=[f"SITCOM-{ticket}" for ticket in sorted(tickets, key=int)],
            states=statePoints,
            isTestCase=isTestCase,
        )

        return blockInfo

    def getEventsForBlock(self, events: list[TMAEvent], block: str | int, seqNum: int) -> list[TMAEvent]:
        """Get the events which occurred during the specified block.

        Parameters
        ----------
        events : `list` of `lsst.summit.utils.tmaUtils.TMAEvent`
            The list of candidate events.
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`
            The sequence number to get the events for.

        Returns
        -------
        events : `list` of `lsst.summit.utils.tmaUtils.TMAEvent`
            The events, or an empty list if the block execution isn't found.
        """
        blockInfo = self.getBlockInfo(block, seqNum)
        if blockInfo is None:
            return []
        begin = blockInfo.begin
        end = blockInfo.end

        # each event's end being past the begin time and their
        # starts being before the end time means we get all the
        # events in the window and also those that overlap the
        # start/end too
        return [e for e in events if e.end >= begin and e.begin <= end]