Coverage for python / felis / tap_schema.py: 22%

241 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1"""Provides utilities for creating and populating the TAP_SCHEMA database.""" 

2 

3# This file is part of felis. 

4# 

5# Developed for the LSST Data Management System. 

6# This product includes software developed by the LSST Project 

7# (https://www.lsst.org). 

8# See the COPYRIGHT file at the top-level directory of this distribution 

9# for details of code ownership. 

10# 

11# This program is free software: you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation, either version 3 of the License, or 

14# (at your option) any later version. 

15# 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20# 

21# You should have received a copy of the GNU General Public License 

22# along with this program. If not, see <https://www.gnu.org/licenses/>. 

23 

24import csv 

25import io 

26import logging 

27import os 

28import re 

29from typing import IO, Any 

30 

31from lsst.resources import ResourcePath 

32from sqlalchemy import MetaData, Table, select, text 

33from sqlalchemy.exc import SQLAlchemyError 

34from sqlalchemy.sql.dml import Insert 

35 

36from . import datamodel 

37from .datamodel import Constraint, Schema 

38from .db.database_context import DatabaseContext, is_sqlite_url 

39from .metadata import MetaDataBuilder 

40from .types import FelisType 

41 

# Public API of this module.
__all__ = [
    "DataLoader",
    "MetadataInserter",
    "TableManager",
]

# Module-level logger, named after the module.
logger = logging.getLogger(__name__)


