import os
import re
import time
import uuid
import json
import hashlib
import datetime
from typing import Dict, Any, List, Optional, Tuple, Set
from db import StateKV
from search import (
SearchIndex,
VectorIndex,
GeminiEmbeddingProvider,
HybridSearch,
base64_to_float32,
float32_to_base64
)
# =====================================================================
# Global Variables / Module State
# =====================================================================
# Process-wide search state shared by the functions in this module.
# Keyword (BM25) index; observe() and remember() add entries, forget() removes them.
_bm25_index = SearchIndex()
# Vector index; entries are only added once an embedding provider is configured
# (vector_index_add_guarded bails out when _embedding_provider is unset).
_vector_index = VectorIndex()
# Embedding provider; None here — NOTE(review): presumably assigned at startup elsewhere, confirm.
_embedding_provider = None
# Hybrid search over both indexes; the two trailing positional None arguments
# are interpreted by search.HybridSearch (not visible here).
_hybrid_search = HybridSearch(_bm25_index, _vector_index, None, None)
# IndexPersistence instance; when set, write paths call schedule_save() on it.
_index_persistence = None
_stream_broadcaster = None # Callable: (payload) -> None
# Default scopes matching schema.ts
class KV:
    """Names of the KV scopes used to partition agentmemory state.

    Plain class attributes are fixed scope strings. The static methods build
    scope strings parameterised by a session, team, user or observation id.
    """

    sessions = "mem:sessions"
    memories = "mem:memories"
    summaries = "mem:summaries"
    config = "mem:config"
    metrics = "mem:metrics"
    health = "mem:health"
    bm25Index = "mem:index:bm25"
    relations = "mem:relations"
    profiles = "mem:profiles"
    claudeBridge = "mem:claude-bridge"
    graphNodes = "mem:graph:nodes"
    graphEdges = "mem:graph:edges"
    graphSnapshot = "mem:graph:snapshot"
    graphNameIndex = "mem:graph:name-index"
    graphEdgeKey = "mem:graph:edge-key"
    graphNodeDegree = "mem:graph:node-degree"
    semantic = "mem:semantic"
    procedural = "mem:procedural"
    audit = "mem:audit"
    actions = "mem:actions"
    actionEdges = "mem:action-edges"
    leases = "mem:leases"
    routines = "mem:routines"
    routineRuns = "mem:routine-runs"
    signals = "mem:signals"
    checkpoints = "mem:checkpoints"
    mesh = "mem:mesh"
    sketches = "mem:sketches"
    facets = "mem:facets"
    sentinels = "mem:sentinels"
    crystals = "mem:crystals"
    lessons = "mem:lessons"
    insights = "mem:insights"
    graphEdgeHistory = "mem:graph:edge-history"
    retentionScores = "mem:retention"
    accessLog = "mem:access"
    imageRefs = "mem:image-refs"
    slots = "mem:slots"
    globalSlots = "mem:slots:global"
    commits = "mem:commits"
    recentSearches = "mem:recent-searches"

    @staticmethod
    def observations(session_id: str) -> str:
        """Return the scope ``mem:obs:<session_id>``."""
        return "mem:obs:{}".format(session_id)

    @staticmethod
    def team_shared(team_id: str) -> str:
        """Return the scope ``mem:team:<team_id>:shared``."""
        return "mem:team:{}:shared".format(team_id)

    @staticmethod
    def team_users(team_id: str, user_id: str) -> str:
        """Return the scope ``mem:team:<team_id>:users:<user_id>``."""
        return "mem:team:{}:users:{}".format(team_id, user_id)

    @staticmethod
    def team_profile(team_id: str) -> str:
        """Return the scope ``mem:team:<team_id>:profile``."""
        return "mem:team:{}:profile".format(team_id)

    @staticmethod
    def enriched_chunks(session_id: str) -> str:
        """Return the scope ``mem:enriched:<session_id>``."""
        return "mem:enriched:{}".format(session_id)

    @staticmethod
    def latent_embeddings(obs_id: str) -> str:
        """Return the scope ``mem:latent:<obs_id>``."""
        return "mem:latent:{}".format(obs_id)
# =====================================================================
# Core Helpers & Utilities
# =====================================================================
def generate_id(prefix: str) -> str:
    """Return ``<prefix>_<base36 epoch millis>_<12 random hex chars>``.

    The timestamp part is lowercase base-36 ("0" if the clock reads 0), and
    the suffix comes from a fresh uuid4, so ids are roughly time-ordered
    and unique.
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
    millis = int(time.time() * 1000)
    digits = []
    while millis > 0:
        millis, remainder = divmod(millis, 36)
        digits.append(alphabet[remainder])
    stamp = "".join(reversed(digits)) or "0"
    suffix = uuid.uuid4().hex[:12]
    return f"{prefix}_{stamp}_{suffix}"
def fingerprint_id(prefix: str, content: str) -> str:
    """Return a deterministic id for ``content``.

    The content is stripped and lower-cased before hashing, so values that
    differ only by surrounding whitespace or case share an id. The id is
    ``<prefix>_`` followed by the first 16 hex chars of its SHA-256.
    """
    normalized = content.strip().lower()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return "{}_{}".format(prefix, digest[:16])
def auto_complete_old_active_sessions(kv: StateKV, current_session_id: str) -> int:
    """Mark every ``active`` session other than ``current_session_id`` as completed.

    Each closed session gets ``status="completed"`` and a fresh ``updatedAt``.
    It also gets an ``endedAt`` timestamp unless it already had one. The
    session is then written back to ``KV.sessions``.

    Returns:
        The number of sessions that were closed (a log line is printed when > 0).
    """
    all_sessions = kv.list(KV.sessions)
    closed = 0
    stamp = datetime.datetime.utcnow().isoformat() + "Z"
    for session in all_sessions:
        is_other_active = session.get("id") != current_session_id and session.get("status") == "active"
        if not is_other_active:
            continue
        session["status"] = "completed"
        session.setdefault("endedAt", stamp)
        session["updatedAt"] = stamp
        kv.set(KV.sessions, session["id"], session)
        closed += 1
    if closed:
        print(f"[session] Auto-completed {closed} dangling active sessions.")
    return closed
def jaccard_similarity(a: str, b: str) -> float:
    """Jaccard similarity of the whitespace tokens longer than 2 characters.

    Returns 1.0 when neither string has such tokens and 0.0 when exactly
    one of them has none. Otherwise it returns |A ∩ B| / |A ∪ B|.
    Comparison is case-sensitive; callers lower-case beforehand if needed.
    """
    def significant(text: str) -> set:
        return {tok for tok in text.split() if len(tok) > 2}

    left, right = significant(a), significant(b)
    if not (left or right):
        return 1.0
    if not (left and right):
        return 0.0
    return len(left & right) / len(left | right)
# =====================================================================
# Privacy & Data Scrubbing
# =====================================================================
# Anything wrapped in <private>...</private> (case-insensitive, may span lines)
# is removed wholesale. The lazy body makes each pair redact independently.
# The tags are required: a bare lazy `[\s\S]*?` matches the empty string, and
# `sub` would then insert the replacement between every character.
PRIVATE_TAG_RE = re.compile(r'<private>[\s\S]*?</private>', re.IGNORECASE)
# Patterns for common credential formats (generic key=value secrets, bearer
# tokens, OpenAI/Anthropic/GitHub/Slack/AWS/Google/npm/GitLab/DigitalOcean
# tokens, JWTs). Each match is replaced with "[REDACTED_SECRET]".
SECRET_PATTERN_SOURCES = [
    re.compile(r'(?:api[_-]?key|secret|token|password|credential|auth)[\s]*[=:]\s*["\']?[A-Za-z0-9_\-/.+]{20,}["\']?', re.IGNORECASE),
    re.compile(r'Bearer\s+[A-Za-z0-9._\-+/=]{20,}', re.IGNORECASE),
    re.compile(r'sk-proj-[A-Za-z0-9\-_]{20,}', re.IGNORECASE),
    re.compile(r'(?:sk|pk|rk|ak)-[A-Za-z0-9][A-Za-z0-9\-_]{19,}', re.IGNORECASE),
    re.compile(r'sk-ant-[A-Za-z0-9\-_]{20,}', re.IGNORECASE),
    re.compile(r'gh[pus]_[A-Za-z0-9]{36,}', re.IGNORECASE),
    re.compile(r'github_pat_[A-Za-z0-9_]{22,}', re.IGNORECASE),
    re.compile(r'xoxb-[A-Za-z0-9\-]+', re.IGNORECASE),
    re.compile(r'AKIA[0-9A-Z]{16}', re.IGNORECASE),
    re.compile(r'AIza[A-Za-z0-9\-_]{35}', re.IGNORECASE),
    re.compile(r'eyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}', re.IGNORECASE),
    re.compile(r'npm_[A-Za-z0-9]{36}', re.IGNORECASE),
    re.compile(r'glpat-[A-Za-z0-9\-_]{20,}', re.IGNORECASE),
    re.compile(r'dop_v1_[A-Za-z0-9]{64}', re.IGNORECASE),
]


def strip_private_data(input_str: str) -> str:
    """Redact private sections and recognisable secrets from ``input_str``.

    ``<private>...</private>`` blocks become "[REDACTED]". After that, every
    match of SECRET_PATTERN_SOURCES (applied in list order) becomes
    "[REDACTED_SECRET]". Text matching neither is returned unchanged.
    """
    result = PRIVATE_TAG_RE.sub("[REDACTED]", input_str)
    for pattern in SECRET_PATTERN_SOURCES:
        result = pattern.sub("[REDACTED_SECRET]", result)
    return result
# =====================================================================
# Audit Log System
# =====================================================================
def record_audit(
    kv: StateKV,
    operation: str,
    function_id: str,
    target_ids: List[str],
    details: Optional[Dict[str, Any]] = None,
    quality_score: Optional[float] = None,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Write one audit entry to ``KV.audit`` and return it.

    Args:
        kv: state store the entry is written to.
        operation: short operation name (e.g. "forget").
        function_id: identifier of the function performing the operation.
        target_ids: ids of the records affected.
        details: free-form extra data. A new empty dict is used when omitted.
            A literal ``{}`` default would be one dict shared by every call.
        quality_score: optional score stored as-is.
        user_id: optional acting user.

    Returns:
        The stored entry, with a generated ``aud_`` id and a UTC timestamp
        of the form ``YYYY-MM-DDTHH:MM:SS[.ffffff]Z``.

    Raises:
        Whatever ``kv.set`` raises; see ``safe_audit`` for a non-raising wrapper.
    """
    # Aware "now" converted back to naive UTC: the same string format as the
    # deprecated datetime.utcnow().isoformat() + "Z".
    timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    entry = {
        "id": generate_id("aud"),
        "timestamp": timestamp,
        "operation": operation,
        "userId": user_id,
        "functionId": function_id,
        "targetIds": target_ids,
        "details": details if details is not None else {},
        "qualityScore": quality_score,
    }
    kv.set(KV.audit, entry["id"], entry)
    return entry
def safe_audit(
    kv: StateKV,
    operation: str,
    function_id: str,
    target_ids: List[str],
    details: Optional[Dict[str, Any]] = None,
    quality_score: Optional[float] = None,
    user_id: Optional[str] = None,
) -> None:
    """Best-effort ``record_audit``: any failure is printed, never raised.

    Arguments are forwarded unchanged, except that an omitted ``details``
    becomes a fresh empty dict. A shared ``{}`` default would be reused
    across calls.
    """
    try:
        record_audit(
            kv,
            operation,
            function_id,
            target_ids,
            {} if details is None else details,
            quality_score,
            user_id,
        )
    except Exception as e:
        # Auditing must never break the operation being audited.
        print(f"[audit] Failed to write audit: {e}")
def query_audit(
    kv: StateKV,
    filter_opts: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """Return audit entries, newest first, optionally filtered.

    Args:
        kv: state store to read ``KV.audit`` entries from.
        filter_opts: optional filters:
            * ``operation``: keep only entries with exactly this operation.
            * ``dateFrom`` / ``dateTo``: inclusive timestamp bounds, parsed
              with ``dateutil``. Offsets are converted to UTC; naive values
              are taken as UTC. A bound that cannot be parsed is ignored.
              When at least one bound is usable, entries whose timestamp is
              missing or unparseable are dropped.
            * ``limit``: maximum number of entries returned (default 100).

    Returns:
        At most ``limit`` entries (100 when ``filter_opts`` is empty).
    """
    # `or ""` keeps the sort well-defined if an entry has timestamp=None.
    entries = sorted(kv.list(KV.audit), key=lambda e: e.get("timestamp") or "", reverse=True)
    if not filter_opts:
        return entries[:100]
    op = filter_opts.get("operation")
    if op:
        entries = [e for e in entries if e.get("operation") == op]
    date_from = filter_opts.get("dateFrom")
    date_to = filter_opts.get("dateTo")
    if date_from or date_to:
        entries = _filter_audit_by_time(entries, date_from, date_to)
    limit = filter_opts.get("limit", 100)
    return entries[:limit]


def _filter_audit_by_time(
    entries: List[Dict[str, Any]],
    date_from: Any,
    date_to: Any,
) -> List[Dict[str, Any]]:
    """Keep entries whose ``timestamp`` lies within the given inclusive bounds."""
    # Third-party dependency; imported lazily so callers that filter only by
    # operation do not need it installed.
    import dateutil.parser

    def to_naive_utc(value: Any) -> Optional[datetime.datetime]:
        # Parse `value`; return a naive UTC datetime, or None if unparseable.
        try:
            parsed = dateutil.parser.parse(value)
            if parsed.tzinfo is not None:
                # Convert to UTC before dropping tzinfo. Simply stripping the
                # offset would compare wall-clock times from different zones.
                parsed = parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)
            return parsed
        except (ValueError, OverflowError, TypeError):
            return None

    lower = to_naive_utc(date_from) if date_from else None
    upper = to_naive_utc(date_to) if date_to else None
    if lower is None and upper is None:
        # No usable bound: the filter is skipped, matching the best-effort intent.
        return entries
    kept = []
    for entry in entries:
        ts = entry.get("timestamp")
        if not ts:
            continue
        stamp = to_naive_utc(ts)
        if stamp is None:
            continue
        if lower is not None and stamp < lower:
            continue
        if upper is not None and stamp > upper:
            continue
        kept.append(entry)
    return kept
# =====================================================================
# Image Store System
# =====================================================================
# Directory holding content-addressed image files written by save_image_to_disk.
IMAGES_DIR = os.path.join(os.path.expanduser("~"), ".agentmemory", "images")
_DEFAULT_IMAGE_STORE_MAX_BYTES = 500 * 1024 * 1024


def get_max_bytes() -> int:
    """Return the image-store byte cap from AGENTMEMORY_IMAGE_STORE_MAX_BYTES.

    Falls back to 500 MiB when the variable is unset. It also falls back,
    with a warning, when the value is not an integer. A typo in the
    environment therefore does not crash every caller.
    """
    raw = os.getenv("AGENTMEMORY_IMAGE_STORE_MAX_BYTES")
    if raw is None:
        return _DEFAULT_IMAGE_STORE_MAX_BYTES
    try:
        return int(raw)
    except ValueError:
        print(f"[image store] Invalid AGENTMEMORY_IMAGE_STORE_MAX_BYTES={raw!r}; using default")
        return _DEFAULT_IMAGE_STORE_MAX_BYTES
def is_managed_image_path(file_path: str) -> bool:
    """Return True if ``file_path`` is IMAGES_DIR itself or lies inside it.

    Paths are normalised with ``os.path.abspath``. Symlinks are not
    resolved. Empty or None input returns False.
    """
    if not file_path:
        return False
    root = os.path.abspath(IMAGES_DIR)
    candidate = os.path.abspath(file_path)
    # The trailing separator stops a sibling such as ".../images2" from matching.
    return candidate == root or candidate.startswith(root + os.sep)
