Files
AllMail/api/indexer.py
Aditya Pulipaka 70ee32efdd
Some checks failed
Deploy to Server / deploy (push) Failing after 5s
beforeLocal
2026-05-05 00:47:39 +00:00

150 lines
5.2 KiB
Python

import email
import email.utils
import os
import time
from datetime import datetime
from email.policy import default

from bs4 import BeautifulSoup
from google import genai
from google.genai import types
from sqlalchemy.exc import IntegrityError
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from database import SessionLocal, Email, EMBEDDING_DIMENSIONS
MAILDIR_PATH = os.environ.get("MAILDIR_PATH", "/Maildir")

# Module-wide Gemini client. It stays None when GEMINI_API_KEY is absent or
# empty; process_email_file checks for that and skips embedding work.
api_key = os.environ.get("GEMINI_API_KEY")
gemini_client = genai.Client(api_key=api_key) if api_key else None
if gemini_client is None:
    print("WARNING: GEMINI_API_KEY not set. Indexer will skip embedding generation.")
else:
    print("Gemini client initialized for indexer.")
def _decode_part_payload(part):
    """Return the transfer-decoded body of one MIME part as ``str``.

    Bytes are decoded with the charset declared in the part's Content-Type,
    falling back to UTF-8 when none is declared or Python does not recognise
    the declared name. Undecodable bytes are dropped. Returns ``""`` when the
    part has no payload.
    """
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Real-world mail sometimes declares bogus charsets (e.g. "unknown-8bit").
        return payload.decode("utf-8", errors="ignore")


def _html_to_text(html_content):
    """Strip markup from an HTML document, joining text nodes with spaces."""
    return BeautifulSoup(html_content, "html.parser").get_text(separator=" ")


def extract_text_from_email(msg):
    """Return the human-readable text of an email message, whitespace-stripped.

    For multipart messages, every ``text/plain`` part and every ``text/html``
    part (markup stripped) found by ``msg.walk()`` is collected, one part per
    line. Parts marked ``Content-Disposition: attachment`` are skipped so
    attached files are not indexed as message body. For a single-part message
    the whole payload is decoded, with HTML stripped when the type is
    ``text/html``.

    Args:
        msg: a parsed ``email.message.Message`` / ``EmailMessage``.

    Returns:
        The extracted text, or ``""`` if nothing textual was found.
    """
    if not msg.is_multipart():
        body = _decode_part_payload(msg)
        if body and msg.get_content_type() == "text/html":
            body = _html_to_text(body)
        return body.strip()

    chunks = []
    for part in msg.walk():
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            body = _decode_part_payload(part)
        elif content_type == "text/html":
            body = _decode_part_payload(part)
            if body:
                body = _html_to_text(body)
        else:
            # Containers (multipart/*) and non-text parts carry no body text.
            continue
        if body:
            chunks.append(body)
    return "\n".join(chunks).strip()
# Rough cap on the characters sent for embedding so long emails stay under the
# model's input limit. This is a character count, not a token count.
_EMBED_CHAR_LIMIT = 8000


def _parse_email_date(date_str):
    """Return the datetime for an RFC 2822 ``Date`` header value.

    Falls back to ``datetime.utcnow()`` (naive UTC) when the header is missing
    or unparsable. Python versions differ in which exception
    ``parsedate_to_datetime`` raises for malformed input, so several parse
    error types are caught. Unlike a bare ``except:``, KeyboardInterrupt and
    SystemExit still propagate.
    """
    if not date_str:
        return datetime.utcnow()
    try:
        return email.utils.parsedate_to_datetime(date_str)
    except (TypeError, ValueError, IndexError, OverflowError):
        return datetime.utcnow()


def _is_already_indexed(message_id):
    """Return True if an Email row with *message_id* is already stored.

    Checked before calling the embedding API so duplicates cost no API call.
    Assumes ``Email.message_id`` is a mapped column; it is passed to the
    ``Email`` constructor in ``_save_email``.
    """
    db = SessionLocal()
    try:
        row = db.query(Email.message_id).filter(Email.message_id == message_id).first()
        return row is not None
    finally:
        db.close()


def _embed_document(text):
    """Return the Gemini embedding vector (``response.embeddings[0].values``) for *text*.

    RETRIEVAL_DOCUMENT is the task type for content that is stored and later
    retrieved by a query.
    """
    response = gemini_client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=types.EmbedContentConfig(
            task_type="RETRIEVAL_DOCUMENT",
            output_dimensionality=EMBEDDING_DIMENSIONS,
        ),
    )
    return response.embeddings[0].values


