主页 > 互联网  > 

【Agent的革命之路——LangGraph】工作流中的map-reduce模式

【Agent的革命之路——LangGraph】工作流中的map-reduce模式

这节我们来探索一下工作流设计中常用的 Map-Reduce 模式,在介绍 Map-Reduce 模式之前,我们想来看下 LangGraph 中的Send 对象已经结构化输出。

Send对象

默认情况下,节点(Nodes)和边(Edges)会预先定义,并在同一共享状态(State)下运行。然而,某些情况下可能无法提前确定边的具体数量,或者需要同时存在不同版本的状态。一个典型例子是 Map-Reduce(映射-归约)设计模式:在此模式中,第一个节点可能生成一个对象列表,而后续需要对列表中的每个对象应用另一个节点。此时,对象的数量可能未知(意味着边的数量无法预先确定),且下游节点的输入状态应是独立的(每个生成的对象对应一个独立的状态)。 为支持此类设计模式,LangGraph 允许从**条件边(conditional edges)**返回 Send 对象。Send 接收两个参数:第一个是目标节点的名称,第二个是要传递给该节点的状态(State)。 下面是Send的伪代码:

def continue_to_jokes(state: OverallState): return [Send("generate_joke", {"subject": s}) for s in state['subjects']] graph.add_conditional_edges("node_a", continue_to_jokes) Structured outputs 结构化输出

对于许多应用程序(例如聊天机器人),模型需要直接用自然语言响应用户。但是,在某些情况下,我们需要模型以结构化格式输出。例如,我们可能希望将模型输出存储在数据库中,并确保输出符合数据库模式。这需要激发结构化输出的概念,可以指示模型以特定的输出结构进行响应。 下面我们展示一下结构化输出的伪代码,该伪代码说明了使用结构化输出时推荐的工作流程。 Langchain提供了一种方法with_structured_output(),该方法可自动将模式与模型结合并解析输出的过程。该辅助功能可用于所有支持结构化输出的模型提供商。

# Define schema schema = {"foo": "bar"} # Bind schema to model model_with_structure = model.with_structured_output(schema) # Invoke the model to produce structured output that matches the schema structured_output = model_with_structure.invoke(user_input)

我们稍微改动一下上面伪代码,使真实的代码更有可比性。

定义模式 通过Pydantic定义结构化输出,它提供了类型的提示和验证, from pydantic import BaseModel, Field class ResponseFormatter(BaseModel): """Always use this tool to structure your response to the user.""" answer: str = Field(description="The answer to the user's question") followup_question: str = Field(description="A followup question the user could ask") 返回结构化输出 有几种结构化输出的方式,也是根据不同的模型提供商提供的输出方式,我一一举例。 工具调用模式: from langchain_openai import ChatOpenAI model = ChatOpenAI(model="gpt-4o", temperature=0) # Bind responseformatter schema as a tool to the model model_with_tools = model.bind_tools([ResponseFormatter]) # Invoke the model ai_msg = model_with_tools.invoke("What is the powerhouse of the cell?") # Get the tool call arguments ai_msg.tool_calls[0]["args"] # 输出结果 {'answer': "The powerhouse of the cell is the mitochondrion. Mitochondria are organelles that generate most of the cell's supply of adenosine triphosphate (ATP), which is used as a source of chemical energy.", 'followup_question': 'What is the function of ATP in the cell?'} # Parse the dictionary into a pydantic object pydantic_object = ResponseFormatter.model_validate(ai_msg.tool_calls[0]["args"])

当使用工具调用时,需要将工具调用参数从字典中解析为原始架构。并且,我们还需要指示模型在我们要执行结构化输出时始终使用该工具,这是提供商特定的设置。 json模式:

from langchain_openai import ChatOpenAI model = ChatOpenAI(model="gpt-4o", model_kwargs={ "response_format": { "type": "json_object" } }) ai_msg = model.invoke("Return a JSON object with key 'random_ints' and a value of 10 random ints in [0-99]") ai_msg.content '\n{\n "random_ints": [23, 47, 89, 15, 34, 76, 58, 3, 62, 91]\n}'

使用JSON模式时,需要将输出解析为JSON对象。 当然Langchain提供了一个辅助功能(with_structured_output())来简化该过程。 这两者都将模式绑定到模型作为工具,并将输出解析为指定的输出模式。

# Bind the schema to the model model_with_structure = model.with_structured_output(ResponseFormatter) # Invoke the model structured_output = model_with_structure.invoke("What is the powerhouse of the cell?") # 打印这个 pydantic object structured_output # 得到下面结果 ResponseFormatter(answer="The powerhouse of the cell is the mitochondrion. Mitochondria are organelles that generate most of the cell's supply of adenosine triphosphate (ATP), which is used as a source of chemical energy.", followup_question='What is the function of ATP in the cell?') Map-reduce

