Awell00 commited on
Commit
6a85123
·
1 Parent(s): ae67ec6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +215 -132
app.py CHANGED
@@ -23,205 +23,265 @@ sample_rate = 44100
23
  amplitude_scaling_factor = 10.0
24
 
25
 
26
- # -----------------Filter----------------- #
27
-
28
- def butter_bandpass(sr, order=5):
29
- """
30
- This function designs a Butterworth bandpass filter.
31
-
32
- Parameters:
33
- sr (int): The sample rate of the audio.
34
- order (int): The order of the filter.
 
 
 
 
35
 
36
- Returns:
37
- tuple: The filter coefficients `b` and `a`.
38
- """
39
- # Calculate the Nyquist frequency
40
- nyquist = 0.5 * sr
41
 
42
- # Normalize the cutoff frequencies
43
- low = low_frequency / nyquist
44
- high = high_frequency / nyquist
 
 
45
 
46
- # Design the Butterworth bandpass filter
47
- coefficient = butter(order, [low, high], btype='band')
48
 
49
- # Extract the filter coefficients
50
- b = coefficient[0]
51
- a = coefficient[1]
52
 
53
- return b, a
 
 
 
 
 
 
 
54
 
55
 
56
- def butter_bandpass_filter(data, sr, order=5):
57
- """
58
- This function applies the Butterworth bandpass filter to a given data.
59
 
60
- Parameters:
61
- data (array): The audio data to be filtered.
62
- sr (int): The sample rate of the audio.
63
- order (int): The order of the filter.
 
 
 
 
64
 
65
- Returns:
66
- array: The filtered audio data.
67
- """
68
- # Get the filter coefficients
69
- b, a = butter_bandpass(sr, order=order)
70
 
71
- # Apply the filter to the data
 
72
  y = lfilter(b, a, data)
73
-
74
  return y
75
 
76
 
77
- def filtered():
78
- """
79
- This function reads an audio file, applies the bandpass filter to the audio data,
80
- and then writes the filtered data to an output file.
81
-
82
- Returns:
83
- str: A success message if the audio is filtered correctly, otherwise an error message.
84
- """
85
- try:
86
- input_file = 'input_text.wav'
87
- output_file = 'output_filtered_receiver.wav'
88
-
89
- # Read the audio data from the input file
90
- sr, data = read(input_file)
91
 
92
- # Apply the bandpass filter to the audio data
93
- filtered_data = butter_bandpass_filter(data, sr)
94
 
95
- # Write the filtered data to the output file
96
- write(output_file, sr, np.int16(filtered_data))
 
 
97
 
98
- return "Filtered Audio Generated"
99
- except Exception as e:
100
- # If an error occurs, return an error message
101
- return f"Error: {str(e)}"
102
-
103
-
104
- # -----------------Record----------------- #
105
 
106
- def record(audio):
107
  """
108
- This function records audio and writes it to a .wav file.
109
 
110
  Parameters:
111
- audio (tuple): A tuple containing the sample rate and the audio data.
 
 
 
112
 
113
  Returns:
114
- str: A success message if the audio is recorded correctly, otherwise an error message.
115
  """
116
  try:
117
- # Check if the audio tuple contains exactly two elements
118
- if len(audio) != 2:
119
- return f"Error: Expected a tuple with 2 elements, but got {len(audio)}"
120
 
121
- # Unpack the sample rate and data from the audio tuple
122
- sr, data = audio
123
 
124
- # Write the audio data to a .wav file
125
- wavio.write("recorded.wav", data, sr)
126
 
127
- # Call the filtered function to apply the bandpass filter to the audio data
128
- filtered()
129
 
130
- # Return a success message
131
- return f"Audio receive correctly"
132
- except Exception as e:
133
- # If an error occurs, return an error message
134
- return f"Error: {str(e)}"
135
 
 
 
136
 
137
- # -----------------Frame----------------- #
 
138
 
139
- def calculate_snr(data, start, end, target_frequency):
140
- segment = data[start:end]
141
- spectrum = np.fft.fft(segment)
142
- frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
143
- target_index = np.abs(frequencies - target_frequency).argmin()
144
- amplitude = np.abs(spectrum[target_index])
145
 
