import os
import sys
import uuid
from pathlib import Path
from contextlib import contextmanager

import numpy as np
import torch
import matplotlib.pyplot as plt
import gradio as gr
from scipy.io.wavfile import write as wavwrite
from audiotools import AudioSignal
from audioseal import AudioSeal
# allow local imports of your encodec folder
@contextmanager
def chdir(path: str):
    origin = Path().absolute()
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(origin)
_path = Path(__file__).parent
sys.path.insert(0, str(_path))
with chdir(_path):
    from encodec import Encodec
OUT_DIR = _path / "gradio-outputs"
OUT_DIR.mkdir(exist_ok=True)
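# The demo works at 48 kHz; the codec and watermark both operate at 16 kHz,
# so content above 8 kHz is never touched by either model.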
LOUDNESS_DB = -16.
SAMPLE_RATE = 48_000
ENCODEC_SAMPLE_RATE = 16_000
AUDIOSEAL_SAMPLE_RATE = 16_000
# load codec
config = {
    "sample_rate": 16_000,
    "target_bandwidths": [2.2],
    "channels": 1,
    "causal": False,
    "codebook_size": 2048,
    "n_filters": 64,
    "model_norm": "weight_norm",
    "audio_normalize": False,
    "true_skip": True,
    "ratios": [8, 5, 4, 2],
    "encoder_kwargs": {"pad_mode": "constant"},
    "decoder_kwargs": {"pad_mode": "constant"},
}
codec = Encodec(**config)
codec.load_state_dict(torch.load("ckpt/encodec_voicecraft.pt", map_location="cpu"))
codec.eval()
for p in codec.parameters():
    p.requires_grad_(False)
codec.set_target_bandwidth(2.2)
# watermark models
embedder = AudioSeal.load_generator("audioseal_wm_16bits")
detector = AudioSeal.load_detector("audioseal_detector_16bits")
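# Compress a signal to discrete codec tokens. The signal is resampled to the
# codec rate and channels are folded into the batch dimension; the original
# shape, sample rate, and loudness are returned so decode() can undo this.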
def encode(signal: AudioSignal, codec: torch.nn.Module):
    n_b, n_ch, n_s = signal.audio_data.shape
    sr = signal.sample_rate
    loud_db = signal.loudness()
    x = signal.clone().resample(ENCODEC_SAMPLE_RATE).audio_data
    x = x.reshape(n_b * n_ch, 1, -1)
    codes, *_ = codec.encode(x)
    return codes, n_b, n_ch, n_s, sr, loud_db
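# Invert encode(): decode tokens to audio, restore the original channel
# layout and sample rate, trim/pad to the original length, and re-normalize
# to the original loudness.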
def decode(codes, n_b, n_ch, n_s, sr, loud_db, codec):
    x = codec.decode(codes).reshape(n_b, n_ch, -1)
    sig = AudioSignal(x, sample_rate=ENCODEC_SAMPLE_RATE)
    sig = sig.resample(sr)
    sig.audio_data = sig.audio_data[..., :n_s]
    sig.audio_data = torch.nn.functional.pad(
        sig.audio_data, (0, max(0, n_s - sig.signal_length))
    )
    return sig.normalize(loud_db)
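# Split a signal at the target rate's Nyquist frequency into a low band
# (resampled to the target rate) and a high band. The cutoffs (0.95x and
# 1.05x Nyquist) overlap slightly so the crossover region stays covered.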
def split_bands(signal: AudioSignal, sample_rate: float = ENCODEC_SAMPLE_RATE):
    nyq = sample_rate // 2
    high = signal.clone().high_pass(cutoffs=int(nyq * 0.95), zeros=51)
    low = signal.clone().low_pass(cutoffs=int(nyq * 1.05), zeros=51)
    loud_db = low.loudness()
    low = low.resample(sample_rate)
    return low, high, loud_db
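# Recombine the bands: upsample the low band to the high band's rate,
# trim/pad it to matching length, restore its loudness, and sum.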
def merge_bands(low, high, loud_db):
    low = low.clone().to(high.device).resample(high.sample_rate)
    low.audio_data = low.audio_data[..., :high.signal_length]
    low.audio_data = torch.nn.functional.pad(
        low.audio_data, (0, max(0, high.signal_length - low.signal_length))
    )
    return low.normalize(loud_db) + high
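# Codec round-trip "attack": when split_rate_hz is set, only the low band is
# re-encoded, so content above the codec's Nyquist passes through untouched.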
def attack(signal: AudioSignal, codec, split_rate_hz=AUDIOSEAL_SAMPLE_RATE):
    if split_rate_hz:
        low, high, loud_db = split_bands(signal, split_rate_hz)
        low = decode(*encode(low, codec), codec)
        return merge_bands(low, high, loud_db)
    else:
        return decode(*encode(signal, codec), codec)
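# Embed the AudioSeal watermark. Only the low band (at the watermark's 16 kHz
# operating rate) is watermarked; the high band is recombined unaltered, and
# the original sample rate and channel layout are restored afterwards.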
def embed(signal: AudioSignal, embedder: torch.nn.Module):
    orig_ch, orig_sr = signal.num_channels, signal.sample_rate
    sig = signal.clone().resample(SAMPLE_RATE)
    if orig_ch > 1:
        b, c, n = sig.audio_data.shape
        sig.audio_data = sig.audio_data.reshape(b * c, 1, n)
    low, high, loud = split_bands(sig.clone(), AUDIOSEAL_SAMPLE_RATE)
    wm = embedder.get_watermark(low.audio_data, AUDIOSEAL_SAMPLE_RATE)
    low.audio_data = low.audio_data + wm
    merged = merge_bands(low, high, loud)
    if orig_ch > 1:
        b2, c2, n2 = merged.audio_data.shape
        merged.audio_data = merged.audio_data.reshape(-1, orig_ch * c2, n2)
    return merged.resample(orig_sr)
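# Run the AudioSeal detector on a mono 16 kHz copy of the signal and return
# the per-sample watermark probabilities (index 1 of the two-class output).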
def detect(signal: AudioSignal, detector: torch.nn.Module):
    sig = signal.clone().to_mono().resample(AUDIOSEAL_SAMPLE_RATE)
    result, _ = detector.forward(sig.audio_data, sample_rate=AUDIOSEAL_SAMPLE_RATE)
    return result[0, 1, :].detach().cpu().numpy()
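# Full demo pipeline: preprocess the uploaded audio, score it before
# watermarking, after watermarking, and after the codec attack, write the
# watermarked/attacked audio to disk, and render a comparison plot.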
def pipeline(audio_tuple):
    sr, audio_np = audio_tuple
    print("GOT SR", sr)
    print("GOT AUDIO", audio_np.shape)
    if audio_np.ndim == 1:
        audio_np = audio_np[None, None, :]
    else:
        audio_np = np.transpose(audio_np, (1, 0))[None, ...]
    print("FORMATTED AUDIO", audio_np.shape)
    sig = AudioSignal(torch.from_numpy(audio_np).float(), sample_rate=sr)
    orig_loud = sig.loudness()
    sig = sig.to_mono().resample(SAMPLE_RATE).normalize(LOUDNESS_DB).ensure_max_of_audio()
    print("REFORMATTED AUDIO")
    print(sig)

    # Detect on unwatermarked audio
    scores = detect(sig, detector)

    # Embed + detect without attack
    wm_sig = embed(sig.clone(), embedder).normalize(LOUDNESS_DB).ensure_max_of_audio()
    scores_clean = detect(wm_sig, detector)
    print(np.mean(scores_clean))

    # Attack + detect
    att_sig = attack(wm_sig.clone(), codec).normalize(LOUDNESS_DB).ensure_max_of_audio()
    scores_att = detect(att_sig, detector)
    print(np.mean(scores_att))
    # Match loudness prior to writing
    wm_sig.normalize(orig_loud).ensure_max_of_audio()
    att_sig.normalize(orig_loud).ensure_max_of_audio()
    # Write audio files to disk
    uid = uuid.uuid4().hex
    wm_path = OUT_DIR / f"watermarked_{uid}.wav"
    att_path = OUT_DIR / f"attacked_{uid}.wav"
    wm_arr = wm_sig.audio_data.squeeze().numpy()
    att_arr = att_sig.audio_data.squeeze().numpy()
    wavwrite(str(wm_path), SAMPLE_RATE, wm_arr)
    wavwrite(str(att_path), SAMPLE_RATE, att_arr)
    # Plot: waveform on top, detection scores on bottom
    sig_bg = sig.clone().to_mono().resample(AUDIOSEAL_SAMPLE_RATE)
    wav = sig_bg.audio_data.squeeze().numpy()
    N = len(scores)
    if wav.shape[0] < N:
        wav = np.pad(wav, (0, N - wav.shape[0]), mode="constant")
    else:
        wav = wav[:N]
    fig, (ax_wav, ax_score) = plt.subplots(2, 1, sharex=True, figsize=(8, 6))

    # Top: waveform (no labels)
    ax_wav.plot(wav, alpha=0.3)
    ax_wav.axis("off")

    # Bottom: detection scores
    ax_score.plot(scores, label="No watermark", color="blue")
    ax_score.plot(scores_clean, label="Watermark (no attack)", color="green")
    ax_score.plot(scores_att, label="Watermark (codec attack)", color="red")
    ax_score.set_xlabel("Frame Index")
    ax_score.set_ylabel("Detection Score")
    ax_score.set_ylim(-0.05, 1.05)
    ax_score.set_yticks([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    ax_score.legend()
    plt.tight_layout()

    plot_path = OUT_DIR / f"detection_plot_{uid}.png"
    fig.savefig(str(plot_path), format="png")
    plt.close(fig)

    return str(wm_path), str(att_path), str(plot_path)
demo = gr.Interface(
    fn=pipeline,
    inputs=gr.Audio(sources=["upload"], type="numpy", label="Upload Input Audio"),
    outputs=[
        gr.Audio(type="filepath", label="Watermarked Audio"),
        gr.Audio(type="filepath", label="Attacked Audio"),
        gr.Image(type="filepath", label="Detection Scores Plot"),
    ],
    title="Watermark Stress Test",
description=""" | |
This is an educational demonstration of state-of-the-art audio watermark performance under codec processing. Upload any (speech) audio file to test watermark performance before and after processing with a low-bitrate neural codec [1]. | |
For this demo, we use the AudioSeal [2] watermark, which is well documented, open source, and provides state-of-the-art localized detection performance. Both the watermark and codec operate at 16kHz, meaning all frequencies above 8kHz are left unaltered. To ensure consistent watermark performance, we normalize audio to -16db LUFS and downmix to mono prior to embedding. | |
[1] https://github.com/jasonppy/VoiceCraft | |
[2] https://github.com/facebookresearch/audioseal | |
""", | |
article=""" | |
The citation info for our corresponding paper is: | |
``` | |
@inproceedings{deepwatermarksareshallow, | |
author ={Patrick O'Reilly and Zeyu Jin and Jiaqi Su and Bryan Pardo}, | |
title = {Deep Audio Watermarks are Shallow: Limitations of Post-Hoc Watermarking Techniques for Speech}, | |
booktitle = {ICLR Workshop on GenAI Watermarking}, | |
year = {2025} | |
} | |
``` | |
For the VoiceCraft codec: | |
``` | |
@article{voicecraft, | |
author={Puyuan Peng and Po-Yao Huang and Daniel Li and Abdelrahman Mohamed and David Harwath}, | |
year={2024}, | |
title={VoiceCraft: Zero-Shot Speech Editing and Text-to-Speech in the Wild}, | |
journal={arXiv preprint arXiv:2403.16973v1}, | |
} | |
``` | |
And for the AudioSeal watermark: | |
``` | |
@article{audioseal, | |
title={Proactive Detection of Voice Cloning with Localized Watermarking}, | |
author={San Roman, Robin and Fernandez, Pierre and Elsahar, Hady and D´efossez, Alexandre and Furon, Teddy and Tran, Tuan}, | |
journal={International Conference on Machine Learning (ICML)}, | |
year={2024} | |
} | |
``` | |
""", | |
    allow_flagging="never",
)

if __name__ == "__main__":
    demo.launch(share=True)