From 37503231b3d1638edda50fe7bd13e9115811b036 Mon Sep 17 00:00:00 2001 From: pulipakaa24 Date: Sun, 29 Mar 2026 06:57:34 -0400 Subject: [PATCH] API --- .gitignore | 8 + CLAUDE.md | 109 ++++++ alembic.ini | 149 ++++++++ alembic/README | 1 + alembic/env.py | 78 ++++ alembic/script.py.mako | 28 ++ alembic/versions/001_initial_schema.py | 116 ++++++ alembic/versions/002_cross_device_handoff.py | 29 ++ alembic/versions/003_proactive_actions.py | 36 ++ app/__init__.py | 0 app/config.py | 25 ++ app/main.py | 65 ++++ app/middleware/__init__.py | 0 app/middleware/auth.py | 67 ++++ app/models.py | 368 ++++++++++++++++++ app/routers/__init__.py | 0 app/routers/analytics.py | 94 +++++ app/routers/auth.py | 145 ++++++++ app/routers/distractions.py | 308 +++++++++++++++ app/routers/proactive.py | 96 +++++ app/routers/sessions.py | 371 +++++++++++++++++++ app/routers/steps.py | 140 +++++++ app/routers/tasks.py | 298 +++++++++++++++ app/services/__init__.py | 0 app/services/db.py | 18 + app/services/hex_service.py | 49 +++ app/services/llm.py | 351 ++++++++++++++++++ app/services/push.py | 283 ++++++++++++++ app/types.py | 69 ++++ environment.yml | 84 +++++ requirements.txt | 59 +++ 31 files changed, 3444 insertions(+) create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 alembic.ini create mode 100644 alembic/README create mode 100644 alembic/env.py create mode 100644 alembic/script.py.mako create mode 100644 alembic/versions/001_initial_schema.py create mode 100644 alembic/versions/002_cross_device_handoff.py create mode 100644 alembic/versions/003_proactive_actions.py create mode 100644 app/__init__.py create mode 100644 app/config.py create mode 100644 app/main.py create mode 100644 app/middleware/__init__.py create mode 100644 app/middleware/auth.py create mode 100644 app/models.py create mode 100644 app/routers/__init__.py create mode 100644 app/routers/analytics.py create mode 100644 app/routers/auth.py create mode 100644 app/routers/distractions.py create mode 100644 app/routers/proactive.py create mode 100644 app/routers/sessions.py create mode 100644 app/routers/steps.py create mode 100644 app/routers/tasks.py create mode 100644 app/services/__init__.py create mode 100644 app/services/db.py create mode 100644 app/services/hex_service.py create mode 100644 app/services/llm.py create mode 100644 app/services/push.py create mode 100644 app/types.py create mode 100644 environment.yml create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c6203c --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +.env +*.egg-info/ +dist/ +build/ +.eggs/ +*.p8 \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..b025d56 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,109 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## What This Is + +LockInBro API — the FastAPI backend for an ADHD-aware productivity system. Runs on a DigitalOcean droplet (1GB RAM) behind nginx with SSL at `https://wahwa.com/api/v1`. PostgreSQL database `focusapp` on the same droplet. + +## Design Doc (Source of Truth) + +**Always consult `/home/devuser/.github/profile/README.md` before making architectural decisions.** This is the full technical design document. Keep it up to date by pulling/pushing, but avoid frequent changes — batch updates when possible. + +## Related Repositories + +- `/home/devuser/BlindMaster/blinds_express` — Previous Express.js project on the same droplet. Contains **server-side auth workflow** (Argon2 + JWT) used as scaffolding reference for this project's auth module. +- `/home/devuser/BlindMaster/blinds_flutter` — Flutter app with **app-side auth flow** being ported to iOS/SwiftUI. + +## Stack + +- **Python / FastAPI** on port 3000, served by uvicorn +- **PostgreSQL** on port 5432, database `focusapp`, `root` superuser +- **Auth:** Argon2id (email/password) + Apple Sign In + JWT (stateless) +- **AI:** Claude API or Gemini API (auto-selects based on which key is set). Brain-dump parsing, VLM screenshot analysis, step planning, context resume +- **Analytics:** Hex API (notebooks query Postgres directly) +- **nginx** reverse-proxies `wahwa.com` → `localhost:3000`, WebSocket support enabled, SSL via Certbot + +## Commands + +```bash +# Activate conda environment (must do first) +source ~/miniconda3/bin/activate && conda activate lockinbro + +# Run the server +uvicorn app.main:app --host 0.0.0.0 --port 3000 --reload + +# Install dependencies +pip install -r requirements.txt + +# Export environment after adding packages +conda env export --no-builds > environment.yml +pip freeze > requirements.txt + +# Database migrations (requires Postgres connection) +alembic upgrade head # apply migrations +alembic revision -m "description" # create new migration (manual SQL in upgrade/downgrade) +``` + +## Architecture + +### Project Structure +``` +├── app/ +│ ├── main.py # FastAPI entry point +│ ├── config.py # Settings from env vars +│ ├── middleware/auth.py # JWT validation + Argon2 utils +│ ├── routers/ # auth, tasks, steps, sessions, distractions, analytics +│ ├── services/ +│ │ ├── llm.py # All Claude API calls + prompt templates +│ │ ├── hex_service.py # Hex notebook trigger + poll +│ │ └── db.py # asyncpg Postgres client +│ ├── models.py # Pydantic request/response schemas +│ └── types.py +├── alembic/ +├── requirements.txt +└── .env +``` + +### Key Data Flows + +1. **Brain-dump parsing:** iOS sends raw text → `POST /tasks/brain-dump` → Claude extracts structured tasks → optionally `POST /tasks/{id}/plan` → Claude generates 5-15 min ADHD-friendly steps +2. **Distraction detection:** macOS sends screenshot (raw JPEG binary via multipart) + task context → `POST /distractions/analyze-screenshot` → Claude Vision analyzes → backend auto-updates step statuses + writes `checkpoint_note` → returns nudge if distracted (confidence > 0.7) +3. **Context resume:** `GET /sessions/{id}/resume` → uses `checkpoint_note` to generate hyper-specific "welcome back" card +4. **Analytics:** Backend writes events to Postgres → Hex notebooks query directly → results served via `/analytics/*` endpoints + +### Critical Design Decisions + +- **Steps are a separate table, not JSONB** — VLM updates individual step statuses every ~20s. Separate rows avoid read-modify-write races. +- **No dynamic step splitting** — Growing a task list mid-work causes ADHD decision paralysis. Use `checkpoint_note` for within-step progress instead. +- **Screenshots are never persisted** — base64 in-memory only, discarded after VLM response. Privacy-critical. +- **Step auto-update is a backend side-effect** — `/analyze-screenshot` applies step changes server-side before responding. Client doesn't make separate step-update calls. +- **1GB RAM constraint** — Limit concurrent VLM requests to 1-2 in-flight. FastAPI + uvicorn uses ~40-60MB. + +### AI/LLM Guidelines + +All AI calls go through `services/llm.py`. Key principles: +- Non-judgmental tone (never "you got distracted again") +- Concise, scannable outputs (ADHD working memory) +- All AI responses are structured JSON +- Every prompt includes full task + step context +- Graceful degradation if Claude API is down + +### Environment Variables (.env) + +``` +DATABASE_URL=postgresql://devuser@/focusapp +JWT_SECRET= +ANTHROPIC_API_KEY= # set one of these two +GEMINI_API_KEY= # prefers Anthropic if both set +HEX_API_TOKEN= +HEX_NB_DISTRACTIONS= +HEX_NB_FOCUS_TRENDS= +HEX_NB_WEEKLY_REPORT= +``` + +## Development Notes + +- Run Claude Code from local machine over SSH, not on the droplet (saves RAM/disk) +- The droplet hostname is `blindmaster-ubuntu` +- PM2 is killed, `blinds_express` is stopped, port 3000 is free for this project diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..f13116f --- /dev/null +++ b/alembic.ini @@ -0,0 +1,149 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s +# Or organize into date-based subdirectories (requires recursive_version_locations = true) +# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the tzdata library which can be installed by adding +# `alembic[tz]` to the pip requirements. +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +sqlalchemy.url = postgresql://devuser@/focusapp + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..36112a3 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,78 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/001_initial_schema.py b/alembic/versions/001_initial_schema.py new file mode 100644 index 0000000..deb6442 --- /dev/null +++ b/alembic/versions/001_initial_schema.py @@ -0,0 +1,116 @@ +"""Initial schema + +Revision ID: 001 +Revises: +Create Date: 2026-03-28 +""" + +from alembic import op + +revision = "001" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute(""" + CREATE TABLE IF NOT EXISTS public.users ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + email TEXT UNIQUE, + password_hash TEXT, + apple_user_id TEXT UNIQUE, + display_name TEXT, + timezone TEXT DEFAULT 'America/Chicago', + distraction_apps TEXT[] DEFAULT '{}', + preferences JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now(), + CONSTRAINT auth_method CHECK (email IS NOT NULL OR apple_user_id IS NOT NULL) + ); + + CREATE TABLE IF NOT EXISTS public.tasks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(id) ON DELETE CASCADE, + title TEXT NOT NULL, + description TEXT, + priority INT DEFAULT 0, + status TEXT DEFAULT 'pending', + deadline TIMESTAMPTZ, + estimated_minutes INT, + source TEXT DEFAULT 'manual', + tags TEXT[] DEFAULT '{}', + plan_type TEXT, + brain_dump_raw TEXT, + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now() + ); + + CREATE TABLE IF NOT EXISTS public.steps ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + task_id UUID NOT NULL REFERENCES public.tasks(id) ON DELETE CASCADE, + sort_order INT NOT NULL, + title TEXT NOT NULL, + description TEXT, + estimated_minutes INT, + status TEXT DEFAULT 'pending', + checkpoint_note TEXT, + last_checked_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT now() + ); + + CREATE TABLE IF NOT EXISTS public.sessions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(id) ON DELETE CASCADE, + task_id UUID REFERENCES public.tasks(id) ON DELETE SET NULL, + started_at TIMESTAMPTZ DEFAULT now(), + ended_at TIMESTAMPTZ, + status TEXT DEFAULT 'active', + checkpoint JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT now() + ); + + CREATE TABLE IF NOT EXISTS public.distractions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(id) ON DELETE CASCADE, + session_id UUID REFERENCES public.sessions(id) ON DELETE SET NULL, + detected_at TIMESTAMPTZ DEFAULT now(), + distraction_type TEXT, + app_name TEXT, + duration_seconds INT, + confidence FLOAT, + vlm_summary TEXT, + nudge_shown BOOLEAN DEFAULT false + ); + + CREATE TABLE IF NOT EXISTS public.distraction_patterns ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(id) ON DELETE CASCADE, + pattern_type TEXT, + description TEXT, + frequency INT DEFAULT 1, + last_seen TIMESTAMPTZ DEFAULT now(), + metadata JSONB DEFAULT '{}' + ); + + CREATE INDEX IF NOT EXISTS idx_tasks_user ON tasks(user_id, status); + CREATE INDEX IF NOT EXISTS idx_steps_task ON steps(task_id, sort_order); + CREATE INDEX IF NOT EXISTS idx_steps_status ON steps(task_id, status); + CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id, started_at DESC); + CREATE INDEX IF NOT EXISTS idx_sessions_active ON sessions(user_id, status) WHERE status = 'active'; + CREATE INDEX IF NOT EXISTS idx_distractions_user ON distractions(user_id, detected_at DESC); + CREATE INDEX IF NOT EXISTS idx_distractions_app ON distractions(user_id, app_name, detected_at DESC); + CREATE INDEX IF NOT EXISTS idx_distractions_hourly ON distractions(user_id, EXTRACT(HOUR FROM detected_at AT TIME ZONE 'UTC')); + """) + + +def downgrade(): + op.execute(""" + DROP TABLE IF EXISTS public.distraction_patterns CASCADE; + DROP TABLE IF EXISTS public.distractions CASCADE; + DROP TABLE IF EXISTS public.sessions CASCADE; + DROP TABLE IF EXISTS public.steps CASCADE; + DROP TABLE IF EXISTS public.tasks CASCADE; + DROP TABLE IF EXISTS public.users CASCADE; + """) diff --git a/alembic/versions/002_cross_device_handoff.py b/alembic/versions/002_cross_device_handoff.py new file mode 100644 index 0000000..655c1c3 --- /dev/null +++ b/alembic/versions/002_cross_device_handoff.py @@ -0,0 +1,29 @@ +"""Cross-device handoff: device_tokens + session platform + +Revision ID: 002 +Revises: 001 +Create Date: 2026-03-28 +""" + +from alembic import op + +revision = "002" +down_revision = "001" +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute(""" + ALTER TABLE users ADD COLUMN IF NOT EXISTS device_tokens JSONB DEFAULT '[]'; + + ALTER TABLE sessions ADD COLUMN IF NOT EXISTS platform TEXT DEFAULT 'mac'; + ALTER TABLE sessions ALTER COLUMN platform SET NOT NULL; + """) + + +def downgrade(): + op.execute(""" + ALTER TABLE sessions DROP COLUMN IF EXISTS platform; + ALTER TABLE users DROP COLUMN IF EXISTS device_tokens; + """) diff --git a/alembic/versions/003_proactive_actions.py b/alembic/versions/003_proactive_actions.py new file mode 100644 index 0000000..306ff8e --- /dev/null +++ b/alembic/versions/003_proactive_actions.py @@ -0,0 +1,36 @@ +"""Proactive actions table for Argus layer + +Revision ID: 003 +Revises: 002 +Create Date: 2026-03-28 +""" + +from alembic import op + +revision = "003" +down_revision = "002" +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute(""" + CREATE TABLE IF NOT EXISTS public.proactive_actions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(id) ON DELETE CASCADE, + session_id UUID REFERENCES public.sessions(id) ON DELETE SET NULL, + friction_type TEXT NOT NULL, + proposed_action TEXT NOT NULL, + user_choice TEXT, + chosen_action TEXT, + executed BOOLEAN DEFAULT false, + detected_at TIMESTAMPTZ DEFAULT now(), + responded_at TIMESTAMPTZ + ); + + CREATE INDEX IF NOT EXISTS idx_proactive_user ON proactive_actions(user_id, friction_type); + """) + + +def downgrade(): + op.execute("DROP TABLE IF EXISTS public.proactive_actions CASCADE;") diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..257aa79 --- /dev/null +++ b/app/config.py @@ -0,0 +1,25 @@ +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + DATABASE_URL: str = "postgresql://root@localhost:5432/focusapp" + JWT_SECRET: str = "change-me" + JWT_ALGORITHM: str = "HS256" + JWT_ACCESS_EXPIRE_MINUTES: int = 60 + JWT_REFRESH_EXPIRE_DAYS: int = 30 + ANTHROPIC_API_KEY: str = "" + GEMINI_API_KEY: str = "" + HEX_API_TOKEN: str = "" + HEX_NB_DISTRACTIONS: str = "" + HEX_NB_FOCUS_TRENDS: str = "" + HEX_NB_WEEKLY_REPORT: str = "" + APPLE_BUNDLE_ID: str = "com.adipu.LockInBroMobile" + APNS_KEY_ID: str = "" + APNS_TEAM_ID: str = "" + APNS_P8_PATH: str = "" # path to the .p8 file on disk + APNS_SANDBOX: bool = False # True for dev/TestFlight, False for App Store + + model_config = {"env_file": ".env", "extra": "ignore"} + + +settings = Settings() diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..b8ea05b --- /dev/null +++ b/app/main.py @@ -0,0 +1,65 @@ +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +from app.config import settings +from app.routers import analytics, auth, distractions, proactive, sessions, steps, tasks +from app.services.db import close_pool, get_pool + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + await get_pool() + print(f"APNs config → KEY_ID={settings.APNS_KEY_ID or '(empty)'!r} " + f"TEAM_ID={settings.APNS_TEAM_ID or '(empty)'!r} " + f"P8_PATH={settings.APNS_P8_PATH or '(empty)'!r} " + f"SANDBOX={settings.APNS_SANDBOX} " + f"BUNDLE={settings.APPLE_BUNDLE_ID}") + yield + await close_pool() + + +app = FastAPI( + title="LockInBro API", + version="1.0.0", + root_path="/api/v1", + lifespan=lifespan, +) + +@app.exception_handler(Exception) +async def llm_error_handler(request: Request, exc: Exception): + # Surface LLM provider errors as 502 instead of 500 + exc_name = type(exc).__name__ + if "ClientError" in exc_name or "APIError" in exc_name or "APIConnectionError" in exc_name: + return JSONResponse(status_code=502, content={"detail": f"LLM provider error: {exc}"}) + if isinstance(exc, RuntimeError) and "No LLM API key" in str(exc): + return JSONResponse(status_code=503, content={"detail": str(exc)}) + raise exc + + +@app.middleware("http") +async def log_client_info(request: Request, call_next): + real_ip = request.headers.get("cf-connecting-ip", request.headers.get("x-forwarded-for", "unknown")) + ua = request.headers.get("user-agent", "unknown") + response = await call_next(request) + if request.url.path != "/api/v1/health": + print(f"[REQ] {request.method} {request.url.path} → {response.status_code} | ip={real_ip} ua={ua[:80]}") + return response + + +app.include_router(auth.router) +app.include_router(tasks.router) +app.include_router(steps.router) +app.include_router(sessions.router) +app.include_router(distractions.router) +app.include_router(proactive.router) +app.include_router(analytics.router) + + +@app.get("/health") +async def health(): + return {"status": "ok"} diff --git a/app/middleware/__init__.py b/app/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/middleware/auth.py b/app/middleware/auth.py new file mode 100644 index 0000000..7424d97 --- /dev/null +++ b/app/middleware/auth.py @@ -0,0 +1,67 @@ +from datetime import datetime, timedelta, timezone +from uuid import uuid4 + +from argon2 import PasswordHasher +from argon2.exceptions import VerifyMismatchError +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from jose import JWTError, jwt + +from app.config import settings + +ph = PasswordHasher() +bearer_scheme = HTTPBearer() + + +def hash_password(password: str) -> str: + return ph.hash(password) + + +def verify_password(password: str, hashed: str) -> bool: + try: + return ph.verify(hashed, password) + except VerifyMismatchError: + return False + + +def create_access_token(user_id: str) -> str: + now = datetime.now(timezone.utc) + payload = { + "sub": user_id, + "iat": now, + "exp": now + timedelta(minutes=settings.JWT_ACCESS_EXPIRE_MINUTES), + "type": "access", + } + return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) + + +def create_refresh_token(user_id: str) -> str: + now = datetime.now(timezone.utc) + payload = { + "sub": user_id, + "iat": now, + "exp": now + timedelta(days=settings.JWT_REFRESH_EXPIRE_DAYS), + "type": "refresh", + "jti": str(uuid4()), + } + return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) + + +def decode_token(token: str, expected_type: str = "access") -> dict: + try: + payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) + except JWTError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + if payload.get("type") != expected_type: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type") + return payload + + +async def get_current_user_id( + credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), +) -> str: + payload = decode_token(credentials.credentials, expected_type="access") + user_id = payload.get("sub") + if not user_id: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + return user_id diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..2dcb16f --- /dev/null +++ b/app/models.py @@ -0,0 +1,368 @@ +from datetime import datetime +from uuid import UUID + +from pydantic import BaseModel, EmailStr, Field + + +# ── Auth ── + + +class RegisterRequest(BaseModel): + email: EmailStr + password: str = Field(min_length=8) + display_name: str | None = None + timezone: str = "America/Chicago" + + +class LoginRequest(BaseModel): + email: EmailStr + password: str + + +class AppleAuthRequest(BaseModel): + identity_token: str + authorization_code: str + full_name: str | None = None + + +class RefreshRequest(BaseModel): + refresh_token: str + + +class AuthResponse(BaseModel): + access_token: str + refresh_token: str + expires_in: int + user: "UserOut" + + +class UserOut(BaseModel): + id: UUID + email: str | None + display_name: str | None + timezone: str + created_at: datetime + + +# ── Tasks ── + + +class TaskCreate(BaseModel): + title: str + description: str | None = None + priority: int = Field(0, ge=0, le=4) + deadline: datetime | None = None + estimated_minutes: int | None = None + tags: list[str] = [] + + +class TaskUpdate(BaseModel): + title: str | None = None + description: str | None = None + priority: int | None = Field(None, ge=0, le=4) + status: str | None = None + deadline: datetime | None = None + estimated_minutes: int | None = None + tags: list[str] | None = None + + +class TaskOut(BaseModel): + id: UUID + user_id: UUID + title: str + description: str | None + priority: int + status: str + deadline: datetime | None + estimated_minutes: int | None + source: str + tags: list[str] + plan_type: str | None + brain_dump_raw: str | None + created_at: datetime + updated_at: datetime + + +class BrainDumpRequest(BaseModel): + raw_text: str + source: str = "manual" + timezone: str = "America/Chicago" + + +class ParsedSubtask(BaseModel): + title: str + description: str | None = None + deadline: datetime | None = None + estimated_minutes: int | None = None + suggested: bool = False + + +class ParsedTask(BaseModel): + task_id: str | None = None + title: str + description: str | None = None + priority: int = 0 + deadline: datetime | None = None + estimated_minutes: int | None = None + tags: list[str] = [] + subtasks: list[ParsedSubtask] = [] + +class BrainDumpResponse(BaseModel): + parsed_tasks: list[ParsedTask] + unparseable_fragments: list[str] = [] + ask_for_plans: bool = True + + +class PlanRequest(BaseModel): + plan_type: str = "llm_generated" + + +# ── Steps ── + + +class StepOut(BaseModel): + id: UUID + task_id: UUID + sort_order: int + title: str + description: str | None + estimated_minutes: int | None + status: str + checkpoint_note: str | None + last_checked_at: datetime | None + completed_at: datetime | None + created_at: datetime + + +class StepUpdate(BaseModel): + title: str | None = None + description: str | None = None + estimated_minutes: int | None = None + sort_order: int | None = None + status: str | None = None + checkpoint_note: str | None = None + + +class PlanResponse(BaseModel): + task_id: UUID + plan_type: str + steps: list[StepOut] + + +# ── Sessions ── + + +class SessionStartRequest(BaseModel): + task_id: UUID | None = None + platform: str = "mac" + work_app_bundle_ids: list[str] | None = None + + +class SessionCheckpointRequest(BaseModel): + current_step_id: UUID | None = None + last_action_summary: str | None = None + next_up: str | None = None + goal: str | None = None + active_app: str | None = None + last_screenshot_analysis: str | None = None + attention_score: int | None = None + distraction_count: int | None = None + + +class SessionEndRequest(BaseModel): + status: str = "completed" + + +class OpenSessionOut(BaseModel): + id: UUID + task_id: UUID | None + task: dict | None + status: str + platform: str + started_at: datetime + ended_at: datetime | None + checkpoint: dict + + +class SessionJoinRequest(BaseModel): + platform: str = "ipad" + work_app_bundle_ids: list[str] | None = None + + +class SessionJoinResponse(BaseModel): + session_id: UUID + joined: bool + task: dict | None + current_step: dict | None + all_steps: list[StepOut] + suggested_app_scheme: str | None + suggested_app_name: str | None + + +class SessionOut(BaseModel): + id: UUID + user_id: UUID + task_id: UUID | None + platform: str + started_at: datetime + ended_at: datetime | None + status: str + checkpoint: dict + created_at: datetime + + +class ResumeCard(BaseModel): + welcome_back: str + you_were_doing: str + next_step: str + motivation: str + + +class SessionResumeResponse(BaseModel): + session_id: UUID + task: dict | None + current_step: dict | None + progress: dict + resume_card: ResumeCard + + +# ── Distractions ── + + +class ScreenshotAnalysisResponse(BaseModel): + on_task: bool + current_step_id: UUID | None + checkpoint_note_update: str | None + steps_completed: list[UUID] = [] + distraction_type: str | None + app_name: str | None + confidence: float + gentle_nudge: str | None + vlm_summary: str | None + + +class AppActivityRequest(BaseModel): + session_id: UUID + app_bundle_id: str + app_name: str + duration_seconds: int + returned_to_task: bool = False + + +class AppActivityResponse(BaseModel): + distraction_logged: bool + session_distraction_count: int + gentle_nudge: str | None + + +class AppCheckRequest(BaseModel): + app_bundle_id: str + + +class AppCheckResponse(BaseModel): + is_distraction_app: bool + pending_task_count: int + most_urgent_task: dict | None + nudge: str | None + + +# ── Analyze Result (Device-Side VLM) ── + + +class ProposedAction(BaseModel): + label: str + action_type: str + details: str | None = None + + +class FrictionDetection(BaseModel): + type: str = "none" + confidence: float = 0.0 + description: str | None = None + proposed_actions: list[ProposedAction] = [] + source_context: str | None = None + target_context: str | None = None + + +class SessionAction(BaseModel): + type: str = "none" # resume | switch | complete | start_new | none + session_id: UUID | None = None + reason: str | None = None + + +class AnalyzeResultRequest(BaseModel): + session_id: UUID | None = None # optional — VLM can run without an active session + on_task: bool + current_step_id: UUID | None = None + inferred_task: str | None = None + checkpoint_note_update: str | None = None + steps_completed: list[UUID] = [] + friction: FrictionDetection = FrictionDetection() + session_action: SessionAction = SessionAction() + intent: str | None = None + distraction_type: str | None = None + app_name: str | None = None + confidence: float = 0.0 + gentle_nudge: str | None = None + vlm_summary: str | None = None + + +class AnalyzeResultResponse(BaseModel): + side_effects_applied: bool + steps_updated: int + distraction_logged: bool + proactive_action_id: UUID | None = None + + +# ── Proactive Actions ── + + +class ProactiveRespondRequest(BaseModel): + proactive_action_id: UUID + user_choice: str # accepted | declined | alternative_chosen + chosen_action: str | None = None + + +class ProactiveRespondResponse(BaseModel): + logged: bool + should_execute: bool + + +class ProactiveExecuteRequest(BaseModel): + proactive_action_id: UUID + action_type: str + execution_params: dict = {} + + +class ProactiveExecuteResponse(BaseModel): + executed: bool + result: str | None = None + + +class ProactivePreference(BaseModel): + preferred_action: str | None + total_choices: int + acceptance_rate: float + + +class ProactivePreferencesResponse(BaseModel): + preferences: dict[str, ProactivePreference] + + +# ── Device Token ── + + +class DeviceTokenRequest(BaseModel): + platform: str + token: str + + +# ── Analytics ── + + +class AnalyticsSummary(BaseModel): + total_focus_minutes: float + sessions_completed: int + tasks_completed: int + top_distractors: list[dict] + avg_attention_score: float | None diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/analytics.py b/app/routers/analytics.py new file mode 100644 index 0000000..cea9701 --- /dev/null +++ b/app/routers/analytics.py @@ -0,0 +1,94 @@ +from fastapi import APIRouter, Depends, HTTPException + +from app.middleware.auth import get_current_user_id +from app.models import AnalyticsSummary +from app.services.db import get_pool +from app.services.hex_service import run_notebook + +router = APIRouter(prefix="/analytics", tags=["analytics"]) + + +@router.get("/distractions") +async def distraction_analytics(user_id: str = Depends(get_current_user_id)): + try: + return await run_notebook("distraction_patterns", user_id) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Hex error: {e}") + + +@router.get("/focus-trends") +async def focus_trends(user_id: str = Depends(get_current_user_id)): + try: + return await run_notebook("focus_trends", user_id) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Hex error: {e}") + + +@router.get("/weekly-report") +async def weekly_report(user_id: str = Depends(get_current_user_id)): + try: + return await run_notebook("weekly_report", user_id) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Hex error: {e}") + + +@router.post("/refresh") +async def refresh_analytics(user_id: str = Depends(get_current_user_id)): + results = {} + for key in ("distraction_patterns", "focus_trends", "weekly_report"): + try: + results[key] = await run_notebook(key, user_id) + except Exception as e: + results[key] = {"error": str(e)} + return results + + +@router.get("/summary", response_model=AnalyticsSummary) +async def analytics_summary(user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Direct Postgres queries — no Hex needed + focus_minutes = await pool.fetchval( + """SELECT COALESCE(SUM(EXTRACT(EPOCH FROM (ended_at - started_at)) / 60), 0) + FROM sessions WHERE user_id = $1::uuid AND ended_at IS NOT NULL + AND started_at > now() - interval '7 days'""", + user_id, + ) + + sessions_completed = await pool.fetchval( + """SELECT COUNT(*) FROM sessions + WHERE user_id = $1::uuid AND status = 'completed' + AND started_at > now() - interval '7 days'""", + user_id, + ) + + tasks_completed = await pool.fetchval( + """SELECT COUNT(*) FROM tasks + WHERE user_id = $1::uuid AND status = 'done' + AND updated_at > now() - interval '7 days'""", + user_id, + ) + + top_distractors = await pool.fetch( + """SELECT app_name, COUNT(*) as count + FROM distractions + WHERE user_id = $1::uuid AND detected_at > now() - interval '7 days' + GROUP BY app_name ORDER BY count DESC LIMIT 5""", + user_id, + ) + + avg_attention = await pool.fetchval( + """SELECT AVG((checkpoint->>'attention_score')::float) + FROM sessions + WHERE user_id = $1::uuid AND checkpoint->>'attention_score' IS NOT NULL + AND started_at > now() - interval '7 days'""", + user_id, + ) + + return AnalyticsSummary( + total_focus_minutes=float(focus_minutes or 0), + sessions_completed=sessions_completed or 0, + tasks_completed=tasks_completed or 0, + top_distractors=[{"app_name": r["app_name"], "count": r["count"]} for r in top_distractors], + avg_attention_score=float(avg_attention) if avg_attention else None, + ) diff --git a/app/routers/auth.py b/app/routers/auth.py new file mode 100644 index 0000000..fdad094 --- /dev/null +++ b/app/routers/auth.py @@ -0,0 +1,145 @@ +from fastapi import APIRouter, Depends, HTTPException, status + +from app.middleware.auth import ( + create_access_token, + create_refresh_token, + decode_token, + get_current_user_id, + hash_password, + verify_password, +) +from app.models import ( + AppleAuthRequest, + AuthResponse, + DeviceTokenRequest, + LoginRequest, + RefreshRequest, + RegisterRequest, + UserOut, +) +from app.services import push +from app.services.db import get_pool + +router = APIRouter(prefix="/auth", tags=["auth"]) + + +def _build_auth_response(user_row) -> AuthResponse: + user_id = str(user_row["id"]) + return AuthResponse( + access_token=create_access_token(user_id), + refresh_token=create_refresh_token(user_id), + expires_in=3600, + user=UserOut( + id=user_row["id"], + email=user_row["email"], + display_name=user_row["display_name"], + timezone=user_row["timezone"], + created_at=user_row["created_at"], + ), + ) + + +@router.post("/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED) +async def register(req: RegisterRequest): + pool = await get_pool() + + existing = await pool.fetchrow("SELECT id FROM users WHERE email = $1", req.email) + if existing: + raise HTTPException(status_code=409, detail="Email already registered") + + hashed = hash_password(req.password) + row = await pool.fetchrow( + """INSERT INTO users (email, password_hash, display_name, timezone) + VALUES ($1, $2, $3, $4) + RETURNING id, email, display_name, timezone, created_at""", + req.email, + hashed, + req.display_name, + req.timezone, + ) + return _build_auth_response(row) + + +@router.post("/login", response_model=AuthResponse) +async def login(req: LoginRequest): + pool = await get_pool() + + row = await pool.fetchrow( + "SELECT id, email, password_hash, display_name, timezone, created_at FROM users WHERE email = $1", + req.email, + ) + if not row or not row["password_hash"]: + raise HTTPException(status_code=401, detail="Invalid credentials") + + if not verify_password(req.password, row["password_hash"]): + raise HTTPException(status_code=401, detail="Invalid credentials") + + return _build_auth_response(row) + + +@router.post("/apple", response_model=AuthResponse) +async def apple_auth(req: AppleAuthRequest): + # Decode the Apple identity token to extract the subject (user ID) + # In production, verify signature against Apple's public keys + from jose import jwt as jose_jwt + + try: + # Decode without verification for hackathon — in prod, fetch Apple's JWKS + claims = jose_jwt.get_unverified_claims(req.identity_token) + apple_user_id = claims["sub"] + email = claims.get("email") + except Exception: + raise HTTPException(status_code=400, detail="Invalid Apple identity token") + + pool = await get_pool() + + # Try to find existing user + row = await pool.fetchrow( + "SELECT id, email, display_name, timezone, created_at FROM users WHERE apple_user_id = $1", + apple_user_id, + ) + if row: + return _build_auth_response(row) + + # Check if email already exists (link accounts) + if email: + row = await pool.fetchrow( + "SELECT id, email, display_name, timezone, created_at FROM users WHERE email = $1", + email, + ) + if row: + await pool.execute("UPDATE users SET apple_user_id = $1 WHERE id = $2", apple_user_id, row["id"]) + return _build_auth_response(row) + + # Create new user + row = await pool.fetchrow( + """INSERT INTO users (apple_user_id, email, display_name, timezone) + VALUES ($1, $2, $3, $4) + RETURNING id, email, display_name, timezone, created_at""", + apple_user_id, + email, + req.full_name, + "America/Chicago", + ) + return _build_auth_response(row) + + +@router.post("/refresh", response_model=AuthResponse) +async def refresh(req: RefreshRequest): + payload = decode_token(req.refresh_token, expected_type="refresh") + user_id = payload["sub"] + + pool = await get_pool() + row = await pool.fetchrow( + "SELECT id, email, display_name, timezone, created_at FROM users WHERE id = $1::uuid", + user_id, + ) + if not row: + raise HTTPException(status_code=401, detail="User not found") + + return _build_auth_response(row) + + +@router.post("/device-token", status_code=204) +async def register_device(req: DeviceTokenRequest, user_id: str = Depends(get_current_user_id)): + await push.register_device_token(user_id, req.platform, req.token) diff --git a/app/routers/distractions.py b/app/routers/distractions.py new file mode 100644 index 0000000..1ba61aa --- /dev/null +++ b/app/routers/distractions.py @@ -0,0 +1,308 @@ +import json +from uuid import UUID + +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile + +from app.middleware.auth import get_current_user_id +from app.models import ( + AnalyzeResultRequest, + AnalyzeResultResponse, + AppActivityRequest, + AppActivityResponse, + AppCheckRequest, + AppCheckResponse, + ScreenshotAnalysisResponse, +) +from app.services import llm +from app.services.db import get_pool + +router = APIRouter(prefix="/distractions", tags=["distractions"]) + + +@router.post("/analyze-result", response_model=AnalyzeResultResponse) +async def analyze_result(req: AnalyzeResultRequest, user_id: str = Depends(get_current_user_id)): + """Primary endpoint: receives pre-analyzed VLM JSON from device-side. No image. + Works with or without an active session (VLM can run in always-on mode).""" + pool = await get_pool() + + # Session is optional — VLM can run without one (always-on mode) + session = None + if req.session_id: + session = await pool.fetchrow( + "SELECT id, task_id FROM sessions WHERE id = $1 AND user_id = $2::uuid AND status = 'active'", + req.session_id, + user_id, + ) + + session_id_str = str(req.session_id) if req.session_id else None + steps_updated = 0 + + # Side-effect 1: mark completed steps + for completed_id in req.steps_completed: + await pool.execute( + "UPDATE steps SET status = 'done', completed_at = now() WHERE id = $1", + completed_id, + ) + steps_updated += 1 + + # Side-effect 2: update checkpoint_note on current step + if req.current_step_id and req.checkpoint_note_update: + await pool.execute( + "UPDATE steps SET checkpoint_note = $1, last_checked_at = now() WHERE id = $2", + req.checkpoint_note_update, + req.current_step_id, + ) + steps_updated += 1 + + # Side-effect 3: log distraction if off-task + distraction_logged = False + if not req.on_task: + await pool.execute( + """INSERT INTO distractions (user_id, session_id, distraction_type, app_name, + confidence, vlm_summary, nudge_shown) + VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7)""", + user_id, + session_id_str, + req.distraction_type, + req.app_name, + req.confidence, + req.vlm_summary, + req.confidence > 0.7, + ) + distraction_logged = True + + # Side-effect 4: store proactive action if friction detected + proactive_action_id = None + if req.friction.type != "none" and req.friction.confidence > 0.7: + actions_json = json.dumps([a.model_dump() for a in req.friction.proposed_actions]) + row = await pool.fetchrow( + """INSERT INTO proactive_actions (user_id, session_id, friction_type, proposed_action) + VALUES ($1::uuid, $2::uuid, $3, $4) + RETURNING id""", + user_id, + session_id_str, + req.friction.type, + actions_json, + ) + proactive_action_id = row["id"] + + # Side-effect 5: update session checkpoint (if session exists) + if session: + checkpoint_data = { + "last_vlm_summary": req.vlm_summary, + "active_app": req.app_name, + } + if req.current_step_id: + checkpoint_data["current_step_id"] = str(req.current_step_id) + if req.inferred_task: + checkpoint_data["inferred_task"] = req.inferred_task + await pool.execute( + "UPDATE sessions SET checkpoint = checkpoint || $1::jsonb WHERE id = $2", + json.dumps(checkpoint_data), + req.session_id, + ) + + return AnalyzeResultResponse( + side_effects_applied=True, + steps_updated=steps_updated, + distraction_logged=distraction_logged, + proactive_action_id=proactive_action_id, + ) + + +@router.post("/analyze-screenshot", response_model=ScreenshotAnalysisResponse) +async def analyze_screenshot( + screenshot: UploadFile = File(...), + window_title: str = Form(...), + session_id: str = Form(...), + task_context: str = Form(...), + user_id: str = Depends(get_current_user_id), +): + pool = await get_pool() + + # Verify session belongs to user + session = await pool.fetchrow( + "SELECT id, task_id FROM sessions WHERE id = $1::uuid AND user_id = $2::uuid AND status = 'active'", + session_id, + user_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Active session not found") + + screenshot_bytes = await screenshot.read() + context = json.loads(task_context) + + # Call Claude Vision + analysis = await llm.analyze_screenshot(screenshot_bytes, window_title, context) + + # Side-effect: update step statuses + for completed_id in analysis.get("steps_completed", []): + await pool.execute( + "UPDATE steps SET status = 'done', completed_at = now() WHERE id = $1::uuid", + str(completed_id), + ) + + # Side-effect: update checkpoint_note on current step + current_step_id = analysis.get("current_step_id") + checkpoint_update = analysis.get("checkpoint_note_update") + if current_step_id and checkpoint_update: + await pool.execute( + "UPDATE steps SET checkpoint_note = $1, last_checked_at = now() WHERE id = $2::uuid", + checkpoint_update, + str(current_step_id), + ) + + # Side-effect: log distraction event if off-task + if not analysis.get("on_task", True): + await pool.execute( + """INSERT INTO distractions (user_id, session_id, distraction_type, app_name, + confidence, vlm_summary, nudge_shown) + VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7)""", + user_id, + session_id, + analysis.get("distraction_type"), + analysis.get("app_name"), + analysis.get("confidence", 0), + analysis.get("vlm_summary"), + analysis.get("confidence", 0) > 0.7, + ) + + # Update session checkpoint + checkpoint_data = { + "current_step_id": str(current_step_id) if current_step_id else None, + "last_screenshot_analysis": analysis.get("vlm_summary"), + "active_app": analysis.get("app_name"), + } + await pool.execute( + "UPDATE sessions SET checkpoint = checkpoint || $1::jsonb WHERE id = $2::uuid", + json.dumps(checkpoint_data), + session_id, + ) + + return ScreenshotAnalysisResponse( + on_task=analysis.get("on_task", True), + current_step_id=current_step_id, + checkpoint_note_update=checkpoint_update, + steps_completed=analysis.get("steps_completed", []), + distraction_type=analysis.get("distraction_type"), + app_name=analysis.get("app_name"), + confidence=analysis.get("confidence", 0), + gentle_nudge=analysis.get("gentle_nudge") if analysis.get("confidence", 0) > 0.7 else None, + vlm_summary=analysis.get("vlm_summary"), + ) + + +@router.post("/app-check", response_model=AppCheckResponse) +async def app_check(req: AppCheckRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Check if app is in user's distraction list + user = await pool.fetchrow( + "SELECT distraction_apps FROM users WHERE id = $1::uuid", + user_id, + ) + is_distraction = req.app_bundle_id in (user["distraction_apps"] or []) if user else False + + # Get pending task count and most urgent + pending_count = await pool.fetchval( + "SELECT COUNT(*) FROM tasks WHERE user_id = $1::uuid AND status NOT IN ('done', 'deferred')", + user_id, + ) + + urgent_task = await pool.fetchrow( + """SELECT t.id, t.title, t.priority, t.deadline, + (SELECT COUNT(*) FROM steps WHERE task_id = t.id AND status != 'done') as steps_remaining, + (SELECT title FROM steps WHERE task_id = t.id AND status = 'in_progress' ORDER BY sort_order LIMIT 1) as current_step + FROM tasks t + WHERE t.user_id = $1::uuid AND t.status NOT IN ('done', 'deferred') + ORDER BY t.priority DESC, t.deadline ASC NULLS LAST + LIMIT 1""", + user_id, + ) + + most_urgent = None + nudge = None + if urgent_task: + most_urgent = { + "title": urgent_task["title"], + "priority": urgent_task["priority"], + "deadline": urgent_task["deadline"].isoformat() if urgent_task["deadline"] else None, + "current_step": urgent_task["current_step"], + "steps_remaining": urgent_task["steps_remaining"], + } + if is_distraction: + nudge = f"Hey, quick check-in! You have {pending_count} task{'s' if pending_count != 1 else ''} waiting. Top priority: {urgent_task['title']}" + if urgent_task["deadline"]: + nudge += f" (due {urgent_task['deadline'].strftime('%b %d')})." + else: + nudge += "." + + return AppCheckResponse( + is_distraction_app=is_distraction, + pending_task_count=pending_count, + most_urgent_task=most_urgent, + nudge=nudge, + ) + + +@router.post("/app-activity", response_model=AppActivityResponse) +async def app_activity(req: AppActivityRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Verify session belongs to user and is active + session = await pool.fetchrow( + "SELECT id, task_id, checkpoint FROM sessions WHERE id = $1 AND user_id = $2::uuid AND status = 'active'", + req.session_id, + user_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Active session not found") + + # Log distraction + await pool.execute( + """INSERT INTO distractions (user_id, session_id, distraction_type, app_name, + duration_seconds, confidence, nudge_shown) + VALUES ($1::uuid, $2, 'app_switch', $3, $4, 1.0, true)""", + user_id, + str(req.session_id), + req.app_name, + req.duration_seconds, + ) + + # Count session distractions + distraction_count = await pool.fetchval( + "SELECT COUNT(*) FROM distractions WHERE session_id = $1", + req.session_id, + ) + + # Update session checkpoint distraction_count + await pool.execute( + "UPDATE sessions SET checkpoint = checkpoint || $1::jsonb WHERE id = $2", + json.dumps({"distraction_count": distraction_count}), + req.session_id, + ) + + # Generate nudge using task + step context + nudge = None + if session["task_id"]: + task = await pool.fetchrow("SELECT title FROM tasks WHERE id = $1", session["task_id"]) + current_step = await pool.fetchrow( + "SELECT title, checkpoint_note FROM steps WHERE task_id = $1 AND status = 'in_progress' ORDER BY sort_order LIMIT 1", + session["task_id"], + ) + try: + nudge = await llm.generate_app_activity_nudge( + app_name=req.app_name, + duration_seconds=req.duration_seconds, + task_title=task["title"] if task else "your task", + current_step_title=current_step["title"] if current_step else None, + checkpoint_note=current_step["checkpoint_note"] if current_step else None, + ) + except Exception: + nudge = f"Hey, {req.app_name} grabbed your attention for a bit. Ready to jump back in?" + + return AppActivityResponse( + distraction_logged=True, + session_distraction_count=distraction_count, + gentle_nudge=nudge, + ) diff --git a/app/routers/proactive.py b/app/routers/proactive.py new file mode 100644 index 0000000..a30d0ff --- /dev/null +++ b/app/routers/proactive.py @@ -0,0 +1,96 @@ +from fastapi import APIRouter, Depends, HTTPException + +from app.middleware.auth import get_current_user_id +from app.models import ( + ProactiveExecuteRequest, + ProactiveExecuteResponse, + ProactivePreference, + ProactivePreferencesResponse, + ProactiveRespondRequest, + ProactiveRespondResponse, +) +from app.services.db import get_pool + +router = APIRouter(prefix="/proactive", tags=["proactive"]) + + +@router.post("/respond", response_model=ProactiveRespondResponse) +async def respond_to_action(req: ProactiveRespondRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + row = await pool.fetchrow( + "SELECT id, user_id FROM proactive_actions WHERE id = $1 AND user_id = $2::uuid", + req.proactive_action_id, + user_id, + ) + if not row: + raise HTTPException(status_code=404, detail="Proactive action not found") + + await pool.execute( + "UPDATE proactive_actions SET user_choice = $1, chosen_action = $2, responded_at = now() WHERE id = $3", + req.user_choice, + req.chosen_action, + req.proactive_action_id, + ) + + return ProactiveRespondResponse( + logged=True, + should_execute=req.user_choice == "accepted", + ) + + +@router.post("/execute", response_model=ProactiveExecuteResponse) +async def execute_action(req: ProactiveExecuteRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + row = await pool.fetchrow( + "SELECT id, user_choice FROM proactive_actions WHERE id = $1 AND user_id = $2::uuid", + req.proactive_action_id, + user_id, + ) + if not row: + raise HTTPException(status_code=404, detail="Proactive action not found") + + # Mark as executed — actual execution happens device-side (AppleScript/Computer Use) + # This endpoint logs that it was executed and can store results + await pool.execute( + "UPDATE proactive_actions SET executed = true WHERE id = $1", + req.proactive_action_id, + ) + + return ProactiveExecuteResponse( + executed=True, + result=f"Action {req.action_type} marked as executed. Device-side execution handles the actual work.", + ) + + +@router.get("/preferences", response_model=ProactivePreferencesResponse) +async def get_preferences(user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + rows = await pool.fetch( + """SELECT + friction_type, + COUNT(*) as total, + COUNT(*) FILTER (WHERE user_choice = 'accepted') as accepted, + (SELECT chosen_action FROM proactive_actions pa2 + WHERE pa2.user_id = $1::uuid AND pa2.friction_type = pa.friction_type + AND pa2.user_choice = 'accepted' + GROUP BY chosen_action ORDER BY COUNT(*) DESC LIMIT 1) as top_action + FROM proactive_actions pa + WHERE user_id = $1::uuid AND user_choice IS NOT NULL + GROUP BY friction_type""", + user_id, + ) + + preferences = {} + for r in rows: + total = r["total"] + accepted = r["accepted"] + preferences[r["friction_type"]] = ProactivePreference( + preferred_action=r["top_action"], + total_choices=total, + acceptance_rate=accepted / total if total > 0 else 0.0, + ) + + return ProactivePreferencesResponse(preferences=preferences) diff --git a/app/routers/sessions.py b/app/routers/sessions.py new file mode 100644 index 0000000..f81e3d6 --- /dev/null +++ b/app/routers/sessions.py @@ -0,0 +1,371 @@ +import json +from datetime import datetime, timezone +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException + +from app.middleware.auth import get_current_user_id +from app.models import ( + OpenSessionOut, + ResumeCard, + SessionCheckpointRequest, + SessionEndRequest, + SessionJoinRequest, + SessionJoinResponse, + SessionOut, + SessionResumeResponse, + SessionStartRequest, + StepOut, +) +from app.services import llm, push +from app.services.db import get_pool + +router = APIRouter(prefix="/sessions", tags=["sessions"]) + +SESSION_COLUMNS = "id, user_id, task_id, platform, started_at, ended_at, status, checkpoint, created_at" + + +def _parse_session_row(row) -> SessionOut: + result = dict(row) + result["checkpoint"] = json.loads(result["checkpoint"]) if isinstance(result["checkpoint"], str) else result["checkpoint"] + return SessionOut(**result) + + +@router.post("/start", response_model=SessionOut, status_code=201) +async def start_session(req: SessionStartRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Check if an active session already exists for this account + active = await pool.fetchrow( + f"SELECT {SESSION_COLUMNS} FROM sessions WHERE user_id = $1::uuid AND status = 'active'", + user_id, + ) + if active: + # Idempotently return the existing active session and don't create a new one + return _parse_session_row(active) + + checkpoint = {} + if req.task_id: + task = await pool.fetchrow( + "SELECT id, title, description FROM tasks WHERE id = $1 AND user_id = $2::uuid", + req.task_id, + user_id, + ) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + await pool.execute( + "UPDATE tasks SET status = 'in_progress', updated_at = now() WHERE id = $1", + req.task_id, + ) + checkpoint["goal"] = task["title"] + + if req.work_app_bundle_ids: + checkpoint["work_app_bundle_ids"] = req.work_app_bundle_ids + + checkpoint["devices"] = [req.platform] + + row = await pool.fetchrow( + f"""INSERT INTO sessions (user_id, task_id, platform, checkpoint) + VALUES ($1::uuid, $2, $3, $4) + RETURNING {SESSION_COLUMNS}""", + user_id, + req.task_id, + req.platform, + json.dumps(checkpoint), + ) + + # Notify other devices about new session + if req.task_id: + task_row = await pool.fetchrow("SELECT title FROM tasks WHERE id = $1", req.task_id) + task_title = task_row["title"] if task_row else "Focus Session" + await push.send_push(user_id, "ipad" if req.platform == "mac" else "mac", { + "type": "session_started", + "session_id": str(row["id"]), + "task_title": task_title, + "platform": req.platform, + }) + # Start Live Activity on all registered devices + await push.send_activity_start(user_id, task_title, task_id=req.task_id) + + return _parse_session_row(row) + + +@router.get("/active", response_model=SessionOut) +async def get_active_session(user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + row = await pool.fetchrow( + f"SELECT {SESSION_COLUMNS} FROM sessions WHERE user_id = $1::uuid AND status = 'active'", + user_id, + ) + if not row: + raise HTTPException(status_code=404, detail="No active session") + return _parse_session_row(row) + + +@router.get("/open", response_model=list[OpenSessionOut]) +async def get_open_sessions(user_id: str = Depends(get_current_user_id)): + """All active + interrupted sessions. Used by VLM on startup for session-aware analysis.""" + pool = await get_pool() + rows = await pool.fetch( + f"SELECT {SESSION_COLUMNS} FROM sessions WHERE user_id = $1::uuid AND status IN ('active', 'interrupted') ORDER BY started_at DESC", + user_id, + ) + results = [] + for row in rows: + checkpoint = json.loads(row["checkpoint"]) if isinstance(row["checkpoint"], str) else (row["checkpoint"] or {}) + task_info = None + if row["task_id"]: + task_row = await pool.fetchrow( + "SELECT title, description FROM tasks WHERE id = $1", row["task_id"] + ) + if task_row: + task_info = {"title": task_row["title"], "goal": task_row["description"]} + results.append(OpenSessionOut( + id=row["id"], + task_id=row["task_id"], + task=task_info, + status=row["status"], + platform=row["platform"], + started_at=row["started_at"], + ended_at=row["ended_at"], + checkpoint=checkpoint, + )) + return results + + +@router.post("/{session_id}/join", response_model=SessionJoinResponse) +async def join_session( + session_id: UUID, + req: SessionJoinRequest, + user_id: str = Depends(get_current_user_id), +): + pool = await get_pool() + + session = await pool.fetchrow( + f"SELECT {SESSION_COLUMNS} FROM sessions WHERE id = $1 AND user_id = $2::uuid", + session_id, + user_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session["status"] != "active": + raise HTTPException(status_code=400, detail="Session is not active") + + # Update checkpoint with joining device + checkpoint = json.loads(session["checkpoint"]) if isinstance(session["checkpoint"], str) else (session["checkpoint"] or {}) + devices = checkpoint.get("devices", [session["platform"]]) + if req.platform not in devices: + devices.append(req.platform) + checkpoint["devices"] = devices + if req.work_app_bundle_ids: + checkpoint["work_app_bundle_ids"] = req.work_app_bundle_ids + + await pool.execute( + "UPDATE sessions SET checkpoint = $1 WHERE id = $2", + json.dumps(checkpoint), + session_id, + ) + + # Build response with full task + step context + task_info = None + current_step = None + all_steps = [] + suggested_app_scheme = None + suggested_app_name = None + + if session["task_id"]: + task_row = await pool.fetchrow( + "SELECT id, title, description FROM tasks WHERE id = $1", + session["task_id"], + ) + if task_row: + task_info = { + "id": str(task_row["id"]), + "title": task_row["title"], + "goal": task_row["description"], + } + # Suggest a work app based on task + try: + suggestion = await llm.suggest_work_apps(task_row["title"], task_row["description"]) + suggested_app_scheme = suggestion.get("suggested_app_scheme") + suggested_app_name = suggestion.get("suggested_app_name") + except Exception: + pass + + step_rows = await pool.fetch( + """SELECT id, task_id, sort_order, title, description, estimated_minutes, + status, checkpoint_note, last_checked_at, completed_at, created_at + FROM steps WHERE task_id = $1 ORDER BY sort_order""", + session["task_id"], + ) + all_steps = [StepOut(**dict(r)) for r in step_rows] + + # Find current in-progress step + for s in step_rows: + if s["status"] == "in_progress": + current_step = { + "id": str(s["id"]), + "title": s["title"], + "status": s["status"], + "checkpoint_note": s["checkpoint_note"], + } + break + + return SessionJoinResponse( + session_id=session["id"], + joined=True, + task=task_info, + current_step=current_step, + all_steps=all_steps, + suggested_app_scheme=suggested_app_scheme, + suggested_app_name=suggested_app_name, + ) + + +@router.post("/{session_id}/checkpoint", response_model=SessionOut) +async def save_checkpoint( + session_id: UUID, + req: SessionCheckpointRequest, + user_id: str = Depends(get_current_user_id), +): + pool = await get_pool() + + session = await pool.fetchrow( + "SELECT id, status FROM sessions WHERE id = $1 AND user_id = $2::uuid", + session_id, + user_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + if session["status"] != "active": + raise HTTPException(status_code=400, detail="Session is not active") + + checkpoint = req.model_dump(exclude_unset=True) + if "current_step_id" in checkpoint and checkpoint["current_step_id"]: + checkpoint["current_step_id"] = str(checkpoint["current_step_id"]) + + row = await pool.fetchrow( + f"""UPDATE sessions SET checkpoint = checkpoint || $1::jsonb + WHERE id = $2 + RETURNING {SESSION_COLUMNS}""", + json.dumps(checkpoint), + session_id, + ) + return _parse_session_row(row) + + +@router.post("/{session_id}/end", response_model=SessionOut) +async def end_session( + session_id: UUID, + req: SessionEndRequest, + user_id: str = Depends(get_current_user_id), +): + pool = await get_pool() + + row = await pool.fetchrow( + f"""UPDATE sessions SET status = $1, ended_at = now() + WHERE id = $2 AND user_id = $3::uuid AND status = 'active' + RETURNING {SESSION_COLUMNS}""", + req.status, + session_id, + user_id, + ) + if not row: + raise HTTPException(status_code=404, detail="Active session not found") + + # Notify other joined devices that session ended + checkpoint = json.loads(row["checkpoint"]) if isinstance(row["checkpoint"], str) else (row["checkpoint"] or {}) + devices = checkpoint.get("devices", []) + for device in devices: + if device != row["platform"]: + await push.send_push(user_id, device, { + "type": "session_ended", + "session_id": str(row["id"]), + "ended_by": row["platform"], + }) + + # End Live Activity on all devices + task_title = checkpoint.get("goal", "Session ended") + await push.send_activity_end(user_id, task_title=task_title, task_id=row["task_id"]) + + return _parse_session_row(row) + + +@router.get("/{session_id}/resume", response_model=SessionResumeResponse) +async def resume_session(session_id: UUID, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + session = await pool.fetchrow( + f"SELECT {SESSION_COLUMNS} FROM sessions WHERE id = $1 AND user_id = $2::uuid", + session_id, + user_id, + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + checkpoint = json.loads(session["checkpoint"]) if isinstance(session["checkpoint"], str) else (session["checkpoint"] or {}) + + task_info = None + current_step = None + completed_count = 0 + total_count = 0 + next_step_title = None + + if session["task_id"]: + task_row = await pool.fetchrow( + "SELECT id, title, description FROM tasks WHERE id = $1", + session["task_id"], + ) + if task_row: + task_info = {"title": task_row["title"], "overall_goal": task_row["description"]} + + step_rows = await pool.fetch( + "SELECT id, sort_order, title, status, checkpoint_note, last_checked_at FROM steps WHERE task_id = $1 ORDER BY sort_order", + session["task_id"], + ) + total_count = len(step_rows) + + found_current = False + for s in step_rows: + if s["status"] == "done": + completed_count += 1 + elif s["status"] == "in_progress" and not found_current: + current_step = { + "id": str(s["id"]), + "title": s["title"], + "checkpoint_note": s["checkpoint_note"], + "last_checked_at": s["last_checked_at"].isoformat() if s["last_checked_at"] else None, + } + found_current = True + elif found_current and next_step_title is None: + next_step_title = s["title"] + + now = datetime.now(timezone.utc) + last_activity = session["ended_at"] or session["started_at"] + minutes_away = int((now - last_activity).total_seconds() / 60) + + resume_card_data = await llm.generate_resume_card( + task_title=task_info["title"] if task_info else "Unknown task", + goal=task_info.get("overall_goal") if task_info else None, + current_step_title=current_step["title"] if current_step else None, + checkpoint_note=current_step["checkpoint_note"] if current_step else None, + completed_count=completed_count, + total_count=total_count, + next_step_title=next_step_title, + minutes_away=minutes_away, + attention_score=checkpoint.get("attention_score"), + ) + + return SessionResumeResponse( + session_id=session["id"], + task=task_info, + current_step=current_step, + progress={ + "completed": completed_count, + "total": total_count, + "attention_score": checkpoint.get("attention_score"), + "distraction_count": checkpoint.get("distraction_count", 0), + }, + resume_card=ResumeCard(**resume_card_data), + ) diff --git a/app/routers/steps.py b/app/routers/steps.py new file mode 100644 index 0000000..9739644 --- /dev/null +++ b/app/routers/steps.py @@ -0,0 +1,140 @@ +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from app.middleware.auth import get_current_user_id +from app.models import StepOut, StepUpdate +from app.services.db import get_pool + +router = APIRouter(prefix="", tags=["steps"]) + +STEP_COLUMNS = "s.id, s.task_id, s.sort_order, s.title, s.description, s.estimated_minutes, s.status, s.checkpoint_note, s.last_checked_at, s.completed_at, s.created_at" + + +class CreateStepRequest(BaseModel): + title: str + description: str | None = None + estimated_minutes: int | None = None + + +@router.post("/tasks/{task_id}/steps", response_model=StepOut) +async def create_step(task_id: UUID, req: CreateStepRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + task = await pool.fetchrow("SELECT id FROM tasks WHERE id = $1 AND user_id = $2::uuid", task_id, user_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + # Place new step at the end + max_order = await pool.fetchval("SELECT COALESCE(MAX(sort_order), 0) FROM steps WHERE task_id = $1", task_id) + + row = await pool.fetchrow( + """INSERT INTO steps (task_id, sort_order, title, description, estimated_minutes) + VALUES ($1, $2, $3, $4, $5) + RETURNING id, task_id, sort_order, title, description, estimated_minutes, + status, checkpoint_note, last_checked_at, completed_at, created_at""", + task_id, + max_order + 1, + req.title, + req.description, + req.estimated_minutes, + ) + return StepOut(**dict(row)) + + +@router.get("/tasks/{task_id}/steps", response_model=list[StepOut]) +async def list_steps(task_id: UUID, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Verify task belongs to user + task = await pool.fetchrow("SELECT id FROM tasks WHERE id = $1 AND user_id = $2::uuid", task_id, user_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + rows = await pool.fetch( + f"SELECT {STEP_COLUMNS} FROM steps s WHERE s.task_id = $1 ORDER BY s.sort_order", + task_id, + ) + return [StepOut(**dict(r)) for r in rows] + + +@router.patch("/steps/{step_id}", response_model=StepOut) +async def update_step(step_id: UUID, req: StepUpdate, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + # Verify step belongs to user's task + step = await pool.fetchrow( + """SELECT s.id FROM steps s + JOIN tasks t ON s.task_id = t.id + WHERE s.id = $1 AND t.user_id = $2::uuid""", + step_id, + user_id, + ) + if not step: + raise HTTPException(status_code=404, detail="Step not found") + + fields = [] + values = [] + idx = 2 # $1 = step_id + + update_data = req.model_dump(exclude_unset=True) + for key, val in update_data.items(): + fields.append(f"{key} = ${idx}") + values.append(val) + idx += 1 + + if not fields: + raise HTTPException(status_code=400, detail="No fields to update") + + set_clause = ", ".join(fields) + row = await pool.fetchrow( + f"""UPDATE steps SET {set_clause} + WHERE id = $1 + RETURNING id, task_id, sort_order, title, description, estimated_minutes, + status, checkpoint_note, last_checked_at, completed_at, created_at""", + step_id, + *values, + ) + return StepOut(**dict(row)) + + +@router.post("/steps/{step_id}/complete", response_model=StepOut) +async def complete_step(step_id: UUID, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + step = await pool.fetchrow( + """SELECT s.id FROM steps s + JOIN tasks t ON s.task_id = t.id + WHERE s.id = $1 AND t.user_id = $2::uuid""", + step_id, + user_id, + ) + if not step: + raise HTTPException(status_code=404, detail="Step not found") + + row = await pool.fetchrow( + """UPDATE steps SET status = 'done', completed_at = now() + WHERE id = $1 + RETURNING id, task_id, sort_order, title, description, estimated_minutes, + status, checkpoint_note, last_checked_at, completed_at, created_at""", + step_id, + ) + return StepOut(**dict(row)) + + +@router.delete("/steps/{step_id}", status_code=204) +async def delete_step(step_id: UUID, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + step = await pool.fetchrow( + """SELECT s.id FROM steps s + JOIN tasks t ON s.task_id = t.id + WHERE s.id = $1 AND t.user_id = $2::uuid""", + step_id, + user_id, + ) + if not step: + raise HTTPException(status_code=404, detail="Step not found") + + await pool.execute("DELETE FROM steps WHERE id = $1", step_id) diff --git a/app/routers/tasks.py b/app/routers/tasks.py new file mode 100644 index 0000000..21d1586 --- /dev/null +++ b/app/routers/tasks.py @@ -0,0 +1,298 @@ +from datetime import datetime as dt +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query + +from app.middleware.auth import get_current_user_id +from app.models import ( + BrainDumpRequest, + BrainDumpResponse, + PlanRequest, + PlanResponse, + StepOut, + TaskCreate, + TaskOut, + TaskUpdate, +) +from app.services import llm, push +from app.services.db import get_pool + +router = APIRouter(prefix="/tasks", tags=["tasks"]) + + +def _row_to_task(row) -> TaskOut: + return TaskOut( + id=row["id"], + user_id=row["user_id"], + title=row["title"], + description=row["description"], + priority=row["priority"], + status=row["status"], + deadline=row["deadline"], + estimated_minutes=row["estimated_minutes"], + source=row["source"], + tags=row["tags"] or [], + plan_type=row["plan_type"], + brain_dump_raw=row["brain_dump_raw"], + created_at=row["created_at"], + updated_at=row["updated_at"], + ) + + +TASK_COLUMNS = "id, user_id, title, description, priority, status, deadline, estimated_minutes, source, tags, plan_type, brain_dump_raw, created_at, updated_at" + + +@router.get("", response_model=list[TaskOut]) +async def list_tasks( + status: str | None = None, + priority: int | None = None, + sort_by: str = Query("priority", pattern="^(priority|deadline|created_at)$"), + user_id: str = Depends(get_current_user_id), +): + pool = await get_pool() + + query = f"SELECT {TASK_COLUMNS} FROM tasks WHERE user_id = $1::uuid AND status != 'deferred'" + params: list = [user_id] + idx = 2 + + if status: + query += f" AND status = ${idx}" + params.append(status) + idx += 1 + if priority is not None: + query += f" AND priority = ${idx}" + params.append(priority) + idx += 1 + + sort_dir = "DESC" if sort_by == "priority" else "ASC" + query += f" ORDER BY {sort_by} {sort_dir}" + + rows = await pool.fetch(query, *params) + return [_row_to_task(r) for r in rows] + + +@router.get("/upcoming", response_model=list[TaskOut]) +async def upcoming_tasks(user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + rows = await pool.fetch( + f"""SELECT {TASK_COLUMNS} FROM tasks + WHERE user_id = $1::uuid AND deadline IS NOT NULL + AND deadline <= now() + interval '48 hours' + AND status NOT IN ('done', 'deferred') + ORDER BY deadline ASC""", + user_id, + ) + return [_row_to_task(r) for r in rows] + + +@router.post("", response_model=TaskOut, status_code=201) +async def create_task(req: TaskCreate, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + row = await pool.fetchrow( + f"""INSERT INTO tasks (user_id, title, description, priority, deadline, estimated_minutes, tags) + VALUES ($1::uuid, $2, $3, $4, $5, $6, $7) + RETURNING {TASK_COLUMNS}""", + user_id, + req.title, + req.description, + req.priority, + req.deadline, + req.estimated_minutes, + req.tags, + ) + await push.send_task_added(user_id, row["title"], step_count=0) + return _row_to_task(row) + + +@router.post("/brain-dump", response_model=BrainDumpResponse) +async def brain_dump(req: BrainDumpRequest, user_id: str = Depends(get_current_user_id)): + result = await llm.parse_brain_dump(req.raw_text, req.timezone) + + pool = await get_pool() + parsed_tasks = [] + for t in result.get("parsed_tasks", []): + # Parse deadline string from LLM into datetime (asyncpg needs datetime, not str) + deadline = t.get("deadline") + if isinstance(deadline, str) and deadline and deadline != "null": + try: + deadline = dt.fromisoformat(deadline) + except ValueError: + deadline = None + else: + deadline = None + + est_minutes = t.get("estimated_minutes") + if isinstance(est_minutes, str): + try: + est_minutes = int(est_minutes) + except ValueError: + est_minutes = None + + subtasks_raw = t.get("subtasks") or [] + has_subtasks = len(subtasks_raw) > 0 + + row = await pool.fetchrow( + f"""INSERT INTO tasks (user_id, title, description, priority, deadline, + estimated_minutes, source, tags, brain_dump_raw, plan_type) + VALUES ($1::uuid, $2, $3, $4, $5::timestamptz, $6, $7, $8, $9, $10) + RETURNING {TASK_COLUMNS}""", + user_id, + t["title"], + t.get("description"), + int(t.get("priority", 0)), + deadline, + est_minutes, + req.source, + t.get("tags", []), + req.raw_text, + "brain_dump" if has_subtasks else None, + ) + task_id = row["id"] + + all_subtasks = [] + sort_order = 1 + for sub in subtasks_raw: + sub_est = sub.get("estimated_minutes") + if isinstance(sub_est, str): + try: + sub_est = int(sub_est) + except ValueError: + sub_est = None + + sub_deadline = sub.get("deadline") + if isinstance(sub_deadline, str) and sub_deadline and sub_deadline != "null": + try: + sub_deadline = dt.fromisoformat(sub_deadline) + except ValueError: + sub_deadline = None + else: + sub_deadline = None + + is_suggested = bool(sub.get("suggested", False)) + + # Only save non-suggested steps now; suggested ones are opt-in from the client + if not is_suggested: + await pool.fetchrow( + """INSERT INTO steps (task_id, sort_order, title, description, estimated_minutes) + VALUES ($1, $2, $3, $4, $5) + RETURNING id""", + task_id, + sort_order, + sub["title"], + sub.get("description"), + sub_est, + ) + sort_order += 1 + + all_subtasks.append({ + "title": sub["title"], + "description": sub.get("description"), + "deadline": sub_deadline.isoformat() if sub_deadline else None, + "estimated_minutes": sub_est, + "suggested": is_suggested, + }) + + saved_count = sum(1 for s in all_subtasks if not s["suggested"]) + await push.send_task_added(user_id, row["title"], step_count=saved_count) + + parsed_tasks.append({ + "task_id": str(row["id"]), + "title": row["title"], + "description": row["description"], + "priority": row["priority"], + "deadline": row["deadline"], + "estimated_minutes": row["estimated_minutes"], + "tags": row["tags"] or [], + "subtasks": all_subtasks, + }) + + return BrainDumpResponse( + parsed_tasks=parsed_tasks, + unparseable_fragments=result.get("unparseable_fragments", []), + ask_for_plans=True, + ) + + +@router.post("/{task_id}/plan", response_model=PlanResponse) +async def plan_task(task_id: UUID, req: PlanRequest, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + task = await pool.fetchrow( + "SELECT id, title, description, estimated_minutes FROM tasks WHERE id = $1 AND user_id = $2::uuid", + task_id, + user_id, + ) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + steps_data = await llm.generate_step_plan(task["title"], task["description"], task["estimated_minutes"]) + + steps = [] + for s in steps_data: + row = await pool.fetchrow( + """INSERT INTO steps (task_id, sort_order, title, description, estimated_minutes) + VALUES ($1, $2, $3, $4, $5) + RETURNING id, task_id, sort_order, title, description, estimated_minutes, + status, checkpoint_note, last_checked_at, completed_at, created_at""", + task_id, + s["sort_order"], + s["title"], + s.get("description"), + s.get("estimated_minutes"), + ) + steps.append(StepOut(**dict(row))) + + await pool.execute( + "UPDATE tasks SET plan_type = $1, status = 'ready', updated_at = now() WHERE id = $2", + req.plan_type, + task_id, + ) + await push.send_task_added(user_id, task["title"], step_count=len(steps)) + + return PlanResponse(task_id=task_id, plan_type=req.plan_type, steps=steps) + + +@router.patch("/{task_id}", response_model=TaskOut) +async def update_task(task_id: UUID, req: TaskUpdate, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + + fields = [] + values = [] + idx = 3 # $1 = task_id, $2 = user_id + + update_data = req.model_dump(exclude_unset=True) + for key, val in update_data.items(): + fields.append(f"{key} = ${idx}") + values.append(val) + idx += 1 + + if not fields: + raise HTTPException(status_code=400, detail="No fields to update") + + fields.append("updated_at = now()") + set_clause = ", ".join(fields) + + row = await pool.fetchrow( + f"""UPDATE tasks SET {set_clause} + WHERE id = $1 AND user_id = $2::uuid + RETURNING {TASK_COLUMNS}""", + task_id, + user_id, + *values, + ) + if not row: + raise HTTPException(status_code=404, detail="Task not found") + + return _row_to_task(row) + + +@router.delete("/{task_id}", status_code=204) +async def delete_task(task_id: UUID, user_id: str = Depends(get_current_user_id)): + pool = await get_pool() + result = await pool.execute( + "UPDATE tasks SET status = 'deferred', updated_at = now() WHERE id = $1 AND user_id = $2::uuid", + task_id, + user_id, + ) + if result == "UPDATE 0": + raise HTTPException(status_code=404, detail="Task not found") diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/db.py b/app/services/db.py new file mode 100644 index 0000000..9079b41 --- /dev/null +++ b/app/services/db.py @@ -0,0 +1,18 @@ +import asyncpg +from app.config import settings + +pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + global pool + if pool is None: + pool = await asyncpg.create_pool(settings.DATABASE_URL, min_size=2, max_size=10) + return pool + + +async def close_pool(): + global pool + if pool: + await pool.close() + pool = None diff --git a/app/services/hex_service.py b/app/services/hex_service.py new file mode 100644 index 0000000..a4d702d --- /dev/null +++ b/app/services/hex_service.py @@ -0,0 +1,49 @@ +import asyncio + +import httpx + +from app.config import settings + +HEX_API_BASE = "https://app.hex.tech/api/v1" +HEADERS = { + "Authorization": f"Bearer {settings.HEX_API_TOKEN}", + "Content-Type": "application/json", +} + +NOTEBOOKS = { + "distraction_patterns": settings.HEX_NB_DISTRACTIONS, + "focus_trends": settings.HEX_NB_FOCUS_TRENDS, + "weekly_report": settings.HEX_NB_WEEKLY_REPORT, +} + + +async def run_notebook(notebook_key: str, user_id: str) -> dict: + project_id = NOTEBOOKS.get(notebook_key) + if not project_id: + raise ValueError(f"Unknown notebook: {notebook_key}") + + async with httpx.AsyncClient(timeout=60) as http: + # Trigger run — POST /projects/{projectId}/runs + resp = await http.post( + f"{HEX_API_BASE}/projects/{project_id}/runs", + headers=HEADERS, + json={"inputParams": {"user_id": user_id}}, + ) + resp.raise_for_status() + run_id = resp.json()["runId"] + + # Poll for completion — GET /projects/{projectId}/runs/{runId} + for _ in range(30): + status_resp = await http.get( + f"{HEX_API_BASE}/projects/{project_id}/runs/{run_id}", + headers=HEADERS, + ) + status_resp.raise_for_status() + data = status_resp.json() + if data["status"] == "COMPLETED": + return {"status": "COMPLETED", "elapsed": data.get("elapsedTime")} + if data["status"] in ("ERRORED", "KILLED", "UNABLE_TO_ALLOCATE_KERNEL"): + raise RuntimeError(f"Hex run failed: {data['status']}") + await asyncio.sleep(2) + + raise TimeoutError("Hex notebook run timed out") diff --git a/app/services/llm.py b/app/services/llm.py new file mode 100644 index 0000000..f396b10 --- /dev/null +++ b/app/services/llm.py @@ -0,0 +1,351 @@ +import base64 +import json +import logging + +from app.config import settings + +logger = logging.getLogger(__name__) + +# ── Provider setup: prefer Anthropic, fall back to Gemini ── + +_provider: str | None = None + +if settings.ANTHROPIC_API_KEY: + import anthropic + _anthropic_client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY) + _provider = "anthropic" + _model = "claude-sonnet-4-20250514" + logger.info("LLM provider: Anthropic (Claude)") + +elif settings.GEMINI_API_KEY: + from google import genai + from google.genai import types as genai_types + _gemini_client = genai.Client(api_key=settings.GEMINI_API_KEY) + _provider = "gemini" + _model = "gemini-3.1-pro-preview" + logger.info("LLM provider: Google (Gemini)") + + +def _parse_json(text: str) -> dict | list: + import re + text = text.strip() + # Strip markdown code fences + if text.startswith("```"): + text = text.split("\n", 1)[1] + text = text.rsplit("```", 1)[0] + # Find the first { or [ and last } or ] + start = -1 + for i, c in enumerate(text): + if c in "{[": + start = i + break + if start == -1: + raise ValueError(f"No JSON found in LLM response: {text[:200]}") + end = max(text.rfind("}"), text.rfind("]")) + if end == -1: + raise ValueError(f"No closing bracket in LLM response: {text[:200]}") + json_str = text[start:end + 1] + # Strip // comments (Gemini sometimes adds these) + json_str = re.sub(r'//[^\n]*', '', json_str) + # Strip trailing commas before } or ] + json_str = re.sub(r',\s*([}\]])', r'\1', json_str) + return json.loads(json_str) + + +def _check_provider(): + if not _provider: + raise RuntimeError("No LLM API key configured. Set ANTHROPIC_API_KEY or GEMINI_API_KEY in .env") + + +async def _text_completion(system: str, user_content: str, max_tokens: int = 1024) -> str: + _check_provider() + if _provider == "anthropic": + response = _anthropic_client.messages.create( + model=_model, + max_tokens=max_tokens, + messages=[{"role": "user", "content": f"{system}\n\n{user_content}"}], + ) + return response.content[0].text + else: + response = _gemini_client.models.generate_content( + model=_model, + config={"system_instruction": system}, + contents=user_content, + ) + return response.text + + +async def _vision_completion(system: str, image_bytes: bytes, user_text: str, max_tokens: int = 512) -> str: + _check_provider() + if _provider == "anthropic": + image_b64 = base64.b64encode(image_bytes).decode() + response = _anthropic_client.messages.create( + model=_model, + max_tokens=max_tokens, + messages=[{ + "role": "user", + "content": [ + {"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_b64}}, + {"type": "text", "text": f"{system}\n\n{user_text}"}, + ], + }], + ) + return response.content[0].text + else: + response = _gemini_client.models.generate_content( + model=_model, + config={"system_instruction": system}, + contents=[ + genai_types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg"), + user_text, + ], + ) + return response.text + + +# ── Public API (unchanged signatures) ── + + +async def parse_brain_dump(raw_text: str, timezone: str) -> dict: + from datetime import datetime + + system = f"""You are a task parser and ADHD-friendly planner. +Extract structured tasks from this brain dump, then break each task into +concrete, actionable steps someone with ADHD can start immediately. + +Today's date: {datetime.now().strftime("%Y-%m-%d")} +User's timezone: {timezone} + +Task extraction rules: +- Be generous with deadlines — infer from context. +- If no deadline is obvious, set priority to 0 (unset). +- Unrelated items stay as separate top-level tasks. + +Step rules (applied to every task's subtasks array): +- Each step should be 5-15 minutes, specific enough to start without decision paralysis. +- First step should be the EASIEST to reduce activation energy. +- Steps explicitly mentioned in the brain dump have "suggested": false. +- Then ADD 1-3 additional steps the user likely needs but didn't mention, with "suggested": true. + Examples: "gather materials", "review before sending", "set a reminder", "test it works". +- Keep step titles short and action-oriented. +- Every task should have at least 2 steps total. + +Respond ONLY with JSON, no other text. +Example: +{{ + "parsed_tasks": [{{ + "title": "concise task title", + "description": "any extra detail from the dump", + "deadline": "ISO 8601 or null", + "priority": "0-4 integer (0=unset, 1=low, 2=med, 3=high, 4=urgent)", + "estimated_minutes": "total for all steps or null", + "tags": ["work", "personal", "health", "errands", etc.], + "subtasks": [ + {{"title": "step from the dump", "description": null, "deadline": null, "estimated_minutes": 10, "suggested": false}}, + {{"title": "AI-suggested next step", "description": null, "deadline": null, "estimated_minutes": 5, "suggested": true}} + ] + }}], + "unparseable_fragments": ["text that couldn't be parsed into tasks"] +}}""" + + text = await _text_completion(system, f"Brain dump:\n{raw_text}", max_tokens=2048) + return _parse_json(text) + + + + +async def generate_step_plan(task_title: str, task_description: str | None, estimated_minutes: int | None) -> list: + est = f"{estimated_minutes} minutes" if estimated_minutes else "unknown" + system = f"""You are an ADHD-friendly task planner. +Break this task into concrete steps of 5-15 minutes each. +Each step should be specific enough that someone with ADHD +can start immediately without decision paralysis. + +Rules: +- First step should be the EASIEST (reduce activation energy) +- Steps should be independently completable +- Include time estimates per step +- Total estimated time should roughly match the task estimate +- No step longer than 15 minutes + +Respond ONLY with JSON array: +[{{ + "sort_order": 1, + "title": "specific action description", + "description": "additional detail if needed", + "estimated_minutes": number +}}]""" + + text = await _text_completion(system, f"Task: {task_title}\nDescription: {task_description or 'N/A'}\nEstimated total: {est}") + return _parse_json(text) + + +async def analyze_screenshot( + screenshot_bytes: bytes, + window_title: str, + task_context: dict, + recent_summaries: list[str] | None = None, +) -> dict: + """Legacy server-side VLM analysis. Upgraded with friction detection prompt.""" + steps_text = "" + for s in task_context.get("steps", []): + cp = f' checkpoint_note="{s["checkpoint_note"]}"' if s.get("checkpoint_note") else "" + steps_text += f' - [{s["status"]}] {s["sort_order"]}. {s["title"]} (id={s["id"]}){cp}\n' + + history_text = "" + if recent_summaries: + for i, summary in enumerate(recent_summaries): + history_text += f" - [{(len(recent_summaries) - i) * 5}s ago] {summary}\n" + + system = f"""You are a proactive focus assistant analyzing a user's screen. +The user's current task and step progress: + Task: {task_context.get("task_title", "")} + Goal: {task_context.get("task_goal", "")} + Steps: +{steps_text} Window title reported by OS: {window_title} +{"Recent screen history:" + chr(10) + history_text if history_text else ""} +Analyze the current screenshot. Determine: + +1. TASK STATUS: Is the user working on their task? Which step? Any steps completed? +2. CHECKPOINT: What specific within-step progress have they made? +3. FRICTION DETECTION: Is the user stuck in any of these patterns? + - REPETITIVE_LOOP: Switching between same 2-3 windows (copying data manually) + - STALLED: Same screen region with minimal changes for extended time + - TEDIOUS_MANUAL: Doing automatable work (filling forms, organizing files, transcribing) + - CONTEXT_OVERHEAD: Many windows open, visibly searching across them + - TASK_RESUMPTION: User just returned to a task they were working on earlier +4. INTENT: If viewing informational content, is the user SKIMMING, ENGAGED, or UNCLEAR? +5. PROPOSED ACTION: If friction detected, suggest a specific action the AI could take. + +Respond ONLY with JSON: +{{ + "on_task": boolean, + "current_step_id": "step UUID or null", + "checkpoint_note_update": "within-step progress or null", + "steps_completed": ["UUIDs"], + "friction": {{ + "type": "repetitive_loop | stalled | tedious_manual | context_overhead | task_resumption | none", + "confidence": 0.0-1.0, + "description": "what the user is struggling with or null", + "proposed_actions": [ + {{"label": "action description", "action_type": "auto_extract | brain_dump", "details": "specifics"}} + ], + "source_context": "what info to extract from or null", + "target_context": "where to put it or null" + }}, + "intent": "skimming | engaged | unclear | null", + "distraction_type": "app_switch | browsing | idle | null", + "app_name": "primary visible application", + "confidence": 0.0-1.0, + "gentle_nudge": "nudge if distracted and no friction action applies, null otherwise", + "vlm_summary": "1-sentence factual description of screen" +}}""" + + text = await _vision_completion(system, screenshot_bytes, "Analyze this screenshot.") + return _parse_json(text) + + +async def generate_resume_card( + task_title: str, + goal: str | None, + current_step_title: str | None, + checkpoint_note: str | None, + completed_count: int, + total_count: int, + next_step_title: str | None, + minutes_away: int, + attention_score: int | None, +) -> dict: + system = """Generate a brief, encouraging context-resume card for +someone with ADHD returning to their task. +Be warm, specific, and action-oriented. No shame. No generic platitudes. +Use the checkpoint_note to give hyper-specific context about where they left off. + +Respond ONLY with JSON: +{ + "welcome_back": "short friendly greeting (max 8 words)", + "you_were_doing": "1 sentence referencing checkpoint_note specifically", + "next_step": "concrete next action with time estimate", + "motivation": "1 sentence encouragement (ADHD-friendly, no shame)" +}""" + + user_content = f"""Inputs: + - Task: {task_title} + - Overall goal: {goal or "N/A"} + - Current step: {current_step_title or "N/A"} + - Current step checkpoint_note: {checkpoint_note or "N/A"} + - Steps completed: {completed_count} of {total_count} + - Next step after current: {next_step_title or "N/A"} + - Time away: {minutes_away} minutes + - Attention score before leaving: {attention_score or "N/A"}""" + + text = await _text_completion(system, user_content, max_tokens=256) + return _parse_json(text) + + +async def generate_app_activity_nudge( + app_name: str, + duration_seconds: int, + task_title: str, + current_step_title: str | None, + checkpoint_note: str | None, +) -> str: + minutes = duration_seconds // 60 + duration_text = f"{minutes} minute{'s' if minutes != 1 else ''}" if minutes > 0 else f"{duration_seconds} seconds" + + system = """Generate a single gentle, non-judgmental nudge for someone with ADHD +who drifted to a non-work app during a focus session. +Reference their specific progress to make returning easier. +No shame. Keep it under 30 words. +Respond with ONLY the nudge text, no JSON, no quotes.""" + + user_content = f"""Context: + - Distraction app: {app_name} + - Time spent: {duration_text} + - Current task: {task_title} + - Current step: {current_step_title or "N/A"} + - Progress so far: {checkpoint_note or "N/A"}""" + + return (await _text_completion(system, user_content, max_tokens=100)).strip() + + +async def suggest_work_apps(task_title: str, task_description: str | None) -> dict: + system = """Given this task, suggest which Apple apps the user likely needs. +Return the most likely single app as the primary suggestion. + +Respond ONLY with JSON: +{ + "suggested_app_scheme": "URL scheme (e.g. mobilenotes://, x-apple-pages://, com.google.docs://)", + "suggested_app_name": "human-readable name (e.g. Notes, Pages, Google Docs)" +}""" + + text = await _text_completion(system, f"Task: {task_title}\nDescription: {task_description or 'N/A'}", max_tokens=100) + return _parse_json(text) + + +async def prioritize_tasks(tasks_json: list, timezone: str) -> list: + from datetime import datetime + + system = """You are an ADHD-friendly task prioritizer. +Consider: deadlines, estimated effort, task dependencies, +and the user's energy patterns. + +Rules: +- Hard deadlines always take top priority +- Front-load quick wins (<15min) for momentum +- Group errands together +- Deprioritize tasks with no deadline and low urgency + +Respond ONLY with JSON array: +[{ + "task_id": "uuid", + "recommended_priority": 1-4, + "reason": "1-sentence explanation" +}]""" + + user_content = f"""Input: {json.dumps(tasks_json)} +Current time: {datetime.now().isoformat()} +User's timezone: {timezone}""" + + text = await _text_completion(system, user_content, max_tokens=512) + return _parse_json(text) diff --git a/app/services/push.py b/app/services/push.py new file mode 100644 index 0000000..f35aaa3 --- /dev/null +++ b/app/services/push.py @@ -0,0 +1,283 @@ +"""APNs push notification service. + +Uses HTTP/2 APNs provider API with .p8 auth key (token-based auth). +Falls back to logging if APNS_KEY_ID / APNS_TEAM_ID / APNS_P8_PATH are not configured. + +Required .env vars: + APNS_KEY_ID — 10-char key ID from Apple Developer portal + APNS_TEAM_ID — 10-char team ID from Apple Developer portal + APNS_P8_PATH — absolute path to the AuthKey_XXXXXXXXXX.p8 file + APNS_SANDBOX — True for development/TestFlight, False (default) for production +""" + +import base64 +import json +import logging +import time + +import httpx +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import load_pem_private_key + +from app.config import settings +from app.services.db import get_pool + +logger = logging.getLogger(__name__) + +# Cache the provider JWT — valid for 60 min, refresh 5 min early +_apns_token: str | None = None +_apns_token_exp: float = 0.0 +_private_key = None + + +def _b64(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode() + + +def _apns_configured() -> bool: + return bool(settings.APNS_KEY_ID and settings.APNS_TEAM_ID and settings.APNS_P8_PATH) + + +def _make_apns_jwt() -> str: + global _apns_token, _apns_token_exp, _private_key + + now = time.time() + if _apns_token and now < _apns_token_exp: + return _apns_token + + if _private_key is None: + with open(settings.APNS_P8_PATH, "rb") as f: + _private_key = load_pem_private_key(f.read(), password=None) + + header = _b64(json.dumps({"alg": "ES256", "kid": settings.APNS_KEY_ID}).encode()) + payload = _b64(json.dumps({"iss": settings.APNS_TEAM_ID, "iat": int(now)}).encode()) + msg = f"{header}.{payload}".encode() + sig = _b64(_private_key.sign(msg, ECDSA(hashes.SHA256()))) + token = f"{header}.{payload}.{sig}" + + _apns_token = token + _apns_token_exp = now + 3300 # 55-minute lifetime (APNs tokens last 60 min) + return token + + +async def _send_apns(device_token: str, aps_payload: dict, push_type: str = "alert") -> bool: + host = "api.sandbox.push.apple.com" if settings.APNS_SANDBOX else "api.push.apple.com" + url = f"https://{host}/3/device/{device_token}" + + topic = settings.APPLE_BUNDLE_ID + if push_type == "liveactivity": + topic += ".push-type.liveactivity" + + headers = { + "authorization": f"bearer {_make_apns_jwt()}", + "apns-topic": topic, + "apns-push-type": push_type, + "apns-priority": "10", + } + + try: + async with httpx.AsyncClient(http2=True) as client: + resp = await client.post(url, json=aps_payload, headers=headers, timeout=10.0) + print(f"APNs response: {resp.status_code} http_version={resp.http_version} token=…{device_token[-8:]} body={resp.text}") + print(f"APNs request: url={url} payload={json.dumps(aps_payload)}") + if resp.status_code == 200: + return True + if resp.status_code == 410: + # Token is dead — device uninstalled or revoked push. Remove from DB. + logger.warning(f"APNs 410 Unregistered for token …{device_token[-8:]}, removing from DB") + await _remove_device_token(device_token) + return False + logger.error(f"APNs {resp.status_code} for token …{device_token[-8:]}: {resp.text}") + return False + except Exception as exc: + logger.error(f"APNs request failed: {exc}") + return False + + +async def _remove_device_token(device_token: str): + """Remove a dead APNs token from all users.""" + pool = await get_pool() + await pool.execute( + """UPDATE users SET device_tokens = ( + SELECT COALESCE(jsonb_agg(t), '[]'::jsonb) + FROM jsonb_array_elements(device_tokens) t + WHERE t->>'token' != $1 + ) WHERE device_tokens @> $2::jsonb""", + device_token, + json.dumps([{"token": device_token}]), + ) + + +# ── Public API ──────────────────────────────────────────────────────────────── + + +async def get_device_tokens(user_id: str, platform: str | None = None) -> list[dict]: + pool = await get_pool() + row = await pool.fetchrow( + "SELECT device_tokens FROM users WHERE id = $1::uuid", user_id + ) + if not row or not row["device_tokens"]: + return [] + tokens = ( + json.loads(row["device_tokens"]) + if isinstance(row["device_tokens"], str) + else row["device_tokens"] + ) + if platform: + tokens = [t for t in tokens if t.get("platform", "").startswith(platform)] + return tokens + + +async def register_device_token(user_id: str, platform: str, token: str): + pool = await get_pool() + await pool.execute( + """UPDATE users SET device_tokens = ( + SELECT COALESCE(jsonb_agg(t), '[]'::jsonb) + FROM jsonb_array_elements(device_tokens) t + WHERE t->>'platform' != $2 + ) || $3::jsonb + WHERE id = $1::uuid""", + user_id, + platform, + json.dumps([{"platform": platform, "token": token}]), + ) + + +async def send_push(user_id: str, platform: str, aps_payload: dict): + """Send an APNs push to all registered tokens for a user/platform.""" + tokens = await get_device_tokens(user_id, platform) + print(f"send_push → user={user_id} platform={platform} tokens={tokens} configured={_apns_configured()}") + if not tokens: + return + + if not _apns_configured(): + for t in tokens: + logger.info( + f"[APNs STUB] platform={t['platform']} token=…{t['token'][-8:]} payload={aps_payload}" + ) + return + + for t in tokens: + await _send_apns(t["token"], aps_payload) + + +async def send_task_added(user_id: str, task_title: str, step_count: int = 0): + """Notify the user that a new task was added.""" + subtitle = f"{step_count} subtask{'s' if step_count != 1 else ''}" + payload = { + "aps": { + "alert": {"title": task_title, "subtitle": subtitle}, + "sound": "default", + } + } + for platform in ["iphone", "ipad"]: + await send_push(user_id, platform, payload) + + +async def send_activity_update(user_id: str, task_title: str, task_id=None, started_at: int | None = None): + """Send ActivityKit push to update Live Activity on all devices with current step progress.""" + tokens = await get_device_tokens(user_id, "liveactivity_update_") + if not tokens: + return + + step_progress = await _get_step_progress(task_id) + now_ts = started_at or int(time.time()) + content_state = _build_content_state(task_title, now_ts, step_progress) + + if not _apns_configured(): + for t in tokens: + logger.info(f"[ActivityKit STUB] token=…{t['token'][-8:]} state={content_state}") + return + + payload = {"aps": {"timestamp": int(time.time()), "content-state": content_state, "event": "update"}} + for t in tokens: + await _send_apns(t["token"], payload, push_type="liveactivity") + + +async def send_activity_end(user_id: str, task_title: str = "Session ended", task_id=None): + """Send ActivityKit push-to-end using per-activity update tokens.""" + tokens = await get_device_tokens(user_id, "liveactivity_update_") + if not tokens: + return + + now_ts = int(time.time()) + step_progress = await _get_step_progress(task_id) + payload = { + "aps": { + "timestamp": now_ts, + "event": "end", + "content-state": _build_content_state(task_title, now_ts, step_progress), + "dismissal-date": now_ts, + } + } + + if not _apns_configured(): + for t in tokens: + logger.info(f"[ActivityKit END STUB] token=...{t['token'][-8:]}") + return + + for t in tokens: + await _send_apns(t["token"], payload, push_type="liveactivity") + + +async def _get_step_progress(task_id) -> dict: + """Fetch step progress for a task: completed count, total count, current step title.""" + if not task_id: + return {"stepsCompleted": 0, "stepsTotal": 0, "currentStepTitle": None, "lastCompletedStepTitle": None} + pool = await get_pool() + rows = await pool.fetch( + "SELECT title, status FROM steps WHERE task_id = $1 ORDER BY sort_order", task_id + ) + total = len(rows) + completed = sum(1 for r in rows if r["status"] == "done") + current = next((r["title"] for r in rows if r["status"] in ("in_progress", "pending")), None) + last_completed = next((r["title"] for r in reversed(rows) if r["status"] == "done"), None) + return {"stepsCompleted": completed, "stepsTotal": total, "currentStepTitle": current, "lastCompletedStepTitle": last_completed} + + +def _build_content_state(task_title: str, started_at: int, step_progress: dict) -> dict: + state = { + "taskTitle": task_title, + "startedAt": started_at, + "stepsCompleted": step_progress["stepsCompleted"], + "stepsTotal": step_progress["stepsTotal"], + } + if step_progress["currentStepTitle"]: + state["currentStepTitle"] = step_progress["currentStepTitle"] + if step_progress.get("lastCompletedStepTitle"): + state["lastCompletedStepTitle"] = step_progress["lastCompletedStepTitle"] + return state + + +async def send_activity_start(user_id: str, task_title: str, task_id=None): + """Send ActivityKit push-to-start to all liveactivity tokens.""" + tokens = await get_device_tokens(user_id, "liveactivity") + if not tokens: + return + + now_ts = int(time.time()) + step_progress = await _get_step_progress(task_id) + payload = { + "aps": { + "timestamp": now_ts, + "event": "start", + "content-state": _build_content_state(task_title, now_ts, step_progress), + "attributes-type": "FocusSessionAttributes", + "attributes": { + "sessionType": "Focus" + }, + "alert": { + "title": "Focus Session Started", + "body": task_title + } + } + } + + if not _apns_configured(): + for t in tokens: + logger.info(f"[ActivityKit START STUB] token=...{t['token'][-8:]} start payload={payload}") + return + + for t in tokens: + await _send_apns(t["token"], payload, push_type="liveactivity") diff --git a/app/types.py b/app/types.py new file mode 100644 index 0000000..39dba80 --- /dev/null +++ b/app/types.py @@ -0,0 +1,69 @@ +from enum import StrEnum + + +class TaskStatus(StrEnum): + PENDING = "pending" + PLANNING = "planning" + READY = "ready" + IN_PROGRESS = "in_progress" + DONE = "done" + DEFERRED = "deferred" + + +class StepStatus(StrEnum): + PENDING = "pending" + IN_PROGRESS = "in_progress" + DONE = "done" + SKIPPED = "skipped" + + +class SessionStatus(StrEnum): + ACTIVE = "active" + COMPLETED = "completed" + INTERRUPTED = "interrupted" + + +class DistractionType(StrEnum): + APP_SWITCH = "app_switch" + OFF_SCREEN = "off_screen" + IDLE = "idle" + BROWSING = "browsing" + + +class TaskSource(StrEnum): + MANUAL = "manual" + BRAIN_DUMP = "brain_dump" + VOICE = "voice" + + +class PlanType(StrEnum): + LLM_GENERATED = "llm_generated" + USER_DEFINED = "user_defined" + HYBRID = "hybrid" + + +class FrictionType(StrEnum): + REPETITIVE_LOOP = "repetitive_loop" + STALLED = "stalled" + TEDIOUS_MANUAL = "tedious_manual" + CONTEXT_OVERHEAD = "context_overhead" + TASK_RESUMPTION = "task_resumption" + NONE = "none" + + +class IntentType(StrEnum): + SKIMMING = "skimming" + ENGAGED = "engaged" + UNCLEAR = "unclear" + + +class ProactiveUserChoice(StrEnum): + ACCEPTED = "accepted" + DECLINED = "declined" + ALTERNATIVE_CHOSEN = "alternative_chosen" + + +class DevicePlatform(StrEnum): + MAC = "mac" + IPAD = "ipad" + IPHONE = "iphone" diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000..a753072 --- /dev/null +++ b/environment.yml @@ -0,0 +1,84 @@ +name: lockinbro +channels: + - defaults +dependencies: + - _libgcc_mutex=0.1 + - _openmp_mutex=5.1 + - bzip2=1.0.8 + - ca-certificates=2025.12.2 + - ld_impl_linux-64=2.44 + - libexpat=2.7.5 + - libffi=3.4.4 + - libgcc=15.2.0 + - libgcc-ng=15.2.0 + - libgomp=15.2.0 + - libstdcxx=15.2.0 + - libstdcxx-ng=15.2.0 + - libuuid=1.41.5 + - libxcb=1.17.0 + - libzlib=1.3.1 + - ncurses=6.5 + - openssl=3.5.5 + - packaging=25.0 + - pip=26.0.1 + - pthread-stubs=0.3 + - python=3.12.13 + - readline=8.3 + - setuptools=80.10.2 + - sqlite=3.51.2 + - tk=8.6.15 + - tzdata=2026a + - wheel=0.46.3 + - xorg-libx11=1.8.12 + - xorg-libxau=1.0.12 + - xorg-libxdmcp=1.1.5 + - xorg-xorgproto=2024.1 + - xz=5.8.2 + - zlib=1.3.1 + - pip: + - alembic==1.18.4 + - annotated-doc==0.0.4 + - annotated-types==0.7.0 + - anthropic==0.86.0 + - anyio==4.13.0 + - argon2-cffi==25.1.0 + - argon2-cffi-bindings==25.1.0 + - asyncpg==0.31.0 + - certifi==2026.2.25 + - cffi==2.0.0 + - click==8.3.1 + - cryptography==46.0.6 + - distro==1.9.0 + - docstring-parser==0.17.0 + - ecdsa==0.19.2 + - fastapi==0.135.2 + - greenlet==3.3.2 + - h11==0.16.0 + - httpcore==1.0.9 + - httptools==0.7.1 + - httpx==0.28.1 + - idna==3.11 + - jiter==0.13.0 + - mako==1.3.10 + - markupsafe==3.0.3 + - pyasn1==0.6.3 + - pycparser==3.0 + - pydantic==2.12.5 + - pydantic-core==2.41.5 + - pydantic-settings==2.13.1 + - python-dotenv==1.2.2 + - python-jose==3.5.0 + - python-multipart==0.0.22 + - pyyaml==6.0.3 + - rsa==4.9.1 + - six==1.17.0 + - sniffio==1.3.1 + - sqlalchemy==2.0.48 + - starlette==1.0.0 + - typing-extensions==4.15.0 + - typing-inspection==0.4.2 + - uvicorn==0.42.0 + - uvloop==0.22.1 + - watchfiles==1.1.1 + - websockets==16.0 +prefix: /home/devuser/miniconda3/envs/lockinbro diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4afde23 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,59 @@ +alembic==1.18.4 +annotated-doc==0.0.4 +annotated-types==0.7.0 +anthropic==0.86.0 +anyio==4.13.0 +argon2-cffi==25.1.0 +argon2-cffi-bindings==25.1.0 +asyncpg==0.31.0 +certifi==2026.2.25 +cffi==2.0.0 +charset-normalizer==3.4.6 +click==8.3.1 +cryptography==46.0.6 +distro==1.9.0 +dnspython==2.8.0 +docstring_parser==0.17.0 +ecdsa==0.19.2 +email-validator==2.3.0 +fastapi==0.135.2 +google-auth==2.49.1 +google-genai==1.69.0 +greenlet==3.3.2 +h11==0.16.0 +h2>=4.1.0 +httpcore==1.0.9 +httptools==0.7.1 +httpx==0.28.1 +idna==3.11 +jiter==0.13.0 +Mako==1.3.10 +MarkupSafe==3.0.3 +packaging @ file:///home/task_176104874243446/conda-bld/packaging_1761049080023/work +psycopg2-binary==2.9.11 +pyasn1==0.6.3 +pyasn1_modules==0.4.2 +pycparser==3.0 +pydantic==2.12.5 +pydantic-settings==2.13.1 +pydantic_core==2.41.5 +python-dotenv==1.2.2 +python-jose==3.5.0 +python-multipart==0.0.22 +PyYAML==6.0.3 +requests==2.33.0 +rsa==4.9.1 +setuptools==80.10.2 +six==1.17.0 +sniffio==1.3.1 +SQLAlchemy==2.0.48 +starlette==1.0.0 +tenacity==9.1.4 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +urllib3==2.6.3 +uvicorn==0.42.0 +uvloop==0.22.1 +watchfiles==1.1.1 +websockets==16.0 +wheel==0.46.3