DEEP DIVE · AI・ML
上級オーケストレータ型エージェントの作り方: タスク分割・委譲・統合を実装する
LLM エージェントの orchestrator-workers パターンを、概念ではなく実装の手触りまで落として解説します。中央のオーケストレータがタスクを動的に分割し、ワーカーに委譲し、結果を統合するループを、状態管理・並列実行・エラー回復・打ち切り条件まで含めて擬似コードで組み立てます。どこを LLM に任せ、どこをコードで縛るかの線引きが設計の肝です。
オーケストレータの本体は LLM ではなく、その周りを固めるループ・状態・ガードレールのコードです。
概要
orchestrator-workers 型エージェントは「オーケストレータ LLM がタスクを動的に分割し、ワーカー LLM に委譲して結果を統合する」パターンです。本稿では計画 → 委譲 → 統合のループを擬似コードで実装し、サブタスクの状態管理、ワーカーの並列実行、失敗時のリトライと再計画、無限ループを防ぐ打ち切り条件までを具体的に設計します。LLM に任せる判断とコードで固定する制御を分ける線引きが、壊れにくいオーケストレータの分かれ目です。
前提知識
この記事を読み進めるうえで知っておくとスムーズなトピック。
要点
先に押さえておきたい、この深掘りの核となるポイント。
オーケストレータは「計画 → 委譲 → 統合」の制御ループ
オーケストレータ LLM が出すのはサブタスクの一覧という構造化データで、それを実行するのは周囲のコードです。LLM は判断を、コードは制御を担当する役割分担を最初に決めます。
サブタスクは状態機械として管理する
各サブタスクに pending / running / done / failed の状態を持たせ、依存関係 (DAG) を解決しながら実行可能なものから処理します。状態を外に出すことで再開やデバッグが容易になります。
ワーカーは独立・並列・隔離が基本
依存のないサブタスクは並列に実行し、各ワーカーには必要なコンテキストとツールだけを渡します。ワーカー同士を疎結合に保つほど、失敗の切り分けと再試行が簡単になります。
失敗は握りつぶさず、再計画の材料にする
ワーカーの失敗を構造化エラーとしてオーケストレータに返し、リトライ・代替手段・サブタスクの作り直しを判断させます。同時に最大ステップ数や予算で打ち切り条件を設けます。
統合フェーズは別の責務として分ける
ワーカーの出力をそのまま連結するのではなく、統合専用の LLM 呼び出し (または評価器) で矛盾の解消・重複の除去・最終整形を行います。
オーケストレータが解く課題
単一の ReAct エージェントは、ツールを使いながら 1 本の思考の流れでタスクを進めます。これで解ける問題は多いのですが、「何をすればよいか自体を実行前に決め打ちできない」タスクになると途端に苦しくなります。たとえば「このリポジトリの壊れたテストを全部直して」や「この製品について 5 つの観点で競合調査して」のように、サブタスクの数も内容も入力次第で変わるものです。
orchestrator-workers パターンは、この「動的な分割」を中心に据えます。中央のオーケストレータ LLM がタスクを見てサブタスクへ分割し、それぞれを専門のワーカーに委譲し、最後に結果を統合します。Parallelization (並列化) と似ていますが、決定的な違いはサブタスクを事前に決め打ちしない点です。分割の仕方そのものをオーケストレータが判断します。
ここで最初に押さえておきたいのは、オーケストレータの「本体」は LLM ではないということです。LLM が出すのは「次に何をすべきか」という構造化データに過ぎません。それを受け取って実際にワーカーを起動し、状態を更新し、失敗をハンドリングし、ループを回すのは周囲のコードです。設計の重心は、華やかなプロンプトよりもこの地味な制御コードの側にあります。
全体ループ: 計画 → 委譲 → 統合
まず骨格を擬似コードで示します。言語は問いません。雰囲気は Python 風です。
def orchestrate(goal: str, budget: Budget) -> Result:
plan = plan_subtasks(goal) # ① 計画: LLM がサブタスクへ分割
state = TaskGraph(plan) # ② 状態: DAG として保持
while not state.is_complete():
if budget.exhausted(): # 打ち切り条件 (後述)
break
ready = state.ready_tasks() # 依存が解けた実行可能タスク
results = run_parallel( # ③ 委譲: ワーカーを並列起動
[run_worker(t) for t in ready],
max_concurrency=budget.max_parallel,
)
for task, outcome in zip(ready, results):
if outcome.ok:
state.complete(task, outcome.value)
else:
handle_failure(state, task, outcome.error, budget) # ④ 回復
return integrate(goal, state.all_outputs()) # ⑤ 統合: 別の LLM 呼び出し
このループには、設計上の判断ポイントが 5 つ埋め込まれています。①計画をどう構造化して受け取るか、②状態をどう持つか、③どう並列化し隔離するか、④失敗をどう回復するか、⑤統合を誰に任せるか。以降はこの順に掘り下げます。
① 計画: サブタスクを構造化出力で受け取る
計画フェーズの肝は、オーケストレータ LLM の出力を自由文ではなく構造化データとして受け取ることです。function calling や JSON スキーマで型を縛り、コード側がそのまま機械的に扱える形にします。
PLAN_SCHEMA = {
"type": "object",
"properties": {
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"goal": {"type": "string"}, # ワーカーへの指示
"depends_on":{"type": "array", "items": {"type": "string"}},
"tools": {"type": "array", "items": {"type": "string"}},
},
"required": ["id", "goal", "depends_on"],
},
}
},
"required": ["subtasks"],
}
depends_on を持たせることで、サブタスク間の依存関係をオーケストレータ自身に宣言させられます。「テストを列挙する」が終わってから「各テストを修正する」が走る、という順序がそのまま DAG (有向非巡回グラフ) になります。tools を含めれば、ワーカーに渡す権限を最小化する材料にもなります (この最小化はセキュリティ上きわめて重要で、姉妹記事の AI エージェントのセキュリティ で詳述します)。
計画プロンプトで意識すべきは「分割しすぎない」ことです。サブタスクが細かすぎるとオーバーヘッドとトークン消費が膨らみ、粗すぎるとワーカー 1 体が抱える複雑さが ReAct の限界に逆戻りします。「ワーカー 1 体が 1 つの明確な成果物を返せる粒度」を目安にすると安定します。
② 状態: サブタスクを状態機械で管理する
サブタスクは、それぞれが小さな状態機械です。最低限 pending (依存待ち) / ready (実行可能) / running / done / failed を持たせます。この状態を LLM のコンテキストの中ではなく外側のデータ構造に持つことが、再開可能性とデバッグ性を決定づけます。
class TaskGraph:
def ready_tasks(self):
# 依存タスクがすべて done かつ自身が未着手のものを返す
return [t for t in self.tasks
if t.status == "pending"
and all(self.tasks[d].status == "done" for d in t.depends_on)]
def complete(self, task, value):
task.status, task.output = "done", value
def is_complete(self):
return all(t.status in ("done", "skipped") for t in self.tasks)
状態を外部 (メモリ上のグラフ、必要なら永続ストア) に置く利点は 3 つあります。第一に、途中で落ちても done のサブタスクをスキップして再開できること。第二に、各サブタスクの入出力を後から監査・評価できること。第三に、オーケストレータ LLM のコンテキストを軽く保てることです。完了済みサブタスクの全文をコンテキストに積み続けると、長いタスクではすぐに溢れます。要約や ID 参照で渡し、本体は外に置きます。
③ 委譲: ワーカーを並列・隔離して動かす
依存のないサブタスクは並列に実行できます。ここでワーカーをどう隔離するかが品質に効きます。各ワーカーには、そのサブタスクに必要なコンテキストとツールだけを渡し、他のワーカーの内部状態は見せません。疎結合に保つほど、1 体の失敗が全体に波及しにくくなり、再試行も局所的に済みます。
async def run_worker(task: Subtask) -> Outcome:
# ワーカーは独立した ReAct ループ。渡すのは task.goal と
# task.tools で許可された最小限のツールだけ。
agent = ReActAgent(
instructions=task.goal,
tools=resolve_tools(task.tools), # 最小権限で解決
timeout=task.timeout,
)
try:
value = await agent.run()
return Outcome(ok=True, value=value)
except (ToolError, Timeout) as e:
return Outcome(ok=False, error=str(e)) # 失敗を構造化して返す
並列実行で必ず設けるべきガードは 2 つです。並列数の上限 (max_concurrency) と個別タイムアウトです。並列数を絞らないと外部 API のレート制限やコスト爆発を招きます。タイムアウトがないと、1 体のワーカーが詰まっただけで全体が止まります。Anthropic がマルチエージェント調査システムについて書いた事例でも、トークン消費が単一エージェントの十数倍に達しうる点が報告されており、並列度とコストの管理はオーケストレータ運用の中心的な関心事です。
④ エラー回復: 失敗を再計画の入力にする
ワーカーの失敗を握りつぶしてはいけません。かといって即座に全体を失敗にするのも乱暴です。失敗は「オーケストレータが次の判断をするための入力」として扱います。
def handle_failure(state, task, error, budget):
task.retries += 1
if task.retries <= budget.max_retries:
task.status = "pending" # 同じワーカーで素直に再試行
task.last_error = error # 失敗理由をワーカーへ伝える
return
# リトライ上限に達した → オーケストレータに再計画を委ねる
decision = replan(state.summary(), failed=task, error=error)
if decision.action == "substitute":
state.replace(task, decision.new_subtasks) # 別アプローチに差し替え
elif decision.action == "skip":
task.status = "skipped" # 部分的成功を許容
else:
task.status = "failed" # ここで初めて諦める
ポイントは 3 段階の回復です。まず同一ワーカーでのリトライ。このとき last_error を次の試行に渡すと、ワーカーは自己修正できます (Reflexion 的な反省メモリの最小形です)。次にリトライで直らなければオーケストレータによる再計画。「別の手段を試す」「そのサブタスクは飛ばして部分的成功で進む」をオーケストレータ LLM に判断させます。それでも駄目なら諦める。「全部成功か全部失敗か」の二択にせず、部分的成功を扱えるようにしておくと、現実のタスクで生存率が大きく上がります。
そして全体には必ず打ち切り条件を置きます。最大ステップ数、トークン/コスト予算、壁時計時間のいずれか (理想的には全部) です。オーケストレータは再計画を繰り返すうちに同じ場所をぐるぐる回ることがあるため、この上限がない実装は本番に出せません。
⑤ 統合: 連結ではなく統合する
すべてのワーカーが終わったら、出力を最終成果物にまとめます。ここでやりがちな失敗が、ワーカーの出力を単に連結することです。並列に動いた複数のワーカーは、互いに矛盾する結論や重複した内容を出していることがよくあります。
統合は独立した責務として、専用の LLM 呼び出し (あるいは evaluator-optimizer の評価器) に任せます。
def integrate(goal: str, outputs: list[Output]) -> Result:
# 統合専用プロンプト: 矛盾の解消・重複の除去・元のゴールへの整列
return llm.complete(
system="あなたは複数の調査結果を統合する編集者です。"
"矛盾は明示し、重複は除き、元の依頼に答える形に整えます。",
user=render(goal, outputs),
)
統合フェーズを分けておくと、品質チェックの差し込み口にもなります。「元のゴールに本当に答えているか」を評価器に判定させ、不十分ならもう一周オーケストレーションに戻す、という evaluator-optimizer ループを上に重ねられます。
LLM に任せる線とコードで縛る線
最後に、このパターン全体を貫く設計原則を 1 つにまとめます。判断は LLM に、制御はコードにです。
| 責務 | 担当 | 理由 |
|---|---|---|
| サブタスクへの分割方針 | LLM | 入力依存で動的。決め打ちできない |
| 依存解決・実行順序 | コード | 決定的に再現できるべき |
| ワーカーへのツール付与 | コード | 最小権限を機械的に強制する |
| 失敗時に再試行か再計画か | LLM | 文脈を読んだ判断が要る |
| リトライ上限・予算・タイムアウト | コード | 安全弁は LLM の気分で動かさない |
| 出力の統合・矛盾解消 | LLM | 意味的な編集が必要 |
この線引きを曖昧にすると、オーケストレータは「制御もプロンプト任せ」になり、再現性とコスト管理を同時に失います。逆にここをきれいに分けておけば、LLM が多少不安定でも周囲のコードが暴走を食い止めます。
まとめ
orchestrator-workers の作り方は、突き詰めると「計画 → 委譲 → 統合のループに、状態機械・並列制御・エラー回復・打ち切り条件という安全弁を編み込む」ことに尽きます。LLM が出すのは構造化された計画と判断だけで、それを動かし続けるのは周囲のコードです。
実装の重心は計画プロンプトではなく、状態・DAG 管理とエラー回復にあります。そして動的にツールを付与する以上、ワーカーに何を許すかというセキュリティの設計と必ずセットになります。次は、そのエージェント特有のセキュリティを AI エージェントのセキュリティ で扱います。パターン全体の地図は LLM エージェントの設計パターン を併せて参照してください。
関連するデイリーニュース
このテーマが取り上げられた平日のノート。