# HadTavern/agentui/pipeline/executor.py
from __future__ import annotations
from typing import Any, Dict, List, Optional, Callable, Awaitable
from urllib.parse import urljoin
import json
import re
import asyncio
import time
from agentui.providers.http_client import build_client
from agentui.common.vendors import detect_vendor
from agentui.pipeline.templating import (
_OUT_MACRO_RE,
_VAR_MACRO_RE,
_PROMPT_MACRO_RE,
_OUT_SHORT_RE,
_BRACES_RE,
_split_path,
_get_by_path,
_stringify_for_template,
_deep_find_text,
_best_text_from_outputs,
render_template_simple,
)
# --- Templating helpers are imported from agentui.pipeline.templating ---
def _extract_out_node_id_from_ref(s: Any) -> Optional[str]:
"""
    Extract node_id from a string containing an [[OUT:nodeId(.path)*]] macro.
    Returns None if no macro is found.
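
    Illustrative doctest-style sketch (assuming _OUT_MACRO_RE captures the macro
    body as group 1):

        >>> _extract_out_node_id_from_ref("[[OUT:node1.result.text]]")
        'node1'
        >>> _extract_out_node_id_from_ref("plain text") is None
        True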
"""
if not isinstance(s, str):
return None
m = _OUT_MACRO_RE.search(s)
if not m:
return None
body = m.group(1).strip()
node_id = body.split(".", 1)[0].strip()
return node_id or None
def _resolve_in_value(source: Any, context: Dict[str, Any], values: Dict[str, Dict[str, Any]]) -> Any:
"""
    Resolve input links/macros into a value for inputs:
    - Non-string values are returned as-is
    - "macro:path" → value taken from context by dotted path
    - "[[VAR:path]]" → value taken from context
    - "[[OUT:nodeId(.path)*]]" → taken from already computed node outputs
    - "nodeId(.path)*" → reference to node outputs
    - A list of references resolves to a list of resolved values
    - Otherwise tries context by dotted path; if not found, returns the original string
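
    A minimal doctest-style sketch (illustrative; assumes _get_by_path walks
    dotted paths through nested dicts):

        >>> ctx = {"user": {"name": "Ada"}}
        >>> vals = {"n1": {"text": "hi"}}
        >>> _resolve_in_value("[[OUT:n1.text]]", ctx, vals)
        'hi'
        >>> _resolve_in_value("macro:user.name", ctx, vals)
        'Ada'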
"""
    # Support lists of references (for multi-depends or future list inputs)
if isinstance(source, list):
return [_resolve_in_value(s, context, values) for s in source]
if not isinstance(source, str):
return source
s = source.strip()
# macro:path
if s.lower().startswith("macro:"):
path = s.split(":", 1)[1].strip()
return _get_by_path(context, path)
# [[VAR: path]]
m = _VAR_MACRO_RE.fullmatch(s)
if m:
path = m.group(1).strip()
return _get_by_path(context, path)
# [[OUT: nodeId(.path)*]]
m = _OUT_MACRO_RE.fullmatch(s)
if m:
body = m.group(1).strip()
if "." in body:
node_id, rest = body.split(".", 1)
node_val = values.get(node_id)
return _get_by_path(node_val, rest)
node_val = values.get(body)
return node_val
# "nodeId(.path)*"
if "." in s:
node_id, rest = s.split(".", 1)
if node_id in values:
return _get_by_path(values.get(node_id), rest)
if s in values:
return values.get(s)
# fallback: from context by dotted path or raw string
ctx_val = _get_by_path(context, s)
return ctx_val if ctx_val is not None else source
class ExecutionError(Exception):
pass
class Node:
type_name: str = "Base"
def __init__(self, node_id: str, config: Optional[Dict[str, Any]] = None) -> None:
self.node_id = node_id
self.config = config or {}
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
"""Execute node with inputs and context. Return dict of outputs."""
raise NotImplementedError
# Registry of supported node types (minimal set)
NODE_REGISTRY: Dict[str, Any] = {}
class PipelineExecutor:
def __init__(self, pipeline: Dict[str, Any]) -> None:
self.pipeline = pipeline
self.nodes_by_id: Dict[str, Node] = {}
for n in pipeline.get("nodes", []):
node_cls = NODE_REGISTRY.get(n.get("type"))
if not node_cls:
raise ExecutionError(f"Unknown node type: {n.get('type')}")
self.nodes_by_id[n["id"]] = node_cls(n["id"], n.get("config", {}))
async def run(
self,
context: Dict[str, Any],
trace: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None,
) -> Dict[str, Any]:
"""
        Pipeline executor with dynamic ordering based on graph dependencies.
        New mode: wave (level-by-level) execution with parallelism and a barrier.
        All nodes of a ready wave start in parallel; we wait for all of them, then the next wave opens.
        The parallelism limit comes from pipeline.parallel_limit (default 8).
        Error policy: fail-fast; an exception in any task of a wave aborts the pipeline.
"""
nodes: List[Dict[str, Any]] = list(self.pipeline.get("nodes", []))
id_set = set(self.nodes_by_id.keys())
        # Collect dependencies: node_id -> set(parent_ids), plus reverse links (dependents)
deps_map: Dict[str, set] = {n["id"]: set() for n in nodes}
dependents: Dict[str, set] = {n["id"]: set() for n in nodes}
for n in nodes:
nid = n["id"]
for _, source in (n.get("in") or {}).items():
                # Expand lists of references (multi-depends)
sources = source if isinstance(source, list) else [source]
for src in sources:
if not isinstance(src, str):
                        # Non-string values and constant arrays carry no dependencies
continue
if src.startswith("macro:"):
                        # Macros come from context, no dependencies
continue
                    # [[VAR:...]]: read from context, no dependencies
if re.fullmatch(r"\[\[\s*VAR\s*[:\s]\s*[^\]]+\s*\]\]", src.strip()):
continue
                    # [[OUT:nodeId(.key)*]]: a dependency on the referenced node
out_ref_node = _extract_out_node_id_from_ref(src)
if out_ref_node and out_ref_node in id_set:
deps_map[nid].add(out_ref_node)
dependents[out_ref_node].add(nid)
continue
                    # References of the form "node.outKey" or "node"
src_id = src.split(".", 1)[0] if "." in src else src
if src_id in id_set:
deps_map[nid].add(src_id)
dependents[src_id].add(nid)
        # In-degrees and the first wave
in_degree: Dict[str, int] = {nid: len(deps) for nid, deps in deps_map.items()}
ready: List[str] = [nid for nid, deg in in_degree.items() if deg == 0]
processed: List[str] = []
values: Dict[str, Dict[str, Any]] = {}
last_result: Dict[str, Any] = {}
node_def_by_id: Dict[str, Dict[str, Any]] = {n["id"]: n for n in nodes}
        # Accumulator of user variables (SetVars), available as context["vars"]
user_vars: Dict[str, Any] = {}
        # Parallelism parameters
try:
parallel_limit = int(self.pipeline.get("parallel_limit", 8))
except Exception:
parallel_limit = 8
if parallel_limit <= 0:
parallel_limit = 1
        # Helper coroutine that executes a single node against an OUT snapshot
async def exec_one(node_id: str, values_snapshot: Dict[str, Any], wave_num: int) -> tuple[str, Dict[str, Any]]:
ndef = node_def_by_id.get(node_id)
if not ndef:
raise ExecutionError(f"Node definition not found: {node_id}")
node = self.nodes_by_id[node_id]
            # Snapshot of context and OUT as of the wave start
ctx = dict(context)
ctx["OUT"] = values_snapshot
            # User variables (accumulated by SetVars)
try:
ctx["vars"] = dict(user_vars)
except Exception:
ctx["vars"] = {}
            # Resolve inputs for the node
inputs: Dict[str, Any] = {}
for name, source in (ndef.get("in") or {}).items():
if isinstance(source, list):
inputs[name] = [_resolve_in_value(s, ctx, values_snapshot) for s in source]
else:
inputs[name] = _resolve_in_value(source, ctx, values_snapshot)
            # Trace the start
if trace is not None:
try:
await trace({"event": "node_start", "node_id": ndef["id"], "wave": wave_num, "ts": int(time.time() * 1000)})
except Exception:
pass
started = time.perf_counter()
try:
out = await node.run(inputs, ctx)
except Exception as exc:
if trace is not None:
try:
await trace({
"event": "node_error",
"node_id": ndef["id"],
"wave": wave_num,
"ts": int(time.time() * 1000),
"error": str(exc),
})
except Exception:
pass
raise
else:
dur_ms = int((time.perf_counter() - started) * 1000)
if trace is not None:
try:
await trace({
"event": "node_done",
"node_id": ndef["id"],
"wave": wave_num,
"ts": int(time.time() * 1000),
"duration_ms": dur_ms,
})
except Exception:
pass
return node_id, out
        # Wave execution
wave_idx = 0
while ready:
wave_nodes = list(ready)
            ready = []  # repopulated after the wave completes
wave_results: Dict[str, Dict[str, Any]] = {}
            # One shared OUT snapshot for the whole wave (barrier: wave nodes do not see each other's results)
values_snapshot = dict(values)
            # Chunked launch limited by parallel_limit
for i in range(0, len(wave_nodes), parallel_limit):
chunk = wave_nodes[i : i + parallel_limit]
                # fail-fast: if any task raises, gather re-raises and cancels the rest
results = await asyncio.gather(
*(exec_one(nid, values_snapshot, wave_idx) for nid in chunk),
return_exceptions=False,
)
                # Commit the chunk's results into the wave-local store
for nid, out in results:
wave_results[nid] = out
                    last_result = out  # updated on every successful result
            # After the wave completes, commit all its results into the shared values
values.update(wave_results)
processed.extend(wave_nodes)
            # Collect user variables from this wave's SetVars nodes
try:
for _nid, out in wave_results.items():
if isinstance(out, dict):
v = out.get("vars")
if isinstance(v, dict):
user_vars.update(v)
except Exception:
pass
            # Update in-degrees of dependents and form the next wave
for done in wave_nodes:
for child in dependents.get(done, ()):
in_degree[child] -= 1
next_ready = [nid for nid, deg in in_degree.items() if deg == 0 and nid not in processed and nid not in wave_nodes]
            # Exclude nodes already accounted for and add only those that are truly ready
ready = next_ready
wave_idx += 1
        # Check for cycles / unreachable nodes
if len(processed) != len(nodes):
remaining = [n["id"] for n in nodes if n["id"] not in processed]
raise ExecutionError(f"Cycle detected or unresolved dependencies among nodes: {remaining}")
return last_result
class SetVarsNode(Node):
type_name = "SetVars"
def _normalize(self) -> List[Dict[str, Any]]:
raw = self.config.get("variables") or []
if not isinstance(raw, list):
return []
norm: List[Dict[str, Any]] = []
for i, b in enumerate(raw):
if not isinstance(b, dict):
continue
name = str(b.get("name", "")).strip()
mode = str(b.get("mode", "string")).lower().strip()
value = b.get("value", "")
try:
order = int(b.get("order")) if b.get("order") is not None else i
except Exception:
order = i
norm.append({
"id": b.get("id") or f"v{i}",
"name": name,
"mode": "expr" if mode == "expr" else "string",
"value": value,
"order": order,
})
return norm
def _safe_eval_expr(self, expr: str) -> Any:
"""
        Safe expression evaluation for SetVars.
        Supported:
        - Literals: numbers/strings/bool/None, lists, tuples, dicts
        - JSON literals: true/false/null, objects and arrays (parsed as Python True/False/None, dict/list)
        - Arithmetic: + - * / // %, unary +-
        - Logic: and/or, comparisons (== != < <= > >=, chained)
        - Safe functions: rand(), randint(a,b), choice(list)
        Forbidden: names/attributes/subscripts/conditionals/imports/any other function calls.
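
        Illustrative doctest-style sketch:

            >>> SetVarsNode("n")._safe_eval_expr("1 + 2 * 3")
            7
            >>> SetVarsNode("n")._safe_eval_expr('{"a": true, "b": null}')
            {'a': True, 'b': None}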
"""
import ast
import operator as op
import random
        # 0) Try to parse a pure JSON literal first (including true/false/null, objects/arrays/numbers/strings).
        # This does not interfere with math: for expressions like "1+2" json.loads raises and we fall through to the AST path.
try:
s = str(expr).strip()
return json.loads(s)
except Exception:
pass
allowed_bin = {
ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul, ast.Div: op.truediv,
ast.FloorDiv: op.floordiv, ast.Mod: op.mod,
}
allowed_unary = {ast.UAdd: lambda x: +x, ast.USub: lambda x: -x}
allowed_cmp = {
ast.Eq: op.eq, ast.NotEq: op.ne, ast.Lt: op.lt, ast.LtE: op.le, ast.Gt: op.gt, ast.GtE: op.ge,
}
def eval_node(node: ast.AST) -> Any:
if isinstance(node, ast.Expression):
return eval_node(node.body)
if isinstance(node, ast.Constant):
return node.value
if isinstance(node, ast.Tuple):
return tuple(eval_node(e) for e in node.elts)
if isinstance(node, ast.List):
return [eval_node(e) for e in node.elts]
if isinstance(node, ast.Dict):
return {eval_node(k): eval_node(v) for k, v in zip(node.keys, node.values)}
if isinstance(node, ast.UnaryOp) and type(node.op) in allowed_unary:
return allowed_unary[type(node.op)](eval_node(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in allowed_bin:
return allowed_bin[type(node.op)](eval_node(node.left), eval_node(node.right))
if isinstance(node, ast.BoolOp):
vals = [eval_node(v) for v in node.values]
if isinstance(node.op, ast.And):
res = True
for v in vals:
res = res and bool(v)
return res
if isinstance(node.op, ast.Or):
res = False
for v in vals:
res = res or bool(v)
return res
if isinstance(node, ast.Compare):
left = eval_node(node.left)
for opnode, comparator in zip(node.ops, node.comparators):
if type(opnode) not in allowed_cmp:
raise ExecutionError("Unsupported comparison operator")
right = eval_node(comparator)
if not allowed_cmp[type(opnode)](left, right):
return False
left = right
return True
            # Allowed calls: rand(), randint(a,b), choice(list)
if isinstance(node, ast.Call):
                # No kwargs or star-args allowed
if node.keywords or isinstance(getattr(node, "starargs", None), ast.AST) or isinstance(getattr(node, "kwargs", None), ast.AST):
raise ExecutionError("Call with kwargs/starargs is not allowed")
fn = node.func
if not isinstance(fn, ast.Name):
raise ExecutionError("Only simple function calls are allowed")
name = fn.id
if name == "rand":
if len(node.args) != 0:
raise ExecutionError("rand() takes no arguments")
return random.random()
if name == "randint":
if len(node.args) != 2:
raise ExecutionError("randint(a,b) requires two arguments")
a = eval_node(node.args[0])
b = eval_node(node.args[1])
try:
return random.randint(int(a), int(b))
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"randint invalid arguments: {exc}")
if name == "choice":
if len(node.args) != 1:
raise ExecutionError("choice(list) requires one argument")
seq = eval_node(node.args[0])
if not isinstance(seq, (list, tuple)):
raise ExecutionError("choice() expects list or tuple")
if not seq:
raise ExecutionError("choice() on empty sequence")
return random.choice(seq)
raise ExecutionError(f"Function {name} is not allowed")
            # Reject everything else (Name/Attribute/Subscript/IfExp/comprehensions, etc.)
raise ExecutionError("Expression not allowed")
try:
tree = ast.parse(str(expr), mode="eval")
except Exception as exc:
raise ExecutionError(f"SetVars expr parse error: {exc}") from exc
return eval_node(tree)
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
out_map = context.get("OUT") or {}
result: Dict[str, Any] = {}
import re as _re
for v in sorted(self._normalize(), key=lambda x: x.get("order", 0)):
name = v.get("name") or ""
if not _re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name or ""):
raise ExecutionError(f"SetVars invalid variable name: {name!r}")
mode = v.get("mode", "string")
raw_val = v.get("value", "")
if mode == "expr":
resolved = self._safe_eval_expr(str(raw_val))
else:
resolved = render_template_simple(str(raw_val or ""), context, out_map)
result[name] = resolved
return {"vars": result}
class ProviderCallNode(Node):
type_name = "ProviderCall"
# --- Prompt Manager helpers -------------------------------------------------
def _get_blocks(self) -> List[Dict[str, Any]]:
"""Return normalized list of prompt blocks from config."""
raw = self.config.get("blocks") or self.config.get("prompt_blocks") or []
if not isinstance(raw, list):
return []
norm: List[Dict[str, Any]] = []
for i, b in enumerate(raw):
if not isinstance(b, dict):
continue
role = str(b.get("role", "user")).lower().strip()
if role not in {"system", "user", "assistant", "tool"}:
role = "user"
# order fallback: keep original index if not provided/correct
try:
order = int(b.get("order")) if b.get("order") is not None else i
except Exception: # noqa: BLE001
order = i
norm.append(
{
"id": b.get("id") or f"b{i}",
"name": b.get("name") or f"Block {i+1}",
"role": role,
"prompt": b.get("prompt") or "",
"enabled": bool(b.get("enabled", True)),
"order": order,
}
)
return norm
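    # Example "blocks" config (illustrative):
    #   [{"role": "system", "prompt": "You are helpful.", "order": 0},
    #    {"role": "user", "prompt": "[[VAR:incoming.text]]", "order": 1}]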
def _render_blocks_to_unified(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Filter+sort+render blocks to unified messages:
[{role, content, name?}]
"""
out_map = context.get("OUT") or {}
blocks = [b for b in self._get_blocks() if b.get("enabled", True)]
blocks.sort(key=lambda x: x.get("order", 0))
messages: List[Dict[str, Any]] = []
for b in blocks:
content = render_template_simple(str(b.get("prompt") or ""), context, out_map)
            # Block "name" fields are not forwarded into provider payloads
msg = {"role": b["role"], "content": content}
messages.append(msg)
return messages
def _messages_to_payload(self, provider: str, messages: List[Dict[str, Any]], context: Dict[str, Any]) -> Dict[str, Any]:
"""Convert unified messages to provider-specific request payload."""
params = context.get("params") or {}
model = context.get("model") or ""
if provider == "openai":
payload: Dict[str, Any] = {
"model": model,
"messages": [
{"role": m["role"], "content": m["content"]}
for m in messages
],
"temperature": params.get("temperature", 0.7),
}
if params.get("max_tokens") is not None:
payload["max_tokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
payload["top_p"] = params.get("top_p")
if params.get("stop") is not None:
payload["stop"] = params.get("stop")
return payload
if provider == "gemini":
sys_text = "\n\n".join([m["content"] for m in messages if m["role"] == "system"]).strip()
contents = []
for m in messages:
if m["role"] == "system":
continue
role = "model" if m["role"] == "assistant" else "user"
contents.append({"role": role, "parts": [{"text": m["content"]}]})
gen_cfg: Dict[str, Any] = {}
if params.get("temperature") is not None:
gen_cfg["temperature"] = params.get("temperature")
if params.get("max_tokens") is not None:
gen_cfg["maxOutputTokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
gen_cfg["topP"] = params.get("top_p")
if params.get("stop") is not None:
gen_cfg["stopSequences"] = params.get("stop")
payload = {"model": model, "contents": contents}
if sys_text:
payload["systemInstruction"] = {"parts": [{"text": sys_text}]}
if gen_cfg:
payload["generationConfig"] = gen_cfg
return payload
if provider == "claude":
sys_text = "\n\n".join([m["content"] for m in messages if m["role"] == "system"]).strip()
msgs = []
for m in messages:
if m["role"] == "system":
continue
role = m["role"] if m["role"] in {"user", "assistant"} else "user"
msgs.append({"role": role, "content": [{"type": "text", "text": m["content"]}]})
payload: Dict[str, Any] = {
"model": model,
"messages": msgs,
}
if sys_text:
payload["system"] = sys_text
if params.get("temperature") is not None:
payload["temperature"] = params.get("temperature")
if params.get("max_tokens") is not None:
payload["max_tokens"] = params.get("max_tokens")
if params.get("top_p") is not None:
payload["top_p"] = params.get("top_p")
if params.get("stop") is not None:
payload["stop"] = params.get("stop")
return payload
return {}
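    # Example (illustrative): provider="openai" with one user message and
    # params={"temperature": 0.2} yields
    #   {"model": "...", "messages": [{"role": "user", "content": "Hi"}],
    #    "temperature": 0.2}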
def _blocks_struct_for_template(self, provider: str, messages: List[Dict[str, Any]], context: Dict[str, Any]) -> Dict[str, Any]:
"""
        Build a structure for template insertion from Prompt Blocks.
        Returns provider-specific keys that can be embedded into JSON:
        - openai: { "messages": [...], "system_text": "..." }
        - gemini: { "contents": [...], "systemInstruction": {...}, "system_text": "..." }
        - claude: { "system_text": "...", "system": "...", "messages": [...] }
"""
provider = (provider or "openai").lower()
        # Guarantee a list
msgs = messages or []
if provider == "openai":
            # Already in {"role", "content"} format
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
            # Insert as-is (the editor embeds the JSON array without quoting)
return {
"messages": [
{"role": m["role"], "content": m.get("content")}
for m in msgs
],
"system_text": sys_text,
}
if provider == "gemini":
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
contents = []
for m in msgs:
if m.get("role") == "system":
continue
role = "model" if m.get("role") == "assistant" else "user"
contents.append({"role": role, "parts": [{"text": str(m.get("content") or "")}]})
d: Dict[str, Any] = {
"contents": contents,
"system_text": sys_text,
}
if sys_text:
d["systemInstruction"] = {"parts": [{"text": sys_text}]}
return d
if provider == "claude":
sys_text = "\n\n".join([m["content"] for m in msgs if m.get("role") == "system"]).strip()
out_msgs = []
for m in msgs:
if m.get("role") == "system":
continue
role = m.get("role")
role = role if role in {"user", "assistant"} else "user"
out_msgs.append({"role": role, "content": [{"type": "text", "text": str(m.get("content") or "")}]})
return {
"system_text": sys_text,
"system": sys_text, # удобно для шаблона: "system": "{{ pm.system_text }}"
"messages": out_msgs,
}
        # Nothing by default, but still valid JSON
return {"messages": []}
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
provider = (self.config.get("provider") or "openai").lower()
# Support provider-specific configs stored in UI as provider_configs.{provider}
prov_cfg: Dict[str, Any] = {}
try:
cfgs = self.config.get("provider_configs") or {}
if isinstance(cfgs, dict):
prov_cfg = cfgs.get(provider) or {}
except Exception: # noqa: BLE001
prov_cfg = {}
base_url = prov_cfg.get("base_url") or self.config.get("base_url")
if not base_url:
raise ExecutionError(f"Node {self.node_id} ({self.type_name}) requires 'base_url' in config")
if not str(base_url).startswith(("http://", "https://")):
base_url = "http://" + str(base_url)
endpoint_tmpl: str = prov_cfg.get("endpoint") or self.config.get("endpoint") or ""
template: str = prov_cfg.get("template") or self.config.get("template") or "{}"
headers_json: str = prov_cfg.get("headers") or self.config.get("headers") or "{}"
# Default endpoints if not set
if not endpoint_tmpl:
if provider == "openai":
endpoint_tmpl = "/v1/chat/completions"
elif provider == "gemini":
endpoint_tmpl = "/v1beta/models/{{ model }}:generateContent"
elif provider == "claude":
endpoint_tmpl = "/v1/messages"
        # Prepare Prompt Blocks and the pm structure for the template
unified_msgs = self._render_blocks_to_unified(context)
pm_struct = self._blocks_struct_for_template(provider, unified_msgs, context)
        # Extend the context for template rendering
render_ctx = dict(context)
render_ctx["pm"] = pm_struct
        # Unified PROMPT JSON fragment for templates: [[PROMPT]]
prompt_fragment = ""
try:
if provider == "openai":
prompt_fragment = '"messages": ' + json.dumps(pm_struct.get("messages", []), ensure_ascii=False)
elif provider == "gemini":
parts = []
contents = pm_struct.get("contents")
if contents is not None:
parts.append('"contents": ' + json.dumps(contents, ensure_ascii=False))
sysi = pm_struct.get("systemInstruction")
                # even an empty {} object is valid here
if sysi is not None:
parts.append('"systemInstruction": ' + json.dumps(sysi, ensure_ascii=False))
prompt_fragment = ", ".join(parts)
elif provider == "claude":
parts = []
sys_text = pm_struct.get("system_text") or pm_struct.get("system")
if sys_text is not None:
parts.append('"system": ' + json.dumps(sys_text, ensure_ascii=False))
msgs = pm_struct.get("messages")
if msgs is not None:
parts.append('"messages": ' + json.dumps(msgs, ensure_ascii=False))
prompt_fragment = ", ".join(parts)
except Exception: # noqa: BLE001
prompt_fragment = ""
render_ctx["PROMPT"] = prompt_fragment
        # Render helper supporting [[VAR]], [[OUT]] and {{ ... }}
def render(s: str) -> str:
return render_template_simple(s or "", render_ctx, render_ctx.get("OUT") or {})
        # Render the endpoint with macros/templates
endpoint = render(endpoint_tmpl)
        # Build the body ONLY from template/[[PROMPT]] (no raw payload/inputs).
        # There is NO fallback to unified payload building: an invalid template is a node error.
try:
rendered = render(template)
            # DEBUG: print the rendered template with line numbers to pinpoint JSONDecodeError locations
try:
_lines = rendered.splitlines()
_preview = "\n".join(f"{i+1:03d}: {_lines[i]}" for i in range(min(len(_lines), 120)))
print(f"DEBUG: ProviderCallNode rendered_template node={self.node_id} provider={provider}\\n{_preview}")
except Exception:
try:
print(f"DEBUG: ProviderCallNode rendered_template(node={self.node_id}, provider={provider}) len={len(rendered)}")
except Exception:
pass
payload = json.loads(rendered)
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"ProviderCall template invalid JSON: {exc}")
        # Headers come entirely from the editable JSON with macro support
try:
headers_src = render(headers_json) if headers_json else "{}"
headers = json.loads(headers_src) if headers_src else {}
if not isinstance(headers, dict):
raise ValueError("headers must be a JSON object")
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"ProviderCall headers invalid JSON: {exc}")
        # Final URL (base_url scheme was already ensured above)
url = endpoint if endpoint.startswith("http") else urljoin(base_url.rstrip('/') + '/', endpoint.lstrip('/'))
# Debug logs to validate config selection and payload
# Brute request/response logging (FULL, no masking)
        final_headers = {"Content-Type": "application/json", **headers}
        try:
print("===== ProviderCall REQUEST BEGIN =====")
print(f"node={self.node_id} type={self.type_name} provider={provider}")
print(f"URL: {url}")
try:
print("Headers:")
print(json.dumps(final_headers, ensure_ascii=False, indent=2))
except Exception:
print(f"Headers(raw): {final_headers}")
try:
print("Body JSON:")
print(json.dumps(payload, ensure_ascii=False, indent=2))
except Exception:
print(f"Body(raw): {payload}")
print("===== ProviderCall REQUEST END =====")
except Exception:
pass
async with build_client() as client:
resp = await client.post(url, json=payload, headers=final_headers)
# Do not raise_for_status: keep body/logs on 4xx/5xx
try:
print("===== ProviderCall RESPONSE BEGIN =====")
print(f"node={self.node_id} type={self.type_name} provider={provider}")
print(f"Status: {resp.status_code}")
try:
print("Headers:")
print(json.dumps(dict(resp.headers), ensure_ascii=False, indent=2))
except Exception:
try:
print(f"Headers(raw): {dict(resp.headers)}")
except Exception:
print("Headers(raw): <unavailable>")
try:
body_text = resp.text
except Exception:
body_text = "<resp.text decode error>"
print("Body Text:")
print(body_text)
print("===== ProviderCall RESPONSE END =====")
except Exception:
pass
try:
data = resp.json()
except Exception:
data = {"error": "Failed to decode JSON from upstream", "text": resp.text}
        # Best-effort text extraction
text = None
if provider == "openai":
try:
text = data.get("choices", [{}])[0].get("message", {}).get("content")
except Exception: # noqa: BLE001
text = None
elif provider == "gemini":
try:
text = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text")
except Exception: # noqa: BLE001
text = None
elif provider == "claude":
try:
blocks = data.get("content") or []
texts = [b.get("text") for b in blocks if isinstance(b, dict) and b.get("type") == "text"]
text = "\n".join([t for t in texts if isinstance(t, str)])
except Exception: # noqa: BLE001
text = None
return {"result": data, "response_text": text or ""}
class RawForwardNode(Node):
type_name = "RawForward"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
incoming = context.get("incoming", {})
raw_payload = incoming.get("json")
base_url: Optional[str] = self.config.get("base_url")
override_path: Optional[str] = self.config.get("override_path")
        # Resolve macros in the RawForward config (base_url, override_path)
out_map_for_macros = context.get("OUT") or {}
if base_url:
base_url = render_template_simple(str(base_url), context, out_map_for_macros)
if override_path:
override_path = render_template_simple(str(override_path), context, out_map_for_macros)
        # If base_url is not set, enable vendor auto-detection
if not base_url:
vendor = detect_vendor(raw_payload)
if vendor == "openai":
base_url = "https://api.openai.com"
elif vendor == "claude":
base_url = "https://api.anthropic.com"
elif vendor == "gemini":
base_url = "https://generativelanguage.googleapis.com"
else:
raise ExecutionError(f"Node {self.node_id} ({self.type_name}): 'base_url' is not configured and vendor could not be detected.")
        # Ensure base_url has a scheme
if not base_url.startswith(("http://", "https://")):
base_url = "http://" + base_url
path = override_path or incoming.get("path") or "/"
query = incoming.get("query")
if query:
path_with_qs = f"{path}?{query}"
else:
path_with_qs = path
url = urljoin(base_url.rstrip("/") + "/", path_with_qs.lstrip("/"))
passthrough_headers: bool = bool(self.config.get("passthrough_headers", True))
extra_headers_json: str = self.config.get("extra_headers") or "{}"
        # Macros in extra_headers
try:
extra_headers_src = render_template_simple(extra_headers_json, context, out_map_for_macros) if extra_headers_json else "{}"
extra_headers = json.loads(extra_headers_src) if extra_headers_src else {}
if not isinstance(extra_headers, dict):
raise ValueError("extra_headers must be an object")
except Exception as exc: # noqa: BLE001
raise ExecutionError(f"RawForward extra_headers invalid JSON: {exc}")
headers: Dict[str, str] = {}
if passthrough_headers:
inc_headers = incoming.get("headers") or {}
            # Copy all headers except Host and Content-Length
for k, v in inc_headers.items():
if k.lower() not in ['host', 'content-length']:
headers[k] = v
        # Make sure Content-Type is present if it was missing
if 'content-type' not in {k.lower() for k in headers}:
headers['Content-Type'] = 'application/json'
headers.update(extra_headers)
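        # Example (illustrative): with passthrough_headers=True and
        #   extra_headers = '{"Authorization": "Bearer [[VAR:secrets.api_key]]"}'
        # incoming headers are forwarded (minus Host and Content-Length) and the
        # rendered Authorization header is layered on top.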
# Brute request/response logging (FULL, no masking)
try:
print("===== RawForward REQUEST BEGIN =====")
print(f"node={self.node_id} type={self.type_name}")
print(f"URL: {url}")
try:
print("Headers:")
print(json.dumps(headers, ensure_ascii=False, indent=2))
except Exception:
print(f"Headers(raw): {headers}")
try:
print("Body JSON:")
print(json.dumps(raw_payload, ensure_ascii=False, indent=2))
except Exception:
print(f"Body(raw): {raw_payload}")
print("===== RawForward REQUEST END =====")
except Exception:
pass
async with build_client() as client:
resp = await client.post(url, json=raw_payload, headers=headers)
# Response logging
try:
print("===== RawForward RESPONSE BEGIN =====")
print(f"node={self.node_id} type={self.type_name}")
print(f"Status: {resp.status_code}")
try:
print("Headers:")
print(json.dumps(dict(resp.headers), ensure_ascii=False, indent=2))
except Exception:
try:
print(f"Headers(raw): {dict(resp.headers)}")
except Exception:
print("Headers(raw): <unavailable>")
try:
body_text = resp.text
except Exception:
body_text = "<resp.text decode error>"
print("Body Text:")
print(body_text)
print("===== RawForward RESPONSE END =====")
except Exception:
pass
# Decode JSON if possible, otherwise return text
try:
data = resp.json()
except Exception:
data = {"error": "Failed to decode JSON from upstream", "text": resp.text}
return {"result": data}
class ReturnNode(Node):
type_name = "Return"
async def run(self, inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: # noqa: D401
        # Determine the target format
cfg = self.config or {}
target = str(cfg.get("target_format", "auto")).lower().strip() or "auto"
if target == "auto":
target = str(context.get("vendor_format") or "openai").lower().strip() or "openai"
        # Render the text from the template (defaults to [[OUT1]])
out_map = context.get("OUT") or {}
template = cfg.get("text_template")
if template is None or template == "":
template = "[[OUT1]]"
try:
text = render_template_simple(str(template), context, out_map)
except Exception:
text = ""
model = str(context.get("model") or "")
        # Provider-specific formatters (as in execute_pipeline_echo)
def fmt_openai(t: str) -> Dict[str, Any]:
return {
"id": "ret_mock_123",
"object": "chat.completion",
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": t},
"finish_reason": "stop",
}
],
"usage": {"prompt_tokens": 0, "completion_tokens": len((t or "").split()), "total_tokens": 0},
}
def fmt_gemini(t: str) -> Dict[str, Any]:
return {
"candidates": [
{
"content": {
"role": "model",
"parts": [{"text": t}],
},
"finishReason": "STOP",
"index": 0,
}
],
"modelVersion": model,
}
def fmt_claude(t: str) -> Dict[str, Any]:
return {
"id": "msg_ret_123",
"type": "message",
"model": model,
"role": "assistant",
"content": [
{"type": "text", "text": t}
],
"stop_reason": "end_turn",
}
if target == "openai":
result = fmt_openai(text)
elif target == "gemini":
result = fmt_gemini(text)
elif target == "claude":
result = fmt_claude(text)
else:
            # unknown value: fall back to a safe default
result = fmt_openai(text)
return {"result": result, "response_text": text}
NODE_REGISTRY.update({
SetVarsNode.type_name: SetVarsNode,
ProviderCallNode.type_name: ProviderCallNode,
RawForwardNode.type_name: RawForwardNode,
ReturnNode.type_name: ReturnNode,
})
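# Minimal usage sketch (illustrative; ids, config values and context keys are
# hypothetical, and actual macro rendering depends on agentui.pipeline.templating):
#
#   pipeline = {
#       "parallel_limit": 2,
#       "nodes": [
#           {"id": "vars", "type": "SetVars",
#            "config": {"variables": [{"name": "x", "mode": "expr", "value": "1+1"}]}},
#           {"id": "ret", "type": "Return", "in": {"dep": "[[OUT:vars]]"},
#            "config": {"target_format": "openai", "text_template": "x=[[VAR:vars.x]]"}},
#       ],
#   }
#   result = asyncio.run(PipelineExecutor(pipeline).run({"model": "demo"}))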