# receiver-signal / app.py
# Hugging Face Space file (author: Awell00, "Update app.py", rev ae67ec6, 8.51 kB)
import numpy as np
from scipy.io.wavfile import write
from scipy.signal import find_peaks
from scipy.fft import fft
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.io.wavfile import read
from scipy import signal
import gradio as gr
import reedsolo
import wavio
from scipy.signal import butter, lfilter
# ---------------Parameters--------------- #
# Shared file paths for the receive pipeline: the raw recording is written to
# input_file, and the bandpass-filtered copy to output_file.
input_file = 'input_text.wav'
output_file = 'output_filtered_receiver.wav'
# FSK tones in Hz: the low tone encodes bit '0', the high tone bit '1'
# (see signal_to_binary_between_times).
low_frequency = 18000
high_frequency = 19000
# Duration of one transmitted bit, in seconds.
bit_duration = 0.007
# Audio sample rate in Hz.
sample_rate = 44100
# NOTE(review): unused in this file — presumably shared with the sender script.
amplitude_scaling_factor = 10.0
# -----------------Filter----------------- #
def butter_bandpass(sr, order=5, lowcut=None, highcut=None):
    """Design a Butterworth bandpass filter.

    Parameters:
        sr (int): Sample rate of the audio in Hz.
        order (int): Order of the filter.
        lowcut (float | None): Low cutoff frequency in Hz; defaults to the
            module-level ``low_frequency`` when None (backward compatible).
        highcut (float | None): High cutoff frequency in Hz; defaults to the
            module-level ``high_frequency`` when None.

    Returns:
        tuple: The filter coefficients ``b`` (numerator) and ``a`` (denominator).
    """
    # Fall back to the module-wide signalling band when no explicit band given.
    if lowcut is None:
        lowcut = low_frequency
    if highcut is None:
        highcut = high_frequency
    # scipy expects cutoffs normalized to the Nyquist frequency (0..1).
    nyquist = 0.5 * sr
    b, a = butter(order, [lowcut / nyquist, highcut / nyquist], btype='band')
    return b, a
def butter_bandpass_filter(data, sr, order=5):
    """Run the module's Butterworth bandpass filter over an audio signal.

    Parameters:
        data (array): Audio samples to filter.
        sr (int): Sample rate of the audio in Hz.
        order (int): Order of the filter.

    Returns:
        array: The filtered audio samples.
    """
    # Design the filter for this sample rate, then apply it in one pass.
    numerator, denominator = butter_bandpass(sr, order=order)
    return lfilter(numerator, denominator, data)
def filtered():
    """Bandpass-filter the recorded input file and save the result.

    Reads the module-level ``input_file``, keeps only the 18-19 kHz
    signalling band, and writes 16-bit PCM to the module-level
    ``output_file``.  (Previously the function re-assigned both paths
    locally, redundantly shadowing the module constants.)

    Returns:
        str: A success message if the audio is filtered correctly,
        otherwise an ``"Error: ..."`` message.
    """
    try:
        # Read the audio data from the shared input file.
        sr, data = read(input_file)
        # Apply the bandpass filter to isolate the signalling band.
        filtered_data = butter_bandpass_filter(data, sr)
        # Persist as 16-bit PCM for the receiver stage.
        write(output_file, sr, np.int16(filtered_data))
        return "Filtered Audio Generated"
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"
# -----------------Record----------------- #
def record(audio):
    """Persist an uploaded audio clip and run the bandpass filter on it.

    Parameters:
        audio (tuple): ``(sample_rate, data)`` as supplied by the gradio
            Audio component.

    Returns:
        str: A success message if the audio is recorded correctly,
        otherwise an ``"Error: ..."`` message.
    """
    try:
        # Check that the audio tuple contains exactly two elements.
        if len(audio) != 2:
            return f"Error: Expected a tuple with 2 elements, but got {len(audio)}"
        # Unpack the sample rate and data from the audio tuple.
        sr, data = audio
        # BUGFIX: write to the shared input_file so filtered() actually
        # processes this recording (it previously went to "recorded.wav",
        # which filtered() never read).
        wavio.write(input_file, data, sr)
        # Apply the bandpass filter to the freshly written recording.
        filtered()
        return "Audio received correctly"
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"
# -----------------Frame----------------- #
def calculate_snr(data, start, end, target_frequency, sr=None):
    """Estimate a signal-to-noise ratio (dB) at one frequency of a segment.

    Compares the FFT magnitude at ``target_frequency`` for ``data[start:end]``
    against the magnitude at the same bin index in a fixed reference window
    near the start of the recording.

    Parameters:
        data (array): Full audio signal.
        start (int): Start sample of the segment of interest.
        end (int): End sample (exclusive) of the segment of interest.
        target_frequency (float): Frequency of interest in Hz.
        sr (int | None): Sample rate in Hz; defaults to the module-level
            ``sample_rate`` when None (backward compatible).

    Returns:
        float: SNR-like value in dB (10*log10 of the amplitude ratio).
    """
    if sr is None:
        sr = sample_rate
    segment = data[start:end]
    spectrum = np.fft.fft(segment)
    frequencies = np.fft.fftfreq(len(spectrum), 1 / sr)
    # Bin closest to the target frequency on the segment's frequency grid.
    target_index = np.abs(frequencies - target_frequency).argmin()
    amplitude = np.abs(spectrum[target_index])
    # Noise estimate from a fixed window near the start of the recording.
    # NOTE(review): the noise window has a different length than `segment`,
    # so `target_index` maps to a slightly different frequency here — confirm
    # this offset is intended.
    noise_segment = data[100:1000 + len(segment)]
    noise_spectrum = np.fft.fft(noise_segment)
    noise_amplitude = np.abs(noise_spectrum[target_index])
    snr = 10 * np.log10(amplitude / noise_amplitude)
    return snr
