# File: HadTavern/agentui/common/manual_http.py
# Snapshot: 2025-10-03 21:55:24 +03:00 (415 lines, 14 KiB, Python)
from __future__ import annotations
import json
import re
from typing import Any, Dict, Optional, Tuple
# Reuse executor's registry for original (untrimmed) requests
try:
from agentui.pipeline.executor import register_http_request as _reg_http_req # type: ignore
except Exception: # pragma: no cover
_reg_http_req = None # type: ignore
# -------- HTTP editable text parser (safe) --------
def parse_editable_http(s: str) -> Tuple[str, str, Dict[str, str], str]:
    """
    Parse text pasted from the Request area into (method, url, headers, body_text).

    Header scanning stops at the first blank line or at the first line that is
    not a valid HTTP header, which prevents JSON lines like '"contents": ...'
    from being swallowed as headers.
    """
    method, url = "POST", ""
    headers: Dict[str, str] = {}
    body = ""
    try:
        if not isinstance(s, str) or not s.strip():
            return method, url, headers, body
        lines = s.replace("\r\n", "\n").split("\n")
        if not lines:
            return method, url, headers, body
        start = re.match(r"^([A-Z]+)\s+(\S+)(?:\s+HTTP/\d+(?:\.\d+)?)?$", (lines[0] or "").strip())
        if start:
            method = (start.group(1) or "POST").strip().upper()
            url = (start.group(2) or "").strip()
            pos = 1
        else:
            pos = 0  # no start-line -> treat everything as headers/body

        def _header_like(line: str) -> bool:
            # HTTP token: only letters/digits/hyphen. Rejects JSON keys such
            # as '"contents":' that would otherwise look like headers.
            if ":" not in line:
                return False
            return bool(re.fullmatch(r"[A-Za-z0-9\-]+", line.split(":", 1)[0].strip()))

        # Consume headers until a blank line OR a non-header-looking line
        # (which marks the start of the body).
        while pos < len(lines):
            cur = lines[pos]
            if cur.strip() == "":
                pos += 1
                break
            if not _header_like(cur):
                break
            name, value = cur.split(":", 1)
            headers[str(name).strip()] = str(value).strip()
            pos += 1
        # Whatever remains is the body (JSON or plain text).
        body = "\n".join(lines[pos:]) if pos < len(lines) else ""
    except Exception:
        pass
    return method, url, headers, body
# -------- Headers helpers --------
def dedupe_headers(h: Dict[str, Any]) -> Dict[str, Any]:
    """
    Case-insensitive header dedupe. Host and Content-Length are dropped
    entirely (httpx computes proper values itself). When the same header
    appears with different casings, the last occurrence wins and its original
    casing is kept.
    """
    try:
        kept: Dict[str, Tuple[str, Any]] = {}
        for name, value in (h or {}).items():
            folded = str(name).strip().lower()
            if folded in ("host", "content-length"):
                continue
            kept[folded] = (name, value)
        return {original: value for original, value in kept.values()}
    except Exception:
        return dict(h or {})
def content_type_is_json(h: Dict[str, Any]) -> bool:
    """Return True when a Content-Type header (any casing) mentions 'json'."""
    try:
        for name, value in (h or {}).items():
            if str(name).lower() == "content-type" and "json" in str(value).lower():
                return True
        return False
    except Exception:
        return False
# -------- JSON parsing & normalization helpers --------
def try_parse_json(s: Any) -> Optional[Any]:
    """Best-effort JSON parse: pass dict/list through, loads a non-blank string, else None."""
    if isinstance(s, (dict, list)):
        return s
    if not isinstance(s, str) or not s.strip():
        return None
    try:
        return json.loads(s)
    except Exception:
        return None
