Files
HadTavern/agentui/api/server.py

803 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, Request, HTTPException, Query, Header
import logging
import json
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode, unquote
from fastapi.responses import JSONResponse, HTMLResponse, StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
import os
import hashlib
import time
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Literal, Optional
from agentui.pipeline.executor import PipelineExecutor
from agentui.pipeline.defaults import default_pipeline
from agentui.pipeline.storage import load_pipeline, save_pipeline, list_presets, load_preset, save_preset, load_var_store
from agentui.common.vendors import detect_vendor
from agentui.common.cancel import request_cancel, clear_cancel, is_cancelled
from agentui.pipeline.templating import render_template_simple
class UnifiedParams(BaseModel):
    # Vendor-neutral generation parameters shared by every normalized request.
    # The defaults match the fallbacks used by normalize_to_unified when a
    # vendor payload omits the corresponding setting.
    temperature: float = Field(default=0.7)
    max_tokens: Optional[int] = Field(default=None)  # None: no limit was supplied
    top_p: Optional[float] = Field(default=1.0)
    stop: Optional[List[str]] = Field(default=None)  # stop sequences, if any
class UnifiedMessage(BaseModel):
    # A single chat turn in the vendor-neutral representation.
    role: Literal["system", "user", "assistant", "tool"]
    # Usually joined text; normalizers may also keep raw vendor parts here
    # (e.g. Gemini parts that contain no text).
    content: Any
    tool_call_id: Optional[str] = Field(default=None)
    name: Optional[str] = Field(default=None)
class UnifiedChatRequest(BaseModel):
    # Vendor-neutral chat request produced by normalize_to_unified.
    # vendor_format records which wire format the request arrived in, so the
    # response can be rendered back in the same format.
    vendor_format: Literal["openai", "gemini", "claude", "unknown"] = Field(default="unknown")
    model: str = Field(default="")
    messages: List[UnifiedMessage] = Field(default_factory=list)
    tools: Optional[List[Dict[str, Any]]] = Field(default=None)
    tool_choice: Optional[Any] = Field(default=None)
    params: UnifiedParams = Field(default_factory=UnifiedParams)
    system: Optional[str] = Field(default=None)  # system prompt pulled out of the messages
    stream: bool = Field(default=False)
    metadata: Dict[str, Any] = Field(default_factory=dict)
# Roles accepted by UnifiedMessage.role.
_UNIFIED_ROLES = frozenset({"system", "user", "assistant", "tool"})
# Vendor role names that map onto a unified role.
_OPENAI_ROLE_ALIASES: Dict[str, str] = {"developer": "system", "function": "tool"}
_GEMINI_ROLE_ALIASES: Dict[str, str] = {"model": "assistant"}


def _coerce_role(raw_role: Any, aliases: Optional[Dict[str, str]] = None) -> str:
    """Map a vendor role onto the UnifiedMessage literal; unknown roles become "user".

    Without this, a single unexpected role (e.g. OpenAI "developer") made
    pydantic reject the whole request with a 500.
    """
    if not isinstance(raw_role, str):
        return "user"
    role = (aliases or {}).get(raw_role, raw_role)
    return role if role in _UNIFIED_ROLES else "user"


def _or_default(value: Any, default: Any) -> Any:
    """Return *default* when *value* is None (explicit JSON null), else *value*."""
    return default if value is None else value


def _as_stop_list(value: Any) -> Optional[List[str]]:
    """Normalize a stop-sequence setting to a list of strings.

    OpenAI allows ``stop`` to be a single string; UnifiedParams.stop requires a list.
    Non-string entries are dropped; unsupported shapes yield None.
    """
    if value is None:
        return None
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [s for s in value if isinstance(s, str)]
    return None


def _claude_text(blocks: List[Any]) -> str:
    """Join the ``text`` of Anthropic ``{"type": "text", "text": ...}`` blocks with newlines."""
    return "\n".join(
        b["text"]
        for b in blocks
        if isinstance(b, dict) and b.get("type") == "text" and isinstance(b.get("text"), str)
    )


