"""Module to search and list emails from Gmail."""
import base64
import logging
import os
import re
from datetime import datetime, timedelta

import pandas as pd
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders.image import UnstructuredImageLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

from models.chroma import vectorstore
from models.mails import build_gmail_service

# Module-level logger. The original imported ``logger`` from the stdlib
# ``venv`` package by accident, which routed every record to the "venv" logger.
logger = logging.getLogger(__name__)

SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
ATTACHMENTS_DIR = "attachments"

# Number of days looked back by ``collect`` when no query is supplied.
DEFAULT_LOOKBACK_DAYS = 21

# Headers copied into each document's metadata (matched case-insensitively).
_METADATA_HEADERS = ('from', 'to', 'subject', 'cc')

# Attachment extension -> loader class used to turn the file into documents.
_ATTACHMENT_LOADERS = {
    '.pdf': PyPDFLoader,
    '.png': UnstructuredImageLoader,
    '.csv': CSVLoader,
}

os.makedirs(ATTACHMENTS_DIR, exist_ok=True)

service = build_gmail_service()


def search_emails(query):
    """Return every message stub matching ``query``, following pagination.

    Args:
        query: Gmail search expression passed as the ``q`` parameter.

    Returns:
        A list of the ``messages`` entries returned by the Gmail list
        endpoint, concatenated across all result pages.
    """
    messages = []
    params = {'userId': 'me', 'q': query}
    while True:
        result = service.users().messages().list(**params).execute()
        messages.extend(result.get('messages', []))
        page_token = result.get('nextPageToken')
        if not page_token:
            return messages
        params['pageToken'] = page_token


def _extract_metadata(msg):
    """Build the metadata dict (from/to/subject/cc/date) for one message."""
    metadata = {}
    for header in msg['payload'].get('headers', []):
        name = header['name'].lower()
        if name in _METADATA_HEADERS:
            metadata[name] = header['value']
    # internalDate is in milliseconds; it is rendered in local time.
    metadata['date'] = datetime.fromtimestamp(
        int(msg['internalDate']) / 1000).strftime("%d/%m/%Y %H:%M:%S")
    return metadata


def _load_attachment(message_id, part):
    """Download one attachment part to disk and load it into documents.

    Returns an empty list when the part has no attachment id, has an unusable
    filename, or has an extension without a registered loader.
    """
    attachment_id = part.get('body', {}).get('attachmentId')
    # The filename comes from the email itself (untrusted): keep only its last
    # path component so it cannot escape ATTACHMENTS_DIR.
    filename = os.path.basename(part['filename'].replace('\\', '/'))
    if not attachment_id or filename in ('', '.', '..'):
        return []

    logger.info("Downloading attachment: %s", part['filename'])
    attachment = service.users().messages().attachments().get(
        userId='me', messageId=message_id, id=attachment_id).execute()
    file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))
    path = os.path.join(".", ATTACHMENTS_DIR, filename)
    with open(path, 'wb') as f:
        f.write(file_data)

    extension = os.path.splitext(filename)[1].lower()
    loader_cls = _ATTACHMENT_LOADERS.get(extension)
    if loader_cls is None:
        return []
    return loader_cls(path).load()


def _message_documents(msg):
    """Return ``(documents, ids)`` produced by a single full-format message."""
    message_id = msg['id']
    metadata = _extract_metadata(msg)
    payload = msg['payload']

    if 'parts' in payload:
        # NOTE(review): only attachments are indexed for multipart messages;
        # their text/plain and text/html parts are not (unchanged behavior).
        docs = []
        for part in payload['parts']:
            if part.get('filename'):
                docs.extend(_load_attachment(message_id, part))
        for doc in docs:
            if 'source' in doc.metadata:
                # Store only the file name, not the local download path.
                doc.metadata['source'] = os.path.basename(doc.metadata['source'])
            doc.metadata.update(metadata)
        # Ids are numbered per message so they always line up with ``docs``.
        ids = [f"{message_id}_{index}" for index in range(len(docs))]
        return docs, ids

    data = payload.get('body', {}).get('data')
    if not data:
        # Nothing to index for an empty, single-part message.
        return [], []
    body = base64.urlsafe_b64decode(data).decode('utf-8', errors='replace')
    body = re.sub(r'<[^>]+>', '', body)  # Remove HTML tags
    return [Document(page_content=body, metadata=metadata)], [message_id]


def list_emails(messages, limit=100):
    """Fetch messages, download their attachments and index the result.

    Args:
        messages: Message stubs (dicts with an ``id`` key) as returned by
            ``search_emails``.
        limit: Maximum number of messages to process; ``None`` processes all.

    Returns:
        The value returned by ``vectorstore.add_documents``, or an empty list
        when no document was produced.
    """
    ids = []
    documents = []
    for message in messages[:limit]:
        msg = service.users().messages().get(
            userId='me', id=message['id'], format='full').execute()
        message_documents, message_ids = _message_documents(msg)
        documents.extend(message_documents)
        ids.extend(message_ids)

    if not documents:
        return []
    return vectorstore.add_documents(documents=documents, ids=ids)


def collect(query=None):
    """Search Gmail and add the matching emails to the collection.

    Args:
        query: Gmail search expression. When omitted, emails received during
            the last ``DEFAULT_LOOKBACK_DAYS`` days are selected; the cutoff
            date is computed at call time rather than at import time.

    Returns:
        A summary string with the number of entries added, or ``None`` when
        the search returns no email.
    """
    if query is None:
        cutoff = datetime.today() - timedelta(days=DEFAULT_LOOKBACK_DAYS)
        query = cutoff.strftime('after:%Y/%m/%d')

    emails = search_emails(query)
    if not emails:
        logger.info("No emails found for query %r.", query)
        return None

    logger.info("Found %d emails for query %r.", len(emails), query)
    return f"{len(list_emails(emails))} emails added to the collection."


def get_documents(output_path='collection_data.csv'):
    """Export the whole collection to a CSV file.

    Each row holds a document id, its text and one column per metadata key.

    Args:
        output_path: Destination CSV file.

    Returns:
        None
    """
    data = vectorstore.get()
    frame = pd.DataFrame({
        'ids': data['ids'],
        'documents': data['documents'],
        'metadatas': data['metadatas']
    })
    # Expand the metadata dicts into individual columns next to ids/documents.
    expanded = pd.concat(
        [frame.drop('metadatas', axis=1), frame['metadatas'].apply(pd.Series)],
        axis=1)
    expanded.to_csv(output_path, index=False)