"""StoryLegacy recording app.

Shows the prompts configured for one project in a Google Sheet, lets the
participant record a spoken response to each prompt, and on "Finish" uploads
the recordings plus machine transcripts to a Google Cloud Storage bucket.

NOTE(review): all progress state lives in module-level globals, so it is shared
by every browser session connected to this process, and a page load resets it.
That matches the original design; per-session state would need ``gr.State``.
"""
import os
import shutil
import threading
import wave
from datetime import datetime

import gradio as gr
import gspread
from google.cloud import speech
from google.cloud import storage

# Service-account key used both by gspread and, through the environment
# variable, by the Google Cloud client libraries.
SERVICE_ACCOUNT_FILE = "story_legacy_service_account.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_FILE

# GCS configuration
GCS_BUCKET_NAME = "userrecordings"
GCP_PROJECT_ID = "story-legacy-442314"

# Google Sheets configuration
SHEET_URL = "https://docs.google.com/spreadsheets/d/1ZlO_YQyFV6HsZH6hWIEtCw4VHxa7NXKlRULG_2Dkyao/edit"
PROJECT_CODE = "P2401"  # Hardcoded project code
MAX_PROMPTS = 4  # The sheet columns read are Prompt1 .. Prompt4

# Transcription defaults
DEFAULT_SAMPLE_RATE_HZ = 44100  # Used only when the WAV header cannot be read
NO_TRANSCRIPT_TEXT = "No transcription available."


def fetch_sheet_data(sheet_url):
    """Return every record of the first worksheet of the sheet at *sheet_url*.

    Authenticates with the service-account file and returns the result of
    ``worksheet.get_all_records()``.
    """
    gc = gspread.service_account(filename=SERVICE_ACCOUNT_FILE)
    worksheet = gc.open_by_url(sheet_url).sheet1
    return worksheet.get_all_records()


# Global variables to track the state
current_project = None
current_prompt_index = 0
prompts = []
recording_status = []
responses = []


def load_project_state():
    """Fetch the sheet and reset all progress state for ``PROJECT_CODE``.

    Sets ``current_project`` to the first matching row (or ``None``), rebuilds
    ``prompts`` from the non-empty ``Prompt1``..``Prompt4`` cells, and resets
    ``recording_status``, ``responses`` and ``current_prompt_index``.

    Returns:
        The raw sheet records that were fetched.
    """
    global current_project, current_prompt_index, prompts, recording_status, responses

    rows = fetch_sheet_data(SHEET_URL)
    matches = [row for row in rows if row.get("Project Code") == PROJECT_CODE]
    current_project = matches[0] if matches else None

    if current_project:
        columns = [f"Prompt{i + 1}" for i in range(MAX_PROMPTS)]
        prompts = [current_project[col] for col in columns if current_project.get(col)]
    else:
        prompts = []

    # Always rebuild the per-prompt bookkeeping so it can never be left at a
    # stale length when the project is missing.
    recording_status = [False] * len(prompts)
    responses = [None] * len(prompts)
    current_prompt_index = 0
    return rows


# Load Google Sheet data and the project state at import time
data = load_project_state()

custom_html = """ """


def _prompt_markdown(separator="/"):
    """Return the Markdown for the current prompt.

    *separator* goes between the prompt number and the prompt count
    (``"/"`` or ``" of "``). Falls back to a placeholder text when there is no
    prompt at the current index.
    """
    if current_prompt_index < len(prompts):
        text = prompts[current_prompt_index]
    else:
        text = "No prompts available."
    return f"\nPrompt {current_prompt_index + 1}{separator}{len(prompts)}:\n{text}\n"


def _alert(message):
    """Return a Gradio update that shows *message* in the alert box."""
    return gr.update(value=f"\n{message}\n", visible=True)


def _next_button_label():
    """Return the label for the next/finish button at the current prompt."""
    if current_prompt_index >= len(prompts) - 1:
        return "Finish"
    return f"Next Prompt ({current_prompt_index + 2} of {len(prompts)})"


def _read_wav_format(local_file_path):
    """Return ``(sample_rate_hz, channel_count)`` read from the WAV header.

    Falls back to ``(DEFAULT_SAMPLE_RATE_HZ, 1)`` when the file cannot be
    parsed by the ``wave`` module.
    """
    try:
        with wave.open(local_file_path, "rb") as wav_file:
            return wav_file.getframerate(), wav_file.getnchannels()
    except (wave.Error, EOFError):
        return DEFAULT_SAMPLE_RATE_HZ, 1