def _normalize_openai(payload: Dict[str, Any]) -> UnifiedChatRequest:
    """OpenAI chat.completions payload -> UnifiedChatRequest.

    The first string-valued system message becomes ``system``; later ones stay in messages.
    """
    system: Optional[str] = None
    norm_messages: List[UnifiedMessage] = []
    for m in payload.get("messages") or []:
        if not isinstance(m, dict):
            continue
        role = _coerce_role(m.get("role", "user"), _OPENAI_ROLE_ALIASES)
        content = m.get("content")
        if role == "system" and system is None and isinstance(content, str):
            system = content
            continue
        # Keep tool linkage / participant names when they are well-formed strings.
        extra = {k: m[k] for k in ("tool_call_id", "name") if isinstance(m.get(k), str)}
        norm_messages.append(UnifiedMessage(role=role, content=content, **extra))
    params = UnifiedParams(
        temperature=_or_default(payload.get("temperature"), 0.7),
        # Newer OpenAI clients send max_completion_tokens instead of max_tokens.
        max_tokens=_or_default(payload.get("max_tokens"), payload.get("max_completion_tokens")),
        top_p=payload.get("top_p", 1.0),
        stop=_as_stop_list(payload.get("stop")),
    )
    return UnifiedChatRequest(
        vendor_format="openai",
        model=payload.get("model") or "",
        messages=norm_messages,
        params=params,
        system=system,
        stream=bool(payload.get("stream", False)),
        tools=payload.get("tools"),
        tool_choice=payload.get("tool_choice"),
    )


def _gemini_system_instruction(payload: Dict[str, Any]) -> Optional[str]:
    """Extract text from Gemini ``systemInstruction`` (Content object or plain string)."""
    si = payload.get("systemInstruction") or payload.get("system_instruction")
    if isinstance(si, str):
        return si or None
    if isinstance(si, dict):
        parts = si.get("parts") or []
        texts = [p["text"] for p in parts if isinstance(p, dict) and isinstance(p.get("text"), str)]
        return "\n".join(texts) if texts else None
    return None


def _normalize_gemini(payload: Dict[str, Any]) -> UnifiedChatRequest:
    """Gemini generateContent payload -> UnifiedChatRequest (text parts only)."""
    norm_messages: List[UnifiedMessage] = []
    for c in payload.get("contents") or []:
        if not isinstance(c, dict):
            continue
        # Gemini uses "user"/"model"; "model" is the assistant.
        role = _coerce_role(c.get("role", "user"), _GEMINI_ROLE_ALIASES)
        parts = c.get("parts", [])
        text_parts = (
            [p["text"] for p in parts if isinstance(p, dict) and isinstance(p.get("text"), str)]
            if isinstance(parts, list)
            else []
        )
        # Text parts are joined; messages without text keep their raw parts.
        content = "\n".join(text_parts) if text_parts else parts
        norm_messages.append(UnifiedMessage(role=role, content=content))
    gen = payload.get("generationConfig") or payload.get("generation_config") or {}
    if not isinstance(gen, dict):
        gen = {}
    params = UnifiedParams(
        temperature=_or_default(gen.get("temperature"), 0.7),
        max_tokens=gen.get("maxOutputTokens"),
        top_p=gen.get("topP", 1.0),
        stop=_as_stop_list(gen.get("stopSequences")),
    )
    return UnifiedChatRequest(
        vendor_format="gemini",
        model=payload.get("model") or "",
        messages=norm_messages,
        params=params,
        system=_gemini_system_instruction(payload),
        stream=False,
    )


def _normalize_claude(payload: Dict[str, Any]) -> UnifiedChatRequest:
    """Anthropic Messages API payload -> UnifiedChatRequest.

    Content and ``system`` may each be a string or a list of text blocks.
    """
    norm_messages: List[UnifiedMessage] = []
    for m in payload.get("messages") or []:
        if not isinstance(m, dict):
            continue
        role = _coerce_role(m.get("role", "user"))
        content_raw = m.get("content")
        content = _claude_text(content_raw) if isinstance(content_raw, list) else content_raw
        norm_messages.append(UnifiedMessage(role=role, content=content))
    system_raw = payload.get("system")
    if isinstance(system_raw, list):
        system: Optional[str] = _claude_text(system_raw)
    elif isinstance(system_raw, str):
        system = system_raw
    else:
        system = None
    params = UnifiedParams(
        temperature=_or_default(payload.get("temperature"), 0.7),
        max_tokens=payload.get("max_tokens"),
        top_p=payload.get("top_p", 1.0),
        # Anthropic calls this field stop_sequences; "stop" kept as a fallback.
        stop=_as_stop_list(_or_default(payload.get("stop_sequences"), payload.get("stop"))),
    )
    return UnifiedChatRequest(
        vendor_format="claude",
        model=payload.get("model") or "",
        messages=norm_messages,
        params=params,
        system=system,
        stream=False,
    )


