"""Host-side LLM batch extraction — pure HTTP client, no atocore imports.

Fetches interactions from the AtoCore API, runs ``claude -p`` locally
for each, and POSTs candidates back. Zero dependency on atocore source
or Python packages — only uses stdlib + the ``claude`` CLI on PATH.

This is necessary because the ``claude`` CLI is on the Dalidou HOST
but not inside the Docker container, and the host's Python doesn't
have the container's dependencies (pydantic_settings, etc.).
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

# Configuration comes from the environment so the host-side job can point at
# a non-default container port, model, or timeout without editing this file.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
DEFAULT_MODEL = os.environ.get("ATOCORE_LLM_EXTRACTOR_MODEL", "sonnet")
DEFAULT_TIMEOUT_S = float(os.environ.get("ATOCORE_LLM_EXTRACTOR_TIMEOUT_S", "90"))
# Truncation limits keep the per-interaction prompt sent to the CLI bounded.
MAX_RESPONSE_CHARS = 8000
MAX_PROMPT_CHARS = 2000

# Closed set of memory types accepted when parsing model output.
MEMORY_TYPES = {"identity", "preference", "project", "episodic", "knowledge", "adaptation"}
# System prompt appended to every ``claude -p`` run. Output contract: a raw
# JSON array of candidate objects (or []) so parse_candidates() can load it.
SYSTEM_PROMPT = """You extract durable memory candidates from LLM conversation turns for a personal context engine called AtoCore.
Your job is to read one user prompt plus the assistant's response and decide which durable facts, decisions, preferences, architectural rules, or project invariants should be remembered across future sessions.
Rules:

1. Only surface durable claims. Skip transient status ("deploy is still running"), instructional guidance ("here is how to run the command"), troubleshooting tactics, ephemeral recommendations ("merge this PR now"), and session recaps.
2. A candidate is durable when a reader coming back in two weeks would still need to know it. Architectural choices, named rules, ratified decisions, invariants, procurement commitments, and project-level constraints qualify. Conversational fillers and step-by-step instructions do not.
3. Each candidate must stand alone. Rewrite the claim in one sentence under 200 characters with enough context that a reader without the conversation understands it.
4. Each candidate must have a type from this closed set: project, knowledge, preference, adaptation.
5. If the conversation is clearly scoped to a project (p04-gigabit, p05-interferometer, p06-polisher, atocore), set ``project`` to that id. Otherwise leave ``project`` empty.
6. If the response makes no durable claim, return an empty list. It is correct and expected to return [] on most conversational turns.
7. Confidence should be 0.5 by default so human review workload is honest. Raise to 0.6 only when the response states the claim in an unambiguous, committed form (e.g. "the decision is X", "the selected approach is Y", "X is non-negotiable").
8. Output must be a raw JSON array and nothing else. No prose before or after. No markdown fences. No explanations.

