Yash030's picture
Initialize Hugging Face Space deployment for AgentMemory Python (clean without assets)
b2d9e47
"""
agentmemory memory provider for Hermes Agent.
Drop this folder into ~/.hermes/plugins/agentmemory/
or install via: hermes plugin install agentmemory
Requires agentmemory server running: npx @agentmemory/agentmemory
"""
from __future__ import annotations
import json
import os
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError
try:
from agent.memory_provider import MemoryProvider
except ImportError:
from abc import ABC, abstractmethod
class MemoryProvider(ABC):
@property
@abstractmethod
def name(self) -> str: ...
@abstractmethod
def is_available(self) -> bool: ...
@abstractmethod
def initialize(self, session_id: str, **kwargs: Any) -> None: ...
@abstractmethod
def get_tool_schemas(self) -> list[dict]: ...
@abstractmethod
def handle_tool_call(self, name: str, args: dict) -> str: ...
def get_config_schema(self) -> list[dict]: return []
def save_config(self, values: dict, hermes_home: str) -> None: pass
def system_prompt_block(self) -> str: return ""
def prefetch(self, query: str, **kwargs: Any) -> str: return ""
def queue_prefetch(self, query: str, **kwargs: Any) -> None: pass
def sync_turn(self, user: str, assistant: str, **kwargs: Any) -> None: pass
def on_session_end(self, messages: list, **kwargs: Any) -> None: pass
def on_pre_compress(self, messages: list, **kwargs: Any) -> None: pass
def on_memory_write(self, action: str, target: str, content: str, **kwargs: Any) -> None: pass
def shutdown(self, **kwargs: Any) -> None: pass
DEFAULT_BASE_URL = "http://localhost:3111"
TIMEOUT = 5
LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}
_plaintext_bearer_warned = False
# agentmemory's documented runtime config lives at ~/.agentmemory/.env.
# When agentmemory is launched as a systemd user service (or any other
# process manager that loads that file directly), those values never
# reach an interactive shell. `hermes memory status` then reads
# os.environ in the Hermes CLI process, finds AGENTMEMORY_URL /
# AGENTMEMORY_SECRET unset, and reports the plugin as "Missing" even
# though the service is healthy and live sessions can use it (#250).
#
# Preload the file at plugin-import time using os.environ.setdefault so
# we never override anything the user explicitly set in the shell. The
# preload is best-effort and silent on any failure (file absent,
# unreadable, malformed) — the plugin falls back to its existing default
# (http://localhost:3111) and Hermes status reflects that.
def _preload_agentmemory_dotenv() -> None:
candidates: list[Path] = []
home = os.environ.get("HOME")
if home:
candidates.append(Path(home) / ".agentmemory" / ".env")
xdg_config = os.environ.get("XDG_CONFIG_HOME")
if xdg_config:
candidates.append(Path(xdg_config) / "agentmemory" / ".env")
for path in candidates:
try:
if not path.is_file():
continue
for raw in path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip('"').strip("'")
if key:
os.environ.setdefault(key, value)
except (OSError, UnicodeDecodeError):
continue
# Guarantee AGENTMEMORY_URL is set so `hermes memory status` never
# reports it as Missing when a user runs agentmemory at the default
# localhost:3111 (or via systemd with the URL line commented out in
# ~/.agentmemory/.env because it matches the default). #520.
os.environ.setdefault("AGENTMEMORY_URL", DEFAULT_BASE_URL)
_preload_agentmemory_dotenv()
def _validate_url(base: str) -> bool:
if not base:
return False
try:
parsed = urlparse(base)
# .port raises ValueError on a non-numeric or out-of-range port
_ = parsed.port
except ValueError:
return False
if parsed.scheme not in ("http", "https"):
return False
return bool(parsed.hostname)
def _uses_plaintext_bearer_auth(base: str, secret: str = "") -> bool:
if not secret:
return False
parsed = urlparse(base)
return parsed.scheme == "http" and (parsed.hostname or "").lower() not in LOOPBACK_HOSTS
def _plaintext_bearer_auth_message(base: str) -> str:
return f"agentmemory: AGENTMEMORY_SECRET is configured for plaintext HTTP to {base}. Bearer tokens and memory payloads can be observed on the network; use HTTPS or an SSH tunnel."
def _warn_plaintext_bearer_auth(message: str) -> None:
print(message, file=sys.stderr)
def _check_plaintext_bearer_guard(
base: str,
secret: str = "",
warn: Callable[[str], None] | None = None,
) -> None:
global _plaintext_bearer_warned
if not _uses_plaintext_bearer_auth(base, secret):
return
message = _plaintext_bearer_auth_message(base)
if os.environ.get("AGENTMEMORY_REQUIRE_HTTPS") == "1":
raise RuntimeError(message)
if not _plaintext_bearer_warned:
_plaintext_bearer_warned = True
(warn or _warn_plaintext_bearer_auth)(message)
def _reset_plaintext_bearer_guard_for_tests() -> None:
global _plaintext_bearer_warned
_plaintext_bearer_warned = False
def _api(base: str, path: str, body: dict | None = None, method: str = "POST", secret: str = "") -> dict | None:
if not _validate_url(base):
return None
url = f"{base}/agentmemory/{path}"
headers = {"Content-Type": "application/json"}
auth = secret or os.environ.get("AGENTMEMORY_SECRET", "")
_check_plaintext_bearer_guard(base, auth)
if auth:
headers["Authorization"] = f"Bearer {auth}"
data = json.dumps(body).encode() if body else None
req = Request(url, data=data, headers=headers, method=method)
try:
with urlopen(req, timeout=TIMEOUT) as resp:
return json.loads(resp.read().decode())
except (URLError, TimeoutError, json.JSONDecodeError):
return None
def _api_bg(base: str, path: str, body: dict | None = None) -> None:
t = threading.Thread(target=_api, args=(base, path, body), daemon=True)
t.start()
class AgentMemoryProvider(MemoryProvider):
@property
def name(self) -> str:
return "agentmemory"
def is_available(self) -> bool:
# Hermes contract: no network calls in is_available.
base = os.environ.get("AGENTMEMORY_URL", DEFAULT_BASE_URL)
return _validate_url(base)
def initialize(self, session_id: str, **kwargs: Any) -> None:
self._base = os.environ.get("AGENTMEMORY_URL", DEFAULT_BASE_URL)
self._session_id = session_id
self._project = kwargs.get("cwd", os.getcwd())
if os.environ.get("AGENTMEMORY_REQUIRE_HTTPS") == "1":
_check_plaintext_bearer_guard(self._base, os.environ.get("AGENTMEMORY_SECRET", ""))
_api(self._base, "session/start", {
"sessionId": session_id,
"project": self._project,
"cwd": self._project,
})
def get_config_schema(self) -> list[dict]:
return [
{
"key": "url",
"description": "agentmemory server URL",
"default": DEFAULT_BASE_URL,
"env_var": "AGENTMEMORY_URL",
},
{
"key": "secret",
"description": "agentmemory auth secret (optional)",
"secret": True,
"required": False,
"env_var": "AGENTMEMORY_SECRET",
},
]
def save_config(self, values: dict, hermes_home: str) -> None:
config_path = Path(hermes_home) / "agentmemory.json"
config_path.write_text(json.dumps(values, indent=2))
def system_prompt_block(self) -> str:
result = _api(self._base, "context", {
"sessionId": self._session_id,
"project": self._project,
})
if result and result.get("context"):
return result["context"]
return ""
def prefetch(self, query: str, **kwargs: Any) -> str:
result = _api(self._base, "smart-search", {
"query": query,
"limit": 5,
})
if not result or not result.get("results"):
return ""
lines = []
for r in result["results"][:5]:
obs = r.get("observation", r)
title = obs.get("title", "")
narrative = obs.get("narrative", "")
if title:
lines.append(f"- {title}: {narrative[:200]}")
return "\n".join(lines) if lines else ""
def queue_prefetch(self, query: str, **kwargs: Any) -> None:
_api_bg(self._base, "smart-search", {"query": query, "limit": 3})
def get_tool_schemas(self) -> list[dict]:
return [
{
"name": "memory_recall",
"description": "Search agentmemory for past observations by keyword",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results", "default": 10},
},
"required": ["query"],
},
},
{
"name": "memory_save",
"description": "Save an insight, decision, or pattern to long-term memory",
"parameters": {
"type": "object",
"properties": {
"content": {"type": "string", "description": "What to remember"},
"type": {
"type": "string",
"enum": ["pattern", "preference", "architecture", "bug", "workflow", "fact"],
"description": "Memory type",
},
},
"required": ["content"],
},
},
{
"name": "memory_search",
"description": "Hybrid semantic + keyword search across all memories",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
]
def handle_tool_call(self, name: str, args: dict) -> str:
# Hermes stores the return value as the tool result `content` in the
# session history. Anthropic-protocol providers reject non-string
# content with a 400 on the next request, so always serialize to a
# JSON string here — matches what agentmemory's main MCP server does
# in src/mcp/standalone.ts (`{ type: "text", text: JSON.stringify(...) }`).
if name == "memory_recall":
result = _api(self._base, "search", {
"query": args["query"],
"limit": args.get("limit", 10),
})
if not result:
return json.dumps({"results": []})
items = []
for r in result.get("results", []):
obs = r.get("observation", r)
items.append({
"title": obs.get("title", ""),
"type": obs.get("type", ""),
"narrative": obs.get("narrative", ""),
"importance": obs.get("importance", 0),
"timestamp": obs.get("timestamp", ""),
})
return json.dumps({"results": items})
if name == "memory_save":
result = _api(self._base, "remember", {
"content": args["content"],
"type": args.get("type", "fact"),
})
return json.dumps(result or {"success": False})
if name == "memory_search":
result = _api(self._base, "smart-search", {
"query": args["query"],
"limit": args.get("limit", 5),
})
if not result:
return json.dumps({"results": []})
items = []
for r in result.get("results", []):
obs = r.get("observation", r)
items.append({
"title": obs.get("title", ""),
"narrative": obs.get("narrative", "")[:300],
"score": r.get("combinedScore", r.get("score", 0)),
})
return json.dumps({"results": items})
return json.dumps({"error": f"Unknown tool: {name}"})
def sync_turn(self, user: str, assistant: str, **kwargs: Any) -> None:
_api_bg(self._base, "observe", {
"hookType": "post_tool_use",
"sessionId": kwargs.get("session_id", self._session_id),
"project": self._project,
"cwd": self._project,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"data": {
"tool_name": "conversation",
"tool_input": user[:500],
"tool_output": assistant[:2000],
},
})
def on_session_end(self, messages: list, **kwargs: Any) -> None:
_api(self._base, "session/end", {
"sessionId": kwargs.get("session_id", self._session_id),
})
def on_pre_compress(self, messages: list, **kwargs: Any) -> None:
result = _api(self._base, "context", {
"sessionId": kwargs.get("session_id", self._session_id),
"project": self._project,
})
if result and result.get("context"):
messages.insert(0, {
"role": "user",
"content": f"[agentmemory context before compaction]\n{result['context']}",
})
def on_memory_write(self, action: str, target: str, content: str, **kwargs: Any) -> None:
if action in ("add", "update") and content:
_api_bg(self._base, "remember", {
"content": content,
"type": "fact",
})
def shutdown(self, **kwargs: Any) -> None:
pass
def register(ctx: Any) -> None:
ctx.register_memory_provider(AgentMemoryProvider())