name: kpler description: Use when fetching oil/gas trade flow data from Kpler. Covers authentication, trade queries, flow aggregations, entity search, vessel positions, and company fleet data.
Kpler Trade Data API
Access oil and gas trade flows, vessel tracking, and company data from the Kpler terminal.
Prerequisites
- Kpler account with terminal access
- Store credentials in
.env:
KPLER_USERNAME=user@example.com
KPLER_PASSWORD=your_password
Setup
Copy the client module to your project:
cp .claude/skills/kpler/scripts/kpler_client.py scripts/
Install dependencies:
uv add httpx pyjwt python-dotenv
Quick start
import asyncio
from kpler_client import KplerClient
async def main():
async with KplerClient() as client:
# Search for a player
results = await client.search("shell", categories=["PLAYER"])
shell_id = results["players"][0]["entity"]["id"]
# Query their trades
trades = await client.query_trades(players=[shell_id], size=20)
print(f"Found {len(trades['result']['trades'])} trades")
asyncio.run(main())
Authentication
The client handles Auth0 OAuth automatically:
- Auto-login from
.envcredentials - Token storage in
.kpler_token,.kpler_refresh_token - Automatic refresh 5 minutes before expiry
- Retry on 401 with fresh token
Manual auth control:
client = KplerClient()
await client.login("user@example.com", "password") # Manual login
client.is_authenticated() # Check status
await client.logout() # Clear tokens
API methods
Search entities
Find players, vessels, installations, zones, and products by name:
results = await client.search(
text="gazprom",
categories=["PLAYER", "INSTALLATION"], # Optional filter
commodity_types=["lng"], # Optional: lng, oil, lpg, dry
)
# Results grouped by type
for player in results.get("players", []):
print(f"{player['entity']['name']} (ID: {player['entity']['id']})")
Categories: PLAYER, VESSEL, INSTALLATION, ZONE, PRODUCT
Query trades
Get individual trade records:
trades = await client.query_trades(
# Pagination
from_=0,
size=100,
# Filters (use IDs from search - strings work, converted to ints)
locations=[1234], # Zone/installation IDs
products=[5678], # Product IDs
players=[3836], # Company IDs
vessels=[9012], # Vessel IDs
# Trade status
statuses=["completed"], # ongoing, completed, cancelled
trade_types=["import", "export"],
# Options
with_forecasted=False,
)
for trade in trades["data"]:
origin = trade.get("portCallOrigin", {}).get("zone", {}).get("name", "Unknown")
dest = trade.get("portCallDestination", {}).get("zone", {}).get("name", "Unknown")
print(f"{origin} → {dest}")
Query flows (aggregated)
Get aggregated flow data with time series:
flows = await client.query_flows(
# Required
direction="export", # export, import
granularity="months", # years, months, weeks, days
# Date range
start_date="2024-01-01",
end_date="2024-12-31",
# Filters
locations=[1234],
products=[5678],
players=[3836],
# Split results by dimension
split_on="countries", # countries, ports, products, vessels, buyers, sellers
# Options
cumulative=False,
forecasted=False,
intra=False, # Include intra-region flows
)
# Response format: series by date with split values
for entry in flows["series"]:
year = entry["date"]
for dataset in entry.get("datasets", []):
for split in dataset.get("splitValues", []):
vol = split["values"]["volume"]
print(f"{year}: {split['name']} = {vol/1e6:.1f} Mt")
Split options: countries, ports, products, vessels, buyers, sellers, charterers
Get vessel positions
Raw AIS tracking data:
positions = await client.get_vessel_positions(
vessel_id=12345,
start_date="2024-01-01T00:00:00Z", # ISO 8601
end_date="2024-01-31T23:59:59Z",
limit=1000,
)
for pos in positions:
print(f"{pos['timestamp']}: {pos['lat']}, {pos['lon']}")
Get player fleet
Company fleet information:
fleet = await client.get_player_fleet(player_id=3836)
print(f"Company: {fleet['name']}")
print(f"Vessels owned: {len(fleet.get('ownedVessels', []))}")
print(f"Subsidiaries: {len(fleet.get('subsidiaries', []))}")
Query contracts
Long-term agreements and tenders:
contracts = await client.query_contracts(
types=["SPA", "LTA"], # Tender, SPA, LTA, TUA
players=[3836],
from_=0,
size=50,
)
ETL pattern for notebooks
Typical workflow for building a DuckDB database:
# scripts/fetch_kpler.py
import asyncio
import duckdb
from kpler_client import KplerClient
from dotenv import load_dotenv
load_dotenv()
async def fetch_data():
async with KplerClient() as client:
# Search for entities
russia = await client.search("russia", categories=["ZONE"])
russia_id = russia["zones"][0]["entity"]["id"]
# Get export flows
flows = await client.query_flows(
direction="export",
locations=[russia_id],
granularity="months",
start_date="2020-01-01",
end_date="2024-12-31",
split_on="countries",
)
return flows["result"]["series"]
def build_database(data):
con = duckdb.connect("data/data.duckdb")
con.execute("""
CREATE OR REPLACE TABLE flows (
date DATE,
destination VARCHAR,
volume_kt DOUBLE
)
""")
for series in data:
for point in series.get("data", []):
con.execute(
"INSERT INTO flows VALUES (?, ?, ?)",
[point["date"], series["name"], point["value"]]
)
con.close()
if __name__ == "__main__":
data = asyncio.run(fetch_data())
build_database(data)
Add to Makefile:
data:
uv run python scripts/fetch_kpler.py
Rate limits
The Kpler API has rate limits. Add delays for large queries:
import asyncio
for player_id in player_ids:
data = await client.query_trades(players=[player_id])
await asyncio.sleep(0.5) # Rate limit protection
Common entity IDs
Search to find current IDs, as these may change:
| Entity | Example search | Type |
|---|---|---|
| Russia | russia |
ZONE |
| China | china |
ZONE |
| Shell | shell |
PLAYER |
| TotalEnergies | total |
PLAYER |
| Crude oil | crude |
PRODUCT |
| LNG | lng |
PRODUCT |
Troubleshooting
401 Unauthorized: Credentials invalid or expired. Check .env file.
Empty results: Verify entity IDs are correct. Use search() to find current IDs.
Token errors: Delete .kpler_token* files and re-authenticate.
Rate limited: Add delays between requests. The API may temporarily block rapid queries.
API reference
Base URL: https://terminal.kpler.com/api
| Endpoint | Method | Purpose |
|---|---|---|
/trades |
POST | Query trade records |
/flows |
POST | Aggregate flow data |
/contracts |
GET | Contract data |
/players/{id} |
GET | Company fleet info |
/vessels/{id}/positions |
GET | AIS positions |
/graphql/ |
POST | Entity search |