db.py (3606B)
"""PostgreSQL connection, DSN construction, and metadata queries."""

from __future__ import annotations

import os
from dataclasses import dataclass, field

import psycopg


def _getenv(key: str, fallback: str = "") -> str:
    """Return environment variable *key*, or *fallback* if it is unset or empty."""
    return os.environ.get(key) or fallback


def _conninfo_value(value: str) -> str:
    """Quote *value* for a libpq keyword/value connection string when required.

    Plain values are returned unchanged. Empty values and values containing
    whitespace, single quotes, or backslashes are wrapped in single quotes
    with ``'`` and ``\\`` backslash-escaped, as libpq requires.
    """
    if value and not any(ch.isspace() or ch in "'\\" for ch in value):
        return value
    # Escape backslashes first so the ones added for quotes are not doubled.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def dsn_from_env() -> str:
    """Build a PostgreSQL connection string from standard PG* environment variables.

    Reads PGHOST, PGPORT, PGUSER, PGPASSWORD and PGDATABASE. Host, port and
    database fall back to built-in defaults when unset or empty; the password
    is omitted from the DSN when unset or empty. ``sslmode=require`` is
    always included.

    Returns:
        A libpq keyword/value connection string.

    Raises:
        RuntimeError: If PGUSER is unset or empty.
    """
    host = _getenv("PGHOST", "wrds-pgdata.wharton.upenn.edu")
    port = _getenv("PGPORT", "9737")
    user = _getenv("PGUSER")
    password = _getenv("PGPASSWORD")
    # The non-empty fallback means a dbname is always present in the DSN.
    database = _getenv("PGDATABASE", "wrds")

    if not user:
        raise RuntimeError("PGUSER not set")

    parts = [
        f"host={_conninfo_value(host)}",
        f"port={_conninfo_value(port)}",
        f"user={_conninfo_value(user)}",
        "sslmode=require",
    ]
    if password:
        # Passwords commonly contain spaces or quotes; unquoted they would
        # be split or mis-parsed by libpq.
        parts.append(f"password={_conninfo_value(password)}")
    parts.append(f"dbname={_conninfo_value(database)}")
    return " ".join(parts)


def connect() -> psycopg.Connection:
    """Return a psycopg connection using the DSN from environment variables.

    Raises:
        RuntimeError: If PGUSER is unset or empty (raised by ``dsn_from_env``).
    """
    return psycopg.connect(dsn_from_env())


def quote_ident(s: str) -> str:
    """Quote a PostgreSQL identifier to prevent SQL injection.

    The identifier is wrapped in double quotes and any embedded double quote
    is doubled.
    """
    return '"' + s.replace('"', '""') + '"'


def build_query(
    schema: str,
    table: str,
    columns: str = "*",
    where: str = "",
    limit: int = 0,
) -> str:
    """Build a SELECT query with quoted identifiers.

    Args:
        schema: Schema name; quoted as an identifier.
        table: Table name; quoted as an identifier.
        columns: Comma-separated column names, or ``"*"``/empty for all
            columns. Surrounding whitespace is stripped and empty entries
            (e.g. from a trailing comma) are ignored.
        where: Optional SQL condition. It is inserted verbatim, NOT escaped,
            so it must come from a trusted source.
        limit: Maximum number of rows; values <= 0 mean no LIMIT clause.

    Returns:
        The SQL text of the query.
    """
    names = [c.strip() for c in columns.split(",")] if columns else []
    # Drop empty entries: a zero-length quoted identifier ("") is invalid SQL.
    names = [n for n in names if n]
    if not names or names == ["*"]:
        sel = "*"
    else:
        sel = ", ".join(quote_ident(n) for n in names)

    q = f"SELECT {sel} FROM {quote_ident(schema)}.{quote_ident(table)}"
    if where:
        q += f" WHERE {where}"
    if limit > 0:
        q += f" LIMIT {limit}"
    return q


@dataclass
class ColumnMeta:
    """Catalog metadata for a single table column."""

    name: str  # column name (pg_attribute.attname)
    data_type: str  # type as rendered by format_type()
    nullable: bool  # True when the column has no NOT NULL constraint
    description: str  # column comment, or "" when there is none


@dataclass
class TableMeta:
    """Catalog metadata for a table and its columns."""

    schema: str
    table: str
    comment: str = ""  # table comment, or "" when there is none
    row_count: int = 0  # pg_class.reltuples estimate, not an exact count
    size: str = ""  # human-readable total relation size
    columns: list[ColumnMeta] = field(default_factory=list)


def table_meta(conn: psycopg.Connection, schema: str, table: str) -> TableMeta:
    """Fetch catalog metadata for a table (no data scan).

    Args:
        conn: Open psycopg connection.
        schema: Schema (namespace) name of the table.
        table: Relation name.

    Returns:
        A ``TableMeta``. When no matching relation exists in the catalog, the
        table-level fields keep their defaults and ``columns`` is empty.
    """
    meta = TableMeta(schema=schema, table=table)

    # Table-level stats (best effort).
    with conn.cursor() as cur:
        # Use the two-argument obj_description: the one-argument form is
        # deprecated because OIDs are not unique across system catalogs, so
        # it may return the comment of an unrelated object.
        cur.execute(
            """
            SELECT c.reltuples::bigint,
                   COALESCE(pg_size_pretty(pg_total_relation_size(c.oid)), ''),
                   COALESCE(obj_description(c.oid, 'pg_class'), '')
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE n.nspname = %s AND c.relname = %s
            """,
            (schema, table),
        )
        row = cur.fetchone()
        if row:
            meta.row_count, meta.size, meta.comment = row

    # Column metadata with descriptions from pg_description.
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT a.attname,
                   pg_catalog.format_type(a.atttypid, a.atttypmod),
                   NOT a.attnotnull,
                   COALESCE(d.description, '')
            FROM pg_attribute a
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            LEFT JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = a.attnum
            WHERE n.nspname = %s AND c.relname = %s
              AND a.attnum > 0 AND NOT a.attisdropped
            ORDER BY a.attnum
            """,
            (schema, table),
        )
        # attnum > 0 skips system columns; dropped columns are excluded too.
        meta.columns = [
            ColumnMeta(name, dtype, nullable, desc)
            for name, dtype, nullable, desc in cur
        ]

    return meta