def normalize_to_unified(payload: Dict[str, Any]) -> UnifiedChatRequest:
    """Convert an OpenAI, Gemini or Claude chat payload into a UnifiedChatRequest.

    The vendor is chosen by ``detect_vendor(payload)``.

    Raises:
        HTTPException: 400 when the payload's vendor is not recognized.
    """
    normalizer = {
        "openai": _normalize_openai,
        "gemini": _normalize_gemini,
        "claude": _normalize_claude,
    }.get(detect_vendor(payload))
    if normalizer is None:
        raise HTTPException(status_code=400, detail="Unsupported or unknown vendor payload")
    return normalizer(payload)
def build_macro_context(u: "UnifiedChatRequest", incoming: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the variable context that pipeline templates are rendered against.

    Args:
        u: the normalized chat request.
        incoming: optional description of the HTTP request. Keys read here:
            ``"query"`` (the raw query string) and ``"headers"`` (a header
            mapping). The dict is copied and enriched with ``"query_params"``
            (the parsed query string) and ``"api_keys"`` (``authorization``
            and ``key``).

    Returns:
        A dict with ``vendor_format``, ``model``, ``system``, ``chat``
        (``last_user`` and ``messages``), ``params`` and ``incoming``.
    """
    # Content of the most recent user turn, or "" if there is none.
    last_user = next((m.content for m in reversed(u.messages) if m.role == "user"), "")
    inc = incoming or {}

    # Parse query parameters (e.g. ?key=... used by Gemini clients).
    try:
        qparams = dict(parse_qsl(inc.get("query") or "", keep_blank_values=True))
    except Exception:  # noqa: BLE001 - a malformed query must not break the request
        qparams = {}

    inc_enriched: Dict[str, Any] = dict(inc)
    inc_enriched["query_params"] = qparams

    headers = inc.get("headers")
    if not isinstance(headers, dict):
        headers = {}
    # Convenience view of credentials. The query-string key no longer depends
    # on the headers' type. Gemini SDK clients send the key in the
    # x-goog-api-key header instead of ?key=, so fall back to that header.
    inc_enriched["api_keys"] = {
        "authorization": headers.get("authorization") or headers.get("Authorization"),
        "key": qparams.get("key") or headers.get("x-goog-api-key") or headers.get("X-Goog-Api-Key"),
    }

    return {
        "vendor_format": u.vendor_format,
        "model": u.model,
        "system": u.system or "",
        "chat": {
            "last_user": last_user,
            "messages": [m.model_dump() for m in u.messages],
        },
        "params": u.params.model_dump(),
        "incoming": inc_enriched,
    }
# jinja_render removed (duplication). Use agentui.pipeline.templating.render_template_simple instead.
def _read_exec_trace() -> str:
    """Return the stripped EXEC_TRACE text from the current pipeline's var-store snapshot.

    Best effort: any failure while loading the pipeline or its store yields "".
    """
    try:
        pid = load_pipeline().get("id", "pipeline_editor")
        store = load_var_store(pid) or {}
        snap = store.get("snapshot") or {}
        return str(snap.get("EXEC_TRACE") or "").strip()
    except Exception:  # noqa: BLE001 - the trace is only a debugging aid
        return ""


async def execute_pipeline_echo(u: UnifiedChatRequest) -> Dict[str, Any]:
    """Fallback "echo" pipeline: render a fixed prompt and return it as the reply.

    Mini pipeline: PromptTemplate -> LLMInvoke (echo, no real provider) ->
    VendorFormatter. The reply is shaped like the vendor's native response
    for ``u.vendor_format``. If available, the last execution trace is
    appended as a human-readable block.

    Raises:
        HTTPException: 400 when ``u.vendor_format`` has no formatter.
    """
    macro_ctx = build_macro_context(u)
    prompt_template = "System: {{ system }}\nUser: {{ chat.last_user }}"
    rendered_prompt = render_template_simple(prompt_template, macro_ctx, {})

    llm_response_text = f"[echo by {u.model}]\n" + rendered_prompt
    trace_text = _read_exec_trace()
    if trace_text:
        llm_response_text += "\n\n[Execution Trace]\n" + trace_text

    # Whitespace word count as a rough token estimate; no real tokenizer here.
    completion_tokens = len(llm_response_text.split())

    if u.vendor_format == "openai":
        return {
            "id": "mockcmpl-123",
            "object": "chat.completion",
            "model": u.model,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": llm_response_text},
                    "finish_reason": "stop",
                }
            ],
            # total_tokens must equal prompt + completion (was hard-coded to 0).
            "usage": {"prompt_tokens": 0, "completion_tokens": completion_tokens, "total_tokens": completion_tokens},
        }
    if u.vendor_format == "gemini":
        return {
            "candidates": [
                {
                    "content": {
                        "role": "model",
                        "parts": [{"text": llm_response_text}],
                    },
                    "finishReason": "STOP",
                    "index": 0,
                }
            ],
            "modelVersion": u.model,
        }
    if u.vendor_format == "claude":
        return {
            "id": "msg_mock_123",
            "type": "message",
            "model": u.model,
            "role": "assistant",
            "content": [
                {"type": "text", "text": llm_response_text}
            ],
            "stop_reason": "end_turn",
            # Fields present on every Anthropic Message object; strict clients expect them.
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": completion_tokens},
        }
    raise HTTPException(status_code=400, detail="Unsupported vendor for formatting")
def create_app() -> FastAPI:
    """Build and configure the НадTavern FastAPI application.

    Registers:
      * vendor-compatible chat endpoints (OpenAI ``/v1/chat/completions``,
        Gemini ``.../models/{model}:generateContent``, Anthropic
        ``/v1/messages``), all routed through one pipeline runner;
      * the static UI under ``/ui`` plus ``/ui_version`` diagnostics;
      * favicon / PWA icon files from ``favicon_io_saya``;
      * admin endpoints for the pipeline, presets, variable store and
        cancellation;
      * ``/admin/trace/stream``, an SSE feed of live pipeline trace events.

    Returns:
        The configured FastAPI instance.
    """
    import asyncio as _asyncio

    app = FastAPI(title="НадTavern")

    # --- Logging -----------------------------------------------------------
    # basicConfig() gives the root logger a handler when it has none, and the
    # "agentui" logger propagates to root. The previous code also attached its
    # own StreamHandler to "agentui", so every record was printed twice. Raising
    # the logger level is enough to keep the INFO diagnostics below visible.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("agentui")
    logger.setLevel(logging.INFO)

    # --- Shared constants --------------------------------------------------
    _DEFAULT_PIPELINE_ID = "pipeline_editor"
    # Header names / query keys whose values must never be written to logs verbatim.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "x-api-key", "x-goog-api-key", "cookie", "set-cookie"}
    )
    _SENSITIVE_QUERY_KEYS = frozenset({"key", "access_token", "token"})
    # Pipeline meta keys reported by the /admin/pipeline diagnostic logs.
    _PIPELINE_META_KEYS = (
        "id", "name", "parallel_limit", "loop_mode", "loop_max_iters", "loop_time_budget_ms", "clear_var_store",
        "http_timeout_sec", "text_extract_strategy", "text_extract_json_path", "text_join_sep", "text_extract_presets",
    )
    # Per-subscriber SSE backlog. Once a tab falls this far behind, new events are dropped for it.
    _SSE_QUEUE_MAXSIZE = 1000
    FAV_DIR = "favicon_io_saya"

    # --- Simple in-process SSE hub (one subscription per browser tab) ---------
    class _SSEHub:
        """Fan trace events out to every subscribed asyncio.Queue."""

        def __init__(self, maxsize: int = _SSE_QUEUE_MAXSIZE) -> None:
            self._subs: List[_asyncio.Queue] = []
            self._maxsize = maxsize

        def subscribe(self) -> _asyncio.Queue:
            # Bounded queue, so a stalled tab cannot grow memory without limit.
            q: _asyncio.Queue = _asyncio.Queue(maxsize=self._maxsize)
            self._subs.append(q)
            return q

        def unsubscribe(self, q: _asyncio.Queue) -> None:
            try:
                self._subs.remove(q)
            except ValueError:
                pass

        async def publish(self, event: Dict[str, Any]) -> None:
            # Fan out to all subscribers; drop the event for any queue that is full.
            for q in list(self._subs):
                try:
                    q.put_nowait(event)
                except _asyncio.QueueFull:
                    pass

    _trace_hub = _SSEHub()

    # --- Logging / sanitizing helpers ----------------------------------------
    def _log_json(payload: Dict[str, Any]) -> None:
        """Emit one INFO record whose message is *payload* serialized as JSON (best effort)."""
        try:
            logger.info("%s", json.dumps(payload, ensure_ascii=False))
        except Exception:  # noqa: BLE001 - diagnostics must never break a request
            pass

    def _mask_headers(h: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *h* with credential-bearing header values replaced by "***"."""
        return {k: ("***" if k.lower() in _SENSITIVE_HEADERS else v) for k, v in h.items()}

    def _sanitize_url(url: str) -> str:
        """Return *url* with secret query parameters (key / access_token / token) masked."""
        try:
            parts = urlsplit(url)
            qs = parse_qsl(parts.query, keep_blank_values=True)
            qs_masked = [(k, "***" if k.lower() in _SENSITIVE_QUERY_KEYS else v) for k, v in qs]
            return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(qs_masked), parts.fragment))
        except Exception:  # noqa: BLE001
            return url

    async def _log_request(req: Request, raw_body: Optional[bytes] = None, parsed: Optional[Any] = None) -> None:
        """Log an incoming request: sanitized URL and headers, a body preview capped at 4000 chars, and the parsed JSON."""
        try:
            body_preview = None
            if raw_body is not None:
                body_preview = raw_body.decode(errors="ignore")
                if len(body_preview) > 4000:
                    body_preview = body_preview[:4000] + "...<truncated>"
            payload = {
                "event": "incoming_request",
                "method": req.method,
                "url": _sanitize_url(str(req.url)),
                "headers": _mask_headers(dict(req.headers)),
                "body": body_preview,
                "json": parsed if isinstance(parsed, (dict, list)) else None,
            }
        except Exception:  # noqa: BLE001
            return
        _log_json(payload)

    async def _log_response(req: Request, status: int, data: Any) -> None:
        """Log an outgoing response: method, path, status and JSON body."""
        _log_json({
            "event": "outgoing_response",
            "method": req.method,
            "path": req.url.path,
            "status": status,
            "json": data if isinstance(data, (dict, list)) else None,
        })

    async def _publish_trace(event: Dict[str, Any]) -> None:
        """Publish *event* to the SSE hub; a failure here must not affect the request."""
        try:
            await _trace_hub.publish(event)
        except Exception:  # noqa: BLE001
            pass

    # --- Request parsing helpers --------------------------------------------
    def _parse_json(raw: bytes) -> Any:
        """Decode a JSON body (an empty body counts as ``{}``); invalid JSON -> 400 "Invalid JSON"."""
        try:
            return json.loads(raw or b"{}")
        except Exception:  # noqa: BLE001
            raise HTTPException(status_code=400, detail="Invalid JSON") from None

    def _parse_json_object(raw: bytes) -> Dict[str, Any]:
        """Like _parse_json, but additionally require a JSON object (400 "Invalid payload type")."""
        payload = _parse_json(raw)
        if not isinstance(payload, dict):
            raise HTTPException(status_code=400, detail="Invalid payload type")
        return payload

    def _current_pipeline_id() -> str:
        """Return the id of the saved pipeline, falling back to the default pipeline's id."""
        try:
            return load_pipeline().get("id", _DEFAULT_PIPELINE_ID)
        except Exception:  # noqa: BLE001
            return default_pipeline().get("id", _DEFAULT_PIPELINE_ID)

    def _log_pipeline_meta(event: str, p: Dict[str, Any]) -> None:
        """Diagnostic log of which pipeline meta keys are present (presets are only counted)."""
        try:
            present = [k for k in _PIPELINE_META_KEYS if k in p]
            presets = p.get("text_extract_presets")
            _log_json({
                "event": event,
                "keys": present,
                "presets_count": len(presets) if isinstance(presets, list) else 0,
                "meta_preview": {k: p.get(k) for k in present if k != "text_extract_presets"},
            })
        except Exception:  # noqa: BLE001
            pass

    # --- Unified pipeline runner ---------------------------------------------
    async def _run_pipeline_for_payload(request: Request, payload: Dict[str, Any], raw: Optional[bytes] = None) -> JSONResponse:
        """Handle one chat request end to end and return the vendor-shaped JSON response.

        Steps: log the request, normalize it, run PipelineExecutor, fall back
        to the echo pipeline when no result is produced, then log the response.
        """
        await _log_request(request, raw_body=raw, parsed=payload)
        unified = normalize_to_unified(payload)
        unified.stream = False
        incoming = {
            "method": request.method,
            "url": _sanitize_url(str(request.url)),
            "path": request.url.path,
            "query": request.url.query,
            "headers": dict(request.headers),
            "json": payload,
        }
        macro_ctx = build_macro_context(unified, incoming=incoming)
        pipeline = load_pipeline()
        pipeline_id = pipeline.get("id", _DEFAULT_PIPELINE_ID)
        executor = PipelineExecutor(pipeline)

        async def _trace(evt: Dict[str, Any]) -> None:
            # Tag every executor event with the pipeline id; the event's own keys win.
            try:
                event = {"pipeline_id": pipeline_id, **evt}
            except Exception:  # noqa: BLE001
                return
            await _publish_trace(event)

        # Diagnostic INFO log for validating the unified handler.
        _log_json({
            "event": "unified_handler",
            "vendor": unified.vendor_format,
            "model": unified.model,
            "pipeline_id": pipeline_id,
        })

        # Mark the pipeline start for the UI and measure total active time.
        t0 = time.perf_counter()
        await _publish_trace({
            "event": "pipeline_start",
            "pipeline_id": pipeline_id,
            "ts": int(time.time() * 1000),
        })
        try:
            last = await executor.run(macro_ctx, trace=_trace)
            result = last.get("result") or await execute_pipeline_echo(unified)
        finally:
            # Always mark the end, even if the executor raised; otherwise the
            # UI would stay in its "busy" state indefinitely.
            await _publish_trace({
                "event": "pipeline_done",
                "pipeline_id": pipeline_id,
                "ts": int(time.time() * 1000),
                "duration_ms": int((time.perf_counter() - t0) * 1000),
            })
        await _log_response(request, 200, result)
        return JSONResponse(result)

    # --- Root page -----------------------------------------------------------
    @app.get("/")
    async def index() -> HTMLResponse:
        html = (
            "<html><head><title>НадTavern</title></head>"
            "<body>"
            "<h1>НадTavern</h1>"
            "<p>Простой UI и API запущены.</p>"
            "<p>POST /v1/chat/completions — универсальный эндпоинт (без стриминга)."
            " Поддерживает OpenAI/Gemini/Claude формы. Возвращает в исходном формате.</p>"
            "<p><a href='/ui'>Перейти в UI</a></p>"
            "</body></html>"
        )
        return HTMLResponse(html)

    # --- Chat endpoints --------------------------------------------------------
    @app.post("/v1/chat/completions")
    async def chat_completions(request: Request) -> JSONResponse:
        raw = await request.body()
        # Non-object JSON is rejected with 400 here instead of failing later with a 500.
        payload = _parse_json_object(raw)
        return await _run_pipeline_for_payload(request, payload, raw)

    # Google AI Studio compatible routes (Gemini):
    #   POST /v1beta/models/{model}:generateContent?key=...
    #   POST /v1/models/{model}:generateContent?key=...
    async def _gemini_generate(model: str, request: Request) -> JSONResponse:
        """Parse a Gemini body, inject the model taken from the URL, and run the pipeline."""
        raw = await request.body()
        payload = {**_parse_json_object(raw), "model": model}
        return await _run_pipeline_for_payload(request, payload, raw)

    def _model_from_generate_path(rest_of_path: str) -> str:
        """Extract the model from "<model>:generateContent" (":" may arrive as %3A); otherwise 404."""
        decoded = unquote(rest_of_path)
        if ":generateContent" not in decoded:
            raise HTTPException(status_code=404, detail="Not Found")
        return decoded.split(":generateContent", 1)[0]

    @app.post("/v1beta/models/{model}:generateContent")
    async def gemini_generate_content_v1beta(model: str, request: Request, key: Optional[str] = Query(default=None)) -> JSONResponse:  # noqa: ARG001
        return await _gemini_generate(model, request)

    @app.post("/v1/models/{model}:generateContent")
    async def gemini_generate_content_v1(model: str, request: Request, key: Optional[str] = Query(default=None)) -> JSONResponse:  # noqa: ARG001
        return await _gemini_generate(model, request)

    # Catch-all routes for clients that encode the colon in the path as %3A.
    @app.post("/v1beta/models/{rest_of_path:path}")
    async def gemini_generate_content_v1beta_catchall(rest_of_path: str, request: Request, key: Optional[str] = Query(default=None)) -> JSONResponse:  # noqa: ARG001
        return await _gemini_generate(_model_from_generate_path(rest_of_path), request)

    @app.post("/v1/models/{rest_of_path:path}")
    async def gemini_generate_content_v1_catchall(rest_of_path: str, request: Request, key: Optional[str] = Query(default=None)) -> JSONResponse:  # noqa: ARG001
        return await _gemini_generate(_model_from_generate_path(rest_of_path), request)

    # Anthropic Claude messages endpoint compatibility.
    @app.post("/v1/messages")
    async def claude_messages(request: Request, anthropic_version: Optional[str] = Header(default=None)) -> JSONResponse:  # noqa: ARG001
        raw = await request.body()
        payload = _parse_json_object(raw)
        # The header wins; otherwise keep the body's value, defaulting to 2023-06-01.
        version = anthropic_version or payload.get("anthropic_version", "2023-06-01")
        payload = {**payload, "anthropic_version": version}
        return await _run_pipeline_for_payload(request, payload, raw)

    # --- Static UI -----------------------------------------------------------
    app.mount("/ui", StaticFiles(directory="static", html=True), name="ui")

    # NOTE: endpoints under /ui/* cannot be declared after mounting StaticFiles
    # at /ui, because the mount captures every path below /ui. Hence /ui_version.
    @app.get("/ui_version")
    async def ui_version() -> JSONResponse:
        def md5p(p: str) -> Optional[str]:
            # MD5 of the file is used only as a cache-busting fingerprint; None if unreadable.
            try:
                with open(p, "rb") as f:
                    return hashlib.md5(f.read()).hexdigest()
            except Exception:  # noqa: BLE001
                return None

        try:
            static_dir = os.path.abspath("static")
            payload = {
                "cwd": os.path.abspath("."),
                "static_dir": static_dir,
                "files": {
                    "editor.html": md5p(os.path.join(static_dir, "editor.html")),
                    "js/serialization.js": md5p(os.path.join(static_dir, "js", "serialization.js")),
                    "js/pm-ui.js": md5p(os.path.join(static_dir, "js", "pm-ui.js")),
                },
                "ts": int(time.time()),
            }
            return JSONResponse(payload, headers={"Cache-Control": "no-store"})
        except Exception as e:  # noqa: BLE001
            return JSONResponse({"error": str(e)}, status_code=500, headers={"Cache-Control": "no-store"})

    # --- Favicon and PWA icons at root ----------------------------------------
    def _fav_file(filename: str, media_type: str, label: str) -> FileResponse:
        """Serve FAV_DIR/<filename>, or raise 404 "<label> not found" when it is missing.

        FileResponse does not check the path at construction time; it only
        fails while sending, which produced a 500. The old try/except therefore
        never yielded a 404, so existence is checked up front.
        """
        path = f"{FAV_DIR}/{filename}"
        if not os.path.isfile(path):
            raise HTTPException(status_code=404, detail=f"{label} not found")
        return FileResponse(path, media_type=media_type)

    @app.get("/favicon.ico")
    async def _favicon_ico():
        return _fav_file("favicon.ico", "image/x-icon", "favicon")

    @app.get("/apple-touch-icon.png")
    async def _apple_touch_icon():
        return _fav_file("apple-touch-icon.png", "image/png", "apple-touch-icon")

    @app.get("/favicon-32x32.png")
    async def _favicon_32():
        return _fav_file("favicon-32x32.png", "image/png", "favicon-32x32")

    @app.get("/favicon-16x16.png")
    async def _favicon_16():
        return _fav_file("favicon-16x16.png", "image/png", "favicon-16x16")

    @app.get("/android-chrome-192x192.png")
    async def _android_192():
        return _fav_file("android-chrome-192x192.png", "image/png", "android-chrome-192x192")

    @app.get("/android-chrome-512x512.png")
    async def _android_512():
        return _fav_file("android-chrome-512x512.png", "image/png", "android-chrome-512x512")

    @app.get("/site.webmanifest")
    async def _site_manifest():
        return _fav_file("site.webmanifest", "application/manifest+json", "site.webmanifest")

    # Custom APNG favicon for the UI's "busy" state. Browsers accept APNG served as image/png.
    @app.get("/saya1.png")
    async def _apng_busy_icon():
        return _fav_file("saya1.png", "image/png", "saya1.png")

    # --- Variable store API (per pipeline) ------------------------------------
    @app.get("/admin/vars")
    async def get_vars() -> JSONResponse:
        pid = _current_pipeline_id()
        try:
            store = load_var_store(pid)
        except Exception:  # noqa: BLE001
            store = {}
        return JSONResponse({"pipeline_id": pid, "store": store})

    @app.delete("/admin/vars")
    async def clear_vars() -> JSONResponse:
        pid = _current_pipeline_id()
        try:
            from agentui.pipeline.storage import clear_var_store
            clear_var_store(pid)
        except Exception:  # noqa: BLE001 - best effort, but leave a trace in the logs
            logger.warning("failed to clear var store for pipeline %s", pid, exc_info=True)
        return JSONResponse({"ok": True})

    # --- Admin API for the pipeline -------------------------------------------
    @app.get("/admin/pipeline")
    async def get_pipeline() -> JSONResponse:
        p = load_pipeline()
        _log_pipeline_meta("admin_get_pipeline_meta", p)
        return JSONResponse(p)

    @app.post("/admin/pipeline")
    async def set_pipeline(request: Request) -> JSONResponse:
        pipeline = _parse_json(await request.body())
        # Minimal format check.
        if not isinstance(pipeline, dict) or "nodes" not in pipeline:
            raise HTTPException(status_code=400, detail="Invalid pipeline format")
        _log_pipeline_meta("admin_set_pipeline_meta", pipeline)
        save_pipeline(pipeline)
        return JSONResponse({"ok": True})

    # --- Presets ---------------------------------------------------------------
    # NOTE(review): `name` is client-controlled; confirm load_preset/save_preset
    # sanitize it against path traversal (e.g. "..").
    @app.get("/admin/presets")
    async def get_presets() -> JSONResponse:
        return JSONResponse({"items": list_presets()})

    @app.get("/admin/presets/{name}")
    async def get_preset(name: str) -> JSONResponse:
        try:
            return JSONResponse(load_preset(name))
        except FileNotFoundError:
            raise HTTPException(status_code=404, detail="Preset not found") from None

    @app.post("/admin/presets/{name}")
    async def put_preset(name: str, request: Request) -> JSONResponse:
        payload = _parse_json(await request.body())
        if not isinstance(payload, dict) or "nodes" not in payload:
            raise HTTPException(status_code=400, detail="Invalid pipeline format")
        save_preset(name, payload)
        return JSONResponse({"ok": True})

    # --- Manual cancel / clear for pipeline execution ---------------------------
    @app.post("/admin/cancel")
    async def admin_cancel() -> JSONResponse:
        pid = _current_pipeline_id()
        try:
            request_cancel(pid)
        except Exception:  # noqa: BLE001
            logger.warning("failed to request cancel for pipeline %s", pid, exc_info=True)
        return JSONResponse({"ok": True, "pipeline_id": pid, "cancelled": True})

    @app.post("/admin/cancel/clear")
    async def admin_cancel_clear() -> JSONResponse:
        pid = _current_pipeline_id()
        try:
            clear_cancel(pid)
        except Exception:  # noqa: BLE001
            logger.warning("failed to clear cancel flag for pipeline %s", pid, exc_info=True)
        return JSONResponse({"ok": True, "pipeline_id": pid, "cancelled": False})

    # --- SSE endpoint for the live pipeline trace --------------------------------
    @app.get("/admin/trace/stream")
    async def sse_trace() -> StreamingResponse:
        async def _gen():
            # Subscribe inside the generator so the finally clause always pairs
            # with it. A generator that never starts (client gone before the
            # body is streamed) would otherwise leak its queue.
            q = _trace_hub.subscribe()
            try:
                # Warm-up comment line so the client sees the stream open immediately.
                yield ":ok\n\n"
                while True:
                    evt = await q.get()
                    try:
                        line = f"data: {json.dumps(evt, ensure_ascii=False)}\n\n"
                    except Exception:  # noqa: BLE001 - non-serializable event
                        line = "data: {}\n\n"
                    yield line
            finally:
                _trace_hub.unsubscribe(q)

        return StreamingResponse(_gen(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})

    return app
# Module-level application instance, built once when this module is imported.
# Presumably this is the target an ASGI server (e.g. uvicorn) points at; confirm
# against the launch scripts.
app = create_app()