This commit is contained in:
2025-09-11 17:27:15 +03:00
parent 3c77c3dc2e
commit 11a0535712
32 changed files with 4682 additions and 442 deletions

View File

@@ -6,6 +6,8 @@ import json
import re
import asyncio
import time
import hashlib
from collections import deque
from agentui.providers.http_client import build_client
from agentui.common.vendors import detect_vendor
from agentui.pipeline.templating import (
@@ -20,7 +22,9 @@ from agentui.pipeline.templating import (
_deep_find_text,
_best_text_from_outputs,
render_template_simple,
eval_condition_expr,
)
from agentui.pipeline.storage import load_var_store, save_var_store, clear_var_store
# --- Templating helpers are imported from agentui.pipeline.templating ---
@@ -133,17 +137,75 @@ class PipelineExecutor:
raise ExecutionError(f"Unknown node type: {n.get('type')}")
self.nodes_by_id[n["id"]] = node_cls(n["id"], n.get("config", {}))
# Настройки режима исполнения (с дефолтами из default_pipeline)
try:
self.loop_mode = str(self.pipeline.get("loop_mode", "dag")).lower().strip() or "dag"
except Exception:
self.loop_mode = "dag"
try:
self.loop_max_iters = int(self.pipeline.get("loop_max_iters", 1000))
except Exception:
self.loop_max_iters = 1000
try:
self.loop_time_budget_ms = int(self.pipeline.get("loop_time_budget_ms", 10000))
except Exception:
self.loop_time_budget_ms = 10000
# Идентификатор пайплайна и политика очистки стора переменных
try:
self.pipeline_id = str(self.pipeline.get("id", "pipeline_editor")) or "pipeline_editor"
except Exception:
self.pipeline_id = "pipeline_editor"
try:
self.clear_var_store = bool(self.pipeline.get("clear_var_store", True))
except Exception:
self.clear_var_store = True
# В памяти держим актуальный STORE (доступен в шаблонах через {{ store.* }} и [[STORE:*]])
self._store: Dict[str, Any] = {}
# Локальный журнал выполнения для человекочитаемого трейсинга
self._exec_log: List[str] = []
async def run(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Исполнитель пайплайна с динамическим порядком на основе зависимостей графа.
Новый режим: волновое (level-by-level) исполнение с параллелизмом и барьером.
Все узлы «готовой волны» стартуют параллельно, ждём всех, затем открывается следующая волна.
Ограничение параллелизма берётся из pipeline.parallel_limit (по умолчанию 8).
Политика ошибок: fail-fast — при исключении любой задачи волны прерываем пайплайн.
Точка входа исполнителя. Переключает режим между:
- "dag": волновое исполнение с барьерами (исходное поведение)
- "iterative": итеративное исполнение с очередью и гейтами
"""
# Инициализация STORE по политике
try:
if self.clear_var_store:
clear_var_store(self.pipeline_id)
self._store = {}
else:
self._store = load_var_store(self.pipeline_id) or {}
except Exception:
self._store = {}
mode = (self.loop_mode or "dag").lower()
if mode == "iterative":
res = await self._run_iterative(context, trace)
else:
res = await self._run_dag(context, trace)
# На всякий случай финальная запись стора (если были изменения)
try:
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
return res
async def _run_dag(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Исходная волновая модель (DAG) — без изменений поведения.
"""
nodes: List[Dict[str, Any]] = list(self.pipeline.get("nodes", []))
id_set = set(self.nodes_by_id.keys())
@@ -151,6 +213,8 @@ class PipelineExecutor:
# Собираем зависимости: node_id -> set(parent_ids), и обратные связи dependents
deps_map: Dict[str, set] = {n["id"]: set() for n in nodes}
dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
# Гейты для ветвлений: child_id -> List[(parent_id, gate_name)] (gate_name: "true"/"false")
gate_deps: Dict[str, List[tuple[str, str]]] = {n["id"]: [] for n in nodes}
for n in nodes:
nid = n["id"]
@@ -176,8 +240,21 @@ class PipelineExecutor:
# Ссылки вида "node.outKey" или "node"
src_id = src.split(".", 1)[0] if "." in src else src
if src_id in id_set:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
# Если указали конкретный outKey (например, nIf.true / nIf.false),
# трактуем это ТОЛЬКО как гейт (НЕ как топологическую зависимость), чтобы избежать дедлоков.
is_gate = False
if "." in src:
try:
_, out_name = src.split(".", 1)
on = str(out_name).strip().lower()
if on in {"true", "false"}:
gate_deps[nid].append((src_id, on))
is_gate = True
except Exception:
is_gate = False
if not is_gate:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
# Входящие степени и первая волна
in_degree: Dict[str, int] = {nid: len(deps) for nid, deps in deps_map.items()}
@@ -186,11 +263,10 @@ class PipelineExecutor:
processed: List[str] = []
values: Dict[str, Dict[str, Any]] = {}
last_result: Dict[str, Any] = {}
last_node_id: Optional[str] = None
node_def_by_id: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes}
# Накопитель пользовательских переменных (SetVars) — доступен как context["vars"]
user_vars: Dict[str, Any] = {}
# Параметры параллелизма
try:
parallel_limit = int(self.pipeline.get("parallel_limit", 8))
except Exception:
@@ -198,23 +274,24 @@ class PipelineExecutor:
if parallel_limit <= 0:
parallel_limit = 1
# Вспомогательная корутина исполнения одной ноды со снапшотом OUT
async def exec_one(node_id: str, values_snapshot: Dict[str, Any], wave_num: int) -> tuple[str, Dict[str, Any]]:
ndef = node_def_by_id.get(node_id)
if not ndef:
raise ExecutionError(f"Node definition not found: {node_id}")
node = self.nodes_by_id[node_id]
# Снимок контекста и OUT на момент старта волны
ctx = dict(context)
ctx["OUT"] = values_snapshot
# Пользовательские переменные (накопленные SetVars)
try:
ctx["vars"] = dict(user_vars)
except Exception:
ctx["vars"] = {}
# Разрешаем inputs для ноды
# STORE доступен в шаблонах
try:
ctx["store"] = dict(self._store)
except Exception:
ctx["store"] = {}
inputs: Dict[str, Any] = {}
for name, source in (ndef.get("in") or {}).items():
if isinstance(source, list):
@@ -222,7 +299,6 @@ class PipelineExecutor:
else:
inputs[name] = _resolve_in_value(source, ctx, values_snapshot)
# Трассировка старта
if trace is not None:
try:
await trace({"event": "node_start", "node_id": ndef["id"], "wave": wave_num, "ts": int(time.time() * 1000)})
@@ -230,6 +306,10 @@ class PipelineExecutor:
pass
started = time.perf_counter()
try:
print(f"TRACE start: {ndef['id']} ({node.type_name})")
except Exception:
pass
try:
out = await node.run(inputs, ctx)
except Exception as exc:
@@ -258,35 +338,58 @@ class PipelineExecutor:
})
except Exception:
pass
try:
prev_text = ""
try:
from agentui.pipeline.templating import _best_text_from_outputs as _bt # type: ignore
if isinstance(out, dict):
prev_text = _bt(out) or ""
except Exception:
prev_text = ""
if isinstance(prev_text, str) and len(prev_text) > 200:
prev_text = prev_text[:200]
print(f"TRACE done: {ndef['id']} ({node.type_name}) dur={dur_ms}ms text={prev_text!r}")
except Exception:
pass
return node_id, out
# Волновое исполнение
wave_idx = 0
while ready:
wave_nodes = list(ready)
ready = [] # будет заполнено после завершения волны
ready = []
wave_results: Dict[str, Dict[str, Any]] = {}
# Один общий снапшот OUT для всей волны (барьер — узлы волны не видят результаты друг друга)
values_snapshot = dict(values)
# Чанковый запуск с лимитом parallel_limit
for i in range(0, len(wave_nodes), parallel_limit):
chunk = wave_nodes[i : i + parallel_limit]
# fail-fast: при исключении любой задачи gather бросит и отменит остальные
results = await asyncio.gather(
*(exec_one(nid, values_snapshot, wave_idx) for nid in chunk),
return_exceptions=False,
)
# Коммитим результаты чанка в локальное хранилище волны
for nid, out in results:
wave_results[nid] = out
last_result = out # обновляем на каждом успешном результате
# После завершения волны — коммитим все её результаты в общие values
last_result = out
last_node_id = nid
try:
ndef = node_def_by_id.get(nid) or {}
ntype = ndef.get("type", "")
if ntype == "If":
expr = str((ndef.get("config") or {}).get("expr", ""))
resb = False
try:
if isinstance(out, dict):
resb = bool(out.get("result"))
except Exception:
resb = False
self._exec_log.append(f"If (#{nid}) {expr} => {str(resb).lower()}")
else:
self._exec_log.append(f"{nid}({ntype})")
except Exception:
pass
values.update(wave_results)
processed.extend(wave_nodes)
# Соберём пользовательские переменные из SetVars узлов волны
try:
for _nid, out in wave_results.items():
if isinstance(out, dict):
@@ -295,23 +398,553 @@ class PipelineExecutor:
user_vars.update(v)
except Exception:
pass
# Обновляем входящие степени для зависимых и формируем следующую волну
# Обновляем и сохраняем STORE, если были новые переменные из SetVars/прочих нод
try:
merged: Dict[str, Any] = {}
for _nid, out in wave_results.items():
if isinstance(out, dict) and isinstance(out.get("vars"), dict):
merged.update(out.get("vars") or {})
if merged:
self._store.update(merged)
try:
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
for done in wave_nodes:
for child in dependents.get(done, ()):
in_degree[child] -= 1
next_ready = [nid for nid, deg in in_degree.items() if deg == 0 and nid not in processed and nid not in wave_nodes]
# Исключаем уже учтённые и добавляем только те, которые действительно готовы
ready = next_ready
next_ready_candidates = [nid for nid, deg in in_degree.items() if deg == 0 and nid not in processed and nid not in wave_nodes]
def _gates_ok(child_id: str) -> bool:
pairs = gate_deps.get(child_id) or []
if not pairs:
return True
missing_seen = False
for pid, gate in pairs:
v = values.get(pid)
if v is None:
# значение гейта ещё не известно
missing_seen = True
continue
try:
flag = bool(isinstance(v, dict) and v.get(gate))
except Exception:
flag = False
if not flag:
return False
# Семантика "missing gate = pass" как в iterative-режиме:
# если флаги гейта ещё не известны, но у ребёнка есть реальные родители — допускаем первый запуск.
# если реальных родителей нет (только гейты) — ждём появления значения гейта.
if missing_seen:
real_parents = deps_map.get(child_id) or set()
if len(real_parents) == 0:
return False
return True
ready = [nid for nid in next_ready_candidates if _gates_ok(nid)]
wave_idx += 1
# Проверка на циклы/недостижимые ноды
if len(processed) != len(nodes):
remaining = [n["id"] for n in nodes if n["id"] not in processed]
raise ExecutionError(f"Cycle detected or unresolved dependencies among nodes: {remaining}")
# Persist snapshot of macro context and outputs into STORE
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Построение и сохранение человекочитаемого EXECUTION TRACE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
async def _run_iterative(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
Итеративное исполнение:
- Очередь готовых задач
- Повторные пуски допустимы: ребёнок ставится в очередь, когда какой-либо из его родителей обновился после последнего запуска ребёнка
- Гейты If.true/If.false фильтруют постановку в очередь
- Ограничения: loop_max_iters, loop_time_budget_ms
"""
nodes: List[Dict[str, Any]] = list(self.pipeline.get("nodes", []))
id_set = set(self.nodes_by_id.keys())
# Строим зависимости и гейты (как в DAG)
deps_map: Dict[str, set] = {n["id"]: set() for n in nodes}
dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
gate_deps: Dict[str, List[tuple[str, str]]] = {n["id"]: [] for n in nodes}
# Обратная карта для гейтов: кто «слушает» изменения конкретного родителя по гейту
gate_dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
for n in nodes:
nid = n["id"]
for _, source in (n.get("in") or {}).items():
sources = source if isinstance(source, list) else [source]
for src in sources:
if not isinstance(src, str):
continue
if src.startswith("macro:"):
continue
if re.fullmatch(r"\[\[\s*VAR\s*[:\s]\s*[^\]]+\s*\]\]", src.strip()):
continue
out_ref_node = _extract_out_node_id_from_ref(src)
if out_ref_node and out_ref_node in id_set:
# [[OUT:nodeId(.key)*]] — это РЕАЛЬНАЯ топологическая зависимость
deps_map[nid].add(out_ref_node)
dependents[out_ref_node].add(nid)
continue
src_id = src.split(".", 1)[0] if "." in src else src
if src_id in id_set:
# "node.true/false" — ТОЛЬКО гейт (без топологической зависимости)
is_gate = False
if "." in src:
try:
_, out_name = src.split(".", 1)
on = str(out_name).strip().lower()
if on in {"true", "false"}:
gate_deps[nid].append((src_id, on))
# Регистрируем обратную зависимость по гейту,
# чтобы будить ребёнка при изменении флага у родителя
gate_dependents[src_id].add(nid)
is_gate = True
except Exception:
is_gate = False
if not is_gate:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
# Вспом. структуры состояния
values: Dict[str, Dict[str, Any]] = {}
last_result: Dict[str, Any] = {}
last_node_id: Optional[str] = None
user_vars: Dict[str, Any] = {}
node_def_by_id: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes}
# Версии обновлений нод (инкремент при каждом коммите)
version: Dict[str, int] = {n["id"]: 0 for n in nodes}
# Когда ребёнок запускался в последний раз (номер глобального шага)
last_run_step: Dict[str, int] = {n["id"]: -1 for n in nodes}
# Глобальный счётчик шагов
step = 0
# Очередь готовых задач
q: deque[str] = deque()
# Начальные готовые — ноды без зависимостей
in_degree: Dict[str, int] = {nid: len(deps) for nid, deps in deps_map.items()}
for nid, deg in in_degree.items():
if deg == 0:
q.append(nid)
def gates_ok(child_id: str) -> bool:
pairs = gate_deps.get(child_id) or []
if not pairs:
return True
missing_seen = False
for pid, gate in pairs:
v = values.get(pid)
# Если значение гейта ещё не появилось — отмечаем и продолжаем проверку других гейтов
if v is None:
missing_seen = True
continue
try:
flag = bool(isinstance(v, dict) and v.get(gate))
except Exception:
flag = False
if not flag:
return False
# Если были отсутствующие значения гейтов, то:
# - у узла есть реальные (топологические) родители → не блокируем (первый запуск допустим)
# - у узла НЕТ реальных родителей (только гейты) → откладываем до появления значения гейта
if missing_seen:
real_parents = deps_map.get(child_id) or set()
if len(real_parents) == 0:
return False
return True
# Лимиты
total_runs = 0
t0 = time.perf_counter()
try:
parallel_limit = int(self.pipeline.get("parallel_limit", 8))
except Exception:
parallel_limit = 8
if parallel_limit <= 0:
parallel_limit = 1
# Асинхронный запуск одной ноды (снимок OUT на момент dequeue)
async def exec_one(node_id: str, snapshot: Dict[str, Any], wave_num: int) -> tuple[str, Dict[str, Any]]:
ndef = node_def_by_id.get(node_id)
if not ndef:
raise ExecutionError(f"Node definition not found: {node_id}")
node = self.nodes_by_id[node_id]
ctx = dict(context)
ctx["OUT"] = snapshot
try:
ctx["vars"] = dict(user_vars)
except Exception:
ctx["vars"] = {}
# STORE доступен в шаблонах
try:
ctx["store"] = dict(self._store)
except Exception:
ctx["store"] = {}
inputs: Dict[str, Any] = {}
for name, source in (ndef.get("in") or {}).items():
if isinstance(source, list):
inputs[name] = [_resolve_in_value(s, ctx, snapshot) for s in source]
else:
inputs[name] = _resolve_in_value(source, ctx, snapshot)
if trace is not None:
try:
await trace({"event": "node_start", "node_id": ndef["id"], "wave": wave_num, "ts": int(time.time() * 1000)})
except Exception:
pass
started = time.perf_counter()
try:
print(f"TRACE start: {ndef['id']} ({node.type_name})")
except Exception:
pass
try:
out = await node.run(inputs, ctx)
except Exception as exc:
if trace is not None:
try:
await trace({
"event": "node_error",
"node_id": ndef["id"],
"wave": wave_num,
"ts": int(time.time() * 1000),
"error": str(exc),
})
except Exception:
pass
raise
else:
dur_ms = int((time.perf_counter() - started) * 1000)
if trace is not None:
try:
await trace({
"event": "node_done",
"node_id": ndef["id"],
"wave": wave_num,
"ts": int(time.time() * 1000),
"duration_ms": dur_ms,
})
except Exception:
pass
try:
prev_text = ""
try:
from agentui.pipeline.templating import _best_text_from_outputs as _bt # type: ignore
if isinstance(out, dict):
prev_text = _bt(out) or ""
except Exception:
prev_text = ""
if isinstance(prev_text, str) and len(prev_text) > 200:
prev_text = prev_text[:200]
print(f"TRACE done: {ndef['id']} ({node.type_name}) dur={dur_ms}ms text={prev_text!r}")
except Exception:
pass
return node_id, out
# Главный цикл
while q:
# Проверяем лимиты
if total_runs >= self.loop_max_iters:
raise ExecutionError(f"Iterative mode exceeded loop_max_iters={self.loop_max_iters}")
if (time.perf_counter() - t0) * 1000.0 > self.loop_time_budget_ms:
raise ExecutionError(f"Iterative mode exceeded loop_time_budget_ms={self.loop_time_budget_ms}")
# Собираем партию до parallel_limit, с учётом гейтов.
# ВАЖНО: защищаемся от «вечной карусели» в пределах одной партии.
# Обрабатываем не более исходной длины очереди за проход.
batch: List[str] = []
seen: set[str] = set()
spin_guard = len(q)
processed_in_pass = 0
while q and len(batch) < parallel_limit and processed_in_pass < spin_guard:
nid = q.popleft()
processed_in_pass += 1
if nid in seen:
# Уже добавлен в партию — пропускаем без возврата в очередь
continue
# Должны быть удовлетворены зависимости: или нет родителей, или все родители хотя бы раз исполнились
parents = deps_map.get(nid) or set()
deps_ok = all(version[p] > 0 or in_degree.get(nid, 0) == 0 for p in parents) or (len(parents) == 0)
if not deps_ok:
# отложим до лучших времён
q.append(nid)
continue
if not gates_ok(nid):
# гейты пока не открыты — попробуем позже
q.append(nid)
continue
batch.append(nid)
seen.add(nid)
if not batch:
# Очередь «застряла» — либо циклические ожидания без начальных значений, либо гейт не откроется.
# Безопасно выходим.
break
# Снимок на момент партии
snapshot = dict(values)
results = await asyncio.gather(*(exec_one(nid, snapshot, step) for nid in batch), return_exceptions=False)
# Коммитим результаты немедленно и продвигаем версии
produced_vars: Dict[str, Any] = {}
for nid, out in results:
values[nid] = out
version[nid] = version.get(nid, 0) + 1
last_result = out
last_node_id = nid
total_runs += 1
last_run_step[nid] = step
# Человекочитаемый журнал
try:
ndef = node_def_by_id.get(nid) or {}
ntype = ndef.get("type", "")
if ntype == "If":
expr = str((ndef.get("config") or {}).get("expr", ""))
resb = False
try:
if isinstance(out, dict):
resb = bool(out.get("result"))
except Exception:
resb = False
self._exec_log.append(f"If (#{nid}) {expr} => {str(resb).lower()}")
else:
self._exec_log.append(f"{nid}({ntype})")
except Exception:
pass
# Сбор пользовательских переменных
try:
if isinstance(out, dict) and isinstance(out.get("vars"), dict):
produced_vars.update(out.get("vars") or {})
except Exception:
pass
if produced_vars:
user_vars.update(produced_vars)
# Обновляем STORE и сохраняем на диск
try:
self._store.update(produced_vars)
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
# Постановка потомков, у кого изменился хотя бы один родитель
for nid in batch:
# Топологические потомки (реальные зависимости): будим всегда
for child in dependents.get(nid, ()):
if last_run_step.get(child, -1) < step:
q.append(child)
# Потомки по гейтам: будим ТОЛЬКО выбранную ветку
# Логика: если у gchild есть пара gate_deps с текущим nid и нужным gate ('true'/'false'),
# то будим только если соответствующий флаг у родителя истинный. Иначе — не будим.
g_out = values.get(nid) or {}
g_children = gate_dependents.get(nid, ())
for gchild in g_children:
if last_run_step.get(gchild, -1) >= step:
continue
pairs = [p for p in (gate_deps.get(gchild) or []) if p and p[0] == nid]
if not pairs:
# gchild зарегистрирован как зависимый по гейту, но явной пары нет — пропускаем
continue
wake = False
for (_pid, gate_name) in pairs:
try:
want = str(gate_name or "").strip().lower()
if want in ("true", "false"):
if bool(g_out.get(want, False)):
wake = True
break
except Exception:
pass
if wake:
q.append(gchild)
try:
print(f"TRACE wake_by_gate: parent={nid} -> child={gchild}")
except Exception:
pass
# Ранний выход, если встретили Return среди выполненных
try:
for nid in batch:
ndef = node_def_by_id.get(nid) or {}
if (ndef.get("type") or "") == "Return":
# Зафиксируем, что именно Return завершил выполнение
try:
last_node_id = nid
last_result = values.get(nid) or last_result
except Exception:
pass
# Снимок стора до выхода
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Финальный EXECUTION TRACE в лог и STORE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
except Exception:
pass
step += 1
# Финальный снапшот всего контекста и OUT в STORE.snapshot
try:
self._commit_snapshot(context, values, last_node_id or "")
except Exception:
pass
# Построение и сохранение человекочитаемого EXECUTION TRACE
try:
summary = " -> ".join(self._exec_log) if getattr(self, "_exec_log", None) else ""
if summary:
try:
print("===== EXECUTION TRACE =====")
print(summary)
print("===== END TRACE =====")
except Exception:
pass
try:
if isinstance(self._store.get("snapshot"), dict):
self._store["snapshot"]["EXEC_TRACE"] = summary
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
except Exception:
pass
return last_result
def _safe_preview(self, obj: Any, max_bytes: int = 262_144) -> Any:
"""
Вернёт объект как есть, если его JSON-представление укладывается в лимит.
Иначе — мета-объект с пометкой о тримминге и SHA256 + текстовый превью.
"""
try:
s = json.dumps(obj, ensure_ascii=False)
except Exception:
try:
s = str(obj)
except Exception:
s = "<unrepresentable>"
b = s.encode("utf-8", errors="ignore")
if len(b) <= max_bytes:
return obj
sha = hashlib.sha256(b).hexdigest()
preview = b[:max_bytes].decode("utf-8", errors="ignore")
return {
"__truncated__": True,
"sha256": sha,
"preview": preview,
}
def _commit_snapshot(self, context: Dict[str, Any], values: Dict[str, Any], last_node_id: str) -> None:
"""
Собирает STORE.snapshot:
- incoming.*, params.*, model, vendor_format, system
- OUT_TEXT.nX — строковая вытяжка (до 16КБ)
- OUT.nX — сырой JSON (с триммингом до ~256КБ через preview)
- Алиасы OUT1/OUT2/... — плоские строки (то же, что [[OUT1]])
- LAST_NODE — последний выполнившийся узел
"""
# OUT_TEXT / OUT
out_text: Dict[str, str] = {}
out_raw: Dict[str, Any] = {}
aliases: Dict[str, Any] = {}
for nid, out in (values or {}).items():
if out is None:
continue
try:
txt = _best_text_from_outputs(out) # type: ignore[name-defined]
except Exception:
txt = ""
if not isinstance(txt, str):
try:
txt = str(txt)
except Exception:
txt = ""
if len(txt) > 16_384:
txt = txt[:16_384]
out_text[nid] = txt
out_raw[nid] = self._safe_preview(out, 262_144)
m = re.match(r"^n(\d+)$", str(nid))
if m:
aliases[f"OUT{int(m.group(1))}"] = txt
snapshot: Dict[str, Any] = {
"incoming": self._safe_preview(context.get("incoming"), 262_144),
"params": self._safe_preview(context.get("params"), 65_536),
"model": context.get("model"),
"vendor_format": context.get("vendor_format"),
"system": context.get("system") or "",
"OUT": out_raw,
"OUT_TEXT": out_text,
"LAST_NODE": last_node_id or "",
**aliases,
}
# Сохраняем под ключом snapshot, пользовательские vars остаются как есть в корне STORE
try:
if not isinstance(self._store, dict):
self._store = {}
self._store["snapshot"] = snapshot
save_var_store(self.pipeline_id, self._store)
except Exception:
pass
class SetVarsNode(Node):
type_name = "SetVars"
@@ -1026,12 +1659,40 @@ class ReturnNode(Node):
return {"result": result, "response_text": text}
class IfNode(Node):
type_name = "If"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
expr = str(self.config.get("expr") or "").strip()
out_map = context.get("OUT") or {}
# Для логов: отображаем развёрнутое выражение (с подставленными макросами)
try:
expanded = render_template_simple(expr, context, out_map)
except Exception:
expanded = expr
try:
res = bool(eval_condition_expr(expr, context, out_map)) if expr else False
except Exception as exc: # noqa: BLE001
# Расширенный лог, чтобы быстрее найти ошибку парсинга/оценки выражения
try:
print(f"TRACE if_error: {self.node_id} expr={expr!r} expanded={expanded!r} error={exc}")
except Exception:
pass
raise ExecutionError(f"If expr error: {exc}")
try:
print(f"TRACE if: {self.node_id} expr={expr!r} expanded={expanded!r} result={str(res).lower()}")
except Exception:
pass
return {"result": res, "true": res, "false": (not res)}
NODE_REGISTRY.update({
SetVarsNode.type_name: SetVarsNode,
ProviderCallNode.type_name: ProviderCallNode,
RawForwardNode.type_name: RawForwardNode,
ReturnNode.type_name: ReturnNode,
IfNode.type_name: IfNode,
})