---
name: postgresql-sdk
description: Integrates PostgreSQL databases using psycopg2 2.9.x and asyncpg 0.31.x with patterns for connection pooling, replication, COPY, and query parameterization.
license: MIT
compatibility: opencode
metadata:
  version: "1.0.0"
  domain: coding
  triggers: postgresql, psycopg2, asyncpg, postgres connection pool, sql parameterization, how do i query postgres from python, database replication, libpq
  archetypes:
    - tactical
    - generation
  anti_triggers:
    - brainstorming
    - vague ideation
    - code golf
    - over-engineering
  response_profile:
    verbosity: low
    directive_strength: high
    abstraction_level: operational
  role: implementation
  scope: implementation
  output-format: code
  content-types:
    - code
    - guidance
    - do-dont
    - examples
  related-skills: coding-database-design-modeling, coding-database-migrations, coding-asyncio-patterns
---
PostgreSQL Python SDK Integration
Integrates PostgreSQL databases using psycopg2 2.9.x (synchronous, DB-API 2.0) and asyncpg 0.31.x (asyncio-native) with patterns for connection management, parameterized queries, COPY bulk operations, connection pooling, logical replication, and transaction handling.
TL;DR Checklist
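- Use `psycopg2.connect()` for synchronous workloads and `asyncpg.connect()` for asyncio.
- Always parameterize queries with `%s` (psycopg2) or `$1` (asyncpg). Never use string formatting.
- Use `psycopg2.pool.ThreadedConnectionPool` or `asyncpg.create_pool()` for connection pooling.
- Use COPY for bulk data operations instead of row-by-row INSERT. In psycopg2 that means `copy_expert()` (or `copy_from()`/`copy_to()` for simple text-format data). In asyncpg it means `copy_records_to_table()`.
- Use `conn.set_session(autocommit=True)` only for statements that cannot run inside a transaction block, such as `CREATE DATABASE`, `VACUUM`, and `CREATE INDEX CONCURRENTLY`.
- Always use context managers for automatic transaction management: `with conn:` in psycopg2, `async with conn.transaction():` in asyncpg.
- Consume logical replication (change data capture) with psycopg2's `LogicalReplicationConnection`. asyncpg does not implement the replication protocol.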
When to Use
Use this skill when:
- Building Python applications that read/write to PostgreSQL databases
- Implementing connection pooling for web services with concurrent database access
- Performing bulk data loading or unloading with PostgreSQL COPY protocol
- Setting up logical replication or listening to LISTEN/NOTIFY channels
- Writing database migration scripts or ETL pipelines targeting PostgreSQL
- Building async web frameworks (FastAPI, aiohttp) that need non-blocking database access
When NOT to Use
- For simple key-value workloads (use Redis or DynamoDB instead)
- When you need a document store with flexible schemas (use MongoDB instead)
- For in-memory data caching (use Redis instead)
- When interacting with other SQL databases (MySQL, SQLite) — use their respective drivers
Core Workflow
1. Connect to PostgreSQL
Choose the driver based on your concurrency model:
```python
# Synchronous: psycopg2 (widely compatible, DB-API 2.0)
import os

import psycopg2
from psycopg2 import OperationalError, sql

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="mydb",
    user="app_user",
    password=os.environ["PGPASSWORD"],
    connect_timeout=10,  # seconds
    sslmode="require",
)
```

```python
# Asynchronous: asyncpg (asyncio-native)
import os

import asyncpg


async def connect_async() -> asyncpg.Connection:
    # asyncpg uses database/timeout/ssl where psycopg2 uses dbname/connect_timeout/sslmode
    return await asyncpg.connect(
        host="localhost",
        port=5432,
        database="mydb",
        user="app_user",
        password=os.environ["PGPASSWORD"],
        timeout=10,  # seconds
        ssl="require",
    )
```
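Checkpoint: Verify connectivity with `SELECT 1` before running application queries. Handle connection failures at connect time rather than deferring them to the first query.

- In psycopg2, catch `psycopg2.OperationalError`.
- In asyncpg, catch `OSError` and `asyncio.TimeoutError` for network problems.
- In asyncpg, catch `asyncpg.PostgresError` subclasses such as `asyncpg.InvalidPasswordError` for server-side rejections.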
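Here is a minimal sketch of the connect-then-verify checkpoint for DB-API style connections. `connect_and_verify` is a hypothetical helper and is not part of either driver. The retry count and exponential backoff are assumptions you should tune. `retry_on` should list the driver's transient error type; for psycopg2 that is `psycopg2.OperationalError`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

Conn = TypeVar("Conn")


def connect_and_verify(
    connect: Callable[[], Conn],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> Conn:
    """Open a connection, retrying transient failures, then prove it works with SELECT 1."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            conn = connect()
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: surface the driver's own error
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
            continue
        try:
            # Fail fast here rather than at the first application query
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                row = cur.fetchone()
        except BaseException:
            conn.close()  # do not leak a half-working connection
            raise
        if row != (1,):
            conn.close()
            raise RuntimeError(f"unexpected SELECT 1 result: {row!r}")
        return conn
    raise AssertionError("unreachable")
```

For example, with a hypothetical `dsn` string: `conn = connect_and_verify(lambda: psycopg2.connect(dsn), retry_on=(psycopg2.OperationalError,))`.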
2. Execute Queries with Parameterization
Never interpolate values into SQL strings. Use driver-native placeholders.
```python
import asyncpg


# psycopg2: %s placeholders (positional), values passed as one sequence
def get_users_by_age(conn, min_age: int, max_age: int) -> list[tuple]:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT id, name, email FROM users WHERE age BETWEEN %s AND %s",
            (min_age, max_age),
        )
        return cur.fetchall()


# asyncpg: $1, $2 placeholders (positional, 1-indexed), values passed as separate arguments
async def get_users_by_age_async(conn, min_age: int, max_age: int) -> list[asyncpg.Record]:
    return await conn.fetch(
        "SELECT id, name, email FROM users WHERE age BETWEEN $1 AND $2",
        min_age,
        max_age,
    )
```
Checkpoint: Verify all external input is passed as query parameters, not interpolated. Use sql.Identifier() for dynamic table/column names in psycopg2.
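A placeholder stands for a single value. That makes a variable-length `IN (...)` list a common temptation to fall back to string formatting. One alternative is to pass the whole list as a single array parameter and compare with `= ANY(...)`. The sketch below uses hypothetical helper names. The `bigint[]` cast assumes `users.id` is a `bigint` column; adjust it to the real type.

```python
from typing import Iterable, List, Tuple


def users_by_ids_psycopg2(ids: Iterable[int]) -> Tuple[str, Tuple[List[int]]]:
    """Build a psycopg2 query that matches any id in a list using one %s."""
    # psycopg2 adapts a Python list to a PostgreSQL ARRAY, so one placeholder
    # covers the whole list and no value is spliced into the SQL text.
    return (
        "SELECT id, name, email FROM users WHERE id = ANY(%s)",
        (list(ids),),
    )


def users_by_ids_asyncpg(ids: Iterable[int]) -> Tuple[str, List[int]]:
    """Build the asyncpg equivalent with a single $1 array parameter."""
    # Assumption: users.id is bigint; the cast makes the expected array type explicit.
    return (
        "SELECT id, name, email FROM users WHERE id = ANY($1::bigint[])",
        list(ids),
    )
```

With psycopg2, call `cur.execute(*users_by_ids_psycopg2([1, 2, 3]))`. With asyncpg, unpack `query, ids = users_by_ids_asyncpg([1, 2, 3])` and then call `await conn.fetch(query, ids)`.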
3. Manage Transactions with Context Managers
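Use `with conn:` to commit when the block succeeds or roll back when it raises. In psycopg2, leaving the block does not close the connection. The transaction itself starts implicitly with the first `execute()`.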
```python
from decimal import Decimal


def transfer_funds(
    conn,
    from_account: str,
    to_account: str,
    amount: Decimal,
) -> None:
    """Execute a funds transfer atomically."""
    with conn:  # Commits on success, rolls back on exception; does not close conn
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account),
            )
            if cur.rowcount != 1:
                raise ValueError(f"Account {from_account} not found")
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account),
            )
            if cur.rowcount != 1:
                # Raising here rolls back the debit above as well
                raise ValueError(f"Account {to_account} not found")
```
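In asyncpg, the equivalent of `with conn:` is `async with conn.transaction():`. The sketch below mirrors `transfer_funds` and assumes the same `accounts` table. It relies on `conn.execute()` returning the command status tag (such as `UPDATE 1`) to detect a missing account. In asyncpg, nested `conn.transaction()` blocks are implemented as savepoints. The nesting warning for psycopg2's `with conn:` therefore does not carry over.

```python
from decimal import Decimal


async def transfer_funds_async(
    conn, from_account: str, to_account: str, amount: Decimal
) -> None:
    """asyncpg variant: conn.transaction() commits on success, rolls back on exception."""
    async with conn.transaction():
        for account, delta in ((from_account, -amount), (to_account, amount)):
            # execute() returns the command status tag, e.g. "UPDATE 1"
            status = await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                delta,
                account,
            )
            if status != "UPDATE 1":
                raise ValueError(f"Account {account} not found")
```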
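Checkpoint: Never nest `with conn:` blocks. psycopg2 runs one transaction per connection, so leaving the inner block commits (or rolls back) the outer block's work too. Use savepoints (`SAVEPOINT sp;`, then `ROLLBACK TO SAVEPOINT sp;` on failure) for sub-transactions within a single connection context.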
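Here is a sketch of the savepoint approach as a reusable context manager. `savepoint` is a hypothetical helper that takes a cursor inside an already-open transaction, for example within `with conn:`. A savepoint name is an identifier and cannot be passed as a query parameter. The sketch therefore validates the name against a strict pattern before building the statement. When psycopg2 is available, `sql.SQL("SAVEPOINT {}").format(sql.Identifier(name))` is an alternative.

```python
import contextlib
import re
from typing import Iterator

# Only plain lowercase identifiers (max 63 chars) are accepted, so the
# name can never carry SQL of its own.
_SAVEPOINT_NAME = re.compile(r"[a-z_][a-z0-9_]{0,62}")


@contextlib.contextmanager
def savepoint(cur, name: str = "sp") -> Iterator[None]:
    """Run a block as a sub-transaction inside an already-open transaction."""
    if not _SAVEPOINT_NAME.fullmatch(name):
        raise ValueError(f"invalid savepoint name: {name!r}")
    cur.execute(f"SAVEPOINT {name}")
    try:
        yield
    except Exception:
        # Undo only the work done since SAVEPOINT, then let the caller decide
        cur.execute(f"ROLLBACK TO SAVEPOINT {name}")
        raise
    else:
        cur.execute(f"RELEASE SAVEPOINT {name}")
```

A failure inside the inner block undoes only the work done since the savepoint. If the outer transaction should still commit, catch the re-raised exception inside `with conn:`. An exception that escapes `with conn:` rolls back everything.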
4. Use Connection Pooling for Concurrency
```python
# psycopg2 ThreadedConnectionPool (safe to share between threads)
import os

from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    host="localhost",
    dbname="mydb",
    user="app_user",
    password=os.environ["PGPASSWORD"],
)


def query_with_pool(query: str, params: tuple) -> list[tuple]:
    conn = pool.getconn()
    try:
        with conn:  # commit before the connection goes back to the pool
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()
    finally:
        pool.putconn(conn)
```

```python
# asyncpg connection pool
import os

import asyncpg


async def init_pool() -> asyncpg.pool.Pool:
    return await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="app_user",
        password=os.environ["PGPASSWORD"],
        min_size=5,
        max_size=20,
    )


async def query_with_pool_async(
    pool: asyncpg.pool.Pool, query: str, *args
) -> list[asyncpg.Record]:
    async with pool.acquire() as conn:  # released back to the pool on exit
        return await conn.fetch(query, *args)
```
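Checkpoint: Validate pool sizing against the server's `max_connections` setting (`SHOW max_connections`). The limit is shared by every pool in every process or container that connects. The sum of all `maxconn`/`max_size` values, plus the connections needed by admin tools and migrations, must stay below it.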
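The sizing check is simple arithmetic. This sketch assumes every application process runs one pool of equal size and that you hold back some connections for other clients. The `reserved_for_other_clients` figure is an assumption you supply. The function name is hypothetical.

```python
def max_pool_size_per_process(
    max_connections: int,
    processes: int,
    reserved_for_other_clients: int = 0,
    superuser_reserved_connections: int = 3,
) -> int:
    """Largest pool max_size each app process can use without exhausting the server."""
    # superuser_reserved_connections defaults to 3 in PostgreSQL; confirm with
    # SHOW superuser_reserved_connections alongside SHOW max_connections.
    budget = max_connections - superuser_reserved_connections - reserved_for_other_clients
    if processes < 1 or budget < processes:
        raise ValueError("not enough server connections for this many processes")
    return budget // processes
```

Consider a server with `max_connections = 100`. It keeps the default 3 superuser slots and holds 7 connections for other clients. Split across 4 worker processes, each pool gets at most `(100 - 3 - 7) // 4 = 22` connections.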
Implementation Patterns
Pattern 1: COPY for Bulk Data Loading
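```python
import csv
import io


def bulk_insert_users(conn, users: list[dict]) -> int:
    """Insert users using COPY ... FROM STDIN (CSV format) for maximum throughput."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for user in users:
        writer.writerow([user["name"], user["email"], user["age"]])
    buffer.seek(0)
    # FORMAT csv matches what csv.writer produces (quoting, embedded commas);
    # copy_from() expects PostgreSQL's text format and would misparse quoted fields.
    with conn:  # commit the COPY as one transaction
        with conn.cursor() as cur:
            cur.copy_expert(
                "COPY users (name, email, age) FROM STDIN WITH (FORMAT csv)",
                buffer,
            )
    return len(users)  # COPY is all-or-nothing, so every buffered row was loaded
```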
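asyncpg has a native bulk-load method, `Connection.copy_records_to_table()`. It sends Python tuples over the binary COPY protocol, so no CSV buffer is needed. The sketch below is a possible asyncpg counterpart to `bulk_insert_users` and assumes the same `users` columns.

```python
from typing import Mapping, Sequence


async def bulk_insert_users_async(conn, users: Sequence[Mapping]) -> int:
    """asyncpg variant of bulk_insert_users using the binary COPY protocol."""
    records = [(u["name"], u["email"], u["age"]) for u in users]
    # copy_records_to_table streams the tuples with COPY ... FROM STDIN
    status = await conn.copy_records_to_table(
        "users", records=records, columns=["name", "email", "age"]
    )
    # The return value is the command status string, e.g. "COPY 2"
    return int(status.split()[-1])
```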
Pattern 2: LISTEN/NOTIFY for Real-Time Events
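```python
import select

from psycopg2 import sql


def listen_for_events(conn, channel: str, callback) -> None:
    """Listen for PostgreSQL NOTIFY events and invoke a callback."""
    # Autocommit: LISTEN takes effect immediately and no open transaction holds notifications back
    conn.set_session(autocommit=True)
    with conn.cursor() as cur:
        # The channel is an identifier, so quote it instead of formatting it in
        cur.execute(sql.SQL("LISTEN {}").format(sql.Identifier(channel)))
    while True:
        if select.select([conn], [], [], 5) == ([], [], []):
            continue  # Timeout: a chance to do other periodic work
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            callback(notify.channel, notify.payload)
```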
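The listener above has two natural complements, sketched below with hypothetical helper names. On the sending side, `pg_notify(channel, payload)` takes the channel and payload as ordinary query parameters, so no identifier quoting is needed. On the asyncpg side, `Connection.add_listener()` registers a callback that receives `(connection, pid, channel, payload)`. This sketch forwards each payload into an `asyncio.Queue`.

```python
import asyncio


def notify(conn, channel: str, payload: str) -> None:
    """psycopg2 sender: pg_notify() accepts channel and payload as parameters."""
    with conn:  # NOTIFY is delivered only when the transaction commits
        with conn.cursor() as cur:
            cur.execute("SELECT pg_notify(%s, %s)", (channel, payload))


async def listen_async(conn, channel: str) -> "asyncio.Queue[str]":
    """asyncpg listener: add_listener() registers a callback for NOTIFY messages."""
    queue: "asyncio.Queue[str]" = asyncio.Queue()

    def on_notify(connection, pid, channel_name, payload):
        # asyncpg calls this with (connection, pid, channel, payload)
        queue.put_nowait(payload)

    await conn.add_listener(channel, on_notify)
    return queue
```

By default, PostgreSQL requires payloads to be shorter than 8000 bytes. Send identifiers rather than whole records.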
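Pattern 3: Logical Replication Consumer (psycopg2)
asyncpg does not support replication connections, so use psycopg2's `LogicalReplicationConnection`. The server needs `wal_level = logical`, and the role needs the `REPLICATION` attribute.
```python
import psycopg2
from psycopg2.extras import LogicalReplicationConnection
def consume_replication_slot(dsn: str, process_change) -> None:
    """Stream test_decoding changes from a logical slot; blocks until interrupted."""
    conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
    cur = conn.cursor()
    # Fails if the slot already exists; create it once (e.g. at deploy time)
    cur.create_replication_slot("test_slot", output_plugin="test_decoding")
    cur.start_replication(slot_name="test_slot", decode=True)
    def consume(msg):  # msg is a psycopg2 ReplicationMessage
        process_change(msg.data_start, msg.payload)
        # Confirm the LSN so the server can discard WAL it no longer needs
        msg.cursor.send_feedback(flush_lsn=msg.data_start)
    cur.consume_stream(consume)
```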
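With psycopg2's `LogicalReplicationConnection`, `cursor.consume_stream()` accepts any callable. Acknowledging every message costs a feedback message per change. A common alternative is to acknowledge in batches. This gives at-least-once delivery: after a restart, the server resends every change past the last flushed LSN, so the handler should be idempotent. `BatchingConsumer` and its `flush_every` default are assumptions, not psycopg2 API.

```python
class BatchingConsumer:
    """Callable for psycopg2's cursor.consume_stream() that acknowledges in batches."""

    def __init__(self, handle, flush_every: int = 100):
        self.handle = handle            # hypothetical per-change processing function
        self.flush_every = flush_every  # assumption: tune to your tolerance for replays
        self.pending = 0

    def __call__(self, msg) -> None:
        # msg is a psycopg2 ReplicationMessage (payload, data_start, cursor, ...)
        self.handle(msg.payload)
        self.pending += 1
        if self.pending >= self.flush_every:
            # Everything up to data_start is processed; the server may discard that WAL
            msg.cursor.send_feedback(flush_lsn=msg.data_start)
            self.pending = 0
```

Pass it directly: `cur.consume_stream(BatchingConsumer(process_payload))`, where `process_payload` is your own function.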
BAD vs GOOD: Query Building
```python
# ❌ BAD: string interpolation is vulnerable to SQL injection
def get_user_bad(conn, user_id: str):
    cur = conn.cursor()
    cur.execute(f"SELECT * FROM users WHERE id = '{user_id}'")
    return cur.fetchone()


# ✅ GOOD: parameterized query with a %s placeholder and explicit columns
def get_user_good(conn, user_id: str):
    with conn.cursor() as cur:
        cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
        return cur.fetchone()
```
BAD vs GOOD: Dynamic Table Names
```python
# ❌ BAD: string formatting for table names (SQL injection risk)
def count_rows_bad(conn, table_name: str):
    cur = conn.cursor()
    cur.execute(f"SELECT COUNT(*) FROM {table_name}")
    return cur.fetchone()[0]


# ✅ GOOD: psycopg2.sql.Identifier quotes the name as an identifier
from psycopg2 import sql


def count_rows_good(conn, table_name: str) -> int:
    with conn.cursor() as cur:
        query = sql.SQL("SELECT COUNT(*) FROM {}").format(
            sql.Identifier(table_name)
        )
        cur.execute(query)
        return cur.fetchone()[0]
```
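`sql.Identifier` stops injection, but a caller can still name any table the database role can read. When the table name comes from user input, an allowlist narrows that further. This is a sketch; `ALLOWED_TABLES` and both function names are hypothetical. The psycopg2 import sits inside the function only so the validator has no driver dependency.

```python
from typing import FrozenSet

# Hypothetical allowlist: the tables this code path is permitted to touch
ALLOWED_TABLES: FrozenSet[str] = frozenset({"users", "orders", "accounts"})


def checked_table_name(table_name: str) -> str:
    """Reject any table name not on the allowlist before composing SQL."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"table not allowed: {table_name!r}")
    return table_name


def count_rows_allowlisted(conn, table_name: str) -> int:
    from psycopg2 import sql  # lazy import: validation above works without the driver

    query = sql.SQL("SELECT COUNT(*) FROM {}").format(
        sql.Identifier(checked_table_name(table_name))
    )
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchone()[0]
```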
Constraints
MUST DO
- Always parameterize query values with `%s` (psycopg2) or `$N` (asyncpg). Never use f-strings or `str.format()`.
- Use `with conn:` for transaction management. It commits on success and rolls back on exception, but it does not close the connection.
- Use connection pools in web applications to avoid connection exhaustion.
- Validate pool `max_size` against the PostgreSQL `max_connections` setting.
- Call `conn.close()` or return connections to the pool in `finally` blocks.
- Use COPY for bulk operations: `copy_expert()` or `copy_from()`/`copy_to()` in psycopg2, `copy_records_to_table()` in asyncpg. COPY avoids a round trip and statement overhead per row, and is typically far faster than row-by-row INSERT.
- Use `sslmode="require"` (psycopg2) or `ssl="require"` (asyncpg) in production.
MUST NOT DO
- Never concatenate user input into SQL strings. Always use parameterized queries.
- Do not create a new connection per request in web apps. Use connection pools.
- Never set `autocommit=True` unless you fully understand the transaction implications: each statement commits immediately and cannot be rolled back.
- Avoid `SELECT *` in production code. Always specify columns explicitly.
- Do not pass psycopg2 keyword arguments (`dbname`, `sslmode`, `connect_timeout`) or `key=value` DSN strings to asyncpg. asyncpg uses `database`, `ssl`, and `timeout`, and accepts `postgresql://` URIs.
- Never ignore `OperationalError`. It indicates connection or configuration problems.
Output Template
When writing PostgreSQL integration code, structure your output as:
- Connection Setup: driver import, connection parameters, SSL config
- Pool Initialization: pool creation with min/max sizing for concurrent access
- Query Execution: parameterized SQL, with `with` context managers for transactions
- Error Handling: specific exception types. For psycopg2: `psycopg2.OperationalError`, `psycopg2.errors.UniqueViolation`, `psycopg2.errors.ForeignKeyViolation`. For asyncpg: `asyncpg.UniqueViolationError`, `asyncpg.ForeignKeyViolationError`.
- Resource Cleanup: return connections to the pool or close them in `finally` blocks or async context managers
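For the error-handling step, both drivers expose the server's SQLSTATE code. psycopg2 errors carry it as `.pgcode`, and asyncpg `PostgresError` subclasses carry it as `.sqlstate`. Matching on the code lets one mapping serve both drivers. This is a sketch: the domain exception classes and function names are hypothetical. `23505` and `23503` are PostgreSQL's codes for `unique_violation` and `foreign_key_violation`.

```python
from typing import Optional

# SQLSTATE codes from PostgreSQL's "Error Codes" appendix
UNIQUE_VIOLATION = "23505"
FOREIGN_KEY_VIOLATION = "23503"


class DuplicateError(Exception):
    """Hypothetical domain error for a unique-constraint conflict."""


class MissingReferenceError(Exception):
    """Hypothetical domain error for a foreign-key failure."""


def sqlstate_of(exc: BaseException) -> Optional[str]:
    # psycopg2 errors expose .pgcode; asyncpg PostgresError subclasses expose .sqlstate
    return getattr(exc, "pgcode", None) or getattr(exc, "sqlstate", None)


def translate_db_error(exc: BaseException) -> BaseException:
    """Map a driver exception to a domain error; return it unchanged if unrecognized."""
    code = sqlstate_of(exc)
    if code == UNIQUE_VIOLATION:
        return DuplicateError(str(exc))
    if code == FOREIGN_KEY_VIOLATION:
        return MissingReferenceError(str(exc))
    return exc
```

Use it inside an `except psycopg2.Error as exc:` or `except asyncpg.PostgresError as exc:` block. Write `raise translate_db_error(exc) from exc` when it returns a new exception, and a bare `raise` when it returns `exc` itself.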
Related Skills
| Skill | Purpose |
|---|---|
| coding-database-design-modeling | Schema design, normalization, indexing strategies |
| coding-database-migrations | Alembic and migration workflow patterns |
| coding-asyncio-patterns | Asyncio patterns used with asyncpg connections |
Live References
- psycopg2 Documentation — Official psycopg2 (2.9.x) documentation
- asyncpg Documentation — Official asyncpg (0.31.x) documentation
- psycopg2 Pool Guide — Connection pooling with ThreadedConnectionPool
- asyncpg pool API — asyncpg connection pool reference
- PostgreSQL COPY Documentation — COPY protocol for bulk operations
- PostgreSQL LISTEN/NOTIFY — Async notification channels
- psycopg2 sql Module — Safe SQL composition for dynamic identifiers