Spaces:
Running
Running
| import os | |
| import re | |
| import time | |
| import uuid | |
| import json | |
| import hashlib | |
| import datetime | |
| from typing import Dict, Any, List, Optional, Tuple, Set | |
| from db import StateKV | |
| from search import ( | |
| SearchIndex, | |
| VectorIndex, | |
| GeminiEmbeddingProvider, | |
| HybridSearch, | |
| base64_to_float32, | |
| float32_to_base64 | |
| ) | |
| # ===================================================================== | |
| # Global Variables / Module State | |
| # ===================================================================== | |
# Shared search state used by the functions in this module.
# HybridSearch is built over the BM25 and vector indices; its two remaining
# constructor arguments are passed as None here.
_bm25_index = SearchIndex()
_vector_index = VectorIndex()
_hybrid_search = HybridSearch(_bm25_index, _vector_index, None, None)

# Collaborators that start unset. vector_index_add_guarded declines to embed
# while _embedding_provider is None, and observe/remember/forget skip index
# saving while _index_persistence is None. Whatever assigns them is not visible
# in this module section.
_embedding_provider = None
_index_persistence = None
_stream_broadcaster = None  # Callable: (payload) -> None
# Default scopes matching schema.ts
class KV:
    """Namespace of KV scope names (mirrors schema.ts).

    Fixed scopes are plain class attributes. Per-entity scopes are produced by
    the static helpers at the bottom, e.g. ``KV.observations(session_id)``.
    The helpers are static methods, so they work both on the class and on an
    instance.
    """

    sessions = "mem:sessions"
    memories = "mem:memories"
    summaries = "mem:summaries"
    config = "mem:config"
    metrics = "mem:metrics"
    health = "mem:health"
    bm25Index = "mem:index:bm25"
    relations = "mem:relations"
    profiles = "mem:profiles"
    claudeBridge = "mem:claude-bridge"
    graphNodes = "mem:graph:nodes"
    graphEdges = "mem:graph:edges"
    graphSnapshot = "mem:graph:snapshot"
    graphNameIndex = "mem:graph:name-index"
    graphEdgeKey = "mem:graph:edge-key"
    graphNodeDegree = "mem:graph:node-degree"
    semantic = "mem:semantic"
    procedural = "mem:procedural"
    audit = "mem:audit"
    actions = "mem:actions"
    actionEdges = "mem:action-edges"
    leases = "mem:leases"
    routines = "mem:routines"
    routineRuns = "mem:routine-runs"
    signals = "mem:signals"
    checkpoints = "mem:checkpoints"
    mesh = "mem:mesh"
    sketches = "mem:sketches"
    facets = "mem:facets"
    sentinels = "mem:sentinels"
    crystals = "mem:crystals"
    lessons = "mem:lessons"
    insights = "mem:insights"
    graphEdgeHistory = "mem:graph:edge-history"
    retentionScores = "mem:retention"
    accessLog = "mem:access"
    imageRefs = "mem:image-refs"
    slots = "mem:slots"
    globalSlots = "mem:slots:global"
    commits = "mem:commits"
    recentSearches = "mem:recent-searches"

    @staticmethod
    def observations(session_id: str) -> str:
        """Return the scope holding the observations of one session."""
        return f"mem:obs:{session_id}"

    @staticmethod
    def team_shared(team_id: str) -> str:
        """Return the shared scope of a team."""
        return f"mem:team:{team_id}:shared"

    @staticmethod
    def team_users(team_id: str, user_id: str) -> str:
        """Return the scope of one user within a team."""
        return f"mem:team:{team_id}:users:{user_id}"

    @staticmethod
    def team_profile(team_id: str) -> str:
        """Return the profile scope of a team."""
        return f"mem:team:{team_id}:profile"

    @staticmethod
    def enriched_chunks(session_id: str) -> str:
        """Return the enriched-chunk scope of a session."""
        return f"mem:enriched:{session_id}"

    @staticmethod
    def latent_embeddings(obs_id: str) -> str:
        """Return the latent-embedding scope of an observation."""
        return f"mem:latent:{obs_id}"
| # ===================================================================== | |
| # Core Helpers & Utilities | |
| # ===================================================================== | |
def generate_id(prefix: str) -> str:
    """Return ``"<prefix>_<ms-timestamp in base 36>_<12 random hex chars>"``.

    The middle part is the current Unix time in milliseconds written with the
    digits 0-9a-z ("0" if it is zero). The suffix is the first 12 hex digits of
    a random UUID4.
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
    millis = int(time.time() * 1000)
    digits = []
    while millis:
        millis, remainder = divmod(millis, 36)
        digits.append(alphabet[remainder])
    stamp = "".join(reversed(digits)) or "0"
    suffix = uuid.uuid4().hex[:12]
    return f"{prefix}_{stamp}_{suffix}"
def fingerprint_id(prefix: str, content: str) -> str:
    """Derive a deterministic id from *content*.

    The content is trimmed and lower-cased before hashing, so inputs differing
    only in surrounding whitespace or letter case map to the same id. The id is
    the prefix plus the first 16 hex digits of the SHA-256 digest.
    """
    normalized = content.strip().lower()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return prefix + "_" + digest[:16]
def auto_complete_old_active_sessions(kv: StateKV, current_session_id: str) -> int:
    """Mark every active session other than *current_session_id* as completed.

    Each closed session gets status "completed", an ``endedAt`` (only if it
    had none) and a fresh ``updatedAt``, and is written back to KV.sessions.

    Returns:
        The number of sessions closed. A summary line is printed when non-zero.
    """
    all_sessions = kv.list(KV.sessions)
    stamp = datetime.datetime.utcnow().isoformat() + "Z"
    closed = 0
    for session in all_sessions:
        if session.get("id") == current_session_id:
            continue
        if session.get("status") != "active":
            continue
        session["status"] = "completed"
        session.setdefault("endedAt", stamp)
        session["updatedAt"] = stamp
        kv.set(KV.sessions, session["id"], session)
        closed += 1
    if closed:
        print(f"[session] Auto-completed {closed} dangling active sessions.")
    return closed
def jaccard_similarity(a: str, b: str) -> float:
    """Return the Jaccard index of the word sets of *a* and *b*.

    Words are whitespace-separated tokens longer than two characters. Two
    texts with no such words are considered identical (1.0). If only one has
    words, the result is 0.0.
    """
    words_a = {tok for tok in a.split() if len(tok) > 2}
    words_b = {tok for tok in b.split() if len(tok) > 2}
    if not (words_a or words_b):
        return 1.0
    if not (words_a and words_b):
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)
| # ===================================================================== | |
| # Privacy & Data Scrubbing | |
| # ===================================================================== | |
# Whole <private>...</private> spans, case-insensitive and able to cross lines.
PRIVATE_TAG_RE = re.compile(r'<private>[\s\S]*?</private>', re.IGNORECASE)

# Raw secret patterns: generic key/secret/token assignments, bearer tokens,
# then provider-specific token shapes.
_SECRET_REGEX_SOURCES = (
    r'(?:api[_-]?key|secret|token|password|credential|auth)[\s]*[=:]\s*["\']?[A-Za-z0-9_\-/.+]{20,}["\']?',
    r'Bearer\s+[A-Za-z0-9._\-+/=]{20,}',
    r'sk-proj-[A-Za-z0-9\-_]{20,}',
    r'(?:sk|pk|rk|ak)-[A-Za-z0-9][A-Za-z0-9\-_]{19,}',
    r'sk-ant-[A-Za-z0-9\-_]{20,}',
    r'gh[pus]_[A-Za-z0-9]{36,}',
    r'github_pat_[A-Za-z0-9_]{22,}',
    r'xoxb-[A-Za-z0-9\-]+',
    r'AKIA[0-9A-Z]{16}',
    r'AIza[A-Za-z0-9\-_]{35}',
    r'eyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}',
    r'npm_[A-Za-z0-9]{36}',
    r'glpat-[A-Za-z0-9\-_]{20,}',
    r'dop_v1_[A-Za-z0-9]{64}',
)

# Compiled, case-insensitive versions of the patterns above, in the same order.
SECRET_PATTERN_SOURCES = [re.compile(src, re.IGNORECASE) for src in _SECRET_REGEX_SOURCES]
def strip_private_data(input_str: str) -> str:
    """Redact private-tag spans and recognizable secrets from *input_str*.

    ``<private>`` spans become "[REDACTED]". Every match of each secret pattern
    (applied in order) becomes "[REDACTED_SECRET]".
    """
    redacted = PRIVATE_TAG_RE.sub("[REDACTED]", input_str)
    for secret_re in SECRET_PATTERN_SOURCES:
        redacted = secret_re.sub("[REDACTED_SECRET]", redacted)
    return redacted
| # ===================================================================== | |
| # Audit Log System | |
| # ===================================================================== | |
def record_audit(
    kv: StateKV,
    operation: str,
    function_id: str,
    target_ids: List[str],
    details: Optional[Dict[str, Any]] = None,
    quality_score: Optional[float] = None,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Write one audit entry to ``KV.audit`` and return it.

    Args:
        kv: state store the entry is written to.
        operation: operation name (e.g. "forget").
        function_id: identifier of the function performing the operation.
        target_ids: ids of the records affected.
        details: free-form extra data; ``None`` is stored as an empty dict.
        quality_score: optional score stored alongside the entry.
        user_id: optional acting user.

    Returns:
        The entry dict, including its generated ``id`` and UTC ``timestamp``.
    """
    # A fresh dict per call: a shared `{}` default would be stored by reference
    # in every entry that omitted `details`, so mutating one would leak into
    # later calls.
    if details is None:
        details = {}
    entry = {
        "id": generate_id("aud"),
        "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
        "operation": operation,
        "userId": user_id,
        "functionId": function_id,
        "targetIds": target_ids,
        "details": details,
        "qualityScore": quality_score,
    }
    kv.set(KV.audit, entry["id"], entry)
    return entry
def safe_audit(
    kv: StateKV,
    operation: str,
    function_id: str,
    target_ids: List[str],
    details: Optional[Dict[str, Any]] = None,
    quality_score: Optional[float] = None,
    user_id: Optional[str] = None,
) -> None:
    """Call ``record_audit`` and swallow any failure, printing it instead.

    Auditing is best effort here: callers are never interrupted by a failed
    audit write. ``details`` defaults to a fresh empty dict on each call.
    """
    if details is None:
        details = {}
    try:
        record_audit(kv, operation, function_id, target_ids, details, quality_score, user_id)
    except Exception as e:
        print(f"[audit] Failed to write audit: {e}")
def _to_naive_utc(dt: datetime.datetime) -> datetime.datetime:
    """Express *dt* as a naive UTC datetime; naive inputs are returned as-is."""
    if dt.tzinfo is not None:
        # Convert before dropping tzinfo so e.g. "+05:00" bounds compare correctly.
        dt = dt.astimezone(datetime.timezone.utc)
    return dt.replace(tzinfo=None)


def _filter_audit_by_date(
    entries: List[Dict[str, Any]],
    bound: Any,
    keep: Any,
) -> List[Dict[str, Any]]:
    """Keep entries whose parsed timestamp satisfies ``keep(ts, bound)``.

    An unparseable *bound* disables the filter (entries are returned unchanged).
    Entries with a missing or unparseable timestamp are dropped.
    """
    import dateutil.parser

    try:
        bound_dt = _to_naive_utc(dateutil.parser.parse(bound))
    except (ValueError, OverflowError, TypeError):
        return entries
    kept = []
    for entry in entries:
        ts = entry.get("timestamp")
        if not ts:
            continue
        try:
            entry_dt = _to_naive_utc(dateutil.parser.parse(ts))
        except (ValueError, OverflowError, TypeError):
            continue
        if keep(entry_dt, bound_dt):
            kept.append(entry)
    return kept


