import os import sys import uuid from pathlib import Path from contextlib import contextmanager import numpy as np import torch import matplotlib.pyplot as plt import gradio as gr from scipy.io.wavfile import write as wavwrite from audiotools import AudioSignal from audioseal import AudioSeal # allow local imports of your encodec folder @contextmanager def chdir(path: str): origin = Path().absolute() try: os.chdir(path) yield finally: os.chdir(origin) _path = Path(__file__).parent sys.path.insert(0, str(_path)) with chdir(_path): from encodec import Encodec OUT_DIR = _path / "gradio-outputs" OUT_DIR.mkdir(exist_ok=True) LOUDNESS_DB = -16. SAMPLE_RATE = 48_000 ENCODEC_SAMPLE_RATE = 16_000 AUDIOSEAL_SAMPLE_RATE = 16_000 # load codec config = { "sample_rate": 16_000, "target_bandwidths": [2.2], "channels": 1, "causal": False, "codebook_size": 2048, "n_filters": 64, "model_norm": "weight_norm", "audio_normalize": False, "true_skip": True, "ratios": [8, 5, 4, 2], "encoder_kwargs": {"pad_mode": "constant"}, "decoder_kwargs": {"pad_mode": "constant"}, } codec = Encodec(**config) codec.load_state_dict(torch.load("ckpt/encodec_voicecraft.pt", map_location="cpu")) codec.eval() for p in codec.parameters(): p.requires_grad_(False) codec.set_target_bandwidth(2.2) # watermark models embedder = AudioSeal.load_generator("audioseal_wm_16bits") detector = AudioSeal.load_detector("audioseal_detector_16bits") @torch.no_grad() def encode(signal: AudioSignal, codec: torch.nn.Module): n_b, n_ch, n_s = signal.shape sr = signal.sample_rate loud_db = signal.loudness() x = signal.clone().resample(ENCODEC_SAMPLE_RATE).audio_data x = x.reshape(n_b * n_ch, 1, -1) codes, *_ = codec.encode(x) return codes, n_b, n_ch, n_s, sr, loud_db @torch.no_grad() def decode(codes, n_b, n_ch, n_s, sr, loud_db, codec): x = codec.decode(codes).reshape(n_b, n_ch, -1) sig = AudioSignal(x, sample_rate=ENCODEC_SAMPLE_RATE) sig = sig.resample(sr) sig.audio_data = sig.audio_data[..., :n_s] sig.audio_data = torch.nn.functional.pad( sig.audio_data, (0, max(0, n_s - sig.signal_length)) ) return sig.normalize(loud_db) @torch.no_grad() def split_bands(signal: AudioSignal, sample_rate: float = ENCODEC_SAMPLE_RATE): nyq = sample_rate // 2 high = signal.clone().high_pass(cutoffs=int(nyq * 0.95), zeros=51) low = signal.clone().low_pass(cutoffs=int(nyq * 1.05), zeros=51) loud_db = low.loudness() low = low.resample(sample_rate) return low, high, loud_db @torch.no_grad() def merge_bands(low, high, loud_db): low = low.clone().to(high.device).resample(high.sample_rate) low.audio_data = low.audio_data[..., :high.signal_length] low.audio_data = torch.nn.functional.pad( low.audio_data, (0, max(0, high.signal_length - low.signal_length)) ) return low.normalize(loud_db) + high @torch.no_grad() def attack(signal: AudioSignal, codec, split_rate_hz=AUDIOSEAL_SAMPLE_RATE): if split_rate_hz: low, high, loud_db = split_bands(signal, split_rate_hz) low = decode(*encode(low, codec), codec) return merge_bands(low, high, loud_db) else: return decode(*encode(signal, codec), codec) @torch.no_grad() def embed(signal: AudioSignal, embedder: torch.nn.Module): orig_ch, orig_sr = signal.num_channels, signal.sample_rate sig = signal.clone().resample(SAMPLE_RATE) if orig_ch > 1: b, c, n = sig.audio_data.shape sig.audio_data = sig.audio_data.reshape(b * c, 1, n) low, high, loud = split_bands(sig.clone(), AUDIOSEAL_SAMPLE_RATE) wm = embedder.get_watermark(low.audio_data, AUDIOSEAL_SAMPLE_RATE) low.audio_data = low.audio_data + wm merged = merge_bands(low, high, loud) if orig_ch > 1: b2, c2, n2 = merged.audio_data.shape merged.audio_data = merged.audio_data.reshape(-1, orig_ch * c2, n2) return merged.resample(orig_sr) @torch.no_grad() def detect(signal: AudioSignal, detector: torch.nn.Module): sig = signal.clone().to_mono().resample(AUDIOSEAL_SAMPLE_RATE) result, _ = detector.forward(sig.audio_data, sample_rate=AUDIOSEAL_SAMPLE_RATE) return result[0, 1, :].detach().cpu().numpy() def pipeline(audio_tuple): sr, audio_np = audio_tuple print("GOT SR", sr) print("GOT AUDIO", audio_np.shape) if audio_np.ndim == 1: audio_np = audio_np[None, None, :] else: audio_np = np.transpose(audio_np, (1, 0))[None, ...] print("FORMATTED AUDIO", audio_np.shape) sig = AudioSignal(torch.from_numpy(audio_np).float(), sample_rate=sr) orig_loud = sig.loudness() sig = sig.to_mono().resample(SAMPLE_RATE).normalize(LOUDNESS_DB).ensure_max_of_audio() print("REFORMATTED AUDIO") print(sig) # Detect scores = detect(sig, detector) # Embed + detect without attack wm_sig = embed(sig.clone(), embedder).normalize(LOUDNESS_DB).ensure_max_of_audio() scores_clean = detect(wm_sig, detector) print(np.mean(scores_clean)) # Attack + detect att_sig = attack(wm_sig.clone(), codec).normalize(LOUDNESS_DB).ensure_max_of_audio() scores_att = detect(att_sig, detector) print(np.mean(scores_att)) # Match loudness priot to writing wm_sig.normalize(orig_loud).ensure_max_of_audio() att_sig.normalize(orig_loud).ensure_max_of_audio() # Write audio files to disk uid = uuid.uuid4().hex wm_path = OUT_DIR / f"watermarked_{uid}.wav" att_path = OUT_DIR / f"attacked_{uid}.wav" wm_arr = wm_sig.audio_data.squeeze().numpy() att_arr = att_sig.audio_data.squeeze().numpy() wavwrite(str(wm_path), SAMPLE_RATE, wm_arr) wavwrite(str(att_path), SAMPLE_RATE, att_arr) # Plot scores with waveform background # Plot: waveform on top, detection scores on bottom sig_bg = sig.clone().to_mono().resample(AUDIOSEAL_SAMPLE_RATE) wav = sig_bg.audio_data.squeeze().numpy() N = len(scores) if wav.shape[0] < N: wav = np.pad(wav, (0, N - wav.shape[0]), mode="constant") else: wav = wav[:N] fig, (ax_wav, ax_score) = plt.subplots(2, 1, sharex=True, figsize=(8, 6)) # Top: waveform (no labels) ax_wav.plot(wav, alpha=0.3) ax_wav.axis("off") # Bottom: detection scores ax_score.plot(scores, label="No watermark", color="blue") ax_score.plot(scores_clean, label="Watermark (no attack)", color="green") ax_score.plot(scores_att, label="Watermark (codec attack)", color="red") ax_score.set_xlabel("Frame Index") ax_score.set_ylabel("Detection Score") ax_score.set_ylim(-0.05, 1.05) ax_score.set_yticks([0.0, 0.2, 0.4, 0.6, 0.8, 1.0]) ax_score.legend() plt.tight_layout() plot_path = OUT_DIR / f"detection_plot_{uid}.png" fig.savefig(str(plot_path), format="png") plt.close(fig) return str(wm_path), str(att_path), str(plot_path) demo = gr.Interface( fn=pipeline, inputs= gr.Audio(sources=["upload"], type="numpy", label="Upload Input Audio"), outputs=[ gr.Audio(type="filepath", label="Watermarked Audio"), gr.Audio(type="filepath", label="Attacked Audio"), gr.Image(type="filepath", label="Detection Scores Plot"), ], title="Watermark Stress Test", description=""" This is an educational demonstration of state-of-the-art audio watermark performance under codec processing. Upload any (speech) audio file to test watermark performance before and after processing with a low-bitrate neural codec [1]. For this demo, we use the AudioSeal [2] watermark, which is well documented, open source, and provides state-of-the-art localized detection performance. Both the watermark and codec operate at 16kHz, meaning all frequencies above 8kHz are left unaltered. To ensure consistent watermark performance, we normalize audio to -16db LUFS and downmix to mono prior to embedding. [1] https://github.com/jasonppy/VoiceCraft [2] https://github.com/facebookresearch/audioseal """, article=""" The citation info for our corresponding paper is: ``` @inproceedings{deepwatermarksareshallow, author ={Patrick O'Reilly and Zeyu Jin and Jiaqi Su and Bryan Pardo}, title = {Deep Audio Watermarks are Shallow: Limitations of Post-Hoc Watermarking Techniques for Speech}, booktitle = {ICLR Workshop on GenAI Watermarking}, year = {2025} } ``` For the VoiceCraft codec: ``` @article{voicecraft, author={Puyuan Peng and Po-Yao Huang and Daniel Li and Abdelrahman Mohamed and David Harwath}, year={2024}, title={VoiceCraft: Zero-Shot Speech Editing and Text-to-Speech in the Wild}, journal={arXiv preprint arXiv:2403.16973v1}, } ``` And for the AudioSeal watermark: ``` @article{audioseal, title={Proactive Detection of Voice Cloning with Localized Watermarking}, author={San Roman, Robin and Fernandez, Pierre and Elsahar, Hady and D“efossez, Alexandre and Furon, Teddy and Tran, Tuan}, journal={International Conference on Machine Learning (ICML)}, year={2024} } ``` """, allow_flagging="never", ) if __name__ == "__main__": demo.launch(share=True)