Streaming
In this guide, we show you how to enable streaming in your DSPy program. DSPy streaming consists of two parts:
- Output Token Streaming: streams individual tokens as they are generated, rather than waiting for the full response.
- Intermediate Status Streaming: provides real-time updates on the program's execution state (e.g., "Calling web search...", "Processing results...").
Output Token Streaming
DSPy's token streaming feature works with any module in your pipeline, not just the final output. The only requirement is that the streamed field must be of type str. To enable token streaming:
- Wrap your program with dspy.streamify
- Create one or more dspy.streaming.StreamListener objects to specify which fields to stream
Here is a basic example:
import os
import dspy
os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
predict = dspy.Predict("question->answer")
# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)
To consume the streamed output:
import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")
    async for chunk in output_stream:
        print(chunk)

asyncio.run(read_output_stream())
This will produce output like:
StreamResponse(predict_name='self', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' other')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' side of the frying pan!')
Prediction(
    answer='To get to the other side of the frying pan!'
)
Note: Because dspy.streamify returns an async generator, you must use it within an async context. If you are in an environment that already has a running event loop (an async context), such as Jupyter or Google Colab, you can use the generator directly.
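The consumption pattern is the same as for any Python async generator. As a minimal stand-in (plain asyncio, no DSPy involved; fake_stream and consume are hypothetical names used only for illustration), the snippet below shows both ways to drive one:

```python
import asyncio

async def fake_stream():
    # Stand-in for a streamified DSPy program: yields chunks one by one.
    for token in ["To", " get", " to", " the", " other side!"]:
        yield token

async def consume():
    chunks = []
    async for chunk in fake_stream():
        chunks.append(chunk)
    return chunks

# In a plain script, drive the async generator with asyncio.run();
# inside Jupyter/Colab (an already-running event loop), you would
# simply `await consume()` instead.
print("".join(asyncio.run(consume())))  # To get to the other side!
```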
You may have noticed that the stream above contains two distinct entities: StreamResponse and Prediction. StreamResponse is a wrapper around the streamed tokens of the fields being listened to, which in this example is the answer field. Prediction is the final output of the program. In DSPy, streaming is implemented in a sidecar fashion: we enable streaming on the LM so that the LM outputs a stream of tokens. Those tokens are sent to a side channel, which is continuously read by the user-defined listeners. Each listener keeps interpreting the stream and decides whether the signature_field_name it is listening to has started to appear and has finished. Once it detects the field appearing, the listener begins streaming tokens to the async generator that users read from. A listener's internal mechanics vary with the adapter behind the scenes, and because it is usually impossible to tell whether a field has finished until the next field appears, the listener buffers output tokens before sending them to the final generator. This is why you will often see the last chunk of type StreamResponse contain multiple tokens. The program's output is also written to the stream, which is the Prediction chunk shown in the example output above.
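The buffering described above can be sketched in plain Python. The toy listener below (the marker string loosely mimics a ChatAdapter-style field header; the real, adapter-specific logic differs) withholds a small window of tokens while the field may still be ending, then flushes whatever remains as one final chunk, which is why the last StreamResponse often carries multiple tokens:

```python
def listen(tokens, start_marker="[[ ## answer ## ]]"):
    """Toy listener: emit tokens of one field, keeping a small buffer
    because the end of the field is only certain once the stream moves on."""
    started = False
    buffer = []
    for tok in tokens:
        if not started:
            started = tok == start_marker
            continue
        buffer.append(tok)
        # Older tokens are safe to emit one by one; the last few stay buffered.
        while len(buffer) > 3:
            yield buffer.pop(0)
    # Field ended: flush the remaining buffered tokens as one final chunk.
    yield "".join(buffer)

tokens = ["[[ ## answer ## ]]", "To", " get", " to", " the", " other", " side!"]
print(list(listen(tokens)))  # ['To', ' get', ' to', ' the other side!']
```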
To handle these different types and implement custom logic:
import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")
    return_value = None
    async for chunk in output_stream:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(f"Output token of field {chunk.signature_field_name}: {chunk.chunk}")
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
Understanding StreamResponse
StreamResponse (dspy.streaming.StreamResponse) is the wrapper class for streamed tokens. It contains 3 fields:
- predict_name: the name of the predict that contains the signature_field_name. This name is the same as the keys you see when running your_program.named_predictors(). In the code above, because answer comes from predict itself, predict_name shows up as self, which is the only key when you run predict.named_predictors().
- signature_field_name: the output field these tokens map to. predict_name and signature_field_name together form the unique identifier of a field. Later in this guide, we will demonstrate how to handle streaming of multiple fields and duplicated field names.
- chunk: the value of the streamed chunk.
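Because predict_name and signature_field_name together identify a field, a common consumption pattern is to reassemble the streamed text per field. The sketch below uses a minimal stand-in dataclass with the same 3 fields rather than the real dspy.streaming.StreamResponse:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class StreamResponse:  # stand-in with the same 3 fields as dspy's class
    predict_name: str
    signature_field_name: str
    chunk: str

def assemble(chunks):
    # Concatenate chunks, keyed by the (predict_name, signature_field_name) pair.
    fields = defaultdict(str)
    for c in chunks:
        fields[(c.predict_name, c.signature_field_name)] += c.chunk
    return dict(fields)

chunks = [
    StreamResponse("predict1", "answer", "To"),
    StreamResponse("predict1", "answer", " get to the other side!"),
    StreamResponse("predict2", "answer", "Simplified!"),
]
print(assemble(chunks))
```

Keying on the pair, rather than the field name alone, is what keeps duplicated field names from different predicts apart.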
Streaming With Caching
When a cached result is found, the stream skips the individual tokens and only yields the final Prediction.
Streaming Multiple Fields
You can monitor multiple fields by creating a StreamListener for each one. Here is an example with a multi-module program:
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("answer->simplified_answer")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer

predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="simplified_answer"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")
    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
The output will look like:
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk='To')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' reach')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' other side of the recipe!')
Final output: Prediction(
    simplified_answer='To reach the other side of the recipe!'
)
Handling Duplicated Field Names
When streaming fields with the same name from different modules, specify both predict and predict_name in the StreamListener:
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("question, answer->answer, score")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer

predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict1,
        predict_name="predict1"
    ),
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict2,
        predict_name="predict2"
    ),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")
    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
The output will look like:
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk="I'm")
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' ready')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' assist')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' you')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk='! Please provide a question.')
Final output: Prediction(
    answer="I'm ready to assist you! Please provide a question.",
    score='N/A'
)
Intermediate Status Streaming
Status streaming keeps users informed about the program's progress, which is especially useful for long-running operations like tool calls or complex AI pipelines. To implement status streaming:
- Create a custom status message provider by subclassing dspy.streaming.StatusMessageProvider
- Override the desired hook methods to provide custom status messages
- Pass your provider to dspy.streamify
Example
class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def lm_start_status_message(self, instance, inputs):
        return f"Calling LM with inputs {inputs}..."

    def lm_end_status_message(self, outputs):
        return f"LM finished with output: {outputs}!"
Available hooks:
- lm_start_status_message: status message at the start of calling dspy.LM.
- lm_end_status_message: status message at the end of calling dspy.LM.
- module_start_status_message: status message at the start of calling a dspy.Module.
- module_end_status_message: status message at the end of calling a dspy.Module.
- tool_start_status_message: status message at the start of calling dspy.Tool.
- tool_end_status_message: status message at the end of calling dspy.Tool.
Each hook should return a string containing the status message.
Once you have created the message provider, just pass it to dspy.streamify, and you can enable both status message streaming and output token streaming. See the example below. Intermediate status messages are represented by the class dspy.streaming.StatusMessage, so we need one more condition check to capture them.
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.tool = dspy.Tool(lambda x: 2 * x, name="double_the_number")
        self.predict = dspy.ChainOfThought("num1, num2->sum")

    def forward(self, num, **kwargs):
        num2 = self.tool(x=num)
        return self.predict(num1=num, num2=num2)

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def tool_start_status_message(self, instance, inputs):
        return f"Calling Tool {instance.name} with inputs {inputs}..."

    def tool_end_status_message(self, instance, outputs):
        return f"Tool finished with output: {outputs}!"

predict = MyModule()
stream_listeners = [
    # dspy.ChainOfThought has a built-in output field called "reasoning".
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    status_message_provider=MyStatusMessageProvider(),
)

async def read_output_stream():
    output = stream_predict(num=3)
    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
        elif isinstance(chunk, dspy.streaming.StatusMessage):
            print(chunk)
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
Sample output:
StatusMessage(message='Calling tool double_the_number...')
StatusMessage(message='Tool calling finished! Querying the LLM with tool calling results...')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='To')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' find')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' sum')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' two')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' numbers')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' we')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' simply')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' add')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' them')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' together')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='.')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' Here')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' ')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='3')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' plus')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' 6 equals 9.')
Final output: Prediction(
    reasoning='To find the sum of the two numbers, we simply add them together. Here, 3 plus 6 equals 9.',
    sum='9'
)
Synchronous Streaming
By default, calling a streamified DSPy program produces an async generator. To get a synchronous generator instead, set the flag async_streaming to False:
import os
import dspy
os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
predict = dspy.Predict("question->answer")
# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    async_streaming=False,
)

output = stream_predict(question="why did a chicken cross the kitchen?")
program_output = None
for chunk in output:
    if isinstance(chunk, dspy.streaming.StreamResponse):
        print(chunk)
    elif isinstance(chunk, dspy.Prediction):
        program_output = chunk
print(f"Program output: {program_output}")
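For intuition, an async generator can be exposed as a sync one by driving an event loop step by step. This is only a sketch of the general technique (syncify and fake_stream are hypothetical names), not necessarily how async_streaming=False is implemented internally:

```python
import asyncio

def syncify(async_gen):
    """Drive an async generator from synchronous code, one item at a time."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Run the event loop just long enough to fetch the next item.
                yield loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()

async def fake_stream():
    # Stand-in for a streamified DSPy program.
    for token in ["a", "b", "c"]:
        yield token

print(list(syncify(fake_stream())))  # ['a', 'b', 'c']
```

The synchronous caller never touches asyncio directly, which is exactly the convenience the async_streaming=False flag provides.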