def query_audit(
    kv: StateKV,
    filter_opts: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """Return audit entries newest-first, optionally filtered.

    Args:
        kv: state store holding ``KV.audit``.
        filter_opts: optional filters:
            - ``operation``: exact match on the entry's operation.
            - ``dateFrom`` / ``dateTo``: inclusive bounds parsed with
              dateutil; timezone-aware values are normalized to UTC.
              An unparseable bound is ignored.
            - ``limit``: maximum number of entries (default 100).

    Returns:
        At most ``limit`` entries (100 when no filters are given).
    """
    # `or ""` keeps the sort total even when a stored timestamp is None.
    entries = sorted(
        kv.list(KV.audit),
        key=lambda x: x.get("timestamp") or "",
        reverse=True,
    )
    if not filter_opts:
        return entries[:100]

    op = filter_opts.get("operation")
    if op:
        entries = [e for e in entries if e.get("operation") == op]

    date_from = filter_opts.get("dateFrom")
    if date_from:
        entries = _filter_audit_by_date(entries, date_from, lambda ts, bound: ts >= bound)

    date_to = filter_opts.get("dateTo")
    if date_to:
        entries = _filter_audit_by_date(entries, date_to, lambda ts, bound: ts <= bound)

    limit = filter_opts.get("limit", 100)
    return entries[:limit]
| # ===================================================================== | |
| # Image Store System | |
| # ===================================================================== | |
# Directory where images extracted from observations are stored.
IMAGES_DIR = os.path.join(os.path.expanduser("~"), ".agentmemory", "images")

# Image-store size budget used when AGENTMEMORY_IMAGE_STORE_MAX_BYTES is unset
# or not a valid integer (500 MiB).
_DEFAULT_IMAGE_STORE_MAX_BYTES = 500 * 1024 * 1024


def get_max_bytes() -> int:
    """Return the image-store byte budget from AGENTMEMORY_IMAGE_STORE_MAX_BYTES.

    Falls back to 500 MiB when the variable is unset, empty, or not an integer.
    Previously an invalid value raised ValueError at every call site.
    """
    raw = os.getenv("AGENTMEMORY_IMAGE_STORE_MAX_BYTES")
    if raw is None:
        return _DEFAULT_IMAGE_STORE_MAX_BYTES
    try:
        return int(raw)
    except ValueError:
        print(f"[image store] ignoring invalid AGENTMEMORY_IMAGE_STORE_MAX_BYTES={raw!r}")
        return _DEFAULT_IMAGE_STORE_MAX_BYTES


def is_managed_image_path(file_path: str) -> bool:
    """Return True if *file_path* is IMAGES_DIR itself or lies beneath it.

    Paths are compared after ``os.path.abspath`` normalization. Appending
    ``os.sep`` prevents sibling directories such as ``images_other`` from
    matching.
    """
    if not file_path:
        return False
    resolved = os.path.abspath(file_path)
    normalized_images_dir = os.path.abspath(IMAGES_DIR)
    return resolved.startswith(normalized_images_dir + os.sep) or resolved == normalized_images_dir
def save_image_to_disk(base64_data: str) -> Tuple[str, int]:
    """Store a base64 image under IMAGES_DIR, de-duplicated by content hash.

    Accepts a ``data:image/...;base64,`` URL or bare base64. The file name is
    the SHA-256 of the base64 payload. The extension is derived from the
    data-URL media type (jpg/webp/gif) or a bare-JPEG ``/9j/`` prefix, and
    defaults to ``png``.

    Returns:
        ``(file_path, bytes_written)``. ``bytes_written`` is 0 when an
        identical file already existed; empty input returns ``("", 0)``.

    Raises:
        binascii.Error: if the payload is not decodable base64.
        OSError: if the directory or file cannot be written.
    """
    if not base64_data:
        return "", 0
    os.makedirs(IMAGES_DIR, exist_ok=True)

    clean_base64 = base64_data
    ext = "png"
    if base64_data.startswith("data:image/"):
        comma_idx = base64_data.find(",")
        if comma_idx != -1:
            meta = base64_data[:comma_idx]
            if "jpeg" in meta or "jpg" in meta:
                ext = "jpg"
            elif "webp" in meta:
                ext = "webp"
            elif "gif" in meta:
                ext = "gif"
            clean_base64 = base64_data[comma_idx + 1:]
    elif base64_data.startswith("/9j/"):
        ext = "jpg"

    digest = hashlib.sha256(clean_base64.encode("utf-8")).hexdigest()
    file_path = os.path.join(IMAGES_DIR, f"{digest}.{ext}")
    if os.path.exists(file_path):
        return file_path, 0

    import base64

    buffer = base64.b64decode(clean_base64)
    # Write to a unique temp file and rename it into place. An interrupted
    # write then never leaves a truncated file under the final name, which the
    # existence check above would otherwise treat as complete forever.
    tmp_path = f"{file_path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "xb") as f:
            f.write(buffer)
        os.replace(tmp_path, file_path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    return file_path, len(buffer)
def delete_image(file_path: Optional[str]) -> int:
    """Delete a managed image file and report how many bytes were freed.

    Paths outside IMAGES_DIR (per ``is_managed_image_path``) are never touched.
    Errors are printed rather than raised.

    Returns:
        The removed file's size, or 0 if nothing was removed.
    """
    if file_path and is_managed_image_path(file_path):
        try:
            if os.path.exists(file_path):
                freed = os.path.getsize(file_path)
                os.remove(file_path)
                return freed
        except Exception as err:
            print(f"[agentmemory] Failed to delete image context: {err}")
    return 0
def touch_image(file_path: str) -> None:
    """Set a managed image's access/modification times to now (best effort).

    Unmanaged or missing paths are ignored, and any OS error is swallowed.
    """
    if not (file_path and is_managed_image_path(file_path)):
        return
    try:
        if os.path.exists(file_path):
            os.utime(file_path, None)
    except Exception:
        # Deliberately silent: refreshing timestamps is advisory only.
        return
| # ===================================================================== | |
| # Index Persistence System (JSON Sharded) | |
| # ===================================================================== | |
class IndexPersistence:
    """Persist the BM25 index (and the vector index, if given) in the KV store.

    Each index is JSON-serialized and split into shards of at most
    ``SHARD_CHARS`` characters, written under generation-specific scopes. A
    manifest in ``KV.bm25Index`` lists the current generation's shards. Shards
    of superseded generations are deleted after the new manifest is written.
    """

    # Maximum number of characters stored in one shard value.
    SHARD_CHARS = 2000000

    def __init__(self, kv: StateKV, bm25: SearchIndex, vector: Optional[VectorIndex]):
        self.kv = kv
        self.bm25 = bm25
        self.vector = vector

    def schedule_save(self) -> None:
        """Persist the indexes.

        NOTE(review): despite the name, this saves synchronously on every call
        and re-serializes the entire index each time.
        """
        self.save()

    def save(self) -> None:
        """Serialize and store both indexes; failures are printed, not raised."""
        try:
            self.save_sharded_index(
                json.dumps(self.bm25.serialize_data()),
                "data:manifest",
                "data",
                "mem:index:bm25:bm25:",
            )
            # Explicit None check: an index object that is falsy while empty
            # (e.g. one defining __len__) is still persisted.
            # NOTE(review): whether VectorIndex defines __len__/__bool__ is not
            # visible here.
            if self.vector is not None:
                self.save_sharded_index(
                    json.dumps(self.vector.serialize_data()),
                    "vectors:manifest",
                    "vectors",
                    "mem:index:bm25:vectors:",
                )
        except Exception as e:
            print(f"[index persistence] failed to save index: {e}")

    def save_sharded_index(self, serialized: str, manifest_key: str, legacy_key: str, scope_prefix: str) -> None:
        """Write *serialized* as a new shard generation and point the manifest at it.

        Args:
            serialized: JSON text of the index.
            manifest_key: key of the manifest inside ``KV.bm25Index``.
            legacy_key: key of the pre-sharding single-value copy; deleted
                once the new manifest is written.
            scope_prefix: scope prefix shared by every shard of this index.
        """
        previous = self.kv.get(KV.bm25Index, manifest_key)
        generation = generate_id("idx")
        shards = []
        # All shards are written before the manifest that references them.
        for shard_idx, offset in enumerate(range(0, len(serialized), self.SHARD_CHARS)):
            chunk = serialized[offset:offset + self.SHARD_CHARS]
            scope = f"{scope_prefix}{generation}:{str(shard_idx).zfill(5)}"
            self.kv.set(scope, "data", chunk)
            shards.append({"scope": scope, "key": "data", "chars": len(chunk)})

        next_manifest = {
            "v": 1,
            "generation": generation,
            "shards": shards,
            "chars": len(serialized),
        }
        self.kv.set(KV.bm25Index, manifest_key, next_manifest)
        self.kv.delete(KV.bm25Index, legacy_key)

        self._cleanup_obsolete_shards(scope_prefix, {s["scope"] for s in shards})
        self._delete_previous_shards(previous, shards)

    def _cleanup_obsolete_shards(self, scope_prefix: str, current_scopes: Set[str]) -> None:
        """Delete every stored scope under *scope_prefix* not in *current_scopes*.

        Talks to the ``kv_store`` table directly through ``kv._get_conn()``.
        Errors are printed and otherwise ignored.
        """
        try:
            conn = self.kv._get_conn()
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        "SELECT DISTINCT scope FROM kv_store WHERE scope LIKE %s",
                        (scope_prefix + "%",),
                    )
                    stale = [
                        row["scope"] for row in cursor.fetchall()
                        if row["scope"] not in current_scopes
                    ]
                    for i in range(0, len(stale), 50):
                        batch = stale[i:i + 50]
                        placeholders = ",".join(["%s"] * len(batch))
                        cursor.execute(
                            f"DELETE FROM kv_store WHERE scope IN ({placeholders})",
                            tuple(batch),
                        )
                # DB-API connections are not autocommit by default. Without an
                # explicit commit these DELETEs would be rolled back by close().
                # On an autocommit connection this is a harmless no-op.
                if stale:
                    conn.commit()
            finally:
                conn.close()
        except Exception as ex:
            print(f"[index persistence] error cleaning up obsolete shards: {ex}")

    def _delete_previous_shards(self, previous: Any, shards: List[Dict[str, Any]]) -> None:
        """Delete shards listed in the previous v1 manifest that are no longer used.

        This is a fallback for when the direct-SQL cleanup did not run.
        """
        if not (isinstance(previous, dict) and previous.get("v") == 1 and isinstance(previous.get("shards"), list)):
            return
        current_shards = {(s["scope"], s["key"]) for s in shards}
        for old_shard in previous["shards"]:
            if (old_shard["scope"], old_shard["key"]) not in current_shards:
                self.kv.delete(old_shard["scope"], old_shard["key"])

    def load(self) -> Dict[str, Any]:
        """Restore indexes from the KV store.

        Returns:
            ``{"bm25": bool, "vector": bool}`` saying which indexes were
            restored. Restore failures are printed, not raised.
        """
        bm25_data = self.load_sharded_data("data", "data:manifest")
        bm25_loaded = False
        if bm25_data:
            try:
                self.bm25.restore_from_data(json.loads(bm25_data))
                bm25_loaded = True
            except Exception as e:
                print(f"[index persistence] failed to restore BM25: {e}")
        vector_loaded = False
        # `is not None` so an index that is falsy while empty is still restored.
        if self.vector is not None:
            vector_data = self.load_sharded_data("vectors", "vectors:manifest")
            if vector_data:
                try:
                    self.vector.restore_from_data(json.loads(vector_data))
                    vector_loaded = True
                except Exception as e:
                    print(f"[index persistence] failed to restore vectors: {e}")
        return {"bm25": bm25_loaded, "vector": vector_loaded}

    def load_sharded_data(self, legacy_key: str, manifest_key: str) -> Optional[str]:
        """Reassemble the serialized index text.

        With a v1 manifest, returns the concatenation of its shards, or None if
        a shard is missing or a shard entry is malformed. Without one, returns
        the legacy single-value string under *legacy_key*, if it is a string.
        """
        manifest = self.kv.get(KV.bm25Index, manifest_key)
        if isinstance(manifest, dict) and manifest.get("v") == 1:
            chunks = []
            for shard in manifest.get("shards") or []:
                if not isinstance(shard, dict) or "scope" not in shard or "key" not in shard:
                    return None
                chunk = self.kv.get(shard["scope"], shard["key"])
                if chunk is None:
                    return None
                chunks.append(chunk)
            return "".join(chunks)
        legacy = self.kv.get(KV.bm25Index, legacy_key)
        return legacy if isinstance(legacy, str) else None
| # ===================================================================== | |
| # Vector Index / Embedding Helpers | |
| # ===================================================================== | |
def clip_embed_input(text: str, max_chars: int = 16000) -> str:
    """Truncate *text* to at most *max_chars* characters before embedding.

    Args:
        text: input text.
        max_chars: character limit (default 16000, the previous hard-coded value).

    Raises:
        ValueError: if *max_chars* is negative (a negative slice bound would
            silently drop characters from the end instead).
    """
    if max_chars < 0:
        raise ValueError("max_chars must be non-negative")
    return text[:max_chars]
def get_agent_id() -> Optional[str]:
    """Return the AGENT_ID environment variable, or None if unset or empty."""
    agent_id = os.getenv("AGENT_ID")
    return agent_id if agent_id else None
def commit_if_enabled(kv: StateKV, message: str, agent_id: Optional[str]) -> Optional[str]:
    """Ask the KV store to record a version commit and return its result.

    A missing or empty *agent_id* is reported as "unknown-agent".

    NOTE(review): no "enabled" check happens here. Any gating must live inside
    ``kv.commit_version`` — confirm.
    """
    author = agent_id if agent_id else "unknown-agent"
    return kv.commit_version(message, author)
def _env_flag_is_true(name: str) -> bool:
    """Return True only when env var *name* is exactly the lowercase string "true"."""
    return os.getenv(name) == "true"


def is_agent_scope_isolated() -> bool:
    """Return True when AGENTMEMORY_AGENT_SCOPE is exactly "isolated"."""
    return os.getenv("AGENTMEMORY_AGENT_SCOPE") == "isolated"


def is_auto_compress_enabled() -> bool:
    """Return True when AGENTMEMORY_AUTO_COMPRESS is exactly "true"."""
    return _env_flag_is_true("AGENTMEMORY_AUTO_COMPRESS")


def is_slots_enabled() -> bool:
    """Return True when AGENTMEMORY_SLOTS is exactly "true"."""
    return _env_flag_is_true("AGENTMEMORY_SLOTS")


def is_reflect_enabled() -> bool:
    """Return True when AGENTMEMORY_REFLECT is exactly "true"."""
    return _env_flag_is_true("AGENTMEMORY_REFLECT")


def is_graph_extraction_enabled() -> bool:
    """Return True when GRAPH_EXTRACTION_ENABLED is exactly "true"."""
    return _env_flag_is_true("GRAPH_EXTRACTION_ENABLED")
def is_consolidation_enabled() -> bool:
    """Decide whether consolidation runs.

    CONSOLIDATION_ENABLED of "true"/"1" forces it on and "false"/"0" forces it
    off. Any other value (or none) enables it only when GEMINI_API_KEY or
    GOOGLE_API_KEY is set to a non-empty value.
    """
    explicit = os.getenv("CONSOLIDATION_ENABLED")
    if explicit in ("true", "1"):
        return True
    if explicit in ("false", "0"):
        return False
    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    return bool(api_key)
def vector_index_add_guarded(
    obs_id: str,
    session_id: str,
    text: str,
    context: Dict[str, Any]
) -> bool:
    """Embed *text* and add it to the module vector index without ever raising.

    The text is clipped with ``clip_embed_input`` before embedding.

    Returns:
        True only if an embedding of the provider's expected dimensionality
        was added. False when no index or provider is configured, on a
        dimension mismatch, or when embedding/insertion raised (the error is
        printed).

    ``context`` is accepted for call-site compatibility but is not used here.
    """
    vi = _vector_index
    ep = _embedding_provider
    # Explicit None checks: a truthiness test would also skip an index object
    # that is falsy while empty (e.g. one defining __len__), so the first
    # vector could never be added.
    # NOTE(review): whether VectorIndex defines __len__ is not visible here.
    if vi is None or ep is None:
        return False
    try:
        clipped = clip_embed_input(text)
        embedding = ep.embed(clipped)
        if len(embedding) != ep.dimensions:
            print(f"[vector-index] Dimension mismatch: expected {ep.dimensions}, got {len(embedding)}")
            return False
        vi.add(obs_id, session_id, embedding)
        return True
    except Exception as e:
        print(f"[vector-index] Embed failed: {e}")
        return False
| # ===================================================================== | |
| # Observation System (Observe, Synthetic Compression) | |
| # ===================================================================== | |
def extract_image(d: Any) -> Optional[str]:
    """Find the first image reference inside an observation payload.

    Matching rules:
      - A string matches if it is a ``data:image/`` URL, bare base64 PNG
        (``iVBORw0KGgo`` = base64 of the PNG signature) or bare base64 JPEG
        (``/9j/`` = base64 of the JPEG SOI marker).
      - A dict is first checked for a non-empty string under one of the
        well-known keys ``image_data``, ``image_path``, ``imageBase64``,
        ``imagePath`` (returned as-is, which may be a file path). Otherwise
        its values are searched recursively.
      - Lists and tuples are searched element by element.

    Returns:
        The first match, or None.
    """
    if not d:
        return None
    if isinstance(d, str):
        return d if d.startswith(("data:image/", "iVBORw0KGgo", "/9j/")) else None
    if isinstance(d, dict):
        for k in ("image_data", "image_path", "imageBase64", "imagePath"):
            candidate = d.get(k)
            # Skip empty strings so a blank key does not end the search early.
            if isinstance(candidate, str) and candidate:
                return candidate
        children = d.values()
    elif isinstance(d, (list, tuple)):
        # Tool outputs commonly carry images inside lists of content blocks.
        children = d
    else:
        return None
    for child in children:
        match = extract_image(child)
        if match:
            return match
    return None
# Hook types that determine the observation type regardless of the tool.
_HOOK_TYPE_CATEGORIES = (
    ("post_tool_failure", "error"),
    ("prompt_submit", "conversation"),
    ("subagent_stop", "subagent"),
    ("task_completed", "subagent"),
    ("notification", "notification"),
)

# Keyword groups checked in order against the snake_cased tool name.
_TOOL_KEYWORD_CATEGORIES = (
    (("fetch", "http", "web"), "web_fetch"),
    (("grep", "search", "glob", "find"), "search"),
    (("bash", "shell", "exec", "run"), "command_run"),
    (("edit", "update", "patch", "replace"), "file_edit"),
    (("write", "create"), "file_write"),
    (("read", "view"), "file_read"),
    (("task", "agent"), "subagent"),
)


def infer_type(tool_name: Optional[str], hook_type: str) -> str:
    """Classify an observation from its hook type and tool name.

    The hook type wins when it is one of the special hooks. Otherwise the tool
    name is converted to snake_case (camelCase humps, dashes and whitespace
    become "_") and matched against keyword groups in priority order. A keyword
    matches as a whole "_"-delimited word, or as a prefix or suffix of the
    whole name, so e.g. "thread" counts as "read". Falls back to "other".
    """
    for hook, category in _HOOK_TYPE_CATEGORIES:
        if hook_type == hook:
            return category
    if not tool_name:
        return "other"

    snake = re.sub(r'([a-z])([A-Z])', r'\1_\2', tool_name)
    snake = re.sub(r'[-\s]+', '_', snake).lower()

    def matches(word: str) -> bool:
        if snake == word or snake.startswith(word) or snake.endswith(word):
            return True
        return re.search(rf"(^|_){word}(_|$)", snake) is not None

    for words, category in _TOOL_KEYWORD_CATEGORIES:
        if any(matches(w) for w in words):
            return category
    return "other"
def extract_files(input_data: Any) -> List[str]:
    """Collect file-like string values from a tool-input dict.

    Looks at the keys ``file_path``, ``filepath``, ``path``, ``filePath``,
    ``file`` and ``pattern``, keeping non-empty strings shorter than 512
    characters. Duplicates are removed while keeping first-seen order, so the
    result is deterministic (a set's iteration order varies between processes
    under string hash randomization).

    Returns:
        The collected values; ``[]`` for empty or non-dict input.
    """
    if not isinstance(input_data, dict) or not input_data:
        return []
    found: Dict[str, None] = {}
    for key in ("file_path", "filepath", "path", "filePath", "file", "pattern"):
        value = input_data.get(key)
        if isinstance(value, str) and 0 < len(value) < 512:
            found[value] = None
    return list(found)
def stringify_for_narrative(v: Any) -> str:
    """Render a tool input/output value as text for a narrative.

    ``None`` becomes "", strings pass through, and other values are
    JSON-encoded. Values JSON cannot encode fall back to ``str()``.
    """
    if v is None:
        return ""
    if isinstance(v, str):
        return v
    try:
        # ensure_ascii=False keeps non-ASCII text readable and searchable
        # instead of turning it into \uXXXX escapes.
        return json.dumps(v, ensure_ascii=False)
    except (TypeError, ValueError, RecursionError):
        return str(v)
def _truncate_with_ellipsis(text: str, limit: int) -> str:
    """Return *text* if it fits in *limit* chars, else cut it to *limit* chars ending in "…"."""
    if len(text) <= limit:
        return text
    return text[:limit - 1] + "\u2026"


def build_synthetic_compression(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build a compressed observation from a raw one without calling a model.

    The narrative joins prompt, tool input and tool output with " | " (max 400
    chars). The title is the tool name or hook type (max 80 chars). The
    subtitle is the tool input (max 120 chars). Importance is fixed at 5 and
    confidence at 0.3. ``modality``, ``imageData`` and ``agentId`` are copied
    over when present.
    """
    tool_name = raw.get("toolName") or raw.get("hookType")
    input_str = stringify_for_narrative(raw.get("toolInput"))
    output_str = stringify_for_narrative(raw.get("toolOutput"))
    # userPrompt is copied from the hook payload unchecked and may not be a
    # string; stringify it like the tool fields so the join below cannot fail.
    prompt_str = stringify_for_narrative(raw.get("userPrompt"))

    narrative = _truncate_with_ellipsis(
        " | ".join(s for s in (prompt_str, input_str, output_str) if s), 400
    )
    title = _truncate_with_ellipsis(str(tool_name) if tool_name else "observation", 80)
    subtitle = _truncate_with_ellipsis(input_str, 120) if input_str else None

    res = {
        "id": raw["id"],
        "sessionId": raw["sessionId"],
        "timestamp": raw["timestamp"],
        "type": infer_type(raw.get("toolName"), raw["hookType"]),
        "title": title,
        "subtitle": subtitle,
        "facts": [],
        "narrative": narrative,
        "concepts": [],
        "files": extract_files(raw.get("toolInput")),
        "importance": 5,
        "confidence": 0.3,
    }
    for k in ["modality", "imageData", "agentId"]:
        if raw.get(k) is not None:
            res[k] = raw[k]
    return res
def _sanitize_value(value: Any) -> Any:
    """Recursively redact every string inside a JSON-like value.

    Dict keys are stringified and redacted. None/bool/int/float pass through.
    Any other object is replaced by its redacted ``str()``.
    """
    if isinstance(value, str):
        return strip_private_data(value)
    if isinstance(value, dict):
        return {strip_private_data(str(k)): _sanitize_value(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_sanitize_value(v) for v in value]
    if value is None or isinstance(value, (bool, int, float)):
        return value
    return strip_private_data(str(value))


def _sanitize_observation_data(data: Any) -> Any:
    """Redact private tags and secrets from a hook payload's ``data`` field.

    The JSON text of *data* is redacted and parsed back. Redaction can leave
    that text unparseable (the ``api_key=...`` pattern may consume a string's
    closing quote), and *data* may not be JSON-serializable at all. In those
    cases each string is redacted in place, so dict payloads keep their
    structure and hook fields such as ``tool_name`` stay extractable.
    """
    try:
        return json.loads(strip_private_data(json.dumps(data)))
    except (TypeError, ValueError, RecursionError):
        return _sanitize_value(data)


def _record_observation_on_session(
    kv: StateKV,
    payload: Dict[str, Any],
    session_id: str,
    raw: Dict[str, Any],
    existing_session: Optional[Dict[str, Any]],
    agent_id: Optional[str],
) -> None:
    """Bump counters on an existing session, or create the session record.

    Creating a new session first marks every other active session completed.
    The first prompt (whitespace-collapsed, max 200 chars) is stored when the
    session does not have one yet.
    """
    if existing_session:
        updates = [
            {"type": "set", "path": "updatedAt", "value": datetime.datetime.utcnow().isoformat() + "Z"},
            {"type": "set", "path": "observationCount", "value": (existing_session.get("observationCount") or 0) + 1},
        ]
        if not existing_session.get("firstPrompt") and isinstance(raw.get("userPrompt"), str):
            trimmed = " ".join(raw["userPrompt"].split()).strip()
            if trimmed:
                updates.append({"type": "set", "path": "firstPrompt", "value": trimmed[:200]})
        kv.update(KV.sessions, session_id, updates)
        return

    auto_complete_old_active_sessions(kv, session_id)
    project = payload.get("project") or "unknown"
    cwd = payload.get("cwd") or os.getcwd()
    trimmed_prompt = None
    if isinstance(raw.get("userPrompt"), str):
        trimmed_prompt = " ".join(raw["userPrompt"].split()).strip()[:200]
    ts = datetime.datetime.utcnow().isoformat() + "Z"
    new_sess = {
        "id": session_id,
        "project": project,
        "cwd": cwd,
        "startedAt": payload.get("timestamp") or ts,
        "updatedAt": ts,
        "status": "active",
        "observationCount": 1,
    }
    if agent_id:
        new_sess["agentId"] = agent_id
    if trimmed_prompt:
        new_sess["firstPrompt"] = trimmed_prompt
    kv.set(KV.sessions, session_id, new_sess)


def observe(kv: StateKV, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Record one hook event as an observation of its session.

    Steps:
      1. Redact the payload data.
      2. Store and broadcast the raw observation.
      3. Update or create the session.
      4. Store a synthetic compressed observation (replacing the raw record
         under the same id), index it in BM25 and, when configured, the
         vector index.
      5. Broadcast it and request a version commit.

    Returns:
        ``{"observationId": <new id>}``.

    Raises:
        ValueError: if sessionId, hookType or timestamp is missing, or the
            session already holds MAX_OBS_PER_SESSION observations.
    """
    session_id = payload.get("sessionId")
    hook_type = payload.get("hookType")
    timestamp = payload.get("timestamp")
    if not session_id or not hook_type or not timestamp:
        raise ValueError("Invalid payload: sessionId, hookType, and timestamp are required")

    obs_id = generate_id("obs")
    sanitized_data = _sanitize_observation_data(payload.get("data"))
    raw = {
        "id": obs_id,
        "sessionId": session_id,
        "timestamp": timestamp,
        "hookType": hook_type,
        "raw": sanitized_data,
    }

    extracted_img = extract_image(sanitized_data)
    if isinstance(sanitized_data, dict):
        if hook_type in ("post_tool_use", "post_tool_failure"):
            raw["toolName"] = sanitized_data.get("tool_name")
            raw["toolInput"] = sanitized_data.get("tool_input")
            raw["toolOutput"] = sanitized_data.get("tool_output") or sanitized_data.get("error")
        if hook_type == "prompt_submit":
            raw["userPrompt"] = sanitized_data.get("prompt")
        if extracted_img:
            raw["modality"] = "mixed" if (raw.get("toolInput") or raw.get("toolOutput") or raw.get("userPrompt")) else "image"
    elif isinstance(sanitized_data, str) and extracted_img:
        raw["modality"] = "image"

    max_obs = int(os.getenv("MAX_OBS_PER_SESSION", "500"))
    if max_obs > 0 and len(kv.list(KV.observations(session_id))) >= max_obs:
        raise ValueError(f"Session observation limit reached ({max_obs})")

    existing_session = kv.get(KV.sessions, session_id)
    inherited_agent_id = existing_session.get("agentId") if existing_session else get_agent_id()
    if inherited_agent_id:
        raw["agentId"] = inherited_agent_id

    # Only inline image data is written to disk. Path-valued image fields
    # found by extract_image are left as they are.
    if extracted_img and extracted_img.startswith(("data:image/", "iVBORw0KGgo", "/9j/")):
        try:
            file_path, _ = save_image_to_disk(extracted_img)
            raw["imageData"] = file_path
            # One reference per observation pointing at this file.
            img_refs = kv.get(KV.imageRefs, file_path) or 0
            kv.set(KV.imageRefs, file_path, img_refs + 1)
        except Exception as ex:
            print(f"[image store] failed: {ex}")

    kv.set(KV.observations(session_id), obs_id, raw)
    broadcast_stream({
        "type": "raw_observation",
        "sessionId": session_id,
        "data": {
            "type": "raw",
            "observation": raw,
            "sessionId": session_id,
        },
    })

    _record_observation_on_session(kv, payload, session_id, raw, existing_session, inherited_agent_id)

    # Synthetic (model-free) compression; raw hook fields are kept on the record.
    synthetic = build_synthetic_compression(raw)
    for k in ["hookType", "raw", "toolName", "toolInput", "toolOutput", "userPrompt"]:
        if k in raw:
            synthetic[k] = raw[k]
    kv.set(KV.observations(session_id), obs_id, synthetic)

    _bm25_index.add(synthetic)
    comb_text = synthetic["title"] + " " + (synthetic.get("narrative") or "")
    vector_index_add_guarded(synthetic["id"], synthetic["sessionId"], comb_text, {"kind": "synthetic", "logId": synthetic["id"]})
    if _index_persistence:
        _index_persistence.schedule_save()

    broadcast_stream({
        "type": "compressed_observation",
        "sessionId": session_id,
        "data": {
            "type": "compressed",
            "observation": synthetic,
            "sessionId": session_id,
        },
    })

    # Commit to Dolt
    commit_if_enabled(kv, f"Observe: {synthetic.get('title', 'observation')} in session {session_id[:8]}", synthetic.get("agentId"))
    return {"observationId": obs_id}
| # ===================================================================== | |
| # Memory System (Remember, Forget, Evolve) | |
| # ===================================================================== | |
def memory_to_observation(memory: Dict[str, Any]) -> Dict[str, Any]:
    """Adapt a stored memory to the observation shape the BM25 index accepts.

    The session id is the memory's first session id, or "memory" if it has
    none. The content serves as both the single fact and the narrative, and
    importance is taken from ``strength`` (default 7).
    """
    session_ids = memory.get("sessionIds")
    first_session = session_ids[0] if session_ids else "memory"
    body = memory["content"]
    return {
        "id": memory["id"],
        "sessionId": first_session,
        "timestamp": memory["createdAt"],
        "type": "decision",
        "title": memory["title"],
        "facts": [body],
        "narrative": body,
        "concepts": memory.get("concepts", []),
        "files": memory.get("files", []),
        "importance": memory.get("strength", 7),
    }
def _find_superseded_memory(
    existing_memories: List[Dict[str, Any]],
    lower_content: str,
    project: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Return the first latest memory whose content has Jaccard similarity > 0.7.

    Memories explicitly marked ``isLatest: False`` are skipped. When *project*
    is set, memories tagged with a different project are skipped too.
    """
    for existing in existing_memories:
        if existing.get("isLatest") is False:
            continue
        if project and existing.get("project") and existing["project"] != project:
            continue
        # `or ""` tolerates memories stored with a null content.
        existing_content = str(existing.get("content") or "")
        if jaccard_similarity(lower_content, existing_content.lower()) > 0.7:
            return existing
    return None


def remember(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Store a new memory, superseding a near-duplicate if one exists.

    The content is redacted with ``strip_private_data``. If a latest memory in
    the same project has Jaccard similarity > 0.7, it is marked
    ``isLatest: False``, and the new memory records it as parent with the next
    version number. The memory is indexed in BM25 and, when configured, the
    vector index.

    Args:
        data: ``content`` (required non-empty string), plus optional
            ``concepts``, ``files``, ``sourceObservationIds``, ``ttlDays``
            (positive number of days until ``forgetAfter``), ``type``
            (default "fact"), ``project`` and ``agentId``.

    Returns:
        ``{"success": True, "memory": <new memory dict>}``.

    Raises:
        ValueError: if ``content`` is missing, not a string, or blank.
    """
    content = data.get("content")
    if not isinstance(content, str) or not content.strip():
        raise ValueError("content is required")
    content = strip_private_data(content)
    concepts = data.get("concepts") or []
    files = data.get("files") or []
    source_obs = data.get("sourceObservationIds") or []
    ttl_days = data.get("ttlDays")
    mem_type = data.get("type") or "fact"
    project = data.get("project")
    if project:
        project = project.strip()
    now = datetime.datetime.utcnow().isoformat() + "Z"

    superseded_memory = _find_superseded_memory(kv.list(KV.memories), content.lower(), project)
    superseded_id = superseded_memory["id"] if superseded_memory else None
    superseded_version = (superseded_memory.get("version") or 1) if superseded_memory else 1

    call_agent_id = data.get("agentId") or get_agent_id()
    new_mem = {
        "id": generate_id("mem"),
        "createdAt": now,
        "updatedAt": now,
        "type": mem_type,
        "title": content[:80],
        "content": content,
        "concepts": concepts,
        "files": files,
        "sessionIds": [],
        "strength": 7,
        "version": superseded_version + 1 if superseded_id else 1,
        "parentId": superseded_id,
        "supersedes": [superseded_id] if superseded_id else [],
        "sourceObservationIds": [i for i in source_obs if i],
        "isLatest": True,
    }
    if call_agent_id:
        new_mem["agentId"] = call_agent_id
    if project:
        new_mem["project"] = project
    # bool is an int subclass; exclude it so `ttlDays: true` is not read as 1 day.
    if isinstance(ttl_days, (int, float)) and not isinstance(ttl_days, bool) and ttl_days > 0:
        forget_time = datetime.datetime.utcnow() + datetime.timedelta(days=ttl_days)
        new_mem["forgetAfter"] = forget_time.isoformat() + "Z"

    if superseded_memory:
        superseded_memory["isLatest"] = False
        kv.set(KV.memories, superseded_memory["id"], superseded_memory)
    kv.set(KV.memories, new_mem["id"], new_mem)

    try:
        _bm25_index.add(memory_to_observation(new_mem))
    except Exception as ex:
        print(f"[bm25] memory add failed: {ex}")
    comb_text = new_mem["title"] + " " + new_mem["content"]
    vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]})
    if _index_persistence:
        _index_persistence.schedule_save()

    # Commit to Dolt
    commit_if_enabled(kv, f"Remember: {new_mem.get('title', '')}", new_mem.get("agentId"))
    return {"success": True, "memory": new_mem}
