跳到内容

流式传输

在本指南中,我们将向您介绍如何在 DSPy 程序中启用流式传输。DSPy 流式传输包括两部分:

  • 输出 Token 流式传输:在生成单个 token 时对其进行流式传输,而不是等待完整的响应。
  • 中间状态流式传输:提供有关程序执行状态的实时更新(例如,“正在调用网页搜索...”,“正在处理结果...”)。

输出 Token 流式传输

DSPy 的 token 流式传输功能适用于管道中的任何模块,而不仅仅是最终输出。唯一的要求是流式传输的字段必须是 str 类型。要启用 token 流式传输:

  1. 使用 dspy.streamify 包装您的程序
  2. 创建一个或多个 dspy.streaming.StreamListener 对象来指定要流式传输的字段

这是一个基本示例:

import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

要使用流式输出:

import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    async for chunk in output_stream:
        print(chunk)

asyncio.run(read_output_stream())

这将产生如下输出:

StreamResponse(predict_name='self', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' other')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' side of the frying pan!')
Prediction(
    answer='To get to the other side of the frying pan!'
)

注意:由于 dspy.streamify 返回一个异步生成器,因此您必须在异步上下文中使用它。如果您使用的环境(如 Jupyter 或 Google Colab)已经有事件循环(异步上下文),则可以直接使用该生成器。

您可能已经注意到,上述流式传输包含两个不同的实体:StreamResponsePredictionStreamResponse 是对正在监听的字段上的流式 token 的包装,在此示例中它是 answer 字段。Prediction 是程序的最终输出。在 DSPy 中,流式传输以 sidecar 方式实现:我们在 LM 上启用流式传输,以便 LM 输出 token 流。我们将这些 token 发送到一个侧通道,用户定义的监听器会持续读取该通道。监听器会持续解释流,并决定它正在监听的 signature_field_name 是否已开始出现并已完成。一旦确定该字段出现,监听器便开始将 token 输出到用户可以读取的异步生成器。监听器的内部机制会根据幕后的适配器而变化,并且由于通常在我们看到下一个字段之前无法确定字段是否已完成,因此监听器会在发送到最终生成器之前缓冲输出 token,这就是为什么您通常会看到最后一个类型为 StreamResponse 的块包含多个 token。程序的输出也会写入流中,即上面示例输出中显示的 Prediction 块。

要处理这些不同类型并实现自定义逻辑:

import asyncio

async def read_output_stream():
  output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

  async for chunk in output_stream:
    return_value = None
    if isinstance(chunk, dspy.streaming.StreamResponse):
      print(f"Output token of field {chunk.signature_field_name}: {chunk.chunk}")
    elif isinstance(chunk, dspy.Prediction):
      return_value = chunk


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

理解 StreamResponse

StreamResponse (dspy.streaming.StreamResponse) 是流式传输 token 的包装类。它包含 3 个字段:

  • predict_name:包含 signature_field_name 的 predict 的名称。该名称与您运行 your_program.named_predictors() 时看到的键名相同。在上面的代码中,因为 answer 来自 predict 本身,所以 predict_name 显示为 self,这是您运行 predict.named_predictors() 时唯一的键。
  • signature_field_name:这些 token 映射到的输出字段。predict_namesignature_field_name 一起构成了字段的唯一标识符。稍后在本指南中,我们将演示如何处理多个字段流式传输和重复字段名。
  • chunk:流式块的值。

使用缓存进行流式传输

找到缓存结果时,流将跳过单个 token,只产生最终的 Prediction。例如:

Prediction(
    answer='To get to the other side of the dinner plate!'
)

流式传输多个字段

您可以为每个字段创建一个 StreamListener 来监控多个字段。这是一个多模块程序的示例:

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("answer->simplified_answer")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="simplified_answer"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

输出将如下所示:

StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk='To')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' reach')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' other side of the recipe!')
Final output:  Prediction(
    simplified_answer='To reach the other side of the recipe!'
)

处理重复字段名

从不同模块流式传输同名字段时,请在 StreamListener 中同时指定 predictpredict_name

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("question, answer->answer, score")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict1,
        predict_name="predict1"
    ),
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict2,
        predict_name="predict2"
    ),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)


async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

输出将如下所示:

StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk="I'm")
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' ready')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' assist')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' you')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk='! Please provide a question.')
Final output:  Prediction(
    answer="I'm ready to assist you! Please provide a question.",
    score='N/A'
)

中间状态流式传输

状态流式传输让用户了解程序的进度,这对于工具调用或复杂 AI 管道等长时间运行的操作尤其有用。要实现状态流式传输:

  1. 通过继承 dspy.streaming.StatusMessageProvider 创建自定义状态消息提供程序
  2. 覆盖所需的 hook 方法以提供自定义状态消息
  3. 将您的提供程序传递给 dspy.streamify

Example

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def lm_start_status_message(self, instance, inputs):
        return f"Calling LM with inputs {inputs}..."

    def lm_end_status_message(self, outputs):
        return f"Tool finished with output: {outputs}!"

可用的 hook:

  • lm_start_status_message:调用 dspy.LM 开始时的状态消息。
  • lm_end_status_message:调用 dspy.LM 结束时的状态消息。
  • module_start_status_message:调用 dspy.Module 开始时的状态消息。
  • module_end_status_message:调用 dspy.Module 开始时的状态消息。
  • tool_start_status_message:调用 dspy.Tool 开始时的状态消息。
  • tool_end_status_message:调用 dspy.Tool 结束时的状态消息。

每个 hook 都应返回一个包含状态消息的字符串。

创建消息提供程序后,只需将其传递给 dspy.streamify,即可同时启用状态消息流式传输和输出 token 流式传输。请参见下面的示例。中间状态消息由类 dspy.streaming.StatusMessage 表示,因此我们需要另外一个条件检查来捕获它。

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.tool = dspy.Tool(lambda x: 2 * x, name="double_the_number")
        self.predict = dspy.ChainOfThought("num1, num2->sum")

    def forward(self, num, **kwargs):
        num2 = self.tool(x=num)
        return self.predict(num1=num, num2=num2)


class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def tool_start_status_message(self, instance, inputs):
        return f"Calling Tool {instance.name} with inputs {inputs}..."

    def tool_end_status_message(self, instance, outputs):
        return f"Tool finished with output: {outputs}!"


predict = MyModule()
stream_listeners = [
    # dspy.ChainOfThought has a built-in output field called "reasoning".
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)


async def read_output_stream():
    output = stream_predict(num=3)

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
        elif isinstance(chunk, dspy.streaming.StatusMessage):
            print(chunk)
    return return_value


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

示例输出:

StatusMessage(message='Calling tool double_the_number...')
StatusMessage(message='Tool calling finished! Querying the LLM with tool calling results...')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='To')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' find')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' sum')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' two')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' numbers')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' we')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' simply')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' add')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' them')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' together')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='.')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' Here')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' ')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='3')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' plus')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' 6 equals 9.')
Final output:  Prediction(
    reasoning='To find the sum of the two numbers, we simply add them together. Here, 3 plus 6 equals 9.',
    sum='9'
)

同步流式传输

默认情况下,调用流式 DSPy 程序会生成一个异步生成器。为了获得同步生成器,您可以将标志 async_streaming 设置为 False

import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    async_streaming=False,
)

output = stream_predict(question="why did a chicken cross the kitchen?")

program_output = None
for chunk in output:
    if isinstance(chunk, dspy.streaming.StreamResponse):
        print(chunk)
    elif isinstance(chunk, dspy.Prediction):
        program_output = chunk
print(f"Program output: {program_output}")