Files

385 lines
14 KiB
Python
Raw Permalink Normal View History

2026-03-29 06:29:18 -04:00
"""Agentic executor — uses Gemini 2.5 Pro with tool use to complete actions.
When the user approves a proposed action, the executor gets:
- The recent screenshots (so it can read source content)
- Tool access: read_file, write_file, run_command
- A loop: Gemini proposes tool calls → we execute them → feed results back → repeat
This is a full agent loop, not a single-shot LLM call.
Swift portability notes:
- The agent loop is the same HTTP pattern (Gemini function calling API)
- Tool implementations map to FileManager, NSTask, NSPasteboard
- The loop structure is identical in Swift async/await
"""
from __future__ import annotations
import asyncio
import base64
import logging
import os
import subprocess
import httpx
from argus.buffer import HistoryBuffer
from argus.config import GEMINI_API_KEY
# Module-level logger used by the tool implementations and the agent loop.
log = logging.getLogger(__name__)

# Model that drives the agent loop; the EXECUTOR_MODEL environment variable
# overrides the default.
EXECUTOR_MODEL = os.getenv("EXECUTOR_MODEL", "gemini-2.5-pro")

# generateContent endpoint for the selected model.
EXECUTOR_URL = f"https://generativelanguage.googleapis.com/v1beta/models/{EXECUTOR_MODEL}:generateContent"

# Maximum number of model round trips before the agent loop gives up.
MAX_AGENT_STEPS = 10
# ── Tool definitions (Gemini function calling format) ────────────────────
def _tool(name: str, description: str, params: dict) -> dict:
    """Return one function declaration whose parameters are all required strings.

    ``params`` maps each parameter name to its description; insertion order
    is kept for both the ``properties`` mapping and the ``required`` list.
    """
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {
                field: {"type": "string", "description": text}
                for field, text in params.items()
            },
            "required": list(params),
        },
    }


# Tool declarations sent with every request, in Gemini function-calling format.
TOOLS = [
    {
        "functionDeclarations": [
            _tool(
                "read_file",
                "Read the contents of a file. Use this to inspect source files, code, "
                "configs, etc.",
                {"path": "Absolute or relative file path to read"},
            ),
            _tool(
                "output",
                "Display content to the user as a sticky note. Use for: extracted text "
                "from PDFs/images, form fill suggestions, content the user needs to "
                "paste into binary formats (docx, ppt, websites). The user will "
                "copy/paste from this.",
                {
                    "title": "Short title (e.g. 'Extracted receipt', 'Form values')",
                    "content": "The full content to display",
                },
            ),
            _tool(
                "write_file",
                "Write content to an EXISTING plain text file (code, markdown, config, "
                "txt). Only use when: (1) you have confirmed the file path via "
                "read_file or mdfind, AND (2) the file is a plain text format you can "
                "write correctly. NEVER use for binary formats (docx, ppt, pdf, images).",
                {
                    "path": "Absolute file path (must have been confirmed to exist first)",
                    "content": "Complete file content to write",
                },
            ),
            _tool(
                "run_command",
                "Run a shell command and return its output. Use for compiling, "
                "testing, listing files, etc.",
                {"command": "The shell command to execute"},
            ),
            _tool(
                "done",
                "Signal that the task is complete. Call this AFTER using output() to "
                "present results. Summary should describe what was shown to the user, "
                "not file operations.",
                {"summary": "Brief summary of what was done"},
            ),
        ]
    }
]
# ── Tool implementations ─────────────────────────────────────────────────
def _exec_read_file(path: str) -> str:
    """Read a text file and return its contents for the agent.

    Args:
        path: Absolute path, or a path relative to the current working
            directory.

    Returns:
        The file contents decoded as UTF-8, or a message starting with
        "Error reading" when the file cannot be opened or is not valid
        UTF-8. Failures are returned as text rather than raised so they can
        be handed back to the model as the tool result.
    """
    resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path)
    try:
        with open(resolved, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError; without it a
        # binary file (PDF, image, ...) would raise out of the tool call.
        return f"Error reading {resolved}: {e}"
    log.info(" read_file: %s (%d bytes)", resolved, len(content))
    return content
def _exec_write_file(path: str, content: str) -> str:
    """Overwrite an existing UTF-8 text file with new content.

    Args:
        path: Absolute path, or a path relative to the current working
            directory. The file must already exist.
        content: Complete replacement text for the file.

    Returns:
        A success message, or a message starting with "Error" when the file
        does not exist, is not readable as UTF-8 text, or cannot be written.
        Failures are returned as text rather than raised so they can be
        handed back to the model as the tool result.
    """
    resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path)
    # Safety: only write to existing text files
    if not os.path.exists(resolved):
        return f"Error: {resolved} does not exist. Use output() instead for new content."
    try:
        with open(resolved, "r", encoding="utf-8") as f:
            # Decoding happens on read, not on open, so the file has to be
            # consumed for a binary file to be detected at all.
            while f.read(65536):
                pass
    except (OSError, UnicodeDecodeError):
        return f"Error: {resolved} is not a readable text file. Use output() instead."
    try:
        with open(resolved, "w", encoding="utf-8") as f:
            f.write(content)
    except (OSError, UnicodeEncodeError) as e:
        return f"Error writing {resolved}: {e}"
    log.info(" write_file: %s (%d bytes)", resolved, len(content))
    return f"Successfully wrote {len(content)} bytes to {resolved}"
