Spaces:
Sleeping
Sleeping
from googleapiclient.discovery import build | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
from email.mime.text import MIMEText | |
import pickle | |
import os.path | |
import base64 | |
# OAuth scopes requested during the consent flow (adjust to your needs).
# NOTE(review): no Drive scope is listed here, yet gmail_tool's
# "verificar_almacenamiento" action calls the Drive API with these
# credentials -- confirm whether a Drive scope needs to be added.
SCOPES = [
    'https://mail.google.com/',
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify',
]
def _decode_body_data(data):
    """Decode one base64url-encoded Gmail body string into text."""
    if not data:
        return ""
    # Re-pad defensively: urlsafe_b64decode rejects input whose length is
    # not a multiple of four.
    padded = data + "=" * (-len(data) % 4)
    # errors="replace" keeps a single non-UTF-8 message from aborting the
    # whole listing.
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _collect_plain_text(payload):
    """Concatenate the text/plain bodies found anywhere under a payload."""
    parts = payload.get('parts', [])
    if not parts:
        # No parts: the content sits directly in payload['body'].
        return _decode_body_data(payload.get('body', {}).get('data', ''))
    chunks = []
    for part in parts:
        if part.get('mimeType') == 'text/plain':
            chunks.append(
                _decode_body_data(part.get('body', {}).get('data', ''))
            )
        elif part.get('parts'):
            # Container part (it has its own 'parts'): descend so that
            # text/plain nested below the top level is not lost.
            chunks.append(_collect_plain_text(part))
    return "".join(chunks)


def gmail_tool(accion, parametros=None):
    """Interact with the Gmail API.

    Args:
        accion: Name of the operation to run. Handled values:
            "leer_correos" (list messages and extract their plain text),
            "enviar_correo" (send a plain-text e-mail) and
            "verificar_almacenamiento" (query the Drive storage quota).
            Any other value runs nothing and yields an empty dict.
        parametros: Optional dict of per-action options.
            "leer_correos" reads "maxResults" (default 10).
            "enviar_correo" reads "destinatario", "asunto", "cuerpo" and,
            optionally, "remitente" for the From header.

    Returns:
        A dict. When no usable credentials are available it is
        {"auth_url": <url>, "requires_auth": True}. "leer_correos" fills
        "messages" with {"body", "id"} entries; "enviar_correo" sets
        "message_id". "verificar_almacenamiento" returns the storageQuota
        value of the Drive response directly, or an {"error": ...} dict.
        An exception raised while running an action is reported under
        "error" instead of propagating.
    """
    # A fresh dict per call; a `{}` default would be shared between calls.
    if parametros is None:
        parametros = {}

    creds = None
    if os.path.exists('token.pickle'):
        # SECURITY: unpickling can execute arbitrary code, so token.pickle
        # must only ever be written by trusted code.
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            # NOTE(review): the refreshed credentials are not written back
            # to token.pickle here -- confirm whether that is intended.
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            # The redirect URI is configured on the flow itself; handing it
            # to authorization_url() as a keyword collides with the value
            # the underlying OAuth session already passes along.
            flow.redirect_uri = 'https://trainheartx-gmail.hf.space/callback'
            auth_url, _ = flow.authorization_url(prompt='consent')
            # Tell the caller that the user has to authorize first.
            return {"auth_url": auth_url, "requires_auth": True}

    service = build('gmail', 'v1', credentials=creds)
    result = {}
    try:
        if accion == "leer_correos":
            listing = service.users().messages().list(
                userId='me', maxResults=parametros.get("maxResults", 10)
            ).execute()
            result["messages"] = []
            for message in listing.get('messages', []):
                msg = service.users().messages().get(
                    userId='me', id=message['id']
                ).execute()
                body = _collect_plain_text(msg['payload'])
                print(body)
                # Keep the message id next to its decoded text.
                result["messages"].append({"body": body, "id": message['id']})
        elif accion == "enviar_correo":
            destinatario = parametros.get("destinatario")
            asunto = parametros.get("asunto")
            cuerpo_correo = parametros.get("cuerpo")
            message = MIMEText(cuerpo_correo)
            message['to'] = destinatario
            # The sender can be overridden through "remitente"; the default
            # is the placeholder address the file shipped with.
            message['from'] = parametros.get("remitente", '[email protected]')
            message['subject'] = asunto
            create_message = {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
            send_message = service.users().messages().send(userId='me', body=create_message).execute()
            print(f'Message Id: {send_message["id"]}')
            result["message_id"] = send_message["id"]
        elif accion == "verificar_almacenamiento":
            try:
                drive_service = build('drive', 'v3', credentials=creds)
                about = drive_service.about().get(fields="storageQuota").execute()
                # Returned as-is (not wrapped in `result`).
                return about.get('storageQuota')
            except Exception as e:
                print(f"Error al verificar el almacenamiento: {e}")
                return {"error": "Error al verificar el almacenamiento."}
    except Exception as e:
        # Report the failure to the caller instead of raising; anything
        # collected before the failure stays in `result`.
        print(f"Error en gmail_tool: {e}")
        result["error"] = str(e)
    return result
# ... (Integration with the system prompt)