Implement token blacklisting and logout functionality
Browse files
main.py
CHANGED
@@ -200,7 +200,17 @@ def verify_token(token: str, secret_key: str):
|
|
200 |
return username
|
201 |
|
202 |
def verify_access_token(token: str = Depends(oauth2_scheme)):
|
203 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
204 |
|
205 |
# Endpoints
|
206 |
@app.get("/")
|
@@ -266,6 +276,36 @@ async def refresh(refresh_request: RefreshRequest):
|
|
266 |
detail="Could not validate credentials",
|
267 |
headers={"WWW-Authenticate": "Bearer"},
|
268 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
269 |
|
270 |
@app.post("/search", response_model=List[SearchResult])
|
271 |
async def search(
|
|
|
200 |
return username
|
201 |
|
202 |
def verify_access_token(token: str = Depends(oauth2_scheme)):
|
203 |
+
username = verify_token(token, SECRET_KEY)
|
204 |
+
|
205 |
+
# Check if token is blacklisted
|
206 |
+
if cache.get(f"blacklist_{token}"):
|
207 |
+
raise HTTPException(
|
208 |
+
status_code=status.HTTP_401_UNAUTHORIZED,
|
209 |
+
detail="Token has been revoked",
|
210 |
+
headers={"WWW-Authenticate": "Bearer"},
|
211 |
+
)
|
212 |
+
|
213 |
+
return username
|
214 |
|
215 |
# Endpoints
|
216 |
@app.get("/")
|
|
|
276 |
detail="Could not validate credentials",
|
277 |
headers={"WWW-Authenticate": "Bearer"},
|
278 |
)
|
279 |
+
|
280 |
+
@app.post("/logout")
|
281 |
+
def logout(
|
282 |
+
token: str = Depends(oauth2_scheme),
|
283 |
+
username: str = Depends(verify_access_token)
|
284 |
+
):
|
285 |
+
try:
|
286 |
+
# Decode token to get expiration time
|
287 |
+
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
288 |
+
exp_timestamp = payload.get("exp")
|
289 |
+
if exp_timestamp is None:
|
290 |
+
raise HTTPException(status_code=400, detail="Token missing expiration time")
|
291 |
+
|
292 |
+
# Calculate remaining token validity
|
293 |
+
current_time = datetime.now(timezone.utc).timestamp()
|
294 |
+
remaining_time = exp_timestamp - current_time
|
295 |
+
|
296 |
+
if remaining_time > 0:
|
297 |
+
# Add to blacklist cache with TTL matching token expiration
|
298 |
+
cache_key = f"blacklist_{token}"
|
299 |
+
cache.set(cache_key, True, expire=remaining_time)
|
300 |
+
|
301 |
+
return {"message": "Successfully logged out"}
|
302 |
+
|
303 |
+
except JWTError:
|
304 |
+
raise HTTPException(
|
305 |
+
status_code=status.HTTP_401_UNAUTHORIZED,
|
306 |
+
detail="Invalid token",
|
307 |
+
headers={"WWW-Authenticate": "Bearer"},
|
308 |
+
)
|
309 |
|
310 |
@app.post("/search", response_model=List[SearchResult])
|
311 |
async def search(
|