构建可靠的 LLM 批处理系统

面向长时运行工作负载的实用扇出架构

今天,我们有一篇由独一无二的 来宾作者 Alexandru Vesa 撰写的文章。Alex 是一名 ML / AI 工程师、创始人,也是 Hyperplane 出版物的创始人。

Alex 把时间花在真实世界的 AI 系统一线,在那里问题不是理论上会失败……而是在凌晨 3 点没有日志、半数数据丢失时才暴露出来!

在这篇文章中,他将带我们学习如何构建可靠的 LLM 批处理处理系统 ,并以一个真实的生产用例为基础。

祝阅读愉快!
Miguel


用于 LLM 批处理系统的分发模式

🙋 在构建任何分布式系统之前,花一分钟问自己: 当它出故障时会发生什么?

如果你正在用 LLMs 处理批量工作负载,你很可能碰到过这个难题:

  • 文档处理 :需要分析 500 份合同,每份约 20 秒。Lambda 在第 45 份合同时超时。
  • 数据丰富 :需要调研 200 家公司资料,每家需调用 3 次 API。任务在 14:58 终止。
  • 报告生成 :80 份月度报告,每份需要 LLM 生成摘要。完成一半,另一半丢失。
  • 图片处理流程 :300 张产品图片需添加图注。进度条停在 23%。

模式总是一样:你的工作负载能处理 10 个条目。然后一个顾客发送了 100 个条目,一切就崩溃了。

👉 我经历过,很多次。

本文展示了我们如何为一个客户解决该问题——一家分析 127 笔交易的销售情报平台。但这一模式适用于任何生成式 AI 批处理工作负载 ,当:

  1. 每个条目都是独立的(条目 A 不需要条目 B 的结果)
  2. 每个商品都需要大量时间(通过 LLM 需 5–60 秒)
  3. 总工作负载超过了 Lambda 的 15 分钟限制

如果这描述了你的系统,继续阅读!

案例研究——销售优惠情报提取

在展示任何架构之前,让我先说明我们要处理的内容。

客户的系统分析销售交易以提取情报信号:

  • 资格评估 :BANT 标准(预算、决策权、需求、时间表),评分 0-100
  • 风险信号 :单线沟通的交易、失联、提及竞争对手、预算问题
  • 购买信号 :定价讨论、演示请求、决策者参与
  • 痛点 :买方试图解决的具体问题
  • 参与质量 :响应模式、内部支持者强度、决策者参与度

以下因素使该领域充满挑战:

  1. Each 优惠 is independent – 优惠 A 的分析不依赖于优惠 B。非常适合并行化。
  2. LLM 调用很慢 – 每个优惠需 25 / 35 秒。串行处理会累积等待时间。
  3. 大型管道 – 客户的管道有 127 个优惠。更多管道有 200+。
  4. 超时约束 – Lambda 的 15 分钟限制成为硬性上限。

传统方法 → 在单个 Lambda 中按顺序执行各个进程。

下面就是当时的情形:

以下是 CloudWatch 日志的样子:

[14:52:33] Processing deal 29/127: "Enterprise Renewal - Q4"
[14:52:58] Deal 29 analysis complete
[14:52:58] Processing deal 30/127: "New Logo - Manufacturing"
[14:58:00] Task timed out after 900.00 seconds 

在 14:58,该函数达到了 Lambda 的 15 分钟超时 并被强制终止。127 个交易中只有 29 个被处理,其余 98 个丢失……没有检查点,也没有恢复机制。

从用户的角度看,作业只是以一条通用的 “任务失败” 消息结束,未指示失败是由超时引起,也未说明实际完成了多少工作。

让我们更仔细地看看原始架构以及它在高负载下为何表现不佳。


顺序处理

以下是出问题的架构示意:

🤔 主要问题是什么?

一次 Lambda,一笔优惠,一次 15 分钟限制。处理了 29 笔优惠……但损失了 98 笔。没有实现检查点机制,所以所有进度都丢失了!

在分析了现有架构之后,下一步显而易见: 我们必须对其进行改变 。真正的问题不是系统是否需要改进 ,而是如何在规模化时使其可靠 