def frame_analyse(filename):
    """Locate the start/end times of the signalling burst in a .wav file.

    Computes a spectrogram of the recording, derives SNR-based thresholds
    from its two halves, then scans the 18 kHz spectrogram row for where
    the power first exceeds the start threshold and where it stops
    exceeding the end threshold.

    Parameters:
        filename (str): Path to the .wav file to analyse.

    Returns:
        tuple: ``(t_start, t_end)`` in seconds.
    """
    sr, y = read(filename)
    # First half of the recording -> start threshold; second half -> end.
    first_part_start = 0
    first_part_end = len(y) // 2
    second_part_start = len(y) // 2
    second_part_end = len(y)
    segment_length = 256
    overlap_size = 128
    f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
    # Debug visualisation; plt.show() can block in non-interactive runs.
    plt.figure()
    plt.pcolormesh(t, f, sxx, shading="gouraud")
    plt.xlabel("Time [s]")
    plt.ylabel("Frequency [Hz]")
    plt.title("Spectrogram of the signal")
    plt.show()
    # Spectrogram row closest to the 18 kHz carrier.
    f0 = 18000
    f_idx = np.argmin(np.abs(f - f0))
    # NOTE(review): these thresholds are dB values from calculate_snr but are
    # compared against raw spectrogram power below — confirm units match.
    thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
    thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
    # First time bin whose power exceeds the start threshold.
    t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
    t_start = t[t_idx_start]
    # Advance while any later bin still exceeds the end threshold.
    # NOTE(review): if the loop runs to len(t), t[t_idx_end] raises IndexError.
    t_idx_end = t_idx_start
    while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
        t_idx_end += 1
    t_end = t[t_idx_end]
    return t_start, t_end
# -----------------Receiver----------------- #
def dominant_frequency(signal_value, sr=None):
    """Return the dominant frequency (Hz) of a time-domain segment.

    Parameters:
        signal_value (array): Audio samples to analyse.
        sr (int | None): Sample rate in Hz; defaults to the module-level
            ``sample_rate`` when None (backward compatible).

    Returns:
        float: Frequency of the strongest spectral peak in the positive
        half of the spectrum.

    Raises:
        ValueError/IndexError: if the spectrum contains no peaks.
    """
    if sr is None:
        sr = sample_rate
    yf = fft(signal_value)
    half = len(signal_value) // 2
    # Magnitude of the positive-frequency half (computed once, reused below).
    magnitude = np.abs(yf[:half])
    # Approximate frequency grid for the positive half of the spectrum.
    xf = np.linspace(0.0, sr / 2.0, half)
    peaks, _ = find_peaks(magnitude)
    # Pick the peak with the largest magnitude.
    return xf[peaks[np.argmax(magnitude[peaks])]]
def binary_to_text(binary):
    """Convert a bit string (8 bits per character) into text.

    Returns an ``"Except: ..."`` message string when conversion fails
    instead of raising.
    """
    try:
        octets = [binary[pos:pos + 8] for pos in range(0, len(binary), 8)]
        return ''.join(chr(int(octet, 2)) for octet in octets)
    except Exception as e:
        return f"Except: {e}"
