Awell00 commited on
Commit
a3c18bc
·
1 Parent(s): 2557ecb

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +42 -170
app.py CHANGED
@@ -134,196 +134,91 @@ def record(audio):
134
  # -----------------Frame----------------- #
135
 
136
  def calculate_snr(data, start, end, target_frequency):
137
- """
138
- This function calculates the Signal-to-Noise Ratio (SNR) for a given frequency within a segment of data.
 
 
 
139
 
140
- Parameters:
141
- data (array): The audio data.
142
- start (int): The start index of the segment.
143
- end (int): The end index of the segment.
144
- target_frequency (float): The frequency for which the SNR is to be calculated.
145
 
146
- Returns:
147
- float: The calculated SNR.
148
- """
149
- try:
150
- # Extract the segment from the data
151
- segment = data[start:end]
152
 
153
- # Perform a Fast Fourier Transform on the segment
154
- spectrum = np.fft.fft(segment)
155
 
156
- # Generate the frequencies corresponding to the FFT coefficients
157
- frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
158
 
159
- # Find the index of the target frequency
160
- target_index = np.abs(frequencies - target_frequency).argmin()
161
 
162
- # Calculate the amplitude of the target frequency
163
- amplitude = np.abs(spectrum[target_index])
164
 
165
- # Define a noise segment
166
- noise_segment = data[100:1000 + len(segment)]
167
 
168
- # Perform a Fast Fourier Transform on the noise segment
169
- noise_spectrum = np.fft.fft(noise_segment)
170
 
171
- # Calculate the amplitude of the noise at the target frequency
172
- noise_amplitude = np.abs(noise_spectrum[target_index])
 
 
 
 
173
 
174
- # Calculate the SNR
175
- snr = 10 * np.log10(amplitude / noise_amplitude)
176
 
177
- return snr
178
- except Exception as e:
179
- # If an error occurs, return an error message
180
- return f"Error: {e}"
181
 
 
 
182
 
183
- def frame_analyse(filename):
184
- """
185
- This function analyses an audio file and returns the start and end times of the signal of interest.
186
 
187
- Parameters:
188
- filename (str): The path to the audio file.
189
 
190
- Returns:
191
- tuple: The start and end times of the signal of interest.
192
- """
193
- try:
194
- # Read the audio file
195
- sr, y = read(filename)
196
-
197
- # Define the start and end indices of the first and second parts of the audio data
198
- first_part_start = 0
199
- first_part_end = len(y) // 2
200
- second_part_start = len(y) // 2
201
- second_part_end = len(y)
202
-
203
- # Define the segment length and overlap size for the spectrogram
204
- segment_length = 256
205
- overlap_size = 128
206
-
207
- # Calculate the spectrogram of the audio data
208
- f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
209
-
210
- # Plot the spectrogram
211
- plt.figure()
212
- plt.pcolormesh(t, f, sxx, shading="gouraud")
213
- plt.xlabel("Time [s]")
214
- plt.ylabel("Frequency [Hz]")
215
- plt.title("Spectrogram of the signal")
216
- plt.show()
217
-
218
- # Define the target frequency
219
- f0 = 18000
220
-
221
- # Find the index of the target frequency
222
- f_idx = np.argmin(np.abs(f - f0))
223
-
224
- # Calculate the SNR thresholds for the start and end of the signal
225
- thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
226
- thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
227
-
228
- # Find the start and end indices of the signal of interest
229
- t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
230
- t_idx_end = t_idx_start
231
- while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
232
- t_idx_end += 1
233
-
234
- # Convert the start and end indices to times
235
- t_start = t[t_idx_start]
236
- t_end = t[t_idx_end]
237
-
238
- return t_start, t_end
239
- except Exception as e:
240
- # If an error occurs, return an error message
241
- return f"Error: {e}"
242
 
 
243
 
244
- # -----------------Receiver----------------- #
245
 
246
- def dominant_frequency(signal_value):
247
- """
248
- This function calculates the dominant frequency in a given signal.
249
 
250
- Parameters:
251
- signal_value (array): The signal data.
252
 
253
- Returns:
254
- float: The dominant frequency.
255
- """
256
- # Perform a Fast Fourier Transform on the signal
257
  yf = fft(signal_value)
