agentmemory-python / src /replay_import.py
Yash030's picture
Initialize Hugging Face Space deployment for AgentMemory Python (clean without assets)
b2d9e47
import os
import re
import json
import hashlib
import datetime
from typing import List, Dict, Any, Tuple
# Constants

# Default number of .jsonl files processed by one import, and the hard ceiling
# applied to a caller-supplied ``max_files``.
MAX_FILES_DEFAULT = 200
MAX_FILES_UPPER_BOUND = 1000

# A path matching any of these patterns is treated as sensitive by
# ``is_sensitive`` and is refused/skipped by the importer.
SENSITIVE_PATH_PATTERNS = [
    # "secret" or "secrets" as a delimited token. The optional plural is
    # accepted before a separator as well as at end-of-string, so that
    # "secrets/..." and "secrets.json" are caught, not only a trailing "secrets".
    re.compile(r"(^|[\\/_.-])secrets?([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])credentials?([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])private[_-]?key([\\/_.-]|$)", re.IGNORECASE),
    # ".env" or ".env.<suffix>" as the final path component.
    re.compile(r"(^|[\\/])\.env(\.[\w-]+)?$", re.IGNORECASE),
    re.compile(r"(^|[\\/_.-])id_rsa([\\/_.-]|$)", re.IGNORECASE),
    # Token-style names are only matched at the start of a path component.
    re.compile(r"(^|[\\/])auth[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])bearer[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])access[_-]?token([\\/_.-]|$)", re.IGNORECASE),
    re.compile(r"(^|[\\/])api[_-]?token([\\/_.-]|$)", re.IGNORECASE),
]

# Heuristics for sentences that read like a reusable lesson. Each match is a
# trigger phrase followed by 10-200 characters and a terminating ".", "!" or
# newline.
LESSON_PATTERNS = [
    # The closing \b applies only to the word triggers. The colon triggers
    # ("note:", "caveat:", "warning:") must not be followed by \b: a colon
    # followed by a space is not a word boundary, so such a \b would prevent
    # "Note: ..." from ever matching.
    re.compile(
        r"\b((?:always|never|don'?t|do not|make sure|remember to)\b|(?:note|caveat|warning):)"
        r"[^.\n]{10,200}[.!\n]",
        re.IGNORECASE,
    ),
    re.compile(r"\b(prefer|avoid)\s[^.\n]{10,200}[.!\n]", re.IGNORECASE),
]
def generate_id(prefix: str) -> str:
    """Return ``prefix``, an underscore, and the hex form of a fresh random UUID4."""
    from uuid import uuid4

    random_hex = uuid4().hex
    return f"{prefix}_{random_hex}"
def fingerprint_id(prefix: str, content: str) -> str:
    """Build a stable, content-derived ID.

    The content is stripped of surrounding whitespace, UTF-8 encoded and
    hashed with SHA-256; the first 32 hex characters of the digest are
    appended to ``prefix`` after an underscore. Equal (stripped) content
    therefore always yields the same ID.
    """
    payload = content.strip().encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()
    return f"{prefix}_{digest[:32]}"
def is_sensitive(path: str) -> bool:
    """Return True when ``path`` matches any entry of SENSITIVE_PATH_PATTERNS."""
    for matcher in SENSITIVE_PATH_PATTERNS:
        if matcher.search(path):
            return True
    return False
def derive_project(cwd: str) -> str:
    """Return the last non-empty path component of ``cwd``.

    Both "/" and "\\" are treated as separators. Falls back to "unknown"
    when ``cwd`` is empty or consists only of separators.
    """
    if cwd:
        segments = [seg for seg in re.split(r'[\\/]', cwd) if seg]
        if segments:
            return segments[-1]
    return "unknown"
