Yash030's picture
feat: migrate from Dolt/MySQL to SQLite for faster boot and sync
169f06f
#!/usr/bin/env python3
"""
Sync agentmemory data to/from a private HF Dataset repo.
Usage:
python3 sync.py restore -- download DB from HF on startup
python3 sync.py backup -- upload DB to HF (called in loop)
"""
import json
import os
import sys
import shutil
import tempfile
import time
try:
from huggingface_hub import HfApi, snapshot_download, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
except ImportError:
print("[sync] huggingface_hub not installed, skipping sync")
sys.exit(0)
# Hub credentials and target dataset repo are read from the environment.
HF_TOKEN = os.getenv("HF_TOKEN", "")
REPO_ID = os.getenv("AGENTMEMORY_DATASET_REPO", "Yash030/agentmemory-python-data")

# Local directory that holds everything this script syncs.
DATA_DIR = os.path.expanduser("~/.agentmemory")

# Only these paths are backed up/restored — everything else is ephemeral.
# File names and directory names are relative to DATA_DIR.
SYNC_FILES = ["agentmemory.db", ".hmac"]
SYNC_DIRS = ["second-brain"]

# Holds the fingerprint of the last successful backup so that an unchanged
# data directory can skip the upload.
STATE_FILE = os.path.join(DATA_DIR, ".backup_state")
def get_api():
    """Return an ``HfApi`` client constructed with the module-level ``HF_TOKEN``."""
    client = HfApi(token=HF_TOKEN)
    return client
def _collect_sync_targets():
    """Gather every local file that takes part in the sync.

    Returns a list of ``(abs_path, repo_rel_path)`` tuples. Entries of
    ``SYNC_FILES`` are included only when they exist as regular files;
    entries of ``SYNC_DIRS`` are walked recursively. Repo-relative paths
    always use forward slashes.
    """
    found = []

    # Individually tracked files living directly in DATA_DIR.
    for name in SYNC_FILES:
        candidate = os.path.join(DATA_DIR, name)
        if os.path.isfile(candidate):
            found.append((candidate, name))

    # Tracked directories: include every file beneath them.
    for dir_name in SYNC_DIRS:
        base = os.path.join(DATA_DIR, dir_name)
        if not os.path.isdir(base):
            continue
        for parent, _subdirs, names in os.walk(base):
            paths = (os.path.join(parent, entry) for entry in names)
            found.extend(
                # Normalise Windows separators so repo paths are portable.
                (path, os.path.relpath(path, DATA_DIR).replace("\\", "/"))
                for path in paths
            )

    return found
def _state_fingerprint(targets):
entries = {}
for full, rel in targets:
try:
s = os.stat(full)
entries[rel] = (s.st_size, s.st_mtime)
except OSError:
pass
return json.dumps(entries, sort_keys=True)
def restore():
    """Download the synced files from the HF dataset repo into ``DATA_DIR``.

    Does nothing when ``HF_TOKEN`` is empty, when the dataset repo does not
    exist, or when the repo cannot be queried. Otherwise every entry of
    ``SYNC_FILES`` plus every repo file under a ``SYNC_DIRS`` prefix is
    downloaded individually; a failure on one file is logged and does not
    stop the remaining downloads. Never raises for Hub errors.
    """
    if not HF_TOKEN:
        print("[sync] No HF_TOKEN — skipping restore")
        return

    os.makedirs(DATA_DIR, exist_ok=True)
    api = get_api()

    # Check repo exists
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Dataset repo {REPO_ID} not found — fresh start")
        return
    except Exception as e:
        print(f"[sync] restore repo check error: {e}")
        return

    # Top-level files are attempted unconditionally (a missing one surfaces
    # as EntryNotFoundError below). Directory contents are discovered by
    # listing the repo, one prefix per SYNC_DIRS entry, so restore always
    # covers the same directories that backup uploads.
    all_targets = list(SYNC_FILES)
    for dname in SYNC_DIRS:
        all_targets.extend(_list_repo_prefix(api, f"{dname}/"))

    restored = 0
    failed = 0
    for fname in all_targets:
        try:
            local_path = os.path.join(DATA_DIR, fname)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            # NOTE(review): local_dir_use_symlinks is deprecated in recent
            # huggingface_hub releases — confirm the pinned version still
            # accepts it before upgrading.
            hf_hub_download(
                repo_id=REPO_ID,
                filename=fname,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir=DATA_DIR,
                local_dir_use_symlinks=False,
            )
        except EntryNotFoundError:
            continue  # file not yet in repo, skip
        except Exception as e:
            failed += 1
            print(f"[sync] restore {fname} error: {e}")
        else:
            restored += 1
            print(f"[sync] restored {fname}")

    # Nothing downloaded and nothing failed means the repo holds none of the
    # synced paths yet.
    if not restored and not failed:
        print("[sync] Dataset empty — fresh start")
        return
    print("[sync] restore complete")