def transcribe_audio(local_file_path):
    """Transcribe a local audio file with Google Cloud Speech-to-Text.

    Args:
        local_file_path: Path of the audio file on local disk.

    Returns:
        The transcript of the whole recording, or
        ``"No transcription available."`` when nothing was recognised.
    """
    with open(local_file_path, "rb") as audio_file:
        content = audio_file.read()

    # Use the recording's real format: a sample rate that disagrees with the
    # WAV header is rejected by the API.
    sample_rate, channels = _read_wav_format(local_file_path)

    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(content=content)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        audio_channel_count=channels,
        language_code="en-US",
    )

    # NOTE(review): synchronous recognize() is meant for short audio; long
    # recordings presumably need long_running_recognize() - confirm limits.
    response = client.recognize(config=config, audio=audio)

    # Each result covers one consecutive portion of the audio, so all of them
    # are needed to form the full transcript.
    pieces = [
        result.alternatives[0].transcript.strip()
        for result in response.results
        if result.alternatives
    ]
    transcript = " ".join(piece for piece in pieces if piece)
    return transcript or NO_TRANSCRIPT_TEXT


def _upload_response_and_transcript(bucket, local_path, response_dir):
    """Upload one recording and its transcript to *bucket*."""
    # GCS object names always use forward slashes, whatever the local OS uses.
    relative_path = os.path.relpath(local_path, response_dir).replace(os.sep, "/")

    gcs_path = f"{PROJECT_CODE}/responses/{relative_path}"
    bucket.blob(gcs_path).upload_from_filename(local_path)
    print(f"Uploaded {local_path} to {gcs_path}")

    transcript_text = transcribe_audio(local_path)

    # Swap only the real file extension for ".txt".
    transcript_stem = os.path.splitext(relative_path)[0]
    transcript_gcs_path = f"{PROJECT_CODE}/transcripts/{transcript_stem}.txt"
    bucket.blob(transcript_gcs_path).upload_from_string(transcript_text)
    print(f"Transcript uploaded to {transcript_gcs_path}")


def upload_to_gcs_and_cleanup():
    """Upload all local responses and their transcripts, then delete them.

    Every file under ``responses/<PROJECT_CODE>`` is uploaded and transcribed.
    A file is deleted locally only after both its audio and its transcript were
    uploaded; a failure on one file does not stop the others. The local
    directory is removed only when every file succeeded.

    Returns:
        ``(success, message)`` where *success* is a bool.
    """
    response_dir = f"responses/{PROJECT_CODE}"
    failed = []
    try:
        client = storage.Client(project=GCP_PROJECT_ID)
        bucket = client.bucket(GCS_BUCKET_NAME)

        for root, _, files in os.walk(response_dir):
            for file_name in files:
                local_path = os.path.join(root, file_name)
                try:
                    _upload_response_and_transcript(bucket, local_path, response_dir)
                except Exception as exc:  # keep going with the remaining files
                    print(f"Error during GCS upload: {exc}")
                    failed.append(local_path)
                else:
                    os.remove(local_path)

        if failed:
            # Leave the failed files on disk so the upload can be retried.
            return False, f"Error during GCS upload: could not process {', '.join(failed)}"

        if os.path.exists(response_dir):
            shutil.rmtree(response_dir)
            print(f"Local directory {response_dir} has been deleted.")

        return True, "All responses and transcripts have been uploaded to GCS, and local files deleted."
    except Exception as e:
        print(f"Error during GCS upload: {e}")
        return False, f"Error during GCS upload: {str(e)}"


def async_upload_to_gcs_and_cleanup():
    """Background-thread entry point: run the upload and print its message."""
    success, message = upload_to_gcs_and_cleanup()
    print(message)


def submit_audio(audio_path):
    """Store a newly recorded response for the current prompt.

    Args:
        audio_path: Path of the recording supplied by the audio component, or
            ``None`` when the component was cleared.

    Returns:
        ``(prompt_markdown, alert_update)`` for the prompt display and the
        alert box.
    """
    print(f"Received audio_path: {audio_path}")

    if not prompts:
        return _prompt_markdown(), _alert("⚠️ No prompts available.")

    prompt_number = current_prompt_index + 1

    # Refuse a second submission for the same prompt
    if recording_status[current_prompt_index]:
        return (
            _prompt_markdown(),
            _alert(f"⚠️ Response for Prompt {prompt_number} has already been submitted."),
        )

    # Nothing recorded: just clear the alert text
    if audio_path is None:
        return _prompt_markdown(), gr.update(value="", visible=True)

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        folder_path = f"responses/{PROJECT_CODE}/Prompt{prompt_number}"
        os.makedirs(folder_path, exist_ok=True)
        file_path = f"{folder_path}/response_{timestamp}.wav"
        shutil.move(audio_path, file_path)

        recording_status[current_prompt_index] = True
        responses[current_prompt_index] = file_path

        return (
            _prompt_markdown(),
            _alert(f"Response for Prompt {prompt_number} has been submitted successfully!"),
        )
    except Exception as e:
        print(f"Error occurred during submission: {e}")
        return (
            _prompt_markdown(),
            _alert(f"⚠️ An error occurred during submission: {str(e)}"),
        )


