import asyncio import httpx from app.config import settings HEX_API_BASE = "https://app.hex.tech/api/v1" HEADERS = { "Authorization": f"Bearer {settings.HEX_API_TOKEN}", "Content-Type": "application/json", } NOTEBOOKS = { "distraction_patterns": settings.HEX_NB_DISTRACTIONS, "focus_trends": settings.HEX_NB_FOCUS_TRENDS, "weekly_report": settings.HEX_NB_WEEKLY_REPORT, } async def run_notebook(notebook_key: str, user_id: str) -> dict: project_id = NOTEBOOKS.get(notebook_key) if not project_id: raise ValueError(f"Unknown notebook: {notebook_key}") async with httpx.AsyncClient(timeout=60) as http: # Trigger run — POST /projects/{projectId}/runs resp = await http.post( f"{HEX_API_BASE}/projects/{project_id}/runs", headers=HEADERS, json={"inputParams": {"user_id": user_id}}, ) resp.raise_for_status() run_id = resp.json()["runId"] # Poll for completion — GET /projects/{projectId}/runs/{runId} for _ in range(30): status_resp = await http.get( f"{HEX_API_BASE}/projects/{project_id}/runs/{run_id}", headers=HEADERS, ) status_resp.raise_for_status() data = status_resp.json() if data["status"] == "COMPLETED": return {"status": "COMPLETED", "elapsed": data.get("elapsedTime")} if data["status"] in ("ERRORED", "KILLED", "UNABLE_TO_ALLOCATE_KERNEL"): raise RuntimeError(f"Hex run failed: {data['status']}") await asyncio.sleep(2) raise TimeoutError("Hex notebook run timed out")