"""Screenshot capture for macOS. Reads a JPEG file written by the Swift host app (LockInBro.app), which holds the Screen Recording TCC permission. The Swift app writes atomically to /tmp/lockinbro_capture.jpg every ~5s using ScreenCaptureKit. capture_screenshot() blocks up to 15 seconds waiting for a fresh file, then raises RuntimeError if nothing arrives — so loop.py can handle it gracefully without falling back to the screencapture CLI (which would fail with a permissions error when called from a Python subprocess). """ from __future__ import annotations import io import os import time from PIL import Image from argus.config import SCREENSHOT_JPEG_QUALITY, SCREENSHOT_MAX_WIDTH _SWIFT_FRAME_PATH = "/tmp/lockinbro_capture.jpg" _SWIFT_FRAME_MAX_AGE_S = 10.0 # treat as stale if older than this _WAIT_TIMEOUT_S = 15.0 # how long to wait for Swift to write the first frame _WAIT_POLL_S = 0.5 # polling interval while waiting def _to_jpeg(img: Image.Image) -> bytes: """Convert a PIL Image to JPEG bytes, handling RGBA → RGB.""" if img.width > SCREENSHOT_MAX_WIDTH: ratio = SCREENSHOT_MAX_WIDTH / img.width img = img.resize( (SCREENSHOT_MAX_WIDTH, int(img.height * ratio)), Image.LANCZOS, ) if img.mode == "RGBA": img = img.convert("RGB") buf = io.BytesIO() img.save(buf, format="JPEG", quality=SCREENSHOT_JPEG_QUALITY) return buf.getvalue() def _read_swift_frame() -> bytes | None: """Return JPEG bytes from the Swift-provided file if it exists and is fresh.""" try: age = time.time() - os.path.getmtime(_SWIFT_FRAME_PATH) if age >= _SWIFT_FRAME_MAX_AGE_S: return None with open(_SWIFT_FRAME_PATH, "rb") as f: data = f.read() img = Image.open(io.BytesIO(data)) img.load() return _to_jpeg(img) except Exception: return None def capture_screenshot() -> bytes: """Return JPEG bytes for the current screen. Blocks up to _WAIT_TIMEOUT_S seconds waiting for the Swift app to write a fresh frame. Raises RuntimeError if no frame arrives in time. """ deadline = time.time() + _WAIT_TIMEOUT_S while time.time() < deadline: frame = _read_swift_frame() if frame is not None: return frame time.sleep(_WAIT_POLL_S) raise RuntimeError( f"No screenshot received from LockInBro.app within {_WAIT_TIMEOUT_S}s. " "Make sure the app is running and has Screen Recording permission." ) def load_image_file(path: str) -> bytes: """Load an image file and return JPEG bytes (for testing).""" return _to_jpeg(Image.open(path))