"""End-to-end checks for the ProviderCall ``prompt_combine`` option.

Each scenario runs a single-node pipeline with the HTTP client replaced by a
recording stub, then inspects the request body that would have been sent.
"""

import asyncio
import json
from typing import Any, Dict, List

from agentui.pipeline.executor import PipelineExecutor
import agentui.providers.http_client as hc
from tests.utils import ctx as _ctx, pp as _pp

# Capture of all outbound ProviderCall HTTP requests (one per run)
CAPTURED: List[Dict[str, Any]] = []

# Marks "the attribute did not exist" when snapshotting patched modules.
_MISSING = object()

# Request path used for each provider the scenarios exercise.
_PROVIDER_ENDPOINTS: Dict[str, str] = {
    "openai": "/v1/chat/completions",
    "gemini": "/v1beta/models/{{ model }}:generateContent",
    "claude": "/v1/messages",
}

# Same request template for every provider; [[PROMPT]] is the slot under test.
_TEMPLATE = '{ "model": "{{ model }}", [[PROMPT]] }'


class DummyResponse:
    """Minimal stand-in for an HTTP response.

    Exposes ``status_code``, ``headers``, ``content`` (bytes), ``text`` (str)
    and ``json()``. When ``body`` is ``None`` the JSON body is ``{"ok": True}``.
    """

    def __init__(self, status_code: int = 200, body: Dict[str, Any] | None = None):
        self.status_code = status_code
        self._json = body if body is not None else {"ok": True}
        self.headers = {}
        # Serialize once; an unserializable body degrades to "{}" rather than
        # failing the test on the stub itself.
        try:
            self.text = json.dumps(self._json, ensure_ascii=False)
        except (TypeError, ValueError, RecursionError):
            self.text = "{}"
        # With ensure_ascii=False a lone surrogate can make encoding fail.
        try:
            self.content = self.text.encode("utf-8")
        except UnicodeEncodeError:
            self.content = b"{}"

    def json(self) -> Any:
        """Return the body object this response was built with."""
        return self._json


class DummyClient:
    """Async-context-manager HTTP client stub that records every request.

    Args:
        capture: List that receives one ``{"url", "headers", "payload"}``
            record per request.
        status_code: Status code reported by every response.
    """

    def __init__(self, capture: List[Dict[str, Any]], status_code: int = 200):
        self._capture = capture
        self._status = status_code

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # False: never suppress an exception raised inside the ``async with``.
        return False

    async def post(self, url: str, content: bytes, headers: Dict[str, str]):
        """Record the request and return a response echoing the record.

        A body that is not valid UTF-8 JSON is recorded as
        ``{"_raw": <lossily decoded text>}``.
        """
        try:
            payload = json.loads(content.decode("utf-8"))
        except (ValueError, RecursionError):
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            payload = {"_raw": content.decode("utf-8", errors="ignore")}
        rec = {"url": url, "headers": headers, "payload": payload}
        self._capture.append(rec)
        # Echo payload back to keep extractor happy but not tied to vendor formats
        return DummyResponse(self._status, {"echo": rec})

    # RawForward may use .request, but we don't need it here
    async def request(self, method: str, url: str, headers: Dict[str, str], content: bytes | None):
        """Handle any method like ``post``; a missing body becomes ``b"{}"``."""
        return await self.post(url, content or b"{}", headers)


def _make_dummy_client(timeout: float = 60.0) -> DummyClient:
    """Replacement for ``build_client``: a ``DummyClient`` writing to ``CAPTURED``.

    ``timeout`` is accepted so the call shape is unchanged; it is ignored.
    """
    return DummyClient(CAPTURED, 200)


def _patch_http_client():
    """Monkeypatch build_client used by ProviderCall to our dummy."""
    hc.build_client = _make_dummy_client  # type: ignore[assignment]
    # Also patch the symbol imported into executor, so that ProviderCall
    # uses DummyClient.
    import agentui.pipeline.executor as ex  # type: ignore

    ex.build_client = _make_dummy_client  # type: ignore


def _snapshot_build_clients() -> List[tuple]:
    """Return ``(module, current build_client or _MISSING)`` for both patched modules."""
    import agentui.pipeline.executor as ex  # type: ignore

    return [(mod, getattr(mod, "build_client", _MISSING)) for mod in (hc, ex)]