class TableManager:
    """Manage TAP_SCHEMA table definitions and access.

    This class provides a streamlined interface for managing TAP_SCHEMA tables,
    automatically handling dialect-specific requirements and providing
    consistent access to TAP_SCHEMA tables through a dictionary-like interface.

    Parameters
    ----------
    engine_url
        Database engine URL for automatic dialect detection and schema
        handling. If None, SQLite (no schema namespaces) is assumed.
    db_context
        Optional database context for reflecting existing TAP_SCHEMA tables.
        If None, loads from internal YAML schema.
    schema_name
        The name of the schema to use for TAP_SCHEMA tables.
        Defaults to "TAP_SCHEMA".
    table_name_postfix
        A string to append to standard table names for customization.
    extensions_path
        Path to additional TAP_SCHEMA table definitions. Only used when the
        TAP_SCHEMA schema is loaded from the internal YAML resource.

    Notes
    -----
    The TableManager automatically detects SQLite vs. schema-supporting
    databases and handles schema application appropriately.
    """

    _TABLE_NAMES_STD = ["schemas", "tables", "columns", "keys", "key_columns"]
    """The standard table names for the TAP_SCHEMA tables."""

    _SCHEMA_NAME_STD = "TAP_SCHEMA"
    """The standard schema name for the TAP_SCHEMA tables."""

    def __init__(
        self,
        engine_url: str | None = None,
        db_context: DatabaseContext | None = None,
        schema_name: str | None = None,
        table_name_postfix: str = "",
        extensions_path: str | None = None,
    ):
        """Initialize the table manager."""
        self.table_name_postfix = table_name_postfix
        self.schema_name = schema_name or self._SCHEMA_NAME_STD
        self.extensions_path = extensions_path

        # Only set when loading from the packaged YAML resource. It stays None
        # when reflecting, as documented on the ``schema`` property.
        self._schema: Schema | None = None

        # SQLite has no schema namespaces, so table names are only qualified
        # with the schema name for other dialects. SQLite is assumed when no
        # engine URL is given.
        self.apply_schema_to_metadata = engine_url is not None and not is_sqlite_url(engine_url)

        if db_context is not None:
            if table_name_postfix != "":
                # NOTE(review): the postfix is still applied by
                # ``_create_table_map`` when looking up the reflected tables,
                # so it is not actually ignored; confirm the intended behavior.
                logger.warning(
                    "Table name postfix '%s' will be ignored when reflecting TAP_SCHEMA database",
                    table_name_postfix,
                )
            logger.debug(
                "Reflecting TAP_SCHEMA database from existing database at %s",
                # Masks the password without inventing one when none is set.
                db_context.engine.url.render_as_string(hide_password=True),
            )
            self._reflect_from_database(db_context)
        else:
            self._load_from_yaml()

        self._create_table_map()
        self._check_tables()

    def _load_from_yaml(self) -> None:
        """Load TAP_SCHEMA from YAML resources and build metadata.

        A non-standard ``schema_name`` overrides the name in the loaded
        resource. Otherwise ``schema_name`` is taken from the resource.
        Extensions are applied before the SQLAlchemy metadata is built, so
        extension columns are part of the generated tables.
        """
        schema = self.load_schema_resource()
        self._schema = schema

        if self.schema_name != self._SCHEMA_NAME_STD:
            schema.name = self.schema_name
        else:
            self.schema_name = schema.name

        self._apply_extensions()

        self._metadata = MetaDataBuilder(
            schema,
            apply_schema_to_metadata=self.apply_schema_to_metadata,
            table_name_postfix=self.table_name_postfix,
        ).build()

        logger.debug("Loaded TAP_SCHEMA '%s' from YAML resource", self.schema_name)

    def _reflect_from_database(self, db_context: DatabaseContext) -> None:
        """Reflect TAP_SCHEMA tables from an existing database.

        Parameters
        ----------
        db_context
            The database context to use for reflection.

        Raises
        ------
        sqlalchemy.exc.SQLAlchemyError
            If reflection fails. The error is logged and re-raised.
        """
        self._metadata = MetaData(schema=self.schema_name if self.apply_schema_to_metadata else None)
        try:
            self._metadata.reflect(bind=db_context.engine)
        except SQLAlchemyError as e:
            logger.error("Error reflecting TAP_SCHEMA database: %s", e)
            raise

    def _apply_extensions(self) -> None:
        """Apply extensions from a YAML file to the TAP_SCHEMA schema.

        Columns of each table in the extensions schema are appended to the
        TAP_SCHEMA table with the same name. Extension tables whose names
        match no TAP_SCHEMA table are reported in a warning and ignored.
        """
        if not self.extensions_path:
            return

        schema = self._schema
        # Invariant: only called from ``_load_from_yaml`` after the base
        # schema has been loaded.
        assert schema is not None, "TAP_SCHEMA schema must be loaded before applying extensions"

        logger.info("Loading TAP_SCHEMA extensions from: %s", self.extensions_path)
        extensions_schema = Schema.from_uri(self.extensions_path, context={"id_generation": True})

        if not extensions_schema.tables:
            logger.warning("Extensions schema does not contain any tables, no extensions applied")
            return

        extension_tables = {table.name: table.columns for table in extensions_schema.tables}

        unmatched = sorted(set(extension_tables) - {table.name for table in schema.tables})
        if unmatched:
            logger.warning(
                "Extension tables not found in TAP_SCHEMA and ignored: %s", ", ".join(unmatched)
            )

        extension_count = 0
        for table in schema.tables:
            extension_columns = extension_tables.get(table.name)
            if extension_columns:
                table.columns = list(table.columns) + list(extension_columns)
                extension_count += len(extension_columns)
                logger.debug("Added %d extension columns to table '%s'", len(extension_columns), table.name)

        logger.info("Applied %d extension columns to TAP_SCHEMA", extension_count)

    def __getitem__(self, table_name: str) -> Table:
        """Get one of the TAP_SCHEMA tables by its standard TAP_SCHEMA name.

        Parameters
        ----------
        table_name
            The name of the table to get.

        Returns
        -------
        Table
            The table with the given name.

        Raises
        ------
        KeyError
            If the name is not a standard TAP_SCHEMA table name, or the mapped
            table is not present in the metadata.

        Notes
        -----
        This implements mapping semantics for the table manager, allowing
        tables to be accessed by their standard TAP_SCHEMA names.
        """
        try:
            qualified_name = self._table_map[table_name]
        except KeyError:
            raise KeyError(f"Table '{table_name}' not found in TAP_SCHEMA") from None
        try:
            return self.metadata.tables[qualified_name]
        except KeyError:
            raise KeyError(
                f"Table '{qualified_name}' (for '{table_name}') not found in TAP_SCHEMA metadata"
            ) from None

    @property
    def schema(self) -> Schema | None:
        """Get the TAP_SCHEMA schema.

        Returns
        -------
        Schema or None
            The TAP_SCHEMA schema, or None if it was reflected from a database.

        Notes
        -----
        This will only be set if the TAP_SCHEMA schema was loaded from a
        Felis package resource. In the case where the TAP_SCHEMA schema was
        reflected from an existing database, this will be None.
        """
        return self._schema

    @property
    def metadata(self) -> MetaData:
        """Get the metadata for the TAP_SCHEMA tables.

        Returns
        -------
        `~sqlalchemy.sql.schema.MetaData`
            The metadata for the TAP_SCHEMA tables.

        Notes
        -----
        This will either be the metadata that was reflected from an existing
        database or the metadata that was loaded from a Felis package resource.
        """
        return self._metadata

    @classmethod
    def get_tap_schema_std_path(cls) -> str:
        """Get the path to the standard TAP_SCHEMA schema resource.

        Returns
        -------
        str
            The path to the standard TAP_SCHEMA schema resource.
        """
        return os.path.join(os.path.dirname(__file__), "config", "tap_schema", "tap_schema_std.yaml")

    @classmethod
    def get_tap_schema_std_resource(cls) -> ResourcePath:
        """Get the standard TAP_SCHEMA schema resource.

        Returns
        -------
        `~lsst.resources.ResourcePath`
            The standard TAP_SCHEMA schema resource.
        """
        return ResourcePath("resource://felis/config/tap_schema/tap_schema_std.yaml")

    @classmethod
    def get_table_names_std(cls) -> list[str]:
        """Get the standard table names for the TAP_SCHEMA tables.

        Returns
        -------
        list
            A new list of the standard table names for the TAP_SCHEMA tables.
            Mutating it does not affect the class-level definition.
        """
        return list(cls._TABLE_NAMES_STD)

    @classmethod
    def get_schema_name_std(cls) -> str:
        """Get the standard schema name for the TAP_SCHEMA tables.

        Returns
        -------
        str
            The standard schema name for the TAP_SCHEMA tables.
        """
        return cls._SCHEMA_NAME_STD

    @classmethod
    def load_schema_resource(cls) -> Schema:
        """Load the standard TAP_SCHEMA schema from a Felis package
        resource into a Felis `~felis.datamodel.Schema`.

        Returns
        -------
        Schema
            The TAP_SCHEMA schema.
        """
        rp = cls.get_tap_schema_std_resource()
        return Schema.from_uri(rp, context={"id_generation": True})

    def _load_schema(self) -> None:
        """Load the TAP_SCHEMA schema from a Felis package resource."""
        self._schema = self.load_schema_resource()

    def _create_table_map(self) -> None:
        """Create a mapping of standard table names to the table names modified
        with a postfix, as well as the prepended schema name if it is set.

        Notes
        -----
        This is a private method that is called during initialization, allowing
        us to use table names like ``schemas11`` such as those used by the CADC
        TAP library instead of the standard table names. It also maps between
        the standard table names and those with the schema name prepended like
        SQLAlchemy uses. The mapping is stored in ``self._table_map``.
        """
        prefix = f"{self.schema_name}." if self.apply_schema_to_metadata else ""
        self._table_map = {
            table_name: f"{prefix}{table_name}{self.table_name_postfix}"
            for table_name in TableManager.get_table_names_std()
        }
        logger.debug("Created TAP_SCHEMA table map: %s", self._table_map)

    def _check_tables(self) -> None:
        """Check that there is a valid mapping to each standard table.

        Raises
        ------
        KeyError
            If a table is missing from the table map or the metadata.
        """
        for table_name in TableManager.get_table_names_std():
            self[table_name]

    def initialize_database(self, db_context: DatabaseContext) -> None:
        """Initialize a database with the TAP_SCHEMA tables.

        Parameters
        ----------
        db_context
            The database context to use to create the tables.
        """
        logger.info("Creating TAP_SCHEMA database '%s'", self.schema_name)
        db_context.initialize()
        db_context.create_all()

    def select(
        self,
        db_context: DatabaseContext,
        table_name: str,
        filter_condition: str = "",
    ) -> list[dict[str, Any]]:
        """Select all rows from a TAP_SCHEMA table with an optional filter
        condition.

        Parameters
        ----------
        db_context
            The database context to use to connect to the database.
        table_name
            The name of the table to select from.
        filter_condition
            The filter condition as a string. If empty, no filter will be
            applied. It is inserted verbatim as raw SQL, so it must never be
            built from untrusted input.

        Returns
        -------
        list
            A list of dictionaries containing the rows from the table.
        """
        table = self[table_name]
        query = select(table)
        if filter_condition:
            query = query.where(text(filter_condition))
        with db_context.engine.connect() as connection:
            result = connection.execute(query)
            rows = [dict(row._mapping) for row in result]
        return rows


