"""Module to search and list emails from Gmail.""" import os import re import base64 from datetime import datetime, timedelta from venv import logger from ics import Calendar # import pandas as pd from langchain_core.documents import Document from langchain_community.document_loaders import ( PyPDFLoader, UnstructuredExcelLoader, CSVLoader, UnstructuredImageLoader, ) from models.db import vectorstore # from models.mails import build_gmail_service SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"] EMAIL_PATTERN = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}" ATTACHMENTS_DIR = "cache" os.makedirs(ATTACHMENTS_DIR, exist_ok=True) # service = build_gmail_service() def search_emails(service, query): """Search emails based on a query.""" result = service.users().messages().list(userId="me", q=query).execute() messages = [] if "messages" in result: messages.extend(result["messages"]) while "nextPageToken" in result: page_token = result["nextPageToken"] result = ( service.users().messages().list(userId="me", q=query, pageToken=page_token).execute() ) if "messages" in result: messages.extend(result["messages"]) return messages def list_emails(service, messages): """ Processes a list of email messages, extracts metadata, decodes content, and handles attachments. Args: messages (list): A list of email message dictionaries, where each dictionary contains at least an 'id' key representing the email's unique identifier. Returns: None: The function processes the emails and adds the extracted documents to a vector store. Functionality: - Retrieves email details using the Gmail API. - Extracts metadata such as sender, recipient, subject, CC, and date. - Decodes email content in plain text or HTML format. - Handles multipart emails, including attachments. - Processes attachments based on their MIME type: - PDF files are loaded using PyPDFLoader. - Images (PNG, JPEG) are loaded using UnstructuredImageLoader. - CSV files are loaded using CSVLoader. - Excel files are loaded using UnstructuredExcelLoader. - Calendar files (ICS) are parsed to extract event details. - Removes HTML tags from email content. - Stores processed documents and metadata in a vector store. - Deletes temporary files created during attachment processing. Notes: - The function assumes the existence of a global `service` object for Gmail API interactions. - The `vectorstore.add_documents` method is used to store the processed documents. - Attachments are temporarily saved in a directory specified by `ATTACHMENTS_DIR` and deleted after processing. - The function logs information about attachments being downloaded. 
""" ids = [] documents = [] for message in messages: msg = service.users().messages().get(userId="me", id=message["id"], format="full").execute() metadata = {} for header in msg["payload"]["headers"]: if header["name"] == "From": metadata["from"] = header["value"] elif header["name"] == "To": metadata["to"] = header["value"] elif header["name"] == "Subject": metadata["subject"] = header["value"] print(f"subject: {metadata["subject"]}") elif header["name"] == "Cc": metadata["cc"] = header["value"] metadata["date"] = datetime.fromtimestamp(int(msg["internalDate"]) / 1000).strftime( "%d/%m/%Y %H:%M:%S" ) metadata["user_id"] = service.users().getProfile(userId="me").execute().get("emailAddress") metadata["msg_id"] = msg["id"] # print(metadata, msg["payload"]["mimeType"]) ids = [] documents = [] mime_types = [] if msg["payload"]["mimeType"] in [ "multipart/alternative", "multipart/related", "multipart/mixed", ]: mime_types = [] attach_docs = [] for part in msg["payload"]["parts"]: print("mimeType: ", part["mimeType"]) mime_types.append(part["mimeType"]) if part["mimeType"] == "text/plain" and "text/html" not in mime_types: body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8") body = re.sub(r"<[^>]+>", "", body) # Remove HTML tags metadata["mimeType"] = part["mimeType"] documents.append(Document(page_content=body, metadata=metadata)) ids.append(msg["id"]) elif part["mimeType"] == "text/html" and "text/plain" not in mime_types: body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8") body = re.sub(r"<[^>]+>", "", body) metadata["mimeType"] = part["mimeType"] documents.append(Document(page_content=body, metadata=metadata)) ids.append(msg["id"]) if part["filename"]: attachment_id = part["body"]["attachmentId"] logger.info("Downloading attachment: %s", part["filename"]) attachment = ( service.users() .messages() .attachments() .get(userId="me", messageId=message["id"], id=attachment_id) .execute() ) file_data = base64.urlsafe_b64decode(attachment["data"].encode("UTF-8")) path = os.path.join(".", ATTACHMENTS_DIR, part["filename"]) with open(path, "wb") as f: f.write(file_data) if part["mimeType"] == "application/pdf": attach_docs = PyPDFLoader(path).load() elif part["mimeType"] == "image/png" or part["mimeType"] == "image/jpeg": attach_docs = UnstructuredImageLoader(path).load() elif part["filename"].endswith(".csv"): attach_docs = CSVLoader(path).load() elif ( part["mimeType"] == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ): attach_docs = UnstructuredExcelLoader(path).load() elif part["mimeType"] == "application/ics": with open(path, "r", encoding="utf-8") as f: calendar = Calendar(f.read()) for event in calendar.events: documents.append( Document( page_content=f"Event: {event.name}\nDescription: {event.description}\nStart: {event.begin}\nEnd: {event.end}", metadata={ "attachment": part["filename"], "mimeType": part["mimeType"], "location": event.location, "created": event.created.strftime("%d/%m/%Y %H:%M:%S"), "last_modified": event.last_modified.strftime( "%d/%m/%Y %H:%M:%S" ), "start": event.begin.strftime("%d/%m/%Y %H:%M:%S"), "end": event.end.strftime("%d/%m/%Y %H:%M:%S"), }, ) ) ids.append(f"{msg['id']}_{attachment_id}") if os.path.exists(path): os.remove(path) for index, document in enumerate(attach_docs or []): document.metadata["mimeType"] = part["mimeType"] if "page_label" in document.metadata: document.metadata["page"] = document.metadata["page_label"] document.metadata["attachment"] = part["filename"] document.metadata = { key: value 
                            for key, value in document.metadata.items()
                            if key in ["attachment", "page"]
                        }
                        document.metadata.update(metadata)
                        # Record the attachment part's own MIME type on the stored document.
                        document.metadata["mimeType"] = part["mimeType"]
                        documents.append(document)
                        ids.append(f"{msg['id']}_{attachment_id}_{index}")
        elif msg["payload"]["mimeType"] == "text/plain" and "data" in msg["payload"]["body"]:
            body = base64.urlsafe_b64decode(msg["payload"]["body"]["data"]).decode("utf-8")
            body = re.sub(r"<[^>]+>", "", body)
            metadata["mimeType"] = msg["payload"]["mimeType"]
            documents.append(Document(page_content=body, metadata=metadata))
            ids.append(msg["id"])
        elif msg["payload"]["mimeType"] == "text/html" and "data" in msg["payload"]["body"]:
            body = base64.urlsafe_b64decode(msg["payload"]["body"]["data"]).decode("utf-8")
            body = re.sub(r"<[^>]+>", "", body)
            metadata["mimeType"] = msg["payload"]["mimeType"]
            documents.append(Document(page_content=body, metadata=metadata))
            ids.append(msg["id"])
        if "multipart/alternative" in mime_types and len(mime_types) == 1:
            print("Only multipart/alternative found in the email.")
        else:
            vectorstore.add_documents(documents=documents, ids=ids)


def collect(service, query=None):
    """
    Search Gmail and add the matching emails to the collection.

    Searches for emails matching `query`; when no query is given, emails received in the
    last 21 days are collected. If no emails are found, that is logged and nothing is added.

    Returns:
        str | None: A summary of how many emails were added, or None if none were found.
    """
    if query is None:
        # Default: everything received in the last 21 days, computed at call time.
        query = (datetime.today() - timedelta(days=21)).strftime("after:%Y/%m/%d")
    # query = "subject:Re: Smartcareers algorithm debug and improvement"  # debugging override
    emails = search_emails(service, query)
    if emails:
        logger.info("Found %d emails matching the query.", len(emails))
        logger.info("Listing emails...")
        list_emails(service, emails)
        return f"{len(emails)} emails added to the collection."
    logger.info("No emails found for the given query.")
    return None


# def get_documents(self):
#     """
#     Main function to list emails from the database.
#
#     This function lists all emails stored in the database.
#
#     Returns:
#         None
#     """
#     data = vectorstore.get()
#     df = pd.DataFrame(
#         {"ids": data["ids"], "documents": data["documents"], "metadatas": data["metadatas"]}
#     )
#     df.to_excel("collection_data.xlsx", index=False)
#     df = pd.concat(
#         [df.drop("metadatas", axis=1), df["metadatas"].apply(pd.Series)], axis=1
#     ).to_excel("collection_data_expand.xlsx", index=False)


# def get(self):
#     """
#     Main function to list emails from the database.
#
#     This function lists all emails stored in the database.
#
#     Returns:
#         None
#     """
#     data = vectorstore.get()
#     df = pd.DataFrame(
#         {"id": data["ids"], "documents": data["documents"], "metadatas": data["metadatas"]}
#     )
#     return df.to_dict(orient="records")
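

# Minimal usage sketch (an assumption, not part of the module's API): it presumes that
# `build_gmail_service()` in `models.mails` (referenced above only in a commented-out import)
# returns an authenticated Gmail API client authorized with the read-only scope in SCOPES.
if __name__ == "__main__":
    from models.mails import build_gmail_service  # assumed helper, see note above

    gmail_service = build_gmail_service()
    # Index roughly the last three weeks of mail (the default query) into the vector store.
    print(collect(gmail_service))
    # Or restrict the Gmail search explicitly, e.g. to recent mails with attachments:
    # print(collect(gmail_service, query="newer_than:7d has:attachment"))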