writing/tutorial/2026/06
TutorialJun 17, 2026·28 min read

Microsoft Agent Framework: Build Multi-Agent Workflows in Python

Learn how to build production AI agents and multi-agent workflows with Microsoft Agent Framework — the unification of Semantic Kernel and AutoGen. Covers agents, tools, sessions, orchestration, the functional and graph workflow APIs, and human-in-the-loop.

Microsoft spent 2025 running two parallel agent stories: Semantic Kernel, the enterprise-grade SDK with telemetry, middleware, and type safety, and AutoGen, the research project famous for its lightweight multi-agent abstractions. In late 2025 the two teams merged their efforts into a single library — Microsoft Agent Framework — and in early 2026 it reached its 1.0 release for both .NET and Python.

This tutorial walks you through the Python SDK end to end. You will build a customer support triage system: a set of agents that classify an incoming ticket, draft a reply using real tools, route hard cases to a human, and pass everything through a quality gate before it ships. Along the way you will learn the two workflow APIs the framework ships with and when to reach for each.

A note on the 2026 API. The 1.0 release simplified the core types: ChatAgent became Agent, ChatMessage became Message, run_stream() folded into run(..., stream=True), and the @ai_function decorator became @tool. This tutorial uses the new names throughout. If you are reading older blog posts or sample repos, expect the longer Chat* names.

Prerequisites

Before starting, ensure you have:

  • Python 3.10+ installed (python --version)
  • An OpenAI API key (or an Azure OpenAI / Microsoft Foundry endpoint)
  • Basic familiarity with async/await in Python
  • A code editor such as VS Code

We will use OpenAI directly because it is the fastest path to a running agent. Everything in this tutorial maps cleanly to Azure OpenAI and Microsoft Foundry by swapping the client class.

What You'll Build

A multi-stage support pipeline composed of small, single-purpose agents:

  1. A triage agent classifies a ticket into a category and urgency.
  2. A support agent drafts a reply, calling tools to look up order status and refund policy.
  3. A human-in-the-loop gate pauses for approval when the urgency is high.
  4. A QA agent reviews the final draft for tone and accuracy before it is sent.

You will implement this first with plain agent calls, then with the functional workflow API (@workflow / @step), and finally with the graph workflow API (WorkflowBuilder + executors + edges) so you understand the trade-offs.

Step 1: Project Setup

Create a project folder and a virtual environment, then install the framework.

mkdir support-agents && cd support-agents
python -m venv .venv
source .venv/bin/activate   # on Windows: .venv\Scripts\activate
 
pip install agent-framework python-dotenv

The umbrella agent-framework package pulls in OpenAI and Azure OpenAI support by default. If you want a leaner install you can use pip install agent-framework-core instead.

Create a .env file for your credentials. Agent Framework does not auto-load .env files, so we will call load_dotenv() explicitly.

# .env
OPENAI_API_KEY=sk-...
OPENAI_CHAT_MODEL=gpt-4o-mini

Tip: The framework reads OPENAI_CHAT_MODEL (and OPENAI_MODEL) from the environment. Note the 2026 standardization: the parameter is now model, never model_id.

Step 2: Your First Agent

Create hello.py. An agent is the combination of a chat client (the model connection) and instructions (the system prompt).

# hello.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
 
load_dotenv()
 
async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="HelloAgent",
        instructions="You are a friendly support assistant. Keep answers brief.",
    )
 
    result = await agent.run("A customer asks: where is my order?")
    print(result)
 
if __name__ == "__main__":
    asyncio.run(main())

Run it:

python hello.py

The agent.run() call returns a response object whose string representation is the model's text. To receive tokens as they are generated, pass stream=True:

print("Agent: ", end="", flush=True)
async for chunk in agent.run("Give me a one-sentence apology template.", stream=True):
    if chunk.text:
        print(chunk.text, end="", flush=True)
print()

The same Agent class works against any provider. To target Azure OpenAI, swap the import for AzureOpenAIChatClient from agent_framework.azure; to target Microsoft Foundry, use FoundryChatClient. The agent code above does not change.

Step 3: Give the Agent Tools

Agents become useful when they can act. In Agent Framework, a tool is just a Python function with a docstring and typed parameters — the framework generates the JSON schema and handles the call loop for you.

Create tools.py:

# tools.py
from typing import Annotated
from pydantic import Field
 
