agent_framework 是微软推出的 Python 库,专门用于构建 AI 代理应用程序。我在研究这个框架时发现,它为聊天代理、工作流、工具调用和中间件提供了非常优雅的抽象。核心类包括实现 AgentProtocol 协议的代理、用于在工作流中运行代理的执行器、用于聊天输入/输出的消息,以及用于定义工作流拓扑的构建器。例如,AgentProtocol 是一个结构化接口,任何代理都必须遵循它(比如 ChatAgent 就实现了这个接口)。

代理和执行器

我发现代理是由 AgentExecutor 包装的,它可以适应流式或非流式模式。在使用时,你向执行器发送一个 AgentExecutorRequest(包含 ChatMessage 列表),执行器会产生一个包含代理回复的 AgentExecutorResponseAgentExecutorResponse 包含一个 AgentRunResponse(实际的代理输出)和一个唯一的执行器 ID。在内部,AgentThread 为每个代理维护会话状态和历史记录,可以在本地或通过服务线程进行。

聊天消息和响应

基于聊天的代理使用 ChatMessage 对象来表示消息。一个 ChatMessage 具有 role(例如 “user”、“assistant”、“system”)以及可选的 text 或丰富的 contents(例如函数调用或文件内容)。例如:

1
2
from agent_framework import ChatMessage
user_msg = ChatMessage(role="user", text="What's the weather?")

代理和聊天框架

ChatAgent

这是一个基于 ChatClient 的代理,用于驱动与 AI 模型的对话。它支持工具、上下文提供者、中间件,以及流式和非流式模式。

我通常通过传递聊天客户端和系统指令来创建 ChatAgent。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# 定义一个简单的工具(可选)
@ai_function
def get_weather(location: str) -> str:
    return f"Weather in {location}: sunny"

# 创建聊天客户端和代理
client = OpenAIChatClient(model_id="gpt-4")
agent = ChatAgent(
    name="assistant",
    chat_client=client,
    instructions="You are a helpful assistant.",
    tools=[get_weather]
)

# 在用户消息上运行代理
result = await agent.run("What's the weather in London?")
print(result.messages[-1].text)  # 例如 "Weather in London: sunny"

这会产生一个包含生成消息的 ChatResponse。默认情况下,代理是无状态的;要维护对话上下文,需要使用 AgentThread(见下文)。

ChatMessage

表示对话中的单个聊天消息(角色 + 内容)。它具有 role(SYSTEM、USER 或 ASSISTANT)和 text 等属性。例如:

1
2
3
from agent_framework import ChatMessage
msg = ChatMessage(role="user", text="Hello, how are you?")
print(msg.role, msg.text)  # "user Hello, how are you?"

ChatResponse 表示助手的完整回复。你可以从文本或消息构造它,它也支持流式更新。它有 messagestextfinish_reason 等字段。例如:

1
2
3
4
from agent_framework import ChatResponse, ChatMessage
response = ChatResponse(text="Hello, how can I help you?")
msg = ChatMessage(role="assistant", text="The weather is sunny.")
response2 = ChatResponse(messages=[msg], finish_reason="stop", model_id="gpt-4")

ChatResponseUpdate 是来自聊天客户端的部分流式块。你可以使用 ChatResponse.from_chat_response_updates(updates) 将更新组合成最终响应。

聊天存储和选项

agent_framework 还包括聊天历史存储和设置。ChatMessageStore(使用协议 ChatMessageStoreProtocol)处理消息的持久化(内存中或自定义存储)。ChatOptions 封装请求设置:模型、温度、最大令牌数、停止序列、工具等。例如:

1
2
from agent_framework import ChatOptions
opts = ChatOptions(model_id="gpt-4", temperature=0.7, max_tokens=1000)

这些选项在创建代理或发出请求时传递给聊天客户端(例如 AzureOpenAIChatClient)。

AgentExecutor

这是一个包装代理的工作流执行器。AgentExecutor 根据工作流上下文调整其行为,支持流式或非流式。

在工作流中嵌入代理时使用它。要直接使用它,请包装一个代理并调用 run()run_stream():

1
2
3
4
5
from agent_framework import AgentExecutor

executor = AgentExecutor(id="my_exec", agent=agent)
response = await executor.run("Hello!")
print(response.agent_run_response.messages[-1].text)

在这里,response 是一个包含代理输出消息和元数据的 AgentExecutorResponse。在工作流中,你可以通过 WorkflowBuilder 将代理添加为执行器节点。

AgentRunContext

传递给代理中间件函数的上下文。包含有关代理调用的信息(代理标识、输入消息等)。

