Loading...
Loading...
Comprehensive toolkit for developing with the CocoIndex library. Use when users need to create data transformation pipelines (flows), write custom functions, or operate flows via CLI or API. Covers building ETL workflows for AI data processing, including embedding documents into vector databases, building knowledge graphs, creating search indexes, or processing data streams with incremental updates.
npx skill4agent add davila7/claude-code-templates cocoindexcocoindexcocoindex[embeddings]SentenceTransformerEmbedcocoindex[colpali]ColPaliEmbedImageColPaliEmbedQuerycocoindex[lancedb]cocoindex[embeddings,lancedb]embeddingscolpalilancedbpyproject.tomlCOCOINDEX_DATABASE_URLpostgres://cocoindex:cocoindex@localhost/cocoindex.env# Database connection (required - internal storage)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
# LLM API keys (add the ones you need)
OPENAI_API_KEY=sk-... # For OpenAI (generation + embeddings)
ANTHROPIC_API_KEY=sk-ant-... # For Anthropic (generation only)
GOOGLE_API_KEY=... # For Gemini (generation + embeddings)
VOYAGE_API_KEY=pa-... # For Voyage (embeddings only)
# Ollama requires no API key (local)# main.py
from dotenv import load_dotenv
import cocoindex
@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# Flow definition here
pass
if __name__ == "__main__":
load_dotenv()
cocoindex.init()
my_flow.update()@cocoindex.flow_def(name="DescriptiveName")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 1. Import source data
data_scope["source_name"] = flow_builder.add_source(
cocoindex.sources.SourceType(...)
)
# 2. Create collector(s) for outputs
collector = data_scope.add_collector()
# 3. Transform data (iterate through rows)
with data_scope["source_name"].row() as item:
# Apply transformations
item["new_field"] = item["existing_field"].transform(
cocoindex.functions.FunctionName(...)
)
...
# Nested iteration (e.g., chunks within documents)
with item["nested_table"].row() as nested_item:
# More transformations
nested_item["embedding"] = nested_item["text"].transform(...)
# Collect data for export
collector.collect(
field1=nested_item["field1"],
field2=item["field2"],
generated_id=cocoindex.GeneratedField.UUID
)
# 4. Export to target
collector.export(
"target_name",
cocoindex.targets.TargetType(...),
primary_key_fields=["field1"],
vector_indexes=[...] # If needed
).row()item["new_field"] = item["existing_field"].transform(...)new_field = item["existing_field"].transform(...)with data_scope["files"].row() as file:
summary = file["content"].transform(...) # ❌ Local variable
summaries_collector.collect(filename=file["filename"], summary=summary)with data_scope["files"].row() as file:
file["summary"] = file["content"].transform(...) # ✅ Field assignment
summaries_collector.collect(filename=file["filename"], summary=file["summary"])from dataclasses import dataclass
@dataclass
class FileSummary: # ❌ Unnecessary - CocoIndex manages fields automatically
filename: str
summary: str
embedding: list[float]
# This dataclass is never used in the flow!references/flow_patterns.mdreferences/flow_patterns.mdreferences/flow_patterns.mdreferences/flow_patterns.mdreferences/flow_patterns.mdreferences/flow_patterns.mdreferences/flow_patterns.md# 1. Run with setup
cocoindex update --setup -f main # -f force setup without confirmation prompts
# 2. Start a server and redirect users to CocoInsight
cocoindex server -ci main
# Then open CocoInsight at https://cocoindex.io/cocoinsight
Anydict[str, Any]strintfloatboolbytesdatetime.datedatetime.datetimeuuid.UUIDcocoindex.Vector[cocoindex.Float32, typing.Literal[768]]list[float]Persondict[str, Any]Anydict[K, V]list[R]dict[Any, Any]list[Any]cocoindex.JsonT | Nonefrom dataclasses import dataclass
from typing import Literal
import cocoindex
@dataclass
class Person:
name: str
age: int
# ✅ Vector with dimension (recommended for vector search)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
"""Generate 768-dim embedding - dimension needed for vector index."""
# ... embedding logic ...
return embedding # numpy array or list of 768 floats
# ✅ Struct return type, relaxed argument
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
"""Argument can be dict[str, Any], return must be specific Struct."""
return Person(name=person["name"], age=person["age"])
# ✅ LTable return type
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
"""Return type specifies list of specific Struct."""
return [p for p in people if p.age >= 18]
# ❌ Wrong: dict[str, str] is not a valid specific CocoIndex type
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
# return {"name": person.name}@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
"""
Function description.
Args:
input_arg: Description
optional_arg: Optional description
"""
# Transformation logic
return {"result": f"processed-{input_arg}"}@cocoindex.op.function()cache=Truebehavior_version# 1. Define configuration spec
class MyFunction(cocoindex.op.FunctionSpec):
"""Configuration for MyFunction."""
model_name: str
threshold: float = 0.5
# 2. Define executor
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
spec: MyFunction # Required: link to spec
model = None # Instance variables for state
def prepare(self) -> None:
"""Optional: run once before execution."""
# Load model, setup connections, etc.
self.model = load_model(self.spec.model_name)
def __call__(self, text: str) -> dict:
"""Required: execute for each data row."""
# Use self.spec for configuration
# Use self.model for loaded resources
result = self.model.process(text)
return {"result": result}behavior_versionreferences/custom_functions.mdcocoindex setup maincocoindex update main
# With auto-setup
cocoindex update --setup main
# Force reset everything before setup and update
cocoindex update --reset maincocoindex update main.py -L
# Requires refresh_interval on source or source-specific change capturecocoindex drop main.pycocoindex show main.py:FlowNamecocoindex evaluate main.py:FlowName --output-dir ./test_outputreferences/cli_operations.mdfrom dotenv import load_dotenv
import cocoindex
load_dotenv()
cocoindex.init()
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
# ... flow definition ...
passstats = my_flow.update()
print(f"Processed {stats.total_rows} rows")
# Async
stats = await my_flow.update_async()# As context manager
with cocoindex.FlowLiveUpdater(my_flow) as updater:
# Updater runs in background
# Your application logic here
pass
# Manual control
updater = cocoindex.FlowLiveUpdater(
my_flow,
cocoindex.FlowLiveUpdaterOptions(
live_mode=True,
print_stats=True
)
)
updater.start()
# ... application logic ...
updater.wait()my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
return text.transform(
cocoindex.functions.SentenceTransformerEmbed(model="...")
)
# Use in flow for indexing
doc["embedding"] = text_to_embedding(doc["content"])
# Use for querying
query_embedding = text_to_embedding.eval("search query")references/api_operations.mddoc["chunks"] = doc["content"].transform(
cocoindex.functions.SplitRecursively(),
language="markdown", # or "python", "javascript", etc.
chunk_size=2000,
chunk_overlap=500
)data = json_string.transform(cocoindex.functions.ParseJson())file["language"] = file["filename"].transform(
cocoindex.functions.DetectProgrammingLanguage()
)# Requires: cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.SentenceTransformerEmbed(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.EmbedText(
api_type=cocoindex.LlmApiType.OPENAI,
model="text-embedding-3-small",
)
)# Requires: cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)import dataclasses
# For structured extraction
@dataclasses.dataclass
class ProductInfo:
name: str
price: float
category: str
item["product_info"] = item["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=ProductInfo,
instruction="Extract product information"
)
)
# For text summarization/generation
file["summary"] = file["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=str,
instruction="Summarize this document in one paragraph"
)
)cocoindex.sources.LocalFile(
path="documents",
included_patterns=["*.md", "*.txt"],
excluded_patterns=["**/.*", "node_modules"]
)cocoindex.sources.AmazonS3(
bucket="my-bucket",
prefix="documents/",
aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)cocoindex.sources.Postgres(
connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
query="SELECT id, content FROM documents"
)collector.export(
"target_name",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
vector_indexes=[
cocoindex.VectorIndexDef(
field_name="embedding",
metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
)
]
)collector.export(
"target_name",
cocoindex.targets.Qdrant(collection_name="my_collection"),
primary_key_fields=["id"]
)# Requires: cocoindex[lancedb]
collector.export(
"target_name",
cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
primary_key_fields=["id"]
)collector.export(
"nodes",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Nodes(label="Entity")
),
primary_key_fields=["id"]
)collector.export(
"relationships",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Relationships(
rel_type="RELATES_TO",
source=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
),
target=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
)
)
),
primary_key_fields=["id"]
)cocoindex show main.py--app-dir.envCOCOINDEX_DATABASE_URLpsql $COCOINDEX_DATABASE_URL--env-filecocoindex setup main.pycocoindex drop main.py && cocoindex setup main.pyrefresh_intervalmax_inflight_rowsmax_inflight_bytes.envCOCOINDEX_SOURCE_MAX_INFLIGHT_ROWS