Files
HadTavern/tests/test_executor_iterative.py
2025-09-11 17:27:15 +03:00

167 lines
5.4 KiB
Python

import asyncio
from agentui.pipeline.executor import PipelineExecutor, ExecutionError, Node, NODE_REGISTRY
def run_checks():
async def scenario():
# Test 1: linear pipeline in iterative mode (SetVars -> Return)
p1 = {
"id": "pipeline_test_iter_1",
"name": "Iterative Linear",
"loop_mode": "iterative",
"loop_max_iters": 100,
"loop_time_budget_ms": 5000,
"nodes": [
{
"id": "n1",
"type": "SetVars",
"config": {
"variables": [
{"id": "v1", "name": "MSG", "mode": "string", "value": "Hello"}
]
},
"in": {}
},
{
"id": "n2",
"type": "Return",
"config": {
"target_format": "openai",
"text_template": "[[MSG]]"
},
"in": {
"depends": "n1.done"
}
}
]
}
ctx = {
"model": "gpt-x",
"vendor_format": "openai",
"params": {},
"chat": {"last_user": "hi"},
"OUT": {}
}
ex1 = PipelineExecutor(p1)
out1 = await ex1.run(ctx)
assert isinstance(out1, dict) and "result" in out1
res1 = out1["result"]
# OpenAI-like object from Return formatter
assert res1.get("object") == "chat.completion"
msg1 = res1.get("choices", [{}])[0].get("message", {}).get("content")
assert msg1 == "Hello"
# Test 2: If gating in iterative mode (SetVars -> If -> Return(true))
p2 = {
"id": "pipeline_test_iter_2",
"name": "Iterative If Gate",
"loop_mode": "iterative",
"loop_max_iters": 100,
"loop_time_budget_ms": 5000,
"nodes": [
{
"id": "n1",
"type": "SetVars",
"config": {
"variables": [
{"id": "v1", "name": "MSG", "mode": "string", "value": "Hello world"}
]
},
"in": {}
},
{
"id": "nIf",
"type": "If",
"config": {
"expr": '[[MSG]] contains "Hello"'
},
"in": {
"depends": "n1.done"
}
},
{
"id": "nRet",
"type": "Return",
"config": {
"target_format": "openai",
"text_template": "[[MSG]] ok"
},
"in": {
"depends": "nIf.true"
}
}
]
}
ex2 = PipelineExecutor(p2)
out2 = await ex2.run(ctx)
assert "result" in out2
res2 = out2["result"]
assert res2.get("object") == "chat.completion"
msg2 = res2.get("choices", [{}])[0].get("message", {}).get("content")
assert msg2 == "Hello world ok"
# Test 3: [[OUT:...]] is treated as a real dependency in iterative mode
class ProbeNode(Node):
type_name = "Probe"
async def run(self, inputs, context):
x = inputs.get("x")
assert isinstance(x, dict) and isinstance(x.get("vars"), dict)
v = x["vars"].get("MSG")
assert v == "Hello OUT"
return {"vars": {"X_MSG": v}}
# Register probe node
NODE_REGISTRY[ProbeNode.type_name] = ProbeNode
p3 = {
"id": "pipeline_test_iter_3",
"name": "Iterative OUT dependency",
"loop_mode": "iterative",
"loop_max_iters": 100,
"loop_time_budget_ms": 5000,
"nodes": [
{
"id": "n1",
"type": "SetVars",
"config": {
"variables": [
{"id": "v1", "name": "MSG", "mode": "string", "value": "Hello OUT"}
]
},
"in": {}
},
{
"id": "n2",
"type": "Probe",
"config": {},
"in": {
"x": "[[OUT:n1]]"
}
},
{
"id": "n3",
"type": "Return",
"config": {
"target_format": "openai",
"text_template": "[[VAR:vars.X_MSG]]"
},
"in": {
"depends": "n2.done"
}
}
]
}
ex3 = PipelineExecutor(p3)
out3 = await ex3.run(ctx)
assert "result" in out3
res3 = out3["result"]
assert res3.get("object") == "chat.completion"
msg3 = res3.get("choices", [{}])[0].get("message", {}).get("content")
assert msg3 == "Hello OUT"
asyncio.run(scenario())
print("Iterative executor tests: OK")
if __name__ == "__main__":
run_checks()