它仅在中间件内部使用;你不需要直接创建它。例如,在代理中间件中,你会收到一个 AgentRunContext:

1
2
3
4
5
6
7
8
9
from agent_framework import AgentRunContext, agent_middleware, ChatAgent

@agent_middleware
async def logging_middleware(context: AgentRunContext, next):
    print(f"[Before] Agent: {context.agent.name}, Message: {context.messages}")
    await next(context)
    print(f"[After] Response: {context.result}")

agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

这个中间件示例在代理运行之前和之后记录消息。

AgentThread

管理 ChatAgent 的对话状态。AgentThread 保存消息历史,可以使用服务管理的线程或本地 ChatMessageStore

你可以通过 agent.get_new_thread() 获得一个新线程。示例:

1
2
3
4
5
thread = agent.get_new_thread()            # 开始一个新的对话线程
res1 = await agent.run("What's 2+2?", thread=thread)
print(res1.messages[-1].text)  # 例如 "4"
res2 = await agent.run("Multiply that by 10.", thread=thread)
print(res2.messages[-1].text)  # 例如 "40"

使用相同的 thread 对象确保代理"记住"之前的交流。如果不使用线程,每次 run 都是独立的(无状态)。

工作流和构建器

Workflow 编排执行器(处理单元)的有向图。你使用 WorkflowBuilder 构建工作流 - 不要直接实例化 WorkflowWorkflowBuilder 提供流畅的 API 来通过边连接执行器(如 AgentExecutorFunctionExecutor 或任何 Executor 子类)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient

client = AzureChatClient(credential=credential)
agent1 = client.create_agent(instructions="You write content.", name="writer")
agent2 = client.create_agent(instructions="You review content.", name="reviewer")

builder = WorkflowBuilder()
builder.set_start_executor(agent1)
builder.add_edge(agent1, agent2)
workflow = builder.build()

这个 workflow 接收输入聊天消息,将其发送到 agent1,然后将 agent1 的输出转发到 agent2

要执行工作流:

1
2
3
4
result = await workflow.run("Draft an introduction to AI.")
outputs = result.get_outputs()
for msg in outputs:
    print(msg.text)

该框架包括工作流构建器来定义代理/执行器的执行图。

WorkflowBuilder

使用 WorkflowBuilder() 通过有向边连接执行器(代理或自定义执行器)。流畅的 API 允许你添加链(.add_edge)、扇入/扇出组、switch-case 逻辑,并设置起始执行器,然后调用 .build() 获得 Workflow 实例。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from agent_framework import Executor, WorkflowBuilder, handler, WorkflowContext

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]):
        await ctx.send_message(text.upper())

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]):
        await ctx.yield_output(text[::-1])

workflow = (
    WorkflowBuilder()
    .add_edge(UpperCaseExecutor(id="upper"), ReverseExecutor(id="reverse"))
    .set_start_executor("upper")
    .build()
)
events = await workflow.run("hello")
print(events.get_outputs())  # ['OLLEH']

这定义了一个两步工作流:用户文本 → 大写执行器 → 反转执行器。构建器强制执行类型兼容性和连接性。

HandoffBuilder

对于具有"协调器"和"专家"代理的多代理场景,使用 HandoffBuilder。这支持单层或多层切换。

你指定一个协调器和专家,框架会自动生成切换工具(例如 handoff_to_refund_agent)来切换控制。它在代理之间保留对话历史,并支持自定义终止逻辑。

例如:

1
2
3
4
5
6
7
8
9
from agent_framework import HandoffBuilder, InMemoryCheckpointStorage

workflow = (
    HandoffBuilder(participants=[coordinator, refund_agent, ship_agent])
    .set_coordinator("coordinator_agent")
    .with_termination_condition(lambda conv: any("goodbye" in msg.text for msg in conv))
    .with_checkpointing(InMemoryCheckpointStorage())
    .build()
)

当专家需要更多用户输入时,工作流会发出一个 HandoffUserInputRequest 事件,其中包含完整对话、等待代理 ID 和提示。应用程序应收集用户输入并通过 workflow.send_response(request_id, user_input) 发送回去。

GroupChatBuilder

对于编排的多方聊天,使用 GroupChatBuilder。它让管理器(Python 函数或 LLM 代理)决定哪个参与者接下来发言。

例如,你可以使用 set_select_speakers_func 设置一个简单的轮流函数,或使用 set_manager(manager_agent) 让协调器代理选择下一个发言者。传递给管理器的状态快照(GroupChatStateSnapshot)包括任务、参与者、完整对话、每轮历史记录和轮次索引,因此管理器可以做出决策。然后,构建的工作流在参与者之间动态路由消息。

