"""Module to search and list emails from Gmail."""

import os
import re
import base64
import logging
from datetime import datetime, timedelta

import pandas as pd
from ics import Calendar
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader, UnstructuredExcelLoader
from langchain_community.document_loaders.image import UnstructuredImageLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

from models.chroma import vectorstore
from models.mails import build_gmail_service

logger = logging.getLogger(__name__)

SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

ATTACHMENTS_DIR = "attachments"
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)

service = build_gmail_service()

def search_emails(query):
    """Search emails based on a query."""
    result = service.users().messages().list(userId='me', q=query).execute()
    messages = []
    if 'messages' in result:
        messages.extend(result['messages'])
    while 'nextPageToken' in result:
        page_token = result['nextPageToken']
        result = service.users().messages().list(
            userId='me', q=query, pageToken=page_token).execute()
        if 'messages' in result:
            messages.extend(result['messages'])
    return messages
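
# Example usage (sketch; assumes the credentials loaded by build_gmail_service()
# are already configured; the address below is a placeholder and the query uses
# ordinary Gmail search syntax):
#   messages = search_emails('from:alice@example.com newer_than:7d has:attachment')
#   print(len(messages), "matching messages")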


def list_emails(messages):
    """
    Processes a list of email messages, extracts metadata, decodes content, and handles attachments.

    Args:
        messages (list): A list of email message dictionaries, where each dictionary contains
            at least an 'id' key representing the email's unique identifier.

    Returns:
        None: The function processes the emails and adds the extracted documents to a vector store.

    Functionality:
        - Retrieves email details using the Gmail API.
        - Extracts metadata such as sender, recipient, subject, CC, and date.
        - Decodes email content in plain text or HTML format.
        - Handles multipart emails, including attachments.
        - Processes attachments based on their MIME type:
            - PDF files are loaded using PyPDFLoader.
            - Images (PNG, JPEG) are loaded using UnstructuredImageLoader.
            - CSV files are loaded using CSVLoader.
            - Excel files are loaded using UnstructuredExcelLoader.
            - Calendar files (ICS) are parsed to extract event details.
        - Removes HTML tags from email content.
        - Stores processed documents and metadata in a vector store.
        - Deletes temporary files created during attachment processing.

    Notes:
        - The function assumes the existence of a global `service` object for Gmail API interactions.
        - The `vectorstore.add_documents` method is used to store the processed documents.
        - Attachments are temporarily saved in the directory specified by `ATTACHMENTS_DIR`
          and deleted after processing.
        - The function logs information about attachments being downloaded.
    """
    for message in messages:
        msg = service.users().messages().get(
            userId='me', id=message['id'], format='full').execute()
        metadata = {}
        for header in msg['payload']['headers']:
            if header['name'] == 'From':
                metadata['from'] = header['value']
            elif header['name'] == 'To':
                metadata['to'] = header['value']
            elif header['name'] == 'Subject':
                metadata['subject'] = header['value']
            elif header['name'] == 'Cc':
                metadata['cc'] = header['value']
        metadata['date'] = datetime.fromtimestamp(
            int(msg['internalDate']) / 1000).strftime("%d/%m/%Y %H:%M:%S")
        metadata['msg_id'] = msg['id']
        print(metadata, msg['payload']['mimeType'])
        # Per-message accumulators: each message is added to the vector store
        # at the end of this loop iteration.
        ids = []
        documents = []
        mimeType = []
        if msg['payload']['mimeType'] in ['multipart/alternative', 'multipart/related', 'multipart/mixed']:
            for part in msg['payload']['parts']:
                print("mimeType: ", part['mimeType'])
                mimeType.append(part['mimeType'])
                if part['mimeType'] == 'text/plain' and 'text/html' not in mimeType:
                    body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                    body = re.sub(r'<[^>]+>', '', body)
                    metadata['mimeType'] = part['mimeType']
                    documents.append(Document(
                        page_content=body,
                        metadata=metadata
                    ))
                    ids.append(msg['id'])
                elif part['mimeType'] == 'text/html' and 'text/plain' not in mimeType:
                    body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                    body = re.sub(r'<[^>]+>', '', body)
                    metadata['mimeType'] = part['mimeType']
                    documents.append(Document(
                        page_content=body,
                        metadata=metadata
                    ))
                    ids.append(msg['id'])
                if part['filename']:
                    # Reset per attachment so documents from an earlier
                    # attachment are not re-indexed for this part.
                    attach_docs = []
                    attachment_id = part['body']['attachmentId']
                    logger.info("Downloading attachment: %s", part['filename'])
                    attachment = service.users().messages().attachments().get(
                        userId='me', messageId=message['id'], id=attachment_id).execute()
                    file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))
                    path = os.path.join(".", ATTACHMENTS_DIR, part['filename'])
                    with open(path, 'wb') as f:
                        f.write(file_data)
                    if part['mimeType'] == 'application/pdf':
                        attach_docs = PyPDFLoader(path).load()
                    elif part['mimeType'] in ('image/png', 'image/jpeg'):
                        attach_docs = UnstructuredImageLoader(path).load()
                    elif part['filename'].endswith('.csv'):
                        attach_docs = CSVLoader(path).load()
                    elif part['mimeType'] == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet':
                        attach_docs = UnstructuredExcelLoader(path).load()
                    elif part['mimeType'] == 'application/ics':
                        with open(path, 'r', encoding='utf-8') as f:
                            calendar = Calendar(f.read())
                        for event in calendar.events:
                            documents.append(Document(
                                page_content=(
                                    f"Event: {event.name}\nDescription: {event.description}\n"
                                    f"Start: {event.begin}\nEnd: {event.end}"
                                ),
                                metadata={
                                    "attachment": part['filename'],
                                    "mimeType": part['mimeType'],
                                    "location": event.location,
                                    "created": event.created.strftime("%d/%m/%Y %H:%M:%S"),
                                    "last_modified": event.last_modified.strftime("%d/%m/%Y %H:%M:%S"),
                                    "start": event.begin.strftime("%d/%m/%Y %H:%M:%S"),
                                    "end": event.end.strftime("%d/%m/%Y %H:%M:%S")
                                }
                            ))
                            ids.append(f"{msg['id']}_{attachment_id}")
                    if os.path.exists(path):
                        os.remove(path)
                    for index, document in enumerate(attach_docs or []):
                        document.metadata['mimeType'] = part['mimeType']
                        if 'page_label' in document.metadata:
                            document.metadata['page'] = document.metadata['page_label']
                        document.metadata['attachment'] = part['filename']
                        # Keep only the attachment/page keys from the loader,
                        # then merge in the email-level metadata.
                        document.metadata = {
                            key: value for key, value in document.metadata.items()
                            if key in ['attachment', 'page']
                        }
                        document.metadata.update(metadata)
                        documents.append(document)
                        ids.append(f"{msg['id']}_{attachment_id}_{index}")
        elif msg['payload']['mimeType'] == 'text/plain' and 'data' in msg['payload']['body']:
            body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8')
            body = re.sub(r'<[^>]+>', '', body)
            metadata['mimeType'] = msg['payload']['mimeType']
            documents.append(Document(
                page_content=body,
                metadata=metadata
            ))
            ids.append(msg['id'])
        elif msg['payload']['mimeType'] == 'text/html' and 'data' in msg['payload']['body']:
            body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8')
            body = re.sub(r'<[^>]+>', '', body)
            metadata['mimeType'] = msg['payload']['mimeType']
            documents.append(Document(
                page_content=body,
                metadata=metadata
            ))
            ids.append(msg['id'])
        if 'multipart/alternative' in mimeType and len(mimeType) == 1:
            print("Only multipart/alternative found in the email.")
        else:
            vectorstore.add_documents(documents=documents, ids=ids)
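
# Example usage (sketch): index everything returned by a search. list_emails()
# writes the extracted documents straight into the shared `vectorstore`.
#   list_emails(search_emails('newer_than:7d'))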


def collect(query=None):
    """
    Search Gmail and add the matching emails to the collection.

    By default the query covers emails received in the last 21 days. Matching
    emails are passed to list_emails(), which stores them in the vector store.

    Returns:
        str | None: A summary of how many emails were added, or None if no
        emails matched the query.
    """
    if query is None:
        # Build the default 21-day window at call time rather than at import
        # time, so long-running processes do not reuse a stale date.
        query = (datetime.today() - timedelta(days=21)).strftime('after:%Y/%m/%d')
    emails = search_emails(query)
    if emails:
        print(f"Found {len(emails)} emails.")
        logger.info("Found %d emails for query %r.", len(emails), query)
        list_emails(emails)
        logger.info("Emails added to the collection.")
        return f"{len(emails)} emails added to the collection."
    logger.info("No emails found for query %r.", query)
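
# Example usage (sketch): pass an explicit Gmail query to override the 21-day
# default window (the dates below are placeholders).
#   collect('after:2024/01/01 before:2024/02/01')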


def get_documents():
    """
    Export the email collection from the vector store to Excel.

    Writes 'collection_data.xlsx' with the raw records and
    'collection_data_expand.xlsx' with the metadata expanded into columns.

    Returns:
        None
    """
    data = vectorstore.get()
    df = pd.DataFrame({
        'ids': data['ids'],
        'documents': data['documents'],
        'metadatas': data['metadatas']
    })
    df.to_excel('collection_data.xlsx', index=False)
    pd.concat(
        [df.drop('metadatas', axis=1), df['metadatas'].apply(pd.Series)],
        axis=1).to_excel('collection_data_expand.xlsx', index=False)
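
# Example usage (sketch): both spreadsheets are written to the current working
# directory; nothing is returned.
#   get_documents()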


def get():
    """
    Return all emails stored in the collection.

    Returns:
        list[dict]: One record per stored document, with its id, page content,
        and metadata.
    """
    data = vectorstore.get()
    df = pd.DataFrame({
        'id': data['ids'],
        'documents': data['documents'],
        'metadatas': data['metadatas']
    })
    return df.to_dict(orient='records')
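

# Minimal sketch of running this module directly (assumes the Gmail credentials
# used by build_gmail_service() and the Chroma collection behind `vectorstore`
# are already configured):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(collect())       # index recent emails into the vector store
    get_documents()        # export the collection to Excel for inspection
    print(len(get()), "records in the collection")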
|