构建可靠的 LLM 批处理系统
面向长时运行工作负载的实用扇出架构
今天,我们有一篇由独一无二的 来宾作者 Alexandru Vesa 撰写的文章。Alex 是一名 ML / AI 工程师、创始人,也是 Hyperplane 出版物的创始人。
Alex 把时间花在真实世界的 AI 系统一线,在那里问题不是理论上会失败……而是在凌晨 3 点没有日志、半数数据丢失时才暴露出来!
在这篇文章中,他将带我们学习如何构建可靠的 LLM 批处理处理系统 ,并以一个真实的生产用例为基础。
祝阅读愉快!
Miguel

🙋 在构建任何分布式系统之前,花一分钟问自己: 当它出故障时会发生什么?
如果你正在用 LLMs 处理批量工作负载,你很可能碰到过这个难题:
- 文档处理 :需要分析 500 份合同,每份约 20 秒。Lambda 在第 45 份合同时超时。
- 数据丰富 :需要调研 200 家公司资料,每家需调用 3 次 API。任务在 14:58 终止。
- 报告生成 :80 份月度报告,每份需要 LLM 生成摘要。完成一半,另一半丢失。
- 图片处理流程 :300 张产品图片需添加图注。进度条停在 23%。
模式总是一样:你的工作负载能处理 10 个条目。然后一个顾客发送了 100 个条目,一切就崩溃了。
👉 我经历过,很多次。
本文展示了我们如何为一个客户解决该问题——一家分析 127 笔交易的销售情报平台。但这一模式适用于任何生成式 AI 批处理工作负载 ,当:
- 每个条目都是独立的(条目 A 不需要条目 B 的结果)
- 每个商品都需要大量时间(通过 LLM 需 5–60 秒)
- 总工作负载超过了 Lambda 的 15 分钟限制
如果这描述了你的系统,继续阅读!

案例研究——销售优惠情报提取
在展示任何架构之前,让我先说明我们要处理的内容。
客户的系统分析销售交易以提取情报信号:
- 资格评估 :BANT 标准(预算、决策权、需求、时间表),评分 0-100
- 风险信号 :单线沟通的交易、失联、提及竞争对手、预算问题
- 购买信号 :定价讨论、演示请求、决策者参与
- 痛点 :买方试图解决的具体问题
- 参与质量 :响应模式、内部支持者强度、决策者参与度
以下因素使该领域充满挑战:
- Each 优惠 is independent – 优惠 A 的分析不依赖于优惠 B。非常适合并行化。
- LLM 调用很慢 – 每个优惠需 25 / 35 秒。串行处理会累积等待时间。
- 大型管道 – 客户的管道有 127 个优惠。更多管道有 200+。
- 超时约束 – Lambda 的 15 分钟限制成为硬性上限。
传统方法 → 在单个 Lambda 中按顺序执行各个进程。
下面就是当时的情形:

以下是 CloudWatch 日志的样子:
[14:52:33] Processing deal 29/127: "Enterprise Renewal - Q4"
[14:52:58] Deal 29 analysis complete
[14:52:58] Processing deal 30/127: "New Logo - Manufacturing"
[14:58:00] Task timed out after 900.00 seconds
在 14:58,该函数达到了 Lambda 的 15 分钟超时 并被强制终止。127 个交易中只有 29 个被处理,其余 98 个丢失……没有检查点,也没有恢复机制。
从用户的角度看,作业只是以一条通用的 “任务失败” 消息结束,未指示失败是由超时引起,也未说明实际完成了多少工作。
让我们更仔细地看看原始架构以及它在高负载下为何表现不佳。
顺序处理
以下是出问题的架构示意:

