dspy.streamify
dspy.streamify(program: Module, status_message_provider: Optional[StatusMessageProvider] = None, stream_listeners: Optional[List[StreamListener]] = None, include_final_prediction_in_output_stream: bool = True, is_async_program: bool = False, async_streaming: bool = True) -> Callable[[Any, Any], Awaitable[Any]]
封装一个 DSPy 程序,使其能够增量地流式输出结果,而不是一次性返回所有结果。它还向用户提供状态消息以指示程序的进度,用户可以实现自己的状态消息提供者来自定义状态消息以及为哪个模块生成状态消息。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
program
|
Module
|
要封装以提供流式传输功能的 DSPy 程序。 |
必填 |
status_message_provider
|
可选[StatusMessageProvider]
|
用于替代默认的状态消息生成器的自定义生成器。用户可以实现自己的状态消息生成器来自定义状态消息以及为哪个模块生成状态消息。 |
无
|
stream_listeners
|
可选[列表[StreamListener]]
|
一个流式监听器列表,用于捕获程序中子预测的特定字段的流式输出。提供此列表时,只有目标预测中的目标字段会流式传输给用户。 |
无
|
include_final_prediction_in_output_stream
|
布尔值
|
是否在输出流中包含最终预测结果,仅在提供了 |
True
|
is_async_program
|
布尔值
|
程序是否为异步。如果为 |
False
|
async_streaming
|
布尔值
|
是返回异步生成器还是同步生成器。如果为 |
True
|
返回值
类型 | 描述 |
---|---|
可调用对象[[任意类型, 任意类型], Awaitable[任意类型]]
|
一个函数,接受与原始程序相同的参数,但返回一个异步生成器,该生成器会增量地产生程序的输出。 |
示例
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"))
# Use the program with streaming output
async def use_streaming():
output = program(q="Why did a chicken cross the kitchen?")
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
使用自定义状态消息提供者的示例
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class MyStatusMessageProvider(StatusMessageProvider):
def module_start_status_message(self, instance, inputs):
return f"Predicting..."
def tool_end_status_message(self, outputs):
return f"Tool calling finished with output: {outputs}!"
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())
# Use the program with streaming output
async def use_streaming():
output = program(q="Why did a chicken cross the kitchen?")
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
使用流式监听器的示例
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))
# Create the program and wrap it with streaming functionality
predict = dspy.Predict("question->answer, reasoning")
stream_listeners = [
dspy.streaming.StreamListener(signature_field_name="answer"),
dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(predict, stream_listeners=stream_listeners)
async def use_streaming():
output = stream_predict(
question="why did a chicken cross the kitchen?",
include_final_prediction_in_output_stream=False,
)
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
您应该在控制台输出中看到流式传输的块(格式为 dspy.streaming.StreamResponse
)。
源代码位于 dspy/streaming/streamify.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
|