name: cocoindex
description: This skill should be used when building data processing pipelines with CocoIndex, a Python library for incremental data transformation. Use when the task involves processing files/data into databases, creating vector embeddings, building knowledge graphs, ETL workflows, or any data pipeline requiring automatic change detection and incremental updates. CocoIndex is Python-native (supports any Python types), has no DSL, and uses version 1.0.0 or later.
CocoIndex
CocoIndex is a Python library for building incremental data processing pipelines with declarative target states. Think spreadsheets or React for data pipelines: declare what the output should look like based on current input, and CocoIndex automatically handles incremental updates, change detection, and syncing to external systems.
Overview
CocoIndex enables building data pipelines that:
- Automatically handle incremental updates: Only reprocess changed data
- Use declarative target states: Declare what should exist, not how to update
- Support any Python types: No custom DSL -- use dataclasses, Pydantic, NamedTuple
- Provide function memoization: Skip expensive operations when inputs/code unchanged
- Sync to multiple targets: PostgreSQL, SQLite, LanceDB, Qdrant, SurrealDB, Apache Doris, file systems, Kafka
Key principle: TargetState = Transform(SourceState)
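The key principle can be illustrated without CocoIndex at all. The following is a minimal sketch in plain Python, assuming a dictionary stands in for both the source and the target; the names transform, reconcile, and Plan are hypothetical and are not part of the CocoIndex API. It shows the idea of recomputing the desired target state from the current source and deriving the writes and deletes from a diff.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Plan:
    """Actions needed to move a target from its current to its desired state."""
    upserts: dict[str, str]
    deletes: set[str]


def transform(source: dict[str, str]) -> dict[str, str]:
    """Hypothetical transform: upper-case every source value."""
    return {key: value.upper() for key, value in source.items()}


def reconcile(current: dict[str, str], desired: dict[str, str]) -> Plan:
    """Diff the desired target state against what already exists."""
    upserts = {k: v for k, v in desired.items() if current.get(k) != v}
    deletes = set(current) - set(desired)
    return Plan(upserts=upserts, deletes=deletes)


# First run: the target is empty, so every row is written.
target: dict[str, str] = {}
plan = reconcile(target, transform({"a.md": "alpha", "b.md": "beta"}))
target = {k: v for k, v in {**target, **plan.upserts}.items() if k not in plan.deletes}

# Second run: b.md changed, a.md was removed, c.md is new.
plan2 = reconcile(target, transform({"b.md": "beta2", "c.md": "gamma"}))
```

In the second run only b.md and c.md are written and a.md is deleted; nothing in the pipeline code says how to update, only what the target should contain.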
When to Use This Skill
Use this skill when building pipelines that involve:
- Document processing: PDF/Markdown conversion, text extraction, chunking
- Vector embeddings: Embedding documents/code for semantic search
- Database transformations: ETL from source DB to target DB
- Knowledge graphs: Extract entities and relationships from data
- LLM-based extraction: Structured data extraction using LLMs
- File-based pipelines: Transform files from one format to another
- Incremental indexing: Keep search indexes up-to-date with source changes
- Streaming pipelines: Kafka-based real-time data processing
Quick Start: Creating a New Project
Initialize Project
cocoindex init my-project
cd my-project
This creates: main.py, pyproject.toml, .env, README.md.
Add Dependencies
# For vector embeddings with PostgreSQL
dependencies = ["cocoindex>=1.0.0", "sentence-transformers", "asyncpg"]
# For LLM extraction
dependencies = ["cocoindex>=1.0.0", "litellm", "instructor", "pydantic>=2.0"]
See references/setup_project.md for complete examples.
Run the Pipeline
pip install -e .
cocoindex update main.py
Core Concepts
1. Apps
An App is the top-level executable that binds a main function with parameters:
import pathlib
import cocoindex as coco
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    ...
app = coco.App(
    coco.AppConfig(name="MyApp"),
    app_main,
    sourcedir=pathlib.Path("./data"),
)
2. Functions (@coco.fn)
The @coco.fn decorator marks functions as CocoIndex processing functions. Add memo=True to skip re-execution when inputs/code are unchanged:
@coco.fn(memo=True)
async def expensive_operation(data: str) -> Result:
    # LLM call, embedding generation, heavy computation
    return await expensive_transform(data)
Key parameters:
- memo=True -- Enable memoization (skip if inputs/code unchanged)
- version=1 -- Explicit version bump to force re-execution
- batching=True -- Auto-batch concurrent calls (async only)
- runner=coco.GPU -- Serialize GPU-bound execution
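To make "skip if inputs/code unchanged" concrete, here is a minimal sketch of memoization keyed on a fingerprint of the function body, an explicit version, and the input. It is an illustration of the general technique only, assuming an in-memory cache; the helpers memo_key and run_memoized are hypothetical, and how CocoIndex actually fingerprints code is not described in this document.

```python
import hashlib
from typing import Callable

_cache: dict[str, str] = {}
calls: list[str] = []  # records every real execution, for demonstration only


def memo_key(fn: Callable[[str], str], data: str, version: int) -> str:
    """Fingerprint the function's bytecode, an explicit version, and the input."""
    digest = hashlib.sha256()
    digest.update(fn.__code__.co_code)
    digest.update(b"\x00" + str(version).encode() + b"\x00")
    digest.update(data.encode())
    return digest.hexdigest()


def run_memoized(fn: Callable[[str], str], data: str, version: int = 1) -> str:
    """Call fn only when no cached result exists for this fingerprint."""
    key = memo_key(fn, data, version)
    if key not in _cache:
        _cache[key] = fn(data)
    return _cache[key]


def expensive_transform(data: str) -> str:
    """Stand-in for an LLM call or embedding computation."""
    calls.append(data)
    return data[::-1]


run_memoized(expensive_transform, "abc")             # executes
run_memoized(expensive_transform, "abc")             # cache hit, skipped
run_memoized(expensive_transform, "abd")             # input changed, executes
run_memoized(expensive_transform, "abc", version=2)  # version bump, executes
```

Four calls result in three real executions: the repeated call with the same input is served from the cache, while a changed input or a bumped version forces a fresh run.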
3. Processing Components
A processing component groups an item's processing with its target states.
Mount components with mount_each() (preferred for lists) or mount():
# One component per item (preferred for lists)
await coco.mount_each(process_file, files.items(), target_table)
# Single component (subpath auto-derived from fn.__name__)
await coco.mount(setup_fn, arg1)
# Dependent component (blocks until result returned)
result = await coco.use_mount(init_fn)
# Explicit subpath (when you need a specific path, e.g. in loops)
await coco.mount(coco.component_subpath("item", item_id), process_item, item)
Key points:
- All mount APIs are async
- mount(), use_mount(), and mount_each() auto-derive subpath from fn.__name__; optional explicit subpath as first arg
- Use use_mount() when you need the return value
- Use stable component paths for proper memoization and cleanup
4. Target States
Declare what should exist -- CocoIndex handles creation/update/deletion:
# Database row target
table.declare_row(row=MyRecord(id=1, name="example"))
# File target
localfs.declare_file(outdir / "output.txt", content, create_parent_dirs=True)
# Kafka message target
topic_target.declare_target_state(key="msg-1", value=json.dumps(data))
5. Context for Shared Resources
Use ContextKey to share expensive resources (DB connections, models) across components:
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(EMBEDDER, SentenceTransformerEmbedder("all-MiniLM-L6-v2"))
    yield
# In processing functions:
embedder = coco.use_context(EMBEDDER)
The @coco.lifespan decorator registers the function with the default CocoIndex environment, which is shared among all apps by default. ContextKey also serves as the stable identity for sources/targets -- the key string must remain stable across runs.
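The role of the key string can be sketched with a small typed registry. This is a plain-Python illustration, assuming the simplest possible design in which resources are stored by key name; Key and Registry are hypothetical classes, not CocoIndex's ContextKey or EnvironmentBuilder.

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class Key(Generic[T]):
    """Hypothetical typed key; the name string is the stable identity."""

    def __init__(self, name: str) -> None:
        self.name = name


class Registry:
    """Hypothetical resource registry that stores values by key name."""

    def __init__(self) -> None:
        self._values: dict[str, Any] = {}

    def provide(self, key: Key[T], value: T) -> None:
        if key.name in self._values:
            raise KeyError(f"{key.name!r} already provided")
        self._values[key.name] = value

    def use(self, key: Key[T]) -> T:
        return self._values[key.name]


MODEL_NAME = Key[str]("model_name")
registry = Registry()
registry.provide(MODEL_NAME, "all-MiniLM-L6-v2")

# A key object rebuilt in a later run resolves to the same resource,
# because identity comes from the name string, not from the object.
rebuilt_key = Key[str]("model_name")
resolved = registry.use(rebuilt_key)
```

Renaming the string would make the rebuilt key point at nothing, which is why the name has to stay the same across runs.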
6. ID Generation
Generate stable, unique identifiers that persist across incremental updates:
from cocoindex.resources.id import generate_id, IdGenerator
# Deterministic: same dep -> same ID
chunk_id = await generate_id(chunk.content)
# Always distinct: each call -> new ID, even with same dep
id_gen = IdGenerator()
for chunk in chunks:
    chunk_id = await id_gen.next_id(chunk.content)
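The difference between the two modes can be sketched with hashing. The code below is one possible way to get these semantics, assuming IDs are derived from a SHA-256 digest and an occurrence counter; stable_id and DistinctIdGenerator are hypothetical names, and this is not a description of how generate_id or IdGenerator are implemented.

```python
import hashlib
from collections import Counter


def stable_id(dep: str) -> int:
    """Deterministic 63-bit ID derived only from the dependency value."""
    digest = hashlib.sha256(dep.encode()).digest()
    return int.from_bytes(digest[:8], "big") >> 1


class DistinctIdGenerator:
    """Hypothetical generator: repeated deps get distinct but reproducible IDs."""

    def __init__(self) -> None:
        self._seen: Counter[str] = Counter()

    def next_id(self, dep: str) -> int:
        # Mix in how many times this dep has been seen so far, so the
        # second identical chunk does not collide with the first.
        ordinal = self._seen[dep]
        self._seen[dep] += 1
        return stable_id(f"{ordinal}:{dep}")


chunks = ["intro", "intro", "body"]
gen = DistinctIdGenerator()
ids_first_run = [gen.next_id(c) for c in chunks]

gen_again = DistinctIdGenerator()
ids_second_run = [gen_again.next_id(c) for c in chunks]
```

Under these assumptions the two identical "intro" chunks receive different IDs, and a second run over the same chunks reproduces the same IDs, which is what lets rows keep their primary keys across incremental updates.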
7. Catch-Up vs Live Mode
By default, app.update() runs in catch-up mode: it scans all sources, processes what changed since the last run (memoized components are skipped), syncs target states, and returns. Each call still has to scan sources to discover changes.
Live mode keeps the app running after catch-up and lets components stream changes continuously from their sources (e.g., file watcher, Kafka consumer), applying them with very low latency.
# Enable live mode
app.update_blocking(live=True)
# Or: cocoindex update main.py -L
Two things are needed: (1) enable live mode on the app, and (2) use a source that supports live updates.
- LiveMapView sources (e.g., localfs.walk_dir(..., live=True)) scan current state first, then watch for changes. They also work in catch-up mode -- write the pipeline once, choose mode at run time.
- LiveMapFeed sources (e.g., kafka.topic_as_map()) only stream changes with no initial snapshot.
mount_each() auto-detects these and creates a live component internally.
# LocalFS with live watching
files = localfs.walk_dir(sourcedir, live=True, ...)
await coco.mount_each(process_file, files.items(), target)
# Kafka -- inherently live
items = kafka.topic_as_map(consumer, ["my-topic"])
await coco.mount_each(process_message, items, target)
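The two source kinds differ only in whether an initial snapshot is emitted. A minimal synchronous sketch, assuming a finite list stands in for the stream of changes; view_source, feed_source, and apply_events are hypothetical helpers, not CocoIndex APIs:

```python
from typing import Iterable, Iterator

Change = tuple[str, str]  # (key, new value)


def view_source(
    snapshot: dict[str, str], changes: Iterable[Change], live: bool
) -> Iterator[Change]:
    """Snapshot-then-watch: emit current state, then changes only if live."""
    yield from snapshot.items()
    if live:
        yield from changes


def feed_source(changes: Iterable[Change]) -> Iterator[Change]:
    """Stream-only: no initial snapshot, just changes as they arrive."""
    yield from changes


def apply_events(events: Iterable[Change]) -> dict[str, str]:
    """Fold a sequence of events into the resulting keyed state."""
    state: dict[str, str] = {}
    for key, value in events:
        state[key] = value
    return state


snapshot = {"a.md": "v1", "b.md": "v1"}
changes = [("a.md", "v2"), ("c.md", "v1")]

catch_up = apply_events(view_source(snapshot, changes, live=False))
live = apply_events(view_source(snapshot, changes, live=True))
feed = apply_events(feed_source(changes))
```

The view-style source yields a complete picture in either mode, while the feed-style source never sees b.md because that key existed before streaming began.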
Common Pipeline Patterns
Pattern 1: File Transformation
import pathlib
import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
@coco.fn(memo=True)
async def process_file(file: FileLike, outdir: pathlib.Path) -> None:
    content = await file.read_text()
    transformed = transform_content(content)
    outname = file.file_path.path.stem + ".out"
    localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True)
@coco.fn
async def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    )
    await coco.mount_each(process_file, files.items(), outdir)
app = coco.App(
    coco.AppConfig(name="Transform"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./out"),
)
Pattern 2: Vector Embedding Pipeline
import pathlib
from dataclasses import dataclass
from typing import AsyncIterator, Annotated
import asyncpg
from numpy.typing import NDArray
import cocoindex as coco
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.id import IdGenerator
DATABASE_URL = "postgres://cocoindex:cocoindex@localhost/cocoindex"
PG_DB = coco.ContextKey[asyncpg.Pool]("pg_db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")
_splitter = RecursiveSplitter()
@dataclass
class DocEmbedding:
    id: int
    filename: str
    text: str
    embedding: Annotated[NDArray, EMBEDDER]  # Dimensions inferred from ContextKey
    chunk_start: int
    chunk_end: int
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder("all-MiniLM-L6-v2"))
        yield
@coco.fn
async def process_chunk(
    chunk: Chunk, filename: pathlib.PurePath,
    id_gen: IdGenerator, table: postgres.TableTarget[DocEmbedding],
) -> None:
    table.declare_row(row=DocEmbedding(
        id=await id_gen.next_id(chunk.text),
        filename=str(filename),
        text=chunk.text,
        embedding=await coco.use_context(EMBEDDER).embed(chunk.text),
        chunk_start=chunk.start.char_offset,
        chunk_end=chunk.end.char_offset,
    ))
@coco.fn(memo=True)
async def process_file(file: FileLike, table: postgres.TableTarget[DocEmbedding]) -> None:
    text = await file.read_text()
    chunks = _splitter.split(text, chunk_size=2000, chunk_overlap=500)
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks, file.file_path.path, id_gen, table)
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    target_table = await postgres.mount_table_target(
        PG_DB,
        table_name="embeddings",
        table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
    )
    target_table.declare_vector_index(column="embedding")
    files = localfs.walk_dir(sourcedir, recursive=True,
                             path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]))
    await coco.mount_each(process_file, files.items(), target_table)
app = coco.App(coco.AppConfig(name="Embedding"), app_main, sourcedir=pathlib.Path("./data"))
Pattern 3: LLM-Based Extraction
import instructor
from pydantic import BaseModel
from litellm import acompletion
import cocoindex as coco
_instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON)
class ExtractionResult(BaseModel):
    title: str
    topics: list[str]
# Message is assumed to be the row type of the target table, defined elsewhere.
@coco.fn(memo=True)  # Memo avoids re-calling LLM
async def extract_and_store(content: str, message_id: int, table) -> None:
    result = await _instructor_client.chat.completions.create(
        model="gpt-4",
        response_model=ExtractionResult,
        messages=[{"role": "user", "content": f"Extract topics: {content}"}],
    )
    table.declare_row(row=Message(id=message_id, title=result.title, content=content))
Connectors and Operations
CocoIndex provides connectors for reading from and writing to external systems:
| Connector | Source | Target | Vectors | Use Case |
|---|---|---|---|---|
| PostgreSQL | Y | Y | pgvector | Production SQL + vectors |
| SQLite | - | Y | sqlite-vec | Local SQL + vectors |
| LanceDB | - | Y | Y | Cloud-native vector DB |
| Qdrant | - | Y | Y | Specialized vector DB |
| SurrealDB | - | Y | Y | Graph + document DB |
| Apache Doris | - | Y | Y | Analytical DB + vectors |
| LocalFS | Y | Y | N/A | File-based pipelines |
| Amazon S3 | Y | - | N/A | Cloud object storage |
| Kafka | Y | Y | N/A | Streaming pipelines |
| Google Drive | Y | - | N/A | Cloud file source |
For detailed connector documentation, see references/connectors.md.
Text and Embedding Operations
Text Splitting
from cocoindex.ops.text import RecursiveSplitter, detect_code_language
splitter = RecursiveSplitter()
language = detect_code_language(filename="example.py")
chunks = splitter.split(text, chunk_size=1000, chunk_overlap=200, language=language)
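To show what chunk_size and chunk_overlap mean, here is a deliberately simple fixed-width splitter that works on character offsets. It is a sketch only: RecursiveSplitter is language-aware and its actual boundary selection is not reproduced here, and split_with_overlap is a hypothetical helper.

```python
def split_with_overlap(
    text: str, chunk_size: int, chunk_overlap: int
) -> list[tuple[int, int, str]]:
    """Return (start, end, chunk_text) windows that share chunk_overlap characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks: list[tuple[int, int, str]] = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append((start, end, text[start:end]))
        if end == len(text):
            break  # the last window reached the end of the text
        start += step
    return chunks


windows = split_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=1)
```

Each window starts chunk_size minus chunk_overlap characters after the previous one, so neighbouring chunks share one character here; larger overlaps preserve more context across chunk boundaries at the cost of more chunks to embed.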
Embeddings
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
embedding = await embedder.embed(text) # Returns NDArray
CLI Commands
cocoindex init my-project # Create new project
cocoindex update main.py # Run app
cocoindex update main.py:my_app # Run specific app
cocoindex update main.py -L # Run in live mode (continuous)
cocoindex update main.py --full-reprocess # Reprocess everything
cocoindex drop main.py [-f] # Drop and reset all state
cocoindex ls [main.py] # List apps
cocoindex show main.py [--tree] # Show component paths
Best Practices
1. Use @coco.fn on All Processing Functions
Every function that participates in the pipeline (declares target states, calls mount APIs, etc.) must be decorated with @coco.fn.
2. Add Memoization for Expensive Operations
@coco.fn(memo=True)  # Skip re-execution when inputs/code unchanged
async def process_chunk(chunk, table):
    embedding = await embedder.embed(chunk.text)  # Expensive!
    table.declare_row(...)
3. Use Stable Component Paths
# Good: Stable identifiers
coco.component_subpath("file", str(file.file_path.path))
coco.component_subpath("record", record.id)
# Bad: Unstable identifiers
coco.component_subpath("file", file) # Object reference
coco.component_subpath("idx", idx) # Index changes
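Why index-based paths are unstable can be shown with a small comparison. The sketch below, in plain Python with hypothetical path strings, keys the same records once by position and once by record ID, then counts how many paths see different content after a single record is inserted at the front.

```python
def changed_keys(before: dict[str, str], after: dict[str, str]) -> set[str]:
    """Keys whose content differs, appeared, or disappeared between two runs."""
    return {k for k in before.keys() | after.keys() if before.get(k) != after.get(k)}


run1 = [("r1", "alpha"), ("r2", "beta")]
run2 = [("r0", "new"), ("r1", "alpha"), ("r2", "beta")]  # one record inserted first

# Paths derived from list position shift whenever the list changes.
by_index_1 = {f"idx/{i}": content for i, (_, content) in enumerate(run1)}
by_index_2 = {f"idx/{i}": content for i, (_, content) in enumerate(run2)}

# Paths derived from the record's own ID stay attached to the same record.
by_id_1 = {f"record/{rid}": content for rid, content in run1}
by_id_2 = {f"record/{rid}": content for rid, content in run2}

index_changes = changed_keys(by_index_1, by_index_2)
id_changes = changed_keys(by_id_1, by_id_2)
```

With positional paths all three paths look changed and would be reprocessed; with ID-based paths only the new record does.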
4. Use Context for Shared Resources
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        yield
5. Use Annotated[NDArray, CONTEXT_KEY] for Vectors
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")
@dataclass
class Record:
    vector: Annotated[NDArray, EMBEDDER]  # Auto-infer dimensions from ContextKey
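The general mechanism that makes this kind of annotation possible is typing.Annotated, whose metadata can be read back at runtime. The following sketch uses only the standard library and assumes a lookup table from key name to vector size; DimKey, DIMENSIONS, and vector_dimensions are hypothetical, and the way CocoIndex resolves dimensions from a ContextKey is not shown in this document.

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints


@dataclass(frozen=True)
class DimKey:
    """Hypothetical stand-in for a key that identifies an embedder."""
    name: str


# Hypothetical lookup: the vector size each registered embedder produces.
# all-MiniLM-L6-v2 produces 384-dimensional embeddings.
DIMENSIONS = {"embedder": 384}
EMBEDDER_KEY = DimKey("embedder")


@dataclass
class Record:
    id: int
    vector: Annotated[list[float], EMBEDDER_KEY]


def vector_dimensions(cls: type) -> dict[str, int]:
    """Map each annotated vector field to the dimension its key resolves to."""
    hints = get_type_hints(cls, include_extras=True)
    dims: dict[str, int] = {}
    for field_name, hint in hints.items():
        # Only Annotated[...] hints carry a __metadata__ tuple.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, DimKey):
                dims[field_name] = DIMENSIONS[meta.name]
    return dims


dims = vector_dimensions(Record)
```

Because the dimension is looked up through the key rather than written into the schema, swapping the embedding model only requires changing what the key resolves to.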
6. Use Convenience APIs for Targets
# Mount table target -- subpath is automatic
table = await postgres.mount_table_target(
    PG_DB,
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)
Troubleshooting
Everything Reprocessing
Add memo=True to expensive functions:
@coco.fn(memo=True)  # Add this
async def process_item(item):
    ...
Memoization Not Working
Check component paths are stable. Use stable IDs, not object references.
Resources
references/
- api_reference.md: Quick API reference
- connectors.md: Complete connector reference
- patterns.md: Detailed pipeline patterns
- setup_project.md: Project setup guide
- setup_database.md: Database setup guide
Runnable examples
Every pattern above has a complete, runnable app under examples/ -- start from the one closest to the task and adapt it. Each has its own README.md and most Python examples have a .env.example; see examples/AGENTS.md for the full map, credentials, and per-example run commands. Good starting points:
- Vector search → text_embedding (Postgres), text_embedding_qdrant / _lancedb (other stores)
- Code search → code_embedding
- LLM extraction → hn_trending_topics, patient_intake_extraction_baml
- Knowledge graph → conversation_to_knowledge, meeting_notes_graph_neo4j
- Custom transform → files_transform, pdf_to_markdown
External
- CocoIndex Documentation — full text at llms-full.txt
- GitHub Examples
Version Note
This skill is for CocoIndex >=1.0.0 (v1). It uses a completely different API from v0.
v0 code is what you likely learned from training data — do not emit it. If you find yourself writing any of these symbols, you are using the removed v0 API:
| v0 (removed) | v1 equivalent |
|---|---|
| @cocoindex.flow_def, FlowBuilder, Flow, open_flow | coco.App + a @coco.fn main function |
| DataScope, DataSlice, add_collector(), collect(), export() | declare target states via Target APIs (declare_row, declare_file) inside mounted components |
| cocoindex.sources.LocalFile, cocoindex.sources.* | connector APIs, e.g. localfs.walk_dir(...) |
| cocoindex.functions.SplitRecursively, cocoindex.functions.* | cocoindex.ops.*, e.g. RecursiveSplitter |
| cocoindex.targets.Postgres, cocoindex.targets.* / storages.* | connector targets, e.g. postgres.declare_table_target(...) |
| transform_flow, cocoindex.op.function() | plain @coco.fn functions |
| cocoindex.init(), settings, COCOINDEX_DATABASE_URL | coco.App(coco.AppConfig(...)); state lives in a local db path |
| CLI cocoindex setup | no setup step; just cocoindex update (-L/--live for live mode) |
When reading third-party tutorials or model memory that mention these v0 symbols, disregard them and use the patterns in this skill and references/api_reference.md instead.