Types
Reference for every public value type in the zenflow package. Types are grouped by concern; for option helpers see Options; for *Orchestrator methods see Core Functions.
All types live in package github.com/zendev-sh/zenflow unless noted otherwise (sink helpers live in github.com/zendev-sh/zenflow/sink).
Workflow definition
Workflow
type Workflow struct {
Name string
Description string
Version int
Agents map[string]AgentConfig
Includes map[string]string
Steps []Step
Options WorkflowOptions
BaseDir string // set by LoadWorkflow; empty for programmatic use
}
Top-level structure for a workflow. Fields that map directly to YAML/JSON keys carry the obvious tags; BaseDir is set by LoadWorkflow to the directory of the source file so relative contextFiles resolve against it.
A workflow without Steps is valid only as a sub-workflow target for Includes.
Step
type Step struct {
ID string
Agent string
Instructions string
DependsOn []string
ContextFiles []string
Model string // overrides the agent's default model
Timeout Duration
Retries int
MaxRetries *int
Condition *string // CEL expression
Include string // sub-workflow reference
Loop *Loop
}
A single node in the workflow DAG.
ID - must match ^[a-zA-Z][a-zA-Z0-9_-]*$; validated by LoadWorkflow.
Agent - references a key in Workflow.Agents. An empty string falls back to the default agent.
DependsOn - lists step IDs that must complete before this step runs. Cycles are rejected at parse time.
ContextFiles - paths are resolved against Workflow.BaseDir and read into the step's prompt as additional context.
Timeout, Retries, MaxRetries - per-step overrides; WorkflowOptions carries workflow-wide defaults.
Condition - a CEL expression evaluated against the workflow's variables and previous step outputs; false means skip.
Include - references a sub-workflow by name (looked up in Workflow.Includes).
Loop - makes this step iterative (see below).
Loop
type Loop struct {
MaxIterations *int
Until *string // CEL stop condition
UntilAgent string // judge agent for stop decision
ForEach any // iterable: list, range, or CEL expression
MaxConcurrency int
Delay Duration
Steps []Step
OutputMode LoopOutputMode // "last" (default) or "cumulative"
}
Iteration config for a step. OutputMode is a typed string for compile-time safety; use the named constants below. The most common shapes:
ForEach: [...] + Steps: [...] - run the inner steps once per item.
MaxIterations: ptr(N) + Until: "...condition..." - run up to N times or until the condition is true.
UntilAgent: "judge" - delegate the stop decision to a judge agent.
OutputMode controls how the loop step's StepResult.Content is constructed for downstream consumers:
"last"(or empty, the default) - content is the last inner step of the final iteration only. Right for refine-style loops."cumulative"- content concatenates every iteration's output (plus judge feedback whenUntilAgentis set). Right for aggregator loops where downstream needs the full history.
const (
LoopOutputModeLast LoopOutputMode = "last"
LoopOutputModeCumulative LoopOutputMode = "cumulative"
)
AgentConfig
type AgentConfig struct {
// YAML/JSON-declared:
Description string
Prompt string
Model string
Tools []string // names; resolved from orchestrator catalog
DisallowedTools []string
MaxTurns int
Temperature *float64
TopP *float64
ResultSchema map[string]any
// Per-call (never serialized):
Name string
CallTools []goai.Tool // resolved tool values; overrides Tools
ProgressSink ProgressSink
SubagentToolSet string // advisory label
Attachments []provider.Part // multimodal: text fragments, image refs, PDF bytes
SessionID string // for ListAgentHandles partitioning
}
Defines a named agent's role, model, and tool access. Two halves to the struct:
- YAML-declared fields (above the // Per-call divider) describe a workflow-level agent. Tools is a list of tool NAMES because workflow YAML can only carry strings; the orchestrator resolves them against its registered tool catalog.
- Per-call fields are populated at RunAgent / RunAgentAsync time to override orchestrator defaults for one invocation. CallTools carries resolved []goai.Tool values; ProgressSink overrides WithProgress; Attachments carries multimodal provider.Part values (text snippets, image refs, PDF byte payloads) that the agent loop concatenates onto the user message at the start of the run; SessionID keys the live-handle registry for ListAgentHandles.
Tools and DisallowedTools are workflow-only fields - they are read by the executor when filtering tools for a workflow-declared agent, and ignored on the RunAgent path (use CallTools there).
WorkflowOptions
type WorkflowOptions struct {
MaxConcurrency int
MaxRetries *int
OnStepFailure FailureStrategy // "cascade" | "skip-dependents" | "abort"
Timeout Duration
StepTimeout Duration
Isolation string
Scheduler SchedulerStrategy // "dependency-first" | "round-robin" | "least-busy"
}
Workflow-wide execution knobs. Per-step values override these. OnStepFailure and Scheduler are typed strings for compile-time safety; use the named constants below.
Failure strategy constants:
const (
FailureCascade FailureStrategy = "cascade"
FailureSkipDependents FailureStrategy = "skip-dependents"
FailureAbort FailureStrategy = "abort"
)
Scheduler strategy constants:
const (
SchedulerDependencyFirst SchedulerStrategy = "dependency-first"
SchedulerRoundRobin SchedulerStrategy = "round-robin"
SchedulerLeastBusy SchedulerStrategy = "least-busy"
)
Duration
type Duration time.Duration
Wraps time.Duration with YAML/JSON string parsing ("30s", "5m", etc.). Used in Step.Timeout, Loop.Delay, WorkflowOptions.Timeout.
Run results
WorkflowResult
type WorkflowResult struct {
RunID string
Status WorkflowStatus
Steps map[string]*StepResult
Duration time.Duration
Tokens provider.Usage
Summary string
}
Outcome of a RunFlow / RunGoal / ResumeFlow call.
RunID - the run identifier; the same value flows through every event's RunID field.
Status - terminal workflow state (see WorkflowStatus below).
Steps - map keyed by step ID. Note this is a map, so iteration order is non-deterministic; if you need ordered output, walk wf.Steps and look up by ID.
Duration - wall clock from start to terminal status.
Tokens - aggregated provider.Usage across every LLM call (steps + coordinator).
Summary - human-readable summary produced by the coordinator's finalize tool. Empty when no coordinator was installed or the coordinator did not call finalize.
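The ordered-output recipe above can be sketched with stand-in types (local substitutes for zenflow.Step and zenflow.StepResult, introduced only so the example is self-contained): walk the workflow's declared step order and look each result up by ID.

```go
package main

import "fmt"

// step and stepResult are minimal stand-ins for the package's Step and
// StepResult types, here only to keep the sketch self-contained.
type step struct{ ID string }
type stepResult struct{ Content string }

// orderedContents walks the declared step order and looks each result up by
// ID, skipping steps that produced no result (e.g., skipped or cancelled).
func orderedContents(steps []step, results map[string]*stepResult) []string {
	var out []string
	for _, s := range steps {
		if r, ok := results[s.ID]; ok {
			out = append(out, r.Content)
		}
	}
	return out
}

func main() {
	steps := []step{{"plan"}, {"write"}, {"review"}}
	// Map iteration order is non-deterministic; declared order is not.
	results := map[string]*stepResult{
		"review": {"lgtm"}, "plan": {"outline"}, "write": {"draft"},
	}
	fmt.Println(orderedContents(steps, results)) // [outline draft lgtm]
}
```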
StepResult
type StepResult struct {
ID string
Status StepStatus
Content string
Result map[string]any
Tokens provider.Usage
Duration time.Duration
Error error
PreserveContent bool
}
Outcome of a single step.
Content - the agent's final assistant text (after the last LLM turn).
Result - structured output if AgentConfig.ResultSchema was set on the agent; nil otherwise.
Tokens - per-step LLM usage.
Error - non-nil if the step entered StepFailed. The error text describes whether the failure was a tool error, a model error, or a downstream cascade.
PreserveContent - signals to the prompt assembler that this step's content was intentionally aggregated (e.g., by a cumulative-mode loop) and must not be subject to per-dependency truncation. The overall prompt cap still applies as a final safety net.
WorkflowStatus
type WorkflowStatus string
const (
StatusRunning WorkflowStatus = "running"
StatusCompleted WorkflowStatus = "completed"
StatusFailed WorkflowStatus = "failed"
StatusPartial WorkflowStatus = "partial"
)
StatusRunning - workflow currently executing (visible only in mid-run progress events; RunFlow never returns this).
StatusCompleted - all steps reached StepCompleted.
StatusFailed - workflow failed with no completed steps (e.g., the first step errored under the abort strategy).
StatusPartial - some steps completed but at least one failed or was skipped.
StepStatus
type StepStatus string
const (
StepCompleted StepStatus = "completed"
StepFailed StepStatus = "failed"
StepSkipped StepStatus = "skipped"
StepCancelled StepStatus = "cancelled"
)
StepCompleted - the step finished successfully.
StepFailed - the step encountered an error (model error, tool error, panic, timeout).
StepSkipped - skipped due to a failed dependency under the skip-dependents strategy, or because a Condition evaluated false.
StepCancelled - cancelled due to a failed dependency under the cascade strategy, or because the step was scheduled but the workflow's context was cancelled BEFORE the step could be dispatched. In-flight steps interrupted by cancellation surface as StepFailed with context.Canceled.
Run
type Run struct {
ID string
Workflow *Workflow
Status WorkflowStatus
Steps map[string]*StepResult
}
A persisted workflow execution instance. Used by Storage.SaveRun / Storage.LoadRun. Most application code interacts with WorkflowResult instead.
Agent runtime
AgentRunner
type AgentRunner struct { /* all fields unexported */ }
func NewAgentRunner(opts ...RunnerOption) *AgentRunner
Executes a single-agent conversation loop with tool calling. Most callers do not construct AgentRunner directly - the orchestrator builds them per RunAgent / per workflow step. The notable exception is the coordinator, where NewDefaultCoordRunner(llm, opts...) returns a runner the caller hosts in their own goroutine.
All fields are unexported. Configure via the WithRunner* functional options documented in Options.
Capabilities the runner holds (set via the corresponding WithRunner* options):
- Model, tools, permissions, progress sink, goai option pass-through - same semantics as the orchestrator-level options.
- Streaming and verbose toggles - per-runner overrides of the orchestrator-level streaming knobs.
- Mailbox + wake channel - inter-agent message delivery. When both are configured the runner enters mailbox-mode and consumes inter-agent messages via the Mailbox+Wake protocol; otherwise the runner skips messaging entirely.
- Message router - the MessageRouter shared with sibling/child runners.
- Max wake cycles - per-runner cap; falls back to the package default.
- Transcript store, model ID, system prompt - resume support; the store persists conversation history per step, and the metadata is what Executor.ResumeStep consults to reconstruct the invocation.
- Initial messages - pre-loaded conversation history, used by the resume path to replay a saved transcript through the standard AgentRunner.Run machinery.
Minimal usage example:
runner := zenflow.NewAgentRunner(
zenflow.WithRunnerModel(model),
zenflow.WithRunnerTools(tools),
zenflow.WithRunnerPermissions(permHandler),
zenflow.WithRunnerProgress(progressSink),
)
result, err := runner.Run(ctx, cfg, userMessage, modelID, tools)
The runner's Run method is the entry point - it owns the goai tool loop:
func (r *AgentRunner) Run(
ctx context.Context,
cfg AgentConfig,
userMessage string,
model string,
tools []goai.Tool,
attachments ...provider.Part,
) (*AgentResult, error)
AgentRunnerOption
type AgentRunnerOption = RunnerOption
Deprecated: prefer RunnerOption for new code. AgentRunnerOption is a back-compat alias kept on the public facade so external SDK consumers do not see a breaking rename; the canonical name is RunnerOption (the older AgentRunner prefix was dropped to avoid stutter with the package name). Both spellings resolve to the same underlying type, so WithRunner* options accept either form interchangeably, and existing call sites compile unchanged.
AgentResult
type AgentResult struct {
Content string
Result map[string]any
Tokens provider.Usage
Turns int
Status AgentStatus
Duration time.Duration
Error error
}
Outcome of a single agent run. For synchronous RunAgent, only Content, Result, Tokens, Turns, Status, and Duration are set; errors return as the second return value.
For RunAgentAsync handles, the same struct is delivered over AgentHandle.Done() and Error carries any terminal error - including AgentError-wrapped sentinels for TTL timeout, cancel, and panic-recover. Async consumers MUST check Error before trusting the other fields.
AgentStatus
type AgentStatus string
const (
AgentStatusCompleted AgentStatus = "completed"
AgentStatusTruncated AgentStatus = "truncated"
)
AgentStatusCompleted - agent finished normally (LLM returned no further tool calls).
AgentStatusTruncated - agent hit its MaxTurns cap mid-conversation.
AgentHandle
type AgentHandle struct {
ID string
// ... internal fields ...
}
func NewAgentHandle(id string) *AgentHandle
func (h *AgentHandle) Done() <-chan AgentResult
func (h *AgentHandle) Cancel() error
Returned by RunAgentAsync. The caller drives completion via Done() and may force-terminate via Cancel().
ID - stable for the lifetime of the handle. Format: agent-<UUID v4>. Flows through every ProgressSink event the underlying runner emits.
Done() - read-only channel, buffered size 1; delivers exactly one terminal AgentResult, then closes. Multiple reads after close yield the zero value.
Cancel() - force-terminates. Subsequent Done() reads see AgentResult{Error: AgentError{Sentinel: ErrAgentCancelled}}. Idempotent.
NewAgentHandle is for tests that construct standalone handles outside RunAgentAsync. Production code should always go through RunAgentAsync. Panics if id is empty.
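The Done() channel contract (one buffered terminal result, then close, then zero values) can be modeled in miniature. agentResult and newDone below are local stand-ins, not package API:

```go
package main

import "fmt"

// agentResult stands in for zenflow.AgentResult in this sketch.
type agentResult struct{ Content string }

// newDone models the documented Done() semantics: a cap-1 buffered channel
// that delivers exactly one terminal result and is then closed.
func newDone(final agentResult) <-chan agentResult {
	ch := make(chan agentResult, 1)
	ch <- final  // the single terminal delivery
	close(ch)    // subsequent reads yield the zero value
	return ch
}

func main() {
	done := newDone(agentResult{Content: "ok"})
	first := <-done  // the terminal result
	second := <-done // channel is closed: zero value
	fmt.Println(first.Content, second.Content == "")
}
```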
AgentError
type AgentError struct {
Sentinel error
Msg string
}
func (e AgentError) Error() string
func (e AgentError) Unwrap() error
Wraps a sentinel error class with optional human-readable detail. errors.Is(AgentError{Sentinel: X}, X) returns true.
Sentinels exposed alongside:
ErrAgentHandleTimeout - the handle exceeded its TTL (default 30 minutes; DefaultAgentHandleTTL).
ErrAgentCancelled - cancelled via AgentHandle.Cancel().
ErrAgentPanicked - the agent goroutine recovered a panic; the recovered value is in Msg.
DefaultAgentHandleTTL = 30 * time.Minute. SDK consumers: call zenflow.WithAgentHandleTTL(d). CLI users may set the ZENFLOW_AGENT_HANDLE_TTL env var; the CLI maps it to the option, but the library never reads env vars directly.
Messaging
MessageRouter
type MessageRouter struct { /* internal fields */ }
func NewMessageRouter() *MessageRouter
func (r *MessageRouter) SetMailbox(store MailboxStore)
func (r *MessageRouter) Mailbox() MailboxStore
func (r *MessageRouter) RegisterStep(stepID string)
func (r *MessageRouter) RegisterInbox(stepID string)
func (r *MessageRouter) RegisterWrapperStep(stepID string)
func (r *MessageRouter) IsWrapperStep(stepID string) bool
func (r *MessageRouter) KnownSteps() []string
func (r *MessageRouter) Send(stepID string, msg RouterMessage) error
func (r *MessageRouter) Close(stepID string)
func (r *MessageRouter) MarkWorkflowCancelled()
func (r *MessageRouter) WorkflowCancelled() bool
// ...
Routes RouterMessage values from coordinator/agents into the per-step MailboxStore. The orchestrator allocates one router per RunFlow / RunAgent call; integration tests can install an observer via WithRouterObserver to grab a handle on the per-call router without polling.
Lifecycle invariants:
SetMailbox(store) must be called before any Send. Without a mailbox, Send returns an unknown-step drop.
RegisterInbox(stepID) marks a step as live so Send routes deliveries into the mailbox.
Close(stepID) marks the step terminal: subsequent Sends emit target-terminal drops.
Send returns nil on accept and a typed *DropError on drop. The error's Error() method returns the canonical "dropped: <reason>" text, so substring-matching consumers (LLM tool results, log greppers) keep working unchanged. Routing-decision callers should use errors.As to extract the typed reason instead of parsing the string:
if err := router.Send(stepID, msg); err != nil {
var de *zenflow.DropError
if errors.As(err, &de) {
switch de.Reason {
case zenflow.DropReasonUnknownStep:
// append "valid step IDs: …" hint
case zenflow.DropReasonMailboxFull:
// back off + retry
}
}
}
MailboxStore
type MailboxStore interface {
Append(stepID string, msg RouterMessage) (id string, err error)
Unread(stepID string) []RouterMessage
MarkRead(stepID string, ids []string) []string
Close(stepID string)
}
Per-step inbox storage. Append assigns a stable MessageID and returns it; Unread returns all undrained messages; MarkRead is the CAS dedup contract - it returns the subset of IDs that were already marked read on a prior call so concurrent drainers can detect double-consume without holding a lock across LLM calls.
Close is the interface's terminal signal: it drops any pending messages and marks the step closed so subsequent Append calls are silently dropped and subsequent Unread calls return nil.
The default implementation is *InMemoryMailboxStore:
func NewInMemoryMailboxStore() *InMemoryMailboxStore
func (s *InMemoryMailboxStore) Seal(stepID string)
Custom implementations (SQLite, Redis) are wired via WithMailboxStore(factory).
Seal is an *InMemoryMailboxStore-only extension (not part of the MailboxStore interface). It is the soft-close variant of Close: it marks the step terminal so subsequent Appends are silently dropped, but unlike Close it does NOT delete the existing queue. Unread continues to return any messages that were enqueued before the seal so the poller can finish draining. The executor uses this to flush in-flight messages before invoking the hard Close. Idempotent: re-sealing an already-sealed (or closed) step is a no-op.
MailboxLen(store, stepID) is a free function that returns (unread, total) counts when the store implements the optional Len method.
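The MarkRead dedup contract can be sketched with a toy store (a local stand-in, not InMemoryMailboxStore): MarkRead returns the IDs that were ALREADY read on a prior call, so a second drainer can detect a double-consume without any shared lock across LLM calls.

```go
package main

import "fmt"

// inbox is a toy stand-in implementing only the MarkRead dedup contract.
type inbox struct{ read map[string]bool }

func newInbox() *inbox { return &inbox{read: map[string]bool{}} }

// MarkRead marks each ID read and returns the subset that was already read
// on a prior call - the signal a concurrent drainer uses to back off.
func (b *inbox) MarkRead(ids []string) []string {
	var already []string
	for _, id := range ids {
		if b.read[id] {
			already = append(already, id)
		}
		b.read[id] = true
	}
	return already
}

func main() {
	b := newInbox()
	fmt.Println(b.MarkRead([]string{"m1", "m2"})) // []: first drainer wins both
	fmt.Println(b.MarkRead([]string{"m2", "m3"})) // [m2]: m2 was already consumed
}
```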
RouterMessage
type RouterMessage struct {
MessageID string
From string
To string
Content string
Type RouterMessageType
Metadata map[string]string
Timestamp time.Time
}A message between agents or coordinator.
MessageID - assigned by MailboxStore.Append (callers leave it empty). Stable identity for the MarkRead CAS contract.
From, To - sender / target step IDs. The router is internal to the library and all senders are trusted - From is informational only, never authenticated.
Type - classifies the message; see RouterMessageType below.
Metadata - arbitrary string map. The reserved key MetadataKeyResumeReverse ("zenflow-resume-reverse") marks messages produced by Executor.runResume so a Send to a closed target does not cascade into a second resume.
RouterMessageType
type RouterMessageType int
const (
RouterMessageInfo RouterMessageType = iota
RouterMessageCancel
RouterMessageContextUpdate
RouterMessageResumeReply
)
func (t RouterMessageType) String() string
RouterMessageInfo - general informational message.
RouterMessageCancel - request the receiving agent to stop.
RouterMessageContextUpdate - inject new context into the agent's conversation.
RouterMessageResumeReply - reverse-routed reply produced by Executor.runResume after a resumed step finishes. Drain logic treats it the same as RouterMessageInfo (appended as a user turn), but observers can distinguish it via the type tag.
String() returns canonical lowercase forms ("info", "cancel", "context_update", "resume_reply").
DropEvent
type DropEvent struct {
StepID string
Msg RouterMessage
Reason DropReason
}
Describes a single message discarded by the router without ever being appended to the target's mailbox. Surfaced through WithDropCallback and through EventMessageDropped events on the progress sink. See Errors for the full DropReason enum.
Delivery engine
The delivery engine watches the mailbox for unread messages on active steps and signals each step's wake channel so the runner re-enters its tool loop and drains the inbox. Orchestrator wires it inside Executor.Run; most consumers never touch the public types listed below.
The pieces are exported so tests and bespoke runtimes (e.g., a multi-process router that owns its own poll loop) can substitute their own active-step source, wake target, registry, or clock without forking the engine.
DeliveryEngine
type DeliveryEngine struct { /* internal fields */ }
func NewDeliveryEngine(
source EngineActiveStepsSource,
mailbox MailboxStore,
registry EngineWakeRegistry,
opts ...EngineOption,
) *DeliveryEngine
func (e *DeliveryEngine) TickInterval() time.Duration
func (e *DeliveryEngine) PollOne(stepID string)
Per-run goroutine that polls each active step's mailbox on a tick and signals the step's wake channel when there are unread messages. None of source, mailbox, or registry may be nil - a nil dependency would silently no-op the corresponding role and mask wiring bugs. The default tick cadence is 500ms; override via WithEngineTickInterval.
PollOne(stepID) triggers a single poll cycle without spinning up the loop; tests use it to drive a specific step deterministically.
EngineOption
type EngineOption func(*DeliveryEngine)
func WithEngineTickInterval(d time.Duration) EngineOption
func WithEngineClock(c EngineClock) EngineOption
func WithStepLocker(l EngineStepLocker) EngineOption
Functional-option type for NewDeliveryEngine. WithEngineTickInterval overrides the 500ms default; non-positive values revert to the default. WithEngineClock substitutes the tick source (production uses RealClock; tests pass a fake). WithStepLocker wires the per-step RWMutex acquirer required for the read-then-wake atomicity invariant - omit it in tests where the invariant does not matter.
EngineActiveStepsSource
type EngineActiveStepsSource interface {
ActiveSteps() []string
AgentState(stepID string) *goai.AgentState
}
The minimal subset of *Executor the engine reads. Defined as an interface so tests can drop in a fake without standing up an Executor + Workflow + Storage. ActiveSteps returns a snapshot of step IDs currently executing; subsequent ticks call it again. AgentState returns the runner's *goai.AgentState, or nil if the step has not been registered (or was unregistered after completion).
EngineWakeTarget
type EngineWakeTarget interface {
SignalWake()
}
The per-step wake handle the engine signals when a mailbox has unread messages. SignalWake MUST be non-blocking: if the wake channel already has a pending signal, the call is a no-op (cap-1 buffer semantics).
EngineWakeRegistry
type EngineWakeRegistry interface {
WakeTarget(stepID string) EngineWakeTarget
}
Lookup the engine uses to find the wake target for a given stepID. Returns nil when no target is registered (e.g., the step has not yet been admitted into the executor's mailbox path, or was already unregistered at end-of-step).
ChanWakeTarget
type ChanWakeTarget struct { /* internal fields */ }
func NewChanWakeTarget(ch chan struct{}) EngineWakeTarget
The production EngineWakeTarget: it wraps a buffered chan struct{} of capacity 1 (matching the AgentRunner.Wake contract). SignalWake is non-blocking; a duplicate wake against an already-pending channel is dropped (a single wake suffices to flush the entire mailbox). The channel passed to NewChanWakeTarget MUST be buffered with cap >= 1; cap 0 would drop signals deterministically when the agent is mid-LLM-call.
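The non-blocking cap-1 semantics are a standard select/default pattern. This is a sketch of the contract with a local stand-in type, not the library's code:

```go
package main

import "fmt"

// chanWakeTarget models the documented contract: a cap-1 buffered channel
// where a duplicate wake against an already-pending signal is dropped.
type chanWakeTarget struct{ ch chan struct{} }

// SignalWake never blocks: either the signal is recorded, or one is already
// pending and this call is a no-op (one wake flushes the whole mailbox).
func (w chanWakeTarget) SignalWake() {
	select {
	case w.ch <- struct{}{}: // signal recorded
	default: // already pending: drop rather than block
	}
}

func main() {
	w := chanWakeTarget{ch: make(chan struct{}, 1)}
	w.SignalWake()
	w.SignalWake()          // duplicate: dropped without blocking
	fmt.Println(len(w.ch))  // 1 pending signal
}
```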
MapWakeRegistry
type MapWakeRegistry struct { /* internal fields */ }
func NewWakeRegistry() *MapWakeRegistry
func (r *MapWakeRegistry) Register(stepID string, t EngineWakeTarget)
func (r *MapWakeRegistry) Unregister(stepID string)
func (r *MapWakeRegistry) WakeTarget(stepID string) EngineWakeTarget
Minimal in-memory EngineWakeRegistry. The Executor populates it as steps start (Register) and clears entries as they end (Unregister); Register overwrites any prior entry, which is useful for retried steps that reallocate their wake channel. Safe for concurrent calls.
EngineClock and RealClock
type EngineClock interface {
Tick(d time.Duration) <-chan time.Time
Stop()
}
type RealClock struct { /* internal fields */ }
Abstracts time.Tick so tests can drive ticks deterministically. RealClock wraps time.NewTicker and is the production implementation; tests pass a fake clock via WithEngineClock. Calling Tick twice on the same RealClock without an intervening Stop cancels the prior ticker so its goroutine and channel are GC'd; not safe for concurrent calls on the same RealClock.
EngineStepLocker
type EngineStepLocker interface {
AcquireStepLock(stepID string) *sync.RWMutex
}
Optional interface the engine uses to acquire a per-step RWMutex for the "read-then-wake" atomicity invariant. The poll loop wraps each step's Observe + SignalWake sequence in RLock/RUnlock so a concurrent Run-return defer (which takes the write lock and transitions to SetTerminal) cannot flip state between the read and the wake. Omit it (or pass WithStepLocker(nil)) in test contexts where a spurious wake against a freshly-terminated step is harmless.
LenAware
type LenAware interface {
Len(stepID string) (unread, total int)
}
Optional MailboxStore extension that exposes the queue length. unread is the number of messages currently visible to a subsequent Unread call; total is unread plus the already-MarkRead'd count. Stores that do not retain MarkRead'd messages report total == unread. The free function MailboxLen(store, stepID) is the canonical entry point; it falls back to len(store.Unread(stepID)) when the store does not implement the interface.
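The optional-interface fallback is a type assertion. A sketch under local stand-in interfaces (mailboxLen, plainStore, and the trimmed interface shapes are assumptions for the example):

```go
package main

import "fmt"

type routerMessage struct{ Content string }

// Trimmed stand-in for the MailboxStore interface - only Unread matters here.
type mailboxStore interface {
	Unread(stepID string) []routerMessage
}

// Stand-in for the optional LenAware extension.
type lenAware interface {
	Len(stepID string) (unread, total int)
}

// mailboxLen sketches the documented fallback: use Len when the store
// implements it, else count Unread (total == unread in that case).
func mailboxLen(store mailboxStore, stepID string) (unread, total int) {
	if la, ok := store.(lenAware); ok {
		return la.Len(stepID)
	}
	n := len(store.Unread(stepID))
	return n, n
}

// plainStore implements only the base interface, exercising the fallback.
type plainStore struct{ msgs []routerMessage }

func (s plainStore) Unread(string) []routerMessage { return s.msgs }

func main() {
	u, t := mailboxLen(plainStore{msgs: make([]routerMessage, 2)}, "step-a")
	fmt.Println(u, t) // 2 2
}
```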
ClosedAware
type ClosedAware interface {
Closed(stepID string) bool
}
Optional MailboxStore extension implemented by stores that expose their per-step closed flag. The router uses it to surface "mailbox-closed-by-finalize" drops when a Close races a concurrent Send.
BoundedInMemoryStore
type BoundedInMemoryStore struct { /* internal fields */ }
func NewBoundedInMemoryStore(maxSize int) *BoundedInMemoryStore
func (b *BoundedInMemoryStore) MaxSize() int
A MailboxStore that wraps InMemoryMailboxStore with a hard per-step cap on queued unread messages. When Append would exceed the cap, the new message is rejected and Append returns ("", ErrMailboxFull); the router maps that to DropReasonMailboxFull when emitting OnDrop.
Footgun: non-positive maxSize disables the cap
A maxSize of 0 or negative makes this store effectively unbounded - equivalent to NewInMemoryMailboxStore but with a wrapper that adds no protection. Callers that want a true cap MUST pass a positive value. If you want unbounded behaviour, prefer NewInMemoryMailboxStore directly so the intent is explicit at the call site.
MaxSize() reports the configured cap; tests use it to verify constructor wiring without reaching into unexported state.
Resume helpers
The resume mechanism (Executor.ResumeStep driven by the router when a closed step receives a new message) is built on three small types. Most embedders never construct them directly - the executor does. They are exported for integration tests and for bespoke runtimes that wire their own resume policy.
Resumer
type Resumer interface {
CanResume(stepID string) bool
ResumeStep(ctx context.Context, stepID, prompt, fromAgent string) (*ResumeHandle, error)
}
The contract the MessageRouter uses to decide whether a Send to a closed step should kick off a resume rather than drop. Executor implements this and installs itself via MessageRouter.SetResumer at the start of Run. CanResume returns true only when the step has a saved transcript AND the executor is still willing to spawn (run not cancelled). ResumeStep loads the transcript, appends prompt as a fresh user turn, spawns a fresh AgentRunner, and returns a ResumeHandle. When the executor has no TranscriptStore (mailbox delivery disabled), CanResume always returns false and the router falls back to the pre-resume target-terminal drop path.
ResumeHandle
type ResumeHandle struct {
StepID string
ResumeID string
OriginalSender string
DoneCh chan struct{}
Result string
Err error
}
Per-resume control block returned by Resumer.ResumeStep. Block on DoneCh for completion - it is closed once the resume goroutine has finished, either after a successful final assistant response (Result populated, Err == nil) or on failure (Err non-nil). ResumeID is the per-invocation identifier emitted on EventResumeStarted / EventResumeCompleted / EventResumeFailed so operators can correlate the three events for the same resume. OriginalSender is the From field of the router message that triggered the resume; the resumed runner's final assistant response is routed back to this agent via a reverse RouterMessage (suppressed when empty).
MetadataKeyResumeReverse
const MetadataKeyResumeReverse = "zenflow-resume-reverse"
Sentinel Metadata key the executor stamps on the reverse RouterMessage it sends after a resume completes. The router checks for this key when handling drops on the reverse path so a closed sender does NOT cascade into a second resume (an infinite-resume guard). Embedders subscribing to dropped messages can use this key to distinguish reverse-path drops from ordinary forward-path drops.
Persistence
Storage
type Storage interface {
SaveRun(ctx context.Context, run *Run) error
LoadRun(ctx context.Context, id string) (*Run, error)
SaveStepResult(ctx context.Context, runID, stepID string, result *StepResult) error
LoadStepResult(ctx context.Context, runID, stepID string) (*StepResult, error)
SaveSharedMemory(ctx context.Context, runID string, entries map[string]string) error
LoadSharedMemory(ctx context.Context, runID string) (map[string]string, error)
}
Persists workflow state for ResumeFlow and WithSharedMemory. Save* methods are called during execution; Load* methods are used on resume.
Storage is the canonical contract every Executor backend implements; it is the composition of the three narrower role interfaces below. The split is purely structural - existing implementations satisfy Storage unchanged - so callers can depend on the narrowest slice they need.
RunStore
type RunStore interface {
SaveRun(ctx context.Context, run *Run) error
LoadRun(ctx context.Context, id string) (*Run, error)
}
Narrow role interface: persists workflow Run records (lifecycle metadata, status, step roll-ups). Consumers that only need to load or save runs - e.g., a UI that lists historical runs, or a test fake that does not care about per-step output - should depend on RunStore rather than the wider Storage.
StepResultStore
type StepResultStore interface {
SaveStepResult(ctx context.Context, runID, stepID string, result *StepResult) error
LoadStepResult(ctx context.Context, runID, stepID string) (*StepResult, error)
}
Narrow role interface: persists per-step outputs (StepResult: status, output payload, error, timing). Useful for components that read step outputs without needing run-level state - e.g., a result-only consumer that streams completed step content into another system. Depend on StepResultStore rather than the wider Storage when run-level operations are not required.
SharedMemoryStore
type SharedMemoryStore interface {
SaveSharedMemory(ctx context.Context, runID string, entries map[string]string) error
LoadSharedMemory(ctx context.Context, runID string) (map[string]string, error)
}
Narrow role interface: persists the per-run shared key/value scratchpad that steps and the coordinator use to pass facts between agents. Kept separate from run/step persistence so alternate memory backends (Redis, vector store) can satisfy just this slice without implementing run lifecycle or step-result storage. Pair with Storage - or a custom composition - when wiring WithSharedMemory.
Why three interfaces instead of one
The original Storage interface bundled all six methods, forcing test fakes and bespoke backends to stub out unrelated operations even when only one role was exercised. Splitting into RunStore + StepResultStore + SharedMemoryStore lets callers implement only the slice they need:
- A run-listing UI fakes RunStore only.
- A SQLite step-result archive implements StepResultStore and embeds an in-memory RunStore + SharedMemoryStore.
- A vector-store memory backend implements SharedMemoryStore and reuses the default MemoryStorage for the other two roles.
Storage itself remains the canonical executor contract, and *MemoryStorage / *FileStorage continue to satisfy it unchanged - so existing call sites compile and behave identically.
Built-in implementations:
NewMemoryStorage() *MemoryStorage - in-process map. The default; lost when the orchestrator exits.
NewFileStorage(baseDir string) *FileStorage - one JSON file per run/step under baseDir. Survives process restarts; safe for cross-process resume when readers and writers agree on the directory. An empty baseDir does NOT panic at construction; the first write fails with an OS error from os.MkdirAll. Validate the path before calling.
*MemoryStorage exposes one extension method beyond the interface:
func (m *MemoryStorage) DeleteRun(runID string)
Removes the run record, every persisted step result for that run, and any shared-memory entries scoped to it. Idempotent: calling it with an unknown runID is a no-op. Long-lived embedders (web servers, daemons) should call this once a workflow result has been consumed so the in-process map does not grow unbounded. The method is concrete-type only (not on the Storage interface); it takes no context.Context and returns no error.
TranscriptStore
type TranscriptStore interface {
Append(runID, stepID string, msgs []provider.Message) error
Load(runID, stepID string) (*StepTranscript, error)
Delete(runID, stepID string) error
}
Persists per-step conversation history so a terminated step can be resumed with full prior context. The default implementation (InMemoryTranscriptStore) covers intra-run resume only; cross-run / cross-process resume requires a persistent backend wired via WithTranscriptStore.
Append returns ErrTranscriptTooLarge when the configured cap is exceeded - the messages are not appended and the caller surfaces DropReasonTranscriptTooLarge to the router.
Load returns ErrNoTranscript when no transcript exists, or ErrTranscriptTooLarge when a prior Append sealed the slot.
StepTranscript
type StepTranscript struct {
StepID string
RunID string
Messages []provider.Message
SystemPrompt string
Model string // "provider:model-id"
SavedAt time.Time
}
The persisted conversation state. SystemPrompt and Model are captured at run start so a resume can reconstruct the exact invocation.
TranscriptTruncatedLoader
type TranscriptTruncatedLoader interface {
LoadTruncated(runID, stepID string, maxMessages int) (*StepTranscript, error)
}
Optional extension. Stores that implement it can return a truncated tail of a sealed transcript; combined with WithTruncationOnCapReached(), this preserves operability at the cost of potentially-incomplete history.
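The core of such a truncated load is a tail slice over the stored messages. A minimal sketch (using plain strings rather than provider.Message; the real StepTranscript keeps SystemPrompt outside Messages, so truncating the message slice does not lose it):

```go
package main

import "fmt"

// tail returns the last max elements of msgs - the kind of truncation a
// LoadTruncated implementation performs on a sealed transcript.
func tail(msgs []string, max int) []string {
	if len(msgs) <= max {
		return msgs
	}
	return msgs[len(msgs)-max:]
}

func main() {
	msgs := []string{"u1", "a1", "u2", "a2", "u3"}
	fmt.Println(tail(msgs, 2)) // [a2 u3]
}
```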
MetadataSetter
type MetadataSetter interface {
SetMetadata(runID, stepID, systemPrompt, model string)
}
Optional extension implemented by TranscriptStore backends that can persist per-step invocation metadata (system prompt and model ID). When the AgentRunner starts, it calls SetMetadata on the store (if implemented) so the resume path can reconstruct the exact invocation without a separate metadata table. InMemoryTranscriptStore implements this interface.
InMemoryTranscriptStore
type InMemoryTranscriptStore struct { /* internal fields */ }
func NewInMemoryTranscriptStore() *InMemoryTranscriptStore
func NewInMemoryTranscriptStoreWithCaps(maxMessages int, maxBytes int64) *InMemoryTranscriptStore
func NewInMemoryTranscriptStoreWithOptions(opts ...InMemoryTranscriptStoreOption) *InMemoryTranscriptStore
func WithTranscriptCaps(maxMessages int, maxBytes int64) InMemoryTranscriptStoreOption
func (s *InMemoryTranscriptStore) Append(...) error
func (s *InMemoryTranscriptStore) Load(...) (*StepTranscript, error)
func (s *InMemoryTranscriptStore) LoadTruncated(...) (*StepTranscript, error)
func (s *InMemoryTranscriptStore) Delete(...) error
func (s *InMemoryTranscriptStore) SetMetadata(runID, stepID, systemPrompt, model string)
Default implementation. Caps default to 10000 messages, 50 MiB; override via the constructor or WithMaxTranscriptMessages / WithMaxTranscriptBytes. Implements both TranscriptStore and TranscriptTruncatedLoader.
- NewInMemoryTranscriptStoreWithOptions(opts...) - constructs with functional options. Preferred over NewInMemoryTranscriptStoreWithCaps for new code.
- WithTranscriptCaps(maxMessages, maxBytes) - sets entry/byte caps for the cap-based eviction policy. Pass to NewInMemoryTranscriptStoreWithOptions.
SharedMemory
type SharedMemory struct { /* internal fields */ }
func NewSharedMemory() *SharedMemory
func (sm *SharedMemory) Write(agent, key, value string)
func (sm *SharedMemory) Read(qualifiedKey string) (string, bool)
func (sm *SharedMemory) ListByAgent(agent string) map[string]string
func (sm *SharedMemory) Summary() string
func (sm *SharedMemory) Entries() map[string]string
func (sm *SharedMemory) LoadEntries(entries map[string]string)
Cross-step key-value store. Each entry is namespaced by writer (e.g., "researcher.findings"). The shared_memory_read and shared_memory_write tools are auto-injected into agent tool chains when WithSharedMemory is set.
Summary() returns a human-readable list of all entries; agents can read it via the tool to see what's available.
LoadEntries is used by ResumeFlow to restore persisted shared memory entries after a process restart.
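The namespacing scheme can be sketched as a map keyed by "<agent>.<key>" qualified strings. This is an illustrative toy, not the real SharedMemory internals:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mem is a toy version of the namespaced store: Write qualifies the key
// with the writer's name, as in the "researcher.findings" example above.
type mem struct{ entries map[string]string }

func (m *mem) Write(agent, key, value string) {
	m.entries[agent+"."+key] = value
}

func (m *mem) Read(qualifiedKey string) (string, bool) {
	v, ok := m.entries[qualifiedKey]
	return v, ok
}

// Summary lists qualified keys so an agent can see what's available.
func (m *mem) Summary() string {
	keys := make([]string, 0, len(m.entries))
	for k := range m.entries {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return strings.Join(keys, "\n")
}

func main() {
	m := &mem{entries: map[string]string{}}
	m.Write("researcher", "findings", "3 candidate papers")
	v, ok := m.Read("researcher.findings")
	fmt.Println(v, ok) // 3 candidate papers true
}
```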
FactoryCache
type FactoryCache struct { /* internal fields */ }
func NewFactoryCache(inner func(sessionID string) *Orchestrator) *FactoryCache
func (c *FactoryCache) For(sessionID string) *Orchestrator
func (c *FactoryCache) Close(sessionID string) error
func (c *FactoryCache) CloseAll()
Session-scoped *Orchestrator cache with TTL-based eviction. Memoizes the orchestrator per sessionID so repeated calls for the same session reuse a single instance rather than allocating fresh goroutine trees (handle registry, TTL watchdogs) on every invocation.
- NewFactoryCache(inner) - wraps the supplied constructor. Panics if inner == nil.
- For(sessionID) - returns the cached orchestrator for sessionID, building it via inner on the first call. Returns nil if inner exceeds the construction timeout.
- Close(sessionID) - calls Orchestrator.Close() on the cached instance and removes it from the cache. Idempotent.
- CloseAll() - closes every cached instance; call on server shutdown.
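The memoization core, minus TTL eviction and construction timeouts, can be sketched as a mutex-guarded map (a simplified illustration, not the real FactoryCache):

```go
package main

import (
	"fmt"
	"sync"
)

// cache memoizes one value per session ID.
type cache[T any] struct {
	mu    sync.Mutex
	inner func(sessionID string) T
	byID  map[string]T
}

func newCache[T any](inner func(string) T) *cache[T] {
	if inner == nil {
		panic("nil constructor") // mirrors NewFactoryCache's contract
	}
	return &cache[T]{inner: inner, byID: map[string]T{}}
}

// For returns the cached value for sessionID, building it on first use.
func (c *cache[T]) For(sessionID string) T {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.byID[sessionID]; ok {
		return v
	}
	v := c.inner(sessionID)
	c.byID[sessionID] = v
	return v
}

func main() {
	builds := 0
	c := newCache(func(id string) int { builds++; return builds })
	c.For("s1")
	c.For("s1") // cached; constructor not called again
	fmt.Println(builds) // 1
}
```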
Hooks and policies
ProgressSink
type ProgressSink interface {
OnEvent(ctx context.Context, event Event)
OnOutput(ctx context.Context, output Output)
}
Receives execution events. OnEvent is fired for every lifecycle transition (workflow start/end, step start/end, agent turn, tool call, error, etc.); OnOutput is fired per streaming token when streaming is on.
Built-in implementations live under github.com/zendev-sh/zenflow/sink:
- sink.NewStdoutSink() / sink.NewStdoutSinkTo(w) - human-readable progress with glyphs and colors.
- sink.NewJSONSink() / sink.NewJSONSinkTo(w) - NDJSON, one event per line. The CLI's --json mode.
- sink.Buffered(wrapped, window) - debounces high-frequency output deltas into one event per window.
Event
type Event struct {
Type EventType
Timestamp time.Time
RunID string
StepID string
AgentName string
Data map[string]any
Duration time.Duration
Tokens *provider.Usage
Error error
Message string
MessageKind string // "notification" | "content"
Subject string // optional short tag
Context string // populated only on workflow_start when WithFlowContext was set
}
EventType
The full event type enum (every constant):
const (
EventWorkflowStart EventType = "workflow_start"
EventWorkflowEnd EventType = "workflow_end"
EventStepStart EventType = "step_start"
EventStepEnd EventType = "step_end"
EventStepSkipped EventType = "step_skipped"
EventAgentTurn EventType = "agent_turn"
EventToolCall EventType = "tool_call"
EventMessage EventType = "message"
EventError EventType = "error"
EventCoordinatorNarration EventType = "coordinator_narration"
EventCoordinatorMessage EventType = "coordinator_message"
EventCoordinatorSynthesis EventType = "coordinator_synthesis"
EventCoordinatorInboxMessage EventType = "coordinator_inbox_message"
EventMessageSent EventType = "message_sent"
EventPlanReady EventType = "plan_ready"
EventAgentInboxDrain EventType = "agent_inbox_drain"
EventMessageDropped EventType = "message_dropped"
EventAgentIdle EventType = "agent_idle"
EventAgentWake EventType = "agent_wake"
EventMaxWakeCyclesWarning EventType = "max_wake_cycles_warning"
EventResumeStarted EventType = "resume_started"
EventResumeCompleted EventType = "resume_completed"
EventResumeFailed EventType = "resume_failed"
EventResumeQueued EventType = "resume_queued"
EventTranscriptSealed EventType = "transcript_sealed"
)
MessageKind constants:
const (
MessageKindNotification = "notification"
MessageKindContent = "content"
)
Output
type Output struct {
RunID string
StepID string
AgentID string
Delta string
Done bool
Reasoning bool // true when delta is a reasoning/thinking token
}
Streaming output delta. Sinks coalesce these into displayable text.
PermissionHandler, PermissionRequest
type PermissionHandler interface {
RequestPermission(ctx context.Context, req PermissionRequest) (bool, error)
}
type PermissionRequest struct {
RunID string
StepID string
ToolName string
ToolArgs json.RawMessage
}
Gates tool execution. Return false to deny (the goai loop surfaces the rejection); return an error to abort the entire step. CLI consumers typically prompt the user; library consumers can implement static policies (deny destructive tools) or audit-only logging (always permit).
ApprovalHandler
type ApprovalHandler interface {
ApprovePlan(ctx context.Context, plan *Workflow) (bool, error)
}
Gates RunGoal plan execution. After the LLM produces a workflow plan, the handler is asked to approve it. Returning false aborts the run; returning an error wraps it as fmt.Errorf("approval: %w", err).
Tracer
type Tracer interface {
StartSpan(ctx context.Context, name string, attrs map[string]string) context.Context
EndSpan(ctx context.Context, err error)
}
Tracing bridge. zenflow has no OTel dependency in core; the zenflow/observability/otel sub-module implements Tracer against the OTel SDK.
Span name conventions: zenflow.flow, zenflow.goal, zenflow.agent, zenflow.step, zenflow.coordinator, zenflow.loop, zenflow.loop.iteration, zenflow.include.
StepIsolation
type StepIsolation interface {
Setup(ctx context.Context, runID, stepID string) (workDir string, err error)
Cleanup(ctx context.Context, runID, stepID string) error
}
Per-step environment isolation. Setup is called before each step; the returned workDir is the step's working directory. Cleanup is deferred after the step completes.
NopIsolation{} is the no-op implementation; pass it via WithIsolation(zenflow.NopIsolation{}) if you want explicit "no isolation" wiring. The orchestrator default is nil, which the executor treats as "skip setup/cleanup."
OutputTransformer
type OutputTransformer interface {
TransformStepOutput(
stepID string,
content string,
result map[string]any,
targetModel string,
) (string, map[string]any)
}
Transforms a completed step's content and result before injection into a dependent step's prompt. Use this to implement smart truncation based on the target model's context window.
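A crude rune-count truncator shows the shape of such a transformer. A sketch only - a real implementation would budget by tokens per targetModel, not by a fixed rune count:

```go
package main

import "fmt"

// truncatingTransformer caps content before it reaches a dependent
// step's prompt. maxRunes is an invented knob for illustration.
type truncatingTransformer struct{ maxRunes int }

func (t truncatingTransformer) TransformStepOutput(
	stepID, content string, result map[string]any, targetModel string,
) (string, map[string]any) {
	r := []rune(content)
	if len(r) > t.maxRunes {
		content = string(r[:t.maxRunes]) + "\n[truncated]"
	}
	return content, result
}

func main() {
	tr := truncatingTransformer{maxRunes: 5}
	out, _ := tr.TransformStepOutput("step-a", "hello world", nil, "openai:gpt-4o-mini")
	fmt.Println(out) // prints "hello" then "[truncated]" on the next line
}
```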
ModelResolver
type ModelResolver func(modelID string) (provider.LanguageModel, error)
Resolves a saved-transcript model identifier (as stored in StepTranscript.Model, e.g. "openai:gpt-4o-mini") to a concrete provider.LanguageModel. Consulted by the resume path when a transcript's model differs from the executor's default.
Returning (nil, nil) is treated as "not resolvable" - the resume fails with ErrModelResolverMissing.
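A resolver typically splits the "provider:model-id" string and dispatches on the prefix. A sketch with a stub LanguageModel (the real provider.LanguageModel interface and construction calls differ):

```go
package main

import (
	"fmt"
	"strings"
)

// LanguageModel is a stand-in for provider.LanguageModel.
type LanguageModel interface{ ID() string }

type model string

func (m model) ID() string { return string(m) }

// resolve maps a "provider:model-id" string to a model. Returning
// (nil, nil) signals "not resolvable", which the resume path turns into
// ErrModelResolverMissing.
func resolve(modelID string) (LanguageModel, error) {
	provider, id, ok := strings.Cut(modelID, ":")
	if !ok {
		return nil, fmt.Errorf("malformed model id %q", modelID)
	}
	switch provider {
	case "openai", "anthropic": // providers this resolver knows how to build
		return model(id), nil
	default:
		return nil, nil // unknown provider: not resolvable
	}
}

func main() {
	m, err := resolve("openai:gpt-4o-mini")
	fmt.Println(m.ID(), err) // gpt-4o-mini <nil>
}
```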
Sink helpers
In github.com/zendev-sh/zenflow/sink:
StdoutSink
type StdoutSink struct { /* internal */ }
type StdoutSinkOption func(*StdoutSink)
func WithStdoutShowPlan(v bool) StdoutSinkOption // enable DAG rendering on EventPlanReady
func WithStdoutVerbose(v bool) StdoutSinkOption // enable agent text and reasoning content
func NewStdoutSink(opts ...StdoutSinkOption) *StdoutSink
func NewStdoutSinkTo(w io.Writer, opts ...StdoutSinkOption) *StdoutSink
// Deprecated: use WithStdoutShowPlan option in NewStdoutSink. Will be removed before v1.0.
func (s *StdoutSink) WithShowPlan(enabled bool) *StdoutSink
// Deprecated: use WithStdoutVerbose option in NewStdoutSink. Will be removed before v1.0.
func (s *StdoutSink) WithVerbose(enabled bool) *StdoutSink
Human-readable progress: triangle/check glyphs for step start/end, dashed lines for coordinator narration, colored status badges. Pass WithStdoutShowPlan(true) to render the DAG on EventPlanReady; pass WithStdoutVerbose(true) to include agent text and reasoning content.
SetColorEnabled(v bool) in the sink package overrides the auto-detected color setting. Intended for tests; production code should let the sink detect terminal capabilities automatically.
JSONSink
type JSONSink struct { /* internal */ }
func NewJSONSink() *JSONSink // -> os.Stdout
func NewJSONSinkTo(w io.Writer) *JSONSink
func (s *JSONSink) Err() error // first encode error
NDJSON output. One JSON object per line, additive contract (new events / fields don't break old parsers). Backs the CLI's --json mode.
BufferedSink
type BufferedSink struct { /* internal */ }
func Buffered(wrapped ProgressSink, window time.Duration) ProgressSink
func (b *BufferedSink) Close() error
Wraps a sink with a coalescing window: high-frequency OnOutput deltas within window are merged into one event. Lifecycle events flush immediately. Use this when wrapping a slow downstream sink (database writer, network logger) to avoid back-pressure on the agent loop.
IsLifecycleEvent(e Event) bool is a free helper that classifies an event as lifecycle (flush-immediate) or stream (coalescable).
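The coalescing logic reduces to "accumulate deltas, flush on a boundary". A time-free sketch of that core (the real BufferedSink flushes on a wall-clock window and handles concurrency; this toy flushes only on a lifecycle boundary):

```go
package main

import "fmt"

// coalescer merges consecutive deltas into one string and flushes when
// a lifecycle event arrives.
type coalescer struct {
	pending string
	flushed []string
}

func (c *coalescer) OnDelta(delta string) { c.pending += delta }

func (c *coalescer) OnLifecycle() {
	if c.pending != "" {
		c.flushed = append(c.flushed, c.pending)
		c.pending = ""
	}
}

func main() {
	c := &coalescer{}
	for _, d := range []string{"Hel", "lo ", "wor", "ld"} {
		c.OnDelta(d) // four high-frequency deltas...
	}
	c.OnLifecycle() // ...become one flushed event
	fmt.Println(len(c.flushed), c.flushed[0]) // 1 Hello world
}
```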
Sentinel errors
Surfaced through error returns from various methods; documented in detail in Errors:
- ErrOrchestratorClosed - RunAgent/RunAgentAsync called on a closed orchestrator.
- ErrAgentHandleTimeout, ErrAgentCancelled, ErrAgentPanicked - async handle terminal states.
- ErrNoTranscript, ErrTranscriptTooLarge - transcript store errors.
- ErrResumeShutdown, ErrModelResolverMissing, ErrModelResolverError, ErrMailboxFullOnResume - resume path errors.