# Bulletproofing LLM Structured Output in Python: Healing Retries, Cost Caps, and Drift Detection (Runnable Code)
I shipped a structured-output endpoint to production in March. The schema was clean, JSON mode was on, the model was GPT-4.1, the eval suite was green. Three weeks in, the on-call channel lit up because a downstream billing job had silently skipped 4,200 records over a weekend. The output was valid JSON. It just wasn't the JSON we asked for.

That was my last "JSON mode is good enough" deployment. Since then I've shipped four more LLM structured-output systems, and the failures keep coming from the same places; JSON mode catches roughly two of them. This post is the toolkit I wish I had on day one, with runnable Python you can drop into a FastAPI service this afternoon.

Two months of incident logs across two enterprise deployments, sorted by frequency:

1. **Silent truncation.** `max_tokens` runs out mid-object. You get parseable JSON for the first 80% of an array; the last item is gone. (A cheap guard for this is sketched right after the install step below.)
2. **Hallucinated keys.** The model returns `customer_id` when the schema says `client_id`. JSON mode does not check field names against your schema unless you use strict structured output, and even then nested types slip through.
3. **Type coercion.** `"price": "1,499.00"` instead of `1499.00`. The JSON parser is happy. Your billing job is not.
4. **Semantic drift.** Schema-valid output where the values are wrong: wrong customer, wrong amount, wrong country code.
5. **Refusals returning JSON.** A safety filter triggers, the model returns `{"refusal": "I can't help with that"}`, and your code parses it as a normal response.
6. **Schema-version desync.** You ship a new field, an in-flight worker is still on the old schema, and the batch fails for two hours until someone notices.

JSON mode catches #1 and #3 sometimes. The other four need real validation, healing, and observability layered on top. We're building this around four pieces:

1. A strict validator that runs after JSON mode (catches what JSON mode misses).
2. A healing retry loop that feeds the validation error back to the model, not a blind retry.
3. A cost-bounded fallback chain so a bad prompt cannot burn through $400 in tokens.
4. A drift detector that tracks parse compliance and field-distribution shifts over time.

Full file structure:

```text
llm_structured/
├── schemas.py         # Pydantic models with versioning
├── validator.py       # Strict validation beyond JSON mode
├── healer.py          # Healing retry loop
├── budget.py          # Per-request and global cost caps
├── chain.py           # Multi-provider fallback with circuit breaker
├── observability.py   # Metrics + drift detection
└── service.py         # FastAPI endpoint that ties it all together
```

Install dependencies:

```bash
pip install pydantic==2.7.4 openai==1.30.0 anthropic==0.30.0 \
    tenacity==8.3.0 prometheus-client==0.20.0 fastapi==0.111.0 \
    uvicorn==0.30.1 httpx==0.27.0
```
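One cheap guard worth wiring before any of the layers below: the API response already flags truncation. This is a minimal sketch, not one of the toolkit files in the tree above (the file and helper names are mine), that rejects a completion when OpenAI reports `finish_reason == "length"`, so failure mode #1 never reaches the JSON parser.

```python
# truncation_guard.py -- a minimal sketch, not one of the toolkit files above.
# OpenAI sets finish_reason to "length" when max_tokens cut the output short,
# which is exactly the "parseable but missing the last items" case.
from openai import AsyncOpenAI


async def complete_or_raise(client: AsyncOpenAI, **create_kwargs) -> str:
    resp = await client.chat.completions.create(**create_kwargs)
    choice = resp.choices[0]
    if choice.finish_reason == "length":
        # Truncated output: bail out before json.loads can "succeed" on it.
        raise ValueError("truncated_response: raise max_tokens or shrink the prompt")
    return choice.message.content or ""
```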
Schema versioning sounds boring until you've had two services on different versions for ninety minutes during a deploy.

```python
# schemas.py
from pydantic import BaseModel, Field, field_validator
from typing import Literal
from decimal import Decimal


class InvoiceLineV2(BaseModel):
    schema_version: Literal["2.0"] = "2.0"
    client_id: str = Field(min_length=3, max_length=64)
    amount: Decimal = Field(gt=0, decimal_places=2)
    currency: Literal["USD", "EUR", "GBP", "INR"]
    invoice_date: str = Field(pattern=r"^\d{4}-\d{2}-\d{2}$")
    line_items: list[str] = Field(min_length=1, max_length=50)
    confidence: float = Field(ge=0.0, le=1.0)

    @field_validator("amount", mode="before")
    @classmethod
    def coerce_amount(cls, v):
        # Accept "1,499.00" or "$1,499.00" and normalize before Decimal validation.
        if isinstance(v, str):
            cleaned = v.replace(",", "").replace("$", "").strip()
            return Decimal(cleaned)
        return v


def schema_for_prompt(model: type[BaseModel]) -> dict:
    """Return a JSON-schema dict suitable for OpenAI response_format."""
    return {
        "type": "json_schema",
        "json_schema": {
            "name": model.__name__,
            "schema": model.model_json_schema(),
            "strict": True,
        },
    }
```

The `schema_version` field is the key. Every output carries the version that produced it; downstream consumers fail loudly when they see a version they don't understand instead of silently mis-mapping fields.

JSON mode with `strict: true` will catch type errors and missing required fields. It will not catch refusals, won't catch semantic drift, and won't tell you about partial truncation. So we run a second-pass validator.

```python
# validator.py
from pydantic import BaseModel, ValidationError
import json
import re

REFUSAL_PATTERNS = [
    r"i can'?t help",
    r"i'?m not able to",
    r"as an ai",
    r"i'?m unable to provide",
]


class ValidationResult:
    def __init__(self, ok: bool, value=None, errors=None, raw=None):
        self.ok = ok
        self.value = value
        self.errors = errors or []
        self.raw = raw


def validate(raw: str, model: type[BaseModel]) -> ValidationResult:
    if not raw or not raw.strip():
        return ValidationResult(False, errors=["empty_response"], raw=raw)

    lower = raw.lower()
    for pat in REFUSAL_PATTERNS:
        if re.search(pat, lower):
            return ValidationResult(False, errors=["refusal_detected"], raw=raw)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        return ValidationResult(False, errors=[f"json_decode: {e}"], raw=raw)

    try:
        instance = model.model_validate(parsed)
    except ValidationError as e:
        return ValidationResult(False, errors=_format_errors(e), raw=raw)

    return ValidationResult(True, value=instance, raw=raw)


def _format_errors(e: ValidationError) -> list[str]:
    out = []
    for err in e.errors():
        loc = ".".join(str(p) for p in err["loc"])
        out.append(f"{loc}: {err['msg']}")
    return out
```

The `_format_errors` step matters. If you feed `ValidationError.json()` back to the model verbatim, you get a wall of stack-trace-looking text the model wastes tokens trying to parse. Plain-English errors heal in one round most of the time.

A blind retry on the same prompt with `temperature=0` gives you the same broken output. The fix is to tell the model what was wrong and ask it to repair that specific output.

```python
# healer.py
from .validator import validate, ValidationResult
from pydantic import BaseModel
from openai import AsyncOpenAI

REPAIR_PROMPT = """The previous response failed validation.

Original schema requirements:
{schema}

Your previous output:
{previous}

Validation errors:
{errors}

Return ONLY corrected JSON matching the schema. Do not explain.
"""


async def heal(
    client: AsyncOpenAI,
    model_name: str,
    user_prompt: str,
    response_model: type[BaseModel],
    max_attempts: int = 3,
) -> ValidationResult:
    history = [{"role": "user", "content": user_prompt}]
    last_raw = ""

    for attempt in range(max_attempts):
        resp = await client.chat.completions.create(
            model=model_name,
            messages=history,
            response_format={"type": "json_object"},
            temperature=0.0,
        )
        last_raw = resp.choices[0].message.content or ""
        result = validate(last_raw, response_model)
        if result.ok:
            return result

        # Feed the broken output and the plain-English errors back to the model.
        history.append({"role": "assistant", "content": last_raw})
        history.append({
            "role": "user",
            "content": REPAIR_PROMPT.format(
                schema=response_model.model_json_schema(),
                previous=last_raw,
                errors="\n".join(result.errors),
            ),
        })

    return ValidationResult(False, errors=["max_heal_attempts"], raw=last_raw)
```

Three attempts is the cap I land on most of the time. In incident data from one client, attempt 1 succeeds 87.4% of the time, attempt 2 takes another 9.1%, attempt 3 captures 2.8%, and the remaining 0.7% is genuinely broken (the model can't comply and downstream needs a human). Anything past three is just burning tokens; the 0.7% that fail loud are also the prompts that would otherwise spiral into endless retries. So we cap.

```python
# budget.py
import time
from dataclasses import dataclass
from contextvars import ContextVar


@dataclass
class CostState:
    spent_usd: float = 0.0
    started_at: float = 0.0
    request_cap_usd: float = 0.10
    global_cap_usd_per_min: float = 5.0
    global_window_start: float = 0.0
    global_spent_in_window: float = 0.0


_state: ContextVar[CostState] = ContextVar("_cost_state")

# (input, output) price per 1K tokens, USD
PRICING = {
    "gpt-4.1": (0.00250, 0.01000),
    "gpt-4.1-mini": (0.00015, 0.00060),
    "claude-sonnet-4.5": (0.00300, 0.01500),
}


def estimate(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    in_price, out_price = PRICING.get(model, (0.0, 0.0))
    return (prompt_tokens / 1000) * in_price + (completion_tokens / 1000) * out_price


def charge(model: str, prompt_tokens: int, completion_tokens: int) -> None:
    state = _state.get()
    cost = estimate(model, prompt_tokens, completion_tokens)
    state.spent_usd += cost

    now = time.time()
    if now - state.global_window_start > 60:
        state.global_window_start = now
        state.global_spent_in_window = 0.0
    state.global_spent_in_window += cost

    if state.spent_usd > state.request_cap_usd:
        raise BudgetExceeded(f"per-request cap hit: {state.spent_usd:.4f}")
    if state.global_spent_in_window > state.global_cap_usd_per_min:
        raise BudgetExceeded(
            f"global rate cap hit: {state.global_spent_in_window:.4f}/min"
        )


class BudgetExceeded(Exception):
    pass


def with_budget(request_cap_usd: float = 0.10) -> CostState:
    state = CostState(request_cap_usd=request_cap_usd, started_at=time.time())
    _state.set(state)
    return state
```

The per-request cap is what saves you from one runaway prompt. The per-minute global cap is what saves you from a bug, like the time I deployed a regex that turned every retrieved doc into a 200KB context; we caught it because the global cap kicked in at minute three instead of the next billing cycle.
If OpenAI returns a 5xx burst, retrying OpenAI is wasted seconds. Fall over to Anthropic, but don't fall back forever: open the circuit, let one probe through every 30 seconds, and recover when it succeeds.

```python
# chain.py
import time
from dataclasses import dataclass

from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from pydantic import BaseModel

from .healer import heal
from .validator import ValidationResult


@dataclass
class Breaker:
    failures: int = 0
    open_until: float = 0.0
    threshold: int = 3
    cooldown: float = 30.0


class FallbackChain:
    def __init__(self, openai_client: AsyncOpenAI, anthropic_client: AsyncAnthropic):
        self.openai = openai_client
        self.anthropic = anthropic_client
        self.breakers = {"openai": Breaker(), "anthropic": Breaker()}

    def _can_call(self, name: str) -> bool:
        return time.time() >= self.breakers[name].open_until

    def _record(self, name: str, ok: bool) -> None:
        b = self.breakers[name]
        if ok:
            b.failures = 0
            b.open_until = 0.0
        else:
            b.failures += 1
            if b.failures >= b.threshold:
                b.open_until = time.time() + b.cooldown

    async def run(
        self, user_prompt: str, model: type[BaseModel]
    ) -> ValidationResult:
        if self._can_call("openai"):
            try:
                result = await heal(self.openai, "gpt-4.1-mini", user_prompt, model)
                self._record("openai", result.ok)
                if result.ok:
                    return result
            except Exception:
                self._record("openai", False)

        if self._can_call("anthropic"):
            try:
                result = await self._call_anthropic(user_prompt, model)
                self._record("anthropic", result.ok)
                return result
            except Exception:
                self._record("anthropic", False)

        return ValidationResult(False, errors=["all_providers_unavailable"])

    async def _call_anthropic(self, prompt: str, model: type[BaseModel]):
        # Anthropic uses tool_use to force structured output
        import json

        from .validator import validate

        resp = await self.anthropic.messages.create(
            model="claude-sonnet-4.5",
            max_tokens=2000,
            tools=[{
                "name": model.__name__,
                "description": f"Return a {model.__name__}",
                "input_schema": model.model_json_schema(),
            }],
            tool_choice={"type": "tool", "name": model.__name__},
            messages=[{"role": "user", "content": prompt}],
        )
        for block in resp.content:
            if block.type == "tool_use":
                return validate(json.dumps(block.input), model)
        return ValidationResult(False, errors=["no_tool_use_block"])
```

Most teams stop at `parse_compliance_rate` and call it observability. That tells you nothing on the day a model upgrade silently shifts your confidence field from a 0.85 mean to 0.62.

```python
# observability.py
from prometheus_client import Counter, Histogram, Gauge
from collections import deque
import statistics

PARSE_OK = Counter("llm_parse_ok_total", "Successful parses", ["model", "schema"])
PARSE_FAIL = Counter(
    "llm_parse_fail_total", "Failed parses", ["model", "schema", "reason"]
)
HEAL_ATTEMPTS = Histogram(
    "llm_heal_attempts", "Attempts to validation success", ["model", "schema"]
)
COST_USD = Counter("llm_cost_usd_total", "Cost in USD", ["model"])

_field_windows: dict[str, deque] = {}
DRIFT_GAUGE = Gauge("llm_field_drift_zscore", "Z-score of field mean", ["field"])


def track_field(field_name: str, value: float, window: int = 1000) -> None:
    if field_name not in _field_windows:
        _field_windows[field_name] = deque(maxlen=window)
    q = _field_windows[field_name]
    q.append(value)

    if len(q) >= 50:
        old = list(q)[: len(q) // 2]
        new = list(q)[len(q) // 2 :]
        if statistics.stdev(old) > 0:
            z = (statistics.mean(new) - statistics.mean(old)) / statistics.stdev(old)
            DRIFT_GAUGE.labels(field=field_name).set(z)
```

The `track_field` helper is what catches the silent model-upgrade regression. Wire it to alert when |z| > 2.5 on any field for ten minutes. It's the same pattern I broke down for the LLM evaluation harness in pytest, except that one runs at CI time and this one runs in production.
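To see what that alert would actually catch, here is a small simulation, not from the toolkit itself, that replays the 0.85-to-0.62 confidence shift through `track_field` and dumps the resulting gauge:

```python
# drift_demo.py -- a small simulation, not part of the toolkit, of the silent
# model-upgrade regression described above.
import random

from prometheus_client import generate_latest

from llm_structured.observability import track_field

random.seed(7)

# Old model: confidence clustered around 0.85.
for _ in range(500):
    track_field("invoice.confidence", random.gauss(0.85, 0.05))

# After the upgrade: the mean silently drifts down to 0.62.
for _ in range(500):
    track_field("invoice.confidence", random.gauss(0.62, 0.05))

# llm_field_drift_zscore{field="invoice.confidence"} is now far below -2.5,
# which is the condition the ten-minute alert fires on.
print(generate_latest().decode())
```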
The FastAPI endpoint ties it all together:

```python
# service.py
from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic

from .schemas import InvoiceLineV2
from .chain import FallbackChain
from .budget import with_budget, BudgetExceeded
from .observability import PARSE_OK, PARSE_FAIL, HEAL_ATTEMPTS, track_field

app = FastAPI()
chain = FallbackChain(AsyncOpenAI(), AsyncAnthropic())


@app.post("/extract/invoice")
async def extract_invoice(payload: dict):
    text = payload.get("text", "")
    with_budget(request_cap_usd=0.05)

    try:
        result = await chain.run(text, InvoiceLineV2)
    except BudgetExceeded as e:
        raise HTTPException(429, f"cost cap: {e}")

    label = {"model": "gpt-4.1-mini", "schema": "InvoiceLineV2"}
    if not result.ok:
        for reason in result.errors:
            PARSE_FAIL.labels(**label, reason=reason[:32]).inc()
        raise HTTPException(422, {"errors": result.errors, "raw": result.raw})

    PARSE_OK.labels(**label).inc()
    track_field("invoice.confidence", float(result.value.confidence))
    return result.value.model_dump()
```

The whole point of the toolkit is that the bad days behave. Test the bad days on purpose.

```python
# tests/test_chaos.py
import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient

# Importing the service constructs real clients, so OPENAI_API_KEY and
# ANTHROPIC_API_KEY must be set (dummy values are fine; nothing hits the network).
from llm_structured import service

client = TestClient(service.app)


def _fake_completion(content: str):
    # Minimal stand-in for a ChatCompletion: just .choices[0].message.content
    message = type("M", (), {"content": content})()
    choice = type("C", (), {"message": message})()
    return type("R", (), {"choices": [choice]})()


@pytest.mark.parametrize("bad_response", [
    "I can't help with that request.",
    '{"client_id": "abc", "amount": "not_a_number"}',
    '{"customer_id": "abc", "amount": 100}',
    '{"client_id": "abc", "amount": 100, "currency": "ZZZ"}',
])
def test_chaos_responses(bad_response):
    # Patch the already-instantiated OpenAI client on the chain (patching the
    # openai.AsyncOpenAI class would miss it) and keep Anthropic out of the test.
    with patch.object(
        service.chain.openai.chat.completions, "create",
        AsyncMock(return_value=_fake_completion(bad_response)),
    ), patch.object(
        service.chain, "_call_anthropic",
        AsyncMock(side_effect=RuntimeError("anthropic disabled in tests")),
    ):
        r = client.post("/extract/invoice", json={"text": "..."})
        assert r.status_code in (422, 429)
```

The point of the `parametrize` block isn't coverage. It's to make sure none of these failure modes can crash the service or silently succeed. A green test on this file is the closest thing to a guarantee you get.

If you have a structured-output endpoint in production right now, do this Monday morning, in this order:

1. Add the second-pass validator (`validator.py`) after JSON mode. You will catch hallucinated keys you didn't know you had.
2. Wire the per-request cost cap. It's twelve lines of code and it will save you the day a bad prompt loops.
3. Add `track_field` to one numerical field with a known distribution. That's your drift canary.

The remaining pieces of the toolkit are the ones that take a sprint. These three take an afternoon, and they cover the failure modes that have hit me on every production rollout.

I'm working on the next post in this cluster: a teardown of the failure-injection harness we use to test these endpoints under realistic chaos before they ship. If that's interesting, follow along. For teams that want this implemented end-to-end, our work on LLM integration and custom AI agents at Velocity Software Solutions covers exactly this kind of production hardening; we ship Python services like this one for clients regularly through our Python development practice.
Related production-grade pieces I've written:

- Building a Production LLM Evaluation Harness in Pytest: the CI-side companion to drift detection
- Production-Grade LLM Streaming in FastAPI: backpressure and cancellation patterns
- Building a Production MCP Server in Python: per-tool permissions and audit logs

External references worth bookmarking:

- OpenAI structured outputs guide
- Pydantic v2 validation docs
- Anthropic tool use for structured output