def normalize_jsonish_text(s: Any) -> str:
    """
    Normalize JSON-looking text safely:
    - If the whole text is one quoted JSON string, decode it to the inner string.
    - Replace visible \\n/\\r/\\t OUTSIDE JSON string literals with real control chars.
    - Escape raw CR/LF/TAB INSIDE JSON string literals as \\n/\\r/\\t so the JSON stays valid.
    """
    try:
        text = str(s if s is not None else "")
    except Exception:
        return ""
    # Whole text looks like one quoted JSON string -> decode to the inner value.
    try:
        if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
            inner = json.loads(text)
            if isinstance(inner, str):
                text = inner
    except Exception:
        pass
    buf = []
    pos, size = 0, len(text)
    inside = False   # currently inside a JSON string literal
    escaped = False  # previous char inside the literal was an unconsumed backslash
    while pos < size:
        ch = text[pos]
        if inside:
            # Escape raw control chars so the string literal stays valid JSON.
            if ch == "\r":
                if pos + 1 < size and text[pos + 1] == "\n":
                    buf.append("\\n")  # CRLF collapses to a single \n
                    pos += 2
                else:
                    buf.append("\\r")
                    pos += 1
                escaped = False
                continue
            if ch == "\n":
                buf.append("\\n")
                pos += 1
                escaped = False
                continue
            if ch == "\t":
                buf.append("\\t")
                pos += 1
                escaped = False
                continue
            buf.append(ch)
            # Track escape state to know when a quote really closes the literal.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                inside = False
            pos += 1
            continue
        # Outside any string literal.
        if ch == '"':
            inside = True
            buf.append(ch)
            pos += 1
            continue
        if ch == "\\" and pos + 1 < size and text[pos + 1] in ("n", "r", "t"):
            # Visible escape sequence outside a literal -> real control char.
            buf.append({"n": "\n", "r": "\r", "t": "\t"}[text[pos + 1]])
            pos += 2
            continue
        buf.append(ch)
        pos += 1
    return "".join(buf)
def extract_json_trailing(s: str) -> Optional[Any]:
    """
    Pull a trailing JSON object/array out of mixed text: try the whole text
    first, then scan candidate start positions from the last '{' (and then the
    last '[') backward until a parse succeeds.
    """
    if not isinstance(s, str):
        return None
    try:
        txt = s.strip()
        try:
            return json.loads(txt)
        except Exception:
            pass
        for opener in ("{", "["):
            start = txt.rfind(opener)
            while start >= 0:
                try:
                    return json.loads(txt[start:])
                except Exception:
                    start = txt.rfind(opener, 0, start)
        return None
    except Exception:
        return None
def global_unescape_jsonish(s: str) -> str:
"""
Last-resort: unicode_escape decode to convert \\n -> \n, \\" -> ", \\\\ -> \, \\uXXXX -> char, etc.
"""
try:
import codecs as _codecs
return _codecs.decode(s, "unicode_escape")
except Exception:
try:
return (
s.replace("\\n", "\n")
.replace("\\r", "\r")
.replace("\\t", "\t")
.replace('\\"', '"')
.replace("\\\\", "\\")
)
except Exception:
return s
def looks_jsonish(txt: Any) -> bool:
    """Cheap heuristic: does txt look like JSON (braces/brackets or 'key:' patterns)?"""
    try:
        s = str(txt or "")
        if "{" in s or "[" in s:
            return True
        # Also accept patterns like:  key:  /  "key":  (preceded by whitespace)
        return re.search(r'\s["\']?[A-Za-z0-9_\-]+["\']?\s*:', s) is not None
    except Exception:
        return False
def deep_merge_dicts(a: Any, b: Any) -> Any:
    """
    Recursive dict merge with b taking precedence. Non-dict values from b
    (including lists) simply replace the corresponding value in a; when either
    side is not a dict, b wins outright.
    """
    if not (isinstance(a, dict) and isinstance(b, dict)):
        return b
    merged = dict(a)
    for key, b_val in b.items():
        a_val = a.get(key)
        if key in a and isinstance(a_val, dict) and isinstance(b_val, dict):
            merged[key] = deep_merge_dicts(a_val, b_val)
        else:
            merged[key] = b_val
    return merged
# ---- Trim-aware merge that preserves original binary/base64 fields ----
def is_trimmed_b64_string(s: Any) -> bool:
    """True when s is a string carrying the '(trimmed ' placeholder marker."""
    if not isinstance(s, str):
        return False
    try:
        return "(trimmed " in s
    except Exception:
        return False
def looks_base64ish(s: Any) -> bool:
    """Heuristic: a string of length >= 64 made only of base64 alphabet chars (CR/LF allowed)."""
    if not isinstance(s, str) or len(s) < 64:
        return False
    try:
        return re.fullmatch(r"[A-Za-z0-9+/=\r\n]+", s) is not None
    except Exception:
        return False
def merge_lists_preserving_b64(orig_list: Any, edited_list: Any) -> Any:
    """
    Merge lists while preserving trimmed-base64 placeholders, without padding
    from the original: the result has exactly the edited list's length
    (original indices beyond it are dropped).

    Per index:
      * edited value is a trimmed placeholder and original holds a string -> keep original
      * both dicts  -> deep_merge_preserving_b64
      * both lists  -> recurse
      * otherwise   -> the edited value wins
    """
    if not isinstance(edited_list, list):
        return edited_list
    source = orig_list if isinstance(orig_list, list) else []
    merged = []
    for idx, edited_val in enumerate(edited_list):
        orig_val = source[idx] if idx < len(source) else None
        if isinstance(edited_val, str) and is_trimmed_b64_string(edited_val) and isinstance(orig_val, str):
            merged.append(orig_val)
        elif isinstance(edited_val, dict) and isinstance(orig_val, dict):
            merged.append(deep_merge_preserving_b64(orig_val, edited_val))
        elif isinstance(edited_val, list) and isinstance(orig_val, list):
            merged.append(merge_lists_preserving_b64(orig_val, edited_val))
        else:
            merged.append(edited_val)
    return merged
