Coverage for python / lsst / summit / utils / blockUtils.py: 20%
185 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:32 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23import logging
24import re
25import time
26from dataclasses import dataclass
27from typing import TYPE_CHECKING
29import numpy as np
30import pandas as pd
31from astropy.time import Time
33from .dateTime import efdTimestampToAstropy
34from .efdUtils import getEfdData, makeEfdClient
35from .enums import ScriptState
37if TYPE_CHECKING:
38 from .tmaUtils import TMAEvent
40 try:
41 from lsst_efd_client import EfdClient
42 except ImportError:
43 EfdClient = None # this is currently just for mypy
45__all__ = ("BlockParser", "BlockInfo", "ScriptStatePoint")
48@dataclass(kw_only=True, frozen=True)
49class BlockInfo:
50 """Information about the execution of a "block".
52 Each BlockInfo instance contains information about a single block
53 execution. This is identified by the block number and sequence number,
54 which, when combined with the dayObs, define the block ID.
56 Each BlockInfo instance contains the following information:
57 * The block ID - this is the primary identifier, as a string, for
58 example "BL52_20230615_02", which is parsed into:
59 * The block number, as an integer, for example 52, for "BLOCK-52".
60 * The dayObs, as an integer, for example 20230615.
61 * The seqNum - the execution number of that block on that day.
62 * The begin and end times of the block execution, as astropy.time.Time
63 * The SAL indices which were involved in the block execution, as a list
64 * The SITCOM tickets which were involved in the block execution, as a
65 list of strings, including the SITCOM- prefix.
66 * The states of the script during the block execution, as a list of
67 ``ScriptStatePoint`` instances.
69 Parameters
70 ----------
71 blockNumber : `str`
72 The block number, as a str - sometimes it'll be like 123 but others it
73 will be like "T123" for test blocks.
74 blockId : `str`
75 The block ID, as a string.
76 dayObs : `int`
77 The dayObs the block was run on.
78 seqNum : `int`
79 The sequence number of the block.
80 begin : `astropy.time.Time`
81 The time the block execution began.
82 end : `astropy.time.Time`
83 The time the block execution ended.
84 isTestCase : `bool`
85 Whether this block is a test case type block, or a regular block. This
86 is also reflected in the blockNumber.
87 salIndices : `list` of `int`
88 One or more SAL indices, relating to the block.
89 tickets : `list` of `str`
90 One or more SITCOM tickets, relating to the block.
91 states : `list` of `lsst.summit.utils.blockUtils.ScriptStatePoint`
92 The states of the script during the block. Each element is a
93 ``ScriptStatePoint`` which contains:
94 - the time, as an astropy.time.Time
95 - the state, as a ``ScriptState`` enum
96 - the reason for state change, as a string, if present
97 """
99 blockNumber: str
100 blockId: str
101 dayObs: int
102 seqNum: int
103 begin: Time
104 end: Time
105 salIndices: list
106 tickets: list
107 states: list
108 isTestCase: bool
110 def __repr__(self) -> str:
111 return (
112 f"BlockInfo(blockNumber={self.blockNumber}, blockId={self.blockId}, salIndices={self.salIndices},"
113 f" tickets={self.tickets}, states={self.states!r}"
114 )
116 def _ipython_display_(self) -> None:
117 """This is the function which runs when someone executes a cell in a
118 notebook with just the class instance on its own, without calling
119 print() or str() on it.
120 """
121 print(self.__str__())
123 def __str__(self) -> str:
124 # no literal \n allowed inside {} portion of f-strings until python
125 # 3.12, but it can go in via a variable
126 newline = " \n"
127 return (
128 f"dayObs: {self.dayObs}\n"
129 f"seqNum: {self.seqNum}\n"
130 f"blockNumber: {self.blockNumber}\n"
131 f"blockId: {self.blockId}\n"
132 f"begin: {self.begin.isot}\n"
133 f"end: {self.end.isot}\n"
134 f"salIndices: {self.salIndices}\n"
135 f"tickets: {self.tickets}\n"
136 f"states: \n{newline.join([str(state) for state in self.states])}"
137 )
140@dataclass(kw_only=True, frozen=True)
141class ScriptStatePoint:
142 """The execution state of a script at a point in time.
144 Parameters
145 ----------
146 time : `astropy.time.Time`
147 The time of the state change.
148 state : `lsst.summit.utils.enums.ScriptState`
149 The state of the script at this point in time.
150 reason : `str`
151 The reason for the state change, if given.
152 """
154 time: Time
155 state: ScriptState
156 reason: str
158 def __repr__(self) -> str:
159 return f"ScriptStatePoint(time={self.time!r}, state={self.state!r}, reason={self.reason!r})"
161 def _ipython_display_(self) -> None:
162 """This is the function which runs when someone executes a cell in a
163 notebook with just the class instance on its own, without calling
164 print() or str() on it.
165 """
166 print(self.__str__())
168 def __str__(self) -> str:
169 reasonStr = f" - {self.reason}" if self.reason else ""
170 return f"{self.state.name:>10} @ {self.time.isot}{reasonStr}"
class BlockParser:
    """A class to parse BLOCK data from the EFD.

    Information on executed blocks is stored in the EFD in the
    ``lsst.sal.Script.logevent_state`` topic. This class parses that topic and
    provides methods to get information on the blocks which were run on a
    given dayObs. It also provides methods to get the events which occurred
    during a given block, and also to get the block in which a specified event
    occurred, if any.

    Parameters
    ----------
    dayObs : `int`
        The dayObs to get the block data for.
    client : `lsst_efd_client.efd_client.EfdClient`, optional
        The EFD client to use. If not specified, a new one is created.
    """

    # Block number in legacy lastCheckpoint strings, e.g. "BLOCK-52" -> 52.
    _BLOCK_PATTERN = re.compile(r"BLOCK-(\d+)")
    # A block ID embedded in a legacy lastCheckpoint string.
    _BLOCK_ID_PATTERN = re.compile(r"B[LT]\d+(?:_\w+)+")
    # Block number at the start of a block ID, e.g. the 365 in
    # "BL365_O_20250420_000001".
    _BLOCK_NUMBER_PATTERN = re.compile(r"[A-Z]*([0-9]+)(?=_)")
    # SITCOM ticket numbers mentioned in lastCheckpoint strings.
    _SITCOM_PATTERN = re.compile(r"SITCOM-(\d+)")

    def __init__(self, dayObs: int, client: EfdClient | None = None) -> None:
        self.log = logging.getLogger("lsst.summit.utils.blockUtils.BlockParser")
        self.dayObs = dayObs
        self.client = client if client is not None else makeEfdClient()

        t0 = time.time()
        self.getDataForDayObs()
        self.log.debug(f"Getting data took {(time.time() - t0):.2f} seconds")
        t0 = time.time()
        self.augmentData()
        self.log.debug(f"Parsing data took {(time.time() - t0):.5f} seconds")

    def getDataForDayObs(self) -> None:
        """Retrieve the data for the specified dayObs from the EFD, storing it
        in ``self.data``."""
        # Tiago thinks no individual block seqNums should take more than an
        # hour to run, so pad the dayObs by 1.5 hours to make sure we catch
        # any blocks which might span the end of the day.
        padding = 1.5 * 60 * 60
        data = getEfdData(
            self.client,
            "lsst.sal.Script.logevent_state",
            dayObs=self.dayObs,
            postPadding=padding,
            raiseIfTopicNotInSchema=False,
        )
        self.data = data

    def augmentData(self) -> None:
        """Parse each row in the data frame individually, pulling the block
        information out into its own columns.

        Adds ``blockNum``, ``blockDayObs``, ``blockSeqNum`` and ``isTestCase``
        columns (plus ``blockId`` if it is not already present) to
        ``self.data``. Rows whose block ID is missing or malformed, or whose
        block dayObs differs from ``self.dayObs`` (i.e. rows from the padded
        region), keep missing values in these columns.
        """
        data = self.data

        data["blockNum"] = pd.Series()
        data["blockDayObs"] = pd.Series()
        data["blockSeqNum"] = pd.Series()
        data["isTestCase"] = pd.Series()

        # at some point the blockId column was added to the data, so if it's
        # present, we can use it to extract the blockNum and other information,
        # but before that we have to parse it out of the lastCheckpoint column
        if "blockId" in data.columns:
            self._parseFromBlockIdColumn()
            return

        data["blockId"] = pd.Series()  # add it, as it's not present
        if "lastCheckpoint" not in data.columns:
            # nothing to parse the block information from, so stop here rather
            # than failing on the missing column below
            nRows = len(data)
            self.log.warning(
                f"Found {nRows} rows of data and no 'lastCheckpoint' column was in the data,"
                " so block data cannot be parsed."
            )
            return
        self._parseFromLastCheckpointColumn()

    def _parseFromBlockIdColumn(self) -> None:
        """Fill the block columns of ``self.data`` from its ``blockId``
        column."""
        data = self.data
        for index, row in data.iterrows():
            # an example blockIdStr value is like BL365_O_20250420_000001
            blockIdStr = row["blockId"]
            if not isinstance(blockIdStr, str):
                continue  # missing value (e.g. NaN), so no block to parse
            match = self._BLOCK_NUMBER_PATTERN.match(blockIdStr)
            if not match:  # we've failed to get the 365 part in the example
                continue
            parsed = self._parseBlockId(blockIdStr, match.group(1))
            if parsed is None:
                continue
            for column, value in parsed.items():
                data.at[index, column] = value

    def _parseFromLastCheckpointColumn(self) -> None:
        """Fill the block columns of ``self.data``, including ``blockId``, by
        searching the legacy ``lastCheckpoint`` strings."""
        data = self.data
        for index, row in data.iterrows():
            rowStr = row["lastCheckpoint"]
            if not isinstance(rowStr, str):
                continue  # missing value (e.g. NaN), so nothing to search

            blockIdMatch = self._BLOCK_ID_PATTERN.search(rowStr)
            if blockIdMatch is None:
                continue
            blockIdStr = blockIdMatch.group(0)
            # the blockId is recorded even for rows in the padded region
            data.at[index, "blockId"] = blockIdStr

            blockMatch = self._BLOCK_PATTERN.search(rowStr)
            if blockMatch is not None:
                blockNumber = str(int(blockMatch.group(1)))
            else:
                # no "BLOCK-N" in the checkpoint, so fall back to the number
                # embedded in the blockId itself, which the blockId pattern
                # guarantees is present
                numberMatch = self._BLOCK_NUMBER_PATTERN.match(blockIdStr)
                if numberMatch is None:
                    continue
                blockNumber = numberMatch.group(1)

            parsed = self._parseBlockId(blockIdStr, blockNumber)
            if parsed is None:
                continue
            for column, value in parsed.items():
                data.at[index, column] = value

    def _parseBlockId(self, blockIdStr: str, blockNumber: str) -> dict[str, str | int | bool] | None:
        """Return the block column values for a block ID, or None if the ID
        is malformed or belongs to a different dayObs (the padded region).

        The ID is split on underscores, with element 2 as the dayObs and
        element 3 as the seqNum, e.g. BL365_O_20250420_000001.
        """
        idStrSplit = blockIdStr.split("_")
        try:
            blockDayObs = int(idStrSplit[2])
            blockSeqNum = int(idStrSplit[3])
        except (IndexError, ValueError):
            self.log.debug(f"Could not parse dayObs and seqNum from {blockIdStr=}")
            return None
        if blockDayObs != self.dayObs:
            return None  # we're in the padded region

        isTestCase = idStrSplit[0].startswith("BT")
        return {
            "blockNum": f"{'T' if isTestCase else ''}{blockNumber}",
            "blockDayObs": blockDayObs,
            "blockSeqNum": blockSeqNum,
            "isTestCase": isTestCase,
        }

    def _listColumnValues(self, column: str, removeNone: bool = True) -> list[str]:
        """Get all the different values for the specified column, as a list.

        Parameters
        ----------
        column : `str`
            The column to get the values for.
        removeNone : `bool`
            Whether to remove None from the list of values. Note that missing
            values (None and NaN) are already dropped by ``dropna``.

        Returns
        -------
        values : `list`
            The sorted unique values for the specified column.
        """
        values = set(self.data[column].dropna())
        if None in values and removeNone:
            values.remove(None)
        return sorted(values)

    def getBlockNums(self) -> list[str]:
        """Get the block numbers which were run on the specified dayObs.

        Returns
        -------
        blockNums : `list` of `str`
            The blocks which were run on the specified dayObs, e.g. "123", or
            "T123" for test-case blocks.
        """
        return self._listColumnValues("blockNum")

    def getSeqNums(self, block: int | str) -> list[int]:
        """Get the seqNums for the specified block.

        Parameters
        ----------
        block : `int` or `str`
            The block name or number to get the events for, e.g. 123 or T123.

        Returns
        -------
        seqNums : `list` of `int`
            The sequence numbers for the specified block.
        """
        if isinstance(block, int):
            block = str(block)

        seqNums = self.data[self.data["blockNum"] == block]["blockSeqNum"]
        # drop any missing seqNums, as those rows don't relate to an actual
        # run of a block
        seqNums = seqNums.dropna()
        return sorted(set(seqNums))

    def getRows(self, block: str | int, seqNum: int | None = None) -> pd.DataFrame:
        """Get all rows of data which relate to the specified block.

        If the seqNum is specified, only the rows for that sequence number are
        returned, otherwise all the rows relating to any block execution that
        day are returned. If the specified seqNum doesn't occur on the current
        day, an empty dataframe is returned.

        Parameters
        ----------
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`, optional
            The sequence number, if specified, to get the row data for. If not
            specified, all data for the specified block is returned.

        Returns
        -------
        data : `pandas.DataFrame`
            The row data.
        """
        if isinstance(block, int):
            block = str(block)

        # Because we query for a whole dayObs, but BLOCKs can overlap the day
        # start/end, it's possible for the block's blockDayObs not to be the
        # same as self.dayObs around the beginning or end of the day, so also
        # require blockDayObs == self.dayObs when getting the relevant rows.
        rowsForBlock = self.data[
            np.logical_and(self.data["blockNum"] == block, self.data["blockDayObs"] == self.dayObs)
        ]
        if rowsForBlock.empty:
            self.log.warning(f"No rows found for {block=} on dayObs={self.dayObs}")
        if seqNum is None:
            return rowsForBlock
        return rowsForBlock[rowsForBlock["blockSeqNum"] == seqNum]

    def printBlockEvolution(self, block: str | int, seqNum: int | None = None) -> None:
        """Display the evolution of the specified block.

        If the seqNum is specified, the evolution of that specific block
        execution is displayed, otherwise all executions of that block are
        printed.

        Parameters
        ----------
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`, optional
            The sequence number, if specified, to print the evolution of. If
            not specified, all sequence numbers for the block are printed.
        """
        if isinstance(block, int):
            block = str(block)

        seqNums = self.getSeqNums(block) if seqNum is None else [seqNum]
        print(f"Evolution of BLOCK {block} for dayObs={self.dayObs} {seqNum=}:")
        for thisSeqNum in seqNums:
            blockInfo = self.getBlockInfo(block, thisSeqNum)
            print(blockInfo, "\n")

    def getBlockInfo(self, block: int | str, seqNum: int) -> BlockInfo | None:
        """Get the block info for the specified block.

        Parses the rows relating to this block execution, and returns
        the information as a ``BlockInfo`` instance.

        Parameters
        ----------
        block : `int` or `str`
            The block number.
        seqNum : `int`
            The sequence number.

        Returns
        -------
        blockInfo : `lsst.summit.utils.blockUtils.BlockInfo` or `None`
            The block info, or None if there are no rows for this block and
            seqNum on this dayObs.

        Raises
        ------
        RuntimeError
            Raised if the rows do not share exactly one blockId and one
            isTestCase value.
        """
        if isinstance(block, int):
            block = str(block)

        rows = self.getRows(block, seqNum=seqNum)
        if rows.empty:
            print(f"No {seqNum=} on dayObs={self.dayObs} for {block=}")
            return None

        blockIds = set()
        testCases = set()
        tickets: set[str] = set()
        salIndices = set()
        statePoints = []

        for _, row in rows.iterrows():
            salIndices.add(row["salIndex"])
            blockIds.add(row["blockId"])
            testCases.add(row["isTestCase"])

            # the checkpoint can be missing (NaN) or absent entirely, in which
            # case there are simply no tickets to collect from this row
            lastCheckpoint = row.get("lastCheckpoint")
            if isinstance(lastCheckpoint, str):
                tickets.update(self._SITCOM_PATTERN.findall(lastCheckpoint))

            # named stateTime so as not to shadow the time module
            stateTime = efdTimestampToAstropy(row["private_efdStamp"])
            state = ScriptState(row["state"])
            reason = row["reason"]
            statePoints.append(ScriptStatePoint(time=stateTime, state=state, reason=reason))

        # check that blockIds and testCases are both length == 1
        if any(len(s) != 1 for s in [blockIds, testCases]):
            raise RuntimeError(
                f"Expected exactly one unique value for blockIds and testCases, "
                f"but found: blockIds={blockIds}, testCases={testCases} "
                f"for {seqNum=}"
            )

        blockId = blockIds.pop()
        isTestCase = testCases.pop()

        return BlockInfo(
            blockNumber=block,
            blockId=blockId,
            dayObs=self.dayObs,
            seqNum=seqNum,
            begin=efdTimestampToAstropy(rows.iloc[0]["private_efdStamp"]),
            end=efdTimestampToAstropy(rows.iloc[-1]["private_efdStamp"]),
            salIndices=sorted(salIndices),
            # sort numerically so that e.g. SITCOM-99 comes before SITCOM-100
            tickets=[f"SITCOM-{ticket}" for ticket in sorted(tickets, key=int)],
            states=statePoints,
            isTestCase=isTestCase,
        )

    def getEventsForBlock(self, events: list[TMAEvent], block: str | int, seqNum: int) -> list[TMAEvent]:
        """Get the events which occurred during the specified block.

        Parameters
        ----------
        events : `list` of `lsst.summit.utils.tmaUtils.TMAEvent`
            The list of candidate events.
        block : `int` or `str`
            The block number to get the events for.
        seqNum : `int`
            The sequence number to get the events for.

        Returns
        -------
        events : `list` of `lsst.summit.utils.tmaUtils.TMAEvent`
            The events, or an empty list if the block execution is not found.
        """
        blockInfo = self.getBlockInfo(block, seqNum)
        if blockInfo is None:
            return []
        begin = blockInfo.begin
        end = blockInfo.end

        # each event's end being past the begin time and their
        # starts being before the end time means we get all the
        # events in the window and also those that overlap the
        # start/end too
        return [e for e in events if e.end >= begin and e.begin <= end]