import os import time import email from email.policy import default from bs4 import BeautifulSoup from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from database import SessionLocal, Email, EMBEDDING_DIMENSIONS from google import genai from google.genai import types from datetime import datetime from sqlalchemy.exc import IntegrityError MAILDIR_PATH = os.environ.get("MAILDIR_PATH", "/Maildir") # Initialize Gemini client gemini_client = None api_key = os.environ.get("GEMINI_API_KEY") if api_key: gemini_client = genai.Client(api_key=api_key) print("Gemini client initialized for indexer.") else: print("WARNING: GEMINI_API_KEY not set. Indexer will skip embedding generation.") def extract_text_from_email(msg): text_content = "" if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() if content_type == "text/plain": payload = part.get_payload(decode=True) if payload: text_content += payload.decode('utf-8', errors='ignore') + "\n" elif content_type == "text/html": payload = part.get_payload(decode=True) if payload: html_content = payload.decode('utf-8', errors='ignore') soup = BeautifulSoup(html_content, 'html.parser') text_content += soup.get_text(separator=' ') + "\n" else: content_type = msg.get_content_type() payload = msg.get_payload(decode=True) if payload: if content_type == "text/html": html_content = payload.decode('utf-8', errors='ignore') soup = BeautifulSoup(html_content, 'html.parser') text_content = soup.get_text(separator=' ') else: text_content = payload.decode('utf-8', errors='ignore') return text_content.strip() def process_email_file(filepath): print(f"Processing new email file: {filepath}") if not gemini_client: print("Skipping embedding generation: Gemini API key is missing.") return try: with open(filepath, 'rb') as f: msg = email.message_from_binary_file(f, policy=default) message_id = msg.get('Message-ID', filepath) subject = msg.get('Subject', '') sender = msg.get('From', '') date_str = msg.get('Date') try: email_date = email.utils.parsedate_to_datetime(date_str) if date_str else datetime.utcnow() except: email_date = datetime.utcnow() content = extract_text_from_email(msg) if not content: print(f"No text content found in {filepath}. Skipping.") return # Combine subject and content for better embedding text_to_embed = f"Subject: {subject}\nSender: {sender}\n\n{content}" # Limit text to avoid token limits (very rough truncation) text_to_embed = text_to_embed[:8000] # Get embedding via Gemini — RETRIEVAL_DOCUMENT is the correct task type # for content being stored and later retrieved by a query response = gemini_client.models.embed_content( model="gemini-embedding-001", contents=text_to_embed, config=types.EmbedContentConfig( task_type="RETRIEVAL_DOCUMENT", output_dimensionality=EMBEDDING_DIMENSIONS, ), ) embedding = response.embeddings[0].values # Save to DB db = SessionLocal() try: new_email = Email( message_id=message_id, subject=subject, sender=sender, date=email_date, content=content, embedding=embedding ) db.add(new_email) db.commit() print(f"Successfully indexed email: {subject}") except IntegrityError: db.rollback() print(f"Email {message_id} already exists in database.") except Exception as e: db.rollback() print(f"Database error saving email: {e}") finally: db.close() except Exception as e: print(f"Error processing email {filepath}: {e}") class NewEmailHandler(FileSystemEventHandler): def on_created(self, event): if not event.is_directory: # Simple check if it's likely an email file (mbsync creates files in cur/ or new/) if 'new/' in event.src_path or 'cur/' in event.src_path: process_email_file(event.src_path) def start_watching(): print(f"Starting to watch {MAILDIR_PATH} for new emails...") # Optional: Do a full initial sync of existing files here. # We will skip that for brevity and just watch for new ones. event_handler = NewEmailHandler() observer = Observer() observer.schedule(event_handler, MAILDIR_PATH, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": # Wait for DB to be initialized by FastAPI time.sleep(5) start_watching()