Each array element has exactly this shape:
{"type": "project|knowledge|preference|adaptation", "content": "...", "project": "...", "confidence": 0.5}
Return [] when there is nothing to extract."""
# Lazily-created scratch directory used as the cwd for ``claude`` runs —
# presumably so the CLI never executes inside a real project checkout; confirm.
_sandbox_cwd = None


def get_sandbox_cwd():
    """Return a process-wide temp directory, creating it on first call."""
    global _sandbox_cwd
    if _sandbox_cwd is None:
        _sandbox_cwd = tempfile.mkdtemp(prefix="ato-llm-extract-")
    return _sandbox_cwd
def api_get(base_url, path, timeout=10):
    """GET ``base_url + path`` and return the decoded JSON response body."""
    req = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
def api_post(base_url, path, body, timeout=10):
    """POST ``body`` as JSON to ``base_url + path``; return the decoded reply."""
    data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}{path}",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=data,
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
def get_last_run(base_url):
    """Return the timestamp of the previous batch run, or None.

    Best-effort: any API failure is swallowed so a fresh run simply
    processes interactions from the beginning.
    """
    try:
        state = api_get(base_url, "/project/state/atocore?category=status")
        for entry in state.get("entries", []):
            if entry.get("key") == "last_extract_batch_run":
                return entry["value"]
    except Exception:
        pass
    return None
def set_last_run(base_url, timestamp):
    """Persist the batch-run watermark; best-effort (failures are ignored)."""
    try:
        api_post(base_url, "/project/state", {
            "project": "atocore",
            "category": "status",
            "key": "last_extract_batch_run",
            "value": timestamp,
            "source": "batch_llm_extract_live.py",
        })
    except Exception:
        pass
def extract_one(prompt, response, project, model, timeout_s):
    """Run ``claude -p`` on one interaction; return (candidates, error).

    On success ``error`` is "" and candidates come from parse_candidates().
    On any failure returns ([], "<reason>") — the caller counts it and
    moves on, so this function never raises.
    """
    if not shutil.which("claude"):
        return [], "claude_cli_missing"
    # Truncate both sides so the CLI invocation stays bounded.
    prompt_excerpt = prompt[:MAX_PROMPT_CHARS]
    response_excerpt = response[:MAX_RESPONSE_CHARS]
    user_message = (
        f"PROJECT HINT (may be empty): {project}\n\n"
        f"USER PROMPT:\n{prompt_excerpt}\n\n"
        f"ASSISTANT RESPONSE:\n{response_excerpt}\n\n"
        "Return the JSON array now."
    )
    args = [
        "claude", "-p",
        "--model", model,
        "--append-system-prompt", SYSTEM_PROMPT,
        "--no-session-persistence",
        "--disable-slash-commands",
        user_message,
    ]
    try:
        completed = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            # Run in a scratch dir, not the caller's cwd.
            cwd=get_sandbox_cwd(),
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        return [], "timeout"
    except Exception as exc:
        return [], f"subprocess_error: {exc}"
    if completed.returncode != 0:
        return [], f"exit_{completed.returncode}"
    raw = (completed.stdout or "").strip()
    return parse_candidates(raw, project), ""
def parse_candidates(raw, interaction_project, allowed_types=None):
    """Parse model JSON output into candidate dicts.

    Tolerates markdown fences and stray prose around the JSON array
    (the model sometimes ignores rule 8); malformed output yields []
    rather than raising.

    ``allowed_types`` defaults to the module-level MEMORY_TYPES set; it
    is a parameter so the filter can be reused/tested in isolation.
    """
    if allowed_types is None:
        allowed_types = MEMORY_TYPES
    text = raw.strip()
    # Strip a ```json ... ``` fence: drop the backticks, then the language tag line.
    if text.startswith("```"):
        text = text.strip("`")
        nl = text.find("\n")
        if nl >= 0:
            text = text[nl + 1:]
    if text.endswith("```"):
        text = text[:-3]
    text = text.strip()
    if not text or text == "[]":
        return []
    # Salvage the array if the model wrapped it in prose.
    if not text.lstrip().startswith("["):
        start = text.find("[")
        end = text.rfind("]")
        if start >= 0 and end > start:
            text = text[start:end + 1]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        mem_type = str(item.get("type") or "").strip().lower()
        content = str(item.get("content") or "").strip()
        project = str(item.get("project") or "").strip()
        # Fall back to the interaction's project when the model left it empty.
        if not project and interaction_project:
            project = interaction_project
        conf = item.get("confidence", 0.5)
        if mem_type not in allowed_types or not content:
            continue
        try:
            conf = max(0.0, min(1.0, float(conf)))
        except (TypeError, ValueError):
            conf = 0.5
        results.append({
            "memory_type": mem_type,
            "content": content[:1000],
            "project": project,
            "confidence": conf,
        })
    return results
def main():
    """Entry point: list interactions, run extraction, POST candidates back."""
    parser = argparse.ArgumentParser(description="Host-side LLM batch extraction")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--limit", type=int, default=50)
    parser.add_argument("--since", default=None)
    parser.add_argument("--model", default=DEFAULT_MODEL)
    args = parser.parse_args()

    # Resume from the stored watermark unless --since overrides it.
    since = args.since or get_last_run(args.base_url)
    print(f"since={since or '(first run)'} limit={args.limit} model={args.model}")

    params = [f"limit={args.limit}"]
    if since:
        params.append(f"since={urllib.parse.quote(since)}")
    listing = api_get(args.base_url, f"/interactions?{'&'.join(params)}")
    interaction_summaries = listing.get("interactions", [])
    print(f"listed {len(interaction_summaries)} interactions")

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors = 0
    for summary in interaction_summaries:
        # Cheap pre-filter on the summary before fetching the full record.
        resp_chars = summary.get("response_chars", 0) or 0
        if resp_chars < 50:
            continue
        iid = summary["id"]
        try:
            raw = api_get(
                args.base_url,
                f"/interactions/{urllib.parse.quote(iid, safe='')}",
            )
        except Exception as exc:
            print(f"! {iid[:8]}: fetch failed: {exc}", file=sys.stderr)
            errors += 1
            continue

        response_text = raw.get("response", "") or ""
        if not response_text.strip() or len(response_text) < 50:
            continue

        candidates, error = extract_one(
            prompt=raw.get("prompt", "") or "",
            response=response_text,
            project=raw.get("project", "") or "",
            model=args.model,
            timeout_s=DEFAULT_TIMEOUT_S,
        )
        if error:
            print(f"! {iid[:8]}: {error}", file=sys.stderr)
            errors += 1
            continue
        processed += 1
        total_candidates += len(candidates)
        for c in candidates:
            try:
                api_post(args.base_url, "/memory", {
                    "memory_type": c["memory_type"],
                    "content": c["content"],
                    "project": c["project"],
                    "confidence": c["confidence"],
                    "status": "candidate",
                })
                total_persisted += 1
            except urllib.error.HTTPError as exc:
                # HTTP 400 is tolerated silently — presumably a duplicate or
                # rejected candidate; NOTE(review): confirm against the API.
                if exc.code != 400:
                    errors += 1
            except Exception:
                errors += 1

    # Advance the watermark only after the batch completes.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    set_last_run(args.base_url, now)
    print(f"processed={processed} candidates={total_candidates} persisted={total_persisted} errors={errors}")
if __name__ == "__main__":
    main()