Spaces:
Running
Running
from fastapi import APIRouter, HTTPException, Depends, Request, status | |
from typing import List, Optional | |
from pydantic import BaseModel | |
from .auth_router import get_current_user | |
router = APIRouter() | |
# Models | |
class JournalBase(BaseModel): | |
title: str | |
content: Optional[str] = None | |
class JournalCreate(JournalBase): | |
project_id: int | |
class JournalUpdate(BaseModel): | |
title: Optional[str] = None | |
content: Optional[str] = None | |
class JournalResponse(JournalBase): | |
id: int | |
project_id: int | |
created_at: str | |
updated_at: str | |
# Helper function to check project ownership | |
async def check_project_ownership(request: Request, project_id: int, user_id: int): | |
query = "SELECT * FROM projects WHERE id = ? AND owner_id = ?" | |
project = request.app.db_conn.execute(query, (project_id, user_id)).fetchone() | |
if not project: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail="Project not found or you don't have access to it" | |
) | |
return project | |
# Routes | |
async def create_journal( | |
request: Request, | |
journal: JournalCreate, | |
current_user = Depends(get_current_user) | |
): | |
try: | |
# Check if project exists and belongs to user | |
await check_project_ownership(request, journal.project_id, current_user[0]) | |
# Insert the new journal | |
insert_query = """ | |
INSERT INTO journals (project_id, title, content) | |
VALUES (?, ?, ?) | |
""" | |
request.app.db_conn.execute( | |
insert_query, | |
(journal.project_id, journal.title, journal.content) | |
) | |
request.app.db_conn.commit() | |
# Get the newly created journal | |
new_journal = request.app.db_conn.execute( | |
"SELECT * FROM journals WHERE project_id = ? ORDER BY id DESC LIMIT 1", | |
(journal.project_id,) | |
).fetchone() | |
return { | |
"id": new_journal[0], | |
"project_id": new_journal[1], | |
"title": new_journal[2], | |
"content": new_journal[3], | |
"created_at": new_journal[4], | |
"updated_at": new_journal[5] | |
} | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=str(e) | |
) | |
async def get_journals_by_project( | |
request: Request, | |
project_id: int, | |
current_user = Depends(get_current_user), | |
skip: int = 0, | |
limit: int = 100 | |
): | |
try: | |
# Check if project exists and belongs to user | |
await check_project_ownership(request, project_id, current_user[0]) | |
# Get journals for the project | |
query = """ | |
SELECT * FROM journals | |
WHERE project_id = ? | |
ORDER BY updated_at DESC | |
LIMIT ? OFFSET ? | |
""" | |
journals = request.app.db_conn.execute( | |
query, | |
(project_id, limit, skip) | |
).fetchall() | |
result = [] | |
for journal in journals: | |
result.append({ | |
"id": journal[0], | |
"project_id": journal[1], | |
"title": journal[2], | |
"content": journal[3], | |
"created_at": journal[4], | |
"updated_at": journal[5] | |
}) | |
return result | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=str(e) | |
) | |
async def get_journal( | |
request: Request, | |
journal_id: int, | |
current_user = Depends(get_current_user) | |
): | |
try: | |
# Get the journal | |
query = "SELECT * FROM journals WHERE id = ?" | |
journal = request.app.db_conn.execute(query, (journal_id,)).fetchone() | |
if not journal: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail="Journal not found" | |
) | |
# Check if the journal's project belongs to the user | |
await check_project_ownership(request, journal[1], current_user[0]) | |
return { | |
"id": journal[0], | |
"project_id": journal[1], | |
"title": journal[2], | |
"content": journal[3], | |
"created_at": journal[4], | |
"updated_at": journal[5] | |
} | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=str(e) | |
) | |
async def update_journal( | |
request: Request, | |
journal_id: int, | |
journal_update: JournalUpdate, | |
current_user = Depends(get_current_user) | |
): | |
try: | |
# Get the journal | |
query = "SELECT * FROM journals WHERE id = ?" | |
journal = request.app.db_conn.execute(query, (journal_id,)).fetchone() | |
if not journal: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail="Journal not found" | |
) | |
# Check if the journal's project belongs to the user | |
await check_project_ownership(request, journal[1], current_user[0]) | |
# Prepare update fields | |
update_fields = [] | |
params = [] | |
if journal_update.title is not None: | |
update_fields.append("title = ?") | |
params.append(journal_update.title) | |
if journal_update.content is not None: | |
update_fields.append("content = ?") | |
params.append(journal_update.content) | |
# Add updated_at field | |
update_fields.append("updated_at = CURRENT_TIMESTAMP") | |
# If no fields to update, return the existing journal | |
if not params: | |
return { | |
"id": journal[0], | |
"project_id": journal[1], | |
"title": journal[2], | |
"content": journal[3], | |
"created_at": journal[4], | |
"updated_at": journal[5] | |
} | |
# Update the journal | |
update_query = f""" | |
UPDATE journals | |
SET {', '.join(update_fields)} | |
WHERE id = ? | |
""" | |
params.append(journal_id) | |
request.app.db_conn.execute(update_query, params) | |
request.app.db_conn.commit() | |
# Get the updated journal | |
updated_journal = request.app.db_conn.execute( | |
"SELECT * FROM journals WHERE id = ?", | |
(journal_id,) | |
).fetchone() | |
return { | |
"id": updated_journal[0], | |
"project_id": updated_journal[1], | |
"title": updated_journal[2], | |
"content": updated_journal[3], | |
"created_at": updated_journal[4], | |
"updated_at": updated_journal[5] | |
} | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=str(e) | |
) | |
async def delete_journal( | |
request: Request, | |
journal_id: int, | |
current_user = Depends(get_current_user) | |
): | |
try: | |
# Get the journal | |
query = "SELECT * FROM journals WHERE id = ?" | |
journal = request.app.db_conn.execute(query, (journal_id,)).fetchone() | |
if not journal: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail="Journal not found" | |
) | |
# Check if the journal's project belongs to the user | |
await check_project_ownership(request, journal[1], current_user[0]) | |
# Delete the journal | |
delete_query = "DELETE FROM journals WHERE id = ?" | |
request.app.db_conn.execute(delete_query, (journal_id,)) | |
request.app.db_conn.commit() | |
return None | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=str(e) | |
) | |