def save_and_next():
    """Advance to the next prompt, or finish and start the upload.

    Returns:
        Updates for ``(prompt_display, alert_box, record_audio,
        save_next_button, erase_button)``.
    """
    global current_prompt_index

    if not prompts:
        return (
            _prompt_markdown(" of "),
            _alert("⚠️ No prompts available."),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(visible=True),
        )

    if not recording_status[current_prompt_index]:
        return (
            _prompt_markdown(" of "),
            _alert("⚠️ Please record and submit your response before moving to the next prompt."),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(visible=True),
        )

    if current_prompt_index < len(prompts) - 1:
        current_prompt_index += 1
        return (
            _prompt_markdown(" of "),
            gr.update(value="", visible=False),  # Clear the alert message
            gr.update(value=None, visible=True),  # Reset the recorder
            gr.update(value=_next_button_label(), visible=True),
            gr.update(visible=True),
        )

    # "Finish" clicked: upload in the background so the UI is not blocked
    threading.Thread(target=async_upload_to_gcs_and_cleanup, daemon=True).start()
    return (
        "Thank you for your participation in this research, all of your stories have been submitted. We place great value on your stories and will treat them with the respect they deserve.",
        gr.update(value="", visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        gr.update(visible=False),
    )


def erase_and_record():
    """Delete the stored response for the current prompt so it can be redone.

    Returns:
        Updates for ``(prompt_display, record_audio, alert_box)``.
    """
    if not prompts:
        return (
            _prompt_markdown(),
            gr.update(visible=True),
            _alert("⚠️ No prompts available."),
        )

    stored_path = responses[current_prompt_index]
    if not stored_path:
        return (
            _prompt_markdown(),
            gr.update(visible=True),
            _alert("⚠️ Please record and submit your response first."),
        )

    try:
        os.remove(stored_path)
    except FileNotFoundError:
        pass  # Already gone; nothing to delete

    responses[current_prompt_index] = None
    recording_status[current_prompt_index] = False

    return (
        _prompt_markdown(),
        gr.update(value=None, visible=True),  # Reset the audio input
        _alert("Your previous response has been erased. Please re-record your response."),
    )


def gradio_app():
    """Build and return the Gradio Blocks interface."""
    with gr.Blocks(css=custom_html) as demo:
        with gr.Row():
            gr.Markdown("\nStoryLegacy\n", elem_classes=["story-legacy-heading"])
            gr.Markdown(f"\nProject: {PROJECT_CODE}\n")

        with gr.Row():
            alert_box = gr.Markdown("", elem_id="alert-box", visible=False)

        with gr.Row():
            prompt_display = gr.Markdown(
                _prompt_markdown(" of "),
                elem_classes=["gr-prompt"],
            )

        with gr.Row():
            record_audio = gr.Audio(
                sources="microphone",
                type="filepath",
                label="Record your response",
                elem_classes=["gr-audio"],
            )

        with gr.Row():
            erase_button = gr.Button(
                "Erase & Re - Record Current Prompt",
                elem_classes=["gr-erase_button"],
            )
            save_next_button = gr.Button(
                _next_button_label(),
                elem_classes=["gr-next_button"],
            )

        record_audio.change(
            submit_audio,
            inputs=record_audio,
            outputs=[prompt_display, alert_box],
        )

        save_next_button.click(
            save_and_next,
            inputs=[],
            outputs=[
                prompt_display,
                alert_box,
                record_audio,
                save_next_button,
                erase_button,
            ],
        )

        erase_button.click(
            erase_and_record,
            inputs=[],
            outputs=[
                prompt_display,
                record_audio,
                alert_box,
            ],
        )

        def initialize_state():
            """Reset the app state on page load and refresh what depends on it."""
            load_project_state()
            print("State has been initialized.")
            # The prompts may have changed in the sheet since the layout was
            # built, so push the fresh prompt text and button label to the page.
            return _prompt_markdown(" of "), gr.update(value=_next_button_label())

        demo.load(
            initialize_state,
            inputs=None,
            outputs=[prompt_display, save_next_button],
        )

    return demo


# Run the app
app = gradio_app()
app.launch()