跳到内容

dspy.streaming.StatusMessageProvider

dspy.streaming.StatusMessageProvider

为 DSPy 程序提供可定制的状态消息流。

此类作为创建自定义状态消息提供程序的基础。用户可以继承并覆盖其方法,为程序执行的不同阶段定义特定的状态消息,每个方法必须返回一个字符串。

Example

class MyStatusMessageProvider(StatusMessageProvider):
    def lm_start_status_message(self, instance, inputs):
        return f"Calling LM with inputs {inputs}..."

    def module_end_status_message(self, outputs):
        return f"Module finished with output: {outputs}!"

program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())

函数

lm_end_status_message(outputs: Any) 在调用 dspy.LM 之后的状态消息。

在调用 dspy.LM 之后的状态消息。

源代码位于 dspy/streaming/messages.py
def lm_end_status_message(self, outputs: Any):
    """Status message after a `dspy.LM` is called."""
    pass

lm_start_status_message(instance: Any, inputs: Dict[str, Any]) 在调用 dspy.LM 之前的状态消息。

在调用 dspy.LM 之前的状态消息。

源代码位于 dspy/streaming/messages.py
def lm_start_status_message(self, instance: Any, inputs: Dict[str, Any]):
    """Status message before a `dspy.LM` is called."""
    pass

module_end_status_message(outputs: Any) 在调用 dspy.Moduledspy.Predict 之后的状态消息。

在调用 dspy.Moduledspy.Predict 之后的状态消息。

源代码位于 dspy/streaming/messages.py
def module_end_status_message(self, outputs: Any):
    """Status message after a `dspy.Module` or `dspy.Predict` is called."""
    pass

module_start_status_message(instance: Any, inputs: Dict[str, Any]) 在调用 dspy.Moduledspy.Predict 之前的状态消息。

在调用 dspy.Moduledspy.Predict 之前的状态消息。

源代码位于 dspy/streaming/messages.py
def module_start_status_message(self, instance: Any, inputs: Dict[str, Any]):
    """Status message before a `dspy.Module` or `dspy.Predict` is called."""
    pass

tool_end_status_message(outputs: Any) 在调用 dspy.Tool 之后的状态消息。

在调用 dspy.Tool 之后的状态消息。

源代码位于 dspy/streaming/messages.py
def tool_end_status_message(self, outputs: Any):
    """Status message after a `dspy.Tool` is called."""
    return "Tool calling finished! Querying the LLM with tool calling results..."

tool_start_status_message(instance: Any, inputs: Dict[str, Any]) 在调用 dspy.Tool 之前的状态消息。

在调用 dspy.Tool 之前的状态消息。

源代码位于 dspy/streaming/messages.py
def tool_start_status_message(self, instance: Any, inputs: Dict[str, Any]):
    """Status message before a `dspy.Tool` is called."""
    return f"Calling tool {instance.name}..."