为了解决这一问题,我们必须通过一系列具体的架构选择来推进。总共我们做出了七项关键决策 ,每一项都针对流水线中的不同故障模式。

我们将在下面的各节中逐一讲解。


决策 1 – 协调器问题

第一个架构问题很简单但却至关重要: 谁来协调工作负载?

Lambda 无法自我编排。如果一个 Lambda 函数尝试分发并调用数十个其他 Lambda,它仍然受相同的 15 分钟超时限制。

最终,在工作完成之前,编排器就会超时。你需要一个可以 等待 的组件。

显而易见的首选是 Step Functions

虽然 Step Functions 可以处理编排,但它们带来了自身的 权衡 。它们有各自的限制,并且在此用例中代价过高——你为每一次状态转换付费,而不是为实际计算付费。

更合适的选择是 ECS(Elastic Container Service)

ECS 运行长期存在的 Docker 容器 ,没有严格的执行超时。这使其非常适合用于编排。在我们的案例中,它负责:

  • 将工作负载拆分为批次
  • 并行调用 Lambda 工作进程
  • 通过轮询 Redis 等待完成
  • 收集部分结果
  • 将流水线推进到下一阶段

这里是架构发生根本性变化的地方。我们不再将所有内容都通过单一执行路径,而是采用了分发(fan-out)模式 

在扇出模型中,ECS 充当协调器 ,将工作负载拆分为多个独立批次并将其扇出到并行的 Lambda 工作器 

每个 Lambda 独立处理分配给它的批次 ,而 ECS 则保持活跃以协调进度并处理后续步骤。

结果是: 九个 Lambda 函数并行运行 ,每个处理一批交易。每批大约在七分钟内完成——远低于 Lambda 的 15 分钟限制。所有 127 笔交易均成功处理 ,没有超时也没有丢失工作。

这自然引出一些问题。

为什么是 ECS,而不是更多的 Lambda?

因为必须有人等待。在扇出模式中,协调者必须保持存活直到所有工作者完成。ECS 可以在需要时轮询 Redis 达 30–40 分钟。Lambda 做不到——它在 15 分钟后就会终止。

为什么不把所有东西都用 ECS?

成本与效率。运行九个 ECS 容器来处理交易的成本会远高于调用九个 Lambda。Lambda 适合短期并行计算。ECS 扮演的是耐心的协调者角色。

这种职责分离正是扇出模式能够奏效的原因:Lambda 负责执行,ECS 负责编排 


决定 2 — 批量大小问题

 

每个 Lambda 批处理有多少交易 

批处理大小被证明是一个关键决策。

太小(每批 5 个优惠)

127 个优惠被划分为 26 个批次。此时,编排开销占主导。你花在拆分任务、调用函数和收集结果上的时间,比实际处理的时间更多。

太大(每批 50 个优惠)

50 个优惠 × 每个优惠 30 秒 大约等于 25 分钟——远超 Lambda 的 15 分钟超时限制。批处理永远无法完成。

最佳选择是每批 15 笔交易 

下面是计算方式:

  • 15 笔交易 × 约 30 秒 ≈ 每批 7.5 分钟
  • 这还留有另外 7.5 分钟的缓冲 ,才会触及 Lambda 的超时
  • 127 笔交易 ÷ 15 ≈ 9 个批次 (8 个完整批次和一个包含 7 笔的小批次)

但在实际操作中, 并非所有交易都相同 

有些交易包含数百封电子邮件、几十条笔记和多份通话记录。每一项可能需要 45–60 秒 。完全由“重型”交易组成的批次仍可能使 Lambda 调用接近超时。

理想的方案是基于内容体量的动态批量大小 ——但那会增加复杂性。我们起初采用固定大小 15 并密切监测性能。对于该客户的工作负载,这被证明是一个安全且有效的选择。

这使我们有九个批次。下一个问题是如何在不触及有效负载限制的情况下,将相当于 15 笔交易的数据传递给每个 Lambda。


决策3 – 有效负载问题

Lambda 强制执行严格的 6 MB 有效载荷限制

每个优惠套装包括:

  • 优惠元数据
  • 所有电子邮件(包括完整正文)
  • 注释
  • 电话记录
  • 文档元数据

