Spaces:
Running
Running
from functools import wraps | |
from flask import request | |
from flask_restful import Resource | |
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized | |
from controllers.web.error import WebSSOAuthRequiredError | |
from extensions.ext_database import db | |
from libs.passport import PassportService | |
from models.model import App, EndUser, Site | |
from services.feature_service import FeatureService | |
def validate_jwt_token(view=None): | |
def decorator(view): | |
def decorated(*args, **kwargs): | |
app_model, end_user = decode_jwt_token() | |
return view(app_model, end_user, *args, **kwargs) | |
return decorated | |
if view: | |
return decorator(view) | |
return decorator | |
def decode_jwt_token(): | |
system_features = FeatureService.get_system_features() | |
try: | |
auth_header = request.headers.get('Authorization') | |
if auth_header is None: | |
raise Unauthorized('Authorization header is missing.') | |
if ' ' not in auth_header: | |
raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.') | |
auth_scheme, tk = auth_header.split(None, 1) | |
auth_scheme = auth_scheme.lower() | |
if auth_scheme != 'bearer': | |
raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.') | |
decoded = PassportService().verify(tk) | |
app_code = decoded.get('app_code') | |
app_model = db.session.query(App).filter(App.id == decoded['app_id']).first() | |
site = db.session.query(Site).filter(Site.code == app_code).first() | |
if not app_model: | |
raise NotFound() | |
if not app_code or not site: | |
raise BadRequest('Site URL is no longer valid.') | |
if app_model.enable_site is False: | |
raise BadRequest('Site is disabled.') | |
end_user = db.session.query(EndUser).filter(EndUser.id == decoded['end_user_id']).first() | |
if not end_user: | |
raise NotFound() | |
_validate_web_sso_token(decoded, system_features) | |
return app_model, end_user | |
except Unauthorized as e: | |
if system_features.sso_enforced_for_web: | |
raise WebSSOAuthRequiredError() | |
raise Unauthorized(e.description) | |
def _validate_web_sso_token(decoded, system_features): | |
# Check if SSO is enforced for web, and if the token source is not SSO, raise an error and redirect to SSO login | |
if system_features.sso_enforced_for_web: | |
source = decoded.get('token_source') | |
if not source or source != 'sso': | |
raise WebSSOAuthRequiredError() | |
# Check if SSO is not enforced for web, and if the token source is SSO, raise an error and redirect to normal passport login | |
if not system_features.sso_enforced_for_web: | |
source = decoded.get('token_source') | |
if source and source == 'sso': | |
raise Unauthorized('sso token expired.') | |
class WebApiResource(Resource): | |
method_decorators = [validate_jwt_token] | |