user-feedback / db /crud.py
Ashmi Banerjee
added next button on the intro screen
e4a1a2b
raw
history blame
3.66 kB
from datetime import datetime
from db.schema import Feedback, Response
from db.setup import db_setup
# Create operation (Ingest data into Firebase with unique user_id check)
def ingest(data: Feedback):
ref = db_setup() # Ensure Firebase is initialized
print(f"Attempting to ingest feedback data from user '{data.user_id}'...")
# Check if the user_id already exists in the feedback data
# user_id_str = str(data.user_id)
# existing_feedback = ref.child('feedback').order_by_child('user_id').equal_to(user_id_str).get()
#
#
# # TODO: This should probably change. If the same user has multiple feedbacks, we should allow it. -> change to update
# if existing_feedback:
# print(f"Feedback from user '{data.user_id}' already exists. Ingestion aborted.")
# else:
# # feedback_data = data.dict() # Convert Pydantic model to a dictionary
feedback_data = data.model_dump()
feedback_data["time_stamp"] = feedback_data['time_stamp'].isoformat()
ref.child('feedback').push(feedback_data)
print(f"Feedback data from user '{data.user_id}' pushed to Firebase!")
# Read operation (Fetch feedback data from Firebase)
def read(user_id: str):
ref = db_setup()
feedback_ref = ref.child('feedback').order_by_child('user_id').equal_to(user_id).get()
if feedback_ref:
return feedback_ref
else:
print("Feedback not found!")
return None
# Update operation (Update feedback data in Firebase)
def update(feedback_id: int, updated_data: Feedback):
ref = db_setup()
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get()
if feedback_ref:
# Assuming we're updating the first entry found
for key in feedback_ref:
ref.child('feedback').child(key).update(updated_data.dict())
print("Feedback data updated in Firebase!")
else:
print("Feedback not found to update!")
# Delete operation (Remove feedback data from Firebase)
def delete(feedback_id: int):
ref = db_setup()
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get()
if feedback_ref:
# Assuming we're deleting the first entry found
for key in feedback_ref:
ref.child('feedback').child(key).delete()
print("Feedback data deleted from Firebase!")
else:
print("Feedback not found to delete!")
def test():
# Create a feedback object
# feedback = Feedback(
# id=2,
# user_id='tt',
# time_stamp=datetime(2025, 1, 30, 21, 25, 15, 581994),
# responses=[
# Response(config_id='c_p_0_pop_low_easy', rating_v=1, rating_p0=1, rating_p1=1, comment='',
# timestamp='2025-01-30T21:25:12.642038'),
# Response(config_id='c_p_1_pop_medium_medium', rating_v=1, rating_p0=1, rating_p1=1, comment='',
# timestamp='2025-01-30T21:25:13.854227'),
# Response(config_id='c_p_2_pop_high_hard', rating_v=1, rating_p0=1, rating_p1=1, comment='',
# timestamp='2025-01-30T21:25:15.581948'),
# ]
# )
# # Create (Ingest)
# ingest(feedback)
# Read (Fetch)
feedback_data = read("cdSf")
if feedback_data:
print(feedback_data)
# Update (Modify)
# updated_feedback = Feedback(
# id=1,
# user_id="user123",
# time_stamp=datetime.now(),
# responses=[
# {"q_id": "q1", "ans": 4}, # Updated answer
# {"q_id": "q2", "ans": 3}
# ]
# )
# update(1, updated_feedback)
#
# # Delete (Remove)
# delete(1)