def _restore_build_clients(snapshot: List[tuple]) -> None:
    """Undo ``_patch_http_client`` using a snapshot from ``_snapshot_build_clients``."""
    for mod, original in snapshot:
        if original is not _MISSING:
            mod.build_client = original
        elif hasattr(mod, "build_client"):
            # The attribute did not exist before patching; remove it again.
            delattr(mod, "build_client")


def _mk_pipeline(provider: str, prompt_combine: str) -> Dict[str, Any]:
    """Build a minimal ProviderCall-only pipeline for a given provider and combine spec.

    Args:
        provider: ``"openai"``, ``"gemini"`` or ``"claude"``; case and
            surrounding whitespace are ignored.
        prompt_combine: Value stored under the node's ``prompt_combine`` key.

    Raises:
        AssertionError: If ``provider`` is not one of the supported names.
    """
    provider = provider.lower().strip()
    if provider not in _PROVIDER_ENDPOINTS:
        raise AssertionError(f"Unsupported provider in test: {provider}")

    provider_config = {
        "base_url": "http://mock.local",
        "endpoint": _PROVIDER_ENDPOINTS[provider],
        "headers": "{}",
        "template": _TEMPLATE,
    }
    node = {
        "id": "n1",
        "type": "ProviderCall",
        "config": {
            "provider": provider,
            "provider_configs": {provider: provider_config},
            # Key under test:
            "prompt_combine": prompt_combine,
            # Prompt Blocks (PROMPT)
            "blocks": [
                {"id": "b1", "name": "sys", "role": "system", "prompt": "Ты — Narrator-chan.", "enabled": True, "order": 0},
                {"id": "b2", "name": "user", "role": "user", "prompt": "как лела", "enabled": True, "order": 1},
            ],
        },
        "in": {},
    }
    return {
        "id": f"p_prompt_combine_{provider}",
        "name": f"prompt_combine to {provider}",
        "loop_mode": "dag",
        "nodes": [node],
    }


def _ctx_with_incoming(incoming_json: Dict[str, Any], vendor: str = "openai") -> Dict[str, Any]:
    """Return a test context whose ``incoming["json"]`` is ``incoming_json``.

    ``incoming`` is copied before modification, so the mapping returned by
    ``_ctx`` is not mutated in place.
    """
    base = _ctx(vendor=vendor)
    incoming = dict(base["incoming"])
    incoming["json"] = incoming_json
    base["incoming"] = incoming
    return base


async def _run_scenario(
    title: str,
    provider: str,
    prompt_combine: str,
    incoming_json: Dict[str, Any],
    vendor: str,
) -> Dict[str, Any]:
    """Run one pipeline against the stub client and return the last captured payload.

    Prints the scenario banner and the pipeline output, and asserts that at
    least one HTTP request was captured.
    """
    print(f"\n=== {title} ===")
    _patch_http_client()
    CAPTURED.clear()
    pipeline = _mk_pipeline(provider, prompt_combine)
    out = await PipelineExecutor(pipeline).run(_ctx_with_incoming(incoming_json, vendor=vendor))
    print("PIPE OUT:", _pp(out))
    assert CAPTURED, "No HTTP request captured"
    return CAPTURED[-1]["payload"]


async def scenario_openai_target_from_gemini_contents():
    """Gemini-shaped ``contents`` combined with PROMPT must yield OpenAI ``messages``."""
    # Incoming JSON in Gemini shape
    incoming_json = {
        "contents": [
            {"role": "user", "parts": [{"text": "Прив"}]},
            {"role": "model", "parts": [{"text": "И тебе привет!"}]},
        ]
    }
    payload = await _run_scenario(
        "PROMPT_COMBINE 1: target=openai, incoming=gemini.contents & PROMPT",
        "openai",
        "[[VAR:incoming.json.contents]] & [[PROMPT]]",
        incoming_json,
        "gemini",
    )
    # Validate OpenAI body
    assert "messages" in payload, "OpenAI payload must contain messages"
    msgs = payload["messages"]
    # Expected: 2 (converted Gemini) + 2 (PROMPT blocks system+user) = 4
    assert isinstance(msgs, list) and len(msgs) == 4
    roles = [m.get("role") for m in msgs]
    # Gemini model -> OpenAI assistant
    assert "assistant" in roles and "user" in roles
    # PROMPT system+user present (system may be not first without @pos; we just ensure existence)
    assert "system" in roles, "System message from PROMPT must be present"


