Spaces:
Running
Running
from datetime import datetime | |
from db.schema import Feedback, Response | |
from db.setup import db_setup | |
# Create operation (Ingest data into Firebase with unique user_id check) | |
def ingest(data: Feedback): | |
ref = db_setup() # Ensure Firebase is initialized | |
print(f"Attempting to ingest feedback data from user '{data.user_id}'...") | |
# Check if the user_id already exists in the feedback data | |
# user_id_str = str(data.user_id) | |
# existing_feedback = ref.child('feedback').order_by_child('user_id').equal_to(user_id_str).get() | |
# | |
# | |
# # TODO: This should probably change. If the same user has multiple feedbacks, we should allow it. -> change to update | |
# if existing_feedback: | |
# print(f"Feedback from user '{data.user_id}' already exists. Ingestion aborted.") | |
# else: | |
# # feedback_data = data.dict() # Convert Pydantic model to a dictionary | |
feedback_data = data.model_dump() | |
feedback_data["time_stamp"] = feedback_data['time_stamp'].isoformat() | |
ref.child('feedback').push(feedback_data) | |
print(f"Feedback data from user '{data.user_id}' pushed to Firebase!") | |
# Read operation (Fetch feedback data from Firebase) | |
def read(user_id: str): | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('user_id').equal_to(user_id).get() | |
if feedback_ref: | |
return feedback_ref | |
else: | |
print("Feedback not found!") | |
return None | |
# Update operation (Update feedback data in Firebase) | |
def update(feedback_id: int, updated_data: Feedback): | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get() | |
if feedback_ref: | |
# Assuming we're updating the first entry found | |
for key in feedback_ref: | |
ref.child('feedback').child(key).update(updated_data.dict()) | |
print("Feedback data updated in Firebase!") | |
else: | |
print("Feedback not found to update!") | |
# Delete operation (Remove feedback data from Firebase) | |
def delete(feedback_id: int): | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get() | |
if feedback_ref: | |
# Assuming we're deleting the first entry found | |
for key in feedback_ref: | |
ref.child('feedback').child(key).delete() | |
print("Feedback data deleted from Firebase!") | |
else: | |
print("Feedback not found to delete!") | |
def test(): | |
# Create a feedback object | |
# feedback = Feedback( | |
# id=2, | |
# user_id='tt', | |
# time_stamp=datetime(2025, 1, 30, 21, 25, 15, 581994), | |
# responses=[ | |
# Response(config_id='c_p_0_pop_low_easy', rating_v=1, rating_p0=1, rating_p1=1, comment='', | |
# timestamp='2025-01-30T21:25:12.642038'), | |
# Response(config_id='c_p_1_pop_medium_medium', rating_v=1, rating_p0=1, rating_p1=1, comment='', | |
# timestamp='2025-01-30T21:25:13.854227'), | |
# Response(config_id='c_p_2_pop_high_hard', rating_v=1, rating_p0=1, rating_p1=1, comment='', | |
# timestamp='2025-01-30T21:25:15.581948'), | |
# ] | |
# ) | |
# # Create (Ingest) | |
# ingest(feedback) | |
# Read (Fetch) | |
feedback_data = read("cdSf") | |
if feedback_data: | |
print(feedback_data) | |
# Update (Modify) | |
# updated_feedback = Feedback( | |
# id=1, | |
# user_id="user123", | |
# time_stamp=datetime.now(), | |
# responses=[ | |
# {"q_id": "q1", "ans": 4}, # Updated answer | |
# {"q_id": "q2", "ans": 3} | |
# ] | |
# ) | |
# update(1, updated_feedback) | |
# | |
# # Delete (Remove) | |
# delete(1) | |