边和流程

工作流使用来连接执行器。你可以添加链式边(.add_chain)、扇入组(.add_fan_in_edges 来同步多个源)、扇出(.add_fan_out_edges 来广播)、多选边,或 switch-case(.add_switch_case_edge_group)来有条件地路由消息。

验证(例如没有无终止的循环、类型检查)在构建时或通过 validate_workflow_graph 实用程序完成。

Executor(基类)

Executor 是工作流中处理消息的节点。我通常会子类化 Executor 并用 @handler 装饰处理方法。处理程序是接受 (self, message, ctx)async 函数。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from agent_framework import Executor, handler, WorkflowContext

class MyExecutor(Executor):
    def __init__(self):
        super().__init__(id="my_executor")

    @handler
    async def handle_string(self, message: str, ctx: WorkflowContext[int]):
        result = message.upper()
        await ctx.send_message(result)

WorkflowRunResult

workflow.run() 的结果是一个 WorkflowRunResult 对象。它包含运行期间生成的所有事件,包括执行器输出和控制事件。

关键方法包括 get_outputs()(提取所有生成的工作流输出)、get_request_info_events()(外部输入请求)和 get_final_state()

1
2
3
4
result = await workflow.run("Start task")
outputs = result.get_outputs()
for out_msg in outputs:
    print(out_msg.text)

在内部,每个代理或执行器步骤都向工作流生成消息,WorkflowRunResult 聚合它们。在流式模式(run_stream())中,你将迭代事件。

WorkflowContext

传递给执行器处理程序,WorkflowContext 提供与工作流交互的方法。它是泛型类型的,以强制执行输入/输出类型。例如,上下文 WorkflowContext[int, str] 可以发送整数消息并生成字符串输出。

示例处理程序签名包括:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from agent_framework import WorkflowContext

# 向下游发送整数消息
async def process(msg: str, ctx: WorkflowContext[int]):
    await ctx.send_message(len(msg))

# 发送 int 并生成工作流输出 str
async def dual(msg: str, ctx: WorkflowContext[int, str]):
    await ctx.send_message(42)
    await ctx.yield_output("complete")

这使工作流类型安全。你不需要自己实例化 WorkflowContext;它由框架在运行执行器时提供。

HandoffBuilder

用于切换工作流的高级构建器,具有协调器代理和专家。HandoffBuilder 自动创建在代理之间切换任务的工具。

在最简单的(单层)情况下,你指定所有代理和一个协调器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from agent_framework import HandoffBuilder

workflow = (
    HandoffBuilder(
        name="support",
        participants=[coordinator_agent, refund_agent, shipping_agent]
    )
    .set_coordinator("coordinator_agent")
    .build()
)

这创建了一个工作流,协调器可以切换到专家。你可以使用 .with_termination_condition(...).add_handoff(...) 自定义终止条件或多层切换。

1
2
3
4
5
events = workflow.run_stream("My package hasn't arrived yet")
async for ev in events:
    if isinstance(ev, RequestInfoEvent):
        answer = input("User: ")
        await workflow.send_response(ev.data.request_id, answer)

WorkflowCheckpoint

表示工作流状态的保存快照,启用暂停/恢复。WorkflowCheckpoint 包括所有执行器状态、传输中的消息和共享状态。

WorkflowBuilder 上使用 .with_checkpointing(storage) 启用检查点,其中 storage 实现 CheckpointStorage。例如,使用内存存储:

1
2
3
4
5
from agent_framework import InMemoryCheckpointStorage

storage = InMemoryCheckpointStorage()
workflow = WorkflowBuilder().with_checkpointing(storage).build()
# 现在每次运行都会通过 'storage' 检查点状态。

检查点允许通过加载存储的状态在中断后恢复工作流。

中间件和实用程序

代理中间件

子类化 AgentMiddleware 来拦截代理调用。它提供一个 async process(context, next) 方法,你可以在其中检查或修改 AgentRunContext(包含代理实例、消息、流式标志等)。例如,重试中间件可以捕获错误并重新调用代理最多 n 次。

例如,使用 @agent_middleware 装饰器创建中间件函数:

1
2
3
4
5
6
7
8
9
from agent_framework import agent_middleware, AgentRunContext, ChatAgent

@agent_middleware
async def logging_mw(ctx: AgentRunContext, next):
    print(f"[AgentMiddleware before] {ctx.agent.name}")
    await next(ctx)
    print(f"[AgentMiddleware after] result = {ctx.result}")

agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_mw)

这将在每次 agent.run() 之前和之后运行。