def get_order_status(
    order_id: Annotated[str, Field(description="The customer's order ID, e.g. 'A-1042'")]
) -> str:
    """Look up the shipping status of an order by its ID."""
    # In production this would hit your orders database or API.
    fake_db = {
        "A-1042": "Shipped on 2026-06-14, arriving 2026-06-19.",
        "A-1099": "Processing — not yet shipped.",
    }
    return fake_db.get(order_id, "No order found with that ID.")
 
def get_refund_policy(
    region: Annotated[str, Field(description="Customer region: 'EU', 'US', or 'MENA'")]
) -> str:
    """Return the refund window and conditions for a region."""
    policies = {
        "EU": "14-day no-questions-asked returns under EU consumer law.",
        "US": "30-day returns with receipt.",
        "MENA": "14-day returns for unused items in original packaging.",
    }
    return policies.get(region, "Standard 14-day return policy applies.")

Now register the tools by passing them to the agent. The model decides when to call them.

# support_agent.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
 
from tools import get_order_status, get_refund_policy
 
load_dotenv()
 
async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="SupportAgent",
        instructions=(
            "You are a support agent. Use the tools to look up real order "
            "and policy data before answering. Never invent order details."
        ),
        tools=[get_order_status, get_refund_policy],
    )
 
    result = await agent.run(
        "Hi, where is order A-1042, and can I return it? I'm in the EU."
    )
    print(result)
 
if __name__ == "__main__":
    asyncio.run(main())

The agent will call get_order_status("A-1042") and get_refund_policy("EU"), then weave both results into a single grounded reply. You did not write any call-handling code — the framework runs the tool loop automatically.

Decorator alternative. For tools that need configuration or richer metadata, decorate the function with @tool (imported from agent_framework). A plain function passed in the tools list is treated as a tool implicitly, so the decorator is optional for simple cases.

Step 4: Multi-Turn Memory with Sessions

Each agent.run() call is stateless by default. To hold a conversation, create a session (formerly called a "thread") and pass it on every turn.

# session_demo.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
from tools import get_order_status
 
load_dotenv()
 
async def main() -> None:
    agent = Agent(
        client=OpenAIChatClient(),
        name="SupportAgent",
        instructions="You are a concise support agent.",
        tools=[get_order_status],
    )
 
    session = agent.create_session()
 
    r1 = await agent.run("Where is order A-1042?", session=session)
    print("Turn 1:", r1)
 
    # The follow-up has no order ID — the session carries the context.
    r2 = await agent.run("And will it arrive before the weekend?", session=session)
    print("Turn 2:", r2)
 
if __name__ == "__main__":
    asyncio.run(main())

Two renames to remember from the 2026 release: get_new_thread() became create_session(), and the thread= parameter became session=. If no history provider is configured, the framework auto-injects an InMemoryHistoryProvider, so conversations work out of the box.

Step 5: Orchestrate Multiple Agents

Real systems use several specialists rather than one do-everything agent. The simplest orchestration is sequential: agent A's output feeds agent B. You can do this by hand — agents are composable Python objects.

# pipeline_manual.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
from tools import get_order_status, get_refund_policy
 
load_dotenv()
 
def make_agent(name: str, instructions: str, tools=None) -> Agent:
    return Agent(
        client=OpenAIChatClient(),
        name=name,
        instructions=instructions,
        tools=tools or [],
    )
 
async def main() -> None:
    triage = make_agent(
        "Triage",
        "Classify the ticket. Reply with exactly: 'CATEGORY: <billing|shipping|other> | "
        "URGENCY: <low|high>'. Nothing else.",
    )
    support = make_agent(
        "Support",
        "Draft a warm, accurate reply. Use tools for real data.",
        tools=[get_order_status, get_refund_policy],
    )
    qa = make_agent(
        "QA",
        "You review support drafts. If the tone is professional and the facts are "
        "grounded, reply with the draft unchanged. Otherwise rewrite it.",
    )
 
    ticket = "URGENT: order A-1099 still not here and I leave the country tomorrow! EU customer."
 
    label = await triage.run(ticket)
    print("Triage:", label)
 
    draft = await support.run(f"Ticket: {ticket}\nClassification: {label}")
    print("Draft:", draft)
 
    final = await qa.run(f"Review this draft:\n{draft}")
    print("Final:", final)
 
if __name__ == "__main__":
    asyncio.run(main())

This works, but the control flow, error handling, and observability are all on you. That is exactly what the workflow APIs solve.

Step 6: The Functional Workflow API

