"""Agentic executor — uses Gemini 2.5 Pro with tool use to complete actions. When the user approves a proposed action, the executor gets: - The recent screenshots (so it can read source content) - Tool access: read_file, write_file, run_command - A loop: Gemini proposes tool calls → we execute → feed results back → repeat This is a full agent loop, not a single-shot LLM call. Swift portability notes: - The agent loop is the same HTTP pattern (Gemini function calling API) - Tool implementations map to FileManager, NSTask, NSPasteboard - The loop structure is identical in Swift async/await """ from __future__ import annotations import asyncio import base64 import logging import os import subprocess import httpx from argus.buffer import HistoryBuffer from argus.config import GEMINI_API_KEY log = logging.getLogger(__name__) EXECUTOR_MODEL = os.environ.get("EXECUTOR_MODEL", "gemini-2.5-pro") EXECUTOR_URL = ( f"https://generativelanguage.googleapis.com/v1beta/models/{EXECUTOR_MODEL}:generateContent" ) MAX_AGENT_STEPS = 10 # ── Tool definitions (Gemini function calling format) ──────────────────── TOOLS = [ { "functionDeclarations": [ { "name": "read_file", "description": "Read the contents of a file. Use this to inspect source files, code, configs, etc.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute or relative file path to read", } }, "required": ["path"], }, }, { "name": "output", "description": "Display content to the user as a sticky note. Use for: extracted text from PDFs/images, form fill suggestions, content the user needs to paste into binary formats (docx, ppt, websites). The user will copy/paste from this.", "parameters": { "type": "object", "properties": { "title": { "type": "string", "description": "Short title (e.g. 'Extracted receipt', 'Form values')", }, "content": { "type": "string", "description": "The full content to display", }, }, "required": ["title", "content"], }, }, { "name": "write_file", "description": "Write content to an EXISTING plain text file (code, markdown, config, txt). Only use when: (1) you have confirmed the file path via read_file or mdfind, AND (2) the file is a plain text format you can write correctly. NEVER use for binary formats (docx, ppt, pdf, images).", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute file path (must have been confirmed to exist first)", }, "content": { "type": "string", "description": "Complete file content to write", }, }, "required": ["path", "content"], }, }, { "name": "run_command", "description": "Run a shell command and return its output. Use for compiling, testing, listing files, etc.", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to execute", } }, "required": ["command"], }, }, { "name": "done", "description": "Signal that the task is complete. Call this AFTER using output() to present results. Summary should describe what was shown to the user, not file operations.", "parameters": { "type": "object", "properties": { "summary": { "type": "string", "description": "Brief summary of what was done", } }, "required": ["summary"], }, }, ] } ] # ── Tool implementations ───────────────────────────────────────────────── def _exec_read_file(path: str) -> str: resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path) try: with open(resolved, "r") as f: content = f.read() log.info(" read_file: %s (%d bytes)", resolved, len(content)) return content except OSError as e: return f"Error reading {resolved}: {e}" def _exec_write_file(path: str, content: str) -> str: resolved = path if os.path.isabs(path) else os.path.join(os.getcwd(), path) # Safety: only write to existing text files if not os.path.exists(resolved): return f"Error: {resolved} does not exist. Use output() instead for new content." try: with open(resolved, "r"): pass # confirm it's readable as text except (OSError, UnicodeDecodeError): return f"Error: {resolved} is not a readable text file. Use output() instead." try: with open(resolved, "w") as f: f.write(content) log.info(" write_file: %s (%d bytes)", resolved, len(content)) return f"Successfully wrote {len(content)} bytes to {resolved}" except OSError as e: return f"Error writing {resolved}: {e}" def _exec_output(title: str, content: str) -> str: """Display content to the user via terminal. Swift portability: becomes a sticky note / floating card UI. """ print() print(f"┌── 📋 {title} " + "─" * max(0, 50 - len(title)) + "┐") for line in content.split("\n"): print(f"│ {line}") print(f"└" + "─" * 58 + "┘") print() log.info(" output: %s (%d chars)", title, len(content)) return f"Displayed '{title}' to user ({len(content)} chars)" def _exec_run_command(command: str) -> str: log.info(" run_command: %s", command[:80]) try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=30 ) output = result.stdout if result.stderr: output += "\nSTDERR:\n" + result.stderr if result.returncode != 0: output += f"\n(exit code {result.returncode})" return output[:4000] # cap output length except subprocess.TimeoutExpired: return "Error: command timed out after 30s" def _execute_tool(name: str, args: dict) -> str: if name == "read_file": return _exec_read_file(args["path"]) elif name == "output": return _exec_output(args["title"], args["content"]) elif name == "write_file": return _exec_write_file(args["path"], args["content"]) elif name == "run_command": return _exec_run_command(args["command"]) elif name == "done": return args.get("summary", "Done.") else: return f"Unknown tool: {name}" # ── Agent loop ─────────────────────────────────────────────────────────── async def execute( vlm_payload: dict, action_index: int = 0, *, history: HistoryBuffer | None = None, current_screenshot: bytes | None = None, api_key: str | None = None, ) -> str | None: """Run the agentic executor loop. The agent can read files, write files, and run commands to complete the user's approved action. It loops until it calls done() or hits the step limit. Returns a summary of what was done, or None on failure. """ friction = vlm_payload.get("friction", {}) actions = friction.get("proposed_actions", []) if action_index >= len(actions): log.warning("Action index %d out of range", action_index) return None chosen = actions[action_index] key = api_key or GEMINI_API_KEY if not key: log.warning("No API key for executor") return None log.info("Agent executing: %s", chosen.get("label", "?")[:80]) # Build initial message with screenshots + task context initial_parts = _build_initial_parts(vlm_payload, chosen, history, current_screenshot) # Conversation history for the agent loop contents = [{"role": "user", "parts": initial_parts}] for step in range(MAX_AGENT_STEPS): log.debug("Agent step %d/%d", step + 1, MAX_AGENT_STEPS) payload = { "contents": contents, "tools": TOOLS, "generationConfig": {"temperature": 0.2, "maxOutputTokens": 8192}, } try: async with httpx.AsyncClient(timeout=120.0) as client: for attempt in range(3): resp = await client.post(f"{EXECUTOR_URL}?key={key}", json=payload) if resp.status_code == 429: wait = 2 ** attempt log.warning("Executor 429, retrying in %ds...", wait) await asyncio.sleep(wait) continue resp.raise_for_status() break else: resp.raise_for_status() except Exception: log.exception("Agent API call failed at step %d", step + 1) return None body = resp.json() candidate = body["candidates"][0] response_parts = candidate["content"]["parts"] # Add assistant response to conversation contents.append({"role": "model", "parts": response_parts}) # Check for function calls function_calls = [p for p in response_parts if "functionCall" in p] if not function_calls: # No tool calls — agent returned text, we're done text = "".join(p.get("text", "") for p in response_parts) log.info("Agent finished with text response (step %d)", step + 1) return text.strip() if text.strip() else "Done." # Execute each tool call tool_results = [] done_summary = None for fc_part in function_calls: fc = fc_part["functionCall"] name = fc["name"] args = fc.get("args", {}) print(f" 🔧 {name}({_summarize_args(args)})") result = _execute_tool(name, args) tool_results.append({ "functionResponse": { "name": name, "response": {"result": result}, } }) if name == "done": done_summary = result # Add tool results to conversation contents.append({"role": "user", "parts": tool_results}) if done_summary: log.info("Agent called done() at step %d: %s", step + 1, done_summary[:80]) return done_summary log.warning("Agent hit step limit (%d)", MAX_AGENT_STEPS) return "Agent reached maximum steps without completing." def _build_initial_parts( vlm_payload: dict, action: dict, history: HistoryBuffer | None, current_screenshot: bytes | None, ) -> list[dict]: """Build the initial message parts: screenshots + task prompt.""" parts: list[dict] = [] # Include screenshots so agent can read source content if history: entries = history.get_entries() for i, entry in enumerate(entries): b64 = base64.b64encode(entry.jpeg).decode() parts.append({"text": f"[Screenshot {i + 1}/{len(entries)}]"}) parts.append({"inlineData": {"mimeType": "image/jpeg", "data": b64}}) if current_screenshot: b64 = base64.b64encode(current_screenshot).decode() parts.append({"text": "[Current screenshot]"}) parts.append({"inlineData": {"mimeType": "image/jpeg", "data": b64}}) friction = vlm_payload.get("friction", {}) prompt = f"""\ The user approved this action. Complete it using the tools available to you. ACTION: {action.get('label', '')} DETAILS: {action.get('details', '')} Context: User's task: {vlm_payload.get('inferred_task', '')} Problem: {friction.get('description', '')} Current state: {vlm_payload.get('checkpoint_note_update', '')} Application: {vlm_payload.get('app_name', '')} Source: {friction.get('source_context', '')} Target: {friction.get('target_context', '')} INSTRUCTIONS: 1. For BINARY files (PDFs, images, etc.): use your VISION. Read content directly from the screenshots — this is your most reliable source for non-text files. 2. For TEXT files (code, markdown, configs, txt): use read_file to get exact content. 3. If you need a file but only know the filename (not the path), FIND IT FIRST: - run_command("mdfind -name 'filename'") — fast macOS Spotlight search - run_command("lsof -c AppName | grep filename") — find what file an app has open Do NOT guess paths. Search first. 4. Choose the right output method: - Binary format targets (docx, ppt, website forms, PDFs): use output() — user will copy/paste. - Existing plain text files (code, markdown, config): use write_file() — modify directly. - write_file() only works on files that ALREADY EXIST. Confirm the path with read_file first. 5. Use run_command to compile, test, or search for files. Never to write files. 6. Do NOT hallucinate content. If you can't read something, say so. 7. Call done() with a summary when the action is complete. Working directory: {os.getcwd()}""" parts.append({"text": prompt}) return parts def _summarize_args(args: dict) -> str: """Short summary of tool args for terminal display.""" parts = [] for k, v in args.items(): sv = str(v) if len(sv) > 50: sv = sv[:47] + "..." parts.append(f"{k}={sv}") return ", ".join(parts)