Scripting
zenflow's CLI is designed to drop into shell pipelines and child-process invocations. The two surfaces that matter for scripting are:
- Exit code. `0` on success, non-zero on failure (see the errors reference).
- `--json` flag. Switches stdout from human-readable progress to NDJSON: one JSON event per line.
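The NDJSON contract is easiest to see with a few lines in hand. A minimal sketch (the event lines here are illustrative, borrowing field names from the examples later on this page):

```python
import json

# Three illustrative NDJSON lines, shaped like the events used in the
# examples below (the full per-event fields are in the output formats docs).
raw = """\
{"type": "step_start", "stepId": "summarize"}
{"type": "step_end", "stepId": "summarize", "duration": "2.1s"}
{"type": "workflow_end", "duration": "2.3s"}
"""

# One complete JSON object per line - parse each line on its own,
# never the stream as a whole.
events = [json.loads(line) for line in raw.splitlines() if line.strip()]
for evt in events:
    print(evt["type"])
```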
This page covers bash, Node.js, and Python patterns for invoking zenflow as a subprocess.
Bash
The simplest case is "run the workflow, do something with the events":
#!/usr/bin/env bash
set -euo pipefail
# Run the workflow and pipe NDJSON through jq, line by line.
zenflow flow workflow.yaml --json | \
  jq -c '. | select(.type == "step_end" or .type == "error")' | \
  while IFS= read -r line; do
    type=$(echo "$line" | jq -r '.type')
    step=$(echo "$line" | jq -r '.stepId')
    if [[ "$type" == "error" ]]; then
      err=$(echo "$line" | jq -r '.error')
      echo "[FAILED] $step - $err"
    else
      dur=$(echo "$line" | jq -r '.duration')
      echo "[done] $step - $dur"
    fi
  done

# Capture the exit code of zenflow (not jq).
echo "Exit code: ${PIPESTATUS[0]}"
Two things to call out:
- `PIPESTATUS[0]` captures zenflow's exit code, not the last command in the pipeline. Without this, `set -e` plus a piped jq will hide a workflow failure because jq itself exited 0.
- `set -euo pipefail` makes the script fail loudly on the first error. `pipefail` in particular ensures a zenflow failure propagates through the pipe instead of being masked by jq's success.
If you need both stdout (events) and stderr (warnings/diagnostics), capture them separately:
zenflow flow workflow.yaml --json \
  > >(tee events.ndjson) \
  2> >(tee errors.log >&2)
zf_exit=$?

echo "zenflow exited $zf_exit"
if [[ $zf_exit -ne 0 ]]; then
  echo "Last 20 errors:"
  tail -n 20 errors.log
  exit "$zf_exit"
fi
Process substitution (`> >(...)`) is bash-specific - if you need POSIX portability, redirect to files instead:
zenflow flow workflow.yaml --json > events.ndjson 2> errors.log
zf_exit=$?
Filtering for specific events
The full event catalog is documented in Output Formats. The most useful selectors:
# Just step completions
jq 'select(.type == "step_end")' events.ndjson
# Failed steps only (failure is reported as a separate "error" event,
# not as a status field on step_end).
jq 'select(.type == "error")' events.ndjson
# Tokens used per step
jq -r 'select(.type == "step_end") |
  "\(.stepId)\t\(.tokens.TotalTokens // 0)"' events.ndjson
# The final summary (when a coordinator is running)
jq -r 'select(.type == "coordinator_synthesis") | .message' events.ndjson
# Any dropped messages (diagnostic - usually a bug or misconfig)
jq 'select(.type == "message_dropped")' events.ndjson
Node.js
Use child_process.spawn for streaming, child_process.spawnSync for one-shot:
import { spawn } from 'node:child_process';
import readline from 'node:readline';
function runWorkflow(yamlPath) {
  return new Promise((resolve, reject) => {
    const child = spawn('zenflow', ['flow', yamlPath, '--json'], {
      stdio: ['ignore', 'pipe', 'pipe'],
      env: process.env, // forward GEMINI_API_KEY etc.
    });

    const events = [];
    const errLines = [];

    // Parse stdout line-by-line as NDJSON.
    const rl = readline.createInterface({ input: child.stdout });
    rl.on('line', (line) => {
      if (!line.trim()) return;
      try {
        const evt = JSON.parse(line);
        events.push(evt);
        if (evt.type === 'step_end') {
          console.log(`${evt.stepId} - ${evt.duration}`);
        } else if (evt.type === 'error') {
          console.log(`FAILED: ${evt.stepId} - ${evt.error}`);
        }
      } catch (e) {
        console.warn('non-JSON line on stdout:', line);
      }
    });

    // Capture stderr separately - zenflow uses it for warnings,
    // not for the structured event stream.
    child.stderr.on('data', (chunk) => {
      errLines.push(chunk.toString());
    });

    child.on('close', (code, signal) => {
      const result = {
        exitCode: code,
        signal,
        events,
        stderr: errLines.join(''),
      };
      if (code === 0) resolve(result);
      else reject(Object.assign(new Error(`zenflow exit ${code}`), result));
    });

    child.on('error', reject); // spawn-level failure (binary not found)
  });
}
try {
  const { events } = await runWorkflow('./workflow.yaml');
  const final = events.find((e) => e.type === 'workflow_end');
  console.log('done:', final);
} catch (err) {
  console.error('zenflow failed:', err.exitCode, err.stderr);
  process.exit(err.exitCode ?? 1);
}
Key choices:
- `spawn`, not `exec`. `exec` buffers all output in memory before resolving, which kills you on long-running workflows that produce thousands of events.
- `readline` for NDJSON. Each line is a complete JSON object. Don't try to `JSON.parse` the entire stdout buffer - it's not valid JSON, it's newline-delimited.
- `stdio: ['ignore', 'pipe', 'pipe']`. Closes stdin (zenflow doesn't read from it) and pipes stdout and stderr separately so you can interleave them in your own logs.
- Forward `process.env`. The default is to inherit the parent environment, but passing an explicit `env` option replaces it entirely - so if you ever construct a custom `env`, copy `process.env` into it first or provider keys like `GEMINI_API_KEY` go missing.
If you don't need streaming (small workflows, batch jobs):
import { spawnSync } from 'node:child_process';
const result = spawnSync('zenflow', ['flow', 'workflow.yaml', '--json'], {
  encoding: 'utf-8',
});

if (result.status !== 0) {
  throw new Error(`zenflow exit ${result.status}: ${result.stderr}`);
}

const events = result.stdout
  .split('\n')
  .filter(Boolean)
  .map((line) => JSON.parse(line));

console.log(`${events.length} events`);
spawnSync blocks until the binary exits and gives you everything at once. Fine for small workflows; do not use it for anything multi-minute.
Python
#!/usr/bin/env python3
"""Run a zenflow workflow and process its event stream."""
import json
import subprocess
import sys
from pathlib import Path
def run_workflow(yaml_path: Path) -> tuple[int, list[dict], str]:
    """Run zenflow on yaml_path. Returns (exit_code, events, stderr)."""
    proc = subprocess.Popen(
        ["zenflow", "flow", str(yaml_path), "--json"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered
    )
    events: list[dict] = []
    assert proc.stdout is not None
    try:
        for line in proc.stdout:
            line = line.strip()
            if not line:
                continue
            try:
                evt = json.loads(line)
            except json.JSONDecodeError:
                print(f"warn: non-JSON stdout line: {line!r}", file=sys.stderr)
                continue
            events.append(evt)
            if evt.get("type") == "step_end":
                print(f"{evt.get('stepId')} - {evt.get('duration')}")
            elif evt.get("type") == "error":
                print(f"FAILED: {evt.get('stepId')} - {evt.get('error')}")
    finally:
        proc.stdout.close()
    # stderr is captured all at once after the child exits.
    stderr = proc.stderr.read() if proc.stderr else ""
    if proc.stderr:
        proc.stderr.close()
    exit_code = proc.wait()
    return exit_code, events, stderr

def main() -> int:
    yaml = Path("workflow.yaml")
    if not yaml.exists():
        print(f"missing {yaml}", file=sys.stderr)
        return 2
    code, events, stderr = run_workflow(yaml)
    if code != 0:
        print(f"zenflow exited {code}", file=sys.stderr)
        if stderr:
            print(stderr, file=sys.stderr)
        return code
    end = next((e for e in events if e.get("type") == "workflow_end"), None)
    if end:
        print(f"done: duration={end.get('duration')}")
    return 0

if __name__ == "__main__":
    sys.exit(main())
If you don't need streaming, subprocess.run is the simpler call:
import json
import subprocess
result = subprocess.run(
    ["zenflow", "flow", "workflow.yaml", "--json"],
    capture_output=True,
    text=True,
    check=False,
)
if result.returncode != 0:
    raise RuntimeError(
        f"zenflow exit {result.returncode}: {result.stderr}"
    )
events = [json.loads(line) for line in result.stdout.splitlines() if line.strip()]
check=False lets you read result.stderr on failure. With check=True, the raised CalledProcessError does carry stderr, but the control flow is uglier.
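For comparison, the check=True path looks like this - sketched with a stand-in failing command rather than zenflow itself, since any non-zero exit that writes to stderr behaves the same from subprocess's point of view:

```python
import subprocess
import sys

failure = None
try:
    # Stand-in for a failing zenflow run.
    subprocess.run(
        [sys.executable, "-c", "import sys; sys.stderr.write('boom\\n'); sys.exit(3)"],
        capture_output=True,
        text=True,
        check=True,
    )
except subprocess.CalledProcessError as e:
    failure = e  # .returncode, .stdout, and .stderr are all populated
    print(f"exit {e.returncode}: {e.stderr.strip()}")
```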
Reliability patterns
A handful of things worth doing in any of the three languages:
- Set a timeout at the language level too. zenflow's `--timeout` is one safety net; an OS-level timeout on the subprocess is another (the `timeout` option to `spawnSync` in Node, `subprocess.run(..., timeout=N)` in Python). If zenflow itself wedges (rare), the parent script can still recover.
- Don't trust stdout to be NDJSON only. A misconfigured environment can occasionally produce a non-JSON line (e.g., a Go runtime panic). Skip lines that don't parse instead of crashing the whole script.
- Log stderr separately. zenflow uses stderr for warnings and diagnostics. Mixing it into stdout breaks NDJSON parsing.
- Buffer line by line, not chunk by chunk. All three examples above use line-oriented parsing. Reading a 64K chunk and splitting on `\n` is fine for batch use, but breaks for interactive cases where you want each event delivered as it arrives.
- Handle the `124` watchdog exit specifically. If your script orchestrates retries, treat `124` (timeout) differently from `1` (workflow failure) - retrying a timeout often makes sense; retrying a deterministic step failure usually doesn't.
- Pin the binary path. Run `which zenflow` at script start, then use the absolute path in `spawn`/`Popen`. That saves you from `$PATH` surprises across CI environments.
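The timeout-vs-failure distinction is easy to encode. A sketch (the retry policy here is ours, not something zenflow ships; it folds an OS-level `subprocess` timeout into the same `124` bucket as zenflow's own):

```python
import subprocess
import sys

RETRYABLE = {124}  # watchdog timeout - transient, worth another attempt

def run_with_retries(cmd: list[str], attempts: int = 3, timeout: float = 600) -> int:
    """Run cmd, retrying only timeout-style failures. Returns the final exit code."""
    code = 1
    for attempt in range(1, attempts + 1):
        try:
            code = subprocess.run(cmd, timeout=timeout).returncode
        except subprocess.TimeoutExpired:
            code = 124  # treat an OS-level timeout like zenflow's own
        if code == 0 or code not in RETRYABLE:
            return code  # success, or a deterministic failure: no retry
        print(f"attempt {attempt} timed out; retrying", file=sys.stderr)
    return code
```

Called as `run_with_retries(["zenflow", "flow", "workflow.yaml", "--json"], timeout=900)`, it gives up immediately on a deterministic failure but retries the timeout case.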
When to use the Go API instead
If your script is itself a Go program, skip the subprocess dance entirely - import zenflow directly:
import "github.com/zendev-sh/zenflow"
orch := zenflow.New(
    zenflow.WithModel(model),
    zenflow.WithProgress(myCustomSink), // structured access to the same events
)
defer orch.Close()

result, err := orch.RunFlow(ctx, wf)
You get the same events without NDJSON serialize/parse overhead, and you can build custom ProgressSink implementations that route events into your own structures. Full reference: Go API.