🤔 主要问题是什么?
一次 Lambda,一笔优惠,一次 15 分钟限制。处理了 29 笔优惠……但损失了 98 笔。没有实现检查点机制,所以所有进度都丢失了!
在分析了现有架构之后,下一步显而易见: 我们必须对其进行改变 。真正的问题不是系统是否需要改进 ,而是如何在规模化时使其可靠 。
为了解决这一问题,我们必须通过一系列具体的架构选择来推进。总共我们做出了七项关键决策 ,每一项都针对流水线中的不同故障模式。
我们将在下面的各节中逐一讲解。
决策 1 – 协调器问题
第一个架构问题很简单但却至关重要: 谁来协调工作负载?
Lambda 无法自我编排。如果一个 Lambda 函数尝试分发并调用数十个其他 Lambda,它仍然受相同的 15 分钟超时限制。
最终,在工作完成之前,编排器就会超时。你需要一个可以 等待 的组件。
显而易见的首选是 Step Functions。
虽然 Step Functions 可以处理编排,但它们带来了自身的 权衡 。它们有各自的限制,并且在此用例中代价过高——你为每一次状态转换付费,而不是为实际计算付费。
更合适的选择是 ECS(Elastic Container Service)。
ECS 运行长期存在的 Docker 容器 ,没有严格的执行超时。这使其非常适合用于编排。在我们的案例中,它负责:
- 将工作负载拆分为批次
- 并行调用 Lambda 工作进程
- 通过轮询 Redis 等待完成
- 收集部分结果
- 将流水线推进到下一阶段
这里是架构发生根本性变化的地方。我们不再将所有内容都通过单一执行路径,而是采用了分发(fan-out)模式 。

在扇出模型中,ECS 充当协调器 ,将工作负载拆分为多个独立批次并将其扇出到并行的 Lambda 工作器 。
每个 Lambda 独立处理分配给它的批次 ,而 ECS 则保持活跃以协调进度并处理后续步骤。
结果是: 九个 Lambda 函数并行运行 ,每个处理一批交易。每批大约在七分钟内完成——远低于 Lambda 的 15 分钟限制。所有 127 笔交易均成功处理 ,没有超时也没有丢失工作。
这自然引出一些问题。
为什么是 ECS,而不是更多的 Lambda?
因为必须有人等待。在扇出模式中,协调者必须保持存活直到所有工作者完成。ECS 可以在需要时轮询 Redis 达 30–40 分钟。Lambda 做不到——它在 15 分钟后就会终止。
为什么不把所有东西都用 ECS?
成本与效率。运行九个 ECS 容器来处理交易的成本会远高于调用九个 Lambda。Lambda 适合短期并行计算。ECS 扮演的是耐心的协调者角色。
这种职责分离正是扇出模式能够奏效的原因:Lambda 负责执行,ECS 负责编排 。
决定 2 — 批量大小问题

每个 Lambda 批处理有多少交易 ?
批处理大小被证明是一个关键决策。
太小(每批 5 个优惠)
127 个优惠被划分为 26 个批次。此时,编排开销占主导。你花在拆分任务、调用函数和收集结果上的时间,比实际处理的时间更多。
太大(每批 50 个优惠)
50 个优惠 × 每个优惠 30 秒 大约等于 25 分钟——远超 Lambda 的 15 分钟超时限制。批处理永远无法完成。
最佳选择是每批 15 笔交易 。
下面是计算方式:
- 15 笔交易 × 约 30 秒 ≈ 每批 7.5 分钟
- 这还留有另外 7.5 分钟的缓冲 ,才会触及 Lambda 的超时
- 127 笔交易 ÷ 15 ≈ 9 个批次 (8 个完整批次和一个包含 7 笔的小批次)
但在实际操作中, 并非所有交易都相同 。
有些交易包含数百封电子邮件、几十条笔记和多份通话记录。每一项可能需要 45–60 秒 。完全由“重型”交易组成的批次仍可能使 Lambda 调用接近超时。
理想的方案是基于内容体量的动态批量大小 ——但那会增加复杂性。我们起初采用固定大小 15 并密切监测性能。对于该客户的工作负载,这被证明是一个安全且有效的选择。
这使我们有九个批次。下一个问题是如何在不触及有效负载限制的情况下,将相当于 15 笔交易的数据传递给每个 Lambda。
决策3 – 有效负载问题

