Spaces:
Building
Building
"""Module to search and list emails from Gmail.""" | |
import os | |
import re | |
import base64 | |
from datetime import datetime, timedelta | |
from venv import logger | |
from ics import Calendar | |
import pandas as pd | |
from langchain_core.documents import Document | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.document_loaders.image import UnstructuredImageLoader | |
from langchain_community.document_loaders import UnstructuredExcelLoader | |
from langchain_community.document_loaders.csv_loader import CSVLoader | |
from models.chroma import vectorstore | |
from models.mails import build_gmail_service | |
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] | |
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' | |
ATTACHMENTS_DIR = "attachments" | |
os.makedirs(ATTACHMENTS_DIR, exist_ok=True) | |
service = build_gmail_service() | |
def search_emails(query): | |
"""Search emails based on a query.""" | |
result = service.users().messages().list(userId='me', q=query).execute() | |
messages = [] | |
if 'messages' in result: | |
messages.extend(result['messages']) | |
while 'nextPageToken' in result: | |
page_token = result['nextPageToken'] | |
result = service.users().messages().list( | |
userId='me', q=query, pageToken=page_token).execute() | |
if 'messages' in result: | |
messages.extend(result['messages']) | |
return messages | |
def list_emails(messages): | |
""" | |
Processes a list of email messages, extracts metadata, decodes content, and handles attachments. | |
Args: | |
messages (list): A list of email message dictionaries, where each dictionary contains | |
at least an 'id' key representing the email's unique identifier. | |
Returns: | |
None: The function processes the emails and adds the extracted documents to a vector store. | |
Functionality: | |
- Retrieves email details using the Gmail API. | |
- Extracts metadata such as sender, recipient, subject, CC, and date. | |
- Decodes email content in plain text or HTML format. | |
- Handles multipart emails, including attachments. | |
- Processes attachments based on their MIME type: | |
- PDF files are loaded using PyPDFLoader. | |
- Images (PNG, JPEG) are loaded using UnstructuredImageLoader. | |
- CSV files are loaded using CSVLoader. | |
- Excel files are loaded using UnstructuredExcelLoader. | |
- Calendar files (ICS) are parsed to extract event details. | |
- Removes HTML tags from email content. | |
- Stores processed documents and metadata in a vector store. | |
- Deletes temporary files created during attachment processing. | |
Notes: | |
- The function assumes the existence of a global `service` object for Gmail API interactions. | |
- The `vectorstore.add_documents` method is used to store the processed documents. | |
- Attachments are temporarily saved in a directory specified by `ATTACHMENTS_DIR` and deleted after processing. | |
- The function logs information about attachments being downloaded. | |
""" | |
ids = [] | |
documents = [] | |
for message in messages: | |
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute() | |
metadata = {} | |
for header in msg['payload']['headers']: | |
if header['name'] == 'From': | |
metadata['from'] = header['value'] | |
elif header['name'] == 'To': | |
metadata['to'] = header['value'] | |
elif header['name'] == 'Subject': | |
metadata['subject'] = header['value'] | |
elif header['name'] == 'Cc': | |
metadata['cc'] = header['value'] | |
metadata['date'] = datetime.fromtimestamp( | |
int(msg['internalDate']) / 1000).strftime("%d/%m/%Y %H:%M:%S") | |
metadata['msg_id'] = msg['id'] | |
print(metadata, msg['payload']['mimeType']) | |
ids = [] | |
documents = [] | |
mimeType = [] | |
if msg['payload']['mimeType'] in ['multipart/alternative', 'multipart/related', 'multipart/mixed']: | |
mimeType = [] | |
attach_docs = [] | |
for part in msg['payload']['parts']: | |
print("mimeType: ", part['mimeType']) | |
mimeType.append(part['mimeType']) | |
if part['mimeType'] == 'text/plain' and 'text/html' not in mimeType: | |
body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') | |
body = re.sub(r'<[^>]+>', '', body) # Remove HTML tags | |
metadata['mimeType'] = part['mimeType'] | |
documents.append(Document( | |
page_content=body, | |
metadata=metadata | |
)) | |
ids.append(msg['id']) | |
elif part['mimeType'] == 'text/html' and 'text/plain' not in mimeType: | |
body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') | |
body = re.sub(r'<[^>]+>', '', body) | |
metadata['mimeType'] = part['mimeType'] | |
documents.append(Document( | |
page_content=body, | |
metadata=metadata | |
)) | |
ids.append(msg['id']) | |
if part['filename']: | |
attachment_id = part['body']['attachmentId'] | |
logger.info("Downloading attachment: %s", part['filename']) | |
attachment = service.users().messages().attachments().get( | |
userId='me', messageId=message['id'], id=attachment_id).execute() | |
file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8')) | |
path = os.path.join(".", ATTACHMENTS_DIR, part['filename']) | |
with open(path, 'wb') as f: | |
f.write(file_data) | |
if part['mimeType'] == 'application/pdf': | |
attach_docs = PyPDFLoader(path).load() | |
elif part['mimeType'] == 'image/png' or part['mimeType'] == 'image/jpeg': | |
attach_docs = UnstructuredImageLoader(path).load() | |
elif part['filename'].endswith('.csv'): | |
attach_docs = CSVLoader(path).load() | |
elif part['mimeType'] == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': | |
attach_docs = UnstructuredExcelLoader(path).load() | |
elif part['mimeType'] == 'application/ics': | |
with open(path, 'r', encoding='utf-8') as f: | |
calendar = Calendar(f.read()) | |
for event in calendar.events: | |
documents.append(Document( | |
page_content = f"Event: {event.name}\nDescription: {event.description}\nStart: {event.begin}\nEnd: {event.end}", | |
metadata = { | |
"attachment": part['filename'], | |
"mimeType": part['mimeType'], | |
"location": event.location, | |
"created": event.created.strftime("%d/%m/%Y %H:%M:%S"), | |
"last_modified": event.last_modified.strftime("%d/%m/%Y %H:%M:%S"), | |
"start": event.begin.strftime("%d/%m/%Y %H:%M:%S"), | |
"end": event.end.strftime("%d/%m/%Y %H:%M:%S") | |
} | |
)) | |
ids.append(f"{msg['id']}_{attachment_id}") | |
if os.path.exists(path): | |
os.remove(path) | |
for index, document in enumerate(attach_docs or []): | |
document.metadata['mimeType'] = part['mimeType'] | |
if 'page_label' in document.metadata: | |
document.metadata['page'] = document.metadata['page_label'] | |
document.metadata['attachment'] = part['filename'] | |
document.metadata = {key: value for key, value in document.metadata.items() if key in ['attachment', 'page']} | |
document.metadata.update(metadata) | |
documents.append(document) | |
ids.append(f"{msg['id']}_{attachment_id}_{index}") | |
elif msg['payload']['mimeType'] == 'text/plain' and 'data' in msg['payload']['body']: | |
body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8') | |
body = re.sub(r'<[^>]+>', '', body) | |
metadata['mimeType'] = msg['payload']['mimeType'] | |
documents.append(Document( | |
page_content=body, | |
metadata=metadata | |
)) | |
ids.append(msg['id']) | |
elif msg['payload']['mimeType'] == 'text/html' and 'data' in msg['payload']['body']: | |
body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8') | |
body = re.sub(r'<[^>]+>', '', body) | |
metadata['mimeType'] = msg['payload']['mimeType'] | |
documents.append(Document( | |
page_content=body, | |
metadata=metadata | |
)) | |
ids.append(msg['id']) | |
if 'multipart/alternative' in mimeType and len(mimeType) == 1: | |
print("Only multipart/alternative found in the email.") | |
else: | |
vectorstore.add_documents(documents=documents, ids=ids) | |
def collect(query = (datetime.today() - timedelta(days=21)).strftime('after:%Y/%m/%d')): | |
""" | |
Main function to search and list emails from Gmail. | |
This function builds a Gmail service, constructs a query to search for emails | |
received in the last 14 days, and lists the found emails. If no emails are found, | |
it prints a message indicating so. | |
Returns: | |
None | |
""" | |
# query = "subject:Re: Smartcareers algorithm debug and improvement'" | |
emails = search_emails(query) | |
if emails: | |
print("Found %d emails:\n", len(emails)) | |
logger.info("Found %d emails after two_weeks_ago:\n", len(emails)) | |
list_emails(emails) | |
logger.info("Listing emails...") | |
return f"{len(emails)} emails added to the collection." | |
else: | |
logger.info("No emails found after two weeks ago.") | |
def get_documents(): | |
""" | |
Main function to list emails from the database. | |
This function lists all emails stored in the database. | |
Returns: | |
None | |
""" | |
data = vectorstore.get() | |
df = pd.DataFrame({ | |
'ids': data['ids'], | |
'documents': data['documents'], | |
'metadatas': data['metadatas'] | |
}) | |
df.to_excel('collection_data.xlsx', index=False) | |
df = pd.concat( | |
[df.drop('metadatas', axis=1), df['metadatas'].apply(pd.Series)], | |
axis=1).to_excel('collection_data_expand.xlsx', index=False) | |
def get(): | |
""" | |
Main function to list emails from the database. | |
This function lists all emails stored in the database. | |
Returns: | |
None | |
""" | |
data = vectorstore.get() | |
df = pd.DataFrame({ | |
'id': data['ids'], | |
'documents': data['documents'], | |
'metadatas': data['metadatas'] | |
}) | |
return df.to_dict(orient='records') | |