SNAPZION-TESTING / main.py
Niansuh's picture
Update main.py
58893fb verified
raw
history blame contribute delete
8.79 kB
import os
import secrets
import datetime
import requests
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from pydantic import BaseModel
from dotenv import load_dotenv
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
# Load environment variables from .env file
load_dotenv()
# Environment variables
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_DB = os.getenv("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")
DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# ---------------------------
# Database Models
# ---------------------------
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
hashed_password = Column(String(128), nullable=False)
is_admin = Column(Boolean, default=False)
credits = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship to API keys
api_keys = relationship("APIKey", back_populates="owner", cascade="all, delete-orphan")
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(64), unique=True, index=True, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
expiry_date = Column(DateTime, nullable=True)
active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
owner = relationship("User", back_populates="api_keys")
Base.metadata.create_all(bind=engine)
# ---------------------------
# FastAPI App & Config
# ---------------------------
app = FastAPI(title="Real API Key Platform")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def generate_api_key() -> str:
return secrets.token_hex(16)
# ---------------------------
# Pydantic Models
# ---------------------------
class UserCreate(BaseModel):
username: str
password: str
class UserOut(BaseModel):
id: int
username: str
is_admin: bool
credits: int
created_at: datetime.datetime
class Config:
orm_mode = True
class APIKeyOut(BaseModel):
id: int
key: str
expiry_date: datetime.datetime | None = None
active: bool
created_at: datetime.datetime
class Config:
orm_mode = True
class GenerateKeyPayload(BaseModel):
expiry_date: datetime.datetime | None = None
class RequestPayload(BaseModel):
prompt: str
class CreditPayload(BaseModel):
username: str
credits: int
class DeactivateKeyPayload(BaseModel):
key: str
# ---------------------------
# Authentication Dependency
# ---------------------------
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
api_key_obj = db.query(APIKey).filter(APIKey.key == x_api_key, APIKey.active == True).first()
if not api_key_obj:
raise HTTPException(status_code=401, detail="Invalid or inactive API Key")
if api_key_obj.expiry_date and datetime.datetime.utcnow() > api_key_obj.expiry_date:
raise HTTPException(status_code=401, detail="API Key expired")
return api_key_obj.owner
# ---------------------------
# Endpoints
# ---------------------------
# Registration: Create a new user and generate a primary API key (no expiry)
@app.post("/register", response_model=UserOut)
def register(user: UserCreate, db: Session = Depends(get_db)):
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(status_code=400, detail="Username already exists")
new_user = User(username=user.username, hashed_password=user.password, is_admin=False)
db.add(new_user)
db.commit()
db.refresh(new_user)
primary_key = APIKey(key=generate_api_key(), user_id=new_user.id, expiry_date=None, active=True)
db.add(primary_key)
db.commit()
db.refresh(primary_key)
return new_user
# User panel: Get current user details
@app.get("/user/me", response_model=UserOut)
def read_user_me(current_user: User = Depends(get_current_user)):
return current_user
# List all API keys belonging to the current user
@app.get("/user/api_keys", response_model=list[APIKeyOut])
def get_user_api_keys(current_user: User = Depends(get_current_user)):
return current_user.api_keys
# Allow user to generate a new API key (with optional expiry date)
@app.post("/user/generate_key", response_model=APIKeyOut)
def generate_key(payload: GenerateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
new_key = APIKey(key=generate_api_key(), user_id=current_user.id, expiry_date=payload.expiry_date, active=True)
db.add(new_key)
db.commit()
db.refresh(new_key)
return new_key
# Test endpoint for users
@app.get("/user/test_api")
def test_api(current_user: User = Depends(get_current_user)):
return {"message": "API is working", "username": current_user.username, "credits": current_user.credits}
# Proxy endpoint: Forwards request to main API using the secured main API key
@app.post("/generate")
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
headers = {
"Authorization": f"Bearer {MAIN_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": MODEL_NAME,
"prompt": payload.prompt
}
response = requests.post(MAIN_API_URL, json=data, headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error from main API")
return response.json()
# ---------------------------
# Admin Endpoints
# ---------------------------
# List all users (admin-only)
@app.get("/admin/users", response_model=list[UserOut])
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
users = db.query(User).all()
return users
# Add credits to a user's account (admin-only)
@app.post("/admin/add_credit")
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
user = db.query(User).filter(User.username == payload.username).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.credits += payload.credits
db.commit()
db.refresh(user)
return {"message": f"Added {payload.credits} credits to {user.username}. Total credits: {user.credits}"}
# List all API keys in the system (admin-only)
@app.get("/admin/api_keys", response_model=list[APIKeyOut])
def list_all_api_keys(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
keys = db.query(APIKey).all()
return keys
# Deactivate an API key (admin-only)
@app.post("/admin/deactivate_key")
def deactivate_key(payload: DeactivateKeyPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
key_obj = db.query(APIKey).filter(APIKey.key == payload.key).first()
if not key_obj:
raise HTTPException(status_code=404, detail="API Key not found")
key_obj.active = False
db.commit()
return {"message": f"API Key {payload.key} deactivated."}
# ---------------------------
# UI Endpoints (Panels)
# ---------------------------
@app.get("/admin/ui")
def admin_ui(request: Request):
return templates.TemplateResponse("admin.html", {"request": request})
@app.get("/user/ui")
def user_ui(request: Request):
return templates.TemplateResponse("user.html", {"request": request})