385 lines
14 KiB
Python
385 lines
14 KiB
Python
"""Agentic executor — uses Gemini 2.5 Pro with tool use to complete actions.
|
|
|
|
When the user approves a proposed action, the executor gets:
|
|
- The recent screenshots (so it can read source content)
|
|
- Tool access: read_file, write_file, run_command
|
|
- A loop: Gemini proposes tool calls → we execute → feed results back → repeat
|
|
|
|
This is a full agent loop, not a single-shot LLM call.
|
|
|
|
Swift portability notes:
|
|
- The agent loop is the same HTTP pattern (Gemini function calling API)
|
|
- Tool implementations map to FileManager, NSTask, NSPasteboard
|
|
- The loop structure is identical in Swift async/await
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
import httpx
|
|
|
|
from argus.buffer import HistoryBuffer
|
|
from argus.config import GEMINI_API_KEY
|
|
|
|
log = logging.getLogger(__name__)

# Model name used by the executor; the EXECUTOR_MODEL environment variable
# overrides the default.
EXECUTOR_MODEL = os.environ.get("EXECUTOR_MODEL", "gemini-2.5-pro")
# generateContent endpoint for the selected model.
EXECUTOR_URL = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    f"{EXECUTOR_MODEL}:generateContent"
)

# Upper bound on model round-trips in a single execute() call.
MAX_AGENT_STEPS = 10
|
|
|
|
# ── Tool definitions (Gemini function calling format) ────────────────────
|
|
|
|
def _tool_declaration(name: str, description: str, params: dict[str, str]) -> dict:
    """Build one Gemini function declaration.

    Every tool in this module takes only string parameters and all of them are
    required, so ``params`` is simply ``{parameter_name: description}`` in
    declaration order.
    """
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {
                key: {"type": "string", "description": text}
                for key, text in params.items()
            },
            "required": list(params),
        },
    }


# Tool schema sent with every request, in Gemini function-calling format.
TOOLS = [
    {
        "functionDeclarations": [
            _tool_declaration(
                "read_file",
                "Read the contents of a file. Use this to inspect source files, "
                "code, configs, etc.",
                {"path": "Absolute or relative file path to read"},
            ),
            _tool_declaration(
                "output",
                "Display content to the user as a sticky note. Use for: "
                "extracted text from PDFs/images, form fill suggestions, "
                "content the user needs to paste into binary formats "
                "(docx, ppt, websites). The user will copy/paste from this.",
                {
                    "title": "Short title (e.g. 'Extracted receipt', 'Form values')",
                    "content": "The full content to display",
                },
            ),
            _tool_declaration(
                "write_file",
                "Write content to an EXISTING plain text file (code, markdown, "
                "config, txt). Only use when: (1) you have confirmed the file "
                "path via read_file or mdfind, AND (2) the file is a plain text "
                "format you can write correctly. NEVER use for binary formats "
                "(docx, ppt, pdf, images).",
                {
                    "path": "Absolute file path (must have been confirmed to exist first)",
                    "content": "Complete file content to write",
                },
            ),
            _tool_declaration(
                "run_command",
                "Run a shell command and return its output. Use for compiling, "
                "testing, listing files, etc.",
                {"command": "The shell command to execute"},
            ),
            _tool_declaration(
                "done",
                "Signal that the task is complete. Call this AFTER using "
                "output() to present results. Summary should describe what was "
                "shown to the user, not file operations.",
                {"summary": "Brief summary of what was done"},
            ),
        ]
    }
]
|
|
|
|
# ── Tool implementations ─────────────────────────────────────────────────
|
|
|
|
|
|
def _exec_read_file(path: str) -> str:
    """Read a text file and return its contents, or an error message.

    Relative paths are resolved against the current working directory.
    Failures are returned as text instead of raised so the caller can hand the
    message back to the model as the tool result.

    Args:
        path: Absolute or relative path of the file to read.

    Returns:
        The file contents decoded as UTF-8, or a string starting with
        ``"Error reading"`` when the file cannot be opened or is not text.
    """
    resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path)
    try:
        with open(resolved, "r", encoding="utf-8") as f:
            content = f.read()
    except UnicodeDecodeError:
        # UnicodeDecodeError is a ValueError, not an OSError: without this
        # branch a binary file would raise straight out of the tool.
        return f"Error reading {resolved}: not a UTF-8 text file (binary content?)"
    except OSError as e:
        return f"Error reading {resolved}: {e}"
    # len(content) counts decoded characters; the message wording is kept as-is.
    log.info(" read_file: %s (%d bytes)", resolved, len(content))
    return content
|
|
|
|
|
|
def _exec_write_file(path: str, content: str) -> str:
    """Overwrite an existing text file with ``content``.

    Relative paths are resolved against the current working directory. The
    write is refused when the target does not exist or cannot be decoded as
    UTF-8 text, so the agent cannot create new files or clobber binary ones.

    Args:
        path: Absolute or relative path of the file to overwrite.
        content: Complete new file content.

    Returns:
        A success message, or a string starting with ``"Error"`` describing
        why nothing was written.
    """
    resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path)
    # Safety: only write to existing text files
    if not os.path.exists(resolved):
        return f"Error: {resolved} does not exist. Use output() instead for new content."
    try:
        with open(resolved, "r", encoding="utf-8") as f:
            # open() alone never decodes anything; the data has to be consumed
            # for binary content to actually raise UnicodeDecodeError.
            while f.read(65536):
                pass
    except (OSError, UnicodeDecodeError):
        return f"Error: {resolved} is not a readable text file. Use output() instead."
    try:
        with open(resolved, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError as e:
        return f"Error writing {resolved}: {e}"
    # len(content) counts characters; the message wording is kept as-is.
    log.info(" write_file: %s (%d bytes)", resolved, len(content))
    return f"Successfully wrote {len(content)} bytes to {resolved}"
|
|
|
|
|
|
def _exec_output(title: str, content: str) -> str:
    """Show ``content`` to the user as a boxed note on the terminal.

    Swift portability: becomes a sticky note / floating card UI.

    Returns a short confirmation string that is fed back to the model.
    """
    # Pad the top border so short titles still produce a wide header.
    filler = "─" * max(0, 50 - len(title))
    rendered = [
        "",
        f"┌── 📋 {title} " + filler + "┐",
        *(f"│ {row}" for row in content.split("\n")),
        "└" + "─" * 58 + "┘",
        "",
    ]
    print("\n".join(rendered))

    size = len(content)
    log.info(" output: %s (%d chars)", title, size)
    return f"Displayed '{title}' to user ({size} chars)"
|
|
|
|
|
|
def _exec_run_command(command: str) -> str:
    """Run a shell command and return its combined output as text.

    stdout comes first, followed by a ``STDERR:`` section when stderr is
    non-empty and an ``(exit code N)`` line when the command failed. The result
    is capped at 4000 characters; the exit-code line is always preserved so a
    failure is never hidden by long output.

    Args:
        command: Shell command line to execute.

    Returns:
        The captured output, or a string starting with ``"Error"`` when the
        command timed out or could not be started.
    """
    limit = 4000  # cap on the text handed back to the model
    log.info(" run_command: %s", command[:80])
    try:
        # NOTE(security): the command string comes from the model and is run
        # through the shell by design; this tool must only be reachable after
        # the user has approved the action.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable output must not raise
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return "Error: command timed out after 30s"
    except OSError as e:
        return f"Error running command: {e}"

    output = result.stdout
    if result.stderr:
        output += "\nSTDERR:\n" + result.stderr
    status = f"\n(exit code {result.returncode})" if result.returncode != 0 else ""
    # Truncate the body first, then append the status, so the total stays
    # within the limit and the exit code survives truncation.
    return output[: limit - len(status)] + status
|
|
|
|
|
|
def _execute_tool(name: str, args: dict) -> str:
    """Dispatch one model-requested tool call and return its textual result.

    Args:
        name: Tool name as sent by the model.
        args: Arguments for the tool, keyed by parameter name.

    Returns:
        The tool's result string. Unknown tools and calls that omit a required
        argument produce an explanatory message instead of raising, so the
        model can correct itself on the next step.
    """
    if not isinstance(args, dict):
        args = {}

    if name == "done":
        return args.get("summary", "Done.")

    required_by_tool = {
        "read_file": ("path",),
        "output": ("title", "content"),
        "write_file": ("path", "content"),
        "run_command": ("command",),
    }
    required = required_by_tool.get(name)
    if required is None:
        return f"Unknown tool: {name}"

    # The model is not guaranteed to honour the schema's "required" list.
    missing = [key for key in required if key not in args]
    if missing:
        return f"Error: {name} is missing required argument(s): {', '.join(missing)}"

    if name == "read_file":
        return _exec_read_file(args["path"])
    if name == "output":
        return _exec_output(args["title"], args["content"])
    if name == "write_file":
        return _exec_write_file(args["path"], args["content"])
    return _exec_run_command(args["command"])
|
|
|
|
|
|
# ── Agent loop ───────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _post_with_backoff(client: httpx.AsyncClient, payload: dict) -> httpx.Response:
    """POST one generateContent request, retrying HTTP 429 with exponential backoff.

    Makes up to three attempts, waiting 1s then 2s between them. Raises
    ``httpx.HTTPStatusError`` if the final response is not a success.
    """
    attempts = 3
    for attempt in range(attempts):
        resp = await client.post(EXECUTOR_URL, json=payload)
        if resp.status_code != 429:
            break
        if attempt < attempts - 1:  # no point sleeping after the last try
            wait = 2 ** attempt
            log.warning("Executor 429, retrying in %ds...", wait)
            await asyncio.sleep(wait)
    resp.raise_for_status()
    return resp


async def execute(
    vlm_payload: dict,
    action_index: int = 0,
    *,
    history: HistoryBuffer | None = None,
    current_screenshot: bytes | None = None,
    api_key: str | None = None,
) -> str | None:
    """Run the agentic executor loop.

    The agent can read files, write files, and run commands to complete
    the user's approved action. It loops until it calls done(), answers
    with plain text, or hits the step limit.

    Args:
        vlm_payload: Analysis payload; ``friction.proposed_actions`` holds the
            candidate actions.
        action_index: Index into ``proposed_actions`` of the approved action.
        history: Optional screenshot buffer included in the first message.
        current_screenshot: Optional JPEG bytes of the current screen.
        api_key: API key to use; falls back to ``GEMINI_API_KEY``.

    Returns:
        A summary of what was done (or a notice that the step limit was
        reached), or None on failure: bad index, missing key, failed API call,
        or a response without a usable candidate.
    """
    friction = vlm_payload.get("friction") or {}
    actions = friction.get("proposed_actions") or []
    # Reject negative indexes too; they would silently select from the end.
    if not 0 <= action_index < len(actions):
        log.warning("Action index %d out of range", action_index)
        return None

    chosen = actions[action_index]
    key = api_key or GEMINI_API_KEY
    if not key:
        log.warning("No API key for executor")
        return None

    log.info("Agent executing: %s", str(chosen.get("label", "?"))[:80])

    # Build initial message with screenshots + task context
    initial_parts = _build_initial_parts(vlm_payload, chosen, history, current_screenshot)

    # Conversation history for the agent loop
    contents = [{"role": "user", "parts": initial_parts}]

    # Send the key as a header rather than a query parameter so it cannot end
    # up in request-URL logs or exception messages.
    headers = {"x-goog-api-key": key}

    # One client for the whole loop so connections are reused between steps.
    async with httpx.AsyncClient(timeout=120.0, headers=headers) as client:
        for step in range(MAX_AGENT_STEPS):
            log.debug("Agent step %d/%d", step + 1, MAX_AGENT_STEPS)

            payload = {
                "contents": contents,
                "tools": TOOLS,
                "generationConfig": {"temperature": 0.2, "maxOutputTokens": 8192},
            }

            try:
                resp = await _post_with_backoff(client, payload)
            except Exception:
                log.exception("Agent API call failed at step %d", step + 1)
                return None

            # A blocked or empty generation has no candidates/content/parts.
            try:
                response_parts = resp.json()["candidates"][0]["content"]["parts"]
            except (ValueError, LookupError, TypeError):
                log.error("Agent response had no usable candidate at step %d", step + 1)
                return None

            # Add assistant response to conversation
            contents.append({"role": "model", "parts": response_parts})

            # Check for function calls
            function_calls = [p for p in response_parts if "functionCall" in p]

            if not function_calls:
                # No tool calls — agent returned text, we're done
                text = "".join(p.get("text", "") for p in response_parts).strip()
                log.info("Agent finished with text response (step %d)", step + 1)
                return text if text else "Done."

            # Execute each tool call
            # NOTE: tools run synchronously on the event loop; run_command can
            # block it for up to its 30s timeout.
            tool_results = []
            done_summary = None

            for fc_part in function_calls:
                fc = fc_part["functionCall"]
                name = fc["name"]
                args = fc.get("args") or {}

                print(f" 🔧 {name}({_summarize_args(args)})")
                try:
                    result = _execute_tool(name, args)
                except Exception as exc:
                    # Report the failure to the model instead of aborting.
                    log.exception("Tool %s raised", name)
                    result = f"Error executing {name}: {exc}"

                tool_results.append({
                    "functionResponse": {
                        "name": name,
                        "response": {"result": result},
                    }
                })

                if name == "done":
                    # An empty summary must still end the loop.
                    done_summary = result or "Done."

            # Add tool results to conversation
            contents.append({"role": "user", "parts": tool_results})

            if done_summary is not None:
                log.info("Agent called done() at step %d: %s", step + 1, done_summary[:80])
                return done_summary

    log.warning("Agent hit step limit (%d)", MAX_AGENT_STEPS)
    return "Agent reached maximum steps without completing."
|
|
|
|
|
|
def _jpeg_parts(caption: str, jpeg: bytes) -> list[dict]:
    """Return a caption text part followed by the base64-encoded JPEG part."""
    encoded = base64.b64encode(jpeg).decode()
    return [
        {"text": caption},
        {"inlineData": {"mimeType": "image/jpeg", "data": encoded}},
    ]


def _build_initial_parts(
    vlm_payload: dict,
    action: dict,
    history: HistoryBuffer | None,
    current_screenshot: bytes | None,
) -> list[dict]:
    """Assemble the first user message: captioned screenshots, then the task prompt.

    History entries come first (numbered ``i/total``), then the current
    screenshot if given, and finally one text part with the action, its
    context from ``vlm_payload`` and the tool-usage instructions.
    """
    parts: list[dict] = []

    # Include screenshots so agent can read source content
    if history:
        entries = history.get_entries()
        total = len(entries)
        for number, entry in enumerate(entries, start=1):
            parts.extend(_jpeg_parts(f"[Screenshot {number}/{total}]", entry.jpeg))

    if current_screenshot:
        parts.extend(_jpeg_parts("[Current screenshot]", current_screenshot))

    friction = vlm_payload.get("friction", {})
    label = action.get('label', '')
    details = action.get('details', '')
    task = vlm_payload.get('inferred_task', '')
    problem = friction.get('description', '')
    state = vlm_payload.get('checkpoint_note_update', '')
    app = vlm_payload.get('app_name', '')
    source = friction.get('source_context', '')
    target = friction.get('target_context', '')
    cwd = os.getcwd()

    prompt = f"""\
