Coverage for python / lsst / source / injection / utils / _ingest_injection_catalog.py: 26%

23 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:55 +0000

# This file is part of source_injection.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

21 

22from __future__ import annotations 

23 

# Public names exported by this module.
__all__ = ["ingest_injection_catalog"]

25 

26import logging 

27 

28import esutil 

29from astropy.table import Table, vstack 

30 

31from lsst.daf.butler import Butler, CollectionType, DatasetRef, DatasetType 

32 

33 

def ingest_injection_catalog(
    writeable_butler: Butler,
    table: Table | list[Table],
    band: str,
    output_collection: str,
    dataset_type_name: str = "injection_catalog",
    log_level: int = logging.INFO,
) -> list[DatasetRef]:
    """Shard a source table spatially and ingest it into the butler.

    Accepts either one astropy Table or a list of Tables; a list is first
    vertically stacked into a single Table. The input is expected to carry
    ``ra`` and ``dec`` columns in units of degrees, which are used to split
    the catalog into depth 7 Hierarchical Triangular Mesh (HTM7) shards for
    on-disk storage. Each HTM7 trixel covers ~0.315 square degrees.

    Parameters
    ----------
    writeable_butler : `lsst.daf.butler.Butler`
        An instantiated writeable butler.
    table : `astropy.table.Table` or `list` [`astropy.table.Table`]
        Input source table(s). Requires columns ``ra`` and ``dec`` in degrees.
    band : `str`
        Band associated with the input source table(s).
    output_collection : `str`
        Name of the output collection to ingest the consolidated table into.
    dataset_type_name : `str`, optional
        Dataset type name for the ingested consolidated table.
    log_level : `int`, optional
        The log level to use for logging.

    Returns
    -------
    dataset_refs : `list` [`lsst.daf.butler.DatasetRef`]
        List containing the dataset refs for the ingested source table.
    """
    # Set up logging at the requested verbosity.
    log = logging.getLogger(__name__)
    log.setLevel(log_level)

    # Collapse a list of input tables into one table before sharding.
    if isinstance(table, list):
        table = vstack(table)

    # Make sure the dataset type and the RUN collection both exist.
    injection_dataset_type = DatasetType(
        dataset_type_name,
        ("htm7", "band"),
        "ArrowAstropy",
        universe=writeable_butler.dimensions,
    )
    writeable_butler.registry.registerDatasetType(injection_dataset_type)
    writeable_butler.registry.registerCollection(
        name=output_collection,
        type=CollectionType.RUN,
    )

    # Map every source onto its depth-7 HTM trixel from its sky position.
    mesh = esutil.htm.HTM(7)
    trixel_ids = mesh.lookup_id(table["ra"], table["dec"])  # type: ignore

    # Put one dataset per occupied trixel, keyed on (htm7, band).
    refs: list[DatasetRef] = []
    for trixel_id in set(trixel_ids):
        shard = table[trixel_ids == trixel_id]
        refs.append(
            writeable_butler.put(
                shard,
                injection_dataset_type,
                {"htm7": trixel_id, "band": band},
                run=output_collection,
            )
        )

    # Report what was ingested (pluralized), then hand back the refs.
    log.info(
        "Ingested %d %s-band %s DatasetRef%s into: %s",
        len(refs),
        band,
        dataset_type_name,
        "" if len(refs) == 1 else "s",
        output_collection,
    )
    return refs