"""Gradio demo: stream microphone audio to a speech-to-text endpoint.

Every streamed chunk is appended to the audio accumulated for the session,
the whole recording is sent to the inference endpoint as a WAV payload, and
the returned text is post-processed so that spoken French punctuation
commands ("virgule", "point", "nouvelle ligne", ...) become punctuation.
"""
import datetime
import io
import os
import time
import uuid

import gradio as gr
import numpy as np
import requests
from scipy.io.wavfile import write

# import boto3

# Disabled S3 persistence (kept for reference, see save_fn):
# access_key = os.environ.get('access_key', None)
# secret_access_key = os.environ.get('secret_access_key', None)
# session = boto3.Session(
#     aws_access_key_id=access_key,
#     aws_secret_access_key=secret_access_key,
# )
# s3 = session.resource('s3')
# BUCKET = "audio-text-938"

token_hf = os.environ.get('token_hf', None)

API_URL = "https://tfugbov5t776omzd.us-east-1.aws.endpoints.huggingface.cloud"
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {token_hf}",
    "Content-Type": "audio/wav",
}

# Upper bound (seconds) on one endpoint call, so a stalled request cannot
# block the streaming handler forever.
REQUEST_TIMEOUT = 60

# Characters accepted in front of a spoken punctuation keyword: the ASCII
# space plus the no-break spaces used by French typography.
# NOTE(review): the original listed the " virgule" and " deux points"
# replacements twice with visually identical literals; the duplicates
# presumably differed only in the kind of space -- confirm.
_KEYWORD_SPACES = (" ", "\u00a0", "\u202f")

# Replacements applied first, in order, as plain substrings.
_LINE_AND_QUESTION_COMMANDS = (
    ("nouvelle ligne", "\n"),
    ("à la ligne", "\n"),
    # Several spellings of "point d'interrogation" are accepted on purpose.
    ("point d'intérogation", "?"),
    ("point d'intérrogation", "?"),
    ("point d'interrogation", "?"),
    ("point d'interogation", "?"),
)

# Keywords that absorb the space in front of them. "deux points" must come
# before "point", otherwise " point" would match inside " deux points".
_SPACED_PUNCTUATION = (
    ("virgule", ","),
    ("deux points", ":"),
    ("point", "."),
)


def query(data):
    """Send audio to the inference endpoint and return the decoded JSON.

    Args:
        data: ``(sample_rate, samples)`` pair, as passed by ``greet``.

    Returns:
        The JSON body of the endpoint response.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.Timeout: if the endpoint does not answer in time.
    """
    sample_rate, samples = data
    # Encode the WAV in memory: a shared on-disk "test.wav" would be
    # overwritten by concurrent sessions between the write and the read.
    buffer = io.BytesIO()
    write(buffer, sample_rate, samples)
    response = requests.post(
        API_URL,
        headers=headers,
        data=buffer.getvalue(),
        timeout=REQUEST_TIMEOUT,
    )
    # Fail here with the HTTP status instead of a later KeyError on "text".
    response.raise_for_status()
    return response.json()


def greet(audio):
    """Transcribe ``audio`` (a ``[sample_rate, samples]`` pair) to text.

    Returns the ``"text"`` field of the endpoint's JSON response.
    """
    output = query(audio)
    return output["text"]


# Disabled local cache / storage setup (kept for reference):
# print("cur path", os.listdir(os.path.join("..", "..", "..")))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "hfcache")):
#     os.mkdir(os.path.join("..", "..", "..", "data", "hfcache"))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio")):
#     os.mkdir(os.path.join("..", "..", "..", "data", "audio"))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio_texts")):
#     os.mkdir(os.path.join("..", "..", "..", "data", "audio_texts"))
# os.environ["HF_HOME"] = os.path.join("..", "..", "..", "data", "hfcache")


def post_process(text):
    """Turn spoken French punctuation commands into actual punctuation.

    Line-break and question-mark commands are replaced first, then comma,
    colon and full stop (together with the space before the keyword), then
    paragraph commands. Finally every line is stripped of surrounding
    whitespace.

    Args:
        text: raw transcription.

    Returns:
        The transcription with the commands replaced.
    """
    for spoken, symbol in _LINE_AND_QUESTION_COMMANDS:
        text = text.replace(spoken, symbol)

    for keyword, symbol in _SPACED_PUNCTUATION:
        for space in _KEYWORD_SPACES:
            text = text.replace(space + keyword, symbol)

    # NOTE(review): in the original file this literal contained a line break
    # between the space and "point"; it is reproduced as written -- confirm
    # which character was intended.
    text = text.replace(" \npoint", ".")

    text = text.replace(" nouveau paragraphe ", "\n\n")
    text = text.replace(" paragraphe ", "\n\n")

    return "\n".join(line.strip() for line in text.split("\n"))


def transcribe(state, audio):
    """Streaming handler: append the new chunk and transcribe everything.

    Args:
        state: samples accumulated so far for this session, or ``None`` on
            the first chunk.
        audio: ``(sample_rate, samples)`` for the newly streamed chunk.

    Returns:
        ``(state, text)``: the updated accumulated samples and the
        post-processed transcription of the whole recording.
    """
    sr, y = audio
    y = y.astype(np.float32)

    # Peak-normalise the chunk. A silent (all-zero) or empty chunk has no
    # positive peak; dividing by it would fill the accumulated audio with
    # NaN, so such chunks are left untouched.
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y /= peak

    if state is not None:
        state = np.concatenate([state, y])
    else:
        state = y

    text = greet([sr, state])
    text = post_process(text)
    return state, text


def save_fn(audio, text):
    """Reset the session: clear accumulated audio, audio widget and text.

    Persisting the recording and its text is currently disabled; the
    intended implementation is kept below for reference.
    """
    # sr, y = audio
    # y = y.astype(np.float32)
    # y /= np.max(np.abs(y))
    # uid = str(uuid.uuid4())
    # with open(f"{uid}.txt", "w", encoding="utf-8") as f:
    #     f.write(text)
    # s3.Bucket(BUCKET).upload_file(f"{uid}.txt", f"texts/{uid}.txt")  # local path, bucket path
    # write(f"{uid}.wav", sr, y)
    # s3.Bucket(BUCKET).upload_file(f"{uid}.wav", f"audios/{uid}.wav")  # local path, bucket path
    return [None, None, ""]


with gr.Blocks() as demo:
    # Audio accumulated across streamed chunks for the current session.
    state = gr.State(None)
    # The three states below are not wired to any event in this file.
    current_speaches = gr.State(1)
    old_text = gr.State("")
    last_text = gr.State("")

    audio = gr.Audio(streaming=True)
    text = gr.TextArea(show_copy_button=True)
    audio.stream(fn=transcribe, inputs=[state, audio], outputs=[state, text])

    save = gr.Button("save")
    save.click(fn=save_fn, inputs=[audio, text], outputs=[state, audio, text])

demo.launch()