camenduru's picture
thanks to NVIDIA ❤
7934b29
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import random
import shutil
import urllib.request
from pathlib import Path
import pandas as pd
from joblib import Parallel, delayed
from nemo_text_processing.text_normalization.normalize import Normalizer
from tqdm import tqdm
from nemo.utils import logging
# full corpus.
URLS_FULL = {
"Bernd_Ungerer": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/Bernd_Ungerer.zip",
"Eva_K": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/Eva_K.zip",
"Friedrich": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/Friedrich.zip",
"Hokuspokus": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/Hokuspokus.zip",
"Karlsson": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/Karlsson.zip",
"others": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_full/others.zip",
}
URL_STATS_FULL = "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/datasetStatistic.zip"
# the clean subset of the full corpus.
URLS_CLEAN = {
"Bernd_Ungerer": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/Bernd_Ungerer_Clean.zip",
"Eva_K": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/Eva_K_Clean.zip",
"Friedrich": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/Friedrich_Clean.zip",
"Hokuspokus": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/Hokuspokus_Clean.zip",
"Karlsson": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/Karlsson_Clean.zip",
"others": "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/dataset_clean/others_Clean.zip",
}
URL_STATS_CLEAN = "https://opendata.iisys.de/opendata/Datasets/HUI-Audio-Corpus-German/datasetStatisticClean.zip"
def get_args():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Download HUI-Audio-Corpus-German and create manifests with predefined split. "
"Please check details about the corpus in https://github.com/iisys-hof/HUI-Audio-Corpus-German.",
)
parser.add_argument("--data-root", required=True, type=Path, help="where the resulting dataset will reside.")
parser.add_argument("--manifests-root", required=True, type=Path, help="where the manifests files will reside.")
parser.add_argument("--set-type", default="clean", choices=["full", "clean"], type=str)
parser.add_argument("--min-duration", default=0.1, type=float)
parser.add_argument("--max-duration", default=15, type=float)
parser.add_argument(
"--num-workers",
default=-1,
type=int,
help="Specify the max number of concurrently Python workers processes. "
"If -1 all CPUs are used. If 1 no parallel computing is used.",
)
parser.add_argument(
"--normalize-text",
default=False,
action='store_true',
help="Normalize original text and add a new entry 'normalized_text' to .json file if True.",
)
parser.add_argument(
"--val-num-utts-per-speaker",
default=1,
type=int,
help="Specify the number of utterances for each speaker in val split. All speakers are covered.",
)
parser.add_argument(
"--test-num-utts-per-speaker",
default=1,
type=int,
help="Specify the number of utterances for each speaker in test split. All speakers are covered.",
)
parser.add_argument(
"--seed-for-ds-split",
default=100,
type=float,
help="Seed for deterministic split of train/dev/test, NVIDIA's default is 100",
)
args = parser.parse_args()
return args
def __maybe_download_file(source_url, destination_path):
if not destination_path.exists():
logging.info(f"Downloading data: {source_url} --> {destination_path}")
tmp_file_path = destination_path.with_suffix(".tmp")
urllib.request.urlretrieve(source_url, filename=tmp_file_path)
tmp_file_path.rename(destination_path)
else:
logging.info(f"Skipped downloading data because it exists: {destination_path}")
def __extract_file(filepath, data_dir):
logging.info(f"Unzipping data: {filepath} --> {data_dir}")
shutil.unpack_archive(filepath, data_dir)
logging.info(f"Unzipping data is complete: {filepath}.")
def __save_json(json_file, dict_list):
logging.info(f"Saving JSON split to {json_file}.")
with open(json_file, "w") as f:
for d in dict_list:
f.write(json.dumps(d) + "\n")
def __process_data(
dataset_path, stat_path_root, speaker_id, min_duration, max_duration, val_size, test_size, seed_for_ds_split,
):
logging.info(f"Preparing JSON split for speaker {speaker_id}.")
# parse statistic.txt
stat_path = stat_path_root / "statistic.txt"
with open(stat_path, 'r') as fstat:
lines = fstat.readlines()
num_utts = int(lines[4].strip().split()[-1])
hours = round(float(lines[9].strip().split()[-1]) / 3600.0, 2)
# parse overview.csv to generate JSON splits.
overview_path = stat_path_root / "overview.csv"
entries = []
with open(overview_path, 'r') as foverview:
# Let's skip the header
foverview.readline()
for line in tqdm(foverview):
file_stem, duration, *_, text = line.strip().split("|")
duration = float(duration)
# file_stem -> dir_name (e.g. maerchen_01_f000051 -> maerchen)
dir_name = "_".join(file_stem.split("_")[:-2])
audio_path = dataset_path / dir_name / "wavs" / f"{file_stem}.wav"
if min_duration <= duration <= max_duration:
entry = {
"audio_filepath": str(audio_path),
"duration": duration,
"text": text,
"speaker": speaker_id,
}
entries.append(entry)
random.Random(seed_for_ds_split).shuffle(entries)
train_size = len(entries) - val_size - test_size
if train_size <= 0:
logging.warning(f"Skipped speaker {speaker_id}. Not enough data for train, val and test.")
train, val, test, is_skipped = [], [], [], True
else:
logging.info(f"Preparing JSON split for speaker {speaker_id} is complete.")
train, val, test, is_skipped = (
entries[:train_size],
entries[train_size : train_size + val_size],
entries[train_size + val_size :],
False,
)
return {
"train": train,
"val": val,
"test": test,
"is_skipped": is_skipped,
"hours": hours,
"num_utts": num_utts,
}
def __text_normalization(json_file, num_workers=-1):
text_normalizer_call_kwargs = {
"punct_pre_process": True,
"punct_post_process": True,
}
text_normalizer = Normalizer(
lang="de", input_case="cased", overwrite_cache=True, cache_dir=str(json_file.parent / "cache_dir"),
)
def normalizer_call(x):
return text_normalizer.normalize(x, **text_normalizer_call_kwargs)
def add_normalized_text(line_dict):
normalized_text = normalizer_call(line_dict["text"])
line_dict.update({"normalized_text": normalized_text})
return line_dict
logging.info(f"Normalizing text for {json_file}.")
with open(json_file, 'r', encoding='utf-8') as fjson:
lines = fjson.readlines()
# Note: you need to verify which backend works well on your cluster.
# backend="loky" is fine on multi-core Ubuntu OS; backend="threading" on Slurm.
dict_list = Parallel(n_jobs=num_workers)(
delayed(add_normalized_text)(json.loads(line)) for line in tqdm(lines)
)
json_file_text_normed = json_file.parent / f"{json_file.stem}_text_normed{json_file.suffix}"
with open(json_file_text_normed, 'w', encoding="utf-8") as fjson_norm:
for dct in dict_list:
fjson_norm.write(json.dumps(dct) + "\n")
logging.info(f"Normalizing text is complete: {json_file} --> {json_file_text_normed}")
def main():
args = get_args()
data_root = args.data_root
manifests_root = args.manifests_root
set_type = args.set_type
dataset_root = data_root / f"HUI-Audio-Corpus-German-{set_type}"
dataset_root.mkdir(parents=True, exist_ok=True)
if set_type == "full":
data_source = URLS_FULL
stats_source = URL_STATS_FULL
elif set_type == "clean":
data_source = URLS_CLEAN
stats_source = URL_STATS_CLEAN
else:
raise ValueError(f"Unknown {set_type}. Please choose either clean or full.")
# download and unzip dataset stats
zipped_stats_path = dataset_root / Path(stats_source).name
__maybe_download_file(stats_source, zipped_stats_path)
__extract_file(zipped_stats_path, dataset_root)
# download datasets
# Note: you need to verify which backend works well on your cluster.
# backend="loky" is fine on multi-core Ubuntu OS; backend="threading" on Slurm.
Parallel(n_jobs=args.num_workers)(
delayed(__maybe_download_file)(data_url, dataset_root / Path(data_url).name)
for _, data_url in data_source.items()
)
# unzip datasets
# Note: you need to verify which backend works well on your cluster.
# backend="loky" is fine on multi-core Ubuntu OS; backend="threading" on Slurm.
Parallel(n_jobs=args.num_workers)(
delayed(__extract_file)(dataset_root / Path(data_url).name, dataset_root)
for _, data_url in data_source.items()
)
# generate json files for train/val/test splits
stats_path_root = dataset_root / Path(stats_source).stem / "speacker"
entries_train, entries_val, entries_test = [], [], []
speaker_entries = []
num_speakers = 0
for child in stats_path_root.iterdir():
if child.is_dir():
speaker = child.name
num_speakers += 1
speaker_stats_root = stats_path_root / speaker
speaker_data_path = dataset_root / speaker
logging.info(f"Processing Speaker: {speaker}")
results = __process_data(
speaker_data_path,
speaker_stats_root,
num_speakers,
args.min_duration,
args.max_duration,
args.val_num_utts_per_speaker,
args.test_num_utts_per_speaker,
args.seed_for_ds_split,
)
entries_train.extend(results["train"])
entries_val.extend(results["val"])
entries_test.extend(results["test"])
speaker_entry = {
"speaker_name": speaker,
"speaker_id": num_speakers,
"hours": results["hours"],
"num_utts": results["num_utts"],
"is_skipped": results["is_skipped"],
}
speaker_entries.append(speaker_entry)
# shuffle in place across multiple speakers
random.Random(args.seed_for_ds_split).shuffle(entries_train)
random.Random(args.seed_for_ds_split).shuffle(entries_val)
random.Random(args.seed_for_ds_split).shuffle(entries_test)
# save speaker stats.
df = pd.DataFrame.from_records(speaker_entries)
df.sort_values(by="hours", ascending=False, inplace=True)
spk2id_file_path = manifests_root / "spk2id.csv"
df.to_csv(spk2id_file_path, index=False)
logging.info(f"Saving Speaker to ID mapping to {spk2id_file_path}.")
# save json splits.
train_json = manifests_root / "train_manifest.json"
val_json = manifests_root / "val_manifest.json"
test_json = manifests_root / "test_manifest.json"
__save_json(train_json, entries_train)
__save_json(val_json, entries_val)
__save_json(test_json, entries_test)
# normalize text if requested. New json file, train_manifest_text_normed.json, will be generated.
if args.normalize_text:
__text_normalization(train_json, args.num_workers)
__text_normalization(val_json, args.num_workers)
__text_normalization(test_json, args.num_workers)
if __name__ == "__main__":
main()