postgresql-sdk


Integrates PostgreSQL databases using psycopg2 2.9.x and asyncpg 0.31.x with patterns for connection pooling, replication, COPY, and query parameterization.

By paulpas, updated 6/4/2026

name: postgresql-sdk
description: Integrates PostgreSQL databases using psycopg2 2.9.x and asyncpg 0.31.x with patterns for connection pooling, replication, COPY, and query parameterization.
license: MIT
compatibility: opencode
metadata:
  version: "1.0.0"
  domain: coding
  triggers: postgresql, psycopg2, asyncpg, postgres connection pool, sql parameterization, how do i query postgres from python, database replication, libpq
  archetypes:
    • tactical
    • generation
  anti_triggers:
    • brainstorming
    • vague ideation
    • code golf
    • over-engineering
  response_profile:
    verbosity: low
    directive_strength: high
    abstraction_level: operational
  role: implementation
  scope: implementation
  output-format: code
  content-types:
    • code
    • guidance
    • do-dont
    • examples
  related-skills: coding-database-design-modeling, coding-database-migrations, coding-asyncio-patterns

PostgreSQL Python SDK Integration

Integrates PostgreSQL databases using psycopg2 2.9.x (synchronous, DB-API 2.0) and asyncpg 0.31.x (asyncio-native) with patterns for connection management, parameterized queries, COPY bulk operations, connection pooling, logical replication, and transaction handling.

TL;DR Checklist

  • Use psycopg2.connect() for synchronous workloads and asyncpg.connect() for asyncio
  • Always parameterize queries with %s (psycopg2) or $1 (asyncpg) — never use string formatting
  • Use psycopg2.pool.ThreadedConnectionPool or asyncpg.pool.create_pool() for connection pooling
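  • Use COPY for bulk data operations instead of row-by-row INSERT: copy_expert() / copy_from() / copy_to() in psycopg2, copy_records_to_table() / copy_from_table() in asyncpg
  • Use autocommit (conn.autocommit = True or conn.set_session(autocommit=True)) for statements that cannot run inside a transaction block, such as CREATE DATABASE, VACUUM, and CREATE INDEX CONCURRENTLY; most other DDL is transactional in PostgreSQL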
  • Always use context managers (with conn:) for automatic transaction management
  • For logical replication (change data capture), use psycopg2.extras.LogicalReplicationConnection; asyncpg does not implement the streaming replication protocol

When to Use

Use this skill when:

  • Building Python applications that read/write to PostgreSQL databases
  • Implementing connection pooling for web services with concurrent database access
  • Performing bulk data loading or unloading with PostgreSQL COPY protocol
  • Setting up logical replication or listening to LISTEN/NOTIFY channels
  • Writing database migration scripts or ETL pipelines targeting PostgreSQL
  • Building async web frameworks (FastAPI, aiohttp) that need non-blocking database access

When NOT to Use

  • For simple key-value workloads (use Redis or DynamoDB instead)
  • When you need a document store with flexible schemas (use MongoDB instead)
  • For in-memory data caching (use Redis instead)
  • When interacting with other SQL databases (MySQL, SQLite) — use their respective drivers

Core Workflow

1. Connect to PostgreSQL

Choose the driver based on your concurrency model:

# Synchronous: psycopg2 (widely compatible, DB-API 2.0)
import os

import psycopg2
from psycopg2 import sql, OperationalError

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="mydb",
    user="app_user",
    password=os.environ["PGPASSWORD"],
    connect_timeout=10,  # seconds
    sslmode="require",
)

# Asynchronous: asyncpg (fast, asyncio-native); await only works inside a coroutine
import asyncio
import os

import asyncpg

async def main() -> None:
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        database="mydb",
        user="app_user",
        password=os.environ["PGPASSWORD"],
        timeout=10,  # connection timeout in seconds
        ssl="require",
    )
    try:
        await conn.fetchval("SELECT 1")  # fail fast if the connection is unusable
    finally:
        await conn.close()

asyncio.run(main())

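Checkpoint: Verify connectivity with SELECT 1 before running application queries. Handle connection failures at connect time rather than deferring them to query execution: psycopg2 raises OperationalError, while asyncpg raises OSError (e.g. ConnectionRefusedError) or asyncio.TimeoutError for network problems and asyncpg.PostgresError subclasses such as asyncpg.InvalidPasswordError when the server rejects the login.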
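One way to apply this checkpoint is to run SELECT 1 immediately after connecting and retry only errors known to be transient. The sketch below assumes a psycopg2-style DB-API connection; connect_with_retry, its parameters, and the exponential backoff schedule are illustrative choices, not part of psycopg2.

```python
import time
from typing import Any, Callable


def connect_with_retry(
    connect: Callable[[], Any],
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Open a DB-API connection and verify it with SELECT 1, retrying transient errors.

    connect:  zero-argument factory, e.g. functools.partial(psycopg2.connect, ...)
    retry_on: exception types worth retrying, e.g. (psycopg2.OperationalError,)
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        conn = None
        try:
            conn = connect()
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
            conn.rollback()  # end the implicit transaction the check opened
            return conn
        except retry_on:
            if conn is not None:
                conn.close()  # do not leak a half-working connection
            if attempt == attempts:
                raise  # out of retries: surface the original error
            sleep(base_delay * 2 ** (attempt - 1))  # backoff: 0.5s, 1s, 2s, ...
    raise RuntimeError("unreachable")
```

For example: connect_with_retry(functools.partial(psycopg2.connect, host="localhost", dbname="mydb"), (psycopg2.OperationalError,)). Keep attempts small, because psycopg2 also reports a wrong password as OperationalError, and retrying it only delays the failure.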

2. Execute Queries with Parameterization

Never interpolate values into SQL strings. Use driver-native placeholders.

# psycopg2 — %s placeholders (positional)
def get_users_by_age(conn, min_age: int, max_age: int) -> list[tuple]:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT id, name, email FROM users WHERE age BETWEEN %s AND %s",
            (min_age, max_age),
        )
        return cur.fetchall()

# asyncpg — $1, $2 placeholders (positional, 1-indexed)
async def get_users_by_age_async(conn, min_age: int, max_age: int) -> list[asyncpg.Record]:
    return await conn.fetch(
        "SELECT id, name, email FROM users WHERE age BETWEEN $1 AND $2",
        min_age, max_age,
    )

Checkpoint: Verify all external input is passed as query parameters, not interpolated. Use sql.Identifier() for dynamic table/column names in psycopg2.
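asyncpg has no public counterpart to sql.Identifier. A minimal sketch, assuming you pair it with an allow-list: the hypothetical quote_ident() helper below applies PostgreSQL's identifier quoting rule (wrap the name in double quotes and double any embedded double quote). Like sql.Identifier, it always quotes, so names match case-sensitively.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier (hypothetical helper, not an asyncpg API)."""
    if not name or "\x00" in name:
        raise ValueError("identifier must be non-empty and contain no NUL characters")
    # Wrap in double quotes; an embedded double quote is escaped by doubling it
    return '"' + name.replace('"', '""') + '"'


def count_rows_query(table: str, allowed: frozenset[str]) -> str:
    """Build a COUNT(*) query for an allow-listed table name."""
    if table not in allowed:
        raise ValueError(f"table {table!r} is not allowed")
    return f"SELECT COUNT(*) FROM {quote_ident(table)}"
```

The result is used only as query text; values still travel through $N placeholders, e.g. await conn.fetchval(count_rows_query("users", frozenset({"users", "orders"}))).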

3. Manage Transactions with Context Managers

Use with conn: to commit automatically on success or roll back on exception. In psycopg2, leaving the block ends the transaction but does not close the connection.

from decimal import Decimal

def transfer_funds(
    conn,
    from_account: str,
    to_account: str,
    amount: Decimal,
) -> None:
    """Execute a funds transfer atomically."""
    # psycopg2 opens a transaction on the first execute(); leaving the with block
    # commits on success and rolls back if an exception escapes
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_account),
            )
            if cur.rowcount != 1:
                raise ValueError(f"Account {from_account} not found")
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_account),
            )
            if cur.rowcount != 1:  # without this check a missing target would silently lose the debit
                raise ValueError(f"Account {to_account} not found")
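The same transfer with asyncpg, as a sketch: asyncpg uses conn.transaction() as an async context manager instead of with conn:, and conn.execute() returns the command status tag (such as "UPDATE 1") rather than setting a rowcount. conn is assumed to be an asyncpg connection; transfer_funds_async is a hypothetical name.

```python
from decimal import Decimal


async def transfer_funds_async(
    conn,  # assumed: an asyncpg.Connection (or one acquired from a pool)
    from_account: str,
    to_account: str,
    amount: Decimal,
) -> None:
    """Atomic transfer: the transaction commits on success, rolls back on exception."""
    async with conn.transaction():
        # execute() returns the status tag of the command, e.g. "UPDATE 1"
        status = await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_account,
        )
        if status != "UPDATE 1":
            raise ValueError(f"Account {from_account} not found")
        status = await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_account,
        )
        if status != "UPDATE 1":
            raise ValueError(f"Account {to_account} not found")
```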

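Checkpoint: Never nest with conn: blocks; in psycopg2 the inner block's exit commits the whole transaction, which breaks the outer block's atomicity. For sub-transactions within a single connection, use savepoints (SAVEPOINT sp; then ROLLBACK TO SAVEPOINT sp; on failure or RELEASE SAVEPOINT sp; on success).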
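A small context manager can wrap those statements so that a failed step undoes only its own work. This is a sketch assuming a psycopg2 cursor inside an open with conn: block; the savepoint() helper is hypothetical. The savepoint name is interpolated into the SQL, so it is checked against a strict pattern first.

```python
import re
from contextlib import contextmanager
from typing import Iterator

_SAVEPOINT_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


@contextmanager
def savepoint(cur, name: str = "sp") -> Iterator[None]:
    """Run the enclosed statements as a sub-transaction of the current transaction.

    On error only the work since SAVEPOINT is undone; the exception is re-raised
    so the caller decides whether the outer transaction continues.
    """
    if not _SAVEPOINT_NAME.fullmatch(name):
        raise ValueError(f"invalid savepoint name: {name!r}")  # guards the interpolation below
    cur.execute(f"SAVEPOINT {name}")
    try:
        yield
    except Exception:
        cur.execute(f"ROLLBACK TO SAVEPOINT {name}")
        raise
    else:
        cur.execute(f"RELEASE SAVEPOINT {name}")
```

A typical use is skipping duplicates in a batch: wrap each INSERT in with savepoint(cur):, catch psycopg2.errors.UniqueViolation outside it, and keep going; the outer transaction stays usable. With asyncpg, a nested async with conn.transaction(): block creates a savepoint for you.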

4. Use Connection Pooling for Concurrency

# psycopg2 ThreadedConnectionPool
import os

from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    host="localhost",
    dbname="mydb",
    user="app_user",
    password=os.environ["PGPASSWORD"],
)

def query_with_pool(query: str, params: tuple) -> list[tuple]:
    conn = pool.getconn()
    try:
        with conn:  # commit on success, roll back on exception
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()
    finally:
        pool.putconn(conn)  # always hand the connection back, even after an error

# asyncpg connection pool: create it once, inside the running event loop (e.g. at app startup)
import os

import asyncpg

async def create_app_pool() -> asyncpg.Pool:
    return await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="app_user",
        password=os.environ["PGPASSWORD"],
        min_size=5,
        max_size=20,
    )

async def query_with_pool_async(pool: asyncpg.Pool, query: str, *args) -> list[asyncpg.Record]:
    async with pool.acquire() as conn:  # released back to the pool on exit
        return await conn.fetch(query, *args)

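Checkpoint: Size pools against the server's max_connections (check it with SHOW max_connections). Every process keeps its own pool, so the sum of maxconn / max_size across all workers and services must stay below max_connections minus superuser_reserved_connections. Note that ThreadedConnectionPool.getconn() raises PoolError when all maxconn connections are checked out instead of waiting for one.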
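The checkpoint reduces to simple arithmetic. A sketch, assuming every worker process runs an identical pool: per_worker_pool_max() is a hypothetical helper, reserved defaults to PostgreSQL's default superuser_reserved_connections of 3, and headroom is an arbitrary allowance for migrations, psql sessions, and monitoring.

```python
def per_worker_pool_max(
    max_connections: int,
    workers: int,
    reserved: int = 3,   # superuser_reserved_connections (PostgreSQL default)
    headroom: int = 5,   # hypothetical slack for admin and migration sessions
) -> int:
    """Largest per-process pool size that keeps all pools under max_connections."""
    budget = max_connections - reserved - headroom
    if workers < 1 or budget < workers:
        raise ValueError("not enough connections for one per worker")
    return budget // workers  # round down so the total never exceeds the budget
```

With the default max_connections of 100 and 4 workers, 100 - 3 - 5 = 92 connections are shared, so each worker's pool gets max_size=23.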


Implementation Patterns

Pattern 1: COPY for Bulk Data Loading

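import csv
import io

def bulk_insert_users(conn, users: list[dict]) -> int:
    """Insert users using COPY for maximum throughput."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for user in users:
        writer.writerow([user["name"], user["email"], user["age"]])
    buffer.seek(0)

    with conn.cursor() as cur:
        # copy_from() expects COPY's text format, which does not understand CSV
        # quoting; copy_expert() with FORMAT csv parses what csv.writer produced
        cur.copy_expert(
            "COPY users (name, email, age) FROM STDIN WITH (FORMAT csv)",
            buffer,
        )
        return cur.rowcount  # commit afterwards, e.g. call this inside `with conn:`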
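asyncpg's counterpart is Connection.copy_records_to_table(), which takes Python tuples directly, so no CSV buffer is needed. A sketch assuming the same users table and an asyncpg connection passed as conn; bulk_insert_users_async is a hypothetical name.

```python
async def bulk_insert_users_async(conn, users: list[dict]) -> int:
    """COPY users into the table with asyncpg; returns the number of rows copied.

    conn is assumed to be an asyncpg.Connection.
    """
    records = [(u["name"], u["email"], u["age"]) for u in users]
    status = await conn.copy_records_to_table(
        "users",
        records=records,
        columns=["name", "email", "age"],
    )
    # The return value is the command status tag, e.g. "COPY 2"
    return int(status.split()[-1])
```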

Pattern 2: LISTEN/NOTIFY for Real-Time Events

import select

from psycopg2 import sql

def listen_for_events(conn, channel: str, callback) -> None:
    """Listen for PostgreSQL NOTIFY events and invoke a callback."""
    conn.set_session(autocommit=True)  # LISTEN only takes effect once committed
    with conn.cursor() as cur:
        # Quote the channel as an identifier instead of interpolating it
        cur.execute(sql.SQL("LISTEN {}").format(sql.Identifier(channel)))

    while True:
        if select.select([conn], [], [], 5) == ([], [], []):
            continue  # Timeout: no notification yet; do other periodic work here
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            callback(notify.channel, notify.payload)
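With asyncpg, notifications are delivered to callbacks registered with Connection.add_listener() instead of a select() loop. A sketch, assuming a dedicated asyncpg connection that is not returned to a pool while listening; listen_async and its stop event are illustrative. asyncpg calls each listener with (connection, pid, channel, payload).

```python
import asyncio
from typing import Callable


async def listen_async(
    conn,  # assumed: a dedicated asyncpg.Connection
    channel: str,
    handler: Callable[[str, str], None],
    stop: asyncio.Event,
) -> None:
    """Invoke handler(channel, payload) for each NOTIFY until stop is set."""

    def on_notify(connection, pid: int, chan: str, payload: str) -> None:
        handler(chan, payload)

    await conn.add_listener(channel, on_notify)
    try:
        await stop.wait()  # notifications arrive through the callback meanwhile
    finally:
        await conn.remove_listener(channel, on_notify)
```

To publish from Python, pg_notify() with parameters avoids interpolating the channel or payload: await conn.execute("SELECT pg_notify($1, $2)", "events", "hello").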

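Pattern 3: Logical Replication Consumer (psycopg2)

asyncpg does not implement the streaming replication protocol, so replication commands such as CREATE_REPLICATION_SLOT must go through a psycopg2 replication connection. The server must run with wal_level = logical.

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

def consume_replication_slot(dsn: str) -> None:
    """Read changes from a logical replication slot."""
    conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
    cur = conn.cursor()
    cur.create_replication_slot("test_slot", output_plugin="pgoutput")
    # pgoutput requires a publication; "my_pub" is a hypothetical name (CREATE PUBLICATION my_pub ...)
    cur.start_replication(slot_name="test_slot", options={"proto_version": "1", "publication_names": "my_pub"})
    def on_message(msg):  # ReplicationMessage: data_start (LSN), payload (bytes), cursor, ...
        process_change(msg.data_start, msg.payload)  # process_change is application-defined
        msg.cursor.send_feedback(flush_lsn=msg.data_start)  # confirm so the server can recycle WAL

    cur.consume_stream(on_message)  # blocks, calling on_message for every message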

BAD vs GOOD: Query Building

# ❌ BAD — String interpolation vulnerable to SQL injection
def get_user_bad(conn, user_id: str):
    cur = conn.cursor()
    cur.execute(f"SELECT * FROM users WHERE id = '{user_id}'")
    return cur.fetchone()

# ✅ GOOD — Parameterized query with %s placeholder
def get_user_good(conn, user_id: str):
    with conn.cursor() as cur:
        cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
        return cur.fetchone()

BAD vs GOOD: Dynamic Table Names

# ❌ BAD — String formatting for table names (SQL injection risk)
def count_rows_bad(conn, table_name: str):
    cur = conn.cursor()
    cur.execute(f"SELECT COUNT(*) FROM {table_name}")

# ✅ GOOD — Use psycopg2.sql.Identifier for identifiers
from psycopg2 import sql

def count_rows_good(conn, table_name: str):
    with conn.cursor() as cur:
        query = sql.SQL("SELECT COUNT(*) FROM {}").format(
            sql.Identifier(table_name)
        )
        cur.execute(query)
        return cur.fetchone()[0]

Constraints

MUST DO

  • Always parameterize query values with %s (psycopg2) or $N (asyncpg) — never use f-strings or str.format()
  • Use with conn: for transaction management: it commits on success and rolls back on exception, but in psycopg2 it does not close the connection
  • Use connection pools in web applications to avoid connection exhaustion
  • Validate pool max_size against PostgreSQL max_connections setting
  • Call conn.close() or return connections to pool in finally blocks
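  • Use COPY for bulk operations (copy_expert() / copy_from() / copy_to() in psycopg2, copy_records_to_table() in asyncpg); it is typically much faster than row-by-row INSERT
  • Use at least sslmode="require" (psycopg2) or ssl="require" (asyncpg) in production; "require" encrypts the connection but does not authenticate the server, so prefer "verify-full" when you can supply the CA certificate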

MUST NOT DO

  • Never concatenate user input into SQL strings — always use parameterized queries
  • Do not create a new connection per request in web apps — use connection pools
  • Do not enable autocommit for multi-statement writes that must be atomic; reserve it for statements that cannot run in a transaction block and for LISTEN loops
  • Avoid SELECT * in production code — always specify columns explicitly
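  • Do not pass psycopg2-style settings to asyncpg: asyncpg accepts URI DSNs (postgresql://...) but not keyword/value DSNs (host=... dbname=...), uses database / ssl / timeout instead of dbname / sslmode / connect_timeout, and uses $1 placeholders instead of %s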
  • Never ignore OperationalError — it indicates connection or configuration problems

Output Template

When writing PostgreSQL integration code, structure your output as:

  1. Connection Setup: driver import, connection parameters, SSL config
  2. Pool Initialization: pool creation with min/max sizing for concurrent access
  3. Query Execution: parameterized SQL inside with-block context managers for transactions
  4. Error Handling: specific exception types (psycopg2: OperationalError, errors.UniqueViolation, errors.ForeignKeyViolation; asyncpg: UniqueViolationError, ForeignKeyViolationError)
  5. Resource Cleanup: return connections to the pool or close them in finally blocks or async context managers
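For step 4, both drivers expose the server's SQLSTATE code: psycopg2 exceptions carry .pgcode and asyncpg.PostgresError subclasses carry .sqlstate. A driver-agnostic sketch follows; the helper names and the choice to retry only serialization failures (40001) and deadlocks (40P01) are assumptions for illustration. txn must run a complete transaction (for example, a function whose body is a with conn: block) so that each retry starts clean.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

# SQLSTATE codes from PostgreSQL's error code table
UNIQUE_VIOLATION = "23505"
FOREIGN_KEY_VIOLATION = "23503"
RETRYABLE = {"40001", "40P01"}  # serialization_failure, deadlock_detected


def sqlstate_of(exc: BaseException) -> Optional[str]:
    """SQLSTATE of a driver exception: psycopg2 uses .pgcode, asyncpg uses .sqlstate."""
    return getattr(exc, "pgcode", None) or getattr(exc, "sqlstate", None)


def run_with_retry(txn: Callable[[], T], attempts: int = 3) -> T:
    """Re-run a whole transaction function on serialization failures or deadlocks."""
    for attempt in range(1, attempts + 1):
        try:
            return txn()
        except Exception as exc:
            if sqlstate_of(exc) not in RETRYABLE or attempt == attempts:
                raise  # not transient, or out of retries
    raise ValueError("attempts must be >= 1")
```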

Related Skills

  • coding-database-design-modeling: Schema design, normalization, indexing strategies
  • coding-database-migrations: Alembic and migration workflow patterns
  • coding-asyncio-patterns: Asyncio patterns used with asyncpg connections

Live References

Install via CLI
npx skills add https://github.com/paulpas/agent-skill-router --skill postgresql-sdk
Repository details: 4 stars, 0 forks, branch main, path SKILL.md