150 lines
5.2 KiB
Python
150 lines
5.2 KiB
Python
|
|
import os
|
||
|
|
import time
|
||
|
|
import email
|
||
|
|
from email.policy import default
|
||
|
|
from bs4 import BeautifulSoup
|
||
|
|
from watchdog.observers import Observer
|
||
|
|
from watchdog.events import FileSystemEventHandler
|
||
|
|
from database import SessionLocal, Email, EMBEDDING_DIMENSIONS
|
||
|
|
from google import genai
|
||
|
|
from google.genai import types
|
||
|
|
from datetime import datetime
|
||
|
|
from sqlalchemy.exc import IntegrityError
|
||
|
|
|
||
|
|
MAILDIR_PATH = os.environ.get("MAILDIR_PATH", "/Maildir")

# Build the Gemini client only when an API key is configured. Without a key,
# gemini_client stays None and process_email_file skips every message.
api_key = os.environ.get("GEMINI_API_KEY")
gemini_client = genai.Client(api_key=api_key) if api_key else None

if gemini_client is not None:
    print("Gemini client initialized for indexer.")
else:
    print("WARNING: GEMINI_API_KEY not set. Indexer will skip embedding generation.")
|
||
|
|
|
||
|
|
def _decode_text_part(part):
    """Return the decoded text of one leaf MIME part ("" if it has no payload).

    Uses the part's declared charset and falls back to UTF-8 when none is
    declared or the label is unknown. Undecodable bytes are dropped.
    """
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Bogus or unknown charset label in the header (e.g. "unknown-8bit").
        return payload.decode("utf-8", errors="ignore")


def _html_to_text(html_content):
    """Strip markup from an HTML document, joining text nodes with spaces."""
    soup = BeautifulSoup(html_content, 'html.parser')
    return soup.get_text(separator=' ')


def extract_text_from_email(msg):
    """Return the readable body text of ``msg`` as one stripped string.

    Multipart messages: every non-attachment ``text/plain`` and ``text/html``
    leaf is decoded with its declared charset (HTML is reduced to its text).
    The pieces are joined with newlines. Parts with
    ``Content-Disposition: attachment`` are skipped so attached files are not
    indexed as body text.

    Single-part messages: the body is decoded when its type is ``text/*``
    (HTML is stripped). Any other content type yields "".

    Args:
        msg: An ``email.message.Message`` / ``EmailMessage``.

    Returns:
        The extracted text with surrounding whitespace removed, or "".
    """
    if not msg.is_multipart():
        if msg.get_content_type() == "text/html":
            html_content = _decode_text_part(msg)
            return _html_to_text(html_content).strip() if html_content else ""
        if msg.get_content_maintype() == "text":
            return _decode_text_part(msg).strip()
        return ""

    # NOTE(review): a multipart/alternative message contributes both its plain
    # and HTML renderings, so body text is typically present twice.
    pieces = []
    for part in msg.walk():
        # Container parts carry no payload of their own; attachments are not body text.
        if part.is_multipart() or part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = _decode_text_part(part)
        elif content_type == "text/html":
            html_content = _decode_text_part(part)
            text = _html_to_text(html_content) if html_content else ""
        else:
            continue
        if text:
            pieces.append(text)
    return "\n".join(pieces).strip()
|
||
|
|
|
||
|
|
def _parse_email_date(date_str):
    """Convert a raw ``Date`` header value into a ``datetime``.

    Args:
        date_str: The header value, or ``None`` when the message has no Date.

    Returns:
        ``email.utils.parsedate_to_datetime`` of the header when it parses,
        otherwise ``datetime.utcnow()``.
    """
    import email.utils

    if not date_str:
        return datetime.utcnow()
    try:
        return email.utils.parsedate_to_datetime(str(date_str))
    except (TypeError, ValueError, IndexError, OverflowError):
        # Malformed header (Python < 3.10 raises TypeError, 3.10+ ValueError).
        # Catch only parse failures; a bare except would also eat Ctrl-C.
        return datetime.utcnow()


def _is_already_indexed(message_id):
    """Return True if an Email row with ``message_id`` is already stored."""
    db = SessionLocal()
    try:
        existing = (
            db.query(Email.message_id)
            .filter(Email.message_id == message_id)
            .first()
        )
        return existing is not None
    finally:
        db.close()


def _embed_document(subject, sender, content):
    """Return the Gemini embedding vector for one email's subject, sender and body."""
    # Combine subject and content for better embedding
    text_to_embed = f"Subject: {subject}\nSender: {sender}\n\n{content}"

    # Limit text to avoid token limits (very rough truncation)
    text_to_embed = text_to_embed[:8000]

    # RETRIEVAL_DOCUMENT is the task type for content that is stored and
    # later retrieved by a query.
    response = gemini_client.models.embed_content(
        model="gemini-embedding-001",
        contents=text_to_embed,
        config=types.EmbedContentConfig(
            task_type="RETRIEVAL_DOCUMENT",
            output_dimensionality=EMBEDDING_DIMENSIONS,
        ),
    )
    return response.embeddings[0].values


