124 lines
4.3 KiB
Python
124 lines
4.3 KiB
Python
|
|
"""Rolling history buffer for VLM screenshot analysis.
|
||
|
|
|
||
|
|
Two tiers:
  - Image buffer: deque(maxlen=4) of recent screenshots sent as images
  - Text history: deque(maxlen=12) of older VLM summaries + previous outputs
    for extended context (what happened 30-60s ago) and self-refinement
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import time
|
||
|
|
from collections import deque
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class BufferEntry:
    """A screenshot held in the image tier, together with its summary."""

    # Screenshot payload (JPEG bytes, going by the field name).
    jpeg: bytes
    # Summary text stored alongside this screenshot.
    vlm_summary: str
    # Seconds since the epoch; filled in with time.time() at construction
    # when the caller does not supply a value.
    timestamp: float = field(default_factory=time.time)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class TextEntry:
    """A text-only history record: a summary and when it was recorded."""

    # Summary text for one analysed frame.
    vlm_summary: str
    # Seconds since the epoch; required, there is no default.
    timestamp: float
|
||
|
|
|
||
|
|
|
||
|
|
class HistoryBuffer:
    """Two-tier rolling history of analysed screenshots.

    Each ``push`` stores the frame in a short image buffer and its summary in
    a longer text history. Both are bounded deques, so the oldest entries are
    dropped automatically once the configured length is reached. The buffer
    also keeps the most recent VLM output dict and the most recent executor
    result so they can be fed back into the next prompt.
    """

    def __init__(self, image_maxlen: int = 4, text_maxlen: int = 12):
        """Create an empty buffer.

        Args:
            image_maxlen: Maximum number of screenshots kept in the image tier.
            text_maxlen: Maximum number of summaries kept in the text tier.
        """
        self._images: deque[BufferEntry] = deque(maxlen=image_maxlen)
        self._text_history: deque[TextEntry] = deque(maxlen=text_maxlen)
        self._last_output: dict | None = None
        self._last_execution: str | None = None

    def push(self, jpeg: bytes, vlm_summary: str) -> None:
        """Record a screenshot and its summary in both tiers.

        The summary goes into the text history at the same moment as the
        image goes into the image buffer, so nothing extra has to be done
        when the image is later evicted: its summary is already retained.
        """
        now = time.time()
        self._images.append(
            BufferEntry(jpeg=jpeg, vlm_summary=vlm_summary, timestamp=now)
        )
        self._text_history.append(
            TextEntry(vlm_summary=vlm_summary, timestamp=now)
        )

    def set_last_output(self, output: dict) -> None:
        """Store the previous VLM JSON output for self-refinement."""
        self._last_output = output

    def set_last_execution(self, summary: str | None) -> None:
        """Store the result of the last executor action."""
        self._last_execution = summary

    def get_last_execution(self) -> str | None:
        """Return the stored executor result, or None if there is none."""
        return self._last_execution

    def clear_last_execution(self) -> None:
        """Forget the stored executor result."""
        self._last_execution = None

    def get_entries(self) -> list[BufferEntry]:
        """Return image entries oldest-first (as a new list)."""
        return list(self._images)

    def format_for_prompt(self) -> str:
        """Format the full timeline: text history + image labels.

        Returns:
            A newline-joined string. If any non-empty summaries exist for
            frames that are no longer in the image buffer, they are listed
            first under an "Older context" heading; the screenshot list
            (buffered images plus the current one) follows.
        """
        now = time.time()
        lines: list[str] = []
        n = len(self._images)

        # push() appends to both deques in the same call, so the newest
        # ``n`` text entries belong to the frames still held as images.
        # Selecting by position instead of by rounded timestamp keeps
        # entries pushed within the same instant from being confused with
        # one another. max(0, ...) covers a text tier shorter than the
        # image tier.
        text_entries = list(self._text_history)
        text_only = text_entries[: max(0, len(text_entries) - n)]
        # Skip empty summaries.
        older = [entry for entry in text_only if entry.vlm_summary]
        if older:
            lines.append("Older context (text only, no images):")
            for entry in older:
                # Clamp so a timestamp ahead of the clock never prints as
                # a negative age.
                ago = max(0, int(now - entry.timestamp))
                lines.append(f" - [{ago}s ago] {entry.vlm_summary}")
            lines.append("")

        # Image timeline: buffered frames first, then the current capture.
        if n > 0:
            total = n + 1
            lines.append(
                f"Recent screenshots ({n} prior + 1 current = {total} images):"
            )
            for i, entry in enumerate(self._images):
                ago = max(0, int(now - entry.timestamp))
                lines.append(f" - Screenshot {i + 1}/{total}: [{ago}s ago]")
            lines.append(f" - Screenshot {total}/{total}: [now] (current)")
        else:
            lines.append("Screenshots:")
            lines.append(" - Screenshot 1/1: [now] (current, first capture)")

        return "\n".join(lines)

    def format_last_output(self) -> str:
        """Format previous VLM output for self-refinement context.

        Returns:
            An empty string when no output is stored (or it is empty);
            otherwise one line per key field. Only selected fields are
            included, not the full blob.
        """
        if not self._last_output:
            return ""

        prev = self._last_output
        # "friction" may be absent, None, or some other non-mapping value;
        # treat all of those as "no friction details" rather than failing.
        friction = prev.get("friction")
        if not isinstance(friction, dict):
            friction = {}

        parts = [
            f" on_task: {prev.get('on_task')}",
            f" app: {prev.get('app_name')}",
            f" friction: {friction.get('type')}",
            f" summary: {prev.get('vlm_summary')}",
        ]
        note = prev.get("checkpoint_note_update")
        if note:
            parts.append(f" checkpoint: {note}")
        desc = friction.get("description")
        if desc:
            parts.append(f" friction_desc: {desc}")
        return "\n".join(parts)

    def __len__(self) -> int:
        """Return the number of screenshots currently in the image tier."""
        return len(self._images)

    def clear(self) -> None:
        """Drop all buffered images, text history and stored outputs."""
        self._images.clear()
        self._text_history.clear()
        self._last_output = None
        self._last_execution = None
|