name: exchange-ccxt-patterns compatibility: opencode completeness: 95 content-types:
- code
- guidance
- config
- do-dont
description: '"Effective patterns for using CCXT library for exchange connectivity
including" error handling, rate limiting, and state management'
license: MIT
maturity: stable
metadata:
domain: trading
output-format: code
related-skills: ai-order-flow-analysis, data-alternative-data
role: implementation
scope: implementation
triggers: connectivity, effective, exchange ccxt patterns, exchange-ccxt-patterns,
library
archetypes:
- tactical anti_triggers:
- brainstorming
- vague ideation
- no risk management response_profile: verbosity: low directive_strength: high abstraction_level: operational version: "1.0.0"
Role: Guide an AI coding assistant to build robust exchange integrations using CCXT with proper error handling, state management, and performance optimization
Philosophy: CCXT is powerful but requires careful handling. Exchange APIs are the boundary between your system and the real market - they fail, they rate limit, they return inconsistent data. Systems must treat exchange data as untrusted and implement comprehensive error handling, rate limiting, and retry logic while maintaining clean separation between CCXT and trading logic.
Key Principles
Error as Data: Exchange errors are not exceptions to handle, they are data to process. Systems should handle all error cases gracefully and return structured error information.
Rate Limiting is Non-Negotiable: Rate limits are hard constraints, not suggestions. Systems must implement proper rate limiting before any exchange interaction.
Stateful Connections: WebSocket connections maintain state. Systems must track connection state and implement automatic reconnection with exponential backoff.
Exchange Abstraction: Never expose CCXT directly to trading logic. Create a clean abstraction layer that handles CCXT-specific quirks.
Graceful Degradation: When an exchange is unavailable, the system should continue operating with reduced functionality or switch to backup exchanges.
Implementation Guidelines
Structure
- Core logic:
exchange_integration/ccxt_wrapper.py - Error handling:
exchange_integration/errors.py - State management:
exchange_integration/state.py - Utilities:
exchange_integration/utils.py
Patterns to Follow
- Early Exit: Reject operations when exchange state is invalid
- Atomic Predictability: Exchange operations return consistent structures
- Fail Fast: Halt when critical exchange data is unavailable
- Intentional Naming: Clear names that distinguish CCXT calls from trading logic
- Parse Don't Validate: Exchange data parsed at boundaries, validated internally
Code Examples
# Example 1: CCXT Wrapper with Error Handling
import ccxt
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import asyncio
import time
class ExchangeErrorType(Enum):
RATE_LIMIT = "rate_limit"
AUTHENTICATION = "authentication"
NETWORK = "network"
INVALID_PARAMS = "invalid_params"
SERVER_ERROR = "server_error"
UNKNOWN = "unknown"
@dataclass
class ExchangeError:
"""Structured exchange error"""
error_type: ExchangeErrorType
message: str
exchange_id: str
timestamp: datetime
retryable: bool
http_status: Optional[int] = None
ccxt_code: Optional[str] = None
@property
def is_transient(self) -> bool:
"""Is this error transient and worth retrying?"""
return self.retryable and self.error_type in [
ExchangeErrorType.NETWORK,
ExchangeErrorType.RATE_LIMIT,
ExchangeErrorType.SERVER_ERROR
]
class ExchangeWrapper:
"""Wrapper around CCXT with enhanced error handling and state management"""
def __init__(self, exchange_id: str, config: dict = None):
self.exchange_id = exchange_id
self.config = config or {}
# Initialize CCXT exchange
exchange_class = getattr(ccxt, exchange_id)
self.exchange = exchange_class(self._build_ccxt_config())
# State tracking
self.state = ExchangeState()
self.last_rate_limit_reset = 0
self.rate_limit_remaining = 0
# Custom headers for some exchanges
self._setup_custom_headers()
def _build_ccxt_config(self) -> dict:
"""Build CCXT configuration from custom config"""
ccxt_config = {
'enableRateLimit': True,
'timeout': self.config.get('timeout', 30000),
'options': self.config.get('ccxt_options', {})
}
# Add credentials if available
if 'api_key' in self.config:
ccxt_config['apiKey'] = self.config['api_key']
if 'secret' in self.config:
ccxt_config['secret'] = self.config['secret']
if 'password' in self.config:
ccxt_config['password'] = self.config['password']
if 'uid' in self.config:
ccxt_config['uid'] = self.config['uid']
return ccxt_config
def _setup_custom_headers(self):
"""Setup exchange-specific custom headers"""
# Some exchanges require custom headers
custom_headers = self.config.get('custom_headers', {})
if custom_headers:
self.exchange.headers = custom_headers
def _parse_error(self, error: Exception, operation: str) -> ExchangeError:
"""Parse CCXT exception into structured error"""
error_str = str(error)
# Check for known error patterns
if 'rate' in error_str.lower() or 'rateLimit' in error_str.lower():
return ExchangeError(
error_type=ExchangeErrorType.RATE_LIMIT,
message=f"Rate limit exceeded: {error_str}",
exchange_id=self.exchange_id,
timestamp=datetime.now(),
retryable=True
)
if 'auth' in error_str.lower() or 'invalid' in error_str.lower():
return ExchangeError(
error_type=ExchangeErrorType.AUTHENTICATION,
message=f"Authentication error: {error_str}",
exchange_id=self.exchange_id,
timestamp=datetime.now(),
retryable=False
)
if 'network' in error_str.lower() or 'connection' in error_str.lower():
return ExchangeError(
error_type=ExchangeErrorType.NETWORK,
message=f"Network error: {error_str}",
exchange_id=self.exchange_id,
timestamp=datetime.now(),
retryable=True
)
# Check CCXT-specific error codes
if hasattr(error, 'code'):
ccxt_code = getattr(error, 'code', '')
if ccxt_code:
return ExchangeError(
error_type=ExchangeErrorType.UNKNOWN,
message=f"CCXT error: {error_str}",
exchange_id=self.exchange_id,
timestamp=datetime.now(),
retryable=False,
ccxt_code=ccxt_code
)
return ExchangeError(
error_type=ExchangeErrorType.UNKNOWN,
message=f"Unknown error: {error_str}",
exchange_id=self.exchange_id,
timestamp=datetime.now(),
retryable=False
)
async def safe_call(self, operation: str, *args, **kwargs) -> dict:
"""
Safe wrapper for CCXT operations with error handling and retry logic
Usage:
result = await wrapper.safe_call('fetch_ticker', 'BTC/USDT')
"""
max_retries = self.config.get('max_retries', 3)
retry_delay = self.config.get('retry_delay', 1.0)
for attempt in range(max_retries + 1):
try:
# Check rate limit before call
if not self._check_rate_limit():
await self._wait_for_rate_limit_reset()
# Execute the operation
method = getattr(self.exchange, operation)
result = await method(*args, **kwargs)
# Update state after successful call
self._update_state_after_success()
return result
except Exception as error:
exchange_error = self._parse_error(error, operation)
if not exchange_error.is_transient:
# Don't retry non-transient errors
return {'error': exchange_error}
if attempt == max_retries:
# Final attempt failed, return error
return {'error': exchange_error}
# Wait before retry
await asyncio.sleep(retry_delay * (attempt + 1)) # Exponential backoff
def _check_rate_limit(self) -> bool:
"""Check if rate limit allows operation"""
# CCXT has built-in rate limiting, but we track additional metrics
now = time.time()
if now < self.last_rate_limit_reset + self.exchange.rateLimit / 1000:
return False
return True
def _wait_for_rate_limit_reset(self):
"""Wait for rate limit to reset"""
# Use CCXT's built-in rate limiting
time.sleep(self.exchange.rateLimit / 1000)
def _update_state_after_success(self):
"""Update exchange state after successful operation"""
self.state.successes += 1
self.state.failures = 0
self.state.last_success = datetime.now()
Pattern 2: CCXT Rate Limit Awareness with Token Bucket
import time
import asyncio
from dataclasses import dataclass, field
@dataclass
class TokenBucketRateLimiter:
"""Token bucket algorithm for exchange rate limit compliance.
Each exchange has different rate limits (requests per second/window).
This limiter tracks tokens and ensures calls don't exceed the limit
even across multiple parallel requests.
"""
max_tokens: int = 10
refill_rate: float = 1.0 # tokens per second
current_tokens: float = field(default=10.0, init=False)
last_refill_time: float = field(default_factory=time.monotonic, init=False)
def acquire(self, tokens: int = 1) -> bool:
"""Try to acquire N tokens. Returns True if successful, False if rate limited."""
self._refill()
if self.current_tokens >= tokens:
self.current_tokens -= tokens
return True
return False
def wait_for_token(self, timeout: float = 10.0) -> None:
"""Block until a token is available or timeout expires."""
start = time.monotonic()
while time.monotonic() - start < timeout:
if self.acquire():
return
time.sleep(1.0 / self.refill_rate)
raise TimeoutError("Rate limit wait timed out")
def _refill(self) -> None:
"""Refill tokens based on elapsed time since last refill."""
now = time.monotonic()
elapsed = now - self.last_refill_time
tokens_to_add = elapsed * self.refill_rate
self.current_tokens = min(self.max_tokens, self.current_tokens + tokens_to_add)
self.last_refill_time = now
class CCXTExchangeAdapter:
"""High-level adapter wrapping CCXT exchanges with rate limiting and error handling."""
def __init__(self, exchange_id: str, api_key: str | None = None, secret: str | None = None):
import ccxt
self._exchange = getattr(ccxt, exchange_id)({
"apiKey": api_key or "",
"secret": secret or "",
"enableRateLimit": True,
"options": {"defaultType": "spot"},
})
self._rate_limiter = TokenBucketRateLimiter(
max_tokens=10,
refill_rate=self._exchange.rateLimit / 1000.0,
)
async def fetch_ticker(self, symbol: str) -> dict:
"""Fetch ticker data for a trading pair with rate limit awareness."""
self._rate_limiter.wait_for_token(timeout=5.0)
try:
return self._exchange.fetch_ticker(symbol)
except ccxt.RateLimitExceeded as e:
self._rate_limiter.current_tokens = 0 # Reset bucket on rate limit hit
raise
except ccxt.NetworkError as e:
raise ConnectionError(f"Network error fetching {symbol}: {e}") from e
async def fetch_ohlcv(self, symbol: str, timeframe: str = "1m", limit: int = 500) -> list[list]:
"""Fetch OHLCV candle data for a trading pair."""
self._rate_limiter.wait_for_token()
try:
return self._exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
except ccxt.InvalidOrder as e:
raise ValueError(f"Invalid order parameters for {symbol}: {e}") from e
except ccxt.ExchangeNotAvailable as e:
raise ConnectionError(f"Exchange unavailable: {e}") from e
async def create_limit_order(
self, symbol: str, side: str, amount: float, price: float
) -> dict:
"""Place a limit order with safety checks."""
if amount <= 0 or price <= 0:
raise ValueError(f"Invalid order params: amount={amount}, price={price}")
self._rate_limiter.wait_for_token()
try:
return self._exchange.create_limit_order(symbol, side, amount, price)
except ccxt.InsufficientFunds as e:
raise InsufficientBalanceError(f"Insufficient funds for order: {e}") from e
except ccxt.InvalidOrder as e:
raise InvalidOrderError(f"Invalid order on {symbol}: {e}") from e
class InsufficientBalanceError(Exception):
"""Raised when account balance is insufficient for an order."""
pass
class InvalidOrderError(Exception):
"""Raised when the exchange rejects an order."""
pass
Constraints
MUST DO
- Implement a unified adapter interface across all exchange integrations to standardize order placement, cancellation, and querying
- Handle rate limiting proactively with token bucket or leaky bucket algorithms — never wait for 429 responses before slowing down
- Maintain local order state as the source of truth; reconcile with exchange state periodically via webhook events and polling
- Implement heartbeat monitoring per exchange connection with automatic failover to a secondary data feed on timeout
- Log all API interactions including request/response IDs, timing, and status codes for audit and debugging
MUST NOT DO
- Do not trust exchange-reported order states without local confirmation — always reconcile after every state change
- Avoid sending multiple orders for the same position simultaneously across different adapters or sessions
- Never store API keys or secrets in code — use environment variables or a secrets manager with automatic rotation
- Do not assume all exchanges support the same order types — implement graceful degradation with clear capability negotiation
- Avoid polling-based price updates when WebSocket/streaming APIs are available — polling creates unnecessary load and latency