建议:设置 MLflow 追踪,以了解底层情况。
MLflow DSPy 集成¶
MLflow 是一款与 DSPy 原生集成的 LLMOps 工具,提供可解释性和实验追踪功能。在本教程中,你可以使用 MLflow 将提示和优化进度可视化为追踪,以更好地理解 DSPy 的行为。按照以下四个步骤即可轻松设置 MLflow。
- 安装 MLflow
%pip install mlflow>=2.20
- 在单独的终端中启动 MLflow UI
mlflow ui --port 5000
- 将 Notebook 连接到 MLflow
import mlflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")
- 启用追踪。
mlflow.dspy.autolog()
要了解有关此集成的更多信息,请访问 MLflow DSPy 文档。
在本教程中,我们将演示新的实验性提示优化器 dspy.SIMBA
,它对于大型语言模型 (LLMs) 和更具挑战性的任务通常很强大。使用它,我们将把代理的准确率从 35% 提高到 60%。
import dspy
import ujson
import random
gpt4o = dspy.LM("openai/gpt-4o", temperature=0.7)
dspy.configure(lm=gpt4o)
现在让我们下载数据。
from dspy.utils import download
download("https://hugging-face.cn/datasets/bytedance-research/ToolHop/resolve/main/data/ToolHop.json")
data = ujson.load(open("ToolHop.json"))
random.Random(0).shuffle(data)
Downloading 'ToolHop.json'...
然后我们准备一组经过清理的示例。ToolHop 任务的有趣之处在于,代理会获得一组独特的工具(函数),用于单独处理每个请求。因此,它需要学习如何在实践中有效地使用任何此类工具。
import re
import inspect
examples = []
fns2code = {}
def finish(answer: str):
"""Conclude the trajectory and return the final answer."""
return answer
for datapoint in data:
func_dict = {}
for func_code in datapoint["functions"]:
cleaned_code = func_code.rsplit("\n\n# Example usage", 1)[0]
fn_name = re.search(r"^\s*def\s+([a-zA-Z0-9_]+)\s*\(", cleaned_code)
fn_name = fn_name.group(1) if fn_name else None
if not fn_name:
continue
local_vars = {}
exec(cleaned_code, {}, local_vars)
fn_obj = local_vars.get(fn_name)
if callable(fn_obj):
func_dict[fn_name] = fn_obj
assert fn_obj not in fns2code, f"Duplicate function found: {fn_name}"
fns2code[fn_obj] = (fn_name, cleaned_code)
func_dict["finish"] = finish
example = dspy.Example(question=datapoint["question"], answer=datapoint["answer"], functions=func_dict)
examples.append(example.with_inputs("question", "functions"))
trainset, devset, testset = examples[:100], examples[100:400], examples[400:]
然后让我们为该任务定义一些辅助函数。在这里,我们将定义 metric
,它将比原始论文中的标准(严格)得多:我们将要求预测结果(归一化后)与实际结果完全匹配。我们还将以第二种方式严格要求:我们将只允许代理总共执行 5 个步骤,以实现高效部署。
from func_timeout import func_set_timeout
def wrap_function_with_timeout(fn):
@func_set_timeout(10)
def wrapper(*args, **kwargs):
try:
return {"return_value": fn(*args, **kwargs), "errors": None}
except Exception as e:
return {"return_value": None, "errors": str(e)}
return wrapper
def fn_metadata(func):
signature = inspect.signature(func)
docstring = inspect.getdoc(func) or "No docstring."
return dict(function_name=func.__name__, arguments=str(signature), docstring=docstring)
def metric(example, pred, trace=None):
gold = str(example.answer).rstrip(".0").replace(",", "").lower()
pred = str(pred.answer).rstrip(".0").replace(",", "").lower()
return pred == gold # stricter than the original paper's metric!
evaluate = dspy.Evaluate(devset=devset, metric=metric, num_threads=24, display_progress=True, display_table=0, max_errors=999)
现在,让我们定义代理!我们代理的核心将基于 ReAct 循环,其中模型查看当前的轨迹以及可调用的函数集,并决定要调用的下一个工具。
为了保持最终代理的速度,我们将 max_steps
限制在 5 个步骤。我们还将为每个函数调用设置超时。
class Agent(dspy.Module):
def __init__(self, max_steps=5):
self.max_steps = max_steps
instructions = "For the final answer, produce short (not full sentence) answers in which you format dates as YYYY-MM-DD, names as Firstname Lastname, and numbers without leading 0s."
signature = dspy.Signature('question, trajectory, functions -> next_selected_fn, args: dict[str, Any]', instructions)
self.react = dspy.ChainOfThought(signature)
def forward(self, question, functions):
tools = {fn_name: fn_metadata(fn) for fn_name, fn in functions.items()}
trajectory = []
for _ in range(self.max_steps):
pred = self.react(question=question, trajectory=trajectory, functions=tools)
selected_fn = pred.next_selected_fn.strip('"').strip("'")
fn_output = wrap_function_with_timeout(functions[selected_fn])(**pred.args)
trajectory.append(dict(reasoning=pred.reasoning, selected_fn=selected_fn, args=pred.args, **fn_output))
if selected_fn == "finish":
break
return dspy.Prediction(answer=fn_output.get("return_value", ''), trajectory=trajectory)
开箱即用,让我们评估一下基于 GPT-4o
的代理在开发集上的表现。
agent = Agent()
evaluate(agent)
2025/03/23 21:46:10 INFO dspy.evaluate.evaluate: Average Metric: 105.0 / 300 (35.0%)
35.0
现在,让我们使用 dspy.SIMBA
优化代理,它代表随机自省小批量上升。这个提示优化器接受任意 DSPy 程序,就像我们这里的代理一样,并按一系列小批量进行,旨在对提示指令或少样本示例进行增量改进。
simba = dspy.SIMBA(metric=metric, max_steps=12, max_demos=10)
optimized_agent = simba.compile(agent, trainset=trainset, seed=6793115)
dspy.SIMBA
优化代理,它代表 Stochastic Introspective Mini-Batch Ascent(随机自省小批量上升)。这种提示优化器接受任意 DSPy 程序(如我们的代理),并按照一系列小批量进行,以寻求对提示指令或少样本示例进行增量改进。完成此优化后,现在让我们再次评估代理。我们看到显著的相对增益 71%,准确率跃升至 60%。
evaluate(optimized_agent)
2025/03/23 21:46:21 INFO dspy.evaluate.evaluate: Average Metric: 182.0 / 300 (60.7%)
60.67