Lambda 强制执行严格的 6 MB 有效载荷限制
每个优惠套装包括:
- 优惠元数据
- 所有电子邮件(包括完整正文)
- 注释
- 电话记录
- 文档元数据
将其乘以 127 笔交易,您将得到大约 45 MB 的数据 ——远远超过 Lambda 调用所能接受的容量。
我们的第一次尝试是把所有东西存到 Qdrant 里。
我们已经在用 Qdrant 做语义搜索,当时这个想法看起来很合乎逻辑:
Qdrant 存储向量和载荷,为什么不把完整的优惠包作为载荷连同它们的嵌入一并存放呢?
一个系统、一条查询、所有东西都在同一个地方。
这是我们尝试过的:

三个问题几乎立刻显现出来。
问题 1 – Qdrant 不是一个二进制对象存储
Qdrant 优化用于 向量相似度搜索 ,并不适合存储和检索大型二进制负载。我们实际上是用法拉利搬家具。
每次带约 500 KB 负载的 upsert 大约花费 800 毫秒 。乘以 127 个交易,单是存储阶段就超过了 100 秒 —— 这还是在任何实际处理开始之前。
问题 2 – 压缩帮助不够
我们尝试压缩优惠套装,但效果有限。电子邮件正文和通话记录通常能压缩到原始大小的 40–60%。一个 1 MB 的套装压缩后仍大约是 500 KB——对于作为 Qdrant 有效载荷来说仍然过大。
我们尝试了多种方法:
- Gzip:约减小 50%,但所有优惠加起来仍然达到 63.5 MB 总量 。
- Brotli (level 11):比 gzip 约提高 15% 的压缩率,但压缩时间增加约 40%
Brotli 11 级提供最佳压缩比,但它也是最耗 CPU 的。额外的压缩时间很快就超过了边际上的体积节省。
问题 3 —— 检索更糟
写入数据很慢……但读回更慢。
当一个 Lambda 需要处理 15 笔交易的批次时,Qdrant 必须从其存储引擎反序列化 15 个大型负载 。一次本应花费约 ~50 毫秒的获取操作通常需要 3–4 秒 。
[14:20:15] Storing deal bundles in Qdrant...
[14:21:55] 127 bundles stored (100.3 seconds)
[14:21:55] Invoking Lambda batch 0...
[14:21:58] Batch 0: Fetching 15 bundles from Qdrant (3.2 seconds)
[14:22:01] Batch 0: Starting LLM analysis...
在九个批次中,仅检索我们已存储的数据就额外增加了 30 多秒的纯开销 。
解决方案?将 S3 用作批处理存储
解决办法很直接: 将批处理负载迁移到 S3。
S3 是为二进制大对象存储(blob storage) 而构建的。它对大对象高效、在大规模时成本低廉,并且对我们所需的顺序读取非常快速。最重要的是,它完全将 Lambda 的负载大小限制从问题中移除。

现在每个 Lambda 仅在需要时拉取所需的数据。
没有负载限制问题,没有臃肿的调用,也不会把 Qdrant 作为存储层滥用。Qdrant 回归它擅长的领域: 向量检索 ,而不是套装传输。
此时,我们有 九个 Lambda 并行运行 ——这引出了下一个问题: 它们不会同时结束 !
那么我们如何可靠地检测到 所有 批次都已完成?
决策 4 – 协调问题
第一个本能反应是轮询数据库:每 5 秒,ECS 会询问,“所有批次都完成了吗?”。
听起来合理……直到你看到它在真实并发下的表现。
想象三个 Lambda 在同一几毫秒内完成:
Lambda 0 finishes at 14:23:45.123 Lambda 1 finishes at 14:23:45.127 Lambda 2 finishes at 14:23:45.131
但 ECS 以固定节奏轮询修复:
ECS polls at 14:23:45.000 → "3/9 complete"
ECS polls at 14:23:50.000 → "3/9 complete" (missed the updates)
ECS polls at 14:23:55.000 → "6/9 complete" (missed more)
现在你已经引入了时间差,这会导致编排器对现实的视图变得过时。在最糟的情况下,ECS 可能认为只有 6 个批次已完成,而实际上全部 9 个已经完成了。
更好的方法:在 Redis 中使用原子计数器
我们不再尝试间接观察完成情况,而是通过原子操作对其进行显式跟踪。
当一个 Lambda 完成其批次时,它会做两件事 :
- 推送结果
- 原子性地递增完成计数器

