exchange-ccxt-patterns

Effective patterns for using the CCXT library for exchange connectivity, including error handling, rate limiting, and state management.

By paulpas. Updated 6/4/2026

name: exchange-ccxt-patterns
compatibility: opencode
completeness: 95
content-types:
  • code
  • guidance
  • config
  • do-dont
description: "Effective patterns for using CCXT library for exchange connectivity including error handling, rate limiting, and state management"
license: MIT
maturity: stable
metadata:
  domain: trading
  output-format: code
  related-skills: ai-order-flow-analysis, data-alternative-data
  role: implementation
  scope: implementation
  triggers: connectivity, effective, exchange ccxt patterns, exchange-ccxt-patterns, library
  archetypes:
    • tactical
  anti_triggers:
    • brainstorming
    • vague ideation
    • no risk management
  response_profile:
    verbosity: low
    directive_strength: high
    abstraction_level: operational
  version: "1.0.0"

Role: Guide an AI coding assistant to build robust exchange integrations using CCXT with proper error handling, state management, and performance optimization

Philosophy: CCXT is powerful but requires careful handling. Exchange APIs are the boundary between your system and the real market: they fail, they rate limit, and they return inconsistent data. Systems must treat exchange data as untrusted and implement comprehensive error handling, rate limiting, and retry logic while keeping CCXT cleanly separated from trading logic.

Key Principles

  1. Error as Data: Treat exchange errors as data to process, not as exceptions to swallow. Every error case should be handled gracefully and produce structured error information that callers can inspect.

  2. Rate Limiting is Non-Negotiable: Rate limits are hard constraints, not suggestions. Systems must implement proper rate limiting before any exchange interaction.

  3. Stateful Connections: WebSocket connections maintain state. Systems must track connection state and implement automatic reconnection with exponential backoff.

  4. Exchange Abstraction: Never expose CCXT directly to trading logic. Create a clean abstraction layer that handles CCXT-specific quirks.

  5. Graceful Degradation: When an exchange is unavailable, the system should continue operating with reduced functionality or switch to backup exchanges.
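
Principle 3 asks for automatic reconnection with exponential backoff. The following is a minimal sketch of that idea, not a CCXT API: `backoff_delays` and `reconnect_with_backoff` are hypothetical helper names, the `connect` callable stands in for whatever opens your WebSocket, and the use of full jitter is an added assumption that the source does not prescribe.

```python
import asyncio
import random
from typing import Awaitable, Callable, List


def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   cap: float = 60.0, attempts: int = 5) -> List[float]:
    """Return the un-jittered delay for each attempt: base * factor**n, capped."""
    return [min(cap, base * factor ** n) for n in range(attempts)]


async def reconnect_with_backoff(
    connect: Callable[[], Awaitable[None]],
    delays: List[float],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Call `connect` until it succeeds and return the number of attempts used.

    Re-raises the last ConnectionError once the delay schedule is exhausted.
    """
    for attempt, delay in enumerate(delays, start=1):
        try:
            await connect()
            return attempt
        except ConnectionError:
            if attempt == len(delays):
                raise
            # Full jitter (an assumption of this sketch): sleep a random
            # fraction of the scheduled delay so that many clients do not
            # reconnect in lockstep.
            await sleep(random.uniform(0, delay))
    raise ValueError("delays must not be empty")
```

The `sleep` parameter is injectable so the schedule can be exercised in tests without real waiting.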

Implementation Guidelines

Structure

  • Core logic: exchange_integration/ccxt_wrapper.py
  • Error handling: exchange_integration/errors.py
  • State management: exchange_integration/state.py
  • Utilities: exchange_integration/utils.py

Patterns to Follow

  • Early Exit: Reject operations when exchange state is invalid
  • Atomic Predictability: Exchange operations return consistent structures
  • Fail Fast: Halt when critical exchange data is unavailable
  • Intentional Naming: Clear names that distinguish CCXT calls from trading logic
  • Parse Don't Validate: Exchange data parsed at boundaries, validated internally
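
As one way to read "Parse Don't Validate", the sketch below converts an untrusted ticker dict into a typed value once, at the boundary, so that trading logic never touches raw payloads. `Ticker`, `MalformedTicker`, and `parse_ticker` are hypothetical names. The `symbol`, `bid`, and `ask` keys are assumed to follow CCXT's unified ticker structure, and the use of `Decimal` is a choice made for this illustration.

```python
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from typing import Any, Mapping


class MalformedTicker(ValueError):
    """Raised when a raw ticker payload cannot be parsed."""


@dataclass(frozen=True)
class Ticker:
    symbol: str
    bid: Decimal
    ask: Decimal


def parse_ticker(raw: Mapping[str, Any]) -> Ticker:
    """Turn an untrusted ticker dict into a Ticker, or raise MalformedTicker."""
    try:
        symbol = str(raw["symbol"])
        # Go through str() so floats are not converted with binary noise.
        bid = Decimal(str(raw["bid"]))
        ask = Decimal(str(raw["ask"]))
    except (KeyError, InvalidOperation) as exc:
        raise MalformedTicker(f"bad ticker payload: {exc!r}") from exc
    # Reject NaN/Infinity first; only then are ordering comparisons safe.
    if not (bid.is_finite() and ask.is_finite()) or bid <= 0 or ask < bid:
        raise MalformedTicker(f"inconsistent quotes: bid={bid}, ask={ask}")
    return Ticker(symbol, bid, ask)
```

Code downstream of `parse_ticker` can then rely on a `Ticker` always holding finite, positive, non-crossed quotes.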

Code Examples

# Example 1: CCXT Wrapper with Error Handling
import ccxt
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import asyncio
import time


class ExchangeErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    AUTHENTICATION = "authentication"
    NETWORK = "network"
    INVALID_PARAMS = "invalid_params"
    SERVER_ERROR = "server_error"
    UNKNOWN = "unknown"


@dataclass
class ExchangeError:
    """Structured exchange error"""
    error_type: ExchangeErrorType
    message: str
    exchange_id: str
    timestamp: datetime
    retryable: bool
    http_status: Optional[int] = None
    ccxt_code: Optional[str] = None
    
    @property
    def is_transient(self) -> bool:
        """Is this error transient and worth retrying?"""
        return self.retryable and self.error_type in [
            ExchangeErrorType.NETWORK,
            ExchangeErrorType.RATE_LIMIT,
            ExchangeErrorType.SERVER_ERROR
        ]


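@dataclass
class ExchangeState:
    """Health counters for one exchange connection"""
    successes: int = 0
    failures: int = 0
    last_success: Optional[datetime] = None


class ExchangeWrapper: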
    """Wrapper around CCXT with enhanced error handling and state management"""
    
    def __init__(self, exchange_id: str, config: Optional[dict] = None):
        self.exchange_id = exchange_id
        self.config = config or {}
        
        # Initialize CCXT exchange
        exchange_class = getattr(ccxt, exchange_id)
        self.exchange = exchange_class(self._build_ccxt_config())
        
        # State tracking
        self.state = ExchangeState()
        self.last_rate_limit_reset = 0
        self.rate_limit_remaining = 0
        
        # Custom headers for some exchanges
        self._setup_custom_headers()
    
    def _build_ccxt_config(self) -> dict:
        """Build CCXT configuration from custom config"""
        ccxt_config = {
            'enableRateLimit': True,
            'timeout': self.config.get('timeout', 30000),
            'options': self.config.get('ccxt_options', {})
        }
        
        # Add credentials if available
        if 'api_key' in self.config:
            ccxt_config['apiKey'] = self.config['api_key']
        if 'secret' in self.config:
            ccxt_config['secret'] = self.config['secret']
        if 'password' in self.config:
            ccxt_config['password'] = self.config['password']
        if 'uid' in self.config:
            ccxt_config['uid'] = self.config['uid']
        
        return ccxt_config
    
    def _setup_custom_headers(self):
        """Setup exchange-specific custom headers"""
        # Some exchanges require custom headers
        custom_headers = self.config.get('custom_headers', {})
        if custom_headers:
            self.exchange.headers = custom_headers
    
    def _parse_error(self, error: Exception, operation: str) -> ExchangeError:
        """Map a CCXT exception to a structured error for `operation`"""
        # Classify by exception class rather than by message text, which
        # differs between exchanges. Test the most specific classes first:
        # in CCXT's hierarchy RateLimitExceeded and ExchangeNotAvailable
        # derive from NetworkError.
        if isinstance(error, ccxt.RateLimitExceeded):
            error_type, retryable = ExchangeErrorType.RATE_LIMIT, True
        elif isinstance(error, ccxt.AuthenticationError):
            error_type, retryable = ExchangeErrorType.AUTHENTICATION, False
        elif isinstance(error, ccxt.ExchangeNotAvailable):
            error_type, retryable = ExchangeErrorType.SERVER_ERROR, True
        elif isinstance(error, ccxt.NetworkError):
            error_type, retryable = ExchangeErrorType.NETWORK, True
        elif isinstance(error, (ccxt.BadRequest, ccxt.InvalidOrder)):
            error_type, retryable = ExchangeErrorType.INVALID_PARAMS, False
        else:
            error_type, retryable = ExchangeErrorType.UNKNOWN, False

        # Keep any error code attached to the exception, if present
        code = getattr(error, 'code', None)
        return ExchangeError(
            error_type=error_type,
            message=f"{operation} failed: {error}",
            exchange_id=self.exchange_id,
            timestamp=datetime.now(),
            retryable=retryable,
            ccxt_code=str(code) if code else None
        )
    
    async def safe_call(self, operation: str, *args, **kwargs) -> dict:
        """
        Safe wrapper for CCXT operations with error handling and retry logic
        
        Usage:
            result = await wrapper.safe_call('fetch_ticker', 'BTC/USDT')
        """
        max_retries = self.config.get('max_retries', 3)
        retry_delay = self.config.get('retry_delay', 1.0)
        
        for attempt in range(max_retries + 1):
            try:
                # Check rate limit before call
                if not self._check_rate_limit():
                    await self._wait_for_rate_limit_reset()
                
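                # Execute the operation. The synchronous CCXT client blocks,
                # so run it in a worker thread (asyncio.to_thread, Python 3.9+)
                method = getattr(self.exchange, operation)
                result = await asyncio.to_thread(method, *args, **kwargs)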
                
                # Update state after successful call
                self._update_state_after_success()
                
                return result
                
            except Exception as error:
                exchange_error = self._parse_error(error, operation)
                
                if not exchange_error.is_transient:
                    # Don't retry non-transient errors
                    return {'error': exchange_error}
                
                if attempt == max_retries:
                    # Final attempt failed, return error
                    return {'error': exchange_error}
                
                # Wait before retry with exponential backoff: 1x, 2x, 4x, ...
                await asyncio.sleep(retry_delay * 2 ** attempt)
    
    def _check_rate_limit(self) -> bool:
        """Check if rate limit allows operation"""
        # CCXT has built-in rate limiting, but we track additional metrics
        now = time.time()
        
        if now < self.last_rate_limit_reset + self.exchange.rateLimit / 1000:
            return False
        
        return True
    
    async def _wait_for_rate_limit_reset(self):
        """Wait for rate limit to reset without blocking the event loop"""
        # exchange.rateLimit is CCXT's minimum delay between requests, in milliseconds
        await asyncio.sleep(self.exchange.rateLimit / 1000)
    
    def _update_state_after_success(self):
        """Update exchange state after successful operation"""
        self.state.successes += 1
        self.state.failures = 0
        self.state.last_success = datetime.now()
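
The success and failure counters tracked above can drive the "Early Exit" pattern: stop calling an exchange that keeps failing, then probe it again after a cooldown. The sketch below is one possible shape for that check, written as a circuit breaker. The class name, the threshold, and the cooldown are illustrative assumptions and are not part of the wrapper above or of CCXT.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures and allows a
    trial call again once `cooldown` seconds have passed."""
    failure_threshold: int = 3
    cooldown: float = 30.0
    clock: Callable[[], float] = time.monotonic  # injectable for tests
    failures: int = field(default=0, init=False)
    opened_at: Optional[float] = field(default=None, init=False)

    def allow(self) -> bool:
        """Return True if a call may be attempted right now."""
        if self.opened_at is None:
            return True
        return self.clock() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        """Close the breaker and reset the consecutive-failure count."""
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        """Count a failure; open (or re-open) the breaker at the threshold."""
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

A caller would check `allow()` before each exchange call and return a structured error immediately when it is `False`, which also gives a natural hook for switching to a backup exchange.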

Pattern 2: CCXT Rate Limit Awareness with Token Bucket

import asyncio
import time
from dataclasses import dataclass, field

import ccxt  # module-level so the except clauses below can reference ccxt.* errors


@dataclass
class TokenBucketRateLimiter:
    """Token bucket algorithm for exchange rate limit compliance.

    Each exchange has different rate limits (requests per second/window).
    This limiter tracks tokens and ensures calls don't exceed the limit
    even across multiple parallel requests.
    """
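    max_tokens: int = 10
    refill_rate: float = 1.0  # tokens per second
    current_tokens: float = field(init=False)
    last_refill_time: float = field(default_factory=time.monotonic, init=False)

    def __post_init__(self) -> None:
        # Start with a full bucket sized to max_tokens rather than a hard-coded 10
        self.current_tokens = float(self.max_tokens)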

    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire N tokens. Returns True if successful, False if rate limited."""
        self._refill()
        if self.current_tokens >= tokens:
            self.current_tokens -= tokens
            return True
        return False

    def wait_for_token(self, timeout: float = 10.0) -> None:
        """Block until a token is available or timeout expires."""
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            if self.acquire():
                return
            time.sleep(1.0 / self.refill_rate)
        raise TimeoutError("Rate limit wait timed out")

    def _refill(self) -> None:
        """Refill tokens based on elapsed time since last refill."""
        now = time.monotonic()
        elapsed = now - self.last_refill_time
        tokens_to_add = elapsed * self.refill_rate
        self.current_tokens = min(self.max_tokens, self.current_tokens + tokens_to_add)
        self.last_refill_time = now


class CCXTExchangeAdapter:
    """High-level adapter wrapping CCXT exchanges with rate limiting and error handling."""

    def __init__(self, exchange_id: str, api_key: str | None = None, secret: str | None = None):
        import ccxt

        self._exchange = getattr(ccxt, exchange_id)({
            "apiKey": api_key or "",
            "secret": secret or "",
            "enableRateLimit": True,
            "options": {"defaultType": "spot"},
        })

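        # exchange.rateLimit is the minimum delay between requests in
        # milliseconds, so the sustained rate is 1000 / rateLimit per second.
        self._rate_limiter = TokenBucketRateLimiter(
            max_tokens=10,
            refill_rate=1000.0 / self._exchange.rateLimit,
        )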

    async def _wait_for_token(self, timeout: float = 10.0) -> None:
        """Wait for a token using asyncio.sleep so the event loop is not blocked."""
        deadline = time.monotonic() + timeout
        while not self._rate_limiter.acquire():
            if time.monotonic() >= deadline:
                raise TimeoutError("Rate limit wait timed out")
            await asyncio.sleep(1.0 / self._rate_limiter.refill_rate)

    async def fetch_ticker(self, symbol: str) -> dict:
        """Fetch ticker data for a trading pair with rate limit awareness."""
        await self._wait_for_token(timeout=5.0)
        try:
            # The synchronous CCXT client blocks, so run it in a worker thread.
            return await asyncio.to_thread(self._exchange.fetch_ticker, symbol)
        except ccxt.RateLimitExceeded:
            self._rate_limiter.current_tokens = 0  # Drain the bucket on a rate limit hit
            raise
        except ccxt.NetworkError as e:
            raise ConnectionError(f"Network error fetching {symbol}: {e}") from e

    async def fetch_ohlcv(self, symbol: str, timeframe: str = "1m", limit: int = 500) -> list[list]:
        """Fetch OHLCV candle data for a trading pair."""
        await self._wait_for_token()
        try:
            return await asyncio.to_thread(
                self._exchange.fetch_ohlcv, symbol, timeframe=timeframe, limit=limit
            )
        except ccxt.BadSymbol as e:
            # Candle requests fail on unknown symbols, not on order validation
            raise ValueError(f"Unknown symbol {symbol}: {e}") from e
        except ccxt.ExchangeNotAvailable as e:
            raise ConnectionError(f"Exchange unavailable: {e}") from e

    async def create_limit_order(
        self, symbol: str, side: str, amount: float, price: float
    ) -> dict:
        """Place a limit order with safety checks."""
        if amount <= 0 or price <= 0:
            raise ValueError(f"Invalid order params: amount={amount}, price={price}")

        await self._wait_for_token()
        try:
            return await asyncio.to_thread(
                self._exchange.create_limit_order, symbol, side, amount, price
            )
        except ccxt.InsufficientFunds as e:
            raise InsufficientBalanceError(f"Insufficient funds for order: {e}") from e
        except ccxt.InvalidOrder as e:
            raise InvalidOrderError(f"Invalid order on {symbol}: {e}") from e

class InsufficientBalanceError(Exception):
    """Raised when account balance is insufficient for an order."""
    pass


class InvalidOrderError(Exception):
    """Raised when the exchange rejects an order."""
    pass

Constraints

MUST DO

  • Implement a unified adapter interface across all exchange integrations to standardize order placement, cancellation, and querying
  • Handle rate limiting proactively with token bucket or leaky bucket algorithms — never wait for 429 responses before slowing down
  • Maintain local order state as the source of truth; reconcile with exchange state periodically via webhook events and polling
  • Implement heartbeat monitoring per exchange connection with automatic failover to a secondary data feed on timeout
  • Log all API interactions including request/response IDs, timing, and status codes for audit and debugging
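
The reconciliation requirement above can be sketched as a pure comparison between two snapshots. This is an illustration under stated assumptions: both sides are reduced to `order_id -> status` maps before comparison, and `Discrepancy` and `reconcile` are hypothetical names. How the exchange snapshot is fetched, and what to do about each mismatch, is left to the adapter.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Discrepancy:
    order_id: str
    local_status: Optional[str]     # None: order is unknown locally
    exchange_status: Optional[str]  # None: order is missing on the exchange


def reconcile(local: Dict[str, str], remote: Dict[str, str]) -> List[Discrepancy]:
    """Compare order_id -> status maps and list every mismatch, sorted by id."""
    diffs: List[Discrepancy] = []
    for order_id in sorted(local.keys() | remote.keys()):
        local_status = local.get(order_id)
        remote_status = remote.get(order_id)
        if local_status != remote_status:
            diffs.append(Discrepancy(order_id, local_status, remote_status))
    return diffs
```

Returning the differences as data, rather than mutating either side, keeps the policy decision (trust local, trust exchange, or halt) explicit and easy to log.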

MUST NOT DO

  • Do not trust exchange-reported order states without local confirmation — always reconcile after every state change
  • Avoid sending multiple orders for the same position simultaneously across different adapters or sessions
  • Never store API keys or secrets in code — use environment variables or a secrets manager with automatic rotation
  • Do not assume all exchanges support the same order types — implement graceful degradation with clear capability negotiation
  • Avoid polling-based price updates when WebSocket/streaming APIs are available — polling creates unnecessary load and latency
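
Capability negotiation can be sketched as picking the first supported feature from a preference list. CCXT exchange instances expose a `has` map whose values are typically `True`, `False`, `None`, or `'emulated'`. The helper below is hypothetical and takes such a map as a plain mapping, so it runs without a live exchange. The capability keys in the usage example are illustrative and should be checked against the `has` map of the exchange actually in use.

```python
from typing import Mapping, Optional, Sequence, Union

Capability = Union[bool, str, None]


def first_supported(
    has: Mapping[str, Capability],
    preferred: Sequence[str],
    allow_emulated: bool = True,
) -> Optional[str]:
    """Return the first capability in `preferred` that the exchange supports.

    Returns None when nothing matches, so the caller can degrade explicitly
    instead of sending a request the exchange will reject.
    """
    for name in preferred:
        value = has.get(name)
        if value is True or (allow_emulated and value == "emulated"):
            return name
    return None


# Usage with a stand-in for exchange.has (keys are assumptions for illustration)
stub_has = {"createStopLimitOrder": False, "createLimitOrder": True}
choice = first_supported(stub_has, ["createStopLimitOrder", "createLimitOrder"])
```

Here `choice` is `"createLimitOrder"`, meaning the caller falls back from a stop-limit order to a plain limit order and can manage the stop condition itself.
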
Install via CLI
npx skills add https://github.com/paulpas/agent-skill-router --skill exchange-ccxt-patterns