跳过内容

教程:部署您的 DSPy 程序

本指南演示了两种在生产环境中部署 DSPy 程序的可能方法:FastAPI 用于轻量级部署,而 MLflow 用于更生产级的部署,具有程序版本控制和管理功能。

下面,我们假设您有一个简单的 DSPy 程序需要部署。您可以将其替换为更复杂的程序。

import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
dspy_program = dspy.ChainOfThought("question -> answer")

使用 FastAPI 进行部署

FastAPI 提供了一种直接的方法,可以将您的 DSPy 程序作为 REST API 提供服务。当您可以直接访问程序代码并需要轻量级部署解决方案时,这非常理想。

> pip install fastapi uvicorn
> export OPENAI_API_KEY="your-openai-api-key"

让我们创建一个 FastAPI 应用程序来服务上面定义的 dspy_program

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

import dspy

app = FastAPI(
    title="DSPy Program API",
    description="A simple API serving a DSPy Chain of Thought program",
    version="1.0.0"
)

# Define request model for better documentation and validation
class Question(BaseModel):
    text: str

# Configure your language model and 'asyncify' your DSPy program.
lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm, async_max_workers=4) # default is 8
dspy_program = dspy.ChainOfThought("question -> answer")
dspy_program = dspy.asyncify(dspy_program)

@app.post("/predict")
async def predict(question: Question):
    try:
        result = await dspy_program(question=question.text)
        return {
            "status": "success",
            "data": result.toDict()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

在上面的代码中,我们调用 dspy.asyncify 将 dspy 程序转换为异步模式运行,以实现高吞吐量的 FastAPI 部署。当前,这会在单独的线程中运行 dspy 程序并等待其结果。

默认情况下, spawned 线程的限制是 8。可以将其视为一个工作池。如果您有 8 个正在执行的程序并再次调用它,则第 9 个调用将等待直到其中一个返回。您可以使用新的 async_max_workers 设置来配置异步容量。

流式传输,在 DSPy 2.6.0+ 中

DSPy 2.6.0+ 也支持流式传输,可以通过 pip install -U --pre dspy 作为候选发布版本获得。

我们可以使用 dspy.streamify 将 dspy 程序转换为流式传输模式。当您想在最终预测就绪之前将中间输出(即 O1 风格的推理)流式传输到客户端时,这非常有用。它底层使用了 asyncify 并继承了执行语义。

dspy_program = dspy.asyncify(dspy.ChainOfThought("question -> answer"))
streaming_dspy_program = dspy.streamify(dspy_program)

@app.post("/predict/stream")
async def stream(question: Question):
    async def generate():
        async for value in streaming_dspy_program(question=question.text):
            if isinstance(value, dspy.Prediction):
                data = {"prediction": value.labels().toDict()}
            elif isinstance(value, litellm.ModelResponse):
                data = {"chunk": value.json()}
            yield f"data: {ujson.dumps(data)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# Since you're often going to want to stream the result of a DSPy program as server-sent events,
# we've included a helper function for that, which is equivalent to the code above.

from dspy.utils.streaming import streaming_response

@app.post("/predict/stream")
async def stream(question: Question):
    stream = streaming_dspy_program(question=question.text)
    return StreamingResponse(streaming_response(stream), media_type="text/event-stream")

将您的代码写入文件,例如 fastapi_dspy.py。然后您可以使用以下命令服务应用程序:

> uvicorn fastapi_dspy:app --reload

它将在 http://127.0.0.1:8000/ 启动一个本地服务器。您可以使用下面的 python 代码进行测试:

import requests

response = requests.post(
    "http://127.0.0.1:8000/predict",
    json={"text": "What is the capital of France?"}
)
print(response.json())

您应该会看到如下所示的响应:

{'status': 'success', 'data': {'reasoning': 'The capital of France is a well-known fact, commonly taught in geography classes and referenced in various contexts. Paris is recognized globally as the capital city, serving as the political, cultural, and economic center of the country.', 'answer': 'The capital of France is Paris.'}}

使用 MLflow 进行部署

如果您希望打包您的 DSPy 程序并部署在隔离环境中,我们建议使用 MLflow 进行部署。MLflow 是一个用于管理机器学习工作流程的流行平台,包括版本控制、跟踪和部署。

> pip install mlflow>=2.18.0

让我们启动 MLflow 跟踪服务器,我们将在其中存储我们的 DSPy 程序。下面的命令将在 http://127.0.0.1:5000/ 启动一个本地服务器。

> mlflow ui

然后我们可以定义 DSPy 程序并将其记录到 MLflow 服务器中。"log" 在 MLflow 中是一个重载术语,基本上意味着我们将程序信息以及环境要求存储在 MLflow 服务器中。参见下面的代码:

import dspy
import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:5000/")
mlflow.set_experiment("deploy_dspy_program")

lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm)
dspy_program = dspy.ChainOfThought("question -> answer")

with mlflow.start_run():
    mlflow.dspy.log_model(
        dspy_program,
        "dspy_program",
        input_example={"messages": [{"role": "user", "content": "What is LLM agent?"}]},
        task="llm/v1/chat",
    )

我们建议您设置 task="llm/v1/chat",这样部署的程序会自动以与 OpenAI chat API 相同的格式接收输入并生成输出,OpenAI chat API 是 LM 应用程序的常用接口。将上述代码写入文件,例如 mlflow_dspy.py,然后运行它。

记录程序后,您可以在 MLflow UI 中查看保存的信息。打开 http://127.0.0.1:5000/ 并选择 deploy_dspy_program 实验,然后选择您刚刚创建的运行,在 Artifacts 选项卡下,您应该看到记录的程序信息,类似于以下截图:

MLflow UI

从 UI(或执行 mlflow_dspy.py 时的控制台输出)获取您的运行 ID,现在您可以使用以下命令部署已记录的程序:

> mlflow models serve -m runs:/{run_id}/model -p 6000

程序部署后,您可以使用以下命令进行测试:

> curl http://127.0.0.1:6000/invocations -H "Content-Type:application/json"  --data '{"messages": [{"content": "what is 2 + 2?", "role": "user"}]}'

您应该会看到如下所示的响应:

{"choices": [{"index": 0, "message": {"role": "assistant", "content": "{\"reasoning\": \"The question asks for the sum of 2 and 2. To find the answer, we simply add the two numbers together: 2 + 2 = 4.\", \"answer\": \"4\"}"}, "finish_reason": "stop"}]}

有关如何使用 MLflow 部署 DSPy 程序以及如何自定义部署的完整指南,请参阅MLflow 文档

MLflow 部署的最佳实践

  1. 环境管理:始终在 conda.yamlrequirements.txt 文件中指定您的 Python 依赖项。
  2. 版本控制:为您的模型版本使用有意义的标签和描述。
  3. 输入验证:定义清晰的输入模式和示例。
  4. 监控:为生产部署设置适当的日志记录和监控。

对于生产部署,请考虑使用 MLflow 结合容器化

> mlflow models build-docker -m "runs:/{run_id}/model" -n "dspy-program"
> docker run -p 6000:8080 dspy-program

有关生产部署选项和最佳实践的完整指南,请参阅MLflow 文档