The functional API lets you express a workflow as a plain async function decorated with @workflow, where each stage is a @step. You keep native Python control flow — if/else, loops, asyncio.gather — while gaining per-step events, streaming, checkpointing, and human-in-the-loop support.

# workflow_functional.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import Agent, workflow, step
from agent_framework.openai import OpenAIChatClient
from tools import get_order_status, get_refund_policy
 
load_dotenv()
 
client = OpenAIChatClient()
 
triage = Agent(client=client, name="Triage",
               instructions="Reply 'URGENCY: high' or 'URGENCY: low' and one category word.")
support = Agent(client=client, name="Support",
               instructions="Draft an accurate, friendly reply. Use tools for real data.",
               tools=[get_order_status, get_refund_policy])
qa = Agent(client=client, name="QA",
           instructions="Approve or rewrite the draft for tone and factual grounding.")
 
@step
async def classify(ticket: str) -> dict:
    label = str(await triage.run(ticket))
    return {"ticket": ticket, "label": label, "high": "high" in label.lower()}
 
@step
async def draft_reply(state: dict) -> dict:
    draft = str(await support.run(f"Ticket: {state['ticket']}\nLabel: {state['label']}"))
    state["draft"] = draft
    return state
 
@step
async def review(state: dict) -> str:
    return str(await qa.run(f"Review and finalize:\n{state['draft']}"))
 
@workflow
async def support_pipeline(ticket: str, ctx) -> str:
    state = await classify(ticket)
 
    # Native Python branching: escalate urgent tickets to a human.
    if state["high"]:
        decision = await ctx.request_info(
            f"High-urgency ticket needs approval before auto-reply:\n{state['label']}"
        )
        if str(decision).strip().lower().startswith("no"):
            return "Escalated to a human agent. No automated reply sent."
 
    state = await draft_reply(state)
    return await review(state)
 
async def main() -> None:
    result = await support_pipeline.run(
        "URGENT: order A-1099 not delivered, leaving tomorrow! EU."
    )
    print(result)
 
if __name__ == "__main__":
    asyncio.run(main())

A few things to notice:

  • ctx.request_info(...) is the human-in-the-loop primitive. In an interactive run it pauses the workflow and surfaces a request event your application answers; in automated runs you wire a responder.
  • Each @step produces its own event, so you get fine-grained observability for free.
  • Because it is ordinary Python, you can use asyncio.gather to run independent steps in parallel — for example, classifying and pulling account history at the same time.

The functional API is the recommended starting point. Reach for the graph API when you need strict, type-validated message routing between many executors.

Step 7: The Graph Workflow API

The graph API models the workflow as a directed graph of executors connected by edges. It shines for fixed topologies, fan-out/fan-in parallelism, and superstep-based checkpointing. You build it with WorkflowBuilder.

# workflow_graph.py
import asyncio
from dotenv import load_dotenv
 
from agent_framework import (
    Agent, WorkflowBuilder, executor, WorkflowContext,
)
from agent_framework.openai import OpenAIChatClient
from tools import get_order_status, get_refund_policy
 
load_dotenv()
 
client = OpenAIChatClient()
 
support = Agent(client=client, name="Support",
                instructions="Draft an accurate, friendly reply. Use tools for real data.",
                tools=[get_order_status, get_refund_policy])
qa = Agent(client=client, name="QA",
           instructions="Finalize the draft for tone and accuracy.")
 
@executor(id="classify")
async def classify(ticket: str, ctx: WorkflowContext[str]) -> None:
    label = str(await Agent(
        client=client, name="Triage",
        instructions="Reply with one category word and 'high' or 'low' urgency.",
    ).run(ticket))
    # Stash the original ticket in shared state for later executors.
    ctx.set_state("ticket", ticket)
    await ctx.send_message(label)
 
@executor(id="draft")
async def draft(label: str, ctx: WorkflowContext[str]) -> None:
    ticket = ctx.get_state("ticket")
    reply = str(await support.run(f"Ticket: {ticket}\nLabel: {label}"))
    await ctx.send_message(reply)
 
@executor(id="review")
async def review(reply: str, ctx: WorkflowContext) -> None:
    final = str(await qa.run(f"Finalize:\n{reply}"))
    await ctx.yield_output(final)
 
