4 Commits

Author SHA1 Message Date
b48f0c95ab feat: Phase 2 Memory Core — structured memory with context integration
Memory Core implementation:
- Memory service with 6 types: identity, preference, project, episodic, knowledge, adaptation
- CRUD operations: create (with dedup), get (filtered), update, invalidate, supersede
- Confidence scoring (0.0-1.0) and lifecycle management (active/superseded/invalid)
- Memory API endpoints: POST/GET/PUT/DELETE /memory

Context builder integration (trust precedence per Master Plan):
  1. Trusted Project State (highest trust, 20% budget)
  2. Identity + Preference memories (10% budget)
  3. Retrieved chunks (remaining budget)

Also fixed database.py to use dynamic settings reference for test isolation.
45/45 tests passing.
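The budget split described above amounts to simple arithmetic over the character budget; a minimal sketch (the function name and return shape are illustrative, not the service's API):

```python
def split_budget(total: int, state_ratio: float = 0.20, memory_ratio: float = 0.10) -> dict:
    """Allocate a character budget across trust tiers.

    Project state and memories get fixed fractions of the budget;
    retrieved chunks get whatever remains (never negative).
    """
    state = int(total * state_ratio)
    memory = int(total * memory_ratio)
    retrieval = max(total - state - memory, 0)
    return {"project_state": state, "memory": memory, "retrieval": retrieval}
```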

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 09:54:52 -04:00
531c560db7 feat: Phase 1 ingestion hardening + Phase 5 Trusted Project State
Phase 1 - Ingestion hardening:
- Encoding fallback (UTF-8/UTF-8-sig/Latin-1/CP1252)
- Delete detection: purge DB/vector entries for removed files
- Ingestion stats endpoint (GET /stats)
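The fallback chain can be sketched as a small helper (`read_with_fallback` is a hypothetical name, not the pipeline's actual function; the encoding order follows the commit):

```python
from pathlib import Path

ENCODINGS = ("utf-8", "utf-8-sig", "latin-1", "cp1252")

def read_with_fallback(path: Path) -> str:
    """Try each encoding in order; latin-1 accepts every byte value,
    so in practice this always returns text instead of raising."""
    data = path.read_bytes()
    for enc in ENCODINGS:
        try:
            return data.decode(enc)
        except UnicodeDecodeError:
            continue
    # Unreachable in practice, kept as a safety net.
    return data.decode("utf-8", errors="replace")
```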

Phase 5 - Trusted Project State:
- project_state table with categories (status, decision, requirement, contact, milestone, fact, config)
- CRUD API: POST/GET/DELETE /project/state
- Upsert semantics, invalidation (supersede) support
- Context builder integrates project state at highest trust precedence
- Project state gets 20% budget allocation, appears first in context
- Trust precedence: Project State > Retrieved Chunks (per Master Plan)

33/33 tests passing. Validated end-to-end with GigaBIT M1 project data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 09:41:59 -04:00
6081462058 fix: critical bugs and hardening from validation audit
- Fix infinite loop in chunker _hard_split when overlap >= max_size
- Fix tag filter false positives by quoting tag values in ChromaDB query
- Fix score boost semantics (additive → multiplicative) to stay within 0-1 range
- Add error handling and type hints to all API routes
- Update README with proper project documentation
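The boost-semantics fix is easy to illustrate in isolation: adding a flat bonus can push a cosine-similarity score past 1.0, while scaling and clamping keeps it in range. A minimal sketch (the 1.3 factor and the clamp are illustrative, not the repository's exact code):

```python
def boost_score(score: float, boost: float = 1.3) -> float:
    """Apply a project-match boost multiplicatively and clamp to [0, 1].

    An additive bonus (score + 0.3) exceeds 1.0 for already-high scores;
    scaling then clamping preserves the 0-1 interpretation.
    """
    return min(score * boost, 1.0)
```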

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 09:35:37 -04:00
b4afbbb53a feat: implement AtoCore Phase 0 + Phase 0.5 (foundation + PoC)
Complete implementation of the personal context engine foundation:
- FastAPI server with 5 endpoints (ingest, query, context/build, health, debug)
- SQLite database with 5 tables (documents, chunks, memories, projects, interactions)
- Heading-aware markdown chunker (800 char max, recursive splitting)
- Multilingual embeddings via sentence-transformers (EN/FR)
- ChromaDB vector store with cosine similarity retrieval
- Context builder with project boosting, dedup, and budget enforcement
- CLI scripts for batch ingestion and test prompt evaluation
- 19 unit tests passing, 79% coverage
- Validated on 482 real project files (8383 chunks, 0 errors)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 09:21:27 -04:00
40 changed files with 2958 additions and 2 deletions

.env.example Normal file

@@ -0,0 +1,8 @@
ATOCORE_DEBUG=false
ATOCORE_DATA_DIR=./data
ATOCORE_HOST=127.0.0.1
ATOCORE_PORT=8100
ATOCORE_EMBEDDING_MODEL=sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
ATOCORE_CHUNK_MAX_SIZE=800
ATOCORE_CHUNK_OVERLAP=100
ATOCORE_CONTEXT_BUDGET=3000

.gitignore vendored Normal file

@@ -0,0 +1,12 @@
data/
__pycache__/
*.pyc
.env
*.egg-info/
dist/
build/
.pytest_cache/
htmlcov/
.coverage
venv/
.venv/

README.md

@@ -1,3 +1,67 @@
-# ATODrive
-ATODrive project repository
+# AtoCore
+Personal context engine that enriches LLM interactions with durable memory, structured context, and project knowledge.
## Quick Start
```bash
pip install -e .
uvicorn src.atocore.main:app --port 8100
```
## Usage
```bash
# Ingest markdown files
curl -X POST http://localhost:8100/ingest \
-H "Content-Type: application/json" \
-d '{"path": "/path/to/notes"}'
# Build enriched context for a prompt
curl -X POST http://localhost:8100/context/build \
-H "Content-Type: application/json" \
-d '{"prompt": "What is the project status?", "project": "myproject"}'
# CLI ingestion
python scripts/ingest_folder.py --path /path/to/notes
```
## API Endpoints
| Method | Path | Description |
|--------|------|-------------|
| POST | /ingest | Ingest markdown file or folder |
| POST | /query | Retrieve relevant chunks |
| POST | /context/build | Build full context pack |
| GET | /health | Health check |
| GET | /debug/context | Inspect last context pack |
## Architecture
```
FastAPI (port 8100)
├── Ingestion: markdown → parse → chunk → embed → store
├── Retrieval: query → embed → vector search → rank
├── Context Builder: retrieve → boost → budget → format
├── SQLite (documents, chunks, memories, projects, interactions)
└── ChromaDB (vector embeddings)
```
## Configuration
Set via environment variables (prefix `ATOCORE_`):
| Variable | Default | Description |
|----------|---------|-------------|
| ATOCORE_DEBUG | false | Enable debug logging |
| ATOCORE_PORT | 8100 | Server port |
| ATOCORE_CHUNK_MAX_SIZE | 800 | Max chunk size (chars) |
| ATOCORE_CONTEXT_BUDGET | 3000 | Context pack budget (chars) |
| ATOCORE_EMBEDDING_MODEL | paraphrase-multilingual-MiniLM-L12-v2 | Embedding model |
## Testing
```bash
pip install -e ".[dev]"
pytest
```

pyproject.toml Normal file

@@ -0,0 +1,36 @@
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "atocore"
version = "0.1.0"
description = "Personal context engine for LLM interactions"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.110.0",
"uvicorn[standard]>=0.27.0",
"python-frontmatter>=1.1.0",
"chromadb>=0.4.22",
"sentence-transformers>=2.5.0",
"pydantic>=2.6.0",
"pydantic-settings>=2.1.0",
"structlog>=24.1.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"httpx>=0.27.0",
"pyyaml>=6.0.0",
]
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = "--cov=atocore --cov-report=term-missing -v"

requirements-dev.txt Normal file

@@ -0,0 +1,5 @@
-r requirements.txt
pytest>=8.0.0
pytest-cov>=4.1.0
httpx>=0.27.0
pyyaml>=6.0.0

requirements.txt Normal file

@@ -0,0 +1,8 @@
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
python-frontmatter>=1.1.0
chromadb>=0.4.22
sentence-transformers>=2.5.0
pydantic>=2.6.0
pydantic-settings>=2.1.0
structlog>=24.1.0

scripts/ingest_folder.py Normal file

@@ -0,0 +1,54 @@
"""CLI script to ingest a folder of markdown files."""
import argparse
import json
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from atocore.ingestion.pipeline import ingest_folder
from atocore.models.database import init_db
from atocore.observability.logger import setup_logging


def main():
    parser = argparse.ArgumentParser(description="Ingest markdown files into AtoCore")
    parser.add_argument("--path", required=True, help="Path to folder with markdown files")
    args = parser.parse_args()

    setup_logging()
    init_db()

    folder = Path(args.path)
    if not folder.is_dir():
        print(f"Error: {folder} is not a directory")
        sys.exit(1)

    results = ingest_folder(folder)

    # Summary
    ingested = sum(1 for r in results if r["status"] == "ingested")
    skipped = sum(1 for r in results if r["status"] == "skipped")
    errors = sum(1 for r in results if r["status"] == "error")
    total_chunks = sum(r.get("chunks", 0) for r in results)

    print(f"\n{'='*50}")
    print("Ingestion complete:")
    print(f"  Files processed: {len(results)}")
    print(f"  Ingested: {ingested}")
    print(f"  Skipped (unchanged): {skipped}")
    print(f"  Errors: {errors}")
    print(f"  Total chunks created: {total_chunks}")
    print(f"{'='*50}")

    if errors:
        print("\nErrors:")
        for r in results:
            if r["status"] == "error":
                print(f"  {r['file']}: {r['error']}")


if __name__ == "__main__":
    main()

scripts/query_test.py Normal file

@@ -0,0 +1,76 @@
"""CLI script to run test prompts and compare baseline vs enriched."""
import argparse
import sys
from pathlib import Path

import yaml

# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from atocore.context.builder import build_context
from atocore.models.database import init_db
from atocore.observability.logger import setup_logging


def main():
    parser = argparse.ArgumentParser(description="Run test prompts against AtoCore")
    parser.add_argument(
        "--prompts",
        default=str(Path(__file__).parent.parent / "tests" / "test_prompts" / "prompts.yaml"),
        help="Path to prompts YAML file",
    )
    args = parser.parse_args()

    setup_logging()
    init_db()

    prompts_path = Path(args.prompts)
    if not prompts_path.exists():
        print(f"Error: {prompts_path} not found")
        sys.exit(1)

    with open(prompts_path) as f:
        data = yaml.safe_load(f)
    prompts = data.get("prompts", [])

    print(f"Running {len(prompts)} test prompts...\n")
    for p in prompts:
        prompt_id = p["id"]
        prompt_text = p["prompt"]
        project = p.get("project")
        expected = p.get("expected", "")

        print(f"{'='*60}")
        print(f"[{prompt_id}] {prompt_text}")
        print(f"Project: {project or 'none'}")
        print(f"Expected: {expected}")
        print("-" * 60)

        pack = build_context(
            user_prompt=prompt_text,
            project_hint=project,
        )

        print(f"Chunks retrieved: {len(pack.chunks_used)}")
        print(f"Total chars: {pack.total_chars} / {pack.budget}")
        print(f"Duration: {pack.duration_ms}ms")
        print()
        for i, chunk in enumerate(pack.chunks_used[:5]):
            print(f"  [{i+1}] Score: {chunk.score:.2f} | {chunk.source_file}")
            print(f"      Section: {chunk.heading_path}")
            print(f"      Preview: {chunk.content[:120]}...")
            print()
        print(f"Full prompt length: {len(pack.full_prompt)} chars")
        print()

    print(f"{'='*60}")
    print("Done. Review output above to assess retrieval quality.")


if __name__ == "__main__":
    main()

src/atocore/__init__.py Normal file

@@ -0,0 +1,3 @@
"""AtoCore — Personal Context Engine."""
__version__ = "0.1.0"


src/atocore/api/routes.py Normal file

@@ -0,0 +1,323 @@
"""FastAPI route definitions."""
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from atocore.context.builder import (
build_context,
get_last_context_pack,
_pack_to_dict,
)
from atocore.context.project_state import (
CATEGORIES,
get_state,
invalidate_state,
set_state,
)
from atocore.ingestion.pipeline import ingest_file, ingest_folder, get_ingestion_stats
from atocore.memory.service import (
MEMORY_TYPES,
create_memory,
get_memories,
invalidate_memory,
supersede_memory,
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.retrieval.retriever import retrieve
from atocore.retrieval.vector_store import get_vector_store
router = APIRouter()
log = get_logger("api")
# --- Request/Response models ---
class IngestRequest(BaseModel):
path: str
class IngestResponse(BaseModel):
results: list[dict]
class QueryRequest(BaseModel):
prompt: str
top_k: int = 10
filter_tags: list[str] | None = None
class QueryResponse(BaseModel):
results: list[dict]
class ContextBuildRequest(BaseModel):
prompt: str
project: str | None = None
budget: int | None = None
class ContextBuildResponse(BaseModel):
formatted_context: str
full_prompt: str
chunks_used: int
total_chars: int
budget: int
budget_remaining: int
duration_ms: int
chunks: list[dict]
class MemoryCreateRequest(BaseModel):
memory_type: str
content: str
project: str = ""
confidence: float = 1.0
class MemoryUpdateRequest(BaseModel):
content: str | None = None
confidence: float | None = None
status: str | None = None
class ProjectStateSetRequest(BaseModel):
project: str
category: str
key: str
value: str
source: str = ""
confidence: float = 1.0
class ProjectStateGetRequest(BaseModel):
project: str
category: str | None = None
class ProjectStateInvalidateRequest(BaseModel):
project: str
category: str
key: str
# --- Endpoints ---
@router.post("/ingest", response_model=IngestResponse)
def api_ingest(req: IngestRequest) -> IngestResponse:
"""Ingest a markdown file or folder."""
target = Path(req.path)
try:
if target.is_file():
results = [ingest_file(target)]
elif target.is_dir():
results = ingest_folder(target)
else:
raise HTTPException(status_code=404, detail=f"Path not found: {req.path}")
except HTTPException:
raise
except Exception as e:
log.error("ingest_failed", path=req.path, error=str(e))
raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}")
return IngestResponse(results=results)
@router.post("/query", response_model=QueryResponse)
def api_query(req: QueryRequest) -> QueryResponse:
"""Retrieve relevant chunks for a prompt."""
try:
chunks = retrieve(req.prompt, top_k=req.top_k, filter_tags=req.filter_tags)
except Exception as e:
log.error("query_failed", prompt=req.prompt[:100], error=str(e))
raise HTTPException(status_code=500, detail=f"Query failed: {e}")
return QueryResponse(
results=[
{
"chunk_id": c.chunk_id,
"content": c.content,
"score": c.score,
"heading_path": c.heading_path,
"source_file": c.source_file,
"title": c.title,
}
for c in chunks
]
)
@router.post("/context/build", response_model=ContextBuildResponse)
def api_build_context(req: ContextBuildRequest) -> ContextBuildResponse:
"""Build a full context pack for a prompt."""
try:
pack = build_context(
user_prompt=req.prompt,
project_hint=req.project,
budget=req.budget,
)
except Exception as e:
log.error("context_build_failed", prompt=req.prompt[:100], error=str(e))
raise HTTPException(status_code=500, detail=f"Context build failed: {e}")
pack_dict = _pack_to_dict(pack)
return ContextBuildResponse(
formatted_context=pack.formatted_context,
full_prompt=pack.full_prompt,
chunks_used=len(pack.chunks_used),
total_chars=pack.total_chars,
budget=pack.budget,
budget_remaining=pack.budget_remaining,
duration_ms=pack.duration_ms,
chunks=pack_dict["chunks"],
)
@router.post("/memory")
def api_create_memory(req: MemoryCreateRequest) -> dict:
"""Create a new memory entry."""
try:
mem = create_memory(
memory_type=req.memory_type,
content=req.content,
project=req.project,
confidence=req.confidence,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"status": "ok", "id": mem.id, "memory_type": mem.memory_type}
@router.get("/memory")
def api_get_memories(
memory_type: str | None = None,
active_only: bool = True,
min_confidence: float = 0.0,
limit: int = 50,
) -> dict:
"""List memories, optionally filtered."""
memories = get_memories(
memory_type=memory_type,
active_only=active_only,
min_confidence=min_confidence,
limit=limit,
)
return {
"memories": [
{
"id": m.id,
"memory_type": m.memory_type,
"content": m.content,
"confidence": m.confidence,
"status": m.status,
"updated_at": m.updated_at,
}
for m in memories
],
"types": MEMORY_TYPES,
}
@router.put("/memory/{memory_id}")
def api_update_memory(memory_id: str, req: MemoryUpdateRequest) -> dict:
"""Update an existing memory."""
try:
success = update_memory(
memory_id=memory_id,
content=req.content,
confidence=req.confidence,
status=req.status,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not success:
raise HTTPException(status_code=404, detail="Memory not found")
return {"status": "updated", "id": memory_id}
@router.delete("/memory/{memory_id}")
def api_invalidate_memory(memory_id: str) -> dict:
"""Invalidate a memory (error correction)."""
success = invalidate_memory(memory_id)
if not success:
raise HTTPException(status_code=404, detail="Memory not found")
return {"status": "invalidated", "id": memory_id}
@router.post("/project/state")
def api_set_project_state(req: ProjectStateSetRequest) -> dict:
"""Set or update a trusted project state entry."""
try:
entry = set_state(
project_name=req.project,
category=req.category,
key=req.key,
value=req.value,
source=req.source,
confidence=req.confidence,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
log.error("set_state_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Failed to set state: {e}")
return {"status": "ok", "id": entry.id, "category": entry.category, "key": entry.key}
@router.get("/project/state/{project_name}")
def api_get_project_state(project_name: str, category: str | None = None) -> dict:
"""Get trusted project state entries."""
entries = get_state(project_name, category=category)
return {
"project": project_name,
"entries": [
{
"id": e.id,
"category": e.category,
"key": e.key,
"value": e.value,
"source": e.source,
"confidence": e.confidence,
"status": e.status,
"updated_at": e.updated_at,
}
for e in entries
],
"categories": CATEGORIES,
}
@router.delete("/project/state")
def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
"""Invalidate (supersede) a project state entry."""
success = invalidate_state(req.project, req.category, req.key)
if not success:
raise HTTPException(status_code=404, detail="State entry not found or already invalidated")
return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key}
@router.get("/health")
def api_health() -> dict:
"""Health check."""
store = get_vector_store()
return {
"status": "ok",
"version": "0.1.0",
"vectors_count": store.count,
}
@router.get("/stats")
def api_stats() -> dict:
"""Ingestion statistics."""
return get_ingestion_stats()
@router.get("/debug/context")
def api_debug_context() -> dict:
"""Inspect the last assembled context pack."""
pack = get_last_context_pack()
if pack is None:
return {"message": "No context pack built yet."}
return _pack_to_dict(pack)

src/atocore/config.py Normal file

@@ -0,0 +1,39 @@
"""AtoCore configuration via environment variables."""
from pathlib import Path

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    debug: bool = False
    data_dir: Path = Path("./data")
    host: str = "127.0.0.1"
    port: int = 8100

    # Embedding
    embedding_model: str = (
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )

    # Chunking
    chunk_max_size: int = 800
    chunk_overlap: int = 100
    chunk_min_size: int = 50

    # Context
    context_budget: int = 3000
    context_top_k: int = 15

    model_config = {"env_prefix": "ATOCORE_"}

    @property
    def db_path(self) -> Path:
        return self.data_dir / "atocore.db"

    @property
    def chroma_path(self) -> Path:
        return self.data_dir / "chroma"


settings = Settings()

src/atocore/context/builder.py Normal file

@@ -0,0 +1,284 @@
"""Context pack assembly: retrieve, rank, budget, format.
Trust precedence (per Master Plan):
1. Trusted Project State → always included first, highest authority
2. Identity + Preference memories → included next
3. Retrieved chunks → ranked, deduplicated, budget-constrained
"""
import time
from dataclasses import dataclass, field
from pathlib import Path
from atocore.config import settings
from atocore.context.project_state import format_project_state, get_state
from atocore.memory.service import get_memories_for_context
from atocore.observability.logger import get_logger
from atocore.retrieval.retriever import ChunkResult, retrieve
log = get_logger("context_builder")
SYSTEM_PREFIX = (
"You have access to the following personal context from the user's knowledge base.\n"
"Use it to inform your answer. If the context is not relevant, ignore it.\n"
"Do not mention the context system unless asked.\n"
"When project state is provided, treat it as the most authoritative source."
)
# Budget allocation (per Master Plan section 9):
# identity: 5%, preferences: 5%, project state: 20%, retrieval: 60%+
PROJECT_STATE_BUDGET_RATIO = 0.20
MEMORY_BUDGET_RATIO = 0.10 # 5% identity + 5% preference
# Last built context pack for debug inspection
_last_context_pack: "ContextPack | None" = None
@dataclass
class ContextChunk:
content: str
source_file: str
heading_path: str
score: float
char_count: int
@dataclass
class ContextPack:
chunks_used: list[ContextChunk] = field(default_factory=list)
project_state_text: str = ""
project_state_chars: int = 0
memory_text: str = ""
memory_chars: int = 0
total_chars: int = 0
budget: int = 0
budget_remaining: int = 0
formatted_context: str = ""
full_prompt: str = ""
query: str = ""
project_hint: str = ""
duration_ms: int = 0
def build_context(
user_prompt: str,
project_hint: str | None = None,
budget: int | None = None,
) -> ContextPack:
"""Build a context pack for a user prompt.
Trust precedence applied:
1. Project state is injected first (highest trust)
2. Identity + preference memories (second trust level)
3. Retrieved chunks fill the remaining budget
"""
global _last_context_pack
start = time.time()
budget = budget or settings.context_budget
# 1. Get Trusted Project State (highest precedence)
project_state_text = ""
project_state_chars = 0
if project_hint:
state_entries = get_state(project_hint)
if state_entries:
project_state_text = format_project_state(state_entries)
project_state_chars = len(project_state_text)
# 2. Get identity + preference memories (second precedence)
memory_budget = int(budget * MEMORY_BUDGET_RATIO)
memory_text, memory_chars = get_memories_for_context(
memory_types=["identity", "preference"],
budget=memory_budget,
)
# 3. Calculate remaining budget for retrieval
retrieval_budget = budget - project_state_chars - memory_chars
# 4. Retrieve candidates
candidates = retrieve(user_prompt, top_k=settings.context_top_k)
# 5. Score and rank
scored = _rank_chunks(candidates, project_hint)
# 6. Select within remaining budget
selected = _select_within_budget(scored, max(retrieval_budget, 0))
# 7. Format full context
formatted = _format_full_context(project_state_text, memory_text, selected)
# 8. Build full prompt
full_prompt = f"{SYSTEM_PREFIX}\n\n{formatted}\n\n{user_prompt}"
retrieval_chars = sum(c.char_count for c in selected)
total_chars = project_state_chars + memory_chars + retrieval_chars
duration_ms = int((time.time() - start) * 1000)
pack = ContextPack(
chunks_used=selected,
project_state_text=project_state_text,
project_state_chars=project_state_chars,
memory_text=memory_text,
memory_chars=memory_chars,
total_chars=total_chars,
budget=budget,
budget_remaining=budget - total_chars,
formatted_context=formatted,
full_prompt=full_prompt,
query=user_prompt,
project_hint=project_hint or "",
duration_ms=duration_ms,
)
_last_context_pack = pack
log.info(
"context_built",
chunks_used=len(selected),
project_state_chars=project_state_chars,
memory_chars=memory_chars,
retrieval_chars=retrieval_chars,
total_chars=total_chars,
budget_remaining=budget - total_chars,
duration_ms=duration_ms,
)
log.debug("context_pack_detail", pack=_pack_to_dict(pack))
return pack
def get_last_context_pack() -> ContextPack | None:
"""Return the last built context pack for debug inspection."""
return _last_context_pack
def _rank_chunks(
candidates: list[ChunkResult],
project_hint: str | None,
) -> list[tuple[float, ChunkResult]]:
"""Rank candidates with boosting for project match."""
scored = []
seen_content: set[str] = set()
for chunk in candidates:
# Deduplicate by content prefix (first 200 chars)
content_key = chunk.content[:200]
if content_key in seen_content:
continue
seen_content.add(content_key)
# Base score from similarity
final_score = chunk.score
# Project boost
if project_hint:
tags_str = chunk.tags.lower() if chunk.tags else ""
source_str = chunk.source_file.lower()
title_str = chunk.title.lower() if chunk.title else ""
hint_lower = project_hint.lower()
if hint_lower in tags_str or hint_lower in source_str or hint_lower in title_str:
final_score *= 1.3
scored.append((final_score, chunk))
# Sort by score descending
scored.sort(key=lambda x: x[0], reverse=True)
return scored
def _select_within_budget(
scored: list[tuple[float, ChunkResult]],
budget: int,
) -> list[ContextChunk]:
"""Select top chunks that fit within the character budget."""
selected = []
used = 0
for score, chunk in scored:
chunk_len = len(chunk.content)
if used + chunk_len > budget:
continue
selected.append(
ContextChunk(
content=chunk.content,
source_file=_shorten_path(chunk.source_file),
heading_path=chunk.heading_path,
score=score,
char_count=chunk_len,
)
)
used += chunk_len
return selected
def _format_full_context(
project_state_text: str,
memory_text: str,
chunks: list[ContextChunk],
) -> str:
"""Format project state + memories + retrieved chunks into full context block."""
parts = []
# 1. Project state first (highest trust)
if project_state_text:
parts.append(project_state_text)
parts.append("")
# 2. Identity + preference memories (second trust level)
if memory_text:
parts.append(memory_text)
parts.append("")
# 3. Retrieved chunks (lowest trust)
if chunks:
parts.append("--- AtoCore Retrieved Context ---")
for chunk in chunks:
parts.append(
f"[Source: {chunk.source_file} | Section: {chunk.heading_path} | Score: {chunk.score:.2f}]"
)
parts.append(chunk.content)
parts.append("")
parts.append("--- End Context ---")
elif not project_state_text and not memory_text:
parts.append("--- AtoCore Context ---\nNo relevant context found.\n--- End Context ---")
return "\n".join(parts)
def _shorten_path(path: str) -> str:
"""Shorten an absolute path to a relative-like display."""
p = Path(path)
parts = p.parts
if len(parts) > 3:
return str(Path(*parts[-3:]))
return str(p)
def _pack_to_dict(pack: ContextPack) -> dict:
"""Convert a context pack to a JSON-serializable dict."""
return {
"query": pack.query,
"project_hint": pack.project_hint,
"project_state_chars": pack.project_state_chars,
"memory_chars": pack.memory_chars,
"chunks_used": len(pack.chunks_used),
"total_chars": pack.total_chars,
"budget": pack.budget,
"budget_remaining": pack.budget_remaining,
"duration_ms": pack.duration_ms,
"has_project_state": bool(pack.project_state_text),
"has_memories": bool(pack.memory_text),
"chunks": [
{
"source_file": c.source_file,
"heading_path": c.heading_path,
"score": c.score,
"char_count": c.char_count,
"content_preview": c.content[:100],
}
for c in pack.chunks_used
],
}

src/atocore/context/project_state.py Normal file

@@ -0,0 +1,231 @@
"""Trusted Project State — the highest-priority context source.
Per the Master Plan trust precedence:
1. Trusted Project State (this module)
2. AtoDrive artifacts
3. Recent validated memory
4. AtoVault summaries
5. PKM chunks
6. Historical / low-confidence
Project state is manually curated or explicitly confirmed facts about a project.
It always wins over retrieval-based context when there's a conflict.
"""
import json
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("project_state")
# DB schema extension for project state
PROJECT_STATE_SCHEMA = """
CREATE TABLE IF NOT EXISTS project_state (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT DEFAULT '',
confidence REAL DEFAULT 1.0,
status TEXT DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(project_id, category, key)
);
CREATE INDEX IF NOT EXISTS idx_project_state_project ON project_state(project_id);
CREATE INDEX IF NOT EXISTS idx_project_state_category ON project_state(category);
CREATE INDEX IF NOT EXISTS idx_project_state_status ON project_state(status);
"""
# Valid categories for project state entries
CATEGORIES = [
"status", # current project status, phase, blockers
"decision", # confirmed design/engineering decisions
"requirement", # key requirements and constraints
"contact", # key people, vendors, stakeholders
"milestone", # dates, deadlines, deliverables
"fact", # verified technical facts
"config", # project configuration, parameters
]
@dataclass
class ProjectStateEntry:
id: str
project_id: str
category: str
key: str
value: str
source: str = ""
confidence: float = 1.0
status: str = "active"
created_at: str = ""
updated_at: str = ""
def init_project_state_schema() -> None:
"""Create the project_state table if it doesn't exist."""
with get_connection() as conn:
conn.executescript(PROJECT_STATE_SCHEMA)
log.info("project_state_schema_initialized")
def ensure_project(name: str, description: str = "") -> str:
"""Get or create a project by name. Returns project_id."""
with get_connection() as conn:
row = conn.execute(
"SELECT id FROM projects WHERE name = ?", (name,)
).fetchone()
if row:
return row["id"]
project_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO projects (id, name, description) VALUES (?, ?, ?)",
(project_id, name, description),
)
log.info("project_created", name=name, project_id=project_id)
return project_id
def set_state(
project_name: str,
category: str,
key: str,
value: str,
source: str = "",
confidence: float = 1.0,
) -> ProjectStateEntry:
"""Set or update a project state entry. Upsert semantics."""
if category not in CATEGORIES:
raise ValueError(f"Invalid category '{category}'. Must be one of: {CATEGORIES}")
project_id = ensure_project(project_name)
entry_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
with get_connection() as conn:
# Check if entry exists
existing = conn.execute(
"SELECT id FROM project_state WHERE project_id = ? AND category = ? AND key = ?",
(project_id, category, key),
).fetchone()
if existing:
entry_id = existing["id"]
conn.execute(
"UPDATE project_state SET value = ?, source = ?, confidence = ?, "
"status = 'active', updated_at = CURRENT_TIMESTAMP "
"WHERE id = ?",
(value, source, confidence, entry_id),
)
log.info("project_state_updated", project=project_name, category=category, key=key)
else:
conn.execute(
"INSERT INTO project_state (id, project_id, category, key, value, source, confidence) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(entry_id, project_id, category, key, value, source, confidence),
)
log.info("project_state_created", project=project_name, category=category, key=key)
return ProjectStateEntry(
id=entry_id,
project_id=project_id,
category=category,
key=key,
value=value,
source=source,
confidence=confidence,
status="active",
created_at=now,
updated_at=now,
)
def get_state(
project_name: str,
category: str | None = None,
active_only: bool = True,
) -> list[ProjectStateEntry]:
"""Get project state entries, optionally filtered by category."""
with get_connection() as conn:
project = conn.execute(
"SELECT id FROM projects WHERE name = ?", (project_name,)
).fetchone()
if not project:
return []
query = "SELECT * FROM project_state WHERE project_id = ?"
params: list = [project["id"]]
if category:
query += " AND category = ?"
params.append(category)
if active_only:
query += " AND status = 'active'"
query += " ORDER BY category, key"
rows = conn.execute(query, params).fetchall()
return [
ProjectStateEntry(
id=r["id"],
project_id=r["project_id"],
category=r["category"],
key=r["key"],
value=r["value"],
source=r["source"],
confidence=r["confidence"],
status=r["status"],
created_at=r["created_at"],
updated_at=r["updated_at"],
)
for r in rows
]
def invalidate_state(project_name: str, category: str, key: str) -> bool:
"""Mark a project state entry as superseded."""
with get_connection() as conn:
project = conn.execute(
"SELECT id FROM projects WHERE name = ?", (project_name,)
).fetchone()
if not project:
return False
result = conn.execute(
"UPDATE project_state SET status = 'superseded', updated_at = CURRENT_TIMESTAMP "
"WHERE project_id = ? AND category = ? AND key = ? AND status = 'active'",
(project["id"], category, key),
)
if result.rowcount > 0:
log.info("project_state_invalidated", project=project_name, category=category, key=key)
return True
return False
def format_project_state(entries: list[ProjectStateEntry]) -> str:
"""Format project state entries for context injection."""
if not entries:
return ""
lines = ["--- Trusted Project State ---"]
current_category = ""
for entry in entries:
if entry.category != current_category:
current_category = entry.category
lines.append(f"\n[{current_category.upper()}]")
lines.append(f" {entry.key}: {entry.value}")
if entry.source:
lines.append(f" (source: {entry.source})")
lines.append("\n--- End Project State ---")
return "\n".join(lines)
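The grouping logic in `format_project_state` relies on `get_state`'s `ORDER BY category, key`; a minimal self-contained sketch (the `Entry` dataclass is a hypothetical stand-in for `ProjectStateEntry`, fields trimmed):

```python
from dataclasses import dataclass

@dataclass
class Entry:
    # Hypothetical stand-in for ProjectStateEntry.
    category: str
    key: str
    value: str

def format_state(entries: list[Entry]) -> str:
    # Entries arrive sorted by (category, key), matching the ORDER BY
    # in get_state, so a simple "category changed" check is enough to
    # emit each [CATEGORY] header exactly once.
    lines = ["--- Trusted Project State ---"]
    current = ""
    for e in entries:
        if e.category != current:
            current = e.category
            lines.append(f"\n[{current.upper()}]")
        lines.append(f"  {e.key}: {e.value}")
    lines.append("\n--- End Project State ---")
    return "\n".join(lines)

text = format_state([
    Entry("decision", "database", "SQLite"),
    Entry("status", "phase", "Phase 2 complete"),
])
```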

src/atocore/ingestion/chunker.py Normal file

@@ -0,0 +1,150 @@
"""Heading-aware recursive markdown chunking."""
import re
from dataclasses import dataclass, field
from atocore.config import settings
@dataclass
class Chunk:
content: str
chunk_index: int
heading_path: str
char_count: int
metadata: dict = field(default_factory=dict)
def chunk_markdown(
body: str,
base_metadata: dict | None = None,
max_size: int | None = None,
overlap: int | None = None,
min_size: int | None = None,
) -> list[Chunk]:
"""Split markdown body into chunks using heading-aware strategy.
1. Split on H2 boundaries
2. If section > max_size, split on H3
3. If still > max_size, split on paragraph breaks
4. If still > max_size, hard split with overlap
"""
max_size = max_size or settings.chunk_max_size
overlap = overlap or settings.chunk_overlap
min_size = min_size or settings.chunk_min_size
base_metadata = base_metadata or {}
sections = _split_by_heading(body, level=2)
raw_chunks: list[tuple[str, str]] = [] # (heading_path, content)
for heading, content in sections:
if len(content) <= max_size:
raw_chunks.append((heading, content))
else:
# Try splitting on H3
subsections = _split_by_heading(content, level=3)
for sub_heading, sub_content in subsections:
full_path = (
f"{heading} > {sub_heading}" if heading and sub_heading else heading or sub_heading
)
if len(sub_content) <= max_size:
raw_chunks.append((full_path, sub_content))
else:
# Split on paragraphs
para_chunks = _split_by_paragraphs(
sub_content, max_size, overlap
)
for pc in para_chunks:
raw_chunks.append((full_path, pc))
# Build final chunks, filtering out too-small ones
chunks = []
idx = 0
for heading_path, content in raw_chunks:
content = content.strip()
if len(content) < min_size:
continue
chunks.append(
Chunk(
content=content,
chunk_index=idx,
heading_path=heading_path,
char_count=len(content),
metadata={**base_metadata},
)
)
idx += 1
return chunks
def _split_by_heading(text: str, level: int) -> list[tuple[str, str]]:
"""Split text by heading level. Returns (heading_text, section_content) pairs."""
pattern = rf"^({'#' * level})\s+(.+)$"
parts: list[tuple[str, str]] = []
current_heading = ""
current_lines: list[str] = []
for line in text.split("\n"):
match = re.match(pattern, line)
if match:
# Save previous section
if current_lines:
parts.append((current_heading, "\n".join(current_lines)))
current_heading = match.group(2).strip()
current_lines = []
else:
current_lines.append(line)
# Save last section
if current_lines:
parts.append((current_heading, "\n".join(current_lines)))
return parts
def _split_by_paragraphs(
text: str, max_size: int, overlap: int
) -> list[str]:
"""Split text by paragraph breaks, then hard-split if needed."""
paragraphs = re.split(r"\n\n+", text)
chunks: list[str] = []
current = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
if len(current) + len(para) + 2 <= max_size:
current = f"{current}\n\n{para}" if current else para
else:
if current:
chunks.append(current)
# If single paragraph exceeds max, hard split
if len(para) > max_size:
chunks.extend(_hard_split(para, max_size, overlap))
else:
current = para
continue
current = ""
if current:
chunks.append(current)
return chunks
def _hard_split(text: str, max_size: int, overlap: int) -> list[str]:
"""Hard split text at max_size with overlap."""
# Prevent infinite loop: overlap must be less than max_size
if overlap >= max_size:
overlap = max_size // 4
chunks = []
start = 0
while start < len(text):
end = start + max_size
chunks.append(text[start:end])
start = end - overlap
return chunks
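The guard in `_hard_split` is the fix for the infinite loop called out in the validation-audit commit: with `overlap >= max_size` the window start would never advance. A self-contained sketch of the same windowing:

```python
def hard_split(text: str, max_size: int, overlap: int) -> list[str]:
    # Same guard as _hard_split above: with overlap >= max_size the
    # next start (end - overlap) would be <= the current start, so
    # the overlap is clamped to a quarter of the window.
    if overlap >= max_size:
        overlap = max_size // 4
    chunks, start = [], 0
    while start < len(text):
        chunks.append(text[start:start + max_size])
        start += max_size - overlap  # advance by the non-overlapping part
    return chunks

pieces = hard_split("abcdefghij", max_size=4, overlap=2)
# Consecutive pieces share two characters at their boundary.
clamped = hard_split("abcdef", max_size=4, overlap=4)  # would loop without the guard
```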

src/atocore/ingestion/parser.py Normal file

@@ -0,0 +1,65 @@
"""Markdown file parsing with frontmatter extraction."""
import re
from dataclasses import dataclass, field
from pathlib import Path
import frontmatter
@dataclass
class ParsedDocument:
file_path: str
title: str
body: str
tags: list[str] = field(default_factory=list)
frontmatter: dict = field(default_factory=dict)
headings: list[tuple[int, str]] = field(default_factory=list)
def parse_markdown(file_path: Path) -> ParsedDocument:
"""Parse a markdown file, extracting frontmatter and structure."""
try:
text = file_path.read_text(encoding="utf-8")
except UnicodeDecodeError:
# Lenient fallback: the parser re-reads the file itself, so a strict
# UTF-8 read here would fail for files that the ingestion pipeline's
# multi-encoding fallback already accepted.
text = file_path.read_text(encoding="utf-8", errors="replace")
post = frontmatter.loads(text)
meta = dict(post.metadata) if post.metadata else {}
body = post.content.strip()
# Extract title: first H1, or filename
title = _extract_title(body, file_path)
# Extract tags from frontmatter
tags = meta.get("tags", [])
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",") if t.strip()]
tags = tags or []
# Extract heading structure
headings = _extract_headings(body)
return ParsedDocument(
file_path=str(file_path.resolve()),
title=title,
body=body,
tags=tags,
frontmatter=meta,
headings=headings,
)
def _extract_title(body: str, file_path: Path) -> str:
"""Get title from first H1 or fallback to filename."""
match = re.search(r"^#\s+(.+)$", body, re.MULTILINE)
if match:
return match.group(1).strip()
return file_path.stem.replace("_", " ").replace("-", " ").title()
def _extract_headings(body: str) -> list[tuple[int, str]]:
"""Extract all headings with their level."""
headings = []
for match in re.finditer(r"^(#{1,4})\s+(.+)$", body, re.MULTILINE):
level = len(match.group(1))
text = match.group(2).strip()
headings.append((level, text))
return headings
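The H1-or-filename fallback in `_extract_title` can be sketched standalone:

```python
import re
from pathlib import Path

def extract_title(body: str, path: Path) -> str:
    # First H1 wins; otherwise prettify the filename stem,
    # mirroring _extract_title above.
    m = re.search(r"^#\s+(.+)$", body, re.MULTILINE)
    if m:
        return m.group(1).strip()
    return path.stem.replace("_", " ").replace("-", " ").title()

with_h1 = extract_title("intro text\n\n# AtoCore Architecture\n", Path("x.md"))
no_h1 = extract_title("no heading here", Path("meeting_notes-2026.md"))
```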

src/atocore/ingestion/pipeline.py Normal file

@@ -0,0 +1,244 @@
"""Ingestion pipeline: parse → chunk → embed → store."""
import hashlib
import json
import time
import uuid
from pathlib import Path
from atocore.config import settings
from atocore.ingestion.chunker import chunk_markdown
from atocore.ingestion.parser import parse_markdown
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
from atocore.retrieval.vector_store import get_vector_store
log = get_logger("ingestion")
# Encodings to try when reading markdown files. utf-8-sig comes first so
# a BOM is stripped rather than leaked into the text; cp1252 comes before
# latin-1 because latin-1 accepts every byte sequence and would otherwise
# shadow it.
_ENCODINGS = ["utf-8-sig", "utf-8", "cp1252", "latin-1"]
def ingest_file(file_path: Path) -> dict:
"""Ingest a single markdown file. Returns stats."""
start = time.time()
file_path = file_path.resolve()
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
if file_path.suffix.lower() not in (".md", ".markdown"):
raise ValueError(f"Not a markdown file: {file_path}")
# Read with encoding fallback
raw_content = _read_file_safe(file_path)
file_hash = hashlib.sha256(raw_content.encode("utf-8")).hexdigest()
# Check if already ingested and unchanged
with get_connection() as conn:
existing = conn.execute(
"SELECT id, file_hash FROM source_documents WHERE file_path = ?",
(str(file_path),),
).fetchone()
if existing and existing["file_hash"] == file_hash:
log.info("file_skipped_unchanged", file_path=str(file_path))
return {"file": str(file_path), "status": "skipped", "reason": "unchanged"}
# Parse
parsed = parse_markdown(file_path)
# Chunk
base_meta = {
"source_file": str(file_path),
"tags": parsed.tags,
"title": parsed.title,
}
chunks = chunk_markdown(parsed.body, base_metadata=base_meta)
if not chunks:
log.warning("no_chunks_created", file_path=str(file_path))
return {"file": str(file_path), "status": "empty", "chunks": 0}
# Store in DB and vector store
doc_id = str(uuid.uuid4())
vector_store = get_vector_store()
with get_connection() as conn:
# Remove old data if re-ingesting
if existing:
doc_id = existing["id"]
old_chunk_ids = [
row["id"]
for row in conn.execute(
"SELECT id FROM source_chunks WHERE document_id = ?",
(doc_id,),
).fetchall()
]
conn.execute(
"DELETE FROM source_chunks WHERE document_id = ?", (doc_id,)
)
conn.execute(
"UPDATE source_documents SET file_hash = ?, title = ?, tags = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(file_hash, parsed.title, json.dumps(parsed.tags), doc_id),
)
# Remove old vectors
if old_chunk_ids:
vector_store.delete(old_chunk_ids)
else:
conn.execute(
"INSERT INTO source_documents (id, file_path, file_hash, title, doc_type, tags) VALUES (?, ?, ?, ?, ?, ?)",
(doc_id, str(file_path), file_hash, parsed.title, "markdown", json.dumps(parsed.tags)),
)
# Insert chunks
chunk_ids = []
chunk_contents = []
chunk_metadatas = []
for chunk in chunks:
chunk_id = str(uuid.uuid4())
chunk_ids.append(chunk_id)
chunk_contents.append(chunk.content)
chunk_metadatas.append({
"document_id": doc_id,
"heading_path": chunk.heading_path,
"source_file": str(file_path),
"tags": json.dumps(parsed.tags),
"title": parsed.title,
})
conn.execute(
"INSERT INTO source_chunks (id, document_id, chunk_index, content, heading_path, char_count, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
chunk_id,
doc_id,
chunk.chunk_index,
chunk.content,
chunk.heading_path,
chunk.char_count,
json.dumps(chunk.metadata),
),
)
# Store embeddings
vector_store.add(chunk_ids, chunk_contents, chunk_metadatas)
duration_ms = int((time.time() - start) * 1000)
log.info(
"file_ingested",
file_path=str(file_path),
chunks_created=len(chunks),
duration_ms=duration_ms,
)
return {
"file": str(file_path),
"status": "ingested",
"chunks": len(chunks),
"duration_ms": duration_ms,
}
def ingest_folder(folder_path: Path, purge_deleted: bool = True) -> list[dict]:
"""Ingest all markdown files in a folder recursively.
Args:
folder_path: Directory to scan for .md files.
purge_deleted: If True, remove DB/vector entries for files
that no longer exist on disk.
"""
folder_path = folder_path.resolve()
if not folder_path.is_dir():
raise NotADirectoryError(f"Not a directory: {folder_path}")
results = []
md_files = sorted([*folder_path.rglob("*.md"), *folder_path.rglob("*.markdown")])
current_paths = {str(f.resolve()) for f in md_files}
log.info("ingestion_started", folder=str(folder_path), file_count=len(md_files))
# Ingest new/changed files
for md_file in md_files:
try:
result = ingest_file(md_file)
results.append(result)
except Exception as e:
log.error("ingestion_error", file_path=str(md_file), error=str(e))
results.append({"file": str(md_file), "status": "error", "error": str(e)})
# Purge entries for deleted files
if purge_deleted:
deleted = _purge_deleted_files(folder_path, current_paths)
if deleted:
log.info("purged_deleted_files", count=deleted)
results.append({"status": "purged", "deleted_count": deleted})
return results
def get_ingestion_stats() -> dict:
"""Return ingestion statistics."""
with get_connection() as conn:
docs = conn.execute("SELECT COUNT(*) as c FROM source_documents").fetchone()
chunks = conn.execute("SELECT COUNT(*) as c FROM source_chunks").fetchone()
recent = conn.execute(
"SELECT file_path, title, ingested_at FROM source_documents "
"ORDER BY updated_at DESC LIMIT 5"
).fetchall()
vector_store = get_vector_store()
return {
"total_documents": docs["c"],
"total_chunks": chunks["c"],
"total_vectors": vector_store.count,
"recent_documents": [
{"file_path": r["file_path"], "title": r["title"], "ingested_at": r["ingested_at"]}
for r in recent
],
}
def _read_file_safe(file_path: Path) -> str:
"""Read a file with encoding fallback."""
for encoding in _ENCODINGS:
try:
return file_path.read_text(encoding=encoding)
except (UnicodeDecodeError, ValueError):
continue
# Last resort: read with errors replaced
return file_path.read_text(encoding="utf-8", errors="replace")
def _purge_deleted_files(folder_path: Path, current_paths: set[str]) -> int:
"""Remove DB/vector entries for files under folder_path that no longer exist."""
folder_str = str(folder_path)
deleted_count = 0
vector_store = get_vector_store()
with get_connection() as conn:
# Find documents under this folder
rows = conn.execute(
"SELECT id, file_path FROM source_documents WHERE file_path LIKE ?",
(f"{folder_str}%",),
).fetchall()
for row in rows:
# The LIKE prefix above can also match sibling folders (purging
# ".../notes" must not touch ".../notes2"), so confirm the path
# really sits under folder_path before purging.
if not Path(row["file_path"]).is_relative_to(folder_path):
continue
if row["file_path"] not in current_paths:
doc_id = row["id"]
# Get chunk IDs for vector deletion
chunk_ids = [
r["id"]
for r in conn.execute(
"SELECT id FROM source_chunks WHERE document_id = ?",
(doc_id,),
).fetchall()
]
# Delete from DB
conn.execute("DELETE FROM source_chunks WHERE document_id = ?", (doc_id,))
conn.execute("DELETE FROM source_documents WHERE id = ?", (doc_id,))
# Delete from vectors
if chunk_ids:
vector_store.delete(chunk_ids)
log.info("purged_deleted_file", file_path=row["file_path"])
deleted_count += 1
return deleted_count

35
src/atocore/main.py Normal file

@@ -0,0 +1,35 @@
"""AtoCore — FastAPI application entry point."""
from fastapi import FastAPI
from atocore.api.routes import router
from atocore.config import settings
from atocore.context.project_state import init_project_state_schema
from atocore.models.database import init_db
from atocore.observability.logger import setup_logging
app = FastAPI(
title="AtoCore",
description="Personal Context Engine for LLM interactions",
version="0.1.0",
)
app.include_router(router)
@app.on_event("startup")
def startup():
setup_logging()
init_db()
init_project_state_schema()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"atocore.main:app",
host=settings.host,
port=settings.port,
reload=True,
)


@@ -0,0 +1,231 @@
"""Memory Core — structured memory management.
Memory types (per Master Plan):
- identity: who the user is, role, background
- preference: how they like to work, style, tools
- project: project-specific knowledge and context
- episodic: what happened, conversations, events
- knowledge: verified facts, technical knowledge
- adaptation: learned corrections, behavioral adjustments
Memories have:
- confidence (0.0-1.0): how certain we are
- status (active/superseded/invalid): lifecycle state
- optional link to source chunk: traceability
"""
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("memory")
MEMORY_TYPES = [
"identity",
"preference",
"project",
"episodic",
"knowledge",
"adaptation",
]
@dataclass
class Memory:
id: str
memory_type: str
content: str
project: str
source_chunk_id: str
confidence: float
status: str
created_at: str
updated_at: str
def create_memory(
memory_type: str,
content: str,
project: str = "",
source_chunk_id: str = "",
confidence: float = 1.0,
) -> Memory:
"""Create a new memory entry."""
if memory_type not in MEMORY_TYPES:
raise ValueError(f"Invalid memory type '{memory_type}'. Must be one of: {MEMORY_TYPES}")
memory_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
# Check for duplicate content within the same memory type.
# Note: the memories table has no project column yet, so `project` is
# returned on the dataclass but not persisted or used for dedup.
with get_connection() as conn:
existing = conn.execute(
"SELECT id FROM memories WHERE memory_type = ? AND content = ? AND status = 'active'",
(memory_type, content),
).fetchone()
if existing:
log.info("memory_duplicate_skipped", memory_type=memory_type, content_preview=content[:80])
return _row_to_memory(
conn.execute("SELECT * FROM memories WHERE id = ?", (existing["id"],)).fetchone()
)
conn.execute(
"INSERT INTO memories (id, memory_type, content, source_chunk_id, confidence, status) "
"VALUES (?, ?, ?, ?, ?, 'active')",
(memory_id, memory_type, content, source_chunk_id or None, confidence),
)
log.info("memory_created", memory_type=memory_type, content_preview=content[:80])
return Memory(
id=memory_id,
memory_type=memory_type,
content=content,
project=project,
source_chunk_id=source_chunk_id,
confidence=confidence,
status="active",
created_at=now,
updated_at=now,
)
def get_memories(
memory_type: str | None = None,
active_only: bool = True,
min_confidence: float = 0.0,
limit: int = 50,
) -> list[Memory]:
"""Retrieve memories, optionally filtered."""
query = "SELECT * FROM memories WHERE 1=1"
params: list = []
if memory_type:
query += " AND memory_type = ?"
params.append(memory_type)
if active_only:
query += " AND status = 'active'"
if min_confidence > 0:
query += " AND confidence >= ?"
params.append(min_confidence)
query += " ORDER BY confidence DESC, updated_at DESC LIMIT ?"
params.append(limit)
with get_connection() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_memory(r) for r in rows]
def update_memory(
memory_id: str,
content: str | None = None,
confidence: float | None = None,
status: str | None = None,
) -> bool:
"""Update an existing memory."""
updates = []
params: list = []
if content is not None:
updates.append("content = ?")
params.append(content)
if confidence is not None:
updates.append("confidence = ?")
params.append(confidence)
if status is not None:
if status not in ("active", "superseded", "invalid"):
raise ValueError(f"Invalid status '{status}'")
updates.append("status = ?")
params.append(status)
if not updates:
return False
updates.append("updated_at = CURRENT_TIMESTAMP")
params.append(memory_id)
with get_connection() as conn:
result = conn.execute(
f"UPDATE memories SET {', '.join(updates)} WHERE id = ?",
params,
)
if result.rowcount > 0:
log.info("memory_updated", memory_id=memory_id)
return True
return False
def invalidate_memory(memory_id: str) -> bool:
"""Mark a memory as invalid (error correction)."""
return update_memory(memory_id, status="invalid")
def supersede_memory(memory_id: str) -> bool:
"""Mark a memory as superseded (replaced by newer info)."""
return update_memory(memory_id, status="superseded")
def get_memories_for_context(
memory_types: list[str] | None = None,
budget: int = 500,
) -> tuple[str, int]:
"""Get formatted memories for context injection.
Returns (formatted_text, char_count).
Budget allocation per Master Plan section 9:
identity: 5%, preference: 5%, rest from retrieval budget
"""
if memory_types is None:
memory_types = ["identity", "preference"]
memories = []
for mtype in memory_types:
memories.extend(get_memories(memory_type=mtype, min_confidence=0.5, limit=10))
if not memories:
return "", 0
lines = ["--- AtoCore Memory ---"]
used = len(lines[0]) + 1
included = []
for mem in memories:
entry = f"[{mem.memory_type}] {mem.content}"
entry_len = len(entry) + 1
if used + entry_len > budget:
break
lines.append(entry)
used += entry_len
included.append(mem)
if len(included) == 0:
return "", 0
lines.append("--- End Memory ---")
text = "\n".join(lines)
log.info("memories_for_context", count=len(included), chars=len(text))
return text, len(text)
def _row_to_memory(row) -> Memory:
"""Convert a DB row to Memory dataclass."""
return Memory(
id=row["id"],
memory_type=row["memory_type"],
content=row["content"],
project="",
source_chunk_id=row["source_chunk_id"] or "",
confidence=row["confidence"],
status=row["status"],
created_at=row["created_at"],
updated_at=row["updated_at"],
)
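The greedy budget fill in `get_memories_for_context` can be sketched standalone (`pack_memories` is a hypothetical helper over plain `(type, content)` pairs):

```python
def pack_memories(entries: list[tuple[str, str]], budget: int) -> str:
    # Same greedy fill as get_memories_for_context: append entries
    # until the next one would push past the character budget.
    lines = ["--- AtoCore Memory ---"]
    used = len(lines[0]) + 1
    kept = 0
    for mtype, content in entries:
        entry = f"[{mtype}] {content}"
        if used + len(entry) + 1 > budget:
            break
        lines.append(entry)
        used += len(entry) + 1
        kept += 1
    if kept == 0:
        return ""
    lines.append("--- End Memory ---")
    return "\n".join(lines)

packed = pack_memories(
    [("identity", "Backend engineer"), ("preference", "Short answers")],
    budget=120,
)
```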

src/atocore/models/database.py Normal file

@@ -0,0 +1,98 @@
"""SQLite database schema and connection management."""
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Generator
import atocore.config as _config
from atocore.observability.logger import get_logger
log = get_logger("database")
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS source_documents (
id TEXT PRIMARY KEY,
file_path TEXT UNIQUE NOT NULL,
file_hash TEXT NOT NULL,
title TEXT,
doc_type TEXT DEFAULT 'markdown',
tags TEXT DEFAULT '[]',
ingested_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS source_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES source_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
heading_path TEXT DEFAULT '',
char_count INTEGER NOT NULL,
metadata TEXT DEFAULT '{}',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
source_chunk_id TEXT REFERENCES source_chunks(id),
confidence REAL DEFAULT 1.0,
status TEXT DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS interactions (
id TEXT PRIMARY KEY,
prompt TEXT NOT NULL,
context_pack TEXT DEFAULT '{}',
response_summary TEXT DEFAULT '',
project_id TEXT REFERENCES projects(id),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_chunks_document ON source_chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status);
CREATE INDEX IF NOT EXISTS idx_interactions_project ON interactions(project_id);
"""
def _ensure_data_dir() -> None:
_config.settings.data_dir.mkdir(parents=True, exist_ok=True)
def init_db() -> None:
"""Initialize the database with schema."""
_ensure_data_dir()
with get_connection() as conn:
conn.executescript(SCHEMA_SQL)
log.info("database_initialized", path=str(_config.settings.db_path))
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
"""Get a database connection with row factory."""
_ensure_data_dir()
conn = sqlite3.connect(str(_config.settings.db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
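The commit-on-success / rollback-on-error shape of `get_connection`, demonstrated against an in-memory SQLite database:

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def connection(path: str = ":memory:"):
    # Same lifecycle as get_connection above: row factory for
    # dict-style access, foreign keys on, commit on success,
    # rollback on error, close either way.
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

with connection() as conn:
    conn.execute("CREATE TABLE t (id TEXT PRIMARY KEY, v TEXT)")
    conn.execute("INSERT INTO t VALUES ('a', 'hello')")
    value = conn.execute("SELECT v FROM t WHERE id = 'a'").fetchone()["v"]
```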

src/atocore/observability/logger.py Normal file

@@ -0,0 +1,41 @@
"""Structured logging for AtoCore."""
import logging
import structlog
from atocore.config import settings
_LOG_LEVELS = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
}
def setup_logging() -> None:
"""Configure structlog with JSON output."""
log_level = "DEBUG" if settings.debug else "INFO"
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer()
if settings.debug
else structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
_LOG_LEVELS.get(log_level, logging.INFO)
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
def get_logger(name: str) -> structlog.BoundLogger:
"""Get a named logger."""
return structlog.get_logger(name)

src/atocore/retrieval/embeddings.py Normal file

@@ -0,0 +1,32 @@
"""Embedding model management."""
from sentence_transformers import SentenceTransformer
from atocore.config import settings
from atocore.observability.logger import get_logger
log = get_logger("embeddings")
_model: SentenceTransformer | None = None
def get_model() -> SentenceTransformer:
"""Load and cache the embedding model."""
global _model
if _model is None:
log.info("loading_embedding_model", model=settings.embedding_model)
_model = SentenceTransformer(settings.embedding_model)
log.info("embedding_model_loaded", model=settings.embedding_model)
return _model
def embed_texts(texts: list[str]) -> list[list[float]]:
"""Generate embeddings for a list of texts."""
model = get_model()
embeddings = model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
return embeddings.tolist()
def embed_query(query: str) -> list[float]:
"""Generate embedding for a single query."""
return embed_texts([query])[0]


@@ -0,0 +1,92 @@
"""Retrieval: query → ranked chunks."""
import time
from dataclasses import dataclass
from atocore.config import settings
from atocore.observability.logger import get_logger
from atocore.retrieval.embeddings import embed_query
from atocore.retrieval.vector_store import get_vector_store
log = get_logger("retriever")
@dataclass
class ChunkResult:
chunk_id: str
content: str
score: float
heading_path: str
source_file: str
tags: str
title: str
document_id: str
def retrieve(
query: str,
top_k: int | None = None,
filter_tags: list[str] | None = None,
) -> list[ChunkResult]:
"""Retrieve the most relevant chunks for a query."""
top_k = top_k or settings.context_top_k
start = time.time()
query_embedding = embed_query(query)
store = get_vector_store()
# Build filter
# Tags are stored as JSON strings like '["tag1", "tag2"]'.
# We use $contains with quoted tag to avoid substring false positives
# (e.g. searching "prod" won't match "production" because we search '"prod"').
where = None
if filter_tags:
if len(filter_tags) == 1:
where = {"tags": {"$contains": f'"{filter_tags[0]}"'}}
else:
where = {
"$and": [
{"tags": {"$contains": f'"{tag}"'}}
for tag in filter_tags
]
}
results = store.query(
query_embedding=query_embedding,
top_k=top_k,
where=where,
)
chunks = []
if results and results["ids"] and results["ids"][0]:
for i, chunk_id in enumerate(results["ids"][0]):
# ChromaDB returns distances (lower = more similar for cosine)
# Convert to similarity score (1 - distance)
distance = results["distances"][0][i] if results["distances"] else 0
score = 1.0 - distance
meta = results["metadatas"][0][i] if results["metadatas"] else {}
content = results["documents"][0][i] if results["documents"] else ""
chunks.append(
ChunkResult(
chunk_id=chunk_id,
content=content,
score=round(score, 4),
heading_path=meta.get("heading_path", ""),
source_file=meta.get("source_file", ""),
tags=meta.get("tags", "[]"),
title=meta.get("title", ""),
document_id=meta.get("document_id", ""),
)
)
duration_ms = int((time.time() - start) * 1000)
log.info(
"retrieval_done",
query=query[:100],
top_k=top_k,
results_count=len(chunks),
duration_ms=duration_ms,
)
return chunks
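The quoted-tag matching described in the comment above can be checked without ChromaDB, since tags are stored as JSON strings:

```python
import json

def tag_filter_matches(stored_tags: list[str], wanted: str) -> bool:
    # Tags are serialized to a JSON string before storage; searching
    # for the quoted form '"prod"' is what prevents substring hits on
    # '"production"' (the false-positive fix noted above).
    serialized = json.dumps(stored_tags)
    return f'"{wanted}"' in serialized

exact = tag_filter_matches(["prod", "infra"], "prod")
substr = tag_filter_matches(["production"], "prod")
```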

src/atocore/retrieval/vector_store.py Normal file

@@ -0,0 +1,77 @@
"""ChromaDB vector store wrapper."""
import chromadb
from atocore.config import settings
from atocore.observability.logger import get_logger
from atocore.retrieval.embeddings import embed_texts
log = get_logger("vector_store")
COLLECTION_NAME = "atocore_chunks"
_store: "VectorStore | None" = None
class VectorStore:
"""Wrapper around ChromaDB for chunk storage and retrieval."""
def __init__(self) -> None:
settings.chroma_path.mkdir(parents=True, exist_ok=True)
self._client = chromadb.PersistentClient(path=str(settings.chroma_path))
self._collection = self._client.get_or_create_collection(
name=COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
log.info("vector_store_initialized", path=str(settings.chroma_path))
def add(
self,
ids: list[str],
documents: list[str],
metadatas: list[dict],
) -> None:
"""Add chunks with embeddings to the store."""
embeddings = embed_texts(documents)
self._collection.add(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
)
log.debug("vectors_added", count=len(ids))
def query(
self,
query_embedding: list[float],
top_k: int = 10,
where: dict | None = None,
) -> dict:
"""Query the store for similar chunks."""
kwargs: dict = {
"query_embeddings": [query_embedding],
"n_results": top_k,
"include": ["documents", "metadatas", "distances"],
}
if where:
kwargs["where"] = where
return self._collection.query(**kwargs)
def delete(self, ids: list[str]) -> None:
"""Delete chunks by IDs."""
if ids:
self._collection.delete(ids=ids)
log.debug("vectors_deleted", count=len(ids))
@property
def count(self) -> int:
return self._collection.count()
def get_vector_store() -> VectorStore:
"""Get or create the singleton vector store."""
global _store
if _store is None:
_store = VectorStore()
return _store

0
tests/__init__.py Normal file

115
tests/conftest.py Normal file

@@ -0,0 +1,115 @@
"""pytest configuration and shared fixtures."""
import os
import tempfile
from pathlib import Path
import pytest
# Default test data directory — overridden per-test by fixtures
_default_test_dir = tempfile.mkdtemp(prefix="atocore_test_")
os.environ["ATOCORE_DATA_DIR"] = _default_test_dir
os.environ["ATOCORE_DEBUG"] = "true"
@pytest.fixture
def tmp_data_dir(tmp_path):
"""Provide a temporary data directory for tests."""
os.environ["ATOCORE_DATA_DIR"] = str(tmp_path)
# Reset singletons
from atocore import config
config.settings = config.Settings()
import atocore.retrieval.vector_store as vs
vs._store = None
return tmp_path
@pytest.fixture
def sample_markdown(tmp_path) -> Path:
"""Create a sample markdown file for testing."""
md_file = tmp_path / "test_note.md"
md_file.write_text(
"""---
tags:
- atocore
- architecture
date: 2026-04-05
---
# AtoCore Architecture
## Overview
AtoCore is a personal context engine that enriches LLM interactions
with durable memory, structured context, and project knowledge.
## Layers
The system has these layers:
1. Main PKM (human, messy, exploratory)
2. AtoVault (system mirror)
3. AtoDrive (trusted project truth)
4. Structured Memory (DB)
5. Semantic Retrieval (vector DB)
## Memory Types
AtoCore supports these memory types:
- Identity
- Preferences
- Project Memory
- Episodic Memory
- Knowledge Objects
- Adaptation Memory
- Trusted Project State
## Trust Precedence
When sources conflict:
1. Trusted Project State wins
2. AtoDrive overrides PKM
3. Most recent confirmed wins
4. Higher confidence wins
5. Equal → flag conflict
No silent merging.
""",
encoding="utf-8",
)
return md_file
@pytest.fixture
def sample_folder(tmp_path, sample_markdown) -> Path:
"""Create a folder with multiple markdown files."""
# Already has test_note.md from sample_markdown
second = tmp_path / "second_note.md"
second.write_text(
"""---
tags:
- chunking
---
# Chunking Strategy
## Approach
Heading-aware recursive splitting:
1. Split on H2 boundaries first
2. If section > 800 chars, split on H3
3. If still > 800 chars, split on paragraphs
4. Hard split at 800 chars with 100 char overlap
## Parameters
- max_chunk_size: 800 characters
- overlap: 100 characters
- min_chunk_size: 50 characters
""",
encoding="utf-8",
)
return tmp_path

73
tests/test_chunker.py Normal file

@@ -0,0 +1,73 @@
"""Tests for the markdown chunker."""
from atocore.ingestion.chunker import chunk_markdown
def test_basic_chunking():
"""Test that markdown is split into chunks."""
body = """## Section One
This is the first section with some content that is long enough to pass the minimum chunk size filter applied by the chunker.
## Section Two
This is the second section with different content that is also long enough to pass the minimum chunk size threshold.
"""
chunks = chunk_markdown(body)
assert len(chunks) >= 2
assert all(c.char_count > 0 for c in chunks)
assert all(c.chunk_index >= 0 for c in chunks)
def test_heading_path_preserved():
"""Test that heading paths are captured."""
body = """## Architecture
### Layers
The system has multiple layers organized in a clear hierarchy for separation of concerns and maintainability.
"""
chunks = chunk_markdown(body)
assert len(chunks) >= 1
# At least one chunk should have heading info
has_heading = any(c.heading_path for c in chunks)
assert has_heading
def test_small_chunks_filtered():
"""Test that very small chunks are discarded."""
body = """## A
Hi
## B
This is a real section with enough content to pass the minimum size threshold.
"""
chunks = chunk_markdown(body, min_size=50)
# "Hi" should be filtered out
for c in chunks:
assert c.char_count >= 50
def test_large_section_split():
"""Test that large sections are split further."""
large_content = "Word " * 200 # ~1000 chars
body = f"## Big Section\n\n{large_content}"
chunks = chunk_markdown(body, max_size=400)
assert len(chunks) >= 2
def test_metadata_passed_through():
"""Test that base metadata is included in chunks."""
body = "## Test\n\nSome content here that is long enough."
meta = {"source_file": "/test/file.md", "tags": ["test"]}
chunks = chunk_markdown(body, base_metadata=meta)
if chunks:
assert chunks[0].metadata.get("source_file") == "/test/file.md"
def test_empty_body():
"""Test chunking an empty body."""
chunks = chunk_markdown("")
assert chunks == []


@@ -0,0 +1,108 @@
"""Tests for the context builder."""
from atocore.context.builder import build_context, get_last_context_pack
from atocore.context.project_state import init_project_state_schema, set_state
from atocore.ingestion.pipeline import ingest_file
from atocore.models.database import init_db


def test_build_context_returns_pack(tmp_data_dir, sample_markdown):
    """Test that context builder returns a valid pack."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    pack = build_context("What is AtoCore?")
    assert pack.total_chars > 0
    assert len(pack.chunks_used) > 0
    assert pack.budget_remaining >= 0
    assert "--- End Context ---" in pack.formatted_context


def test_context_respects_budget(tmp_data_dir, sample_markdown):
    """Test that context builder respects character budget."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    pack = build_context("What is AtoCore?", budget=500)
    assert pack.total_chars <= 500


def test_context_with_project_hint(tmp_data_dir, sample_markdown):
    """Test that project hint boosts relevant chunks."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    pack = build_context("What is the architecture?", project_hint="atocore")
    assert len(pack.chunks_used) > 0
    assert pack.total_chars > 0


def test_last_context_pack_stored(tmp_data_dir, sample_markdown):
    """Test that last context pack is stored for debug."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    build_context("test prompt")
    last = get_last_context_pack()
    assert last is not None
    assert last.query == "test prompt"


def test_full_prompt_structure(tmp_data_dir, sample_markdown):
    """Test that the full prompt has correct structure."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    pack = build_context("What are memory types?")
    assert "knowledge base" in pack.full_prompt.lower()
    assert "What are memory types?" in pack.full_prompt


def test_project_state_included_in_context(tmp_data_dir, sample_markdown):
    """Test that trusted project state is injected into context."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    # Set some project state
    set_state("atocore", "status", "phase", "Phase 0.5 complete")
    set_state("atocore", "decision", "database", "SQLite for structured data")
    pack = build_context("What is AtoCore?", project_hint="atocore")
    # Project state should appear in context
    assert "--- Trusted Project State ---" in pack.formatted_context
    assert "Phase 0.5 complete" in pack.formatted_context
    assert "SQLite for structured data" in pack.formatted_context
    assert pack.project_state_chars > 0


def test_project_state_takes_priority_budget(tmp_data_dir, sample_markdown):
    """Test that project state is included even with tight budget."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    set_state("atocore", "status", "phase", "Phase 1 in progress")
    # Small budget — project state should still be included
    pack = build_context("status?", project_hint="atocore", budget=500)
    assert "Phase 1 in progress" in pack.formatted_context


def test_no_project_state_without_hint(tmp_data_dir, sample_markdown):
    """Test that project state is not included without project hint."""
    init_db()
    init_project_state_schema()
    ingest_file(sample_markdown)
    set_state("atocore", "status", "phase", "Phase 1")
    pack = build_context("What is AtoCore?")
    assert pack.project_state_chars == 0
    assert "--- Trusted Project State ---" not in pack.formatted_context
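The budget tests above reflect the trust-precedence allocation: trusted project state is carved out first, identity/preference memories next, and retrieval takes the remainder. A toy sketch of that split, with assumed fractions and a hypothetical `split_budget` name (the real builder's accounting may differ):

```python
def split_budget(total: int, state_frac: float = 0.20, memory_frac: float = 0.10) -> tuple[int, int, int]:
    """Carve a character budget into project-state, memory, and retrieval shares."""
    state = int(total * state_frac)    # highest-trust slice, injected first
    memory = int(total * memory_frac)  # identity/preference memories
    return state, memory, total - state - memory  # retrieval gets what is left
```

With `total=1000` this yields 200/100/700; in a real builder, any unused state or memory budget would presumably fall through to retrieval.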

tests/test_ingestion.py

@@ -0,0 +1,71 @@
"""Tests for the ingestion pipeline."""
from atocore.ingestion.parser import parse_markdown
from atocore.ingestion.pipeline import ingest_file
from atocore.models.database import get_connection, init_db


def test_parse_markdown(sample_markdown):
    """Test markdown parsing with frontmatter."""
    parsed = parse_markdown(sample_markdown)
    assert parsed.title == "AtoCore Architecture"
    assert "atocore" in parsed.tags
    assert "architecture" in parsed.tags
    assert len(parsed.body) > 0
    assert len(parsed.headings) > 0


def test_parse_extracts_headings(sample_markdown):
    """Test that headings are extracted correctly."""
    parsed = parse_markdown(sample_markdown)
    heading_texts = [h[1] for h in parsed.headings]
    assert "AtoCore Architecture" in heading_texts
    assert "Overview" in heading_texts


def test_ingest_file(tmp_data_dir, sample_markdown):
    """Test ingesting a single file."""
    init_db()
    result = ingest_file(sample_markdown)
    assert result["status"] == "ingested"
    assert result["chunks"] > 0
    # Verify the file was stored in the DB
    with get_connection() as conn:
        doc = conn.execute(
            "SELECT COUNT(*) as c FROM source_documents WHERE file_path = ?",
            (str(sample_markdown.resolve()),),
        ).fetchone()
        assert doc["c"] == 1
        chunks = conn.execute(
            "SELECT COUNT(*) as c FROM source_chunks sc "
            "JOIN source_documents sd ON sc.document_id = sd.id "
            "WHERE sd.file_path = ?",
            (str(sample_markdown.resolve()),),
        ).fetchone()
        assert chunks["c"] > 0


def test_ingest_skips_unchanged(tmp_data_dir, sample_markdown):
    """Test that re-ingesting an unchanged file is skipped."""
    init_db()
    ingest_file(sample_markdown)
    result = ingest_file(sample_markdown)
    assert result["status"] == "skipped"


def test_ingest_updates_changed(tmp_data_dir, sample_markdown):
    """Test that changed files are re-ingested."""
    init_db()
    ingest_file(sample_markdown)
    # Modify the file
    sample_markdown.write_text(
        sample_markdown.read_text(encoding="utf-8") + "\n\n## New Section\n\nNew content added.",
        encoding="utf-8",
    )
    result = ingest_file(sample_markdown)
    assert result["status"] == "ingested"

tests/test_memory.py

@@ -0,0 +1,133 @@
"""Tests for Memory Core."""
import os
import tempfile

import pytest

import atocore.config as _config
from atocore.models.database import init_db


@pytest.fixture(autouse=True)
def isolated_db():
    """Give each test a completely isolated database."""
    tmpdir = tempfile.mkdtemp()
    os.environ["ATOCORE_DATA_DIR"] = tmpdir
    # Replace the global settings so all modules see the new data_dir;
    # database.py reads _config.settings dynamically, so no further patching is needed.
    _config.settings = _config.Settings()
    init_db()
    yield tmpdir


def test_create_memory(isolated_db):
    from atocore.memory.service import create_memory

    mem = create_memory("identity", "User is a mechanical engineer specializing in optics")
    assert mem.memory_type == "identity"
    assert mem.status == "active"
    assert mem.confidence == 1.0


def test_create_memory_invalid_type(isolated_db):
    from atocore.memory.service import create_memory

    with pytest.raises(ValueError, match="Invalid memory type"):
        create_memory("invalid_type", "some content")


def test_create_memory_dedup(isolated_db):
    from atocore.memory.service import create_memory

    m1 = create_memory("identity", "User is an engineer")
    m2 = create_memory("identity", "User is an engineer")
    assert m1.id == m2.id


def test_get_memories_all(isolated_db):
    from atocore.memory.service import create_memory, get_memories

    create_memory("identity", "User is an engineer")
    create_memory("preference", "Prefers Python with type hints")
    create_memory("knowledge", "Zerodur has near-zero thermal expansion")
    mems = get_memories()
    assert len(mems) == 3


def test_get_memories_by_type(isolated_db):
    from atocore.memory.service import create_memory, get_memories

    create_memory("identity", "User is an engineer")
    create_memory("preference", "Prefers concise code")
    create_memory("preference", "Uses FastAPI for APIs")
    mems = get_memories(memory_type="preference")
    assert len(mems) == 2


def test_get_memories_active_only(isolated_db):
    from atocore.memory.service import create_memory, get_memories, invalidate_memory

    m = create_memory("knowledge", "Fact about optics")
    invalidate_memory(m.id)
    assert len(get_memories(active_only=True)) == 0
    assert len(get_memories(active_only=False)) == 1


def test_get_memories_min_confidence(isolated_db):
    from atocore.memory.service import create_memory, get_memories

    create_memory("knowledge", "High confidence fact", confidence=0.9)
    create_memory("knowledge", "Low confidence fact", confidence=0.3)
    high = get_memories(min_confidence=0.5)
    assert len(high) == 1
    assert high[0].confidence == 0.9


def test_update_memory(isolated_db):
    from atocore.memory.service import create_memory, get_memories, update_memory

    mem = create_memory("knowledge", "Initial fact")
    update_memory(mem.id, content="Updated fact", confidence=0.8)
    mems = get_memories()
    assert len(mems) == 1
    assert mems[0].content == "Updated fact"
    assert mems[0].confidence == 0.8


def test_invalidate_memory(isolated_db):
    from atocore.memory.service import create_memory, get_memories, invalidate_memory

    mem = create_memory("knowledge", "Wrong fact")
    invalidate_memory(mem.id)
    assert len(get_memories(active_only=True)) == 0


def test_supersede_memory(isolated_db):
    from atocore.memory.service import create_memory, get_memories, supersede_memory

    mem = create_memory("knowledge", "Old fact")
    supersede_memory(mem.id)
    mems = get_memories(active_only=False)
    assert len(mems) == 1
    assert mems[0].status == "superseded"


def test_memories_for_context(isolated_db):
    from atocore.memory.service import create_memory, get_memories_for_context

    create_memory("identity", "User is a senior mechanical engineer")
    create_memory("preference", "Prefers Python with type hints")
    text, chars = get_memories_for_context(memory_types=["identity", "preference"], budget=500)
    assert "--- AtoCore Memory ---" in text
    assert "[identity]" in text
    assert "[preference]" in text
    assert chars > 0


def test_memories_for_context_empty(isolated_db):
    from atocore.memory.service import get_memories_for_context

    text, chars = get_memories_for_context()
    assert text == ""
    assert chars == 0
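`test_create_memory_dedup` expects identical (type, content) pairs to map onto a single record. One common way to get that behavior is a content hash as the dedup key; a standalone sketch of the idea (hypothetical, not the actual `atocore.memory.service` code):

```python
import hashlib

_memories: dict[str, dict] = {}  # in-memory stand-in for the memories table


def create_memory(memory_type: str, content: str) -> dict:
    """Create a memory, or return the existing one for identical type+content."""
    # Hash type and content together so the same text under a different
    # type still produces a distinct record.
    key = hashlib.sha256(f"{memory_type}\x00{content}".encode()).hexdigest()
    if key not in _memories:
        _memories[key] = {"id": key[:12], "memory_type": memory_type, "content": content}
    return _memories[key]
```

Calling it twice with the same arguments returns the same record, so the `m1.id == m2.id` assertion in the test holds for this sketch as well.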

tests/test_project_state.py

@@ -0,0 +1,127 @@
"""Tests for Trusted Project State."""
import pytest

from atocore.context.project_state import (
    CATEGORIES,
    ensure_project,
    format_project_state,
    get_state,
    init_project_state_schema,
    invalidate_state,
    set_state,
)
from atocore.models.database import init_db


@pytest.fixture(autouse=True)
def setup_db(tmp_data_dir):
    """Initialize DB and project state schema for every test."""
    init_db()
    init_project_state_schema()


def test_ensure_project_creates():
    """Test creating a new project."""
    pid = ensure_project("test-project", "A test project")
    assert pid
    # Second call returns the same ID
    pid2 = ensure_project("test-project")
    assert pid == pid2


def test_set_state_creates_entry():
    """Test creating a project state entry."""
    entry = set_state("myproject", "status", "phase", "Phase 0.5 — PoC complete")
    assert entry.category == "status"
    assert entry.key == "phase"
    assert entry.value == "Phase 0.5 — PoC complete"
    assert entry.status == "active"


def test_set_state_upserts():
    """Test that setting the same key updates the value."""
    set_state("myproject", "status", "phase", "Phase 0")
    entry = set_state("myproject", "status", "phase", "Phase 1")
    assert entry.value == "Phase 1"
    # Only one entry should exist
    entries = get_state("myproject", category="status")
    assert len(entries) == 1
    assert entries[0].value == "Phase 1"


def test_set_state_invalid_category():
    """Test that an invalid category raises ValueError."""
    with pytest.raises(ValueError, match="Invalid category"):
        set_state("myproject", "invalid_category", "key", "value")


def test_get_state_all():
    """Test getting all state entries for a project."""
    set_state("proj", "status", "phase", "Phase 1")
    set_state("proj", "decision", "database", "SQLite for v1")
    set_state("proj", "requirement", "latency", "<2 seconds")
    entries = get_state("proj")
    assert len(entries) == 3
    categories = {e.category for e in entries}
    assert categories == {"status", "decision", "requirement"}


def test_get_state_by_category():
    """Test filtering by category."""
    set_state("proj", "status", "phase", "Phase 1")
    set_state("proj", "decision", "database", "SQLite")
    set_state("proj", "decision", "vectordb", "ChromaDB")
    entries = get_state("proj", category="decision")
    assert len(entries) == 2
    assert all(e.category == "decision" for e in entries)


def test_get_state_nonexistent_project():
    """Test getting state for a project that doesn't exist."""
    entries = get_state("nonexistent")
    assert entries == []


def test_invalidate_state():
    """Test marking a state entry as superseded."""
    set_state("invalidate-test", "decision", "approach", "monolith")
    success = invalidate_state("invalidate-test", "decision", "approach")
    assert success
    # Active entries should be empty
    entries = get_state("invalidate-test", active_only=True)
    assert len(entries) == 0
    # But the entry still exists if we include inactive ones
    entries = get_state("invalidate-test", active_only=False)
    assert len(entries) == 1
    assert entries[0].status == "superseded"


def test_invalidate_nonexistent():
    """Test invalidating a nonexistent entry."""
    success = invalidate_state("proj", "decision", "nonexistent")
    assert not success


def test_format_project_state():
    """Test formatting state entries for context injection."""
    set_state("proj", "status", "phase", "Phase 1")
    set_state("proj", "decision", "database", "SQLite", source="Build Spec V1")
    entries = get_state("proj")
    formatted = format_project_state(entries)
    assert "--- Trusted Project State ---" in formatted
    assert "--- End Project State ---" in formatted
    assert "phase: Phase 1" in formatted
    assert "database: SQLite" in formatted
    assert "(source: Build Spec V1)" in formatted


def test_format_empty():
    """Test formatting empty state."""
    assert format_project_state([]) == ""
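`test_set_state_upserts` relies on there being exactly one row per (project, category, key). In SQLite that maps naturally onto `INSERT ... ON CONFLICT ... DO UPDATE` over a composite primary key; a minimal sketch (schema and names are illustrative, not the real `project_state` table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE project_state ("
    "  project TEXT, category TEXT, key TEXT, value TEXT,"
    "  PRIMARY KEY (project, category, key))"
)


def set_state(project: str, category: str, key: str, value: str) -> None:
    # Upsert: insert, or overwrite the value when the composite key already exists.
    conn.execute(
        "INSERT INTO project_state (project, category, key, value) VALUES (?, ?, ?, ?) "
        "ON CONFLICT (project, category, key) DO UPDATE SET value = excluded.value",
        (project, category, key, value),
    )


set_state("myproject", "status", "phase", "Phase 0")
set_state("myproject", "status", "phase", "Phase 1")
rows = conn.execute("SELECT value FROM project_state").fetchall()
```

After the two calls, `rows` holds a single entry with the latest value, which is exactly the invariant the upsert test asserts (`ON CONFLICT ... DO UPDATE` requires SQLite 3.24+).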


@@ -0,0 +1,40 @@
prompts:
  - id: g1
    prompt: "What is the GigaBIT M1 project about?"
    project: gigabit
    expected: "Should mention 1.2m primary mirror, StarSpec, telescope"
  - id: g2
    prompt: "What are the main requirements for the M1 mirror?"
    project: gigabit
    expected: "Should mention optical/mechanical requirements, SOW, diameter, Zerodur"
  - id: g3
    prompt: "What vendors are involved in the project?"
    project: gigabit
    expected: "Should mention Optiques Fullum, StarSpec, Atomaste, or subcontractors"
  - id: g4
    prompt: "What is the status of the CDR?"
    project: gigabit
    expected: "Should mention Critical Design Review status, CBUSH, design completion"
  - id: g5
    prompt: "What are the key design decisions made so far?"
    project: gigabit
    expected: "Should mention design phases, PDR, assumptions, blank order"
  - id: g6
    prompt: "What FEA optimization work has been done?"
    project: gigabit
    expected: "Should mention FEA analysis, optimization approach, WFE, displacement data"
  - id: g7
    prompt: "What is the cost reduction strategy?"
    project: gigabit
    expected: "Should mention cost reduction campaign, trade-off, topology selection"
  - id: g8
    prompt: "What are the mirror blank specifications?"
    project: gigabit
    expected: "Should mention 1200mm diameter, Zerodur, optical specifications"


@@ -0,0 +1,40 @@
prompts:
  - id: p1
    prompt: "What is AtoCore's architecture?"
    project: atocore
    expected: "Should mention layered architecture, SQLite, vector DB"
  - id: p2
    prompt: "What chunking strategy does AtoCore use?"
    project: atocore
    expected: "Should mention heading-aware splitting, 800 char max"
  - id: p3
    prompt: "What is the trust precedence order?"
    project: atocore
    expected: "Should list: Trusted Project State > AtoDrive > validated memory"
  - id: p4
    prompt: "How does AtoCore handle conflicts between sources?"
    project: atocore
    expected: "Should mention conflict resolution rules, no silent merging"
  - id: p5
    prompt: "What are the different memory types?"
    project: atocore
    expected: "Should list: Identity, Preferences, Project, Episodic, Knowledge, Adaptation, Trusted Project State"
  - id: p6
    prompt: "What is the context budget allocation?"
    project: atocore
    expected: "Should mention percentages: identity 5%, preferences 5%, project 20%, episodic 10%, retrieval 60%"
  - id: p7
    prompt: "What is a trivial prompt in AtoCore?"
    project: atocore
    expected: "Should mention: no project ref, no proper nouns, no past context dependency"
  - id: p8
    prompt: "What are the success criteria for the first win?"
    project: atocore
    expected: "Should mention: saves >=5 min lookup, >=80-90% accuracy, >=10 test prompts"
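Eval prompt files in this shape can be consumed by a small harness. A sketch of a loader, assuming PyYAML is available and using `load_eval_prompts` as a hypothetical helper name:

```python
import tempfile

import yaml  # assumes PyYAML is installed

# Inline sample in the same shape as the eval prompt files above.
SAMPLE = """\
prompts:
  - id: p1
    prompt: "What is AtoCore's architecture?"
    project: atocore
    expected: "Should mention layered architecture, SQLite, vector DB"
"""


def load_eval_prompts(path: str) -> list[dict]:
    """Parse a prompts YAML file into a list of eval cases."""
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)["prompts"]


with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as f:
    f.write(SAMPLE)
    sample_path = f.name

cases = load_eval_prompts(sample_path)
```

A harness would then feed each case's `prompt` (with its `project` as the hint) through the context builder and score the answer against `expected`.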

tests/test_retrieval.py

@@ -0,0 +1,41 @@
"""Tests for the retrieval system."""
from atocore.ingestion.pipeline import ingest_file
from atocore.models.database import init_db
from atocore.retrieval.retriever import retrieve
from atocore.retrieval.vector_store import get_vector_store


def test_retrieve_returns_results(tmp_data_dir, sample_markdown):
    """Test that retrieval returns relevant chunks."""
    init_db()
    ingest_file(sample_markdown)
    results = retrieve("What are the memory types?", top_k=5)
    assert len(results) > 0
    assert all(r.score > 0 for r in results)
    assert all(r.content for r in results)


def test_retrieve_scores_ranked(tmp_data_dir, sample_markdown):
    """Test that results are ranked by score."""
    init_db()
    ingest_file(sample_markdown)
    results = retrieve("architecture layers", top_k=5)
    if len(results) >= 2:
        scores = [r.score for r in results]
        assert scores == sorted(scores, reverse=True)


def test_vector_store_count(tmp_data_dir, sample_markdown):
    """Test that the vector store tracks chunk count."""
    init_db()
    # Reset the singleton for a clean test
    import atocore.retrieval.vector_store as vs

    vs._store = None
    ingest_file(sample_markdown)
    store = get_vector_store()
    assert store.count > 0