def deep_merge_preserving_b64(orig: Any, edited: Any) -> Any:
    """
    Merge `edited` over `orig`, restoring originals only where the edit left a
    trimmed-base64 placeholder, with strict edited shape:
    - trimmed placeholder string over an original string -> keep the original
    - dicts: the result contains ONLY keys present in `edited` (keys missing
      from the edit are treated as deleted); present keys recurse per these rules
    - lists: merge_lists_preserving_b64 (result length = edited length)
    - anything else: `edited` replaces `orig`
    """
    if isinstance(edited, str) and is_trimmed_b64_string(edited) and isinstance(orig, str):
        return orig
    if isinstance(orig, dict) and isinstance(edited, dict):
        result: Dict[str, Any] = {}
        for key, edited_val in edited.items():
            orig_val = orig.get(key)
            if isinstance(edited_val, str) and is_trimmed_b64_string(edited_val) and isinstance(orig_val, str):
                result[key] = orig_val
            elif isinstance(edited_val, dict) and isinstance(orig_val, dict):
                result[key] = deep_merge_preserving_b64(orig_val, edited_val)
            elif isinstance(edited_val, list) and isinstance(orig_val, list):
                result[key] = merge_lists_preserving_b64(orig_val, edited_val)
            else:
                result[key] = edited_val
        return result
    if isinstance(orig, list) and isinstance(edited, list):
        return merge_lists_preserving_b64(orig, edited)
    return edited
def salvage_json_for_send(
    edited_body_text: Any,
    headers: Dict[str, Any],
    orig_json: Optional[Any],
    prefer_registry_original: bool = True,
) -> Tuple[Optional[Any], Optional[str]]:
    """
    Build (final_json, final_text) for the outgoing request body.

    Strategy:
    - Normalize the edited text for JSON, then try: direct parse -> trailing
      extract -> unicode-unescape and retry.
    - With prefer_registry_original and an original JSON available:
      * edited parses  -> deep-merge over the original with trimmed-b64
        preservation, keeping ONLY keys present in the edit and capping lists
        at the edited length (no padding from the original);
      * edited doesn't -> do NOT resurrect the original JSON: empty/whitespace
        sends empty text, anything else is sent as raw text as-is.
    - Otherwise: the edited JSON if it parses; else, for a declared JSON
      content-type with an original available, salvage or fall back to the
      original; else send the raw text.
    """
    norm = normalize_jsonish_text(edited_body_text)
    edited_json = try_parse_json(norm)
    if edited_json is None:
        edited_json = extract_json_trailing(norm)
    if edited_json is None:
        unescaped = global_unescape_jsonish(str(edited_body_text or ""))
        if isinstance(unescaped, str) and unescaped != edited_body_text:
            renorm = normalize_jsonish_text(unescaped)
            edited_json = try_parse_json(renorm) or extract_json_trailing(renorm)

    # Prefer the original registry JSON where applicable.
    if prefer_registry_original and orig_json is not None:
        if edited_json is not None:
            # Merge edits over the original with trimmed-b64 preservation,
            # keeping only keys present in the edit.
            return deep_merge_preserving_b64(orig_json, edited_json), None
        # Respect full manual control: never resurrect the original JSON.
        if isinstance(norm, str) and not norm.strip():
            return None, ""
        return None, str(edited_body_text or "")

    # No preference or no original JSON available.
    if edited_json is not None:
        return edited_json, None
    if content_type_is_json(headers) and orig_json is not None:
        # Hard salvage for declared JSON payloads.
        maybe = try_parse_json(norm) or extract_json_trailing(norm)
        return (maybe if maybe is not None else orig_json), None
    # Plain text fallback.
    return None, str(edited_body_text or "")
# -------- Registry wrapper --------
def register_manual_request(req_id: str, info: Dict[str, Any]) -> None:
    """Forward (req_id, info) to the executor's HTTP-request registry when available; best-effort."""
    try:
        if _reg_http_req is not None:
            _reg_http_req(req_id, info)
    except Exception:
        pass