将其乘以 127 笔交易,您将得到大约 45 MB 的数据 ——远远超过 Lambda 调用所能接受的容量。

我们的第一次尝试是把所有东西存到 Qdrant 里。

我们已经在用 Qdrant 做语义搜索,当时这个想法看起来很合乎逻辑:

Qdrant 存储向量载荷,为什么不把完整的优惠包作为载荷连同它们的嵌入一并存放呢?

一个系统、一条查询、所有东西都在同一个地方。

这是我们尝试过的:

三个问题几乎立刻显现出来。

问题 1 – Qdrant 不是一个二进制对象存储

Qdrant 优化用于 向量相似度搜索 ,并不适合存储和检索大型二进制负载。我们实际上是用法拉利搬家具。

每次带约 500 KB 负载的 upsert 大约花费 800 毫秒 。乘以 127 个交易,单是存储阶段就超过了 100 秒 —— 这还是在任何实际处理开始之前。

问题 2 – 压缩帮助不够

我们尝试压缩优惠套装,但效果有限。电子邮件正文和通话记录通常能压缩到原始大小的 40–60%。一个 1 MB 的套装压缩后仍大约是 500 KB——对于作为 Qdrant 有效载荷来说仍然过大。

我们尝试了多种方法:

  • Gzip:约减小 50%,但所有优惠加起来仍然达到 63.5 MB 总量 
  • Brotli (level 11):比 gzip 约提高 15% 的压缩率,但压缩时间增加约 40%

Brotli 11 级提供最佳压缩比,但它也是最耗 CPU 的。额外的压缩时间很快就超过了边际上的体积节省。

问题 3 —— 检索更糟

写入数据很慢……但读回更慢。

当一个 Lambda 需要处理 15 笔交易的批次时,Qdrant 必须从其存储引擎反序列化 15 个大型负载 。一次本应花费约 ~50 毫秒的获取操作通常需要 3–4 秒 

[14:20:15] Storing deal bundles in Qdrant...
[14:21:55] 127 bundles stored (100.3 seconds)
[14:21:55] Invoking Lambda batch 0...
[14:21:58] Batch 0: Fetching 15 bundles from Qdrant (3.2 seconds)
[14:22:01] Batch 0: Starting LLM analysis...

在九个批次中,仅检索我们已存储的数据就额外增加了 30 多秒的纯开销 

解决方案?将 S3 用作批处理存储

解决办法很直接: 将批处理负载迁移到 S3

S3 是为二进制大对象存储(blob storage) 而构建的。它对大对象高效、在大规模时成本低廉,并且对我们所需的顺序读取非常快速。最重要的是,它完全将 Lambda 的负载大小限制从问题中移除。

现在每个 Lambda 仅在需要时拉取所需的数据。

没有负载限制问题,没有臃肿的调用,也不会把 Qdrant 作为存储层滥用。Qdrant 回归它擅长的领域: 向量检索 ,而不是套装传输。

此时,我们有 九个 Lambda 并行运行 ——这引出了下一个问题: 它们不会同时结束 

那么我们如何可靠地检测到 所有 批次都已完成?


决策 4 – 协调问题

第一个本能反应是轮询数据库:每 5 秒,ECS 会询问,“所有批次都完成了吗?”。

听起来合理……直到你看到它在真实并发下的表现。

想象三个 Lambda 在同一几毫秒内完成:

Lambda 0 finishes at 14:23:45.123 Lambda 1 finishes at 14:23:45.127 Lambda 2 finishes at 14:23:45.131

但 ECS 以固定节奏轮询修复:

ECS polls at 14:23:45.000 → "3/9 complete"
ECS polls at 14:23:50.000 → "3/9 complete" (missed the updates)
ECS polls at 14:23:55.000 → "6/9 complete" (missed more)

现在你已经引入了时间差,这会导致编排器对现实的视图变得过时。在最糟的情况下,ECS 可能认为只有 6 个批次已完成,而实际上全部 9 个已经完成了。

更好的方法:在 Redis 中使用原子计数器