def _save_email(message_id, subject, sender, email_date, content, embedding):
    """Insert one Email row, printing the outcome.

    A unique-constraint violation (a concurrent insert of the same message)
    and any other database error are rolled back and reported, not raised.
    """
    db = SessionLocal()
    try:
        db.add(Email(
            message_id=message_id,
            subject=subject,
            sender=sender,
            date=email_date,
            content=content,
            embedding=embedding,
        ))
        db.commit()
        print(f"Successfully indexed email: {subject}")
    except IntegrityError:
        db.rollback()
        print(f"Email {message_id} already exists in database.")
    except Exception as e:
        db.rollback()
        print(f"Database error saving email: {e}")
    finally:
        db.close()


def process_email_file(filepath):
    """Parse the email at *filepath*, embed it with Gemini, and store it.

    Does nothing when no Gemini client is configured. Messages with no
    extractable text, or whose Message-ID is already stored, are skipped
    before any embedding call. When the ``Message-ID`` header is missing,
    the file path is used as the message id. Any error is printed rather
    than raised, so one bad file cannot stop the watcher.
    """
    print(f"Processing new email file: {filepath}")
    if not gemini_client:
        print("Skipping embedding generation: Gemini API key is missing.")
        return
    try:
        with open(filepath, 'rb') as f:
            msg = email.message_from_binary_file(f, policy=default)
        message_id = msg.get('Message-ID', filepath)
        subject = msg.get('Subject', '')
        sender = msg.get('From', '')
        email_date = _parse_email_date(msg.get('Date'))

        content = extract_text_from_email(msg)
        if not content:
            print(f"No text content found in {filepath}. Skipping.")
            return

        # The same message often exists in several Maildir folders; skip the
        # paid embedding call when it is already indexed.
        if _is_already_indexed(message_id):
            print(f"Email {message_id} already exists in database.")
            return

        # Subject and sender are prepended to give the embedding more context.
        text_to_embed = f"Subject: {subject}\nSender: {sender}\n\n{content}"
        embedding = _embed_document(text_to_embed[:_EMBED_CHAR_LIMIT])

        _save_email(message_id, subject, sender, email_date, content, embedding)
    except Exception as e:
        print(f"Error processing email {filepath}: {e}")
class NewEmailHandler(FileSystemEventHandler):
    """Index each message file that appears in a Maildir ``new/`` or ``cur/`` folder.

    The Maildir convention is to write a message under ``tmp/`` and then
    rename it into ``new/`` or ``cur/``. watchdog reports that rename as a
    move event, not a creation, so both kinds of event are handled here.
    mbsync presumably follows this convention; TODO confirm against the
    deployed mbsync version.
    """

    @staticmethod
    def _is_message_path(path):
        """Return True if *path*'s parent directory is a Maildir ``new`` or ``cur``."""
        return os.path.basename(os.path.dirname(path)) in ("new", "cur")

    def on_created(self, event):
        # A file written directly into new/ or cur/ (no tmp/ staging).
        if not event.is_directory and self._is_message_path(event.src_path):
            process_email_file(event.src_path)

    def on_moved(self, event):
        if event.is_directory:
            return
        # Renames within new/ and cur/ (new -> cur when read, flag changes in
        # cur/) move an already-seen message. Only files arriving from outside
        # those folders, e.g. from tmp/, are new deliveries.
        if self._is_message_path(event.dest_path) and not self._is_message_path(event.src_path):
            process_email_file(event.dest_path)
def start_watching():
    """Watch MAILDIR_PATH recursively and index new messages; blocks the caller.

    Returns when the observer thread stops or on Ctrl-C. The observer is
    stopped and joined on every exit path.
    """
    print(f"Starting to watch {MAILDIR_PATH} for new emails...")
    # Messages already on disk at startup are not back-filled; only files that
    # appear after the observer starts are indexed.
    observer = Observer()
    observer.schedule(NewEmailHandler(), MAILDIR_PATH, recursive=True)
    observer.start()
    try:
        # join(1) instead of sleep(1): the loop ends if the observer thread dies.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()
if __name__ == "__main__":
    # Give the FastAPI service a head start so the database is initialized
    # before the first email is indexed.
    _startup_delay_seconds = 5
    time.sleep(_startup_delay_seconds)
    start_watching()