
dspy.streamify

dspy.streamify(program: Module, status_message_provider: Optional[StatusMessageProvider] = None, stream_listeners: Optional[List[StreamListener]] = None, include_final_prediction_in_output_stream: bool = True, is_async_program: bool = False, async_streaming: bool = True) -> Callable[[Any, Any], Awaitable[Any]]

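Wrap a DSPy program so that it streams its outputs incrementally instead of returning them all at once. It also emits status messages that indicate the program's progress; users can implement their own status message provider to customize the messages and to choose which modules generate them.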

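Parameters

Name Type Description Default
program Module

The DSPy program to wrap with streaming functionality.

required
status_message_provider Optional[StatusMessageProvider]

A custom status message provider to use instead of the default one. Users can implement their own provider to customize the status messages and to choose which modules generate them.

None
stream_listeners Optional[List[StreamListener]]

A list of stream listeners that capture the streaming output of specific fields of sub-predictors in the program. When provided, only the target fields of the target predictors are streamed to the user.

None
include_final_prediction_in_output_stream bool

Whether to include the final prediction in the output stream; only useful when stream_listeners is provided. If False, the final prediction is left out of the output stream. When the program hits the cache, or no listener captured anything, the final prediction is still included even if this is False.

True
is_async_program bool

Whether the program is async. If False, the program is wrapped with asyncify; otherwise it is called through acall.

False
async_streaming bool

Whether to return an async generator or a sync generator. If False, the stream is converted to a sync generator.

True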
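The rule for when the final prediction still appears despite include_final_prediction_in_output_stream being False is easy to misread, so here is a small standalone sketch of it. It mirrors the condition in the source listing on this page, but `ListenerState` and `should_yield_final_prediction` are hypothetical names made up for illustration, not part of DSPy.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ListenerState:
    """Hypothetical stand-in holding the two listener flags the rule reads."""
    cache_hit: bool = False      # the result came from the cache, so nothing was streamed
    stream_start: bool = False   # the listener began emitting chunks for its field


def should_yield_final_prediction(include_final: bool, listeners: List[ListenerState]) -> bool:
    """Return True when the final prediction belongs in the output stream."""
    if include_final:
        return True
    # Fallbacks that apply even when include_final is False.
    return (
        len(listeners) == 0
        or any(listener.cache_hit for listener in listeners)
        or not any(listener.stream_start for listener in listeners)
    )


# A listener streamed normally: the final prediction is suppressed.
print(should_yield_final_prediction(False, [ListenerState(stream_start=True)]))  # False
# A cache hit: the final prediction is yielded anyway.
print(should_yield_final_prediction(False, [ListenerState(cache_hit=True)]))     # True
# No listeners at all: the final prediction is yielded anyway.
print(should_yield_final_prediction(False, []))                                  # True
```

In practice this means a consumer should still be prepared to receive a prediction object even after opting out of it.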

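Returns

Type Description
Callable[[Any, Any], Awaitable[Any]]

A function that takes the same arguments as the original program but returns a generator that yields the program's outputs incrementally: an async generator by default, or a sync generator when async_streaming is False.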
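To make the async versus sync distinction concrete, the following is a stdlib-only sketch of one way an async generator can be exposed as a sync generator. DSPy performs its own conversion in `apply_sync_streaming`, whose implementation is not shown on this page; `to_sync_generator` and `fake_stream` below are hypothetical names, and the thread-plus-queue approach is an assumption chosen for illustration.

```python
import asyncio
import queue
import threading
from typing import Any, AsyncGenerator, Generator

_DONE = object()  # sentinel marking the end of the stream


def to_sync_generator(async_gen: AsyncGenerator[Any, None]) -> Generator[Any, None, None]:
    """Drain an async generator on a background thread and re-yield its items synchronously."""
    items: "queue.Queue[Any]" = queue.Queue()

    async def drain() -> None:
        try:
            async for item in async_gen:
                items.put(item)
        finally:
            # Always signal completion so the consumer never blocks forever.
            # (Errors raised by the async generator are not re-raised in this sketch.)
            items.put(_DONE)

    worker = threading.Thread(target=lambda: asyncio.run(drain()), daemon=True)
    worker.start()

    while True:
        item = items.get()  # blocks until the background loop produces something
        if item is _DONE:
            break
        yield item
    worker.join()


async def fake_stream() -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a streamified program's output."""
    for token in ["to", " get", " a snack"]:
        await asyncio.sleep(0)
        yield token


print(list(to_sync_generator(fake_stream())))  # ['to', ' get', ' a snack']
```

A sync generator like this can be consumed with a plain `for` loop, which is convenient in code that has no running event loop.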

Example

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"))

# Use the program with streaming output
async def use_streaming():
    output = program(q="Why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)
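The consumer loop above can be exercised without calling a language model by feeding it a stand-in stream. The sketch below assumes nothing about DSPy internals: `FakePrediction` and `fake_program` are hypothetical placeholders for `dspy.Prediction` and a streamified program.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, List, Optional, Tuple


@dataclass
class FakePrediction:
    """Hypothetical stand-in for dspy.Prediction."""
    a: str


async def fake_program(q: str) -> AsyncGenerator[Any, None]:
    """Yield a few chunks followed by a final result, as a streamified program would."""
    for chunk in ["To ", "find ", "snacks."]:
        yield chunk
    yield FakePrediction(a="To find snacks.")


async def consume(stream: AsyncGenerator[Any, None]) -> Tuple[List[Any], Optional[FakePrediction]]:
    """Split a stream into intermediate chunks and the final prediction, if any."""
    chunks: List[Any] = []
    final: Optional[FakePrediction] = None
    async for value in stream:
        if isinstance(value, FakePrediction):
            final = value
        else:
            chunks.append(value)
    return chunks, final


chunks, final = asyncio.run(consume(fake_program(q="Why did a chicken cross the kitchen?")))
print(chunks)  # ['To ', 'find ', 'snacks.']
print(final)   # FakePrediction(a='To find snacks.')
```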

Example with custom status message provider

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def module_start_status_message(self, instance, inputs):
        return "Predicting..."

    def tool_end_status_message(self, outputs):
        return f"Tool calling finished with output: {outputs}!"

# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())

# Use the program with streaming output
async def use_streaming():
    output = program(q="Why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)

Example with stream listeners

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))

# Create the program and wrap it with streaming functionality
predict = dspy.Predict("question->answer, reasoning")
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
# include_final_prediction_in_output_stream is an option of streamify, not an input of the program
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    include_final_prediction_in_output_stream=False,
)

async def use_streaming():
    output = stream_predict(question="why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)

You should see the streaming chunks (in the format of dspy.streaming.StreamResponse) in the console output.
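When several fields are streamed at once, it is often handy to reassemble the chunks per field. The sketch below shows one way to do that; `FakeStreamResponse` is a hypothetical stand-in, and its attribute names (`signature_field_name`, `chunk`) are assumptions for this illustration, so check them against the real `dspy.streaming.StreamResponse` before relying on them.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class FakeStreamResponse:
    """Hypothetical stand-in; the attribute names are assumptions for this sketch."""
    signature_field_name: str
    chunk: str


def join_by_field(responses: Iterable[FakeStreamResponse]) -> Dict[str, str]:
    """Concatenate streamed chunks per output field, preserving arrival order."""
    parts: Dict[str, List[str]] = defaultdict(list)
    for response in responses:
        parts[response.signature_field_name].append(response.chunk)
    return {field: "".join(pieces) for field, pieces in parts.items()}


stream = [
    FakeStreamResponse("answer", "To get "),
    FakeStreamResponse("answer", "a snack."),
    FakeStreamResponse("reasoning", "Kitchens "),
    FakeStreamResponse("reasoning", "hold food."),
]
print(join_by_field(stream))
# {'answer': 'To get a snack.', 'reasoning': 'Kitchens hold food.'}
```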

Source code in dspy/streaming/streamify.py
def streamify(
    program: "Module",
    status_message_provider: Optional[StatusMessageProvider] = None,
    stream_listeners: Optional[List[StreamListener]] = None,
    include_final_prediction_in_output_stream: bool = True,
    is_async_program: bool = False,
    async_streaming: bool = True,
) -> Callable[[Any, Any], Awaitable[Any]]:
    """
    Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
    all at once. It also provides status messages to the user to indicate the progress of the program, and users
    can implement their own status message provider to customize the status messages and what module to generate
    status messages for.

    Args:
        program: The DSPy program to wrap with streaming functionality.
        status_message_provider: A custom status message generator to use instead of the default one. Users can
            implement their own status message generator to customize the status messages and what module to generate
            status messages for.
        stream_listeners: A list of stream listeners to capture the streaming output of specific fields of sub predicts
            in the program. When provided, only the target fields in the target predict will be streamed to the user.
        include_final_prediction_in_output_stream: Whether to include the final prediction in the output stream, only
            useful when `stream_listeners` is provided. If `False`, the final prediction will not be included in the
            output stream. When the program hits the cache, or no listeners captured anything, the final prediction will
            still be included in the output stream even if this is `False`.
        is_async_program: Whether the program is async. If `False`, the program will be wrapped with `asyncify`,
            otherwise the program will be called with `acall`.
        async_streaming: Whether to return an async generator or a sync generator. If `False`, the streaming will be
            converted to a sync generator.

    Returns:
        A function that takes the same arguments as the original program, but returns an async
            generator that yields the program's outputs incrementally.

    Example:

    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
    # Create the program and wrap it with streaming functionality
    program = dspy.streamify(dspy.Predict("q->a"))

    # Use the program with streaming output
    async def use_streaming():
        output = program(q="Why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    Example with custom status message provider:
    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

    class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
        def module_start_status_message(self, instance, inputs):
            return "Predicting..."

        def tool_end_status_message(self, outputs):
            return f"Tool calling finished with output: {outputs}!"

    # Create the program and wrap it with streaming functionality
    program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())

    # Use the program with streaming output
    async def use_streaming():
        output = program(q="Why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    Example with stream listeners:

    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))

    # Create the program and wrap it with streaming functionality
    predict = dspy.Predict("question->answer, reasoning")
    stream_listeners = [
        dspy.streaming.StreamListener(signature_field_name="answer"),
        dspy.streaming.StreamListener(signature_field_name="reasoning"),
    ]
    # include_final_prediction_in_output_stream is an option of streamify, not an input of the program
    stream_predict = dspy.streamify(
        predict,
        stream_listeners=stream_listeners,
        include_final_prediction_in_output_stream=False,
    )

    async def use_streaming():
        output = stream_predict(question="why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    You should see the streaming chunks (in the format of `dspy.streaming.StreamResponse`) in the console output.
    """
    stream_listeners = stream_listeners or []
    if len(stream_listeners) > 0:
        predict_id_to_listener = find_predictor_for_stream_listeners(program, stream_listeners)
    else:
        predict_id_to_listener = {}

    if is_async_program:
        program = program.acall
    elif not iscoroutinefunction(program):
        program = asyncify(program)

    callbacks = settings.callbacks
    status_streaming_callback = StatusStreamingCallback(status_message_provider)
    if not any(isinstance(c, StatusStreamingCallback) for c in callbacks):
        callbacks.append(status_streaming_callback)

    async def generator(args, kwargs, stream: MemoryObjectSendStream):
        with settings.context(send_stream=stream, callbacks=callbacks, stream_listeners=stream_listeners):
            prediction = await program(*args, **kwargs)

        await stream.send(prediction)

    async def async_streamer(*args, **kwargs):
        send_stream, receive_stream = create_memory_object_stream(16)
        async with create_task_group() as tg, send_stream, receive_stream:
            tg.start_soon(generator, args, kwargs, send_stream)

            async for value in receive_stream:
                if isinstance(value, ModelResponseStream):
                    if len(predict_id_to_listener) == 0:
                        # No listeners are configured, yield the chunk directly for backwards compatibility.
                        yield value
                    else:
                        # We are receiving a chunk from the LM's response stream; delegate it to the listeners to
                        # determine whether we should yield a value to the user.
                        output = None
                        for listener in predict_id_to_listener[value.predict_id]:
                            # At most one listener should provide a return value.
                            output = listener.receive(value) or output
                        if output:
                            yield output
                elif isinstance(value, StatusMessage):
                    yield value
                elif isinstance(value, Prediction):
                    if include_final_prediction_in_output_stream:
                        yield value
                    elif (
                        len(stream_listeners) == 0
                        or any(listener.cache_hit for listener in stream_listeners)
                        or not any(listener.stream_start for listener in stream_listeners)
                    ):
                        yield value
                    return

    if async_streaming:
        return async_streamer
    else:

        def sync_streamer(*args, **kwargs):
            output = async_streamer(*args, **kwargs)
            return apply_sync_streaming(output)

        return sync_streamer
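The core of the listing above is a producer/consumer pattern: a background task runs the program and pushes items into a bounded stream, while the outer generator yields them and finishes with the program's return value. The sketch below reproduces that shape with `asyncio.Queue` swapped in for the anyio memory object stream used by the source; `stream_outputs` and `fake_work` are hypothetical names, and error handling is deliberately left out.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, List

SendFn = Callable[[Any], Awaitable[None]]


async def stream_outputs(
    work: Callable[[SendFn], Awaitable[Any]],
    buffer_size: int = 16,
) -> AsyncGenerator[Any, None]:
    """Run `work` in the background and yield what it sends, then its return value."""
    buffer: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the stream

    async def producer() -> None:
        # `work` reports intermediate items by awaiting send(item).
        result = await work(buffer.put)
        await buffer.put(result)  # the final result goes last, like the final prediction
        await buffer.put(done)
        # Note: if `work` raises, the sentinel is never sent; a real implementation
        # would need to propagate the error to the consumer.

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await buffer.get()
            if item is done:
                break
            yield item
    finally:
        task.cancel()  # no-op if the producer already finished


async def fake_work(send: SendFn) -> str:
    """Hypothetical stand-in for a program that emits tokens and returns a result."""
    for token in ["a", "b", "c"]:
        await send(token)
    return "abc"


async def main() -> List[Any]:
    return [item async for item in stream_outputs(fake_work)]


print(asyncio.run(main()))  # ['a', 'b', 'c', 'abc']
```

The bounded buffer provides backpressure: the producer pauses when the consumer falls behind by more than `buffer_size` items.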