146
- noise_segment = data[100:1000 + len(segment)]
147
- noise_spectrum = np.fft.fft(noise_segment)
148
- noise_amplitude = np.abs(noise_spectrum[target_index])
149
 
150
- snr = 10 * np.log10(amplitude / noise_amplitude)
151
- return snr
 
 
152
 
153
 
154
  def frame_analyse(filename):
155
- sr, y = read(filename)
156
-
157
- first_part_start = 0
158
- first_part_end = len(y) // 2
159
-
160
- second_part_start = len(y) // 2
161
- second_part_end = len(y)
162
-
163
- segment_length = 256
164
- overlap_size = 128
165
-
166
- f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
167
-
168
- plt.figure()
169
- plt.pcolormesh(t, f, sxx, shading="gouraud")
170
- plt.xlabel("Time [s]")
171
- plt.ylabel("Frequency [Hz]")
172
- plt.title("Spectrogram of the signal")
173
- plt.show()
174
-
175
- f0 = 18000
176
-
177
- f_idx = np.argmin(np.abs(f - f0))
178
-
179
- thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
180
- thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
181
-
182
- t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
183
-
184
- t_start = t[t_idx_start]
185
-
186
- t_idx_end = t_idx_start
187
- while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
188
- t_idx_end += 1
189
 
190
- t_end = t[t_idx_end]
 
191
 
192
- return t_start, t_end
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
 
194
 
195
  # -----------------Receiver----------------- #
196
 
197
  def dominant_frequency(signal_value):
 
 
 
 
 
 
 
 
 
 
198
  yf = fft(signal_value)
 
 
