Coverage for tests / test_tap_schema_postgres.py: 29%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:42 +0000

1# This file is part of felis. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import gc 

23import os 

24import unittest 

25 

26from sqlalchemy import MetaData 

27from sqlalchemy.engine import create_engine 

28from sqlalchemy.exc import SQLAlchemyError 

29from sqlalchemy.schema import CreateSchema 

30 

31from felis.datamodel import Schema 

32from felis.db.database_context import create_database_context 

33from felis.metadata import MetaDataBuilder 

34from felis.tap_schema import DataLoader, TableManager 

35 

36try: 

37 from testing.postgresql import Postgresql 

38except ImportError: 

39 Postgresql = None 

40 

41TEST_DIR = os.path.abspath(os.path.dirname(__file__)) 

42TEST_SALES = os.path.join(TEST_DIR, "data", "sales.yaml") 

43TEST_TAP_SCHEMA_NONSTD = os.path.join(TEST_DIR, "data", "test_tap_schema_nonstandard.yaml") 

44 

45 

46class TestTapSchemaPostgresql(unittest.TestCase): 

47 """Test TAP_SCHEMA for PostgreSQL""" 

48 

49 def setUp(self) -> None: 

50 """Set up a local PostgreSQL database and a test schema.""" 

51 # Skip the test if the testing.postgresql package is not installed. 

52 if not Postgresql: 

53 self.skipTest("testing.postgresql not installed") 

54 

55 # Start a PostgreSQL database for testing. 

56 self.postgresql = Postgresql() 

57 url = self.postgresql.url() 

58 self.engine = create_engine(url) 

59 

60 # Setup a test schema. 

61 self.test_schema = Schema.from_uri(TEST_SALES) 

62 

63 def test_create_metadata(self) -> None: 

64 """Test using a schema that was created by using the TAP_SCHEMA tables 

65 by the `~felis.tap_schema.TableManager`. 

66 """ 

67 # Create the TAP_SCHEMA database. 

68 mgr = TableManager(engine_url=str(self.engine.url)) 

69 with create_database_context(str(self.engine.url), mgr.metadata) as db_ctx: 

70 mgr.initialize_database(db_ctx) 

71 

72 # Load the test data into the database. 

73 loader = DataLoader(self.test_schema, mgr, db_ctx, tap_schema_index=1) 

74 loader.load() 

75 # Context manager will handle cleanup 

76 

77 def test_reflect_database(self) -> None: 

78 """Test reflecting an existing PostgreSQL TAP_SCHEMA database into a 

79 `~felis.tap_schema.TableManager`. 

80 """ 

81 mgr = None 

82 db_ctx = None 

83 try: 

84 # Build the TAP_SCHEMA database independently of the TableManager. 

85 schema = TableManager.load_schema_resource() 

86 md = MetaDataBuilder(schema).build() 

87 with self.engine.connect() as conn: 

88 trans = conn.begin() 

89 try: 

90 print(f"Creating schema '{schema.name}'") 

91 conn.execute(CreateSchema(schema.name, if_not_exists=False)) 

92 trans.commit() 

93 except SQLAlchemyError as e: 

94 trans.rollback() 

95 self.fail(f"Failed to create schema: {e}") 

96 try: 

97 print(f"Creating tables in schema: {md.schema}") 

98 md.create_all(self.engine) 

99 except SQLAlchemyError as e: 

100 self.fail(f"Failed to create database: {e}") 

101 

102 # Reflect the existing database into a TableManager. 

103 with create_database_context(str(self.engine.url), md) as db_ctx: 

104 mgr = TableManager(engine_url=str(self.engine.url), db_context=db_ctx) 

105 self.assertIsNotNone(mgr.metadata) 

106 self.assertGreater(len(mgr.metadata.tables), 0) 

107 table_names = set( 

108 [table_name.replace(f"{schema.name}.", "") for table_name in mgr.metadata.tables.keys()] 

109 ) 

110 self.assertEqual(table_names, set(TableManager.get_table_names_std())) 

111 

112 # See if test data can be loaded successfully using the 

113 # existing database 

114 loader = DataLoader(self.test_schema, mgr, db_ctx, tap_schema_index=1) 

115 loader.load() 

116 except Exception as e: 

117 self.fail(f"Test failed with exception: {e}") 

118 

119 def test_nonstandard_names(self) -> None: 

120 """Test the TAP table manager class with non-standard names for the 

121 schema and columns, which are present in the test YAML file used 

122 to create the TAP_SCHEMA database. 

123 """ 

124 with open(TEST_TAP_SCHEMA_NONSTD) as file: 

125 sch = Schema.from_stream(file, context={"id_generation": True}) 

126 md = MetaDataBuilder(sch).build() 

127 with create_database_context(str(self.engine.url), md) as ctx: 

128 ctx.initialize() 

129 ctx.create_all() 

130 

131 postfix = "11" 

132 # Create a context for reflection with the existing database 

133 with create_database_context(str(self.engine.url), md) as reflect_ctx: 

134 mgr = TableManager( 

135 engine_url=str(self.engine.url), 

136 db_context=reflect_ctx, 

137 table_name_postfix=postfix, 

138 schema_name=sch.name, 

139 ) 

140 for table_name in mgr.get_table_names_std(): 

141 table = mgr[table_name] 

142 self.assertEqual(table.name, f"{table_name}{postfix}".replace(f"{sch.name}", "")) 

143 

144 def test_bad_engine(self) -> None: 

145 """Test the TableManager class with an invalid engine.""" 

146 bad_url = "postgresql+psycopg2://fake_user:fake_password@fake_host:5555" 

147 # Create metadata for reflection attempt 

148 md = MetaData(schema="TAP_SCHEMA") 

149 with create_database_context(bad_url, md) as db_ctx: 

150 with self.assertRaises(SQLAlchemyError): 

151 # Reflection will fail when trying to connect 

152 TableManager(db_context=db_ctx) 

153 

154 def tearDown(self) -> None: 

155 """Tear down the test case.""" 

156 gc.collect() 

157 self.engine.dispose() 

158 

159 

160if __name__ == "__main__": 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true

161 unittest.main()