Spaces:
Sleeping
Sleeping
import gradio as gr | |
import gspread | |
import shutil | |
import os | |
from datetime import datetime | |
from google.cloud import storage | |
import os | |
import shutil | |
import threading | |
from google.cloud import speech | |
# Set the environment variable for your service account JSON | |
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "story_legacy_service_account.json" | |
# GCS Bucket Configuration | |
GCS_BUCKET_NAME = "userrecordings" | |
# Google Sheets API setup | |
SHEET_URL = "https://docs.google.com/spreadsheets/d/1ZlO_YQyFV6HsZH6hWIEtCw4VHxa7NXKlRULG_2Dkyao/edit" | |
PROJECT_CODE = "P2401" # Hardcoded project code | |
def fetch_sheet_data(sheet_url): | |
gc = gspread.service_account(filename="story_legacy_service_account.json") | |
sh = gc.open_by_url(sheet_url) | |
worksheet = sh.sheet1 | |
data = worksheet.get_all_records() | |
return data | |
# Load Google Sheet data | |
data = fetch_sheet_data(SHEET_URL) | |
# Helper function to initialize a new session state | |
def initialize_session_state(): | |
current_project = [row for row in data if row["Project Code"] == PROJECT_CODE] | |
if current_project: | |
current_project = current_project[0] | |
prompts = [ | |
current_project[f"Prompt{i+1}"] for i in range(4) if current_project.get(f"Prompt{i+1}") | |
] | |
recording_status = [False] * len(prompts) | |
responses = [None] * len(prompts) | |
else: | |
prompts = [] | |
recording_status = [] | |
responses = [] | |
return { | |
"current_prompt_index": 0, | |
"prompts": prompts, | |
"recording_status": recording_status, | |
"responses": responses, | |
} | |
def submit_audio(audio_path, state): | |
current_prompt_index = state["current_prompt_index"] | |
prompts = state["prompts"] | |
recording_status = state["recording_status"] | |
responses = state["responses"] | |
# Check if response for current prompt is already recorded | |
if recording_status[current_prompt_index]: | |
return ( | |
f"""<div class ="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value=responses[current_prompt_index]), # Show already recorded response | |
gr.update(value=f"""<div class="success-alert"> Response for Prompt {current_prompt_index + 1} has been recorded.</div>""", visible=True), | |
state, | |
) | |
# Ensure audio is available before saving | |
if audio_path is None: | |
return ( | |
f"""<div class ="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value=None), # Reset the audio input | |
gr.update(value="", visible=True), | |
state, | |
) | |
try: | |
# Ensure the directory exists and move the file | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
folder_path = f"responses/{PROJECT_CODE}/Prompt{current_prompt_index + 1}" | |
os.makedirs(folder_path, exist_ok=True) | |
file_path = f"{folder_path}/response_{timestamp}.wav" | |
shutil.move(audio_path, file_path) | |
# Update recording status and responses | |
recording_status[current_prompt_index] = True | |
responses[current_prompt_index] = file_path | |
# Update state | |
state["recording_status"] = recording_status | |
state["responses"] = responses | |
return ( | |
f"""<div class ="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value=file_path), # Show the saved response for playback | |
gr.update(value=f"""<div class="success-alert">Response for Prompt {current_prompt_index + 1} has been submitted successfully!</div>""", visible=True), | |
state, | |
) | |
except Exception as e: | |
print(f"Error occurred during submission: {e}") | |
return ( | |
f"""<div class="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value=None), # Reset the audio input | |
gr.update(value=f"""<div class="fail-alert">⚠️ An error occurred during submission: {str(e)}</div>""", visible=True), | |
state, | |
) | |
def erase_and_record(state): | |
current_prompt_index = state["current_prompt_index"] | |
prompts = state["prompts"] | |
recording_status = state["recording_status"] | |
responses = state["responses"] | |
# Check if a response exists for the current prompt | |
if responses[current_prompt_index]: | |
try: | |
# Delete the recorded file | |
os.remove(responses[current_prompt_index]) | |
except FileNotFoundError: | |
pass # Ignore if the file doesn't exist | |
# Reset the response and recording status | |
responses[current_prompt_index] = None | |
recording_status[current_prompt_index] = False | |
# Update state | |
state["recording_status"] = recording_status | |
state["responses"] = responses | |
return ( | |
f"""<div class="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value=None, visible=True), # Reset the audio input | |
gr.update(value=f"""<div class="success-alert">Your previous response has been erased. Please re-record your response.</div>""", visible=True), | |
state, | |
) | |
else: | |
return ( | |
f"""<div class="prompt-class">Prompt {current_prompt_index + 1}/{len(prompts)}:</div> | |
<div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(visible=True), # Reset the audio input | |
gr.update(value=f"""<div class="warning-alert">⚠️ Please record your response first.</div>""", visible=True), | |
state, | |
) | |
def save_and_next(state): | |
current_prompt_index = state["current_prompt_index"] | |
prompts = state["prompts"] | |
recording_status = state["recording_status"] | |
# Check if the response for the current prompt is recorded | |
if not recording_status[current_prompt_index]: | |
return ( | |
f"""<div class ="prompt-class">Prompt {current_prompt_index + 1} of {len(prompts)}:</div><div class="prompt-text">{prompts[current_prompt_index]}</div>""", | |
gr.update(value=f"""<div class="warning-alert">⚠️ Please record your response before moving to the next prompt.</div>""", visible=True), | |
gr.update(value=None, visible=True), # Audio input is still visible | |
gr.update(value=f"Next Prompt ({current_prompt_index + 2} of {len(prompts)})", visible=True), # Update button text | |
gr.update(visible=True), # Erase button should still be visible | |
state, | |
) | |
# Increment prompt index if not the last prompt | |
if current_prompt_index < len(prompts) - 1: | |
current_prompt_index += 1 | |
state["current_prompt_index"] = current_prompt_index # Update state with new prompt index | |
is_last_prompt = current_prompt_index == len(prompts) - 1 | |
next_button_text = "Finish" if is_last_prompt else f"Next Prompt ({current_prompt_index + 2} of {len(prompts)})" | |
return ( | |
f"""<div class ="prompt-class">Prompt {current_prompt_index + 1} of {len(prompts)}:</div><div class="prompt-text"> {prompts[current_prompt_index]}</div>""", | |
gr.update(value="", visible=False), # Clear the alert message | |
gr.update(value=None, visible=True), # Show the audio input for the next prompt | |
gr.update(value=next_button_text, visible=True), # Update button text | |
gr.update(visible=True), # Keep the Erase button visible until the last prompt | |
state, | |
) | |
else: | |
# "Finish" button clicked | |
threading.Thread(target=async_upload_to_gcs_and_cleanup, daemon=True).start() | |
# Hide the "Next" button and "Erase" button after Finish | |
return ( | |
"Thank you for your participation in this research, all of your stories have been submitted. We place great value on your stories and will treat them with the respect they deserve.", | |
gr.update(value="", visible=True), | |
gr.update(visible=False), # Hide the audio input | |
gr.update(visible=False), # Hide the "Next" button | |
gr.update(visible=False), # Hide the "Erase" button | |
state, | |
) | |
GCS_BUCKET_NAME = "userrecordings" | |
# function to upload responses and generate transcripts | |
def upload_to_gcs_and_cleanup(): | |
try: | |
# Initialize GCS client | |
client = storage.Client(project="story-legacy-442314") | |
bucket = client.bucket(GCS_BUCKET_NAME) | |
# Walk through the responses directory | |
for root, _, files in os.walk(f"responses/{PROJECT_CODE}"): | |
for file in files: | |
local_path = os.path.join(root, file) | |
# Define the relative path for GCS | |
relative_path = os.path.relpath(local_path, f"responses/{PROJECT_CODE}") | |
gcs_path = f"{PROJECT_CODE}/responses/{relative_path}" | |
# Upload the file to GCS | |
blob = bucket.blob(gcs_path) | |
blob.upload_from_filename(local_path) | |
print(f"Uploaded {local_path} to {gcs_path}") | |
# Delete the local response file | |
os.remove(local_path) | |
# Remove the responses directory after successful upload | |
response_dir = f"responses/{PROJECT_CODE}" | |
if os.path.exists(response_dir): | |
shutil.rmtree(response_dir) | |
print(f"Local directory {response_dir} has been deleted.") | |
return True, "All responses and transcripts have been uploaded to GCS, and local files deleted." | |
except Exception as e: | |
print(f"Error during GCS upload: {e}") | |
return False, f"Error during GCS upload: {str(e)}" | |
# Background task function | |
def async_upload_to_gcs_and_cleanup(): | |
success, message = upload_to_gcs_and_cleanup() | |
print(message) | |
custom_html=""" | |
<style> | |
/* Body Styles */ | |
.gr-container { | |
font-family: Arial, sans-serif !important; | |
background-color: #F5F5DC !important; | |
color: #333 !important; | |
} | |
/* Row Styles */ | |
.gr-row { | |
display: flex; | |
justify-content: center; | |
align-items: center; | |
margin: 15px 0; | |
padding: 10px; | |
width: 100%; | |
} | |
/* Audio Component Styles */ | |
.gr-audio { | |
padding: 10px 10px; | |
border-radius: 10px; | |
font-size:20px; | |
background-color: #dfdf8f | |
} | |
.gr-audio button[title="Clear"] { | |
display: none !important; | |
} | |
/* Prompt style */ | |
.gr-prompt { | |
color: #00008B; /* Deep Blue */ | |
padding: 10px 10px; | |
border-radius: 10px; | |
} | |
.gr-submit_button{ | |
background-color: #28a745; | |
color: white; | |
border: none; | |
padding: 10px 20px; | |
border-radius: 20px; | |
font-size: 20px; | |
cursor: pointer; | |
margin: 5px; | |
} | |
/* Erase button */ | |
.gr-erase_button { | |
background-color: #EE4B2B; | |
color: white; | |
border: none; | |
padding: 10px 20px; | |
border-radius: 20px; | |
font-size: 20px; | |
cursor: pointer; | |
margin: 5px; | |
} | |
.gr-erase_button:hover { | |
background-color: #A52A2A; | |
} | |
/* NExt button */ | |
.gr-next_button { | |
background-color: #28a745; | |
color: white; | |
border: none; | |
padding: 10px 20px; | |
border-radius: 20px; | |
font-size: 20px; | |
cursor: pointer; | |
margin: 5px; | |
} | |
.gr-next_button:hover { | |
background-color: #218838; | |
} | |
.warning-alert { | |
font-size: 16px; | |
color: black; | |
margin-top: 2px; | |
text-align: center; | |
font-weight: bold; | |
border: 1px solid #d9534f; | |
padding: 10px; | |
background-color: #FFED01; /* Light red background for alert */ | |
} | |
.fail-alert { | |
font-size: 16px; | |
color: black; | |
margin-top: 2px; | |
text-align: center; | |
font-weight: bold; | |
border: 1px solid #d9534f; | |
padding: 10px; | |
background-color: #FF0000; /* Light red background for alert */ | |
} | |
.success-alert { | |
font-size: 16px; | |
color: black; | |
margin-top: 2px; | |
text-align: center; | |
font-weight: bold; | |
border: 1px solid #d9534f; | |
padding: 10px; | |
background-color:#90EE90; /* Light red background for alert */ | |
} | |
.gr-story_legacy_heading { | |
font-weight: bold; | |
color:#00008B; | |
text-align: left; | |
# margin-top:-10px; | |
# margin-bottom: 10px; | |
} | |
.gr-project_code { | |
# background-color: White; | |
# font-size: 30px; | |
# font-weight: bold; | |
color:#00008B; | |
text-align: right; | |
# margin-top:-10px; | |
# margin-bottom: 10px; | |
} | |
.prompt-class { | |
font-weight: bold; | |
font-size: 20px; | |
} | |
.gr-markdown { | |
position: fixed; | |
width: 95%; | |
z-index: 1000; | |
border-radius: 5px; | |
margin-top:-10px; | |
margin-bottom: 20px; | |
} | |
.prompt-text { | |
font-size: 15px; | |
} | |
</style> | |
""" | |
# JavaScript to detect microphone access on page load | |
js_code = """ | |
async () => { | |
try { | |
const devices = await navigator.mediaDevices.enumerateDevices(); | |
const mic = devices.find(device => device.kind === 'audioinput'); | |
return mic ? "Microphone Array / Default" : "No Microphone Found"; | |
} catch (err) { | |
return "No Microphone Found"; | |
} | |
} | |
""" | |
def gradio_app(): | |
with gr.Blocks(theme="allenai/gradio-theme",css = custom_html) as demo: | |
# Initialize session state | |
state = gr.State(initialize_session_state()) | |
with gr.Row(): | |
with gr.Blocks(theme="allenai/gradio-theme",css = custom_html): | |
gr.Markdown( | |
"""# StoryLegacy""", elem_classes=["gr-story_legacy_heading"] | |
) | |
gr.Markdown( | |
f"""# Project: {PROJECT_CODE}""", elem_classes=["gr-project_code"] | |
) | |
with gr.Row(): | |
alert_box = gr.Markdown("", elem_id="alert-box", visible=False) | |
with gr.Row(): | |
prompt_display = gr.Markdown( | |
f"""<div class ="prompt-class">Prompt 1 of {len(state.value['prompts'])}:</div> | |
<div class="prompt-text">{state.value['prompts'][0] if state.value['prompts'] else 'No prompts available.'}</div>""", | |
elem_classes=["gr-prompt"], | |
) | |
with gr.Row(): | |
# Create a hidden label to dynamically update microphone status | |
record_audio = gr.Audio( | |
sources="microphone", | |
type="filepath", | |
label="Record your response", | |
elem_classes=["gr-audio"], | |
) | |
demo.load(js=js_code, inputs=[], outputs=[record_audio]) | |
with gr.Row(): | |
erase_button = gr.Button("Erase & Re-record Current Prompt", elem_classes=["gr-erase_button"]) | |
save_next_button = gr.Button(f"Next Prompt (2 of {len(state.value['prompts'])})", elem_classes=["gr-next_button"]) | |
record_audio.stop_recording( | |
submit_audio, inputs=[record_audio, state], outputs=[prompt_display, record_audio, alert_box, state] | |
) | |
erase_button.click( | |
erase_and_record, inputs=state, outputs=[prompt_display, record_audio, alert_box, state] | |
) | |
save_next_button.click( | |
save_and_next, inputs=state, outputs=[prompt_display, alert_box, record_audio, save_next_button,erase_button, state] | |
) | |
return demo | |
app = gradio_app() | |
app.launch(share=True) | |