LLMs produce high-value reasoning over a low-integrity transport layer. Streams stall, drop tokens, reorder events, violate timing guarantees, and expose no deterministic contract.
This breaks retries. It breaks supervision. It breaks reproducibility. It makes reliable AI systems impossible to build on top of raw provider streams.
L0 is the deterministic execution substrate that fixes the transport - with guardrails designed specifically for the streaming layer: stream-neutral, pattern-based, loop-safe, and timing-aware.
The result: production-grade, integrity-preserving, deterministic AI streams you can finally build real systems on.
It works with OpenAI and LiteLLM (100+ providers including Anthropic, Cohere, Bedrock, Vertex, Gemini). Supports tool calls and provides full observability.
```bash
pip install ai2070-l0
```

Also available in TypeScript: @ai-2070/l0 (`npm install @ai2070/l0`) - a native implementation with full lifecycle and event signature parity.
Production-grade reliability. Just pass your stream - L0 will take it from here.
L0 includes 1,800+ tests covering all major reliability features.
```
    Any AI Stream                    L0 Layer                    Your App
  ─────────────────  ┌──────────────────────────────────────┐  ─────────────
                     │                                      │
  OpenAI / LiteLLM   │  Retry · Fallback · Resume           │   Reliable
  Custom Streams  ──▶│  Guardrails · Timeouts · Consensus   │─────▶ Output
                     │  Full Observability                  │
                     │                                      │
                     └──────────────────────────────────────┘
```
L0 = Token-Level Reliability
| Feature | Description |
|---|---|
| 🔁 Smart Retries | Model-aware retries with fixed-jitter backoff. Automatic retries for zero-token output, network stalls, and provider overloads. |
| 🌐 Network Protection | Automatic recovery from dropped streams, slow responses, 429/503 load shedding, DNS errors, and partial chunks. |
| 🔀 Model Fallbacks | Automatically fall back to secondary models (e.g., GPT-4o → GPT-4o-mini → Claude) with full retry logic. |
| 💥 Zero-Token/Stall Protection | Detects when model produces nothing or stalls mid-stream. Automatically retries or switches to fallbacks. |
| 📍 Last-Known-Good Token Resumption | continue_from_last_good_token resumes from the last checkpoint on timeout or failure. No lost tokens. |
| 🧠 Drift Detection | Detects repetition, stalls, and format drift before corruption propagates. |
| 🧱 Structured Output | Guaranteed-valid JSON with Pydantic. Auto-corrects missing braces, commas, and markdown fences. |
| 🩹 JSON Auto-Healing | Automatic correction of truncated or malformed JSON (missing braces, brackets, quotes), and repair of broken Markdown code fences. |
| 🛡️ Guardrails | JSON, Markdown, and pattern validation with fast streaming checks. Delta-only checks run sync; full-content scans defer to async. |
| ⚡ Race: Fastest-Model Wins | Run multiple models or providers in parallel and return the fastest valid stream. Ideal for ultra-low-latency chat. |
| 🌿 Parallel: Fan-Out / Fan-In | Start multiple streams simultaneously and collect structured or summarized results. Perfect for agent-style multi-model workflows. |
| 🧩 Consensus: Agreement Across Models | Combine multiple model outputs using unanimous, majority, or best-match consensus. Guarantees high-confidence generation. |
| 🔔 Lifecycle Callbacks | on_start, on_complete, on_error, on_event, on_violation, on_retry, on_fallback, on_tool_call - full observability into every stream phase. |
| 📡 Streaming-First Runtime | Thin, deterministic wrapper with unified event types (token, error, complete) for easy UIs. |
| 📼 Central Event Bus | Full observability into every stream phase via on_event callback with 25+ structured event types. |
| 🔌 Custom Adapters (BYOA) | Bring your own adapter for any LLM provider. Built-in adapters for OpenAI and LiteLLM. |
| 📦 Raw Chunk Access | Access original provider chunks (e.g., OpenAI's ChatCompletionChunk) via stream.raw() for provider-specific processing. |
| ⚡ Pure asyncio | No compatibility layers (no anyio/trio). Native Python async for full determinism and performance. |
| 🔧 Own Retry Logic | No external dependencies (no tenacity). L0 controls all retry behavior for predictable execution. |
| 📝 Type-Safe | Full type hints with py.typed marker. Passes mypy strict mode. |
| 📦 Minimal Dependencies | Only httpx, pydantic, orjson, typing-extensions, uuid6. No heavy abstractions. |
| 🧪 Battle-Tested | 1,800+ unit tests and 100+ integration tests validating real streaming, retries, and advanced behavior. |
Know what you're doing? Skip the tutorial
```python
import asyncio
from openai import AsyncOpenAI
import l0

async def main():
    # Wrap the client once - L0 reliability is automatic
    client = l0.wrap(AsyncOpenAI())

    # Use normally - no lambdas needed!
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )

    # Stream with L0 events
    async for event in response:
        if event.is_token:
            print(event.text, end="", flush=True)

    # Or read all at once
    text = await response.read()

asyncio.run(main())
```
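Need the provider's own chunk objects rather than L0 events? The feature table above mentions raw chunk access via `stream.raw()`. A minimal sketch, assuming the wrapped response exposes `raw()` as an async iterator over the original OpenAI `ChatCompletionChunk` objects (see API.md for the exact surface):

```python
import asyncio
from openai import AsyncOpenAI
import l0

async def main():
    client = l0.wrap(AsyncOpenAI())
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )

    # Iterate the original provider chunks instead of L0's normalized events.
    # Assumption: raw() yields them unmodified, so provider-specific fields
    # (choices, delta, usage, ...) are available as usual.
    async for chunk in response.raw():
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(main())
```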
```python
import l0
from openai import AsyncOpenAI

# Configure once, use everywhere
client = l0.wrap(
    AsyncOpenAI(),
    guardrails=l0.Guardrails.recommended(),
    retry=l0.Retry(max_attempts=5),
    timeout=l0.Timeout(initial_token=10.0, inter_token=30.0),
    continue_from_last_good_token=True,  # Resume from checkpoint on failure
)

response = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)
```

See Also: API.md for all options, ADVANCED.md for full examples
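Tool calls flow through as well. A hedged sketch: the `tools` payload below is standard OpenAI, and `on_tool_call` is listed in the feature table, but the single-argument callback shape used here is an assumption - check API.md for the exact signature:

```python
import l0

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]

result = await l0.run(
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "What's the weather in Oslo?"}],
        tools=tools,
        stream=True,
    ),
    # Fires when the model emits a tool call; the payload shape is assumed here.
    on_tool_call=lambda tool_call: print(f"tool call: {tool_call}"),
)
```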
```python
import asyncio
import litellm
import l0

async def main():
    # For LiteLLM, use l0.run() with a factory function
    result = await l0.run(
        stream=lambda: litellm.acompletion(
            model="anthropic/claude-3-haiku-20240307",
            messages=[{"role": "user", "content": "Hello!"}],
            stream=True,
        ),
        guardrails=l0.Guardrails.recommended(),
    )

    # Read full text
    text = await result.read()
    print(text)

asyncio.run(main())
```
```python
from pydantic import BaseModel
import l0

class UserProfile(BaseModel):
    name: str
    age: int
    occupation: str

result = await l0.structured(
    schema=UserProfile,
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Generate a fictional person as JSON"}],
        stream=True,
    ),
    auto_correct=True,  # Fix trailing commas, missing braces, markdown fences
)

print(result.name)  # "Alice"
print(result.age)   # 32
```
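Schemas can nest, since validation is delegated to the Pydantic model. A short sketch, assuming nested models are handled the same way as the flat schema above (the prompt and model names are only illustrative):

```python
from pydantic import BaseModel
import l0

class Address(BaseModel):
    city: str
    country: str

class Company(BaseModel):
    name: str
    founded: int
    headquarters: Address

company = await l0.structured(
    schema=Company,
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Invent a company as JSON with a name, founded year, and headquarters (city, country)"}],
        stream=True,
    ),
    auto_correct=True,  # Same auto-healing as above: braces, commas, code fences
)

print(company.headquarters.city)
```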
```python
import l0

result = await l0.run(
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ),
    on_event=lambda event: (
        print(event.text, end="") if event.is_token else
        print(f"\nError: {event.error}") if event.is_error else
        print("\nDone!") if event.is_complete else None
    ),
)
```
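Beyond `on_event`, the lifecycle callbacks listed in the feature table give phase-level hooks. A sketch using only the documented callback names; their argument shapes are not pinned down here, so the lambdas accept whatever the runtime passes (API.md has the exact signatures):

```python
import l0

result = await l0.run(
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ),
    # Phase-level observability: log every start, retry, completion, and error.
    on_start=lambda *args: print("stream started", *args),
    on_retry=lambda *args: print("retrying:", *args),
    on_complete=lambda *args: print("stream complete", *args),
    on_error=lambda *args: print("stream error:", *args),
)
```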
```python
import litellm
import l0

result = await l0.run(
    # Primary model
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ),
    # Fallbacks: tried in order if the primary fails
    fallbacks=[
        lambda: client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
        lambda: litellm.acompletion(
            model="anthropic/claude-3-haiku-20240307",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
    ],
    on_fallback=lambda index, reason: print(f"Switched to fallback {index}"),
)
```
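When latency matters more than ordering, the "Race: Fastest-Model Wins" mode from the feature table runs the candidates concurrently instead of trying them one after another. A minimal sketch, assuming an `l0.race()` entry point that takes stream factories like `l0.parallel()` and resolves with the first valid stream - see PARALLEL_OPERATIONS.md for the real API:

```python
import litellm
import l0

# Assumption: l0.race() accepts stream factories and returns the fastest
# result that passes validation; the `tasks` parameter name is illustrative.
result = await l0.race(
    tasks=[
        lambda: client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
        lambda: litellm.acompletion(
            model="anthropic/claude-3-haiku-20240307",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ),
    ],
)

print(await result.read())
```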
```python
import l0

prompts = ["Name a fruit", "Name a color", "Name an animal"]

results = await l0.parallel(
    tasks=[
        lambda p=p: client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": p}],
            stream=True,
        )
        for p in prompts
    ],
    concurrency=3,
)

for prompt, result in zip(prompts, results):
    print(f"{prompt}: {result.state.content.strip()}")
```
- No magic - Everything is explicit and predictable
- Streaming-first - Built for real-time token delivery
- Signals, not rewrites - Guardrails detect issues, they don't modify output (see the sketch after this list)
- Model-agnostic - Works with any provider via adapters
- Pure asyncio - No compatibility layers, native Python async
- Own retry logic - No tenacity, full control over behavior
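Because guardrails signal rather than rewrite, violations surface as callbacks and events instead of silently edited output. A minimal sketch using the documented `on_violation` hook; the violation object is printed as-is here, since its exact fields aren't shown in this README:

```python
import l0

result = await l0.run(
    stream=lambda: client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Return the answer as JSON"}],
        stream=True,
    ),
    guardrails=l0.Guardrails.recommended(),
    # The stream keeps flowing; L0 reports the issue instead of rewriting tokens.
    on_violation=lambda violation: print(f"guardrail violation: {violation}"),
)
```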
| Guide | Description |
|---|---|
| QUICKSTART.md | Get started in 5 minutes |
| ADVANCED.md | Advanced usage and full examples |
| API.md | Complete API reference |
| GUARDRAILS.md | Guardrails and validation |
| STRUCTURED_OUTPUT.md | Structured output guide |
| CONSENSUS.md | Multi-generation consensus |
| DETERMINISTIC_LIFECYCLE.md | Lifecycle specification and events |
| NETWORK_ERRORS.md | Network error handling |
| ERROR_HANDLING.md | Error handling guide |
| CUSTOM_ADAPTERS.md | Build your own adapters |
| DOCUMENT_WINDOWS.md | Chunking and processing long documents |
| EVENT_SOURCING.md | Record/replay, audit trails |
| MONITORING.md | OpenTelemetry and Sentry integrations |
| FORMATTING.md | Context, memory, output, and tool formatting |
| PARALLEL_OPERATIONS.md | Parallel, race, batch, and pool operations |
| MULTIMODAL.md | Image, audio, video, and multimodal adapters |
| PERFORMANCE.md | Performance tuning guide |
```bash
# Basic installation
pip install ai2070-l0

# With OpenAI support
pip install ai2070-l0[openai]

# With LiteLLM (100+ providers)
pip install ai2070-l0[litellm]

# With OpenTelemetry
pip install ai2070-l0[otel]

# With Sentry
pip install ai2070-l0[sentry]

# Development
pip install ai2070-l0[dev]
```

Or with uv:
```bash
uv add ai2070-l0
uv add ai2070-l0 --extra openai
uv add ai2070-l0 --extra litellm
```
| Package | Purpose |
|---|---|
| `httpx` | HTTP client |
| `pydantic` | Schema validation |
| `orjson` | Fast JSON |
| `uuid6` | UUIDv7 for stream IDs |
| `typing-extensions` | Type hints |
| Extra | Packages |
|---|---|
| `openai` | `openai>=1.30` |
| `litellm` | `litellm>=1.40` |
| `otel` | `opentelemetry-api`, `opentelemetry-sdk`, `opentelemetry-instrumentation-httpx` |
| `sentry` | `sentry-sdk` |
| `observability` | All of the above (convenience) |
| `speed` | `uvloop` (Unix only) |
| `dev` | `pytest`, `pytest-asyncio`, `pytest-cov`, `mypy`, `ruff` |
Apache-2.0
