Options
zenflow's Orchestrator is configured through functional options - one constructor (zenflow.New) plus a long list of With* helpers. This page groups the options by concern and, for each one, lists the signature, the default behavior, and a short example.
There are two flavors of option:
- Option - applied at construction time; long-lived (zenflow.New(opts...)).
- RunFlowOption / RunGoalOption - applied per call (orch.RunFlow(ctx, wf, opts...)).
LLM and tools
WithModel
func WithModel(m provider.LanguageModel) Option
Sets the language model used by the orchestrator. Required for RunFlow, RunGoal, and RunAgent - the run methods error explicitly when no model is set.
Default: none. Without WithModel, every run method returns zenflow.ErrModelRequired (canonical text: "zenflow: LLM provider is required (use WithModel)"; match via errors.Is(err, zenflow.ErrModelRequired)).
import "github.com/zendev-sh/goai/provider/anthropic"
model := anthropic.Chat("claude-sonnet-4-6")
orch := zenflow.New(zenflow.WithModel(model))
WithDefaultModel
func WithDefaultModel(model string) Option
Sets the fallback model identifier (string form, e.g., "bedrock:anthropic.claude-sonnet-4-6") used when a workflow step or AgentConfig does not specify one explicitly.
Default: "" (no fallback - per-step / per-call must specify).
When to use: workflows where most steps share one model and only a few override (e.g., a "fast" reasoner step). The orchestrator passes the resolved string through to goai so the right provider is selected.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithDefaultModel("gemini:gemini-2.5-flash"),
)
Orchestrator.DefaultModel() returns the configured value.
WithForceModel
func WithForceModel(modelID string) Option
Overrides every per-agent and per-step Model identifier with the given model name during execution. Empty string disables the override (equivalent to leaving the option off). Precedence (high → low) for effective model resolution is: WithForceModel > Step.Model > AgentConfig.Model > WithDefaultModel.
Default: "" (no override; per-step / per-agent / default-model resolution applies normally).
When to use: cross-provider CLI overrides (e.g., running every step of a workflow through one test provider regardless of what the YAML specifies). For ordinary defaults, prefer WithDefaultModel - it lets per-agent and per-step Model declarations win.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithForceModel("bedrock:anthropic.claude-sonnet-4-6"),
)
WithGoAIOptions
func WithGoAIOptions(opts ...goai.Option) Option
Forwards extra goai.Option values into every GenerateText / StreamText call zenflow makes. Use this for tracing, custom retry policies, or any goai-level knob zenflow does not expose directly.
Default: no extra options.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithGoAIOptions(
goai.WithTemperature(0.5),
goai.WithMaxRetries(3),
),
)
WithTools
func WithTools(tools ...goai.Tool) Option
Sets the tool catalog available to agents. Each tool's Execute closure is invoked when the LLM calls the tool by name; zenflow handles the goai loop, permission checks, and progress events.
Default: empty slice (agents can only produce text, no tool calls succeed).
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithTools(
readFileTool,
writeFileTool,
bashTool,
),
)
For per-call tool restriction (e.g., a sub-agent gets only read-only tools), set AgentConfig.CallTools instead of using a separate orchestrator.
WithMaxTurns
func WithMaxTurns(n int) Option
Caps the number of turns (LLM call + tool round trips) per RunAgent invocation. Applies to RunAgent only - RunFlow uses per-agent MaxTurns from workflow YAML.
Default: defaultMaxTurns = 50.
When to hit it: agents stuck in a tool-call loop. The runner returns Status: AgentStatusTruncated rather than erroring.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithMaxTurns(20), // tighter cap for per-call agents
)
WithMaxDepth
func WithMaxDepth(n int) Option
Caps agent nesting depth - how many levels of child agents can be spawned via the agent tool. Applies to RunAgent only.
Default: 3.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithMaxDepth(5),
)
Orchestrator.MaxDepth() returns the configured value (without the runtime fallback).
Coordinator
WithCoordinator
func WithCoordinator(runner *AgentRunner) Option
Installs a caller-provided AgentRunner as the workflow coordinator. The executor pushes lifecycle events (step start/end, errors, etc.) into the runner's mailbox so a coord LLM can narrate progress, route messages between agents, and produce a final summary.
Default: nil (no coordinator; workflow runs without LLM-driven monitoring).
Lifecycle is the caller's responsibility. The orchestrator never calls runner.Run, never blocks on it, and never checks whether anyone drains the mailbox. Pass nil to disable.
The simplest path is NewDefaultCoordRunner plus a goroutine to host its Run loop. See Core Functions → NewDefaultCoordRunner for the canonical pattern.
coord := zenflow.NewDefaultCoordRunner(model)
go coord.Run(ctx, zenflow.AgentConfig{}, "", "", coord.Tools)
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithCoordinator(coord),
)
WithMaxWakeCycles
func WithMaxWakeCycles(n int) Option
Caps wake-driven re-entries into goai per AgentRunner.Run. Each wake cycle = one round trip of mailbox drained → goai called → return to wait.
Default: defaultMaxWakeCycles = 10.
When to raise: workflows where many messages stream into one agent during its run (e.g., a long-running reviewer that receives updates from parallel research steps). The default is conservative; production aggregator workflows often run at 50-250.
When the cap is reached with messages still pending, the runner emits one EventMessageDropped{reason: max-wake-cycles} per remaining message rather than dropping silently.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithMaxWakeCycles(50),
)
For coord runners specifically, use zenflow.WithCoordMaxWakeCycles(n) when constructing via NewDefaultCoordRunner - that one defaults to 100.
WithCoordContextProvider
func WithCoordContextProvider(fn func() string) CoordOption
CoordOption (passed to NewDefaultCoordRunner, not to New()). Installs a callback the coord runner invokes once before the first goai.GenerateText call AND once on every wake-driven re-entry after the mailbox drain. The returned string is appended as a fresh user-role message wrapped in <dynamic-context>...</dynamic-context> so the LLM can distinguish ambient state from in-band conversation. Empty / whitespace-only returns are skipped; a nil callback is a no-op.
When to use: chat-driven UX consumers (e.g. an editor integration) that need ambient context refreshed every wake without re-engineering the system prompt - currently-open files, repo metadata, session topic, recent user actions. Keep the callback cheap (microseconds) and goroutine-safe; it runs synchronously on the runner goroutine.
coord := zenflow.NewDefaultCoordRunner(
llm,
zenflow.WithCoordContextProvider(func() string {
return ambientSnapshotForCoord()
}),
)
For symmetry, the runner-level option zenflow.WithRunnerWakeContextProvider(fn) exposes the same hook on any AgentRunner constructed via zenflow.NewAgentRunner(...). Coord callers should normally prefer WithCoordContextProvider; the runner-level option is for bespoke runner construction outside the coord factory.
Concurrency and lifecycle
WithMaxConcurrency
func WithMaxConcurrency(n int) Option
Sets the maximum number of workflow steps that can run in parallel.
Default: 5.
Precedence: workflow YAML options.maxConcurrency > WithMaxConcurrency(n) orchestrator option > library default 5. Setting YAML maxConcurrency: 0 (or omitting the field) is treated as "unset" and falls through to the next level, so WithMaxConcurrency(n) is honored when the workflow does not pin the value. The default is applied at execution time, not by the parser.
When to tune: raise on machines with many cores or workflows where most steps are LLM-bound (waiting on the network, not CPU). Drop to 1-2 on small CI runners or workflows where steps share rate-limited APIs.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithMaxConcurrency(10),
)
WithIsolation
func WithIsolation(iso StepIsolation) Option
Installs a StepIsolation strategy - Setup is called before each step, Cleanup is deferred after. Use for per-step working directories, container sandboxes, or any other resource scope tied to step lifetime.
Default: nil (no setup/cleanup).
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithIsolation(myWorkdirIsolation),
)
The interface is documented in Types.
WithMaxMailboxSize
func WithMaxMailboxSize(n int) Option
Bounds the per-step in-memory mailbox queue. When the cap is exceeded, Append rejects the newest message and the router emits EventMessageDropped{reason: mailbox-full} via OnDrop.
Default: 0 (unbounded).
When to set: workflows where a producer can flood a slow consumer. Without a cap, runaway producers can OOM. The default is unbounded for backward compatibility; setting any positive value is a good practice.
Only takes effect with the default InMemoryMailboxStore - custom stores enforce their own caps.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithMaxMailboxSize(1000),
)
WithHoldTimeout
func WithHoldTimeout(d time.Duration) Option
Bounds how long the executor retains a step in StepIdle while messages keep arriving. After the timeout, the step force-terminates and one EventMessageDropped{reason: hold-timeout} is emitted per remaining mailbox message.
Default: defaultHoldTimeout = 30 seconds.
When to tune: raise for chat-style workflows where idle gaps between user messages are normal; lower for automated pipelines where any idle gap signals a stuck step.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithHoldTimeout(2 * time.Minute),
)
WithAgentHandleTTL
func WithAgentHandleTTL(d time.Duration) Option
Sets the start-to-finish wall-clock cap on a RunAgentAsync handle. When the TTL is exceeded, the handle is force-completed with AgentError{Sentinel: ErrAgentHandleTimeout} and the inner context is cancelled.
Default: DefaultAgentHandleTTL = 30 minutes. Zero or negative values fall back to the default.
When to tune: raise for long-running async agents (multi-hour research / batch jobs) where 30 minutes is too tight; lower for interactive UIs that should reclaim a stuck handle quickly. The library does not consult any environment variables; CLI consumers wiring ZENFLOW_AGENT_HANDLE_TTL (or any other source) pass the parsed time.Duration to this option themselves.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithAgentHandleTTL(2 * time.Hour),
)
AgentRunner-specific options
These options configure a hand-rolled AgentRunner (zenflow.NewAgentRunner). Most consumers use RunFlow / RunGoal / RunAgent and never need them. Use these only when building a custom runner outside the orchestrator (e.g., a bespoke coordinator, an embedding harness, or a test fixture). Each option returns a RunnerOption and is applied by NewAgentRunner(opts...).
Inputs
WithRunnerModel
func WithRunnerModel(m provider.LanguageModel) RunnerOption
Sets the language model the runner calls via goai.GenerateText / goai.StreamText.
Default: none. Required - Run errors without a model.
WithRunnerModelID
func WithRunnerModelID(id string) RunnerOption
Sets the model identifier string recorded in transcript metadata (e.g., "bedrock:anthropic.claude-sonnet-4-6"). Distinct from WithRunnerModel, which sets the live provider handle.
Default: "".
When to use: resume / replay paths where the saved transcript must record exactly which model produced each step.
WithRunnerSystemPrompt
func WithRunnerSystemPrompt(prompt string) RunnerOption
Sets the system prompt forwarded to goai as WithSystem(prompt).
Default: "" (no system prompt).
WithRunnerTools
func WithRunnerTools(tools ...goai.Tool) RunnerOption
Sets the tool catalog the runner exposes to goai. Each tool's Execute closure is invoked when the LLM calls the tool by name.
Default: empty slice.
WithRunnerInitialMessages
func WithRunnerInitialMessages(msgs []provider.Message) RunnerOption
Pre-loads conversation history that the runner prepends to the fresh user turn passed to Run. Used by Executor.runResume to replay a saved transcript through the standard AgentRunner.Run machinery.
Default: empty (normal, non-resume runs).
WithRunnerGoAIOptions
func WithRunnerGoAIOptions(opts ...goai.Option) RunnerOption
Forwards extra goai.Option values into every GenerateText / StreamText call the runner makes (tracing, retry policy, temperature, etc.).
Default: none.
Wiring
WithRunnerRunID
func WithRunnerRunID(id string) RunnerOption
Sets the workflow run ID stamped on every event the runner emits. Pair with WithRunnerStepID so consumers can correlate runner output with the parent workflow.
Default: "".
WithRunnerStepID
func WithRunnerStepID(id string) RunnerOption
Sets the step ID stamped on every event the runner emits.
Default: "".
WithRunnerStateRef
func WithRunnerStateRef(s *goai.AgentState) RunnerOption
Wires a goai.AgentState into the runner so an external poller can observe the tool-loop lifecycle without holding a lock. See AgentState for the lifecycle contract.
Default: nil.
When to use: UIs that need lock-free state polling (e.g., a TUI status bar refreshing at 60fps).
WithRunnerMailbox
func WithRunnerMailbox(m MailboxStore) RunnerOption
Sets the MailboxStore the runner reads inter-agent messages from. Pair with WithRunnerWake to enable mailbox-mode delivery.
Default: nil (legacy single-call mode, no inter-agent messaging).
WithRunnerWake
func WithRunnerWake(ch chan struct{}) RunnerOption
Sets the wake-signal channel the DeliveryEngine pings when the runner's mailbox has unread messages.
Default: nil. Pair with WithRunnerMailbox.
WithRunnerRouter
func WithRunnerRouter(rt *MessageRouter) RunnerOption
Wires a shared MessageRouter into the runner so child spawns inherit a live router for inter-agent messaging.
Default: nil (legacy single-call path with no messaging).
WithRunnerTranscript
func WithRunnerTranscript(ts resume.TranscriptStore) RunnerOption
Wires a TranscriptStore so the runner persists the step's conversation on every goai step-finish hook AND on Run exit.
Default: nil. Required for the Resume Mechanism.
WithRunnerWakeContextProvider
func WithRunnerWakeContextProvider(fn func() string) RunnerOption
Installs a callback the runner invokes once before the first LLM call AND once after every wake-driven mailbox drain. The returned string is appended as a fresh user-role message wrapped in <dynamic-context>...</dynamic-context>. Empty / whitespace returns are skipped; nil callback is a no-op.
Default: nil.
When to use: bespoke runners that need per-wake ambient context refresh (open files, repo metadata, session topic). Coord callers should normally use WithCoordContextProvider - that option threads into this hook.
Behavior
WithRunnerStreaming
func WithRunnerStreaming() RunnerOption
Enables streaming mode - tokens are surfaced via ProgressSink.OnOutput as they arrive instead of as a single block after the LLM call returns.
Default: off.
WithRunnerVerbose
func WithRunnerVerbose() RunnerOption
Enables verbose output - the runner's LLM responses are surfaced via ProgressSink.OnOutput in addition to lifecycle events.
Default: off.
WithRunnerMaxWakeCycles
func WithRunnerMaxWakeCycles(n int) RunnerOption
Caps wake-driven re-entries into goai.GenerateText per Run. Zero or negative falls back to the package default.
Default: defaultMaxWakeCycles = 10. (Coord runners constructed via NewDefaultCoordRunner default to 100.)
WithRunnerSpawnDepth
func WithRunnerSpawnDepth(depth int) RunnerOption
Records the recursion depth of this runner relative to the top-level RunAgent invocation. Stamped onto EventToolCall payloads so TUI consumers can collapse nested-spawn cards under the parent.
Default: 0 (top-level).
WithRunnerSpawnParentCallID
func WithRunnerSpawnParentCallID(id string) RunnerOption
Records the agent-tool ToolCallID that produced this runner via SpawnChild. Emitted on every EventToolCall in Data["parentCallID"] so consumers can route nested events into the parent's children list.
Default: "".
WithRunnerPermissions
func WithRunnerPermissions(h PermissionHandler) RunnerOption
Sets the permission handler consulted before every tool call. Returning false rejects the call.
Default: nil (every tool call permitted).
WithRunnerProgress
func WithRunnerProgress(s ProgressSink) RunnerOption
Installs the ProgressSink the runner emits lifecycle events and streaming output through.
Default: nil (events discarded).
WithRunnerPreStartDrainGate
func WithRunnerPreStartDrainGate(gate <-chan struct{}) RunnerOption
Test-only hook: when non-nil, Run blocks on receive from this channel BEFORE the first mailbox drain. Lets tests hold the pre-start drain while setting up mailbox preconditions.
Default: nil (production behavior).
Storage and memory
WithStorage
func WithStorage(s Storage) Option
Sets the persistence backend. Used by RunFlow to checkpoint step results (enabling ResumeFlow) and by WithSharedMemory to persist cross-step memory.
Default: NewMemoryStorage() (in-process map; lost when the orchestrator exits). The zenflow CLI binary installs NewFileStorage(~/.zenflow/runs/) instead of this default to enable --resume across processes; library embedders inherit the in-memory default.
For resume: use NewFileStorage(dir) or implement the Storage interface against your database.
RunAgent does not persist state - this option only affects workflow runs.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithStorage(zenflow.NewFileStorage("/var/lib/zenflow")),
)
WithSharedMemory
func WithSharedMemory(sm *SharedMemory) Option
Installs a SharedMemory instance so shared_memory_read and shared_memory_write tools auto-inject into agent tool chains during RunFlow / ResumeFlow. Lets agents collaborate via a key-value store namespaced by writer.
Default: nil (no shared memory tools available).
sm := zenflow.NewSharedMemory()
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithSharedMemory(sm),
)
See Concepts → Shared Memory for the read/write contract.
WithTranscriptStore
func WithTranscriptStore(factory func() TranscriptStore) Option
Installs a TranscriptStore factory used by the resume mechanism. The factory is invoked once per Executor.Run so each run gets a fresh store.
Default: per-run InMemoryTranscriptStore (intra-run resume only).
When to use: cross-run or cross-process resume. Supply a persistent (file / SQLite / etc.) store.
WithMaxTranscriptMessages, WithMaxTranscriptBytes
func WithMaxTranscriptMessages(n int) Option
func WithMaxTranscriptBytes(b int64) Option
Override the per-step caps for the default InMemoryTranscriptStore. Ignored when a custom store is supplied via WithTranscriptStore.
Defaults: 10000 messages, 50 MiB.
Permissions and approval
WithPermissions
func WithPermissions(h PermissionHandler) Option
Sets the permission handler. Each tool call invokes RequestPermission before execution; returning false rejects the call and surfaces the rejection through the goai tool loop.
Default: nil (every tool call permitted).
When to use: interactive CLIs (prompt the user), security policies (deny destructive tools), or audit logging.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithPermissions(myInteractivePermissionPrompt),
)
WithApproval
func WithApproval(h ApprovalHandler) Option
Sets the plan approval handler used by RunGoal. After the LLM produces a workflow plan, the handler is asked to approve it before execution. Returning false aborts the run with the sentinel zenflow.ErrPlanDenied (canonical text: "zenflow: plan denied by approval handler"; match via errors.Is(err, zenflow.ErrPlanDenied)). See Errors → Sentinel errors.
Default: nil (plans execute immediately, no confirmation).
When to use: CLI consumers that want a "type yes to run this DAG" prompt; programmatic gates that reject plans touching forbidden tools or steps.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithApproval(myCLIPromptApprover),
)
WithApprovalTimeout
func WithApprovalTimeout(d time.Duration) Option
Bounds how long ApprovalHandler.ApprovePlan may block. Must be applied after WithApproval - the option wraps the previously installed handler, so applying it before WithApproval is a no-op. On timeout, ApprovePlan returns (false, zenflow.ErrApprovalTimeout) and RunGoal aborts cleanly. Applying the option twice is safe; the second call is a no-op (no double-wrapping).
Default: zero (no timeout - the handler may block indefinitely). Zero or negative values are also a no-op.
When to use: interactive CLI / TUI approval prompts where a human may walk away from the keyboard, or programmatic gates that must not stall a long-running orchestrator. Distinguish "timeout" from "user cancel" by inspecting the error: errors.Is(err, zenflow.ErrApprovalTimeout).
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithApproval(myCLIPromptApprover),
zenflow.WithApprovalTimeout(2 * time.Minute),
)
Output
WithProgress
func WithProgress(s ProgressSink) Option
Installs a ProgressSink. zenflow emits one event per lifecycle transition (workflow start/end, step start/end, agent turn, tool call, error, etc.) and one Output per streaming token (when streaming is on).
Default: nil (events discarded; no observable progress).
The two built-in sinks live in zenflow/sink:
- sink.NewStdoutSink() - human-readable progress with glyphs and colors.
- sink.NewJSONSink() - NDJSON to stdout, the CLI's --json mode.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithProgress(sink.NewStdoutSink()),
)
For composition (e.g., write to both file and stdout), implement ProgressSink and fan out manually.
WithStreaming
func WithStreaming() Option
func WithoutStreaming() Option
WithStreaming enables streaming mode - content visible to the user is delivered token-by-token via ProgressSink.OnOutput instead of as full text after the LLM call returns. WithoutStreaming turns it back off.
Default: false.
When to enable: TUIs, chat interfaces, anywhere a user is watching output flow in real time. CI / batch / scripted use cases want it off (full lines per event are easier to parse).
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithProgress(sink),
zenflow.WithStreaming(),
)
For dynamic toggling, the deprecated WithStreamingBool(enabled bool) shim accepts a bool but will be removed before v1.0.
WithVerbose
func WithVerbose() Option
func WithoutVerbose() Option
WithVerbose enables agent output display - agent LLM responses are surfaced via ProgressSink.OnOutput in addition to the workflow events and coordinator narration. WithoutVerbose shows only workflow events and narration.
Default: false.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithProgress(sink),
zenflow.WithVerbose(),
)
For dynamic toggling, the deprecated WithVerboseBool(enabled bool) shim accepts a bool but will be removed before v1.0.
WithOutputTransform
func WithOutputTransform(t OutputTransformer) Option
Installs a transformer applied to step outputs before they are injected into dependent steps' prompts. Use this to implement smart truncation based on the target model's context window.
Default: nil (uses fixed-size byte truncation, currently 16 KB per dep, 120 KB total prompt cap).
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithOutputTransform(myContextAwareCompactor),
)
WithProgressBufferSize
func WithProgressBufferSize(n int) Option
Controls the buffer size of the non-blocking progress sink wrapper. Larger buffers tolerate slower downstream sinks at the cost of more buffered memory.
Default: defaultEventBusBuffer = 1024.
Behavior: emits are non-blocking while the buffered channel has capacity. On overflow, the wrapper applies up to 1 second of bounded back-pressure; if the channel is still full at the deadline, the event is dropped and an internal counter is incremented.
Observability
WithTracer
func WithTracer(t Tracer) Option
Installs a Tracer for distributed tracing. The OTel sub-module (zenflow/observability/otel) provides an implementation that creates OTel spans for every workflow, goal, agent, and step run.
Default: nil (no spans produced).
Span names: zenflow.flow, zenflow.goal, zenflow.agent, zenflow.step, zenflow.coordinator, zenflow.loop / zenflow.loop.iteration, zenflow.include. Attributes vary by span - see Integrations → Observability.
import zenotel "github.com/zendev-sh/zenflow/observability/otel"
orch := zenflow.New(
zenflow.WithModel(model),
zenotel.WithTracing(),
)
WithDropCallback
func WithDropCallback(fn func(DropEvent)) Option
Installs a user-supplied observer invoked once per dropped router message, in addition to the EventMessageDropped event already emitted via ProgressSink. Use this for metrics / alerting paths that don't want to subscribe to the full event firehose.
Default: nil.
Synchronous by default. Set WithDropCallbackBufferSize to a positive value to dispatch asynchronously through a buffered channel.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithDropCallback(func(d zenflow.DropEvent) {
prometheusCounter.WithLabelValues(d.Reason.String()).Inc()
}),
)
WithDropCallbackBufferSize
func WithDropCallbackBufferSize(n int) Option
Selects the buffer size for asynchronous dispatch of drop-callback events.
Default: 0 (synchronous dispatch).
Overflow behavior: if the buffered channel fills up, the callback falls back to synchronous dispatch so no drop event is itself silently lost.
orch := zenflow.New(
zenflow.WithModel(model),
zenflow.WithDropCallback(myCallback),
zenflow.WithDropCallbackBufferSize(64),
)
WithRouterObserver
func WithRouterObserver(fn func(*MessageRouter)) Option
Registers a callback invoked once per RunAgent / RunFlow invocation with the freshly-allocated MessageRouter for that run. Intended for observability hooks (telemetry, debug inspectors, integration tests) that need a handle on the per-call router without polling internal state.
Default: nil.
Panic semantics: if the callback panics, the panic IS recovered (the run continues) and logged via slog.Error. Production callers typically leave this unset.
Per-call options
WithFlowContext
func WithFlowContext(ctx string) RunFlowOption
Supplies use-case input to the workflow steps for one specific RunFlow call. Behavior depends on whether a coordinator is installed:
- With WithCoordinator: the context is pushed into the coord's mailbox as the first event (workflow_start with Context field set) so the coord LLM can curate per-step forwards.
- Without a coordinator (or WithCoordinator(nil)): the context is blanket-prepended to every step's effective user prompt as a static fallback.
Default: "" (no flow context, no blanket injection).
result, err := orch.RunFlow(ctx, wf,
zenflow.WithFlowContext("PR #123: optimize message routing"),
)
WithGoalContext
func WithGoalContext(ctx string) RunGoalOption
Supplies additional context (beyond the goal text itself) to the RunGoal decomposition prompt. Appended as a clearly-labelled ## Goal Context section so the decomposition LLM uses it without parsing the goal text for context cues.
Default: "" (no extra context).
result, err := orch.RunGoal(ctx,
"Audit the message routing path for race conditions",
zenflow.WithGoalContext("Repo uses Mailbox+Wake delivery; pay attention to that contract."),
)
Other options
A few options exist for advanced use cases or testing infrastructure:
- WithMailboxStore(factory) - replace the default InMemoryMailboxStore with a custom backend (e.g., SQLite for multi-process workflows). Factory is invoked once per run.
- WithMailboxDelivery() / WithoutMailboxDelivery() - toggle the entire mailbox + delivery engine stack. Defaults to enabled. Mostly used by tests that exercise the scheduler path without messaging machinery. The deprecated WithMailboxDeliveryBool(enabled bool) shim accepts a bool but will be removed before v1.0.
- WithExternalInbox(ids...) - pre-register non-step sender inboxes (e.g., "coordinator") on the MessageRouter so reverse-routed responses don't drop with DropReasonUnknownStep.
- WithModelResolver(r) - install a ModelResolver consulted by the resume path when a saved transcript references a model identifier different from the executor's default.
- WithTruncationOnCapReached() / WithoutTruncationOnCapReached() - configure the resume path to fall back to a truncated load when a sealed transcript hits its cap. Default disabled (sealed transcripts fail the resume loudly). The deprecated WithTruncationOnCapReachedBool(enabled bool) shim accepts a bool but will be removed before v1.0.
- WithRunID(runID string) - pin the orchestrator's run identifier. Without this, zenflow generates a fresh ID internally; useful when an HTTP server has already returned a run ID to the caller and needs the emitted events to carry the same ID.
Coordinator-loop options
These options configure RunCoordinatorLoop (see Core Functions → RunCoordinatorLoop). They are a separate option family (CoordLoopOption) - not interchangeable with Option / RunFlowOption / RunGoalOption.
WithCleanupTimeout
func WithCleanupTimeout(d time.Duration) CoordLoopOption
Bounds the cleanup phase of the func returned by RunCoordinatorLoop. After the inner context is cancelled, the cleanup waits up to d for the loop goroutine to exit, then returns. Caps shutdown latency: a stuck coord LLM call cannot block the caller indefinitely.
Default: DefaultCoordCleanupTimeout = 2 seconds. Zero or negative values fall back to the default.
cleanup := zenflow.RunCoordinatorLoop(ctx, runner, modelID,
zenflow.WithCleanupTimeout(5 * time.Second),
)
defer cleanup()
Public defaults
A handful of Default* constants are exported so embedders can compute relative caps (e.g., "2x the default mailbox size") without hardcoding magic numbers that may shift in future releases.
| Constant | Value | What it caps |
|---|---|---|
| DefaultMaxBytesPerDep | 8 * 1024 (8 KiB) | Per-dependency byte cap applied by the orchestrator's default OutputTransform when WithOutputTransform is not provided. Truncates upstream step content before injection into a downstream step's prompt. |
| DefaultMaxMailboxSize | 10000 | Per-step mailbox cap installed by New() when WithMaxMailboxSize is not provided. Pass WithMaxMailboxSize(0) to opt out of the cap; pass any positive value to override. |
| DefaultCoordCleanupTimeout | 2 * time.Second | Default for WithCleanupTimeout on RunCoordinatorLoop. The cap on the cleanup func's wait-for-exit phase. |
| DefaultAgentHandleTTL | 30 * time.Minute | Wall-clock TTL on RunAgentAsync handles. Override per-orchestrator via WithAgentHandleTTL. Documented under Types → AgentError. |