Spaces:
Running
Running
File size: 2,935 Bytes
a7c2cd2 32c2587 61186ad 4a7815d 38695d8 32c2587 61186ad 16eaf19 61186ad 17145bf 6d086e2 9c8bc16 d6f376b 9c8bc16 738289f 9c8bc16 e944263 3434d5c 738289f 3434d5c 9c8bc16 501ba98 38bfe69 d6f376b 738289f d6f376b 738289f 6f0cdb7 9c8bc16 738289f 56cabc7 9c8bc16 17145bf 9c8bc16 17145bf 30c85ad 61186ad 30c85ad 32c2587 a7c2cd2 2b205c2 4db9fc0 32c2587 6020a54 16eaf19 61186ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import os
from queue import Queue
import gradio as gr
import argilla as rg
from argilla.webhooks import webhook_listener
client = rg.Argilla(
api_url=os.getenv("ARGILLA_API_URL"),
api_key=os.getenv("ARGILLA_API_KEY"),
)
server = rg.get_webhook_server()
incoming_events = Queue()
@webhook_listener(events=["response.created"])
async def update_validation_space_on_answer(response, type, timestamp):
"""
Webhook listener that triggers when a new response is added to an answering space.
It will automatically update the corresponding validation space with the new response.
"""
try:
incoming_events.put({"event": type, "timestamp": str(timestamp)})
record = response.record
original_user_id = str(response.user_id)
dataset_name = record.dataset.name
if not dataset_name.endswith("Responder"):
print(f"Ignoring event from non-answering dataset: {dataset_name}")
return
print(f"Processing response for dataset: {dataset_name}")
validation_dataset_name = dataset_name.replace("Responder", "Validar")
try:
validation_dataset = client.datasets(validation_dataset_name)
print(f"Found validation dataset: {validation_dataset_name}")
except Exception as e:
print(f"Error connecting to validation dataset: {e}")
response_dict = response.to_dict()
answers = [response_dict["values"]['answer_1']['value']]
if "answer_2" in response_dict["values"]:
answers.append(response_dict["values"]['answer_2']['value'])
if "answer_3" in response_dict["values"]:
answers.append(response_dict["values"]['answer_3']['value'])
new_records = []
for answer in answers:
if answer:
validation_record = {
"question": record.fields["question"],
"answer": answer,
"metadata": {
"original_responder_id": original_user_id,
"original_dataset": dataset_name
}
}
new_records.append(validation_record)
validation_dataset.records.log(records=new_records)
print(f"Added new response to {update_validation_space_on_answer}")
except Exception as e:
print(f"Error in webhook handler: {e}")
incoming_events.put({"event": "error", "error": str(e)})
def read_next_event():
if not incoming_events.empty():
return incoming_events.get()
return {}
with gr.Blocks() as demo:
argilla_server = client.http_client.base_url
gr.Markdown("# Argilla Webhooks - Top Secret")
gr.Timer(1, active=True).tick(read_next_event)
gr.mount_gradio_app(server, demo, path="/")
if __name__ == "__main__":
import uvicorn
uvicorn.run(server, host="0.0.0.0", port=7860) |