Spaces:
Running
Running
"""Module to search and list emails from Gmail.""" | |
import os | |
import re | |
import base64 | |
from datetime import datetime, timedelta | |
from venv import logger | |
import pandas as pd | |
from langchain_core.documents import Document | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.document_loaders.image import UnstructuredImageLoader | |
from langchain_community.document_loaders.csv_loader import CSVLoader | |
from models.chroma import vectorstore | |
from models.mails import build_gmail_service | |
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] | |
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' | |
ATTACHMENTS_DIR = "attachments" | |
os.makedirs(ATTACHMENTS_DIR, exist_ok=True) | |
service = build_gmail_service() | |
def search_emails(query): | |
"""Search emails based on a query.""" | |
result = service.users().messages().list(userId='me', q=query).execute() | |
messages = [] | |
if 'messages' in result: | |
messages.extend(result['messages']) | |
while 'nextPageToken' in result: | |
page_token = result['nextPageToken'] | |
result = service.users().messages().list( | |
userId='me', q=query, pageToken=page_token).execute() | |
if 'messages' in result: | |
messages.extend(result['messages']) | |
return messages | |
def list_emails(messages): | |
"""List emails from the search results and download attachments.""" | |
ids = [] | |
documents = [] | |
for message in messages[:100]: | |
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute() | |
metadata = {} | |
for header in msg['payload']['headers']: | |
if header['name'] == 'From': | |
metadata['from'] = header['value'] | |
elif header['name'] == 'To': | |
metadata['to'] = header['value'] | |
elif header['name'] == 'Subject': | |
metadata['subject'] = header['value'] | |
elif header['name'] == 'Cc': | |
metadata['cc'] = header['value'] | |
metadata['date'] = datetime.fromtimestamp( | |
int(msg['internalDate']) / 1000).strftime("%d/%m/%Y %H:%M:%S") | |
body = "" | |
if 'parts' in msg['payload']: | |
for part in msg['payload']['parts']: | |
if part['filename']: | |
attachment_id = part['body']['attachmentId'] | |
logger.info("Downloading attachment: %s", part['filename']) | |
attachment = service.users().messages().attachments().get( | |
userId='me', messageId=message['id'], id=attachment_id).execute() | |
file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8')) | |
path = os.path.join(".", ATTACHMENTS_DIR, part['filename']) | |
with open(path, 'wb') as f: | |
f.write(file_data) | |
if part['filename'].endswith('.pdf'): | |
attachment_documents = PyPDFLoader(path).load() | |
documents = documents + attachment_documents | |
if part['filename'].endswith('.png'): | |
attachment_documents = UnstructuredImageLoader(path).load() | |
documents = documents + attachment_documents | |
if part['filename'].endswith('.csv'): | |
attachment_documents = CSVLoader(path).load() | |
documents = documents + attachment_documents | |
for index, document in enumerate(documents): | |
_id = f"{msg['id']}_{index}" | |
if 'source' in document.metadata: | |
document.metadata['source'] = document.metadata['source'].replace(f"./{ATTACHMENTS_DIR}/", "") | |
document.metadata.update(metadata) | |
ids.append(_id) | |
else: | |
body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8') | |
body = re.sub(r'<[^>]+>', '', body) # Remove HTML tags | |
documents.append(Document( | |
page_content=body, | |
metadata=metadata | |
)) | |
ids.append(msg['id']) | |
return vectorstore.add_documents(documents=documents, ids=ids) | |
def collect(query = (datetime.today() - timedelta(days=21)).strftime('after:%Y/%m/%d')): | |
""" | |
Main function to search and list emails from Gmail. | |
This function builds a Gmail service, constructs a query to search for emails | |
received in the last 14 days, and lists the found emails. If no emails are found, | |
it prints a message indicating so. | |
Returns: | |
None | |
""" | |
emails = search_emails(query) | |
if emails: | |
logger.info("Found %d emails after two_weeks_ago:\n", len(emails)) | |
return f"{len(list_emails(emails))} emails added to the collection." | |
else: | |
logger.info("No emails found after two weeks ago.") | |
def get_documents(): | |
""" | |
Main function to list emails from the database. | |
This function lists all emails stored in the database. | |
Returns: | |
None | |
""" | |
data = vectorstore.get() | |
df = pd.DataFrame({ | |
'ids': data['ids'], | |
'documents': data['documents'], | |
'metadatas': data['metadatas'] | |
}) | |
df = pd.concat( | |
[df.drop('metadatas', axis=1), df['metadatas'].apply(pd.Series)], | |
axis=1).to_csv('collection_data.csv', index=False) | |