def _exec_output(title: str, content: str) -> str:
    """Display content to the user via terminal.

    Prints the content inside a box: a titled top border, one left-barred
    row per line of ``content``, and a bottom border.

    Swift portability: becomes a sticky note / floating card UI.

    Args:
        title: Short heading shown in the top border.
        content: Text to display; split on newlines.

    Returns:
        A confirmation string that is sent back to the model as the tool
        result.
    """
    print()
    # Pad the top border so it stays roughly as wide as the bottom one.
    print(f"┌── 📋 {title} " + "─" * max(0, 50 - len(title)) + "┐")
    for line in content.split("\n"):
        print(f"│ {line}")
    print("└" + "─" * 58 + "┘")
    print()
    log.info(" output: %s (%d chars)", title, len(content))
    return f"Displayed '{title}' to user ({len(content)} chars)"
def _exec_run_command(
    command: str, *, timeout: float = 30, max_output: int = 4000
) -> str:
    """Run a shell command and return its combined output as text.

    Args:
        command: Command line passed to the shell.
        timeout: Seconds to wait before giving up on the command.
        max_output: Maximum number of characters of output to return.

    Returns:
        Captured stdout, followed by a "STDERR:" section when stderr is
        non-empty and an "(exit code N)" line when the command failed, cut
        to ``max_output`` characters. If the command times out, an error
        message is returned instead of raising.
    """
    log.info(" run_command: %s", command[:80])
    try:
        # NOTE(security): the command string comes from the model and is run
        # through the shell unmodified; this tool relies on the user having
        # approved the action before the agent starts.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            # Undecodable bytes in the output must not raise out of the tool.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Error: command timed out after {timeout:g}s"
    output = result.stdout
    if result.stderr:
        output += "\nSTDERR:\n" + result.stderr
    if result.returncode != 0:
        output += f"\n(exit code {result.returncode})"
    return output[:max_output]  # cap output length
def _execute_tool(name: str, args: dict) -> str:
    """Dispatch one model-requested tool call and return its textual result.

    Args:
        name: Tool name as declared in ``TOOLS``.
        args: Arguments supplied by the model for this call.

    Returns:
        The tool's result string. An unknown tool name or a missing required
        argument yields an explanatory message instead of an exception, so
        the problem can be reported back to the model.
    """
    required_args = {
        "read_file": ("path",),
        "output": ("title", "content"),
        "write_file": ("path", "content"),
        "run_command": ("command",),
        "done": (),
    }
    if name not in required_args:
        return f"Unknown tool: {name}"

    args = args or {}
    # Validate up front: the model can omit arguments, and a bare KeyError
    # here would abort the whole agent loop.
    missing = [key for key in required_args[name] if key not in args]
    if missing:
        return f"Error: {name} is missing required argument(s): {', '.join(missing)}"

    if name == "read_file":
        return _exec_read_file(args["path"])
    if name == "output":
        return _exec_output(args["title"], args["content"])
    if name == "write_file":
        return _exec_write_file(args["path"], args["content"])
    if name == "run_command":
        return _exec_run_command(args["command"])
    # Only "done" is left; its summary is optional.
    return args.get("summary", "Done.")