def decode_rs(binary_string, ecc_bytes):
    """Reed-Solomon-decode a bit string and return the corrected payload bits.

    Parameters:
        binary_string (str): Bits, 8 per byte, including RS parity bytes.
        ecc_bytes (int): Number of RS error-correction bytes used by the sender.

    Returns:
        str: Corrected payload as a bit string, trailing NUL bytes stripped.
    """
    # Pack the bit string into bytes for the RS codec.
    packed = bytearray(
        int(binary_string[pos:pos + 8], 2)
        for pos in range(0, len(binary_string), 8)
    )
    codec = reedsolo.RSCodec(ecc_bytes)
    # decode() returns (decoded_msg, full_codeword, errata_positions).
    corrected = codec.decode(packed)[0]
    # Drop zero padding appended by the transmitter.
    corrected = corrected.rstrip(b'\x00')
    return ''.join(format(byte, '08b') for byte in corrected)
def manchester_decoding(binary_string):
    """Decode a Manchester-encoded bit string ("01" -> '0', "10" -> '1').

    A trailing unpaired bit is silently ignored.  On any invalid pair an
    error is printed and None is returned.
    """
    decoded = []
    for pos in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
        pair = binary_string[pos:pos + 2]
        if len(pair) < 2:
            # Odd-length input: ignore the dangling final bit.
            continue
        if pair == '01':
            decoded.append('0')
        elif pair == '10':
            decoded.append('1')
        else:
            print("Error: Invalid Manchester Encoding")
            return None
    return ''.join(decoded)
def signal_to_binary_between_times(filename):
    """Demodulate the FSK burst in *filename* into an error-corrected bit string.

    Uses frame_analyse() to locate the transmission, classifies each
    bit-period slot as 0/1 by its dominant frequency, strips the start/end
    marker patterns, then Manchester- and Reed-Solomon-decodes the payload.

    Parameters:
        filename (str): Path to the filtered .wav file.

    Returns:
        str: Decoded payload as a binary string.
    """
    start_time, end_time = frame_analyse(filename)
    sr, data = read(filename)
    # Shift back by one bit period (0.007 s == bit_duration) to align on the
    # first full bit.
    start_sample = int((start_time - bit_duration) * sr)
    end_sample = int((end_time - bit_duration) * sr)
    binary_string = ''
    # BUGFIX: removed `start_analyse_time = time.time()` — `time` was never
    # imported (NameError at runtime) and the value was never used.
    for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
        signal_value = data[i:i + int(sr * bit_duration)]
        frequency = dominant_frequency(signal_value)
        # Closer to the low tone -> bit 0, otherwise -> bit 1.
        if np.abs(frequency - low_frequency) < np.abs(frequency - high_frequency):
            binary_string += '0'
        else:
            binary_string += '1'
    # Start marker is "1000001"; the end marker is the first match among the
    # candidate patterns below.
    index_start = binary_string.find("1000001")
    substrings = ["0111110", "011110"]
    index_end = -1
    for substring in substrings:
        index = binary_string.find(substring)
        if index != -1:
            index_end = index
            break
    print("Binary String:", binary_string)
    # Payload sits between the markers: Manchester-decode it, then apply
    # Reed-Solomon correction with 20 ECC bytes.
    binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
    decoded_binary_string = decode_rs(binary_string_decoded, 20)
    return decoded_binary_string
def receive():
    """Decode the filtered receiver file and return the transmitted text.

    Returns:
        str: The decoded message, or an ``"Error: ..."`` description.
    """
    try:
        bits = signal_to_binary_between_times('output_filtered_receiver.wav')
        return binary_to_text(bits)
    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        return f"Error: {e}"
# -----------------Interface----------------- #
# Gradio UI: upload an audio clip, "Convert" saves and filters it, then
# "Received Text" demodulates the filtered file back into text.
with gr.Blocks() as demo:
    # Uploaded audio clip to process.
    input_audio = gr.Audio(sources=["upload"])
    output_text = gr.Textbox(label="Record Sound")
    btn_convert = gr.Button(value="Convert")
    # Save + bandpass-filter the uploaded audio (record -> filtered).
    btn_convert.click(fn=record, inputs=input_audio, outputs=output_text)
    output_convert = gr.Textbox(label="Received Text")
    btn_receive = gr.Button(value="Received Text")
    # Demodulate the filtered file into text.
    btn_receive.click(fn=receive, outputs=output_convert)
demo.launch()