我们经过上面介绍了Send对象还有结构化输出,对于我们接下来写 Map-reduce 模式提供了很好的铺垫。 下面我们来开始写个例子,由于Map-reduce模式操作对于高效任务分解和并行处理至关重要。此例子涉及将任务分解为较小的子任务,并行处理每个子任务,并汇总所有已完成子任务的结果。 例子的场景:给定一个来自用户的一般主题,生成相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳笑话。在这个设计模式中,第一个节点可能会生成一个对象列表(例如,相关主题),我们想要将其他节点(例如,生成一个笑话)应用于所有这些对象(例如,主题)。然而,出现了两个主要挑战。 (1)当我们布置图形时,对象(例如,主体)的数量可能提前未知(意味着可能不知道边的数量), (2)下游节点的输入状态应该不同(每个生成的对象一个)。 LangGraph通过其SendAPI解决了这些挑战。通过利用条件边,Send可以将不同的状态(例如,主题)分配给节点的多个实例(例如,笑话生成)。重要的是,发送的状态可以与核心图的状态不同,从而实现灵活和动态的工作流管理。 下面是本例子的详细代码,我在每个组件上都加了关键注释:

import operator from typing import Annotated from typing_extensions import TypedDict from langchain_anthropic import ChatAnthropic from langgraph.types import Send from langgraph.graph import END, StateGraph, START from pydantic import BaseModel, Field # 定义 model and prompts subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}.""" joke_prompt = """Generate a joke about {subject}""" best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one. {jokes}""" class Subjects(BaseModel): subjects: list[str] class Joke(BaseModel): joke: str class BestJoke(BaseModel): id: int = Field(description="Index of the best joke, starting with 0", ge=0) model = ChatAnthropic(model="claude-3-5-sonnet-20240620") # 定义组件 # 这将作为主图的全局状态。 # 它包含一个主题(我们期望用户提供), # 然后会生成一个主题列表,并为每个主题生成一个笑话。 class OverallState(TypedDict): topic: str subjects: list # 注意这里我们使用了 operator.add # 这是因为我们希望将所有从单个节点生成的笑话合并到一个列表中 —— 这本质上是“归约”(reduce)部分。 jokes: Annotated[list, operator.add] best_selected_joke: str # 这将作为节点的状态,我们将所有主题“映射”到该节点以生成笑话。 class JokeState(TypedDict): subject: str # 这是我们将用于生成笑话主题的函数。 def generate_topics(state: OverallState): prompt = subjects_prompt.format(topic=state["topic"]) response = model.with_structured_output(Subjects).invoke(prompt) return {"subjects": response.subjects} # 这里我们根据主题生成一个笑话。 def generate_joke(state: JokeState): prompt = joke_prompt.format(subject=state["subject"]) response = model.with_structured_output(Joke).invoke(prompt) return {"jokes": [response.joke]} # 这里我们定义了遍历生成主题的逻辑, # 并将其作为图中的一条边使用。 def continue_to_jokes(state: OverallState): # 我们将返回一个 `Send` 对象列表。 # 每个 `Send` 对象包含图中一个节点的名称, # 以及要发送给该节点的状态。 return [Send("generate_joke", {"subject": s}) for s in state["subjects"]] # 这里我们将评判最佳笑话。 def best_joke(state: OverallState): jokes = "\n\n".join(state["jokes"]) prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes) response = model.with_structured_output(BestJoke).invoke(prompt) return {"best_selected_joke": state["jokes"][response.id]} # 构建图:在这里我们将所有内容整合在一起以构建我们的图。 graph = StateGraph(OverallState) graph.add_node("generate_topics", generate_topics) graph.add_node("generate_joke", generate_joke) graph.add_node("best_joke", best_joke) graph.add_edge(START, "generate_topics") graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"]) graph.add_edge("generate_joke", "best_joke") graph.add_edge("best_joke", END) app = graph pile()

这是我们的graph: 调用该graph:

# Call the graph: here we call it to generate a list of jokes for s in app.stream({"topic": "animals"}): print(s)

得到结果:

{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}} {'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}} {'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}} {'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}} {'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}} {'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}

通过上面的示例中,通过设计Map-Reduce 模式来处理生成笑话的任务。它被分为了 Map 阶段,和 Reduce 阶段,在Map阶段generate_topics 节点生成与主题相关的多个子主题(subjects)。然后,通过 continue_to_jokes 函数,将每个子主题映射到 generate_joke 节点,生成对应的笑话。每个子主题的处理是独立的,因此可以并行执行。而在Reduce 阶段,对任务进行了分解,生成笑话的任务可以分解为多个独立的子任务(每个子主题生成一个笑话),适合并行处理。首先 Map-Reduce 模式天然支持这种任务分解和并行化。其次主题的数量是动态的,无法预先确定。Map-Reduce 模式可以灵活处理动态数量的子任务。最终需要将所有生成的笑话合并为一个列表,并选择最佳笑话。Reduce 阶段提供了这种汇总功能。Map-Reduce 模式将任务分解为清晰的阶段(Map 和 Reduce),使代码结构更清晰,易于理解和维护。

标签:

【Agent的革命之路——LangGraph】工作流中的map-reduce模式由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【Agent的革命之路——LangGraph】工作流中的map-reduce模式