async def main() -> None:
    workflow = (
        WorkflowBuilder()
        .set_start_executor(classify)
        .add_edge(classify, draft)
        .add_edge(draft, review)
        .build()
    )
 
    async for event in workflow.run("Where is order A-1042? EU customer.", stream=True):
        if event.type == "output":
            print("FINAL:", event.data)
 
if __name__ == "__main__":
    asyncio.run(main())

Key concepts in the graph API:

  • An executor is a unit of work — an agent or custom logic — declared with the @executor decorator and a handler that receives a WorkflowContext.
  • ctx.send_message(...) passes a typed message along the outgoing edges; ctx.yield_output(...) emits a final workflow result.
  • ctx.set_state(...) / ctx.get_state(...) read and write shared state. In the 2026 release these became synchronous (no await), and shared_state was renamed to state.
  • add_edge(a, b) wires executors together. You can attach a condition function to an edge for content-based routing, and fan out to several executors for parallel supersteps.

Note the unified event model: instead of many event subclasses, you check event.type"output", "request_info", and so on. For human-in-the-loop in the graph API, add a RequestInfoExecutor node.

Step 8: Observability

Because Agent Framework inherits Semantic Kernel's enterprise lineage, it emits OpenTelemetry traces and metrics out of the box. Every agent run and every workflow executor becomes a span, so you can see token usage, tool calls, and latency in any OTel-compatible backend.

from agent_framework.observability import setup_observability
 
# Call once at startup. Reads OTEL_EXPORTER_OTLP_ENDPOINT from the environment.
setup_observability()

Point OTEL_EXPORTER_OTLP_ENDPOINT at a local collector (or Aspire Dashboard / Jaeger) and you get a full trace of which agent called which tool, with timing — invaluable when a multi-agent run misbehaves in production.

Testing Your Implementation

You do not want to hit a paid model in unit tests. Test the deterministic parts — your tools — directly, and assert on workflow structure.

# test_tools.py
from tools import get_order_status, get_refund_policy
 
def test_known_order():
    assert "Shipped" in get_order_status("A-1042")
 
def test_unknown_order():
    assert "No order found" in get_order_status("ZZZ")
 
def test_region_policy():
    assert "14-day" in get_refund_policy("EU")
    assert "30-day" in get_refund_policy("US")

Run with pytest. For agent-level tests, the framework provides test client utilities so you can stub model responses and assert that the expected tool was invoked, without network calls.

Troubleshooting

ImportError: cannot import name 'ChatAgent' — You are on the 1.0+ API. Use Agent, not ChatAgent. The same applies to ChatMessage (now Message).

The agent never calls my tool — Make sure the function has a docstring and typed, Annotated parameters. The framework builds the tool schema from these; a missing description gives the model nothing to match against. Also check your instructions explicitly tell the agent to use tools.

.env values are ignored — Agent Framework does not auto-load .env. Call load_dotenv() before constructing any client, or export the variables in your shell.

AttributeError on shared_state / await ctx.get_shared_state — In 2026 these are ctx.state, ctx.get_state(...), and ctx.set_state(...), and they are synchronous. Drop the await.

Authentication errors against AzureDefaultAzureCredential is convenient locally but probes many sources. In production prefer a specific credential like ManagedIdentityCredential to avoid latency and unexpected fallbacks.

Next Steps

  • Add a RequestInfoExecutor to the graph workflow so high-urgency tickets pause for a human, mirroring the functional example.
  • Swap OpenAIChatClient for AzureOpenAIChatClient and deploy behind a Foundry endpoint for enterprise governance.
  • Explore the built-in concurrent, hand-off, and magentic orchestration patterns for more dynamic multi-agent behavior.
  • Wrap a whole workflow as an agent with .as_agent() and nest it inside a larger system.

If you are building agents in TypeScript instead, our guides on the OpenAI Agents SDK and Mastra cover the same patterns in that ecosystem. For Python-first alternatives, see our Pydantic AI and Agno tutorials.

Conclusion

Microsoft Agent Framework gives you one coherent library that scales from a three-line "hello agent" to a type-validated, observable, human-in-the-loop multi-agent workflow. The mental model is small: an agent is a model plus instructions plus tools; a workflow is how you compose agents with explicit control flow. Start with plain agent calls, graduate to the functional @workflow API when you need orchestration, and adopt the graph WorkflowBuilder when you need strict routing at scale. Because it carries Semantic Kernel's telemetry and AutoGen's ergonomics, it is one of the few agent frameworks built for production from day one — a strong default for teams in the MENA region standardizing on Microsoft and Azure infrastructure.