199
  xf = np.linspace(0.0, sample_rate / 2.0, len(signal_value) // 2)
 
 
200
  peaks, _ = find_peaks(np.abs(yf[0:len(signal_value) // 2]))
 
 
201
  return xf[peaks[np.argmax(np.abs(yf[0:len(signal_value) // 2][peaks]))]]
202
 
203
 
204
  def binary_to_text(binary):
 
 
 
 
 
 
 
 
 
205
  try:
 
206
  return ''.join(chr(int(binary[i:i + 8], 2)) for i in range(0, len(binary), 8))
207
  except Exception as e:
208
- return f"Except: {e}"
 
209
 
210
 
211
  def decode_rs(binary_string, ecc_bytes):
 
 
 
 
 
 
 
 
 
 
 
212
  byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
 
 
213
  rs = reedsolo.RSCodec(ecc_bytes)
 
 
214
  corrected_data_tuple = rs.decode(byte_data)
215
  corrected_data = corrected_data_tuple[0]
216
 
 
217
  corrected_data = corrected_data.rstrip(b'\x00')
218
 
 
219
  corrected_binary_string = ''.join(format(byte, '08b') for byte in corrected_data)
220
 
221
  return corrected_binary_string
222
 
223
 
224
  def manchester_decoding(binary_string):
 
 
 
 
 
 
 
 
 
225
  decoded_string = ''
226
  for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
227
  if i + 1 < len(binary_string):
@@ -236,16 +296,27 @@ def manchester_decoding(binary_string):
236
 
237
 
238
  def signal_to_binary_between_times(filename):
 
 
 
 
 
 
 
 
 
 
239
  start_time, end_time = frame_analyse(filename)
240
 
 
241
  sr, data = read(filename)
242
 
 
243
  start_sample = int((start_time - 0.007) * sr)
244
  end_sample = int((end_time - 0.007) * sr)
245
  binary_string = ''
246
 
247
- start_analyse_time = time.time()
248
-
249
  for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
250
  signal_value = data[i:i + int(sr * bit_duration)]
251
  frequency = dominant_frequency(signal_value)
@@ -254,10 +325,10 @@ def signal_to_binary_between_times(filename):
254
  else:
255
  binary_string += '1'
256
 
 
257
  index_start = binary_string.find("1000001")
258
  substrings = ["0111110", "011110"]
259
  index_end = -1
260
-
261
  for substring in substrings:
262
  index = binary_string.find(substring)
263
  if index != -1:
@@ -267,21 +338,33 @@ def signal_to_binary_between_times(filename):
267
  print("Binary String:", binary_string)
268
  binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
269
 
 
270
  decoded_binary_string = decode_rs(binary_string_decoded, 20)
271
 
272
  return decoded_binary_string
273
 
274
 
275
  def receive():
 
 
 
 
 
 
276
  try:
 
277
  audio_receive = signal_to_binary_between_times('output_filtered_receiver.wav')
 
 
278
  return binary_to_text(audio_receive)
279
  except Exception as e:
 
280
  return f"Error: {e}"
281
 
282
 
283
  # -----------------Interface----------------- #
284
 
 
285
  with gr.Blocks() as demo:
286
  input_audio = gr.Audio(sources=["upload"])
287
  output_text = gr.Textbox(label="Record Sound")
 
23
  amplitude_scaling_factor = 10.0
24
 
25
 
26
+ import numpy as np
27
+ from scipy.io.wavfile import write
28
+ from scipy.signal import find_peaks
29
+ from scipy.fft import fft
30
+ from tqdm import tqdm
31
+ import time
32
+ import matplotlib.pyplot as plt
33
+ from scipy.io.wavfile import read
34
+ from scipy import signal
35
+ import gradio as gr
36
+ import reedsolo
37
+ import wavio
38
+ from scipy.signal import butter, lfilter
39
 
40
+ # ---------------Parameters--------------- #
 
 
 
 
41
 
42
+ low_frequency = 18000
43
+ high_frequency = 19000
44
+ bit_duration = 0.007
45
+ sample_rate = 44100
46
+ amplitude_scaling_factor = 10.0
47
 
 
 
48
 
49
+ # -----------------Record----------------- #
 
 
50
 
51
+ def record(audio):
52
+ try:
53
+ sr, data = audio
54
+ wavio.write("recorded.wav", data, sr)
55
+ main()
56
+ return f"Audio receive correctly"
57
+ except Exception as e:
58
+ return f"Error: {e}"
59
 
60
 
61
+ # -----------------Filter----------------- #
 
 
62
 
63
+ def butter_bandpass(lowcut, highcut, sr, order=5):
64
+ nyquist = 0.5 * sr
65
+ low = lowcut / nyquist
66
+ high = highcut / nyquist
67
+ coef = butter(order, [low, high], btype='band')
68
+ b = coef[0]
69
+ a = coef[1]
70
+ return b, a
71
 
 
 
 
 
 
72
 
73
+ def butter_bandpass_filter(data, lowcut, highcut, sr, order=5):
74
+ b, a = butter_bandpass(lowcut, highcut, sr, order=order)
75
  y = lfilter(b, a, data)
 
76
  return y
77
 
78
 
79
+ def main():
80
+ input_file = 'recorded.wav'
81
+ output_file = 'output_filtered_receiver.wav'
82
+ lowcut = 17500
83
+ highcut = 19500
 
 
 
 
 
 
 
 
 
84
 
85
+ sr, data = read(input_file)
 
86
 
87
+ filtered_data = butter_bandpass_filter(data, lowcut, highcut, sr)
88
+ write(output_file, sr, np.int16(filtered_data))
89
+ return "Filtered Audio Generated"
90
+
91
 
92
+ # -----------------Frame----------------- #
 
 
 
 
 
 
93
 
94
+ def calculate_snr(data, start, end, target_frequency):
95
  """
96
+ This function calculates the Signal-to-Noise Ratio (SNR) for a given frequency within a segment of data.
97
 
98
  Parameters:
99
+ data (array): The audio data.
100
+ start (int): The start index of the segment.
101
+ end (int): The end index of the segment.
102
+ target_frequency (float): The frequency for which the SNR is to be calculated.
103
 
104
  Returns:
105
+ float: The calculated SNR.
106
  """
107
  try:
108
+ # Extract the segment from the data
109
+ segment = data[start:end]
 
110
 
111
+ # Perform a Fast Fourier Transform on the segment
112
+ spectrum = np.fft.fft(segment)
113
 
114
+ # Generate the frequencies corresponding to the FFT coefficients
115
+ frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
116
 
117
+ # Find the index of the target frequency
118
+ target_index = np.abs(frequencies - target_frequency).argmin()
119
 
120
+ # Calculate the amplitude of the target frequency
121
+ amplitude = np.abs(spectrum[target_index])
 
 
 
122
 
123
+ # Define a noise segment
124
+ noise_segment = data[100:1000 + len(segment)]
125
 
126
+ # Perform a Fast Fourier Transform on the noise segment
127
+ noise_spectrum = np.fft.fft(noise_segment)
128
 
129
+ # Calculate the amplitude of the noise at the target frequency
130
+ noise_amplitude = np.abs(noise_spectrum[target_index])
 
 
 
 
131
 
132
+ # Calculate the SNR
133
+ snr = 10 * np.log10(amplitude / noise_amplitude)
 
134
 
135
+ return snr
136
+ except Exception as e:
137
+ # If an error occurs, return an error message
138
+ return f"Error: {e}"
139
 
140
 
141
  def frame_analyse(filename):
142
+ """
143
+ This function analyses an audio file and returns the start and end times of the signal of interest.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
+ Parameters:
146
+ filename (str): The path to the audio file.
147
 
148
+ Returns:
149
+ tuple: The start and end times of the signal of interest.
150
+ """
151
+ try:
152
+ # Read the audio file
153
+ sr, y = read(filename)
154
+
155
+ # Define the start and end indices of the first and second parts of the audio data
156
+ first_part_start = 0
157
+ first_part_end = len(y) // 2
158
+ second_part_start = len(y) // 2
159
+ second_part_end = len(y)
160
+
161
+ # Define the segment length and overlap size for the spectrogram
162
+ segment_length = 256
163
+ overlap_size = 128
164
+
165
+ # Calculate the spectrogram of the audio data
166
+ f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
167
+
168
+ # Plot the spectrogram
169
+ plt.figure()
170
+ plt.pcolormesh(t, f, sxx, shading="gouraud")
171
+ plt.xlabel("Time [s]")
172
+ plt.ylabel("Frequency [Hz]")
173
+ plt.title("Spectrogram of the signal")
174
+ plt.show()
175
+
176
+ # Define the target frequency
177
+ f0 = 18000
178
+
179
+ # Find the index of the target frequency
180
+ f_idx = np.argmin(np.abs(f - f0))
181
+
182
+ # Calculate the SNR thresholds for the start and end of the signal
183
+ thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
184
+ thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
185
+
186
+ # Find the start and end indices of the signal of interest
187
+ t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
188
+ t_idx_end = t_idx_start
189
+ while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
190
+ t_idx_end += 1
191
+
192
+ # Convert the start and end indices to times
193
+ t_start = t[t_idx_start]
194
+ t_end = t[t_idx_end]
195
+
196
+ return t_start, t_end
197
+ except Exception as e:
198
+ # If an error occurs, return an error message
199
+ return f"Error: {e}"
200
 
201
 
202
  # -----------------Receiver----------------- #
203
 
204
  def dominant_frequency(signal_value):
205
+ """
206
+ This function calculates the dominant frequency in a given signal.
207
+
208
+ Parameters:
209
+ signal_value (array): The signal data.
210
+
211
+ Returns:
212
+ float: The dominant frequency.
213
+ """
214
+ # Perform a Fast Fourier Transform on the signal
215
  yf = fft(signal_value)
216
+
217
+ # Generate the frequencies corresponding to the FFT coefficients
218
  xf = np.linspace(0.0, sample_rate / 2.0, len(signal_value) // 2)
219
+
220
+ # Find the peaks in the absolute values of the FFT coefficients
221
  peaks, _ = find_peaks(np.abs(yf[0:len(signal_value) // 2]))
222
+
223
+ # Return the frequency corresponding to the peak with the highest amplitude
224
  return xf[peaks[np.argmax(np.abs(yf[0:len(signal_value) // 2][peaks]))]]
225
 
226
 
227
  def binary_to_text(binary):
228
+ """
229
+ This function converts a binary string to text.
230
+
231
+ Parameters:
232
+ binary (str): The binary string.
233
+
234
+ Returns:
235
+ str: The converted text.
236
+ """
237
  try:
238
+ # Convert each 8-bit binary number to a character and join them together
239
  return ''.join(chr(int(binary[i:i + 8], 2)) for i in range(0, len(binary), 8))
240
  except Exception as e:
241
+ # If an error occurs, return an error message
242
+ return f"Error: {e}"
243
 
244
 
245
  def decode_rs(binary_string, ecc_bytes):
246
+ """
247
+ This function decodes a Reed-Solomon encoded binary string.
248
+
249
+ Parameters:
250
+ binary_string (str): The binary string.
251
+ ecc_bytes (int): The number of error correction bytes used in the encoding.
252
+
253
+ Returns:
254
+ str: The decoded binary string.
255
+ """
256
+ # Convert the binary string to a bytearray
257
  byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
258
+
259
+ # Initialize a Reed-Solomon codec
260
  rs = reedsolo.RSCodec(ecc_bytes)
261
+
262
+ # Decode the bytearray
263
  corrected_data_tuple = rs.decode(byte_data)
264
  corrected_data = corrected_data_tuple[0]
265
 
266
+ # Remove trailing null bytes
267
  corrected_data = corrected_data.rstrip(b'\x00')
268
 
269
+ # Convert the bytearray back to a binary string
270
  corrected_binary_string = ''.join(format(byte, '08b') for byte in corrected_data)
271
 
272
  return corrected_binary_string
273
 
274
 
275
  def manchester_decoding(binary_string):
276
+ """
277
+ This function decodes a Manchester encoded binary string.
278
+
279
+ Parameters:
280
+ binary_string (str): The binary string.
281
+
282
+ Returns:
283
+ str: The decoded binary string.
284
+ """
285
  decoded_string = ''
286
  for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
287
  if i + 1 < len(binary_string):
 
296
 
297
 
298
  def signal_to_binary_between_times(filename):
299
+ """
300
+ This function converts a signal to a binary string between specified times.
301
+
302
+ Parameters:
303
+ filename (str): The path to the audio file.
304
+
305
+ Returns:
306
+ str: The binary string.
307
+ """
308
+ # Get the start and end times of the signal of interest
309
  start_time, end_time = frame_analyse(filename)
310
 
311
+ # Read the audio file
312
  sr, data = read(filename)
313
 
314
+ # Calculate the start and end samples of the signal of interest
315
  start_sample = int((start_time - 0.007) * sr)
316
  end_sample = int((end_time - 0.007) * sr)
317
  binary_string = ''
318
 
319
+ # Convert each sample to a binary digit
 
320
  for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
321
  signal_value = data[i:i + int(sr * bit_duration)]
322
  frequency = dominant_frequency(signal_value)
 
325
  else:
326
  binary_string += '1'
327
 
328
+ # Find the start and end indices of the binary string
329
  index_start = binary_string.find("1000001")
330
  substrings = ["0111110", "011110"]
331
  index_end = -1
 
332
  for substring in substrings:
333
  index = binary_string.find(substring)
334
  if index != -1:
 
338
  print("Binary String:", binary_string)
339
  binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
340
 
341
+ # Decode the binary string
342
  decoded_binary_string = decode_rs(binary_string_decoded, 20)
343
 
344
  return decoded_binary_string
345
 
346
 
347
  def receive():
348
+ """
349
+ This function receives an audio signal, converts it to a binary string, and then converts the binary string to text.
350
+
351
+ Returns:
352
+ str: The received text.
353
+ """
354
  try:
355
+ # Convert the audio signal to a binary string
356
  audio_receive = signal_to_binary_between_times('output_filtered_receiver.wav')
357
+
358
+ # Convert the binary string to text
359
  return binary_to_text(audio_receive)
360
  except Exception as e:
361
+ # If an error occurs, return an error message
362
  return f"Error: {e}"
363
 
364
 
365
  # -----------------Interface----------------- #
366
 
367
+ # Start a Gradio Blocks interface
368
  with gr.Blocks() as demo:
369
  input_audio = gr.Audio(sources=["upload"])
370
  output_text = gr.Textbox(label="Record Sound")