# ── Agent loop ───────────────────────────────────────────────────────────
async def _post_with_retry(
    client: "httpx.AsyncClient", payload: dict, key: str, attempts: int = 3
) -> "httpx.Response":
    """POST one request to the executor endpoint, backing off on HTTP 429.

    Raises the httpx error for any non-success status, including a 429 that
    persists through the final attempt.
    """
    # The key travels in a header rather than the query string so that it
    # does not end up in URLs embedded in logged exception messages.
    headers = {"x-goog-api-key": key}
    for attempt in range(attempts):
        resp = await client.post(EXECUTOR_URL, json=payload, headers=headers)
        if resp.status_code != 429 or attempt == attempts - 1:
            break
        wait = 2 ** attempt
        log.warning("Executor 429, retrying in %ds...", wait)
        await asyncio.sleep(wait)
    resp.raise_for_status()
    return resp


async def execute(
    vlm_payload: dict,
    action_index: int = 0,
    *,
    history: HistoryBuffer | None = None,
    current_screenshot: bytes | None = None,
    api_key: str | None = None,
) -> str | None:
    """Run the agentic executor loop.

    The agent can read files, write files, display output and run commands
    to complete the user's approved action. It loops until it calls done(),
    answers with plain text, or hits the step limit.

    Args:
        vlm_payload: Analysis payload; the action is taken from
            ``vlm_payload["friction"]["proposed_actions"]``.
        action_index: Index of the approved action in that list.
        history: Optional buffer of recent screenshots to show the model.
        current_screenshot: Optional JPEG bytes of the current screen.
        api_key: API key to use; falls back to ``GEMINI_API_KEY``.

    Returns:
        A summary of what was done, a fixed message when the step limit is
        reached, or None on failure (bad index, no API key, failed API call
        or a response without usable content).
    """
    friction = vlm_payload.get("friction", {})
    actions = friction.get("proposed_actions", [])
    # Reject negative indices too: they would silently select from the end.
    if not 0 <= action_index < len(actions):
        log.warning("Action index %d out of range", action_index)
        return None
    chosen = actions[action_index]

    key = api_key or GEMINI_API_KEY
    if not key:
        log.warning("No API key for executor")
        return None

    log.info("Agent executing: %s", chosen.get("label", "?")[:80])

    # Build initial message with screenshots + task context
    initial_parts = _build_initial_parts(vlm_payload, chosen, history, current_screenshot)
    # Conversation history for the agent loop
    contents = [{"role": "user", "parts": initial_parts}]

    # One client for the whole run so connections are reused between steps.
    async with httpx.AsyncClient(timeout=120.0) as client:
        for step in range(1, MAX_AGENT_STEPS + 1):
            log.debug("Agent step %d/%d", step, MAX_AGENT_STEPS)
            payload = {
                "contents": contents,
                "tools": TOOLS,
                "generationConfig": {"temperature": 0.2, "maxOutputTokens": 8192},
            }
            try:
                resp = await _post_with_retry(client, payload, key)
            except Exception:
                log.exception("Agent API call failed at step %d", step)
                return None

            try:
                response_parts = resp.json()["candidates"][0]["content"]["parts"]
            except (ValueError, LookupError, TypeError):
                # The response can come back without candidates or parts;
                # treat that as a failed run rather than crashing.
                log.warning("Agent response had no usable content at step %d", step)
                return None

            # Add assistant response to conversation
            contents.append({"role": "model", "parts": response_parts})

            function_calls = [p for p in response_parts if "functionCall" in p]
            if not function_calls:
                # No tool calls — agent returned text, we're done
                text = "".join(p.get("text", "") for p in response_parts).strip()
                log.info("Agent finished with text response (step %d)", step)
                return text or "Done."

            # Execute each tool call
            tool_results = []
            done_summary = None
            for fc_part in function_calls:
                fc = fc_part["functionCall"]
                name = fc["name"]
                args = fc.get("args") or {}
                print(f" 🔧 {name}({_summarize_args(args)})")
                result = _execute_tool(name, args)
                tool_results.append({
                    "functionResponse": {
                        "name": name,
                        "response": {"result": result},
                    }
                })
                if name == "done":
                    done_summary = result

            # Add tool results to conversation
            contents.append({"role": "user", "parts": tool_results})

            # Compare against None so an empty summary still ends the run.
            if done_summary is not None:
                log.info("Agent called done() at step %d: %s", step, done_summary[:80])
                return done_summary or "Done."

    log.warning("Agent hit step limit (%d)", MAX_AGENT_STEPS)
    return "Agent reached maximum steps without completing."
