异步 DSPy 编程
DSPy 原生支持异步编程,使您能够构建更高效、更具可扩展性的应用程序。本指南将引导您了解如何在 DSPy 中利用异步能力,涵盖内置模块和自定义实现。
为什么要在 DSPy 中使用异步?
DSPy 中的异步编程提供了多项优势:- 通过并发操作提高性能 - 更好地利用资源 - 减少 I/O 密集型操作的等待时间 - 增强处理多个请求的可扩展性
我应该何时使用同步或异步?
在 DSPy 中选择同步还是异步编程取决于您的具体用例。以下指南将帮助您做出正确的选择
使用同步编程时:
- 您正在探索或原型化新想法
- 您正在进行研究或实验
- 您正在构建中小型应用程序
- 您需要更简单、更直观的代码
- 您想要更轻松的调试和错误跟踪
使用异步编程时:
- 您正在构建高吞吐量服务(高 QPS)
- 您正在使用仅支持异步操作的工具
- 您需要高效处理多个并发请求
- 您正在构建需要高可扩展性的生产服务
重要考量
虽然异步编程提供了性能优势,但它也带来了一些权衡:
- 更复杂的错误处理和调试
- 可能出现难以追踪的微妙错误
- 更复杂的代码结构
- ipython (Colab, Jupyter lab, Databricks notebooks 等) 和普通 Python 运行时之间的代码差异。
对于大多数开发场景,我们建议从同步编程开始,仅当您明确需要异步编程的优势时才切换到异步。这种方法允许您在处理异步编程的额外复杂性之前,专注于应用程序的核心逻辑。
异步使用内置模块
大多数 DSPy 内置模块通过 acall()
方法支持异步操作。此方法保持与同步 __call__
方法相同的接口,但异步执行。
以下是使用 dspy.Predict
的一个基本示例:
import dspy
import asyncio
import os
os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
predict = dspy.Predict("question->answer")
async def main():
# Use acall() for async execution
output = await predict.acall(question="why did a chicken cross the kitchen?")
print(output)
asyncio.run(main())
使用异步工具
DSPy 的 Tool
类与异步函数无缝集成。当您向 dspy.Tool
提供一个异步函数时,您可以使用 acall()
执行它。这对于 I/O 密集型操作或在使用外部服务时特别有用。
import asyncio
import dspy
import os
os.environ["OPENAI_API_KEY"] = "your_api_key"
async def foo(x):
# Simulate an async operation
await asyncio.sleep(0.1)
print(f"I get: {x}")
# Create a tool from the async function
tool = dspy.Tool(foo)
async def main():
# Execute the tool asynchronously
await tool.acall(x=2)
asyncio.run(main())
注意:将 dspy.ReAct
与工具一起使用时,在 ReAct 实例上调用 acall()
将自动使用工具的 acall()
方法异步执行所有工具。
创建自定义异步 DSPy 模块
要创建您自己的异步 DSPy 模块,请实现 aforward()
方法而不是 forward()
。此方法应包含您模块的异步逻辑。以下是一个自定义模块的示例,该模块链式执行两个异步操作:
import dspy
import asyncio
import os
os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class MyModule(dspy.Module):
def __init__(self):
self.predict1 = dspy.ChainOfThought("question->answer")
self.predict2 = dspy.ChainOfThought("answer->simplified_answer")
async def aforward(self, question, **kwargs):
# Execute predictions sequentially but asynchronously
answer = await self.predict1.acall(question=question)
return await self.predict2.acall(answer=answer)
async def main():
mod = MyModule()
result = await mod.acall(question="Why did a chicken cross the kitchen?")
print(result)
asyncio.run(main())