def _release_image_ref(kv: StateKV, ref: Optional[str]) -> None:
    """Decrement the stored reference count of an image, never below zero."""
    if not ref:
        return
    refs = kv.get(KV.imageRefs, ref) or 0
    if refs > 0:
        kv.set(KV.imageRefs, ref, refs - 1)


def _remove_from_search_indexes(item_id: str) -> None:
    """Drop *item_id* from the BM25 index and, if present, the vector index."""
    _bm25_index.remove(item_id)
    # `is not None` so an index object that is falsy while empty is still used.
    if _vector_index is not None:
        _vector_index.remove(item_id)


def forget(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Delete a memory, specific observations, or a whole session.

    Request shapes (``data`` keys):
      - ``memoryId``: delete that memory.
      - ``sessionId`` + ``observationIds``: delete those observations.
      - ``sessionId`` alone (no ``memoryId``/``observationIds``): delete all of
        the session's observations, the session record and its summary.

    Image reference counts are decremented and index entries removed. When
    anything was requested, the indexes are saved, an audit entry is written
    and a version commit is requested.

    Returns:
        ``{"success": True, "deleted": n}``. ``n`` counts requested deletions
        (each id counts even if it did not exist; a session adds 2 for its
        record and summary).
    """
    memory_id = data.get("memoryId")
    session_id = data.get("sessionId")
    obs_ids = data.get("observationIds") or []
    deleted = 0
    deleted_mem_ids: List[str] = []
    deleted_obs_ids: List[str] = []
    deleted_session = False

    if memory_id:
        mem = kv.get(KV.memories, memory_id)
        kv.delete(KV.memories, memory_id)
        if mem:
            _release_image_ref(kv, mem.get("imageRef"))
        _remove_from_search_indexes(memory_id)
        deleted_mem_ids.append(memory_id)
        deleted += 1

    if session_id and obs_ids:
        obs_scope = KV.observations(session_id)
        for oid in obs_ids:
            obs = kv.get(obs_scope, oid)
            kv.delete(obs_scope, oid)
            if obs:
                _release_image_ref(kv, obs.get("imageData") or obs.get("imageRef"))
            _remove_from_search_indexes(oid)
            deleted_obs_ids.append(oid)
            deleted += 1

    if session_id and not obs_ids and not memory_id:
        obs_scope = KV.observations(session_id)
        for obs in kv.list(obs_scope):
            kv.delete(obs_scope, obs["id"])
            _release_image_ref(kv, obs.get("imageData") or obs.get("imageRef"))
            _remove_from_search_indexes(obs["id"])
            deleted_obs_ids.append(obs["id"])
            deleted += 1
        kv.delete(KV.sessions, session_id)
        kv.delete(KV.summaries, session_id)
        deleted_session = True
        deleted += 2

    if deleted > 0:
        if _index_persistence:
            _index_persistence.schedule_save()
        safe_audit(
            kv,
            "forget",
            "mem::forget",
            deleted_mem_ids + deleted_obs_ids,
            {
                "sessionId": session_id,
                "deleted": deleted,
                "memoriesDeleted": len(deleted_mem_ids),
                "observationsDeleted": len(deleted_obs_ids),
                "sessionDeleted": deleted_session,
                "reason": "user-initiated forget"
            }
        )
        # Commit to Dolt only when something was removed. A request naming
        # neither a memory nor a session changes nothing and needs no commit.
        agent_id = data.get("agentId") or get_agent_id()
        commit_if_enabled(kv, f"Forget: memory_id={memory_id} session_id={session_id}", agent_id)
    return {"success": True, "deleted": deleted}
| # ===================================================================== | |
| # Prompt Context Compilation System | |
| # ===================================================================== | |
def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of *text* as one token per three characters."""
    return len(text) // 3
def escape_xml_attr(s: str) -> str:
    """Escape *s* for use inside a double-quoted XML attribute value.

    ``&`` is replaced first so the ampersands introduced by the later
    entities are not escaped a second time.
    """
    return (
        s.replace("&", "&amp;")
        .replace('"', "&quot;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
def _context_block(block_type: str, content: str, recency: int) -> Dict[str, Any]:
    """Wrap rendered text as a budgetable context block."""
    return {
        "type": block_type,
        "content": content,
        "tokens": estimate_tokens(content),
        "recency": recency,
    }


def _context_profile_text(profile: Optional[Dict[str, Any]]) -> str:
    """Render a stored project profile as markdown; "" when there is nothing to show."""
    if not profile:
        return ""
    parts = []
    if profile.get("topConcepts"):
        parts.append("Concepts: " + ", ".join(c["concept"] for c in profile["topConcepts"][:8]))
    if profile.get("topFiles"):
        parts.append("Key files: " + ", ".join(f["file"] for f in profile["topFiles"][:5]))
    if profile.get("conventions"):
        parts.append("Conventions: " + "; ".join(profile["conventions"]))
    if profile.get("commonErrors"):
        parts.append("Common errors: " + "; ".join(profile["commonErrors"][:3]))
    if not parts:
        return ""
    return "## Project Profile\n" + "\n".join(parts)


def _context_lessons_text(kv: StateKV, project: str, limit: int = 10) -> str:
    """Render the top live lessons that are global or belong to *project*; "" if none."""

    def score(lesson: Dict[str, Any]) -> float:
        # Project-specific lessons outrank global ones at equal confidence.
        factor = 1.5 if lesson.get("project") == project else 1.0
        return factor * lesson.get("confidence", 0.5)

    relevant = [
        lesson for lesson in kv.list(KV.lessons)
        if not lesson.get("deleted") and (not lesson.get("project") or lesson["project"] == project)
    ]
    relevant.sort(key=score, reverse=True)

    items = []
    for lesson in relevant[:limit]:
        # Same 0.5 default the scorer uses, so a lesson without confidence cannot crash rendering.
        line = f"- ({lesson.get('confidence', 0.5):.2f}) {lesson['content']}"
        if lesson.get("context"):
            line += f" — {lesson['context']}"
        items.append(line)
    if not items:
        return ""
    return "## Lessons Learned\n" + "\n".join(items)


def _context_session_blocks(
    kv: StateKV, project: str, session_id: str, recency: int, limit: int = 10
) -> List[Dict[str, Any]]:
    """Build one block per recent sibling session: its summary, else its important observations."""
    sessions = [
        s for s in kv.list(KV.sessions)
        if s.get("project") == project and s["id"] != session_id
    ]
    sessions.sort(key=lambda s: s.get("startedAt", ""), reverse=True)

    blocks = []
    for s in sessions[:limit]:
        summary = kv.get(KV.summaries, s["id"])
        if summary:
            # `or []` tolerates keys that are present but null.
            content = (
                f"## {summary.get('title', 'Session summary')}\n{summary.get('narrative', '')}\n"
                f"Decisions: {'; '.join(summary.get('keyDecisions') or [])}\n"
                f"Files: {', '.join(summary.get('filesModified') or [])}"
            )
            blocks.append(_context_block("summary", content, recency))
            continue

        # Fallback to important observations
        important = [
            o for o in kv.list(KV.observations(s["id"]))
            if o.get("title") and o.get("importance", 0) >= 5
        ]
        if not important:
            continue
        important.sort(key=lambda o: o.get("importance", 0), reverse=True)
        items = [f"- [{o.get('type')}] {o.get('title')}: {o.get('narrative')}" for o in important[:5]]
        content = f"## Session {s['id'][:8]} ({s.get('startedAt')})\n" + "\n".join(items)
        blocks.append(_context_block("observation", content, recency))
    return blocks


def context(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Compile a token-budgeted ``<agentmemory-context>`` prompt block for a session.

    Candidate blocks, in priority order: pinned slots, the project profile,
    lessons learned, then summaries (or important observations) of up to 10
    other sessions of the same project, newest first. Blocks are added
    greedily; a block that would exceed the budget is skipped while later,
    smaller blocks may still fit.

    Args:
        kv: state store.
        data: requires ``sessionId`` and ``project``; optional ``budget``
            (defaults to the ``TOKEN_BUDGET`` env var, else 2000).

    Returns:
        ``{"context": str, "blocks": int, "tokens": int}``; an empty context
        with zero counts when nothing fits.

    Raises:
        ValueError: if ``sessionId`` or ``project`` is missing.
    """
    session_id = data.get("sessionId")
    project = data.get("project")
    budget = data.get("budget") or int(os.getenv("TOKEN_BUDGET", "2000"))
    if not session_id or not project:
        raise ValueError("sessionId and project are required")

    # One timestamp for every block. Stamping each block separately made the
    # descending recency sort depend on millisecond ticks, which could push
    # pinned slots behind older session summaries. With equal values the
    # stable sort keeps the priority order in which blocks are appended.
    now_ms = int(time.time() * 1000)
    blocks: List[Dict[str, Any]] = []

    # 1. Pinned Slots
    slot_content = render_pinned_context(list_pinned_slots(kv))
    if slot_content:
        blocks.append(_context_block("memory", slot_content, now_ms))

    # 2. Profile
    profile_content = _context_profile_text(kv.get(KV.profiles, project))
    if profile_content:
        blocks.append(_context_block("memory", profile_content, now_ms))

    # 3. Lessons
    lessons_content = _context_lessons_text(kv, project)
    if lessons_content:
        blocks.append(_context_block("memory", lessons_content, now_ms))

    # 4. Sessions & Summaries
    blocks.extend(_context_session_blocks(kv, project, session_id, now_ms))

    blocks.sort(key=lambda b: b.get("recency", 0), reverse=True)

    header = f'<agentmemory-context project="{escape_xml_attr(project)}">'
    footer = "</agentmemory-context>"
    used_tokens = estimate_tokens(header) + estimate_tokens(footer)
    selected = []
    for block in blocks:
        if used_tokens + block["tokens"] > budget:
            continue
        selected.append(block["content"])
        used_tokens += block["tokens"]

    if not selected:
        return {"context": "", "blocks": 0, "tokens": 0}
    res_context = f"{header}\n" + "\n\n".join(selected) + f"\n{footer}"
    return {"context": res_context, "blocks": len(selected), "tokens": used_tokens}
| # ===================================================================== | |
| # Memory Slots System | |
| # ===================================================================== | |
# Built-in memory slot templates, as
# (label, sizeLimit, pinned, scope, description). Every template starts with
# empty content and is writable.
_DEFAULT_SLOT_SPECS = (
    ("persona", 1000, True, "global",
     "How the agent should see itself: role, tone, behavioural guidelines."),
    ("user_preferences", 2000, True, "global",
     "Coding style, tool preferences, naming conventions, and other habits the user wants preserved across sessions."),
    ("tool_guidelines", 1500, True, "global",
     "Rules the agent should follow when picking or sequencing tools (e.g. prefer X over Y, never run Z without confirmation)."),
    ("project_context", 3000, True, "project",
     "Architecture decisions, codebase conventions, build/test commands, and cross-cutting constraints for the current project."),
    ("guidance", 1500, True, "project",
     "Active advice for the next session: what to focus on, what to avoid, open risks."),
    ("pending_items", 2000, True, "project",
     "Unfinished work, explicit TODOs, and promises made but not yet delivered."),
    ("session_patterns", 1500, False, "project",
     "Recurring behaviours and common struggles observed across recent sessions."),
    ("self_notes", 1500, False, "project",
     "Free-form notes the agent keeps for itself: hypotheses, dead ends, things to revisit."),
)

DEFAULT_SLOTS = [
    {
        "label": label,
        "content": "",
        "sizeLimit": size_limit,
        "description": description,
        "pinned": pinned,
        "readOnly": False,
        "scope": scope,
    }
    for label, size_limit, pinned, scope, description in _DEFAULT_SLOT_SPECS
]
def seed_defaults(kv: StateKV) -> None:
    """Store every DEFAULT_SLOTS template that is not already present.

    Templates go to the global or project slot scope according to their
    ``scope`` field. Existing slots (truthy KV value) are never overwritten.
    """
    stamp = datetime.datetime.utcnow().isoformat() + "Z"
    for template in DEFAULT_SLOTS:
        store = KV.globalSlots if template["scope"] == "global" else KV.slots
        label = template["label"]
        if kv.get(store, label):
            continue
        kv.set(store, label, {**template, "createdAt": stamp, "updatedAt": stamp})
def list_pinned_slots(kv: StateKV) -> List[Dict[str, Any]]:
    """Return pinned slots with non-blank content, sorted by label.

    Global slots are merged first and project slots second, so a project slot
    shadows a global slot that has the same label.
    """
    project_slots = kv.list(KV.slots)
    global_slots = kv.list(KV.globalSlots)
    by_label = {slot["label"]: slot for slot in global_slots}
    by_label.update((slot["label"], slot) for slot in project_slots)
    return sorted(
        (
            slot for slot in by_label.values()
            if slot.get("pinned") and slot.get("content", "").strip()
        ),
        key=lambda slot: slot["label"],
    )
def render_pinned_context(slots: List[Dict[str, Any]]) -> str:
    """Render pinned slots as a markdown section; "" when there are none.

    Each slot becomes a ``## <label>`` heading followed by its stripped
    content and a blank line.
    """
    if not slots:
        return ""
    out = ["# agentmemory pinned slots", ""]
    for slot in slots:
        out += [f"## {slot['label']}", slot["content"].strip(), ""]
    return "\n".join(out)
def slot_list(kv: StateKV) -> Dict[str, Any]:
    """List all slots (global merged with project, project wins on label clash), sorted by label."""
    project_slots = kv.list(KV.slots)
    global_slots = kv.list(KV.globalSlots)
    by_label: Dict[str, Dict[str, Any]] = {}
    for slot in [*global_slots, *project_slots]:
        by_label[slot["label"]] = slot
    ordered = sorted(by_label.values(), key=lambda slot: slot["label"])
    return {"success": True, "slots": ordered}
def slot_get(kv: StateKV, label: str) -> Dict[str, Any]:
    """Look up a slot by label, checking the project scope before the global scope.

    Returns ``{"success": True, "slot": ..., "scope": "project"|"global"}``
    or ``{"success": False, "error": "slot not found"}``.
    """
    for store, scope_name in ((KV.slots, "project"), (KV.globalSlots, "global")):
        found = kv.get(store, label)
        if found:
            return {"success": True, "slot": found, "scope": scope_name}
    return {"success": False, "error": "slot not found"}
def slot_create(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a new memory slot in the project or global scope.

    Args:
        kv: state store.
        data: ``label`` (required; lowercase, starts with a letter,
            ``[a-z0-9_]``), optional ``scope`` ("project" default or
            "global"), ``sizeLimit`` (int 1..20000, default 2000),
            ``content``, ``description``, ``pinned`` (default True),
            ``agentId``.

    Returns:
        ``{"success": True, "slot": slot}`` or ``{"success": False, "error": msg}``.
        It fails when validation fails or a slot with that label already
        exists in the target scope.
    """
    label = data.get("label")
    # fullmatch: re.match with a trailing "$" would also accept "label\n".
    if not isinstance(label, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", label):
        return {"success": False, "error": "label required (lowercase, starts with letter, [a-z0-9_])"}

    scope = data.get("scope") or "project"
    if scope not in ("project", "global"):
        return {"success": False, "error": "scope must be 'project' or 'global'"}

    limit = data.get("sizeLimit")
    if limit is None:
        limit = 2000
    # bool is a subclass of int: reject it so True is not read as a limit of 1;
    # an explicit 0 is now rejected instead of silently becoming 2000.
    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 1 or limit > 20000:
        return {"success": False, "error": "sizeLimit must be an integer between 1 and 20000"}

    content = strip_private_data(data.get("content") or "")
    if len(content) > limit:
        return {"success": False, "error": f"content exceeds sizeLimit ({len(content)} > {limit})"}

    description = data.get("description") or ""
    pinned = data.get("pinned", True)
    target_kv = KV.globalSlots if scope == "global" else KV.slots
    if kv.get(target_kv, label):
        return {"success": False, "error": f"slot already exists in {scope} scope"}

    now = datetime.datetime.utcnow().isoformat() + "Z"
    slot = {
        "label": label,
        "content": content,
        "sizeLimit": limit,
        "description": description,
        "pinned": pinned,
        "readOnly": False,
        "scope": scope,
        "createdAt": now,
        "updatedAt": now,
    }
    kv.set(target_kv, label, slot)
    safe_audit(kv, "slot_create", "mem::slot-create", [label], {"scope": scope, "sizeLimit": limit, "pinned": pinned})
    # Commit to Dolt
    agent_id = data.get("agentId") or get_agent_id()
    commit_if_enabled(kv, f"Create slot: {label}", agent_id)
    return {"success": True, "slot": slot}
def slot_append(kv: StateKV, label: str, text: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Append privacy-stripped *text* to a slot, on a new line if needed.

    Fails without writing if the slot is missing or read-only, or if the
    result would exceed the slot's ``sizeLimit`` (default 2000). On success
    it persists the slot, audits, and attempts a Dolt commit.
    """
    lookup = slot_get(kv, label)
    if not lookup.get("success"):
        return {"success": False, "error": "slot not found"}
    slot, scope = lookup["slot"], lookup["scope"]
    store = KV.globalSlots if scope == "global" else KV.slots
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}

    current = slot.get("content") or ""
    joiner = "\n" if current and not current.endswith("\n") else ""
    updated = current + joiner + strip_private_data(text)
    cap = slot.get("sizeLimit") or 2000
    if len(updated) > cap:
        return {
            "success": False,
            "error": f"append would exceed sizeLimit ({len(updated)} > {cap})",
            "currentSize": len(current),
            "sizeLimit": cap,
        }

    slot.update(content=updated, updatedAt=datetime.datetime.utcnow().isoformat() + "Z")
    kv.set(store, label, slot)
    safe_audit(kv, "slot_append", "mem::slot-append", [label], {"scope": scope, "added": len(text), "total": len(updated)})
    # Commit to Dolt
    commit_if_enabled(kv, f"Append slot: {label}", agent_id or get_agent_id())
    return {"success": True, "slot": slot, "size": len(updated)}
def slot_replace(kv: StateKV, label: str, content: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Overwrite a slot's content with privacy-stripped *content*.

    Fails without writing if the slot is missing or read-only, or if the new
    content exceeds ``sizeLimit`` (default 2000). On success it persists,
    audits the before/after sizes, and attempts a Dolt commit.
    """
    found = slot_get(kv, label)
    if not found.get("success"):
        return {"success": False, "error": "slot not found"}
    slot = found["slot"]
    scope = found["scope"]
    store = KV.globalSlots if scope == "global" else KV.slots
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}

    cleaned = strip_private_data(content)
    cap = slot.get("sizeLimit") or 2000
    if len(cleaned) > cap:
        return {
            "success": False,
            "error": f"content exceeds sizeLimit ({len(cleaned)} > {cap})",
            "sizeLimit": cap,
        }

    previous_size = len(slot.get("content") or "")
    slot.update(content=cleaned, updatedAt=datetime.datetime.utcnow().isoformat() + "Z")
    kv.set(store, label, slot)
    safe_audit(kv, "slot_replace", "mem::slot-replace", [label], {"scope": scope, "before": previous_size, "after": len(cleaned)})
    # Commit to Dolt
    commit_if_enabled(kv, f"Replace slot: {label}", agent_id or get_agent_id())
    return {"success": True, "slot": slot, "size": len(cleaned)}
def slot_delete(kv: StateKV, label: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Delete the slot that ``slot_get`` resolves for *label* (project first, then global).

    Read-only slots are refused. On success it audits the removed size and
    attempts a Dolt commit.
    """
    found = slot_get(kv, label)
    if not found.get("success"):
        return {"success": False, "error": "slot not found"}
    slot, scope = found["slot"], found["scope"]
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}

    kv.delete(KV.globalSlots if scope == "global" else KV.slots, label)
    safe_audit(kv, "slot_delete", "mem::slot-delete", [label], {"scope": scope, "size": len(slot.get("content") or "")})
    # Commit to Dolt
    commit_if_enabled(kv, f"Delete slot: {label}", agent_id or get_agent_id())
    return {"success": True}
def slot_reflect(kv: StateKV, session_id: str, max_obs: int = 50) -> Dict[str, Any]:
    """Fold signals from a session's most recent observations into reflective slots.

    It scans up to *max_obs* observations (newest first by ``timestamp``) and:
      * appends ``- <title>`` lines for observations mentioning "todo" to
        ``pending_items`` (skipping lines already present);
      * overwrites ``session_patterns`` with error / command_run counts;
      * appends newly seen file paths to ``project_context`` as ``- <path>``
        bullets, at most 20 per run, under a header when the slot is empty.

    Over-limit content is truncated to ``sizeLimit``. ``pending_items`` and
    ``project_context`` keep the newest tail; ``session_patterns`` keeps the
    head. Slots that do not exist are skipped.

    Returns:
        ``{"success": True, "applied": <slots updated>, "observationsReviewed": n}``,
        or ``applied == 0`` with a reason when the session has no observations.
    """
    observations = kv.list(KV.observations(session_id))
    if not observations:
        return {"success": True, "applied": 0, "reason": "no observations for session"}
    recent = sorted(observations, key=lambda x: x.get("timestamp", ""), reverse=True)[:max_obs]

    pending_lines: List[str] = []
    pattern_counts: Dict[str, int] = {}
    files: Set[Any] = set()
    for obs in recent:
        title = (obs.get("title") or "").lower()
        narrative = (obs.get("narrative") or "").lower()
        if "todo" in narrative or "todo" in title:
            pending_lines.append(f"- {obs.get('title') or obs['id']}")
        if obs.get("type") == "error":
            pattern_counts["errors"] = pattern_counts.get("errors", 0) + 1
        if obs.get("type") == "command_run":
            pattern_counts["commands"] = pattern_counts.get("commands", 0) + 1
        files.update(obs.get("files") or [])

    applied = 0
    now = datetime.datetime.utcnow().isoformat() + "Z"

    if pending_lines:
        res = slot_get(kv, "pending_items")
        if res.get("success"):
            slot = res["slot"]
            target_kv = KV.globalSlots if res["scope"] == "global" else KV.slots
            current = slot.get("content") or ""
            already = set(current.split("\n"))
            # dict.fromkeys de-duplicates while keeping first-seen order.
            fresh = [line for line in dict.fromkeys(pending_lines) if line not in already]
            if fresh:
                sep = "\n" if current and not current.endswith("\n") else ""
                next_content = current + sep + "\n".join(fresh)
                limit = slot.get("sizeLimit") or 2000
                if len(next_content) > limit:
                    next_content = next_content[-limit:]
                slot["content"] = next_content
                slot["updatedAt"] = now
                kv.set(target_kv, "pending_items", slot)
                applied += 1

    if pattern_counts:
        res = slot_get(kv, "session_patterns")
        if res.get("success"):
            slot = res["slot"]
            target_kv = KV.globalSlots if res["scope"] == "global" else KV.slots
            summary = [f"last reflection: {now}"]
            summary.extend(f"- {k}: {v} in last {len(recent)} observations" for k, v in pattern_counts.items())
            next_content = "\n".join(summary)
            limit = slot.get("sizeLimit") or 2000
            if len(next_content) > limit:
                next_content = next_content[:limit]
            slot["content"] = next_content
            slot["updatedAt"] = now
            kv.set(target_kv, "session_patterns", slot)
            applied += 1

    if files:
        res = slot_get(kv, "project_context")
        if res.get("success"):
            slot = res["slot"]
            target_kv = KV.globalSlots if res["scope"] == "global" else KV.slots
            current = slot.get("content") or ""
            existing_lines = set(current.split("\n"))
            # Sort for a deterministic result (set order varies between runs) and
            # compare whole bullet lines so "a.py" is not mistaken for part of "data.py".
            fresh = [f for f in sorted(files, key=str) if f"- {f}" not in existing_lines][:20]
            if fresh:
                new_lines = [] if current else ["Files touched in recent sessions:"]
                new_lines.extend(f"- {f}" for f in fresh)
                # New lines are always newline-joined; previously they were joined
                # with `sep`, which collapsed everything onto one line when the
                # slot was empty or already ended with a newline.
                sep = "\n" if current and not current.endswith("\n") else ""
                next_content = current + sep + "\n".join(new_lines)
                limit = slot.get("sizeLimit") or 2000
                if len(next_content) > limit:
                    next_content = next_content[-limit:]
                slot["content"] = next_content
                slot["updatedAt"] = now
                kv.set(target_kv, "project_context", slot)
                applied += 1

    if applied > 0:
        safe_audit(kv, "slot_reflect", "mem::slot-reflect", [session_id], {"observationCount": len(recent), "slotsUpdated": applied})
        commit_if_enabled(kv, f"Slot reflect: updated {applied} slots in session {session_id[:8]}", "system")
    return {"success": True, "applied": applied, "observationsReviewed": len(recent)}
| # ===================================================================== | |
| # Lessons Learned System | |
| # ===================================================================== | |
def reinforce_lesson(lesson: Dict[str, Any]) -> None:
    """Strengthen *lesson* in place.

    Increments ``reinforcements``, moves ``confidence`` 10% of the way toward
    1.0 (missing confidence counts as 0.5), and stamps ``lastReinforcedAt`` and
    ``updatedAt`` with the current UTC time.
    """
    stamp = datetime.datetime.utcnow().isoformat() + "Z"
    prior = lesson.get("confidence", 0.5)
    lesson.update(
        reinforcements=lesson.get("reinforcements", 0) + 1,
        confidence=min(1.0, prior + 0.1 * (1 - prior)),
        lastReinforcedAt=stamp,
        updatedAt=stamp,
    )
def lesson_save(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a lesson, or reinforce the live lesson with the same content fingerprint.

    Args:
        kv: state store.
        data: ``content`` (required, non-blank); optional ``context``,
            ``confidence`` (0..1, otherwise 0.5), ``source`` (default
            "manual"), ``sourceIds``, ``project``, ``tags``, ``agentId``.

    Returns:
        ``{"success": True, "action": "created"|"strengthened", "lesson": ...}``
        or ``{"success": False, "error": "content is required"}``.
    """
    raw_content = data.get("content")
    if not raw_content or not raw_content.strip():
        return {"success": False, "error": "content is required"}

    content = strip_private_data(raw_content)
    context_str = strip_private_data(data.get("context") or "")
    agent_id = data.get("agentId") or get_agent_id()
    lesson_id = fingerprint_id("lsn", content)

    current = kv.get(KV.lessons, lesson_id)
    if current and not current.get("deleted"):
        reinforce_lesson(current)
        # Only fill in context when the stored lesson has none.
        if context_str and not current.get("context"):
            current["context"] = context_str
        kv.set(KV.lessons, current["id"], current)
        safe_audit(kv, "lesson_strengthen", "mem::lesson-save", [current["id"]])
        # Commit to Dolt
        commit_if_enabled(kv, f"Strengthen lesson: {current.get('content', '')[:60]}", agent_id)
        return {"success": True, "action": "strengthened", "lesson": current}

    confidence = data.get("confidence")
    in_range = isinstance(confidence, (int, float)) and not (confidence < 0 or confidence > 1)
    if not in_range:
        confidence = 0.5

    created_at = datetime.datetime.utcnow().isoformat() + "Z"
    lesson = dict(
        id=lesson_id,
        content=content.strip(),
        context=context_str.strip(),
        confidence=confidence,
        reinforcements=0,
        source=data.get("source") or "manual",
        sourceIds=data.get("sourceIds") or [],
        project=data.get("project"),
        tags=data.get("tags") or [],
        createdAt=created_at,
        updatedAt=created_at,
        decayRate=0.05,
    )
    kv.set(KV.lessons, lesson["id"], lesson)
    safe_audit(kv, "lesson_save", "mem::lesson-save", [lesson["id"]])
    # Commit to Dolt
    commit_if_enabled(kv, f"Create lesson: {lesson['content'][:60]}", agent_id)
    return {"success": True, "action": "created", "lesson": lesson}
def lesson_list(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """List live lessons, highest confidence first.

    Optional filters in *data*: ``minConfidence`` (default 0.0), ``project``
    and ``source`` (exact match when given), ``limit`` (default 50).
    """
    limit = data.get("limit") or 50
    floor = data.get("minConfidence") or 0.0
    project = data.get("project")
    source = data.get("source")

    def wanted(lesson: Dict[str, Any]) -> bool:
        return (
            not lesson.get("deleted")
            and lesson.get("confidence", 0.5) >= floor
            and (not project or lesson.get("project") == project)
            and (not source or lesson.get("source") == source)
        )

    lessons = [lesson for lesson in kv.list(KV.lessons) if wanted(lesson)]
    lessons.sort(key=lambda lesson: lesson.get("confidence", 0.5), reverse=True)
    return {"success": True, "lessons": lessons[:limit]}
def lesson_recall(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Find live lessons whose text matches a free-text query, ranked by score.

    The score is ``confidence * relevance * recency_boost``:
      * relevance is the fraction of query terms (longer than one character)
        found as substrings in the lesson's content, context and tags;
      * recency_boost is ``1 / (1 + days * 0.01)``, with days measured since
        ``lastReinforcedAt`` (else ``createdAt``).

    A lesson without a parseable timestamp gets no recency penalty instead of
    aborting the whole recall.

    Args:
        kv: state store.
        data: ``query`` (required); optional ``minConfidence`` (default 0.1;
            an explicit 0 is honoured), ``limit`` (default 10), ``project``.

    Returns:
        ``{"success": True, "lessons": [lesson + "score", ...]}`` or an error
        dict when the query is blank.
    """
    query = data.get("query")
    if not query or not query.strip():
        return {"success": False, "error": "query is required"}
    # Third-party parser, imported once per call and only after validation.
    import dateutil.parser

    min_confidence = data.get("minConfidence")
    if min_confidence is None:
        min_confidence = 0.1
    limit = data.get("limit") or 10

    lessons = [
        l for l in kv.list(KV.lessons)
        if not l.get("deleted") and l.get("confidence", 0.5) >= min_confidence
    ]
    project = data.get("project")
    if project:
        lessons = [l for l in lessons if l.get("project") == project]

    terms = [t for t in query.lower().split() if len(t) > 1]
    now = datetime.datetime.now(datetime.timezone.utc)
    scored = []
    for l in lessons:
        text = f"{l.get('content', '')} {l.get('context', '')} {' '.join(l.get('tags') or [])}".lower()
        match_count = sum(1 for t in terms if t in text)
        if match_count == 0:
            continue  # also guarantees terms is non-empty below
        relevance = match_count / len(terms)

        days = 0.0
        baseline = l.get("lastReinforcedAt") or l.get("createdAt")
        if baseline:
            try:
                stamp = dateutil.parser.parse(baseline)
            except (ValueError, OverflowError, TypeError):
                stamp = None
            if stamp is not None:
                # Naive stamps are taken as UTC; aware ones keep their own offset.
                if stamp.tzinfo is None:
                    stamp = stamp.replace(tzinfo=datetime.timezone.utc)
                days = (now - stamp).total_seconds() / (3600 * 24)
        recency_boost = 1 / (1 + days * 0.01)

        scored.append({"lesson": l, "score": l.get("confidence", 0.5) * relevance * recency_boost})

    scored.sort(key=lambda x: x["score"], reverse=True)
    results = []
    for entry in scored[:limit]:
        item = dict(entry["lesson"])
        item["score"] = round(entry["score"], 3)
        results.append(item)
    safe_audit(kv, "lesson_recall", "mem::lesson-recall", [], {"query": query, "resultCount": len(results)})
    return {"success": True, "lessons": results}
def lesson_strengthen(kv: StateKV, lesson_id: str) -> Dict[str, Any]:
    """Reinforce a live lesson by id, persist it, audit, and attempt a Dolt commit.

    Missing, empty, or soft-deleted lessons yield ``{"success": False, ...}``.
    """
    lesson = kv.get(KV.lessons, lesson_id)
    is_live = bool(lesson) and not lesson.get("deleted")
    if not is_live:
        return {"success": False, "error": "lesson not found"}

    reinforce_lesson(lesson)
    kv.set(KV.lessons, lesson["id"], lesson)
    safe_audit(kv, "lesson_strengthen", "mem::lesson-strengthen", [lesson["id"]])
    # Commit to Dolt
    preview = lesson.get("content", "")[:60]
    commit_if_enabled(kv, f"Strengthen lesson: {preview}", get_agent_id())
    return {"success": True, "lesson": lesson}
def lesson_decay_sweep(kv: StateKV) -> Dict[str, Any]:
    """Decay the confidence of stale lessons and soft-delete ones that fade out.

    For each live lesson whose last decay (else last reinforcement, else
    creation) is at least one week old, confidence drops by
    ``decayRate * weeks`` (default rate 0.05), floored at 0.05 and rounded to
    3 decimals. A lesson that falls to <= 0.1 with zero reinforcements is
    soft-deleted. Every change is persisted and audited. A single Dolt commit
    follows if anything changed.

    Returns:
        ``{"success": True, "decayed": n, "softDeleted": m, "total": <lessons scanned>}``.
    """
    lessons = kv.list(KV.lessons)
    decayed = soft_deleted = 0
    now = datetime.datetime.utcnow()
    timestamp = now.isoformat() + "Z"
    now_utc = now.replace(tzinfo=datetime.timezone.utc)

    def weeks_elapsed(since: str) -> float:
        import dateutil.parser
        parsed = dateutil.parser.parse(since).replace(tzinfo=datetime.timezone.utc)
        return (now_utc - parsed).total_seconds() / (3600 * 24 * 7)

    for lesson in lessons:
        if lesson.get("deleted"):
            continue
        baseline = lesson.get("lastDecayedAt") or lesson.get("lastReinforcedAt") or lesson["createdAt"]
        weeks = weeks_elapsed(baseline)
        if weeks < 1.0:
            continue

        previous = lesson.get("confidence", 0.5)
        candidate = max(0.05, previous - lesson.get("decayRate", 0.05) * weeks)
        if candidate == lesson.get("confidence"):
            continue  # already at the floor; nothing to record

        lesson["confidence"] = round(candidate, 3)
        lesson["lastDecayedAt"] = timestamp
        lesson["updatedAt"] = timestamp
        if lesson["confidence"] <= 0.1 and lesson.get("reinforcements", 0) == 0:
            lesson["deleted"] = True
            soft_deleted += 1
        else:
            decayed += 1

        kv.set(KV.lessons, lesson["id"], lesson)
        safe_audit(kv, "lesson_strengthen", "mem::lesson-decay-sweep", [lesson["id"]], {
            "action": "soft-delete" if lesson.get("deleted") else "decay",
            "actor": "system",
            "reason": "decay-sweep",
            "before": {"confidence": previous, "deleted": False},
            "after": {"confidence": lesson["confidence"], "deleted": bool(lesson.get("deleted"))},
        })

    if decayed > 0 or soft_deleted > 0:
        commit_if_enabled(kv, f"Lesson decay sweep: decayed {decayed}, soft-deleted {soft_deleted}", "system")
    return {"success": True, "decayed": decayed, "softDeleted": soft_deleted, "total": len(lessons)}
| # ===================================================================== | |
| # Database Rebuilder (Index Bootstrapper) | |
| # ===================================================================== | |
def rebuild_index(kv: StateKV) -> int:
    """Rebuild the BM25 and vector indexes from scratch out of stored data.

    It indexes every observation that has both a title and a narrative, in
    every session with an id, plus every memory that is not explicitly
    superseded (``isLatest is False``) and has a title and content.

    Returns:
        The number of items indexed.
    """
    _bm25_index.clear()
    if _vector_index:
        _vector_index.clear()

    total_indexed = 0

    # Backfill with observations
    for sess in kv.list(KV.sessions):
        sid = sess.get("id")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            # Only index compressed (non-raw) observations
            if not (obs.get("title") and obs.get("narrative")):
                continue
            _bm25_index.add(obs)
            comb_text = obs["title"] + " " + obs["narrative"]
            vector_index_add_guarded(obs["id"], sid, comb_text, {"kind": "observation", "logId": obs["id"]})
            total_indexed += 1

    # Backfill with memories
    for mem in kv.list(KV.memories):
        if mem.get("isLatest") is False:
            continue
        if not mem.get("title") or not mem.get("content"):
            continue
        _bm25_index.add(memory_to_observation(mem))
        comb_text = mem["title"] + " " + mem["content"]
        vector_index_add_guarded(mem["id"], "memory", comb_text, {"kind": "memory", "logId": mem["id"]})
        total_indexed += 1

    # Persist even when nothing was re-indexed: the in-memory indexes were just
    # cleared, and skipping the save would leave the previous (stale) snapshot
    # to be reloaded later.
    if _index_persistence:
        _index_persistence.schedule_save()
    return total_indexed
| # ===================================================================== | |
# Session CRUD, Timeline, and Project Profile Operations
| # ===================================================================== | |
def list_sessions(kv: StateKV) -> List[Dict[str, Any]]:
    """Return all sessions, newest ``startedAt`` first, annotated from their summaries.

    Sessions with a stored summary get ``title`` and ``summary`` (the
    summary's narrative) set in place.
    """
    sessions = kv.list(KV.sessions)
    for session in sessions:
        sid = session.get("id")
        summary = kv.get(KV.summaries, sid) if sid else None
        if summary:
            session["title"] = summary.get("title")
            session["summary"] = summary.get("narrative")
    sessions.sort(key=lambda session: session.get("startedAt", ""), reverse=True)
    return sessions
def get_session(kv: StateKV, session_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a session by id, adding ``title``/``summary`` from its summary when one exists."""
    session = kv.get(KV.sessions, session_id)
    if not session:
        return session
    summary = kv.get(KV.summaries, session_id)
    if summary:
        session.update(title=summary.get("title"), summary=summary.get("narrative"))
    return session
def create_session(kv: StateKV, session: Dict[str, Any]) -> Dict[str, Any]:
    """Store *session* after auto-completing other active sessions; returns it unchanged."""
    sid = session["id"]
    auto_complete_old_active_sessions(kv, sid)
    kv.set(KV.sessions, sid, session)
    return session
def end_session(kv: StateKV, session_id: str) -> bool:
    """Mark a session completed by setting ``endedAt`` (UTC, ISO-8601 + "Z") and ``status``.

    Always returns True.
    """
    ended_at = datetime.datetime.utcnow().isoformat() + "Z"
    operations = [
        {"type": "set", "path": "endedAt", "value": ended_at},
        {"type": "set", "path": "status", "value": "completed"},
    ]
    kv.update(KV.sessions, session_id, operations)
    return True
def timeline(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a window of observations, ordered by timestamp, around an anchor.

    Scope: a single ``sessionId`` if given, else every session of
    ``project``, else all sessions.

    Anchor resolution:
      1. the observation whose id equals ``anchor``;
      2. otherwise the first observation whose timestamp sorts >= ``anchor``
         (a missing anchor therefore resolves to the oldest observation);
      3. otherwise the middle observation.

    ``before``/``after`` (default 10 each) bound the window; an explicit 0
    is honoured.

    Returns:
        ``{"success": True, "observations": [...], "anchorIndex": <index within window>}``.
    """
    anchor = data.get("anchor")
    project = data.get("project")
    session_id = data.get("sessionId")
    before = data.get("before")
    after = data.get("after")
    before = 10 if before is None else before
    after = 10 if after is None else after

    sessions = kv.list(KV.sessions)
    if session_id:
        sessions = [s for s in sessions if s.get("id") == session_id]
    elif project:
        sessions = [s for s in sessions if s.get("project") == project]

    all_obs = []
    for s in sessions:
        all_obs.extend(kv.list(KV.observations(s["id"])))
    all_obs.sort(key=lambda x: x.get("timestamp", ""))

    # An exact id match must win: scanning ids and timestamps together let an
    # earlier observation whose timestamp happened to sort >= the id string win.
    anchor_idx = -1
    if anchor:
        anchor_idx = next((i for i, o in enumerate(all_obs) if o.get("id") == anchor), -1)
    if anchor_idx == -1:
        anchor_ts = anchor or ""
        anchor_idx = next((i for i, o in enumerate(all_obs) if o.get("timestamp", "") >= anchor_ts), -1)
    if anchor_idx == -1:
        anchor_idx = len(all_obs) // 2

    start = max(0, anchor_idx - before)
    end = min(len(all_obs), anchor_idx + after + 1)
    return {
        "success": True,
        "observations": all_obs[start:end],
        "anchorIndex": anchor_idx - start,
    }
def get_project_profile(kv: StateKV, project: str) -> Dict[str, Any]:
    """Return the stored profile for *project*, rebuilding it when it is missing or empty.

    A profile counts as empty when it has neither ``topConcepts`` nor
    ``topFiles``; in that case ``build_project_profile`` produces the result.
    """
    stored = kv.get(KV.profiles, project)
    if stored and (stored.get("topConcepts") or stored.get("topFiles")):
        return stored
    return build_project_profile(kv, project)
# Directory names that carry no project meaning when harvesting concepts from paths.
_PROFILE_PATH_SKIP = frozenset({"tmp", "temp", "claude", "appdata", "local", "users", "windows"})
# Directory components that look like drive letters, dot-directories or CLI flags.
_PROFILE_PATH_NOISE = re.compile(r'^[a-z]:|^\.|^--')
# File extensions that are themselves counted as concepts.
_PROFILE_CODE_EXTS = frozenset({"py", "ts", "js", "jsx", "tsx", "go", "rs", "java", "cs", "cpp"})
# Keys under which tool inputs / JSON payloads carry a file path.
_PROFILE_FILE_KEYS = ("path", "file_path", "file", "filename")


def _harvest_profile_file(path: Any, file_counts: "Counter[str]", concept_counts: "Counter[str]") -> None:
    """Count ``path`` as a file and derive concepts from its directories, stem and extension.

    Non-string or empty values are ignored. Both arguments are expected to be
    ``collections.Counter`` instances (missing keys start at zero).
    """
    if not isinstance(path, str) or not path:
        return
    file_counts[path] += 1
    # Split on both separators so Windows and POSIX paths are handled alike.
    parts = re.split(r'[\\/]', path)
    fname = parts[-1]
    for part in parts[:-1]:
        p = part.lower().strip()
        if len(p) > 2 and p not in _PROFILE_PATH_SKIP and not _PROFILE_PATH_NOISE.match(p):
            concept_counts[p] += 1
    stem, ext = os.path.splitext(fname)
    if len(stem) > 2:
        concept_counts[stem.lower()] += 1
    ext = ext.lstrip(".")
    if ext in _PROFILE_CODE_EXTS:
        concept_counts[ext] += 1


def _harvest_profile_observation(obs: Dict[str, Any], file_counts: "Counter[str]", concept_counts: "Counter[str]") -> None:
    """Accumulate concept and file counts from a single observation record.

    Sources, in order: ``concepts``, ``files``, ``toolName``, path-like keys of
    ``toolInput`` (a dict or a JSON string), and ``toolName``/path keys of a
    JSON-object ``narrative`` (or ``raw``) payload. Only non-empty strings are
    counted as concepts, so malformed values cannot break the ``Counter``.
    """
    for c in (obs.get("concepts") or []):
        if isinstance(c, str) and c:
            concept_counts[c] += 1
    for f in (obs.get("files") or []):
        _harvest_profile_file(f, file_counts, concept_counts)
    tool_name = obs.get("toolName")
    if isinstance(tool_name, str) and tool_name:
        concept_counts[tool_name] += 1
    tool_input = obs.get("toolInput")
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (ValueError, RecursionError):
            tool_input = {}
    if isinstance(tool_input, dict):
        for key in _PROFILE_FILE_KEYS:
            _harvest_profile_file(tool_input.get(key, ""), file_counts, concept_counts)
    narrative = obs.get("narrative") or obs.get("raw") or ""
    if isinstance(narrative, str) and narrative.startswith("{"):
        # Raw hook payloads are sometimes stored as JSON text in the narrative.
        try:
            payload = json.loads(narrative)
        except (ValueError, RecursionError):
            payload = None
        if isinstance(payload, dict):
            payload_tool = payload.get("toolName") or payload.get("tool_name")
            if isinstance(payload_tool, str) and payload_tool:
                concept_counts[payload_tool] += 1
            for key in _PROFILE_FILE_KEYS:
                _harvest_profile_file(payload.get(key, ""), file_counts, concept_counts)


def build_project_profile(kv: StateKV, project: str) -> Dict[str, Any]:
    """Return the profile for ``project``, computing top concepts/files if missing.

    The stored profile (or a fresh empty one) is returned unchanged when it
    already has ``topConcepts`` or ``topFiles``. Otherwise both lists (top 20
    each, as ``{"concept"|"file", "frequency"}`` dicts) are computed from the
    observations of every session of ``project`` plus the memories tagged with
    ``project``, and ``sessionCount`` is set. The result is not persisted.
    """
    from collections import Counter

    prof = kv.get(KV.profiles, project)
    if not prof:
        prof = {
            "project": project,
            "topConcepts": [],
            "topFiles": [],
            "conventions": [],
            "commonErrors": [],
            "updatedAt": datetime.datetime.utcnow().isoformat() + "Z"
        }
    if prof.get("topConcepts") or prof.get("topFiles"):
        return prof

    project_sessions = [s for s in kv.list(KV.sessions) if s.get("project") == project]
    concept_counts = Counter()
    file_counts = Counter()
    for s in project_sessions:
        sid = s.get("id", "")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            _harvest_profile_observation(obs, file_counts, concept_counts)
    for mem in kv.list(KV.memories):
        if mem.get("project") != project:
            continue
        # Same string guard as for observations: non-string concepts are skipped.
        for c in (mem.get("concepts") or []):
            if isinstance(c, str) and c:
                concept_counts[c] += 1
        for f in (mem.get("files") or []):
            _harvest_profile_file(f, file_counts, concept_counts)

    prof["topConcepts"] = [{"concept": c, "frequency": n} for c, n in concept_counts.most_common(20)]
    prof["topFiles"] = [{"file": f, "frequency": n} for f, n in file_counts.most_common(20)]
    prof["sessionCount"] = len(project_sessions)
    return prof
def export_data(kv: StateKV, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Export sessions, observations and auxiliary stores as one JSON-ready dict.

    Args:
        kv: State store to read from.
        data: Optional options. ``maxSessions`` (clamped to 1..1000) enables
            pagination over sessions sorted newest-first by ``startedAt``;
            ``offset`` (clamped to >= 0) is the start index and is only applied
            when ``maxSessions`` is given. Values that cannot be converted with
            ``int()`` are ignored.

    Returns:
        Always ``version``, ``exportedAt``, ``sessions``, ``observations``
        (session id -> list, only for exported sessions that have any),
        ``memories`` and ``summaries`` (both unpaginated). Profiles of the
        exported sessions' projects and every non-empty auxiliary store are
        added under their own keys, and ``pagination`` is added when
        ``maxSessions`` is set.
    """
    if data is None:
        data = {}

    max_sessions: Optional[int] = None
    if data.get("maxSessions") is not None:
        try:
            max_sessions = min(max(int(data["maxSessions"]), 1), 1000)
        except (TypeError, ValueError, OverflowError):
            pass
    offset = 0
    if data.get("offset") is not None:
        try:
            offset = max(int(data["offset"]), 0)
        except (TypeError, ValueError, OverflowError):
            pass

    all_sessions = kv.list(KV.sessions)
    # `or ""` so an explicit null startedAt sorts last instead of raising TypeError.
    all_sessions.sort(key=lambda s: s.get("startedAt") or "", reverse=True)
    if max_sessions is not None:
        paginated_sessions = all_sessions[offset:offset + max_sessions]
    else:
        paginated_sessions = all_sessions

    memories = kv.list(KV.memories)
    summaries = kv.list(KV.summaries)
    observations = {}
    for s in paginated_sessions:
        sid = s.get("id")
        if sid:
            obs = kv.list(KV.observations(sid))
            if obs:
                observations[sid] = obs

    # dict.fromkeys de-duplicates while keeping session order, so exports are reproducible.
    unique_projects = dict.fromkeys(s.get("project") for s in paginated_sessions if s.get("project"))
    profiles = [p for p in (kv.get(KV.profiles, proj) for proj in unique_projects) if p]

    res = {
        "version": "0.9.21",
        "exportedAt": datetime.datetime.utcnow().isoformat() + "Z",
        "sessions": paginated_sessions,
        "observations": observations,
        "memories": memories,
        "summaries": summaries
    }
    if profiles:
        res["profiles"] = profiles

    # (export key, KV scope) pairs; a section is emitted only when non-empty.
    optional_sections = (
        ("graphNodes", KV.graphNodes),
        ("graphEdges", KV.graphEdges),
        ("semanticMemories", KV.semantic),
        ("proceduralMemories", KV.procedural),
        ("actions", KV.actions),
        ("actionEdges", KV.actionEdges),
        ("sentinels", KV.sentinels),
        ("sketches", KV.sketches),
        ("crystals", KV.crystals),
        ("facets", KV.facets),
        ("lessons", KV.lessons),
        ("insights", KV.insights),
        ("routines", KV.routines),
        ("signals", KV.signals),
        ("checkpoints", KV.checkpoints),
        ("accessLogs", KV.accessLog),
    )
    for key, scope in optional_sections:
        records = kv.list(scope)
        if records:
            res[key] = records

    if max_sessions is not None:
        res["pagination"] = {
            "offset": offset,
            "limit": max_sessions,
            "total": len(all_sessions),
            "hasMore": offset + max_sessions < len(all_sessions)
        }
    return res
def set_project_profile(kv: StateKV, project: str, profile: Dict[str, Any]) -> Dict[str, Any]:
    """Store ``profile`` as the profile of ``project`` and return the stored copy.

    The caller's dict is left untouched. A shallow copy stamped with a fresh
    ``updatedAt`` (UTC ISO-8601 with a ``Z`` suffix) is written under
    ``KV.profiles`` and returned. ``commit_if_enabled`` is then called to record
    the change.
    """
    stored = dict(profile)  # copy so the caller's (often request-body) dict is not mutated
    stored["updatedAt"] = datetime.datetime.utcnow().isoformat() + "Z"
    kv.set(KV.profiles, project, stored)
    # Commit to Dolt
    commit_if_enabled(kv, f"Set project profile for {project}", get_agent_id())
    return stored
def get_relations(kv: StateKV) -> List[Dict[str, Any]]:
    """Return every record stored in the ``KV.relations`` scope."""
    relations = kv.list(KV.relations)
    return relations
def add_relation(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Create, persist and return a relation record linking two ids.

    ``data`` must contain ``sourceId``, ``targetId`` and ``type`` (a missing
    key raises ``KeyError``). An optional ``agentId`` attributes the commit;
    otherwise ``get_agent_id()`` is used.
    """
    relation_id = generate_id("rel")
    source, target, kind = data["sourceId"], data["targetId"], data["type"]
    created = datetime.datetime.utcnow().isoformat() + "Z"
    relation = {
        "id": relation_id,
        "sourceId": source,
        "targetId": target,
        "type": kind,
        "createdAt": created,
    }
    kv.set(KV.relations, relation_id, relation)
    # Commit to Dolt, attributing the change to the caller's agent when known.
    author = data.get("agentId") or get_agent_id()
    commit_if_enabled(kv, f"Add relation {kind} between {source} and {target}", author)
    return relation
def evolve_memory(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a new version of a memory with updated content and retire the old one.

    Args:
        kv: State store.
        data: ``memoryId`` and ``newContent`` (required); ``newTitle`` (optional,
            defaults to the first 80 characters of the content); ``agentId``
            (optional, used to attribute the commit).

    Returns:
        ``{"success": True, "memory": new_memory}``. The new memory copies all
        fields of the old one, gets a new id, ``version + 1``, ``parentId`` set
        to the old id, ``supersedes`` set to the old id followed by the old
        memory's own ``supersedes`` chain, and ``isLatest=True``.

    Raises:
        KeyError: ``memoryId`` or ``newContent`` missing from ``data``.
        ValueError: no memory is stored under ``memoryId``.
    """
    mem_id = data["memoryId"]
    new_content = data["newContent"]
    new_title = data.get("newTitle")
    existing = kv.get(KV.memories, mem_id)
    if not existing:
        raise ValueError("Memory not found")
    old_id = existing["id"]
    now = datetime.datetime.utcnow().isoformat() + "Z"

    new_mem = dict(existing)
    new_mem["id"] = generate_id("mem")
    new_mem["content"] = new_content
    new_mem["title"] = new_title if new_title else new_content[:80]
    # `or 1` tolerates a stored null version (same rule as consolidate()).
    new_mem["version"] = (existing.get("version") or 1) + 1
    new_mem["parentId"] = old_id
    # Keep the full lineage, matching how consolidate() evolves memories.
    new_mem["supersedes"] = [old_id] + list(existing.get("supersedes") or [])
    new_mem["createdAt"] = now
    new_mem["updatedAt"] = now
    new_mem["isLatest"] = True

    # Write the new version before retiring the old one, so a failure in between
    # never leaves the memory without any latest version.
    kv.set(KV.memories, new_mem["id"], new_mem)
    existing["isLatest"] = False
    kv.set(KV.memories, old_id, existing)

    # Re-index: add and remove are attempted independently so one failure
    # does not silently skip the other.
    try:
        _bm25_index.add(memory_to_observation(new_mem))
    except Exception as e:
        print(f"[evolve_memory] BM25 add failed for {new_mem['id']}: {e}")
    try:
        _bm25_index.remove(old_id)
    except Exception as e:
        print(f"[evolve_memory] BM25 remove failed for {old_id}: {e}")
    comb_text = new_mem["title"] + " " + new_mem["content"]
    vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]})
    if _vector_index:
        _vector_index.remove(old_id)
    if _index_persistence:
        _index_persistence.schedule_save()

    # Commit to Dolt
    agent_id = data.get("agentId") or get_agent_id() or new_mem.get("agentId")
    commit_if_enabled(kv, f"Evolve memory {new_mem['id']} (v{new_mem['version']}): {new_mem['title']}", agent_id)
    return {"success": True, "memory": new_mem}
def auto_forget(kv: StateKV, dry_run: bool = False) -> Dict[str, Any]:
    """Evict expired memories and stale low-value observations.

    Rules:
      * a memory whose ``forgetAfter`` timestamp is in the past is evicted;
      * an observation with numeric ``importance <= 2`` whose ``timestamp`` is
        more than 180 days old is evicted.

    Timestamps are normalised to naive UTC before comparison. Offset-aware
    values are converted to UTC rather than having their offset discarded.
    Naive values are taken as UTC. Unparseable timestamps are printed and
    skipped.

    With ``dry_run=True`` nothing is modified and only the candidate ids are
    reported. Otherwise the records are deleted from the KV store and both
    search indexes, and image reference counts are decremented. When anything
    was evicted, an index save is scheduled, an audit entry is written and a
    commit is requested.

    Returns:
        ``success``, ``evictedMemories`` (ids), ``evictedObservations`` (ids),
        ``evicted`` (total count) and ``dryRun``.
    """
    now_dt = datetime.datetime.utcnow()

    def _to_naive_utc(value: Any) -> datetime.datetime:
        """Parse an ISO-8601-style string and return it as a naive UTC datetime."""
        if not isinstance(value, str):
            raise TypeError(f"expected a timestamp string, got {type(value).__name__}")
        text = value.strip()
        # fromisoformat only accepts a trailing "Z" from Python 3.11 on.
        iso = text[:-1] + "+00:00" if text[-1:] in ("Z", "z") else text
        try:
            parsed = datetime.datetime.fromisoformat(iso)
        except ValueError:
            # Optional third-party fallback for layouts fromisoformat rejects.
            import dateutil.parser
            parsed = dateutil.parser.parse(text)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        return parsed

    def _release_image_ref(ref: Any) -> None:
        """Decrement the stored reference count for an image, never below zero."""
        refs = kv.get(KV.imageRefs, ref) or 0
        if refs > 0:
            kv.set(KV.imageRefs, ref, refs - 1)

    # 1. Expired memories
    evicted_memories: List[Any] = []
    for mem in kv.list(KV.memories):
        forget_after = mem.get("forgetAfter")
        if not forget_after:
            continue
        try:
            expired = _to_naive_utc(forget_after) < now_dt
        except Exception as e:
            print(f"[auto_forget] Failed to parse forgetAfter '{forget_after}': {e}")
            continue
        if expired and mem.get("id"):
            evicted_memories.append(mem["id"])

    # 2. Low-value old observations (importance <= 2, age > 180 days)
    evicted_observations: List[Tuple[Any, Any]] = []
    for sess in kv.list(KV.sessions):
        sid = sess.get("id")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            importance = obs.get("importance")
            ts = obs.get("timestamp")
            # Only numeric low-importance records qualify; checking this first
            # avoids parsing timestamps that could never lead to eviction.
            if not ts or not isinstance(importance, (int, float)) or not importance <= 2:
                continue
            try:
                age_days = (now_dt - _to_naive_utc(ts)).days
            except Exception as e:
                print(f"[auto_forget] Failed to parse timestamp '{ts}': {e}")
                continue
            if age_days > 180 and obs.get("id"):
                evicted_observations.append((sid, obs["id"]))

    if not dry_run:
        for mem_id in evicted_memories:
            mem = kv.get(KV.memories, mem_id)
            kv.delete(KV.memories, mem_id)
            if mem and mem.get("imageRef"):
                _release_image_ref(mem["imageRef"])
            _bm25_index.remove(mem_id)
            if _vector_index:
                _vector_index.remove(mem_id)
        for sid, obs_id in evicted_observations:
            obs = kv.get(KV.observations(sid), obs_id)
            kv.delete(KV.observations(sid), obs_id)
            if obs:
                img = obs.get("imageData") or obs.get("imageRef")
                if img:
                    _release_image_ref(img)
            _bm25_index.remove(obs_id)
            if _vector_index:
                _vector_index.remove(obs_id)
        if evicted_memories or evicted_observations:
            if _index_persistence:
                _index_persistence.schedule_save()
            safe_audit(
                kv,
                "auto_forget",
                "mem::auto_forget",
                evicted_memories + [oid for _, oid in evicted_observations],
                {
                    "evictedMemoriesCount": len(evicted_memories),
                    "evictedObservationsCount": len(evicted_observations),
                    "dryRun": False
                }
            )
            commit_if_enabled(kv, f"Auto forget: evicted {len(evicted_memories)} memories, {len(evicted_observations)} observations", "system")

    return {
        "success": True,
        "evictedMemories": evicted_memories,
        "evictedObservations": [oid for _, oid in evicted_observations],
        "evicted": len(evicted_memories) + len(evicted_observations),
        "dryRun": dry_run
    }
def health_check(kv: StateKV) -> Dict[str, Any]:
    """Report service health based on whether a database connection can be opened.

    A connection is obtained through ``kv._get_conn()`` and closed at once. If
    either step raises, the database is reported ``"disconnected"`` and the
    service ``"degraded"``; otherwise ``"connected"`` / ``"healthy"``.
    """
    try:
        kv._get_conn().close()
    except Exception:
        reachable = False
    else:
        reachable = True
    return {
        "status": "healthy" if reachable else "degraded",
        "service": "agentmemory",
        # NOTE(review): export_data() stamps "0.9.21"; confirm which version is current.
        "version": "0.9.8",
        "database": "dolt",
        "databaseStatus": "connected" if reachable else "disconnected",
    }
def strip_xml_wrappers(raw: str) -> str:
    """Extract the XML payload from an LLM reply.

    Markdown fences (an opening `` ```xml `` in any case, and any bare
    `` ``` ``) are removed and surrounding whitespace trimmed. The text is then
    narrowed to the span from the first attribute-less opening tag
    (``<name>``) through the last closing tag. If no such span exists, the
    de-fenced, trimmed text is returned. Empty or None input yields "".
    """
    if not raw:
        return ""
    text = re.sub(r'```xml\s*\n?', '', raw.strip(), flags=re.IGNORECASE)
    text = text.replace("```", "").strip()
    root = re.search(r'(<[a-zA-Z_][a-zA-Z0-9_-]*>[\s\S]*<\/[a-zA-Z_][a-zA-Z0-9_-]*>)', text)
    return root.group(1).strip() if root else text
def get_xml_tag(text: str, tag: str) -> Optional[str]:
    """Return the trimmed inner text of the first ``<tag>...</tag>`` element.

    ``text`` is first cleaned with ``strip_xml_wrappers``. Matching is
    non-greedy and spans newlines. The opening tag must carry no attributes,
    and ``tag`` is interpolated into the regex unescaped, so it should be a
    plain name. Returns None when no such element is found.
    """
    found = re.search(rf"<{tag}>(.*?)</{tag}>", strip_xml_wrappers(text), re.DOTALL)
    if found is None:
        return None
    return found.group(1).strip()
def get_xml_children(text: str, parent_tag: str, child_tag: str) -> List[str]:
    """Return the trimmed text of every ``<child_tag>`` inside the first ``<parent_tag>``.

    The parent is located with ``get_xml_tag``. A missing or empty parent
    yields ``[]``. Children are matched non-greedily across newlines, in
    document order.
    """
    parent_body = get_xml_tag(text, parent_tag)
    if not parent_body:
        return []
    child_pattern = rf"<{child_tag}>(.*?)</{child_tag}>"
    return list(map(str.strip, re.findall(child_pattern, parent_body, re.DOTALL)))
def generate_content(system_instruction: str, prompt: str) -> str:
    """Call Gemini ``generateContent`` and return the reply text.

    Configuration comes from the environment: ``GEMINI_API_KEY`` (or
    ``GOOGLE_API_KEY``) and optionally ``GEMINI_MODEL`` (default
    ``gemini-2.5-flash``). Temperature is fixed at 0.2 and the request times
    out after 60 seconds.

    Returns:
        The concatenated text of the first candidate's non-thought parts.

    Raises:
        ValueError: no API key is configured.
        RuntimeError: the HTTP call failed, the reply was not a JSON object,
            or it contained no candidate/parts.
    """
    import urllib.error
    import urllib.request

    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("No Gemini/Google API key found")
    model = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
    # The key is sent as a header rather than a query parameter so it cannot leak
    # through URLs recorded in logs or proxies.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "systemInstruction": {"parts": [{"text": system_instruction}]},
        "generationConfig": {"temperature": 0.2},
    }
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "x-goog-api-key": api_key},
        method="POST",
    )
    # Only the network call and decoding are wrapped, so the validation errors
    # below are not re-wrapped into a duplicated message.
    try:
        with urllib.request.urlopen(req, timeout=60.0) as response:
            resp_data = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        detail = ""
        try:
            # Best effort: the error body carries the API's explanation.
            detail = e.read().decode("utf-8", "replace")[:500]
        except Exception:
            pass
        raise RuntimeError(f"Gemini generateContent call failed: HTTP {e.code}: {detail or e.reason}") from e
    except Exception as e:
        raise RuntimeError(f"Gemini generateContent call failed: {e}") from e

    if not isinstance(resp_data, dict):
        raise RuntimeError("Gemini generateContent returned a non-object response")
    candidates = resp_data.get("candidates") or []
    if not candidates:
        block_reason = (resp_data.get("promptFeedback") or {}).get("blockReason")
        suffix = f" (blockReason={block_reason})" if block_reason else ""
        raise RuntimeError(f"Gemini generateContent returned no candidates{suffix}")
    parts = (candidates[0].get("content") or {}).get("parts") or []
    if not parts:
        raise RuntimeError("Gemini generateContent candidate content had no parts")
    return "".join(
        part.get("text", "")
        for part in parts
        if isinstance(part, dict) and not part.get("thought")
    )
_SUMMARY_SYSTEM_PROMPT = """You are a session summarization assistant. Your job is to read all raw tool executions and outcomes from a coding session and produce a high-fidelity summary.
Output XML:
<summary>
<title>Concise title summarizing the session</title>
<narrative>1-2 paragraphs of narrative describing what was done, what succeeded, and what failed</narrative>
<decisions>
<decision>Architectural decision, key insight, or choice made</decision>
</decisions>
<files>
<file>path/to/modified/file</file>
</files>
<concepts>
<concept>important concept, library, tool, or command used</concept>
</concepts>
</summary>"""

_SUMMARY_REDUCE_SYSTEM_PROMPT = """You are a session summarization reducer. Reduce multiple partial chunk summaries into a single final summary.
Output XML:
<summary>
<title>Concise final title summarizing the entire session</title>
<narrative>Comprehensive narrative describing what was done, what succeeded, and what failed</narrative>
<decisions>
<decision>Architectural decision, key insight, or choice made</decision>
</decisions>
<files>
<file>path/to/modified/file</file>
</files>
<concepts>
<concept>important concept, library, tool, or command used</concept>
</concepts>
</summary>"""


def _summary_fields_from_xml(cleaned: str) -> Dict[str, Any]:
    """Extract title (may be None), narrative, decisions, files and concepts from a <summary> reply."""
    return {
        "title": get_xml_tag(cleaned, "title"),
        "narrative": get_xml_tag(cleaned, "narrative") or "",
        "keyDecisions": get_xml_children(cleaned, "decisions", "decision"),
        "filesModified": get_xml_children(cleaned, "files", "file"),
        "concepts": get_xml_children(cleaned, "concepts", "concept"),
    }


def _format_summary_chunk(chunk: List[Dict[str, Any]]) -> str:
    """Render observations as the plain-text block sent to the summarizer."""
    return "".join(
        f"[{o.get('type')}] {o.get('title')}\n{o.get('narrative') or ''}\nFiles: {', '.join(o.get('files') or [])}\n\n"
        for o in chunk
    )


def summarize(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize a session's titled observations with the LLM and store the result.

    Observations with a ``title`` are split into chunks of 400. Each chunk is
    summarized separately, and when more than one chunk succeeds the partial
    summaries are reduced in a second LLM call. The final summary is stored
    under ``KV.summaries`` keyed by session id. The session's ``title`` and
    ``summary`` fields are updated, and an audit entry is written.

    Returns:
        ``{"success": True, "summary": ...}`` on success, or
        ``{"success": False, "error": ...}`` when ``sessionId`` is missing, the
        session or its titled observations are missing, no chunk could be
        summarized, or the reduction call failed.
    """
    session_id = data.get("sessionId")
    if not session_id:
        return {"success": False, "error": "sessionId is required"}
    session = kv.get(KV.sessions, session_id)
    if not session:
        return {"success": False, "error": "session_not_found"}
    compressed = [o for o in kv.list(KV.observations(session_id)) if o.get("title")]
    if not compressed:
        return {"success": False, "error": "no_observations"}

    chunk_size = 400
    chunks = [compressed[i:i + chunk_size] for i in range(0, len(compressed), chunk_size)]
    partial_summaries: List[Dict[str, Any]] = []
    # Initialised so the error message is well-defined even if no call raised
    # (e.g. every reply simply lacked a <title>).
    last_error: Optional[str] = None
    for idx, chunk in enumerate(chunks):
        prompt = f"Summarize this chunk {idx+1}/{len(chunks)} of observations:\n\n{_format_summary_chunk(chunk)}"
        try:
            fields = _summary_fields_from_xml(strip_xml_wrappers(generate_content(_SUMMARY_SYSTEM_PROMPT, prompt)))
        except Exception as e:
            last_error = str(e)
            print(f"[summarize] Chunk {idx+1} failed: {e}")
            continue
        if not fields["title"]:
            last_error = f"chunk {idx+1} response had no <title>"
            print(f"[summarize] Chunk {idx+1} failed: {last_error}")
            continue
        partial_summaries.append(fields)

    if not partial_summaries:
        return {"success": False, "error": f"No chunks summarized successfully. Last error: {last_error}"}

    if len(partial_summaries) == 1:
        merged = partial_summaries[0]
    else:
        reduce_prompt = "Reduce these partial summaries:\n\n" + "".join(
            f"[Chunk {i+1}]\nTitle: {ps['title']}\nNarrative: {ps['narrative']}\nDecisions: {', '.join(ps['keyDecisions'])}\nFiles: {', '.join(ps['filesModified'])}\nConcepts: {', '.join(ps['concepts'])}\n\n"
            for i, ps in enumerate(partial_summaries)
        )
        try:
            response = generate_content(_SUMMARY_REDUCE_SYSTEM_PROMPT, reduce_prompt)
        except Exception as e:
            return {"success": False, "error": f"Reduction failed: {e}"}
        merged = _summary_fields_from_xml(strip_xml_wrappers(response))
        merged["title"] = merged["title"] or partial_summaries[0]["title"]

    final_summary = {
        "sessionId": session_id,
        "project": session.get("project"),
        "createdAt": datetime.datetime.utcnow().isoformat() + "Z",
        "title": merged["title"],
        "narrative": merged["narrative"],
        "keyDecisions": merged["keyDecisions"],
        "filesModified": merged["filesModified"],
        "concepts": merged["concepts"],
        "observationCount": len(compressed)
    }
    kv.set(KV.summaries, session_id, final_summary)

    # Re-read the session so fields changed during the (slow) LLM calls are kept.
    session = kv.get(KV.sessions, session_id)
    if session:
        session["title"] = final_summary["title"]
        session["summary"] = final_summary["narrative"]
        kv.set(KV.sessions, session_id, session)
    safe_audit(kv, "compress", "mem::summarize", [session_id], {
        "title": final_summary["title"],
        "observationCount": len(compressed)
    })
    return {"success": True, "summary": final_summary}
_CONSOLIDATION_SYSTEM_PROMPT = """You are a memory consolidation engine. Given a set of related observations from coding sessions, synthesize them into a single long-term memory.
Output XML:
<memory>
<type>pattern|preference|architecture|bug|workflow|fact</type>
<title>Concise memory title (max 80 chars)</title>
<content>2-4 sentence description of the learned insight</content>
<concepts>
<concept>key term</concept>
</concepts>
<files>
<file>relevant/file/path</file>
</files>
<strength>1-10 how confident/important this memory is</strength>
</memory>"""

_SEMANTIC_MERGE_SYSTEM_PROMPT = """You are a memory consolidation engine. Given overlapping episodic memories (session summaries), extract stable factual knowledge.
Output format (XML):
<facts>
<fact confidence="0.0-1.0">Concise factual statement</fact>
</facts>
Rules:
- Extract only facts that appear in 2+ episodes or are highly confident
- Confidence reflects how well-supported the fact is across episodes
- Combine overlapping information into single concise facts
- Skip ephemeral details (specific error messages, temporary states)"""

_PROCEDURAL_EXTRACTION_SYSTEM_PROMPT = """You are a procedural memory extractor. Given repeated patterns and workflows observed across sessions, extract reusable procedures.
Output format (XML):
<procedures>
<procedure name="short descriptive name" trigger="when to use this procedure">
<step>Step 1 description</step>
<step>Step 2 description</step>
</procedure>
</procedures>
Rules:
- Only extract procedures observed 2+ times
- Steps should be concrete and actionable
- Trigger condition should be specific enough to match automatically"""


def _consolidation_importance(obs: Dict[str, Any]) -> float:
    """Numeric importance of an observation; missing or non-numeric values count as 5."""
    value = obs.get("importance", 5)
    return value if isinstance(value, (int, float)) else 5


def _consolidate_concept_groups(kv: StateKV, all_obs: List[Tuple[Dict[str, Any], str]], project: Optional[str], max_llm_calls: int = 10) -> int:
    """Turn concept groups of >= 3 observations into (versioned) long-term memories.

    Groups are processed largest first, with at most ``max_llm_calls`` LLM calls
    (attempts, including failed ones). A memory whose title matches an existing
    latest memory (case-insensitive, same or unscoped project) becomes a new
    version of it. Returns the number of memories written.
    """
    concept_groups: Dict[str, List[Tuple[Dict[str, Any], str]]] = {}
    for obs, sid in all_obs:
        for c in (obs.get("concepts") or []):
            if not isinstance(c, str):
                continue
            key = c.lower().strip()
            if key:
                concept_groups.setdefault(key, []).append((obs, sid))
    sorted_groups = sorted(
        [(k, g) for k, g in concept_groups.items() if len(g) >= 3],
        key=lambda kg: len(kg[1]),
        reverse=True
    )

    # Kept up to date below so later groups see memories created in this run.
    existing_memories = kv.list(KV.memories)
    consolidated = 0
    llm_calls = 0
    for concept, obs_group in sorted_groups:
        if llm_calls >= max_llm_calls:
            break
        top = sorted(obs_group, key=lambda x: _consolidation_importance(x[0]), reverse=True)[:8]
        # dict.fromkeys de-duplicates deterministically (set() order varies per run).
        session_ids = list(dict.fromkeys(sid for _, sid in top))
        obs_ids = list(dict.fromkeys(o["id"] for o, _ in top))
        obs_prompt = "\n\n".join(
            f"[{o.get('type')}] {o.get('title')}\n{o.get('narrative') or ''}\nFiles: {', '.join(o.get('files') or [])}\nImportance: {o.get('importance', 5)}"
            for o, _ in top
        )
        # Count the attempt up front so failing calls also consume the budget.
        llm_calls += 1
        try:
            response = generate_content(_CONSOLIDATION_SYSTEM_PROMPT, f"Concept: \"{concept}\"\n\nObservations:\n{obs_prompt}")
            cleaned = strip_xml_wrappers(response)
            m_title = get_xml_tag(cleaned, "title")
            m_content = get_xml_tag(cleaned, "content")
            if not m_title or not m_content:
                continue
            m_type = get_xml_tag(cleaned, "type") or "fact"
            try:
                m_strength = max(1, min(10, int(get_xml_tag(cleaned, "strength") or "5")))
            except ValueError:
                m_strength = 5
            now = datetime.datetime.utcnow().isoformat() + "Z"

            existing_match = next(
                (
                    mem for mem in existing_memories
                    if (mem.get("title") or "").lower() == m_title.lower()
                    and mem.get("isLatest") is not False
                    and (not project or not mem.get("project") or mem.get("project") == project)
                ),
                None
            )
            if existing_match:
                lineage = {
                    "version": (existing_match.get("version") or 1) + 1,
                    "parentId": existing_match["id"],
                    "supersedes": [existing_match["id"]] + (existing_match.get("supersedes") or []),
                }
            else:
                lineage = {"version": 1}
            memory = {
                "id": generate_id("mem"),
                "createdAt": now,
                "updatedAt": now,
                "type": m_type,
                "title": m_title,
                "content": m_content,
                "concepts": get_xml_children(cleaned, "concepts", "concept"),
                "files": get_xml_children(cleaned, "files", "file"),
                "sessionIds": session_ids,
                "strength": m_strength,
                **lineage,
                "sourceObservationIds": obs_ids,
                "isLatest": True
            }
            if project:
                memory["project"] = project
            # Write the new version before retiring the old one.
            kv.set(KV.memories, memory["id"], memory)
            if existing_match:
                existing_match["isLatest"] = False
                kv.set(KV.memories, existing_match["id"], existing_match)
            existing_memories.append(memory)
            consolidated += 1
        except Exception as e:
            print(f"[consolidate] Concept '{concept}' failed: {e}")
    return consolidated


def _merge_semantic_facts(kv: StateKV, summaries: List[Dict[str, Any]]) -> int:
    """Distil stable facts from the 20 most recent summaries (needs >= 5); returns new fact count."""
    if len(summaries) < 5:
        return 0
    recent_summaries = sorted(summaries, key=lambda s: s.get("createdAt") or "", reverse=True)[:20]
    merge_prompt = "Consolidate these episodic memories into stable facts:\n\n" + "\n\n".join(
        f"[Episode {i + 1}]\nTitle: {s.get('title')}\nNarrative: {s.get('narrative') or ''}\nConcepts: {', '.join(s.get('concepts') or [])}"
        for i, s in enumerate(recent_summaries)
    )
    new_facts = 0
    try:
        response = generate_content(_SEMANTIC_MERGE_SYSTEM_PROMPT, merge_prompt)
        fact_matches = re.findall(r'<fact\s+confidence="([^"]+)">([^<]+)</fact>', response, re.DOTALL)
        existing_semantic = kv.list(KV.semantic)
        now = datetime.datetime.utcnow().isoformat() + "Z"
        source_session_ids = [s["sessionId"] for s in recent_summaries if "sessionId" in s]
        for conf_str, fact_text in fact_matches:
            fact_text = fact_text.strip()
            try:
                confidence = float(conf_str)
            except ValueError:
                confidence = 0.5
            existing = next(
                (es for es in existing_semantic if (es.get("fact") or "").lower() == fact_text.lower()),
                None
            )
            if existing:
                existing["accessCount"] = (existing.get("accessCount") or 0) + 1
                existing["lastAccessedAt"] = now
                existing["updatedAt"] = now
                existing["confidence"] = max(existing.get("confidence", 0.5), confidence)
                kv.set(KV.semantic, existing["id"], existing)
            else:
                sem = {
                    "id": generate_id("sem"),
                    "fact": fact_text,
                    "confidence": confidence,
                    "sourceSessionIds": list(source_session_ids),
                    "sourceMemoryIds": [],
                    "accessCount": 1,
                    "lastAccessedAt": now,
                    "strength": confidence,
                    "createdAt": now,
                    "updatedAt": now
                }
                kv.set(KV.semantic, sem["id"], sem)
                # Track it so a repeated fact in the same reply updates instead of duplicating.
                existing_semantic.append(sem)
                new_facts += 1
    except Exception as e:
        print(f"[consolidate] Semantic merge failed: {e}")
    return new_facts


def _recurring_patterns(kv: StateKV) -> List[Dict[str, Any]]:
    """Latest ``pattern`` memories seen in >= 2 sessions, as ``{"content", "frequency"}``."""
    patterns = []
    for m in kv.list(KV.memories):
        if m.get("isLatest") is not False and m.get("type") == "pattern":
            freq = len(m.get("sessionIds") or [])
            if freq >= 2:
                patterns.append({"content": m.get("content", ""), "frequency": freq})
    return patterns


def _extract_procedures(kv: StateKV, patterns: List[Dict[str, Any]]) -> int:
    """Ask the LLM for reusable procedures from >= 2 patterns; returns new procedure count."""
    if len(patterns) < 2:
        return 0
    proc_prompt = "Extract reusable procedures from these recurring patterns:\n\n" + "\n\n".join(
        f"[Pattern {i + 1}] (seen {p['frequency']}x)\n{p['content']}" for i, p in enumerate(patterns)
    )
    new_procs = 0
    try:
        response = generate_content(_PROCEDURAL_EXTRACTION_SYSTEM_PROMPT, proc_prompt)
        proc_matches = re.findall(r'<procedure\s+name="([^"]+)"\s+trigger="([^"]+)">([\s\S]*?)</procedure>', response, re.DOTALL)
        existing_procs = kv.list(KV.procedural)
        now = datetime.datetime.utcnow().isoformat() + "Z"
        for name, trigger, steps_block in proc_matches:
            steps = [s.strip() for s in re.findall(r'<step>([^<]+)</step>', steps_block, re.DOTALL)]
            existing = next(
                (ep for ep in existing_procs if (ep.get("name") or "").lower() == name.lower()),
                None
            )
            if existing:
                existing["frequency"] = (existing.get("frequency") or 1) + 1
                existing["updatedAt"] = now
                existing["strength"] = min(1.0, (existing.get("strength") or 0.5) + 0.1)
                kv.set(KV.procedural, existing["id"], existing)
            else:
                proc = {
                    "id": generate_id("proc"),
                    "name": name,
                    "steps": steps,
                    "triggerCondition": trigger,
                    "frequency": 1,
                    "sourceSessionIds": [],
                    "strength": 0.5,
                    "createdAt": now,
                    "updatedAt": now
                }
                kv.set(KV.procedural, proc["id"], proc)
                existing_procs.append(proc)
                new_procs += 1
    except Exception as e:
        print(f"[consolidate] Procedural extraction failed: {e}")
    return new_procs


def consolidate(kv: StateKV, project: Optional[str] = None, min_observations: int = 10) -> Dict[str, Any]:
    """Run the consolidation pipeline: concept memories, semantic facts, procedures.

    1. Titled observations with importance >= 5 (missing/non-numeric counts as
       5), optionally limited to ``project``, are grouped by concept and turned
       into long-term memories (at most 10 LLM calls).
    2. Stable facts are merged from recent session summaries.
    3. Reusable procedures are extracted from recurring pattern memories.

    Returns ``{"consolidated": 0, "reason": "insufficient_observations",
    "success": True}`` when fewer than ``min_observations`` candidates exist.
    Otherwise it returns counts for each stage, after writing an audit entry and
    requesting a commit.
    """
    sessions = list_sessions(kv)
    if project:
        sessions = [s for s in sessions if s.get("project") == project]
    all_obs = [
        (o, s["id"])
        for s in sessions
        for o in kv.list(KV.observations(s["id"]))
        if o.get("title") and _consolidation_importance(o) >= 5
    ]
    if len(all_obs) < min_observations:
        return {"consolidated": 0, "reason": "insufficient_observations", "success": True}

    consolidated_count = _consolidate_concept_groups(kv, all_obs, project)
    summaries = kv.list(KV.summaries)
    new_facts_count = _merge_semantic_facts(kv, summaries)
    # Read after stage 1 so freshly consolidated pattern memories are included.
    patterns = _recurring_patterns(kv)
    new_procs_count = _extract_procedures(kv, patterns)

    res_summary = {
        "success": True,
        "consolidated": consolidated_count,
        "totalObservations": len(all_obs),
        "semantic": {
            "newFacts": new_facts_count,
            "totalSummaries": len(summaries)
        },
        "procedural": {
            "newProcedures": new_procs_count,
            "patternsAnalyzed": len(patterns)
        }
    }
    safe_audit(kv, "consolidate", "mem::consolidate-pipeline", [], res_summary)
    commit_if_enabled(kv, f"Consolidation complete: consolidated={consolidated_count}, facts={new_facts_count}, procs={new_procs_count}", "system")
    return res_summary
# Wiring hooks used by the server to inject shared collaborators.
def set_index_persistence(persistence: "IndexPersistence") -> None:
    """Register the object whose ``schedule_save()`` is called after index changes.

    The annotation is a string because ``IndexPersistence`` is not among this
    module's visible imports. Quoting defers its evaluation so defining this
    function can never fail with NameError.
    """
    global _index_persistence
    _index_persistence = persistence
def set_embedding_provider(provider) -> None:
    """Install ``provider`` as the embedding backend and rebuild the hybrid searcher.

    The module-level ``HybridSearch`` is recreated over the shared BM25 and
    vector indexes with the new provider. The last constructor argument is
    ``None``, as in the module-level initialisation.
    """
    global _embedding_provider, _hybrid_search
    _embedding_provider = provider
    _hybrid_search = HybridSearch(_bm25_index, _vector_index, provider, None)
def set_stream_broadcaster(broadcaster) -> None:
    """Register the callable ``broadcast_stream`` forwards payloads to (None disables it)."""
    global _stream_broadcaster
    _stream_broadcaster = broadcaster
def broadcast_stream(payload: Dict[str, Any]) -> None:
    """Forward ``payload`` to the registered stream broadcaster, best effort.

    Does nothing when no (truthy) broadcaster is registered. Any exception the
    broadcaster raises is printed and not propagated.
    """
    sink = _stream_broadcaster
    if not sink:
        return
    try:
        sink(payload)
    except Exception as err:
        print(f"[broadcaster] Failed: {err}")