Files
HadTavern/agentui/pipeline/executor.py

2350 lines
104 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from typing import Any, Dict, List, Optional, Callable, Awaitable
from urllib.parse import urljoin
import json
import re
import asyncio
import time
import hashlib
from collections import deque
from agentui.providers.http_client import build_client
from agentui.common.vendors import detect_vendor
from agentui.pipeline.templating import (
_OUT_MACRO_RE,
_VAR_MACRO_RE,
_PROMPT_MACRO_RE,
_OUT_SHORT_RE,
_BRACES_RE,
_split_path,
_get_by_path,
_stringify_for_template,
_deep_find_text,
_best_text_from_outputs,
render_template_simple,
eval_condition_expr,
)
from agentui.pipeline.storage import load_var_store, save_var_store, clear_var_store
from agentui.common.cancel import is_cancelled, clear_cancel
# --- Templating helpers are imported from agentui.pipeline.templating ---
# moved to agentui.pipeline.templating
# moved to agentui.pipeline.templating
# moved to agentui.pipeline.templating
# moved to agentui.pipeline.templating
def _extract_out_node_id_from_ref(s: Any) -> Optional[str]:
"""
Извлекает node_id из строки с макросом [[OUT:nodeId(.path)*]].
Возвращает None, если макрос не найден.
"""
if not isinstance(s, str):
return None
m = _OUT_MACRO_RE.search(s)
if not m:
return None
body = m.group(1).strip()
node_id = body.split(".", 1)[0].strip()
return node_id or None
def _resolve_in_value(source: Any, context: Dict[str, Any], values: Dict[str, Dict[str, Any]]) -> Any:
"""
Разрешает входные связи/макросы в значение для inputs:
- Нестроковые значения возвращаются как есть
- "macro:path" → берёт значение из context по точечному пути
- "[[VAR:path]]" → берёт значение из context
- "[[OUT:nodeId(.path)*]]" → берёт из уже вычисленных выходов ноды
- "nodeId(.path)*" → ссылка на выходы ноды
- Если передан список ссылок — вернёт список разрешённых значений
- Иначе пытается взять из context по пути; если не найдено, оставляет исходную строку
"""
# Поддержка массивов ссылок (для multi-depends или будущих списковых входов)
if isinstance(source, list):
return [_resolve_in_value(s, context, values) for s in source]
if not isinstance(source, str):
return source
s = source.strip()
# macro:path
if s.lower().startswith("macro:"):
path = s.split(":", 1)[1].strip()
return _get_by_path(context, path)
# [[VAR: path]]
m = _VAR_MACRO_RE.fullmatch(s)
if m:
path = m.group(1).strip()
return _get_by_path(context, path)
# [[OUT: nodeId(.path)*]]
m = _OUT_MACRO_RE.fullmatch(s)
if m:
body = m.group(1).strip()
if "." in body:
node_id, rest = body.split(".", 1)
node_val = values.get(node_id)
return _get_by_path(node_val, rest)
node_val = values.get(body)
return node_val
# "nodeId(.path)*"
if "." in s:
node_id, rest = s.split(".", 1)
if node_id in values:
return _get_by_path(values.get(node_id), rest)
if s in values:
return values.get(s)
# fallback: from context by dotted path or raw string
ctx_val = _get_by_path(context, s)
return ctx_val if ctx_val is not None else source
# moved to agentui.pipeline.templating
class ExecutionError(Exception):
pass
class Node:
type_name: str = "Base"
def __init__(self, node_id: str, config: Optional[Dict[str, Any]] = None) -> None:
self.node_id = node_id
self.config = config or {}
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
"""Execute node with inputs and context. Return dict of outputs."""
raise NotImplementedError
# Регистрация поддерживаемых типов нод (минимальный набор)
NODE_REGISTRY: Dict[str, Any] = {}
class PipelineExecutor:
def __init__(self, pipeline: Dict[str, Any]) -> None:
self.pipeline = pipeline
self.nodes_by_id: Dict[str, Node] = {}
for n in pipeline.get("nodes", []):
node_cls = NODE_REGISTRY.get(n.get("type"))
if not node_cls:
raise ExecutionError(f"Unknown node type: {n.get('type')}")
self.nodes_by_id[n["id"]] = node_cls(n["id"], n.get("config", {}))
# Настройки режима исполнения (с дефолтами из default_pipeline)
try:
self.loop_mode = str(self.pipeline.get("loop_mode", "dag")).lower().strip() or "dag"
except Exception:
self.loop_mode = "dag"
try:
self.loop_max_iters = int(self.pipeline.get("loop_max_iters", 1000))
except Exception:
self.loop_max_iters = 1000
try:
self.loop_time_budget_ms = int(self.pipeline.get("loop_time_budget_ms", 10000))
except Exception:
self.loop_time_budget_ms = 10000
# Идентификатор пайплайна и политика очистки стора переменных
try:
self.pipeline_id = str(self.pipeline.get("id", "pipeline_editor")) or "pipeline_editor"
except Exception:
self.pipeline_id = "pipeline_editor"
try:
self.clear_var_store = bool(self.pipeline.get("clear_var_store", True))
except Exception:
self.clear_var_store = True
# В памяти держим актуальный STORE (доступен в шаблонах через {{ store.* }} и [[STORE:*]])
self._store: Dict[str, Any] = {}
# Локальный журнал выполнения для человекочитаемого трейсинга
self._exec_log: List[str] = []
async def run(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Точка входа исполнителя. Переключает режим между:
- "dag": волновое исполнение с барьерами (исходное поведение)
- "iterative": итеративное исполнение с очередью и гейтами
"""
# Инициализация STORE по политике
try:
if self.clear_var_store:
clear_var_store(self.pipeline_id)
self._store = {}
else:
self._store = load_var_store(self.pipeline_id) or {}
except Exception:
self._store = {}
# Перед стартом прогона сбрасываем возможный «старый» флаг отмены
try:
clear_cancel(self.pipeline_id)
except Exception:
pass
mode = (self.loop_mode or "dag").lower()
if mode == "iterative":
res = await self._run_iterative(context, trace)
else:
res = await self._run_dag(context, trace)
# На всякий случай финальная запись стора (если были изменения)
try:
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
return res
async def _run_dag(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Исходная волновая модель (DAG) — без изменений поведения.
"""
nodes: List[Dict[str, Any]] = list(self.pipeline.get("nodes", []))
id_set = set(self.nodes_by_id.keys())
# Собираем зависимости: node_id -> set(parent_ids), и обратные связи dependents
deps_map: Dict[str, set] = {n["id"]: set() for n in nodes}
dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
# Гейты для ветвлений: child_id -> List[(parent_id, gate_name)] (gate_name: "true"/"false")
gate_deps: Dict[str, List[tuple[str, str]]] = {n["id"]: [] for n in nodes}
for n in nodes:
nid = n["id"]
for _, source in (n.get("in") or {}).items():
# Разворачиваем массивы ссылок (multi-depends)
sources = source if isinstance(source, list) else [source]
for src in sources:
if not isinstance(src, str):
# Нестрочные значения и массивы констант — зависимостей нет
continue
if src.startswith("macro:"):
# Макросы берутся из контекста, без зависимостей
continue
# [[VAR:...]] — макрос из контекста, зависимостей нет
if re.fullmatch(r"\[\[\s*VAR\s*[:\s]\s*[^\]]+\s*\]\]", src.strip()):
continue
# [[OUT:nodeId(.key)*]] — зависимость от указанной ноды
out_ref_node = _extract_out_node_id_from_ref(src)
if out_ref_node and out_ref_node in id_set:
deps_map[nid].add(out_ref_node)
dependents[out_ref_node].add(nid)
continue
# Ссылки вида "node.outKey" или "node"
src_id = src.split(".", 1)[0] if "." in src else src
if src_id in id_set:
# Если указали конкретный outKey (например, nIf.true / nIf.false),
# трактуем это ТОЛЬКО как гейт (НЕ как топологическую зависимость), чтобы избежать дедлоков.
is_gate = False
if "." in src:
try:
_, out_name = src.split(".", 1)
on = str(out_name).strip().lower()
if on in {"true", "false"}:
gate_deps[nid].append((src_id, on))
is_gate = True
except Exception:
is_gate = False
if not is_gate:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
# Входящие степени и первая волна
in_degree: Dict[str, int] = {nid: len(deps) for nid, deps in deps_map.items()}
ready: List[str] = [nid for nid, deg in in_degree.items() if deg == 0]
processed: List[str] = []
values: Dict[str, Dict[str, Any]] = {}
last_result: Dict[str, Any] = {}
last_node_id: Optional[str] = None
node_def_by_id: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes}
user_vars: Dict[str, Any] = {}
try:
parallel_limit = int(self.pipeline.get("parallel_limit", 8))
except Exception:
parallel_limit = 8
if parallel_limit <= 0:
parallel_limit = 1
async def exec_one(node_id: str, values_snapshot: Dict[str, Any], wave_num: int) -> tuple[str, Dict[str, Any]]:
ndef = node_def_by_id.get(node_id)
if not ndef:
raise ExecutionError(f"Node definition not found: {node_id}")
node = self.nodes_by_id[node_id]
ctx = dict(context)
ctx["OUT"] = values_snapshot
try:
ctx["vars"] = dict(user_vars)
except Exception:
ctx["vars"] = {}
# STORE доступен в шаблонах
try:
ctx["store"] = dict(self._store)
except Exception:
ctx["store"] = {}
# Pipeline meta (includes http_timeout_sec) available to nodes via context.meta
try:
ctx["meta"] = {
"id": self.pipeline_id,
"loop_mode": self.loop_mode,
"loop_max_iters": self.loop_max_iters,
"loop_time_budget_ms": self.loop_time_budget_ms,
"clear_var_store": self.clear_var_store,
"http_timeout_sec": float(self.pipeline.get("http_timeout_sec", 60) or 60),
# v1: стратегия извлечения текста для [[OUTx]]
"text_extract_strategy": str(self.pipeline.get("text_extract_strategy", "auto") or "auto"),
"text_extract_json_path": str(self.pipeline.get("text_extract_json_path", "") or ""),
"text_join_sep": str(self.pipeline.get("text_join_sep", "\n") or "\n"),
# v2: коллекция пресетов парсинга для выбора в нодах
"text_extract_presets": list(self.pipeline.get("text_extract_presets", []) or []),
}
except Exception:
pass
# Прокидываем traceфункцию в контекст, чтобы ноды могли посылать события (SSE лог)
try:
if trace is not None:
ctx["_trace"] = trace
except Exception:
pass
inputs: Dict[str, Any] = {}
for name, source in (ndef.get("in") or {}).items():
if isinstance(source, list):
inputs[name] = [_resolve_in_value(s, ctx, values_snapshot) for s in source]
else:
inputs[name] = _resolve_in_value(source, ctx, values_snapshot)
if trace is not None:
try:
await trace({
"event": "node_start",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
})
except Exception:
pass
started = time.perf_counter()
try:
print(f"TRACE start: {ndef['id']} ({node.type_name})")
except Exception:
pass
try:
out = await node.run(inputs, ctx)
except Exception as exc:
if trace is not None:
try:
await trace({
"event": "node_error",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
"error": str(exc),
})
except Exception:
pass
raise
else:
dur_ms = int((time.perf_counter() - started) * 1000)
if trace is not None:
try:
await trace({
"event": "node_done",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
"duration_ms": dur_ms,
})
except Exception:
pass
try:
prev_text = ""
try:
from agentui.pipeline.templating import _best_text_from_outputs as _bt # type: ignore
if isinstance(out, dict):
prev_text = _bt(out) or ""
except Exception:
prev_text = ""
if isinstance(prev_text, str) and len(prev_text) > 200:
prev_text = prev_text[:200]
print(f"TRACE done: {ndef['id']} ({node.type_name}) dur={dur_ms}ms text={prev_text!r}")
except Exception:
pass
return node_id, out
wave_idx = 0
while ready:
# Ручная отмена исполнения (DAG)
try:
if is_cancelled(self.pipeline_id):
if trace is not None:
try:
await trace({
"event": "cancelled",
"node_id": "",
"node_type": "Pipeline",
"wave": wave_idx,
"ts": int(time.time() * 1000),
})
except Exception:
pass
# Снимок и финальный EXEC TRACE
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary and isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
return last_result
except Exception:
pass
wave_nodes = list(ready)
ready = []
wave_results: Dict[str, Dict[str, Any]] = {}
values_snapshot = dict(values)
for i in range(0, len(wave_nodes), parallel_limit):
chunk = wave_nodes[i : i + parallel_limit]
results = await asyncio.gather(
*(exec_one(nid, values_snapshot, wave_idx) for nid in chunk),
return_exceptions=False,
)
for nid, out in results:
wave_results[nid] = out
last_result = out
last_node_id = nid
try:
ndef = node_def_by_id.get(nid) or {}
ntype = ndef.get("type", "")
if ntype == "If":
expr = str((ndef.get("config") or {}).get("expr", ""))
resb = False
try:
if isinstance(out, dict):
resb = bool(out.get("result"))
except Exception:
resb = False
self._exec_log.append(f"If (#{nid}) {expr} => {str(resb).lower()}")
else:
self._exec_log.append(f"{nid}({ntype})")
except Exception:
pass
values.update(wave_results)
processed.extend(wave_nodes)
try:
for _nid, out in wave_results.items():
if isinstance(out, dict):
v = out.get("vars")
if isinstance(v, dict):
user_vars.update(v)
except Exception:
pass
# Обновляем и сохраняем STORE, если были новые переменные из SetVars/прочих нод
try:
merged: Dict[str, Any] = {}
for _nid, out in wave_results.items():
if isinstance(out, dict) and isinstance(out.get("vars"), dict):
merged.update(out.get("vars") or {})
if merged:
self._store.update(merged)
try:
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
for done in wave_nodes:
for child in dependents.get(done, ()):
in_degree[child] -= 1
next_ready_candidates = [nid for nid, deg in in_degree.items() if deg == 0 and nid not in processed and nid not in wave_nodes]
def _gates_ok(child_id: str) -> bool:
pairs = gate_deps.get(child_id) or []
if not pairs:
return True
missing_seen = False
for pid, gate in pairs:
v = values.get(pid)
if v is None:
# значение гейта ещё не известно
missing_seen = True
continue
try:
flag = bool(isinstance(v, dict) and v.get(gate))
except Exception:
flag = False
if not flag:
return False
# Семантика "missing gate = pass" как в iterative-режиме:
# если флаги гейта ещё не известны, но у ребёнка есть реальные родители — допускаем первый запуск.
# если реальных родителей нет (только гейты) — ждём появления значения гейта.
if missing_seen:
real_parents = deps_map.get(child_id) or set()
if len(real_parents) == 0:
return False
return True
ready = [nid for nid in next_ready_candidates if _gates_ok(nid)]
wave_idx += 1
if len(processed) != len(nodes):
remaining = [n["id"] for n in nodes if n["id"] not in processed]
raise ExecutionError(f"Cycle detected or unresolved dependencies among nodes: {remaining}")
# Persist snapshot of macro context and outputs into STORE
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Построение и сохранение человекочитаемого EXECUTION TRACE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
async def _run_iterative(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Итеративное исполнение:
- Очередь готовых задач
- Повторные пуски допустимы: ребёнок ставится в очередь, когда какой-либо из его родителей обновился после последнего запуска ребёнка
- Гейты If.true/If.false фильтруют постановку в очередь
- Ограничения: loop_max_iters, loop_time_budget_ms
"""
nodes: List[Dict[str, Any]] = list(self.pipeline.get("nodes", []))
id_set = set(self.nodes_by_id.keys())
# Строим зависимости и гейты (как в DAG)
deps_map: Dict[str, set] = {n["id"]: set() for n in nodes}
dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
gate_deps: Dict[str, List[tuple[str, str]]] = {n["id"]: [] for n in nodes}
# Обратная карта для гейтов: кто «слушает» изменения конкретного родителя по гейту
gate_dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
for n in nodes:
nid = n["id"]
for _, source in (n.get("in") or {}).items():
sources = source if isinstance(source, list) else [source]
for src in sources:
if not isinstance(src, str):
continue
if src.startswith("macro:"):
continue
if re.fullmatch(r"\[\[\s*VAR\s*[:\s]\s*[^\]]+\s*\]\]", src.strip()):
continue
out_ref_node = _extract_out_node_id_from_ref(src)
if out_ref_node and out_ref_node in id_set:
# [[OUT:nodeId(.key)*]] — это РЕАЛЬНАЯ топологическая зависимость
deps_map[nid].add(out_ref_node)
dependents[out_ref_node].add(nid)
continue
src_id = src.split(".", 1)[0] if "." in src else src
if src_id in id_set:
# "node.true/false" — ТОЛЬКО гейт (без топологической зависимости)
is_gate = False
if "." in src:
try:
_, out_name = src.split(".", 1)
on = str(out_name).strip().lower()
if on in {"true", "false"}:
gate_deps[nid].append((src_id, on))
# Регистрируем обратную зависимость по гейту,
# чтобы будить ребёнка при изменении флага у родителя
gate_dependents[src_id].add(nid)
is_gate = True
except Exception:
is_gate = False
if not is_gate:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
# Вспом. структуры состояния
values: Dict[str, Dict[str, Any]] = {}
last_result: Dict[str, Any] = {}
last_node_id: Optional[str] = None
user_vars: Dict[str, Any] = {}
node_def_by_id: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes}
# Версии обновлений нод (инкремент при каждом коммите)
version: Dict[str, int] = {n["id"]: 0 for n in nodes}
# Когда ребёнок запускался в последний раз (номер глобального шага)
last_run_step: Dict[str, int] = {n["id"]: -1 for n in nodes}
# Глобальный счётчик шагов
step = 0
# Очередь готовых задач
q: deque[str] = deque()
# Начальные готовые — ноды без зависимостей
in_degree: Dict[str, int] = {nid: len(deps) for nid, deps in deps_map.items()}
for nid, deg in in_degree.items():
if deg == 0:
q.append(nid)
def gates_ok(child_id: str) -> bool:
pairs = gate_deps.get(child_id) or []
if not pairs:
return True
missing_seen = False
for pid, gate in pairs:
v = values.get(pid)
# Если значение гейта ещё не появилось — отмечаем и продолжаем проверку других гейтов
if v is None:
missing_seen = True
continue
try:
flag = bool(isinstance(v, dict) and v.get(gate))
except Exception:
flag = False
if not flag:
return False
# Если были отсутствующие значения гейтов, то:
# - у узла есть реальные (топологические) родители → не блокируем (первый запуск допустим)
# - у узла НЕТ реальных родителей (только гейты) → откладываем до появления значения гейта
if missing_seen:
real_parents = deps_map.get(child_id) or set()
if len(real_parents) == 0:
return False
return True
# Лимиты
total_runs = 0
t0 = time.perf_counter()
try:
parallel_limit = int(self.pipeline.get("parallel_limit", 8))
except Exception:
parallel_limit = 8
if parallel_limit <= 0:
parallel_limit = 1
# Асинхронный запуск одной ноды (снимок OUT на момент dequeue)
async def exec_one(node_id: str, snapshot: Dict[str, Any], wave_num: int) -> tuple[str, Dict[str, Any]]:
ndef = node_def_by_id.get(node_id)
if not ndef:
raise ExecutionError(f"Node definition not found: {node_id}")
node = self.nodes_by_id[node_id]
ctx = dict(context)
ctx["OUT"] = snapshot
try:
ctx["vars"] = dict(user_vars)
except Exception:
ctx["vars"] = {}
# STORE доступен в шаблонах
try:
ctx["store"] = dict(self._store)
except Exception:
ctx["store"] = {}
# Pipeline meta (includes http_timeout_sec) available to nodes via context.meta
try:
ctx["meta"] = {
"id": self.pipeline_id,
"loop_mode": self.loop_mode,
"loop_max_iters": self.loop_max_iters,
"loop_time_budget_ms": self.loop_time_budget_ms,
"clear_var_store": self.clear_var_store,
"http_timeout_sec": float(self.pipeline.get("http_timeout_sec", 60) or 60),
# v1: стратегия извлечения текста для [[OUTx]]
"text_extract_strategy": str(self.pipeline.get("text_extract_strategy", "auto") or "auto"),
"text_extract_json_path": str(self.pipeline.get("text_extract_json_path", "") or ""),
"text_join_sep": str(self.pipeline.get("text_join_sep", "\n") or "\n"),
# v2: коллекция пресетов парсинга для выбора в нодах
"text_extract_presets": list(self.pipeline.get("text_extract_presets", []) or []),
}
except Exception:
pass
# Прокидываем traceфункцию в контекст, чтобы ноды могли посылать события (SSE лог)
try:
if trace is not None:
ctx["_trace"] = trace
except Exception:
pass
inputs: Dict[str, Any] = {}
for name, source in (ndef.get("in") or {}).items():
if isinstance(source, list):
inputs[name] = [_resolve_in_value(s, ctx, snapshot) for s in source]
else:
inputs[name] = _resolve_in_value(source, ctx, snapshot)
if trace is not None:
try:
await trace({
"event": "node_start",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
})
except Exception:
pass
started = time.perf_counter()
try:
print(f"TRACE start: {ndef['id']} ({node.type_name})")
except Exception:
pass
try:
out = await node.run(inputs, ctx)
except Exception as exc:
if trace is not None:
try:
await trace({
"event": "node_error",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
"error": str(exc),
})
except Exception:
pass
raise
else:
dur_ms = int((time.perf_counter() - started) * 1000)
if trace is not None:
try:
await trace({
"event": "node_done",
"node_id": ndef["id"],
"node_type": node.type_name,
"wave": wave_num,
"ts": int(time.time() * 1000),
"duration_ms": dur_ms,
})
except Exception:
pass
try:
prev_text = ""
try:
from agentui.pipeline.templating import _best_text_from_outputs as _bt # type: ignore
if isinstance(out, dict):
prev_text = _bt(out) or ""
except Exception:
prev_text = ""
if isinstance(prev_text, str) and len(prev_text) > 200:
prev_text = prev_text[:200]
print(f"TRACE done: {ndef['id']} ({node.type_name}) dur={dur_ms}ms text={prev_text!r}")
except Exception:
pass
return node_id, out
# Главный цикл
while q:
# Ручная отмена исполнения (iterative)
try:
if is_cancelled(self.pipeline_id):
if trace is not None:
try:
await trace({
"event": "cancelled",
"node_id": "",
"node_type": "Pipeline",
"wave": step,
"ts": int(time.time() * 1000),
})
except Exception:
pass
# Снимок и финальный EXEC TRACE
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary and isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
return last_result
except Exception:
pass
# Проверяем лимиты
if total_runs >= self.loop_max_iters:
raise ExecutionError(f"Iterative mode exceeded loop_max_iters={self.loop_max_iters}")
if (time.perf_counter() - t0) * 1000.0 > self.loop_time_budget_ms:
raise ExecutionError(f"Iterative mode exceeded loop_time_budget_ms={self.loop_time_budget_ms}")
# Собираем партию до parallel_limit, с учётом гейтов.
# ВАЖНО: защищаемся от «вечной карусели» в пределах одной партии.
# Обрабатываем не более исходной длины очереди за проход.
batch: List[str] = []
seen: set[str] = set()
spin_guard = len(q)
processed_in_pass = 0
while q and len(batch) < parallel_limit and processed_in_pass < spin_guard:
nid = q.popleft()
processed_in_pass += 1
if nid in seen:
# Уже добавлен в партию — пропускаем без возврата в очередь
continue
# Должны быть удовлетворены зависимости: или нет родителей, или все родители хотя бы раз исполнились
parents = deps_map.get(nid) or set()
deps_ok = all(version[p] > 0 or in_degree.get(nid, 0) == 0 for p in parents) or (len(parents) == 0)
if not deps_ok:
# отложим до лучших времён
q.append(nid)
continue
if not gates_ok(nid):
# гейты пока не открыты — попробуем позже
q.append(nid)
continue
batch.append(nid)
seen.add(nid)
if not batch:
# Очередь «застряла» — либо циклические ожидания без начальных значений, либо гейт не откроется.
# Безопасно выходим.
break
# Снимок на момент партии
snapshot = dict(values)
results = await asyncio.gather(*(exec_one(nid, snapshot, step) for nid in batch), return_exceptions=False)
# Коммитим результаты немедленно и продвигаем версии
produced_vars: Dict[str, Any] = {}
for nid, out in results:
values[nid] = out
version[nid] = version.get(nid, 0) + 1
last_result = out
last_node_id = nid
total_runs += 1
last_run_step[nid] = step
# Человекочитаемый журнал
try:
ndef = node_def_by_id.get(nid) or {}
ntype = ndef.get("type", "")
if ntype == "If":
expr = str((ndef.get("config") or {}).get("expr", ""))
resb = False
try:
if isinstance(out, dict):
resb = bool(out.get("result"))
except Exception:
resb = False
self._exec_log.append(f"If (#{nid}) {expr} => {str(resb).lower()}")
else:
self._exec_log.append(f"{nid}({ntype})")
except Exception:
pass
# Сбор пользовательских переменных
try:
if isinstance(out, dict) and isinstance(out.get("vars"), dict):
produced_vars.update(out.get("vars") or {})
except Exception:
pass
if produced_vars:
user_vars.update(produced_vars)
# Обновляем STORE и сохраняем на диск
try:
self._store.update(produced_vars)
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
# Постановка потомков, у кого изменился хотя бы один родитель
for nid in batch:
# Топологические потомки (реальные зависимости): будим всегда
for child in dependents.get(nid, ()):
if last_run_step.get(child, -1) < step:
q.append(child)
# Потомки по гейтам: будим ТОЛЬКО выбранную ветку
# Логика: если у gchild есть пара gate_deps с текущим nid и нужным gate ('true'/'false'),
# то будим только если соответствующий флаг у родителя истинный. Иначе — не будим.
g_out = values.get(nid) or {}
g_children = gate_dependents.get(nid, ())
for gchild in g_children:
if last_run_step.get(gchild, -1) >= step:
continue
pairs = [p for p in (gate_deps.get(gchild) or []) if p and p[0] == nid]
if not pairs:
# gchild зарегистрирован как зависимый по гейту, но явной пары нет — пропускаем
continue
wake = False
for (_pid, gate_name) in pairs:
try:
want = str(gate_name or "").strip().lower()
if want in ("true", "false"):
if bool(g_out.get(want, False)):
wake = True
break
except Exception:
pass
if wake:
q.append(gchild)
try:
print(f"TRACE wake_by_gate: parent={nid} -> child={gchild}")
except Exception:
pass
# Ранний выход, если встретили Return среди выполненных
try:
for nid in batch:
ndef = node_def_by_id.get(nid) or {}
if (ndef.get("type") or "") == "Return":
# Зафиксируем, что именно Return завершил выполнение
try:
last_node_id = nid
last_result = values.get(nid) or last_result
except Exception:
pass
# Снимок стора до выхода
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Финальный EXECUTION TRACE в лог и STORE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
except Exception:
pass
step += 1
# Финальный снапшот всего контекста и OUT в STORE.snapshot
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Построение и сохранение человекочитаемого EXECUTION TRACE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
def _safe_preview(self, obj: Any, max_bytes: int = 262_144) -> Any:
"""
Вернёт объект как есть, если его JSON-представление укладывается в лимит.
Иначе — мета-объект с пометкой о тримминге и SHA256 + текстовый превью.
"""
try:
s = json.dumps(obj, ensure_ascii=False)
except Exception:
try:
s = str(obj)
except Exception:
s = "<unrepresentable>"
b = s.encode("utf-8", errors="ignore")
if len(b) <= max_bytes:
return obj
sha = hashlib.sha256(b).hexdigest()
preview = b[:max_bytes].decode("utf-8", errors="ignore")
return {
"__truncated__": True,
"sha256": sha,
"preview": preview,
}
def _commit_snapshot(self, context: Dict[str, Any], values: Dict[str, Any], last_node_id: str) -> None:
"""
Собирает STORE.snapshot:
- incoming.*, params.*, model, vendor_format, system
- OUT_TEXT.nX — строковая вытяжка (до 16КБ)
- OUT.nX — сырой JSON (с триммингом до ~256КБ через preview)
- Алиасы OUT1/OUT2/... — плоские строки (то же, что [[OUT1]])
- LAST_NODE — последний выполнившийся узел
"""
# OUT_TEXT / OUT
out_text: Dict[str, str] = {}
out_raw: Dict[str, Any] = {}
aliases: Dict[str, Any] = {}
for nid, out in (values or {}).items():
if out is None:
continue
try:
txt = _best_text_from_outputs(out) # type: ignore[name-defined]
except Exception:
txt = ""
if not isinstance(txt, str):
try:
txt = str(txt)
except Exception:
txt = ""
if len(txt) > 16_384:
txt = txt[:16_384]
out_text[nid] = txt
out_raw[nid] = self._safe_preview(out, 262_144)
m = re.match(r"^n(\d+)$", str(nid))
if m:
aliases[f"OUT{int(m.group(1))}"] = txt
snapshot: Dict[str, Any] = {
"incoming": self._safe_preview(context.get("incoming"), 262_144),
"params": self._safe_preview(context.get("params"), 65_536),
"model": context.get("model"),
"vendor_format": context.get("vendor_format"),
"system": context.get("system") or "",
"OUT": out_raw,
"OUT_TEXT": out_text,
"LAST_NODE": last_node_id or "",
**aliases,
}
# Сохраняем под ключом snapshot, пользовательские vars остаются как есть в корне STORE
try:
if not isinstance(self._store, dict):
self._store = {}
self._store["snapshot"] = snapshot
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
class SetVarsNode(Node):
type_name = "SetVars"
def _normalize(self) -> List[Dict[str, Any]]:
raw = self.config.get("variables") or []
if not isinstance(raw, list):
return []
norm: List[Dict[str, Any]] = []
for i, b in enumerate(raw):
if not isinstance(b, dict):
continue
name = str(b.get("name", "")).strip()
mode = str(b.get("mode", "string")).lower().strip()
value = b.get("value", "")
try:
order = int(b.get("order")) if b.get("order") is not None else i
except Exception:
order = i
norm.append({
"id": b.get("id") or f"v{i}",
"name": name,
"mode": "expr" if mode == "expr" else "string",
"value": value,
"order": order,
})
return norm
def _safe_eval_expr(self, expr: str) -> Any:
"""
Безопасная оценка выражений для SetVars.
Поддержка:
- Литералы: числа/строки/bool/None, списки, кортежи, словари
- JSONлитералы: true/false/null, объекты и массивы (парсятся как Python True/False/None, dict/list)
- Арифметика: + - * / // %, унарные +-
- Логика: and/or, сравнения (== != < <= > >=, цепочки)
- Безопасные функции: rand(), randint(a,b), choice(list)
Запрещено: имя/атрибуты/индексация/условные/импорты/прочие вызовы функций.
"""
import ast
import operator as op
import random
# 0) Попытаться распознать чистый JSONлитерал (включая true/false/null, объекты/массивы/числа/строки).
# Это не вмешивается в математику: для выражений вида "1+2" json.loads бросит исключение и мы пойдём в AST.
try:
s = str(expr).strip()
return json.loads(s)
except Exception:
pass
allowed_bin = {
ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul, ast.Div: op.truediv,
ast.FloorDiv: op.floordiv, ast.Mod: op.mod,
}
allowed_unary = {ast.UAdd: lambda x: +x, ast.USub: lambda x: -x}
allowed_cmp = {
ast.Eq: op.eq, ast.NotEq: op.ne, ast.Lt: op.lt, ast.LtE: op.le, ast.Gt: op.gt, ast.GtE: op.ge,
}
def eval_node(node: ast.AST) -> Any:
if isinstance(node, ast.Expression):
return eval_node(node.body)
if isinstance(node, ast.Constant):
return node.value
if isinstance(node, ast.Tuple):
return tuple(eval_node(e) for e in node.elts)
if isinstance(node, ast.List):
return [eval_node(e) for e in node.elts]
if isinstance(node, ast.Dict):
return {eval_node(k): eval_node(v) for k, v in zip(node.keys, node.values)}
if isinstance(node, ast.UnaryOp) and type(node.op) in allowed_unary:
return allowed_unary[type(node.op)](eval_node(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in allowed_bin:
return allowed_bin[type(node.op)](eval_node(node.left), eval_node(node.right))
if isinstance(node, ast.BoolOp):
vals = [eval_node(v) for v in node.values]
if isinstance(node.op, ast.And):
res = True
for v in vals:
res = res and bool(v)
return res
if isinstance(node.op, ast.Or):
res = False
for v in vals:
res = res or bool(v)
return res
if isinstance(node, ast.Compare):
left = eval_node(node.left)
for opnode, comparator in zip(node.ops, node.comparators):
if type(opnode) not in allowed_cmp:
raise ExecutionError("Unsupported comparison operator")
right = eval_node(comparator)
if not allowed_cmp[type(opnode)](left, right):
return False
left = right
return True
# Разрешённые вызовы: rand(), randint(a,b), choice(list)
if isinstance(node, ast.Call):
# Никаких kwargs, *args
if node.keywords or isinstance(getattr(node, "starargs", None), ast.AST) or isinstance(getattr(node, "kwargs", None), ast.AST):
raise ExecutionError("Call with kwargs/starargs is not allowed")
fn = node.func
if not isinstance(fn, ast.Name):
raise ExecutionError("Only simple function calls are allowed")
name = fn.id
if name == "rand":
if len(node.args) != 0:
raise ExecutionError("rand() takes no arguments")
return random.random()
if name == "randint":
if len(node.args) != 2:
raise ExecutionError("randint(a,b) requires two arguments")
a = eval_node(node.args[0])
b = eval_node(node.args[1])
try:
return random.randint(int(a), int(b))
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"randint invalid arguments: {exc}")
if name == "choice":
if len(node.args) != 1:
raise ExecutionError("choice(list) requires one argument")
seq = eval_node(node.args[0])
if not isinstance(seq, (list, tuple)):
raise ExecutionError("choice() expects list or tuple")
if not seq:
raise ExecutionError("choice() on empty sequence")
return random.choice(seq)
raise ExecutionError(f"Function {name} is not allowed")
# Запрещаем всё остальное (Name/Attribute/Subscript/IfExp/Comprehensions и пр.)
raise ExecutionError("Expression not allowed")
try:
tree = ast.parse(str(expr), mode="eval")
except Exception as exc:
raise ExecutionError(f"SetVars expr parse error: {exc}") from exc
return eval_node(tree)
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
out_map = context.get("OUT") or {}
result: Dict[str, Any] = {}
import re as _re
for v in sorted(self._normalize(), key=lambda x: x.get("order", 0)):
name = v.get("name") or ""
if not _re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name or ""):
raise ExecutionError(f"SetVars invalid variable name: {name!r}")
mode = v.get("mode", "string")
raw_val = v.get("value", "")
if mode == "expr":
resolved = self._safe_eval_expr(str(raw_val))
else:
resolved = render_template_simple(str(raw_val or ""), context, out_map)
result[name] = resolved
# Событие «vars_set» в SSEлог (+ превью значений)
try:
trace_fn = context.get("_trace")
if trace_fn:
def _pv(val: Any) -> str:
try:
if isinstance(val, str):
s = val
else:
s = json.dumps(val, ensure_ascii=False)
except Exception:
try:
s = str(val)
except Exception:
s = "<unrepresentable>"
s = s if isinstance(s, str) else str(s)
return (s[:300] + "") if len(s) > 300 else s
values_preview = {k: _pv(v) for k, v in (result or {}).items()}
await trace_fn({
"event": "vars_set",
"node_id": self.node_id,
"node_type": self.type_name,
"vars": list(result.keys()),
"count": len(result),
"values_preview": values_preview,
})
except Exception:
pass
return {"vars": result}
# --- v1: Нормализованный экстрактор текста для OUTx -----------------------------
def _json_path_extract(obj: Any, path: str) -> Any:
if not path:
return None
nodes = [obj]
for raw_seg in str(path).split("."):
seg = (raw_seg or "").strip()
if not seg:
continue
next_nodes = []
# Поддержка "*": разворачиваем dict.values() или элементы списка
if seg == "*":
for n in nodes:
if isinstance(n, dict):
next_nodes.extend(list(n.values()))
elif isinstance(n, list):
next_nodes.extend(list(n))
nodes = next_nodes
if not nodes:
break
continue
# Числовой индекс для списков
idx = None
try:
idx = int(seg)
except Exception:
idx = None
for n in nodes:
if idx is not None and isinstance(n, list):
if 0 <= idx < len(n):
next_nodes.append(n[idx])
elif idx is None and isinstance(n, dict):
if seg in n:
next_nodes.append(n[seg])
nodes = next_nodes
if not nodes:
break
if not nodes:
return None
return nodes if len(nodes) > 1 else nodes[0]
def _stringify_join(values: Any, join_sep: str = "\n") -> str:
def _flatten(x):
if isinstance(x, list):
for it in x:
yield from _flatten(it)
else:
yield x
arr = list(_flatten(values if isinstance(values, list) else [values]))
out: List[str] = []
for v in arr:
if isinstance(v, str):
if v:
out.append(v)
elif isinstance(v, (dict, list)):
try:
t = _deep_find_text(v) # type: ignore[name-defined]
if isinstance(t, str) and t:
out.append(t)
except Exception:
pass
else:
try:
sv = str(v)
if sv:
out.append(sv)
except Exception:
pass
if not out:
return ""
return (join_sep or "\n").join(out)
def _extract_text_for_out(data: Any, strategy: str = "auto", provider_hint: str = "", json_path: str = "", join_sep: str = "\n") -> str:
s = (strategy or "auto").lower().strip() or "auto"
prov = (provider_hint or "").lower().strip()
# Принудительные стратегии
if s == "deep":
try:
t = _deep_find_text(data) # type: ignore[name-defined]
return t if isinstance(t, str) else (str(t) if t is not None else "")
except Exception:
return ""
if s == "jsonpath":
try:
vals = _json_path_extract(data, json_path or "")
return _stringify_join(vals, join_sep)
except Exception:
return ""
if s in {"openai", "gemini", "claude"}:
try:
if s == "openai" and isinstance(data, dict):
msg = (data.get("choices") or [{}])[0].get("message") or {}
c = msg.get("content")
return c if isinstance(c, str) else (str(c) if c is not None else "")
if s == "gemini" and isinstance(data, dict):
cand0 = (data.get("candidates") or [{}])[0]
content = cand0.get("content") or {}
parts0 = (content.get("parts") or [{}])[0]
t = parts0.get("text")
return t if isinstance(t, str) else (str(t) if t is not None else "")
if s == "claude" and isinstance(data, dict):
blocks = data.get("content") or []
texts = [b.get("text") for b in blocks if isinstance(b, dict) and isinstance(b.get("text"), str)]
if texts:
return "\n".join(texts)
return ""
except Exception:
return ""
return ""
# auto: если есть подсказка — пробуем сначала её ветку, потом общий
if prov in {"openai", "gemini", "claude"}:
forced = _extract_text_for_out(data, prov, prov, json_path, join_sep)
if forced:
return forced
# Универсальный best-effort: знает openai/gemini/claude
try:
t = _best_text_from_outputs({"result": data}) # type: ignore[name-defined]
if isinstance(t, str) and t:
return t
if t is not None:
t2 = str(t)
if t2:
return t2
except Exception:
pass
# Последний шанс — глубокий поиск
try:
t = _deep_find_text(data) # type: ignore[name-defined]
return t if isinstance(t, str) else (str(t) if t is not None else "")
except Exception:
return ""
class ProviderCallNode(Node):
type_name = "ProviderCall"
# --- Prompt Manager helpers -------------------------------------------------
def _get_blocks(self) -> List[Dict[str, Any]]:
"""Return normalized list of prompt blocks from config."""
raw = self.config.get("blocks") or self.config.get("prompt_blocks") or []
if not isinstance(raw, list):
return []
norm: List[Dict[str, Any]] = []
for i, b in enumerate(raw):
if not isinstance(b, dict):
continue
role = str(b.get("role", "user")).lower().strip()
if role not in {"system", "user", "assistant", "tool"}:
role = "user"
# order fallback: keep original index if not provided/correct
try:
order = int(b.get("order")) if b.get("order") is not None else i
except Exception: # noqa: BLE001
order = i
norm.append(
{
"id": b.get("id") or f"b{i}",
"name": b.get("name") or f"Block {i+1}",
"role": role,
"prompt": b.get("prompt") or "",
"enabled": bool(b.get("enabled", True)),
"order": order,
}
)
return norm
def _render_blocks_to_unified(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Filter+sort+render blocks to unified messages:
[{role, content, name?}]
"""
out_map = context.get("OUT") or {}
blocks = [b for b in self._get_blocks() if b.get("enabled", True)]
blocks.sort(key=lambda x: x.get("order", 0))
messages: List[Dict[str, Any]] = []
for b in blocks:
content = render_template_simple(str(b.get("prompt") or ""), context, out_map)
# name поля блоков не передаются в провайдерские payload'ы
msg = {"role": b["role"], "content": content}
messages.append(msg)
return messages
def _messages_to_payload(self, provider: str, messages: List[Dict[str, Any]], context: Dict[str, Any]) -> Dict[str, Any]:
"""Convert unified messages to provider-specific request payload."""
params = context.get("params") or {}
model = context.get("model") or ""
if provider == "openai":
payload: Dict[str, Any] = {
"model": model,
"messages": [
{"role": m["role"], "content": m["content"]}
for m in messages
],
"temperature": params.get("temperature", 0.7),
}
if params.get("max_tokens") is not None:
payload["max_tokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
payload["top_p"] = params.get("top_p")
if params.get("stop") is not None:
payload["stop"] = params.get("stop")
return payload
if provider == "gemini":
sys_text = "\n\n".join([m["content"] for m in messages if m["role"] == "system"]).strip()
contents = []
for m in messages:
if m["role"] == "system":
continue
role = "model" if m["role"] == "assistant" else "user"
contents.append({"role": role, "parts": [{"text": m["content"]}]})
gen_cfg: Dict[str, Any] = {}
if params.get("temperature") is not None:
gen_cfg["temperature"] = params.get("temperature")
if params.get("max_tokens") is not None:
gen_cfg["maxOutputTokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
gen_cfg["topP"] = params.get("top_p")
if params.get("stop") is not None:
gen_cfg["stopSequences"] = params.get("stop")
payload = {"model": model, "contents": contents}
if sys_text:
payload["systemInstruction"] = {"parts": [{"text": sys_text}]}
if gen_cfg:
payload["generationConfig"] = gen_cfg
return payload
if provider == "claude":
sys_text = "\n\n".join([m["content"] for m in messages if m["role"] == "system"]).strip()
msgs = []
for m in messages:
if m["role"] == "system":
continue
role = m["role"] if m["role"] in {"user", "assistant"} else "user"
msgs.append({"role": role, "content": [{"type": "text", "text": m["content"]}]})
payload: Dict[str, Any] = {
"model": model,
"messages": msgs,
}
if sys_text:
payload["system"] = sys_text
if params.get("temperature") is not None:
payload["temperature"] = params.get("temperature")
if params.get("max_tokens") is not None:
payload["max_tokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
payload["top_p"] = params.get("top_p")
if params.get("stop") is not None:
payload["stop"] = params.get("stop")
return payload
return {}
def _blocks_struct_for_template(self, provider: str, messages: List[Dict[str, Any]], context: Dict[str, Any]) -> Dict[str, Any]:
"""
Сформировать структуру для вставки в шаблон (template) из Prompt Blocks.
Возвращает provider-специфичные ключи, которые можно вставлять в JSON:
- openai: { "messages": [...] , "system_text": "..." }
- gemini: { "contents": [...], "systemInstruction": {...}, "system_text": "..." }
- claude: { "system_text": "...", "system": "...", "messages": [...] }
"""
provider = (provider or "openai").lower()
# Гарантируем список
msgs = messages or []
if provider == "openai":
# Уже в формате {"role","content"}
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
# Вставляем как есть (editor будет встраивать JSON массива без кавычек)
return {
"messages": [
{"role": m["role"], "content": m.get("content")}
for m in msgs
],
"system_text": sys_text,
}
if provider == "gemini":
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
contents = []
for m in msgs:
if m.get("role") == "system":
continue
role = "model" if m.get("role") == "assistant" else "user"
contents.append({"role": role, "parts": [{"text": str(m.get("content") or "")}]})
d: Dict[str, Any] = {
"contents": contents,
"system_text": sys_text,
}
if sys_text:
d["systemInstruction"] = {"parts": [{"text": sys_text}]}
return d
if provider == "claude":
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
out_msgs = []
for m in msgs:
if m.get("role") == "system":
continue
role = m.get("role")
role = role if role in {"user", "assistant"} else "user"
out_msgs.append({"role": role, "content": [{"type": "text", "text": str(m.get("content") or "")}]})
return {
"system_text": sys_text,
"system": sys_text, # удобно для шаблона: "system": "{{ pm.system_text }}"
"messages": out_msgs,
}
# По умолчанию ничего, но это валидный JSON
return {"messages": []}
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
provider = (self.config.get("provider") or "openai").lower()
# Optional sleep before execution (UI-configured, milliseconds)
try:
sleep_ms = int(self.config.get("sleep_ms") or 0)
except Exception:
sleep_ms = 0
if sleep_ms > 0:
# Emit SSE event to highlight node as "sleeping" on UI
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "node_sleep",
"node_id": self.node_id,
"node_type": self.type_name,
"sleep_ms": int(sleep_ms),
"ts": int(time.time() * 1000),
})
except Exception:
pass
# Non-blocking pause
try:
await asyncio.sleep(max(0.0, sleep_ms / 1000.0))
except Exception:
pass
# Support provider-specific configs stored in UI as provider_configs.{provider}
prov_cfg: Dict[str, Any] = {}
try:
cfgs = self.config.get("provider_configs") or {}
if isinstance(cfgs, dict):
prov_cfg = cfgs.get(provider) or {}
except Exception: # noqa: BLE001
prov_cfg = {}
base_url = prov_cfg.get("base_url") or self.config.get("base_url")
if not base_url:
raise ExecutionError(f"Node {self.node_id} ({self.type_name}) requires 'base_url' in config")
if not str(base_url).startswith(("http://", "https://")):
base_url = "http://" + str(base_url)
endpoint_tmpl: str = prov_cfg.get("endpoint") or self.config.get("endpoint") or ""
template: str = prov_cfg.get("template") or self.config.get("template") or "{}"
headers_json: str = prov_cfg.get("headers") or self.config.get("headers") or "{}"
# Default endpoints if not set
if not endpoint_tmpl:
if provider == "openai":
endpoint_tmpl = "/v1/chat/completions"
elif provider == "gemini":
endpoint_tmpl = "/v1beta/models/{{ model }}:generateContent"
elif provider == "claude":
endpoint_tmpl = "/v1/messages"
# Подготовим Prompt Blocks + pm-структуру для шаблона
unified_msgs = self._render_blocks_to_unified(context)
pm_struct = self._blocks_struct_for_template(provider, unified_msgs, context)
# Расширяем контекст для рендера шаблонов
render_ctx = dict(context)
render_ctx["pm"] = pm_struct
# Единый JSON-фрагмент PROMPT для шаблонов: [[PROMPT]]
prompt_fragment = ""
try:
if provider == "openai":
prompt_fragment = '"messages": ' + json.dumps(pm_struct.get("messages", []), ensure_ascii=False)
elif provider == "gemini":
parts = []
contents = pm_struct.get("contents")
if contents is not None:
parts.append('"contents": ' + json.dumps(contents, ensure_ascii=False))
sysi = pm_struct.get("systemInstruction")
# даже если пустой объект {}, это валидно
if sysi is not None:
parts.append('"systemInstruction": ' + json.dumps(sysi, ensure_ascii=False))
prompt_fragment = ", ".join(parts)
elif provider == "claude":
parts = []
sys_text = pm_struct.get("system_text") or pm_struct.get("system")
if sys_text is not None:
parts.append('"system": ' + json.dumps(sys_text, ensure_ascii=False))
msgs = pm_struct.get("messages")
if msgs is not None:
parts.append('"messages": ' + json.dumps(msgs, ensure_ascii=False))
prompt_fragment = ", ".join(parts)
except Exception: # noqa: BLE001
prompt_fragment = ""
render_ctx["PROMPT"] = prompt_fragment
# Render helper с поддержкой [[VAR]], [[OUT]] и {{ ... }}
def render(s: str) -> str:
return render_template_simple(s or "", render_ctx, render_ctx.get("OUT") or {})
# Рендер endpoint с макросами/шаблонами
endpoint = render(endpoint_tmpl)
# Формируем тело ТОЛЬКО из template/[[PROMPT]] (без сырого payload/входов).
# Больше НИКАКОГО фоллбэка на unified-построение: если шаблон невалиден — это ошибка ноды.
try:
rendered = render(template)
# DEBUG: печать отрендеренного шаблона с номерами строк для точной диагностики JSONDecodeError
try:
_lines = rendered.splitlines()
_preview = "\n".join(f"{i+1:03d}: {_lines[i]}" for i in range(min(len(_lines), 120)))
print(f"DEBUG: ProviderCallNode rendered_template node={self.node_id} provider={provider}\\n{_preview}")
except Exception:
try:
print(f"DEBUG: ProviderCallNode rendered_template(node={self.node_id}, provider={provider}) len={len(rendered)}")
except Exception:
pass
payload = json.loads(rendered)
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"ProviderCall template invalid JSON: {exc}")
# Заголовки — полностью из редактируемого JSON с макросами
try:
headers_src = render(headers_json) if headers_json else "{}"
headers = json.loads(headers_src) if headers_src else {}
if not isinstance(headers, dict):
raise ValueError("headers must be a JSON object")
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"ProviderCall headers invalid JSON: {exc}")
# Итоговый URL
if not base_url.startswith(("http://", "https://")):
base_url = "http://" + base_url
url = endpoint if endpoint.startswith("http") else urljoin(base_url.rstrip('/') + '/', endpoint.lstrip('/'))
# Debug logs to validate config selection and payload
# Brute request/response logging (FULL, no masking)
try:
final_headers = {"Content-Type": "application/json", **headers}
print("===== ProviderCall REQUEST BEGIN =====")
print(f"node={self.node_id} type={self.type_name} provider={provider}")
print(f"URL: {url}")
try:
print("Headers:")
print(json.dumps(final_headers, ensure_ascii=False, indent=2))
except Exception:
print(f"Headers(raw): {final_headers}")
try:
print("Body JSON:")
print(json.dumps(payload, ensure_ascii=False, indent=2))
except Exception:
print(f"Body(raw): {payload}")
print("===== ProviderCall REQUEST END =====")
except Exception:
pass
# SSE: http_req (как в Burp)
req_id = f"{self.node_id}-{int(time.time()*1000)}"
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "http_req",
"node_id": self.node_id,
"node_type": self.type_name,
"provider": provider,
"req_id": req_id,
"method": "POST",
"url": url,
"headers": final_headers,
"body_text": json.dumps(payload, ensure_ascii=False, indent=2),
})
except Exception:
pass
# Timeout from run-meta (seconds); fallback to 60
try:
timeout_sec = float((context.get("meta") or {}).get("http_timeout_sec", 60) or 60)
except Exception:
timeout_sec = 60.0
st: Optional[int] = None
async with build_client(timeout=timeout_sec) as client:
body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
resp = await client.post(url, content=body_bytes, headers=final_headers)
# Do not raise_for_status: keep body/logs on 4xx/5xx
try:
print("===== ProviderCall RESPONSE BEGIN =====")
print(f"node={self.node_id} type={self.type_name} provider={provider}")
print(f"Status: {resp.status_code}")
try:
print("Headers:")
print(json.dumps(dict(resp.headers), ensure_ascii=False, indent=2))
except Exception:
try:
print(f"Headers(raw): {dict(resp.headers)}")
except Exception:
print("Headers(raw): <unavailable>")
try:
body_text = resp.text
except Exception:
body_text = "<resp.text decode error>"
print("Body Text:")
print(body_text)
print("===== ProviderCall RESPONSE END =====")
except Exception:
body_text = "<resp.text decode error>"
pass
try:
data = resp.json()
except Exception:
data = {"error": "Failed to decode JSON from upstream", "text": resp.text}
# SSE: http_resp
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "http_resp",
"node_id": self.node_id,
"node_type": self.type_name,
"provider": provider,
"req_id": req_id,
"status": int(resp.status_code),
"headers": dict(resp.headers),
"body_text": body_text if isinstance(body_text, str) else str(body_text),
})
except Exception:
pass
try:
st = int(resp.status_code)
except Exception:
st = None
# Извлечение текста: приоритет — пресет ноды -> кастомн. поля ноды -> глобальные meta
try:
meta_opts = (context.get("meta") or {})
presets = meta_opts.get("text_extract_presets") or []
cfg = self.config or {}
strategy = None
json_path = None
join_sep = None
# 1) Пресет по id
pid = str(cfg.get("text_extract_preset_id") or "").strip()
if pid:
try:
pr = next((x for x in presets if isinstance(x, dict) and str(x.get("id", "")) == pid), None)
except Exception:
pr = None
if pr:
strategy = str(pr.get("strategy") or "auto")
json_path = str(pr.get("json_path") or "")
join_sep = str(pr.get("join_sep") or "\n")
# 2) Кастомные поля ноды (если задано что-то из набора)
if not strategy and (cfg.get("text_extract_strategy") is not None or cfg.get("text_extract_json_path") is not None or cfg.get("text_join_sep") is not None):
strategy = str(cfg.get("text_extract_strategy") or "auto")
json_path = str(cfg.get("text_extract_json_path") or "")
join_sep = str(cfg.get("text_join_sep") or "\n")
# 3) Глобальные метаданные запуска
if not strategy:
strategy = str(meta_opts.get("text_extract_strategy", "auto") or "auto")
json_path = str(meta_opts.get("text_extract_json_path", "") or "")
join_sep = str(meta_opts.get("text_join_sep", "\n") or "\n")
except Exception:
strategy, json_path, join_sep = "auto", "", "\n"
text = _extract_text_for_out(data, strategy, provider, json_path, join_sep)
# Подробный SSE-лог завершения ProviderCall
try:
trace_fn = context.get("_trace")
if trace_fn:
# Compute destination macros for extracted text
try:
to_path = f"OUT.{self.node_id}.response_text"
macro_braces = f"{{{{ OUT.{self.node_id}.response_text }}}}"
alias_macro = ""
m = re.match(r"^n(\d+)$", str(self.node_id))
if m:
alias_macro = f"[[OUT{int(m.group(1))}]]"
except Exception:
to_path = f"OUT.{self.node_id}.response_text"
macro_braces = f"{{{{ OUT.{self.node_id}.response_text }}}}"
alias_macro = ""
await trace_fn({
"event": "provider_done",
"node_id": self.node_id,
"node_type": self.type_name,
"provider": provider,
"url": url,
"req_id": req_id,
"status": int(st) if st is not None else None,
"text_len": int(len(text or "")),
"extracted_text": text or "",
"strategy": strategy,
"json_path": json_path or "",
"join_sep": join_sep or "\n",
"to_path": to_path,
"to_macro_outx": alias_macro,
"to_macro_braces": macro_braces,
"ts": int(time.time() * 1000),
})
except Exception:
pass
return {"result": data, "response_text": text or ""}
class RawForwardNode(Node):
type_name = "RawForward"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""
Прямой форвард входящего запроса на апстрим:
- base_url/override_path/extra_headers поддерживают макросы [[...]] и {{ ... }}
- По умолчанию пробрасываем входящие заголовки (кроме Host/Content-Length), затем применяем extra_headers поверх.
- Тело берём из incoming.json (если есть), иначе пытаемся использовать текст/байты.
- Эмитим SSE события http_req/http_resp для панели логов.
"""
incoming = context.get("incoming", {}) or {}
raw_payload = incoming.get("json")
# Optional sleep before forwarding (UI-configured, milliseconds)
try:
sleep_ms = int(self.config.get("sleep_ms") or 0)
except Exception:
sleep_ms = 0
if sleep_ms > 0:
# Notify UI to show "sleep" state for this node
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "node_sleep",
"node_id": self.node_id,
"node_type": self.type_name,
"sleep_ms": int(sleep_ms),
"ts": int(time.time() * 1000),
})
except Exception:
pass
try:
await asyncio.sleep(max(0.0, sleep_ms / 1000.0))
except Exception:
pass
# Конфиг с поддержкой макросов
base_url: Optional[str] = self.config.get("base_url")
override_path: Optional[str] = self.config.get("override_path")
out_map_for_macros = context.get("OUT") or {}
if base_url:
base_url = render_template_simple(str(base_url), context, out_map_for_macros)
if override_path:
override_path = render_template_simple(str(override_path), context, out_map_for_macros)
# Автодетекция вендора для базового URL если base_url не задан
if not base_url:
vendor = detect_vendor(raw_payload)
if vendor == "openai":
base_url = "https://api.openai.com"
elif vendor == "claude":
base_url = "https://api.anthropic.com"
elif vendor == "gemini":
base_url = "https://generativelanguage.googleapis.com"
else:
raise ExecutionError(
f"Node {self.node_id} ({self.type_name}): 'base_url' is not configured and vendor could not be detected."
)
if not str(base_url).startswith(("http://", "https://")):
base_url = "http://" + str(base_url)
path = override_path or incoming.get("path") or "/"
query = incoming.get("query")
path_with_qs = f"{path}?{query}" if query else str(path)
url = urljoin(str(base_url).rstrip("/") + "/", str(path_with_qs).lstrip("/"))
# Заголовки: passthrough + extra_headers(JSON)
passthrough_headers: bool = bool(self.config.get("passthrough_headers", True))
extra_headers_json: str = self.config.get("extra_headers") or "{}"
try:
extra_headers_src = render_template_simple(extra_headers_json, context, out_map_for_macros) if extra_headers_json else "{}"
extra_headers = json.loads(extra_headers_src) if extra_headers_src else {}
if not isinstance(extra_headers, dict):
raise ValueError("extra_headers must be an object")
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"RawForward extra_headers invalid JSON: {exc}")
headers: Dict[str, str] = {}
if passthrough_headers:
inc_headers = incoming.get("headers") or {}
for k, v in inc_headers.items():
# Не пробрасываем управляющие заголовки, которые рассчитает httpx/сервер
if k and k.lower() not in {"host", "content-length"}:
headers[k] = v
# extra_headers перекрывают проброс
try:
headers.update(extra_headers)
except Exception:
pass
# Нормализация: защитимся от повторного наличия host/content-length после update
for k in list(headers.keys()):
kl = k.lower()
if kl in {"host", "content-length"}:
try:
headers.pop(k, None)
except Exception:
pass
# Метод
method = str(incoming.get("method") or "POST").upper().strip() or "POST"
# Тело запроса
body_text = ""
body_bytes: bytes = b""
if raw_payload is not None:
# JSON — сериализуем явно как UTF8
try:
body_text = json.dumps(raw_payload, ensure_ascii=False, indent=2)
except Exception:
try:
body_text = str(raw_payload)
except Exception:
body_text = ""
try:
body_bytes = json.dumps(raw_payload, ensure_ascii=False).encode("utf-8")
except Exception:
body_bytes = (body_text or "").encode("utf-8", errors="ignore")
# Если Content-Type не задан — выставим JSON
if "content-type" not in {k.lower() for k in headers.keys()}:
headers["Content-Type"] = "application/json"
else:
# Попробуем текст/байты из incoming
raw_bytes = incoming.get("body_bytes") or incoming.get("bytes")
raw_text = incoming.get("body_text") or incoming.get("text")
if isinstance(raw_bytes, (bytes, bytearray)):
body_bytes = bytes(raw_bytes)
try:
body_text = body_bytes.decode("utf-8", errors="ignore")
except Exception:
body_text = ""
elif raw_text is not None:
body_text = str(raw_text)
try:
body_bytes = body_text.encode("utf-8")
except Exception:
body_bytes = (body_text or "").encode("utf-8", errors="ignore")
else:
body_text = ""
body_bytes = b""
# Таймаут из метаданных запуска
try:
timeout_sec = float((context.get("meta") or {}).get("http_timeout_sec", 60) or 60)
except Exception:
timeout_sec = 60.0
# Диагностический вывод полной заявки/ответа (без маскировки)
try:
print("===== RawForward REQUEST BEGIN =====")
print(f"node={self.node_id} type={self.type_name}")
print(f"Method: {method}")
print(f"URL: {url}")
try:
print("Headers:")
print(json.dumps(headers, ensure_ascii=False, indent=2))
except Exception:
print(f"Headers(raw): {headers}")
print("Body:")
print(body_text)
print("===== RawForward REQUEST END =====")
except Exception:
pass
# SSE: http_req
req_id = f"{self.node_id}-{int(time.time()*1000)}"
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "http_req",
"node_id": self.node_id,
"node_type": self.type_name,
"req_id": req_id,
"method": method,
"url": url,
"headers": headers,
"body_text": body_text,
})
except Exception:
pass
# Отправка
async with build_client(timeout=timeout_sec) as client:
# Для GET/HEAD обычно не отправляем body
send_content = None if method in {"GET", "HEAD"} else body_bytes
resp = await client.request(method, url, headers=headers, content=send_content)
# Ответ: лог/печать
try:
print("===== RawForward RESPONSE BEGIN =====")
print(f"node={self.node_id} type={self.type_name}")
print(f"Status: {resp.status_code}")
try:
print("Headers:")
print(json.dumps(dict(resp.headers), ensure_ascii=False, indent=2))
except Exception:
try:
print(f"Headers(raw): {dict(resp.headers)}")
except Exception:
print("Headers(raw): <unavailable>")
try:
resp_text = resp.text
except Exception:
resp_text = "<resp.text decode error>"
print("Body Text:")
print(resp_text)
print("===== RawForward RESPONSE END =====")
except Exception:
resp_text = "<resp.text decode error>"
# SSE: http_resp
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "http_resp",
"node_id": self.node_id,
"node_type": self.type_name,
"req_id": req_id,
"status": int(resp.status_code),
"headers": dict(resp.headers),
"body_text": resp_text if isinstance(resp_text, str) else str(resp_text),
})
except Exception:
pass
# Разбор результата
try:
data = resp.json()
except Exception:
data = {"error": "Failed to decode JSON from upstream", "text": resp_text}
# Извлечение текста: приоритет — пресет ноды -> кастомные поля ноды -> глобальные meta.
try:
meta_opts = (context.get("meta") or {})
presets = meta_opts.get("text_extract_presets") or []
cfg = self.config or {}
strategy = None
json_path = None
join_sep = None
# 1) Пресет по id, выбранный в ноде
pid = str(cfg.get("text_extract_preset_id") or "").strip()
if pid:
try:
pr = next((x for x in presets if isinstance(x, dict) and str(x.get("id", "")) == pid), None)
except Exception:
pr = None
if pr:
strategy = str(pr.get("strategy") or "auto")
json_path = str(pr.get("json_path") or "")
join_sep = str(pr.get("join_sep") or "\n")
# 2) Кастомные поля ноды (если что-то задано)
if not strategy and (cfg.get("text_extract_strategy") is not None or cfg.get("text_extract_json_path") is not None or cfg.get("text_join_sep") is not None):
strategy = str(cfg.get("text_extract_strategy") or "auto")
json_path = str(cfg.get("text_extract_json_path") or "")
join_sep = str(cfg.get("text_join_sep") or "\n")
# 3) Глобальные метаданные запуска (дефолт)
if not strategy:
strategy = str(meta_opts.get("text_extract_strategy", "auto") or "auto")
json_path = str(meta_opts.get("text_extract_json_path", "") or "")
join_sep = str(meta_opts.get("text_join_sep", "\n") or "\n")
except Exception:
strategy, json_path, join_sep = "auto", "", "\n"
# Подсказка о провайдере: используем detect_vendor(data) как hint для auto
try:
prov_hint = detect_vendor(data if isinstance(data, dict) else {}) # type: ignore[arg-type]
except Exception:
prov_hint = "unknown"
best_text = _extract_text_for_out(data, strategy, prov_hint, json_path, join_sep)
# Подробный SSE-лог завершения RawForward
try:
trace_fn = context.get("_trace")
if trace_fn:
# Compute destination macros for extracted text and include req_id
try:
to_path = f"OUT.{self.node_id}.response_text"
macro_braces = f"{{{{ OUT.{self.node_id}.response_text }}}}"
alias_macro = ""
m = re.match(r"^n(\d+)$", str(self.node_id))
if m:
alias_macro = f"[[OUT{int(m.group(1))}]]"
except Exception:
to_path = f"OUT.{self.node_id}.response_text"
macro_braces = f"{{{{ OUT.{self.node_id}.response_text }}}}"
alias_macro = ""
await trace_fn({
"event": "rawforward_done",
"node_id": self.node_id,
"node_type": self.type_name,
"method": method,
"url": url,
"req_id": req_id,
"status": int(resp.status_code) if 'resp' in locals() else None,
"text_len": int(len(best_text or "")),
"extracted_text": best_text or "",
"strategy": strategy,
"json_path": json_path or "",
"join_sep": join_sep or "\n",
"to_path": to_path,
"to_macro_outx": alias_macro,
"to_macro_braces": macro_braces,
"ts": int(time.time() * 1000),
})
except Exception:
pass
return {"result": data, "response_text": best_text or ""}
class ReturnNode(Node):
type_name = "Return"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
# Определяем целевой формат
cfg = self.config or {}
target = str(cfg.get("target_format", "auto")).lower().strip() or "auto"
if target == "auto":
target = str(context.get("vendor_format") or "openai").lower().strip() or "openai"
# Рендерим текст из шаблона (по умолчанию берём [[OUT1]])
out_map = context.get("OUT") or {}
template = cfg.get("text_template")
if template is None or template == "":
template = "[[OUT1]]"
try:
text = render_template_simple(str(template), context, out_map)
except Exception:
text = ""
model = str(context.get("model") or "")
# Форматтеры под провайдеры (как в execute_pipeline_echo)
def fmt_openai(t: str) -> Dict[str, Any]:
return {
"id": "ret_mock_123",
"object": "chat.completion",
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": t},
"finish_reason": "stop",
}
],
"usage": {"prompt_tokens": 0, "completion_tokens": len((t or "").split()), "total_tokens": 0},
}
def fmt_gemini(t: str) -> Dict[str, Any]:
return {
"candidates": [
{
"content": {
"role": "model",
"parts": [{"text": t}],
},
"finishReason": "STOP",
"index": 0,
}
],
"modelVersion": model,
}
def fmt_claude(t: str) -> Dict[str, Any]:
return {
"id": "msg_ret_123",
"type": "message",
"model": model,
"role": "assistant",
"content": [
{"type": "text", "text": t}
],
"stop_reason": "end_turn",
}
if target == "openai":
result = fmt_openai(text)
elif target == "gemini":
result = fmt_gemini(text)
elif target == "claude":
result = fmt_claude(text)
else:
# неизвестное значение — безопасный дефолт
result = fmt_openai(text)
# Подробный SSE-лог Return
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "return_detail",
"node_id": self.node_id,
"node_type": self.type_name,
"target": target,
"text_len": int(len(text or "")),
"template_used": str(template),
"ts": int(time.time() * 1000),
})
except Exception:
pass
return {"result": result, "response_text": text}
class IfNode(Node):
type_name = "If"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
expr = str(self.config.get("expr") or "").strip()
out_map = context.get("OUT") or {}
# Для логов: отображаем развёрнутое выражение (с подставленными макросами)
try:
expanded = render_template_simple(expr, context, out_map)
except Exception:
expanded = expr
try:
res = bool(eval_condition_expr(expr, context, out_map)) if expr else False
except Exception as exc: # noqa: BLE001
# Расширенный лог, чтобы быстрее найти ошибку парсинга/оценки выражения
try:
print(f"TRACE if_error: {self.node_id} expr={expr!r} expanded={expanded!r} error={exc}")
except Exception:
pass
raise ExecutionError(f"If expr error: {exc}")
try:
print(f"TRACE if: {self.node_id} expr={expr!r} expanded={expanded!r} result={str(res).lower()}")
except Exception:
pass
# Подробный SSE-лог по If
try:
trace_fn = context.get("_trace")
if trace_fn:
await trace_fn({
"event": "if_result",
"node_id": self.node_id,
"node_type": self.type_name,
"expr": expr,
"expanded": expanded,
"result": bool(res),
"ts": int(time.time() * 1000),
})
except Exception:
pass
return {"result": res, "true": res, "false": (not res)}
NODE_REGISTRY.update({
SetVarsNode.type_name: SetVarsNode,
ProviderCallNode.type_name: ProviderCallNode,
RawForwardNode.type_name: RawForwardNode,
ReturnNode.type_name: ReturnNode,
IfNode.type_name: IfNode,
})