Spaces:
Running
Running
File size: 2,973 Bytes
9a6b7dc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
from datetime import datetime
from db.schema import Feedback
from db.setup import db_setup
# Create operation (Ingest data into Firebase with unique user_id check)
def ingest(data: Feedback):
    """Store a feedback record in Firebase unless its user already has one.

    Queries the ``feedback`` node for entries whose ``user_id`` child equals
    ``data.user_id``. If the query returns anything truthy, nothing is written;
    otherwise ``data.dict()`` is pushed as a new child of ``feedback``. Either
    outcome is reported with ``print``.

    Args:
        data: Feedback model to store. Only ``data.user_id`` and
            ``data.dict()`` are used here.
    """
    root = db_setup()  # reference that the queries below start from

    # Look for feedback already recorded under this user_id.
    by_user = root.child('feedback').order_by_child('user_id')
    already_stored = by_user.equal_to(data.user_id).get()

    # TODO: This should probably change. If the same user has multiple
    # feedbacks, we should allow it. -> change to update
    if already_stored:
        print(f"Feedback from user '{data.user_id}' already exists. Ingestion aborted.")
        return

    payload = data.dict()  # dictionary form of the Pydantic model
    root.child('feedback').push(payload)
    print(f"Feedback data from user '{data.user_id}' pushed to Firebase!")
# Read operation (Fetch feedback data from Firebase)
def read(feedback_id: int):
    """Fetch the feedback entries whose ``id`` child equals ``feedback_id``.

    Args:
        feedback_id: Value compared against each entry's ``id``. Annotated
            ``int`` to agree with ``update``/``delete`` and with ``test()``,
            which stores ``id=1`` and calls ``read(1)``.
            NOTE(review): the equality query presumably distinguishes the
            number 1 from the string "1" -- confirm the stored type of ``id``
            against the Feedback schema.

    Returns:
        The result of the query's ``get()`` when it is truthy; otherwise
        ``None``, after printing "Feedback not found!".
    """
    root = db_setup()
    matches = root.child('feedback').order_by_child('id').equal_to(feedback_id).get()
    if not matches:
        print("Feedback not found!")
        return None
    return matches
# Update operation (Update feedback data in Firebase)
def update(feedback_id: int, updated_data: Feedback):
    """Apply ``updated_data`` to every feedback entry whose ``id`` matches.

    Args:
        feedback_id: Value compared against each entry's ``id`` child.
        updated_data: Feedback model whose ``dict()`` is passed to
            ``update()`` on each matching entry.

    Prints a confirmation after updating, or a not-found message when the
    query returns nothing truthy. Returns ``None`` in both cases.
    """
    root = db_setup()
    matches = root.child('feedback').order_by_child('id').equal_to(feedback_id).get()
    if not matches:
        print("Feedback not found to update!")
        return

    # Iterating the query result yields the key of each matching child;
    # every match is updated, not just the first one.
    for entry_key in matches:
        root.child('feedback').child(entry_key).update(updated_data.dict())
    print("Feedback data updated in Firebase!")
# Delete operation (Remove feedback data from Firebase)
def delete(feedback_id: int):
    """Remove every feedback entry whose ``id`` child equals ``feedback_id``.

    Args:
        feedback_id: Value compared against each entry's ``id`` child.

    Prints a confirmation after deleting, or a not-found message when the
    query returns nothing truthy. Returns ``None`` in both cases.
    """
    root = db_setup()
    matches = root.child('feedback').order_by_child('id').equal_to(feedback_id).get()
    if not matches:
        print("Feedback not found to delete!")
        return

    # Iterating the query result yields the key of each matching child;
    # every match is deleted, not just the first one.
    for entry_key in matches:
        root.child('feedback').child(entry_key).delete()
    print("Feedback data deleted from Firebase!")
def test():
    """Walk one sample feedback record through ingest, read, update, delete.

    Runs against whatever database ``db_setup()`` connects to, so it performs
    real writes and deletes rather than acting as an isolated unit test.
    """
    # Create (Ingest): build the sample record and push it.
    initial_responses = [
        {"q_id": "q1", "ans": 5},
        {"q_id": "q2", "ans": 3},
    ]
    sample = Feedback(
        id=1,
        user_id="user1234",
        time_stamp=datetime.now(),
        responses=initial_responses,
    )
    ingest(sample)

    # Read (Fetch): show the stored record if the lookup finds it.
    fetched = read(1)
    if fetched:
        print(fetched)

    # Update (Modify): same id, with the answer to q1 changed from 5 to 4.
    # NOTE(review): user_id here is "user123" while the ingested record used
    # "user1234" -- confirm whether the difference is intentional.
    revised_responses = [
        {"q_id": "q1", "ans": 4},
        {"q_id": "q2", "ans": 3},
    ]
    revised = Feedback(
        id=1,
        user_id="user123",
        time_stamp=datetime.now(),
        responses=revised_responses,
    )
    update(1, revised)

    # Delete (Remove): clean up the sample record.
    delete(1)
# Example usage: when this file is executed as a script (not imported),
# run the create/read/update/delete walkthrough defined in test().
if __name__ == "__main__":
    test()
|