Skip to content

Orchestrator

The Orchestrator is the public Go type you construct once per process and use everywhere else. It owns your model, your sinks, your storage, and the optional coordinator runner, and it exposes the three entry points (RunFlow, RunGoal, RunAgent) that drive every other concern in zenflow.

It is, deliberately, the only top-level type to learn.

go
orch := zenflow.New(
    zenflow.WithModel(llm),
    zenflow.WithProgress(sink),
    zenflow.WithStorage(zenflow.NewFileStorage(".zenflow")),
    zenflow.WithCoordinator(zenflow.NewDefaultCoordRunner(llm)),
)
defer orch.Close()

result, err := orch.RunFlow(ctx, wf)

Construction is plain functional options. Every With* option mutates one orchestrator-level field; nothing happens at construction beyond wiring. The first goroutine spawn is on the first RunFlow/RunGoal/RunAgent call.

What it owns

Field groupWhat it isHow it's set
Model & toolsThe default provider.LanguageModel, the registered tool catalog, and per-call goai optionsWithModel, WithTools, WithGoAIOptions
Storage & memoryThe Storage backend (memory or file) and the optional shared-memory store for cross-step KVWithStorage, WithSharedMemory, WithModelResolver
Sinks & approvalProgress sink, drop callback, approval handler, permission handler, tracerWithProgress, WithDropCallback, WithApproval, WithPermissions, WithTracer
CoordinatorThe hosted *AgentRunner that handles cross-agent messaging when one is installedWithCoordinator(runner) (optional)
Workflow defaultsDefault model, max-concurrency, max-turns, max-depth, isolation, output transformWithDefaultModel, WithMaxConcurrency, WithMaxTurns, WithIsolation, WithOutputTransform

The orchestrator is stateless across runs for the workflow itself: every RunFlow call constructs a fresh Executor, a fresh MessageRouter, and a fresh in-memory MailboxStore. What persists across runs is the configuration you set on the orchestrator and, if you wired one, the coordinator runner instance (so the coord LLM keeps its conversation continuity if you opt to share it).

Three entry points

Entry pointWhat you give itWhat it does
RunFlow(ctx, wf)A parsed *Workflow (from LoadWorkflow or built in code)Runs a fully-declared YAML DAG end to end
RunGoal(ctx, goal)A single-sentence goal stringAsks the coordinator to decompose the goal into a Workflow, then runs it
RunAgent(ctx, cfg)An AgentConfig for one agentRuns a single agent's tool loop with no DAG and no coordinator

ResumeFlow(ctx, runID, wf) is the fourth entry point for picking up a previously-checkpointed run from Storage.

All four return either a *WorkflowResult (or *AgentResult for RunAgent) plus an error. The result carries per-step status, token totals, and content.

Why it's the only type you import

Almost every other zenflow type is reachable through the orchestrator:

orchestrator ownership graph

Orchestratorzenflow.New(opts...) · long-lived · holds config + entry points · concurrency-safeExecutorper-run · transient · walks the DAG, spawns AgentRunners, returns WorkflowResultexecutor.goMessageRouterper-run mailbox bus · hub-and-spoke routinginternal/router/MailboxStorein-memory by default · interface, swap with file or sqlite backendinternal/router/AgentRunnerthe coordinator (optional) · LLM hub · forward_to_agent · narrate · finalizeagent_runner.go · coord_factory.goProgressSinkyour hook into lifecycle events · stdout · json · custominterfaces.goStorageyour checkpoint backend · MemoryStorage · FileStorage · customstorage_file.go · storage_mem.goWorkflowpassed in, not owned · parsed from YAML or built in codeworkflow.go
Solid borders mark types the orchestrator owns (constructs or holds a reference to). The dashed border on Workflow marks the one type you pass in on each Run* call; it travels through the orchestrator but isn't part of its state.

Public consumers rarely instantiate any of these directly. Tests construct an Executor or a bare AgentRunner for unit-level tests; production code stays inside the orchestrator surface.

Lifecycle

go
orch := zenflow.New(opts...)   // 1. construct (sync, no goroutines)
defer orch.Close()             // 4. drain (handle registry, cancel coord goroutines)

result, err := orch.RunFlow(ctx, wf)  // 2. run (spawns Executor + step runners)
// ... inspect result ...
result2, err := orch.RunFlow(ctx, wf2) // 3. run again, same orchestrator

Close() is idempotent. It drains every active RunAgentAsync handle (best-effort, bounded) and signals long-lived goroutines (the coordinator's wake loop, the factory cache cleanup) to exit. Once closed, new Run* calls return ErrOrchestratorClosed.

For one-shot CLI invocations, defer orch.Close() is enough. For long-lived embedders (HTTP servers, queue workers), call Close() during shutdown, and remember the orchestrator is concurrency-safe: many goroutines can call RunFlow on the same instance.

Orchestrator vs Coordinator vs Executor

These three terms come up in every doc. They are not synonymous:

  • Orchestrator is the public Go type you construct (zenflow.Orchestrator). It's the owner.
  • Coordinator is an LLM-backed AgentRunner the orchestrator hosts when you pass WithCoordinator(...). It's the messaging hub.
  • Executor is a per-run internal struct that walks the DAG and spawns one AgentRunner per step. It's the scheduler.

See the agent orchestration architecture diagram (external SVG) for the full picture, including the message flow between the three.

Where to next

Released under the Apache 2.0 License.