def _list_repo_prefix(api, prefix):
    """List files in the dataset repo whose path starts with *prefix*.

    Args:
        api: ``HfApi`` client used to query the repo (see ``get_api``).
        prefix: Repo-relative path prefix, e.g. ``"second-brain/"``.

    Returns:
        The matching repo-relative paths, or an empty list when the listing
        fails. Failures are logged rather than raised so the caller can
        still proceed with the files it already knows about.
    """
    try:
        repo_files = api.list_repo_files(REPO_ID, repo_type="dataset")
    except Exception as e:
        print(f"[sync] list repo files for {prefix} error: {e}")
        return []
    return [path for path in repo_files if path.startswith(prefix)]
def backup():
    """Upload the synced files to the HF dataset repo if anything changed.

    Steps:
      1. Return silently when ``HF_TOKEN`` is empty.
      2. Collect the sync targets and compare their fingerprint with the one
         stored in ``STATE_FILE``; skip the upload when they match.
      3. Make sure the dataset repo exists, creating it as private if not.
      4. Copy the targets into a temporary staging directory and upload that
         directory in a single commit.
      5. Record the new fingerprint, but only when every target was staged,
         so a file that failed to copy is retried on the next run.

    Hub and filesystem errors are logged and never raised, except for a
    failure to create the staging directory itself.
    """
    if not HF_TOKEN:
        return

    api = get_api()
    targets = _collect_sync_targets()
    if not targets:
        print("[sync] nothing to backup")
        return

    # Fast change detection: an unreadable or missing state file just means
    # "no previous backup known".
    current_state = _state_fingerprint(targets)
    try:
        with open(STATE_FILE, encoding="utf-8") as fh:
            previous_state = fh.read()
    except (OSError, ValueError):
        previous_state = None
    if previous_state == current_state:
        print("[sync] no changes — skipping backup")
        return

    # Ensure repo exists
    try:
        api.repo_info(REPO_ID, repo_type="dataset")
    except RepositoryNotFoundError:
        print(f"[sync] Creating dataset repo {REPO_ID}")
        try:
            api.create_repo(REPO_ID, repo_type="dataset", private=True)
        except Exception as e:
            print(f"[sync] create_repo error: {e}")
            return
    except Exception as e:
        print(f"[sync] repo_info error: {e}")
        return

    # Stage only the targeted files
    staging = tempfile.mkdtemp(prefix="agentmemory_sync_")
    try:
        staged = 0
        for full, rel in targets:
            dest = os.path.join(staging, rel.replace("/", os.sep))
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            try:
                # NOTE(review): this is a plain file copy; if agentmemory.db
                # is written to while copying, the snapshot may be
                # inconsistent — confirm whether a SQLite-level backup is
                # needed.
                shutil.copy2(full, dest)
            except Exception as e:
                print(f"[sync] stage {rel} error: {e}")
            else:
                staged += 1

        if not staged:
            print("[sync] backup error: no files could be staged")
            return

        print(f"[sync] uploading {staged} files to {REPO_ID}...")
        api.upload_folder(
            folder_path=staging,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="sync: periodic backup",
        )
        print("[sync] backup complete")

        # Persist the fingerprint only for a complete backup; otherwise the
        # next run would wrongly consider the skipped files up to date.
        if staged == len(targets):
            try:
                with open(STATE_FILE, "w", encoding="utf-8") as fh:
                    fh.write(current_state)
            except OSError as e:
                print(f"[sync] could not save backup state: {e}")
    except Exception as e:
        print(f"[sync] backup error: {e}")
    finally:
        shutil.rmtree(staging, ignore_errors=True)
if __name__ == "__main__":
    # With no command-line argument the script performs a backup.
    cmd = "backup" if len(sys.argv) <= 1 else sys.argv[1]

    # Map each supported command to the function implementing it.
    handlers = {"restore": restore, "backup": backup}
    handler = handlers.get(cmd)
    if handler is None:
        print(f"[sync] unknown command: {cmd}")
        sys.exit(1)
    handler()