聊天中间件

类似地,子类化 ChatMiddleware 来拦截聊天客户端调用(例如添加系统提示、日志记录)。这些中间件管道在内置代理/客户端上自动启用,但可以通过 @use_agent_middleware@use_chat_middleware 装饰器添加到自定义类。

1
2
3
4
5
6
7
8
9
from agent_framework import chat_middleware, ChatContext, ChatAgent

@chat_middleware
async def chat_logger(ctx: ChatContext, next):
    print(f"[ChatMiddleware] Sending {len(ctx.messages)} messages")
    await next(ctx)
    print(f"[ChatMiddleware] Received response")

agent = ChatAgent(chat_client=client, name="assistant", middleware=chat_logger)

工具和函数中间件

@function_middleware 装饰器标记拦截工具调用流程的函数。在执行器内部使用 @handler 来标记处理某些消息类型的方法。

实用函数

该包提供实用程序,如 get_logger(name='agent_framework') 获取日志记录器,validate_workflow_graph 检查构建的工作流的一致性,prepare_function_call_results 格式化模型的函数输出。装饰器 @use_function_invocation 可以包装聊天客户端,以便自动执行模型建议的工具调用并返回。

工具和函数调用

该框架包括集成外部工具的类:

AIFunction

一个帮助程序,将 Python 函数包装为 AI 可调用工具。它从类型注释生成 JSON 模式,验证输入/输出,并在响应中生成 FunctionCallContent 对象。你可以将其用作装饰器或实例化它:

1
2
3
4
5
from agent_framework import AIFunction

@ai_function
def get_weather(city: str) -> str:
    return f"Sunny in {city}"

这里 @ai_function 装饰 get_weather,以便模型可以调用它,框架会自动处理调用结果。

1
2
3
4
5
6
from agent_framework import ai_function
from typing import Annotated

@ai_function(name="add_numbers", description="Add two numbers")
def add(a: Annotated[int, "first addend"], b: Annotated[int, "second addend"]) -> int:
    return a + b

这创建了一个 AI 可调用函数。AI 模型(通过函数调用)可以使用与签名匹配的 JSON 参数调用它。然后,你可以通过将它们添加到 ChatAgent 工具或在 FunctionExecutor 中处理函数调用来将这些集成到聊天中。

函数调用协议

在聊天期间,如果 AI 模型返回函数调用,装饰器 @use_function_invocation 将拦截它,执行相应的 AIFunction,并将结果反馈给模型。像 FunctionCallContentFunctionResultContentFunctionApprovalRequestContent 等类保存这些调用和批准的数据(例如用于人工参与)在聊天消息中。

托管工具

该包提供几个预构建的工具类,你可以通过 ChatOptions.tools 传递以启用特定功能:

HostedWebSearchTool: 让模型搜索网络。它是一个标记工具(它本身不运行搜索;后端必须支持它),可以包含 description 或上下文属性。

1
2
3
from agent_framework import HostedWebSearchTool
search_tool = HostedWebSearchTool()
agent = ChatAgent(chat_client=client, name="assistant", tools=[search_tool])

HostedFileSearchTool: 启用文件搜索(例如在向量存储中)。你可以配置输入内容(例如 HostedVectorStoreContent)和最大结果数。

HostedCodeInterpreterTool: 表示可以执行生成的代码(服务需要支持它)。你可以选择附加代码可能使用的文件输入。

1
2
3
from agent_framework import HostedCodeInterpreterTool
tool = HostedCodeInterpreterTool()
agent = ChatAgent(chat_client=client, name="assistant", tools=[tool])
1
2
3
from agent_framework import HostedWebSearchTool, HostedCodeInterpreterTool
tools = [HostedWebSearchTool(), HostedCodeInterpreterTool()]
options = ChatOptions(model_id="gpt-4", tools=tools, tool_choice="auto")

这些工具对象被序列化到聊天请求中,以便 AI 模型"知道"这些工具存在。

FunctionExecutor

将用户定义的函数包装为工作流中的执行器。

我可以这样做,而不是编写完整的 Executor 子类:

1
2
3
4
5
6
7
8
9
from agent_framework import FunctionExecutor

def square(x: int) -> int:
    return x * x

executor = FunctionExecutor(square)  # executor.id 默认为函数名
# 在 WorkflowBuilder 中使用执行器:
builder = WorkflowBuilder()
builder.set_start_executor(executor)

当轮到它时,框架运行函数(同步或异步)。同步函数在线程池上执行以避免阻塞。

FunctionCallContent

表示模型可能发出的函数调用请求(名称 + 参数)。它通常在内部创建。为了完整性,你可以手动创建一个:

