# LLM Application Observability: The Complete Guide to Logging, Monitoring, and Debugging

When your agent calls Claude 4, GPT-5, and Gemini 2.5 Pro at 3 a.m. to complete a multi-step reasoning task and returns a wrong answer, you need more than an error log: you need a complete observability stack.
## Why Do LLM Applications Need Dedicated Observability?

Traditional web application observability revolves around request/response cycles, database queries, and CPU/memory. LLM applications introduce entirely new kinds of complexity:

- Non-deterministic output: the same input can produce different results
- High-cost operations: a single API call can cost several dollars
- Multi-model orchestration: one user request may chain 3-5 model calls
- Quality is hard to quantify: the line between "correct" and "hallucinated" is blurry
- Wide latency variance: anywhere from 200 ms to 30 s

In 2026, with models such as Claude 4 Opus, GPT-5, Gemini 2.5 Pro, Llama 4, and DeepSeek-V3 deployed in production at scale, observability has gone from "nice to have" to "non-negotiable".
## The Three Pillars of Observability in LLM Scenarios

### 1. Structured Logging

Logging an LLM call is not a simple `print(response)`. You need to record the full context of every call.
#### Core field design

```python
import json
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class LLMCallLog:
    request_id: str
    trace_id: str
    timestamp: str
    model: str                  # e.g. "claude-4-opus", "gpt-5"
    provider: str               # e.g. "anthropic", "openai"
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    cost_usd: float
    status: str                 # "success" | "error" | "timeout"
    error_type: Optional[str]
    temperature: float
    max_tokens: int
    user_id: Optional[str]
    session_id: Optional[str]
    prompt_hash: str            # for dedup and clustering; raw text is not stored
    response_hash: str
    metadata: dict              # custom fields


class LLMLogger:
    def __init__(self, log_path: str = "/var/log/llm/calls.jsonl"):
        self.log_path = log_path
        # Prices in USD per million tokens
        self.token_prices = {
            "claude-4-opus": {"input": 15.0, "output": 75.0},
            "claude-4-sonnet": {"input": 3.0, "output": 15.0},
            "gpt-5": {"input": 10.0, "output": 30.0},
            "gpt-5-mini": {"input": 1.5, "output": 6.0},
            "gemini-2.5-pro": {"input": 7.0, "output": 21.0},
            "deepseek-v3": {"input": 0.27, "output": 1.10},
            "llama-4-maverick": {"input": 0.20, "output": 0.60},
        }

    def calculate_cost(self, model: str, prompt_tokens: int,
                       completion_tokens: int) -> float:
        prices = self.token_prices.get(model, {"input": 0, "output": 0})
        return (prompt_tokens * prices["input"] +
                completion_tokens * prices["output"]) / 1_000_000

    def log_call(self, log_entry: LLMCallLog):
        with open(self.log_path, "a") as f:
            f.write(json.dumps(asdict(log_entry), ensure_ascii=False) + "\n")
```
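To make the schema concrete, here is a minimal usage sketch; all field values are illustrative, and the token counts and latency are assumed to have been measured by the caller:

```python
import uuid
from datetime import datetime, timezone

logger = LLMLogger(log_path="./calls.jsonl")
entry = LLMCallLog(
    request_id=str(uuid.uuid4()),
    trace_id=str(uuid.uuid4()),
    timestamp=datetime.now(timezone.utc).isoformat(),
    model="claude-4-sonnet",
    provider="anthropic",
    prompt_tokens=812,
    completion_tokens=304,
    total_tokens=1116,
    latency_ms=1430.5,
    cost_usd=logger.calculate_cost("claude-4-sonnet", 812, 304),
    status="success",
    error_type=None,
    temperature=0.2,
    max_tokens=1024,
    user_id="user-123",
    session_id="sess-456",
    prompt_hash="sha256:…",    # hash of the raw prompt, computed upstream
    response_hash="sha256:…",
    metadata={"feature": "summarize"},
)
logger.log_call(entry)      # appends one JSON Lines record
```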
#### Log context propagation

In asynchronous Python applications, use `contextvars` to propagate the `trace_id`:
```python
import contextvars
import uuid

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    'trace_id', default=''
)
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    'request_id', default=''
)


def get_current_trace_id() -> str:
    return trace_id_var.get() or str(uuid.uuid4())


# Set at the request entry point
async def handle_request(request):
    trace_id = str(uuid.uuid4())
    trace_id_var.set(trace_id)
    request_id_var.set(str(uuid.uuid4()))
    # ... handle the request
```
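A quick way to convince yourself this works: each asyncio task gets its own copy of the context, so concurrent requests never clobber each other's `trace_id`. A minimal, self-contained check (the handler name is hypothetical):

```python
import asyncio

async def fake_handler(name: str) -> tuple[str, str]:
    # Each task sets its own trace_id; contextvars isolates values per task.
    trace_id_var.set(f"trace-{name}")
    await asyncio.sleep(0.01)            # simulate I/O; other tasks run here
    return name, trace_id_var.get()      # still this task's own value

async def main():
    results = await asyncio.gather(*(fake_handler(n) for n in ("a", "b", "c")))
    for name, tid in results:
        assert tid == f"trace-{name}"    # no cross-task leakage
        print(name, tid)

asyncio.run(main())
```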
### 2. Metrics

#### Key metrics
| Category | Metric | Type | Description |
|---|---|---|---|
| Latency | llm_request_duration_seconds | Histogram | End-to-end latency |
| Latency | llm_time_to_first_token_seconds | Histogram | Time to first token (streaming) |
| Throughput | llm_requests_total | Counter | Total requests |
| Tokens | llm_tokens_total | Counter | Total tokens consumed |
| Cost | llm_cost_usd_total | Counter | Cumulative cost |
| Errors | llm_errors_total | Counter | Error count (by type) |
| Quality | llm_quality_score | Histogram | Quality score |
| Cache | llm_cache_hit_ratio | Gauge | Cache hit ratio |
#### Prometheus metric definitions
```python
from prometheus_client import Histogram, Counter, Gauge

# Request latency
LLM_REQUEST_DURATION = Histogram(
    'llm_request_duration_seconds',
    'LLM API request duration in seconds',
    ['model', 'provider', 'operation', 'status'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]
)

# Time to first token
LLM_TTFT = Histogram(
    'llm_time_to_first_token_seconds',
    'Time to first token for streaming requests',
    ['model', 'provider'],
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)

# Request count (referenced by the error-rate queries below)
LLM_REQUESTS = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'provider', 'status']
)

# Token consumption
LLM_TOKENS = Counter(
    'llm_tokens_total',
    'Total tokens consumed',
    ['model', 'provider', 'token_type']  # token_type: input/output
)

# Request cost
LLM_COST = Counter(
    'llm_cost_usd_total',
    'Total cost in USD',
    ['model', 'provider']
)

# Error count
LLM_ERRORS = Counter(
    'llm_errors_total',
    'Total LLM errors',
    ['model', 'provider', 'error_type']
)

# Active requests
LLM_ACTIVE_REQUESTS = Gauge(
    'llm_active_requests',
    'Currently active LLM requests',
    ['model', 'provider']
)

# Cache hit ratio (from the metric table above)
LLM_CACHE_HIT_RATIO = Gauge(
    'llm_cache_hit_ratio',
    'Cache hit ratio (0-1)',
    ['model']
)

# Quality score
LLM_QUALITY_SCORE = Histogram(
    'llm_quality_score',
    'LLM response quality score (0-1)',
    ['model', 'evaluator'],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)
```

#### Automatic collection via middleware
```python
import time
from functools import wraps


def llm_instrumented(model: str, provider: str, operation: str = "chat"):
    """Decorator: automatically collect metrics for LLM calls."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            LLM_ACTIVE_REQUESTS.labels(model=model, provider=provider).inc()
            start_time = time.time()
            status = "success"
            try:
                result = await func(*args, **kwargs)
                # Record tokens
                LLM_TOKENS.labels(
                    model=model, provider=provider, token_type="input"
                ).inc(result.prompt_tokens)
                LLM_TOKENS.labels(
                    model=model, provider=provider, token_type="output"
                ).inc(result.completion_tokens)
                # Record cost (calculate_cost as defined on LLMLogger above)
                cost = calculate_cost(model, result.prompt_tokens,
                                      result.completion_tokens)
                LLM_COST.labels(model=model, provider=provider).inc(cost)
                return result
            except Exception as e:
                status = "error"
                LLM_ERRORS.labels(
                    model=model, provider=provider,
                    error_type=type(e).__name__
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                # Count the request itself (llm_requests_total)
                LLM_REQUESTS.labels(
                    model=model, provider=provider, status=status
                ).inc()
                LLM_REQUEST_DURATION.labels(
                    model=model, provider=provider,
                    operation=operation, status=status
                ).observe(duration)
                LLM_ACTIVE_REQUESTS.labels(
                    model=model, provider=provider
                ).dec()
        return wrapper
    return decorator


# Usage example
@llm_instrumented(model="gpt-5", provider="openai", operation="chat")
async def call_gpt5(prompt: str):
    return await openai_client.chat.completions.create(
        model="gpt-5",
        messages=[{"role": "user", "content": prompt}]
    )
```

#### Grafana dashboard configuration
```json
{
  "dashboard": {
    "title": "LLM Observability - 2026",
    "panels": [
      {
        "title": "Request latency (P50/P95/P99)",
        "type": "timeseries",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(llm_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P50"
          },
          {
            "expr": "histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(llm_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "Token consumption rate by model",
        "type": "timeseries",
        "targets": [
          {
            "expr": "sum(rate(llm_tokens_total[5m])) by (model)",
            "legendFormat": "{{model}}"
          }
        ]
      },
      {
        "title": "Cost per hour",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(increase(llm_cost_usd_total[1h]))",
            "legendFormat": "Cost/hour"
          }
        ]
      },
      {
        "title": "Error rate",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(llm_errors_total[5m]) / rate(llm_requests_total[5m]) * 100",
            "legendFormat": "Error % ({{model}})"
          }
        ]
      }
    ]
  }
}
```

### 3. Distributed Tracing
Multi-agent, multi-model orchestration is table stakes for LLM applications in 2026. A single user request might flow through:

```text
User request → Router Agent
                ├─ Claude 4 Opus   (complex reasoning)
                ├─ GPT-5           (code generation)
                └─ Gemini 2.5 Pro  (multimodal understanding)
                    └─ Llama 4       (fast local classification)
                        └─ DeepSeek-V3 (data extraction)
```

#### OpenTelemetry integration
```python
import asyncio

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter
)
from opentelemetry.sdk.resources import Resource

# Initialize the tracer
resource = Resource.create({
    "service.name": "llm-agent-service",
    "service.version": "2.0.0",
    "deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(
    OTLPSpanExporter(endpoint="http://otel-collector:4317")
)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("llm-observability")


async def traced_llm_call(model: str, messages: list):
    """LLM call wrapped in a trace span.

    get_provider() and call_model() are application-level helpers.
    """
    with tracer.start_as_current_span(
        f"llm.call.{model}",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "llm.model": model,
            "llm.provider": get_provider(model),
            "llm.request.type": "chat",
            "llm.prompt.length": sum(len(m["content"]) for m in messages),
        }
    ) as span:
        try:
            response = await call_model(model, messages)
            span.set_attribute("llm.response.tokens.prompt",
                               response.usage.prompt_tokens)
            span.set_attribute("llm.response.tokens.completion",
                               response.usage.completion_tokens)
            span.set_attribute("llm.response.tokens.total",
                               response.usage.total_tokens)
            span.set_attribute("llm.response.finish_reason",
                               response.choices[0].finish_reason)
            span.set_status(trace.Status(trace.StatusCode.OK))
            return response
        except Exception as e:
            span.set_status(
                trace.Status(trace.StatusCode.ERROR, str(e))
            )
            span.record_exception(e)
            raise


# Tracing a multi-model orchestration
async def multi_model_agent(user_query: str):
    with tracer.start_as_current_span("agent.multi_model_pipeline") as root:
        root.set_attribute("user.query.length", len(user_query))
        # Call several models in parallel
        # (the *_prompt variables are message lists built elsewhere)
        with tracer.start_as_current_span("parallel.model_calls"):
            results = await asyncio.gather(
                traced_llm_call("claude-4-opus", complex_reasoning_prompt),
                traced_llm_call("gpt-5", code_generation_prompt),
                traced_llm_call("gemini-2.5-pro", multimodal_prompt),
            )
        # Synthesize the results
        with tracer.start_as_current_span("agent.synthesize"):
            final = await traced_llm_call(
                "claude-4-opus",
                synthesize_prompt(results)
            )
        return final
```
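When a call leaves the process, for example through an HTTP gateway, the trace context has to travel with it. A minimal sketch using OpenTelemetry's W3C `traceparent` propagation; the downstream endpoint is hypothetical:

```python
import httpx
from opentelemetry.propagate import inject

async def call_downstream(payload: dict) -> dict:
    headers: dict[str, str] = {}
    inject(headers)  # writes W3C traceparent/tracestate into the carrier
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://llm-gateway.internal/v1/chat",  # hypothetical endpoint
            json=payload,
            headers=headers,
        )
        resp.raise_for_status()
        return resp.json()
```

Any downstream service that initializes OpenTelemetry with the default propagators will then attach its spans to the same trace.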
### 4. Prompt/Response Logging and PII Redaction

Logging raw prompts and responses is essential for debugging, but sensitive information must be handled first.

#### PII redaction
```python
import json
import logging
import re
import uuid
from datetime import datetime

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

logger = logging.getLogger("llm.audit")


class PIIRedactor:
    """PII redactor for LLM requests/responses."""

    def __init__(self):
        # Note: language="zh" requires configuring Presidio with a
        # Chinese-capable NLP engine; the default setup is English-only.
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        # Additional custom patterns
        self.custom_patterns = {
            "api_key": re.compile(
                r'(sk-[a-zA-Z0-9]{20,}|AIza[a-zA-Z0-9_-]{35})'
            ),
            "phone_cn": re.compile(r'1[3-9]\d{9}'),     # Chinese mobile numbers
            "id_card_cn": re.compile(r'\d{17}[\dXx]'),  # Chinese national IDs
        }

    def redact(self, text: str, language: str = "zh") -> str:
        # Detect PII with Presidio
        results = self.analyzer.analyze(
            text=text,
            entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
                      "CREDIT_CARD", "IP_ADDRESS"],
            language=language,
        )
        anonymized = self.anonymizer.anonymize(
            text=text, analyzer_results=results
        )
        # Apply the custom regexes
        result = anonymized.text
        for name, pattern in self.custom_patterns.items():
            result = pattern.sub(f"[REDACTED_{name.upper()}]", result)
        return result

    def safe_log_prompt(self, messages: list) -> list:
        """Redact a prompt before it is written to the logs."""
        return [
            {**msg, "content": self.redact(msg["content"])}
            for msg in messages
        ]


# Usage example
redactor = PIIRedactor()


def safe_log_llm_call(request, response):
    safe_log = {
        "request_id": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat(),
        "model": request.model,
        "messages": redactor.safe_log_prompt(request.messages),
        "response": redactor.redact(response.content),
        "metadata": {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
        }
    }
    logger.info(json.dumps(safe_log, ensure_ascii=False))
```

### 5. Quality Monitoring and Hallucination Detection
Quality monitoring in 2026 goes well beyond simple human review.

#### Automated hallucination detection
```python
class HallucinationDetector:
    """Multi-strategy hallucination detector.

    LiteLLMClient is an application-level wrapper; helpers such as
    _fact_check, _verify_citations, _extract_claims, _claims_match,
    and _calculate_confidence are omitted here.
    """

    def __init__(self):
        self.fact_checker_model = "claude-4-sonnet"
        self.fact_checker = LiteLLMClient(model=self.fact_checker_model)

    async def detect(
        self,
        query: str,
        response: str,
        context: list[str] = None
    ) -> dict:
        scores = {}
        # Strategy 1: consistency against the provided context
        if context:
            scores["context_faithfulness"] = await self._check_faithfulness(
                response, context
            )
        # Strategy 2: self-consistency (sample several times and compare)
        scores["self_consistency"] = await self._check_self_consistency(
            query, response
        )
        # Strategy 3: fact checking
        scores["fact_check"] = await self._fact_check(response)
        # Strategy 4: citation verification
        scores["citation_accuracy"] = await self._verify_citations(
            response, context
        )
        # Composite score
        weights = {
            "context_faithfulness": 0.35,
            "self_consistency": 0.25,
            "fact_check": 0.25,
            "citation_accuracy": 0.15
        }
        composite = sum(
            scores.get(k, 0) * v for k, v in weights.items()
        )
        return {
            "hallucination_score": 1.0 - composite,
            "detail_scores": scores,
            "is_hallucination": composite < 0.6,
            "confidence": self._calculate_confidence(scores),
        }

    async def _check_faithfulness(
        self, response: str, context: list[str]
    ) -> float:
        prompt = f"""Evaluate whether the answer below is faithful to the provided context.
Score using the context only: 0 = entirely unfaithful, 1 = entirely faithful.
Context: {chr(10).join(context)}
Answer: {response}
Output a single number between 0 and 1."""
        result = await self.fact_checker.complete(prompt)
        try:
            return float(result.strip())
        except ValueError:
            return 0.5

    async def _check_self_consistency(
        self, query: str, response: str
    ) -> float:
        """Sample several answers and check agreement."""
        samples = []
        for _ in range(3):
            sample = await self.fact_checker.complete(
                f"Answer the following question: {query}"
            )
            samples.append(sample)
        # Simplified consistency score: compare key claims
        agreements = 0
        total = 0
        response_claims = self._extract_claims(response)
        for sample in samples:
            sample_claims = self._extract_claims(sample)
            for claim in response_claims:
                if any(self._claims_match(claim, sc)
                       for sc in sample_claims):
                    agreements += 1
                total += 1
        return agreements / total if total > 0 else 0.5


# Report quality metrics
async def evaluate_and_report(
    query: str, response: str, model: str
):
    detector = HallucinationDetector()
    result = await detector.detect(query, response)
    # Report to Prometheus
    LLM_QUALITY_SCORE.labels(
        model=model, evaluator="hallucination"
    ).observe(1.0 - result["hallucination_score"])
    if result["is_hallucination"]:
        logger.warning(
            "Potential hallucination detected",
            extra={
                "model": model,
                "hallucination_score": result["hallucination_score"],
                "detail_scores": result["detail_scores"],
            }
        )
    return result
```
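The claim helpers are left undefined above. A deliberately simple sketch, using sentence-level claims and token-overlap matching, is enough to make the self-consistency check runnable; a real deployment would use an NLI model or embeddings instead:

```python
import re

class ClaimHelpers:
    """Mixin-style sketch of the helpers referenced by HallucinationDetector."""

    def _extract_claims(self, text: str) -> list[str]:
        # Naive claim extraction: split on sentence boundaries,
        # drop very short fragments
        sentences = re.split(r"[。.!?!?]\s*", text)
        return [s.strip() for s in sentences if len(s.strip()) > 10]

    def _claims_match(self, a: str, b: str, threshold: float = 0.5) -> bool:
        # Jaccard overlap of lowercase word sets as a cheap similarity proxy
        wa, wb = set(a.lower().split()), set(b.lower().split())
        if not wa or not wb:
            return False
        return len(wa & wb) / len(wa | wb) >= threshold
```

Mixing these in (e.g. `class HallucinationDetector(ClaimHelpers): ...`) makes `_check_self_consistency` runnable end to end.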
### 6. Cost Dashboards and Alerts

#### Cost tracking and budget alerts
```python
from prometheus_client import Gauge

# Budget alert rules (Prometheus Alertmanager)
ALERT_RULES = """
groups:
  - name: llm_cost_alerts
    rules:
      - alert: LLMHourlyCostHigh
        expr: sum(increase(llm_cost_usd_total[1h])) > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM hourly cost exceeds $50"
          description: "Current hourly cost: {{ $value | humanize }} USD"
      - alert: LLMDailyCostCritical
        expr: sum(increase(llm_cost_usd_total[24h])) > 500
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "LLM daily cost exceeds $500"
          description: "Current daily cost: {{ $value | humanize }} USD"
      - alert: LLMTokenRateAnomaly
        expr: rate(llm_tokens_total[5m]) > 3 * rate(llm_tokens_total[1h] offset 1d)
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Token consumption rate anomaly"
          description: "Current rate is more than 3x the same period yesterday"
      - alert: LLMErrorRateHigh
        expr: rate(llm_errors_total[5m]) / rate(llm_requests_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate exceeds 10%"
"""


# Dynamic budget management
class CostBudgetManager:
    def __init__(self, daily_limit: float = 100.0,
                 hourly_limit: float = 20.0):
        self.daily_limit = daily_limit
        self.hourly_limit = hourly_limit
        self.daily_spend = Gauge('llm_budget_daily_remaining_usd',
                                 'Remaining daily budget')
        self.hourly_spend = Gauge('llm_budget_hourly_remaining_usd',
                                  'Remaining hourly budget')

    async def check_budget(self, model: str,
                           estimated_cost: float) -> bool:
        """Check the budget before making a call."""
        remaining = await self._get_remaining_budget()
        if estimated_cost > remaining["hourly"]:
            logger.warning(
                f"Budget exceeded: estimated ${estimated_cost:.4f}, "
                f"hourly remaining ${remaining['hourly']:.4f}"
            )
            return False
        return True

    async def _get_remaining_budget(self) -> dict:
        # Query current spend from Prometheus
        # ... query logic
        pass
```
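The query logic is elided above; one way to fill it in is Prometheus's instant-query HTTP endpoint. A minimal sketch, assuming a Prometheus server reachable at `prometheus:9090` and the `llm_cost_usd_total` counter defined earlier:

```python
import httpx

PROM_URL = "http://prometheus:9090"  # assumed Prometheus endpoint

async def get_spend_usd(window: str) -> float:
    """Sum of LLM spend over `window` (e.g. '1h', '24h') via /api/v1/query."""
    query = f"sum(increase(llm_cost_usd_total[{window}]))"
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{PROM_URL}/api/v1/query",
                                params={"query": query})
        resp.raise_for_status()
        result = resp.json()["data"]["result"]
        return float(result[0]["value"][1]) if result else 0.0

# _get_remaining_budget could then be implemented as:
#     hourly = self.hourly_limit - await get_spend_usd("1h")
#     daily = self.daily_limit - await get_spend_usd("24h")
#     return {"hourly": hourly, "daily": daily}
```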
### 7. Debugging Tools and Techniques

#### Common-issue diagnostic checklist
```python
from statistics import mean


class LLMDebugger:
    """Diagnostic tool for LLM calls.

    _get_expected_cost() is an application-level helper returning the
    typical per-call cost for a model.
    """

    def diagnose(self, call_log: dict) -> list[str]:
        issues = []
        # 1. Latency anomaly
        if call_log["latency_ms"] > 10000:
            issues.append(
                f"⚠️ High latency: {call_log['latency_ms']}ms "
                f"(model: {call_log['model']})"
            )
        # 2. Token efficiency
        ratio = (call_log["completion_tokens"] /
                 max(call_log["prompt_tokens"], 1))
        if ratio > 10:
            issues.append(
                f"⚠️ Output/input ratio too high: {ratio:.1f}x; "
                f"the prompt may need optimization"
            )
        # 3. Cost spike
        expected_cost = self._get_expected_cost(call_log["model"])
        if call_log["cost_usd"] > expected_cost * 2:
            issues.append(
                f"⚠️ Cost anomaly: ${call_log['cost_usd']:.4f} "
                f"(expected: ${expected_cost:.4f})"
            )
        # 4. Frequent retries
        if call_log.get("retry_count", 0) > 2:
            issues.append(
                f"⚠️ Frequent retries: {call_log['retry_count']} attempts, "
                f"error type: {call_log.get('error_type')}"
            )
        # 5. Truncation detection
        if call_log.get("finish_reason") == "length":
            issues.append(
                "⚠️ Output truncated (max_tokens too low)"
            )
        return issues

    def compare_models(
        self, logs: list[dict], models: list[str]
    ) -> dict:
        """Compare models on the same set of requests."""
        comparison = {}
        for model in models:
            model_logs = [l for l in logs if l["model"] == model]
            if model_logs:
                comparison[model] = {
                    "avg_latency_ms": mean(
                        [l["latency_ms"] for l in model_logs]
                    ),
                    "avg_cost_usd": mean(
                        [l["cost_usd"] for l in model_logs]
                    ),
                    "success_rate": (
                        len([l for l in model_logs
                             if l["status"] == "success"])
                        / len(model_logs)
                    ),
                    "avg_quality_score": mean(
                        [l.get("quality_score", 0)
                         for l in model_logs]
                    ),
                }
        return comparison
```

#### Interactive debug sessions
```python
class LLMDebugSession:
    """Interactive debug session that can replay requests step by step."""

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self.calls = self._load_trace(trace_id)

    def _load_trace(self, trace_id: str) -> list[dict]:
        # Load the full trace from log storage
        pass

    def timeline(self):
        """Render the call timeline."""
        for i, call in enumerate(self.calls):
            bar = "█" * int(call["latency_ms"] / 100)
            print(f"[{i}] {call['model']:25s} | "
                  f"{call['latency_ms']:8.0f}ms | "
                  f"{bar}")

    def replay_call(self, index: int, model: str = None):
        """Replay a single call, optionally with a different model."""
        original = self.calls[index]
        target_model = model or original["model"]
        print(f"Replaying with {target_model}...")
        # Replay logic
        pass

    def export_for_evaluation(self) -> dict:
        """Export trace data for quality evaluation."""
        return {
            "trace_id": self.trace_id,
            "calls": self.calls,
            "total_cost": sum(c["cost_usd"] for c in self.calls),
            "total_latency_ms": sum(c["latency_ms"] for c in self.calls),
            "models_used": list(set(c["model"] for c in self.calls)),
        }
```
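The trace loader is left as a stub. If your logs live in the JSON Lines file written by `LLMLogger` earlier, a minimal implementation could filter records by `trace_id` and order them by timestamp:

```python
import json

def load_trace_from_jsonl(trace_id: str,
                          log_path: str = "/var/log/llm/calls.jsonl") -> list[dict]:
    """Collect all call records belonging to one trace from a JSONL log file."""
    calls = []
    with open(log_path) as f:
        for line in f:
            record = json.loads(line)
            if record.get("trace_id") == trace_id:
                calls.append(record)
    # ISO-8601 timestamps sort correctly as plain strings
    calls.sort(key=lambda c: c["timestamp"])
    return calls
```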
### 8. Comparing Mainstream Tools

By 2026 the LLM observability tooling ecosystem has matured considerably:
#### LangSmith

LangChain's official platform, deeply integrated with LangChain/LangGraph.
```python
from langsmith import traceable


@traceable(
    name="my_agent",
    run_type="chain",
    metadata={"version": "2.0"}
)
async def my_agent(query: str):
    # LangSmith automatically records inputs/outputs, latency, token usage
    result = await chain.ainvoke({"query": query})
    return result
```

Strengths: seamless integration with the LangChain ecosystem, a powerful Prompt Hub, and a built-in evaluation framework.
#### Helicone

A proxy-based logging approach: the only code change is the base_url.
```python
from openai import OpenAI

# Just point base_url at the Helicone proxy
client = OpenAI(
    base_url="https://oai.helicone.ai/v1",
    default_headers={
        "Helicone-Auth": "Bearer YOUR_HELICONE_KEY",
        "Helicone-User-Id": "user-123",
    }
)
```

Strengths: non-invasive, built-in caching support, and a cost-analysis dashboard.
#### Lunary

An open-source, full-stack observability platform.
```python
import lunary

lunary.init(app_id="your-app-id")


@lunary.track()
async def chat_handler(message: str):
    # Lunary captures call data automatically
    response = await client.chat.completions.create(...)
    return response
```

Strengths: fully open source, built-in user feedback collection, and multi-model comparison.
#### Tool comparison table

| Feature | LangSmith | Helicone | Lunary | Self-hosted |
|---|---|---|---|---|
| Open source | ❌ | ❌ | ✅ | ✅ |
| Proxy mode | ❌ | ✅ | ❌ | N/A |
| PII redaction | ✅ | ✅ | ✅ | Custom |
| Cost tracking | ✅ | ✅ | ✅ | Custom |
| Distributed tracing | ✅ | Limited | ✅ | Custom |
| Evaluation framework | ✅ | ❌ | ✅ | Custom |
| Monthly price | From $39 | Free tier | Free tier | Infrastructure cost |
## XiDao API Gateway: LLM Observability Out of the Box

If you are already running the XiDao API Gateway, you already have a solid observability foundation.

### Core features

**1. Unified request logging**

The XiDao gateway automatically logs every LLM call passing through it, with no application code changes:
```yaml
# xidao-gateway configuration
observability:
  logging:
    enabled: true
    format: json
    include_request_body: true
    include_response_body: true
    pii_redaction:
      enabled: true
      patterns:
        - email
        - phone
        - credit_card
        - api_key
    storage:
      type: elasticsearch
      endpoint: "https://es.example.com:9200"
      index: "llm-logs-{yyyy.MM.dd}"
```

**2. Real-time metrics exposure**
```yaml
observability:
  metrics:
    enabled: true
    endpoint: /metrics
    format: prometheus
    custom_labels:
      - team
      - environment
      - cost_center
```

XiDao automatically generates standard metrics such as `llm_request_duration_seconds` and `llm_tokens_total`, ready to plug straight into Grafana; a matching Prometheus scrape config is sketched below.
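A minimal Prometheus scrape job for the gateway's `/metrics` endpoint; the target host, port, and interval are assumptions:

```yaml
scrape_configs:
  - job_name: xidao-gateway
    scrape_interval: 15s
    metrics_path: /metrics
    static_configs:
      - targets: ["xidao-gateway:9090"]   # assumed gateway host:port
```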
**3. Distributed trace injection**
```yaml
observability:
  tracing:
    enabled: true
    exporter: otlp
    endpoint: "http://jaeger-collector:4317"
    sample_rate: 0.1  # sample 10% in production
    propagation: w3c
```

**4. Cost dashboards**
XiDao has built-in cost tracking with per-user, per-team, and per-project breakdowns:
```bash
# Cost breakdown for the past 24 hours
xidao cost report --period 24h --group-by team

# Set a budget alert
xidao cost alert set \
  --team=engineering \
  --daily-limit=200 \
  --hourly-limit=30 \
  --webhook=https://hooks.slack.com/xxx
```

**5. Multi-model A/B test tracking**
```yaml
routing:
  ab_tests:
    - name: "model-comparison-q2-2026"
      variants:
        - model: claude-4-opus
          weight: 30
        - model: gpt-5
          weight: 40
        - model: gemini-2.5-pro
          weight: 30
      metrics:
        - latency_p95
        - quality_score
        - cost_per_request
```
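Because the gateway exposes the standard metrics above with a `model` label, variants can be compared directly in PromQL, for example P95 latency and cost per request by model (queries assume the metric names defined earlier):

```promql
# P95 latency per variant
histogram_quantile(0.95,
  sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, model))

# Cost per request per variant
sum(rate(llm_cost_usd_total[5m])) by (model)
  / sum(rate(llm_requests_total[5m])) by (model)
```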
## Best-Practice Summary

### Layered observability architecture
```text
┌─────────────────────────────────────────────────┐
│                Application layer                │
│  Structured logs │ Business metrics │ Quality   │
├─────────────────────────────────────────────────┤
│                Collection layer                 │
│  XiDao Gateway │ OpenTelemetry Collector        │
├─────────────────────────────────────────────────┤
│                 Storage layer                   │
│  Elasticsearch │ Prometheus │ ClickHouse        │
├─────────────────────────────────────────────────┤
│               Presentation layer                │
│  Grafana │ LangSmith │ Custom dashboards        │
├─────────────────────────────────────────────────┤
│                 Alerting layer                  │
│  AlertManager │ PagerDuty │ Slack webhooks      │
└─────────────────────────────────────────────────┘
```

### Key recommendations
- Start logging from day one: log schemas are hard to change once established, so design them early
- Carry the trace_id end to end: every hop from user request to final response must propagate it
- PII redaction is non-negotiable: over-redact rather than leak user data
- Monitor cost in real time: LLM spend can spiral out of control within minutes
- Automate quality monitoring: human review does not scale; build an automated evaluation pipeline
- Use the XiDao gateway to simplify the infrastructure: let it handle log collection and metrics exposure so the application layer can focus on business logic
## Conclusion

LLM applications in 2026 are no longer simple API calls; they are complex multi-model orchestration systems. Observability is not optional: it is a basic requirement for surviving in production.

Start with structured logging, then layer on metrics, tracing, quality checks, and cost alerts. Use the XiDao API Gateway as your observability entry point to make building the whole stack simple and efficient.

Remember: you cannot optimize what you cannot see.
Author: XiDao Team | May 2026

Want to learn more about LLM observability in practice? Visit the XiDao documentation or join our community discussions.