Creating Custom Adapters
Learn how to build custom adapters to integrate Kedi with any LLM framework or provider.
The AgentAdapter Protocol
All Kedi adapters must implement the AgentAdapter protocol:
from typing import Any, Protocol, TypeVar
T = TypeVar("T")
class AgentAdapter(Protocol[T]):
"""Protocol that all Kedi adapters must implement."""
async def produce(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
) -> T:
"""
Async method to produce structured output from an LLM.
Args:
template: The prompt template (may include [field] placeholders)
output_schema: Dict mapping field names to Python types
**kwargs: Additional provider-specific arguments
Returns:
An object with attributes matching output_schema keys
"""
...
def produce_sync(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
) -> T:
"""
Synchronous version of produce().
Typically wraps the async version with asyncio.run().
"""
...
Basic Implementation
Here's a minimal adapter implementation:
import asyncio
from pydantic import create_model
from typing import Any
class MyAdapter:
"""A custom Kedi adapter."""
def __init__(self, model: str = "my-default-model"):
self.model = model
def _build_output_model(self, output_schema: dict[str, type]):
"""Create a Pydantic model for validation."""
return create_model(
"Output",
**{k: (v, ...) for k, v in output_schema.items()}
)
async def produce(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
"""Generate structured output from the LLM."""
# 1. Build output model for validation
OutputModel = self._build_output_model(output_schema)
# 2. Call your LLM (implement _call_llm)
raw_response = await self._call_llm(template, output_schema)
# 3. Parse and validate response
return OutputModel(**raw_response)
def produce_sync(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
"""Sync wrapper for produce()."""
return asyncio.run(self.produce(template, output_schema, **kwargs))
async def _call_llm(
self,
template: str,
output_schema: dict[str, type]
) -> dict:
"""Your LLM API call logic goes here."""
# Implement your LLM call
# Return a dict with keys matching output_schema
raise NotImplementedError
Complete Example: OpenAI Adapter
Here's a full adapter using the OpenAI API directly:
import asyncio
import json
from typing import Any
from pydantic import create_model
from openai import AsyncOpenAI
class OpenAIDirectAdapter:
"""Direct OpenAI API adapter for Kedi."""
def __init__(self, model: str = "gpt-4o"):
self.model = model
self.client = AsyncOpenAI()
def _build_output_model(self, output_schema: dict[str, type]):
"""Create a Pydantic model from the output schema."""
return create_model(
"DynamicOutput",
**{k: (v, ...) for k, v in output_schema.items()}
)
def _build_json_schema(self, output_schema: dict[str, type]) -> dict:
"""Convert Python types to JSON schema for OpenAI."""
type_map = {
str: {"type": "string"},
int: {"type": "integer"},
float: {"type": "number"},
bool: {"type": "boolean"},
}
properties = {}
for name, pytype in output_schema.items():
origin = getattr(pytype, "__origin__", None)
if origin is list:
item_type = pytype.__args__[0]
properties[name] = {
"type": "array",
"items": type_map.get(item_type, {"type": "string"})
}
else:
properties[name] = type_map.get(pytype, {"type": "string"})
return {
"type": "object",
"properties": properties,
"required": list(output_schema.keys())
}
async def produce(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
"""Call OpenAI and return structured output."""
OutputModel = self._build_output_model(output_schema)
json_schema = self._build_json_schema(output_schema)
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": "You are a helpful assistant. Respond with valid JSON."
},
{
"role": "user",
"content": f"{template}\n\nRespond with JSON matching this schema:\n{json.dumps(json_schema, indent=2)}"
}
],
response_format={"type": "json_object"}
)
content = response.choices[0].message.content
data = json.loads(content)
return OutputModel(**data)
def produce_sync(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
"""Sync wrapper."""
return asyncio.run(self.produce(template, output_schema, **kwargs))
Example: Ollama Adapter
Adapter for local models using Ollama:
import asyncio
import json
import httpx
from pydantic import create_model
from typing import Any
class OllamaAdapter:
"""Adapter for local Ollama models."""
def __init__(
self,
model: str = "llama3",
base_url: str = "http://localhost:11434"
):
self.model = model
self.base_url = base_url
def _build_output_model(self, output_schema: dict[str, type]):
return create_model(
"Output",
**{k: (v, ...) for k, v in output_schema.items()}
)
async def produce(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
OutputModel = self._build_output_model(output_schema)
# Build prompt with JSON instruction
fields = ", ".join(output_schema.keys())
prompt = f"""{template}
Respond with ONLY a valid JSON object with these fields: {fields}
No additional text before or after the JSON."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False,
"format": "json"
},
timeout=60.0
)
response.raise_for_status()
data = response.json()
content = data["response"]
# Parse JSON from response
parsed = json.loads(content)
return OutputModel(**parsed)
def produce_sync(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
return asyncio.run(self.produce(template, output_schema, **kwargs))
Example: Mock Adapter for Testing
Useful for unit tests:
import asyncio
from pydantic import create_model
from typing import Any, Dict
class MockAdapter:
"""Mock adapter for testing Kedi programs."""
def __init__(self, responses: Dict[str, dict] = None):
"""
Args:
responses: Dict mapping template substrings to response dicts
"""
self.responses = responses or {}
self.calls = [] # Track all calls for assertions
def _build_output_model(self, output_schema: dict[str, type]):
return create_model(
"Output",
**{k: (v, ...) for k, v in output_schema.items()}
)
def add_response(self, template_contains: str, response: dict):
"""Add a mock response for templates containing the given string."""
self.responses[template_contains] = response
async def produce(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
# Track the call
self.calls.append({
"template": template,
"output_schema": output_schema,
"kwargs": kwargs
})
OutputModel = self._build_output_model(output_schema)
# Find matching response
for key, response in self.responses.items():
if key in template:
return OutputModel(**response)
# Default: generate placeholder values
defaults = {}
for name, pytype in output_schema.items():
if pytype == str:
defaults[name] = f"mock_{name}"
elif pytype == int:
defaults[name] = 42
elif pytype == float:
defaults[name] = 3.14
elif pytype == bool:
defaults[name] = True
elif getattr(pytype, "__origin__", None) is list:
defaults[name] = []
else:
defaults[name] = f"mock_{name}"
return OutputModel(**defaults)
def produce_sync(
self,
template: str,
output_schema: dict[str, type],
**kwargs: Any
):
return asyncio.run(self.produce(template, output_schema, **kwargs))
Usage in tests:
def test_kedi_program():
adapter = MockAdapter()
adapter.add_response("capital of France", {"capital": "Paris"})
adapter.add_response("population", {"population": 67000000})
runtime = KediRuntime(adapter=adapter)
# ... run your program
# Assert calls were made
assert len(adapter.calls) == 2
assert "capital" in adapter.calls[0]["output_schema"]
Using Your Custom Adapter
With KediRuntime
from kedi.core import KediRuntime
adapter = MyAdapter(model="my-model")
runtime = KediRuntime(adapter=adapter)
# Run a program
result = runtime.run_main()
Registering Globally
To use your adapter from the CLI, you can register it:
# In your package's __init__.py or a plugin file
from kedi.agent_adapter import register_adapter
from my_package import MyAdapter
register_adapter("my-adapter", MyAdapter)
Then use:
Best Practices
Use Pydantic for Validation
Always use pydantic.create_model() to build your output types. This ensures consistent validation across all adapters.
Handle Type Conversion
LLMs return strings. Convert types appropriately:
Implement Retries
LLMs can fail or return invalid JSON. Add retry logic:
Add Logging
Log requests and responses for debugging:
Type Handling Reference
Common Python types and their handling:
| Python Type | JSON Schema | Notes |
|---|---|---|
str |
{"type": "string"} |
Default type |
int |
{"type": "integer"} |
Parse from string |
float |
{"type": "number"} |
Parse from string |
bool |
{"type": "boolean"} |
Handle "true"/"false" |
list[str] |
{"type": "array", "items": {"type": "string"}} |
Nested arrays |
dict[str, Any] |
{"type": "object"} |
Free-form objects |
| Custom Model | Full JSON schema | Use model.model_json_schema() |
See Also
- AgentAdapter Overview — Understanding the adapter system
- PydanticAdapter — Reference implementation
- DSPyAdapter — Alternative implementation
- Python Interop — Using adapters in Python