3 Commits

Author SHA1 Message Date
2e449a4c33 docs(arch): engineering query catalog as the V1 driving target
First doc in the engineering-layer planning sprint. The premise of
this document is the inverse of the existing ontology doc: instead of
listing objects and seeing what they could do, we list the questions
we need to answer and let those drive what objects and relationships
must exist.

The rule established here:

> If a typed object or relationship does not serve at least one query
> in this catalog, it is not in V1.

Contents:

- 19 v1-required queries grouped into 5 tiers:
  - structure (Q-001..Q-004)
  - intent (Q-005..Q-009)
  - validation (Q-010..Q-012)
  - change/time (Q-013..Q-014)
  - cross-cutting (Q-016..Q-020)
- 4 v1-stretch queries (Q-015, Q-021..Q-023)
- 4 v2 deferred queries (Q-024..Q-027) so V1 does not paint us into
  a corner

Each entry has: id, question, invocation, expected result shape,
required objects, required relationships, provenance requirement,
and tier.

Three queries are flagged as the "killer correctness" queries:
- Q-006 orphan requirements (engineering equivalent of untested code)
- Q-009 decisions based on flagged assumptions (catches fragile design)
- Q-011 validation claims with no supporting result (catches
  unevidenced claims)

The catalog ends with the implied implementation order for V1, the
list of object families intentionally deferred (BOM, manufacturing,
software, electrical, test correlation), and the open questions this
catalog raises for the next planning docs:

- when do orphan/unsupported queries flag (insert time vs query time)?
- when an Assumption flips, are dependent Decisions auto-flagged?
- does AtoCore block conflicts or always save-and-flag?
- is EVIDENCED_BY mandatory at insert?
- when does the Human Mirror regenerate?

These are the questions the next planning docs (memory-vs-entities,
conflict-model, promotion-rules) should answer before any engineering
layer code is written.

This is doc work only. No code, no schema, no behavior change.
Per the working rule in master-plan-status.md: the architecture docs
shape decisions, they do not force premature schema work.
2026-04-06 19:33:44 -04:00
ea3fed3d44 feat(phase9-A): interaction capture loop foundation
Phase 9 Commit A from the agreed plan: turn AtoCore from a stateless
context enhancer into a system that records what it actually fed to an
LLM and what came back. This is the audit trail Reflection (Commit B)
and Extraction (Commit C) will be layered on top of.

The interactions table existed in the schema since the original PoC
but nothing wrote to it. This change makes it real:

Schema migration (additive only):
- response                full LLM response (caller decides how much)
- memories_used           JSON list of memory ids in the context pack
- chunks_used             JSON list of chunk ids in the context pack
- client                  identifier of the calling system
                          (openclaw, claude-code, manual, ...)
- session_id              groups multi-turn conversations
- project                 project name (mirrors the memory module pattern,
                          no FK so capture stays cheap)
- indexes on session_id, project, created_at

The created_at column is now written explicitly with a SQLite-compatible
'YYYY-MM-DD HH:MM:SS' format so the same string lives in the DB and the
returned dataclass. Without this the `since` filter on list_interactions
would silently fail because CURRENT_TIMESTAMP and isoformat use different
shapes that do not compare cleanly as strings.
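
A standalone sketch (illustration only, not code in this commit) of why
the two shapes do not compare cleanly as strings:

    from datetime import datetime, timezone

    row_created_at = "2026-04-06 19:31:43"   # CURRENT_TIMESTAMP shape
    since_iso = datetime(2026, 4, 6, tzinfo=timezone.utc).isoformat()
    # since_iso == '2026-04-06T00:00:00+00:00'; 'T' sorts after ' ', so the
    # ISO cutoff compares as later than any same-day CURRENT_TIMESTAMP value.
    print(row_created_at >= since_iso)   # False -> the row is silently dropped

    # Writing created_at (and passing `since`) in the same shape fixes it.
    since_db = datetime(2026, 4, 6, tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    print(row_created_at >= since_db)    # True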

New module src/atocore/interactions/:
- Interaction dataclass
- record_interaction()    persists one round-trip (prompt required;
                          everything else optional). Refuses empty prompts.
- list_interactions()     filters by project / session_id / client / since,
                          newest-first, hard-capped at 500
- get_interaction()       fetch by id, full response + context pack

API endpoints:
- POST   /interactions                capture one interaction
- GET    /interactions                list with summaries (no full response)
- GET    /interactions/{id}           full record incl. response + pack
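
A hedged usage sketch (host/port and field values are illustrative; only
`prompt` is required):

    import requests

    BASE = "http://localhost:8000"   # assumed local AtoCore instance

    created = requests.post(f"{BASE}/interactions", json={
        "prompt": "What is the lateral support material for p05?",
        "response": "GF-PTFE pads per Decision D-024.",
        "project": "p05-interferometer",
        "client": "manual",
        "session_id": "sess-demo",
        "memories_used": ["mem-aaa"],
        "chunks_used": ["chunk-111"],
    }).json()

    # Listing returns summaries only (response length, not the full body).
    summaries = requests.get(f"{BASE}/interactions",
                             params={"project": "p05-interferometer"}).json()

    # Full record, including response and context pack, by id.
    full = requests.get(f"{BASE}/interactions/{created['id']}").json()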

Trust model:
- Capture is read-only with respect to memories, project state, and
  source chunks. Nothing here promotes anything into trusted state.
- The audit trail becomes the dataset Commit B (reinforcement) and
  Commit C (extraction + review queue) will operate on.

Tests (13 new, all green):
- service: persist + roundtrip every field
- service: minimum-fields path (prompt only)
- service: empty / whitespace prompt rejected
- service: get by id returns None for missing
- service: filter by project, session, client
- service: ordering newest-first with limit
- service: since filter inclusive on cutoff (the bug the timestamp
  fix above caught)
- service: limit=0 returns empty
- API: POST records and round-trips through GET /interactions/{id}
- API: empty prompt returns 400
- API: missing id returns 404
- API: list filter returns summaries (not full response bodies)

Full suite: 118 passing (was 105).

master-plan-status.md updated to move Phase 9 from "not started" to
"started" with the explicit note that Commit A is in and Commits B/C
remain.
2026-04-06 19:31:43 -04:00
c9b9eede25 feat: tunable ranking, refresh status, chroma backup + admin endpoints
Three small improvements that move the operational baseline forward
without changing the existing trust model.

1. Tunable retrieval ranking weights
   - rank_project_match_boost, rank_query_token_step,
     rank_query_token_cap, rank_path_high_signal_boost,
     rank_path_low_signal_penalty are now Settings fields
   - all overridable via ATOCORE_* env vars
   - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32
   - lets ranking be tuned per environment as Wave 1 is exercised
     without code changes

2. /projects/{name}/refresh status
   - refresh_registered_project now returns an overall status field
     ("ingested", "partial", "nothing_to_ingest") plus roots_ingested
     and roots_skipped counters
   - ProjectRefreshResponse advertises the new fields so callers can
     rely on them
   - covers the case where every configured root is missing on disk

3. Chroma cold snapshot + admin backup endpoints
   - create_runtime_backup now accepts include_chroma and writes a
     cold directory copy of the chroma persistence path
   - new list_runtime_backups() and validate_backup() helpers
   - new endpoints:
     - POST /admin/backup            create snapshot (optional chroma)
     - GET  /admin/backup            list snapshots
     - GET  /admin/backup/{stamp}/validate  structural validation
   - chroma snapshots are taken under exclusive_ingestion() so a refresh
     or ingest cannot race with the cold copy
   - backup metadata records what was actually included and how big
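
For item 1, a minimal sketch of overriding the weights via environment
variables (values are arbitrary; defaults apply to anything unset):

    import os
    import atocore.config as config

    os.environ["ATOCORE_RANK_PROJECT_MATCH_BOOST"] = "3.0"
    os.environ["ATOCORE_RANK_QUERY_TOKEN_CAP"] = "1.5"
    settings = config.Settings()   # picks up the overrides at construction
    assert settings.rank_project_match_boost == 3.0

For item 3, a hedged sketch of driving the new endpoints from a client
(host/port assumed, not part of this change):

    import requests

    BASE = "http://localhost:8000"
    # metadata records what was actually included and how big it was
    meta = requests.post(f"{BASE}/admin/backup",
                         json={"include_chroma": True}).json()
    backups = requests.get(f"{BASE}/admin/backup").json()["backups"]
    stamp = backups[-1]["stamp"]   # stamps sort chronologically
    report = requests.get(f"{BASE}/admin/backup/{stamp}/validate").json()
    assert report.get("valid"), report["errors"]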

Tests:
- 8 new tests covering tunable weights, refresh status branches
  (ingested / partial / nothing_to_ingest), chroma snapshot, list,
  validate, and the API endpoints (including the lock-acquisition path)
- existing fake refresh stubs in test_api_storage.py updated for the
  expanded ProjectRefreshResponse model
- full suite: 105 passing (was 97)

next-steps doc updated to reflect that the chroma snapshot + restore
validation gap from current-state.md is now closed in code; only the
operational retention policy remains.
2026-04-06 18:42:19 -04:00
16 changed files with 1598 additions and 14 deletions

View File

@@ -0,0 +1,380 @@
# Engineering Query Catalog (V1 driving target)
## Purpose
This document is the **single most important driver** of the engineering
layer V1 design. The ontology, the schema, the relationship types, and
the human mirror templates should all be designed *to answer the queries
in this catalog*. Anything in the ontology that does not serve at least
one of these queries is overdesign for V1.
The rule is:
> If we cannot describe what question a typed object or relationship
> lets us answer, that object or relationship is not in V1.
The catalog is also the **acceptance test** for the engineering layer.
"V1 is done" means: AtoCore can answer at least the V1-required queries
in this list against the active project set (`p04-gigabit`,
`p05-interferometer`, `p06-polisher`).
## Structure of each entry
Each query is documented as:
- **id**: stable identifier (`Q-001`, `Q-002`, ...)
- **question**: the natural-language question a human or LLM would ask
- **example invocation**: how a client would call AtoCore to ask it
- **expected result shape**: the structure of the answer (not real data)
- **objects required**: which engineering objects must exist
- **relationships required**: which relationships must exist
- **provenance requirement**: what evidence must be linkable
- **tier**: `v1-required` | `v1-stretch` | `v2`
## Tiering
- **v1-required** queries are the floor. The engineering layer cannot
ship without all of them working.
- **v1-stretch** queries should be doable with V1 objects but may need
additional adapters.
- **v2** queries are aspirational; they belong to a later wave of
ontology work and are listed here only to make sure V1 does not
paint us into a corner.
## V1 minimum object set (recap)
For reference, the V1 ontology includes:
- Project, Subsystem, Component
- Requirement, Constraint, Decision
- Material, Parameter
- AnalysisModel, Result, ValidationClaim
- Artifact
And the four relationship families:
- Structural: `CONTAINS`, `PART_OF`, `INTERFACES_WITH`
- Intent: `SATISFIES`, `CONSTRAINED_BY`, `BASED_ON_ASSUMPTION`,
`AFFECTED_BY_DECISION`, `SUPERSEDES`
- Validation: `ANALYZED_BY`, `VALIDATED_BY`, `SUPPORTS`,
`CONFLICTS_WITH`, `DEPENDS_ON`
- Provenance: `DESCRIBED_BY`, `UPDATED_BY_SESSION`, `EVIDENCED_BY`,
`SUMMARIZED_IN`
Every query below is annotated with which of these it depends on, so
that the V1 implementation order is unambiguous.
---
## Tier 1: Structure queries
### Q-001 — What does this subsystem contain?
- **question**: "What components and child subsystems make up
Subsystem `<name>`?"
- **invocation**: `GET /entities/Subsystem/<id>?expand=contains`
- **expected**: `{ subsystem, contains: [{ id, type, name, status }] }`
- **objects**: Subsystem, Component
- **relationships**: `CONTAINS`
- **provenance**: each child must link back to at least one Artifact or
source chunk via `DESCRIBED_BY` / `EVIDENCED_BY`
- **tier**: v1-required
### Q-002 — What is this component a part of?
- **question**: "Which subsystem(s) does Component `<name>` belong to?"
- **invocation**: `GET /entities/Component/<id>?expand=parents`
- **expected**: `{ component, part_of: [{ id, type, name, status }] }`
- **objects**: Component, Subsystem
- **relationships**: `PART_OF` (inverse of `CONTAINS`)
- **provenance**: same as Q-001
- **tier**: v1-required
### Q-003 — What interfaces does this subsystem have, and to what?
- **question**: "What does Subsystem `<name>` interface with, and on
which interfaces?"
- **invocation**: `GET /entities/Subsystem/<id>/interfaces`
- **expected**: `[{ interface_id, peer: { id, type, name }, role }]`
- **objects**: Subsystem (Interface object deferred to v2)
- **relationships**: `INTERFACES_WITH`
- **tier**: v1-required (with simplified Interface = string label;
full Interface object becomes v2)
### Q-004 — What is the system map for this project right now?
- **question**: "Give me the current structural tree of Project `<id>`."
- **invocation**: `GET /projects/<id>/system-map`
- **expected**: nested tree of `{ id, type, name, status, children: [] }`
- **objects**: Project, Subsystem, Component
- **relationships**: `CONTAINS`, `PART_OF`
- **tier**: v1-required
---
## Tier 2: Intent queries
### Q-005 — Which requirements does this component satisfy?
- **question**: "Which Requirements does Component `<name>` satisfy
today?"
- **invocation**: `GET /entities/Component/<id>?expand=satisfies`
- **expected**: `[{ requirement_id, name, status, confidence }]`
- **objects**: Component, Requirement
- **relationships**: `SATISFIES`
- **provenance**: each `SATISFIES` edge must link to a Result or
ValidationClaim that supports the satisfaction (or be flagged as
`unverified`)
- **tier**: v1-required
### Q-006 — Which requirements are not satisfied by anything?
- **question**: "Show me orphan Requirements in Project `<id>`
requirements with no `SATISFIES` edge from any Component."
- **invocation**: `GET /projects/<id>/requirements?coverage=orphan`
- **expected**: `[{ requirement_id, name, status, last_updated }]`
- **objects**: Project, Requirement, Component
- **relationships**: absence of `SATISFIES`
- **tier**: v1-required (this is the killer correctness query — it's
the engineering equivalent of "untested code")
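Purely as an illustration of the absence-of-edge pattern (not a schema
commitment; the storage model is one of the open questions at the end of
this catalog), a hypothetical relational encoding with `entities` and
`relationships` tables could answer Q-006 with an anti-join:
```python
# Hypothetical sketch only: table and column names are placeholders,
# not V1 schema. The single parameter is the project id.
ORPHAN_REQUIREMENTS_SQL = """
SELECT e.id, e.name, e.status, e.updated_at
FROM entities AS e
WHERE e.type = 'Requirement'
  AND e.project_id = ?
  AND e.status = 'active'
  AND NOT EXISTS (
      SELECT 1
      FROM relationships AS rel
      WHERE rel.kind = 'SATISFIES'
        AND rel.target_id = e.id   -- Component --SATISFIES--> Requirement
  )
ORDER BY e.updated_at DESC
"""
```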
### Q-007 — What constrains this component?
- **question**: "What Constraints apply to Component `<name>`?"
- **invocation**: `GET /entities/Component/<id>?expand=constraints`
- **expected**: `[{ constraint_id, name, value, source_decision_id? }]`
- **objects**: Component, Constraint
- **relationships**: `CONSTRAINED_BY`
- **tier**: v1-required
### Q-008 — Which decisions affect this subsystem or component?
- **question**: "Show me every Decision that affects `<entity>`."
- **invocation**: `GET /entities/<type>/<id>?expand=decisions`
- **expected**: `[{ decision_id, name, status, made_at, supersedes? }]`
- **objects**: Decision, plus the affected entity
- **relationships**: `AFFECTED_BY_DECISION`, `SUPERSEDES`
- **tier**: v1-required
### Q-009 — Which decisions are based on assumptions that are now flagged?
- **question**: "Are any active Decisions in Project `<id>` based on an
Assumption that has been marked invalid or needs_review?"
- **invocation**: `GET /projects/<id>/decisions?assumption_status=needs_review,invalid`
- **expected**: `[{ decision_id, assumption_id, assumption_status }]`
- **objects**: Decision, Assumption
- **relationships**: `BASED_ON_ASSUMPTION`
- **tier**: v1-required (this is the second killer correctness query —
catches fragile design)
---
## Tier 3: Validation queries
### Q-010 — What result validates this claim?
- **question**: "Show me the Result(s) supporting ValidationClaim
`<name>`."
- **invocation**: `GET /entities/ValidationClaim/<id>?expand=supports`
- **expected**: `[{ result_id, analysis_model_id, summary, confidence }]`
- **objects**: ValidationClaim, Result, AnalysisModel
- **relationships**: `SUPPORTS`, `ANALYZED_BY`
- **provenance**: every Result must link to its AnalysisModel and an
Artifact via `DESCRIBED_BY`
- **tier**: v1-required
### Q-011 — Are there any active validation claims with no supporting result?
- **question**: "Which active ValidationClaims in Project `<id>` have
no `SUPPORTS` edge from any Result?"
- **invocation**: `GET /projects/<id>/validation?coverage=unsupported`
- **expected**: `[{ claim_id, name, status, last_updated }]`
- **objects**: ValidationClaim, Result
- **relationships**: absence of `SUPPORTS`
- **tier**: v1-required (third killer correctness query — catches
claims that are not yet evidenced)
### Q-012 — Are there conflicting results for the same claim?
- **question**: "Show me ValidationClaims where multiple Results
disagree (one `SUPPORTS`, another `CONFLICTS_WITH`)."
- **invocation**: `GET /projects/<id>/validation?coverage=conflict`
- **expected**: `[{ claim_id, supporting_results, conflicting_results }]`
- **objects**: ValidationClaim, Result
- **relationships**: `SUPPORTS`, `CONFLICTS_WITH`
- **tier**: v1-required
---
## Tier 4: Change / time queries
### Q-013 — What changed in this project recently?
- **question**: "List entities in Project `<id>` whose `updated_at`
is within the last `<window>`."
- **invocation**: `GET /projects/<id>/changes?since=<iso>`
- **expected**: `[{ id, type, name, status, updated_at, change_kind }]`
- **objects**: any
- **relationships**: any
- **tier**: v1-required
### Q-014 — What is the decision history for this subsystem?
- **question**: "Show me all Decisions affecting Subsystem `<id>` in
chronological order, including superseded ones."
- **invocation**: `GET /entities/Subsystem/<id>/decision-log`
- **expected**: ordered list with supersession chain
- **objects**: Decision, Subsystem
- **relationships**: `AFFECTED_BY_DECISION`, `SUPERSEDES`
- **tier**: v1-required (this is what a human-readable decision log
is generated from)
### Q-015 — What was the trusted state of this entity at time T?
- **question**: "Reconstruct the active fields of `<entity>` as of
timestamp `<T>`."
- **invocation**: `GET /entities/<type>/<id>?as_of=<iso>`
- **expected**: the entity record as it would have been seen at T
- **objects**: any
- **relationships**: status lifecycle
- **tier**: v1-stretch (requires status history table — defer if
baseline implementation runs long)
---
## Tier 5: Cross-cutting queries
### Q-016 — Which interfaces are affected by changing this component?
- **question**: "If Component `<name>` changes, which Interfaces and
which peer subsystems are impacted?"
- **invocation**: `GET /entities/Component/<id>/impact`
- **expected**: `[{ interface_id, peer_id, peer_type, peer_name }]`
- **objects**: Component, Subsystem
- **relationships**: `PART_OF`, `INTERFACES_WITH`
- **tier**: v1-required (this is the change-impact-analysis query the
whole engineering layer exists for)
### Q-017 — What evidence supports this fact?
- **question**: "Give me the source documents and chunks that support
the current value of `<entity>.<field>`."
- **invocation**: `GET /entities/<type>/<id>/evidence?field=<field>`
- **expected**: `[{ source_file, chunk_id, heading_path, score }]`
- **objects**: any
- **relationships**: `EVIDENCED_BY`, `DESCRIBED_BY`
- **tier**: v1-required (without this the engineering layer cannot
pass the AtoCore "trust + provenance" rule)
### Q-018 — What is active vs superseded for this concept?
- **question**: "Show me the current active record for `<key>` plus
the chain of superseded versions."
- **invocation**: `GET /entities/<type>/<id>?include=superseded`
- **expected**: `{ active, superseded_chain: [...] }`
- **objects**: any
- **relationships**: `SUPERSEDES`
- **tier**: v1-required
### Q-019 — Which components depend on this material?
- **question**: "List every Component whose Material is `<material>`."
- **invocation**: `GET /entities/Material/<id>/components`
- **expected**: `[{ component_id, name, subsystem_id }]`
- **objects**: Component, Material
- **relationships**: derived from Component.material field, no edge
needed
- **tier**: v1-required
### Q-020 — What does this project look like as a project overview?
- **question**: "Generate the human-readable Project Overview for
Project `<id>` from current trusted state."
- **invocation**: `GET /projects/<id>/mirror/overview`
- **expected**: formatted markdown derived from active entities
- **objects**: Project, Subsystem, Component, Decision, Requirement,
ValidationClaim
- **relationships**: structural + intent
- **tier**: v1-required (this is the Layer 3 Human Mirror entry
point — the moment the engineering layer becomes useful to humans
who do not want to call APIs)
---
## v1-stretch (nice to have)
### Q-021 — Which parameters drive this analysis result?
- **objects**: AnalysisModel, Parameter, Result
- **relationships**: `ANALYZED_BY`, plus a new `DRIVEN_BY` edge
### Q-022 — Which decisions cite which prior decisions?
- **objects**: Decision
- **relationships**: `BASED_ON_DECISION` (new)
### Q-023 — Cross-project comparison
- **question**: "Are any Materials shared between p04, p05, and p06,
and are their Constraints consistent?"
- **objects**: Project, Material, Constraint
---
## v2 (deferred)
### Q-024 — Cost rollup
- requires BOM Item, Cost Driver, Vendor — out of V1 scope
### Q-025 — Manufacturing readiness
- requires Manufacturing Process, Inspection Step, Assembly Procedure
— out of V1 scope
### Q-026 — Software / control state
- requires Software Module, State Machine, Sensor, Actuator — out
of V1 scope
### Q-027 — Test correlation across analyses
- requires Test, Correlation Record — out of V1 scope
---
## What this catalog implies for V1 implementation order
The 19 v1-required queries above tell us what to build first, in
roughly this order:
1. **Structural** (Q-001 to Q-004): need Project, Subsystem, Component
and `CONTAINS` / `PART_OF` / `INTERFACES_WITH` (with Interface as a
simple string label, not its own entity).
2. **Intent core** (Q-005 to Q-008): need Requirement, Constraint,
Decision and `SATISFIES` / `CONSTRAINED_BY` / `AFFECTED_BY_DECISION`.
3. **Killer correctness queries** (Q-006, Q-009, Q-011): need the
absence-of-edge query patterns and the Assumption object.
4. **Validation** (Q-010 to Q-012): need AnalysisModel, Result,
ValidationClaim and `SUPPORTS` / `ANALYZED_BY` / `CONFLICTS_WITH`.
5. **Change/time** (Q-013, Q-014): need a write log per entity (the
existing `updated_at` plus a status history if Q-015 is in scope).
6. **Cross-cutting** (Q-016 to Q-019): impact analysis is mostly a
graph traversal once the structural and intent edges exist.
7. **Provenance** (Q-017): the entity store must always link to
chunks/artifacts via `EVIDENCED_BY` / `DESCRIBED_BY`. This is
non-negotiable and should be enforced at insert time, not later.
8. **Human Mirror** (Q-020): the markdown generator is the *last*
thing built, not the first. It is derived from everything above.
## What is intentionally left out of V1
- BOM, manufacturing, vendor, cost objects (entire family deferred)
- Software, control, electrical objects (entire family deferred)
- Test correlation objects (entire family deferred)
- Full Interface as its own entity (string label is enough for V1)
- Time-travel queries beyond `since=<iso>` (Q-015 is stretch)
- Multi-project rollups (Q-023 is stretch)
## Open questions this catalog raises
These are the design questions that need to be answered in the next
planning docs (memory-vs-entities, conflict-model, promotion-rules):
- **Q-006, Q-011 (orphan / unsupported queries)**: do orphans get
flagged at insert time, computed at query time, or both?
- **Q-009 (assumption-driven decisions)**: when an Assumption flips
to `needs_review`, are all dependent Decisions auto-flagged or do
they only show up when this query is run?
- **Q-012 (conflicting results)**: does AtoCore *block* a conflict
from being saved, or always save and flag? (The trust rule says
flag, never block — but the implementation needs the explicit nod.)
- **Q-017 (evidence)**: is `EVIDENCED_BY` mandatory at insert? If yes,
how do we backfill entities extracted from older interactions where
the source link is fuzzy?
- **Q-020 (Project Overview mirror)**: when does it regenerate?
On every entity write? On a schedule? On demand?
These are the questions the next architecture docs in the planning
sprint should resolve before any code is written.
## Working rule
> If a v1-required query in this catalog cannot be answered against
> at least one of `p04-gigabit`, `p05-interferometer`, or
> `p06-polisher`, the engineering layer is not done.
This catalog is the contract.

View File

@@ -29,10 +29,14 @@ read-only additive mode.
- Phase 4 - Identity / Preferences
- Phase 8 - OpenClaw Integration
### Started
- Phase 9 - Reflection (Commit A: capture loop in place; Commits B/C
reinforcement and extraction still pending)
### Not Yet Complete In The Intended Sense
- Phase 6 - AtoDrive
- Phase 9 - Reflection
- Phase 10 - Write-back
- Phase 11 - Multi-model
- Phase 12 - Evaluation

View File

@@ -54,6 +54,9 @@ This working list should be read alongside:
- exercise the new SQLite + registry snapshot path on Dalidou
- Chroma backup or rebuild policy
- retention and restore validation
- admin backup endpoint now supports an `include_chroma` cold snapshot
taken under the ingestion lock, and `validate` confirms each snapshot
is openable; the remaining work is the operational retention policy
8. Keep deeper automatic runtime integration modest until the organic read-only
model has proven value

View File

@@ -25,6 +25,11 @@ from atocore.ingestion.pipeline import (
ingest_file,
ingest_folder,
)
from atocore.interactions.service import (
get_interaction,
list_interactions,
record_interaction,
)
from atocore.memory.service import (
MEMORY_TYPES,
create_memory,
@@ -34,6 +39,11 @@ from atocore.memory.service import (
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
@@ -69,6 +79,9 @@ class ProjectRefreshResponse(BaseModel):
aliases: list[str]
description: str
purge_deleted: bool
status: str
roots_ingested: int
roots_skipped: int
roots: list[dict]
@@ -438,6 +451,149 @@ def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key}
class InteractionRecordRequest(BaseModel):
prompt: str
response: str = ""
response_summary: str = ""
project: str = ""
client: str = ""
session_id: str = ""
memories_used: list[str] = []
chunks_used: list[str] = []
context_pack: dict | None = None
@router.post("/interactions")
def api_record_interaction(req: InteractionRecordRequest) -> dict:
"""Capture one interaction (prompt + response + what was used).
This is the foundation of the AtoCore reflection loop. It records
what the system fed to an LLM and what came back, but does not
promote anything into trusted state. Phase 9 Commit B/C will layer
reinforcement and extraction on top of this audit trail.
"""
try:
interaction = record_interaction(
prompt=req.prompt,
response=req.response,
response_summary=req.response_summary,
project=req.project,
client=req.client,
session_id=req.session_id,
memories_used=req.memories_used,
chunks_used=req.chunks_used,
context_pack=req.context_pack,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {
"status": "recorded",
"id": interaction.id,
"created_at": interaction.created_at,
}
@router.get("/interactions")
def api_list_interactions(
project: str | None = None,
session_id: str | None = None,
client: str | None = None,
since: str | None = None,
limit: int = 50,
) -> dict:
"""List captured interactions, optionally filtered by project, session,
client, or creation time. Hard-capped at 500 entries per call."""
interactions = list_interactions(
project=project,
session_id=session_id,
client=client,
since=since,
limit=limit,
)
return {
"count": len(interactions),
"interactions": [
{
"id": i.id,
"prompt": i.prompt,
"response_summary": i.response_summary,
"response_chars": len(i.response),
"project": i.project,
"client": i.client,
"session_id": i.session_id,
"memories_used": i.memories_used,
"chunks_used": i.chunks_used,
"created_at": i.created_at,
}
for i in interactions
],
}
@router.get("/interactions/{interaction_id}")
def api_get_interaction(interaction_id: str) -> dict:
"""Fetch a single interaction with the full response and context pack."""
interaction = get_interaction(interaction_id)
if interaction is None:
raise HTTPException(status_code=404, detail=f"Interaction not found: {interaction_id}")
return {
"id": interaction.id,
"prompt": interaction.prompt,
"response": interaction.response,
"response_summary": interaction.response_summary,
"project": interaction.project,
"client": interaction.client,
"session_id": interaction.session_id,
"memories_used": interaction.memories_used,
"chunks_used": interaction.chunks_used,
"context_pack": interaction.context_pack,
"created_at": interaction.created_at,
}
class BackupCreateRequest(BaseModel):
include_chroma: bool = False
@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
"""Create a runtime backup snapshot.
When ``include_chroma`` is true the call holds the ingestion lock so a
safe cold copy of the vector store can be taken without racing against
refresh or ingest endpoints.
"""
payload = req or BackupCreateRequest()
try:
if payload.include_chroma:
with exclusive_ingestion():
metadata = create_runtime_backup(include_chroma=True)
else:
metadata = create_runtime_backup(include_chroma=False)
except Exception as e:
log.error("admin_backup_failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Backup failed: {e}")
return metadata
@router.get("/admin/backup")
def api_list_backups() -> dict:
"""List all runtime backups under the configured backup directory."""
return {
"backup_dir": str(_config.settings.resolved_backup_dir),
"backups": list_runtime_backups(),
}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
"""Validate that a previously created backup is structurally usable."""
result = validate_backup(stamp)
if not result.get("exists", False):
raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}")
return result
@router.get("/health")
def api_health() -> dict:
"""Health check."""

View File

@@ -40,6 +40,15 @@ class Settings(BaseSettings):
context_budget: int = 3000
context_top_k: int = 15
# Retrieval ranking weights (tunable per environment).
# All multipliers default to the values used since Wave 1; tighten or
# loosen them via ATOCORE_* env vars without touching code.
rank_project_match_boost: float = 2.0
rank_query_token_step: float = 0.08
rank_query_token_cap: float = 1.32
rank_path_high_signal_boost: float = 1.18
rank_path_low_signal_penalty: float = 0.72
model_config = {"env_prefix": "ATOCORE_"}
@property

View File

@@ -0,0 +1,27 @@
"""Interactions: capture loop for AtoCore.
This module is the foundation for Phase 9 (Reflection) and Phase 10
(Write-back). It records what AtoCore fed to an LLM and what came back,
so that later phases can:
- reinforce active memories that the LLM actually relied on
- extract candidate memories / project state from real conversations
- inspect the audit trail of any answer the system helped produce
Nothing here automatically promotes information into trusted state.
The capture loop is intentionally read-only with respect to trust.
"""
from atocore.interactions.service import (
Interaction,
get_interaction,
list_interactions,
record_interaction,
)
__all__ = [
"Interaction",
"get_interaction",
"list_interactions",
"record_interaction",
]

View File

@@ -0,0 +1,219 @@
"""Interaction capture service.
An *interaction* is one round-trip of:
- a user prompt
- the AtoCore context pack that was assembled for it
- the LLM response (full text or a summary, caller's choice)
- which memories and chunks were actually used in the pack
- a client identifier (e.g. ``openclaw``, ``claude-code``, ``manual``)
- an optional session identifier so multi-turn conversations can be
reconstructed later
The capture is intentionally additive: it never modifies memories,
project state, or chunks. Reflection (Phase 9 Commit B/C) and
write-back (Phase 10) are layered on top of this audit trail without
violating the AtoCore trust hierarchy.
"""
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from atocore.models.database import get_connection
from atocore.observability.logger import get_logger
log = get_logger("interactions")
@dataclass
class Interaction:
id: str
prompt: str
response: str
response_summary: str
project: str
client: str
session_id: str
memories_used: list[str] = field(default_factory=list)
chunks_used: list[str] = field(default_factory=list)
context_pack: dict = field(default_factory=dict)
created_at: str = ""
def record_interaction(
prompt: str,
response: str = "",
response_summary: str = "",
project: str = "",
client: str = "",
session_id: str = "",
memories_used: list[str] | None = None,
chunks_used: list[str] | None = None,
context_pack: dict | None = None,
) -> Interaction:
"""Persist a single interaction to the audit trail.
The only required field is ``prompt`` so this can be called even when
the caller is in the middle of a partial turn (for example to record
that AtoCore was queried even before the LLM response is back).
"""
if not prompt or not prompt.strip():
raise ValueError("Interaction prompt must be non-empty")
interaction_id = str(uuid.uuid4())
# Store created_at explicitly so the same string lives in both the DB
# column and the returned dataclass. SQLite's CURRENT_TIMESTAMP uses
# 'YYYY-MM-DD HH:MM:SS' which would not compare cleanly against ISO
# timestamps with 'T' and tz offset, breaking the `since` filter on
# list_interactions.
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
memories_used = list(memories_used or [])
chunks_used = list(chunks_used or [])
context_pack_payload = context_pack or {}
with get_connection() as conn:
conn.execute(
"""
INSERT INTO interactions (
id, prompt, context_pack, response_summary, response,
memories_used, chunks_used, client, session_id, project,
created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
interaction_id,
prompt,
json.dumps(context_pack_payload, ensure_ascii=True),
response_summary,
response,
json.dumps(memories_used, ensure_ascii=True),
json.dumps(chunks_used, ensure_ascii=True),
client,
session_id,
project,
now,
),
)
log.info(
"interaction_recorded",
interaction_id=interaction_id,
project=project,
client=client,
session_id=session_id,
memories_used=len(memories_used),
chunks_used=len(chunks_used),
response_chars=len(response),
)
return Interaction(
id=interaction_id,
prompt=prompt,
response=response,
response_summary=response_summary,
project=project,
client=client,
session_id=session_id,
memories_used=memories_used,
chunks_used=chunks_used,
context_pack=context_pack_payload,
created_at=now,
)
def list_interactions(
project: str | None = None,
session_id: str | None = None,
client: str | None = None,
since: str | None = None,
limit: int = 50,
) -> list[Interaction]:
"""List captured interactions, optionally filtered.
``since`` is an ISO timestamp string; only interactions created at or
after that time are returned. ``limit`` is hard-capped at 500 to keep
casual API listings cheap.
"""
if limit <= 0:
return []
limit = min(limit, 500)
query = "SELECT * FROM interactions WHERE 1=1"
params: list = []
if project:
query += " AND project = ?"
params.append(project)
if session_id:
query += " AND session_id = ?"
params.append(session_id)
if client:
query += " AND client = ?"
params.append(client)
if since:
query += " AND created_at >= ?"
params.append(since)
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
with get_connection() as conn:
rows = conn.execute(query, params).fetchall()
return [_row_to_interaction(row) for row in rows]
def get_interaction(interaction_id: str) -> Interaction | None:
"""Fetch one interaction by id, or return None if it does not exist."""
if not interaction_id:
return None
with get_connection() as conn:
row = conn.execute(
"SELECT * FROM interactions WHERE id = ?", (interaction_id,)
).fetchone()
if row is None:
return None
return _row_to_interaction(row)
def _row_to_interaction(row) -> Interaction:
return Interaction(
id=row["id"],
prompt=row["prompt"],
response=row["response"] or "",
response_summary=row["response_summary"] or "",
project=row["project"] or "",
client=row["client"] or "",
session_id=row["session_id"] or "",
memories_used=_safe_json_list(row["memories_used"]),
chunks_used=_safe_json_list(row["chunks_used"]),
context_pack=_safe_json_dict(row["context_pack"]),
created_at=row["created_at"] or "",
)
def _safe_json_list(raw: str | None) -> list[str]:
if not raw:
return []
try:
value = json.loads(raw)
except json.JSONDecodeError:
return []
if not isinstance(value, list):
return []
return [str(item) for item in value]
def _safe_json_dict(raw: str | None) -> dict:
if not raw:
return {}
try:
value = json.loads(raw)
except json.JSONDecodeError:
return {}
if not isinstance(value, dict):
return {}
return value

View File

@@ -59,6 +59,12 @@ CREATE TABLE IF NOT EXISTS interactions (
prompt TEXT NOT NULL,
context_pack TEXT DEFAULT '{}',
response_summary TEXT DEFAULT '',
response TEXT DEFAULT '',
memories_used TEXT DEFAULT '[]',
chunks_used TEXT DEFAULT '[]',
client TEXT DEFAULT '',
session_id TEXT DEFAULT '',
project TEXT DEFAULT '',
project_id TEXT REFERENCES projects(id),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
@@ -68,6 +74,9 @@ CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project);
CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status);
CREATE INDEX IF NOT EXISTS idx_interactions_project ON interactions(project_id);
CREATE INDEX IF NOT EXISTS idx_interactions_project_name ON interactions(project);
CREATE INDEX IF NOT EXISTS idx_interactions_session ON interactions(session_id);
CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at);
"""
@@ -90,6 +99,33 @@ def _apply_migrations(conn: sqlite3.Connection) -> None:
conn.execute("ALTER TABLE memories ADD COLUMN project TEXT DEFAULT ''")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_project ON memories(project)")
# Phase 9 Commit A: capture loop columns on the interactions table.
# The original schema only carried prompt + project_id + a context_pack
# JSON blob. To make interactions a real audit trail of what AtoCore fed
# the LLM and what came back, we record the full response, the chunk
# and memory ids that were actually used, plus client + session metadata.
if not _column_exists(conn, "interactions", "response"):
conn.execute("ALTER TABLE interactions ADD COLUMN response TEXT DEFAULT ''")
if not _column_exists(conn, "interactions", "memories_used"):
conn.execute("ALTER TABLE interactions ADD COLUMN memories_used TEXT DEFAULT '[]'")
if not _column_exists(conn, "interactions", "chunks_used"):
conn.execute("ALTER TABLE interactions ADD COLUMN chunks_used TEXT DEFAULT '[]'")
if not _column_exists(conn, "interactions", "client"):
conn.execute("ALTER TABLE interactions ADD COLUMN client TEXT DEFAULT ''")
if not _column_exists(conn, "interactions", "session_id"):
conn.execute("ALTER TABLE interactions ADD COLUMN session_id TEXT DEFAULT ''")
if not _column_exists(conn, "interactions", "project"):
conn.execute("ALTER TABLE interactions ADD COLUMN project TEXT DEFAULT ''")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_interactions_session ON interactions(session_id)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_interactions_project_name ON interactions(project)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_interactions_created_at ON interactions(created_at)"
)
def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()

View File

@@ -1,8 +1,24 @@
"""Create safe runtime backups for the AtoCore machine store."""
"""Create safe runtime backups for the AtoCore machine store.
This module is intentionally conservative:
- The SQLite snapshot uses the online ``conn.backup()`` API and is safe to
call while the database is in use.
- The project registry snapshot is a simple file copy of the canonical
registry JSON.
- The Chroma snapshot is a *cold* directory copy. To stay safe it must be
taken while no ingestion is running. The recommended pattern from the API
layer is to acquire ``exclusive_ingestion()`` for the duration of the
backup so refreshes and ingestions cannot run concurrently with the copy.
The backup metadata file records what was actually included so restore
tooling does not have to guess.
"""
from __future__ import annotations
import json
import shutil
import sqlite3
from datetime import datetime, UTC
from pathlib import Path
@@ -14,8 +30,17 @@ from atocore.observability.logger import get_logger
log = get_logger("backup")
def create_runtime_backup(timestamp: datetime | None = None) -> dict:
"""Create a hot backup of the SQLite DB plus registry/config metadata."""
def create_runtime_backup(
timestamp: datetime | None = None,
include_chroma: bool = False,
) -> dict:
"""Create a hot SQLite backup plus registry/config metadata.
When ``include_chroma`` is true the Chroma persistence directory is also
snapshotted as a cold directory copy. The caller is responsible for
ensuring no ingestion is running concurrently. The HTTP layer enforces
this by holding ``exclusive_ingestion()`` around the call.
"""
init_db()
now = timestamp or datetime.now(UTC)
stamp = now.strftime("%Y%m%dT%H%M%SZ")
@@ -23,6 +48,7 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp
db_backup_dir = backup_root / "db"
config_backup_dir = backup_root / "config"
chroma_backup_dir = backup_root / "chroma"
metadata_path = backup_root / "backup-metadata.json"
db_backup_dir.mkdir(parents=True, exist_ok=True)
@@ -35,7 +61,26 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
registry_path = _config.settings.resolved_project_registry_path
if registry_path.exists():
registry_snapshot = config_backup_dir / registry_path.name
registry_snapshot.write_text(registry_path.read_text(encoding="utf-8"), encoding="utf-8")
registry_snapshot.write_text(
registry_path.read_text(encoding="utf-8"), encoding="utf-8"
)
chroma_snapshot_path = ""
chroma_files_copied = 0
chroma_bytes_copied = 0
if include_chroma:
source_chroma = _config.settings.chroma_path
if source_chroma.exists() and source_chroma.is_dir():
chroma_backup_dir.mkdir(parents=True, exist_ok=True)
chroma_files_copied, chroma_bytes_copied = _copy_directory_tree(
source_chroma, chroma_backup_dir
)
chroma_snapshot_path = str(chroma_backup_dir)
else:
log.info(
"chroma_snapshot_skipped_missing",
path=str(source_chroma),
)
metadata = {
"created_at": now.isoformat(),
@@ -43,14 +88,134 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
"db_snapshot_path": str(db_snapshot_path),
"db_size_bytes": db_snapshot_path.stat().st_size,
"registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "",
"vector_store_note": "Chroma hot backup is not included in this script; use a cold snapshot or rebuild/export workflow.",
"chroma_snapshot_path": chroma_snapshot_path,
"chroma_snapshot_bytes": chroma_bytes_copied,
"chroma_snapshot_files": chroma_files_copied,
"chroma_snapshot_included": include_chroma,
"vector_store_note": (
"Chroma snapshot included as cold directory copy."
if include_chroma and chroma_snapshot_path
else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()."
),
}
metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
metadata_path.write_text(
json.dumps(metadata, indent=2, ensure_ascii=True) + "\n",
encoding="utf-8",
)
log.info("runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path))
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
)
return metadata
def list_runtime_backups() -> list[dict]:
"""List all runtime backups under the configured backup directory."""
snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
if not snapshots_root.exists() or not snapshots_root.is_dir():
return []
entries: list[dict] = []
for snapshot_dir in sorted(snapshots_root.iterdir()):
if not snapshot_dir.is_dir():
continue
metadata_path = snapshot_dir / "backup-metadata.json"
entry: dict = {
"stamp": snapshot_dir.name,
"path": str(snapshot_dir),
"has_metadata": metadata_path.exists(),
}
if metadata_path.exists():
try:
entry["metadata"] = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
entry["metadata"] = None
entry["metadata_error"] = "invalid_json"
entries.append(entry)
return entries
def validate_backup(stamp: str) -> dict:
"""Validate that a previously created backup is structurally usable.
Checks:
- the snapshot directory exists
- the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok
- the registry snapshot, if recorded, parses as JSON
- the chroma snapshot directory, if recorded, exists
"""
snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp
result: dict = {
"stamp": stamp,
"path": str(snapshot_dir),
"exists": snapshot_dir.exists(),
"db_ok": False,
"registry_ok": None,
"chroma_ok": None,
"errors": [],
}
if not snapshot_dir.exists():
result["errors"].append("snapshot_directory_missing")
return result
metadata_path = snapshot_dir / "backup-metadata.json"
if not metadata_path.exists():
result["errors"].append("metadata_missing")
return result
try:
metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
result["errors"].append(f"metadata_invalid_json: {exc}")
return result
result["metadata"] = metadata
db_path = Path(metadata.get("db_snapshot_path", ""))
if not db_path.exists():
result["errors"].append("db_snapshot_missing")
else:
try:
with sqlite3.connect(str(db_path)) as conn:
row = conn.execute("PRAGMA integrity_check").fetchone()
result["db_ok"] = bool(row and row[0] == "ok")
if not result["db_ok"]:
result["errors"].append(
f"db_integrity_check_failed: {row[0] if row else 'no_row'}"
)
except sqlite3.DatabaseError as exc:
result["errors"].append(f"db_open_failed: {exc}")
registry_snapshot_path = metadata.get("registry_snapshot_path", "")
if registry_snapshot_path:
registry_path = Path(registry_snapshot_path)
if not registry_path.exists():
result["registry_ok"] = False
result["errors"].append("registry_snapshot_missing")
else:
try:
json.loads(registry_path.read_text(encoding="utf-8"))
result["registry_ok"] = True
except json.JSONDecodeError as exc:
result["registry_ok"] = False
result["errors"].append(f"registry_invalid_json: {exc}")
chroma_snapshot_path = metadata.get("chroma_snapshot_path", "")
if chroma_snapshot_path:
chroma_dir = Path(chroma_snapshot_path)
if chroma_dir.exists() and chroma_dir.is_dir():
result["chroma_ok"] = True
else:
result["chroma_ok"] = False
result["errors"].append("chroma_snapshot_missing")
result["valid"] = not result["errors"]
return result
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
@@ -61,6 +226,21 @@ def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn.close()
def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]:
"""Copy a directory tree and return (file_count, total_bytes)."""
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(source, dest)
file_count = 0
total_bytes = 0
for path in dest.rglob("*"):
if path.is_file():
file_count += 1
total_bytes += path.stat().st_size
return file_count, total_bytes
def main() -> None:
result = create_runtime_backup()
print(json.dumps(result, indent=2, ensure_ascii=True))

View File

@@ -255,12 +255,23 @@ def get_registered_project(project_name: str) -> RegisteredProject | None:
def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict:
"""Ingest all configured source roots for a registered project."""
"""Ingest all configured source roots for a registered project.
The returned dict carries an overall ``status`` so callers can tell at a
glance whether the refresh was fully successful, partial, or did nothing
at all because every configured root was missing or not a directory:
- ``ingested``: every root was a real directory and was ingested
- ``partial``: at least one root ingested and at least one was unusable
- ``nothing_to_ingest``: no roots were usable
"""
project = get_registered_project(project_name)
if project is None:
raise ValueError(f"Unknown project: {project_name}")
roots = []
ingested_count = 0
skipped_count = 0
for source_ref in project.ingest_roots:
resolved = _resolve_ingest_root(source_ref)
root_result = {
@@ -271,9 +282,11 @@ def refresh_registered_project(project_name: str, purge_deleted: bool = False) -
}
if not resolved.exists():
roots.append({**root_result, "status": "missing"})
skipped_count += 1
continue
if not resolved.is_dir():
roots.append({**root_result, "status": "not_directory"})
skipped_count += 1
continue
roots.append(
@@ -283,12 +296,23 @@ def refresh_registered_project(project_name: str, purge_deleted: bool = False) -
"results": ingest_folder(resolved, purge_deleted=purge_deleted),
}
)
ingested_count += 1
if ingested_count == 0:
overall_status = "nothing_to_ingest"
elif skipped_count == 0:
overall_status = "ingested"
else:
overall_status = "partial"
return {
"project": project.project_id,
"aliases": list(project.aliases),
"description": project.description,
"purge_deleted": purge_deleted,
"status": overall_status,
"roots_ingested": ingested_count,
"roots_skipped": skipped_count,
"roots": roots,
}

View File

@@ -173,7 +173,7 @@ def _project_match_boost(project_hint: str, metadata: dict) -> float:
for candidate in candidate_names:
if candidate and candidate in searchable:
return 2.0
return _config.settings.rank_project_match_boost
return 1.0
@@ -198,7 +198,10 @@ def _query_match_boost(query: str, metadata: dict) -> float:
matches = sum(1 for token in set(tokens) if token in searchable)
if matches <= 0:
return 1.0
return min(1.0 + matches * 0.08, 1.32)
return min(
1.0 + matches * _config.settings.rank_query_token_step,
_config.settings.rank_query_token_cap,
)
def _path_signal_boost(metadata: dict) -> float:
@@ -213,9 +216,9 @@ def _path_signal_boost(metadata: dict) -> float:
multiplier = 1.0
if any(hint in searchable for hint in _LOW_SIGNAL_HINTS):
multiplier *= 0.72
multiplier *= _config.settings.rank_path_low_signal_penalty
if any(hint in searchable for hint in _HIGH_SIGNAL_HINTS):
multiplier *= 1.18
multiplier *= _config.settings.rank_path_high_signal_boost
return multiplier

View File

@@ -129,6 +129,9 @@ def test_project_refresh_endpoint_uses_registered_roots(tmp_data_dir, monkeypatc
"aliases": ["p05"],
"description": "P05 docs",
"purge_deleted": purge_deleted,
"status": "ingested",
"roots_ingested": 1,
"roots_skipped": 0,
"roots": [
{
"source": "vault",
@@ -173,6 +176,9 @@ def test_project_refresh_endpoint_serializes_ingestion(tmp_data_dir, monkeypatch
"aliases": ["p05"],
"description": "P05 docs",
"purge_deleted": purge_deleted,
"status": "nothing_to_ingest",
"roots_ingested": 0,
"roots_skipped": 0,
"roots": [],
}
@@ -429,6 +435,125 @@ def test_project_update_endpoint_rejects_collisions(tmp_data_dir, monkeypatch):
assert "collisions" in response.json()["detail"]
def test_admin_backup_create_without_chroma(tmp_data_dir, monkeypatch):
config.settings = config.Settings()
captured = {}
def fake_create_runtime_backup(timestamp=None, include_chroma=False):
captured["include_chroma"] = include_chroma
return {
"created_at": "2026-04-06T23:00:00+00:00",
"backup_root": "/tmp/fake",
"db_snapshot_path": "/tmp/fake/db/atocore.db",
"db_size_bytes": 0,
"registry_snapshot_path": "",
"chroma_snapshot_path": "",
"chroma_snapshot_bytes": 0,
"chroma_snapshot_files": 0,
"chroma_snapshot_included": False,
"vector_store_note": "skipped",
}
monkeypatch.setattr("atocore.api.routes.create_runtime_backup", fake_create_runtime_backup)
client = TestClient(app)
response = client.post("/admin/backup", json={})
assert response.status_code == 200
assert captured == {"include_chroma": False}
body = response.json()
assert body["chroma_snapshot_included"] is False
def test_admin_backup_create_with_chroma_holds_lock(tmp_data_dir, monkeypatch):
config.settings = config.Settings()
events = []
@contextmanager
def fake_lock():
events.append("enter")
try:
yield
finally:
events.append("exit")
def fake_create_runtime_backup(timestamp=None, include_chroma=False):
events.append(("backup", include_chroma))
return {
"created_at": "2026-04-06T23:30:00+00:00",
"backup_root": "/tmp/fake",
"db_snapshot_path": "/tmp/fake/db/atocore.db",
"db_size_bytes": 0,
"registry_snapshot_path": "",
"chroma_snapshot_path": "/tmp/fake/chroma",
"chroma_snapshot_bytes": 4,
"chroma_snapshot_files": 1,
"chroma_snapshot_included": True,
"vector_store_note": "included",
}
monkeypatch.setattr("atocore.api.routes.exclusive_ingestion", fake_lock)
monkeypatch.setattr("atocore.api.routes.create_runtime_backup", fake_create_runtime_backup)
client = TestClient(app)
response = client.post("/admin/backup", json={"include_chroma": True})
assert response.status_code == 200
assert events == ["enter", ("backup", True), "exit"]
assert response.json()["chroma_snapshot_included"] is True
def test_admin_backup_list_and_validate_endpoints(tmp_data_dir, monkeypatch):
config.settings = config.Settings()
def fake_list_runtime_backups():
return [
{
"stamp": "20260406T220000Z",
"path": "/tmp/fake/snapshots/20260406T220000Z",
"has_metadata": True,
"metadata": {"db_snapshot_path": "/tmp/fake/snapshots/20260406T220000Z/db/atocore.db"},
}
]
def fake_validate_backup(stamp):
if stamp == "missing":
return {
"stamp": stamp,
"path": f"/tmp/fake/snapshots/{stamp}",
"exists": False,
"errors": ["snapshot_directory_missing"],
}
return {
"stamp": stamp,
"path": f"/tmp/fake/snapshots/{stamp}",
"exists": True,
"db_ok": True,
"registry_ok": True,
"chroma_ok": None,
"valid": True,
"errors": [],
}
monkeypatch.setattr("atocore.api.routes.list_runtime_backups", fake_list_runtime_backups)
monkeypatch.setattr("atocore.api.routes.validate_backup", fake_validate_backup)
client = TestClient(app)
listing = client.get("/admin/backup")
assert listing.status_code == 200
listing_body = listing.json()
assert "backup_dir" in listing_body
assert listing_body["backups"][0]["stamp"] == "20260406T220000Z"
valid = client.get("/admin/backup/20260406T220000Z/validate")
assert valid.status_code == 200
assert valid.json()["valid"] is True
missing = client.get("/admin/backup/missing/validate")
assert missing.status_code == 404
def test_query_endpoint_accepts_project_hint(monkeypatch):
def fake_retrieve(prompt, top_k=10, filter_tags=None, project_hint=None):
assert prompt == "architecture"

View File

@@ -6,7 +6,11 @@ from datetime import UTC, datetime
import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import create_runtime_backup
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
@@ -53,6 +57,89 @@ def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
assert metadata["registry_snapshot_path"] == str(registry_snapshot)
def test_create_runtime_backup_includes_chroma_when_requested(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
monkeypatch.setenv(
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
)
original_settings = config.settings
try:
config.settings = config.Settings()
init_db()
# Create a fake chroma directory tree with a couple of files.
chroma_dir = config.settings.chroma_path
(chroma_dir / "collection-a").mkdir(parents=True, exist_ok=True)
(chroma_dir / "collection-a" / "data.bin").write_bytes(b"\x00\x01\x02\x03")
(chroma_dir / "metadata.json").write_text('{"ok":true}', encoding="utf-8")
result = create_runtime_backup(
datetime(2026, 4, 6, 20, 0, 0, tzinfo=UTC),
include_chroma=True,
)
finally:
config.settings = original_settings
chroma_snapshot_root = (
tmp_path / "backups" / "snapshots" / "20260406T200000Z" / "chroma"
)
assert result["chroma_snapshot_included"] is True
assert result["chroma_snapshot_path"] == str(chroma_snapshot_root)
assert result["chroma_snapshot_files"] >= 2
assert result["chroma_snapshot_bytes"] > 0
assert (chroma_snapshot_root / "collection-a" / "data.bin").exists()
assert (chroma_snapshot_root / "metadata.json").exists()
def test_list_and_validate_runtime_backups(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
monkeypatch.setenv(
"ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
)
original_settings = config.settings
try:
config.settings = config.Settings()
init_db()
first = create_runtime_backup(datetime(2026, 4, 6, 21, 0, 0, tzinfo=UTC))
second = create_runtime_backup(datetime(2026, 4, 6, 22, 0, 0, tzinfo=UTC))
listing = list_runtime_backups()
first_validation = validate_backup("20260406T210000Z")
second_validation = validate_backup("20260406T220000Z")
missing_validation = validate_backup("20260101T000000Z")
finally:
config.settings = original_settings
assert len(listing) == 2
assert {entry["stamp"] for entry in listing} == {
"20260406T210000Z",
"20260406T220000Z",
}
for entry in listing:
assert entry["has_metadata"] is True
assert entry["metadata"]["db_snapshot_path"]
assert first_validation["valid"] is True
assert first_validation["db_ok"] is True
assert first_validation["errors"] == []
assert second_validation["valid"] is True
assert missing_validation["exists"] is False
assert "snapshot_directory_missing" in missing_validation["errors"]
# the metadata written to disk matches what create_runtime_backup returned
assert json.loads(
(tmp_path / "backups" / "snapshots" / "20260406T210000Z" / "backup-metadata.json")
.read_text(encoding="utf-8")
)["db_snapshot_path"] == first["db_snapshot_path"]
assert second["db_snapshot_path"].endswith("atocore.db")
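These assertions fix the on-disk layout (backups/snapshots/<stamp>/backup-metadata.json next to the atocore.db snapshot) and the keys both helpers return. A rough sketch of how list_runtime_backups and validate_backup could be implemented against that layout; only the directory names, file names, and the snapshot_directory_missing error string come from the tests, the rest of the bodies (including the db_snapshot_missing string and the quick_check approach) is assumed.

import json
import sqlite3
from pathlib import Path


def list_runtime_backups_sketch(snapshots_dir: Path) -> list[dict]:
    entries = []
    for snapshot_dir in sorted(p for p in snapshots_dir.iterdir() if p.is_dir()):
        metadata_path = snapshot_dir / "backup-metadata.json"
        metadata = (
            json.loads(metadata_path.read_text(encoding="utf-8"))
            if metadata_path.exists()
            else None
        )
        entries.append(
            {
                "stamp": snapshot_dir.name,
                "has_metadata": metadata is not None,
                "metadata": metadata,
            }
        )
    return entries


def validate_backup_sketch(snapshots_dir: Path, stamp: str) -> dict:
    snapshot_dir = snapshots_dir / stamp
    report = {"stamp": stamp, "exists": snapshot_dir.is_dir(),
              "db_ok": False, "errors": [], "valid": False}
    if not report["exists"]:
        report["errors"].append("snapshot_directory_missing")
        return report
    db_path = snapshot_dir / "atocore.db"
    if not db_path.exists():
        report["errors"].append("db_snapshot_missing")
    else:
        # PRAGMA quick_check returns the single row "ok" for an intact file.
        conn = sqlite3.connect(str(db_path))
        try:
            report["db_ok"] = conn.execute("PRAGMA quick_check").fetchone()[0] == "ok"
        finally:
            conn.close()
    report["valid"] = report["db_ok"] and not report["errors"]
    return report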
def test_create_runtime_backup_handles_missing_registry(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))


@@ -44,6 +44,22 @@ def test_settings_keep_legacy_db_path_when_present(tmp_path, monkeypatch):
assert settings.db_path == legacy_db.resolve()
def test_ranking_weights_are_tunable_via_env(monkeypatch):
monkeypatch.setenv("ATOCORE_RANK_PROJECT_MATCH_BOOST", "3.5")
monkeypatch.setenv("ATOCORE_RANK_QUERY_TOKEN_STEP", "0.12")
monkeypatch.setenv("ATOCORE_RANK_QUERY_TOKEN_CAP", "1.5")
monkeypatch.setenv("ATOCORE_RANK_PATH_HIGH_SIGNAL_BOOST", "1.25")
monkeypatch.setenv("ATOCORE_RANK_PATH_LOW_SIGNAL_PENALTY", "0.5")
settings = config.Settings()
assert settings.rank_project_match_boost == 3.5
assert settings.rank_query_token_step == 0.12
assert settings.rank_query_token_cap == 1.5
assert settings.rank_path_high_signal_boost == 1.25
assert settings.rank_path_low_signal_penalty == 0.5
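One common way such env-tunable weights are declared is a pydantic-settings model; the sketch below assumes config.Settings is built on BaseSettings (not confirmed by this diff), and the default values are placeholders, not the project's real defaults. Only the field names and the ATOCORE_ prefix are taken from the test.

from pydantic_settings import BaseSettings, SettingsConfigDict


class RankingSettingsSketch(BaseSettings):
    # ATOCORE_RANK_PROJECT_MATCH_BOOST and friends override these at load time.
    model_config = SettingsConfigDict(env_prefix="ATOCORE_")

    rank_project_match_boost: float = 2.0
    rank_query_token_step: float = 0.1
    rank_query_token_cap: float = 1.0
    rank_path_high_signal_boost: float = 1.0
    rank_path_low_signal_penalty: float = 0.25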
def test_ensure_runtime_dirs_creates_machine_dirs_only(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(tmp_path / "vault-source"))

tests/test_interactions.py (new file, 211 lines)

@@ -0,0 +1,211 @@
"""Tests for the Phase 9 Commit A interaction capture loop."""
import time
import pytest
from fastapi.testclient import TestClient
from atocore.interactions.service import (
get_interaction,
list_interactions,
record_interaction,
)
from atocore.main import app
from atocore.models.database import init_db
# --- Service-level tests --------------------------------------------------
def test_record_interaction_persists_all_fields(tmp_data_dir):
init_db()
interaction = record_interaction(
prompt="What is the lateral support material for p05?",
response="The current lateral support uses GF-PTFE pads per Decision D-024.",
response_summary="lateral support: GF-PTFE per D-024",
project="p05-interferometer",
client="claude-code",
session_id="sess-001",
memories_used=["mem-aaa", "mem-bbb"],
chunks_used=["chunk-111", "chunk-222", "chunk-333"],
context_pack={"budget": 3000, "chunks": 3},
)
assert interaction.id
assert interaction.created_at
fetched = get_interaction(interaction.id)
assert fetched is not None
assert fetched.prompt.startswith("What is the lateral support")
assert fetched.response.startswith("The current lateral support")
assert fetched.response_summary == "lateral support: GF-PTFE per D-024"
assert fetched.project == "p05-interferometer"
assert fetched.client == "claude-code"
assert fetched.session_id == "sess-001"
assert fetched.memories_used == ["mem-aaa", "mem-bbb"]
assert fetched.chunks_used == ["chunk-111", "chunk-222", "chunk-333"]
assert fetched.context_pack == {"budget": 3000, "chunks": 3}
def test_record_interaction_minimum_fields(tmp_data_dir):
init_db()
interaction = record_interaction(prompt="ping")
assert interaction.id
assert interaction.prompt == "ping"
assert interaction.response == ""
assert interaction.memories_used == []
assert interaction.chunks_used == []
def test_record_interaction_rejects_empty_prompt(tmp_data_dir):
init_db()
with pytest.raises(ValueError):
record_interaction(prompt="")
with pytest.raises(ValueError):
record_interaction(prompt=" ")
def test_get_interaction_returns_none_for_unknown_id(tmp_data_dir):
init_db()
assert get_interaction("does-not-exist") is None
assert get_interaction("") is None
def test_list_interactions_filters_by_project(tmp_data_dir):
init_db()
record_interaction(prompt="p04 question", project="p04-gigabit")
record_interaction(prompt="p05 question", project="p05-interferometer")
record_interaction(prompt="another p05", project="p05-interferometer")
p05 = list_interactions(project="p05-interferometer")
p04 = list_interactions(project="p04-gigabit")
assert len(p05) == 2
assert len(p04) == 1
assert all(i.project == "p05-interferometer" for i in p05)
assert p04[0].prompt == "p04 question"
def test_list_interactions_filters_by_session_and_client(tmp_data_dir):
init_db()
record_interaction(prompt="a", session_id="sess-A", client="openclaw")
record_interaction(prompt="b", session_id="sess-A", client="claude-code")
record_interaction(prompt="c", session_id="sess-B", client="openclaw")
sess_a = list_interactions(session_id="sess-A")
openclaw = list_interactions(client="openclaw")
assert len(sess_a) == 2
assert len(openclaw) == 2
assert {i.client for i in sess_a} == {"openclaw", "claude-code"}
def test_list_interactions_orders_newest_first_and_respects_limit(tmp_data_dir):
init_db()
# created_at has 1-second resolution; sleep enough to keep ordering
# deterministic regardless of insert speed.
for index in range(5):
record_interaction(prompt=f"prompt-{index}")
time.sleep(1.05)
items = list_interactions(limit=3)
assert len(items) == 3
# Newest first: prompt-4, prompt-3, prompt-2
assert items[0].prompt == "prompt-4"
assert items[1].prompt == "prompt-3"
assert items[2].prompt == "prompt-2"
def test_list_interactions_respects_since_filter(tmp_data_dir):
init_db()
first = record_interaction(prompt="early")
time.sleep(1.05)
second = record_interaction(prompt="late")
after_first = list_interactions(since=first.created_at)
ids_after_first = {item.id for item in after_first}
assert second.id in ids_after_first
assert first.id in ids_after_first # cutoff is inclusive
after_second = list_interactions(since=second.created_at)
ids_after_second = {item.id for item in after_second}
assert second.id in ids_after_second
assert first.id not in ids_after_second
def test_list_interactions_zero_limit_returns_empty(tmp_data_dir):
init_db()
record_interaction(prompt="ping")
assert list_interactions(limit=0) == []
# --- API-level tests ------------------------------------------------------
def test_post_interactions_endpoint_records_interaction(tmp_data_dir):
init_db()
client = TestClient(app)
response = client.post(
"/interactions",
json={
"prompt": "What changed in p06 this week?",
"response": "Polisher kinematic frame parameters updated to v0.3.",
"response_summary": "p06 frame parameters bumped to v0.3",
"project": "p06-polisher",
"client": "claude-code",
"session_id": "sess-xyz",
"memories_used": ["mem-1"],
"chunks_used": ["chunk-a", "chunk-b"],
"context_pack": {"chunks": 2},
},
)
assert response.status_code == 200
body = response.json()
assert body["status"] == "recorded"
interaction_id = body["id"]
# Round-trip via the GET endpoint
fetched = client.get(f"/interactions/{interaction_id}")
assert fetched.status_code == 200
fetched_body = fetched.json()
assert fetched_body["prompt"].startswith("What changed in p06")
assert fetched_body["response"].startswith("Polisher kinematic frame")
assert fetched_body["project"] == "p06-polisher"
assert fetched_body["chunks_used"] == ["chunk-a", "chunk-b"]
assert fetched_body["context_pack"] == {"chunks": 2}
def test_post_interactions_rejects_empty_prompt(tmp_data_dir):
init_db()
client = TestClient(app)
response = client.post("/interactions", json={"prompt": ""})
assert response.status_code == 400
def test_get_unknown_interaction_returns_404(tmp_data_dir):
init_db()
client = TestClient(app)
response = client.get("/interactions/does-not-exist")
assert response.status_code == 404
def test_list_interactions_endpoint_returns_summaries(tmp_data_dir):
init_db()
client = TestClient(app)
client.post(
"/interactions",
json={"prompt": "alpha", "project": "p04-gigabit", "response": "x" * 10},
)
client.post(
"/interactions",
json={"prompt": "beta", "project": "p05-interferometer", "response": "y" * 50},
)
response = client.get("/interactions", params={"project": "p05-interferometer"})
assert response.status_code == 200
body = response.json()
assert body["count"] == 1
assert body["interactions"][0]["prompt"] == "beta"
assert body["interactions"][0]["response_chars"] == 50
# The list endpoint never includes the full response body
assert "response" not in body["interactions"][0]


@@ -154,6 +154,110 @@ def test_refresh_registered_project_ingests_registered_roots(tmp_path, monkeypat
assert calls[0][0].endswith("p06-polisher")
assert calls[0][1] is False
assert result["roots"][0]["status"] == "ingested"
assert result["status"] == "ingested"
assert result["roots_ingested"] == 1
assert result["roots_skipped"] == 0
def test_refresh_registered_project_reports_nothing_to_ingest_when_all_missing(
tmp_path, monkeypatch
):
vault_dir = tmp_path / "vault"
drive_dir = tmp_path / "drive"
config_dir = tmp_path / "config"
vault_dir.mkdir()
drive_dir.mkdir()
config_dir.mkdir()
registry_path = config_dir / "project-registry.json"
registry_path.write_text(
json.dumps(
{
"projects": [
{
"id": "p07-ghost",
"aliases": ["ghost"],
"description": "Project whose roots do not exist on disk",
"ingest_roots": [
{"source": "vault", "subpath": "incoming/projects/p07-ghost"}
],
}
]
}
),
encoding="utf-8",
)
def fail_ingest_folder(path, purge_deleted=True):
raise AssertionError(f"ingest_folder should not be called for missing root: {path}")
monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
original_settings = config.settings
try:
config.settings = config.Settings()
monkeypatch.setattr("atocore.projects.registry.ingest_folder", fail_ingest_folder)
result = refresh_registered_project("ghost")
finally:
config.settings = original_settings
assert result["status"] == "nothing_to_ingest"
assert result["roots_ingested"] == 0
assert result["roots_skipped"] == 1
assert result["roots"][0]["status"] == "missing"
def test_refresh_registered_project_reports_partial_status(tmp_path, monkeypatch):
vault_dir = tmp_path / "vault"
drive_dir = tmp_path / "drive"
config_dir = tmp_path / "config"
real_root = vault_dir / "incoming" / "projects" / "p08-mixed"
real_root.mkdir(parents=True)
drive_dir.mkdir()
config_dir.mkdir()
registry_path = config_dir / "project-registry.json"
registry_path.write_text(
json.dumps(
{
"projects": [
{
"id": "p08-mixed",
"aliases": ["mixed"],
"description": "One root present, one missing",
"ingest_roots": [
{"source": "vault", "subpath": "incoming/projects/p08-mixed"},
{"source": "vault", "subpath": "incoming/projects/p08-mixed-missing"},
],
}
]
}
),
encoding="utf-8",
)
def fake_ingest_folder(path, purge_deleted=True):
return [{"file": str(path / "README.md"), "status": "ingested"}]
monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
original_settings = config.settings
try:
config.settings = config.Settings()
monkeypatch.setattr("atocore.projects.registry.ingest_folder", fake_ingest_folder)
result = refresh_registered_project("mixed")
finally:
config.settings = original_settings
assert result["status"] == "partial"
assert result["roots_ingested"] == 1
assert result["roots_skipped"] == 1
statuses = sorted(root["status"] for root in result["roots"])
assert statuses == ["ingested", "missing"]
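The three refresh tests fix the aggregate statuses (ingested, nothing_to_ingest, partial) and the roots_ingested/roots_skipped counters. A small sketch of how the per-root outcomes could be folded into that summary; the key names come from the assertions, the aggregation logic itself is an assumption.

def summarize_refresh_sketch(roots: list[dict]) -> dict:
    ingested = sum(1 for root in roots if root["status"] == "ingested")
    skipped = sum(1 for root in roots if root["status"] == "missing")
    if ingested and skipped:
        status = "partial"
    elif ingested:
        status = "ingested"
    else:
        status = "nothing_to_ingest"
    return {
        "status": status,
        "roots_ingested": ingested,
        "roots_skipped": skipped,
        "roots": roots,
    }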
def test_project_registry_template_has_expected_shape():