def save_image_to_disk(base64_data: str) -> Tuple[str, int]:
    """Decode a base64 image and store it in IMAGES_DIR under its content hash.

    Accepts a ``data:image/<type>...,<base64>`` URL or bare base64. The
    extension comes from the data-URL header (jpg/webp/gif, otherwise png).
    Bare base64 starting with ``/9j/`` (the JPEG magic) is saved as .jpg,
    anything else as .png.

    Returns:
        ``(path, bytes_written)``. ``bytes_written`` is 0 when a file with
        the same content hash already exists. Empty input gives ``("", 0)``.

    Raises:
        binascii.Error (a ValueError) if the payload is not valid base64;
        OSError if the file cannot be written.
    """
    if not base64_data:
        return "", 0
    if not os.path.exists(IMAGES_DIR):
        os.makedirs(IMAGES_DIR, exist_ok=True)
    clean_base64 = base64_data
    ext = "png"
    if base64_data.startswith("data:image/"):
        comma_idx = base64_data.find(",")
        if comma_idx != -1:
            meta = base64_data[:comma_idx]
            if "jpeg" in meta or "jpg" in meta:
                ext = "jpg"
            elif "webp" in meta:
                ext = "webp"
            elif "gif" in meta:
                ext = "gif"
            clean_base64 = base64_data[comma_idx + 1:]
    elif base64_data.startswith("/9j/"):
        ext = "jpg"
    # Hash of the encoded payload is the file name, so identical images dedupe.
    digest = hashlib.sha256(clean_base64.encode("utf-8")).hexdigest()
    file_path = os.path.join(IMAGES_DIR, f"{digest}.{ext}")
    if os.path.exists(file_path):
        return file_path, 0
    import base64
    buffer = base64.b64decode(clean_base64)
    # Write to a unique temp file in the same directory, then rename it
    # atomically. A crash or concurrent writer can then never leave a
    # truncated file at `file_path`. The exists() check above would treat
    # such a file as a complete image forever.
    tmp_path = f"{file_path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "wb") as f:
            f.write(buffer)
        os.replace(tmp_path, file_path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    return file_path, len(buffer)
def delete_image(file_path: Optional[str]) -> int:
    """Delete a managed image file and return how many bytes it occupied.

    Returns 0 in these cases: the path is empty or outside IMAGES_DIR, the
    file does not exist, or deletion fails (the failure is printed, not
    raised).
    """
    if not (file_path and is_managed_image_path(file_path)):
        return 0
    try:
        if not os.path.exists(file_path):
            return 0
        freed = os.path.getsize(file_path)
        os.remove(file_path)
    except Exception as exc:
        print(f"[agentmemory] Failed to delete image context: {exc}")
        return 0
    return freed
def touch_image(file_path: str) -> None:
    """Bump the access/modification time of a managed image, if it exists.

    Paths outside IMAGES_DIR are ignored. Any error is swallowed because
    this is best-effort bookkeeping.
    """
    if file_path and is_managed_image_path(file_path):
        try:
            if os.path.exists(file_path):
                os.utime(file_path, None)
        except Exception:
            # Deliberately best-effort: a failed touch must not affect callers.
            pass
# =====================================================================
# Index Persistence System (JSON Sharded)
# =====================================================================
class IndexPersistence:
    """Persist the BM25 and vector indexes in the KV store as sharded JSON.

    Each index is serialized to one JSON string and split into shards. The
    shards are stored under fresh ``<scope_prefix><generation>:<nnnnn>``
    scopes and referenced from a manifest kept in ``KV.bm25Index``. An older
    single-key ("legacy") layout is still readable by ``load``.
    """

    def __init__(self, kv: StateKV, bm25: SearchIndex, vector: Optional[VectorIndex]):
        # `vector` may be None, in which case only the BM25 index is persisted.
        self.kv = kv
        self.bm25 = bm25
        self.vector = vector

    def schedule_save(self) -> None:
        """Persist the indexes.

        NOTE(review): despite the name there is no debouncing or background
        scheduling here. Every call saves synchronously.
        """
        self.save()

    def save(self) -> None:
        """Serialize and store both indexes; failures are printed, not raised."""
        try:
            self.save_sharded_index(
                json.dumps(self.bm25.serialize_data()),
                "data:manifest",
                "data",
                "mem:index:bm25:bm25:"
            )
            # Skipped when self.vector is falsy (None, or possibly an empty
            # index if VectorIndex defines __len__; not visible here).
            if self.vector:
                self.save_sharded_index(
                    json.dumps(self.vector.serialize_data()),
                    "vectors:manifest",
                    "vectors",
                    "mem:index:bm25:vectors:"
                )
        except Exception as e:
            print(f"[index persistence] failed to save index: {e}")

    def save_sharded_index(self, serialized: str, manifest_key: str, legacy_key: str, scope_prefix: str) -> None:
        """Write ``serialized`` as a new generation of shards and point the manifest at it.

        Args:
            serialized: full JSON text of one index.
            manifest_key: key under ``KV.bm25Index`` that stores the manifest.
            legacy_key: pre-sharding key under ``KV.bm25Index``; deleted once
                the new manifest is written.
            scope_prefix: common prefix of every shard scope for this index.

        Steps run in order. The new shards are written first, then the
        manifest, then the legacy key is deleted. Only after that are
        obsolete shards removed.
        """
        previous = self.kv.get(KV.bm25Index, manifest_key)
        # New id per save, so new shards never overwrite the ones the current
        # manifest still references.
        generation = generate_id("idx")
        chunk_chars = 2000000  # maximum characters per shard
        shards = []
        chunks = []
        offset = 0
        shard_idx = 0
        while offset < len(serialized):
            scope = f"{scope_prefix}{generation}:{str(shard_idx).zfill(5)}"
            chunk = serialized[offset:offset + chunk_chars]
            shards.append({"scope": scope, "key": "data", "chars": len(chunk)})
            chunks.append(chunk)
            offset += chunk_chars
            shard_idx += 1
        for shard, chunk in zip(shards, chunks):
            self.kv.set(shard["scope"], shard["key"], chunk)
        next_manifest = {
            "v": 1,
            "generation": generation,
            "shards": shards,
            "chars": len(serialized)
        }
        self.kv.set(KV.bm25Index, manifest_key, next_manifest)
        self.kv.delete(KV.bm25Index, legacy_key)
        # Cleanup ALL obsolete shards starting with scope_prefix that are NOT in the current shards
        # This bypasses the StateKV API and queries the `kv_store` table directly.
        # `_` and `%` in scope_prefix would act as LIKE wildcards; the prefixes
        # passed by save() contain neither.
        # NOTE(review): whether these DELETEs persist depends on how
        # `_get_conn()` configures the connection (autocommit?). Confirm.
        # NOTE(review): a concurrent save of the same index could have its
        # shards deleted here before its manifest is written.
        try:
            conn = self.kv._get_conn()
            try:
                with conn.cursor() as cursor:
                    cursor.execute(
                        "SELECT DISTINCT scope FROM kv_store WHERE scope LIKE %s",
                        (scope_prefix + "%",)
                    )
                    rows = cursor.fetchall()
                    current_scopes = {s["scope"] for s in shards}
                    to_delete = []
                    for row in rows:
                        # Rows are accessed by column name (presumably a dict cursor).
                        scope_name = row["scope"]
                        if scope_name not in current_scopes:
                            to_delete.append(scope_name)
                    if to_delete:
                        # Delete in batches of 50 scopes per statement.
                        for i in range(0, len(to_delete), 50):
                            chunk_delete = to_delete[i:i + 50]
                            format_strings = ','.join(['%s'] * len(chunk_delete))
                            cursor.execute(
                                f"DELETE FROM kv_store WHERE scope IN ({format_strings})",
                                tuple(chunk_delete)
                            )
            finally:
                conn.close()
        except Exception as ex:
            print(f"[index persistence] error cleaning up obsolete shards: {ex}")
        # Fallback cleanup through the previous manifest via the KV API. This
        # still removes the last generation's shards if the SQL cleanup failed.
        if previous and isinstance(previous, dict) and previous.get("v") == 1 and isinstance(previous.get("shards"), list):
            current_shards = {(s["scope"], s["key"]) for s in shards}
            for old_shard in previous["shards"]:
                if (old_shard["scope"], old_shard["key"]) not in current_shards:
                    self.kv.delete(old_shard["scope"], old_shard["key"])

    def load(self) -> Dict[str, Any]:
        """Restore the indexes from the KV store.

        Returns:
            ``{"bm25": bool, "vector": bool}``, each True only if data was
            found and restored. Restore errors are printed, not raised.
            Errors while reading from the KV store itself are not caught here.
        """
        bm25_data = self.load_sharded_data("data", "data:manifest")
        bm25_loaded = False
        if bm25_data:
            try:
                self.bm25.restore_from_data(json.loads(bm25_data))
                bm25_loaded = True
            except Exception as e:
                print(f"[index persistence] failed to restore BM25: {e}")
        vector_loaded = False
        if self.vector:
            vector_data = self.load_sharded_data("vectors", "vectors:manifest")
            if vector_data:
                try:
                    self.vector.restore_from_data(json.loads(vector_data))
                    vector_loaded = True
                except Exception as e:
                    print(f"[index persistence] failed to restore vectors: {e}")
        return {"bm25": bm25_loaded, "vector": vector_loaded}

    def load_sharded_data(self, legacy_key: str, manifest_key: str) -> Optional[str]:
        """Return the serialized index text, or None if nothing usable is stored.

        A v1 manifest takes precedence. Its shards are concatenated in
        manifest order. If any shard is missing, None is returned and the
        legacy key is NOT consulted. Without a v1 manifest, a string stored
        under ``legacy_key`` is returned.
        """
        manifest = self.kv.get(KV.bm25Index, manifest_key)
        if manifest and isinstance(manifest, dict) and manifest.get("v") == 1:
            shards = manifest.get("shards", [])
            chunks = []
            for shard in shards:
                chunk = self.kv.get(shard["scope"], shard["key"])
                if chunk is None:
                    return None
                chunks.append(chunk)
            return "".join(chunks)
        legacy = self.kv.get(KV.bm25Index, legacy_key)
        if isinstance(legacy, str):
            return legacy
        return None
# =====================================================================
# Vector Index / Embedding Helpers
# =====================================================================
def clip_embed_input(text: str) -> str:
    """Truncate ``text`` to at most 16000 characters before embedding."""
    max_chars = 16000
    return text if len(text) <= max_chars else text[:max_chars]
def get_agent_id() -> Optional[str]:
    """Return the AGENT_ID environment variable, or None if it is unset or empty."""
    agent = os.environ.get("AGENT_ID")
    return agent if agent else None
def commit_if_enabled(kv: StateKV, message: str, agent_id: Optional[str]) -> Optional[str]:
    """Record a version commit through ``kv.commit_version`` and return its result.

    A missing or empty ``agent_id`` is replaced by "unknown-agent".
    NOTE(review): despite the name, no enable check happens here. It is
    presumably done inside ``commit_version``; confirm.
    """
    author = agent_id if agent_id else "unknown-agent"
    return kv.commit_version(message, author)
def is_agent_scope_isolated() -> bool:
    """Return True only when AGENTMEMORY_AGENT_SCOPE is exactly "isolated"."""
    scope_mode = os.environ.get("AGENTMEMORY_AGENT_SCOPE")
    return scope_mode == "isolated"
def is_auto_compress_enabled() -> bool:
    """Return True only when AGENTMEMORY_AUTO_COMPRESS is exactly "true"."""
    flag = os.environ.get("AGENTMEMORY_AUTO_COMPRESS")
    return flag == "true"
def is_slots_enabled() -> bool:
    """Return True only when AGENTMEMORY_SLOTS is exactly "true"."""
    flag = os.environ.get("AGENTMEMORY_SLOTS")
    return flag == "true"
def is_reflect_enabled() -> bool:
    """Return True only when AGENTMEMORY_REFLECT is exactly "true"."""
    flag = os.environ.get("AGENTMEMORY_REFLECT")
    return flag == "true"
def is_graph_extraction_enabled() -> bool:
    """Return True only when GRAPH_EXTRACTION_ENABLED is exactly "true"."""
    flag = os.environ.get("GRAPH_EXTRACTION_ENABLED")
    return flag == "true"
def is_consolidation_enabled() -> bool:
    """Decide whether consolidation runs.

    An explicit CONSOLIDATION_ENABLED of "true"/"1" or "false"/"0" wins.
    Otherwise consolidation is on iff GEMINI_API_KEY or GOOGLE_API_KEY is
    set to a non-empty value.
    """
    explicit = os.environ.get("CONSOLIDATION_ENABLED")
    if explicit in ("true", "1"):
        return True
    if explicit in ("false", "0"):
        return False
    return any(os.environ.get(name) for name in ("GEMINI_API_KEY", "GOOGLE_API_KEY"))
def vector_index_add_guarded(
    obs_id: str,
    session_id: str,
    text: str,
    context: Dict[str, Any]
) -> bool:
    """Embed ``text`` and add it to the module vector index without ever raising.

    Args:
        obs_id: id stored with the vector.
        session_id: session the vector belongs to.
        text: text to embed; clipped by ``clip_embed_input`` first.
        context: accepted for call-site compatibility; currently unused.

    Returns:
        True if the vector was added. False in these cases: no index or
        embedding provider is configured, the embedding has the wrong
        dimension, or embedding/insertion fails (the failure is printed).
    """
    vi = _vector_index
    ep = _embedding_provider
    # Compare with None instead of relying on truthiness. If the index is
    # container-like (defines __len__/__bool__), an empty index would be
    # falsy, and the first vector could then never be added.
    if vi is None or ep is None:
        return False
    try:
        clipped = clip_embed_input(text)
        embedding = ep.embed(clipped)
        if len(embedding) != ep.dimensions:
            print(f"[vector-index] Dimension mismatch: expected {ep.dimensions}, got {len(embedding)}")
            return False
        vi.add(obs_id, session_id, embedding)
        return True
    except Exception as e:
        # Deliberately best-effort: embedding providers can fail for many
        # reasons (network, quota), and indexing must not break the caller.
        print(f"[vector-index] Embed failed: {e}")
        return False
# =====================================================================
# Observation System (Observe, Synthetic Compression)
# =====================================================================
def extract_image(d: Any) -> Optional[str]:
    """Return the first image reference found in a (possibly nested) payload.

    * Strings match when they start with ``data:image/`` or with the base64
      magic of PNG (``iVBORw0KGgo``) or JPEG (``/9j/``).
    * Dicts are first checked for well-known image keys (``image_data``,
      ``image_path``, ``imageBase64``, ``imagePath``). A string value there
      is returned as-is and may be a file path rather than base64.
      Otherwise every value is searched recursively.
    * Lists and tuples are searched element by element, so arrays of
      content blocks are covered too.

    Returns None when nothing matches.
    """
    if not d:
        return None
    if isinstance(d, str):
        if d.startswith(("data:image/", "iVBORw0KGgo", "/9j/")):
            return d
        return None
    if isinstance(d, dict):
        for k in ("image_data", "image_path", "imageBase64", "imagePath"):
            value = d.get(k)
            if isinstance(value, str):
                return value
        children = d.values()
    elif isinstance(d, (list, tuple)):
        children = d
    else:
        return None
    for child in children:
        match = extract_image(child)
        if match:
            return match
    return None
def infer_type(tool_name: Optional[str], hook_type: str) -> str:
    """Classify an observation from its hook type and, failing that, its tool name.

    Hook types take precedence. Otherwise the tool name is normalised:
    camelCase and dashes/whitespace become ``_``, then it is lower-cased.
    Keyword groups are then tried in a fixed order. A keyword counts when
    it is a prefix or suffix of the normalised name, or an ``_``-delimited
    token inside it.
    """
    hook_rules = (
        (("post_tool_failure",), "error"),
        (("prompt_submit",), "conversation"),
        (("subagent_stop", "task_completed"), "subagent"),
        (("notification",), "notification"),
    )
    for hooks, label in hook_rules:
        if hook_type in hooks:
            return label
    if not tool_name:
        return "other"

    normalized = re.sub(r'([a-z])([A-Z])', r'\1_\2', tool_name)
    normalized = re.sub(r'[-\s]+', '_', normalized).lower()

    def mentions(word: str) -> bool:
        return (
            normalized.startswith(word)
            or normalized.endswith(word)
            or f"_{word}_" in normalized
        )

    # Order matters: the first matching group wins.
    tool_rules = (
        (("fetch", "http", "web"), "web_fetch"),
        (("grep", "search", "glob", "find"), "search"),
        (("bash", "shell", "exec", "run"), "command_run"),
        (("edit", "update", "patch", "replace"), "file_edit"),
        (("write", "create"), "file_write"),
        (("read", "view"), "file_read"),
        (("task", "agent"), "subagent"),
    )
    for words, label in tool_rules:
        if any(mentions(w) for w in words):
            return label
    return "other"
def extract_files(input_data: Any) -> List[str]:
    """Collect candidate file paths from a tool-input dict.

    Reads the keys ``file_path``, ``filepath``, ``path``, ``filePath``,
    ``file`` and ``pattern``. String values of length 1..511 are kept.
    Duplicates are removed while preserving first-seen order, so the result
    is deterministic. A plain ``set`` would order strings differently from
    one process to the next because of hash randomization.

    Returns an empty list for anything that is not a non-empty dict.
    """
    if not input_data or not isinstance(input_data, dict):
        return []
    found: Dict[str, None] = {}
    for key in ("file_path", "filepath", "path", "filePath", "file", "pattern"):
        v = input_data.get(key)
        if isinstance(v, str) and 0 < len(v) < 512:
            found.setdefault(v, None)
    return list(found)
def stringify_for_narrative(v: Any) -> str:
    """Render a value as text for a narrative.

    Strings are returned unchanged and None becomes "". JSON-serialisable
    values are rendered with ``json.dumps``. Anything else falls back to
    ``str()``.
    """
    if isinstance(v, str):
        return v
    if v is None:
        return ""
    try:
        rendered = json.dumps(v)
    except Exception:
        rendered = str(v)
    return rendered
def build_synthetic_compression(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build a compressed observation from a raw one without calling an LLM.

    The narrative is the non-empty prompt, tool input and tool output
    joined with " | " and capped at 400 characters. The title is the tool
    name (or the hook type, or "observation") capped at 80 characters. The
    subtitle is the tool input capped at 120 characters. Capped text ends
    in "…". Importance is fixed at 5 and confidence at 0.3. ``modality``,
    ``imageData`` and ``agentId`` are copied over when present.
    """
    def clip(text: str, limit: int) -> str:
        # Keep `limit` characters total, the last one being the ellipsis.
        return text if len(text) <= limit else text[:limit - 1] + "\u2026"

    label = raw.get("toolName") or raw.get("hookType")
    tool_input = stringify_for_narrative(raw.get("toolInput"))
    tool_output = stringify_for_narrative(raw.get("toolOutput"))
    prompt = raw.get("userPrompt") or ""
    pieces = [piece for piece in (prompt, tool_input, tool_output) if len(piece) > 0]
    narrative = clip(" | ".join(pieces), 400)
    title = clip(label or "observation", 80)
    subtitle = clip(tool_input, 120) if tool_input else None
    compressed = {
        "id": raw["id"],
        "sessionId": raw["sessionId"],
        "timestamp": raw["timestamp"],
        "type": infer_type(raw.get("toolName"), raw["hookType"]),
        "title": title,
        "subtitle": subtitle,
        "facts": [],
        "narrative": narrative,
        "concepts": [],
        "files": extract_files(raw.get("toolInput")),
        "importance": 5,
        "confidence": 0.3,
    }
    compressed.update(
        {name: raw[name] for name in ("modality", "imageData", "agentId") if raw.get(name) is not None}
    )
    return compressed
def observe(kv: StateKV, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Record one hook event as an observation and keep its session up to date.

    Steps, in order:
      1. Validate the payload and scrub ``payload["data"]`` with strip_private_data.
      2. Build the raw observation. Tool fields are lifted for
         post_tool_use/post_tool_failure, and the prompt for prompt_submit.
      3. Enforce MAX_OBS_PER_SESSION (default 500; a value <= 0 disables it).
      4. Persist an inline base64 image to disk and bump its reference count.
      5. Store and broadcast the raw observation.
      6. Update the existing session, or create it. Creating a session
         first auto-completes other active sessions.
      7. Overwrite the stored observation with its synthetic compression,
         index it (BM25 and vector), save the indexes, broadcast it, and
         commit.

    Args:
        kv: state store.
        payload: must contain truthy ``sessionId``, ``hookType`` and
            ``timestamp``. May contain ``data``, ``project`` and ``cwd``.

    Returns:
        ``{"observationId": <new id>}``.

    Raises:
        ValueError: if a required field is missing or the session has reached
            its observation limit.
    """
    session_id = payload.get("sessionId")
    hook_type = payload.get("hookType")
    timestamp = payload.get("timestamp")
    if not session_id or not hook_type or not timestamp:
        raise ValueError("Invalid payload: sessionId, hookType, and timestamp are required")
    obs_id = generate_id("obs")
    sanitized_data = payload.get("data")
    # Scrub via a JSON round-trip so nested strings are redacted too. If the
    # data is not JSON-serialisable, or the scrubbed text no longer parses,
    # fall back to scrubbing its str() form. sanitized_data is then a string.
    try:
        json_str = json.dumps(payload.get("data"))
        sanitized = strip_private_data(json_str)
        sanitized_data = json.loads(sanitized)
    except Exception:
        sanitized_data = strip_private_data(str(payload.get("data")))
    raw = {
        "id": obs_id,
        "sessionId": session_id,
        "timestamp": timestamp,
        "hookType": hook_type,
        "raw": sanitized_data,
    }
    extracted_img = extract_image(sanitized_data)
    if isinstance(sanitized_data, dict):
        if hook_type in ("post_tool_use", "post_tool_failure"):
            raw["toolName"] = sanitized_data.get("tool_name")
            raw["toolInput"] = sanitized_data.get("tool_input")
            raw["toolOutput"] = sanitized_data.get("tool_output") or sanitized_data.get("error")
        if hook_type == "prompt_submit":
            raw["userPrompt"] = sanitized_data.get("prompt")
        if extracted_img:
            # "mixed" when the event also carries text content, else image-only.
            raw["modality"] = "mixed" if (raw.get("toolInput") or raw.get("toolOutput") or raw.get("userPrompt")) else "image"
    elif isinstance(sanitized_data, str) and extracted_img:
        raw["modality"] = "image"
    max_obs = int(os.getenv("MAX_OBS_PER_SESSION", "500"))
    if max_obs > 0:
        existing = kv.list(KV.observations(session_id))
        if len(existing) >= max_obs:
            raise ValueError(f"Session observation limit reached ({max_obs})")
    existing_session = kv.get(KV.sessions, session_id)
    # An existing session's agentId wins, even when it is missing/None. Only
    # brand-new sessions fall back to the AGENT_ID environment variable.
    inherited_agent_id = existing_session.get("agentId") if existing_session else get_agent_id()
    if inherited_agent_id:
        raw["agentId"] = inherited_agent_id
    # Only inline base64 is written to disk. A value taken from an
    # image_path/imagePath key (a path) does not match these prefixes and is skipped.
    if extracted_img and (extracted_img.startswith("data:image/") or extracted_img.startswith("iVBORw0KGgo") or extracted_img.startswith("/9j/")):
        try:
            file_path, bytes_written = save_image_to_disk(extracted_img)
            raw["imageData"] = file_path
            # Increment image ref count
            img_refs = kv.get(KV.imageRefs, file_path) or 0
            kv.set(KV.imageRefs, file_path, img_refs + 1)
        except Exception as ex:
            print(f"[image store] failed: {ex}")
    # Set raw observation
    kv.set(KV.observations(session_id), obs_id, raw)
    # Stream raw observation
    # NOTE(review): broadcast_stream is not defined in this part of the file; presumably defined later.
    broadcast_stream({
        "type": "raw_observation",
        "sessionId": session_id,
        "data": {
            "type": "raw",
            "observation": raw,
            "sessionId": session_id
        }
    })
    if existing_session:
        updates = [
            {"type": "set", "path": "updatedAt", "value": datetime.datetime.utcnow().isoformat() + "Z"},
            {"type": "set", "path": "observationCount", "value": (existing_session.get("observationCount") or 0) + 1}
        ]
        # Record the session's first prompt once, with whitespace collapsed and capped at 200 chars.
        if not existing_session.get("firstPrompt") and isinstance(raw.get("userPrompt"), str):
            trimmed = " ".join(raw["userPrompt"].split()).strip()
            if trimmed:
                updates.append({"type": "set", "path": "firstPrompt", "value": trimmed[:200]})
        kv.update(KV.sessions, session_id, updates)
    else:
        # A new session starts: any other session still marked active is closed.
        auto_complete_old_active_sessions(kv, session_id)
        project = payload.get("project") or "unknown"
        cwd = payload.get("cwd") or os.getcwd()
        trimmed_prompt = None
        if isinstance(raw.get("userPrompt"), str):
            trimmed_prompt = " ".join(raw["userPrompt"].split()).strip()[:200]
        ts = datetime.datetime.utcnow().isoformat() + "Z"
        new_sess = {
            "id": session_id,
            "project": project,
            "cwd": cwd,
            "startedAt": payload.get("timestamp") or ts,
            "updatedAt": ts,
            "status": "active",
            "observationCount": 1,
        }
        if inherited_agent_id:
            new_sess["agentId"] = inherited_agent_id
        if trimmed_prompt:
            new_sess["firstPrompt"] = trimmed_prompt
        kv.set(KV.sessions, session_id, new_sess)
    # Perform synthetic compression (we default to synthetic)
    synthetic = build_synthetic_compression(raw)
    # Carry the raw fields along so the stored record keeps the original event.
    for k in ["hookType", "raw", "toolName", "toolInput", "toolOutput", "userPrompt"]:
        if k in raw:
            synthetic[k] = raw[k]
    # Same key as the raw observation: the compressed form replaces it.
    kv.set(KV.observations(session_id), obs_id, synthetic)
    _bm25_index.add(synthetic)
    comb_text = synthetic["title"] + " " + (synthetic.get("narrative") or "")
    vector_index_add_guarded(synthetic["id"], synthetic["sessionId"], comb_text, {"kind": "synthetic", "logId": synthetic["id"]})
    if _index_persistence:
        _index_persistence.schedule_save()
    # Stream compressed observation
    broadcast_stream({
        "type": "compressed_observation",
        "sessionId": session_id,
        "data": {
            "type": "compressed",
            "observation": synthetic,
            "sessionId": session_id
        }
    })
    # Commit to Dolt
    commit_if_enabled(kv, f"Observe: {synthetic.get('title', 'observation')} in session {session_id[:8]}", synthetic.get("agentId"))
    return {"observationId": obs_id}
# =====================================================================
# Memory System (Remember, Forget, Evolve)
# =====================================================================
def memory_to_observation(memory: Dict[str, Any]) -> Dict[str, Any]:
    """Adapt a stored memory into the observation shape the BM25 index expects.

    The memory's first session id (or "memory" when it has none) becomes
    the observation's session. Its content is used as both the single fact
    and the narrative. Its strength (default 7) becomes the importance.
    The type is always "decision".
    """
    session_ids = memory.get("sessionIds")
    primary_session = session_ids[0] if session_ids else "memory"
    body = memory["content"]
    return {
        "id": memory["id"],
        "sessionId": primary_session,
        "timestamp": memory["createdAt"],
        "type": "decision",
        "title": memory["title"],
        "facts": [body],
        "narrative": body,
        "concepts": memory.get("concepts", []),
        "files": memory.get("files", []),
        "importance": memory.get("strength", 7),
    }
def remember(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Store a long-term memory, superseding a near-duplicate if one exists.

    The content is scrubbed with strip_private_data. It is then compared
    against every memory still marked latest and belonging to the same
    project (or with no project set). The first one whose Jaccard token
    similarity exceeds 0.7 is superseded: the new memory gets
    ``version = old + 1`` and links to it via ``parentId``/``supersedes``,
    and the old memory is marked ``isLatest = False``. The new memory is
    then indexed (BM25 and vector) and committed.

    Args:
        kv: state store.
        data: ``content`` (required). Optional: ``concepts``, ``files``,
            ``sourceObservationIds``, ``ttlDays`` (positive number sets
            ``forgetAfter``), ``type`` (default "fact"), ``project`` and
            ``agentId``.

    Returns:
        ``{"success": True, "memory": <new memory dict>}``.

    Raises:
        ValueError: if ``content`` is missing or blank.
    """
    content = data.get("content")
    if not content or not content.strip():
        raise ValueError("content is required")
    content = strip_private_data(content)
    concepts = data.get("concepts") or []
    files = data.get("files") or []
    source_obs = data.get("sourceObservationIds") or []
    ttl_days = data.get("ttlDays")
    mem_type = data.get("type") or "fact"
    project = data.get("project")
    if project:
        project = project.strip()
    # Naive UTC, formatted exactly like the deprecated utcnow().isoformat().
    now_dt = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    now = now_dt.isoformat() + "Z"
    existing_memories = kv.list(KV.memories)
    superseded_id = None
    superseded_version = 1
    superseded_memory = None
    lower_content = content.lower()
    for existing in existing_memories:
        if existing.get("isLatest") is False:
            continue
        if project and existing.get("project") and existing["project"] != project:
            continue
        # `or ""` tolerates stored memories whose content is None.
        similarity = jaccard_similarity(lower_content, (existing.get("content") or "").lower())
        if similarity > 0.7:
            superseded_id = existing["id"]
            superseded_version = existing.get("version") or 1
            superseded_memory = existing
            break
    call_agent_id = data.get("agentId") or get_agent_id()
    new_mem = {
        "id": generate_id("mem"),
        "createdAt": now,
        "updatedAt": now,
        "type": mem_type,
        "title": content[:80],
        "content": content,
        "concepts": concepts,
        "files": files,
        "sessionIds": [],
        "strength": 7,
        "version": superseded_version + 1 if superseded_id else 1,
        "parentId": superseded_id,
        "supersedes": [superseded_id] if superseded_id else [],
        "sourceObservationIds": [i for i in source_obs if i],
        "isLatest": True,
    }
    if call_agent_id:
        new_mem["agentId"] = call_agent_id
    if project:
        new_mem["project"] = project
    if ttl_days and isinstance(ttl_days, (int, float)) and ttl_days > 0:
        forget_time = now_dt + datetime.timedelta(days=ttl_days)
        new_mem["forgetAfter"] = forget_time.isoformat() + "Z"
    # Persist the new version BEFORE retiring the old one. If this write
    # fails, the previous memory remains the latest one. The reverse order
    # would hide the old memory while the new one was never stored.
    kv.set(KV.memories, new_mem["id"], new_mem)
    if superseded_memory:
        superseded_memory["isLatest"] = False
        kv.set(KV.memories, superseded_memory["id"], superseded_memory)
    try:
        _bm25_index.add(memory_to_observation(new_mem))
    except Exception as ex:
        print(f"[bm25] memory add failed: {ex}")
    comb_text = new_mem["title"] + " " + new_mem["content"]
    vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]})
    if _index_persistence:
        _index_persistence.schedule_save()
    # Commit to Dolt
    commit_if_enabled(kv, f"Remember: {new_mem.get('title', '')}", new_mem.get("agentId"))
    return {"success": True, "memory": new_mem}
def forget(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Delete a memory, selected observations, or a whole session.

    Modes:
      * ``memoryId``: delete that memory.
      * ``sessionId`` + ``observationIds``: delete those observations.
      * ``sessionId`` alone (no ``memoryId``, no ``observationIds``): delete
        every observation of the session plus its session record and summary.

    Every deleted id is dropped from the BM25 index and, if present, the
    vector index. Image reference counts are decremented (never below 0).
    When anything was deleted, the indexes are saved and an audit entry is
    written.

    Returns:
        ``{"success": True, "deleted": <count>}``. The count is the number of
        delete operations issued (ids requested, +2 for session and summary).
        It is not necessarily the number of records that existed.
    """
    memory_id = data.get("memoryId")
    session_id = data.get("sessionId")
    obs_ids = data.get("observationIds") or []
    total = 0
    removed_memories = []
    removed_observations = []
    session_removed = False

    def release_image(ref):
        # Decrement the stored reference count for `ref`, never below zero.
        count = kv.get(KV.imageRefs, ref) or 0
        if count > 0:
            kv.set(KV.imageRefs, ref, count - 1)

    def unindex(item_id):
        # Drop `item_id` from the keyword index and, if configured, the vector index.
        _bm25_index.remove(item_id)
        if _vector_index:
            _vector_index.remove(item_id)

    if memory_id:
        mem = kv.get(KV.memories, memory_id)
        kv.delete(KV.memories, memory_id)
        if mem and mem.get("imageRef"):
            release_image(mem["imageRef"])
        unindex(memory_id)
        removed_memories.append(memory_id)
        total += 1

    if session_id and obs_ids:
        obs_scope = KV.observations(session_id)
        for oid in obs_ids:
            obs = kv.get(obs_scope, oid)
            kv.delete(obs_scope, oid)
            if obs:
                img = obs.get("imageData") or obs.get("imageRef")
                if img:
                    release_image(img)
            unindex(oid)
            removed_observations.append(oid)
            total += 1

    if session_id and not obs_ids and not memory_id:
        obs_scope = KV.observations(session_id)
        for obs in kv.list(obs_scope):
            kv.delete(obs_scope, obs["id"])
            img = obs.get("imageData") or obs.get("imageRef")
            if img:
                release_image(img)
            unindex(obs["id"])
            removed_observations.append(obs["id"])
            total += 1
        kv.delete(KV.sessions, session_id)
        kv.delete(KV.summaries, session_id)
        session_removed = True
        total += 2

    if total > 0:
        if _index_persistence:
            _index_persistence.schedule_save()
        safe_audit(
            kv,
            "forget",
            "mem::forget",
            removed_memories + removed_observations,
            {
                "sessionId": session_id,
                "deleted": total,
                "memoriesDeleted": len(removed_memories),
                "observationsDeleted": len(removed_observations),
                "sessionDeleted": session_removed,
                "reason": "user-initiated forget"
            }
        )
    # Commit to Dolt
    agent_id = data.get("agentId") or get_agent_id()
    commit_if_enabled(kv, f"Forget: memory_id={memory_id} session_id={session_id}", agent_id)
    return {"success": True, "deleted": total}
# =====================================================================
# Prompt Context Compilation System
# =====================================================================
def estimate_tokens(text: str) -> int:
    """Rough token estimate: one token per three characters, rounded down."""
    return len(text) // 3
def escape_xml_attr(s: str) -> str:
    """Escape ``s`` for use inside a double-quoted XML attribute value.

    Replaces ``&``, ``"``, ``<`` and ``>`` with their predefined XML
    entities. ``&`` is handled first so the entities produced by the later
    replacements are not escaped a second time.
    """
    return (
        s.replace("&", "&amp;")
        .replace('"', "&quot;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
def context(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Compile a token-budgeted prompt context for a session of a project.

    Candidate blocks are collected in priority order:

    1. pinned memory slots,
    2. the stored project profile,
    3. up to 10 lessons (global ones plus this project's, project lessons
       weighted 1.5x),
    4. for up to 10 other sessions of the project (newest ``startedAt``
       first), the session summary or, without one, its top 5 observations
       with importance >= 5.

    Blocks are then packed greedily into ``budget`` estimated tokens; a block
    that does not fit is skipped so smaller later blocks may still be used.

    Args:
        kv: State store holding slots, profiles, lessons, sessions,
            summaries and observations.
        data: Request payload. ``sessionId`` and ``project`` are required;
            ``budget`` is optional and falls back to the ``TOKEN_BUDGET``
            environment variable (default 2000).

    Returns:
        ``{"context": str, "blocks": int, "tokens": int}``. When nothing fits
        (or nothing exists) the context is ``""`` and both counters are 0.

    Raises:
        ValueError: if ``sessionId`` or ``project`` is missing.
    """
    session_id = data.get("sessionId")
    project = data.get("project")
    budget = data.get("budget") or int(os.getenv("TOKEN_BUDGET", "2000"))
    if not session_id or not project:
        raise ValueError("sessionId and project are required")

    # One timestamp for the whole call. Each block used to capture its own
    # time.time(); whenever a millisecond ticked between appends, later
    # (lower-priority, older-session) blocks sorted ahead of pinned slots and
    # could push them out of the budget.
    now_ms = int(time.time() * 1000)
    blocks: List[Dict[str, Any]] = []

    def add_block(block_type: str, content: str, recency: int) -> None:
        blocks.append({
            "type": block_type,
            "content": content,
            "tokens": estimate_tokens(content),
            "recency": recency,
        })

    # 1. Pinned slots
    slot_content = render_pinned_context(list_pinned_slots(kv))
    if slot_content:
        add_block("memory", slot_content, now_ms)

    # 2. Project profile
    profile = kv.get(KV.profiles, project)
    if profile:
        profile_parts = []
        if profile.get("topConcepts"):
            profile_parts.append(
                "Concepts: " + ", ".join(c["concept"] for c in profile["topConcepts"][:8])
            )
        if profile.get("topFiles"):
            profile_parts.append(
                "Key files: " + ", ".join(f["file"] for f in profile["topFiles"][:5])
            )
        if profile.get("conventions"):
            profile_parts.append("Conventions: " + "; ".join(profile["conventions"]))
        if profile.get("commonErrors"):
            profile_parts.append("Common errors: " + "; ".join(profile["commonErrors"][:3]))
        if profile_parts:
            add_block("memory", "## Project Profile\n" + "\n".join(profile_parts), now_ms)

    # 3. Lessons: global ones plus this project's; project lessons score 1.5x.
    def lesson_score(lesson: Dict[str, Any]) -> float:
        factor = 1.5 if lesson.get("project") == project else 1.0
        return factor * lesson.get("confidence", 0.5)

    relevant_lessons = sorted(
        (
            lesson for lesson in kv.list(KV.lessons)
            if not lesson.get("deleted")
            and (not lesson.get("project") or lesson["project"] == project)
        ),
        key=lesson_score,
        reverse=True,
    )[:10]
    if relevant_lessons:
        items = []
        for lesson in relevant_lessons:
            # Same 0.5 default as lesson_score; indexing raised KeyError for
            # lessons stored without a confidence.
            line = f"- ({lesson.get('confidence', 0.5):.2f}) {lesson.get('content', '')}"
            if lesson.get("context"):
                line += f" — {lesson['context']}"
            items.append(line)
        add_block("memory", "## Lessons Learned\n" + "\n".join(items), now_ms)

    # 4. Other sessions of this project: summary, else important observations.
    sessions = [
        s for s in kv.list(KV.sessions)
        if s.get("project") == project and s.get("id") and s["id"] != session_id
    ]
    sessions.sort(key=lambda s: s.get("startedAt") or "", reverse=True)
    for rank, s in enumerate(sessions[:10]):
        # Strictly below now_ms and decreasing with age: the newest session
        # ranks first and no session block outranks slots/profile/lessons.
        recency = now_ms - 1 - rank
        summary = kv.get(KV.summaries, s["id"])
        if summary:
            content = (
                f"## {summary.get('title', 'Session summary')}\n{summary.get('narrative', '')}\n"
                f"Decisions: {'; '.join(summary.get('keyDecisions', []))}\n"
                f"Files: {', '.join(summary.get('filesModified', []))}"
            )
            add_block("summary", content, recency)
            continue
        important = [
            o for o in kv.list(KV.observations(s["id"]))
            if o.get("title") and (o.get("importance") or 0) >= 5
        ]
        if not important:
            continue
        important.sort(key=lambda o: o.get("importance") or 0, reverse=True)
        items = [f"- [{o.get('type')}] {o.get('title')}: {o.get('narrative')}" for o in important[:5]]
        add_block(
            "observation",
            f"## Session {s['id'][:8]} ({s.get('startedAt')})\n" + "\n".join(items),
            recency,
        )

    # Stable sort: equal recency keeps the priority (insertion) order above.
    blocks.sort(key=lambda b: b["recency"], reverse=True)

    # NOTE(review): header and footer are empty, yet escape_xml_attr is not
    # otherwise used in this section -- they may once have been a wrapping
    # tag that got lost; confirm the intended framing before changing them.
    header = ""
    footer = ""
    used_tokens = estimate_tokens(header) + estimate_tokens(footer)
    selected = []
    for block in blocks:
        # Skip (don't stop) so smaller lower-priority blocks can still fit.
        if used_tokens + block["tokens"] > budget:
            continue
        selected.append(block["content"])
        used_tokens += block["tokens"]
    if not selected:
        return {"context": "", "blocks": 0, "tokens": 0}
    return {
        "context": f"{header}\n" + "\n\n".join(selected) + f"\n{footer}",
        "blocks": len(selected),
        "tokens": used_tokens,
    }
# =====================================================================
# Memory Slots System
# =====================================================================
def _slot_template(label: str, size_limit: int, description: str, *, pinned: bool, scope: str) -> Dict[str, Any]:
    """Build one built-in slot template: empty, writable, with the given metadata."""
    return {
        "label": label,
        "content": "",
        "sizeLimit": size_limit,
        "description": description,
        "pinned": pinned,
        "readOnly": False,
        "scope": scope,
    }


# Built-in slots created by seed_defaults(). "global" templates are stored
# under KV.globalSlots and "project" ones under KV.slots; list_pinned_slots()
# only surfaces pinned slots whose content is non-blank.
DEFAULT_SLOTS = [
    _slot_template(
        "persona", 1000,
        "How the agent should see itself: role, tone, behavioural guidelines.",
        pinned=True, scope="global",
    ),
    _slot_template(
        "user_preferences", 2000,
        "Coding style, tool preferences, naming conventions, and other habits the user wants preserved across sessions.",
        pinned=True, scope="global",
    ),
    _slot_template(
        "tool_guidelines", 1500,
        "Rules the agent should follow when picking or sequencing tools (e.g. prefer X over Y, never run Z without confirmation).",
        pinned=True, scope="global",
    ),
    _slot_template(
        "project_context", 3000,
        "Architecture decisions, codebase conventions, build/test commands, and cross-cutting constraints for the current project.",
        pinned=True, scope="project",
    ),
    _slot_template(
        "guidance", 1500,
        "Active advice for the next session: what to focus on, what to avoid, open risks.",
        pinned=True, scope="project",
    ),
    _slot_template(
        "pending_items", 2000,
        "Unfinished work, explicit TODOs, and promises made but not yet delivered.",
        pinned=True, scope="project",
    ),
    _slot_template(
        "session_patterns", 1500,
        "Recurring behaviours and common struggles observed across recent sessions.",
        pinned=False, scope="project",
    ),
    _slot_template(
        "self_notes", 1500,
        "Free-form notes the agent keeps for itself: hypotheses, dead ends, things to revisit.",
        pinned=False, scope="project",
    ),
]
def seed_defaults(kv: StateKV) -> None:
    """Create every built-in slot from DEFAULT_SLOTS that is not stored yet.

    A slot already present under its label in its scope's store is left
    untouched. Newly created slots share one ``createdAt``/``updatedAt``
    timestamp.
    """
    timestamp = datetime.datetime.utcnow().isoformat() + "Z"
    for template in DEFAULT_SLOTS:
        label = template["label"]
        store = KV.globalSlots if template["scope"] == "global" else KV.slots
        if kv.get(store, label):
            continue
        kv.set(store, label, {**template, "createdAt": timestamp, "updatedAt": timestamp})
def list_pinned_slots(kv: StateKV) -> List[Dict[str, Any]]:
    """Return the pinned slots that have non-blank content, sorted by label.

    Global and project slots are merged by label, and a project slot
    overrides a global slot with the same label. Missing or ``None``
    ``content`` counts as empty; it used to raise AttributeError on
    ``None.strip()``, whereas the other slot functions already treat
    ``None`` as empty.
    """
    project_slots = kv.list(KV.slots)
    global_slots = kv.list(KV.globalSlots)
    merged: Dict[str, Dict[str, Any]] = {}
    for slot in global_slots:
        merged[slot["label"]] = slot
    for slot in project_slots:  # project scope wins on label clashes
        merged[slot["label"]] = slot
    pinned = [
        slot for slot in merged.values()
        if slot.get("pinned") and (slot.get("content") or "").strip()
    ]
    pinned.sort(key=lambda slot: slot["label"])
    return pinned
def render_pinned_context(slots: List[Dict[str, Any]]) -> str:
    """Render pinned slots as a markdown section for prompt injection.

    The output is a ``# agentmemory pinned slots`` heading, then one
    ``## <label>`` subsection per slot containing its stripped content, each
    followed by a blank line. An empty slot list renders as ``""``.
    """
    if not slots:
        return ""
    sections = ["# agentmemory pinned slots\n"]
    sections.extend(f"## {slot['label']}\n{slot['content'].strip()}\n" for slot in slots)
    return "\n".join(sections)
def slot_list(kv: StateKV) -> Dict[str, Any]:
    """List every slot across both scopes, sorted by label.

    A project-scope slot shadows a global slot with the same label.
    Always returns ``{"success": True, "slots": [...]}``.
    """
    project_slots = kv.list(KV.slots)
    global_slots = kv.list(KV.globalSlots)
    by_label = {slot["label"]: slot for slot in global_slots}
    by_label.update((slot["label"], slot) for slot in project_slots)
    return {"success": True, "slots": sorted(by_label.values(), key=lambda slot: slot["label"])}
def slot_get(kv: StateKV, label: str) -> Dict[str, Any]:
    """Look up a slot by label, checking project scope before global scope.

    Returns ``{"success": True, "slot": ..., "scope": "project" | "global"}``
    on a hit, otherwise ``{"success": False, "error": "slot not found"}``.
    A stored value that is falsy counts as missing.
    """
    for store, scope_name in ((KV.slots, "project"), (KV.globalSlots, "global")):
        found = kv.get(store, label)
        if found:
            return {"success": True, "slot": found, "scope": scope_name}
    return {"success": False, "error": "slot not found"}
def slot_create(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Create a new memory slot in project (default) or global scope.

    Args:
        kv: State store.
        data: ``label`` (required; lowercase, starting with a letter, only
            ``[a-z0-9_]``), optional ``scope`` (``"project"``/``"global"``),
            ``sizeLimit`` (int 1..20000, falsy -> 2000), ``content``
            (private data stripped, must fit ``sizeLimit``), ``description``,
            ``pinned`` (default True) and ``agentId`` for the commit.

    Returns:
        ``{"success": True, "slot": ...}``, or ``{"success": False,
        "error": ...}`` on invalid input or when the label already exists in
        the target scope. Validation failures write nothing.
    """
    label = data.get("label")
    # fullmatch: re.match with "$" also accepted a trailing newline ("abc\n");
    # the isinstance check avoids a TypeError for non-string labels.
    if not isinstance(label, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", label):
        return {"success": False, "error": "label required (lowercase, starts with letter, [a-z0-9_])"}
    scope = data.get("scope") or "project"
    if scope not in ("project", "global"):
        return {"success": False, "error": "scope must be 'project' or 'global'"}
    limit = data.get("sizeLimit") or 2000
    # bool is a subclass of int, so True would otherwise pass as a limit of 1.
    if isinstance(limit, bool) or not isinstance(limit, int) or not 1 <= limit <= 20000:
        return {"success": False, "error": "sizeLimit must be an integer between 1 and 20000"}
    content = strip_private_data(data.get("content") or "")
    if len(content) > limit:
        return {"success": False, "error": f"content exceeds sizeLimit ({len(content)} > {limit})"}
    description = data.get("description") or ""
    pinned = data.get("pinned", True)

    target_kv = KV.globalSlots if scope == "global" else KV.slots
    if kv.get(target_kv, label):
        return {"success": False, "error": f"slot already exists in {scope} scope"}

    now = datetime.datetime.utcnow().isoformat() + "Z"
    slot = {
        "label": label,
        "content": content,
        "sizeLimit": limit,
        "description": description,
        "pinned": pinned,
        "readOnly": False,
        "scope": scope,
        "createdAt": now,
        "updatedAt": now,
    }
    kv.set(target_kv, label, slot)
    safe_audit(kv, "slot_create", "mem::slot-create", [label], {"scope": scope, "sizeLimit": limit, "pinned": pinned})
    # Commit to Dolt
    agent_id = data.get("agentId") or get_agent_id()
    commit_if_enabled(kv, f"Create slot: {label}", agent_id)
    return {"success": True, "slot": slot}
def slot_append(kv: StateKV, label: str, text: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Append *text* (private data stripped) to an existing slot.

    A newline is inserted first when the current content is non-empty and
    does not already end with one. Nothing is written if the slot is
    missing, read-only, or the result would exceed its ``sizeLimit``
    (default 2000). On success the slot is saved, audited and committed.
    """
    lookup = slot_get(kv, label)
    if not lookup.get("success"):
        return {"success": False, "error": "slot not found"}
    slot, scope = lookup["slot"], lookup["scope"]
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}
    store = KV.slots if scope != "global" else KV.globalSlots

    current = slot.get("content") or ""
    separator = "\n" if current and not current.endswith("\n") else ""
    combined = current + separator + strip_private_data(text)
    size_limit = slot.get("sizeLimit") or 2000
    if len(combined) > size_limit:
        return {
            "success": False,
            "error": f"append would exceed sizeLimit ({len(combined)} > {size_limit})",
            "currentSize": len(current),
            "sizeLimit": size_limit,
        }

    slot.update(content=combined, updatedAt=datetime.datetime.utcnow().isoformat() + "Z")
    kv.set(store, label, slot)
    audit_details = {"scope": scope, "added": len(text), "total": len(combined)}
    safe_audit(kv, "slot_append", "mem::slot-append", [label], audit_details)
    # Commit to Dolt
    commit_if_enabled(kv, f"Append slot: {label}", agent_id or get_agent_id())
    return {"success": True, "slot": slot, "size": len(combined)}
def slot_replace(kv: StateKV, label: str, content: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Overwrite a slot's content (private data stripped) in its current scope.

    Nothing is written if the slot is missing, read-only, or the new content
    exceeds its ``sizeLimit`` (default 2000). On success the slot is saved,
    audited with its before/after sizes, and committed.
    """
    found = slot_get(kv, label)
    if not found.get("success"):
        return {"success": False, "error": "slot not found"}
    slot = found["slot"]
    scope = found["scope"]
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}
    store = KV.slots if scope != "global" else KV.globalSlots

    cleaned = strip_private_data(content)
    size_limit = slot.get("sizeLimit") or 2000
    if len(cleaned) > size_limit:
        return {
            "success": False,
            "error": f"content exceeds sizeLimit ({len(cleaned)} > {size_limit})",
            "sizeLimit": size_limit,
        }

    previous_size = len(slot.get("content") or "")
    slot.update(content=cleaned, updatedAt=datetime.datetime.utcnow().isoformat() + "Z")
    kv.set(store, label, slot)
    audit_details = {"scope": scope, "before": previous_size, "after": len(cleaned)}
    safe_audit(kv, "slot_replace", "mem::slot-replace", [label], audit_details)
    # Commit to Dolt
    commit_if_enabled(kv, f"Replace slot: {label}", agent_id or get_agent_id())
    return {"success": True, "slot": slot, "size": len(cleaned)}
def slot_delete(kv: StateKV, label: str, agent_id: Optional[str] = None) -> Dict[str, Any]:
    """Delete a slot from the scope it resolves to (project first, then global).

    Read-only slots are refused. On success the deletion is audited with the
    removed content size and committed.
    """
    found = slot_get(kv, label)
    if not found.get("success"):
        return {"success": False, "error": "slot not found"}
    slot, scope = found["slot"], found["scope"]
    if slot.get("readOnly"):
        return {"success": False, "error": "slot is read-only"}
    kv.delete(KV.globalSlots if scope == "global" else KV.slots, label)
    removed_size = len(slot.get("content") or "")
    safe_audit(kv, "slot_delete", "mem::slot-delete", [label], {"scope": scope, "size": removed_size})
    # Commit to Dolt
    commit_if_enabled(kv, f"Delete slot: {label}", agent_id or get_agent_id())
    return {"success": True}
def slot_reflect(kv: StateKV, session_id: str, max_obs: int = 50) -> Dict[str, Any]:
    """Fold a session's recent observations into the built-in reflection slots.

    Reviews the ``max_obs`` most recent observations (by ``timestamp``):

    * ``pending_items``: appends ``- <title or id>`` for each observation
      whose title or narrative mentions "todo", skipping lines already
      present (in the slot or earlier in this batch);
    * ``session_patterns``: overwritten with counts of ``error`` and
      ``command_run`` observations;
    * ``project_context``: appends up to 20 file paths not yet mentioned in
      the slot, one per line, under a heading when the slot was empty.

    Appended text that overflows a slot's ``sizeLimit`` (default 2000) keeps
    its tail; ``session_patterns`` keeps its head. Slots that do not exist
    are skipped. An audit entry and a commit are made only if at least one
    slot changed.

    Returns:
        ``{"success": True, "applied": n, "observationsReviewed": m}``, or
        ``{"success": True, "applied": 0, "reason": ...}`` when the session
        has no observations.
    """
    observations = kv.list(KV.observations(session_id))
    if not observations:
        return {"success": True, "applied": 0, "reason": "no observations for session"}
    # `or ""` so a null timestamp sorts last instead of raising TypeError.
    recent = sorted(observations, key=lambda o: o.get("timestamp") or "", reverse=True)[:max_obs]

    pending_lines: List[str] = []
    pattern_counts: Dict[str, int] = {}
    # Insertion-ordered "set" (most recent first). A plain set made the choice
    # and order of the <= 20 files written below vary from run to run.
    files: Dict[str, None] = {}
    for obs in recent:
        title = (obs.get("title") or "").lower()
        narrative = (obs.get("narrative") or "").lower()
        if "todo" in narrative or "todo" in title:
            ref = obs.get("title") or obs.get("id")
            line = f"- {ref}"
            if ref and line not in pending_lines:
                pending_lines.append(line)
        if obs.get("type") == "error":
            pattern_counts["errors"] = pattern_counts.get("errors", 0) + 1
        if obs.get("type") == "command_run":
            pattern_counts["commands"] = pattern_counts.get("commands", 0) + 1
        for path in obs.get("files") or []:
            if isinstance(path, str) and path:
                files.setdefault(path, None)

    now = datetime.datetime.utcnow().isoformat() + "Z"

    def load(label: str):
        """Return ``(slot, kv_scope)`` for *label*, or ``(None, None)`` if absent."""
        found = slot_get(kv, label)
        if not found.get("success"):
            return None, None
        return found["slot"], (KV.globalSlots if found["scope"] == "global" else KV.slots)

    def save(label: str, slot: Dict[str, Any], kv_scope: str, content: str) -> None:
        # NOTE(review): reflection writes do not check the slot's readOnly flag.
        slot["content"] = content
        slot["updatedAt"] = now
        kv.set(kv_scope, label, slot)

    def append_lines(current: str, new_lines: List[str]) -> str:
        """Append *new_lines* on their own lines after *current*."""
        sep = "\n" if current and not current.endswith("\n") else ""
        return current + sep + "\n".join(new_lines)

    applied = 0

    if pending_lines:
        slot, kv_scope = load("pending_items")
        if slot is not None:
            current = slot.get("content") or ""
            already = set(current.split("\n"))
            fresh = [line for line in pending_lines if line not in already]
            if fresh:
                limit = slot.get("sizeLimit") or 2000
                save("pending_items", slot, kv_scope, append_lines(current, fresh)[-limit:])
                applied += 1

    if pattern_counts:
        slot, kv_scope = load("session_patterns")
        if slot is not None:
            summary = [f"last reflection: {now}"]
            summary.extend(
                f"- {k}: {v} in last {len(recent)} observations" for k, v in pattern_counts.items()
            )
            limit = slot.get("sizeLimit") or 2000
            save("session_patterns", slot, kv_scope, "\n".join(summary)[:limit])
            applied += 1

    if files:
        slot, kv_scope = load("project_context")
        if slot is not None:
            current = slot.get("content") or ""
            fresh = [f for f in files if f not in current][:20]
            if fresh:
                new_lines = [] if current else ["Files touched in recent sessions:"]
                new_lines.extend(f"- {f}" for f in fresh)
                # Joined with "\n": the previous join used `sep`, which is ""
                # for an empty slot, gluing the heading and entries together.
                limit = slot.get("sizeLimit") or 2000
                save("project_context", slot, kv_scope, append_lines(current, new_lines)[-limit:])
                applied += 1

    if applied > 0:
        safe_audit(kv, "slot_reflect", "mem::slot-reflect", [session_id], {"observationCount": len(recent), "slotsUpdated": applied})
        commit_if_enabled(kv, f"Slot reflect: updated {applied} slots in session {session_id[:8]}", "system")
    return {"success": True, "applied": applied, "observationsReviewed": len(recent)}
# =====================================================================
# Lessons Learned System
# =====================================================================
def reinforce_lesson(lesson: Dict[str, Any]) -> None:
    """Strengthen *lesson* in place after it has been confirmed again.

    Increments ``reinforcements`` (from 0 if absent) and moves ``confidence``
    (default 0.5) 10% of the remaining distance toward 1.0, capped at 1.0.
    ``lastReinforcedAt`` and ``updatedAt`` receive the same UTC timestamp.
    """
    stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    lesson["reinforcements"] = lesson.get("reinforcements", 0) + 1
    previous = lesson.get("confidence", 0.5)
    lesson["confidence"] = min(1.0, previous + 0.1 * (1 - previous))
    lesson["lastReinforcedAt"] = stamp
    lesson["updatedAt"] = stamp
def lesson_save(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Save a lesson, or reinforce the live lesson with the same fingerprint.

    The id is ``fingerprint_id("lsn", content)`` over the private-data
    stripped content. If a non-deleted lesson with that id exists it is
    reinforced (and gains ``context`` if it had none) instead of being
    duplicated; a soft-deleted one is overwritten by a fresh lesson.
    ``confidence`` that is non-numeric or outside [0, 1] becomes 0.5.

    Returns:
        ``{"success": True, "action": "strengthened" | "created", "lesson": ...}``
        or ``{"success": False, "error": "content is required"}``.
    """
    raw = data.get("content")
    if not raw or not raw.strip():
        return {"success": False, "error": "content is required"}
    content = strip_private_data(raw)
    context_str = strip_private_data(data.get("context") or "")
    agent_id = data.get("agentId") or get_agent_id()
    lesson_id = fingerprint_id("lsn", content)

    existing = kv.get(KV.lessons, lesson_id)
    if existing and not existing.get("deleted"):
        reinforce_lesson(existing)
        if context_str and not existing.get("context"):
            existing["context"] = context_str
        kv.set(KV.lessons, existing["id"], existing)
        safe_audit(kv, "lesson_strengthen", "mem::lesson-save", [existing["id"]])
        # Commit to Dolt
        commit_if_enabled(kv, f"Strengthen lesson: {existing.get('content', '')[:60]}", agent_id)
        return {"success": True, "action": "strengthened", "lesson": existing}

    confidence = data.get("confidence")
    if not isinstance(confidence, (int, float)) or confidence < 0 or confidence > 1:
        confidence = 0.5
    created_at = datetime.datetime.utcnow().isoformat() + "Z"
    lesson = dict(
        id=lesson_id,
        content=content.strip(),
        context=context_str.strip(),
        confidence=confidence,
        reinforcements=0,
        source=data.get("source") or "manual",
        sourceIds=data.get("sourceIds") or [],
        project=data.get("project"),
        tags=data.get("tags") or [],
        createdAt=created_at,
        updatedAt=created_at,
        decayRate=0.05,
    )
    kv.set(KV.lessons, lesson_id, lesson)
    safe_audit(kv, "lesson_save", "mem::lesson-save", [lesson_id])
    # Commit to Dolt
    commit_if_enabled(kv, f"Create lesson: {lesson['content'][:60]}", agent_id)
    return {"success": True, "action": "created", "lesson": lesson}
def lesson_list(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """List live lessons, highest confidence first.

    Filters: ``minConfidence`` (default 0.0), and optionally an exact
    ``project`` and ``source``. At most ``limit`` (default 50) lessons are
    returned. Lessons without ``confidence`` are treated as 0.5.
    """
    limit = data.get("limit") or 50
    min_confidence = data.get("minConfidence") or 0.0
    project = data.get("project")
    source = data.get("source")

    def keep(lesson: Dict[str, Any]) -> bool:
        if lesson.get("deleted"):
            return False
        if not lesson.get("confidence", 0.5) >= min_confidence:
            return False
        if project and lesson.get("project") != project:
            return False
        if source and lesson.get("source") != source:
            return False
        return True

    matches = [lesson for lesson in kv.list(KV.lessons) if keep(lesson)]
    matches.sort(key=lambda lesson: lesson.get("confidence", 0.5), reverse=True)
    return {"success": True, "lessons": matches[:limit]}
def lesson_recall(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Keyword-search live lessons, ranked by confidence, relevance and recency.

    ``query`` is lower-cased and split on whitespace; one-character terms are
    ignored. A lesson matches when at least one term occurs as a substring of
    its content, context or tags, and is scored as::

        confidence * (matched_terms / total_terms) / (1 + 0.01 * age_days)

    where ``age_days`` is measured from ``lastReinforcedAt`` (else
    ``createdAt``). A missing or unparseable timestamp, or one in the
    future, gets no age penalty; previously the first case raised and the
    last could make the denominator zero or negative.

    Args:
        kv: State store.
        data: ``query`` (required), optional ``minConfidence`` (default 0.1),
            ``limit`` (default 10) and exact ``project`` filter.

    Returns:
        ``{"success": True, "lessons": [...]}`` where each lesson copy carries
        a ``score`` rounded to 3 decimals, or ``{"success": False, ...}`` when
        the query is empty.
    """
    query = data.get("query")
    if not query or not query.strip():
        return {"success": False, "error": "query is required"}
    min_confidence = data.get("minConfidence") or 0.1
    limit = data.get("limit") or 10
    project = data.get("project")
    terms = [t for t in query.lower().split() if len(t) > 1]
    utc = datetime.timezone.utc
    now = datetime.datetime.now(utc)

    def parse_utc(value: Any) -> Optional[datetime.datetime]:
        """Parse an ISO-ish timestamp into an aware UTC datetime, or None."""
        if not isinstance(value, str) or not value:
            return None
        iso = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            parsed = datetime.datetime.fromisoformat(iso)
        except ValueError:
            try:
                import dateutil.parser  # lenient fallback for non-ISO formats
                parsed = dateutil.parser.parse(value)
            except (ImportError, ValueError, OverflowError):
                return None
        # Naive timestamps are taken as UTC; aware ones are converted rather
        # than having their offset discarded by replace(tzinfo=...).
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=utc)
        return parsed.astimezone(utc)

    scored = []
    for lesson in kv.list(KV.lessons):
        if lesson.get("deleted") or not lesson.get("confidence", 0.5) >= min_confidence:
            continue
        if project and lesson.get("project") != project:
            continue
        text = f"{lesson.get('content', '')} {lesson.get('context', '')} {' '.join(lesson.get('tags') or [])}".lower()
        match_count = sum(1 for t in terms if t in text)
        if match_count == 0:
            continue
        relevance = match_count / len(terms)
        baseline = parse_utc(lesson.get("lastReinforcedAt") or lesson.get("createdAt"))
        if baseline is None:
            recency_boost = 1.0
        else:
            days = max((now - baseline).total_seconds() / 86400, 0.0)
            recency_boost = 1 / (1 + days * 0.01)
        score = lesson.get("confidence", 0.5) * relevance * recency_boost
        scored.append({"lesson": lesson, "score": score})

    scored.sort(key=lambda entry: entry["score"], reverse=True)
    results = []
    for entry in scored[:limit]:
        item = dict(entry["lesson"])
        item["score"] = round(entry["score"], 3)
        results.append(item)
    safe_audit(kv, "lesson_recall", "mem::lesson-recall", [], {"query": query, "resultCount": len(results)})
    return {"success": True, "lessons": results}
def lesson_strengthen(kv: StateKV, lesson_id: str) -> Dict[str, Any]:
    """Reinforce one live lesson by id and persist it.

    Unknown or soft-deleted ids yield ``{"success": False, "error":
    "lesson not found"}``; otherwise the reinforced lesson is saved, audited,
    committed and returned.
    """
    lesson = kv.get(KV.lessons, lesson_id)
    is_live = bool(lesson) and not lesson.get("deleted")
    if not is_live:
        return {"success": False, "error": "lesson not found"}
    reinforce_lesson(lesson)
    stored_id = lesson["id"]
    kv.set(KV.lessons, stored_id, lesson)
    safe_audit(kv, "lesson_strengthen", "mem::lesson-strengthen", [stored_id])
    # Commit to Dolt
    commit_if_enabled(kv, f"Strengthen lesson: {lesson.get('content', '')[:60]}", get_agent_id())
    return {"success": True, "lesson": lesson}
def lesson_decay_sweep(kv: StateKV) -> Dict[str, Any]:
    """Apply time-based confidence decay to every live lesson.

    For each non-deleted lesson whose baseline (``lastDecayedAt``, else
    ``lastReinforcedAt``, else ``createdAt``) is at least one week old,
    confidence drops by ``decayRate`` (default 0.05) per elapsed week. The
    result is floored at 0.05 and rounded to 3 decimals. A lesson that ends at
    0.1 or below and was never reinforced is soft-deleted. Each change is
    saved and audited, and one commit is made if anything changed.

    Lessons whose baseline is missing or unparseable are skipped; previously
    a single such lesson raised and aborted the whole sweep.

    Returns:
        ``{"success": True, "decayed": n, "softDeleted": m, "total": k}``.
    """
    utc = datetime.timezone.utc

    def parse_utc(value: Any) -> Optional[datetime.datetime]:
        """Parse an ISO-ish timestamp into an aware UTC datetime, or None."""
        if not isinstance(value, str) or not value:
            return None
        iso = value[:-1] + "+00:00" if value.endswith("Z") else value
        try:
            parsed = datetime.datetime.fromisoformat(iso)
        except ValueError:
            try:
                import dateutil.parser  # lenient fallback for non-ISO formats
                parsed = dateutil.parser.parse(value)
            except (ImportError, ValueError, OverflowError):
                return None
        # Naive = UTC; aware timestamps are converted, not relabelled.
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=utc)
        return parsed.astimezone(utc)

    all_lessons = kv.list(KV.lessons)
    decayed = 0
    soft_deleted = 0
    now = datetime.datetime.now(utc)
    timestamp = now.replace(tzinfo=None).isoformat() + "Z"
    for lesson in all_lessons:
        if lesson.get("deleted"):
            continue
        baseline = parse_utc(
            lesson.get("lastDecayedAt") or lesson.get("lastReinforcedAt") or lesson.get("createdAt")
        )
        if baseline is None:
            continue
        weeks = (now - baseline).total_seconds() / (3600 * 24 * 7)
        if weeks < 1.0:
            continue
        before = lesson.get("confidence", 0.5)
        new_conf = max(0.05, before - lesson.get("decayRate", 0.05) * weeks)
        if new_conf == lesson.get("confidence"):
            continue  # already at the floor
        lesson["confidence"] = round(new_conf, 3)
        lesson["lastDecayedAt"] = timestamp
        lesson["updatedAt"] = timestamp
        if lesson["confidence"] <= 0.1 and lesson.get("reinforcements", 0) == 0:
            lesson["deleted"] = True
            soft_deleted += 1
        else:
            decayed += 1
        kv.set(KV.lessons, lesson["id"], lesson)
        # NOTE(review): the audit operation name "lesson_strengthen" is reused
        # for decay events; confirm whether a dedicated name is expected.
        safe_audit(kv, "lesson_strengthen", "mem::lesson-decay-sweep", [lesson["id"]], {
            "action": "soft-delete" if lesson.get("deleted") else "decay",
            "actor": "system",
            "reason": "decay-sweep",
            "before": {"confidence": before, "deleted": False},
            "after": {"confidence": lesson["confidence"], "deleted": bool(lesson.get("deleted"))},
        })
    if decayed > 0 or soft_deleted > 0:
        commit_if_enabled(kv, f"Lesson decay sweep: decayed {decayed}, soft-deleted {soft_deleted}", "system")
    return {"success": True, "decayed": decayed, "softDeleted": soft_deleted, "total": len(all_lessons)}
# =====================================================================
# Database Rebuilder (Index Bootstrapper)
# =====================================================================
def rebuild_index(kv: StateKV) -> int:
    """Rebuild the in-memory BM25 and vector indexes from the KV store.

    Both indexes are cleared, then repopulated with every observation that
    has a ``title`` and a ``narrative`` (across all sessions) and every memory
    with a ``title`` and ``content`` that is not marked ``isLatest: False``.
    An index save is scheduled when persistence is configured and at least
    one item was indexed.

    Returns:
        Number of observations plus memories indexed.
    """
    _bm25_index.clear()
    if _vector_index:
        _vector_index.clear()

    indexed = 0
    for session in kv.list(KV.sessions):
        sid = session.get("id")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            # Raw (uncompressed) observations lack title/narrative: skip them.
            if not (obs.get("title") and obs.get("narrative")):
                continue
            _bm25_index.add(obs)
            text = obs["title"] + " " + obs["narrative"]
            vector_index_add_guarded(obs["id"], sid, text, {"kind": "observation", "logId": obs["id"]})
            indexed += 1

    for memory in kv.list(KV.memories):
        superseded = memory.get("isLatest") is False
        if superseded or not memory.get("title") or not memory.get("content"):
            continue
        _bm25_index.add(memory_to_observation(memory))
        text = memory["title"] + " " + memory["content"]
        vector_index_add_guarded(memory["id"], "memory", text, {"kind": "memory", "logId": memory["id"]})
        indexed += 1

    if _index_persistence and indexed > 0:
        _index_persistence.schedule_save()
    return indexed
# =====================================================================
# Advanced Function Stubs / CRUD Operations
# =====================================================================
def list_sessions(kv: StateKV) -> List[Dict[str, Any]]:
    """Return all sessions, newest ``startedAt`` first.

    Sessions with a stored summary are decorated in place with ``title`` and
    ``summary`` (the summary's narrative).
    """
    sessions = kv.list(KV.sessions)
    for session in sessions:
        sid = session.get("id")
        summary = kv.get(KV.summaries, sid) if sid else None
        if summary:
            session.update(title=summary.get("title"), summary=summary.get("narrative"))
    sessions.sort(key=lambda session: session.get("startedAt", ""), reverse=True)
    return sessions
def get_session(kv: StateKV, session_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one session, decorated with its summary's title and narrative.

    Whatever the store returns for a missing session (falsy) is passed
    through unchanged.
    """
    session = kv.get(KV.sessions, session_id)
    if not session:
        return session
    summary = kv.get(KV.summaries, session_id)
    if summary:
        session["title"] = summary.get("title")
        session["summary"] = summary.get("narrative")
    return session
def create_session(kv: StateKV, session: Dict[str, Any]) -> Dict[str, Any]:
    """Store *session*, first auto-completing older active sessions.

    Returns the same dict that was stored.
    """
    new_id = session["id"]
    auto_complete_old_active_sessions(kv, new_id)
    kv.set(KV.sessions, new_id, session)
    return session
def end_session(kv: StateKV, session_id: str) -> bool:
    """Mark a session ``completed`` and stamp ``endedAt`` with the UTC time.

    Always returns True; errors from ``kv.update`` propagate.
    """
    ended_at = datetime.datetime.utcnow().isoformat() + "Z"
    operations = [
        {"type": "set", "path": "endedAt", "value": ended_at},
        {"type": "set", "path": "status", "value": "completed"},
    ]
    kv.update(KV.sessions, session_id, operations)
    return True
def timeline(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a window of observations around an anchor, in timestamp order.

    Observations come from session ``sessionId`` if given, else from every
    session of ``project`` if given, else from all sessions.

    The anchor is chosen in order:

    1. the observation whose id equals ``anchor``;
    2. otherwise the first observation whose timestamp is >= ``anchor`` (so
       an absent anchor selects the first observation);
    3. otherwise the middle observation.

    The window spans up to ``before`` (default 10) observations before the
    anchor and ``after`` (default 10) after it.

    Returns:
        ``{"success": True, "observations": [...], "anchorIndex": i}`` where
        ``i`` is the anchor's position inside the returned window.
    """
    anchor = data.get("anchor")
    project = data.get("project")
    session_id = data.get("sessionId")

    def window_size(key: str) -> int:
        # Only a missing value defaults to 10 (`or 10` also turned an explicit
        # 0 into 10); negative values are clamped to 0.
        value = data.get(key)
        if value is None or value == "":
            return 10
        return max(0, int(value))

    before = window_size("before")
    after = window_size("after")

    sessions = kv.list(KV.sessions)
    if session_id:
        sessions = [s for s in sessions if s.get("id") == session_id]
    elif project:
        sessions = [s for s in sessions if s.get("project") == project]

    all_obs: List[Dict[str, Any]] = []
    for s in sessions:
        sid = s.get("id")
        if sid:
            all_obs.extend(kv.list(KV.observations(sid)))
    # `or ""` so null timestamps sort first instead of raising TypeError.
    all_obs.sort(key=lambda o: o.get("timestamp") or "")

    # An exact id match wins. The old single pass also accepted any earlier
    # observation whose timestamp compared >= the id string.
    anchor_idx = -1
    if anchor:
        anchor_idx = next((i for i, o in enumerate(all_obs) if o.get("id") == anchor), -1)
    if anchor_idx == -1:
        anchor_key = anchor or ""
        anchor_idx = next(
            (i for i, o in enumerate(all_obs) if (o.get("timestamp") or "") >= anchor_key), -1
        )
    if anchor_idx == -1:
        anchor_idx = len(all_obs) // 2

    start = max(0, anchor_idx - before)
    end = min(len(all_obs), anchor_idx + after + 1)
    return {
        "success": True,
        "observations": all_obs[start:end],
        "anchorIndex": anchor_idx - start,
    }
def get_project_profile(kv: StateKV, project: str) -> Dict[str, Any]:
    """Return the project's profile, deriving it when it has no statistics.

    A stored profile that already lists top concepts or top files is
    returned as-is. Otherwise (including when nothing is stored) the result
    of ``build_project_profile`` is returned.
    """
    stored = kv.get(KV.profiles, project)
    has_stats = bool(stored) and bool(stored.get("topConcepts") or stored.get("topFiles"))
    return stored if has_stats else build_project_profile(kv, project)
# Path segments that carry no project meaning (OS, user and temp directories).
_PROFILE_SKIP_SEGMENTS = frozenset({"tmp", "temp", "claude", "appdata", "local", "users", "windows"})
# Drive letters ("c:"), dot-directories (".git") and flag-like segments ("--x").
_PROFILE_SEGMENT_NOISE = re.compile(r"^[a-z]:|^\.|^--")
# Source-file extensions that are themselves counted as concepts.
_PROFILE_CODE_EXTS = frozenset({"py", "ts", "js", "jsx", "tsx", "go", "rs", "java", "cs", "cpp"})
# Keys under which tool inputs / JSON narratives carry a file path.
_PROFILE_PATH_KEYS = ("path", "file_path", "file", "filename")


def _profile_count_concept(concept_counts: Dict[str, int], value: Any) -> None:
    """Count *value* as a concept if it is a non-empty string (Counter expected)."""
    # Strings only: context() joins concept names with ", ".join(...).
    if isinstance(value, str) and value:
        concept_counts[value] += 1


def _profile_harvest_file(path: Any, file_counts: Dict[str, int], concept_counts: Dict[str, int]) -> None:
    """Count *path* as a file, and its directories/stem/extension as concepts."""
    if not isinstance(path, str) or not path:
        return
    file_counts[path] += 1
    parts = re.split(r"[\\/]", path)
    fname = parts[-1]
    for part in parts[:-1]:
        segment = part.lower().strip()
        if (
            len(segment) > 2
            and segment not in _PROFILE_SKIP_SEGMENTS
            and not _PROFILE_SEGMENT_NOISE.match(segment)
        ):
            concept_counts[segment] += 1
    stem, ext = os.path.splitext(fname)
    if len(stem) > 2:
        concept_counts[stem.lower()] += 1
    ext = ext.lstrip(".")
    if ext in _PROFILE_CODE_EXTS:
        concept_counts[ext] += 1


def _profile_harvest_observation(obs: Dict[str, Any], file_counts: Dict[str, int], concept_counts: Dict[str, int]) -> None:
    """Accumulate file/concept counts from one observation."""
    for concept in obs.get("concepts") or []:
        _profile_count_concept(concept_counts, concept)
    for path in obs.get("files") or []:
        _profile_harvest_file(path, file_counts, concept_counts)
    _profile_count_concept(concept_counts, obs.get("toolName"))

    tool_input = obs.get("toolInput")
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (ValueError, RecursionError):
            tool_input = {}
    if isinstance(tool_input, dict):
        for key in _PROFILE_PATH_KEYS:
            _profile_harvest_file(tool_input.get(key, ""), file_counts, concept_counts)

    # Raw hook payloads may be stored as a JSON object in narrative/raw.
    narrative = obs.get("narrative") or obs.get("raw") or ""
    if isinstance(narrative, str) and narrative.startswith("{"):
        try:
            parsed = json.loads(narrative)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            _profile_count_concept(concept_counts, parsed.get("toolName") or parsed.get("tool_name"))
            for key in _PROFILE_PATH_KEYS:
                _profile_harvest_file(parsed.get(key, ""), file_counts, concept_counts)


def build_project_profile(kv: StateKV, project: str) -> Dict[str, Any]:
    """Return the project's profile, computing top concepts/files when absent.

    If the stored profile (or a fresh default when none is stored) has
    neither ``topConcepts`` nor ``topFiles``, they are derived from the
    observations of every session of *project* and from the project's
    memories. The top 20 of each are kept, and ``sessionCount`` is set.
    The computed profile is returned but not written back to the store.
    """
    from collections import Counter

    prof = kv.get(KV.profiles, project)
    if not prof:
        prof = {
            "project": project,
            "topConcepts": [],
            "topFiles": [],
            "conventions": [],
            "commonErrors": [],
            "updatedAt": datetime.datetime.utcnow().isoformat() + "Z",
        }
    if prof.get("topConcepts") or prof.get("topFiles"):
        return prof

    project_sessions = [s for s in kv.list(KV.sessions) if s.get("project") == project]
    concept_counts = Counter()
    file_counts = Counter()
    for session in project_sessions:
        sid = session.get("id", "")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            _profile_harvest_observation(obs, file_counts, concept_counts)

    for memory in kv.list(KV.memories):
        if memory.get("project") != project:
            continue
        for concept in memory.get("concepts") or []:
            _profile_count_concept(concept_counts, concept)
        for path in memory.get("files") or []:
            _profile_harvest_file(path, file_counts, concept_counts)

    prof["topConcepts"] = [{"concept": c, "frequency": n} for c, n in concept_counts.most_common(20)]
    prof["topFiles"] = [{"file": f, "frequency": n} for f, n in file_counts.most_common(20)]
    prof["sessionCount"] = len(project_sessions)
    return prof
def export_data(kv: StateKV, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Export the memory store as a JSON-serialisable snapshot.

    Sessions are ordered newest ``startedAt`` first. When ``maxSessions`` is
    given, it is clamped to 1..1000 and only that page of sessions is
    exported, starting at ``offset``. The page's observations and project
    profiles are exported along with it, and a ``pagination`` entry is added.
    ``offset`` has no effect without ``maxSessions``, and unparseable values
    for either are ignored.

    Memories, summaries and the other collections are always exported in
    full. Empty optional collections are omitted.
    """
    data = data or {}

    max_sessions = None
    raw_max = data.get("maxSessions")
    if raw_max is not None:
        try:
            max_sessions = min(max(int(raw_max), 1), 1000)
        except (TypeError, ValueError, OverflowError):
            pass  # invalid value: export without pagination
    offset = 0
    raw_offset = data.get("offset")
    if raw_offset is not None:
        try:
            offset = max(int(raw_offset), 0)
        except (TypeError, ValueError, OverflowError):
            pass  # invalid value: start at the first session

    all_sessions = kv.list(KV.sessions)
    all_sessions.sort(key=lambda s: s.get("startedAt") or "", reverse=True)
    page = all_sessions if max_sessions is None else all_sessions[offset:offset + max_sessions]

    memories = kv.list(KV.memories)
    summaries = kv.list(KV.summaries)
    observations = {}
    for s in page:
        sid = s.get("id")
        if sid:
            obs = kv.list(KV.observations(sid))
            if obs:
                observations[sid] = obs

    # dict.fromkeys keeps first-seen order; list(set(...)) made the order of
    # exported profiles vary between interpreter runs.
    projects = dict.fromkeys(s["project"] for s in page if s.get("project"))
    profiles = [p for p in (kv.get(KV.profiles, proj) for proj in projects) if p]

    res = {
        "version": "0.9.21",
        "exportedAt": datetime.datetime.utcnow().isoformat() + "Z",
        "sessions": page,
        "observations": observations,
        "memories": memories,
        "summaries": summaries,
    }
    if profiles:
        res["profiles"] = profiles

    # (export key, KV scope) for collections included only when non-empty.
    optional_sections = (
        ("graphNodes", KV.graphNodes),
        ("graphEdges", KV.graphEdges),
        ("semanticMemories", KV.semantic),
        ("proceduralMemories", KV.procedural),
        ("actions", KV.actions),
        ("actionEdges", KV.actionEdges),
        ("sentinels", KV.sentinels),
        ("sketches", KV.sketches),
        ("crystals", KV.crystals),
        ("facets", KV.facets),
        ("lessons", KV.lessons),
        ("insights", KV.insights),
        ("routines", KV.routines),
        ("signals", KV.signals),
        ("checkpoints", KV.checkpoints),
        ("accessLogs", KV.accessLog),
    )
    for key, scope in optional_sections:
        items = kv.list(scope)
        if items:
            res[key] = items

    if max_sessions is not None:
        res["pagination"] = {
            "offset": offset,
            "limit": max_sessions,
            "total": len(all_sessions),
            "hasMore": offset + max_sessions < len(all_sessions),
        }
    return res
def set_project_profile(kv: StateKV, project: str, profile: Dict[str, Any]) -> Dict[str, Any]:
    """Store *profile* for *project* with a fresh ``updatedAt``, then commit.

    The passed dict is modified in place and returned.
    """
    stamped_at = datetime.datetime.utcnow().isoformat() + "Z"
    profile["updatedAt"] = stamped_at
    kv.set(KV.profiles, project, profile)
    # Commit to Dolt
    commit_if_enabled(kv, f"Set project profile for {project}", get_agent_id())
    return profile
def get_relations(kv: StateKV) -> List[Dict[str, Any]]:
    """Return every stored relation record."""
    relations = kv.list(KV.relations)
    return relations
def add_relation(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Persist a typed relation between two ids and commit it.

    *data* must provide ``sourceId``, ``targetId`` and ``type`` (KeyError
    otherwise); ``agentId`` optionally attributes the commit. Returns the
    stored relation record.
    """
    relation_id = generate_id("rel")
    relation = dict(
        id=relation_id,
        sourceId=data["sourceId"],
        targetId=data["targetId"],
        type=data["type"],
        createdAt=datetime.datetime.utcnow().isoformat() + "Z",
    )
    kv.set(KV.relations, relation_id, relation)
    # Commit to Dolt
    author = data.get("agentId") or get_agent_id()
    message = f"Add relation {relation['type']} between {relation['sourceId']} and {relation['targetId']}"
    commit_if_enabled(kv, message, author)
    return relation
def evolve_memory(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Supersede a memory with a new version that carries updated content.

    The existing memory is kept but marked ``isLatest: False``. A copy with a
    new id, ``newContent``, ``newTitle`` (or the first 80 characters of the
    new content), ``version + 1`` and ``parentId``/``supersedes`` links is
    stored as the latest version. The search indexes are moved to the new
    version and the change is committed.

    Args:
        kv: State store.
        data: ``memoryId`` and ``newContent`` (required); optional
            ``newTitle`` and ``agentId``.

    Returns:
        ``{"success": True, "memory": <new version>}``.

    Raises:
        KeyError: if ``memoryId`` or ``newContent`` is missing.
        ValueError: if no memory with ``memoryId`` exists.
    """
    import sys

    mem_id = data["memoryId"]
    new_content = data["newContent"]
    new_title = data.get("newTitle")
    existing = kv.get(KV.memories, mem_id)
    if not existing:
        raise ValueError("Memory not found")
    existing["isLatest"] = False
    kv.set(KV.memories, existing["id"], existing)

    now = datetime.datetime.utcnow().isoformat() + "Z"
    new_mem = dict(existing)
    new_mem["id"] = generate_id("mem")
    new_mem["content"] = new_content
    new_mem["title"] = new_title if new_title else new_content[:80]
    new_mem["version"] = existing.get("version", 1) + 1
    new_mem["parentId"] = existing["id"]
    new_mem["supersedes"] = [existing["id"]]
    new_mem["createdAt"] = now
    new_mem["updatedAt"] = now
    new_mem["isLatest"] = True
    kv.set(KV.memories, new_mem["id"], new_mem)

    # Re-index. Both BM25 steps stay best-effort (the KV write already
    # succeeded) but run independently and report failures. Previously one
    # silent try/except skipped removing the old version whenever adding the
    # new one failed, leaving a superseded memory searchable.
    try:
        _bm25_index.add(memory_to_observation(new_mem))
    except Exception as exc:
        print(f"[evolve_memory] BM25 add failed for {new_mem['id']}: {exc}", file=sys.stderr)
    try:
        _bm25_index.remove(existing["id"])
    except Exception as exc:
        print(f"[evolve_memory] BM25 remove failed for {existing['id']}: {exc}", file=sys.stderr)
    comb_text = new_mem["title"] + " " + new_mem["content"]
    vector_index_add_guarded(new_mem["id"], "memory", comb_text, {"kind": "memory", "logId": new_mem["id"]})
    if _vector_index:
        _vector_index.remove(existing["id"])
    if _index_persistence:
        _index_persistence.schedule_save()

    # Commit to Dolt
    agent_id = data.get("agentId") or get_agent_id() or new_mem.get("agentId")
    commit_if_enabled(kv, f"Evolve memory {new_mem['id']} (v{new_mem['version']}): {new_mem['title']}", agent_id)
    return {"success": True, "memory": new_mem}
def _auto_forget_parse_utc(value: Any) -> datetime.datetime:
    """Parse a timestamp string and return it as a naive datetime in UTC.

    Aware values are converted to UTC *before* the tzinfo is dropped, so
    ``"2024-01-01T10:00:00+05:00"`` compares as 05:00 UTC rather than 10:00.
    Naive values are assumed to already be in UTC.
    """
    import dateutil.parser

    parsed = dateutil.parser.parse(value)
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    return parsed


def _auto_forget_unindex(item_id: str) -> None:
    """Remove *item_id* from the BM25 and vector indexes, logging (not raising) failures."""
    try:
        _bm25_index.remove(item_id)
    except Exception as e:
        print(f"[auto_forget] BM25 remove failed for '{item_id}': {e}")
    if _vector_index:
        try:
            _vector_index.remove(item_id)
        except Exception as e:
            print(f"[auto_forget] Vector remove failed for '{item_id}': {e}")


def _auto_forget_release_image(kv: StateKV, ref: Any) -> None:
    """Decrement the reference count stored for image *ref*, never below zero."""
    refs = kv.get(KV.imageRefs, ref) or 0
    if refs > 0:
        kv.set(KV.imageRefs, ref, refs - 1)


def auto_forget(kv: StateKV, dry_run: bool = False) -> Dict[str, Any]:
    """Evict expired memories and stale low-importance observations.

    Two eviction rules apply:

    1. Memories whose ``forgetAfter`` timestamp lies in the past.
    2. Observations with ``importance <= 2`` whose ``timestamp`` is more than
       180 days old.

    All timestamps are compared in UTC. Unparseable values are reported and
    skipped.

    When *dry_run* is false, each evicted record is:

    - deleted;
    - has its image reference count decremented;
    - is removed from the search indexes.

    If anything was evicted, an index save is scheduled, the eviction is
    audited and it is committed.

    Args:
        kv: State store to scan.
        dry_run: When true, only report what would be evicted.

    Returns:
        A dict with the evicted memory ids, evicted observation ids, the total
        count, and the ``dryRun`` flag.
    """
    now_dt = datetime.datetime.utcnow()
    evicted_memories: List[str] = []
    evicted_observations: List[Tuple[str, str]] = []

    # 1. Evict expired memories
    for mem in kv.list(KV.memories):
        forget_after = mem.get("forgetAfter")
        if not forget_after:
            continue
        try:
            if _auto_forget_parse_utc(forget_after) < now_dt:
                evicted_memories.append(mem["id"])
        except Exception as e:
            print(f"[auto_forget] Failed to parse forgetAfter '{forget_after}': {e}")

    # 2. Evict low-value old observations (importance <= 2, age > 180 days)
    for sess in kv.list(KV.sessions):
        sid = sess.get("id")
        if not sid:
            continue
        for obs in kv.list(KV.observations(sid)):
            importance = obs.get("importance")
            ts = obs.get("timestamp")
            if importance is None or not ts:
                continue
            try:
                age_days = (now_dt - _auto_forget_parse_utc(ts)).days
                if importance <= 2 and age_days > 180:
                    evicted_observations.append((sid, obs["id"]))
            except Exception as e:
                print(f"[auto_forget] Failed to parse timestamp '{ts}': {e}")

    if not dry_run:
        for mem_id in evicted_memories:
            mem = kv.get(KV.memories, mem_id)
            kv.delete(KV.memories, mem_id)
            if mem and mem.get("imageRef"):
                _auto_forget_release_image(kv, mem["imageRef"])
            _auto_forget_unindex(mem_id)
        for sid, obs_id in evicted_observations:
            obs = kv.get(KV.observations(sid), obs_id)
            kv.delete(KV.observations(sid), obs_id)
            img = (obs.get("imageData") or obs.get("imageRef")) if obs else None
            if img:
                _auto_forget_release_image(kv, img)
            _auto_forget_unindex(obs_id)
        if evicted_memories or evicted_observations:
            if _index_persistence:
                _index_persistence.schedule_save()
            safe_audit(
                kv,
                "auto_forget",
                "mem::auto_forget",
                evicted_memories + [oid for _, oid in evicted_observations],
                {
                    "evictedMemoriesCount": len(evicted_memories),
                    "evictedObservationsCount": len(evicted_observations),
                    "dryRun": False
                }
            )
            commit_if_enabled(kv, f"Auto forget: evicted {len(evicted_memories)} memories, {len(evicted_observations)} observations", "system")

    return {
        "success": True,
        "evictedMemories": evicted_memories,
        "evictedObservations": [oid for _, oid in evicted_observations],
        "evicted": len(evicted_memories) + len(evicted_observations),
        "dryRun": dry_run
    }
def health_check(kv: StateKV) -> Dict[str, Any]:
    """Report service health based on whether a database connection can be opened.

    Opens a connection through ``kv._get_conn()`` and closes it straight away.
    An exception from either step marks the database "disconnected" and the
    overall status "degraded".
    """
    try:
        kv._get_conn().close()
    except Exception:
        reachable = False
    else:
        reachable = True
    return {
        "status": "healthy" if reachable else "degraded",
        "service": "agentmemory",
        "version": "0.9.8",
        "database": "dolt",
        "databaseStatus": "connected" if reachable else "disconnected",
    }
def strip_xml_wrappers(raw: str) -> str:
    """Strip Markdown code fences from an LLM reply and isolate its XML element.

    Removes fences written as three backticks (optionally followed by ``xml``).
    It then returns the span from the first attribute-less opening tag through
    the last closing tag. When no such span exists, the fence-stripped,
    whitespace-trimmed text is returned. Empty or ``None`` input yields ``""``.
    """
    if not raw:
        return ""
    cleaned = re.sub(r'```xml\s*\n?', '', raw.strip(), flags=re.IGNORECASE)
    cleaned = re.sub(r'```', '', cleaned).strip()
    root_match = re.search(r'(<[a-zA-Z_][a-zA-Z0-9_-]*>[\s\S]*<\/[a-zA-Z_][a-zA-Z0-9_-]*>)', cleaned)
    if root_match:
        return root_match.group(1).strip()
    return cleaned


def get_xml_tag(text: str, tag: str) -> Optional[str]:
    """Return the stripped inner text of the first ``<tag>...</tag>`` element in *text*.

    The opening tag may carry attributes (``<fact confidence="0.9">``). The
    match is non-greedy and may span newlines. *tag* is matched literally,
    with regex metacharacters escaped. Returns ``None`` when no such element
    exists.
    """
    cleaned = strip_xml_wrappers(text)
    name = re.escape(tag)
    # The closing tag must include "</"; without it the captured text would
    # swallow the "</" of the closing tag.
    pattern = rf"<{name}(?:\s[^>]*)?>(.*?)</{name}\s*>"
    match = re.search(pattern, cleaned, re.DOTALL)
    return match.group(1).strip() if match else None


def get_xml_children(text: str, parent_tag: str, child_tag: str) -> List[str]:
    """Return the stripped, non-empty texts of every ``<child_tag>`` inside ``<parent_tag>``.

    Returns an empty list when the parent element is missing or empty.
    """
    parent_content = get_xml_tag(text, parent_tag)
    if not parent_content:
        return []
    name = re.escape(child_tag)
    pattern = rf"<{name}(?:\s[^>]*)?>(.*?)</{name}\s*>"
    children = (m.strip() for m in re.findall(pattern, parent_content, re.DOTALL))
    return [child for child in children if child]
def generate_content(system_instruction: str, prompt: str) -> str:
    """Call the Gemini ``generateContent`` REST endpoint and return the reply text.

    The API key comes from ``GEMINI_API_KEY`` or ``GOOGLE_API_KEY``. It is sent
    in the ``x-goog-api-key`` header rather than the URL, so it cannot leak
    through logged URLs. The model comes from ``GEMINI_MODEL`` (default
    ``gemini-2.5-flash``). Sampling uses temperature 0.2 with a 60 s timeout.

    Args:
        system_instruction: Text sent as the request's ``systemInstruction``.
        prompt: Text sent as the single user turn.

    Returns:
        The concatenated text of the first candidate's non-thought parts.

    Raises:
        ValueError: if no API key is configured.
        RuntimeError: if the HTTP call fails, the response is not JSON, or the
            response has no candidates / no content parts.
    """
    import urllib.error
    import urllib.request

    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("No Gemini/Google API key found")
    model = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    payload = {
        "contents": [
            {
                "role": "user",
                "parts": [
                    {"text": prompt}
                ]
            }
        ],
        "systemInstruction": {
            "parts": [
                {"text": system_instruction}
            ]
        },
        "generationConfig": {
            "temperature": 0.2
        }
    }
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "x-goog-api-key": api_key},
        method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=60.0) as response:
            resp_data = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        # The error body carries Gemini's explanation (quota, bad model, ...).
        detail = ""
        try:
            detail = e.read().decode("utf-8", errors="replace")[:500]
        except Exception:
            pass
        raise RuntimeError(f"Gemini generateContent call failed: HTTP {e.code}: {detail}") from e
    except Exception as e:
        raise RuntimeError(f"Gemini generateContent call failed: {e}") from e

    candidates = resp_data.get("candidates") or []
    if not candidates:
        feedback = resp_data.get("promptFeedback")
        suffix = f" (promptFeedback: {feedback})" if feedback else ""
        raise RuntimeError(f"Gemini generateContent returned no candidates{suffix}")
    parts = (candidates[0].get("content") or {}).get("parts") or []
    texts = [p.get("text") or "" for p in parts if isinstance(p, dict) and not p.get("thought")]
    if not texts:
        raise RuntimeError("Gemini generateContent candidate content had no parts")
    return "".join(texts)
def _summary_fields_from_response(response: str) -> Dict[str, Any]:
    """Parse a summary/reduction reply into title, narrative and list fields.

    ``title`` is ``None`` when the reply contains no ``<title>`` element.
    """
    cleaned = strip_xml_wrappers(response)
    return {
        "title": get_xml_tag(cleaned, "title"),
        "narrative": get_xml_tag(cleaned, "narrative") or "",
        "keyDecisions": get_xml_children(cleaned, "decisions", "decision"),
        "filesModified": get_xml_children(cleaned, "files", "file"),
        "concepts": get_xml_children(cleaned, "concepts", "concept"),
    }


def summarize(kv: StateKV, data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize a session's titled observations with the LLM and persist the result.

    Observations that have a ``title`` are summarized in chunks of 400. When
    more than one chunk succeeds, a second LLM call reduces the partial
    summaries into one. The final summary is:

    - stored in ``KV.summaries`` under the session id;
    - copied into the session's ``title``/``summary`` fields;
    - audited as ``compress``.

    Args:
        kv: State store.
        data: Payload containing ``sessionId``.

    Returns:
        ``{"success": True, "summary": ...}``. On failure,
        ``{"success": False, "error": ...}``, with the error being one of
        ``sessionId is required``, ``session_not_found``, ``no_observations``,
        a no-chunks message, or a reduction failure.
    """
    session_id = data.get("sessionId")
    if not session_id:
        return {"success": False, "error": "sessionId is required"}
    session = kv.get(KV.sessions, session_id)
    if not session:
        return {"success": False, "error": "session_not_found"}
    observations = kv.list(KV.observations(session_id))
    compressed = [o for o in observations if o.get("title")]
    if not compressed:
        return {"success": False, "error": "no_observations"}

    # The element names must appear in the prompt: the reply is parsed with
    # get_xml_tag / get_xml_children using exactly these tags.
    SUMMARY_SYSTEM = """You are a session summarization assistant. Your job is to read all raw tool executions and outcomes from a coding session and produce a high-fidelity summary.
Output XML:
<summary>
  <title>Concise title summarizing the session</title>
  <narrative>1-2 paragraphs of narrative describing what was done, what succeeded, and what failed</narrative>
  <decisions>
    <decision>Architectural decision, key insight, or choice made</decision>
  </decisions>
  <files>
    <file>path/to/modified/file</file>
  </files>
  <concepts>
    <concept>important concept, library, tool, or command used</concept>
  </concepts>
</summary>
"""
    REDUCE_SYSTEM = """You are a session summarization reducer. Reduce multiple partial chunk summaries into a single final summary.
Output XML:
<summary>
  <title>Concise final title summarizing the entire session</title>
  <narrative>Comprehensive narrative describing what was done, what succeeded, and what failed</narrative>
  <decisions>
    <decision>Architectural decision, key insight, or choice made</decision>
  </decisions>
  <files>
    <file>path/to/modified/file</file>
  </files>
  <concepts>
    <concept>important concept, library, tool, or command used</concept>
  </concepts>
</summary>
"""

    chunk_size = 400
    chunks = [compressed[i:i + chunk_size] for i in range(0, len(compressed), chunk_size)]
    partial_summaries: List[Dict[str, Any]] = []
    # Initialised up front: a chunk whose reply lacks <title> does not raise,
    # and the error message below must never reference an unbound name.
    last_error: Optional[str] = None
    for idx, chunk in enumerate(chunks, start=1):
        obs_text = "".join(
            f"[{o.get('type')}] {o.get('title')}\n{o.get('narrative') or ''}\nFiles: {', '.join(o.get('files') or [])}\n\n"
            for o in chunk
        )
        prompt = f"Summarize this chunk {idx}/{len(chunks)} of observations:\n\n{obs_text}"
        try:
            fields = _summary_fields_from_response(generate_content(SUMMARY_SYSTEM, prompt))
        except Exception as e:
            last_error = str(e)
            print(f"[summarize] Chunk {idx} failed: {e}")
            continue
        if not fields["title"]:
            last_error = f"chunk {idx} response contained no <title>"
            continue
        partial_summaries.append(fields)

    if not partial_summaries:
        return {"success": False, "error": f"No chunks summarized successfully. Last error: {last_error}"}

    if len(partial_summaries) == 1:
        merged = partial_summaries[0]
    else:
        reduce_prompt = "Reduce these partial summaries:\n\n" + "".join(
            f"[Chunk {i}]\nTitle: {ps['title']}\nNarrative: {ps['narrative']}\nDecisions: {', '.join(ps['keyDecisions'])}\nFiles: {', '.join(ps['filesModified'])}\nConcepts: {', '.join(ps['concepts'])}\n\n"
            for i, ps in enumerate(partial_summaries, start=1)
        )
        try:
            merged = _summary_fields_from_response(generate_content(REDUCE_SYSTEM, reduce_prompt))
        except Exception as e:
            return {"success": False, "error": f"Reduction failed: {e}"}
        if not merged["title"]:
            merged["title"] = partial_summaries[0]["title"]

    final_summary = {
        "sessionId": session_id,
        "project": session.get("project"),
        "createdAt": datetime.datetime.utcnow().isoformat() + "Z",
        "title": merged["title"],
        "narrative": merged["narrative"],
        "keyDecisions": merged["keyDecisions"],
        "filesModified": merged["filesModified"],
        "concepts": merged["concepts"],
        "observationCount": len(compressed)
    }
    kv.set(KV.summaries, session_id, final_summary)

    # Re-read the session so updates made while the LLM calls ran are kept.
    latest_session = kv.get(KV.sessions, session_id)
    if latest_session:
        latest_session["title"] = final_summary["title"]
        latest_session["summary"] = final_summary["narrative"]
        kv.set(KV.sessions, session_id, latest_session)
    safe_audit(kv, "compress", "mem::summarize", [session_id], {
        "title": final_summary["title"],
        "observationCount": len(compressed)
    })
    return {"success": True, "summary": final_summary}
def _consolidation_importance(obs: Dict[str, Any]) -> Any:
    """Return an observation's importance, treating a missing or None value as 5."""
    importance = obs.get("importance")
    return 5 if importance is None else importance


def _consolidate_concept_groups(
    kv: StateKV,
    all_obs: List[Tuple[Dict[str, Any], str]],
    project: Optional[str],
    max_llm_calls: int = 10,
) -> int:
    """Turn recurring concepts into long-term memories and return how many were written.

    Observations are grouped by lower-cased, stripped concept. Each group with
    at least three observations is processed, largest first, up to
    *max_llm_calls* LLM attempts. For each group, the eight most important
    observations are sent to the LLM.

    If the returned title matches a current memory (case-insensitive, same or
    unset project), that memory is marked ``isLatest=False`` and a new version
    supersedes it. Otherwise a version-1 memory is created.
    """
    CONSOLIDATION_SYSTEM = """You are a memory consolidation engine. Given a set of related observations from coding sessions, synthesize them into a single long-term memory.
Output XML:
<memory>
  <type>pattern|preference|architecture|bug|workflow|fact</type>
  <title>Concise memory title (max 80 chars)</title>
  <content>2-4 sentence description of the learned insight</content>
  <concepts>
    <concept>key term</concept>
  </concepts>
  <files>
    <file>relevant/file/path</file>
  </files>
  <strength>1-10 how confident/important this memory is</strength>
</memory>
"""
    allowed_types = {"pattern", "preference", "architecture", "bug", "workflow", "fact"}

    concept_groups: Dict[str, List[Tuple[Dict[str, Any], str]]] = {}
    for obs, sid in all_obs:
        # dict.fromkeys de-duplicates an observation's concepts while keeping order,
        # so "API" and "api" on the same observation count it only once.
        keys = dict.fromkeys(c.lower().strip() for c in (obs.get("concepts") or []) if isinstance(c, str))
        for key in keys:
            if key:
                concept_groups.setdefault(key, []).append((obs, sid))

    sorted_groups = sorted(
        ((k, g) for k, g in concept_groups.items() if len(g) >= 3),
        key=lambda item: len(item[1]),
        reverse=True
    )

    existing_memories = kv.list(KV.memories)
    consolidated = 0
    llm_calls = 0
    for concept, group in sorted_groups:
        if llm_calls >= max_llm_calls:
            break
        top = sorted(group, key=lambda item: _consolidation_importance(item[0]), reverse=True)[:8]
        session_ids = list(dict.fromkeys(sid for _, sid in top))
        obs_ids = list(dict.fromkeys(obs["id"] for obs, _ in top))
        obs_prompt = "\n\n".join(
            f"[{obs.get('type')}] {obs.get('title')}\n{obs.get('narrative') or ''}\nFiles: {', '.join(obs.get('files') or [])}\nImportance: {_consolidation_importance(obs)}"
            for obs, _ in top
        )
        # Count the attempt before calling, so a failing API still consumes the budget.
        llm_calls += 1
        try:
            response = generate_content(CONSOLIDATION_SYSTEM, f"Concept: \"{concept}\"\n\nObservations:\n{obs_prompt}")
            cleaned = strip_xml_wrappers(response)
            m_title = get_xml_tag(cleaned, "title")
            m_content = get_xml_tag(cleaned, "content")
            if not m_title or not m_content:
                continue
            m_type = (get_xml_tag(cleaned, "type") or "fact").lower()
            if m_type not in allowed_types:
                m_type = "fact"
            try:
                m_strength = max(1, min(10, int(get_xml_tag(cleaned, "strength") or "5")))
            except ValueError:
                m_strength = 5
            now = datetime.datetime.utcnow().isoformat() + "Z"

            existing_match = next(
                (
                    mem for mem in existing_memories
                    if (mem.get("title") or "").lower() == m_title.lower()
                    and mem.get("isLatest") is not False
                    and (not project or not mem.get("project") or mem.get("project") == project)
                ),
                None
            )

            memory: Dict[str, Any] = {
                "id": generate_id("mem"),
                "createdAt": now,
                "updatedAt": now,
                "type": m_type,
                "title": m_title,
                "content": m_content,
                "concepts": get_xml_children(cleaned, "concepts", "concept"),
                "files": get_xml_children(cleaned, "files", "file"),
                "sessionIds": session_ids,
                "strength": m_strength,
                "version": 1,
            }
            if existing_match:
                existing_match["isLatest"] = False
                kv.set(KV.memories, existing_match["id"], existing_match)
                memory["version"] = (existing_match.get("version") or 1) + 1
                memory["parentId"] = existing_match["id"]
                memory["supersedes"] = [existing_match["id"]] + (existing_match.get("supersedes") or [])
            memory["sourceObservationIds"] = obs_ids
            memory["isLatest"] = True
            if project:
                memory["project"] = project
            kv.set(KV.memories, memory["id"], memory)
            # Track the new record so a later concept yielding the same title
            # evolves it instead of creating a duplicate.
            existing_memories.append(memory)
            consolidated += 1
        except Exception as e:
            print(f"[consolidate] Concept '{concept}' failed: {e}")
    return consolidated


def _consolidate_semantic_facts(kv: StateKV, summaries: List[Dict[str, Any]]) -> int:
    """Extract stable facts from the 20 newest session summaries and return how many were new.

    Runs only when at least five summaries exist. Facts matching an existing
    semantic record (case-insensitive) bump that record's access count and
    confidence. New facts are stored in ``KV.semantic``.
    """
    if len(summaries) < 5:
        return 0
    SEMANTIC_MERGE_SYSTEM = """You are a memory consolidation engine. Given overlapping episodic memories (session summaries), extract stable factual knowledge.
Output format (XML):
<facts>
  <fact confidence="0.0-1.0">Concise factual statement</fact>
</facts>
Rules:
- Extract only facts that appear in 2+ episodes or are highly confident
- Confidence reflects how well-supported the fact is across episodes
- Combine overlapping information into single concise facts
- Skip ephemeral details (specific error messages, temporary states)"""
    recent_summaries = sorted(summaries, key=lambda s: s.get("createdAt") or "", reverse=True)[:20]
    merge_prompt = "Consolidate these episodic memories into stable facts:\n\n" + "\n\n".join(
        f"[Episode {i}]\nTitle: {s.get('title')}\nNarrative: {s.get('narrative') or ''}\nConcepts: {', '.join(s.get('concepts') or [])}"
        for i, s in enumerate(recent_summaries, start=1)
    )
    source_session_ids = [s["sessionId"] for s in recent_summaries if "sessionId" in s]

    new_facts = 0
    try:
        response = generate_content(SEMANTIC_MERGE_SYSTEM, merge_prompt)
        fact_matches = re.findall(r'<fact\s+confidence="([^"]*)"\s*>([^<]+)</fact>', response)
        known: Dict[str, Dict[str, Any]] = {}
        for es in kv.list(KV.semantic):
            known.setdefault((es.get("fact") or "").lower(), es)
        now = datetime.datetime.utcnow().isoformat() + "Z"
        for conf_str, raw_fact in fact_matches:
            fact_text = raw_fact.strip()
            if not fact_text:
                continue
            try:
                confidence = max(0.0, min(1.0, float(conf_str)))
            except ValueError:
                confidence = 0.5
            existing = known.get(fact_text.lower())
            if existing:
                existing["accessCount"] = (existing.get("accessCount") or 0) + 1
                existing["lastAccessedAt"] = now
                existing["updatedAt"] = now
                existing["confidence"] = max(existing.get("confidence", 0.5), confidence)
                kv.set(KV.semantic, existing["id"], existing)
            else:
                sem = {
                    "id": generate_id("sem"),
                    "fact": fact_text,
                    "confidence": confidence,
                    "sourceSessionIds": list(source_session_ids),
                    "sourceMemoryIds": [],
                    "accessCount": 1,
                    "lastAccessedAt": now,
                    "strength": confidence,
                    "createdAt": now,
                    "updatedAt": now
                }
                kv.set(KV.semantic, sem["id"], sem)
                # Repeats of the same fact later in this reply update this record.
                known[fact_text.lower()] = sem
                new_facts += 1
    except Exception as e:
        print(f"[consolidate] Semantic merge failed: {e}")
    return new_facts


def _consolidate_procedures(kv: StateKV) -> Tuple[int, int]:
    """Extract reusable procedures from recurring ``pattern`` memories.

    Returns:
        A tuple ``(new_procedures, patterns_analyzed)``. Only current pattern
        memories seen in two or more sessions count as patterns, and the LLM
        is called only when there are at least two of them.
    """
    patterns = []
    for m in kv.list(KV.memories):
        if m.get("isLatest") is not False and m.get("type") == "pattern":
            freq = len(m.get("sessionIds") or [])
            if freq >= 2:
                patterns.append({"content": m.get("content", ""), "frequency": freq})
    if len(patterns) < 2:
        return 0, len(patterns)

    PROCEDURAL_EXTRACTION_SYSTEM = """You are a procedural memory extractor. Given repeated patterns and workflows observed across sessions, extract reusable procedures.
Output format (XML):
<procedures>
  <procedure name="short descriptive name" trigger="when this procedure applies">
    <step>Step 1 description</step>
    <step>Step 2 description</step>
  </procedure>
</procedures>
Rules:
- Only extract procedures observed 2+ times
- Steps should be concrete and actionable
- Trigger condition should be specific enough to match automatically"""
    proc_prompt = "Extract reusable procedures from these recurring patterns:\n\n" + "\n\n".join(
        f"[Pattern {i}] (seen {p['frequency']}x)\n{p['content']}"
        for i, p in enumerate(patterns, start=1)
    )

    new_procs = 0
    try:
        response = generate_content(PROCEDURAL_EXTRACTION_SYSTEM, proc_prompt)
        proc_matches = re.findall(
            r'<procedure\s+name="([^"]*)"\s+trigger="([^"]*)"\s*>([\s\S]*?)</procedure>',
            response
        )
        known: Dict[str, Dict[str, Any]] = {}
        for ep in kv.list(KV.procedural):
            known.setdefault((ep.get("name") or "").lower(), ep)
        now = datetime.datetime.utcnow().isoformat() + "Z"
        for raw_name, trigger, steps_block in proc_matches:
            name = raw_name.strip()
            steps = [s.strip() for s in re.findall(r'<step>([^<]+)</step>', steps_block)]
            if not name or not steps:
                continue
            existing = known.get(name.lower())
            if existing:
                existing["frequency"] = (existing.get("frequency") or 1) + 1
                existing["updatedAt"] = now
                existing["strength"] = min(1.0, (existing.get("strength") or 0.5) + 0.1)
                kv.set(KV.procedural, existing["id"], existing)
            else:
                proc = {
                    "id": generate_id("proc"),
                    "name": name,
                    "steps": steps,
                    "triggerCondition": trigger.strip(),
                    "frequency": 1,
                    "sourceSessionIds": [],
                    "strength": 0.5,
                    "createdAt": now,
                    "updatedAt": now
                }
                kv.set(KV.procedural, proc["id"], proc)
                known[name.lower()] = proc
                new_procs += 1
    except Exception as e:
        print(f"[consolidate] Procedural extraction failed: {e}")
    return new_procs, len(patterns)


def consolidate(kv: StateKV, project: Optional[str] = None, min_observations: int = 10) -> Dict[str, Any]:
    """Run the consolidation pipeline: concept memories, semantic facts, procedures.

    Titled observations with importance >= 5 are collected, restricted to
    *project* when one is given. If fewer than *min_observations* are found,
    nothing is done. Otherwise three stages run in order:

    1. Recurring concepts are synthesized into long-term memories.
    2. Stable facts are merged from recent session summaries.
    3. Reusable procedures are extracted from recurring pattern memories.

    The run is audited and committed.

    Returns:
        A summary dict with the consolidated count, observation total, and
        per-stage semantic/procedural counters. When there are too few
        observations, ``{"consolidated": 0, "reason": "insufficient_observations", "success": True}``.
    """
    sessions = list_sessions(kv)
    if project:
        sessions = [s for s in sessions if s.get("project") == project]
    all_obs = [
        (o, s["id"])
        for s in sessions
        for o in kv.list(KV.observations(s["id"]))
        if o.get("title") and _consolidation_importance(o) >= 5
    ]
    if len(all_obs) < min_observations:
        return {"consolidated": 0, "reason": "insufficient_observations", "success": True}

    consolidated_count = _consolidate_concept_groups(kv, all_obs, project)
    summaries = kv.list(KV.summaries)
    new_facts_count = _consolidate_semantic_facts(kv, summaries)
    new_procs_count, patterns_analyzed = _consolidate_procedures(kv)

    res_summary = {
        "success": True,
        "consolidated": consolidated_count,
        "totalObservations": len(all_obs),
        "semantic": {
            "newFacts": new_facts_count,
            "totalSummaries": len(summaries)
        },
        "procedural": {
            "newProcedures": new_procs_count,
            "patternsAnalyzed": patterns_analyzed
        }
    }
    safe_audit(kv, "consolidate", "mem::consolidate-pipeline", [], res_summary)
    commit_if_enabled(kv, f"Consolidation complete: consolidated={consolidated_count}, facts={new_facts_count}, procs={new_procs_count}", "system")
    return res_summary
# Setters that wire runtime collaborators (index persistence, embedding provider, stream broadcaster) into module state
def set_index_persistence(persistence: "IndexPersistence") -> None:
    """Register the helper whose ``schedule_save()`` is called after index mutations.

    Code in this module checks ``_index_persistence`` before scheduling a save,
    so passing ``None`` disables saving.

    The annotation is a string so it is not evaluated when the function is
    defined: ``IndexPersistence`` is not among this module's visible imports.
    """
    global _index_persistence
    _index_persistence = persistence
def set_embedding_provider(provider) -> None:
    """Install *provider* as the embedding provider and rebuild the hybrid search.

    A new ``HybridSearch`` is constructed over the module's BM25 and vector
    indexes with *provider* as its third argument and ``None`` as its fourth.
    It replaces the previous module-level instance.
    """
    global _embedding_provider, _hybrid_search
    _embedding_provider = provider
    _hybrid_search = HybridSearch(_bm25_index, _vector_index, provider, None)
def set_stream_broadcaster(broadcaster) -> None:
    """Register the callable that ``broadcast_stream`` forwards payloads to (``None`` disables it)."""
    global _stream_broadcaster
    _stream_broadcaster = broadcaster
def broadcast_stream(payload: Dict[str, Any]) -> None:
    """Forward *payload* to the registered broadcaster, if any.

    Exceptions raised by the broadcaster are printed and suppressed, so a
    failing stream consumer never propagates into the caller.
    """
    sink = _stream_broadcaster
    if not sink:
        return
    try:
        sink(payload)
    except Exception as exc:
        print(f"[broadcaster] Failed: {exc}")