def _build_initial_parts(
    vlm_payload: dict,
    action: dict,
    history: HistoryBuffer | None,
    current_screenshot: bytes | None,
) -> list[dict]:
    """Assemble the first user message: captioned screenshots, then the task prompt.

    Each screenshot becomes a caption text part followed by a base64
    ``inlineData`` JPEG part. History entries come first, then the current
    screenshot, and the prompt describing the approved action is always the
    last part.
    """

    def captioned_jpeg(caption: str, jpeg: bytes) -> list:
        encoded = base64.b64encode(jpeg).decode()
        return [
            {"text": caption},
            {"inlineData": {"mimeType": "image/jpeg", "data": encoded}},
        ]

    parts: list[dict] = []

    # Include screenshots so agent can read source content
    if history:
        entries = history.get_entries()
        total = len(entries)
        for number, entry in enumerate(entries, start=1):
            parts.extend(captioned_jpeg(f"[Screenshot {number}/{total}]", entry.jpeg))
    if current_screenshot:
        parts.extend(captioned_jpeg("[Current screenshot]", current_screenshot))

    friction = vlm_payload.get("friction", {})
    # The prompt lines stay at column 0: they are part of the string literal.
    prompt = f"""\
The user approved this action. Complete it using the tools available to you.
ACTION: {action.get('label', '')}
DETAILS: {action.get('details', '')}
Context:
User's task: {vlm_payload.get('inferred_task', '')}
Problem: {friction.get('description', '')}
Current state: {vlm_payload.get('checkpoint_note_update', '')}
Application: {vlm_payload.get('app_name', '')}
Source: {friction.get('source_context', '')}
Target: {friction.get('target_context', '')}
INSTRUCTIONS:
1. For BINARY files (PDFs, images, etc.): use your VISION. Read content directly
from the screenshots this is your most reliable source for non-text files.
2. For TEXT files (code, markdown, configs, txt): use read_file to get exact content.
3. If you need a file but only know the filename (not the path), FIND IT FIRST:
- run_command("mdfind -name 'filename'") fast macOS Spotlight search
- run_command("lsof -c AppName | grep filename") find what file an app has open
Do NOT guess paths. Search first.
4. Choose the right output method:
- Binary format targets (docx, ppt, website forms, PDFs): use output() user will copy/paste.
- Existing plain text files (code, markdown, config): use write_file() modify directly.
- write_file() only works on files that ALREADY EXIST. Confirm the path with read_file first.
5. Use run_command to compile, test, or search for files. Never to write files.
6. Do NOT hallucinate content. If you can't read something, say so.
7. Call done() with a summary when the action is complete.
Working directory: {os.getcwd()}"""
    parts.append({"text": prompt})
    return parts
def _summarize_args(args: dict) -> str:
    """Render tool arguments as a compact ``key=value`` list for terminal display.

    Each value is stringified; anything longer than 50 characters is cut to
    its first 47 characters followed by "...". Pairs are joined with ", ".
    """

    def clip(value: object) -> str:
        text = str(value)
        return text if len(text) <= 50 else text[:47] + "..."

    return ", ".join(f"{key}={clip(value)}" for key, value in args.items())