File size: 8,229 Bytes
5536e5a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b1f991d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5536e5a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import gradio as gr
from datasets import load_dataset, Dataset, Audio, concatenate_datasets
import json
import os
from datetime import datetime
import shutil

# Directory to save recordings
AUDIO_DIR = "data/audios"
# Default sampling rate (Hz) pre-filled in the "Sync to HuggingFace" UI.
SAMPLING_RATE = 16000
os.makedirs(AUDIO_DIR, exist_ok=True)

# Module-level application state shared by every event handler below.
# NOTE(review): a plain module-global dict is shared across ALL browser
# sessions of this Gradio app — confirm single-user usage is intended.
state = {
    "sentences": [],  # list of {"id": ..., "text": ...} entries to record
    "recordings": {},  # Dictionary to store recordings by ID
    "index": 0,  # Index for navigating through sentences
    "idx": 0,  # Index for sentences (IDs)
    "json_loaded": False

}

def load_json(file):
    """Load a list of {"id", "text"} entries from an uploaded JSON file.

    Extends the sentence list, registers an (empty) recording slot for each
    id, flags the app as JSON-driven, and returns the refreshed display tuple.
    """
    with open(file.name, "r", encoding="utf-8") as handle:
        entries = json.load(handle)
    state["sentences"].extend(entries)
    # Register a fresh (empty) take list for every loaded id.
    for entry in entries:
        state["recordings"][entry["id"]] = []
    state["json_loaded"] = True
    return update_display()

def update_display():
    """Build the 9-tuple of output values shared by every event handler.

    Order matters and must match the `outputs=[...]` lists wired in the
    Blocks section: (sentence_text, audio_input, id_display, progress_text,
    record_btn visibility, prev_btn visibility, next_btn visibility,
    audio_player value, audio_player visibility).
    """
    if not state["sentences"]:
        # Nothing loaded or recorded yet: clear fields and hide all controls.
        return "No data loaded.", None, "", "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
    
    idx = state["index"]
    progress = ""
    if state["json_loaded"]:
        if idx >= len(state["sentences"]):
            # Every uploaded sentence has been visited: auto-export and finish.
            export_json()
            return "✅ All sentences recorded!\n💾 Data Exported to Json", None, "", "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)

        # NOTE: `progress` is reused — first as an int counter of sentences
        # with at least one take, then rebound to the label string.
        progress = 0
        for recordings in state["recordings"].values(): 
            if len(recordings) > 0: 
                progress += 1
        progress = f"{progress} / {len(state['sentences'])} recorded"
    
    
    # Enable/Disable buttons based on the current index
    next_btn_enabled = gr.update(visible= not (state["index"] == len(state["sentences"]) - 1))
    prev_btn_enabled = gr.update(visible= not (state["index"] == 0))
    
    recordings = []
    text = ""
    # Fallback id shown in free-form mode when index is past the end of the list.
    current_id = f"s_{state['idx']}"
    if idx < len(state["sentences"]):
        current = state["sentences"][idx]
        current_id = current['id']
        text = current["text"]
        recordings = state["recordings"].get(current["id"], [])
        
    if recordings:
        # Get the most recent recording for that sentence ID
        current_recording = recordings[-1]
        current_audio = current_recording["audio"]
        audio_visibility = gr.update(visible=True)
    else:
        current_audio = None
        audio_visibility = gr.update(visible=False)

    return text, None, f"ID: {current_id}", progress, gr.update(visible=True), prev_btn_enabled, next_btn_enabled, current_audio, audio_visibility

def record_audio(audio, text):
    """Persist one take for the current sentence and advance to the next.

    Copies the recorded file into AUDIO_DIR, stores a (versioned) record in
    state["recordings"], bumps the navigation index, and returns the refreshed
    display tuple.

    Parameters:
        audio: filepath of the recorded clip (or None when nothing recorded).
        text: current contents of the sentence textbox.
    """
    # BUGFIX: the guard must only apply in JSON mode. In free-form mode a new
    # sentence is appended below, so index == len(sentences) is the normal
    # state after every submit; the original `if state["sentences"] and ...`
    # guard blocked every free-form recording after the first one.
    if state["json_loaded"] and state["index"] >= len(state["sentences"]):
        return update_display()

    if audio is None:
        gr.Warning("The audio is empty, please provide a valid audio")
        return update_display()

    if state["json_loaded"]:
        # Keep any edits the user made to the displayed sentence text.
        state["sentences"][state["index"]]["text"] = text
    else:
        # Free-form mode: create a new sentence entry with a synthetic id.
        state["sentences"].append({"id": f"s_{state['idx']}", "text": text})
        state["idx"] += 1

    sentence = state["sentences"][state["index"]]
    uid = sentence["id"]

    # %f (microseconds) prevents two takes submitted within the same second
    # from overwriting the same .wav file.
    filename = f"{uid}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
    filepath = os.path.join(AUDIO_DIR, filename)
    shutil.copy(audio, filepath)

    # Append the take under its sentence id; later takes get a versioned id.
    recordings = state["recordings"].setdefault(uid, [])
    uid_versioning = f"{uid}_v{len(recordings)}" if recordings else uid
    recordings.append({
        "id": uid_versioning,
        "text": sentence["text"],
        "audio": filepath
    })
    state["index"] += 1
    return update_display()