async def scenario_gemini_target_from_openai_messages():
    """OpenAI ``messages`` combined with PROMPT must yield a Gemini body.

    Both system texts (incoming and PROMPT) must end up in
    ``systemInstruction.parts[].text``.
    """
    incoming_json = {
        "messages": [
            {"role": "system", "content": "Системный-тест из входящего"},
            {"role": "user", "content": "Its just me.."},
            {"role": "assistant", "content": "Reply from model"},
        ]
    }
    payload = await _run_scenario(
        "PROMPT_COMBINE 2: target=gemini, incoming=openai.messages & PROMPT",
        "gemini",
        "[[VAR:incoming.json.messages]] & [[PROMPT]]",
        incoming_json,
        "openai",
    )
    # Validate Gemini body
    assert "contents" in payload, "Gemini payload must contain contents"
    assert isinstance(payload["contents"], list)
    # PROMPT system goes to systemInstruction, user block goes to contents
    assert "systemInstruction" in payload, "Gemini payload must contain systemInstruction when system text exists"
    si = payload["systemInstruction"]
    # Collect non-empty text parts with explicit shape checks, so a malformed
    # systemInstruction yields an empty join and a clear assertion failure
    # instead of a silently swallowed exception.
    parts = si.get("parts", []) if isinstance(si, dict) else []
    if not isinstance(parts, list):
        parts = []
    si_texts = []
    for part in parts:
        text = part.get("text") if isinstance(part, dict) else None
        if isinstance(text, str) and text.strip():
            si_texts.append(text.strip())
    joined = "\n".join(si_texts)
    assert "Системный-тест из входящего" in joined, "Incoming system must be merged into systemInstruction"
    assert "Narrator-chan" in joined, "PROMPT system must be merged into systemInstruction"


async def scenario_claude_target_from_openai_messages():
    """OpenAI ``messages`` combined with PROMPT must yield a Claude body.

    ``system`` must be a list of blocks containing both system texts.
    """
    incoming_json = {
        "messages": [
            {"role": "system", "content": "Системный-тест CLAUDE"},
            {"role": "user", "content": "Прив"},
            {"role": "assistant", "content": "Привет!"},
        ]
    }
    payload = await _run_scenario(
        "PROMPT_COMBINE 3: target=claude, incoming=openai.messages & PROMPT",
        "claude",
        "[[VAR:incoming.json.messages]] & [[PROMPT]]",
        incoming_json,
        "openai",
    )
    # Validate Claude body
    assert "messages" in payload, "Claude payload must contain messages"
    assert "system" in payload, "Claude payload must contain system blocks"
    sys_blocks = payload["system"]
    # system must be array of blocks with type=text
    assert isinstance(sys_blocks, list) and any(
        isinstance(b, dict) and b.get("type") == "text" for b in sys_blocks
    )
    sys_text_join = "\n".join(
        b["text"] for b in sys_blocks if isinstance(b, dict) and isinstance(b.get("text"), str)
    )
    assert "Системный-тест CLAUDE" in sys_text_join, "Incoming system should be present"
    assert "Narrator-chan" in sys_text_join, "PROMPT system should be present"


async def scenario_prepend_positioning_openai():
    """``[[PROMPT]]@pos=prepend`` must put the PROMPT system message first."""
    incoming_json = {
        "contents": [
            {"role": "user", "parts": [{"text": "A"}]},
            {"role": "model", "parts": [{"text": "B"}]},
        ]
    }
    # Put PROMPT first; ensure system message becomes first in messages
    payload = await _run_scenario(
        "PROMPT_COMBINE 4: target=openai, PROMPT@pos=prepend & incoming.contents",
        "openai",
        "[[PROMPT]]@pos=prepend & [[VAR:incoming.json.contents]]",
        incoming_json,
        "gemini",
    )
    msgs = payload.get("messages", [])
    assert isinstance(msgs, list) and len(msgs) >= 2
    first = msgs[0]
    # Expect first to be system (from PROMPT) due to prepend
    assert first.get("role") == "system", f"Expected system as first message, got {first}"


def test_prompt_combine_all():
    """Run every scenario in order, then restore the real ``build_client``."""

    async def main():
        await scenario_openai_target_from_gemini_contents()
        await scenario_gemini_target_from_openai_messages()
        await scenario_claude_target_from_openai_messages()
        await scenario_prepend_positioning_openai()
        print("\n=== PROMPT_COMBINE: DONE ===")

    # The scenarios monkeypatch module globals; undo that even on failure so
    # the stub client does not leak into other tests in the same session.
    snapshot = _snapshot_build_clients()
    try:
        asyncio.run(main())
    finally:
        _restore_build_clients(snapshot)