import gradio as gr import gspread import shutil import os from datetime import datetime from google.cloud import storage import os import shutil import threading from google.cloud import speech # Set the environment variable for your service account JSON os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "story_legacy_service_account.json" # GCS Bucket Configuration GCS_BUCKET_NAME = "userrecordings" # Google Sheets API setup SHEET_URL = "https://docs.google.com/spreadsheets/d/1ZlO_YQyFV6HsZH6hWIEtCw4VHxa7NXKlRULG_2Dkyao/edit" PROJECT_CODE = "P2401" # Hardcoded project code def fetch_sheet_data(sheet_url): gc = gspread.service_account(filename="story_legacy_service_account.json") sh = gc.open_by_url(sheet_url) worksheet = sh.sheet1 data = worksheet.get_all_records() return data # Load Google Sheet data data = fetch_sheet_data(SHEET_URL) # Helper function to initialize a new session state def initialize_session_state(): current_project = [row for row in data if row["Project Code"] == PROJECT_CODE] if current_project: current_project = current_project[0] prompts = [ current_project[f"Prompt{i+1}"] for i in range(4) if current_project.get(f"Prompt{i+1}") ] recording_status = [False] * len(prompts) responses = [None] * len(prompts) else: prompts = [] recording_status = [] responses = [] return { "current_prompt_index": 0, "prompts": prompts, "recording_status": recording_status, "responses": responses, } def submit_audio(audio_path, state): current_prompt_index = state["current_prompt_index"] prompts = state["prompts"] recording_status = state["recording_status"] responses = state["responses"] # Check if response for current prompt is already recorded if recording_status[current_prompt_index]: return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=responses[current_prompt_index]), # Show already recorded response gr.update(value=f"""
Response for Prompt {current_prompt_index + 1} has been recorded.
""", visible=True), state, ) # Ensure audio is available before saving if audio_path is None: return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=None), # Reset the audio input gr.update(value="", visible=True), state, ) try: # Ensure the directory exists and move the file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") folder_path = f"responses/{PROJECT_CODE}/Prompt{current_prompt_index + 1}" os.makedirs(folder_path, exist_ok=True) file_path = f"{folder_path}/response_{timestamp}.wav" shutil.move(audio_path, file_path) # Update recording status and responses recording_status[current_prompt_index] = True responses[current_prompt_index] = file_path # Update state state["recording_status"] = recording_status state["responses"] = responses return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=file_path), # Show the saved response for playback gr.update(value=f"""
Response for Prompt {current_prompt_index + 1} has been submitted successfully!
""", visible=True), state, ) except Exception as e: print(f"Error occurred during submission: {e}") return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=None), # Reset the audio input gr.update(value=f"""
⚠️ An error occurred during submission: {str(e)}
""", visible=True), state, ) def erase_and_record(state): current_prompt_index = state["current_prompt_index"] prompts = state["prompts"] recording_status = state["recording_status"] responses = state["responses"] # Check if a response exists for the current prompt if responses[current_prompt_index]: try: # Delete the recorded file os.remove(responses[current_prompt_index]) except FileNotFoundError: pass # Ignore if the file doesn't exist # Reset the response and recording status responses[current_prompt_index] = None recording_status[current_prompt_index] = False # Update state state["recording_status"] = recording_status state["responses"] = responses return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=None, visible=True), # Reset the audio input gr.update(value=f"""
Your previous response has been erased. Please re-record your response.
""", visible=True), state, ) else: return ( f"""
Prompt {current_prompt_index + 1}/{len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(visible=True), # Reset the audio input gr.update(value=f"""
⚠️ Please record your response first.
""", visible=True), state, ) def save_and_next(state): current_prompt_index = state["current_prompt_index"] prompts = state["prompts"] recording_status = state["recording_status"] # Check if the response for the current prompt is recorded if not recording_status[current_prompt_index]: return ( f"""
Prompt {current_prompt_index + 1} of {len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value=f"""
⚠️ Please record your response before moving to the next prompt.
""", visible=True), gr.update(value=None, visible=True), # Audio input is still visible gr.update(value=f"Next Prompt ({current_prompt_index + 2} of {len(prompts)})", visible=True), # Update button text gr.update(visible=True), # Erase button should still be visible state, ) # Increment prompt index if not the last prompt if current_prompt_index < len(prompts) - 1: current_prompt_index += 1 state["current_prompt_index"] = current_prompt_index # Update state with new prompt index is_last_prompt = current_prompt_index == len(prompts) - 1 next_button_text = "Finish" if is_last_prompt else f"Next Prompt ({current_prompt_index + 2} of {len(prompts)})" return ( f"""
Prompt {current_prompt_index + 1} of {len(prompts)}:
{prompts[current_prompt_index]}
""", gr.update(value="", visible=False), # Clear the alert message gr.update(value=None, visible=True), # Show the audio input for the next prompt gr.update(value=next_button_text, visible=True), # Update button text gr.update(visible=True), # Keep the Erase button visible until the last prompt state, ) else: # "Finish" button clicked threading.Thread(target=async_upload_to_gcs_and_cleanup, daemon=True).start() # Hide the "Next" button and "Erase" button after Finish return ( "Thank you for your participation in this research, all of your stories have been submitted. We place great value on your stories and will treat them with the respect they deserve.", gr.update(value="", visible=True), gr.update(visible=False), # Hide the audio input gr.update(visible=False), # Hide the "Next" button gr.update(visible=False), # Hide the "Erase" button state, ) GCS_BUCKET_NAME = "userrecordings" # function to upload responses and generate transcripts def upload_to_gcs_and_cleanup(): try: # Initialize GCS client client = storage.Client(project="story-legacy-442314") bucket = client.bucket(GCS_BUCKET_NAME) # Walk through the responses directory for root, _, files in os.walk(f"responses/{PROJECT_CODE}"): for file in files: local_path = os.path.join(root, file) # Define the relative path for GCS relative_path = os.path.relpath(local_path, f"responses/{PROJECT_CODE}") gcs_path = f"{PROJECT_CODE}/responses/{relative_path}" # Upload the file to GCS blob = bucket.blob(gcs_path) blob.upload_from_filename(local_path) print(f"Uploaded {local_path} to {gcs_path}") # Delete the local response file os.remove(local_path) # Remove the responses directory after successful upload response_dir = f"responses/{PROJECT_CODE}" if os.path.exists(response_dir): shutil.rmtree(response_dir) print(f"Local directory {response_dir} has been deleted.") return True, "All responses and transcripts have been uploaded to GCS, and local files deleted." except Exception as e: print(f"Error during GCS upload: {e}") return False, f"Error during GCS upload: {str(e)}" # Background task function def async_upload_to_gcs_and_cleanup(): success, message = upload_to_gcs_and_cleanup() print(message) custom_html=""" """ # JavaScript to detect microphone access on page load js_code = """ async () => { try { const devices = await navigator.mediaDevices.enumerateDevices(); const mic = devices.find(device => device.kind === 'audioinput'); return mic ? "Microphone Array / Default" : "No Microphone Found"; } catch (err) { return "No Microphone Found"; } } """ def gradio_app(): with gr.Blocks(theme="allenai/gradio-theme",css = custom_html) as demo: # Initialize session state state = gr.State(initialize_session_state()) with gr.Row(): with gr.Blocks(theme="allenai/gradio-theme",css = custom_html): gr.Markdown( """# StoryLegacy""", elem_classes=["gr-story_legacy_heading"] ) gr.Markdown( f"""# Project: {PROJECT_CODE}""", elem_classes=["gr-project_code"] ) with gr.Row(): alert_box = gr.Markdown("", elem_id="alert-box", visible=False) with gr.Row(): prompt_display = gr.Markdown( f"""
Prompt 1 of {len(state.value['prompts'])}:
{state.value['prompts'][0] if state.value['prompts'] else 'No prompts available.'}
""", elem_classes=["gr-prompt"], ) with gr.Row(): # Create a hidden label to dynamically update microphone status record_audio = gr.Audio( sources="microphone", type="filepath", label="Record your response", elem_classes=["gr-audio"], ) demo.load(js=js_code, inputs=[], outputs=[record_audio]) with gr.Row(): erase_button = gr.Button("Erase & Re-record Current Prompt", elem_classes=["gr-erase_button"]) save_next_button = gr.Button(f"Next Prompt (2 of {len(state.value['prompts'])})", elem_classes=["gr-next_button"]) record_audio.stop_recording( submit_audio, inputs=[record_audio, state], outputs=[prompt_display, record_audio, alert_box, state] ) erase_button.click( erase_and_record, inputs=state, outputs=[prompt_display, record_audio, alert_box, state] ) save_next_button.click( save_and_next, inputs=state, outputs=[prompt_display, alert_box, record_audio, save_next_button,erase_button, state] ) return demo app = gradio_app() app.launch(share=True)