跳到内容

异步 DSPy 编程

DSPy 原生支持异步编程,使您能够构建更高效、更具可扩展性的应用程序。本指南将引导您了解如何在 DSPy 中利用异步能力,涵盖内置模块和自定义实现。

为什么要在 DSPy 中使用异步?

DSPy 中的异步编程提供了多项优势:- 通过并发操作提高性能 - 更好地利用资源 - 减少 I/O 密集型操作的等待时间 - 增强处理多个请求的可扩展性

我应该何时使用同步或异步?

在 DSPy 中选择同步还是异步编程取决于您的具体用例。以下指南将帮助您做出正确的选择

使用同步编程时:

  • 您正在探索或原型化新想法
  • 您正在进行研究或实验
  • 您正在构建中小型应用程序
  • 您需要更简单、更直观的代码
  • 您想要更轻松的调试和错误跟踪

使用异步编程时:

  • 您正在构建高吞吐量服务(高 QPS)
  • 您正在使用仅支持异步操作的工具
  • 您需要高效处理多个并发请求
  • 您正在构建需要高可扩展性的生产服务

重要考量

虽然异步编程提供了性能优势,但它也带来了一些权衡:

  • 更复杂的错误处理和调试
  • 可能出现难以追踪的微妙错误
  • 更复杂的代码结构
  • ipython (Colab, Jupyter lab, Databricks notebooks 等) 和普通 Python 运行时之间的代码差异。

对于大多数开发场景,我们建议从同步编程开始,仅当您明确需要异步编程的优势时才切换到异步。这种方法允许您在处理异步编程的额外复杂性之前,专注于应用程序的核心逻辑。

异步使用内置模块

大多数 DSPy 内置模块通过 acall() 方法支持异步操作。此方法保持与同步 __call__ 方法相同的接口,但异步执行。

以下是使用 dspy.Predict 的一个基本示例:

import dspy
import asyncio
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
predict = dspy.Predict("question->answer")

async def main():
    # Use acall() for async execution
    output = await predict.acall(question="why did a chicken cross the kitchen?")
    print(output)


asyncio.run(main())

使用异步工具

DSPy 的 Tool 类与异步函数无缝集成。当您向 dspy.Tool 提供一个异步函数时,您可以使用 acall() 执行它。这对于 I/O 密集型操作或在使用外部服务时特别有用。

import asyncio
import dspy
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"

async def foo(x):
    # Simulate an async operation
    await asyncio.sleep(0.1)
    print(f"I get: {x}")

# Create a tool from the async function
tool = dspy.Tool(foo)

async def main():
    # Execute the tool asynchronously
    await tool.acall(x=2)

asyncio.run(main())

注意:将 dspy.ReAct 与工具一起使用时,在 ReAct 实例上调用 acall() 将自动使用工具的 acall() 方法异步执行所有工具。

创建自定义异步 DSPy 模块

要创建您自己的异步 DSPy 模块,请实现 aforward() 方法而不是 forward()。此方法应包含您模块的异步逻辑。以下是一个自定义模块的示例,该模块链式执行两个异步操作:

import dspy
import asyncio
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class MyModule(dspy.Module):
    def __init__(self):
        self.predict1 = dspy.ChainOfThought("question->answer")
        self.predict2 = dspy.ChainOfThought("answer->simplified_answer")

    async def aforward(self, question, **kwargs):
        # Execute predictions sequentially but asynchronously
        answer = await self.predict1.acall(question=question)
        return await self.predict2.acall(answer=answer)


async def main():
    mod = MyModule()
    result = await mod.acall(question="Why did a chicken cross the kitchen?")
    print(result)


asyncio.run(main())