如果该递增返回 total_batches,该 Lambda 就知道这是最后一个批次。此时,它会尝试获取一个锁:

这很重要,因为 Lambda 可能几乎同时完成。使用 Redis 时:
- INCR 是原子操作 ,因此计数器始终正确
- SETNX 是原子操作 ,因此只有一个 Lambda 能获取“收集锁”
- 没有竞争条件,没有重复收集,也无需猜测
动作中的原子计数器

在作业中途,Redis 可能看起来像这样:
{
"audit_intelligence_job:job-123": {
"job_id": "job-123",
"total_deals": 127,
"total_batches": 9,
"status": "extracting",
"created_at": "2024-01-15T14:20:00Z"
},
"audit_intelligence_job:job-123:completed": 7,
"audit_intelligence_job:job-123:failed": 0,
"audit_intelligence_results:job-123": [
"{\"batch_index\":0,\"deal_id\":\"deal-1\",\"intelligence\":{...}}",
"{\"batch_index\":0,\"deal_id\":\"deal-2\",\"intelligence\":{...}}",
...
]
}
关键点:完成计数器(completed: 7)独立于作业元数据,可安全轮询。一旦它达到 9,你就可以可靠地知道所有批次都已完成。
既然我们能检测到完成,接下来的问题是: 结果到底是什么样的,应该如何为下游阶段构建?
决策 5 — 结果结构问题
一旦扇出开始运行,下一个问题就无法回避: 一个 Lambda 实际上会产生什么,最后聚合的输出又是什么样子?
在我们的案例中,每个 Lambda 处理一个批次并将其结果写入 Redis。输出单元是 每笔优惠一个 JSON 对象 ,包含提取出的情报以及对底层内容的简要总结。
下面是一个单一优惠结果的示例:
{
"deal_id": "deal-12345",
"deal_name": "Enterprise Renewal - Q4",
"success": true,
"intelligence": {
"qualification_score": 72,
"qualification_gaps": [
"Timeline unclear - no close date discussed",
"Budget not confirmed"
],
"risk_level": "medium",
"risk_signals": [
"Single threaded - only sales rep contact",
"Gone silent - no activity in 12 days"
],
"pain_points_identified": [
"Current system lacks real-time reporting",
"Manual data entry causing errors"
],
"objections_raised": [
"Concerned about implementation timeline",
"Wants to see ROI before committing"
],
"competitors_mentioned": ["CompetitorA", "CompetitorB"],
"buying_signals": [
"Requested pricing breakdown",
"Looped in VP of Engineering"
],
"champion_strength": "moderate",
"engagement_quality": "good",
"decision_maker_engaged": true,
"close_probability": 65,
"days_to_close_estimate": 45,
"deal_health_summary": "Deal shows moderate qualification with some risk signals. Champion is engaged but timeline and budget need clarification. Competitor mentions suggest active evaluation.",
"recommended_next_action": "Schedule call with VP Engineering to clarify timeline and budget constraints",
"recommended_intervention": null,
"confidence_level": "high"
},
"content_summary": {
"email_count": 47,
"note_count": 12,
"call_count": 8,
"meeting_count": 3,
"document_count": 5,
"total_content_chars": 125000,
"has_meaningful_content": true
}
}
那是单个优惠。15 个优惠组成的一批会产生 15 条这样的记录。经过 9 批后,我们最终得到共计 127 条输出 。
一旦所有批次完成,ECS 从 Redis 读取结果并构建一个规范结构——以 deal ID 为键的情报地图:
intelligence_map = {
"deal-12345": DealIntelligence(...),
"deal-12346": DealIntelligence(...),
...
# 127 deals total
}
这个聚合视图成为下游步骤的输入,例如智能聚类 (将具有相似风险模式的交易分组)和优先级排序 (优先呈现高质量、高意图的交易)。
到目前为止,一切顺利 —— 但这一设计引出了下一个可靠性问题:
当一个批次失败会发生什么?
决策 6 —— 失败处理问题
现在假设第 3 批次失败 。这就有 15 笔交易处于悬而未决的状态。接下来应该怎么做?
最直观的做法是让整个任务失败 。但那是最糟糕的结果:你可能已经成功处理了 112 笔交易 ,却因为一个批次没有完成而将所有工作白白浪费掉。
更好的做法是将失败视为一等状态 :对其进行明确跟踪并在保留部分结果的情况下继续处理。
下面是批处理 3 失败后 Redis 可能的状态:
{
"audit_intelligence_job:job-123:completed": 8,
"audit_intelligence_job:job-123:failed": 1,
"audit_intelligence_job:job-123": {
"errors": [
"Batch 3 failed: Lambda timeout after processing 12/15 deals"
]
}
}
在这种模型下,作业仍会完成——只是结果集质量下降。例如:
- 115/127 笔交易已成功提取
- 聚类阶段继续使用现有的一切可用智能
- 用户会收到明确警告,例如 “115/127 笔交易已成功分析。12 笔交易因超时失败。”
这是比 “任务失败” 更重要的一大进步,因为它保留了价值并使系统在失败时更可预测。
我们还学到一点: 并非所有失败都是永久的 。超时可能是因为 LLM API 响应慢,而不是因为批次本身过大。在许多情况下,简单地重试失败的批次就能成功。
自然的下一步改进是对失败批次进行自动重试 (带有限制、回退和良好的可见性)。我们并没有立即实现这一点——但我们设计了系统,使其可以被整洁地加入。
这就引出了协调器本身的问题:即便我们能优雅地处理工作节点故障,ECS 应该等待多久才判定该作业真正卡住?
决策 7 – 协调器超时策略
此时,最后一个问题是:ECS 应该等待多长时间才放弃一个任务?
最简单的解决方案是一个固定超时 ——例如,“等待 30 分钟,然后失败。”
这只在简单的情况下可行。
在实际中,作业持续时间随工作量规模增长而增长。一个包含 127 个交易的流水线可能在 8 分钟内完成。一个包含 500 个交易的流水线可能确实需要 25 分钟。固定的超时要么浪费时间(对小作业等待过久),要么过早终止(截断那些本可完成的大作业)。
更好的方法是使用一种动态超时 ,它会随被处理项的数量而缩放。
以下是我们使用的简化版本:

使用此模型:
- 127 个交易 → 900 + (127 × 15) = 2805 秒 → 上限为 2400 秒(40 分钟)
- 50 个交易 → 900 + (50 × 15) = 1650 秒(27.5 分钟)
这让较大的作业有足够时间完成,同时确保较小的作业不会无故闲置 —— 编排器也不会无限期等待。
至此,所有主要的故障模式均已考虑在内,系统可以随工作负载规模可预测地扩展。
完整架构
现在我们已经逐一讲解了各项决策,下面是完整的系统:

我们遇到了一个熟悉的失败模式:127 笔交易 ,估计需要 63.5 分钟的处理时间 ,而 Lambda 的严格 15 分钟超时 。 该任务在 14:58 被终止,只有 29 笔交易 已被处理。
这就是我们的最终成果:
- ECS 调度器 — 无执行时间限制,负责协调整个工作流
- Lambda 工作者 — 9 个批次并行运行,每个批次在约 7 分钟内处理 15 笔交易
- S3 批量存储 — 消除了 Lambda 的 6 MB 有效载荷限制,并取代了失败的 Qdrant 方法
- Redis 协调 — 使用原子计数器进行可靠完成跟踪,避免竞态条件
- 优雅的失败处理 — 保留部分结果并明确跟踪失败情况
- 动态超时 — 协调器等待时间随工作量规模调整
结果:8 分钟内处理 127 笔交易,而不是 63.5 分钟 。没有超时。没有进度丢失。