我们不再尝试间接观察完成情况,而是通过原子操作对其进行显式跟踪。

当一个 Lambda 完成其批次时,它会做两件事 

  1. 推送结果
  2. 原子性地递增完成计数器

如果该递增返回 total_batches,该 Lambda 就知道这是最后一个批次。此时,它会尝试获取一个锁:

这很重要,因为 Lambda 可能几乎同时完成。使用 Redis 时:

  • INCR 是原子操作 ,因此计数器始终正确
  • SETNX 是原子操作 ,因此只有一个 Lambda 能获取“收集锁”
  • 没有竞争条件,没有重复收集,也无需猜测

动作中的原子计数器

在作业中途,Redis 可能看起来像这样:

{
  "audit_intelligence_job:job-123": {
    "job_id": "job-123",
    "total_deals": 127,
    "total_batches": 9,
    "status": "extracting",
    "created_at": "2024-01-15T14:20:00Z"
  },
  "audit_intelligence_job:job-123:completed": 7,
  "audit_intelligence_job:job-123:failed": 0,
  "audit_intelligence_results:job-123": [
    "{\"batch_index\":0,\"deal_id\":\"deal-1\",\"intelligence\":{...}}",
    "{\"batch_index\":0,\"deal_id\":\"deal-2\",\"intelligence\":{...}}",
    ...
  ]
}

关键点:完成计数器(completed: 7)独立于作业元数据,可安全轮询。一旦它达到 9,你就可以可靠地知道所有批次都已完成。

既然我们能检测到完成,接下来的问题是: 结果到底是什么样的,应该如何为下游阶段构建?


决策 5 — 结果结构问题

一旦扇出开始运行,下一个问题就无法回避: 一个 Lambda 实际上会产生什么,最后聚合的输出又是什么样子?

在我们的案例中,每个 Lambda 处理一个批次并将其结果写入 Redis。输出单元是 每笔优惠一个 JSON 对象 ,包含提取出的情报以及对底层内容的简要总结。

下面是一个单一优惠结果的示例:

{
  "deal_id": "deal-12345",
  "deal_name": "Enterprise Renewal - Q4",
  "success": true,
  "intelligence": {
    "qualification_score": 72,
    "qualification_gaps": [
      "Timeline unclear - no close date discussed",
      "Budget not confirmed"
    ],
    "risk_level": "medium",
    "risk_signals": [
      "Single threaded - only sales rep contact",
      "Gone silent - no activity in 12 days"
    ],
    "pain_points_identified": [
      "Current system lacks real-time reporting",
      "Manual data entry causing errors"
    ],
    "objections_raised": [
      "Concerned about implementation timeline",
      "Wants to see ROI before committing"
    ],
    "competitors_mentioned": ["CompetitorA", "CompetitorB"],
    "buying_signals": [
      "Requested pricing breakdown",
      "Looped in VP of Engineering"
    ],
    "champion_strength": "moderate",
    "engagement_quality": "good",
    "decision_maker_engaged": true,
    "close_probability": 65,
    "days_to_close_estimate": 45,
    "deal_health_summary": "Deal shows moderate qualification with some risk signals. Champion is engaged but timeline and budget need clarification. Competitor mentions suggest active evaluation.",
    "recommended_next_action": "Schedule call with VP Engineering to clarify timeline and budget constraints",
    "recommended_intervention": null,
    "confidence_level": "high"
  },
  "content_summary": {
    "email_count": 47,
    "note_count": 12,
    "call_count": 8,
    "meeting_count": 3,
    "document_count": 5,
    "total_content_chars": 125000,
    "has_meaningful_content": true
  }
}

那是单个优惠。15 个优惠组成的一批会产生 15 条这样的记录。经过 9 批后,我们最终得到共计 127 条输出 

一旦所有批次完成,ECS 从 Redis 读取结果并构建一个规范结构——以 deal ID 为键的情报地图:

intelligence_map = {
    "deal-12345": DealIntelligence(...),
    "deal-12346": DealIntelligence(...),
    ...
    # 127 deals total
}

这个聚合视图成为下游步骤的输入,例如智能聚类 (将具有相似风险模式的交易分组)和优先级排序 (优先呈现高质量、高意图的交易)。

