Enable time-travel queries and history viewing by creating immutable version records on every node update. Includes database schema changes, store functions, MCP tools, and CLI commands for viewing history, comparing versions, and restoring to previous states.
374 lines
12 KiB
TypeScript
374 lines
12 KiB
TypeScript
import { randomUUID as uuid } from 'crypto';
|
|
import { getDb } from './db';
|
|
import { Node, Edge, AddNodeInput, UpdateNodeInput, ListOptions, QueryOptions, SearchResult, EdgeType, NodeVersion, HistoricalNode, NodeDiff } from '../types';
|
|
import { hybridSearch, deserializeEmbedding } from './search/index';
|
|
import { getEmbedding } from './search/ollama';
|
|
|
|
function rowToNode(row: any): Node {
|
|
return {
|
|
id: row.id,
|
|
kind: row.kind,
|
|
title: row.title,
|
|
content: row.content,
|
|
status: row.status ?? undefined,
|
|
tags: JSON.parse(row.tags || '[]'),
|
|
metadata: JSON.parse(row.metadata || '{}'),
|
|
embedding: row.embedding ? deserializeEmbedding(row.embedding) : null,
|
|
createdAt: row.created_at,
|
|
updatedAt: row.updated_at,
|
|
lastAccessedAt: row.last_accessed_at ?? row.updated_at,
|
|
isStale: !!row.is_stale,
|
|
};
|
|
}
|
|
|
|
function serializeEmbedding(embedding: number[]): Buffer {
|
|
return Buffer.from(new Float32Array(embedding).buffer);
|
|
}
|
|
|
|
function notifyDirty(): void {
|
|
try {
|
|
const { markDirty } = require('../server/heartbeat');
|
|
markDirty();
|
|
} catch {}
|
|
}
|
|
|
|
export async function addNode(input: AddNodeInput): Promise<Node> {
|
|
const db = getDb();
|
|
const id = uuid();
|
|
const now = Date.now();
|
|
const tags = input.tags ?? [];
|
|
const metadata = input.metadata ?? {};
|
|
const content = input.content ?? '';
|
|
|
|
// Try to get embedding
|
|
const embedding = await getEmbedding(`${input.title} ${content}`);
|
|
|
|
const transaction = db.transaction(() => {
|
|
db.prepare(`
|
|
INSERT INTO nodes (id, kind, title, content, status, tags, metadata, embedding, created_at, updated_at, last_accessed_at, version)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`).run(
|
|
id, input.kind, input.title, content, input.status ?? null,
|
|
JSON.stringify(tags), JSON.stringify(metadata),
|
|
embedding ? serializeEmbedding(embedding) : null,
|
|
now, now, now, 1
|
|
);
|
|
|
|
// Insert tags
|
|
const insertTag = db.prepare('INSERT OR IGNORE INTO node_tags (node_id, tag) VALUES (?, ?)');
|
|
for (const tag of tags) {
|
|
insertTag.run(id, tag);
|
|
}
|
|
|
|
// Create initial version record
|
|
const versionId = uuid();
|
|
db.prepare(`
|
|
INSERT INTO node_versions (id, node_id, version, title, content, status, tags, metadata, valid_from, valid_until, created_by)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`).run(
|
|
versionId,
|
|
id,
|
|
1,
|
|
input.title,
|
|
content,
|
|
input.status ?? null,
|
|
JSON.stringify(tags),
|
|
JSON.stringify(metadata),
|
|
now,
|
|
null,
|
|
'user'
|
|
);
|
|
});
|
|
|
|
transaction();
|
|
|
|
notifyDirty();
|
|
return {
|
|
id, kind: input.kind, title: input.title, content, status: input.status,
|
|
tags, metadata, embedding, createdAt: now, updatedAt: now, lastAccessedAt: now, isStale: false,
|
|
};
|
|
}
|
|
|
|
export function getNode(id: string): Node | null {
|
|
const db = getDb();
|
|
const row = db.prepare('SELECT * FROM nodes WHERE id = ?').get(id) as any;
|
|
if (!row) return null;
|
|
db.prepare('UPDATE nodes SET last_accessed_at = ? WHERE id = ?').run(Date.now(), id);
|
|
return rowToNode(row);
|
|
}
|
|
|
|
export function findNodeByPrefix(prefix: string): Node | null {
|
|
const db = getDb();
|
|
const row = db.prepare('SELECT * FROM nodes WHERE id LIKE ?').get(`${prefix}%`) as any;
|
|
return row ? rowToNode(row) : null;
|
|
}
|
|
|
|
export function listNodes(options: ListOptions = {}): Node[] {
|
|
const db = getDb();
|
|
const conditions: string[] = [];
|
|
const params: any[] = [];
|
|
|
|
if (!options.includeStale) {
|
|
conditions.push('is_stale = 0');
|
|
}
|
|
if (options.kind) {
|
|
conditions.push('kind = ?');
|
|
params.push(options.kind);
|
|
}
|
|
if (options.status) {
|
|
conditions.push('status = ?');
|
|
params.push(options.status);
|
|
}
|
|
|
|
let sql = 'SELECT * FROM nodes';
|
|
if (conditions.length) sql += ' WHERE ' + conditions.join(' AND ');
|
|
sql += ' ORDER BY created_at DESC';
|
|
if (options.limit) {
|
|
sql += ' LIMIT ?';
|
|
params.push(options.limit);
|
|
}
|
|
|
|
let nodes = (db.prepare(sql).all(...params) as any[]).map(rowToNode);
|
|
|
|
if (options.tags?.length) {
|
|
nodes = nodes.filter(n => options.tags!.some(t => n.tags.includes(t)));
|
|
}
|
|
|
|
return nodes;
|
|
}
|
|
|
|
export async function updateNode(id: string, input: UpdateNodeInput, createdBy: string = 'user'): Promise<Node | null> {
|
|
const db = getDb();
|
|
// Get existing node without updating last_accessed_at
|
|
const existingRow = db.prepare('SELECT * FROM nodes WHERE id = ?').get(id) as any;
|
|
if (!existingRow) return null;
|
|
const existing = rowToNode(existingRow);
|
|
|
|
const now = Date.now();
|
|
|
|
// Get current version number
|
|
const currentVersion = existingRow.version ?? 1;
|
|
const newVersion = currentVersion + 1;
|
|
|
|
// Create version snapshot in a transaction
|
|
const transaction = db.transaction(() => {
|
|
// Close out the current version by setting valid_until
|
|
db.prepare(`
|
|
UPDATE node_versions SET valid_until = ? WHERE node_id = ? AND valid_until IS NULL
|
|
`).run(now, id);
|
|
|
|
// Insert new version record with the NEW state (after update)
|
|
const versionId = uuid();
|
|
const newTitle = input.title ?? existing.title;
|
|
const newContent = input.content ?? existing.content;
|
|
const newStatus = input.status !== undefined ? input.status : existing.status;
|
|
const newTags = input.tags ?? existing.tags;
|
|
const newMetadata = input.metadata !== undefined ? { ...existing.metadata, ...input.metadata } : existing.metadata;
|
|
|
|
db.prepare(`
|
|
INSERT INTO node_versions (id, node_id, version, title, content, status, tags, metadata, valid_from, valid_until, created_by)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`).run(
|
|
versionId,
|
|
id,
|
|
newVersion,
|
|
newTitle,
|
|
newContent,
|
|
newStatus ?? null,
|
|
JSON.stringify(newTags),
|
|
JSON.stringify(newMetadata),
|
|
now,
|
|
null,
|
|
createdBy
|
|
);
|
|
|
|
// Build the update query
|
|
const sets: string[] = ['updated_at = ?', 'version = ?'];
|
|
const params: any[] = [now, newVersion];
|
|
|
|
if (input.title !== undefined) { sets.push('title = ?'); params.push(input.title); }
|
|
if (input.content !== undefined) { sets.push('content = ?'); params.push(input.content); }
|
|
if (input.status !== undefined) { sets.push('status = ?'); params.push(input.status); }
|
|
if (input.isStale !== undefined) { sets.push('is_stale = ?'); params.push(input.isStale ? 1 : 0); }
|
|
if (input.tags !== undefined) { sets.push('tags = ?'); params.push(JSON.stringify(input.tags)); }
|
|
if (input.metadata !== undefined) {
|
|
const merged = { ...existing.metadata, ...input.metadata };
|
|
sets.push('metadata = ?');
|
|
params.push(JSON.stringify(merged));
|
|
}
|
|
|
|
params.push(id);
|
|
db.prepare(`UPDATE nodes SET ${sets.join(', ')} WHERE id = ?`).run(...params);
|
|
|
|
// Update tags if changed
|
|
if (input.tags !== undefined) {
|
|
db.prepare('DELETE FROM node_tags WHERE node_id = ?').run(id);
|
|
const insertTag = db.prepare('INSERT OR IGNORE INTO node_tags (node_id, tag) VALUES (?, ?)');
|
|
for (const tag of input.tags) {
|
|
insertTag.run(id, tag);
|
|
}
|
|
}
|
|
});
|
|
|
|
transaction();
|
|
|
|
// Re-embed if title or content changed (outside transaction since it's async)
|
|
if (input.title !== undefined || input.content !== undefined) {
|
|
const newTitle = input.title ?? existing.title;
|
|
const newContent = input.content ?? existing.content;
|
|
const embedding = await getEmbedding(`${newTitle} ${newContent}`);
|
|
if (embedding) {
|
|
db.prepare('UPDATE nodes SET embedding = ? WHERE id = ?').run(serializeEmbedding(embedding), id);
|
|
}
|
|
}
|
|
|
|
notifyDirty();
|
|
return getNode(id);
|
|
}
|
|
|
|
export function removeNode(id: string, hard: boolean = false): boolean {
|
|
const db = getDb();
|
|
if (hard) {
|
|
const result = db.prepare('DELETE FROM nodes WHERE id = ?').run(id);
|
|
return result.changes > 0;
|
|
} else {
|
|
const result = db.prepare('UPDATE nodes SET is_stale = 1, updated_at = ? WHERE id = ?').run(Date.now(), id);
|
|
if (result.changes > 0) notifyDirty();
|
|
return result.changes > 0;
|
|
}
|
|
}
|
|
|
|
export function addEdge(fromId: string, toId: string, type: EdgeType, metadata: Record<string, any> = {}): Edge {
|
|
const db = getDb();
|
|
const id = uuid();
|
|
const now = Date.now();
|
|
|
|
db.prepare(`
|
|
INSERT INTO edges (id, from_id, to_id, type, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?)
|
|
`).run(id, fromId, toId, type, JSON.stringify(metadata), now);
|
|
|
|
return { id, fromId, toId, type, metadata, createdAt: now };
|
|
}
|
|
|
|
export function removeEdge(id: string): boolean {
|
|
const db = getDb();
|
|
const result = db.prepare('DELETE FROM edges WHERE id = ?').run(id);
|
|
return result.changes > 0;
|
|
}
|
|
|
|
export async function query(text: string, options: QueryOptions = {}): Promise<SearchResult[]> {
|
|
const nodes = listNodes({ kind: options.kind, tags: options.tags, includeStale: options.includeStale });
|
|
return hybridSearch(nodes, text, options);
|
|
}
|
|
|
|
// Version tracking functions
|
|
|
|
function rowToNodeVersion(row: any): NodeVersion {
|
|
return {
|
|
id: row.id,
|
|
nodeId: row.node_id,
|
|
version: row.version,
|
|
title: row.title,
|
|
content: row.content,
|
|
status: row.status ?? undefined,
|
|
tags: JSON.parse(row.tags || '[]'),
|
|
metadata: JSON.parse(row.metadata || '{}'),
|
|
validFrom: row.valid_from,
|
|
validUntil: row.valid_until ?? null,
|
|
createdBy: row.created_by,
|
|
};
|
|
}
|
|
|
|
function rowToHistoricalNode(row: any, nodeRow: any): HistoricalNode {
|
|
return {
|
|
id: nodeRow.id,
|
|
kind: nodeRow.kind,
|
|
title: row.title,
|
|
content: row.content,
|
|
status: row.status ?? undefined,
|
|
tags: JSON.parse(row.tags || '[]'),
|
|
metadata: JSON.parse(row.metadata || '{}'),
|
|
version: row.version,
|
|
validFrom: row.valid_from,
|
|
validUntil: row.valid_until ?? null,
|
|
};
|
|
}
|
|
|
|
export function getNodeHistory(id: string): NodeVersion[] {
|
|
const db = getDb();
|
|
const rows = db.prepare(`
|
|
SELECT * FROM node_versions WHERE node_id = ? ORDER BY version DESC
|
|
`).all(id) as any[];
|
|
return rows.map(rowToNodeVersion);
|
|
}
|
|
|
|
export function getNodeAtTime(id: string, timestamp: number): HistoricalNode | null {
|
|
const db = getDb();
|
|
const nodeRow = db.prepare('SELECT * FROM nodes WHERE id = ?').get(id) as any;
|
|
if (!nodeRow) return null;
|
|
|
|
const versionRow = db.prepare(`
|
|
SELECT * FROM node_versions
|
|
WHERE node_id = ? AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)
|
|
ORDER BY version DESC LIMIT 1
|
|
`).get(id, timestamp, timestamp) as any;
|
|
|
|
if (!versionRow) return null;
|
|
return rowToHistoricalNode(versionRow, nodeRow);
|
|
}
|
|
|
|
export function getNodeVersion(id: string, version: number): HistoricalNode | null {
|
|
const db = getDb();
|
|
const nodeRow = db.prepare('SELECT * FROM nodes WHERE id = ?').get(id) as any;
|
|
if (!nodeRow) return null;
|
|
|
|
const versionRow = db.prepare(`
|
|
SELECT * FROM node_versions WHERE node_id = ? AND version = ?
|
|
`).get(id, version) as any;
|
|
|
|
if (!versionRow) return null;
|
|
return rowToHistoricalNode(versionRow, nodeRow);
|
|
}
|
|
|
|
export function diffVersions(id: string, v1: number, v2: number): NodeDiff | null {
|
|
const version1 = getNodeVersion(id, v1);
|
|
const version2 = getNodeVersion(id, v2);
|
|
|
|
if (!version1 || !version2) return null;
|
|
|
|
const changes: NodeDiff['changes'] = [];
|
|
const fieldsToCompare: (keyof HistoricalNode)[] = ['title', 'content', 'status', 'tags', 'metadata'];
|
|
|
|
for (const field of fieldsToCompare) {
|
|
const oldVal = version1[field];
|
|
const newVal = version2[field];
|
|
const oldStr = JSON.stringify(oldVal);
|
|
const newStr = JSON.stringify(newVal);
|
|
if (oldStr !== newStr) {
|
|
changes.push({ field, old: oldVal, new: newVal });
|
|
}
|
|
}
|
|
|
|
return { nodeId: id, v1, v2, changes };
|
|
}
|
|
|
|
export async function restoreVersion(id: string, version: number, createdBy: string = 'restore'): Promise<Node | null> {
|
|
const historical = getNodeVersion(id, version);
|
|
if (!historical) return null;
|
|
|
|
// updateNode will handle creating a new version
|
|
return updateNode(id, {
|
|
title: historical.title,
|
|
content: historical.content,
|
|
status: historical.status,
|
|
tags: historical.tags,
|
|
metadata: historical.metadata,
|
|
}, createdBy);
|
|
}
|
|
|
|
export function getCurrentVersion(id: string): number {
|
|
const db = getDb();
|
|
const row = db.prepare('SELECT version FROM nodes WHERE id = ?').get(id) as any;
|
|
return row?.version ?? 1;
|
|
}
|