import os import sys import json import hmac import secrets import base64 import threading from flask import Flask, request, jsonify, make_response, send_from_directory from flask_sock import Sock # Load environment variables from ~/.agentmemory/.env if it exists def load_env(): home = os.path.expanduser("~") env_path = os.path.join(home, ".agentmemory", ".env") if os.path.exists(env_path): try: with open(env_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: k, v = line.split("=", 1) k = k.strip() v = v.strip().strip('"').strip("'") os.environ[k] = v print(f"[config] Loaded environment from {env_path}") except Exception as e: print(f"[config] Error reading env file: {e}") load_env() import datetime def datetime_now_iso() -> str: return datetime.datetime.utcnow().isoformat() + "Z" # Import local modules from db import StateKV import search import functions from functions import KV, query_audit app = Flask(__name__) # Enable CORS for all routes (important for client scripts connecting locally) @app.after_request def after_request(response): origin = request.headers.get("Origin") if origin: parsed_origin = origin.lower() is_allowed = False if parsed_origin in ("null", "vscode-webview://"): is_allowed = True elif parsed_origin.startswith("http://localhost:") or parsed_origin == "http://localhost": is_allowed = True elif parsed_origin.startswith("http://127.0.0.1:") or parsed_origin == "http://127.0.0.1": is_allowed = True elif parsed_origin.startswith("vscode-webview://"): is_allowed = True elif parsed_origin.startswith("chrome-extension://"): is_allowed = True if is_allowed: response.headers["Access-Control-Allow-Origin"] = origin response.headers["Access-Control-Allow-Credentials"] = "true" response.headers.add("Access-Control-Allow-Headers", "Content-Type, Authorization") response.headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") return response sock = Sock(app) # Global instances kv = None embedding_provider = None persistence = None def init_app(): global kv, embedding_provider, persistence # 1. Initialize DB kv = StateKV() # 2. Initialize Embedding Provider if API key is available api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") if api_key: try: embedding_provider = search.GeminiEmbeddingProvider(api_key) functions.set_embedding_provider(embedding_provider) print(f"[search] Embedding provider active: gemini (768 dims)") except Exception as e: print(f"[search] Error initializing embedding provider: {e}") else: print(f"[search] No GEMINI_API_KEY found, running in BM25-only mode.") # 3. Persistence & load indexes persistence = functions.IndexPersistence(kv, functions._bm25_index, functions._vector_index if api_key else None) functions.set_index_persistence(persistence) loaded = persistence.load() print(f"[persistence] Load results: BM25 index={loaded['bm25']}, Vector index={loaded['vector']}") # 4. Rebuild index if empty if functions._bm25_index.size == 0: print("[persistence] Search index is empty. Rebuilding in background thread...") def run_rebuild(): try: count = functions.rebuild_index(kv) print(f"[persistence] Rebuild completed: indexed {count} items.") except Exception as ex: print(f"[persistence] Rebuild failed: {ex}") t = threading.Thread(target=run_rebuild, daemon=True) t.start() # 5. Start background worker loops import time def run_auto_forget_loop(): time.sleep(10) while True: try: if os.getenv("AUTO_FORGET_ENABLED") != "false": print("[scheduler] Running auto_forget sweep...") res = functions.auto_forget(kv, dry_run=False) print(f"[scheduler] auto_forget sweep completed: {res}") except Exception as e: print(f"[scheduler] auto_forget loop error: {e}") time.sleep(3600) def run_consolidation_loop(): time.sleep(15) while True: try: print("[scheduler] Running lesson_decay_sweep...") decay_res = functions.lesson_decay_sweep(kv) print(f"[scheduler] lesson_decay_sweep completed: {decay_res}") if functions.is_consolidation_enabled(): print("[scheduler] Running consolidation...") cons_res = functions.consolidate(kv) print(f"[scheduler] consolidation completed: {cons_res}") except Exception as e: print(f"[scheduler] consolidation/decay loop error: {e}") time.sleep(86400) t_forget = threading.Thread(target=run_auto_forget_loop, daemon=True) t_forget.start() t_consolidate = threading.Thread(target=run_consolidation_loop, daemon=True) t_consolidate.start() # ===================================================================== # Auth Middleware # ===================================================================== def timing_safe_compare(a: str, b: str) -> bool: return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8")) def check_auth(): secret = os.getenv("AGENTMEMORY_SECRET") if not secret: return None auth = request.headers.get("Authorization") or request.headers.get("authorization") if not auth or not auth.startswith("Bearer "): return jsonify({"error": "unauthorized"}), 401 provided_token = auth[7:].strip() if not timing_safe_compare(provided_token, secret): return jsonify({"error": "unauthorized"}), 401 return None # ===================================================================== # WebSocket Streaming # ===================================================================== active_websockets = set() @sock.route("/stream/mem-live/viewer") def stream_viewer(ws): secret = os.getenv("AGENTMEMORY_SECRET") if secret: token = request.args.get("token") or request.args.get("secret") if not token or not timing_safe_compare(token, secret): ws.close(1008) return active_websockets.add(ws) try: while True: data = ws.receive() if data is None: break except Exception: pass finally: active_websockets.discard(ws) def broadcast_to_viewers(payload): msg = json.dumps(payload) for ws in list(active_websockets): try: ws.send(msg) except Exception: active_websockets.discard(ws) # Register broadcaster in functions functions.set_stream_broadcaster(broadcast_to_viewers) # ===================================================================== # Viewer Routes # ===================================================================== @app.route("/") @app.route("/viewer") @app.route("/agentmemory/viewer") def serve_viewer(): try: base_dir = os.path.dirname(os.path.abspath(__file__)) template_path = os.path.join(base_dir, "viewer", "index.html") with open(template_path, "r", encoding="utf-8") as f: template = f.read() nonce = base64.urlsafe_b64encode(secrets.token_bytes(16)).decode("utf-8").rstrip("=") auto_token = os.environ.get("AGENTMEMORY_SECRET", "") html = (template .replace("__AGENTMEMORY_VIEWER_NONCE__", nonce) .replace("__AGENTMEMORY_VERSION__", "0.9.8") .replace("__AGENTMEMORY_AUTO_TOKEN__", auto_token)) csp = "; ".join([ "default-src 'none'", "base-uri 'none'", "frame-ancestors 'self' https://huggingface.co https://*.hf.space", "object-src 'none'", "form-action 'none'", f"script-src 'nonce-{nonce}'", "script-src-attr 'none'", "style-src 'unsafe-inline'", "connect-src 'self' https: http://localhost:* http://127.0.0.1:* wss: ws://localhost:* ws://127.0.0.1:* wss://localhost:* wss://127.0.0.1:*", "img-src 'self' data:", "font-src 'self'", ]) res = make_response(html) res.headers["Content-Type"] = "text/html; charset=utf-8" res.headers["Content-Security-Policy"] = csp res.headers["Cache-Control"] = "no-cache" return res except Exception as e: return f"Viewer not found: {e}", 404 @app.route("/favicon.svg") def serve_favicon(): base_dir = os.path.dirname(os.path.abspath(__file__)) return send_from_directory(os.path.join(base_dir, "viewer"), "favicon.svg") # ===================================================================== # REST REST API Endpoints # ===================================================================== @app.route("/agentmemory/livez", methods=["GET"]) def livez(): port = int(os.getenv("III_REST_PORT", os.getenv("PORT", "3111"))) return jsonify({ "status": "ok", "service": "agentmemory", "viewerPort": port, "viewerSkipped": False }) @app.route("/agentmemory/config/flags", methods=["GET"]) def config_flags(): auth_err = check_auth() if auth_err: return auth_err provider_kind = "llm" if embedding_provider else "noop" embedding_prov = "gemini" if embedding_provider else "none" flags = [ { "key": "GRAPH_EXTRACTION_ENABLED", "label": "Knowledge graph extraction", "enabled": functions.is_graph_extraction_enabled(), "default": False, "affects": ["Graph", "Dashboard"], "needsLlm": True, "description": "Extracts entities and relations from observations into a knowledge graph.", "enableHow": "Set GRAPH_EXTRACTION_ENABLED=true and restart.", "docsHref": "https://github.com/rohitg00/agentmemory#knowledge-graph" }, { "key": "CONSOLIDATION_ENABLED", "label": "Memory consolidation", "enabled": functions.is_consolidation_enabled(), "default": False, "affects": ["Dashboard", "Memories", "Crystals"], "needsLlm": True, "description": "Periodically summarizes sessions into semantic facts + procedures.", "enableHow": "Set CONSOLIDATION_ENABLED=true and restart.", "docsHref": "https://github.com/rohitg00/agentmemory#consolidation" }, { "key": "AGENTMEMORY_AUTO_COMPRESS", "label": "LLM-powered observation compression", "enabled": functions.is_auto_compress_enabled(), "default": False, "affects": ["Memories", "Timeline"], "needsLlm": True, "description": "Every observation is compressed by the LLM for richer summaries. OFF uses synthetic compression.", "enableHow": "Set AGENTMEMORY_AUTO_COMPRESS=true.", "docsHref": "https://github.com/rohitg00/agentmemory/issues/138" } ] return jsonify({ "version": "0.9.8", "provider": provider_kind, "embeddingProvider": embedding_prov, "flags": flags }) @app.route("/agentmemory/health", methods=["GET"]) def health(): return jsonify(functions.health_check(kv)) @app.route("/agentmemory/observe", methods=["POST"]) def api_observe(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.observe(kv, body) return jsonify(res), 201 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/agent/observe", methods=["POST"]) def api_agent_observe(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} agent_id = body.get("agentId") session_id = body.get("sessionId") project = body.get("project") cwd = body.get("cwd") or "" text = body.get("text") or body.get("content") or "" obs_type = body.get("type") or "other" title = body.get("title") or f"agent_{obs_type}" image_data = body.get("imageData") if not session_id or not project: return jsonify({"error": "sessionId and project are required"}), 400 timestamp = datetime_now_iso() data_payload = { "tool_name": title, "tool_input": text, "tool_output": text, } if image_data: data_payload["imageBase64"] = image_data payload = { "sessionId": session_id, "project": project, "cwd": cwd, "hookType": "post_tool_use", "timestamp": timestamp, "data": data_payload } if agent_id: payload["agentId"] = agent_id res = functions.observe(kv, payload) return jsonify(res), 201 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/context", methods=["POST"]) def api_context(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.context(kv, body) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/search", methods=["POST"]) def api_search(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} query = body.get("query") if not query or not query.strip(): return jsonify({"error": "query is required"}), 400 limit = body.get("limit") or 10 # smart search / hybrid search query res = functions._hybrid_search.search(query, limit) res = enrich_search_results(kv, res) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/export", methods=["GET"]) def api_export(): auth_err = check_auth() if auth_err: return auth_err try: max_sess = request.args.get("maxSessions") offset = request.args.get("offset") payload = {} if max_sess is not None: payload["maxSessions"] = max_sess if offset is not None: payload["offset"] = offset res = functions.export_data(kv, payload) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/agentmemory/replay/sessions", methods=["GET"]) def api_replay_sessions(): auth_err = check_auth() if auth_err: return auth_err sessions = functions.list_sessions(kv) return jsonify({"success": True, "sessions": sessions}), 200 @app.route("/agentmemory/session/start", methods=["POST"]) def api_session_start(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} session_id = body.get("sessionId") project = body.get("project") cwd = body.get("cwd") if not session_id or not project or not cwd: return jsonify({"error": "sessionId, project, and cwd are required"}), 400 title = body.get("title") agent_id = body.get("agentId") or functions.get_agent_id() session = { "id": session_id, "project": project, "cwd": cwd, "startedAt": datetime_now_iso(), "status": "active", "observationCount": 0 } if title: session["summary"] = title[:200] session["firstPrompt"] = title[:200] if agent_id: session["agentId"] = agent_id functions.create_session(kv, session) # Compile initial context ctx = functions.context(kv, {"sessionId": session_id, "project": project}) # Commit to Dolt functions.commit_if_enabled(kv, f"Start session {session_id[:8]}", agent_id) return jsonify({"session": session, "context": ctx.get("context", "")}), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/antigravity/sync", methods=["POST"]) def api_antigravity_sync(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} mode = body.get("mode") or "current_session" current_convo = body.get("currentConversationId") current_folder = body.get("currentFolder") res = perform_antigravity_sync(mode, current_convo, current_folder) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/agentmemory/session/end", methods=["POST"]) def api_session_end(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} session_id = body.get("sessionId") if not session_id: return jsonify({"error": "sessionId is required"}), 400 functions.end_session(kv, session_id) # Commit to Dolt sess = functions.get_session(kv, session_id) or {} agent_id = sess.get("agentId") functions.commit_if_enabled(kv, f"End session {session_id[:8]}", agent_id) return jsonify({"success": True}), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/session/commit", methods=["POST"]) def api_session_commit(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} sha = body.get("sha") if not sha: return jsonify({"error": "sha is required"}), 400 session_id = body.get("sessionId") branch = body.get("branch") repo = body.get("repo") message = body.get("message") author = body.get("author") authored_at = body.get("authoredAt") files = body.get("files") existing = kv.get(KV.commits, sha) or {} session_ids = set(existing.get("sessionIds", [])) if session_id: session_ids.add(session_id) link = { "sha": sha, "shortSha": sha[:7], "branch": branch or existing.get("branch"), "repo": repo or existing.get("repo"), "message": message or existing.get("message"), "author": author or existing.get("author"), "authoredAt": authored_at or existing.get("authoredAt"), "files": files or existing.get("files"), "sessionIds": list(session_ids), "linkedAt": existing.get("linkedAt") or datetime_now_iso() } kv.set(KV.commits, sha, link) if session_id: sess = functions.get_session(kv, session_id) if sess: commit_shas = set(sess.get("commitShas", [])) commit_shas.add(sha) sess["commitShas"] = list(commit_shas) functions.create_session(kv, sess) return jsonify({"commit": link}), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/session/by-commit", methods=["GET"]) def api_session_by_commit(): auth_err = check_auth() if auth_err: return auth_err sha = request.args.get("sha") if not sha: return jsonify({"error": "sha query param required"}), 400 link = kv.get(KV.commits, sha) if not link: return jsonify({"error": "no sessions linked to this commit"}), 404 sessions = [] for sid in link.get("sessionIds", []): sess = functions.get_session(kv, sid) if sess: sessions.append(sess) return jsonify({"commit": link, "sessions": sessions}), 200 @app.route("/agentmemory/commits", methods=["GET"]) def api_commits(): auth_err = check_auth() if auth_err: return auth_err branch = request.args.get("branch") repo = request.args.get("repo") limit = int(request.args.get("limit", "100")) all_links = kv.list(KV.commits) filtered = all_links if branch: filtered = [c for c in filtered if c.get("branch") == branch] if repo: filtered = [c for c in filtered if c.get("repo") == repo] filtered.sort(key=lambda c: c.get("linkedAt", ""), reverse=True) return jsonify({"commits": filtered[:limit]}), 200 @app.route("/agentmemory/sessions", methods=["GET"]) def api_sessions(): auth_err = check_auth() if auth_err: return auth_err sessions = functions.list_sessions(kv) agent_id = request.args.get("agentId") if agent_id and agent_id != "*": sessions = [s for s in sessions if s.get("agentId") == agent_id] elif functions.is_agent_scope_isolated(): env_aid = functions.get_agent_id() if env_aid: sessions = [s for s in sessions if s.get("agentId") == env_aid] return jsonify({"sessions": sessions}), 200 @app.route("/agentmemory/observations", methods=["GET"]) def api_observations(): auth_err = check_auth() if auth_err: return auth_err session_id = request.args.get("sessionId") if not session_id: return jsonify({"error": "sessionId is required"}), 400 obs = kv.list(KV.observations(session_id)) obs.sort(key=lambda o: o.get("timestamp", "")) agent_id = request.args.get("agentId") if agent_id and agent_id != "*": obs = [o for o in obs if o.get("agentId") == agent_id] elif functions.is_agent_scope_isolated(): env_aid = functions.get_agent_id() if env_aid: obs = [o for o in obs if o.get("agentId") == env_aid] return jsonify({"observations": obs}), 200 @app.route("/agentmemory/remember", methods=["POST"]) def api_remember(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.remember(kv, body) return jsonify(res), 201 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/agent/remember", methods=["POST"]) def api_agent_remember(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} content = body.get("content") if not content: return jsonify({"error": "content is required"}), 400 agent_id = body.get("agentId") project = body.get("project") mem_type = body.get("type") or "fact" concepts = body.get("concepts") or [] if isinstance(concepts, str): concepts = [c.strip() for c in concepts.split(",") if c.strip()] files = body.get("files") or [] if isinstance(files, str): files = [f.strip() for f in files.split(",") if f.strip()] payload = { "content": content, "type": mem_type, "concepts": concepts, "files": files, "project": project } if agent_id: payload["agentId"] = agent_id res = functions.remember(kv, payload) return jsonify(res), 201 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/forget", methods=["POST"]) def api_forget(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.forget(kv, body) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 # ===================================================================== # Lessons Learned Endpoints # ===================================================================== @app.route("/agentmemory/lessons", methods=["GET"]) def api_lessons_list(): auth_err = check_auth() if auth_err: return auth_err project = request.args.get("project") source = request.args.get("source") min_conf = float(request.args.get("minConfidence", "0")) limit = int(request.args.get("limit", "50")) res = functions.lesson_list(kv, { "project": project, "source": source, "minConfidence": min_conf, "limit": limit }) return jsonify(res), 200 @app.route("/agentmemory/lessons", methods=["POST"]) def api_lessons_save(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.lesson_save(kv, body) return jsonify(res), 201 if res.get("action") == "created" else 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/lessons/search", methods=["POST"]) def api_lessons_search(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.lesson_recall(kv, body) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/lessons/strengthen", methods=["POST"]) def api_lessons_strengthen(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} lesson_id = body.get("lessonId") if not lesson_id: return jsonify({"error": "lessonId is required"}), 400 res = functions.lesson_strengthen(kv, lesson_id) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 # ===================================================================== # Memory Slots Endpoints # ===================================================================== @app.route("/agentmemory/slots", methods=["GET"]) def api_slots_list(): auth_err = check_auth() if auth_err: return auth_err return jsonify(functions.slot_list(kv)), 200 @app.route("/agentmemory/slot", methods=["GET"]) def api_slots_get(): auth_err = check_auth() if auth_err: return auth_err label = request.args.get("label") if not label: return jsonify({"error": "label required"}), 400 res = functions.slot_get(kv, label) status = 200 if res.get("success") else 404 return jsonify(res), status @app.route("/agentmemory/slot", methods=["POST"]) def api_slots_create(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.slot_create(kv, body) status = 201 if res.get("success") else 400 return jsonify(res), status except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/slot/append", methods=["POST"]) def api_slots_append(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} label = body.get("label") text = body.get("text") if not label or not text: return jsonify({"error": "label and text required"}), 400 res = functions.slot_append(kv, label, text) status = 200 if res.get("success") else 400 return jsonify(res), status except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/slot/replace", methods=["POST"]) def api_slots_replace(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} label = body.get("label") content = body.get("content") if not label or content is None: return jsonify({"error": "label and content required"}), 400 res = functions.slot_replace(kv, label, content) status = 200 if res.get("success") else 400 return jsonify(res), status except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/slot", methods=["DELETE"]) def api_slots_delete(): auth_err = check_auth() if auth_err: return auth_err label = request.args.get("label") if not label: return jsonify({"error": "label query param required"}), 400 res = functions.slot_delete(kv, label) status = 200 if res.get("success") else 404 return jsonify(res), status @app.route("/agentmemory/slot/reflect", methods=["POST"]) def api_slots_reflect(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} session_id = body.get("sessionId") if not session_id: return jsonify({"error": "sessionId required"}), 400 max_obs = body.get("maxObservations") or 50 res = functions.slot_reflect(kv, session_id, max_obs) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 # ===================================================================== # Audit, Relations, Evolve, Timeline, Profile # ===================================================================== @app.route("/agentmemory/audit", methods=["GET"]) def api_audit(): auth_err = check_auth() if auth_err: return auth_err op = request.args.get("operation") limit = int(request.args.get("limit", "50")) res = query_audit(kv, {"operation": op, "limit": limit}) return jsonify({"entries": res, "success": True}), 200 @app.route("/agentmemory/relations", methods=["GET"]) def api_relations_list(): auth_err = check_auth() if auth_err: return auth_err rels = functions.get_relations(kv) return jsonify({"relations": rels}), 200 @app.route("/agentmemory/relations", methods=["POST"]) def api_relations_add(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.add_relation(kv, body) return jsonify(res), 201 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/evolve", methods=["POST"]) def api_evolve(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.evolve_memory(kv, body) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/timeline", methods=["POST"]) def api_timeline(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.timeline(kv, body) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/profile", methods=["GET"]) def api_profile(): auth_err = check_auth() if auth_err: return auth_err project = request.args.get("project") if not project: sessions = kv.list(KV.sessions) projects = set(s.get("project", "") for s in sessions if s.get("project")) memories = kv.list(KV.memories) mem_projects = set(m.get("project", "") for m in memories if m.get("project")) all_projects = sorted(projects.union(mem_projects)) return jsonify({"projects": all_projects, "success": True}), 200 res = functions.get_project_profile(kv, project) return jsonify(res), 200 # ===================================================================== # Missing endpoints the JS viewer calls # ===================================================================== @app.route("/agentmemory/memories", methods=["GET"]) def api_memories_list(): auth_err = check_auth() if auth_err: return auth_err latest_only = request.args.get("latest", "false").lower() == "true" limit = int(request.args.get("limit", "500")) all_mems = kv.list(KV.memories) if latest_only: all_mems = [m for m in all_mems if m.get("isLatest") is not False] all_mems.sort(key=lambda m: m.get("createdAt", ""), reverse=True) return jsonify({"memories": all_mems[:limit], "total": len(all_mems)}), 200 @app.route("/agentmemory/graph/stats", methods=["GET"]) def api_graph_stats(): auth_err = check_auth() if auth_err: return auth_err nodes = kv.list(KV.graphNodes) edges = kv.list(KV.graphEdges) return jsonify({"nodes": len(nodes), "edges": len(edges), "success": True}), 200 @app.route("/agentmemory/semantic", methods=["GET"]) def api_semantic_list(): auth_err = check_auth() if auth_err: return auth_err items = kv.list(KV.semantic) return jsonify({"items": items, "total": len(items)}), 200 @app.route("/agentmemory/procedural", methods=["GET"]) def api_procedural_list(): auth_err = check_auth() if auth_err: return auth_err items = kv.list(KV.procedural) return jsonify({"items": items, "total": len(items)}), 200 @app.route("/agentmemory/crystals", methods=["GET"]) def api_crystals_list(): auth_err = check_auth() if auth_err: return auth_err items = kv.list(KV.crystals) return jsonify({"crystals": items, "total": len(items)}), 200 def escape_xml(s: str) -> str: return s.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """).replace("'", "'") def get_file_context(kv_inst, session_id, files, project): try: sessions = kv_inst.list(KV.sessions) other_sessions = [s for s in sessions if s.get("id") != session_id] if session_id else sessions if project: other_sessions = [s for s in other_sessions if s.get("project") == project] # Sort sessions by startedAt desc other_sessions.sort(key=lambda s: s.get("startedAt") or "", reverse=True) other_sessions = other_sessions[:15] obs_cache = {} for s in other_sessions: s_id = s["id"] obs_cache[s_id] = kv_inst.list(KV.observations(s_id)) results = [] for file in files: history = {"file": file, "observations": []} normalized_file = file.replace("./", "", 1) for s in other_sessions: s_id = s["id"] observations = obs_cache.get(s_id) or [] for obs in observations: obs_files = obs.get("files") or [] if isinstance(obs_files, str): obs_files = [f.strip() for f in obs_files.split(",") if f.strip()] obs_title = obs.get("title") if not obs_files or not obs_title: continue # Check match matches = False for f in obs_files: if f == file or f == normalized_file or f.endswith(f"/{normalized_file}") or normalized_file.endswith(f"/{f}"): matches = True break importance = int(obs.get("importance") or 0) if matches and importance >= 4: history["observations"].append({ "sessionId": s_id, "obsId": obs.get("id"), "type": obs.get("type", "other"), "title": obs_title, "narrative": obs.get("narrative") or obs.get("content") or "", "importance": importance, "timestamp": obs.get("timestamp", "") }) # Sort by importance desc history["observations"].sort(key=lambda o: o["importance"], reverse=True) history["observations"] = history["observations"][:5] if history["observations"]: results.append(history) if not results: return "" lines = [""] for fh in results: lines.append(f"## {fh['file']}") for obs in fh["observations"]: lines.append(f"- [{obs['type']}] {obs['title']}: {obs['narrative']}") lines.append("") return "\n".join(lines) except Exception as e: print(f"[enrich] Error generating file context: {e}") return "" @app.route("/agentmemory/enrich", methods=["POST"]) def api_enrich(): auth_err = check_auth() if auth_err: return auth_err try: data = request.get_json(force=True) or {} session_id = data.get("sessionId") files = data.get("files") or [] terms = data.get("terms") or [] project = data.get("project") parts = [] # 1. File Context if files: file_context = get_file_context(kv, session_id, files, project) if file_context: parts.append(file_context) # 2. Search search_queries = [os.path.basename(f) for f in files] + terms search_queries = [q for q in search_queries if q] if search_queries: query_str = " ".join(search_queries) res = functions._hybrid_search.search(query_str, 5) if project: res = [r for r in res if r.get("observation", {}).get("project") == project or r.get("project") == project] obs_texts = [] for r in res: obs = r.get("observation", r) narrative = obs.get("narrative") or obs.get("content") if narrative: obs_texts.append(escape_xml(narrative)) if obs_texts: parts.append("\n" + "\n".join(obs_texts) + "\n") # 3. Bug memories if files: bugs = [] memories = kv.list(KV.memories) for m in memories: m_type = m.get("type") m_is_latest = m.get("isLatest", True) m_project = m.get("project") m_files = m.get("files") or [] if isinstance(m_files, str): m_files = [f.strip() for f in m_files.split(",") if f.strip()] if m_type == "bug" and m_is_latest: if project and m_project and m_project != project: continue has_overlap = False for f in m_files: for df in files: if f in df or df in f: has_overlap = True break if has_overlap: break if has_overlap: bugs.append(m) bugs.sort(key=lambda m: m.get("updatedAt") or m.get("createdAt") or "", reverse=True) bugs_lines = [] for m in bugs[:3]: title = escape_xml(m.get("title") or "") content = escape_xml(m.get("content") or "") bugs_lines.append(f"- {title}: {content}") if bugs_lines: parts.append("\n" + "\n".join(bugs_lines) + "\n") context = "\n\n".join(parts) if len(context) > 4000: context = context[:4000] return jsonify({"context": context}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/agentmemory/crystals/auto", methods=["POST"]) def api_crystals_auto(): auth_err = check_auth() if auth_err: return auth_err return jsonify({"success": True, "message": "auto-crystallize stub (not implemented in Python version)"}), 200 @app.route("/agentmemory/claude-bridge/sync", methods=["POST"]) def api_claude_bridge_sync(): auth_err = check_auth() if auth_err: return auth_err return jsonify({"success": True, "message": "claude-bridge/sync stub"}), 200 @app.route("/agentmemory/actions", methods=["GET"]) def api_actions_list(): auth_err = check_auth() if auth_err: return auth_err limit = int(request.args.get("limit", "200")) status = request.args.get("status") items = kv.list(KV.actions) if status: items = [a for a in items if a.get("status") == status] items.sort(key=lambda a: a.get("updatedAt", ""), reverse=True) return jsonify({"actions": items[:limit], "total": len(items)}), 200 @app.route("/agentmemory/actions", methods=["POST"]) def api_action_create(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} action_id = functions.generate_id("act") from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() action = { "id": action_id, "title": body.get("title", ""), "description": body.get("description"), "priority": body.get("priority", 0), "status": body.get("status", "pending"), "tags": body.get("tags", []), "sessionId": body.get("sessionId"), "createdAt": now, "updatedAt": now, } kv.set(KV.actions, action_id, action) return jsonify({"action": action, "success": True}), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/actions/", methods=["PATCH"]) def api_action_update(action_id): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} existing = kv.get(KV.actions, action_id) if not existing: return jsonify({"error": "not found"}), 404 from datetime import datetime, timezone allowed_fields = {"title", "description", "priority", "status", "tags", "sessionId"} updates = {k: v for k, v in body.items() if k in allowed_fields} existing.update(updates) existing["updatedAt"] = datetime.now(timezone.utc).isoformat() kv.set(KV.actions, action_id, existing) return jsonify({"action": existing, "success": True}), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/frontier", methods=["GET"]) def api_frontier(): auth_err = check_auth() if auth_err: return auth_err items = kv.list(KV.actions) frontier = [a for a in items if a.get("status") in ("pending", "active")] frontier.sort(key=lambda a: (-(a.get("priority") or 0), a.get("createdAt", ""))) return jsonify({"frontier": frontier[:50], "total": len(frontier)}), 200 @app.route("/agentmemory/insights", methods=["GET"]) def api_insights_list(): auth_err = check_auth() if auth_err: return auth_err limit = int(request.args.get("limit", "200")) items = kv.list(KV.insights) items.sort(key=lambda x: x.get("createdAt", ""), reverse=True) return jsonify({"insights": items[:limit], "total": len(items)}), 200 @app.route("/agentmemory/replay/load", methods=["GET"]) def api_replay_load(): auth_err = check_auth() if auth_err: return auth_err session_id = request.args.get("sessionId") if not session_id: return jsonify({"error": "sessionId required"}), 400 session = functions.get_session(kv, session_id) if not session: return jsonify({"error": "session not found"}), 404 obs = kv.list(KV.observations(session_id)) obs.sort(key=lambda o: o.get("timestamp", "")) from dateutil import parser as dtparser session_start = session.get("startedAt", "") try: t0_ms = dtparser.parse(session_start).timestamp() * 1000 if session_start else None except Exception: t0_ms = None _hook_kind = { "prompt_submit": "prompt", "post_tool_use": "tool_call", "post_tool_failure": "tool_error", "subagent_stop": "response", "task_completed": "response", "notification": "response", } events = [] for o in obs: ts = o.get("timestamp", "") try: t_ms = dtparser.parse(ts).timestamp() * 1000 if ts else 0 except Exception: t_ms = 0 offset_ms = max(0, t_ms - t0_ms) if t0_ms is not None else t_ms hook = o.get("hookType", "") kind = _hook_kind.get(hook, "prompt") # obs fields are top-level (no data wrapper) tool_name = o.get("toolName") tool_input = o.get("toolInput") tool_output = o.get("toolOutput") label = (o.get("userPrompt") or o.get("narrative") or o.get("title") or o.get("summary") or tool_name or (o.get("raw") or "")[:80] or hook) if isinstance(label, str): label = label[:120] events.append({ "kind": kind, "label": label, "offsetMs": offset_ms, "ts": ts, "body": o.get("narrative") or o.get("raw") or o.get("summary") or "", "toolName": tool_name, "toolInput": tool_input, "toolOutput": tool_output, }) total_ms = (events[-1]["offsetMs"] + 1000) if events else 0 timeline = {"events": events, "eventCount": len(events), "totalDurationMs": total_ms} return jsonify({"session": session, "timeline": timeline, "success": True}), 200 @app.route("/agentmemory/replay/import-jsonl", methods=["POST"]) def api_replay_import_jsonl(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} path = body.get("path") max_files = body.get("maxFiles") import replay_import res = replay_import.import_jsonl_data(kv, path=path, max_files=max_files) status_code = 200 if res.get("success", True) else 400 return jsonify(res), status_code except Exception as e: return jsonify({"error": str(e), "success": False}), 500 @app.route("/agentmemory/auto-forget", methods=["POST"]) def api_auto_forget(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} dry_run = body.get("dryRun", False) res = functions.auto_forget(kv, dry_run) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/summarize", methods=["POST"]) def api_summarize(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} res = functions.summarize(kv, body) if not res.get("success"): return jsonify(res), 400 return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/consolidate", methods=["POST"]) def api_consolidate(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} project = body.get("project") min_obs = body.get("minObservations") res = functions.consolidate(kv, project=project, min_observations=min_obs) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/consolidate-pipeline", methods=["POST"]) def api_consolidate_pipeline(): auth_err = check_auth() if auth_err: return auth_err try: res = functions.consolidate(kv) return jsonify(res), 200 except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/agentmemory/dolt/commits", methods=["GET"]) def api_dolt_commits(): auth_err = check_auth() if auth_err: return auth_err try: limit = int(request.args.get("limit", "50")) rows = kv.get_audit_log(limit) commits = [] for r in rows: ts_ms = r["ts"] iso = datetime.datetime.utcfromtimestamp(ts_ms / 1000).isoformat() + "Z" row_id = str(r["id"]) commits.append({ "sha": row_id.zfill(40), "shortSha": row_id, "agent": r["agent_id"], "email": f"{r['agent_id']}@agentmemory.ai", "date": iso, "message": r["message"] }) return jsonify({"success": True, "commits": commits}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/agentmemory/second-brain", methods=["GET"]) def api_get_second_brain(): auth_err = check_auth() if auth_err: return auth_err brain_dir = os.getenv("SECOND_BRAIN_DIR", os.path.join(os.path.expanduser("~"), ".agentmemory", "second-brain")) os.makedirs(brain_dir, exist_ok=True) file_param = request.args.get("file") if file_param: safe_name = os.path.basename(file_param) if not safe_name.endswith(".md"): return jsonify({"error": "Only markdown files are allowed"}), 400 file_path = os.path.join(brain_dir, safe_name) if not os.path.exists(file_path): return jsonify({"error": f"File {safe_name} not found"}), 404 try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() return jsonify({"file": safe_name, "content": content}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 try: files = [] for name in os.listdir(brain_dir): if name.endswith(".md") and os.path.isfile(os.path.join(brain_dir, name)): file_path = os.path.join(brain_dir, name) with open(file_path, "r", encoding="utf-8") as f: content = f.read() files.append({ "name": name, "size": os.path.getsize(file_path), "content": content }) return jsonify({"success": True, "path": brain_dir, "files": files}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/agentmemory/second-brain", methods=["POST"]) def api_update_second_brain(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} file_name = body.get("file") content = body.get("content") if not file_name or content is None: return jsonify({"error": "file and content are required"}), 400 safe_name = os.path.basename(file_name) if not safe_name.endswith(".md"): return jsonify({"error": "Only markdown files are allowed"}), 400 brain_dir = os.getenv("SECOND_BRAIN_DIR", os.path.join(os.path.expanduser("~"), ".agentmemory", "second-brain")) if not os.path.exists(brain_dir): os.makedirs(brain_dir, exist_ok=True) file_path = os.path.join(brain_dir, safe_name) content = functions.strip_private_data(content) with open(file_path, "w", encoding="utf-8") as f: f.write(content) functions.commit_if_enabled(kv, f"Update second-brain file: {safe_name}", "user") return jsonify({"success": True, "file": safe_name, "size": len(content)}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 # ===================================================================== # MCP Server Integration (JSON-RPC) # ===================================================================== def parse_mcp_list_arg(arg_val): if isinstance(arg_val, list): return [str(item).strip() for item in arg_val if item] if isinstance(arg_val, str) and arg_val: return [item.strip() for item in arg_val.split(",") if item.strip()] return [] def enrich_search_results(kv_inst, results): enriched = [] for r in results: item = dict(r) obs_id = item.get("obsId") session_id = item.get("sessionId") if not obs_id: continue obj = None if session_id == "memory" or obs_id.startswith("mem_"): obj = kv_inst.get(KV.memories, obs_id) elif session_id: obj = kv_inst.get(KV.observations(session_id), obs_id) if obj: item["title"] = obj.get("title") or "" content_val = obj.get("content") or obj.get("narrative") or obj.get("raw") or "" if not isinstance(content_val, str): try: content_val = json.dumps(content_val) except Exception: content_val = str(content_val) item["content"] = content_val item["type"] = obj.get("type") or "" item["concepts"] = obj.get("concepts") or [] item["files"] = obj.get("files") or [] else: item["title"] = "" item["content"] = "" item["type"] = "" item["concepts"] = [] item["files"] = [] enriched.append(item) return enriched def perform_antigravity_sync(mode="current_session", current_conversation_id=None, current_folder=None): import os import json import glob import re brain_dir = os.path.join(os.path.expanduser("~"), ".gemini", "antigravity", "brain") if not os.path.exists(brain_dir): return {"success": False, "syncedSessions": [], "observationsAdded": 0, "error": f"Brain directory not found at {brain_dir}"} pattern = os.path.join(brain_dir, "*", ".system_generated", "logs", "transcript.jsonl") files = glob.glob(pattern) if not files: return {"success": True, "syncedSessions": [], "observationsAdded": 0} conversations = [] for fpath in files: try: mtime = os.path.getmtime(fpath) convo_id = os.path.basename(os.path.dirname(os.path.dirname(os.path.dirname(fpath)))) conversations.append({ "id": convo_id, "transcriptPath": fpath, "mtime": mtime }) except Exception: pass if not conversations: return {"success": True, "syncedSessions": [], "observationsAdded": 0} conversations.sort(key=lambda x: x["mtime"], reverse=True) targets = [] if mode == "current_session": if current_conversation_id: match = next((c for c in conversations if c["id"] == current_conversation_id), None) if match: targets = [match] else: targets = [conversations[0]] elif mode == "current_folder": target_folder = current_folder if current_folder else "" if not target_folder: target_folder = os.getcwd() target_folder_norm = target_folder.replace("\\", "/").lower().strip() for convo in conversations: try: with open(convo["transcriptPath"], "r", encoding="utf-8") as tf: text = tf.read().lower() text_norm = text.replace("\\/", "/").replace("\\\\", "/") if target_folder_norm in text_norm: targets.append(convo) except Exception: pass elif mode == "all": targets = conversations else: return {"success": False, "syncedSessions": [], "observationsAdded": 0, "error": f"Invalid mode: {mode}"} if not targets: return {"success": True, "syncedSessions": [], "processedSessions": [], "observationsAdded": 0} synced_sessions = [] processed_sessions = [] observations_added = 0 for convo in targets: convo_id = convo["id"] tpath = convo["transcriptPath"] session_id = f"antigravity_{convo_id[:18].replace('-', '_')}" project_path = None try: with open(tpath, "r", encoding="utf-8") as tf: first_line = tf.readline() if first_line: step = json.loads(first_line) match = re.search(r"\[([^\]]+)\]\s*->\s*\[([^\]]+)\]", step.get("content", "")) if match: project_path = match.group(2) except Exception: pass if not project_path: project_path = os.getcwd() turns = [] current_prompt = None current_timestamp = None try: with open(tpath, "r", encoding="utf-8") as tf: for line in tf: if not line.strip(): continue try: step = json.loads(line) step_type = step.get("type") if step_type == "USER_INPUT": p_text = step.get("content", "") if "" in p_text: parts = p_text.split("") if len(parts) > 1: p_text = parts[1] if "" in p_text: p_text = p_text.split("")[0] current_prompt = p_text.strip() current_timestamp = step.get("created_at") elif step_type == "PLANNER_RESPONSE" and current_prompt: turns.append({ "prompt": current_prompt, "response": step.get("content", ""), "timestamp": current_timestamp or step.get("created_at") or datetime_now_iso() }) current_prompt = None current_timestamp = None except Exception: pass except Exception: continue if not turns: continue existing_inputs = set() obs_list = kv.list(KV.observations(session_id)) for obs in obs_list: tool_input = obs.get("toolInput") or (obs.get("raw") or {}).get("tool_input") if tool_input: existing_inputs.add(tool_input.strip()) session_exists = kv.get(KV.sessions, session_id) is not None if not session_exists: session = { "id": session_id, "project": project_path, "cwd": project_path, "startedAt": datetime_now_iso(), "status": "active", "observationCount": 0, "summary": f"Antigravity Pair Programming ({convo_id[:8]})", "firstPrompt": f"Antigravity Pair Programming ({convo_id[:8]})", "agentId": "antigravity" } functions.create_session(kv, session) convo_synced = False for turn in turns: prompt = turn["prompt"] if prompt.strip() in existing_inputs: continue payload = { "sessionId": session_id, "project": project_path, "cwd": project_path, "hookType": "post_tool_use", "timestamp": turn["timestamp"], "agentId": "antigravity", "data": { "tool_name": "conversation", "tool_input": prompt, "tool_output": turn["response"], } } functions.observe(kv, payload) observations_added += 1 convo_synced = True processed_sessions.append(convo_id) if convo_synced: synced_sessions.append(convo_id) return {"success": True, "syncedSessions": synced_sessions, "processedSessions": processed_sessions, "observationsAdded": observations_added} @app.route("/agentmemory/mcp/tools", methods=["GET"]) def mcp_tools_list(): auth_err = check_auth() if auth_err: return auth_err tools = [ { "name": "memory_recall", "description": "Search past session observations for relevant context. Use when you need to recall what happened in previous sessions.", "inputSchema": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query keywords"}, "limit": {"type": "number", "description": "Max results to return (default 10)"} }, "required": ["query"] } }, { "name": "memory_save", "description": "Explicitly save an important insight, decision, or pattern to long-term memory.", "inputSchema": { "type": "object", "properties": { "content": {"type": "string", "description": "The insight or decision to remember"}, "type": {"type": "string", "description": "Memory type: pattern, preference, architecture, bug, workflow, or fact"}, "concepts": { "oneOf": [ {"type": "string", "description": "Comma-separated key concepts"}, {"type": "array", "items": {"type": "string"}, "description": "List of key concepts"} ] }, "files": { "oneOf": [ {"type": "string", "description": "Comma-separated relevant file paths"}, {"type": "array", "items": {"type": "string"}, "description": "List of relevant file paths"} ] }, "project": {"type": "string", "description": "Canonical project identifier"} }, "required": ["content"] } }, { "name": "memory_sessions", "description": "List recent sessions with their status and observation counts.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_sessions_list", "description": "Retrieve list of all memory sessions.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_smart_search", "description": "Hybrid semantic+keyword search with progressive disclosure.", "inputSchema": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "limit": {"type": "number", "description": "Max results (default 10)"} }, "required": ["query"] } }, { "name": "memory_timeline", "description": "Chronological observations around an anchor point.", "inputSchema": { "type": "object", "properties": { "anchor": {"type": "string", "description": "Anchor point date or keyword"}, "project": {"type": "string", "description": "Filter by project path"}, "sessionId": {"type": "string", "description": "Filter by session ID"} }, "required": ["anchor"] } }, { "name": "memory_observations", "description": "Retrieve all observations for a given session ID.", "inputSchema": { "type": "object", "properties": { "sessionId": {"type": "string", "description": "Session ID to fetch observations for"} }, "required": ["sessionId"] } }, { "name": "memory_profile", "description": "User/project profile with top concepts and file patterns.", "inputSchema": { "type": "object", "properties": { "project": {"type": "string", "description": "Project path"} }, "required": ["project"] } }, { "name": "memory_lessons", "description": "List all saved lessons, optionally filtered by project.", "inputSchema": { "type": "object", "properties": { "project": {"type": "string", "description": "Filter by project (optional)"}, "minConfidence": {"type": "number", "description": "Filter by minimum confidence (optional, default 0.0)"}, "limit": {"type": "number", "description": "Max results to return (optional, default 50)"} } } }, { "name": "memory_lesson_save", "description": "Save a lesson learned from this session.", "inputSchema": { "type": "object", "properties": { "content": {"type": "string", "description": "The lesson learned"}, "context": {"type": "string", "description": "When/where this lesson applies"}, "project": {"type": "string", "description": "Project this lesson is about"} }, "required": ["content"] } }, { "name": "memory_lesson_recall", "description": "Search lessons by query. Returns lessons sorted by confidence and recency.", "inputSchema": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "project": {"type": "string", "description": "Filter by project"} }, "required": ["query"] } }, { "name": "memory_lesson_search", "description": "Search lessons learned by query.", "inputSchema": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query keywords"}, "project": {"type": "string", "description": "Filter by project path (optional)"} }, "required": ["query"] } }, { "name": "memory_consolidate", "description": "Trigger the consolidation pipeline to summarize sessions and extract semantic/procedural memory.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_reflect", "description": "Trigger reflection for a session, updating pending items, project context, and session patterns.", "inputSchema": { "type": "object", "properties": { "sessionId": {"type": "string", "description": "Session ID to reflect upon"}, "maxObservations": {"type": "number", "description": "Max observations to scan (optional, default 50)"} }, "required": ["sessionId"] } }, { "name": "memory_diagnose", "description": "Run diagnostic health checks across all memory subsystems.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_forget", "description": "Delete a memory, a session, or specific observations within a session.", "inputSchema": { "type": "object", "properties": { "memoryId": {"type": "string", "description": "Memory ID to delete"}, "sessionId": {"type": "string", "description": "Session ID to delete"}, "observationIds": { "oneOf": [ {"type": "string", "description": "Comma-separated observation IDs to delete"}, {"type": "array", "items": {"type": "string"}, "description": "List of observation IDs to delete"} ] } } } }, { "name": "memory_export", "description": "Export all memory data including sessions, memories, lessons, observations, and slots as JSON.", "inputSchema": { "type": "object", "properties": { "maxSessions": {"type": "number", "description": "Max sessions to export (optional)"}, "offset": {"type": "number", "description": "Pagination offset for sessions (optional)"} } } }, { "name": "agent_observe", "description": "Log a direct observation, thought, command execution, or action from the agent's active execution.", "inputSchema": { "type": "object", "properties": { "agentId": {"type": "string", "description": "ID/Name of the agent logging this (e.g. 'antigravity', optional)"}, "sessionId": {"type": "string", "description": "Active session ID"}, "project": {"type": "string", "description": "Canonical project path/identifier"}, "text": {"type": "string", "description": "The observation log, thought, or content"}, "content": {"type": "string", "description": "The observation log, thought, or content (alternative to text)"}, "type": {"type": "string", "description": "Observation type: thought, command, tool, error, result, conversation, other"}, "title": {"type": "string", "description": "A short summary title for the observation"}, "cwd": {"type": "string", "description": "Current working directory"} }, "required": ["sessionId", "project"] } }, { "name": "agent_remember", "description": "Explicitly save a key insight, fact, user preference, or architecture decision to long-term memory.", "inputSchema": { "type": "object", "properties": { "agentId": {"type": "string", "description": "ID/Name of the agent (e.g. 'antigravity', optional)"}, "content": {"type": "string", "description": "The memory content/insight"}, "project": {"type": "string", "description": "Canonical project path/identifier"}, "type": {"type": "string", "description": "Memory type: fact, preference, bug, workflow, architecture"}, "concepts": { "oneOf": [ {"type": "string", "description": "Comma-separated key concepts"}, {"type": "array", "items": {"type": "string"}, "description": "List of key concepts"} ] }, "files": { "oneOf": [ {"type": "string", "description": "Comma-separated relevant file paths"}, {"type": "array", "items": {"type": "string"}, "description": "List of relevant file paths"} ] } }, "required": ["content", "project"] } }, { "name": "memory_antigravity_sync", "description": "Sync Antigravity chat transcripts to agentmemory. Supports syncing the current session, all sessions, or sessions associated with the current folder.", "inputSchema": { "type": "object", "properties": { "mode": {"type": "string", "description": "Sync mode: current_session (default), current_folder, or all"}, "currentConversationId": {"type": "string", "description": "Optional conversation ID of the current active session"}, "currentFolder": {"type": "string", "description": "Optional current folder path to filter by"} }, "required": ["mode"] } }, { "name": "memory_antigravity_sync_all", "description": "Sync the current Antigravity session, automatically crystallize (summarize) it, and reflect to populate pinned memory slots in a single action.", "inputSchema": { "type": "object", "properties": { "mode": {"type": "string", "description": "Sync mode: current_session (default), current_folder, or all"}, "currentConversationId": {"type": "string", "description": "Optional conversation ID of the current active session"}, "currentFolder": {"type": "string", "description": "Optional current folder path to filter by"} }, "required": ["mode"] } }, { "name": "memory_slot_list", "description": "List all pinned memory slots.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_slot_get", "description": "Retrieve the content of a specific pinned memory slot.", "inputSchema": { "type": "object", "properties": { "label": {"type": "string", "description": "The label of the pinned slot to fetch"} }, "required": ["label"] } }, { "name": "memory_slot_create", "description": "Create a new pinned memory slot or overwrite an existing one.", "inputSchema": { "type": "object", "properties": { "label": {"type": "string", "description": "The label of the pinned slot"}, "content": {"type": "string", "description": "Initial content for the slot (optional)"}, "scope": {"type": "string", "description": "Scope: global or session (optional, default 'global')"}, "sizeLimit": {"type": "number", "description": "Character limit (optional)"}, "pinned": {"type": "boolean", "description": "Whether pinned to context (optional, default true)"} }, "required": ["label"] } }, { "name": "memory_slot_append", "description": "Append text content to a pinned memory slot.", "inputSchema": { "type": "object", "properties": { "label": {"type": "string", "description": "The label of the pinned slot"}, "text": {"type": "string", "description": "Text to append"} }, "required": ["label", "text"] } }, { "name": "memory_slot_replace", "description": "Replace the content of a pinned memory slot.", "inputSchema": { "type": "object", "properties": { "label": {"type": "string", "description": "The label of the pinned slot"}, "content": {"type": "string", "description": "New content"} }, "required": ["label", "content"] } }, { "name": "memory_slot_delete", "description": "Delete a pinned memory slot.", "inputSchema": { "type": "object", "properties": { "label": {"type": "string", "description": "The label of the pinned slot to delete"} }, "required": ["label"] } }, { "name": "memory_action_create", "description": "Create a new work item / action.", "inputSchema": { "type": "object", "properties": { "title": {"type": "string", "description": "Title of the action"}, "description": {"type": "string", "description": "Detailed description of the action (optional)"}, "priority": {"type": "number", "description": "Priority score, higher is more urgent (optional, default 0)"}, "status": {"type": "string", "description": "Status: pending, active, completed (optional, default 'pending')"}, "tags": { "oneOf": [ {"type": "string", "description": "Comma-separated tags (optional)"}, {"type": "array", "items": {"type": "string"}, "description": "List of tags (optional)"} ] }, "sessionId": {"type": "string", "description": "Link to a specific session ID (optional)"} }, "required": ["title"] } }, { "name": "memory_action_update", "description": "Update fields of an existing action.", "inputSchema": { "type": "object", "properties": { "actionId": {"type": "string", "description": "ID of the action to update"}, "title": {"type": "string", "description": "Updated title (optional)"}, "description": {"type": "string", "description": "Updated description (optional)"}, "priority": {"type": "number", "description": "Updated priority (optional)"}, "status": {"type": "string", "description": "Updated status: pending, active, completed, discarded (optional)"}, "tags": { "oneOf": [ {"type": "string", "description": "Comma-separated tags (optional)"}, {"type": "array", "items": {"type": "string"}, "description": "List of tags (optional)"} ] }, "sessionId": {"type": "string", "description": "Updated session ID (optional)"} }, "required": ["actionId"] } }, { "name": "memory_frontier", "description": "Get pending and active actions sorted by priority.", "inputSchema": {"type": "object", "properties": {}} }, { "name": "memory_crystallize", "description": "Crystallize/summarize all observations in a session.", "inputSchema": { "type": "object", "properties": { "sessionId": {"type": "string", "description": "Session ID to crystallize"} }, "required": ["sessionId"] } } ] return jsonify({"tools": tools}), 200 @app.route("/agentmemory/mcp/tools", methods=["POST"]) def mcp_tools_call(): auth_err = check_auth() if auth_err: return auth_err try: body = request.get_json(force=True) or {} name = body.get("name") args = body.get("arguments") or {} if not name: return jsonify({"error": "name is required"}), 400 print(f"[mcp] Calling tool {name} with args: {args}") text_out = "" if name == "memory_recall": q = args.get("query") limit = int(args.get("limit") or 10) res = functions._hybrid_search.search(q, limit) res = enrich_search_results(kv, res) text_out = json.dumps(res, indent=2) elif name == "memory_save": content = args.get("content") concepts = parse_mcp_list_arg(args.get("concepts")) files = parse_mcp_list_arg(args.get("files")) session_id = args.get("sessionId") project = args.get("project") res = functions.remember(kv, { "content": content, "type": args.get("type") or "fact", "concepts": concepts, "files": files, "project": project }) # If sessionId provided, also write observation so memory is linked to session if session_id and project and content: obs_payload = { "sessionId": session_id, "project": project, "cwd": "", "hookType": "post_tool_use", "timestamp": datetime_now_iso(), "agentId": functions.get_agent_id() or "agent", "data": { "tool_name": "memory_save", "tool_input": content[:500], "tool_output": res.get("id", ""), } } functions.observe(kv, obs_payload) text_out = json.dumps(res) elif name in ("memory_sessions", "memory_sessions_list"): sessions = functions.list_sessions(kv) text_out = json.dumps({"sessions": sessions}, indent=2) elif name == "memory_smart_search": q = args.get("query") limit = int(args.get("limit") or 10) res = functions._hybrid_search.search(q, limit) res = enrich_search_results(kv, res) text_out = json.dumps(res, indent=2) elif name == "memory_timeline": res = functions.timeline(kv, { "anchor": args.get("anchor"), "project": args.get("project"), "sessionId": args.get("sessionId") }) text_out = json.dumps(res, indent=2) elif name == "memory_observations": session_id = args.get("sessionId") if not session_id: return jsonify({"error": "sessionId is required"}), 400 obs = kv.list(KV.observations(session_id)) obs.sort(key=lambda o: o.get("timestamp", "")) text_out = json.dumps({"observations": obs}, indent=2) elif name == "memory_profile": res = functions.build_project_profile(kv, args.get("project")) text_out = json.dumps(res, indent=2) elif name == "memory_lessons": res = functions.lesson_list(kv, { "project": args.get("project"), "minConfidence": args.get("minConfidence"), "limit": args.get("limit") }) text_out = json.dumps(res, indent=2) elif name == "memory_lesson_save": res = functions.lesson_save(kv, { "content": args.get("content"), "context": args.get("context"), "project": args.get("project") }) text_out = json.dumps(res) elif name in ("memory_lesson_recall", "memory_lesson_search"): res = functions.lesson_recall(kv, { "query": args.get("query"), "project": args.get("project") }) text_out = json.dumps(res, indent=2) elif name == "memory_consolidate": res = functions.consolidate(kv) text_out = json.dumps(res, indent=2) elif name == "memory_reflect": session_id = args.get("sessionId") max_obs = int(args.get("maxObservations") or 50) if not session_id: return jsonify({"error": "sessionId is required"}), 400 res = functions.slot_reflect(kv, session_id, max_obs) text_out = json.dumps(res, indent=2) elif name == "memory_diagnose": res = functions.health_check(kv) text_out = json.dumps(res, indent=2) elif name == "memory_forget": obs_ids = parse_mcp_list_arg(args.get("observationIds")) res = functions.forget(kv, { "memoryId": args.get("memoryId"), "sessionId": args.get("sessionId"), "observationIds": obs_ids }) text_out = json.dumps(res, indent=2) elif name == "memory_export": max_sess = args.get("maxSessions") offset = args.get("offset") payload = {} if max_sess is not None: payload["maxSessions"] = max_sess if offset is not None: payload["offset"] = offset res = functions.export_data(kv, payload) text_out = json.dumps(res, indent=2) elif name == "agent_observe": agent_id = args.get("agentId") or functions.get_agent_id() or "agent" session_id = args.get("sessionId") project = args.get("project") text = args.get("text") or args.get("content") obs_type = args.get("type") or "other" title = args.get("title") or f"agent_{obs_type}" cwd = args.get("cwd") or "" if not session_id or not project or not text: return jsonify({"error": "sessionId, project, and text (or content) are required"}), 400 payload = { "sessionId": session_id, "project": project, "cwd": cwd, "hookType": "post_tool_use", "timestamp": datetime_now_iso(), "agentId": agent_id, "data": { "tool_name": title, "tool_input": text, "tool_output": text, } } res = functions.observe(kv, payload) text_out = json.dumps(res) elif name == "agent_remember": agent_id = args.get("agentId") or functions.get_agent_id() or "agent" content = args.get("content") project = args.get("project") session_id = args.get("sessionId") mem_type = args.get("type") or "fact" concepts = parse_mcp_list_arg(args.get("concepts")) files = parse_mcp_list_arg(args.get("files")) if not content or not project: return jsonify({"error": "content and project are required"}), 400 payload = { "content": content, "type": mem_type, "concepts": concepts, "files": files, "project": project, "agentId": agent_id } res = functions.remember(kv, payload) # If sessionId provided, write observation to link memory to session if session_id and content: obs_payload = { "sessionId": session_id, "project": project, "cwd": "", "hookType": "post_tool_use", "timestamp": datetime_now_iso(), "agentId": agent_id, "data": { "tool_name": "agent_remember", "tool_input": content[:500], "tool_output": res.get("id", ""), } } functions.observe(kv, obs_payload) text_out = json.dumps(res) elif name == "memory_antigravity_sync": mode = args.get("mode") or "current_session" current_convo = args.get("currentConversationId") current_folder = args.get("currentFolder") res = perform_antigravity_sync(mode, current_convo, current_folder) text_out = json.dumps(res) elif name == "memory_antigravity_sync_all": mode = args.get("mode") or "current_session" current_convo = args.get("currentConversationId") current_folder = args.get("currentFolder") sync_res = perform_antigravity_sync(mode, current_convo, current_folder) processed_sessions = sync_res.get("processedSessions") or sync_res.get("syncedSessions") or [] crystallizations = {} reflections = {} for cid in processed_sessions: session_id = f"antigravity_{cid[:18].replace('-', '_')}" try: cres = functions.summarize(kv, {"sessionId": session_id}) crystallizations[session_id] = cres except Exception as ex: crystallizations[session_id] = {"success": False, "error": str(ex)} try: rres = functions.slot_reflect(kv, session_id, 50) reflections[session_id] = rres except Exception as ex: reflections[session_id] = {"success": False, "error": str(ex)} text_out = json.dumps({ "success": sync_res.get("success", True), "syncedSessions": sync_res.get("syncedSessions") or [], "processedSessions": processed_sessions, "observationsAdded": sync_res.get("observationsAdded", 0), "crystallizations": crystallizations, "reflections": reflections }, indent=2) elif name == "memory_slot_list": res = functions.slot_list(kv) text_out = json.dumps(res, indent=2) elif name == "memory_slot_get": label = args.get("label") if not label: return jsonify({"error": "label is required"}), 400 res = functions.slot_get(kv, label) text_out = json.dumps(res, indent=2) elif name == "memory_slot_create": label = args.get("label") if not label: return jsonify({"error": "label is required"}), 400 res = functions.slot_create(kv, { "label": label, "content": args.get("content"), "scope": args.get("scope") or "global", "sizeLimit": args.get("sizeLimit"), "pinned": args.get("pinned", True) }) text_out = json.dumps(res, indent=2) elif name == "memory_slot_append": label = args.get("label") text = args.get("text") if not label or not text: return jsonify({"error": "label and text are required"}), 400 res = functions.slot_append(kv, label, text) text_out = json.dumps(res, indent=2) elif name == "memory_slot_replace": label = args.get("label") content = args.get("content") if not label or content is None: return jsonify({"error": "label and content are required"}), 400 res = functions.slot_replace(kv, label, content) text_out = json.dumps(res, indent=2) elif name == "memory_slot_delete": label = args.get("label") if not label: return jsonify({"error": "label is required"}), 400 res = functions.slot_delete(kv, label) text_out = json.dumps(res, indent=2) elif name == "memory_action_create": action_id = functions.generate_id("act") from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() tags = parse_mcp_list_arg(args.get("tags")) action = { "id": action_id, "title": args.get("title") or "", "description": args.get("description"), "priority": args.get("priority", 0), "status": args.get("status", "pending"), "tags": tags, "sessionId": args.get("sessionId"), "createdAt": now, "updatedAt": now, } kv.set(KV.actions, action_id, action) text_out = json.dumps({"action": action, "success": True}, indent=2) elif name == "memory_action_update": action_id = args.get("actionId") if not action_id: return jsonify({"error": "actionId is required"}), 400 existing = kv.get(KV.actions, action_id) if not existing: return jsonify({"error": "action not found"}), 404 from datetime import datetime, timezone allowed_fields = {"title", "description", "priority", "status", "tags", "sessionId"} updates = {k: v for k, v in args.items() if k in allowed_fields} if "tags" in args: updates["tags"] = parse_mcp_list_arg(args.get("tags")) existing.update(updates) existing["updatedAt"] = datetime.now(timezone.utc).isoformat() kv.set(KV.actions, action_id, existing) text_out = json.dumps({"action": existing, "success": True}, indent=2) elif name == "memory_frontier": items = kv.list(KV.actions) frontier = [a for a in items if a.get("status") in ("pending", "active")] frontier.sort(key=lambda a: (-(a.get("priority") or 0), a.get("createdAt", ""))) text_out = json.dumps({"frontier": frontier[:50], "total": len(frontier)}, indent=2) elif name == "memory_crystallize": session_id = args.get("sessionId") if not session_id: return jsonify({"error": "sessionId is required"}), 400 res = functions.summarize(kv, {"sessionId": session_id}) text_out = json.dumps(res, indent=2) else: return jsonify({"error": f"unknown tool: {name}"}), 400 return jsonify({ "content": [ {"type": "text", "text": text_out} ] }), 200 except Exception as e: return jsonify({"error": str(e)}), 500 # ===================================================================== # Agent Discovery Endpoint (/agent.md and /auth.md) # ===================================================================== AGENT_MD_TEMPLATE = """# AgentMemory — Agent Connection Guide > This document is auto-generated. Fetch it fresh each session so you always have current values. ## 1. Server | Field | Value | |-------|-------| | Base URL | `http://127.0.0.1:{port}` | | API prefix | `/agentmemory` | | Liveness | `GET http://127.0.0.1:{port}/agentmemory/livez` (no auth) | | Auth required | `{auth_required}` | | Auth header | `Authorization: Bearer ` | {auth_note} --- ## 2. Quick-Start: Minimum Viable Session Do these in order at the start of every session: ``` 1. POST /agentmemory/session/start ← register yourself 2. POST /agentmemory/context ← inject past memory into your system prompt ... do work, call observe after each tool use ... 3. POST /agentmemory/session/end ← mark session complete ``` ### 2a. Start a session ```http POST /agentmemory/session/start Content-Type: application/json {auth_header_example} {{ "sessionId": "", "project": "", "cwd": "", "agentId": "", "title": "" }} ``` Response includes `context` — paste it into your system prompt before answering. ### 2b. Compile context (also works mid-session) ```http POST /agentmemory/context Content-Type: application/json {auth_header_example} {{ "sessionId": "", "project": "", "budget": 2000 }} ``` Returns `context` string wrapped in `` tags. Insert verbatim. ### 2c. Log an observation (call after every significant tool use) ```http POST /agentmemory/agent/observe Content-Type: application/json {auth_header_example} {{ "agentId": "", "sessionId": "", "project": "", "cwd": "", "text": "", "type": "tool | command | thought | error | result | conversation | other", "title": "" }} ``` ### 2d. Save a long-term memory (decisions, patterns, preferences) ```http POST /agentmemory/agent/remember Content-Type: application/json {auth_header_example} {{ "agentId": "", "project": "", "content": "", "type": "fact | preference | architecture | bug | workflow | pattern", "concepts": "auth, jwt, middleware", "files": "src/middleware/auth.ts, tests/auth.test.ts" }} ``` ### 2e. End session ```http POST /agentmemory/session/end Content-Type: application/json {auth_header_example} {{ "sessionId": "" }} ``` --- ## 3. MCP Tools (JSON-RPC style) Call via: `POST /agentmemory/mcp/tools` ```http POST /agentmemory/mcp/tools Content-Type: application/json {auth_header_example} {{ "name": "", "arguments": {{ ... }} }} ``` | Tool | Required args | Description | |------|---------------|-------------| | `memory_recall` | `query` | BM25+vector hybrid search over all past observations | | `memory_smart_search` | `query` | Same as recall with progressive disclosure | | `memory_save` | `content` | Save long-term memory (type, concepts, files optional) | | `memory_sessions` | — | List recent sessions with observation counts | | `memory_timeline` | `anchor` | Observations around a date or keyword anchor | | `memory_profile` | `project` | Top concepts, files, conventions for a project | | `memory_lesson_save` | `content` | Save a lesson (auto-strengthens duplicates) | | `memory_lesson_recall` | `query` | Search lessons scored by confidence + recency | | `agent_observe` | `agentId`, `sessionId`, `project`, `text` | Log an observation (same as REST) | | `agent_remember` | `agentId`, `content`, `project` | Save memory (same as REST) | Get full schemas: `GET /agentmemory/mcp/tools` --- ## 4. Full REST Endpoint Reference All endpoints require `Authorization: Bearer ` when auth is enabled. Prefix every path with `http://127.0.0.1:{port}`. ### Session Management | Method | Path | Key body fields | Notes | |--------|------|-----------------|-------| | `POST` | `/agentmemory/session/start` | `sessionId`, `project`, `cwd`, `agentId` | Returns `session` + `context` | | `POST` | `/agentmemory/session/end` | `sessionId` | Marks status=completed | | `GET` | `/agentmemory/sessions` | — | List all sessions | | `GET` | `/agentmemory/observations?sessionId=` | — | Observations for a session | | `POST` | `/agentmemory/session/commit` | `sha`, `sessionId`, `branch`, `message` | Link a git commit to a session | | `GET` | `/agentmemory/session/by-commit?sha=` | — | Sessions linked to a commit | | `GET` | `/agentmemory/commits` | `branch`, `repo`, `limit` | List tracked commits | ### Observations | Method | Path | Key body fields | Notes | |--------|------|-----------------|-------| | `POST` | `/agentmemory/observe` | `sessionId`, `hookType`, `timestamp`, `data` | Raw hook payload | | `POST` | `/agentmemory/agent/observe` | `agentId`, `sessionId`, `project`, `text`, `type` | Simplified agent endpoint | `hookType` values: `post_tool_use`, `post_tool_failure`, `prompt_submit`, `subagent_stop`, `task_completed`, `notification` ### Memory | Method | Path | Key body fields | Notes | |--------|------|-----------------|-------| | `POST` | `/agentmemory/remember` | `content`, `type`, `concepts[]`, `files[]`, `project` | Supersedes similar memories automatically | | `POST` | `/agentmemory/agent/remember` | `agentId`, `content`, `project`, `type`, `concepts`, `files` | Simplified agent endpoint | | `POST` | `/agentmemory/forget` | `memoryId` OR `sessionId` [+ `observationIds[]`] | Delete memory/session/observations | | `POST` | `/agentmemory/evolve` | `memoryId`, `newContent`, `newTitle` | Version a memory (creates new, marks old superseded) | | `POST` | `/agentmemory/search` | `query`, `limit` | Hybrid BM25+vector search | | `POST` | `/agentmemory/context` | `sessionId`, `project`, `budget` | Compile context block for injection | Memory `type` values: `fact`, `preference`, `architecture`, `bug`, `workflow`, `pattern` ### Lessons | Method | Path | Key body fields | Notes | |--------|------|-----------------|-------| | `POST` | `/agentmemory/lessons` | `content`, `context`, `project`, `confidence`, `tags[]` | Create/strengthen (duplicate = auto-reinforce) | | `GET` | `/agentmemory/lessons` | `project`, `minConfidence`, `limit` | List lessons | | `POST` | `/agentmemory/lessons/search` | `query`, `project`, `limit` | Keyword search scored by confidence×recency | | `POST` | `/agentmemory/lessons/strengthen` | `lessonId` | Manually reinforce a lesson (+0.1 confidence) | ### Memory Slots (Pinned Context) Slots are named text buffers injected into every context compilation. | Method | Path | Key body/query fields | Notes | |--------|------|-----------------------|-------| | `GET` | `/agentmemory/slots` | — | List all slots | | `GET` | `/agentmemory/slot?label=` | — | Get single slot | | `POST` | `/agentmemory/slot` | `label`, `content`, `scope`, `sizeLimit`, `pinned` | Create slot | | `POST` | `/agentmemory/slot/append` | `label`, `text` | Append text to slot | | `POST` | `/agentmemory/slot/replace` | `label`, `content` | Replace slot content | | `DELETE` | `/agentmemory/slot?label=` | — | Delete slot | | `POST` | `/agentmemory/slot/reflect` | `sessionId`, `maxObservations` | Auto-populate slots from session observations | Default slot labels: `persona`, `user_preferences`, `tool_guidelines`, `project_context`, `guidance`, `pending_items`, `session_patterns`, `self_notes` ### Knowledge Graph & Analytics | Method | Path | Key body/query fields | Notes | |--------|------|-----------------------|-------| | `GET` | `/agentmemory/relations` | — | List all knowledge graph edges | | `POST` | `/agentmemory/relations` | `sourceId`, `targetId`, `type` | Add a relation edge | | `POST` | `/agentmemory/timeline` | `anchor`, `project`, `before`, `after` | Observations around anchor | | `GET` | `/agentmemory/profile?project=` | — | Project profile (top concepts/files) | | `GET` | `/agentmemory/audit` | `operation`, `limit` | Audit log | | `GET` | `/agentmemory/profile` | `project` (optional) | Project profile; omit `project` to list all known projects | | `GET` | `/agentmemory/actions` | `limit`, `status` | List actions | | `POST` | `/agentmemory/actions` | `title`, `description`, `priority`, `tags[]`, `status` | Create action | | `PATCH`| `/agentmemory/actions/` | any action fields | Update action | | `GET` | `/agentmemory/frontier` | — | Pending+active actions sorted by priority | | `GET` | `/agentmemory/insights` | `limit` | List insights | | `GET` | `/agentmemory/replay/sessions` | — | Sessions list for replay | | `GET` | `/agentmemory/replay/load?sessionId=` | — | Full session + ordered observations for replay | | `POST` | `/agentmemory/auto-forget` | `dryRun` | Evict stale observations | ### Health & Config | Method | Path | Auth | Notes | |--------|------|------|-------| | `GET` | `/agentmemory/livez` | No | Liveness check | | `GET` | `/agentmemory/health` | Yes | Full health + version | | `GET` | `/agentmemory/config/flags` | Yes | Feature flags (graph, consolidation, compression) | --- ## 5. WebSocket Live Stream ``` ws://127.0.0.1:{port}/stream/mem-live/viewer ``` Connect to receive real-time events. Message types: - `raw_observation` — raw hook payload as received - `compressed_observation` — after synthetic compression + indexing No auth on the WebSocket endpoint. --- ## 6. Agent Identity & Scope Isolation Set `AGENT_ID` env var on the server to scope all read operations to a single agent. Set `AGENTMEMORY_AGENT_SCOPE=isolated` to enforce per-agent isolation. Always pass `agentId` in every request body — it's stored on every observation/memory and enables per-agent filtering via `?agentId=` on list endpoints. --- ## 7. Secrets Are Redacted Any text containing API keys, Bearer tokens, passwords, or JWTs is automatically redacted to `[REDACTED_SECRET]` before storage. Wrap sensitive blocks in `...` tags to force redaction. --- *Generated by agentmemory v{version} — `GET /agent.md` to refresh* """ @app.route("/agent.md", methods=["GET"]) @app.route("/auth.md", methods=["GET"]) def agent_discovery(): auth_err = check_auth() if auth_err: return auth_err port = int(os.getenv("III_REST_PORT", os.getenv("PORT", "3111"))) secret = os.getenv("AGENTMEMORY_SECRET") auth_required = "yes" if secret else "no" if secret: auth_note = ( "> **Auth is enabled.** Add `Authorization: Bearer ` to every request.\n" "> The secret is set via `AGENTMEMORY_SECRET` env var in `~/.agentmemory/.env`." ) auth_header_example = 'Authorization: Bearer ' else: auth_note = ( "> **Auth is disabled.** No `Authorization` header needed — all endpoints are open." ) auth_header_example = '' md = AGENT_MD_TEMPLATE.format( port=port, auth_required=auth_required, auth_note=auth_note, auth_header_example=auth_header_example, version="0.9.8", ) response = make_response(md) response.headers["Content-Type"] = "text/markdown; charset=utf-8" response.headers["Cache-Control"] = "no-cache" return response # ===================================================================== # Lifecycle Helpers # ===================================================================== if __name__ == "__main__": init_app() # Default Flask server settings port = int(os.getenv("III_REST_PORT", os.getenv("PORT", "3111"))) # Listen on all interfaces (required for container deployments like HF Spaces) print(f"[main] Starting Flask daemon on port {port}...") app.run(host="0.0.0.0", port=port, debug=False)