# Captured from a Hugging Face Spaces page (status shown at capture time: Running).
import json
import os
import uuid
from datetime import datetime, timezone

import gradio as gr
import numpy as np
import soundfile as sf
import xxhash
from datasets import Audio
from dotenv import load_dotenv
from huggingface_hub import upload_file, HfApi
# Read variables from a .env file into the process environment; HF_TOKEN is
# looked up from the environment below and by the upload helpers.
load_dotenv()

# Recordings are written into this folder before being uploaded.
os.makedirs("outputs", exist_ok=True)

# Hugging Face API client authenticated with the HF_TOKEN environment variable.
hf_api = HfApi(token=os.getenv("HF_TOKEN"))

# Dataset repository that receives the recordings and the tracker file.
DATASET_REPO = "alisartazkhan/audioLLM_judge"
# Folder inside the dataset repository used for this collection run.
CATEGORY = "pilot_tempo_control7"
MAX_RECORDINGS = 5  # Number of prompts to record
# Code displayed to the participant once every recording is done.
COMPLETION_CODE = "CEO4RWQ6"

# `datasets` audio feature configured for 16 kHz.
# NOTE(review): not referenced anywhere else in the visible code.
resampler = Audio(sampling_rate=16_000)
# ====== MODIFY THIS SECTION TO CHANGE INSTRUCTIONS AND PROMPT ======

# Markdown rendered above the recorder on every recording screen.
USER_INSTRUCTIONS = """
## Recording Instructions:
Please record yourself reading your instruction clearly and naturally, speaking into the microphone in a quiet environment.
"""

# Text the participant is asked to record. It is shown in quotes on the page,
# stored with every tracker record, and used as the prefix of uploaded filenames.
RECORDING_PROMPT = ""

# ================================================================
# Create a JSON database to track uploads
class UploadTracker:
    """JSON-backed log of the recordings that were uploaded.

    The log is a list of record dicts held in ``self.data``. It is mirrored to
    a local JSON file and pushed to the Hugging Face dataset repository after
    every addition.
    """

    def __init__(self, filename="recording_tracker.json"):
        """Load the tracker file, creating it when it does not exist.

        A file that cannot be parsed as a JSON list is moved aside to
        ``<filename>.corrupt`` and replaced by an empty log, so a damaged
        tracker does not prevent the application from starting.

        Args:
            filename: Path of the local JSON file holding the list of records.
        """
        self.filename = filename
        self.data = []

        if not os.path.exists(filename):
            self._save()
            return

        try:
            with open(filename, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, list):
                raise ValueError("top-level JSON value is not a list")
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # Keep the unreadable file for inspection instead of overwriting it.
            backup = f"{filename}.corrupt"
            os.replace(filename, backup)
            print(f"Tracker file {filename} is unreadable ({e}); moved to {backup}")
            self._save()
        else:
            self.data = loaded

    def _save(self):
        """Write the in-memory record list to the local JSON file."""
        with open(self.filename, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2)

    def add_recording(self, audio_hash, filename):
        """Add a record of an uploaded recording, save it and sync it to HF.

        Args:
            audio_hash: Hash string identifying the audio content.
            filename: Path of the recording as stored in the dataset repository.

        Returns:
            The record dict that was appended to the log.
        """
        record = {
            "prompt": RECORDING_PROMPT,
            "audio_hash": audio_hash,
            "filename": filename,
            # Time the record was created, as an ISO-8601 string in UTC.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.data.append(record)
        self._save()
        # Upload tracker file to HF
        self.upload_tracker()
        return record

    def upload_tracker(self):
        """Upload the tracker JSON to Hugging Face.

        Returns:
            True when the upload succeeded, False when it raised (the error is
            printed rather than propagated).
        """
        try:
            upload_file(
                path_or_fileobj=self.filename,
                # Use only the file name so a tracker kept in another local
                # directory still lands directly under CATEGORY in the repo.
                path_in_repo=f"{CATEGORY}/{os.path.basename(self.filename)}",
                repo_id=DATASET_REPO,
                repo_type="dataset",
                token=os.getenv("HF_TOKEN"),
            )
            print("Uploaded tracker file to Hugging Face")
            return True
        except Exception as e:
            # Best effort: a failed sync must not break the recording flow.
            print(f"Error uploading tracker file: {e}")
            return False
# Module-wide tracker instance, backed by the default "recording_tracker.json".
tracker = UploadTracker()
def upload_to_hf(local_path, repo_path):
    """Send one local file to the Hugging Face dataset repository.

    Args:
        local_path: Path of the file on disk.
        repo_path: Destination path inside the dataset repository.

    Returns:
        True if the upload completed, False if anything raised along the way
        (the error is printed, not propagated).
    """
    succeeded = False
    try:
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=repo_path,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            token=os.getenv("HF_TOKEN"),
        )
        print(f"Uploaded file: {local_path} to Hugging Face at {repo_path}")
        succeeded = True
    except Exception as e:
        print(f"Error uploading file to HF: {e}")
    return succeeded