到目前为止,一切顺利 —— 但这一设计引出了下一个可靠性问题:

当一个批次失败会发生什么?


决策 6 —— 失败处理问题

现在假设第 3 批次失败 。这就有 15 笔交易处于悬而未决的状态。接下来应该怎么做?

最直观的做法是让整个任务失败 。但那是最糟糕的结果:你可能已经成功处理了 112 笔交易 ,却因为一个批次没有完成而将所有工作白白浪费掉。

更好的做法是将失败视为一等状态 :对其进行明确跟踪并在保留部分结果的情况下继续处理。

下面是批处理 3 失败后 Redis 可能的状态:

{
  "audit_intelligence_job:job-123:completed": 8,
  "audit_intelligence_job:job-123:failed": 1,
  "audit_intelligence_job:job-123": {
    "errors": [
      "Batch 3 failed: Lambda timeout after processing 12/15 deals"
    ]
  }
}

在这种模型下,作业仍会完成——只是结果集质量下降。例如:

  • 115/127 笔交易已成功提取
  • 聚类阶段继续使用现有的一切可用智能
  • 用户会收到明确警告,例如 “115/127 笔交易已成功分析。12 笔交易因超时失败。”

这是比 “任务失败” 更重要的一大进步,因为它保留了价值并使系统在失败时更可预测。

我们还学到一点: 并非所有失败都是永久的 。超时可能是因为 LLM API 响应慢,而不是因为批次本身过大。在许多情况下,简单地重试失败的批次就能成功。

自然的下一步改进是对失败批次进行自动重试 (带有限制、回退和良好的可见性)。我们并没有立即实现这一点——但我们设计了系统,使其可以被整洁地加入。

这就引出了协调器本身的问题:即便我们能优雅地处理工作节点故障,ECS 应该等待多久才判定该作业真正卡住?


决策 7 – 协调器超时策略

此时,最后一个问题是:ECS 应该等待多长时间才放弃一个任务?

最简单的解决方案是一个固定超时 ——例如,“等待 30 分钟,然后失败。”
这只在简单的情况下可行。

在实际中,作业持续时间随工作量规模增长而增长。一个包含 127 个交易的流水线可能在 8 分钟内完成。一个包含 500 个交易的流水线可能确实需要 25 分钟。固定的超时要么浪费时间(对小作业等待过久),要么过早终止(截断那些本可完成的大作业)。

更好的方法是使用一种动态超时 ,它会随被处理项的数量而缩放。

以下是我们使用的简化版本:

使用此模型:

  • 127 个交易 → 900 + (127 × 15) = 2805 秒 → 上限为 2400 秒(40 分钟)
  • 50 个交易 → 900 + (50 × 15) = 1650 秒(27.5 分钟)

这让较大的作业有足够时间完成,同时确保较小的作业不会无故闲置 —— 编排器也不会无限期等待。

至此,所有主要的故障模式均已考虑在内,系统可以随工作负载规模可预测地扩展。


完整架构

现在我们已经逐一讲解了各项决策,下面是完整的系统:

我们遇到了一个熟悉的失败模式:127 笔交易 ,估计需要 63.5 分钟的处理时间 ,而 Lambda 的严格 15 分钟超时 。 该任务在 14:58 被终止,只有 29 笔交易 已被处理。

这就是我们的最终成果:

  • ECS 调度器 — 无执行时间限制,负责协调整个工作流
  • Lambda 工作者 — 9 个批次并行运行,每个批次在约 7 分钟内处理 15 笔交易
  • S3 批量存储 — 消除了 Lambda 的 6 MB 有效载荷限制,并取代了失败的 Qdrant 方法
  • Redis 协调 — 使用原子计数器进行可靠完成跟踪,避免竞态条件
  • 优雅的失败处理 — 保留部分结果并明确跟踪失败情况
  • 动态超时 — 协调器等待时间随工作量规模调整

结果:8 分钟内处理 127 笔交易,而不是 63.5 分钟 。没有超时。没有进度丢失。

了解 RecodeX 的更多信息

立即订阅以继续阅读并访问完整档案。

继续阅读