import asyncio from agentui.pipeline.executor import PipelineExecutor from agentui.pipeline.storage import clear_var_store from tests.utils import pp as _pp, ctx as _ctx async def scenario_bare_vars_and_braces(): print("\n=== MACROS 1: Bare [[NAME]] и {{ NAME }} + числа/объекты без кавычек ===") p = { "id": "p_macros_1", "name": "Bare and Braces", "loop_mode": "iterative", "nodes": [ { "id": "n1", "type": "SetVars", "config": { "variables": [ {"id": "v1", "name": "STR", "mode": "string", "value": "строка"}, {"id": "v2", "name": "NUM", "mode": "expr", "value": "42"}, {"id": "v3", "name": "OBJ", "mode": "expr", "value": '{"x": 1, "y": [2,3]}'}, ] }, "in": {} }, { "id": "n2", "type": "Return", "config": { "target_format": "openai", # Вставляем: строка из [[STR]], число через {{ NUM }}, и словарь через {{ OBJ }} "text_template": "[[STR]] | {{ NUM }} | {{ OBJ }}" }, "in": { "depends": "n1.done" } } ] } out = await PipelineExecutor(p).run(_ctx()) print("OUT:", _pp(out)) async def scenario_var_path_and_defaults(): print("\n=== MACROS 2: [[VAR:path]] и {{ ...|default(...) }} (вложенные и JSON-литералы) ===") incoming = { "method": "POST", "url": "http://localhost/test?foo=bar", "path": "/test", "query": "foo=bar", "headers": {"authorization": "Bearer X", "x-api-key": "Y"}, "json": {"a": None} } p = { "id": "p_macros_2", "name": "VAR and defaults", "loop_mode": "dag", "nodes": [ { "id": "n1", "type": "Return", "config": { "target_format": "openai", "text_template": ( "auth=[[VAR:incoming.headers.authorization]] | " "xkey=[[VAR:incoming.headers.x-api-key]] | " # nested default: params.a|default(123) -> если param не задан, 123 "num={{ params.a|default(123) }} | " # deeper default chain: incoming.json.a|default(params.a|default(456)) "num2={{ incoming.json.a|default(params.a|default(456)) }} | " # JSON literal default list/object "lit_list={{ missing|default([1,2,3]) }} | lit_obj={{ missing2|default({\"k\":10}) }}" ) }, "in": {} } ] } out = await PipelineExecutor(p).run(_ctx(incoming=incoming, params={"temperature": 0.2})) print("OUT:", _pp(out)) async def scenario_out_macros_full_and_short(): print("\n=== MACROS 3: [[OUT:nX...]] и короткая форма [[OUTx]] ===") p = { "id": "p_macros_3", "name": "OUT full and short", "loop_mode": "iterative", "nodes": [ { "id": "n1", "type": "SetVars", "config": { "variables": [ {"id": "v1", "name": "MSG", "mode": "string", "value": "hello"} ] }, "in": {} }, { "id": "n2", "type": "Return", "config": { "target_format": "openai", "text_template": "[[MSG]]" }, "in": { "depends": "n1.done" } }, { "id": "n3", "type": "Return", "config": { "target_format": "openai", # Две формы: полная от n1.vars.MSG и короткая от n2 => [[OUT2]] "text_template": "[[OUT:n1.vars.MSG]] + [[OUT2]]" }, "in": { "depends": "n2.done" } } ] } out = await PipelineExecutor(p).run(_ctx()) print("OUT:", _pp(out)) async def scenario_store_macros_two_runs(): print("\n=== MACROS 4: [[STORE:key]] и {{ STORE.key }} между запусками (clear_var_store=False) ===") pid = "p_macros_4_store" # начинаем с чистого стора clear_var_store(pid) p = { "id": pid, "name": "STORE across runs", "loop_mode": "iterative", "clear_var_store": False, # критично: не очищать между запусками "nodes": [ { "id": "n1", "type": "SetVars", "config": { "variables": [ {"id": "v1", "name": "KEEP", "mode": "string", "value": "persist-me"} ] }, "in": {} }, { "id": "n2", "type": "Return", "config": { "target_format": "openai", "text_template": "first-run" }, "in": { "depends": "n1.done" } } ] } # Первый запуск — кладём KEEP в STORE out1 = await PipelineExecutor(p).run(_ctx()) print("RUN1:", _pp(out1)) # Второй запуск — читаем из STORE через макросы p2 = { "id": pid, "name": "STORE read", "loop_mode": "dag", "clear_var_store": False, "nodes": [ { "id": "n1", "type": "Return", "config": { "target_format": "openai", "text_template": "[[STORE:KEEP]] | {{ STORE.KEEP }}" }, "in": {} } ] } out2 = await PipelineExecutor(p2).run(_ctx()) print("RUN2:", _pp(out2)) async def scenario_pm_prompt_blocks_to_provider_structs(): print("\n=== MACROS 5: Prompt Blocks ([[PROMPT]]) → provider-structures (OpenAI) ===") # Проверяем, что [[PROMPT]] со списком блоков превращается в "messages":[...] p = { "id": "p_macros_5", "name": "PROMPT OpenAI", "loop_mode": "dag", "nodes": [ { "id": "n1", "type": "ProviderCall", "config": { "provider": "openai", "provider_configs": { "openai": { "base_url": "https://api.openai.com", "endpoint": "/v1/chat/completions", "headers": "{\"Authorization\":\"Bearer TEST\"}", "template": "{\n \"model\": \"{{ model }}\",\n [[PROMPT]],\n \"temperature\": {{ params.temperature|default(0.7) }}\n}" } }, "blocks": [ {"id": "b1", "name": "sys", "role": "system", "prompt": "You are test", "enabled": True, "order": 0}, {"id": "b2", "name": "user", "role": "user", "prompt": "Say [[VAR:chat.last_user]]", "enabled": True, "order": 1} ] }, "in": {} } ] } # Сборка и запрос (ожидаемый 401), главное — валидное тело с messages:[...] out = await PipelineExecutor(p).run(_ctx()) print("OUT:", _pp(out)) def run_all(): async def main(): await scenario_bare_vars_and_braces() await scenario_var_path_and_defaults() await scenario_out_macros_full_and_short() await scenario_store_macros_two_runs() await scenario_pm_prompt_blocks_to_provider_structs() print("\n=== MACROS VARS SUITE: DONE ===") asyncio.run(main()) if __name__ == "__main__": run_all()