1
2
from agent_framework import FunctionCallContent
content = FunctionCallContent(name="get_time", arguments={"timezone": "UTC"})

类装饰器

@use_agent_middleware@use_chat_middleware 向自定义类添加中间件支持。它们已经应用于内置代理/客户端。如果实现自己的类,请使用它们:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from agent_framework import use_agent_middleware

@use_agent_middleware
class CustomAgent:
    async def run(self, messages, **kwargs):
        # 自定义运行逻辑
        return ...

    async def run_stream(self, messages, **kwargs):
        # 自定义流式逻辑
        return ...

from agent_framework import use_chat_middleware

@use_chat_middleware
class CustomChatClient:
    async def get_response(self, messages, **kwargs):
        # 实现聊天调用
        pass
    async def get_streaming_response(self, messages, **kwargs):
        pass

这些装饰器包装 runget_response 方法以集成中间件执行。

FunctionMiddleware 和 @function_middleware

像聊天和代理中间件一样,这拦截函数/工具调用。使用装饰器标记处理 FunctionInvocationContext 的函数:

1
2
3
4
5
6
7
8
9
from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent

@function_middleware
async def func_logger(ctx: FunctionInvocationContext, next):
    print(f"Calling function: {ctx.function.name} with args {ctx.arguments}")
    await next(ctx)
    print(f"Function result: {ctx.result}")

agent = ChatAgent(chat_client=client, name="assistant", middleware=func_logger)

Handler 装饰器(@handler)

在工作流执行器中用于标记消息处理方法。@handler 装饰器必须注释 Executorasync 方法以指定其输入类型。

实用程序和消息类型

ChatOptions

保存 AI 请求的常见设置(模型 ID、温度、最大令牌数、工具等)。

使用它来独立于代理配置聊天请求。示例:

1
2
3
from agent_framework import ChatOptions
opts = ChatOptions(model_id="gpt-4", temperature=0.7, max_tokens=500)
print(opts.temperature)  # 0.7

ChatMessageStore

聊天消息的内存存储,对自定义线程有用。它实现 ChatMessageStoreProtocol 并支持异步操作。

1
2
3
4
5
6
7
from agent_framework import ChatMessageStore, ChatMessage

store = ChatMessageStore()               # 空存储
msg = ChatMessage(role="user", text="Hi")
await store.add_messages([msg])          # 添加消息
msgs = await store.list_messages()       # 检索所有
print(msgs[0].text)  # "Hi"

HostedFileContent

表示托管在 URI 的文件内容。例如,引用在线文件:

1
2
3
4
from agent_framework import HostedFileContent

file_content = HostedFileContent(name="data", uri="https://example.com/data.json?WT.mc_id=AI-MVP-5003172")
print(file_content.uri)

这可以在工作流或 RAG 上下文中使用以包含文件数据。(这个类只是一个容器;实际的托管和检索是外部管理的。)

DataContent

具有媒体类型的二进制数据(例如图像或作为数据 URI 的文件)。例如:

1
2
3
4
from agent_framework import DataContent

binary = DataContent(media_type="image/png", data="iVBORw0KGgoAAAANS...")
print(binary.media_type)  # "image/png"

TextContent 和 TextReasoningContent

表示纯文本内容。TextContent 是通用文本,而 TextReasoningContent 可以携带注释的推理步骤(它们具有相同的结构)。

1
2
3
4
from agent_framework import TextContent

text = TextContent(text="Some information")
print(text.text)  # "Some information"

在将附加文本注入聊天或作为函数输入时使用 TextContent

UriContent

表示由 URI 标识的内容(图像、文件链接等)。例如:

1
2
3
4
from agent_framework import UriContent

image = UriContent(uri="https://example.com/image.png?WT.mc_id=AI-MVP-5003172")
print(image.uri)

UsageContent

保存令牌使用指标(例如来自 OpenAI)。示例:

1
2
3
4
from agent_framework import UsageContent

usage = UsageContent(completion_tokens=10, prompt_tokens=5, total_tokens=15)
print(usage.total_tokens)  # 15

这些字段可能出现在 ChatResponse 或函数调用交换中以报告 API 使用情况。

总结

我在深入研究 Microsoft Agent Framework 后发现,它不仅仅是一个工具集 - 它是 AI 创新的催化剂。通过将复杂性抽象为可组合的原语,它使开发者能够构建能够对话、协作和适应的代理,具有前所未有的优雅性。对于关注下一波智能系统的开发者来说,这是你的基础。我建议你可以尝试创建一个 ChatAgent,体验这个框架的强大功能。