name: fused-integrations description: Reference for using Fused's built-in integration connections inside UDFs. Covers data sources (Snowflake, BigQuery, GCS, S3, Airtable, Notion, Google Drive), compute/inference providers (Modal, Hugging Face, Baseten, Daytona, ComfyOrg, Slack), and LLM providers (Anthropic, OpenAI) — the fused.api connect helpers, secrets access, and common operations (query, write, list, invoke, infer). Use when the user is writing a UDF that reads from, writes to, or calls out to a connected service.
Fused Integrations
Once an integration is configured (via the Workbench → Integrations UI or fused integrations <provider> connect), use the helpers below inside any UDF. You do not need to manage credentials manually — connections and secrets are resolved by the runtime.
Available integrations: snowflake, bigquery, gcs, s3, airtable, notion, gdrive, modal, huggingface, baseten, daytona, comfy, anthropic, openai, slack (experimental).
Testing integration UDFs
Integration UDFs have two failure modes general UDFs don't: missing access grants and execution-context differences between fused run and public shared URLs. See the fused-udfs skill for general testing guidance — the integration-specific additions are below.
Start with a connectivity smoke test. Before building any logic, verify the integration actually connects and returns data:
@fused.udf
def udf():
# Smoke test: verify connection before writing full logic
nt = fused.api.notion_connect()
client = nt.client()
results = client.search(query="", filter={"value": "page", "property": "object"})
return {"connected": True, "pages_visible": len(results.get("results", []))}
Also test via the shared URL if the UDF will be served publicly (e.g. https://udf.ai/fc_TOKEN/udf_name). Some integrations — notably Notion — behave differently in that unauthenticated execution context. See the Notion section below for details.
External API keys — scope and fallback patterns
A secret stored as fused.secrets["my_api_key"] is just a string — the Fused runtime doesn't know which endpoints it covers. Two things to verify before writing logic that depends on a key:
Test the specific endpoint, not just the key. Many providers (Google, AWS, Stripe) issue keys with per-API or per-product scopes. A key that authenticates successfully against one API may return
403,REQUEST_DENIED, orPERMISSION_DENIEDon another, even from the same provider. Test each endpoint explicitly in isolation before assuming the key covers it.Build a primary + fallback chain for critical paths. If a UDF must geocode, enrich, or classify data and the primary API is unavailable or uncredentialed, having a fallback keeps the rest of the pipeline alive:
@fused.cache
def enrich(item, _v=1):
# Try primary API
result = call_primary_api(item)
if result is not None:
return result
# Fall back to alternative (open API, cached copy, degraded mode)
return call_fallback_api(item)
Rate-limited APIs: Any external API that enforces per-second or per-minute limits needs an explicit sleep between uncached calls. Wrap each call in @fused.cache so the sleep only fires on the first call per unique input; subsequent calls return instantly from cache:
@fused.cache
def call_rate_limited_api(key, _v=1):
import time
time.sleep(1.1) # stay under 1 req/s
return make_request(key)
Without the cache, every UDF run re-fires every API call, burning rate limit and adding latency proportional to the number of unique inputs.
Google Drive
Google Drive access only works inside cloud UDFs — local Python raises "Google Drive is not connected" even if CLI list commands work. Connect via Workbench → Integrations → Google Drive (OAuth browser flow), then grant file/folder access via the Picker UI.
gdrive:// path format
gdrive://<folder_id>/<folder_name>/ # list a folder
gdrive://<file_id>/<filename> # read a specific file
gdrive://root/ # all Picker-granted files at root
List files
@fused.udf
def udf():
return fused.api.list("gdrive://root/")
Read tabular files (CSV, Excel)
@fused.udf
def udf():
import pandas as pd
return pd.read_csv("gdrive://<file_id>/<name>.csv")
@fused.udf
def udf():
import pandas as pd
return pd.read_excel("gdrive://<file_id>/<name>.xlsx")
Google-native formats are auto-exported: Sheets → .xlsx, Docs → .docx, Slides → .pptx, Drawings → .png.
Read binary files / return to frontend
Base64-encode the raw bytes and return a data URI in the DataFrame — no fd:// write needed:
import base64, mimetypes, io, pandas as pd
data = fused.api.get(gdrive_path) # e.g. "gdrive://<id>/<name>.jpg"
file_name = gdrive_path.split("/")[-1]
# Tabular — return DataFrame directly
if file_name.endswith(".csv"):
return pd.read_csv(io.BytesIO(data))
if file_name.endswith((".xlsx", ".xls")):
return pd.read_excel(io.BytesIO(data))
# Binary — return as data URI
mime, _ = mimetypes.guess_type(file_name)
data_uri = f"data:{mime or 'application/octet-stream'};base64,{base64.b64encode(data).decode()}"
return pd.DataFrame([{"file": file_name, "size_kb": round(len(data)/1024, 1), "data_uri": data_uri}])
Use data_uri in the widget as <img src>, <video src>, or <embed src>. For very large files fall back to fd:// staging + fused.api.sign_url().
Upload a file to Google Drive from a URL
@fused.udf
def udf():
import urllib.request, requests
url = "https://example.com/data.csv"
folder_id = "root"
file_name = url.split("?")[0].rstrip("/").split("/")[-1] or "upload.csv"
with urllib.request.urlopen(url) as resp:
data = resp.read()
api = fused.api.api
creds = api.AUTHORIZATION.credentials
r = requests.put(
f"{api.OPTIONS.base_url}/files/upload",
params={"path": f"gdrive://{folder_id}/{file_name}"},
data=data,
headers={
"Authorization": f"Bearer {creds.access_token}",
"Content-Type": "application/octet-stream",
},
)
r.raise_for_status()
A 401 response means credentials belong to a different user — make a copy of the canvas.
Parse fused.api.list() results
items = fused.api.list("gdrive://root/")
for item in items:
stripped = item.replace("gdrive://", "").rstrip("/")
item_id, item_name = stripped.split("/", 1)
is_dir = item.endswith("/")
Snowflake
Docs: https://docs.fused.io/workbench/integrations/snowflake
Simple query
@fused.udf
def udf():
import fused.api
return fused.api.snowflake_query(
"SELECT * FROM my_db.my_schema.my_table LIMIT 10"
)
Reusable connection (multiple operations)
@fused.udf
def udf():
import fused.api
conn = fused.api.snowflake_connect(
warehouse="COMPUTE_WH",
database="ANALYTICS",
schema="PUBLIC",
role="ANALYST",
)
print("Tables:", conn.list_tables("ANALYTICS", "PUBLIC"))
return conn.query("""
SELECT region, SUM(amount) AS total
FROM orders
WHERE order_date >= '2025-01-01'
GROUP BY region
ORDER BY total DESC
""")
Write a DataFrame back to Snowflake
@fused.udf
def udf():
import fused.api, pandas as pd
conn = fused.api.snowflake_connect(
warehouse="COMPUTE_WH", database="ANALYTICS", schema="PUBLIC"
)
df = pd.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.3, 30.1]})
conn.write(df, "ANALYTICS.PUBLIC.METRICS", mode="overwrite")
return df
Read from a Snowflake Stage
@fused.udf
def udf():
import fused.api
conn = fused.api.snowflake_connect(
warehouse="COMPUTE_WH", database="RAW_DATA", schema="INGEST"
)
files = conn.list_stage_files("@csv_stage", pattern=".*[.]csv")
if files:
return conn.read_stage(f"@csv_stage/{files[0].split('/')[-1]}")
Connection methods: .query(), .list_tables(), .write(df, table, mode=), .list_stage_files(), .read_stage()
BigQuery
Docs: https://docs.fused.io/workbench/integrations/bigquery
Credentials are stored as a service-account JSON string in fused.secrets["gcs_fused"].
@fused.udf
def udf():
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_info(
json.loads(fused.secrets["gcs_fused"]),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
query = """
SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
LIMIT 10
"""
return client.query(query).to_dataframe()
# For geospatial results: .to_geodataframe(geography_column="geometry")
Google Cloud Storage (GCS)
Docs: https://docs.fused.io/workbench/integrations/gcs
Credentials are stored as a service-account JSON string in fused.secrets["gcs_fused"].
List files in a bucket
@fused.udf
def udf():
import os
from google.cloud import storage
with open("/tmp/gcs_key.json", "w") as f:
f.write(fused.secrets["gcs_fused"])
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/gcs_key.json"
client = storage.Client()
bucket = client.bucket("your_bucket_name")
blobs = bucket.list_blobs(prefix="path/to/your/data")
print({blob.name for blob in blobs})
Amazon S3
Docs: https://docs.fused.io/workbench/integrations/s3
S3 access is granted via IAM role (configured once in the Integrations UI). No credentials are needed inside the UDF.
List files
@fused.udf
def udf():
return fused.api.list("s3://<BUCKET_NAME>/")
Reading and writing S3 files from UDFs works with standard libraries (boto3, s3fs, pandas) once the role is attached — the runtime inherits the IAM permissions automatically.
Airtable
Docs: https://docs.fused.io/workbench/integrations/airtable
All operations go through fused.api.airtable_connect().
List bases
@fused.udf()
def udf():
at = fused.api.airtable_connect()
bases = at.list_bases()
for base in bases:
print(base["id"], base["name"])
Read records
@fused.udf()
def udf():
import pandas as pd
at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
records = at.list_records(
"Tasks",
view="Grid view",
filterByFormula="{Status} = 'Done'",
maxRecords=100,
)
rows = [{"id": r["id"], **r["fields"]} for r in records]
return pd.DataFrame(rows)
Create records
@fused.udf()
def udf():
at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
created = at.create_records("Tasks", [
{"fields": {"Name": "Buy groceries", "Status": "Todo"}},
{"fields": {"Name": "Write docs", "Status": "In Progress"}},
])
for r in created:
print(r["id"])
Update records
@fused.udf()
def udf():
at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
at.update_records("Tasks", [
{"id": "recXXXXXXXXXXXXXX", "fields": {"Status": "Done"}},
])
Delete records
@fused.udf()
def udf():
at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
at.delete_records("Tasks", ["recAAAAAAAAAAAA", "recBBBBBBBBBBBB"])
Connection methods: .list_bases(), .list_records(table, view=, filterByFormula=, maxRecords=), .create_records(table, rows), .update_records(table, rows), .delete_records(table, ids)
Notion
Docs: https://docs.fused.io/workbench/integrations/notion
All operations go through fused.api.notion_connect(), which returns a thin wrapper. Call .client() to get a full notion-client SDK instance — nt itself does not expose search/create/etc. directly.
Access grant required. If you get
APIResponseError: Could not find page, the Notion integration hasn't been granted access to that page or database. Fix: Workbench → Integrations → Notion → reconnect and grant access to all relevant pages/databases. This is separate from the OAuth connect step.
Notion URL parsing. URLs copied from Notion often include a query string and block anchor:
https://www.notion.so/workspace/Page-abc123?v=uuid&source=copy_link#blockanchor. The#blockanchoris 32 hex chars and will match a page-ID regex if not stripped first. Always clean the URL before extracting the ID:clean = url.split("#")[0].split("?")[0].rstrip("/") slug = clean.split("/")[-1]
Search pages
@fused.udf()
def udf():
nt = fused.api.notion_connect()
client = nt.client()
results = client.search(query="Q4 Planning")
pages = client.search(query="Meeting", filter={"value": "page", "property": "object"})
Get / update a page
@fused.udf()
def udf():
nt = fused.api.notion_connect()
client = nt.client()
page = client.pages.retrieve(page_id="a1b2c3d4-...")
client.pages.update(
page_id="your-page-id",
properties={"Status": {"status": {"name": "Done"}}},
)
Query a database
⚠
client.databases.query()is not available in the Fused Notion client. Calling it raisesAttributeError: 'DatabasesEndpoint' object has no attribute 'query'. Useclient.search()with a page filter instead:
@fused.udf()
def udf():
nt = fused.api.notion_connect()
client = nt.client()
# Search within a specific database by filtering on the database ID via parent
results = client.search(
query="",
filter={"property": "object", "value": "page"},
)
for r in results.get("results", []):
# Check parent to scope to a specific database
if r.get("parent", {}).get("database_id") == "your-database-id":
print(r["id"], r["properties"])
Create a page in a database
@fused.udf()
def udf():
nt = fused.api.notion_connect()
client = nt.client()
client.pages.create(
parent={"database_id": "your-database-id"},
properties={
"Name": {"title": [{"text": {"content": "Weekly Report"}}]},
"Status": {"select": {"name": "Draft"}},
},
)
Create a page with formatted bullet-point content
Pass children as a list of block dicts. Use bulleted_list_item blocks (not a single paragraph with \n) so content renders as actual bullets. Use annotations for bold labels:
@fused.udf()
def udf():
nt = fused.api.notion_connect()
client = nt.client()
def bullet(label, text):
return {
"object": "block",
"type": "bulleted_list_item",
"bulleted_list_item": {
"rich_text": [
{"type": "text", "text": {"content": label}, "annotations": {"bold": True}},
{"type": "text", "text": {"content": " " + text}},
]
},
}
client.pages.create(
parent={"database_id": "your-database-id"},
properties={
"Name": {"title": [{"text": {"content": "My ticket"}}]},
},
children=[
bullet("What:", "The widget fails on empty datasets."),
bullet("Fix:", "Add a null-check before rendering."),
],
)
Note: Putting formatted content in a single
paragraphblock with\nseparators renders**bold**as literal asterisks. Always use separate block types.
Pull all pages into a DataFrame
@fused.udf()
def udf():
import pandas as pd
nt = fused.api.notion_connect()
client = nt.client()
response = client.search(filter={"value": "page", "property": "object"})
rows = []
for p in response["results"]:
props = p.get("properties", {})
title_parts = props.get("Name", {}).get("title", [])
title = "".join(t["plain_text"] for t in title_parts)
rows.append({"id": p["id"], "title": title, "url": p.get("url"), "last_edited": p.get("last_edited_time")})
return pd.DataFrame(rows)
Read a JSON code block from a Notion page
A useful pattern for lightweight data storage: write a JSON code block to a dedicated Notion page (e.g. from a daily pipeline), then read it back in a UDF. This avoids needing file storage for simple structured data.
@fused.udf
def udf():
import json
nt = fused.api.notion_connect()
client = nt.client()
blocks = client.blocks.children.list(block_id="YOUR_PAGE_ID")
for block in blocks["results"]:
if block["type"] == "code":
raw = block["code"]["rich_text"][0]["plain_text"]
return json.dumps(json.loads(raw)) # validate + return
return json.dumps({})
To write the code block from a pipeline (e.g. a Claude agent), use notion-update-page with replace_content and format the JSON inside a fenced ```json ``` block.
⚠ notion_connect() fails in shared URL execution environment
fused.api.notion_connect() works correctly when running via fused run (local or remote authenticated execution), but can fail with HTTP 422 when the UDF is called via a public shared URL endpoint (e.g. https://udf.ai/fc_TOKEN/udf_name). The Notion OAuth token is not available in that unauthenticated execution context.
Fix: wrap in try/except and fall back to an alternative data source (e.g. Fused file storage):
@fused.udf
def udf():
import json
# Try Notion first (works in authenticated context)
try:
nt = fused.api.notion_connect()
client = nt.client()
blocks = client.blocks.children.list(block_id="YOUR_PAGE_ID")
for block in blocks["results"]:
if block["type"] == "code":
raw = block["code"]["rich_text"][0]["plain_text"]
data = json.loads(raw)
if data:
return json.dumps(data)
except Exception:
pass
# Fall back to Fused file storage (works in all contexts)
try:
import fsspec
with fsspec.open("s3://fused-users/fused/YOUR_USERNAME/data/latest.json", "r") as f:
return json.dumps(json.load(f))
except Exception:
pass
return json.dumps({})
Modal
Docs: https://docs.fused.io/workbench/integrations/modal
Run Modal apps, functions, and serverless GPU workloads. Set MODAL_TOKEN_ID and MODAL_TOKEN_SECRET in Settings > Integrations & Secrets. fused.api.modal_connect() returns an authenticated modal.Client and configures the SDK for the current process — subsequent modal.Function.from_name(...).remote(...) calls authenticate automatically.
Invoke a deployed function
@fused.udf()
def udf(prompt: str = "A high-resolution satellite image of a coastline"):
import modal
fused.api.modal_connect()
generate = modal.Function.from_name("image-gen", "generate")
return generate.remote(prompt)
Fan out with .map()
@fused.udf()
def udf():
import modal, pandas as pd
fused.api.modal_connect()
embed = modal.Function.from_name("embeddings", "embed")
texts = ["hello", "world", "fused", "modal"]
vectors = list(embed.map(texts))
return pd.DataFrame({"text": texts, "embedding": vectors})
Class-based service
fused.api.modal_connect()
Model = modal.Cls.from_name("llm-app", "Llama")
model = Model()
return model.complete.remote("Explain GeoParquet in one paragraph.")
Async (spawn + poll)
call = train.spawn(epochs=10)
# Later UDF: modal.FunctionCall.from_id(call.object_id).get()
Hugging Face
Docs: https://docs.fused.io/workbench/integrations/huggingface
Token stored as HUGGINGFACE_API_KEY. Two helpers, both pre-authenticated:
fused.api.huggingface_connect()→HfApi(repos, models, datasets, files, Spaces)fused.api.huggingface_inference()→InferenceClient(hosted chat, embeddings, text-to-image, ASR…)
Inspect / list models
@fused.udf()
def udf():
import pandas as pd
hf = fused.api.huggingface_connect()
models = hf.list_models(author="meta-llama", limit=20)
return pd.DataFrame([{"id": m.id, "downloads": m.downloads, "likes": m.likes} for m in models])
Download a file from a repo
from huggingface_hub import hf_hub_download
fused.api.huggingface_connect() # configures token
path = hf_hub_download(repo_id="meta-llama/Llama-3.2-1B-Instruct", filename="config.json")
Load a dataset
from datasets import load_dataset
fused.api.huggingface_connect()
ds = load_dataset("squad", split="validation[:100]")
return ds.to_pandas()
Chat completion (hosted inference)
client = fused.api.huggingface_inference()
reply = client.chat_completion(
model="meta-llama/Llama-3.2-3B-Instruct",
messages=[{"role": "user", "content": question}],
max_tokens=256,
)
return reply.choices[0].message.content
Embeddings
client = fused.api.huggingface_inference()
vec = client.feature_extraction("coastal erosion", model="sentence-transformers/all-MiniLM-L6-v2")
Text-to-image
client = fused.api.huggingface_inference()
image = client.text_to_image(prompt, model="black-forest-labs/FLUX.1-schnell") # PIL.Image
Push to a repo
hf = fused.api.huggingface_connect()
hf.upload_file(
path_or_fileobj="results.parquet",
path_in_repo="results.parquet",
repo_id="my-org/my-dataset",
repo_type="dataset",
)
Baseten
Docs: https://docs.fused.io/workbench/integrations/baseten
API key stored as BASETEN_API_KEY. No connect helper — Baseten exposes an OpenAI-compatible endpoint at https://inference.baseten.co/v1, so use the openai SDK (pre-installed in the runtime).
Chat completion
@fused.udf()
def udf(prompt: str = "Summarize what GeoParquet is in one sentence."):
from openai import OpenAI
client = OpenAI(
api_key=fused.secrets["BASETEN_API_KEY"],
base_url="https://inference.baseten.co/v1",
)
response = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3.1",
messages=[{"role": "user", "content": prompt}],
max_tokens=256,
)
return response.choices[0].message.content
Streaming
stream = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3.1",
messages=[{"role": "user", "content": "Write a haiku about S3."}],
stream=True,
)
return "".join(chunk.choices[0].delta.content or "" for chunk in stream)
Dedicated model deployment
For models deployed to your own Baseten workspace, call the model-specific endpoint with an Api-Key header:
import requests
api_key = fused.secrets["BASETEN_API_KEY"]
resp = requests.post(
f"https://model-{model_id}.api.baseten.co/production/predict",
headers={"Authorization": f"Api-Key {api_key}"},
json={"prompt": "Hello, world!"},
timeout=60,
)
resp.raise_for_status()
return resp.json()
Daytona
Docs: https://docs.fused.io/workbench/integrations/daytona
Run untrusted code, manage files, and clone repositories in cloud sandboxes. API key stored as DAYTONA_API_KEY. fused.api.daytona_connect() returns an authenticated Daytona client.
Always clean up sandboxes. Wrap usage in
try/finallywithdaytona.delete(sandbox)— orphaned sandboxes keep running and accrue cost.
Create a sandbox and run code
@fused.udf()
def udf():
daytona = fused.api.daytona_connect()
sandbox = daytona.create()
try:
response = sandbox.process.code_run('import math; print(f"{math.pi:.10f}")')
return response.result
finally:
daytona.delete(sandbox)
Stateful execution (variables persist)
sandbox.code_interpreter.run_code("data = [1, 2, 3, 4, 5]")
result = sandbox.code_interpreter.run_code("print(sum(data))") # 15
Upload / download files
sandbox.fs.upload_file("/home/daytona/input.txt", b"Hello, world!")
content = sandbox.fs.download_file("/home/daytona/input.txt")
Clone a Git repo
sandbox.git.clone(url="https://github.com/fusedio/udfs", path="/home/daytona/udfs")
Custom configuration
from daytona import CreateSandboxFromSnapshotParams
params = CreateSandboxFromSnapshotParams(
language="python",
env_vars={"DEBUG": "true"},
auto_stop_interval=0,
)
sandbox = daytona.create(params, timeout=40)
Client methods: .create(params=, timeout=), .get(sandbox_id), .list(), .delete(sandbox)
Sandbox surfaces: .process.code_run(code), .code_interpreter.run_code(code), .fs.upload_file/.download_file, .git.clone(url=, path=)
ComfyOrg (Comfy Cloud)
Docs: https://docs.fused.io/workbench/integrations/comfy
Run ComfyUI workflows on Comfy Cloud. API key stored as COMFY_API_KEY. There is no connect helper — call the REST API at https://cloud.comfy.org directly.
Prerequisite: export your workflow as JSON via Graph → Export (API) in ComfyUI and upload it somewhere Fused can read (typically S3).
The flow is: load workflow → patch input nodes → POST /api/prompt → poll /api/job/{id}/status → download from /api/view.
@fused.udf()
def udf(prompt: str = "A high-resolution satellite image of a coastline"):
import io, time, requests, numpy as np
from PIL import Image
BASE_URL = "https://cloud.comfy.org"
api_key = fused.secrets["COMFY_API_KEY"]
headers = {"X-API-Key": api_key}
# 1. Load workflow JSON from S3
workflow = requests.get(fused.api.sign_url("s3://your-bucket/workflow.json")).json()
# 2. Patch input nodes — node IDs are top-level keys in the exported JSON
workflow["104:90"]["inputs"]["text"] = prompt
workflow["104:92"]["inputs"]["seed"] = 42
# 3. Submit
prompt_id = requests.post(
f"{BASE_URL}/api/prompt",
headers={**headers, "Content-Type": "application/json"},
json={"prompt": workflow, "extra_data": {"api_key_comfy_org": api_key}},
).json()["prompt_id"]
# 4. Poll
history = {}
for _ in range(120):
job = requests.get(f"{BASE_URL}/api/job/{prompt_id}/status", headers=headers).json()
if job.get("status") in ("failed", "cancelled", "error"):
raise RuntimeError(f"Comfy job failed: {job.get('error_message')}")
if job.get("status") in ("success", "completed"):
history = requests.get(f"{BASE_URL}/api/history_v2/{prompt_id}", headers=headers).json().get(prompt_id, {})
break
time.sleep(5)
# 5. Download first output image
for node_outputs in history.get("outputs", {}).values():
for file_info in node_outputs.get("images", []):
data = requests.get(
f"{BASE_URL}/api/view",
headers=headers,
params={"filename": file_info["filename"], "type": file_info.get("type", "output")},
).content
return np.array(Image.open(io.BytesIO(data)))
Node IDs are workflow-specific. Keys like
"104:90"come from one particular exported workflow — open your own JSON and look up the actual top-level keys for the nodes you want to patch.
Upload an input image
For image-to-video or style-transfer workflows, upload the input first and reference its returned filename:
resp = requests.post(
f"{BASE_URL}/api/upload/image",
headers={"X-API-Key": fused.secrets["COMFY_API_KEY"]},
files={"image": (fname, img_bytes, "image/png")},
)
uploaded_name = resp.json()["name"]
workflow["3"]["inputs"]["image"] = uploaded_name
Cache the slow job, wrap with a thin UDF
Generation is expensive — split the work: a @fused.cache helper that submits/polls/uploads to S3, and a thin @fused.udf(engine="small") wrapper that returns a fused.api.sign_url(...). engine="small" runs as a batch job so you escape the 120-second realtime UDF limit (often necessary for video).
Anthropic
Docs: https://docs.anthropic.com/en/api/client-sdks/python
API key stored as ANTHROPIC_API_KEY. fused.api.anthropic_connect() returns an authenticated anthropic.Anthropic client — the full SDK surface is available (client.messages, client.models, client.beta).
Chat completion
@fused.udf()
def udf(question: str = "Explain map projections in three sentences."):
client = fused.api.anthropic_connect()
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": question}],
)
return message.content[0].text
System prompt
client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system="You are a geospatial data expert. Answer concisely.",
messages=[{"role": "user", "content": question}],
)
Streaming
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a Haversine function."}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
Tool use
tools = [{
"name": "get_weather",
"description": "Get the current weather for a location.",
"input_schema": {
"type": "object",
"properties": {"latitude": {"type": "number"}, "longitude": {"type": "number"}},
"required": ["latitude", "longitude"],
},
}]
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "What's the weather in San Francisco?"}],
)
for block in message.content:
if block.type == "tool_use":
print(block.name, block.input)
Vision (base64 image)
import base64
with open("satellite.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode()
client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_data}},
{"type": "text", "text": "Describe the land use patterns."},
],
}],
)
Anthropic image format differs from OpenAI. Anthropic uses
{"type": "image", "source": {"type": "base64", "media_type": ..., "data": ...}}. OpenAI uses{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}. Don't swap them.
Token usage is on message.usage.input_tokens / message.usage.output_tokens.
OpenAI
Docs: https://platform.openai.com/docs/libraries/python
API key stored as OPENAI_API_KEY. fused.api.openai_connect() returns an authenticated openai.OpenAI client — the full SDK surface is available (client.chat.completions, client.embeddings, client.images, client.models).
Chat completion
@fused.udf()
def udf(question: str = "Summarize GeoParquet's key features."):
client = fused.api.openai_connect()
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": question}],
)
return response.choices[0].message.content
System prompt
client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a geospatial data expert. Answer concisely."},
{"role": "user", "content": question},
],
)
Streaming
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Write a Haversine function."}],
stream=True,
)
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
print(content, end="", flush=True)
Function calling
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the current weather for a location.",
"parameters": {
"type": "object",
"properties": {"latitude": {"type": "number"}, "longitude": {"type": "number"}},
"required": ["latitude", "longitude"],
},
},
}]
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "What's the weather in San Francisco?"}],
tools=tools,
)
for tc in response.choices[0].message.tool_calls or []:
print(tc.function.name, tc.function.arguments) # arguments is a JSON string — json.loads it
Vision (data URI)
import base64
with open("satellite.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode()
client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}},
{"type": "text", "text": "Describe the land use patterns."},
],
}],
)
Embeddings
response = client.embeddings.create(
model="text-embedding-3-small",
input=["geospatial data processing", "satellite imagery analysis"],
)
vectors = [d.embedding for d in response.data]
Token usage is on response.usage.prompt_tokens / response.usage.completion_tokens (note: different names than Anthropic).
Slack (Experimental)
Docs: https://docs.fused.io/workbench/integrations/slack
Slack integration is a team-level Slack bot for talking to a Canvas — it is not a UDF-level helper. Enable it in Preferences → Slack integration, then check the Integrations panel on your Fused home page:
- SYNCED — your team is set up. Go straight to Adding a new Canvas to Slack in the docs.
- STANDBY — run the one-time team setup (Loom walkthrough in the docs).
There is no fused.api.slack_connect() and no SLACK_* secret in UDFs — wiring happens at the team/canvas level, not in code.
Secrets (generic key/value)
Any secret stored via fused secrets set KEY VALUE (or the Workbench UI) is available as fused.secrets["KEY"] inside any UDF. Use this for API keys, tokens, or JSON credential blobs that don't have a first-class connect helper.
@fused.udf
def udf():
api_key = fused.secrets["my_api_key"]
fused.secretsraises on missing keys. Accessing a key that doesn't exist raisesSecretKeyNotFound, notKeyErrorand notNone. Runfused secrets listto verify a key exists before relying on it in a UDF.