import os import re import time import uuid import json import hashlib import datetime from typing import Dict, Any, List, Optional, Tuple, Set from db import StateKV from search import ( SearchIndex, VectorIndex, GeminiEmbeddingProvider, HybridSearch, base64_to_float32, float32_to_base64 ) # ===================================================================== # Global Variables / Module State # ===================================================================== _bm25_index = SearchIndex() _vector_index = VectorIndex() _embedding_provider = None _hybrid_search = HybridSearch(_bm25_index, _vector_index, None, None) _index_persistence = None _stream_broadcaster = None # Callable: (payload) -> None # Default scopes matching schema.ts class KV: sessions = "mem:sessions" memories = "mem:memories" summaries = "mem:summaries" config = "mem:config" metrics = "mem:metrics" health = "mem:health" bm25Index = "mem:index:bm25" relations = "mem:relations" profiles = "mem:profiles" claudeBridge = "mem:claude-bridge" graphNodes = "mem:graph:nodes" graphEdges = "mem:graph:edges" graphSnapshot = "mem:graph:snapshot" graphNameIndex = "mem:graph:name-index" graphEdgeKey = "mem:graph:edge-key" graphNodeDegree = "mem:graph:node-degree" semantic = "mem:semantic" procedural = "mem:procedural" audit = "mem:audit" actions = "mem:actions" actionEdges = "mem:action-edges" leases = "mem:leases" routines = "mem:routines" routineRuns = "mem:routine-runs" signals = "mem:signals" checkpoints = "mem:checkpoints" mesh = "mem:mesh" sketches = "mem:sketches" facets = "mem:facets" sentinels = "mem:sentinels" crystals = "mem:crystals" lessons = "mem:lessons" insights = "mem:insights" graphEdgeHistory = "mem:graph:edge-history" retentionScores = "mem:retention" accessLog = "mem:access" imageRefs = "mem:image-refs" slots = "mem:slots" globalSlots = "mem:slots:global" commits = "mem:commits" recentSearches = "mem:recent-searches" @staticmethod def observations(session_id: str) -> str: return f"mem:obs:{session_id}" @staticmethod def team_shared(team_id: str) -> str: return f"mem:team:{team_id}:shared" @staticmethod def team_users(team_id: str, user_id: str) -> str: return f"mem:team:{team_id}:users:{user_id}" @staticmethod def team_profile(team_id: str) -> str: return f"mem:team:{team_id}:profile" @staticmethod def enriched_chunks(session_id: str) -> str: return f"mem:enriched:{session_id}" @staticmethod def latent_embeddings(obs_id: str) -> str: return f"mem:latent:{obs_id}" # ===================================================================== # Core Helpers & Utilities # ===================================================================== def generate_id(prefix: str) -> str: t = int(time.time() * 1000) chars = "0123456789abcdefghijklmnopqrstuvwxyz" ts_str = "" while t > 0: ts_str = chars[t % 36] + ts_str t //= 36 if not ts_str: ts_str = "0" rand = uuid.uuid4().hex[:12] return f"{prefix}_{ts_str}_{rand}" def fingerprint_id(prefix: str, content: str) -> str: h = hashlib.sha256(content.strip().lower().encode('utf-8')).hexdigest() return f"{prefix}_{h[:16]}" def auto_complete_old_active_sessions(kv: StateKV, current_session_id: str) -> int: sessions = kv.list(KV.sessions) count = 0 now = datetime.datetime.utcnow().isoformat() + "Z" for s in sessions: if s.get("id") != current_session_id and s.get("status") == "active": s["status"] = "completed" if "endedAt" not in s: s["endedAt"] = now s["updatedAt"] = now kv.set(KV.sessions, s["id"], s) count += 1 if count > 0: print(f"[session] Auto-completed {count} dangling active sessions.") return count def jaccard_similarity(a: str, b: str) -> float: tokens_a = [t for t in a.split() if len(t) > 2] tokens_b = [t for t in b.split() if len(t) > 2] set_a = set(tokens_a) set_b = set(tokens_b) if not set_a and not set_b: return 1.0 if not set_a or not set_b: return 0.0 intersection = len(set_a.intersection(set_b)) union = len(set_a.union(set_b)) return intersection / union # ===================================================================== # Privacy & Data Scrubbing # ===================================================================== PRIVATE_TAG_RE = re.compile(r'[\s\S]*?', re.IGNORECASE) SECRET_PATTERN_SOURCES = [ re.compile(r'(?:api[_-]?key|secret|token|password|credential|auth)[\s]*[=:]\s*["\']?[A-Za-z0-9_\-/.+]{20,}["\']?', re.IGNORECASE), re.compile(r'Bearer\s+[A-Za-z0-9._\-+/=]{20,}', re.IGNORECASE), re.compile(r'sk-proj-[A-Za-z0-9\-_]{20,}', re.IGNORECASE), re.compile(r'(?:sk|pk|rk|ak)-[A-Za-z0-9][A-Za-z0-9\-_]{19,}', re.IGNORECASE), re.compile(r'sk-ant-[A-Za-z0-9\-_]{20,}', re.IGNORECASE), re.compile(r'gh[pus]_[A-Za-z0-9]{36,}', re.IGNORECASE), re.compile(r'github_pat_[A-Za-z0-9_]{22,}', re.IGNORECASE), re.compile(r'xoxb-[A-Za-z0-9\-]+', re.IGNORECASE), re.compile(r'AKIA[0-9A-Z]{16}', re.IGNORECASE), re.compile(r'AIza[A-Za-z0-9\-_]{35}', re.IGNORECASE), re.compile(r'eyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}', re.IGNORECASE), re.compile(r'npm_[A-Za-z0-9]{36}', re.IGNORECASE), re.compile(r'glpat-[A-Za-z0-9\-_]{20,}', re.IGNORECASE), re.compile(r'dop_v1_[A-Za-z0-9]{64}', re.IGNORECASE), ] def strip_private_data(input_str: str) -> str: result = PRIVATE_TAG_RE.sub("[REDACTED]", input_str) for pattern in SECRET_PATTERN_SOURCES: result = pattern.sub("[REDACTED_SECRET]", result) return result # ===================================================================== # Audit Log System # ===================================================================== def record_audit( kv: StateKV, operation: str, function_id: str, target_ids: List[str], details: Dict[str, Any] = {}, quality_score: Optional[float] = None, user_id: Optional[str] = None, ) -> Dict[str, Any]: entry = { "id": generate_id("aud"), "timestamp": datetime.datetime.utcnow().isoformat() + "Z", "operation": operation, "userId": user_id, "functionId": function_id, "targetIds": target_ids, "details": details, "qualityScore": quality_score, } kv.set(KV.audit, entry["id"], entry) return entry def safe_audit( kv: StateKV, operation: str, function_id: str, target_ids: List[str], details: Dict[str, Any] = {}, quality_score: Optional[float] = None, user_id: Optional[str] = None, ) -> None: try: record_audit(kv, operation, function_id, target_ids, details, quality_score, user_id) except Exception as e: print(f"[audit] Failed to write audit: {e}") def query_audit( kv: StateKV, filter_opts: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: all_entries = kv.list(KV.audit) entries = sorted(all_entries, key=lambda x: x.get("timestamp", ""), reverse=True) if not filter_opts: return entries[:100] op = filter_opts.get("operation") if op: entries = [e for e in entries if e.get("operation") == op] import dateutil.parser date_from = filter_opts.get("dateFrom") if date_from: try: dt_from = dateutil.parser.parse(date_from).replace(tzinfo=None) filtered_entries = [] for e in entries: ts = e.get("timestamp") if ts: try: dt_ts = dateutil.parser.parse(ts).replace(tzinfo=None) if dt_ts >= dt_from: filtered_entries.append(e) except Exception: pass entries = filtered_entries except Exception: pass date_to = filter_opts.get("dateTo") if date_to: try: dt_to = dateutil.parser.parse(date_to).replace(tzinfo=None) filtered_entries = [] for e in entries: ts = e.get("timestamp") if ts: try: dt_ts = dateutil.parser.parse(ts).replace(tzinfo=None) if dt_ts <= dt_to: filtered_entries.append(e) except Exception: pass entries = filtered_entries except Exception: pass limit = filter_opts.get("limit", 100) return entries[:limit] # ===================================================================== # Image Store System # ===================================================================== IMAGES_DIR = os.path.join(os.path.expanduser("~"), ".agentmemory", "images") def get_max_bytes() -> int: return int(os.getenv("AGENTMEMORY_IMAGE_STORE_MAX_BYTES", 500 * 1024 * 1024)) def is_managed_image_path(file_path: str) -> bool: if not file_path: return False resolved = os.path.abspath(file_path) normalized_images_dir = os.path.abspath(IMAGES_DIR) return resolved.startswith(normalized_images_dir + os.sep) or resolved == normalized_images_dir def save_image_to_disk(base64_data: str) -> Tuple[str, int]: if not base64_data: return "", 0 if not os.path.exists(IMAGES_DIR): os.makedirs(IMAGES_DIR, exist_ok=True) clean_base64 = base64_data ext = "png" if base64_data.startswith("data:image/"): comma_idx = base64_data.find(",") if comma_idx != -1: meta = base64_data[:comma_idx] if "jpeg" in meta or "jpg" in meta: ext = "jpg" elif "webp" in meta: ext = "webp" elif "gif" in meta: ext = "gif" clean_base64 = base64_data[comma_idx + 1:] elif base64_data.startswith("/9j/"): ext = "jpg" h = hashlib.sha256(clean_base64.encode('utf-8')).hexdigest() file_path = os.path.join(IMAGES_DIR, f"{h}.{ext}") if os.path.exists(file_path): return file_path, 0 import base64 buffer = base64.b64decode(clean_base64) with open(file_path, "wb") as f: f.write(buffer) size = os.path.getsize(file_path) return file_path, size def delete_image(file_path: Optional[str]) -> int: if not file_path or not is_managed_image_path(file_path): return 0 try: if os.path.exists(file_path): size = os.path.getsize(file_path) os.remove(file_path) return size except Exception as e: print(f"[agentmemory] Failed to delete image context: {e}") return 0 def touch_image(file_path: str) -> None: if not file_path or not is_managed_image_path(file_path): return try: if os.path.exists(file_path): os.utime(file_path, None) except Exception: pass # ===================================================================== # Index Persistence System (JSON Sharded) # ===================================================================== class IndexPersistence: def __init__(self, kv: StateKV, bm25: SearchIndex, vector: Optional[VectorIndex]): self.kv = kv self.bm25 = bm25 self.vector = vector def schedule_save(self) -> None: self.save() def save(self) -> None: try: self.save_sharded_index( json.dumps(self.bm25.serialize_data()), "data:manifest", "data", "mem:index:bm25:bm25:" ) if self.vector: self.save_sharded_index( json.dumps(self.vector.serialize_data()), "vectors:manifest", "vectors", "mem:index:bm25:vectors:" ) except Exception as e: print(f"[index persistence] failed to save index: {e}") def save_sharded_index(self, serialized: str, manifest_key: str, legacy_key: str, scope_prefix: str) -> None: previous = self.kv.get(KV.bm25Index, manifest_key) generation = generate_id("idx") chunk_chars = 2000000 shards = [] chunks = [] offset = 0 shard_idx = 0 while offset < len(serialized): scope = f"{scope_prefix}{generation}:{str(shard_idx).zfill(5)}" chunk = serialized[offset:offset + chunk_chars] shards.append({"scope": scope, "key": "data", "chars": len(chunk)}) chunks.append(chunk) offset += chunk_chars shard_idx += 1 for shard, chunk in zip(shards, chunks): self.kv.set(shard["scope"], shard["key"], chunk) next_manifest = { "v": 1, "generation": generation, "shards": shards, "chars": len(serialized) } self.kv.set(KV.bm25Index, manifest_key, next_manifest) self.kv.delete(KV.bm25Index, legacy_key) # Cleanup ALL obsolete shards starting with scope_prefix that are NOT in the current shards try: conn = self.kv._get_conn() try: with conn.cursor() as cursor: cursor.execute( "SELECT DISTINCT scope FROM kv_store WHERE scope LIKE %s", (scope_prefix + "%",) ) rows = cursor.fetchall() current_scopes = {s["scope"] for s in shards} to_delete = [] for row in rows: scope_name = row["scope"] if scope_name not in current_scopes: to_delete.append(scope_name) if to_delete: for i in range(0, len(to_delete), 50): chunk_delete = to_delete[i:i + 50] format_strings = ','.join(['%s'] * len(chunk_delete)) cursor.execute( f"DELETE FROM kv_store WHERE scope IN ({format_strings})", tuple(chunk_delete) ) finally: conn.close() except Exception as ex: print(f"[index persistence] error cleaning up obsolete shards: {ex}") if previous and isinstance(previous, dict) and previous.get("v") == 1 and isinstance(previous.get("shards"), list): current_shards = {(s["scope"], s["key"]) for s in shards} for old_shard in previous["shards"]: if (old_shard["scope"], old_shard["key"]) not in current_shards: self.kv.delete(old_shard["scope"], old_shard["key"]) def load(self) -> Dict[str, Any]: bm25_data = self.load_sharded_data("data", "data:manifest") bm25_loaded = False if bm25_data: try: self.bm25.restore_from_data(json.loads(bm25_data)) bm25_loaded = True except Exception as e: print(f"[index persistence] failed to restore BM25: {e}") vector_loaded = False if self.vector: vector_data = self.load_sharded_data("vectors", "vectors:manifest") if vector_data: try: self.vector.restore_from_data(json.loads(vector_data)) vector_loaded = True except Exception as e: print(f"[index persistence] failed to restore vectors: {e}") return {"bm25": bm25_loaded, "vector": vector_loaded} def load_sharded_data(self, legacy_key: str, manifest_key: str) -> Optional[str]: manifest = self.kv.get(KV.bm25Index, manifest_key) if manifest and isinstance(manifest, dict) and manifest.get("v") == 1: shards = manifest.get("shards", []) chunks = [] for shard in shards: chunk = self.kv.get(shard["scope"], shard["key"]) if chunk is None: return None chunks.append(chunk) return "".join(chunks) legacy = self.kv.get(KV.bm25Index, legacy_key) if isinstance(legacy, str): return legacy return None # ===================================================================== # Vector Index / Embedding Helpers # ===================================================================== def clip_embed_input(text: str) -> str: EMBED_MAX_CHARS = 16000 if len(text) <= EMBED_MAX_CHARS: return text return text[:EMBED_MAX_CHARS] def get_agent_id() -> Optional[str]: return os.getenv("AGENT_ID") or None def commit_if_enabled(kv: StateKV, message: str, agent_id: Optional[str]) -> Optional[str]: return kv.commit_version(message, agent_id or "unknown-agent") def is_agent_scope_isolated() -> bool: return os.getenv("AGENTMEMORY_AGENT_SCOPE") == "isolated" def is_auto_compress_enabled() -> bool: return os.getenv("AGENTMEMORY_AUTO_COMPRESS") == "true" def is_slots_enabled() -> bool: return os.getenv("AGENTMEMORY_SLOTS") == "true" def is_reflect_enabled() -> bool: return os.getenv("AGENTMEMORY_REFLECT") == "true" def is_graph_extraction_enabled() -> bool: return os.getenv("GRAPH_EXTRACTION_ENABLED") == "true" def is_consolidation_enabled() -> bool: val = os.getenv("CONSOLIDATION_ENABLED") if val in ("false", "0"): return False if val in ("true", "1"): return True return bool(os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")) def vector_index_add_guarded( obs_id: str, session_id: str, text: str, context: Dict[str, Any] ) -> bool: vi = _vector_index ep = _embedding_provider if not vi or not ep: return False try: clipped = clip_embed_input(text) embedding = ep.embed(clipped) if len(embedding) != ep.dimensions: print(f"[vector-index] Dimension mismatch: expected {ep.dimensions}, got {len(embedding)}") return False vi.add(obs_id, session_id, embedding) return True except Exception as e: print(f"[vector-index] Embed failed: {e}") return False # ===================================================================== # Observation System (Observe, Synthetic Compression) # ===================================================================== def extract_image(d: Any) -> Optional[str]: if not d: return None if isinstance(d, str): if d.startswith("data:image/") or d.startswith("iVBORw0KGgo") or d.startswith("/9j/"): return d return None if isinstance(d, dict): for k in ["image_data", "image_path", "imageBase64", "imagePath"]: if isinstance(d.get(k), str): return d[k] for key, val in d.items(): match = extract_image(val) if match: return match return None def infer_type(tool_name: Optional[str], hook_type: str) -> str: if hook_type == "post_tool_failure": return "error" if hook_type == "prompt_submit": return "conversation" if hook_type in ("subagent_stop", "task_completed"): return "subagent" if hook_type == "notification": return "notification" if not tool_name: return "other" n = re.sub(r'([a-z])([A-Z])', r'\1_\2', tool_name) n = re.sub(r'[-\s]+', '_', n).lower() def has_word(word: str) -> bool: return bool(re.search(rf"(^|_){word}(_|$)", n)) or n == word or n.endswith(word) or n.startswith(word) if any(has_word(w) for w in ["fetch", "http", "web"]): return "web_fetch" if any(has_word(w) for w in ["grep", "search", "glob", "find"]): return "search" if any(has_word(w) for w in ["bash", "shell", "exec", "run"]): return "command_run" if any(has_word(w) for w in ["edit", "update", "patch", "replace"]): return "file_edit" if any(has_word(w) for w in ["write", "create"]): return "file_write" if any(has_word(w) for w in ["read", "view"]): return "file_read" if any(has_word(w) for w in ["task", "agent"]): return "subagent" return "other" def extract_files(input_data: Any) -> List[str]: if not input_data or not isinstance(input_data, dict): return [] out = set() for key in ["file_path", "filepath", "path", "filePath", "file", "pattern"]: v = input_data.get(key) if isinstance(v, str) and 0 < len(v) < 512: out.add(v) return list(out) def stringify_for_narrative(v: Any) -> str: if v is None: return "" if isinstance(v, str): return v try: return json.dumps(v) except Exception: return str(v) def build_synthetic_compression(raw: Dict[str, Any]) -> Dict[str, Any]: tool_name = raw.get("toolName") or raw.get("hookType") input_str = stringify_for_narrative(raw.get("toolInput")) output_str = stringify_for_narrative(raw.get("toolOutput")) prompt_str = raw.get("userPrompt") or "" parts = [s for s in [prompt_str, input_str, output_str] if len(s) > 0] narrative = " | ".join(parts) if len(narrative) > 400: narrative = narrative[:399] + "\u2026" title = tool_name or "observation" if len(title) > 80: title = title[:79] + "\u2026" subtitle = None if input_str: subtitle = input_str if len(subtitle) > 120: subtitle = subtitle[:119] + "\u2026" res = { "id": raw["id"], "sessionId": raw["sessionId"], "timestamp": raw["timestamp"], "type": infer_type(raw.get("toolName"), raw["hookType"]), "title": title, "subtitle": subtitle, "facts": [], "narrative": narrative, "concepts": [], "files": extract_files(raw.get("toolInput")), "importance": 5, "confidence": 0.3, } for k in ["modality", "imageData", "agentId"]: if raw.get(k) is not None: res[k] = raw[k] return res def observe(kv: StateKV, payload: Dict[str, Any]) -> Dict[str, Any]: session_id = payload.get("sessionId") hook_type = payload.get("hookType") timestamp = payload.get("timestamp") if not session_id or not hook_type or not timestamp: raise ValueError("Invalid payload: sessionId, hookType, and timestamp are required") obs_id = generate_id("obs") sanitized_data = payload.get("data") try: json_str = json.dumps(payload.get("data")) sanitized = strip_private_data(json_str) sanitized_data = json.loads(sanitized) except Exception: sanitized_data = strip_private_data(str(payload.get("data"))) raw = { "id": obs_id, "sessionId": session_id, "timestamp": timestamp, "hookType": hook_type, "raw": sanitized_data, } extracted_img = extract_image(sanitized_data) if isinstance(sanitized_data, dict): if hook_type in ("post_tool_use", "post_tool_failure"): raw["toolName"] = sanitized_data.get("tool_name") raw["toolInput"] = sanitized_data.get("tool_input") raw["toolOutput"] = sanitized_data.get("tool_output") or sanitized_data.get("error") if hook_type == "prompt_submit": raw["userPrompt"] = sanitized_data.get("prompt") if extracted_img: raw["modality"] = "mixed" if (raw.get("toolInput") or raw.get("toolOutput") or raw.get("userPrompt")) else "image" elif isinstance(sanitized_data, str) and extracted_img: raw["modality"] = "image" max_obs = int(os.getenv("MAX_OBS_PER_SESSION", "500")) if max_obs > 0: existing = kv.list(KV.observations(session_id)) if len(existing) >= max_obs: raise ValueError(f"Session observation limit reached ({max_obs})") existing_session = kv.get(KV.sessions, session_id) inherited_agent_id = existing_session.get("agentId") if existing_session else get_agent_id() if inherited_agent_id: raw["agentId"] = inherited_agent_id if extracted_img and (extracted_img.startswith("data:image/") or extracted_img.startswith("iVBORw0KGgo") or extracted_img.startswith("/9j/")): try: file_path, bytes_written = save_image_to_disk(extracted_img) raw["imageData"] = file_path # Increment image ref count img_refs = kv.get(KV.imageRefs, file_path) or 0 kv.set(KV.imageRefs, file_path, img_refs + 1) except Exception as ex: print(f"[image store] failed: {ex}") # Set raw observation kv.set(KV.observations(session_id), obs_id, raw) # Stream raw observation broadcast_stream({ "type": "raw_observation", "sessionId": session_id, "data": { "type": "raw", "observation": raw, "sessionId": session_id } }) if existing_session: updates = [ {"type": "set", "path": "updatedAt", "value": datetime.datetime.utcnow().isoformat() + "Z"}, {"type": "set", "path": "observationCount", "value": (existing_session.get("observationCount") or 0) + 1} ] if not existing_session.get("firstPrompt") and isinstance(raw.get("userPrompt"), str): trimmed = " ".join(raw["userPrompt"].split()).strip() if trimmed: updates.append({"type": "set", "path": "firstPrompt", "value": trimmed[:200]}) kv.update(KV.sessions, session_id, updates) else: auto_complete_old_active_sessions(kv, session_id) project = payload.get("project") or "unknown" cwd = payload.get("cwd") or os.getcwd() trimmed_prompt = None if isinstance(raw.get("userPrompt"), str): trimmed_prompt = " ".join(raw["userPrompt"].split()).strip()[:200] ts = datetime.datetime.utcnow().isoformat() + "Z" new_sess = { "id": session_id, "project": project, "cwd": cwd, "startedAt": payload.get("timestamp") or ts, "updatedAt": ts, "status": "active", "observationCount": 1, } if inherited_agent_id: new_sess["agentId"] = inherited_agent_id if trimmed_prompt: new_sess["firstPrompt"] = trimmed_prompt kv.set(KV.sessions, session_id, new_sess) # Perform synthetic compression (we default to synthetic) synthetic = build_synthetic_compression(raw) for k in ["hookType", "raw", "toolName", "toolInput", "toolOutput", "userPrompt"]: if k in raw: synthetic[k] = raw[k] kv.set(KV.observations(session_id), obs_id, synthetic) _bm25_index.add(synthetic) comb_text = synthetic["title"] + " " + (synthetic.get("narrative") or "") vector_index_add_guarded(synthetic["id"], synthetic["sessionId"], comb_text, {"kind": "synthetic", "logId": synthetic["id"]}) if _index_persistence: _index_persistence.schedule_save() # Stream compressed observation broadcast_stream({ "type": "compressed_observation", "sessionId": session_id, "data": { "type": "compressed", "observation": synthetic, "sessionId": session_id } }) # Commit to Dolt commit_if_enabled(kv, f"Observe: {synthetic.get('title', 'observation')} in session {session_id[:8]}", synthetic.get("agentId")) return {"observationId": obs_id} # ===================================================================== # Memory System (Remember, Forget, Evolve) # ===================================================================== def memory_to_observation(memory: Dict[str, Any]) -> Dict[str, Any]: return { "id": memory["id"], "sessionId": memory.get("sessionIds", ["memory"])[0] if memory.get("sessionIds") else "memory", "timestamp": memory["createdAt"], "type": "decision", "title": memory["title"], "facts": [memory["content"]], "narrative": memory["content"], "concepts": memory.get("concepts", []), "files": memory.get("files", []), "importance": memory.get("strength", 7), } def remember(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: content = data.get("content") if not content or not content.strip(): raise ValueError("content is required") content = strip_private_data(content) concepts = data.get("concepts") or [] files = data.get("files") or [] source_obs = data.get("sourceObservationIds") or [] ttl_days = data.get("ttlDays") mem_type = data.get("type") or "fact" project = data.get("project") if project: project = project.strip() now = datetime.datetime.utcnow().isoformat() + "Z" existing_memories = kv.list(KV.memories) superseded_id = None superseded_version = 1 superseded_memory = None lower_content = content.lower() for existing in existing_memories: if existing.get("isLatest") is False: continue if project and existing.get("project") and existing["project"] != project: continue similarity = jaccard_similarity(lower_content, existing.get("content", "").lower()) if similarity > 0.7: superseded_id = existing["id"] superseded_version = existing.get("version") or 1 superseded_memory = existing break call_agent_id = data.get("agentId") or get_agent_id() new_mem = { "id": generate_id("mem"), "createdAt": now, "updatedAt": now, "type": mem_type, "title": content[:80], "content": content, "concepts": concepts, "files": files, "sessionIds": [], "strength": 7, "version": superseded_version + 1 if superseded_id else 1, "parentId": superseded_id, "supersedes": [superseded_id] if superseded_id else [], "sourceObservationIds": [i for i in source_obs if i], "isLatest": True, } if call_agent_id: new_mem["agentId"] = call_agent_id if project: new_mem["project"] = project if ttl_days and isinstance(ttl_days, (int, float)) and ttl_days > 0: forget_time = datetime.datetime.utcnow() + datetime.timedelta(days=ttl_days) new_mem["forgetAfter"] = forget_time.isoformat() + "Z" if superseded_memory: superseded_memory["isLatest"] = False kv.set(KV.memories, superseded_memory["id"], superseded_memory) kv.set(KV.memories, new_mem["id"], new_mem) try: _bm25_index.add(memory_to_observation(new_mem)) except Exception as ex: print(f"[bm25] memory add failed: {ex}") comb_text = new_mem["title"] + " " + new_mem["content"] vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]}) if _index_persistence: _index_persistence.schedule_save() # Commit to Dolt commit_if_enabled(kv, f"Remember: {new_mem.get('title', '')}", new_mem.get("agentId")) return {"success": True, "memory": new_mem} def forget(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: memory_id = data.get("memoryId") session_id = data.get("sessionId") obs_ids = data.get("observationIds") or [] deleted = 0 deleted_mem_ids = [] deleted_obs_ids = [] deleted_session = False if memory_id: mem = kv.get(KV.memories, memory_id) kv.delete(KV.memories, memory_id) if mem and mem.get("imageRef"): ref = mem["imageRef"] refs = kv.get(KV.imageRefs, ref) or 0 if refs > 0: kv.set(KV.imageRefs, ref, refs - 1) _bm25_index.remove(memory_id) if _vector_index: _vector_index.remove(memory_id) deleted_mem_ids.append(memory_id) deleted += 1 if session_id and obs_ids: for oid in obs_ids: obs = kv.get(KV.observations(session_id), oid) kv.delete(KV.observations(session_id), oid) if obs: img = obs.get("imageData") or obs.get("imageRef") if img: refs = kv.get(KV.imageRefs, img) or 0 if refs > 0: kv.set(KV.imageRefs, img, refs - 1) _bm25_index.remove(oid) if _vector_index: _vector_index.remove(oid) deleted_obs_ids.append(oid) deleted += 1 if session_id and not obs_ids and not memory_id: obs_list = kv.list(KV.observations(session_id)) for obs in obs_list: kv.delete(KV.observations(session_id), obs["id"]) img = obs.get("imageData") or obs.get("imageRef") if img: refs = kv.get(KV.imageRefs, img) or 0 if refs > 0: kv.set(KV.imageRefs, img, refs - 1) _bm25_index.remove(obs["id"]) if _vector_index: _vector_index.remove(obs["id"]) deleted_obs_ids.append(obs["id"]) deleted += 1 kv.delete(KV.sessions, session_id) kv.delete(KV.summaries, session_id) deleted_session = True deleted += 2 if deleted > 0: if _index_persistence: _index_persistence.schedule_save() safe_audit( kv, "forget", "mem::forget", deleted_mem_ids + deleted_obs_ids, { "sessionId": session_id, "deleted": deleted, "memoriesDeleted": len(deleted_mem_ids), "observationsDeleted": len(deleted_obs_ids), "sessionDeleted": deleted_session, "reason": "user-initiated forget" } ) # Commit to Dolt agent_id = data.get("agentId") or get_agent_id() commit_if_enabled(kv, f"Forget: memory_id={memory_id} session_id={session_id}", agent_id) return {"success": True, "deleted": deleted} # ===================================================================== # Prompt Context Compilation System # ===================================================================== def estimate_tokens(text: str) -> int: return int(len(text) / 3) def escape_xml_attr(s: str) -> str: return s.replace("&", "&").replace('"', """).replace("<", "<").replace(">", ">") def context(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: session_id = data.get("sessionId") project = data.get("project") budget = data.get("budget") or int(os.getenv("TOKEN_BUDGET", "2000")) if not session_id or not project: raise ValueError("sessionId and project are required") blocks = [] # 1. Pinned Slots pinned_slots = list_pinned_slots(kv) slot_content = render_pinned_context(pinned_slots) if slot_content: blocks.append({ "type": "memory", "content": slot_content, "tokens": estimate_tokens(slot_content), "recency": int(time.time() * 1000) }) # 2. Profile profile = kv.get(KV.profiles, project) if profile: profile_parts = [] if profile.get("topConcepts"): profile_parts.append( "Concepts: " + ", ".join([c["concept"] for c in profile["topConcepts"][:8]]) ) if profile.get("topFiles"): profile_parts.append( "Key files: " + ", ".join([f["file"] for f in profile["topFiles"][:5]]) ) if profile.get("conventions"): profile_parts.append("Conventions: " + "; ".join(profile["conventions"])) if profile.get("commonErrors"): profile_parts.append("Common errors: " + "; ".join(profile["commonErrors"][:3])) if profile_parts: profile_content = f"## Project Profile\n" + "\n".join(profile_parts) blocks.append({ "type": "memory", "content": profile_content, "tokens": estimate_tokens(profile_content), "recency": int(time.time() * 1000) }) # 3. Lessons lessons = kv.list(KV.lessons) relevant_lessons = [ l for l in lessons if not l.get("deleted") and (not l.get("project") or l["project"] == project) ] # Score lessons def lesson_score(l): factor = 1.5 if l.get("project") == project else 1.0 return factor * l.get("confidence", 0.5) relevant_lessons.sort(key=lesson_score, reverse=True) relevant_lessons = relevant_lessons[:10] if relevant_lessons: items = [] for l in relevant_lessons: desc = f"- ({l['confidence']:.2f}) {l['content']}" if l.get("context"): desc += f" — {l['context']}" items.append(desc) lessons_content = "## Lessons Learned\n" + "\n".join(items) blocks.append({ "type": "memory", "content": lessons_content, "tokens": estimate_tokens(lessons_content), "recency": int(time.time() * 1000) }) # 4. Sessions & Summaries all_sessions = kv.list(KV.sessions) sessions = [ s for s in all_sessions if s.get("project") == project and s["id"] != session_id ] sessions.sort(key=lambda s: s.get("startedAt", ""), reverse=True) sessions = sessions[:10] for s in sessions: summary = kv.get(KV.summaries, s["id"]) if summary: content = f"## {summary.get('title', 'Session summary')}\n{summary.get('narrative', '')}\n" \ f"Decisions: {'; '.join(summary.get('keyDecisions', []))}\n" \ f"Files: {', '.join(summary.get('filesModified', []))}" blocks.append({ "type": "summary", "content": content, "tokens": estimate_tokens(content), "recency": int(time.time() * 1000) }) else: # Fallback to important observations obs_list = kv.list(KV.observations(s["id"])) important = [o for o in obs_list if o.get("title") and o.get("importance", 0) >= 5] if important: important.sort(key=lambda o: o.get("importance", 0), reverse=True) top = important[:5] items = [f"- [{o.get('type')}] {o.get('title')}: {o.get('narrative')}" for o in top] content = f"## Session {s['id'][:8]} ({s.get('startedAt')})\n" + "\n".join(items) blocks.append({ "type": "observation", "content": content, "tokens": estimate_tokens(content), "recency": int(time.time() * 1000) }) blocks.sort(key=lambda b: b.get("recency", 0), reverse=True) header = f'' footer = "" used_tokens = estimate_tokens(header) + estimate_tokens(footer) selected = [] for b in blocks: if used_tokens + b["tokens"] > budget: continue selected.append(b["content"]) used_tokens += b["tokens"] if not selected: return {"context": "", "blocks": 0, "tokens": 0} res_context = f"{header}\n" + "\n\n".join(selected) + f"\n{footer}" return {"context": res_context, "blocks": len(selected), "tokens": used_tokens} # ===================================================================== # Memory Slots System # ===================================================================== DEFAULT_SLOTS = [ { "label": "persona", "content": "", "sizeLimit": 1000, "description": "How the agent should see itself: role, tone, behavioural guidelines.", "pinned": True, "readOnly": False, "scope": "global", }, { "label": "user_preferences", "content": "", "sizeLimit": 2000, "description": "Coding style, tool preferences, naming conventions, and other habits the user wants preserved across sessions.", "pinned": True, "readOnly": False, "scope": "global", }, { "label": "tool_guidelines", "content": "", "sizeLimit": 1500, "description": "Rules the agent should follow when picking or sequencing tools (e.g. prefer X over Y, never run Z without confirmation).", "pinned": True, "readOnly": False, "scope": "global", }, { "label": "project_context", "content": "", "sizeLimit": 3000, "description": "Architecture decisions, codebase conventions, build/test commands, and cross-cutting constraints for the current project.", "pinned": True, "readOnly": False, "scope": "project", }, { "label": "guidance", "content": "", "sizeLimit": 1500, "description": "Active advice for the next session: what to focus on, what to avoid, open risks.", "pinned": True, "readOnly": False, "scope": "project", }, { "label": "pending_items", "content": "", "sizeLimit": 2000, "description": "Unfinished work, explicit TODOs, and promises made but not yet delivered.", "pinned": True, "readOnly": False, "scope": "project", }, { "label": "session_patterns", "content": "", "sizeLimit": 1500, "description": "Recurring behaviours and common struggles observed across recent sessions.", "pinned": False, "readOnly": False, "scope": "project", }, { "label": "self_notes", "content": "", "sizeLimit": 1500, "description": "Free-form notes the agent keeps for itself: hypotheses, dead ends, things to revisit.", "pinned": False, "readOnly": False, "scope": "project", }, ] def seed_defaults(kv: StateKV) -> None: now = datetime.datetime.utcnow().isoformat() + "Z" for tmpl in DEFAULT_SLOTS: scope = tmpl["scope"] target = KV.globalSlots if scope == "global" else KV.slots existing = kv.get(target, tmpl["label"]) if existing: continue slot = dict(tmpl) slot["createdAt"] = now slot["updatedAt"] = now kv.set(target, tmpl["label"], slot) def list_pinned_slots(kv: StateKV) -> List[Dict[str, Any]]: p_slots = kv.list(KV.slots) g_slots = kv.list(KV.globalSlots) merged = {} for s in g_slots: merged[s["label"]] = s for s in p_slots: merged[s["label"]] = s pinned = [s for s in merged.values() if s.get("pinned") and s.get("content", "").strip()] pinned.sort(key=lambda s: s["label"]) return pinned def render_pinned_context(slots: List[Dict[str, Any]]) -> str: if not slots: return "" lines = ["# agentmemory pinned slots", ""] for s in slots: lines.append(f"## {s['label']}") lines.append(s["content"].strip()) lines.append("") return "\n".join(lines) def slot_list(kv: StateKV) -> Dict[str, Any]: p_slots = kv.list(KV.slots) g_slots = kv.list(KV.globalSlots) merged = {} for s in g_slots: merged[s["label"]] = s for s in p_slots: merged[s["label"]] = s slots = sorted(list(merged.values()), key=lambda s: s["label"]) return {"success": True, "slots": slots} def slot_get(kv: StateKV, label: str) -> Dict[str, Any]: project = kv.get(KV.slots, label) if project: return {"success": True, "slot": project, "scope": "project"} global_s = kv.get(KV.globalSlots, label) if global_s: return {"success": True, "slot": global_s, "scope": "global"} return {"success": False, "error": "slot not found"} def slot_create(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: label = data.get("label") if not label or not re.match(r'^[a-z][a-z0-9_]*$', label): return {"success": False, "error": "label required (lowercase, starts with letter, [a-z0-9_])"} scope = data.get("scope") or "project" if scope not in ("project", "global"): return {"success": False, "error": "scope must be 'project' or 'global'"} limit = data.get("sizeLimit") or 2000 if not isinstance(limit, int) or limit < 1 or limit > 20000: return {"success": False, "error": "sizeLimit must be an integer between 1 and 20000"} content = strip_private_data(data.get("content") or "") if len(content) > limit: return {"success": False, "error": f"content exceeds sizeLimit ({len(content)} > {limit})"} description = data.get("description") or "" pinned = data.get("pinned", True) target_kv = KV.globalSlots if scope == "global" else KV.slots existing = kv.get(target_kv, label) if existing: return {"success": False, "error": f"slot already exists in {scope} scope"} now = datetime.datetime.utcnow().isoformat() + "Z" slot = { "label": label, "content": content, "sizeLimit": limit, "description": description, "pinned": pinned, "readOnly": False, "scope": scope, "createdAt": now, "updatedAt": now, } kv.set(target_kv, label, slot) safe_audit(kv, "slot_create", "mem::slot-create", [label], {"scope": scope, "sizeLimit": limit, "pinned": pinned}) # Commit to Dolt agent_id = data.get("agentId") or get_agent_id() commit_if_enabled(kv, f"Create slot: {label}", agent_id) return {"success": True, "slot": slot} def slot_append(kv: StateKV, label: str, text: str, agent_id: Optional[str] = None) -> Dict[str, Any]: res = slot_get(kv, label) if not res.get("success"): return {"success": False, "error": "slot not found"} slot = res["slot"] scope = res["scope"] target_kv = KV.globalSlots if scope == "global" else KV.slots if slot.get("readOnly"): return {"success": False, "error": "slot is read-only"} content = slot.get("content") or "" sep = "\n" if content and not content.endswith("\n") else "" next_content = content + sep + strip_private_data(text) limit = slot.get("sizeLimit") or 2000 if len(next_content) > limit: return { "success": False, "error": f"append would exceed sizeLimit ({len(next_content)} > {limit})", "currentSize": len(content), "sizeLimit": limit } slot["content"] = next_content slot["updatedAt"] = datetime.datetime.utcnow().isoformat() + "Z" kv.set(target_kv, label, slot) safe_audit(kv, "slot_append", "mem::slot-append", [label], {"scope": scope, "added": len(text), "total": len(next_content)}) # Commit to Dolt commit_if_enabled(kv, f"Append slot: {label}", agent_id or get_agent_id()) return {"success": True, "slot": slot, "size": len(next_content)} def slot_replace(kv: StateKV, label: str, content: str, agent_id: Optional[str] = None) -> Dict[str, Any]: res = slot_get(kv, label) if not res.get("success"): return {"success": False, "error": "slot not found"} slot = res["slot"] scope = res["scope"] target_kv = KV.globalSlots if scope == "global" else KV.slots if slot.get("readOnly"): return {"success": False, "error": "slot is read-only"} content = strip_private_data(content) limit = slot.get("sizeLimit") or 2000 if len(content) > limit: return { "success": False, "error": f"content exceeds sizeLimit ({len(content)} > {limit})", "sizeLimit": limit } before_len = len(slot.get("content") or "") slot["content"] = content slot["updatedAt"] = datetime.datetime.utcnow().isoformat() + "Z" kv.set(target_kv, label, slot) safe_audit(kv, "slot_replace", "mem::slot-replace", [label], {"scope": scope, "before": before_len, "after": len(content)}) # Commit to Dolt commit_if_enabled(kv, f"Replace slot: {label}", agent_id or get_agent_id()) return {"success": True, "slot": slot, "size": len(content)} def slot_delete(kv: StateKV, label: str, agent_id: Optional[str] = None) -> Dict[str, Any]: res = slot_get(kv, label) if not res.get("success"): return {"success": False, "error": "slot not found"} slot = res["slot"] scope = res["scope"] target_kv = KV.globalSlots if scope == "global" else KV.slots if slot.get("readOnly"): return {"success": False, "error": "slot is read-only"} kv.delete(target_kv, label) safe_audit(kv, "slot_delete", "mem::slot-delete", [label], {"scope": scope, "size": len(slot.get("content") or "")}) # Commit to Dolt commit_if_enabled(kv, f"Delete slot: {label}", agent_id or get_agent_id()) return {"success": True} def slot_reflect(kv: StateKV, session_id: str, max_obs: int = 50) -> Dict[str, Any]: observations = kv.list(KV.observations(session_id)) if not observations: return {"success": True, "applied": 0, "reason": "no observations for session"} recent = sorted(observations, key=lambda x: x.get("timestamp", ""), reverse=True)[:max_obs] pending_lines = [] pattern_counts = {} files = set() for obs in recent: title = (obs.get("title") or "").lower() narrative = (obs.get("narrative") or "").lower() if "todo" in narrative or "todo" in title: pending_lines.append(f"- {obs.get('title') or obs['id']}") if obs.get("type") == "error": pattern_counts["errors"] = pattern_counts.get("errors", 0) + 1 if obs.get("type") == "command_run": pattern_counts["commands"] = pattern_counts.get("commands", 0) + 1 for f in obs.get("files") or []: files.add(f) applied = 0 now = datetime.datetime.utcnow().isoformat() + "Z" if pending_lines: res = slot_get(kv, "pending_items") if res.get("success"): slot = res["slot"] scope = res["scope"] target_kv = scopeKv = KV.globalSlots if scope == "global" else KV.slots already = set((slot.get("content") or "").split("\n")) fresh = [l for l in pending_lines if l not in already] if fresh: sep = "\n" if slot.get("content") and not slot["content"].endswith("\n") else "" next_content = (slot.get("content") or "") + sep + "\n".join(fresh) limit = slot.get("sizeLimit") or 2000 if len(next_content) > limit: next_content = next_content[-limit:] slot["content"] = next_content slot["updatedAt"] = now kv.set(target_kv, "pending_items", slot) applied += 1 if pattern_counts: res = slot_get(kv, "session_patterns") if res.get("success"): slot = res["slot"] scope = res["scope"] target_kv = KV.globalSlots if scope == "global" else KV.slots summary = [f"last reflection: {now}"] for k, v in pattern_counts.items(): summary.append(f"- {k}: {v} in last {len(recent)} observations") next_content = "\n".join(summary) limit = slot.get("sizeLimit") or 2000 if len(next_content) > limit: next_content = next_content[:limit] slot["content"] = next_content slot["updatedAt"] = now kv.set(target_kv, "session_patterns", slot) applied += 1 if files: res = slot_get(kv, "project_context") if res.get("success"): slot = res["slot"] scope = res["scope"] target_kv = KV.globalSlots if scope == "global" else KV.slots already = slot.get("content") or "" fresh = [f for f in files if f not in already][:20] if fresh: header_line = "Files touched in recent sessions:" if not already else "" sep = "\n" if already and not already.endswith("\n") else "" lines = [already] if header_line: lines.append(header_line) for f in fresh: lines.append(f"- {f}") next_content = sep.join([l for l in lines if l]) limit = slot.get("sizeLimit") or 2000 if len(next_content) > limit: next_content = next_content[-limit:] slot["content"] = next_content slot["updatedAt"] = now kv.set(target_kv, "project_context", slot) applied += 1 if applied > 0: safe_audit(kv, "slot_reflect", "mem::slot-reflect", [session_id], {"observationCount": len(recent), "slotsUpdated": applied}) commit_if_enabled(kv, f"Slot reflect: updated {applied} slots in session {session_id[:8]}", "system") return {"success": True, "applied": applied, "observationsReviewed": len(recent)} # ===================================================================== # Lessons Learned System # ===================================================================== def reinforce_lesson(lesson: Dict[str, Any]) -> None: now = datetime.datetime.utcnow().isoformat() + "Z" lesson["reinforcements"] = lesson.get("reinforcements", 0) + 1 conf = lesson.get("confidence", 0.5) lesson["confidence"] = min(1.0, conf + 0.1 * (1 - conf)) lesson["lastReinforcedAt"] = now lesson["updatedAt"] = now def lesson_save(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: content = data.get("content") if not content or not content.strip(): return {"success": False, "error": "content is required"} content = strip_private_data(content) context_str = strip_private_data(data.get("context") or "") agent_id = data.get("agentId") or get_agent_id() fp = fingerprint_id("lsn", content) existing = kv.get(KV.lessons, fp) if existing and not existing.get("deleted"): reinforce_lesson(existing) if context_str and not existing.get("context"): existing["context"] = context_str kv.set(KV.lessons, existing["id"], existing) safe_audit(kv, "lesson_strengthen", "mem::lesson-save", [existing["id"]]) # Commit to Dolt commit_if_enabled(kv, f"Strengthen lesson: {existing.get('content', '')[:60]}", agent_id) return {"success": True, "action": "strengthened", "lesson": existing} confidence = data.get("confidence") if not isinstance(confidence, (int, float)) or confidence < 0 or confidence > 1: confidence = 0.5 now = datetime.datetime.utcnow().isoformat() + "Z" lesson = { "id": fp, "content": content.strip(), "context": context_str.strip(), "confidence": confidence, "reinforcements": 0, "source": data.get("source") or "manual", "sourceIds": data.get("sourceIds") or [], "project": data.get("project"), "tags": data.get("tags") or [], "createdAt": now, "updatedAt": now, "decayRate": 0.05, } kv.set(KV.lessons, lesson["id"], lesson) safe_audit(kv, "lesson_save", "mem::lesson-save", [lesson["id"]]) # Commit to Dolt commit_if_enabled(kv, f"Create lesson: {lesson['content'][:60]}", agent_id) return {"success": True, "action": "created", "lesson": lesson} def lesson_list(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: limit = data.get("limit") or 50 min_confidence = data.get("minConfidence") or 0.0 all_lessons = kv.list(KV.lessons) lessons = [ l for l in all_lessons if not l.get("deleted") and l.get("confidence", 0.5) >= min_confidence ] project = data.get("project") if project: lessons = [l for l in lessons if l.get("project") == project] source = data.get("source") if source: lessons = [l for l in lessons if l.get("source") == source] lessons.sort(key=lambda x: x.get("confidence", 0.5), reverse=True) return {"success": True, "lessons": lessons[:limit]} def lesson_recall(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: query = data.get("query") if not query or not query.strip(): return {"success": False, "error": "query is required"} query_lower = query.lower() min_confidence = data.get("minConfidence") or 0.1 limit = data.get("limit") or 10 all_lessons = kv.list(KV.lessons) lessons = [ l for l in all_lessons if not l.get("deleted") and l.get("confidence", 0.5) >= min_confidence ] project = data.get("project") if project: lessons = [l for l in lessons if l.get("project") == project] scored = [] terms = [t for t in query_lower.split() if len(t) > 1] for l in lessons: text = f"{l.get('content', '')} {l.get('context', '')} {' '.join(l.get('tags') or [])}".lower() match_count = sum(1 for t in terms if t in text) if match_count == 0: continue relevance = match_count / len(terms) baseline = l.get("lastReinforcedAt") or l.get("createdAt") import dateutil.parser dt = dateutil.parser.parse(baseline) days = (datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - dt.replace(tzinfo=datetime.timezone.utc)).total_seconds() / (3600 * 24) recency_boost = 1 / (1 + days * 0.01) score = l.get("confidence", 0.5) * relevance * recency_boost scored.append({"lesson": l, "score": score}) scored.sort(key=lambda x: x["score"], reverse=True) results = [] for s in scored[:limit]: item = dict(s["lesson"]) item["score"] = round(s["score"], 3) results.append(item) safe_audit(kv, "lesson_recall", "mem::lesson-recall", [], {"query": query, "resultCount": len(results)}) return {"success": True, "lessons": results} def lesson_strengthen(kv: StateKV, lesson_id: str) -> Dict[str, Any]: lesson = kv.get(KV.lessons, lesson_id) if not lesson or lesson.get("deleted"): return {"success": False, "error": "lesson not found"} reinforce_lesson(lesson) kv.set(KV.lessons, lesson["id"], lesson) safe_audit(kv, "lesson_strengthen", "mem::lesson-strengthen", [lesson["id"]]) # Commit to Dolt commit_if_enabled(kv, f"Strengthen lesson: {lesson.get('content', '')[:60]}", get_agent_id()) return {"success": True, "lesson": lesson} def lesson_decay_sweep(kv: StateKV) -> Dict[str, Any]: all_lessons = kv.list(KV.lessons) decayed = 0 soft_deleted = 0 now = datetime.datetime.utcnow() timestamp = now.isoformat() + "Z" for l in all_lessons: if l.get("deleted"): continue baseline_str = l.get("lastDecayedAt") or l.get("lastReinforcedAt") or l["createdAt"] import dateutil.parser dt = dateutil.parser.parse(baseline_str) weeks = (now.replace(tzinfo=datetime.timezone.utc) - dt.replace(tzinfo=datetime.timezone.utc)).total_seconds() / (3600 * 24 * 7) if weeks < 1.0: continue decay = l.get("decayRate", 0.05) * weeks new_conf = max(0.05, l.get("confidence", 0.5) - decay) if new_conf != l.get("confidence"): before = l.get("confidence", 0.5) l["confidence"] = round(new_conf, 3) l["lastDecayedAt"] = timestamp l["updatedAt"] = timestamp if l["confidence"] <= 0.1 and l.get("reinforcements", 0) == 0: l["deleted"] = True soft_deleted += 1 else: decayed += 1 kv.set(KV.lessons, l["id"], l) safe_audit(kv, "lesson_strengthen", "mem::lesson-decay-sweep", [l["id"]], { "action": "soft-delete" if l.get("deleted") else "decay", "actor": "system", "reason": "decay-sweep", "before": {"confidence": before, "deleted": False}, "after": {"confidence": l["confidence"], "deleted": bool(l.get("deleted"))} }) if decayed > 0 or soft_deleted > 0: commit_if_enabled(kv, f"Lesson decay sweep: decayed {decayed}, soft-deleted {soft_deleted}", "system") return {"success": True, "decayed": decayed, "softDeleted": soft_deleted, "total": len(all_lessons)} # ===================================================================== # Database Rebuilder (Index Bootstrapper) # ===================================================================== def rebuild_index(kv: StateKV) -> int: _bm25_index.clear() if _vector_index: _vector_index.clear() # Backfill BM25 with observations sessions = kv.list(KV.sessions) total_indexed = 0 for sess in sessions: sid = sess.get("id") if not sid: continue obs_list = kv.list(KV.observations(sid)) for obs in obs_list: # Only index compressed (non-raw) observations if obs.get("title") and obs.get("narrative"): _bm25_index.add(obs) comb_text = obs["title"] + " " + obs["narrative"] vector_index_add_guarded(obs["id"], sid, comb_text, {"kind": "observation", "logId": obs["id"]}) total_indexed += 1 # Backfill BM25 with memories memories = kv.list(KV.memories) for mem in memories: if mem.get("isLatest") is False: continue if not mem.get("title") or not mem.get("content"): continue converted = memory_to_observation(mem) _bm25_index.add(converted) comb_text = mem["title"] + " " + mem["content"] vector_index_add_guarded(mem["id"], "memory", comb_text, {"kind": "memory", "logId": mem["id"]}) total_indexed += 1 if _index_persistence and total_indexed > 0: _index_persistence.schedule_save() return total_indexed # ===================================================================== # Advanced Function Stubs / CRUD Operations # ===================================================================== def list_sessions(kv: StateKV) -> List[Dict[str, Any]]: sessions = kv.list(KV.sessions) for s in sessions: sid = s.get("id") if sid: summary = kv.get(KV.summaries, sid) if summary: s["title"] = summary.get("title") s["summary"] = summary.get("narrative") sessions.sort(key=lambda s: s.get("startedAt", ""), reverse=True) return sessions def get_session(kv: StateKV, session_id: str) -> Optional[Dict[str, Any]]: s = kv.get(KV.sessions, session_id) if s: summary = kv.get(KV.summaries, session_id) if summary: s["title"] = summary.get("title") s["summary"] = summary.get("narrative") return s def create_session(kv: StateKV, session: Dict[str, Any]) -> Dict[str, Any]: auto_complete_old_active_sessions(kv, session["id"]) kv.set(KV.sessions, session["id"], session) return session def end_session(kv: StateKV, session_id: str) -> bool: now = datetime.datetime.utcnow().isoformat() + "Z" kv.update(KV.sessions, session_id, [ {"type": "set", "path": "endedAt", "value": now}, {"type": "set", "path": "status", "value": "completed"} ]) return True def timeline(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: # Simple timeline query returning observations sorted by timestamp anchor = data.get("anchor") project = data.get("project") session_id = data.get("sessionId") before = data.get("before") or 10 after = data.get("after") or 10 sessions = kv.list(KV.sessions) if session_id: sessions = [s for s in sessions if s.get("id") == session_id] elif project: sessions = [s for s in sessions if s.get("project") == project] all_obs = [] for s in sessions: all_obs.extend(kv.list(KV.observations(s["id"]))) # sort by timestamp all_obs.sort(key=lambda x: x.get("timestamp", "")) anchor_idx = -1 for idx, obs in enumerate(all_obs): if obs["id"] == anchor or obs.get("timestamp", "") >= (anchor or ""): anchor_idx = idx break if anchor_idx == -1: anchor_idx = len(all_obs) // 2 start = max(0, anchor_idx - before) end = min(len(all_obs), anchor_idx + after + 1) return { "success": True, "observations": all_obs[start:end], "anchorIndex": anchor_idx - start } def get_project_profile(kv: StateKV, project: str) -> Dict[str, Any]: prof = kv.get(KV.profiles, project) if not prof: prof = { "project": project, "topConcepts": [], "topFiles": [], "conventions": [], "commonErrors": [], "updatedAt": datetime.datetime.utcnow().isoformat() + "Z" } if not prof.get("topConcepts") and not prof.get("topFiles"): prof = build_project_profile(kv, project) return prof def build_project_profile(kv: StateKV, project: str) -> Dict[str, Any]: prof = kv.get(KV.profiles, project) if not prof: prof = { "project": project, "topConcepts": [], "topFiles": [], "conventions": [], "commonErrors": [], "updatedAt": datetime.datetime.utcnow().isoformat() + "Z" } # Stored profile may lack topConcepts/topFiles — compute from observations + memories if empty if not prof.get("topConcepts") and not prof.get("topFiles"): import re as _re, json as _j, os.path as _osp from collections import Counter sessions = kv.list(KV.sessions) project_sessions = [s for s in sessions if s.get("project") == project] concept_counts = Counter() file_counts = Counter() def _harvest_file(path, fc, cc): if not isinstance(path, str) or not path: return fc[path] += 1 parts = _re.split(r'[\\/]', path) fname = parts[-1] if parts else "" skip = {"tmp", "temp", "claude", "appdata", "local", "users", "windows"} for part in parts[:-1]: p = part.lower().strip() if p and len(p) > 2 and p not in skip and not _re.match(r'^[a-z]:|^\.|^--', p): cc[p] += 1 stem = _osp.splitext(fname)[0] if stem and len(stem) > 2: cc[stem.lower()] += 1 ext = _osp.splitext(fname)[1].lstrip(".") if ext in ("py", "ts", "js", "jsx", "tsx", "go", "rs", "java", "cs", "cpp"): cc[ext] += 1 for s in project_sessions: sid = s.get("id", "") if not sid: continue for o in kv.list(KV.observations(sid)): for c in (o.get("concepts") or []): if isinstance(c, str) and c: concept_counts[c] += 1 for f in (o.get("files") or []): _harvest_file(f, file_counts, concept_counts) tn = o.get("toolName") if tn: concept_counts[tn] += 1 ti = o.get("toolInput") if isinstance(ti, str): try: ti = _j.loads(ti) except Exception: ti = {} if isinstance(ti, dict): for fk in ("path", "file_path", "file", "filename"): _harvest_file(ti.get(fk, ""), file_counts, concept_counts) narr = o.get("narrative") or o.get("raw") or "" if isinstance(narr, str) and narr.startswith("{"): try: nd = _j.loads(narr) if isinstance(nd, dict): tn2 = nd.get("toolName") or nd.get("tool_name") if tn2: concept_counts[tn2] += 1 for fk in ("path", "file_path", "file", "filename"): _harvest_file(nd.get(fk, ""), file_counts, concept_counts) except Exception: pass # memories for this project for m in kv.list(KV.memories): if m.get("project") == project: for c in (m.get("concepts") or []): if c: concept_counts[c] += 1 for f in (m.get("files") or []): _harvest_file(f, file_counts, concept_counts) prof["topConcepts"] = [{"concept": c, "frequency": n} for c, n in concept_counts.most_common(20)] prof["topFiles"] = [{"file": f, "frequency": n} for f, n in file_counts.most_common(20)] prof["sessionCount"] = len(project_sessions) return prof def export_data(kv: StateKV, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: if data is None: data = {} raw_max = data.get("maxSessions") max_sessions = None if raw_max is not None: try: max_sessions = min(max(int(raw_max), 1), 1000) except Exception: pass raw_offset = data.get("offset") offset = 0 if raw_offset is not None: try: offset = max(int(raw_offset), 0) except Exception: pass all_sessions = kv.list(KV.sessions) all_sessions.sort(key=lambda s: s.get("startedAt", ""), reverse=True) if max_sessions is not None: paginated_sessions = all_sessions[offset:offset + max_sessions] else: paginated_sessions = all_sessions memories = kv.list(KV.memories) summaries = kv.list(KV.summaries) observations = {} for s in paginated_sessions: sid = s.get("id") if sid: obs = kv.list(KV.observations(sid)) if obs: observations[sid] = obs profiles = [] unique_projects = list(set(s.get("project") for s in paginated_sessions if s.get("project"))) for proj in unique_projects: p = kv.get(KV.profiles, proj) if p: profiles.append(p) graph_nodes = kv.list(KV.graphNodes) graph_edges = kv.list(KV.graphEdges) semantic_memories = kv.list(KV.semantic) procedural_memories = kv.list(KV.procedural) actions = kv.list(KV.actions) action_edges = kv.list(KV.actionEdges) sentinels = kv.list(KV.sentinels) sketches = kv.list(KV.sketches) crystals = kv.list(KV.crystals) facets = kv.list(KV.facets) lessons = kv.list(KV.lessons) insights = kv.list(KV.insights) routines = kv.list(KV.routines) signals = kv.list(KV.signals) checkpoints = kv.list(KV.checkpoints) access_logs = kv.list(KV.accessLog) res = { "version": "0.9.21", "exportedAt": datetime.datetime.utcnow().isoformat() + "Z", "sessions": paginated_sessions, "observations": observations, "memories": memories, "summaries": summaries } if profiles: res["profiles"] = profiles if graph_nodes: res["graphNodes"] = graph_nodes if graph_edges: res["graphEdges"] = graph_edges if semantic_memories: res["semanticMemories"] = semantic_memories if procedural_memories: res["proceduralMemories"] = procedural_memories if actions: res["actions"] = actions if action_edges: res["actionEdges"] = action_edges if sentinels: res["sentinels"] = sentinels if sketches: res["sketches"] = sketches if crystals: res["crystals"] = crystals if facets: res["facets"] = facets if lessons: res["lessons"] = lessons if insights: res["insights"] = insights if routines: res["routines"] = routines if signals: res["signals"] = signals if checkpoints: res["checkpoints"] = checkpoints if access_logs: res["accessLogs"] = access_logs if max_sessions is not None: res["pagination"] = { "offset": offset, "limit": max_sessions, "total": len(all_sessions), "hasMore": offset + max_sessions < len(all_sessions) } return res def set_project_profile(kv: StateKV, project: str, profile: Dict[str, Any]) -> Dict[str, Any]: profile["updatedAt"] = datetime.datetime.utcnow().isoformat() + "Z" kv.set(KV.profiles, project, profile) # Commit to Dolt commit_if_enabled(kv, f"Set project profile for {project}", get_agent_id()) return profile def get_relations(kv: StateKV) -> List[Dict[str, Any]]: return kv.list(KV.relations) def add_relation(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: rel = { "id": generate_id("rel"), "sourceId": data["sourceId"], "targetId": data["targetId"], "type": data["type"], "createdAt": datetime.datetime.utcnow().isoformat() + "Z" } kv.set(KV.relations, rel["id"], rel) # Commit to Dolt agent_id = data.get("agentId") or get_agent_id() commit_if_enabled(kv, f"Add relation {rel['type']} between {rel['sourceId']} and {rel['targetId']}", agent_id) return rel def evolve_memory(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: # Update memory content and create a new version mem_id = data["memoryId"] new_content = data["newContent"] new_title = data.get("newTitle") existing = kv.get(KV.memories, mem_id) if not existing: raise ValueError("Memory not found") existing["isLatest"] = False kv.set(KV.memories, existing["id"], existing) now = datetime.datetime.utcnow().isoformat() + "Z" new_mem = dict(existing) new_mem["id"] = generate_id("mem") new_mem["content"] = new_content if new_title: new_mem["title"] = new_title else: new_mem["title"] = new_content[:80] new_mem["version"] = existing.get("version", 1) + 1 new_mem["parentId"] = existing["id"] new_mem["supersedes"] = [existing["id"]] new_mem["createdAt"] = now new_mem["updatedAt"] = now new_mem["isLatest"] = True kv.set(KV.memories, new_mem["id"], new_mem) # Re-index try: _bm25_index.add(memory_to_observation(new_mem)) _bm25_index.remove(existing["id"]) except Exception: pass comb_text = new_mem["title"] + " " + new_mem["content"] vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]}) if _vector_index: _vector_index.remove(existing["id"]) if _index_persistence: _index_persistence.schedule_save() # Commit to Dolt agent_id = data.get("agentId") or get_agent_id() or new_mem.get("agentId") commit_if_enabled(kv, f"Evolve memory {new_mem['id']} (v{new_mem['version']}): {new_mem['title']}", agent_id) return {"success": True, "memory": new_mem} def auto_forget(kv: StateKV, dry_run: bool = False) -> Dict[str, Any]: now_dt = datetime.datetime.utcnow() now_str = now_dt.isoformat() + "Z" evicted_memories = [] evicted_observations = [] # 1. Evict expired memories memories = kv.list(KV.memories) for mem in memories: forget_after = mem.get("forgetAfter") if forget_after: try: import dateutil.parser fa_dt = dateutil.parser.parse(forget_after) if fa_dt.tzinfo: fa_dt = fa_dt.replace(tzinfo=None) if fa_dt < now_dt: evicted_memories.append(mem["id"]) except Exception as e: print(f"[auto_forget] Failed to parse forgetAfter '{forget_after}': {e}") # 2. Evict low-value old observations (importance <= 2, age > 180 days) sessions = kv.list(KV.sessions) for sess in sessions: sid = sess.get("id") if not sid: continue obs_list = kv.list(KV.observations(sid)) for obs in obs_list: importance = obs.get("importance") ts = obs.get("timestamp") if importance is not None and ts: try: import dateutil.parser ts_dt = dateutil.parser.parse(ts) if ts_dt.tzinfo: ts_dt = ts_dt.replace(tzinfo=None) age_days = (now_dt - ts_dt).days if importance <= 2 and age_days > 180: evicted_observations.append((sid, obs["id"])) except Exception as e: print(f"[auto_forget] Failed to parse timestamp '{ts}': {e}") if not dry_run: for mem_id in evicted_memories: mem = kv.get(KV.memories, mem_id) kv.delete(KV.memories, mem_id) if mem and mem.get("imageRef"): ref = mem["imageRef"] refs = kv.get(KV.imageRefs, ref) or 0 if refs > 0: kv.set(KV.imageRefs, ref, refs - 1) _bm25_index.remove(mem_id) if _vector_index: _vector_index.remove(mem_id) for sid, obs_id in evicted_observations: obs = kv.get(KV.observations(sid), obs_id) kv.delete(KV.observations(sid), obs_id) if obs: img = obs.get("imageData") or obs.get("imageRef") if img: refs = kv.get(KV.imageRefs, img) or 0 if refs > 0: kv.set(KV.imageRefs, img, refs - 1) _bm25_index.remove(obs_id) if _vector_index: _vector_index.remove(obs_id) if evicted_memories or evicted_observations: if _index_persistence: _index_persistence.schedule_save() safe_audit( kv, "auto_forget", "mem::auto_forget", evicted_memories + [oid for _, oid in evicted_observations], { "evictedMemoriesCount": len(evicted_memories), "evictedObservationsCount": len(evicted_observations), "dryRun": False } ) commit_if_enabled(kv, f"Auto forget: evicted {len(evicted_memories)} memories, {len(evicted_observations)} observations", "system") return { "success": True, "evictedMemories": evicted_memories, "evictedObservations": [oid for _, oid in evicted_observations], "evicted": len(evicted_memories) + len(evicted_observations), "dryRun": dry_run } def health_check(kv: StateKV) -> Dict[str, Any]: db_status = "connected" try: conn = kv._get_conn() conn.close() except Exception: db_status = "disconnected" return { "status": "healthy" if db_status == "connected" else "degraded", "service": "agentmemory", "version": "0.9.8", "database": "dolt", "databaseStatus": db_status } def strip_xml_wrappers(raw: str) -> str: if not raw: return "" cleaned = raw.strip() cleaned = re.sub(r'```xml\s*\n?', '', cleaned, flags=re.IGNORECASE) cleaned = re.sub(r'```', '', cleaned) cleaned = cleaned.strip() root_match = re.search(r'(<[a-zA-Z_][a-zA-Z0-9_-]*>[\s\S]*<\/[a-zA-Z_][a-zA-Z0-9_-]*>)', cleaned) if root_match: return root_match.group(1).strip() return cleaned def get_xml_tag(text: str, tag: str) -> Optional[str]: cleaned = strip_xml_wrappers(text) pattern = rf"<{tag}>(.*?)" match = re.search(pattern, cleaned, re.DOTALL) return match.group(1).strip() if match else None def get_xml_children(text: str, parent_tag: str, child_tag: str) -> List[str]: parent_content = get_xml_tag(text, parent_tag) if not parent_content: return [] pattern = rf"<{child_tag}>(.*?)" return [m.strip() for m in re.findall(pattern, parent_content, re.DOTALL)] def generate_content(system_instruction: str, prompt: str) -> str: api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") if not api_key: raise ValueError("No Gemini/Google API key found") model = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}" payload = { "contents": [ { "role": "user", "parts": [ {"text": prompt} ] } ], "systemInstruction": { "parts": [ {"text": system_instruction} ] }, "generationConfig": { "temperature": 0.2 } } req_data = json.dumps(payload).encode("utf-8") import urllib.request req = urllib.request.Request( url, data=req_data, headers={"Content-Type": "application/json"}, method="POST" ) try: with urllib.request.urlopen(req, timeout=60.0) as response: resp_data = json.loads(response.read().decode("utf-8")) candidates = resp_data.get("candidates", []) if not candidates: raise RuntimeError("Gemini generateContent returned no candidates") parts = candidates[0].get("content", {}).get("parts", []) if not parts: raise RuntimeError("Gemini generateContent candidate content had no parts") return parts[0].get("text", "") except Exception as e: raise RuntimeError(f"Gemini generateContent call failed: {e}") def summarize(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]: session_id = data.get("sessionId") if not session_id: return {"success": False, "error": "sessionId is required"} session = kv.get(KV.sessions, session_id) if not session: return {"success": False, "error": "session_not_found"} observations = kv.list(KV.observations(session_id)) compressed = [o for o in observations if o.get("title")] if not compressed: return {"success": False, "error": "no_observations"} SUMMARY_SYSTEM = """You are a session summarization assistant. Your job is to read all raw tool executions and outcomes from a coding session and produce a high-fidelity summary. Output XML: Concise title summarizing the session 1-2 paragraphs of narrative describing what was done, what succeeded, and what failed Architectural decision, key insight, or choice made path/to/modified/file important concept, library, tool, or command used """ chunk_size = 400 chunks = [compressed[i:i + chunk_size] for i in range(0, len(compressed), chunk_size)] partial_summaries = [] for idx, chunk in enumerate(chunks): obs_text = "" for o in chunk: obs_text += f"[{o.get('type')}] {o.get('title')}\n{o.get('narrative') or ''}\nFiles: {', '.join(o.get('files') or [])}\n\n" prompt = f"Summarize this chunk {idx+1}/{len(chunks)} of observations:\n\n{obs_text}" try: response = generate_content(SUMMARY_SYSTEM, prompt) cleaned = strip_xml_wrappers(response) title = get_xml_tag(cleaned, "title") if not title: continue partial_summaries.append({ "title": title, "narrative": get_xml_tag(cleaned, "narrative") or "", "keyDecisions": get_xml_children(cleaned, "decisions", "decision"), "filesModified": get_xml_children(cleaned, "files", "file"), "concepts": get_xml_children(cleaned, "concepts", "concept"), }) except Exception as e: last_error = str(e) print(f"[summarize] Chunk {idx+1} failed: {e}") if not partial_summaries: return {"success": False, "error": f"No chunks summarized successfully. Last error: {last_error}"} if len(partial_summaries) == 1: final_summary = { "sessionId": session_id, "project": session.get("project"), "createdAt": datetime.datetime.utcnow().isoformat() + "Z", "title": partial_summaries[0]["title"], "narrative": partial_summaries[0]["narrative"], "keyDecisions": partial_summaries[0]["keyDecisions"], "filesModified": partial_summaries[0]["filesModified"], "concepts": partial_summaries[0]["concepts"], "observationCount": len(compressed) } else: REDUCE_SYSTEM = """You are a session summarization reducer. Reduce multiple partial chunk summaries into a single final summary. Output XML: Concise final title summarizing the entire session Comprehensive narrative describing what was done, what succeeded, and what failed Architectural decision, key insight, or choice made path/to/modified/file important concept, library, tool, or command used """ reduce_prompt = "Reduce these partial summaries:\n\n" for idx, ps in enumerate(partial_summaries): reduce_prompt += f"[Chunk {idx+1}]\nTitle: {ps['title']}\nNarrative: {ps['narrative']}\nDecisions: {', '.join(ps['keyDecisions'])}\nFiles: {', '.join(ps['filesModified'])}\nConcepts: {', '.join(ps['concepts'])}\n\n" try: response = generate_content(REDUCE_SYSTEM, reduce_prompt) cleaned = strip_xml_wrappers(response) final_summary = { "sessionId": session_id, "project": session.get("project"), "createdAt": datetime.datetime.utcnow().isoformat() + "Z", "title": get_xml_tag(cleaned, "title") or partial_summaries[0]["title"], "narrative": get_xml_tag(cleaned, "narrative") or "", "keyDecisions": get_xml_children(cleaned, "decisions", "decision"), "filesModified": get_xml_children(cleaned, "files", "file"), "concepts": get_xml_children(cleaned, "concepts", "concept"), "observationCount": len(compressed) } except Exception as e: return {"success": False, "error": f"Reduction failed: {e}"} kv.set(KV.summaries, session_id, final_summary) session = kv.get(KV.sessions, session_id) if session: session["title"] = final_summary["title"] session["summary"] = final_summary["narrative"] kv.set(KV.sessions, session_id, session) safe_audit(kv, "compress", "mem::summarize", [session_id], { "title": final_summary["title"], "observationCount": len(compressed) }) return {"success": True, "summary": final_summary} def consolidate(kv: StateKV, project: Optional[str] = None, min_observations: int = 10) -> Dict[str, Any]: sessions = list_sessions(kv) if project: sessions = [s for s in sessions if s.get("project") == project] all_obs = [] for s in sessions: obs_list = kv.list(KV.observations(s["id"])) for o in obs_list: if o.get("title") and o.get("importance", 5) >= 5: all_obs.append((o, s["id"])) if len(all_obs) < min_observations: return {"consolidated": 0, "reason": "insufficient_observations", "success": True} # Group observations by concepts concept_groups = {} for obs, sid in all_obs: concepts = obs.get("concepts") or [] for c in concepts: key = c.lower().strip() if not key: continue if key not in concept_groups: concept_groups[key] = [] concept_groups[key].append((obs, sid)) # Sort groups that have >= 3 observations by size descending sorted_groups = sorted( [(k, g) for k, g in concept_groups.items() if len(g) >= 3], key=lambda x: len(x[1]), reverse=True ) consolidated_count = 0 existing_memories = kv.list(KV.memories) MAX_LLM_CALLS = 10 llm_calls = 0 # Prompt templates CONSOLIDATION_SYSTEM = """You are a memory consolidation engine. Given a set of related observations from coding sessions, synthesize them into a single long-term memory. Output XML: pattern|preference|architecture|bug|workflow|fact Concise memory title (max 80 chars) 2-4 sentence description of the learned insight key term relevant/file/path 1-10 how confident/important this memory is """ for concept, obs_group in sorted_groups: if llm_calls >= MAX_LLM_CALLS: break # Get top 8 by importance top = sorted(obs_group, key=lambda x: x[0].get("importance", 5), reverse=True)[:8] session_ids = list(set([x[1] for x in top])) obs_ids = list(set([x[0]["id"] for x in top])) prompt_parts = [] for obs, sid in top: prompt_parts.append(f"[{obs.get('type')}] {obs.get('title')}\n{obs.get('narrative') or ''}\nFiles: {', '.join(obs.get('files') or [])}\nImportance: {obs.get('importance', 5)}") obs_prompt = "\n\n".join(prompt_parts) try: response = generate_content(CONSOLIDATION_SYSTEM, f"Concept: \"{concept}\"\n\nObservations:\n{obs_prompt}") llm_calls += 1 cleaned = strip_xml_wrappers(response) m_type = get_xml_tag(cleaned, "type") or "fact" m_title = get_xml_tag(cleaned, "title") m_content = get_xml_tag(cleaned, "content") if not m_title or not m_content: continue m_strength_str = get_xml_tag(cleaned, "strength") or "5" try: m_strength = max(1, min(10, int(m_strength_str))) except Exception: m_strength = 5 concepts_list = get_xml_children(cleaned, "concepts", "concept") files_list = get_xml_children(cleaned, "files", "file") now = datetime.datetime.utcnow().isoformat() + "Z" # Find existing memory with same title existing_match = None for mem in existing_memories: if mem.get("title", "").lower() == m_title.lower() and mem.get("isLatest") is not False: if not project or not mem.get("project") or mem.get("project") == project: existing_match = mem break if existing_match: existing_match["isLatest"] = False kv.set(KV.memories, existing_match["id"], existing_match) evolved = { "id": generate_id("mem"), "createdAt": now, "updatedAt": now, "type": m_type, "title": m_title, "content": m_content, "concepts": concepts_list, "files": files_list, "sessionIds": session_ids, "strength": m_strength, "version": (existing_match.get("version") or 1) + 1, "parentId": existing_match["id"], "supersedes": [existing_match["id"]] + (existing_match.get("supersedes") or []), "sourceObservationIds": obs_ids, "isLatest": True } if project: evolved["project"] = project kv.set(KV.memories, evolved["id"], evolved) consolidated_count += 1 else: memory = { "id": generate_id("mem"), "createdAt": now, "updatedAt": now, "type": m_type, "title": m_title, "content": m_content, "concepts": concepts_list, "files": files_list, "sessionIds": session_ids, "strength": m_strength, "version": 1, "sourceObservationIds": obs_ids, "isLatest": True } if project: memory["project"] = project kv.set(KV.memories, memory["id"], memory) consolidated_count += 1 except Exception as e: print(f"[consolidate] Concept '{concept}' failed: {e}") # === Semantic Memory Fact Merger === summaries = kv.list(KV.summaries) new_facts_count = 0 if len(summaries) >= 5: recent_summaries = sorted( summaries, key=lambda s: s.get("createdAt", ""), reverse=True )[:20] SEMANTIC_MERGE_SYSTEM = """You are a memory consolidation engine. Given overlapping episodic memories (session summaries), extract stable factual knowledge. Output format (XML): Concise factual statement Rules: - Extract only facts that appear in 2+ episodes or are highly confident - Confidence reflects how well-supported the fact is across episodes - Combine overlapping information into single concise facts - Skip ephemeral details (specific error messages, temporary states)""" prompt_parts = [] for i, s in enumerate(recent_summaries): prompt_parts.append(f"[Episode {i + 1}]\nTitle: {s.get('title')}\nNarrative: {s.get('narrative') or ''}\nConcepts: {', '.join(s.get('concepts') or [])}") merge_prompt = "Consolidate these episodic memories into stable facts:\n\n" + "\n\n".join(prompt_parts) try: response = generate_content(SEMANTIC_MERGE_SYSTEM, merge_prompt) fact_matches = re.findall(r'([^<]+)', response, re.DOTALL) existing_semantic = kv.list(KV.semantic) now = datetime.datetime.utcnow().isoformat() + "Z" for conf_str, fact_text in fact_matches: fact_text = fact_text.strip() try: confidence = float(conf_str) except Exception: confidence = 0.5 existing = None for es in existing_semantic: if es.get("fact", "").lower() == fact_text.lower(): existing = es break if existing: existing["accessCount"] = (existing.get("accessCount") or 0) + 1 existing["lastAccessedAt"] = now existing["updatedAt"] = now existing["confidence"] = max(existing.get("confidence", 0.5), confidence) kv.set(KV.semantic, existing["id"], existing) else: sem = { "id": generate_id("sem"), "fact": fact_text, "confidence": confidence, "sourceSessionIds": [s["sessionId"] for s in recent_summaries if "sessionId" in s], "sourceMemoryIds": [], "accessCount": 1, "lastAccessedAt": now, "strength": confidence, "createdAt": now, "updatedAt": now } kv.set(KV.semantic, sem["id"], sem) new_facts_count += 1 except Exception as e: print(f"[consolidate] Semantic merge failed: {e}") # === Procedural Memory Extraction === memories = kv.list(KV.memories) new_procs_count = 0 patterns = [] for m in memories: if m.get("isLatest") is not False and m.get("type") == "pattern": freq = len(m.get("sessionIds") or []) if freq >= 2: patterns.append({"content": m.get("content", ""), "frequency": freq}) if len(patterns) >= 2: PROCEDURAL_EXTRACTION_SYSTEM = """You are a procedural memory extractor. Given repeated patterns and workflows observed across sessions, extract reusable procedures. Output format (XML): Step 1 description Step 2 description Rules: - Only extract procedures observed 2+ times - Steps should be concrete and actionable - Trigger condition should be specific enough to match automatically""" prompt_parts = [] for i, p in enumerate(patterns): prompt_parts.append(f"[Pattern {i + 1}] (seen {p['frequency']}x)\n{p['content']}") proc_prompt = "Extract reusable procedures from these recurring patterns:\n\n" + "\n\n".join(prompt_parts) try: response = generate_content(PROCEDURAL_EXTRACTION_SYSTEM, proc_prompt) proc_matches = re.findall(r'([\s\S]*?)', response, re.DOTALL) existing_procs = kv.list(KV.procedural) now = datetime.datetime.utcnow().isoformat() + "Z" for name, trigger, steps_block in proc_matches: steps = [s.strip() for s in re.findall(r'([^<]+)', steps_block, re.DOTALL)] existing = None for ep in existing_procs: if ep.get("name", "").lower() == name.lower(): existing = ep break if existing: existing["frequency"] = (existing.get("frequency") or 1) + 1 existing["updatedAt"] = now existing["strength"] = min(1.0, (existing.get("strength") or 0.5) + 0.1) kv.set(KV.procedural, existing["id"], existing) else: proc = { "id": generate_id("proc"), "name": name, "steps": steps, "triggerCondition": trigger, "frequency": 1, "sourceSessionIds": [], "strength": 0.5, "createdAt": now, "updatedAt": now } kv.set(KV.procedural, proc["id"], proc) new_procs_count += 1 except Exception as e: print(f"[consolidate] Procedural extraction failed: {e}") res_summary = { "success": True, "consolidated": consolidated_count, "totalObservations": len(all_obs), "semantic": { "newFacts": new_facts_count, "totalSummaries": len(summaries) }, "procedural": { "newProcedures": new_procs_count, "patternsAnalyzed": len(patterns) } } safe_audit(kv, "consolidate", "mem::consolidate-pipeline", [], res_summary) commit_if_enabled(kv, f"Consolidation complete: consolidated={consolidated_count}, facts={new_facts_count}, procs={new_procs_count}", "system") return res_summary # Setup persistence helper wire-ups def set_index_persistence(persistence: IndexPersistence) -> None: global _index_persistence _index_persistence = persistence def set_embedding_provider(provider) -> None: global _embedding_provider, _hybrid_search _embedding_provider = provider _hybrid_search = HybridSearch( _bm25_index, _vector_index, _embedding_provider, None ) def set_stream_broadcaster(broadcaster) -> None: global _stream_broadcaster _stream_broadcaster = broadcaster def broadcast_stream(payload: Dict[str, Any]) -> None: if _stream_broadcaster: try: _stream_broadcaster(payload) except Exception as e: print(f"[broadcaster] Failed: {e}")