168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
|
|
"""V1-0 one-time backfill: flag existing active entities that have no
|
||
|
|
provenance (empty source_refs) as hand_authored=1 so they stop failing
|
||
|
|
the F-8 invariant.
|
||
|
|
|
||
|
|
Runs against the live AtoCore DB. Idempotent: a second run after the
|
||
|
|
first touches nothing because the flagged rows already have
|
||
|
|
hand_authored=1.
|
||
|
|
|
||
|
|
Per the Engineering V1 Completion Plan (V1-0 scope), the three options
|
||
|
|
for an existing active entity without provenance are:
|
||
|
|
|
||
|
|
1. Attach provenance — impossible without human review, not automatable
|
||
|
|
2. Flag hand-authored — safe, additive, this script's default
|
||
|
|
3. Invalidate — destructive, requires operator sign-off
|
||
|
|
|
||
|
|
This script picks option (2) by default. Add --dry-run to see what
|
||
|
|
would change without writing. Add --invalidate-instead to pick option
|
||
|
|
(3) for all rows (not recommended for first run).
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python scripts/v1_0_backfill_provenance.py --base-url http://dalidou:8100 --dry-run
|
||
|
|
python scripts/v1_0_backfill_provenance.py --base-url http://dalidou:8100
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import sqlite3
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
|
||
|
|
def run(db_path: Path, dry_run: bool, invalidate_instead: bool) -> int:
|
||
|
|
if not db_path.exists():
|
||
|
|
print(f"ERROR: db not found: {db_path}", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
conn = sqlite3.connect(str(db_path))
|
||
|
|
conn.row_factory = sqlite3.Row
|
||
|
|
|
||
|
|
# Verify the V1-0 migration ran: if hand_authored column is missing
|
||
|
|
# the operator hasn't deployed V1-0 yet, and running this script
|
||
|
|
# would crash. Fail loud rather than attempt the ALTER here.
|
||
|
|
cols = {r["name"] for r in conn.execute("PRAGMA table_info(entities)").fetchall()}
|
||
|
|
if "hand_authored" not in cols:
|
||
|
|
print(
|
||
|
|
"ERROR: entities table lacks the hand_authored column. "
|
||
|
|
"Deploy V1-0 migrations first (init_db + init_engineering_schema).",
|
||
|
|
file=sys.stderr,
|
||
|
|
)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
# Scope differs by mode:
|
||
|
|
# - Default (flag hand_authored=1): safe/additive, applies to active
|
||
|
|
# AND superseded rows so the historical trail is consistent.
|
||
|
|
# - --invalidate-instead: destructive — scope to ACTIVE rows only.
|
||
|
|
# Invalidating already-superseded history would collapse the audit
|
||
|
|
# trail, which the plan's remediation scope never intended
|
||
|
|
# (V1-0 talks about existing active no-provenance entities).
|
||
|
|
if invalidate_instead:
|
||
|
|
scope_sql = "status = 'active'"
|
||
|
|
else:
|
||
|
|
scope_sql = "status IN ('active', 'superseded')"
|
||
|
|
rows = conn.execute(
|
||
|
|
f"SELECT id, entity_type, name, project, status, source_refs, hand_authored "
|
||
|
|
f"FROM entities WHERE {scope_sql} AND hand_authored = 0"
|
||
|
|
).fetchall()
|
||
|
|
|
||
|
|
needs_fix = []
|
||
|
|
for row in rows:
|
||
|
|
refs_raw = row["source_refs"] or "[]"
|
||
|
|
try:
|
||
|
|
refs = json.loads(refs_raw)
|
||
|
|
except Exception:
|
||
|
|
refs = []
|
||
|
|
if not refs:
|
||
|
|
needs_fix.append(row)
|
||
|
|
|
||
|
|
print(f"found {len(needs_fix)} active/superseded entities with no provenance")
|
||
|
|
for row in needs_fix:
|
||
|
|
print(
|
||
|
|
f" - {row['id'][:8]} [{row['entity_type']}] "
|
||
|
|
f"{row['name']!r} project={row['project']!r} status={row['status']}"
|
||
|
|
)
|
||
|
|
|
||
|
|
if dry_run:
|
||
|
|
print("--dry-run: no changes written")
|
||
|
|
return 0
|
||
|
|
|
||
|
|
if not needs_fix:
|
||
|
|
print("nothing to do")
|
||
|
|
return 0
|
||
|
|
|
||
|
|
action = "invalidate" if invalidate_instead else "flag hand_authored=1"
|
||
|
|
print(f"applying: {action}")
|
||
|
|
|
||
|
|
cur = conn.cursor()
|
||
|
|
for row in needs_fix:
|
||
|
|
if invalidate_instead:
|
||
|
|
cur.execute(
|
||
|
|
"UPDATE entities SET status = 'invalid', "
|
||
|
|
"updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||
|
|
(row["id"],),
|
||
|
|
)
|
||
|
|
cur.execute(
|
||
|
|
"INSERT INTO memory_audit "
|
||
|
|
"(id, memory_id, action, actor, before_json, after_json, note, entity_kind) "
|
||
|
|
"VALUES (?, ?, 'invalidated', 'v1_0_backfill', ?, ?, ?, 'entity')",
|
||
|
|
(
|
||
|
|
f"v10bf-{row['id'][:8]}-inv",
|
||
|
|
row["id"],
|
||
|
|
json.dumps({"status": row["status"]}),
|
||
|
|
json.dumps({"status": "invalid"}),
|
||
|
|
"V1-0 backfill: invalidated, no provenance",
|
||
|
|
),
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
cur.execute(
|
||
|
|
"UPDATE entities SET hand_authored = 1, "
|
||
|
|
"updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||
|
|
(row["id"],),
|
||
|
|
)
|
||
|
|
cur.execute(
|
||
|
|
"INSERT INTO memory_audit "
|
||
|
|
"(id, memory_id, action, actor, before_json, after_json, note, entity_kind) "
|
||
|
|
"VALUES (?, ?, 'hand_authored_flagged', 'v1_0_backfill', ?, ?, ?, 'entity')",
|
||
|
|
(
|
||
|
|
f"v10bf-{row['id'][:8]}-ha",
|
||
|
|
row["id"],
|
||
|
|
json.dumps({"hand_authored": False}),
|
||
|
|
json.dumps({"hand_authored": True}),
|
||
|
|
"V1-0 backfill: flagged hand_authored since source_refs empty",
|
||
|
|
),
|
||
|
|
)
|
||
|
|
|
||
|
|
conn.commit()
|
||
|
|
print(f"done: updated {len(needs_fix)} entities")
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
|
||
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
||
|
|
parser.add_argument(
|
||
|
|
"--db",
|
||
|
|
type=Path,
|
||
|
|
default=Path("data/db/atocore.db"),
|
||
|
|
help="Path to the SQLite database (default: data/db/atocore.db)",
|
||
|
|
)
|
||
|
|
parser.add_argument("--dry-run", action="store_true", help="Report only; no writes")
|
||
|
|
parser.add_argument(
|
||
|
|
"--invalidate-instead",
|
||
|
|
action="store_true",
|
||
|
|
help=(
|
||
|
|
"DESTRUCTIVE. Invalidate active rows with no provenance instead "
|
||
|
|
"of flagging them hand_authored. Scoped to status='active' only "
|
||
|
|
"(superseded rows are left alone to preserve audit history). "
|
||
|
|
"Not recommended for first run — start with --dry-run, then "
|
||
|
|
"the default hand_authored flag path."
|
||
|
|
),
|
||
|
|
)
|
||
|
|
args = parser.parse_args()
|
||
|
|
return run(args.db, args.dry_run, args.invalidate_instead)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
sys.exit(main())
|