def to_text(content: Any) -> str:
    """Flatten message content into plain text.

    A string is returned unchanged. For a list, the ``text`` values of every
    dict item with ``type == "text"`` (and a string ``text``) are joined with
    newlines. Any other input yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "\n".join(
            block["text"]
            for block in content
            if isinstance(block, dict)
            and block.get("type") == "text"
            and isinstance(block.get("text"), str)
        )
    return ""
def extract_tool_uses(content: Any) -> List[Dict[str, Any]]:
    """Collect the ``tool_use`` items of a message content list.

    Returns one ``{"id", "name", "input"}`` dict per dict item whose ``type``
    is "tool_use"; a missing id defaults to "" and a missing name to
    "unknown". Non-list content yields an empty list.
    """
    if not isinstance(content, list):
        return []
    return [
        {
            "id": block.get("id", ""),
            "name": block.get("name", "unknown"),
            "input": block.get("input"),
        }
        for block in content
        if isinstance(block, dict) and block.get("type") == "tool_use"
    ]
def extract_tool_results(content: Any) -> List[Dict[str, Any]]:
    """Collect the ``tool_result`` items of a message content list.

    Returns one ``{"toolUseId", "output", "isError"}`` dict per dict item whose
    ``type`` is "tool_result". ``isError`` is True only when the item's
    ``is_error`` is exactly ``True``. Non-list content yields an empty list.
    """
    if not isinstance(content, list):
        return []
    return [
        {
            "toolUseId": block.get("tool_use_id", ""),
            "output": block.get("content"),
            "isError": block.get("is_error") is True,
        }
        for block in content
        if isinstance(block, dict) and block.get("type") == "tool_result"
    ]
def parse_jsonl_text(text: str, fallback_session_id: str = None) -> Dict[str, Any]:
    """Parse a JSONL transcript into a session summary plus observations.

    Every non-blank line is decoded as JSON. Lines that do not decode, or
    whose top-level value is not an object, are skipped. Each remaining entry
    may yield observations:

    * ``type == "user"`` with a user message: one ``post_tool_use`` /
      ``post_tool_failure`` observation per tool result, or, when there are
      no tool results, a ``prompt_submit`` observation for non-blank text.
    * ``type == "assistant"`` with an assistant message: a ``stop``
      observation for non-blank text, then one ``pre_tool_use`` observation
      per tool use.

    Args:
        text: Raw JSONL content.
        fallback_session_id: Session ID to use when no entry carries a
            ``sessionId``; a fresh one is generated when this is also empty.

    Returns:
        A dict with ``sessionId``, ``project``, ``cwd``, ``startedAt``,
        ``endedAt`` and ``observations``.
    """
    entries = []
    for line in text.split("\n"):
        if not line.strip():
            continue
        try:
            parsed = json.loads(line)
        except (ValueError, RecursionError):
            # Malformed (or pathologically nested) line: skip it, keep the rest.
            continue
        if isinstance(parsed, dict):
            entries.append(parsed)

    # One fallback timestamp for the whole parse, used for entries that carry
    # no timestamp of their own. Built from an aware "now" because
    # datetime.utcnow() is deprecated; the rendered format is unchanged.
    now_iso = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    session_id = ""
    cwd = ""
    first_ts = ""
    last_ts = ""
    observations = []
    for entry in entries:
        # The first sessionId / cwd seen in the file wins.
        if entry.get("sessionId") and not session_id:
            session_id = entry["sessionId"]
        if entry.get("cwd") and not cwd:
            cwd = entry["cwd"]
        ts = entry.get("timestamp") or now_iso
        if not first_ts:
            first_ts = ts
        last_ts = ts

        # "message" may be missing or a non-dict value (e.g. a plain string);
        # treat anything that is not a dict as "no message" instead of
        # crashing on .get().
        msg = entry.get("message")
        if not isinstance(msg, dict):
            msg = {}
        role = msg.get("role")
        content = msg.get("content")
        entry_type = entry.get("type")

        if entry_type == "user" and role == "user":
            tool_results = extract_tool_results(content)
            if tool_results:
                for result in tool_results:
                    observations.append({
                        "id": generate_id("obs"),
                        "sessionId": session_id or "imported",
                        "timestamp": ts,
                        "hookType": "post_tool_failure" if result["isError"] else "post_tool_use",
                        "toolName": None,
                        "toolInput": {"toolUseId": result["toolUseId"]},
                        "toolOutput": result["output"],
                        "raw": entry,
                    })
            else:
                txt = to_text(content)
                if txt.strip():
                    observations.append({
                        "id": generate_id("obs"),
                        "sessionId": session_id or "imported",
                        "timestamp": ts,
                        "hookType": "prompt_submit",
                        "userPrompt": txt,
                        "raw": entry,
                    })
        elif entry_type == "assistant" and role == "assistant":
            txt = to_text(content)
            tools = extract_tool_uses(content)
            if txt.strip():
                observations.append({
                    "id": generate_id("obs"),
                    "sessionId": session_id or "imported",
                    "timestamp": ts,
                    "hookType": "stop",
                    "assistantResponse": txt,
                    "raw": entry,
                })
            for tool in tools:
                observations.append({
                    "id": generate_id("obs"),
                    "sessionId": session_id or "imported",
                    "timestamp": ts,
                    "hookType": "pre_tool_use",
                    "toolName": tool["name"],
                    "toolInput": tool["input"],
                    "raw": {"toolUseId": tool["id"], "entry": entry},
                })

    effective_session_id = session_id or fallback_session_id or generate_id("sess")
    # Observations created before a sessionId was seen carry the "imported"
    # placeholder; point them at the final session ID.
    for obs in observations:
        if obs["sessionId"] == "imported":
            obs["sessionId"] = effective_session_id

    return {
        "sessionId": effective_session_id,
        "project": derive_project(cwd),
        "cwd": cwd or os.getcwd(),
        "startedAt": first_ts or now_iso,
        "endedAt": last_ts or now_iso,
        "observations": observations,
    }
def derive_crystal_and_lessons(
    kv,
    session_id: str,
    project: str,
    raw_obs: List[Dict[str, Any]],
    compressed: List[Dict[str, Any]],
    first_prompt: str = None
) -> None:
    """Derive lesson records and one crystal record for a session and store them.

    Lessons are sentences in the assistant responses and user prompts that
    match LESSON_PATTERNS. Each lesson ID is a fingerprint of the lower-cased
    sentence; an already-stored lesson is reinforced rather than replaced.
    The crystal ID is a fingerprint of ``session_id``.

    All storage is best-effort: a failing ``kv`` call is reported on stdout
    and does not abort the remaining work.

    Args:
        kv: Store object providing ``get(scope, key)`` and
            ``set(scope, key, value)``.
        session_id: Session the observations belong to.
        project: Project name recorded on the lessons and the crystal.
        raw_obs: Parsed observations; nothing is done when this is empty.
        compressed: Compressed observations; read for ``files``, ``type``,
            ``title`` and ``narrative``.
        first_prompt: Optional first user prompt, used as lesson context and
            as the crystal narrative.
    """
    from functions import KV

    if not raw_obs:
        return
    # Built from an aware "now" because datetime.utcnow() is deprecated; the
    # rendered format is unchanged.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    # Dicts serve as insertion-ordered sets so that the truncated lists stored
    # on the crystal are deterministic (slicing a set depends on hash order).
    files: Dict[Any, None] = {}
    tools: Dict[Any, None] = {}
    for c in compressed:
        # "files" may be absent or None.
        for f in c.get("files") or []:
            files[f] = None
        if c.get("type") and c.get("type") != "conversation" and c.get("title"):
            tools[c["title"]] = None

    assistant_texts = []
    user_prompts = []
    for r in raw_obs:
        if isinstance(r.get("assistantResponse"), str) and r["assistantResponse"].strip():
            assistant_texts.append(r["assistantResponse"])
        if isinstance(r.get("userPrompt"), str) and r["userPrompt"].strip():
            user_prompts.append(r["userPrompt"])

    # Collect up to 40 distinct (case-insensitive) candidate sentences from at
    # most 200 texts; stop scanning as soon as the cap is reached.
    max_matches = 40
    lesson_matches: Dict[str, str] = {}
    for text in (assistant_texts + user_prompts)[:200]:
        if len(lesson_matches) >= max_matches:
            break
        for pat in LESSON_PATTERNS:
            if len(lesson_matches) >= max_matches:
                break
            for m in pat.finditer(text):
                if len(lesson_matches) >= max_matches:
                    break
                snippet = re.sub(r"\s+", " ", m.group(0)).strip()
                if 20 <= len(snippet) <= 220:
                    lesson_matches.setdefault(snippet.lower(), snippet)

    lesson_entries = list(lesson_matches.values())[:20]
    lesson_ids = []
    for content in lesson_entries:
        lesson_id = fingerprint_id("lesson", content.lower())
        try:
            existing = kv.get(KV.lessons, lesson_id)
            if existing:
                # Known lesson: record this session as a source and reinforce.
                sources = existing.get("sourceIds") or []
                if session_id not in sources:
                    sources = sources + [session_id]
                tags = existing.get("tags") or []
                if "auto-import" not in tags:
                    tags = tags + ["auto-import"]
                existing["sourceIds"] = sources
                existing["tags"] = tags
                existing["reinforcements"] = (existing.get("reinforcements") or 0) + 1
                existing["updatedAt"] = created_at
                existing["lastReinforcedAt"] = created_at
                kv.set(KV.lessons, lesson_id, existing)
            else:
                lesson = {
                    "id": lesson_id,
                    "content": content,
                    "context": first_prompt or project,
                    "confidence": 0.4,
                    "reinforcements": 0,
                    "source": "consolidation",
                    "sourceIds": [session_id],
                    "project": project,
                    "tags": ["auto-import"],
                    "createdAt": created_at,
                    "updatedAt": created_at,
                    "decayRate": 0.05,
                }
                kv.set(KV.lessons, lesson_id, lesson)
            # Only lessons that were actually stored are linked from the crystal.
            lesson_ids.append(lesson_id)
        except Exception as e:
            print(f"[import-jsonl] Failed to store lesson {lesson_id}: {e}")

    crystal_id = fingerprint_id("crystal", session_id)
    if first_prompt:
        narrative_preview = first_prompt[:300]
    else:
        previews = []
        for c in compressed[:5]:
            p = c.get("narrative") or c.get("title")
            if p:
                previews.append(p)
        narrative_preview = (" · ".join(previews))[:300]

    try:
        # Re-deriving a crystal keeps its original creation time and any
        # sourceActionIds already attached to it.
        existing_crystal = kv.get(KV.crystals, crystal_id) or {}
        crystal = {
            "id": crystal_id,
            "narrative": narrative_preview or f"Session {session_id[:12]} ({len(raw_obs)} observations)",
            "keyOutcomes": list(tools)[:8],
            "filesAffected": list(files)[:20],
            "lessons": lesson_ids,
            "sourceActionIds": existing_crystal.get("sourceActionIds", []),
            "sessionId": session_id,
            "project": project,
            "createdAt": existing_crystal.get("createdAt", created_at),
        }
        kv.set(KV.crystals, crystal_id, crystal)
    except Exception as e:
        print(f"[import-jsonl] Failed to store crystal {crystal_id}: {e}")
def find_jsonl_files(root: str, limit=200) -> Tuple[List[str], bool, int, bool]:
    """Walk ``root`` and collect up to ``limit`` non-symlink ``.jsonl`` files.

    Hidden directories (leading ".") and symlinked directories are not
    descended into. The walk stops once ``max(limit * 50, 50000)`` file names
    have been visited.

    Returns:
        ``(paths, truncated, discovered, traversal_capped)`` where
        ``discovered`` counts every matching file seen (including those
        beyond ``limit``), ``traversal_capped`` tells whether the visit cap
        was reached, and ``truncated`` is True when files were left out or
        the cap was reached.
    """
    matches = []
    total_found = 0
    visited = 0
    visit_cap = max(limit * 50, 50000)

    for current_dir, subdirs, names in os.walk(root):
        if visited >= visit_cap:
            break
        # Prune in place so os.walk never enters hidden or symlinked dirs.
        subdirs[:] = [
            d for d in subdirs
            if not (d.startswith('.') or os.path.islink(os.path.join(current_dir, d)))
        ]
        for name in names:
            visited += 1
            if visited >= visit_cap:
                break
            if not name.endswith(".jsonl"):
                continue
            candidate = os.path.join(current_dir, name)
            if os.path.islink(candidate):
                continue
            total_found += 1
            if len(matches) < limit:
                matches.append(candidate)

    hit_cap = visited >= visit_cap
    left_out = total_found > len(matches)
    return matches, (left_out or hit_cap), total_found, hit_cap
def import_jsonl_data(kv, path: str = None, max_files: int = None) -> Dict[str, Any]:
    """Import JSONL session transcripts from a file or directory into ``kv``.

    For every readable, non-sensitive, non-symlink ``.jsonl`` file that yields
    at least one observation, this creates or updates the session record,
    stores and indexes a synthetic compression of each observation, and
    derives a crystal and lessons. Afterwards it saves the index persistence
    (when one is configured), writes an audit entry and attempts a version
    commit.

    Args:
        kv: Store object providing ``get`` / ``set`` (and optionally
            ``commit_version``).
        path: A ``.jsonl`` file or a directory to scan. Defaults to
            ``~/.claude/projects``.
        max_files: Maximum number of files to process. Non-positive or
            non-int values fall back to MAX_FILES_DEFAULT; the value is
            capped at MAX_FILES_UPPER_BOUND.

    Returns:
        ``{"success": False, "error": ...}`` when the path is rejected,
        otherwise a dict with ``success``, ``imported``, ``sessionIds``,
        ``observations``, ``discovered``, ``truncated``, ``traversalCapped``,
        ``maxFiles`` and ``maxFilesUpperBound``.
    """
    # Project-local imports are resolved once per call, not once per file.
    import functions
    from functions import KV, build_synthetic_compression, _bm25_index, vector_index_add_guarded

    default_root = os.path.expanduser(os.path.join("~", ".claude", "projects"))
    raw_path = path or default_root
    abs_path = os.path.abspath(os.path.expanduser(raw_path))

    if is_sensitive(abs_path):
        return {"success": False, "error": "refusing to process sensitive-looking path"}
    if os.path.islink(abs_path):
        return {"success": False, "error": "symlinks are not supported"}
    if not os.path.exists(abs_path):
        return {"success": False, "error": "path not found"}

    limit = max_files if isinstance(max_files, int) and max_files > 0 else MAX_FILES_DEFAULT
    limit = min(limit, MAX_FILES_UPPER_BOUND)

    files = []
    truncated = False
    discovered = 0
    traversal_capped = False
    if os.path.isdir(abs_path):
        files, truncated, discovered, traversal_capped = find_jsonl_files(abs_path, limit)
    elif os.path.isfile(abs_path) and abs_path.endswith(".jsonl"):
        files = [abs_path]
        discovered = 1
    else:
        return {"success": False, "error": "path must be a .jsonl file or directory"}

    if not files:
        return {
            "success": True,
            "imported": 0,
            "sessionIds": [],
            "observations": 0,
            "discovered": discovered,
            "truncated": truncated,
            "traversalCapped": traversal_capped,
            "maxFiles": limit,
            "maxFilesUpperBound": MAX_FILES_UPPER_BOUND,
        }

    session_ids = []
    observation_count = 0
    for file in files:
        # Re-check each file: names below the root can be sensitive or links.
        if is_sensitive(file):
            continue
        if os.path.islink(file):
            continue
        try:
            with open(file, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
        except Exception as e:
            print(f"[import-jsonl] Failed to read {file}: {e}")
            continue

        parsed = parse_jsonl_text(text, generate_id("sess"))
        if not parsed["observations"]:
            continue

        # First non-blank user prompt, whitespace-collapsed, at most 200 chars.
        first_prompt = None
        for o in parsed["observations"]:
            if isinstance(o.get("userPrompt"), str) and o["userPrompt"].strip():
                first_prompt = re.sub(r"\s+", " ", o["userPrompt"]).strip()[:200]
                break

        existing = kv.get(KV.sessions, parsed["sessionId"])
        if existing:
            existing["observationCount"] = (existing.get("observationCount") or 0) + len(parsed["observations"])
            # A stored endedAt may be missing or None; treat both as "earliest"
            # so the comparison cannot raise TypeError.
            if parsed["endedAt"] > (existing.get("endedAt") or ""):
                existing["endedAt"] = parsed["endedAt"]
            if existing.get("status") == "active":
                existing["status"] = "completed"
            existing_tags = existing.get("tags") or []
            if "jsonl-import" not in existing_tags:
                existing["tags"] = existing_tags + ["jsonl-import"]
            if not existing.get("firstPrompt") and first_prompt:
                existing["firstPrompt"] = first_prompt
            if not existing.get("id"):
                existing["id"] = parsed["sessionId"]
            kv.set(KV.sessions, parsed["sessionId"], existing)
        else:
            session = {
                "id": parsed["sessionId"],
                "project": parsed["project"],
                "cwd": parsed["cwd"],
                "startedAt": parsed["startedAt"],
                "endedAt": parsed["endedAt"],
                "status": "completed",
                "observationCount": len(parsed["observations"]),
                "tags": ["jsonl-import"],
                "firstPrompt": first_prompt,
            }
            kv.set(KV.sessions, session["id"], session)

        compressed = []
        for obs in parsed["observations"]:
            synthetic = build_synthetic_compression(obs)
            compressed.append(synthetic)
            kv.set(KV.observations(parsed["sessionId"]), obs["id"], synthetic)
            # Index
            _bm25_index.add(synthetic)
            comb_text = synthetic["title"] + " " + (synthetic.get("narrative") or "")
            vector_index_add_guarded(
                synthetic["id"],
                synthetic["sessionId"],
                comb_text,
                {"kind": "synthetic", "logId": synthetic["id"]},
            )

        observation_count += len(parsed["observations"])
        session_ids.append(parsed["sessionId"])
        derive_crystal_and_lessons(
            kv,
            parsed["sessionId"],
            parsed["project"],
            parsed["observations"],
            compressed,
            first_prompt,
        )

    # Save the updated persistence state. The attribute is read through the
    # module at call time rather than bound by a from-import.
    if functions._index_persistence:
        try:
            functions._index_persistence.save()
        except Exception as e:
            print(f"[import-jsonl] Warning saving index persistence: {e}")

    # Audit trail (best-effort).
    try:
        from functions import log_audit
        log_audit(kv, "import", "mem::replay::import-jsonl", f"Imported {len(session_ids)} sessions: {','.join(session_ids[:3])}...")
    except Exception as e:
        print(f"[import-jsonl] Warning writing audit entry: {e}")

    # Dolt commit if enabled. Kept silent on failure: presumably the store
    # does not always support versioning -- TODO confirm.
    try:
        kv.commit_version(f"Import {len(session_ids)} Claude Code sessions from JSONL", "system")
    except Exception:
        pass

    return {
        "success": True,
        # NOTE(review): this is the number of candidate files, including files
        # skipped above (sensitive, unreadable, or without observations);
        # len(session_ids) is the number of sessions actually written.
        # Kept as-is for compatibility with existing callers.
        "imported": len(files),
        "sessionIds": session_ids,
        "observations": observation_count,
        "discovered": discovered,
        "truncated": truncated,
        "traversalCapped": traversal_capped,
        "maxFiles": limit,
        "maxFilesUpperBound": MAX_FILES_UPPER_BOUND,
    }
def kind_from_hook(obs: Dict[str, Any]) -> str:
    """Map an observation's ``hookType`` to a timeline event kind.

    A "stop" hook is a "response" only when the observation carries a
    truthy ``assistantResponse``. Unknown or missing hook types map to "hook".
    """
    hook_type = obs.get("hookType")
    if not isinstance(hook_type, str):
        return "hook"
    if hook_type == "stop":
        return "response" if obs.get("assistantResponse") else "hook"
    direct_kinds = {
        "session_start": "session_start",
        "session_end": "session_end",
        "prompt_submit": "prompt",
        "pre_tool_use": "tool_call",
        "post_tool_use": "tool_result",
        "post_tool_failure": "tool_error",
    }
    return direct_kinds.get(hook_type, "hook")
def label_for(obs: Dict[str, Any], kind: str) -> str:
    """Return a short display label for an observation of the given ``kind``.

    Prompt and response labels use the observation's text, cut to 79
    characters plus an ellipsis when longer than 80. Tool labels combine the
    tool name (default "tool") with the action. Any other kind falls back to
    the raw ``hookType`` (or an empty string).
    """
    def _clip(value):
        if len(value) > 80:
            return value[:79] + "…"
        return value

    if kind == "prompt":
        return _clip(obs.get("userPrompt") or "User prompt")
    if kind == "response":
        return _clip(obs.get("assistantResponse") or "Assistant response")
    if kind in ("tool_call", "tool_result", "tool_error"):
        action = kind[len("tool_"):]
        return f"{obs.get('toolName') or 'tool'} ▸ {action}"
    if kind == "session_start":
        return "Session start"
    if kind == "session_end":
        return "Session end"
    return obs.get("hookType") or ""
def estimate_duration_ms(event: Dict[str, Any]) -> int:
    """Estimate a playback duration for a timeline event from its size.

    The character count is the length of ``body`` plus the lengths of
    ``toolInput`` and ``toolOutput`` (strings as-is, anything else as its
    JSON serialization). Each 40 characters account for one second; the
    result is clamped to 300..20000 ms, and an empty event gets 300 ms.
    """
    def _size(value):
        return len(value) if isinstance(value, str) else len(json.dumps(value))

    total_chars = len(event.get("body") or "")
    total_chars += _size(event.get("toolInput") or "")
    total_chars += _size(event.get("toolOutput") or "")
    if not total_chars:
        return 300
    estimate = round((total_chars / 40) * 1000)
    return min(20000, max(300, estimate))
def project_timeline(observations: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Project observations onto a replayable timeline of events.

    Observations are ordered by their ``timestamp`` string. Each event gets an
    ``offsetMs`` relative to the first parseable timestamp and an estimated
    ``durationMs``. When all timestamps are identical, or a timestamp cannot
    be parsed, the offset is synthetic: the sum of the estimated durations of
    the preceding events.

    Args:
        observations: Observation dicts (``id``, ``sessionId``, ``timestamp``,
            ``hookType`` and optional prompt/response/tool fields).

    Returns:
        A dict with ``sessionId``, ``startedAt``, ``endedAt``,
        ``totalDurationMs``, ``eventCount`` and ``events``. For empty input
        the session ID is "" and both timestamps are the current UTC time.
    """
    if not observations:
        # Built from an aware "now" because datetime.utcnow() is deprecated;
        # the rendered format is unchanged.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        return {
            "sessionId": "",
            "startedAt": now,
            "endedAt": now,
            "totalDurationMs": 0,
            "eventCount": 0,
            "events": [],
        }

    def _epoch_ms(ts):
        """Return ``ts`` (ISO-8601) as epoch milliseconds, or None if unparseable."""
        if not isinstance(ts, str) or not ts:
            return None
        try:
            # dateutil is optional; prefer it when available.
            import dateutil.parser
            return dateutil.parser.isoparse(ts).timestamp() * 1000
        except Exception:
            pass
        try:
            # Standard-library fallback; a trailing "Z" is rewritten because
            # fromisoformat only accepts it on newer Python versions.
            normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
            return datetime.datetime.fromisoformat(normalized).timestamp() * 1000
        except (ValueError, OverflowError, OSError):
            return None

    # Missing/None timestamps sort first instead of raising on None < str.
    sorted_obs = sorted(observations, key=lambda o: str(o.get("timestamp") or ""))
    started_at = sorted_obs[0].get("timestamp", "")
    parsed_ms = [_epoch_ms(o.get("timestamp")) for o in sorted_obs]
    # Base offsets on the first timestamp that actually parses. Using 0 as a
    # stand-in for an unparseable start would turn every later offset into an
    # absolute epoch value.
    start_ms = next((ms for ms in parsed_ms if ms is not None), None)
    all_same_ts = all(o.get("timestamp") == started_at for o in sorted_obs)

    events = []
    synthetic_offset = 0
    for obs, obs_ms in zip(sorted_obs, parsed_ms):
        kind = kind_from_hook(obs)
        if kind == "prompt":
            body = obs.get("userPrompt")
        elif kind == "response":
            body = obs.get("assistantResponse")
        else:
            body = None

        if all_same_ts or start_ms is None or obs_ms is None:
            offset_ms = synthetic_offset
        else:
            offset_ms = int(max(0, obs_ms - start_ms))

        event = {
            "id": obs.get("id"),
            "sessionId": obs.get("sessionId"),
            "ts": obs.get("timestamp"),
            "offsetMs": offset_ms,
            "durationMs": 0,
            "kind": kind,
            "label": label_for(obs, kind),
            "body": body,
            "toolName": obs.get("toolName"),
            "toolInput": obs.get("toolInput"),
            "toolOutput": obs.get("toolOutput"),
        }
        event["durationMs"] = estimate_duration_ms(event)
        events.append(event)
        synthetic_offset += event["durationMs"]

    # observations is non-empty here, so there is always a last event.
    last = events[-1]
    return {
        "sessionId": sorted_obs[0].get("sessionId"),
        "startedAt": started_at,
        "endedAt": sorted_obs[-1].get("timestamp"),
        "totalDurationMs": last["offsetMs"] + last["durationMs"],
        "eventCount": len(events),
        "events": events,
    }