The user approved this action. Complete it using the tools available to you.

ACTION: {label}
DETAILS: {details}

Context:
User's task: {task}
Problem: {problem}
Current state: {state}
Application: {app}
Source: {source}
Target: {target}

INSTRUCTIONS:
1. For BINARY files (PDFs, images, etc.): use your VISION. Read content directly
from the screenshots — this is your most reliable source for non-text files.
2. For TEXT files (code, markdown, configs, txt): use read_file to get exact content.
3. If you need a file but only know the filename (not the path), FIND IT FIRST:
- run_command("mdfind -name 'filename'") — fast macOS Spotlight search
- run_command("lsof -c AppName | grep filename") — find what file an app has open
Do NOT guess paths. Search first.
4. Choose the right output method:
- Binary format targets (docx, ppt, website forms, PDFs): use output() — user will copy/paste.
- Existing plain text files (code, markdown, config): use write_file() — modify directly.
- write_file() only works on files that ALREADY EXIST. Confirm the path with read_file first.
5. Use run_command to compile, test, or search for files. Never to write files.
6. Do NOT hallucinate content. If you can't read something, say so.
7. Call done() with a summary when the action is complete.

Working directory: {cwd}"""

    parts.append({"text": prompt})
    return parts
|
|
|
|
|
|
def _summarize_args(args: dict) -> str:
    """Render tool arguments as a compact ``key=value`` list for terminal display.

    Each value is stringified; anything longer than 50 characters is cut to its
    first 47 characters followed by ``...``.
    """

    def _clip(value: object) -> str:
        text = str(value)
        return text if len(text) <= 50 else text[:47] + "..."

    return ", ".join(f"{key}={_clip(value)}" for key, value in args.items())
|