# gavinzli's picture
# Refactor Google OAuth2 callback to include state validation and error handling
# c75e17a
# raw
# history blame
# 5.14 kB
"""Module for defining the main routes of the API."""
import os
import json
import pickle
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import JSONResponse
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
# Router collecting the Google OAuth endpoints under the "auth" tag.
router = APIRouter(tags=["auth"])

# OAuth client settings are read from the environment; each is None when unset.
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
REDIRECT_URI = os.getenv("REDIRECT_URI")

# Scopes requested from the user during consent.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# Client configuration passed to the OAuth flow, nested under the "web" key.
CLIENT_CONFIG = {
    "web": dict(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        redirect_uris=[REDIRECT_URI],
        auth_uri="https://accounts.google.com/o/oauth2/auth",
        token_uri="https://oauth2.googleapis.com/token",
    )
}
@router.get("/auth/google/url")
async def get_auth_url(request: Request = None):
    """
    Generate a Google OAuth 2.0 authorization URL.

    Builds an OAuth flow from CLIENT_CONFIG and SCOPES, sets the redirect URI, and
    asks the flow for an authorization URL with offline access and a forced consent
    prompt. The ``state`` token produced together with the URL is saved in the
    session under "oauth_state", which is the key the callback endpoint compares
    the returned ``state`` against. Previously the token was discarded, so that
    comparison could never succeed.

    Args:
        request (Request): The incoming HTTP request. Optional, so code that calls
            this function directly without arguments keeps working; in that case
            no state is stored.

    Returns:
        JSONResponse: A JSON body containing the authorization URL under the key "url".
    """
    flow = InstalledAppFlow.from_client_config(CLIENT_CONFIG, SCOPES)
    flow.redirect_uri = REDIRECT_URI
    auth_url, state = flow.authorization_url(access_type="offline", prompt="consent")

    # NOTE(review): assumes request.state.session is the same per-user mapping the
    # callback reads from and that it persists between requests - confirm against
    # the session middleware. When no session is attached, skip storing rather
    # than fail, so URL generation keeps working as before.
    session = getattr(getattr(request, "state", None), "session", None)
    if session is not None:
        session["oauth_state"] = state

    return JSONResponse({"url": auth_url})
@router.get("/auth/google/callback")
async def google_callback(code: str, state: str = None, scope: str = None, request: Request = None):
    """
    Handle the Google OAuth2 callback.

    Exchanges the authorization code for credentials, stores them in the session,
    fetches the user's Gmail profile, and writes the credentials to a pickle file
    named after the profile's email address inside the "cache" directory.

    Args:
        code (str): The authorization code returned by Google's OAuth2 server.
        state (str): Optional state value echoed back by Google. When present it
            must equal the session's "oauth_state" entry.
        scope (str): Accepted as a query parameter but not used by this handler.
        request (Request): The incoming HTTP request; its ``state.session`` mapping
            is read for the state check and updated with the credentials.

    Returns:
        JSONResponse: The Gmail profile returned by ``users().getProfile``.

    Raises:
        HTTPException: 400 when the state does not match or the authorization code
            is invalid or expired; 500 for any other failure.
    """
    try:
        # Validate state (optional, for CSRF protection): only checked when the
        # caller actually sent a state value.
        if state and request.state.session.get("oauth_state") != state:
            raise HTTPException(status_code=400, detail="Invalid state parameter")

        flow = InstalledAppFlow.from_client_config(CLIENT_CONFIG, SCOPES)
        flow.redirect_uri = REDIRECT_URI
        flow.fetch_token(code=code)
        credentials = flow.credentials

        # NOTE(review): the serialized credentials presumably include the refresh
        # token and client secret - confirm the session is stored server-side.
        request.state.session["credential"] = json.loads(credentials.to_json())

        service = build("gmail", "v1", credentials=credentials)
        profile = service.users().getProfile(userId="me").execute()

        # Ensure cache directory exists
        os.makedirs("cache", exist_ok=True)
        with open(f"cache/{profile['emailAddress']}.pickle", "wb") as token:
            pickle.dump(credentials, token)

        return JSONResponse(profile)
    except HTTPException:
        # HTTP errors raised on purpose above (e.g. the 400 for a bad state) must
        # reach the client unchanged instead of being rewrapped as a 500 below.
        raise
    except InvalidGrantError as exc:
        raise HTTPException(status_code=400, detail="Invalid or expired authorization code") from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Authentication failed: {str(exc)}") from exc
# @router.get("/auth/google/callback")
# async def google_callback(code: str, request: Request):
# """
# Handles the Google OAuth2 callback by exchanging the authorization code for credentials,
# retrieving the user's Gmail profile, and saving the credentials to a file.
# Args:
# code (str): The authorization code returned by Google's OAuth2 server.
# request (Request): The incoming HTTP request object.
# Returns:
# JSONResponse: A JSON response containing the user's Gmail profile information.
# Side Effects:
# - Saves the user's credentials to a pickle file named after their email address.
# - Stores the credentials in the session state of the request.
# Dependencies:
# - google_auth_oauthlib.flow.InstalledAppFlow: Used to handle the OAuth2 flow.
# - googleapiclient.discovery.build: Used to build the Gmail API service.
# - json: Used to serialize and deserialize credentials.
# - pickle: Used to save credentials to a file.
# """
# flow = InstalledAppFlow.from_client_config(CLIENT_CONFIG, SCOPES)
# flow.redirect_uri = REDIRECT_URI
# flow.fetch_token(code=code)
# credentials = flow.credentials
# request.state.session["credential"] = json.loads(credentials.to_json())
# # cred_dict = (request.state.session.get("credential"))
# # cred = Credentials(
# # token=cred_dict["token"],
# # refresh_token=cred_dict["refresh_token"],
# # token_uri=cred_dict["token_uri"],
# # client_id=cred_dict["client_id"],
# # client_secret=cred_dict["client_secret"],
# # scopes=cred_dict["scopes"],
# # )
# # service = build("gmail", "v1", credentials=Credentials(
# # token=cred_dict["token"],
# # refresh_token=cred_dict["refresh_token"],
# # token_uri=cred_dict["token_uri"],
# # client_id=cred_dict["client_id"],
# # client_secret=cred_dict["client_secret"],
# # scopes=cred_dict["scopes"],
# # ))
# service = build("gmail", "v1", credentials=credentials)
# profile = service.users().getProfile(userId="me").execute()
# with open(f"cache/{profile['emailAddress']}.pickle", "wb") as token:
# pickle.dump(credentials, token)
# return JSONResponse(profile)