Files
HadTavern/agentui/pipeline/templating.py

308 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional
__all__ = [
"_OUT_MACRO_RE",
"_VAR_MACRO_RE",
"_PROMPT_MACRO_RE",
"_OUT_SHORT_RE",
"_BARE_MACRO_RE",
"_BRACES_RE",
"_split_path",
"_get_by_path",
"_stringify_for_template",
"_deep_find_text",
"_best_text_from_outputs",
"render_template_simple",
]
# Regex-макросы (общие для бэка)
_OUT_MACRO_RE = re.compile(r"\[\[\s*OUT\s*[:\s]\s*([^\]]+?)\s*\]\]", re.IGNORECASE)
_VAR_MACRO_RE = re.compile(r"\[\[\s*VAR\s*[:\s]\s*([^\]]+?)\s*\]\]", re.IGNORECASE)
# Единый фрагмент PROMPT (провайдеро-специфичный JSON-фрагмент)
_PROMPT_MACRO_RE = re.compile(r"\[\[\s*PROMPT\s*\]\]", re.IGNORECASE)
# Короткая форма: [[OUT1]] — best-effort текст из ноды n1
_OUT_SHORT_RE = re.compile(r"\[\[\s*OUT\s*(\d+)\s*\]\]", re.IGNORECASE)
# Голые переменные: [[NAME]] или [[path.to.value]] — сначала ищем в vars, затем в контексте
_BARE_MACRO_RE = re.compile(r"\[\[\s*([A-Za-z_][A-Za-z0-9_]*(?:\.[^\]]+?)?)\s*\]\]")
# Подстановки {{ ... }} (включая простейший фильтр |default(...))
_BRACES_RE = re.compile(r"\{\{\s*([^}]+?)\s*\}\}")
def _split_path(path: str) -> List[str]:
return [p.strip() for p in str(path).split(".") if str(p).strip()]
def _get_by_path(obj: Any, path: Optional[str]) -> Any:
if path is None or path == "":
return obj
cur = obj
for seg in _split_path(path):
if isinstance(cur, dict):
if seg in cur:
cur = cur[seg]
else:
return None
elif isinstance(cur, list):
try:
idx = int(seg)
except Exception: # noqa: BLE001
return None
if 0 <= idx < len(cur):
cur = cur[idx]
else:
return None
else:
return None
return cur
def _stringify_for_template(val: Any) -> str:
if val is None:
return ""
if isinstance(val, bool):
# JSON-friendly booleans (useful when embedding into JSON-like templates)
return "true" if val else "false"
if isinstance(val, (dict, list)):
try:
return json.dumps(val, ensure_ascii=False)
except Exception: # noqa: BLE001
return str(val)
return str(val)
def _deep_find_text(obj: Any, max_nodes: int = 5000) -> Optional[str]:
"""
Best-effort поиск первого текстового значения в глубине структуры JSON.
Сначала пытаемся по ключам content/text, затем общий обход.
"""
try:
# Быстрые ветки
if isinstance(obj, str):
return obj
if isinstance(obj, dict):
c = obj.get("content")
if isinstance(c, str):
return c
t = obj.get("text")
if isinstance(t, str):
return t
parts = obj.get("parts")
if isinstance(parts, list) and parts:
for p in parts:
if isinstance(p, dict) and isinstance(p.get("text"), str):
return p.get("text")
# Общий нерекурсивный обход в ширину
queue: List[Any] = [obj]
seen = 0
while queue and seen < max_nodes:
cur = queue.pop(0)
seen += 1
if isinstance(cur, str):
return cur
if isinstance(cur, dict):
# часто встречающиеся поля
for k in ("text", "content"):
v = cur.get(k)
if isinstance(v, str):
return v
# складываем все значения
for v in cur.values():
queue.append(v)
elif isinstance(cur, list):
for it in cur:
queue.append(it)
except Exception:
pass
return None
def _best_text_from_outputs(node_out: Any) -> str:
"""
Унифицированное извлечение "текста" из выхода ноды.
Поддерживает:
- PromptTemplate: {"text": ...}
- LLMInvoke: {"response_text": ...}
- ProviderCall/RawForward: {"result": <provider_json>}, извлекаем текст для openai/gemini/claude
- Общий глубокий поиск текста, если специфичные ветки не сработали
"""
# Строка сразу
if isinstance(node_out, str):
return node_out
if not isinstance(node_out, dict):
return ""
# Явные короткие поля
if isinstance(node_out.get("response_text"), str) and node_out.get("response_text"):
return str(node_out["response_text"])
if isinstance(node_out.get("text"), str) and node_out.get("text"):
return str(node_out["text"])
res = node_out.get("result")
base = res if isinstance(res, (dict, list)) else node_out
# OpenAI
try:
if isinstance(base, dict):
ch0 = (base.get("choices") or [{}])[0]
msg = ch0.get("message") or {}
c = msg.get("content")
if isinstance(c, str):
return c
except Exception:
pass
# Gemini
try:
if isinstance(base, dict):
cand0 = (base.get("candidates") or [{}])[0]
content = cand0.get("content") or {}
parts0 = (content.get("parts") or [{}])[0]
t = parts0.get("text")
if isinstance(t, str):
return t
except Exception:
pass
# Claude
try:
if isinstance(base, dict):
blocks = base.get("content") or []
texts = [b.get("text") for b in blocks if isinstance(b, dict) and isinstance(b.get("text"), str)]
if texts:
return "\n".join(texts)
except Exception:
pass
# Общий глубокий поиск
txt = _deep_find_text(base)
return txt or ""
def render_template_simple(template: str, context: Dict[str, Any], out_map: Dict[str, Any]) -> str:
"""
Простая подстановка:
- {{ path }} — берёт из context (или {{ OUT.node.path }} для выходов)
- Поддержка фильтра по умолчанию: {{ path|default(value) }}
value может быть числом, строкой ('..'/".."), массивом/объектом в виде литерала.
- [[VAR:path]] — берёт из context
- [[OUT:nodeId(.path)*]] — берёт из out_map
Возвращает строку.
"""
if template is None:
return ""
s = str(template)
# 1) Макросы [[VAR:...]] и [[OUT:...]]
def repl_var(m: re.Match) -> str:
path = m.group(1).strip()
val = _get_by_path(context, path)
return _stringify_for_template(val)
def repl_out(m: re.Match) -> str:
body = m.group(1).strip()
if "." in body:
node_id, rest = body.split(".", 1)
node_val = out_map.get(node_id)
val = _get_by_path(node_val, rest)
else:
val = out_map.get(body)
return _stringify_for_template(val)
s = _VAR_MACRO_RE.sub(repl_var, s)
s = _OUT_MACRO_RE.sub(repl_out, s)
# [[OUT1]] → текст из ноды n1 (best-effort)
def repl_out_short(m: re.Match) -> str:
try:
num = int(m.group(1))
node_id = f"n{num}"
node_out = out_map.get(node_id)
txt = _best_text_from_outputs(node_out)
return _stringify_for_template(txt)
except Exception:
return ""
s = _OUT_SHORT_RE.sub(repl_out_short, s)
# [[PROMPT]] — провайдеро-специфичный JSON-фрагмент, подготовленный в context["PROMPT"]
s = _PROMPT_MACRO_RE.sub(lambda _m: str(context.get("PROMPT") or ""), s)
# 1.5) Голые [[NAME]] / [[path.to.value]]
def repl_bare(m: re.Match) -> str:
name = m.group(1).strip()
# Зарезервированные формы уже обработаны выше; бережно пропускаем похожие
if name.upper() in {"OUT", "VAR", "PROMPT"} or re.fullmatch(r"OUT\d+", name.upper() or ""):
return m.group(0)
# Сначала пользовательские переменные
vmap = context.get("vars") or {}
if isinstance(vmap, dict) and name in vmap:
return _stringify_for_template(vmap.get(name))
# Затем путь из общего контекста
val = _get_by_path(context, name)
return _stringify_for_template(val)
s = _BARE_MACRO_RE.sub(repl_bare, s)
# 2) Подстановки {{ ... }} (+ simple default filter)
def repl_braces(m: re.Match) -> str:
expr = m.group(1).strip()
def eval_path(p: str) -> Any:
p = p.strip()
# Приоритет пользовательских переменных для простых идентификаторов {{ NAME }}
vmap = context.get("vars") or {}
if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", p) and isinstance(vmap, dict) and p in vmap:
return vmap.get(p)
if p.startswith("OUT."):
body = p[4:]
if "." in body:
node_id, rest = body.split(".", 1)
node_val = out_map.get(node_id)
return _get_by_path(node_val, rest)
return out_map.get(body)
return _get_by_path(context, p)
default_match = re.match(r"([^|]+)\|\s*default\((.*)\)\s*$", expr)
if default_match:
base_path = default_match.group(1).strip()
fallback_raw = default_match.group(2).strip()
# Рекурсивная обработка вложенных default(...) и путей
def eval_default(raw: str) -> Any:
raw = raw.strip()
# Вложенный default: a|default(b)
dm = re.match(r"([^|]+)\|\s*default\((.*)\)\s*$", raw)
if dm:
base2 = dm.group(1).strip()
fb2 = dm.group(2).strip()
v2 = eval_path(base2)
if v2 not in (None, ""):
return v2
return eval_default(fb2)
# Пробуем как путь
v = eval_path(raw)
if v not in (None, ""):
return v
# Явная строка в кавычках
if len(raw) >= 2 and ((raw[0] == '"' and raw[-1] == '"') or (raw[0] == "'" and raw[-1] == "'")):
return raw[1:-1]
# Пробуем распарсить как JSON литерал (число/объект/массив/true/false/null)
try:
return json.loads(raw)
except Exception:
# Последний вариант: вернуть сырой текст. Для строк рекомендуется default('...') с кавычками.
return raw
raw_val = eval_path(base_path)
val = raw_val if raw_val not in (None, "") else eval_default(fallback_raw)
else:
val = eval_path(expr)
return _stringify_for_template(val)
s = _BRACES_RE.sub(repl_braces, s)
return s