def export_json():
    """Flatten all recorded takes into data/tts_dataset.json.

    Returns the output path (even when nothing was written, so the wired
    gr.File output keeps receiving a value; a warning is shown in that case).
    """
    output_path = "data/tts_dataset.json"
    data = [record for records in state["recordings"].values() for record in records]
    if data: 
        # BUGFIX: write UTF-8 explicitly and keep non-ASCII sentence text
        # readable instead of \uXXXX-escaped (the original used the platform
        # default encoding and ensure_ascii=True).
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    else:
        gr.Warning("There is no recorded data")
    return output_path

def go_previous():
    """Step back one sentence; no-op when already at the first one."""
    state["index"] = max(0, state["index"] - 1)
    return update_display()

def go_next():
    """Advance one sentence; no-op when already at the last one."""
    last_position = len(state["sentences"]) - 1
    if state["index"] < last_position:
        state["index"] += 1
    return update_display()
def push_to_hub(hub_id, is_new_dataset, sampling_rate=SAMPLING_RATE):
    """Push all recorded takes to the HuggingFace Hub as an audio dataset.

    Parameters:
        hub_id: target dataset repo id; warns and aborts when empty.
        is_new_dataset: when False, the existing `train` split is downloaded
            and the new takes are concatenated onto it before pushing.
        sampling_rate: target rate for the Audio column. BUGFIX: now has a
            default — the `push_to_hub_btn.click(...)` wiring passes only
            [hub_id, is_new_dataset], which raised TypeError with the old
            three-required-args signature.
    """
    if not hub_id:
        gr.Warning("The hub_id field is empty, please provide a relevant hub id.")
        return update_display()

    # Flatten {id: [takes, ...]} into one list of records.
    recordings = [
        {"id": version["id"], "audio": version["audio"], "text": version["text"]}
        for takes in state["recordings"].values()
        for version in takes
    ]

    dataset = Dataset.from_list(recordings)
    dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate))
    if not is_new_dataset:
        previous_dataset = load_dataset(hub_id, split="train")
        dataset = concatenate_datasets([previous_dataset, dataset])
    dataset.push_to_hub(hub_id)
    gr.Info("Successfully synched with the hub")  # typo fix: "Succesfully"
    return update_display()
# UI layout and event wiring. All handlers return the same 9-tuple produced
# by update_display(), so every .click/.change shares the same outputs list.
with gr.Blocks() as demo:
    gr.Markdown("""# 🗣️ TTS Dataset Recorder

Welcome to the **TTS Dataset Recorder**! This tool helps you quickly create a high-quality dataset for Text-to-Speech (TTS) models. Whether you're starting from scratch or have a pre-existing set of text data, this app lets you record audio samples and export them with the corresponding metadata.

### **How to Use?**
1. **Upload a JSON File** containing the sentences you'd like to record (or manually input them through the app).
2. **Record Audio** for each sentence. The app will automatically associate your recordings with the correct text.
3. **Export the Dataset** as a JSON file or **Sync** to HuggingFace for easy sharing and use.

### **Data Input Format**
Your JSON file should follow this structure:
```json
[
    { "id": "001", "text": "Hello, how are you?" },
    { "id": "002", "text": "This is a sample sentence." }
]
```
                """)

    with gr.Row():
        json_file = gr.File(label="Upload Sentences JSON", file_types=[".json"])
        with gr.Column():
            export_btn = gr.Button("💾 Export Metadata")
            with gr.Row():
                hub_id = gr.Textbox(label="Hub id", interactive=True)
                with gr.Row():
                    is_new_dataset = gr.Checkbox(label="New dataset", interactive=True)
                    sampling_rate = gr.Number(label="Sampling rate", value=SAMPLING_RATE, precision=0)
            push_to_hub_btn = gr.Button("🤗 Sync to HuggingFace")

    id_display = gr.Textbox(label="ID", interactive=False)
    progress_text = gr.Textbox(label="Progress", interactive=False)
    sentence_text = gr.Textbox(label="Sentence", interactive=True)
    audio_input = gr.Audio(type="filepath", label="Record your voice", interactive=True)
    record_btn = gr.Button("✅ Submit Recording")

    with gr.Row():
        prev_btn = gr.Button("⬅️ Previous")
        next_btn = gr.Button("➡️ Next")

    audio_player = gr.Audio(label="Play Recorded Audio", type="filepath")

    # audio_player appears twice on purpose: once for its value, once for its
    # visibility update (update_display returns both).
    json_file.change(load_json, inputs=json_file, outputs=[sentence_text, audio_input, id_display, progress_text, record_btn, prev_btn, next_btn, audio_player, audio_player])
    record_btn.click(record_audio, inputs=[audio_input, sentence_text], outputs=[sentence_text, audio_input, id_display, progress_text, record_btn, prev_btn, next_btn, audio_player, audio_player])
    export_btn.click(export_json, outputs=gr.File())

    prev_btn.click(go_previous, outputs=[sentence_text, audio_input, id_display, progress_text, record_btn, prev_btn, next_btn, audio_player, audio_player])
    next_btn.click(go_next, outputs=[sentence_text, audio_input, id_display, progress_text, record_btn, prev_btn, next_btn, audio_player, audio_player])

    # BUGFIX: pass the sampling_rate Number as the third input — it was
    # created above but never wired, so push_to_hub raised TypeError
    # (missing required positional argument).
    push_to_hub_btn.click(push_to_hub, inputs=[hub_id, is_new_dataset, sampling_rate], outputs=[sentence_text, audio_input, id_display, progress_text, record_btn, prev_btn, next_btn, audio_player, audio_player])

demo.launch()