258
-
259
- # Generate the frequencies corresponding to the FFT coefficients
260
  xf = np.linspace(0.0, sample_rate / 2.0, len(signal_value) // 2)
261
-
262
- # Find the peaks in the absolute values of the FFT coefficients
263
  peaks, _ = find_peaks(np.abs(yf[0:len(signal_value) // 2]))
264
-
265
- # Return the frequency corresponding to the peak with the highest amplitude
266
  return xf[peaks[np.argmax(np.abs(yf[0:len(signal_value) // 2][peaks]))]]
267
 
268
 
269
  def binary_to_text(binary):
270
- """
271
- This function converts a binary string to text.
272
-
273
- Parameters:
274
- binary (str): The binary string.
275
-
276
- Returns:
277
- str: The converted text.
278
- """
279
  try:
280
- # Convert each 8-bit binary number to a character and join them together
281
  return ''.join(chr(int(binary[i:i + 8], 2)) for i in range(0, len(binary), 8))
282
  except Exception as e:
283
- # If an error occurs, return an error message
284
- return f"Error: {e}"
285
 
286
 
287
  def decode_rs(binary_string, ecc_bytes):
288
- """
289
- This function decodes a Reed-Solomon encoded binary string.
290
-
291
- Parameters:
292
- binary_string (str): The binary string.
293
- ecc_bytes (int): The number of error correction bytes used in the encoding.
294
-
295
- Returns:
296
- str: The decoded binary string.
297
- """
298
- # Convert the binary string to a bytearray
299
  byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
300
-
301
- # Initialize a Reed-Solomon codec
302
  rs = reedsolo.RSCodec(ecc_bytes)
303
-
304
- # Decode the bytearray
305
  corrected_data_tuple = rs.decode(byte_data)
306
  corrected_data = corrected_data_tuple[0]
307
 
308
- # Remove trailing null bytes
309
  corrected_data = corrected_data.rstrip(b'\x00')
310
 
311
- # Convert the bytearray back to a binary string
312
  corrected_binary_string = ''.join(format(byte, '08b') for byte in corrected_data)
313
 
314
  return corrected_binary_string
315
 
316
 
317
  def manchester_decoding(binary_string):
318
- """
319
- This function decodes a Manchester encoded binary string.
320
-
321
- Parameters:
322
- binary_string (str): The binary string.
323
-
324
- Returns:
325
- str: The decoded binary string.
326
- """
327
  decoded_string = ''
328
  for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
329
  if i + 1 < len(binary_string):
@@ -338,27 +233,16 @@ def manchester_decoding(binary_string):
338
 
339
 
340
  def signal_to_binary_between_times(filename):
341
- """
342
- This function converts a signal to a binary string between specified times.
343
-
344
- Parameters:
345
- filename (str): The path to the audio file.
346
-
347
- Returns:
348
- str: The binary string.
349
- """
350
- # Get the start and end times of the signal of interest
351
  start_time, end_time = frame_analyse(filename)
352
 
353
- # Read the audio file
354
  sr, data = read(filename)
355
 
356
- # Calculate the start and end samples of the signal of interest
357
  start_sample = int((start_time - 0.007) * sr)
358
  end_sample = int((end_time - 0.007) * sr)
359
  binary_string = ''
360
 
361
- # Convert each sample to a binary digit
 
362
  for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
363
  signal_value = data[i:i + int(sr * bit_duration)]
364
  frequency = dominant_frequency(signal_value)
@@ -367,10 +251,10 @@ def signal_to_binary_between_times(filename):
367
  else:
368
  binary_string += '1'
369
 
370
- # Find the start and end indices of the binary string
371
  index_start = binary_string.find("1000001")
372
  substrings = ["0111110", "011110"]
373
  index_end = -1
 
374
  for substring in substrings:
375
  index = binary_string.find(substring)
376
  if index != -1:
@@ -380,33 +264,21 @@ def signal_to_binary_between_times(filename):
380
  print("Binary String:", binary_string)
381
  binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
382
 
383
- # Decode the binary string
384
  decoded_binary_string = decode_rs(binary_string_decoded, 20)
385
 
386
  return decoded_binary_string
387
 
388
 
389
  def receive():
390
- """
391
- This function receives an audio signal, converts it to a binary string, and then converts the binary string to text.
392
-
393
- Returns:
394
- str: The received text.
395
- """
396
  try:
397
- # Convert the audio signal to a binary string
398
  audio_receive = signal_to_binary_between_times('output_filtered_receiver.wav')
399
-
400
- # Convert the binary string to text
401
  return binary_to_text(audio_receive)
402
  except Exception as e:
403
- # If an error occurs, return an error message
404
  return f"Error: {e}"
405
 
406
 
407
  # -----------------Interface----------------- #
408
 
409
- # Start a Gradio Blocks interface
410
  with gr.Blocks() as demo:
411
  input_audio = gr.Audio(sources=["upload"])
412
  output_text = gr.Textbox(label="Record Sound")
 
134
  # -----------------Frame----------------- #
135
 
136
  def calculate_snr(data, start, end, target_frequency):
137
+ segment = data[start:end]
138
+ spectrum = np.fft.fft(segment)
139
+ frequencies = np.fft.fftfreq(len(spectrum), 1 / sample_rate)
140
+ target_index = np.abs(frequencies - target_frequency).argmin()
141
+ amplitude = np.abs(spectrum[target_index])
142
 
143
+ noise_segment = data[100:1000 + len(segment)]
144
+ noise_spectrum = np.fft.fft(noise_segment)
145
+ noise_amplitude = np.abs(noise_spectrum[target_index])
 
 
146
 
147
+ snr = 10 * np.log10(amplitude / noise_amplitude)
148
+ return snr
 
 
 
 
149
 
 
 
150
 
151
+ def frame_analyse(filename):
152
+ sr, y = read(filename)
153
 
154
+ first_part_start = 0
155
+ first_part_end = len(y) // 2
156
 
157
+ second_part_start = len(y) // 2
158
+ second_part_end = len(y)
159
 
160
+ segment_length = 256
161
+ overlap_size = 128
162
 
163
+ f, t, sxx = signal.spectrogram(y, sr, nperseg=segment_length, noverlap=overlap_size)
 
164
 
165
+ plt.figure()
166
+ plt.pcolormesh(t, f, sxx, shading="gouraud")
167
+ plt.xlabel("Time [s]")
168
+ plt.ylabel("Frequency [Hz]")
169
+ plt.title("Spectrogram of the signal")
170
+ plt.show()
171
 
172
+ f0 = 18000
 
173
 
174
+ f_idx = np.argmin(np.abs(f - f0))
 
 
 
175
 
176
+ thresholds_start = calculate_snr(y, first_part_start, first_part_end, low_frequency)
177
+ thresholds_end = calculate_snr(y, second_part_start, second_part_end, high_frequency)
178
 
179
+ t_idx_start = np.argmax(sxx[f_idx] > thresholds_start)
 
 
180
 
181
+ t_start = t[t_idx_start]
 
182
 
183
+ t_idx_end = t_idx_start
184
+ while t_idx_end < len(t) and np.max(sxx[f_idx, t_idx_end:]) > thresholds_end:
185
+ t_idx_end += 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
 
187
+ t_end = t[t_idx_end]
188
 
189
+ return t_start, t_end
190
 
 
 
 
191
 
192
+ # -----------------Receiver----------------- #
 
193
 
194
+ def dominant_frequency(signal_value):
 
 
 
195
  yf = fft(signal_value)
 
 
196
  xf = np.linspace(0.0, sample_rate / 2.0, len(signal_value) // 2)
 
 
197
  peaks, _ = find_peaks(np.abs(yf[0:len(signal_value) // 2]))
 
 
198
  return xf[peaks[np.argmax(np.abs(yf[0:len(signal_value) // 2][peaks]))]]
199
 
200
 
201
  def binary_to_text(binary):
 
 
 
 
 
 
 
 
 
202
  try:
 
203
  return ''.join(chr(int(binary[i:i + 8], 2)) for i in range(0, len(binary), 8))
204
  except Exception as e:
205
+ return f"Except: {e}"
 
206
 
207
 
208
  def decode_rs(binary_string, ecc_bytes):
 
 
 
 
 
 
 
 
 
 
 
209
  byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
 
 
210
  rs = reedsolo.RSCodec(ecc_bytes)
 
 
211
  corrected_data_tuple = rs.decode(byte_data)
212
  corrected_data = corrected_data_tuple[0]
213
 
 
214
  corrected_data = corrected_data.rstrip(b'\x00')
215
 
 
216
  corrected_binary_string = ''.join(format(byte, '08b') for byte in corrected_data)
217
 
218
  return corrected_binary_string
219
 
220
 
221
  def manchester_decoding(binary_string):
 
 
 
 
 
 
 
 
 
222
  decoded_string = ''
223
  for i in tqdm(range(0, len(binary_string), 2), desc="Decoding"):
224
  if i + 1 < len(binary_string):
 
233
 
234
 
235
  def signal_to_binary_between_times(filename):
 
 
 
 
 
 
 
 
 
 
236
  start_time, end_time = frame_analyse(filename)
237
 
 
238
  sr, data = read(filename)
239
 
 
240
  start_sample = int((start_time - 0.007) * sr)
241
  end_sample = int((end_time - 0.007) * sr)
242
  binary_string = ''
243
 
244
+ start_analyse_time = time.time()
245
+
246
  for i in tqdm(range(start_sample, end_sample, int(sr * bit_duration))):
247
  signal_value = data[i:i + int(sr * bit_duration)]
248
  frequency = dominant_frequency(signal_value)
 
251
  else:
252
  binary_string += '1'
253
 
 
254
  index_start = binary_string.find("1000001")
255
  substrings = ["0111110", "011110"]
256
  index_end = -1
257
+
258
  for substring in substrings:
259
  index = binary_string.find(substring)
260
  if index != -1:
 
264
  print("Binary String:", binary_string)
265
  binary_string_decoded = manchester_decoding(binary_string[index_start + 7:index_end])
266
 
 
267
  decoded_binary_string = decode_rs(binary_string_decoded, 20)
268
 
269
  return decoded_binary_string
270
 
271
 
272
  def receive():
 
 
 
 
 
 
273
  try:
 
274
  audio_receive = signal_to_binary_between_times('output_filtered_receiver.wav')
 
 
275
  return binary_to_text(audio_receive)
276
  except Exception as e:
 
277
  return f"Error: {e}"
278
 
279
 
280
  # -----------------Interface----------------- #
281
 
 
282
  with gr.Blocks() as demo:
283
  input_audio = gr.Audio(sources=["upload"])
284
  output_text = gr.Textbox(label="Record Sound")