Programmatic Workflow Agents

Who this is for: developers building deterministic controller agents that should orchestrate child agents without spending a parent LLM turn on each routing step.

When to use this pattern

Use a programmatic workflow agent when the top-level control flow is deterministic and code-driven:

Do not use this pattern just because Python is available. If the parent agent still needs genuine model reasoning about what to do next, keep the normal decision loop.

Core API

The public workflow surface is agent-owned:

The important design choice is parity with native orchestration. Programmatic steps delegate back into framework-owned subagent execution, so the parent run still emits the same kinds of artifacts as model-driven call_subagent / call_subagents.

What parity means in practice

When a programmatic workflow step delegates to children, the parent run still gets:

That is the reason to use Agent.execute_programmatic_workflow(...) instead of calling host.call_subagent(...) directly from behavior code.

Basic pattern

The supported first-iteration pattern is:

  1. Attach a Python AgentBehavior.
  2. In before_run(...), inspect run.parameter_values.
  3. Build a ProgrammaticWorkflow.
  4. Call agent.execute_programmatic_workflow(...).
  5. Return AgentHookDecision(final_result=...) so the parent skips the normal LLM loop.

Example:

from agent_framework import (
    AgentBehavior,
    AgentHookDecision,
    AgentResult,
    ProgrammaticWorkflow,
    SubagentCallSpec,
    WorkflowBranchStep,
    WorkflowCallSubagentStep,
    WorkflowCallSubagentsStep,
    WorkflowReturnStep,
)


class DeckReviewWorkflowBehavior(AgentBehavior):
    def attach(self, agent):
        return None

    def before_run(self, agent, host, *, run, caller_id):
        workflow = ProgrammaticWorkflow(
            entry_step="maybe_intake",
            steps={
                "maybe_intake": WorkflowBranchStep(
                    step_id="maybe_intake",
                    condition=lambda state: bool(run.parameter_values.get("intake_complete")),
                    then_step="review_axes",
                    else_step="run_intake",
                ),
                "run_intake": WorkflowCallSubagentStep(
                    step_id="run_intake",
                    subagent_id="deck_review_intake",
                    parameters=lambda state: {
                        "deck": run.parameter_values["deck"],
                        "intake": run.parameter_values.get("intake", ""),
                    },
                    next_step="review_axes",
                ),
                "review_axes": WorkflowCallSubagentsStep(
                    step_id="review_axes",
                    calls=lambda state: (
                        SubagentCallSpec("axis_audience", {"deck": run.parameter_values["deck"]}, "audience"),
                        SubagentCallSpec("axis_design", {"deck": run.parameter_values["deck"]}, "design"),
                    ),
                    mode="parallel",
                    next_step="finish",
                ),
                "finish": WorkflowReturnStep(
                    step_id="finish",
                    value=lambda state: AgentResult(
                        status="completed",
                        message=str(state.require_step_result("review_axes")),
                    ),
                ),
            },
        )
        result = agent.execute_programmatic_workflow(
            host=host,
            run=run,
            caller_id=caller_id,
            workflow=workflow,
        )
        return AgentHookDecision(final_result=result)

Step model

WorkflowCallSubagentStep

Use for one child call.

WorkflowCallSubagentsStep

Use for a native batch call.

WorkflowBranchStep

Use for deterministic branching.

WorkflowReturnStep

Use to finish the workflow.

Allowed return values:

Strings and None are normalized into AgentResult(status="completed", ...).

WorkflowRaiseStep

Use to fail fast with a specific exception or message.

Using workflow state

ProgrammaticWorkflowState stores:

Use state.require_step_result("step_id") when later steps need earlier outputs.

Current limits

The first iteration is intentionally small:

The supported pattern today is deterministic Python orchestration with native runtime parity, not a full workflow engine.