def on_submit(audio_input, recording_count):
    """Save, upload and log one submitted recording, then advance the UI.

    Args:
        audio_input: Value of the audio component: ``None`` when nothing was
            recorded, otherwise a ``(sample_rate, samples)`` pair whose
            ``samples`` is a NumPy array.
        recording_count: Number of recordings completed so far.

    Returns:
        The tuple of component updates produced by ``next_prompt``. When there
        is no usable audio the current screen is shown again, so a participant
        cannot reach the completion code without recording.
    """
    if audio_input is None:
        # next_prompt adds one, so passing count - 1 re-renders the same screen.
        return next_prompt(recording_count - 1)

    # Process the audio
    sr, y = audio_input
    if y.size == 0:
        # A zero-length take has no peak to normalise and nothing worth keeping.
        return next_prompt(recording_count - 1)

    # Hash the raw samples before any conversion so the hash identifies the
    # audio exactly as it was captured.
    audio_hash = xxhash.xxh32(bytes(y)).hexdigest()

    # Peak-normalise to [-1, 1]; pure silence is left untouched.
    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:
        y /= peak

    unique_id = str(uuid.uuid4())[:8]
    clean_prompt = RECORDING_PROMPT.replace(" ", "_").replace(".", "").replace(",", "")[:20]
    wav_name = f"{clean_prompt}_{audio_hash}_{unique_id}.wav"

    local_filename = f"outputs/{wav_name}"
    sf.write(local_filename, y, sr, format="wav")

    hf_path = f"{CATEGORY}/{wav_name}"
    if upload_to_hf(local_filename, hf_path):
        # Only log recordings that actually reached the dataset repository.
        tracker.add_recording(audio_hash, hf_path)
    else:
        print(f"Upload failed; {local_filename} kept locally and not added to the tracker")

    # Move to the next prompt whether or not the upload succeeded.
    return next_prompt(recording_count)
def next_prompt(recording_count):
    """Advance the counter and build the component updates for the next screen.

    Args:
        recording_count: Number of recordings completed before this call.

    Returns:
        A 7-tuple: title, instructions, prompt text, audio recorder, submit
        button, next button, and the incremented counter. Once the counter
        reaches MAX_RECORDINGS the tuple describes the completion screen with
        the recorder and both buttons hidden.
    """
    recording_count += 1

    if recording_count < MAX_RECORDINGS:
        # Another recording is still due: reset the recorder and lock submit
        # until new audio arrives.
        heading = f"# Recording {recording_count + 1}/{MAX_RECORDINGS}"
        quoted_prompt = f"### \"{RECORDING_PROMPT}\""
        return (
            gr.Markdown(heading),
            gr.Markdown(USER_INSTRUCTIONS),
            gr.Markdown(quoted_prompt),
            gr.Audio(value=None, label="Record your response", sources=["microphone"]),
            gr.Button("Submit Recording", interactive=False),
            gr.Button("Next Recording", visible=False),
            recording_count,
        )

    # Every prompt has been recorded: show the completion code and hide controls.
    finished_heading = f"# All recordings complete! Completion code: {COMPLETION_CODE}"
    return (
        gr.Markdown(finished_heading),
        gr.Markdown("## Thank you for your participation."),
        gr.Markdown("### You have completed all recordings."),
        gr.Audio(visible=False),
        gr.Button(visible=False),
        gr.Button(visible=False),
        recording_count,
    )
def enable_submit_button(audio_input):
    """Return a submit button that is clickable only when audio is present.

    Args:
        audio_input: Current value of the audio component, or None when empty.
    """
    has_audio = audio_input is not None
    return gr.Button("Submit Recording", interactive=has_audio)
# Colour palette for the interface, based on Gradio's Soft theme.
_theme_hues = {
    "primary_hue": "blue",
    "secondary_hue": "indigo",
    "neutral_hue": "slate",
}
theme = gr.themes.Soft(**_theme_hues)
# Build the page: a title, the instructions, the quoted prompt, a microphone
# recorder, and a row holding the two buttons.
with gr.Blocks(theme=theme, css="footer {visibility: hidden}") as demo:
    # Per-session counter of completed recordings.
    recording_count = gr.State(0)

    title = gr.Markdown(f"# Recording 1/{MAX_RECORDINGS}")
    instructions = gr.Markdown(USER_INSTRUCTIONS)
    prompt_text = gr.Markdown(f"### \"{RECORDING_PROMPT}\"")
    audio_input = gr.Audio(
        label="Record your response",
        sources=["microphone"],
        streaming=False,
    )

    with gr.Row():
        submit_btn = gr.Button("Submit Recording", interactive=False)
        next_btn = gr.Button("Next Recording", visible=False)

    # Components refreshed on every screen change, in the order of the tuple
    # returned by on_submit / next_prompt.
    _screen_outputs = [
        title,
        instructions,
        prompt_text,
        audio_input,
        submit_btn,
        next_btn,
        recording_count,
    ]

    # Unlock the submit button as soon as the recorder holds audio.
    audio_input.change(
        fn=enable_submit_button,
        inputs=[audio_input],
        outputs=[submit_btn],
    )

    # Submitting saves the take and moves on to the next screen.
    submit_btn.click(
        fn=on_submit,
        inputs=[audio_input, recording_count],
        outputs=_screen_outputs,
    )

    # The next button advances to the next screen without saving a take.
    next_btn.click(
        fn=next_prompt,
        inputs=[recording_count],
        outputs=_screen_outputs,
    )
# Start the server (with a public share link requested) only when this file is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    demo.launch(share=True)