2026-04-13 14:37:12 -04:00
|
|
|
"""Human Mirror — derived readable project views from structured data.
|
|
|
|
|
|
|
|
|
|
Layer 3 of the AtoCore architecture. Generates human-readable markdown
|
|
|
|
|
pages from the engineering entity graph, Trusted Project State, and
|
|
|
|
|
active memories. These pages are DERIVED — they are not canonical
|
|
|
|
|
machine truth. They are support surfaces for human inspection and
|
|
|
|
|
audit comfort.
|
|
|
|
|
|
|
|
|
|
The mirror never invents content. Every line traces back to an entity,
|
|
|
|
|
a state entry, or a memory. If the structured data is wrong, the
|
|
|
|
|
mirror is wrong — fix the source, not the page.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from atocore.context.project_state import get_state
|
|
|
|
|
from atocore.engineering.service import (
|
|
|
|
|
get_entities,
|
|
|
|
|
get_relationships,
|
|
|
|
|
)
|
|
|
|
|
from atocore.memory.service import get_memories
|
|
|
|
|
from atocore.observability.logger import get_logger
|
|
|
|
|
|
|
|
|
|
log = get_logger("mirror")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_project_overview(project: str) -> str:
|
|
|
|
|
"""Generate a full project overview page in markdown."""
|
|
|
|
|
sections = [
|
|
|
|
|
_header(project),
|
2026-04-13 21:08:13 -04:00
|
|
|
_synthesis_section(project),
|
2026-04-13 14:37:12 -04:00
|
|
|
_state_section(project),
|
|
|
|
|
_system_architecture(project),
|
|
|
|
|
_decisions_section(project),
|
|
|
|
|
_requirements_section(project),
|
|
|
|
|
_materials_section(project),
|
|
|
|
|
_vendors_section(project),
|
|
|
|
|
_active_memories_section(project),
|
|
|
|
|
_footer(project),
|
|
|
|
|
]
|
|
|
|
|
return "\n\n".join(s for s in sections if s)
|
|
|
|
|
|
|
|
|
|
|
2026-04-13 21:08:13 -04:00
|
|
|
def _synthesis_section(project: str) -> str:
|
|
|
|
|
"""Generate a short LLM synthesis of the current project state.
|
|
|
|
|
|
|
|
|
|
Reads the cached synthesis from project_state if available
|
|
|
|
|
(category=status, key=synthesis_cache). If not cached, returns
|
|
|
|
|
a deterministic summary from the existing structured data.
|
|
|
|
|
The actual LLM-generated synthesis is produced by the weekly
|
|
|
|
|
lint/synthesis pass on Dalidou (where claude CLI is available).
|
|
|
|
|
"""
|
|
|
|
|
entries = get_state(project)
|
|
|
|
|
cached = ""
|
|
|
|
|
for e in entries:
|
|
|
|
|
if e.category == "status" and e.key == "synthesis_cache":
|
|
|
|
|
cached = e.value
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
if cached:
|
|
|
|
|
return f"## Current State (auto-synthesis)\n\n> {cached}"
|
|
|
|
|
|
|
|
|
|
# Fallback: deterministic summary from structured data
|
|
|
|
|
stage = ""
|
|
|
|
|
summary = ""
|
|
|
|
|
next_focus = ""
|
|
|
|
|
for e in entries:
|
|
|
|
|
if e.category == "status":
|
|
|
|
|
if e.key == "stage":
|
|
|
|
|
stage = e.value
|
|
|
|
|
elif e.key == "summary":
|
|
|
|
|
summary = e.value
|
|
|
|
|
elif e.key == "next_focus":
|
|
|
|
|
next_focus = e.value
|
|
|
|
|
|
|
|
|
|
if not (stage or summary or next_focus):
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
bits = []
|
|
|
|
|
if summary:
|
|
|
|
|
bits.append(summary)
|
|
|
|
|
if stage:
|
|
|
|
|
bits.append(f"**Stage**: {stage}")
|
|
|
|
|
if next_focus:
|
|
|
|
|
bits.append(f"**Next**: {next_focus}")
|
|
|
|
|
|
|
|
|
|
return "## Current State\n\n" + "\n\n".join(bits)
|
|
|
|
|
|
|
|
|
|
|
2026-04-13 14:37:12 -04:00
|
|
|
def _header(project: str) -> str:
|
|
|
|
|
return (
|
|
|
|
|
f"# {project} — Project Overview\n\n"
|
|
|
|
|
f"> This page is auto-generated from AtoCore structured data.\n"
|
|
|
|
|
f"> It is a **derived view**, not canonical truth. "
|
|
|
|
|
f"If something is wrong here, fix the source data."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _state_section(project: str) -> str:
|
|
|
|
|
entries = get_state(project)
|
|
|
|
|
if not entries:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Trusted Project State"]
|
|
|
|
|
by_category: dict[str, list] = {}
|
|
|
|
|
for e in entries:
|
|
|
|
|
by_category.setdefault(e.category.upper(), []).append(e)
|
|
|
|
|
|
|
|
|
|
for cat in ["DECISION", "REQUIREMENT", "STATUS", "FACT", "MILESTONE", "CONFIG", "CONTACT"]:
|
|
|
|
|
items = by_category.get(cat, [])
|
|
|
|
|
if not items:
|
|
|
|
|
continue
|
|
|
|
|
lines.append(f"\n### {cat.title()}")
|
|
|
|
|
for item in items:
|
|
|
|
|
value = item.value[:300]
|
|
|
|
|
lines.append(f"- **{item.key}**: {value}")
|
|
|
|
|
if item.source:
|
|
|
|
|
lines.append(f" *(source: {item.source})*")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _system_architecture(project: str) -> str:
|
|
|
|
|
systems = get_entities(entity_type="system", project=project)
|
|
|
|
|
subsystems = get_entities(entity_type="subsystem", project=project)
|
|
|
|
|
components = get_entities(entity_type="component", project=project)
|
|
|
|
|
interfaces = get_entities(entity_type="interface", project=project)
|
|
|
|
|
|
|
|
|
|
if not systems and not subsystems and not components:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## System Architecture"]
|
|
|
|
|
|
|
|
|
|
for system in systems:
|
|
|
|
|
lines.append(f"\n### {system.name}")
|
|
|
|
|
if system.description:
|
|
|
|
|
lines.append(f"{system.description}")
|
|
|
|
|
|
|
|
|
|
rels = get_relationships(system.id, direction="outgoing")
|
|
|
|
|
children = []
|
|
|
|
|
for rel in rels:
|
|
|
|
|
if rel.relationship_type == "contains":
|
|
|
|
|
child = next(
|
|
|
|
|
(s for s in subsystems + components if s.id == rel.target_entity_id),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
if child:
|
|
|
|
|
children.append(child)
|
|
|
|
|
|
|
|
|
|
if children:
|
|
|
|
|
lines.append("\n**Contains:**")
|
|
|
|
|
for child in children:
|
|
|
|
|
desc = f" — {child.description}" if child.description else ""
|
|
|
|
|
lines.append(f"- [{child.entity_type}] **{child.name}**{desc}")
|
|
|
|
|
|
|
|
|
|
child_rels = get_relationships(child.id, direction="both")
|
|
|
|
|
for cr in child_rels:
|
|
|
|
|
if cr.relationship_type in ("uses_material", "interfaces_with", "constrained_by"):
|
|
|
|
|
other_id = (
|
|
|
|
|
cr.target_entity_id
|
|
|
|
|
if cr.source_entity_id == child.id
|
|
|
|
|
else cr.source_entity_id
|
|
|
|
|
)
|
|
|
|
|
other = next(
|
|
|
|
|
(e for e in get_entities(project=project, limit=200)
|
|
|
|
|
if e.id == other_id),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
if other:
|
|
|
|
|
lines.append(
|
|
|
|
|
f" - *{cr.relationship_type}* → "
|
|
|
|
|
f"[{other.entity_type}] {other.name}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _decisions_section(project: str) -> str:
|
|
|
|
|
decisions = get_entities(entity_type="decision", project=project)
|
|
|
|
|
if not decisions:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Decisions"]
|
|
|
|
|
for d in decisions:
|
|
|
|
|
lines.append(f"\n### {d.name}")
|
|
|
|
|
if d.description:
|
|
|
|
|
lines.append(d.description)
|
|
|
|
|
rels = get_relationships(d.id, direction="outgoing")
|
|
|
|
|
for rel in rels:
|
|
|
|
|
if rel.relationship_type == "affected_by_decision":
|
|
|
|
|
affected = next(
|
|
|
|
|
(e for e in get_entities(project=project, limit=200)
|
|
|
|
|
if e.id == rel.target_entity_id),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
if affected:
|
|
|
|
|
lines.append(
|
|
|
|
|
f"- Affects: [{affected.entity_type}] {affected.name}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _requirements_section(project: str) -> str:
|
|
|
|
|
reqs = get_entities(entity_type="requirement", project=project)
|
|
|
|
|
constraints = get_entities(entity_type="constraint", project=project)
|
|
|
|
|
if not reqs and not constraints:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Requirements & Constraints"]
|
|
|
|
|
for r in reqs:
|
|
|
|
|
lines.append(f"- **{r.name}**: {r.description}" if r.description else f"- **{r.name}**")
|
|
|
|
|
for c in constraints:
|
|
|
|
|
lines.append(f"- [constraint] **{c.name}**: {c.description}" if c.description else f"- [constraint] **{c.name}**")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _materials_section(project: str) -> str:
|
|
|
|
|
materials = get_entities(entity_type="material", project=project)
|
|
|
|
|
if not materials:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Materials"]
|
|
|
|
|
for m in materials:
|
|
|
|
|
desc = f" — {m.description}" if m.description else ""
|
|
|
|
|
lines.append(f"- **{m.name}**{desc}")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _vendors_section(project: str) -> str:
|
|
|
|
|
vendors = get_entities(entity_type="vendor", project=project)
|
|
|
|
|
if not vendors:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Vendors"]
|
|
|
|
|
for v in vendors:
|
|
|
|
|
desc = f" — {v.description}" if v.description else ""
|
|
|
|
|
lines.append(f"- **{v.name}**{desc}")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _active_memories_section(project: str) -> str:
|
|
|
|
|
memories = get_memories(project=project, active_only=True, limit=20)
|
|
|
|
|
if not memories:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
lines = ["## Active Memories"]
|
|
|
|
|
for m in memories:
|
|
|
|
|
conf = f" (conf: {m.confidence:.2f})" if m.confidence < 1.0 else ""
|
|
|
|
|
refs = f" | refs: {m.reference_count}" if m.reference_count > 0 else ""
|
|
|
|
|
lines.append(f"- [{m.memory_type}]{conf}{refs} {m.content[:200]}")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _footer(project: str) -> str:
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
|
|
|
|
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
|
|
|
|
return (
|
|
|
|
|
f"---\n\n"
|
|
|
|
|
f"*Generated by AtoCore Human Mirror at {now}. "
|
|
|
|
|
f"This is a derived view — not canonical truth.*"
|
|
|
|
|
)
|