Stabilize core correctness and sync project plan state

2026-04-05 17:53:23 -04:00
parent b48f0c95ab
commit b0889b3925
20 changed files with 551 additions and 168 deletions


@@ -6,7 +6,6 @@ import time
import uuid
from pathlib import Path
from atocore.config import settings
from atocore.ingestion.chunker import chunk_markdown
from atocore.ingestion.parser import parse_markdown
from atocore.models.database import get_connection
@@ -45,7 +44,7 @@ def ingest_file(file_path: Path) -> dict:
return {"file": str(file_path), "status": "skipped", "reason": "unchanged"}
# Parse
parsed = parse_markdown(file_path)
parsed = parse_markdown(file_path, text=raw_content)
# Chunk
base_meta = {
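
Aside: the hunk above threads the already-read raw_content into the parser so the file is not read from disk twice (once for hashing, once for parsing). A minimal sketch of what such a signature might look like; the ParsedDocument shape, the default encoding, and the title heuristic are all assumptions, not the real atocore parser:

# Illustrative only: assumes raw_content was read earlier for hashing,
# so passing text= lets the parser skip a second read_text() call.
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class ParsedDocument:
    title: str
    tags: list[str] = field(default_factory=list)
    body: str = ""

def parse_markdown(file_path: Path, text: str | None = None) -> ParsedDocument:
    raw = text if text is not None else file_path.read_text(encoding="utf-8")
    # Real parser would extract front matter, tags, and headings here.
    first_line = raw.lstrip().splitlines()[0] if raw.strip() else ""
    title = first_line.lstrip("# ").strip() or file_path.stem
    return ParsedDocument(title=title, body=raw)
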
@@ -55,85 +54,98 @@ def ingest_file(file_path: Path) -> dict:
}
chunks = chunk_markdown(parsed.body, base_metadata=base_meta)
if not chunks:
log.warning("no_chunks_created", file_path=str(file_path))
return {"file": str(file_path), "status": "empty", "chunks": 0}
# Store in DB and vector store
doc_id = str(uuid.uuid4())
vector_store = get_vector_store()
old_chunk_ids: list[str] = []
new_chunk_ids: list[str] = []
with get_connection() as conn:
# Remove old data if re-ingesting
if existing:
doc_id = existing["id"]
old_chunk_ids = [
row["id"]
for row in conn.execute(
"SELECT id FROM source_chunks WHERE document_id = ?",
(doc_id,),
).fetchall()
]
conn.execute(
"DELETE FROM source_chunks WHERE document_id = ?", (doc_id,)
)
conn.execute(
"UPDATE source_documents SET file_hash = ?, title = ?, tags = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(file_hash, parsed.title, json.dumps(parsed.tags), doc_id),
)
# Remove old vectors
if old_chunk_ids:
vector_store.delete(old_chunk_ids)
else:
conn.execute(
"INSERT INTO source_documents (id, file_path, file_hash, title, doc_type, tags) VALUES (?, ?, ?, ?, ?, ?)",
(doc_id, str(file_path), file_hash, parsed.title, "markdown", json.dumps(parsed.tags)),
)
try:
with get_connection() as conn:
# Remove old data if re-ingesting
if existing:
doc_id = existing["id"]
old_chunk_ids = [
row["id"]
for row in conn.execute(
"SELECT id FROM source_chunks WHERE document_id = ?",
(doc_id,),
).fetchall()
]
conn.execute(
"DELETE FROM source_chunks WHERE document_id = ?", (doc_id,)
)
conn.execute(
"UPDATE source_documents SET file_hash = ?, title = ?, tags = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(file_hash, parsed.title, json.dumps(parsed.tags), doc_id),
)
else:
conn.execute(
"INSERT INTO source_documents (id, file_path, file_hash, title, doc_type, tags) VALUES (?, ?, ?, ?, ?, ?)",
(doc_id, str(file_path), file_hash, parsed.title, "markdown", json.dumps(parsed.tags)),
)
# Insert chunks
chunk_ids = []
chunk_contents = []
chunk_metadatas = []
if not chunks:
log.warning("no_chunks_created", file_path=str(file_path))
else:
# Insert chunks
chunk_contents = []
chunk_metadatas = []
for chunk in chunks:
chunk_id = str(uuid.uuid4())
chunk_ids.append(chunk_id)
chunk_contents.append(chunk.content)
chunk_metadatas.append({
"document_id": doc_id,
"heading_path": chunk.heading_path,
"source_file": str(file_path),
"tags": json.dumps(parsed.tags),
"title": parsed.title,
})
for chunk in chunks:
chunk_id = str(uuid.uuid4())
new_chunk_ids.append(chunk_id)
chunk_contents.append(chunk.content)
chunk_metadatas.append({
"document_id": doc_id,
"heading_path": chunk.heading_path,
"source_file": str(file_path),
"tags": json.dumps(parsed.tags),
"title": parsed.title,
})
conn.execute(
"INSERT INTO source_chunks (id, document_id, chunk_index, content, heading_path, char_count, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
chunk_id,
doc_id,
chunk.chunk_index,
chunk.content,
chunk.heading_path,
chunk.char_count,
json.dumps(chunk.metadata),
),
)
conn.execute(
"INSERT INTO source_chunks (id, document_id, chunk_index, content, heading_path, char_count, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
chunk_id,
doc_id,
chunk.chunk_index,
chunk.content,
chunk.heading_path,
chunk.char_count,
json.dumps(chunk.metadata),
),
)
# Store embeddings
vector_store.add(chunk_ids, chunk_contents, chunk_metadatas)
# Add new vectors before commit so DB can still roll back on failure.
vector_store.add(new_chunk_ids, chunk_contents, chunk_metadatas)
except Exception:
if new_chunk_ids:
vector_store.delete(new_chunk_ids)
raise
# Delete stale vectors only after the DB transaction committed.
if old_chunk_ids:
vector_store.delete(old_chunk_ids)
duration_ms = int((time.time() - start) * 1000)
log.info(
"file_ingested",
file_path=str(file_path),
chunks_created=len(chunks),
duration_ms=duration_ms,
)
if chunks:
log.info(
"file_ingested",
file_path=str(file_path),
chunks_created=len(chunks),
duration_ms=duration_ms,
)
else:
log.info(
"file_ingested_empty",
file_path=str(file_path),
duration_ms=duration_ms,
)
return {
"file": str(file_path),
"status": "ingested",
"status": "ingested" if chunks else "empty",
"chunks": len(chunks),
"duration_ms": duration_ms,
}
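
Aside: this hunk fixes two ordering hazards. Previously, stale vectors were deleted inside the connection block, before the transaction committed, so a later failure rolled back the DB but the embeddings were already gone; and empty files returned early, so a re-ingested file that became empty kept its stale chunks. The new ordering, reduced to a standalone sketch with sqlite3 and a plain dict standing in for the vector store (names and schema here are illustrative, not the atocore API):

import sqlite3, uuid

vector_store: dict[str, str] = {}  # chunk_id -> embedded content (stand-in)

def replace_chunks(conn: sqlite3.Connection, doc_id: str, contents: list[str]) -> None:
    old_ids = [r[0] for r in conn.execute(
        "SELECT id FROM source_chunks WHERE document_id = ?", (doc_id,))]
    new_ids = [str(uuid.uuid4()) for _ in contents]
    try:
        with conn:  # one transaction; rolls back automatically on exception
            conn.execute("DELETE FROM source_chunks WHERE document_id = ?", (doc_id,))
            conn.executemany(
                "INSERT INTO source_chunks (id, document_id, content) VALUES (?, ?, ?)",
                zip(new_ids, [doc_id] * len(contents), contents))
            for cid, text in zip(new_ids, contents):  # add vectors before commit
                vector_store[cid] = text
    except Exception:
        for cid in new_ids:  # compensate: remove the half-added vectors
            vector_store.pop(cid, None)
        raise
    for cid in old_ids:  # safe only now: the DB points at the new chunks
        vector_store.pop(cid, None)

# usage (in-memory demo):
#   conn = sqlite3.connect(":memory:")
#   conn.execute("CREATE TABLE source_chunks (id TEXT, document_id TEXT, content TEXT)")
#   replace_chunks(conn, "doc-1", ["first chunk", "second chunk"])
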
@@ -152,7 +164,9 @@ def ingest_folder(folder_path: Path, purge_deleted: bool = True) -> list[dict]:
raise NotADirectoryError(f"Not a directory: {folder_path}")
results = []
md_files = sorted(folder_path.rglob("*.md"))
md_files = sorted(
list(folder_path.rglob("*.md")) + list(folder_path.rglob("*.markdown"))
)
current_paths = {str(f.resolve()) for f in md_files}
log.info("ingestion_started", folder=str(folder_path), file_count=len(md_files))
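
Aside: ingest_folder now also discovers .markdown files. Note that rglob patterns are case-sensitive on most filesystems, so ".MD" still slips through; a suffix-based walk is a slightly broader alternative (hypothetical helper, not part of this commit):

from pathlib import Path

def find_markdown_files(folder: Path) -> list[Path]:
    # Case-insensitive suffix check, deterministic order for stable ingestion runs.
    exts = {".md", ".markdown"}
    return sorted(p for p in folder.rglob("*") if p.is_file() and p.suffix.lower() in exts)
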
@@ -213,32 +227,35 @@ def _purge_deleted_files(folder_path: Path, current_paths: set[str]) -> int:
folder_str = str(folder_path)
deleted_count = 0
vector_store = get_vector_store()
chunk_ids_to_delete: list[str] = []
with get_connection() as conn:
# Find documents under this folder
rows = conn.execute(
"SELECT id, file_path FROM source_documents WHERE file_path LIKE ?",
(f"{folder_str}%",),
"SELECT id, file_path FROM source_documents"
).fetchall()
for row in rows:
doc_path = Path(row["file_path"])
try:
doc_path.relative_to(folder_path)
except ValueError:
continue
if row["file_path"] not in current_paths:
doc_id = row["id"]
# Get chunk IDs for vector deletion
chunk_ids = [
chunk_ids_to_delete.extend(
r["id"]
for r in conn.execute(
"SELECT id FROM source_chunks WHERE document_id = ?",
(doc_id,),
).fetchall()
]
# Delete from DB
)
conn.execute("DELETE FROM source_chunks WHERE document_id = ?", (doc_id,))
conn.execute("DELETE FROM source_documents WHERE id = ?", (doc_id,))
# Delete from vectors
if chunk_ids:
vector_store.delete(chunk_ids)
log.info("purged_deleted_file", file_path=row["file_path"])
deleted_count += 1
if chunk_ids_to_delete:
vector_store.delete(chunk_ids_to_delete)
return deleted_count
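
Aside: the purge now fetches all documents and tests containment with Path.relative_to instead of a SQL LIKE prefix, and batches vector deletions into a single call after the loop. The containment change matters because a bare prefix match also catches sibling folders that merely share a name prefix. Illustrative values:

from pathlib import Path

folder = Path("/data/notes")
for candidate in ["/data/notes/a.md", "/data/notes2/b.md"]:
    like_match = candidate.startswith(str(folder))  # old LIKE '{folder}%' behavior
    try:
        Path(candidate).relative_to(folder)         # new containment check
        contained = True
    except ValueError:
        contained = False
    print(candidate, like_match, contained)
# /data/notes/a.md   True True
# /data/notes2/b.md  True False  <- the LIKE false positive this hunk eliminates
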