class DataLoader:
    """Load data into the TAP_SCHEMA tables.

    Parameters
    ----------
    schema
        The Felis ``Schema`` to load into the TAP_SCHEMA tables.
    mgr
        The table manager that contains the TAP_SCHEMA tables.
    db_context
        The database context to use to connect to the database.
    tap_schema_index
        The index of the schema in the TAP_SCHEMA database.
    output_file
        The file object to write the SQL statements to. If None, file output
        will be suppressed.
    print_sql
        If True, print the SQL statements that will be executed.
    dry_run
        If True, the data will not be loaded into the database.
    unique_keys
        If True, prepend the schema name to the key name to make it unique
        when loading data into the keys and key_columns tables.
    """

    def __init__(
        self,
        schema: Schema,
        mgr: TableManager,
        db_context: DatabaseContext,
        tap_schema_index: int = 0,
        output_file: IO[str] | None = None,
        print_sql: bool = False,
        dry_run: bool = False,
        unique_keys: bool = False,
    ):
        self.schema = schema
        self.mgr = mgr
        self._db_context = db_context
        self.tap_schema_index = tap_schema_index
        self.inserts: list[Insert] = []
        self.output_file = output_file
        self.print_sql = print_sql
        self.dry_run = dry_run
        self.unique_keys = unique_keys

    def load(self) -> None:
        """Load the schema data into the TAP_SCHEMA tables.

        Notes
        -----
        This will generate inserts for the data, print the SQL statements if
        requested, save the SQL statements to a file if requested, and load the
        data into the database if not in dry run mode. These are done as
        sequential operations rather than for each insert. The logic is that
        the user may still want the complete SQL output to be printed or saved
        to a file even if loading into the database causes errors. If there are
        errors when inserting into the database, the SQLAlchemy error message
        should indicate which SQL statement caused the error.
        """
        self._generate_all_inserts()
        if self.print_sql:
            # Print to stdout.
            self._print_sql()
        if self.output_file:
            # Print to an output file.
            self._write_sql_to_file()
        if not self.dry_run:
            # Execute the inserts if not in dry run mode.
            self._execute_inserts()
        else:
            logger.info("Dry run - skipped loading into database")

    def _insert_schemas(self) -> None:
        """Insert the schema data into the ``schemas`` table."""
        schema_record = {
            "schema_name": self.schema.name,
            "utype": self.schema.votable_utype,
            "description": self.schema.description,
            "schema_index": self.tap_schema_index,
        }
        self._insert("schemas", schema_record)

    def _get_table_name(self, table: datamodel.Table) -> str:
        """Get the name of the table with the schema name prepended.

        Parameters
        ----------
        table
            The table to get the name for.

        Returns
        -------
        str
            The name of the table with the schema name prepended.
        """
        return f"{self.schema.name}.{table.name}"

    def _insert_tables(self) -> None:
        """Insert the table data into the ``tables`` table."""
        for table in self.schema.tables:
            table_record = {
                "schema_name": self.schema.name,
                "table_name": self._get_table_name(table),
                "table_type": "table",
                "utype": table.votable_utype,
                "description": table.description,
                "table_index": 0 if table.tap_table_index is None else table.tap_table_index,
            }
            self._insert("tables", table_record)

    def _insert_columns(self) -> None:
        """Insert the column data into the ``columns`` table."""
        for table in self.schema.tables:
            for column in table.columns:
                felis_type = FelisType.felis_type(column.datatype.value)
                arraysize = str(column.votable_arraysize) if column.votable_arraysize else None
                size = DataLoader._get_size(column)
                indexed = DataLoader._is_indexed(column, table)
                tap_column_index = column.tap_column_index
                unit = column.ivoa_unit or column.fits_tunit

                column_record = {
                    "table_name": self._get_table_name(table),
                    "column_name": column.name,
                    "datatype": felis_type.votable_name,
                    "arraysize": arraysize,
                    "size": size,
                    "xtype": column.votable_xtype,
                    "description": column.description,
                    "utype": column.votable_utype,
                    "unit": unit,
                    "ucd": column.ivoa_ucd,
                    "indexed": indexed,
                    "principal": column.tap_principal,
                    "std": column.tap_std,
                    "column_index": tap_column_index,
                }
                self._insert("columns", column_record)

    def _get_key(self, constraint: Constraint) -> str:
        """Get the key name for a constraint.

        Parameters
        ----------
        constraint
            The constraint to get the key name for.

        Returns
        -------
        str
            The key name for the constraint.

        Notes
        -----
        This will prepend the name of the schema to the key name if the
        `unique_keys` attribute is set to True. Otherwise, it will just return
        the name of the constraint.
        """
        if self.unique_keys:
            key_id = f"{self.schema.name}_{constraint.name}"
            logger.debug("Generated unique key_id: %s -> %s", constraint.name, key_id)
        else:
            key_id = constraint.name
        return key_id

    def _insert_keys(self) -> None:
        """Insert the foreign keys into the ``keys`` and ``key_columns``
        tables.

        One ``keys`` row is generated per foreign key constraint, and one
        ``key_columns`` row per (from, target) column pair, so composite keys
        are represented correctly.

        Raises
        ------
        ValueError
            If a foreign key constraint has a different number of columns
            than referenced columns.
        """
        for table in self.schema.tables:
            for constraint in table.constraints:
                if not isinstance(constraint, datamodel.ForeignKeyConstraint):
                    continue

                # Reject mismatched composite keys instead of letting zip()
                # silently drop the unmatched columns.
                if len(constraint.columns) != len(constraint.referenced_columns):
                    raise ValueError(
                        f"Foreign key '{constraint.name}' has {len(constraint.columns)} column(s) "
                        f"but {len(constraint.referenced_columns)} referenced column(s)"
                    )

                ###########################################################
                # Handle keys table
                ###########################################################
                referenced_column = self.schema.find_object_by_id(
                    constraint.referenced_columns[0], datamodel.Column
                )
                referenced_table = self.schema.get_table_by_column(referenced_column)
                key_id = self._get_key(constraint)
                key_record = {
                    "key_id": key_id,
                    "from_table": self._get_table_name(table),
                    "target_table": self._get_table_name(referenced_table),
                    "description": constraint.description,
                    "utype": constraint.votable_utype,
                }
                self._insert("keys", key_record)

                ###########################################################
                # Handle key_columns table
                ###########################################################
                for from_column_id, target_column_id in zip(
                    constraint.columns, constraint.referenced_columns
                ):
                    from_column = self.schema.find_object_by_id(from_column_id, datamodel.Column)
                    target_column = self.schema.find_object_by_id(target_column_id, datamodel.Column)
                    key_columns_record = {
                        "key_id": key_id,
                        "from_column": from_column.name,
                        "target_column": target_column.name,
                    }
                    self._insert("key_columns", key_columns_record)

    def _generate_all_inserts(self) -> None:
        """Generate the inserts for all the data."""
        self.inserts.clear()
        self._insert_schemas()
        self._insert_tables()
        self._insert_columns()
        self._insert_keys()
        logger.debug("Generated %d insert statements", len(self.inserts))

    def _execute_inserts(self) -> None:
        """Load the `~felis.datamodel.Schema` data into the TAP_SCHEMA
        tables.

        All inserts run in a single transaction (``engine.begin()``). Any
        error is logged and re-raised.
        """
        try:
            with self._db_context.engine.begin() as connection:
                for insert in self.inserts:
                    connection.execute(insert)
        except Exception as e:
            logger.error("Error loading data into database: %s", e)
            raise

    def _compiled_inserts(self) -> list[str]:
        """Compile the inserts to SQL.

        Returns
        -------
        list
            A list of the compiled insert statements, with bound values
            rendered inline as literals.
        """
        return [
            str(
                insert.compile(
                    dialect=self._db_context.dialect,
                    compile_kwargs={"literal_binds": True},
                ),
            )
            for insert in self.inserts
        ]

    def _print_sql(self) -> None:
        """Print the generated inserts to stdout."""
        for insert_str in self._compiled_inserts():
            print(insert_str + ";")

    def _write_sql_to_file(self) -> None:
        """Write the generated insert statements to a file.

        Raises
        ------
        ValueError
            If no output file was specified.
        """
        if not self.output_file:
            raise ValueError("No output file specified")
        for insert_str in self._compiled_inserts():
            self.output_file.write(insert_str + ";" + "\n")

    def _insert(self, table_name: str, record: list[Any] | dict[str, Any]) -> None:
        """Generate an insert statement for a record.

        Parameters
        ----------
        table_name
            The name of the table to insert the record into.
        record
            The record to insert into the table.
        """
        table = self.mgr[table_name]
        insert_statement = table.insert().values(record)
        self.inserts.append(insert_statement)

    @staticmethod
    def _get_size(column: datamodel.Column) -> int | None:
        """Get the size of the column.

        The size is the leading integer of a fixed (``"10"``) or bounded
        variable (``"10*"``) VOTable arraysize.

        Parameters
        ----------
        column
            The column to get the size for.

        Returns
        -------
        int or None
            The size of the column, or None if there is no arraysize or it is
            not of one of the two forms above (e.g. unbounded ``"*"``).
        """
        arraysize = column.votable_arraysize
        if not arraysize:
            return None
        # ASCII digits only: str.isdigit() also accepts characters such as
        # superscripts, which int() cannot parse.
        match = re.fullmatch(r"([0-9]+)\*?", str(arraysize))
        return int(match.group(1)) if match else None

    @staticmethod
    def _is_indexed(column: datamodel.Column, table: datamodel.Table) -> int:
        """Check if the column is indexed in the table.

        A column counts as indexed if it is the table's single-column primary
        key (given as a bare ID or a one-element list), or the only column of
        one of the table's indexes.

        Parameters
        ----------
        column
            The column to check.
        table
            The table to check.

        Returns
        -------
        int
            1 if the column is indexed, 0 otherwise.
        """
        primary_key = table.primary_key
        if isinstance(primary_key, str):
            primary_key = [primary_key]
        if primary_key and len(primary_key) == 1 and primary_key[0] == column.id:
            return 1
        for index in table.indexes:
            if index.columns and len(index.columns) == 1 and index.columns[0] == column.id:
                return 1
        return 0


