158 lines
5.2 KiB
Python
158 lines
5.2 KiB
Python
|
|
"""Ingestion pipeline: parse → chunk → embed → store."""
|
||
|
|
|
||
|
|
import hashlib
|
||
|
|
import json
|
||
|
|
import time
|
||
|
|
import uuid
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from atocore.config import settings
|
||
|
|
from atocore.ingestion.chunker import chunk_markdown
|
||
|
|
from atocore.ingestion.parser import parse_markdown
|
||
|
|
from atocore.models.database import get_connection
|
||
|
|
from atocore.observability.logger import get_logger
|
||
|
|
from atocore.retrieval.vector_store import get_vector_store
|
||
|
|
|
||
|
|
# Module-level logger for this pipeline, obtained under the name "ingestion".
# The functions in this module call it with an event name plus keyword fields.
log = get_logger("ingestion")
|
||
|
|
|
||
|
|
|
||
|
|
def ingest_file(file_path: Path) -> dict:
    """Ingest a single markdown file and return a stats dict.

    Steps: resolve and validate the path, hash the file text, skip when the
    stored hash for that path matches, otherwise parse, chunk, write the
    document and chunk rows, and pass the chunk texts to the vector store.

    When a previously ingested file now produces no chunks, its old chunk
    rows and vectors are removed and the stored hash is refreshed, so stale
    content is not left behind; the call still reports ``"empty"``.

    Args:
        file_path: Path to a ``.md`` / ``.markdown`` file. The suffix check
            is case-insensitive.

    Returns:
        A dict that always has ``"file"`` and ``"status"``. ``"status"`` is
        ``"skipped"`` (with ``"reason": "unchanged"``), ``"empty"`` (with
        ``"chunks": 0``) or ``"ingested"`` (with ``"chunks"`` and
        ``"duration_ms"``).

    Raises:
        FileNotFoundError: If the resolved path does not exist.
        ValueError: If the suffix is not ``.md`` or ``.markdown``.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments the way a time.time() difference can.
    started = time.perf_counter()
    file_path = file_path.resolve()

    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    if file_path.suffix.lower() not in (".md", ".markdown"):
        raise ValueError(f"Not a markdown file: {file_path}")

    path_str = str(file_path)

    # The SHA-256 of the decoded text is the change detector for re-ingestion.
    raw_content = file_path.read_text(encoding="utf-8")
    file_hash = hashlib.sha256(raw_content.encode()).hexdigest()

    # Look up a previous ingestion of the same resolved path.
    with get_connection() as conn:
        existing = conn.execute(
            "SELECT id, file_hash FROM source_documents WHERE file_path = ?",
            (path_str,),
        ).fetchone()

    if existing and existing["file_hash"] == file_hash:
        log.info("file_skipped_unchanged", file_path=path_str)
        return {"file": path_str, "status": "skipped", "reason": "unchanged"}

    # Parse, then chunk the body with document-level metadata attached.
    parsed = parse_markdown(file_path)
    chunks = chunk_markdown(
        parsed.body,
        base_metadata={
            "source_file": path_str,
            "tags": parsed.tags,
            "title": parsed.title,
        },
    )

    # A never-seen file with no chunks leaves nothing to store or clean up.
    if not chunks and not existing:
        log.warning("no_chunks_created", file_path=path_str)
        return {"file": path_str, "status": "empty", "chunks": 0}

    # Serialise the tags once instead of once per chunk.
    tags_json = json.dumps(parsed.tags)
    # Re-ingestion keeps the existing document id; new files get a fresh one.
    doc_id = existing["id"] if existing else str(uuid.uuid4())
    vector_store = get_vector_store()

    chunk_ids: list[str] = []
    chunk_contents = []
    chunk_metadatas = []

    with get_connection() as conn:
        if existing:
            # Replace the previous chunk rows and refresh the document row.
            old_chunk_ids = [
                row["id"]
                for row in conn.execute(
                    "SELECT id FROM source_chunks WHERE document_id = ?",
                    (doc_id,),
                ).fetchall()
            ]
            conn.execute(
                "DELETE FROM source_chunks WHERE document_id = ?", (doc_id,)
            )
            conn.execute(
                "UPDATE source_documents SET file_hash = ?, title = ?, tags = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
                (file_hash, parsed.title, tags_json, doc_id),
            )
            # NOTE(review): old vectors are dropped before the new ones are
            # stored; a later failure in this block leaves the vector store
            # without them until the file is ingested again.
            if old_chunk_ids:
                vector_store.delete(old_chunk_ids)
        else:
            conn.execute(
                "INSERT INTO source_documents (id, file_path, file_hash, title, doc_type, tags) VALUES (?, ?, ?, ?, ?, ?)",
                (doc_id, path_str, file_hash, parsed.title, "markdown", tags_json),
            )

        for chunk in chunks:
            chunk_id = str(uuid.uuid4())
            chunk_ids.append(chunk_id)
            chunk_contents.append(chunk.content)
            chunk_metadatas.append({
                "document_id": doc_id,
                "heading_path": chunk.heading_path,
                "source_file": path_str,
                "tags": tags_json,
                "title": parsed.title,
            })

            conn.execute(
                "INSERT INTO source_chunks (id, document_id, chunk_index, content, heading_path, char_count, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    chunk_id,
                    doc_id,
                    chunk.chunk_index,
                    chunk.content,
                    chunk.heading_path,
                    chunk.char_count,
                    json.dumps(chunk.metadata),
                ),
            )

        # Store embeddings while still inside the connection block, so a
        # vector-store failure raises before the block exits cleanly.
        # Presumably get_connection commits on clean exit and rolls back on
        # an exception -- TODO confirm against its implementation.
        if chunk_ids:
            vector_store.add(chunk_ids, chunk_contents, chunk_metadatas)

    if not chunks:
        # Re-ingested file that is now empty: old data was cleared above.
        log.warning("no_chunks_created", file_path=path_str)
        return {"file": path_str, "status": "empty", "chunks": 0}

    duration_ms = int((time.perf_counter() - started) * 1000)
    log.info(
        "file_ingested",
        file_path=path_str,
        chunks_created=len(chunks),
        duration_ms=duration_ms,
    )

    return {
        "file": path_str,
        "status": "ingested",
        "chunks": len(chunks),
        "duration_ms": duration_ms,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def ingest_folder(folder_path: Path) -> list[dict]:
    """Ingest every markdown file under a folder, recursing into subfolders.

    Files are selected with the same rule ``ingest_file`` enforces: a suffix
    of ``.md`` or ``.markdown``, compared case-insensitively. Directories
    whose names happen to carry such a suffix are skipped. Files are
    processed in sorted path order.

    Args:
        folder_path: Directory to scan.

    Returns:
        One dict per candidate file, in processing order: either the dict
        returned by ``ingest_file`` or, when it raised,
        ``{"file": ..., "status": "error", "error": ...}``.

    Raises:
        NotADirectoryError: If the resolved path is not a directory.
    """
    folder_path = folder_path.resolve()
    if not folder_path.is_dir():
        raise NotADirectoryError(f"Not a directory: {folder_path}")

    # Keep this tuple in step with the suffix check inside ingest_file.
    markdown_suffixes = (".md", ".markdown")
    md_files = sorted(
        candidate
        for candidate in folder_path.rglob("*")
        if candidate.suffix.lower() in markdown_suffixes
        and not candidate.is_dir()
    )
    log.info("ingestion_started", folder=str(folder_path), file_count=len(md_files))

    results = []
    for md_file in md_files:
        try:
            results.append(ingest_file(md_file))
        except Exception as e:
            # Deliberate per-file boundary: one failing file is recorded and
            # logged, and the rest of the batch still runs.
            log.error("ingestion_error", file_path=str(md_file), error=str(e))
            results.append({"file": str(md_file), "status": "error", "error": str(e)})

    return results
|