def _save_email(message_id, subject, sender, email_date, content, embedding):
    """Insert one Email row; log (rather than raise) duplicate and DB errors."""
    db = SessionLocal()
    try:
        new_email = Email(
            message_id=message_id,
            subject=subject,
            sender=sender,
            date=email_date,
            content=content,
            embedding=embedding,
        )
        db.add(new_email)
        db.commit()
        print(f"Successfully indexed email: {subject}")
    except IntegrityError:
        db.rollback()
        print(f"Email {message_id} already exists in database.")
    except Exception as e:
        db.rollback()
        print(f"Database error saving email: {e}")
    finally:
        db.close()


def process_email_file(filepath):
    """Parse one message file, embed it with Gemini and store it in the DB.

    Returns early, logging only, when:
    - no Gemini client is configured;
    - the message has no extractable text;
    - its Message-ID is already stored. This check runs before the embedding
      call, so re-seen files (e.g. renamed between Maildir folders) do not
      cost an API request.

    All errors are printed rather than raised, so one bad file does not stop
    the caller.

    Args:
        filepath: Path to the message file on disk.
    """
    print(f"Processing new email file: {filepath}")

    if not gemini_client:
        print("Skipping embedding generation: Gemini API key is missing.")
        return

    try:
        with open(filepath, 'rb') as f:
            msg = email.message_from_binary_file(f, policy=default)

        # Under policy=default, header values are header objects; store plain str.
        # An empty Message-ID is treated as missing: it would otherwise collide
        # with other empty IDs (the IntegrityError path below presumably means
        # message_id is unique — confirm against the Email model).
        message_id = str(msg.get('Message-ID') or '').strip() or filepath
        subject = str(msg.get('Subject', ''))
        sender = str(msg.get('From', ''))
        email_date = _parse_email_date(msg.get('Date'))

        content = extract_text_from_email(msg)
        if not content:
            print(f"No text content found in {filepath}. Skipping.")
            return

        if _is_already_indexed(message_id):
            print(f"Email {message_id} already exists in database.")
            return

        embedding = _embed_document(subject, sender, content)
        _save_email(message_id, subject, sender, email_date, content, embedding)

    except Exception as e:
        print(f"Error processing email {filepath}: {e}")
|
||
|
|
|
||
|
|
def _is_maildir_message_path(path):
    """Return True if ``path`` lives directly inside a ``new/`` or ``cur/`` directory."""
    # Compare the parent directory name exactly. A substring test such as
    # "'new/' in path" also matches folders like "Brand new/tmp/...".
    return os.path.basename(os.path.dirname(path)) in ("new", "cur")


class NewEmailHandler(FileSystemEventHandler):
    """Watchdog handler that hands message files in new/ or cur/ to process_email_file.

    Under the Maildir convention, a delivering agent writes into tmp/ and then
    renames the file into new/ or cur/. Watchdog reports such a rename as a
    move event, not a create event, so both are handled.
    """

    def on_created(self, event):
        """Index a file created directly inside new/ or cur/."""
        if not event.is_directory and _is_maildir_message_path(event.src_path):
            process_email_file(event.src_path)

    def on_moved(self, event):
        """Index a file renamed into new/ or cur/ (e.g. a tmp/ -> new/ delivery)."""
        # Also fires for new/ -> cur/ and flag-change renames of messages
        # already seen; process_email_file reports a repeated Message-ID as
        # already existing rather than storing it twice.
        if not event.is_directory and _is_maildir_message_path(event.dest_path):
            process_email_file(event.dest_path)
|
||
|
|
|
||
|
|
def start_watching():
    """Watch MAILDIR_PATH recursively and index messages until stopped.

    Blocks the calling thread. Returns on Ctrl-C, or when the observer
    thread exits on its own. The observer is always stopped and joined
    before returning.
    """
    print(f"Starting to watch {MAILDIR_PATH} for new emails...")

    # Optional: Do a full initial sync of existing files here.
    # We will skip that for brevity and just watch for new ones.

    event_handler = NewEmailHandler()
    observer = Observer()
    observer.schedule(event_handler, MAILDIR_PATH, recursive=True)
    observer.start()
    try:
        # Wait on the observer thread itself instead of sleeping forever, so
        # the loop ends if that thread dies rather than idling silently.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Give the FastAPI service time to initialize the database before
    # the watcher starts writing to it.
    startup_delay_seconds = 5
    time.sleep(startup_delay_seconds)
    start_watching()
|