class MetadataInserter:
    """Insert TAP_SCHEMA self-description rows into the database.

    Parameters
    ----------
    mgr
        The table manager that contains the TAP_SCHEMA tables.
    db_context
        The database context for connecting to the TAP_SCHEMA database.
    """

    def __init__(self, mgr: TableManager, db_context: DatabaseContext):
        """Initialize the metadata inserter.

        Parameters
        ----------
        mgr
            The table manager representing the TAP_SCHEMA tables.
        db_context
            The database context for connecting to the database.
        """
        self._mgr = mgr
        self._db_context = db_context

    def insert_metadata(self) -> None:
        """Insert the TAP_SCHEMA metadata into the database.

        For each standard TAP_SCHEMA table, rows are read from the packaged
        ``resource://felis/config/tap_schema/<table>.csv`` file and inserted
        within a single transaction. The first CSV row is the header, and the
        literal value ``\\N`` is loaded as NULL. Files without a header or
        without data rows are skipped.
        """
        with self._db_context.engine.begin() as conn:
            for table_name in self._mgr.get_table_names_std():
                table = self._mgr[table_name]
                csv_bytes = ResourcePath(f"resource://felis/config/tap_schema/{table_name}.csv").read()
                # newline="" lets the csv module handle line endings itself,
                # so quoted fields containing embedded newlines parse correctly.
                text_stream = io.TextIOWrapper(io.BytesIO(csv_bytes), encoding="utf-8", newline="")
                reader = csv.reader(text_stream)
                headers = next(reader, None)
                if headers is None:
                    logger.warning(
                        "No header found in TAP_SCHEMA metadata for table '%s'; skipping",
                        table_name,
                    )
                    continue
                rows = [
                    {key: None if value == "\\N" else value for key, value in zip(headers, row)}
                    for row in reader
                ]
                if not rows:
                    # Guard against issuing an INSERT with no parameter sets.
                    logger.debug("No metadata rows for table '%s'; skipping", table_name)
                    continue
                logger.debug(
                    "Inserting %d rows into table '%s' with headers: %s",
                    len(rows),
                    table_name,
                    headers,
                )
                conn.execute(table.insert(), rows)