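"""Tool-safe OpenAI-compatible fast proxy for Kaiju Coder 7 OpenCode.

The normal Gojira gateway is product/API oriented and aggregates content. OpenCode
needs raw tool-call chunks preserved, so this proxy patches a few serving knobs
(model, max_tokens, thinking flags) and passes upstream responses through
unchanged. When autorouting is enabled it may instead answer locally with a
synthesized kaiju_artifact or kaiju_write_file tool call, and, when summaries are
enabled, with a short summary of a completed Kaiju tool result.
"""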
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import re |
| import time |
| import urllib.error |
| import urllib.request |
| from http import HTTPStatus |
| from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer |
| from typing import Any |
|
|
|
|
def _env_int(name: str, default: str) -> int:
    """Read an integer setting from the environment, falling back to ``default``."""
    return int(os.environ.get(name, default))


def _env_enabled(name: str) -> bool:
    """Read an on/off flag: enabled unless the value is "0", "false" or "no" (any case)."""
    return os.environ.get(name, "1").lower() not in {"0", "false", "no"}


# Listening address and upstream wiring.
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = _env_int("KAIJU_OPENCODE_FAST_PROXY_PORT", "18181")
UPSTREAM_BASE_URL = os.environ.get("KAIJU_OPENAI_BASE_URL", "http://100.109.109.14:18084/v1")
DEFAULT_MODEL = os.environ.get("KAIJU_DEFAULT_MODEL", "kaiju-coder-7")
API_KEY = os.environ.get("KAIJU_OPENAI_API_KEY", "")

# max_tokens caps applied per job class (see target_tokens).
NORMAL_MAX_TOKENS = _env_int("KAIJU_NORMAL_MAX_TOKENS", "384")
WORK_MAX_TOKENS = _env_int("KAIJU_WORK_MAX_TOKENS", "1536")
ARTIFACT_MAX_TOKENS = _env_int("KAIJU_ARTIFACT_MAX_TOKENS", "4096")

# Largest accepted request body, in bytes.
MAX_REQUEST_BYTES = _env_int("KAIJU_MAX_REQUEST_BYTES", "2097152")

# Feature switches for local short-circuit responses.
AUTOROUTE_ENABLED = _env_enabled("KAIJU_OPENCODE_AUTOROUTE")
SUMMARY_ENABLED = _env_enabled("KAIJU_OPENCODE_FAST_SUMMARY")

# Tool names this proxy may synthesize calls for.
TOOL_NAME = "kaiju_artifact"
WRITE_TOOL_NAME = "kaiju_write_file"
|
|
|
|
def normalize_messages(messages: Any) -> list[dict[str, Any]]:
    """Return only the dict entries of ``messages``; a non-list yields ``[]``."""
    if isinstance(messages, list):
        return [entry for entry in messages if isinstance(entry, dict)]
    return []
|
|
|
|
def content_to_text(content: Any) -> str:
    """Flatten an OpenAI-style message ``content`` value into plain text.

    - ``None`` (e.g. an assistant turn that only carries tool calls) -> ``""``.
    - A string is returned unchanged, unless it looks like JSON (starts with
      ``{`` or ``[`` after stripping) and parses, in which case the decoded
      value is flattened instead.
    - A list joins, with newlines, the ``text`` / typed ``content`` / ``output``
      string of each dict part; a list with no such parts is JSON-encoded.
    - A dict returns the first string among ``output``, ``text``, ``content``;
      otherwise it is JSON-encoded.
    - Anything else is JSON-encoded.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        stripped = content.strip()
        if stripped.startswith(("{", "[")):
            try:
                return content_to_text(json.loads(stripped))
            except (ValueError, RecursionError):
                # Not valid JSON (JSONDecodeError is a ValueError) or nested too
                # deeply to decode/re-encode: keep the raw text.
                return content
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if not isinstance(item, dict):
                continue
            if isinstance(item.get("text"), str):
                parts.append(item["text"])
            elif item.get("type") == "text" and isinstance(item.get("content"), str):
                parts.append(item["content"])
            elif isinstance(item.get("output"), str):
                parts.append(item["output"])
        if parts:
            return "\n".join(parts)
        return json.dumps(content, ensure_ascii=False)
    if isinstance(content, dict):
        for key in ("output", "text", "content"):
            if isinstance(content.get(key), str):
                return content[key]
        return json.dumps(content, ensure_ascii=False)
    return json.dumps(content, ensure_ascii=False)
|
|
|
|
def message_text(messages: list[dict[str, Any]]) -> str:
    """Join the flattened content of every message with newlines, lowercased."""
    flattened = (content_to_text(entry.get("content", "")) for entry in messages)
    return "\n".join(flattened).lower()
|
|
|
|
def latest_user_text(messages: list[dict[str, Any]]) -> str:
    """Return the text of the most recent ``user`` message (``""`` if none).

    - String content is returned as-is.
    - List content joins the ``text`` / typed ``content`` strings of its dict
      parts with newlines; a list with no such parts is JSON-encoded.
    - ``None`` content yields ``""``; any other value is JSON-encoded.

    The newest user message is always authoritative. Previously a user message
    with non-string, non-list content was skipped, so routing decisions were
    made on an older prompt.
    """
    for message in reversed(messages):
        if message.get("role") != "user":
            continue
        content = message.get("content", "")
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts: list[str] = []
            for item in content:
                if not isinstance(item, dict):
                    continue
                if isinstance(item.get("text"), str):
                    parts.append(item["text"])
                elif item.get("type") == "text" and isinstance(item.get("content"), str):
                    parts.append(item["content"])
            if parts:
                return "\n".join(parts)
            return json.dumps(content, ensure_ascii=False)
        if content is None:
            return ""
        return json.dumps(content, ensure_ascii=False)
    return ""
|
|
|
|
def clean_prompt(prompt: str) -> str:
    """Strip whitespace and, if present, one matching pair of surrounding quotes."""
    text = prompt.strip()
    wrapped = len(text) >= 2 and text[0] in {"'", '"'} and text[-1] == text[0]
    return text[1:-1].strip() if wrapped else text
|
|
|
|
def has_tool(messages: list[dict[str, Any]]) -> bool:
    """True if any message is a tool result (``role == "tool"`` or a truthy ``tool_call_id``)."""
    for message in messages:
        if message.get("role") == "tool" or message.get("tool_call_id"):
            return True
    return False
|
|
|
|
def has_tool_result(messages: list[dict[str, Any]]) -> bool:
    """Return True when the conversation currently ends with a Kaiju-style tool result.

    Only the trailing run of tool messages (``role == "tool"`` or a truthy
    ``tool_call_id``) is inspected, case-insensitively, for the Kaiju tool names
    or result labels below.

    Previously the whole history was scanned (including system/user text). After
    any earlier turn produced a matching tool result, every later request,
    including brand-new user prompts, was answered with a canned summary instead
    of being forwarded upstream.
    """
    trailing: list[dict[str, Any]] = []
    for message in reversed(messages):
        if not (message.get("role") == "tool" or message.get("tool_call_id")):
            break
        trailing.append(message)
    if not trailing:
        return False
    text = "\n".join(content_to_text(message.get("content", "")) for message in trailing).lower()
    markers = (
        TOOL_NAME,
        WRITE_TOOL_NAME,
        "wrote file:",
        "task type:",
        "artifact:",
        "manifest:",
        "changed files:",
    )
    return any(marker in text for marker in markers)
|
|
|
|
def classify_text(text: str) -> str:
    """Bucket a prompt as "artifact", "work" or "normal" by case-insensitive substring match.

    Artifact terms are checked first, so a prompt matching both lists is an artifact.
    """
    lowered = text.lower()
    buckets = (
        (
            "artifact",
            (
                "website",
                "one-page",
                "one page",
                "homepage",
                "complete html",
                "html file",
                "one-file website",
                "landing page",
                "build a website",
                "make a website",
                "full file",
                "desktop",
                "owner pack",
                "operating pack",
                "business suite",
            ),
        ),
        (
            "work",
            (
                "create ",
                "write ",
                "edit ",
                "implement",
                "debug",
                "fix",
                "refactor",
                "test",
                "repo",
                "file",
            ),
        ),
    )
    for label, terms in buckets:
        if any(term in lowered for term in terms):
            return label
    return "normal"
|
|
|
|
def classify_job(messages: list[dict[str, Any]]) -> str:
    """Classify the conversation by its latest user prompt (quotes/whitespace stripped)."""
    prompt = latest_user_text(messages)
    return classify_text(clean_prompt(prompt))
|
|
|
|
def infer_kind(prompt: str) -> str:
    """Pick the kaiju_artifact ``kind`` ("website", "business_suite" or "auto") from prompt keywords."""
    lowered = prompt.lower()
    rules = (
        ("website", ("website", "landing page", "one-page", "one page", "homepage", "html")),
        ("business_suite", ("owner pack", "operating pack", "business suite")),
    )
    for kind, keywords in rules:
        if any(keyword in lowered for keyword in keywords):
            return kind
    return "auto"
|
|
|
|
def infer_out_dir(prompt: str) -> str:
    """Guess an output directory for a kaiju_artifact call from the prompt.

    - "folder named <Name>" (case-insensitive; ended by ".", ",", end of line
      or end of text) maps to ``~/Desktop/<Name>``. Characters outside
      ``[A-Za-z0-9_. -]`` are removed and whitespace runs become "-".
    - Otherwise any mention of "desktop" maps to
      ``~/Desktop/Kaiju-Coder-7-Artifacts``.
    - Otherwise ``""`` is returned (no out_dir).
    """
    # MULTILINE lets "$" end the name at a line break; without it a folder name
    # followed by more lines of prompt text never matched.
    folder_match = re.search(
        r"folder named\s+([A-Za-z0-9_ -]{3,80})(?:\.|,|$)",
        prompt,
        re.IGNORECASE | re.MULTILINE,
    )
    if folder_match:
        raw_name = folder_match.group(1).strip().rstrip(" .")
        folder = re.sub(r"\s+", "-", re.sub(r"[^A-Za-z0-9_. -]", "", raw_name))
        # A name made only of spaces sanitizes to "", which would point at the
        # Desktop itself; fall through to the defaults instead.
        if folder:
            return os.path.join(os.path.expanduser("~"), "Desktop", folder)
    if "desktop" in prompt.lower():
        return os.path.join(os.path.expanduser("~"), "Desktop", "Kaiju-Coder-7-Artifacts")
    return ""
|
|
|
|
def should_synthesize_tool_call(body: dict[str, Any], messages: list[dict[str, Any]]) -> bool:
    """Decide whether to answer locally with a kaiju_artifact tool call.

    Requires autorouting on, no tool messages anywhere in ``messages``, an
    "artifact" job class, and the tool being offered in ``body["tools"]``.
    """
    eligible = AUTOROUTE_ENABLED and not has_tool(messages) and classify_job(messages) == "artifact"
    return eligible and tool_available(body, TOOL_NAME)
|
|
|
|
def tool_available(body: dict[str, Any], name: str) -> bool:
    """True if ``body["tools"]`` lists a ``function`` tool whose function name is ``name``."""
    tools = body.get("tools")
    if not isinstance(tools, list):
        return False
    for item in tools:
        if not isinstance(item, dict) or item.get("type") != "function":
            continue
        function = item.get("function")
        if isinstance(function, dict) and function.get("name") == name:
            return True
    return False
|
|
|
|
def parse_exact_file_write(prompt: str) -> dict[str, str] | None:
    """Parse "create <path> with exactly[ this content and no extra characters]: <content>".

    Returns ``{"file_path": ..., "content": ...}`` with both parts stripped, or
    ``None`` when the prompt does not have that shape or either part is empty.
    """
    pattern = r"\bcreate\s+([A-Za-z0-9_./-]{1,160})\s+with exactly(?: this content and no extra characters)?:\s*(.+?)\s*$"
    found = re.search(pattern, clean_prompt(prompt), re.IGNORECASE | re.DOTALL)
    if found is None:
        return None
    file_path, content = (group.strip() for group in found.groups())
    if file_path and content:
        return {"file_path": file_path, "content": content}
    return None
|
|
|
|
def should_synthesize_file_write(body: dict[str, Any], messages: list[dict[str, Any]]) -> bool:
    """Decide whether to answer locally with a kaiju_write_file tool call.

    Requires autorouting on, no tool messages in ``messages``, the write tool
    being offered, and the latest user prompt being an exact file-write request.
    """
    if AUTOROUTE_ENABLED and not has_tool(messages) and tool_available(body, WRITE_TOOL_NAME):
        return parse_exact_file_write(latest_user_text(messages)) is not None
    return False
|
|
|
|
def tool_call_arguments(prompt: str) -> dict[str, Any]:
    """Build kaiju_artifact arguments: cleaned prompt, inferred kind, planner off, optional out_dir."""
    cleaned = clean_prompt(prompt)
    kind = infer_kind(cleaned)
    out_dir = infer_out_dir(cleaned)
    arguments: dict[str, Any] = {"prompt": cleaned, "kind": kind, "no_planner": True}
    if out_dir:
        arguments["out_dir"] = out_dir
    return arguments
|
|
|
|
def completion_id(prefix: str = "chatcmpl-kaiju") -> str:
    """Return a response id of the form ``<prefix>-<epoch milliseconds>-<8 hex chars>``.

    The random suffix keeps ids distinct when the threaded server answers two
    requests within the same millisecond; a timestamp alone collided.
    """
    return f"{prefix}-{int(time.time() * 1000)}-{os.urandom(4).hex()}"
|
|
|
|
def write_sse(handler: BaseHTTPRequestHandler, chunks: list[dict[str, Any]]) -> None:
    """Send ``chunks`` as a 200 server-sent-event stream terminated by ``data: [DONE]``.

    Each chunk is compact JSON in its own ``data:`` event, flushed immediately.
    """
    handler.send_response(HTTPStatus.OK)
    for header, value in (
        ("content-type", "text/event-stream; charset=utf-8"),
        ("cache-control", "no-store, no-transform"),
        ("connection", "close"),
    ):
        handler.send_header(header, value)
    handler.end_headers()

    def frames():
        # Serialize lazily so each event is encoded right before it is written.
        for chunk in chunks:
            yield json.dumps(chunk, separators=(",", ":"))
        yield "[DONE]"

    for frame in frames():
        handler.wfile.write(f"data: {frame}\n\n".encode("utf-8"))
        handler.wfile.flush()
|
|
|
|
def split_json_arguments(args: dict[str, Any]) -> list[str]:
    """Encode ``args`` as compact JSON and cut it into pieces of at most 768 characters."""
    encoded = json.dumps(args, separators=(",", ":"), ensure_ascii=False)
    size = 768
    pieces: list[str] = []
    start = 0
    while start < len(encoded):
        pieces.append(encoded[start : start + size])
        start += size
    return pieces or ["{}"]
|
|
|
|
def synthesize_function_call(handler: BaseHTTPRequestHandler, body: dict[str, Any], tool_name: str, args: dict[str, Any]) -> None:
    """Reply to a chat completion with one locally built tool call (no upstream request).

    If ``body["stream"]`` is exactly ``True``, the call is emitted as SSE
    ``chat.completion.chunk`` events:
      - an opening chunk naming the tool;
      - one chunk per piece of the JSON-encoded arguments;
      - a closing chunk with ``finish_reason="tool_calls"``.
    Otherwise a single ``chat.completion`` object with zeroed usage is returned.
    """
    created = int(time.time())
    model = str(body.get("model") or DEFAULT_MODEL)
    chat_id = completion_id()
    call_id = f"call_kaiju_{created}"

    if body.get("stream") is not True:
        tool_call = {
            "id": call_id,
            "type": "function",
            "function": {"name": tool_name, "arguments": json.dumps(args, separators=(",", ":"))},
        }
        handler._json(
            HTTPStatus.OK,
            {
                "id": chat_id,
                "object": "chat.completion",
                "created": created,
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": None, "tool_calls": [tool_call]},
                        "finish_reason": "tool_calls",
                    }
                ],
                "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
            },
        )
        return

    def chunk(delta: dict[str, Any], finish_reason: str | None = None) -> dict[str, Any]:
        # Shared envelope for every streamed event of this completion.
        return {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }

    opening = chunk(
        {
            "role": "assistant",
            "tool_calls": [
                {
                    "index": 0,
                    "id": call_id,
                    "type": "function",
                    "function": {"name": tool_name, "arguments": ""},
                }
            ],
        }
    )
    argument_chunks = [
        chunk({"tool_calls": [{"index": 0, "function": {"arguments": part}}]})
        for part in split_json_arguments(args)
    ]
    write_sse(handler, [opening, *argument_chunks, chunk({}, "tool_calls")])
|
|
|
|
def synthesize_tool_call(handler: BaseHTTPRequestHandler, body: dict[str, Any], prompt: str) -> None:
    """Answer locally with a kaiju_artifact tool call whose arguments come from ``prompt``."""
    arguments = tool_call_arguments(prompt)
    synthesize_function_call(handler, body, TOOL_NAME, arguments)
|
|
|
|
def synthesize_file_write_call(handler: BaseHTTPRequestHandler, body: dict[str, Any], prompt: str) -> None:
    """Answer locally with a kaiju_write_file tool call parsed from ``prompt``.

    Raises:
        ValueError: ``prompt`` is not an exact file-write request.
    """
    parsed = parse_exact_file_write(prompt)
    if parsed is not None:
        synthesize_function_call(handler, body, WRITE_TOOL_NAME, parsed)
        return
    raise ValueError("prompt is not an exact file-write request")
|
|
|
|
def extract_tool_summary(messages: list[dict[str, Any]]) -> str:
    """Build a short human-readable summary of the most recent tool result.

    Reads the last tool message (``role == "tool"`` or a truthy ``tool_call_id``).
    If it is missing or empty, it falls back to the text of all messages. From
    that text it collects lines such as ``Task type: ...`` / ``Manifest: ...``.
    Failing that, it uses a ``Wrote file: ...`` line. Otherwise a generic
    completion message is returned.
    """
    text = ""
    for message in reversed(messages):
        if message.get("role") == "tool" or message.get("tool_call_id"):
            text = content_to_text(message.get("content", ""))
            break
    if not text:
        # Keep the original casing here. message_text() lowercases everything,
        # so the case-sensitive label patterns below could never match and any
        # reported paths would come out lowercased.
        text = "\n".join(content_to_text(message.get("content", "")) for message in messages)
    fields = []
    for label in ("Task type", "Artifact type", "Manifest", "Artifact", "Project/repo", "Changed files", "Opened artifact"):
        match = re.search(rf"^{re.escape(label)}:\s*(.+)$", text, re.MULTILINE)
        if match:
            fields.append(f"{label}: {match.group(1).strip()}")
    if fields:
        return "Kaiju artifact complete.\n\n" + "\n".join(fields)
    write_match = re.search(r"Wrote file:\s*(.+)$", text, re.MULTILINE)
    if write_match:
        return f"File written.\n\nPath: {write_match.group(1).strip()}"
    return "Kaiju artifact complete. Review the generated output folder and manifest from the tool result."
|
|
|
|
def synthesize_summary(handler: BaseHTTPRequestHandler, body: dict[str, Any], messages: list[dict[str, Any]]) -> None:
    """Answer locally with an assistant message summarizing the latest tool result.

    If ``body["stream"]`` is exactly ``True``, three SSE chunks are sent:
      - the assistant role;
      - the summary text;
      - a ``finish_reason="stop"`` terminator.
    Otherwise a single ``chat.completion`` object with zeroed usage is returned.
    """
    created = int(time.time())
    model = str(body.get("model") or DEFAULT_MODEL)
    content = extract_tool_summary(messages)
    chat_id = completion_id("chatcmpl-kaiju-summary")

    if body.get("stream") is not True:
        handler._json(
            HTTPStatus.OK,
            {
                "id": chat_id,
                "object": "chat.completion",
                "created": created,
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": content},
                        "finish_reason": "stop",
                    }
                ],
                "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
            },
        )
        return

    def envelope(delta: dict[str, Any], finish_reason: str | None) -> dict[str, Any]:
        # Shared wrapper for each streamed summary event.
        return {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }

    write_sse(
        handler,
        [
            envelope({"role": "assistant"}, None),
            envelope({"content": content}, None),
            envelope({}, "stop"),
        ],
    )
|
|
|
|
def target_tokens(job_class: str) -> int:
    """Map a job class to its max_tokens cap; unknown classes get the normal cap."""
    caps = {"artifact": ARTIFACT_MAX_TOKENS, "work": WORK_MAX_TOKENS}
    return caps.get(job_class, NORMAL_MAX_TOKENS)
|
|
|
|
def patch_chat_payload(body: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``body`` with this proxy's serving knobs applied.

    - ``model`` is forced to DEFAULT_MODEL.
    - ``max_tokens`` is set to the cap for the latest user prompt's job class,
      and any client-supplied ``max_completion_tokens`` is removed.
    - ``chat_template_kwargs`` keeps the caller's keys but forces
      ``enable_thinking`` and ``thinking`` to False.
    """
    patched = dict(body)
    patched["model"] = DEFAULT_MODEL
    messages = normalize_messages(patched.get("messages"))
    patched["max_tokens"] = target_tokens(classify_job(messages))
    # OpenAI-compatible servers that accept both fields (e.g. vLLM) prefer
    # max_completion_tokens over max_tokens, so a client-sent value silently
    # bypassed the cap above. Drop it so a single limit applies.
    patched.pop("max_completion_tokens", None)
    existing_kwargs = patched.get("chat_template_kwargs")
    patched["chat_template_kwargs"] = {
        **(existing_kwargs if isinstance(existing_kwargs, dict) else {}),
        "enable_thinking": False,
        "thinking": False,
    }
    return patched
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP handler exposing the OpenCode-facing proxy endpoints.

    Routes:
        GET  /health              local status JSON (model, upstream, token caps)
        GET  /v1/models           relayed to the upstream ``/models`` endpoint
        POST /v1/chat/completions payload patched, then answered locally with a
                                  synthesized tool call / summary, or forwarded
    Every other path gets a JSON 404.
    """

    server_version = "KaijuOpenCodeFastProxy/0.1"

    def log_message(self, fmt: str, *args: Any) -> None:
        """Print one timestamped log line, prefixed with the client address, to stdout."""
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {self.address_string()} - {fmt % args}", flush=True)

    def _json(self, status: int, payload: dict[str, Any]) -> None:
        """Send ``payload`` as a complete, non-cacheable JSON response with ``status``."""
        data = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("content-type", "application/json; charset=utf-8")
        self.send_header("cache-control", "no-store")
        self.send_header("content-length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns ``{}`` for an empty body.

        Raises:
            ValueError: the content-length is non-numeric or negative, the body
                exceeds MAX_REQUEST_BYTES, the body is not valid UTF-8/JSON, or
                it decodes to something other than an object.
        """
        length = int(self.headers.get("content-length", "0"))
        if length < 0:
            # rfile.read() with a negative size reads until EOF, which blocks
            # until the client closes the socket instead of reading a bounded body.
            raise ValueError("invalid content-length")
        if length > MAX_REQUEST_BYTES:
            raise ValueError("request body too large")
        raw = self.rfile.read(length)
        if not raw:
            return {}
        value = json.loads(raw.decode("utf-8"))
        if not isinstance(value, dict):
            raise ValueError("request body must be a JSON object")
        return value

    def do_GET(self) -> None:
        """Serve /health locally and relay /v1/models upstream; anything else is 404."""
        if self.path == "/health":
            self._json(
                HTTPStatus.OK,
                {
                    "ok": True,
                    "model": DEFAULT_MODEL,
                    "upstream": UPSTREAM_BASE_URL,
                    "normal_max_tokens": NORMAL_MAX_TOKENS,
                    "work_max_tokens": WORK_MAX_TOKENS,
                    "artifact_max_tokens": ARTIFACT_MAX_TOKENS,
                },
            )
            return
        if self.path == "/v1/models":
            self._forward_get("/models")
            return
        self._json(HTTPStatus.NOT_FOUND, {"error": {"message": "Not found", "type": "not_found"}})

    def do_POST(self) -> None:
        """Handle /v1/chat/completions.

        The payload is patched, then the first applicable action runs:
          1. synthesize a kaiju_write_file call;
          2. synthesize a kaiju_artifact call;
          3. synthesize a tool-result summary (when enabled);
          4. otherwise forward upstream.
        Unreadable bodies get a 400; other paths get a 404.
        """
        if self.path != "/v1/chat/completions":
            self._json(HTTPStatus.NOT_FOUND, {"error": {"message": "Not found", "type": "not_found"}})
            return
        try:
            body = patch_chat_payload(self._read_json())
        except Exception as error:
            self._json(HTTPStatus.BAD_REQUEST, {"error": {"message": str(error), "type": "bad_request"}})
            return
        messages = normalize_messages(body.get("messages"))
        if should_synthesize_file_write(body, messages):
            synthesize_file_write_call(self, body, latest_user_text(messages))
            return
        if should_synthesize_tool_call(body, messages):
            synthesize_tool_call(self, body, latest_user_text(messages))
            return
        if SUMMARY_ENABLED and has_tool_result(messages):
            synthesize_summary(self, body, messages)
            return
        self._forward_post("/chat/completions", body)

    def _headers(self) -> dict[str, str]:
        """Headers for upstream requests; adds a bearer token when API_KEY is set."""
        headers = {"content-type": "application/json"}
        if API_KEY:
            headers["authorization"] = f"Bearer {API_KEY}"
        return headers

    def _forward_get(self, suffix: str) -> None:
        """Relay a GET to ``UPSTREAM_BASE_URL + suffix`` and return the body as-is.

        Upstream HTTP errors keep their status with the first 500 characters of
        the error body. Other failures become 502.
        """
        request = urllib.request.Request(
            f"{UPSTREAM_BASE_URL.rstrip('/')}{suffix}",
            headers=self._headers(),
            method="GET",
        )
        try:
            with urllib.request.urlopen(request, timeout=30) as upstream:
                data = upstream.read()
                self.send_response(upstream.status)
                self.send_header("content-type", upstream.headers.get("content-type", "application/json"))
                self.send_header("cache-control", "no-store")
                self.send_header("content-length", str(len(data)))
                self.end_headers()
                self.wfile.write(data)
        except urllib.error.HTTPError as error:
            detail = error.read().decode("utf-8", errors="replace")[:500]
            self._json(error.code, {"error": {"message": detail, "type": "upstream_error"}})
        except Exception as error:
            self._json(HTTPStatus.BAD_GATEWAY, {"error": {"message": str(error), "type": "upstream_error"}})

    def _forward_post(self, suffix: str, body: dict[str, Any]) -> None:
        """POST ``body`` to ``UPSTREAM_BASE_URL + suffix`` and relay the response.

        When ``body["stream"]`` is exactly ``True``, the upstream response is
        copied line by line with a flush after each line, so SSE chunks
        (including tool-call deltas) pass through unmodified. Artifact-class
        jobs get a 1200 s upstream timeout; everything else gets 600 s.
        """
        data = json.dumps(body).encode("utf-8")
        request = urllib.request.Request(
            f"{UPSTREAM_BASE_URL.rstrip('/')}{suffix}",
            data=data,
            headers=self._headers(),
            method="POST",
        )
        headers_sent = False
        try:
            timeout = 1200 if classify_job(normalize_messages(body.get("messages"))) == "artifact" else 600
            with urllib.request.urlopen(request, timeout=timeout) as upstream:
                content_type = upstream.headers.get("content-type", "application/json")
                if body.get("stream") is True:
                    self.send_response(upstream.status)
                    self.send_header("content-type", content_type)
                    self.send_header("cache-control", "no-store, no-transform")
                    self.send_header("connection", "close")
                    self.end_headers()
                    headers_sent = True
                    for chunk in upstream:
                        self.wfile.write(chunk)
                        self.wfile.flush()
                    return
                response = upstream.read()
                self.send_response(upstream.status)
                self.send_header("content-type", content_type)
                self.send_header("cache-control", "no-store")
                self.send_header("content-length", str(len(response)))
                self.end_headers()
                headers_sent = True
                self.wfile.write(response)
        except urllib.error.HTTPError as error:
            detail = error.read().decode("utf-8", errors="replace")[:500]
            self._json(error.code, {"error": {"message": detail, "type": "upstream_error"}})
        except Exception as error:
            if headers_sent:
                # The status line and headers are already on the wire. A second
                # JSON error response would corrupt the body (e.g. land in the
                # middle of an SSE stream), so log it and drop the connection.
                self.log_error("upstream relay aborted: %s", error)
                self.close_connection = True
                return
            self._json(HTTPStatus.BAD_GATEWAY, {"error": {"message": str(error), "type": "upstream_error"}})
|
|
|
|
def main() -> int:
    """Parse ``--host`` / ``--port`` and serve the proxy until interrupted.

    Returns:
        Process exit status: 0 after a normal stop or Ctrl-C.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--host", default=DEFAULT_HOST)
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    args = parser.parse_args()
    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer((args.host, args.port), Handler) as server:
        print(f"Kaiju OpenCode fast proxy listening on http://{args.host}:{args.port}", flush=True)
        print(f"Upstream: {UPSTREAM_BASE_URL}", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop a foreground proxy; exit cleanly
            # instead of printing a traceback.
            print("Shutting down.", flush=True)
    return 0
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
|