Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -8,7 +8,8 @@ import os
|
|
8 |
|
9 |
# ---------------Parameters--------------- #
|
10 |
|
11 |
-
|
|
|
12 |
|
13 |
low_frequency = 18000
|
14 |
high_frequency = 19000
|
@@ -17,48 +18,187 @@ sample_rate = 44100
|
|
17 |
amplitude_scaling_factor = 15.0
|
18 |
|
19 |
|
20 |
-
# ----------------Useless----------------
|
|
|
21 |
def delete_file(file_path):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
22 |
try:
|
|
|
23 |
os.remove(file_path)
|
|
|
|
|
24 |
print(f"File '{file_path}' deleted successfully.")
|
25 |
except OSError as e:
|
|
|
26 |
print(f"Error deleting file '{file_path}': {e}")
|
27 |
|
28 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
29 |
# -----------------Sender----------------- #
|
30 |
|
31 |
def text_to_binary(text):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
32 |
binary_string = ''.join(format(ord(char), '08b') for char in text)
|
33 |
return binary_string
|
34 |
|
35 |
|
36 |
def signal_function(frequency, time):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
37 |
return np.sin(2 * np.pi * frequency * time)
|
38 |
|
39 |
|
40 |
def generate_silence(duration):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
41 |
return np.zeros(int(sample_rate * duration))
|
42 |
|
43 |
|
44 |
def binary_signal(binary_string):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
45 |
t = np.linspace(0, bit_duration, int(sample_rate * bit_duration), False)
|
46 |
signal = []
|
47 |
|
|
|
48 |
for bit in tqdm(binary_string, desc="Generating Signal"):
|
49 |
if bit == '0':
|
50 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(low_frequency, t)))
|
51 |
else:
|
52 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(high_frequency, t)))
|
53 |
|
|
|
54 |
return np.concatenate(signal)
|
55 |
|
56 |
|
57 |
def flag_encoding(bit_value):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
58 |
flag_duration = 6 * 0.0014
|
59 |
t_flag = np.linspace(0, flag_duration, int(sample_rate * flag_duration), False)
|
60 |
signal = []
|
61 |
|
|
|
62 |
if bit_value == 0:
|
63 |
binary_flag = "100001"
|
64 |
for bit in binary_flag:
|
@@ -80,19 +220,48 @@ def flag_encoding(bit_value):
|
|
80 |
|
81 |
|
82 |
def encode_rs(binary_string, ecc_bytes):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
83 |
byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
|
|
|
|
|
84 |
rs = reedsolo.RSCodec(ecc_bytes)
|
|
|
|
|
85 |
encoded_data = rs.encode(byte_data)
|
|
|
|
|
86 |
encoded_binary_string = ''.join(format(byte, '08b') for byte in encoded_data)
|
87 |
return encoded_binary_string
|
88 |
|
89 |
|
90 |
def manchester_encoding(binary_string):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
91 |
encode_binary_string = encode_rs(binary_string, 20)
|
92 |
|
|
|
93 |
t = np.linspace(0, bit_duration, int(sample_rate * bit_duration), False)
|
94 |
signal = []
|
95 |
|
|
|
96 |
for bit in tqdm(encode_binary_string, desc="Generating Signal"):
|
97 |
if bit == '0':
|
98 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(low_frequency, t)))
|
@@ -105,68 +274,70 @@ def manchester_encoding(binary_string):
|
|
105 |
|
106 |
|
107 |
def binary_to_signal(binary_string):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
108 |
flag_start = flag_encoding(0)
|
109 |
flag_end = flag_encoding(1)
|
110 |
silence_duration = 0.1
|
111 |
silence_before = generate_silence(silence_duration)
|
112 |
silence_after = generate_silence(silence_duration)
|
113 |
|
|
|
114 |
signal = np.concatenate([silence_before, flag_start, manchester_encoding(binary_string), flag_end, silence_after])
|
115 |
|
116 |
return signal
|
117 |
|
118 |
|
119 |
def encode_and_generate_audio(text):
|
120 |
-
|
121 |
-
|
122 |
-
|
123 |
-
|
124 |
-
|
125 |
-
|
126 |
-
|
127 |
-
|
128 |
-
|
129 |
-
|
130 |
-
|
131 |
-
|
132 |
-
|
133 |
-
low = low_frequency / nyquist
|
134 |
-
high = high_frequency / nyquist
|
135 |
-
coef = butter(order, [low, high], btype='band')
|
136 |
-
b = coef[0]
|
137 |
-
a = coef[1]
|
138 |
-
return b, a
|
139 |
-
|
140 |
-
|
141 |
-
def butter_bandpass_filter(data, sr, order=5):
|
142 |
-
b, a = butter_bandpass(sr, order=order)
|
143 |
-
y = lfilter(b, a, data)
|
144 |
-
return y
|
145 |
-
|
146 |
-
|
147 |
-
def main():
|
148 |
-
input_file = 'input_text.wav'
|
149 |
-
output_file = 'output_filtered_sender.wav'
|
150 |
|
151 |
-
|
152 |
-
|
153 |
-
|
154 |
-
|
155 |
-
|
156 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
157 |
except Exception as e:
|
|
|
158 |
return f"Error: {str(e)}"
|
159 |
|
160 |
|
161 |
# -----------------Player----------------- #
|
162 |
|
163 |
def play_sound():
|
164 |
-
return gr.Audio(
|
165 |
|
166 |
|
167 |
-
# -----------------Interface
|
168 |
|
169 |
-
#
|
170 |
with gr.Blocks() as demo:
|
171 |
name = gr.Textbox(label="Your Text")
|
172 |
output = gr.Textbox(label="Output")
|
|
|
8 |
|
9 |
# ---------------Parameters--------------- #
|
10 |
|
11 |
+
input_file = 'input_text.wav'
|
12 |
+
output_file = 'output_filtered_sender.wav'
|
13 |
|
14 |
low_frequency = 18000
|
15 |
high_frequency = 19000
|
|
|
18 |
amplitude_scaling_factor = 15.0
|
19 |
|
20 |
|
21 |
+
# ----------------Useless---------------- #
|
22 |
+
|
23 |
def delete_file(file_path):
|
24 |
+
"""
|
25 |
+
This function deletes a file at the specified path.
|
26 |
+
|
27 |
+
Parameters:
|
28 |
+
file_path (str): The path to the file to be deleted.
|
29 |
+
|
30 |
+
Returns:
|
31 |
+
None
|
32 |
+
"""
|
33 |
try:
|
34 |
+
# Attempt to remove the file
|
35 |
os.remove(file_path)
|
36 |
+
|
37 |
+
# If successful, print a success message
|
38 |
print(f"File '{file_path}' deleted successfully.")
|
39 |
except OSError as e:
|
40 |
+
# If an error occurs (like the file does not exist), print an error message
|
41 |
print(f"Error deleting file '{file_path}': {e}")
|
42 |
|
43 |
|
44 |
+
# -----------------Filter----------------- #
|
45 |
+
|
46 |
+
def butter_bandpass(sr, order=5):
|
47 |
+
"""
|
48 |
+
This function designs a Butterworth bandpass filter.
|
49 |
+
|
50 |
+
Parameters:
|
51 |
+
sr (int): The sample rate of the audio.
|
52 |
+
order (int): The order of the filter.
|
53 |
+
|
54 |
+
Returns:
|
55 |
+
tuple: The filter coefficients `b` and `a`.
|
56 |
+
"""
|
57 |
+
# Calculate the Nyquist frequency
|
58 |
+
nyquist = 0.5 * sr
|
59 |
+
|
60 |
+
# Normalize the cutoff frequencies with a 500 Hz offset
|
61 |
+
low = (low_frequency - 500) / nyquist
|
62 |
+
high = (high_frequency + 500) / nyquist
|
63 |
+
|
64 |
+
# Design the Butterworth bandpass filter
|
65 |
+
coef = butter(order, [low, high], btype='band')
|
66 |
+
|
67 |
+
# Extract the filter coefficients
|
68 |
+
b = coef[0]
|
69 |
+
a = coef[1]
|
70 |
+
|
71 |
+
return b, a
|
72 |
+
|
73 |
+
|
74 |
+
def butter_bandpass_filter(data, sr, order=5):
|
75 |
+
"""
|
76 |
+
This function applies the Butterworth bandpass filter to a given data.
|
77 |
+
|
78 |
+
Parameters:
|
79 |
+
data (array): The audio data to be filtered.
|
80 |
+
sr (int): The sample rate of the audio.
|
81 |
+
order (int): The order of the filter.
|
82 |
+
|
83 |
+
Returns:
|
84 |
+
array: The filtered audio data.
|
85 |
+
"""
|
86 |
+
# Get the filter coefficients
|
87 |
+
b, a = butter_bandpass(sr, order=order)
|
88 |
+
|
89 |
+
# Apply the filter to the data
|
90 |
+
y = lfilter(b, a, data)
|
91 |
+
|
92 |
+
return y
|
93 |
+
|
94 |
+
|
95 |
+
def filtered():
|
96 |
+
"""
|
97 |
+
This function reads an audio file, applies the bandpass filter to the audio data,
|
98 |
+
and then writes the filtered data to an output file.
|
99 |
+
|
100 |
+
Returns:
|
101 |
+
str: A success message if the audio is filtered correctly, otherwise an error message.
|
102 |
+
"""
|
103 |
+
# Read the audio data from the input file
|
104 |
+
sr, data = read(input_file)
|
105 |
+
|
106 |
+
# Apply the bandpass filter to the audio data
|
107 |
+
filtered_data = butter_bandpass_filter(data, sr)
|
108 |
+
|
109 |
+
# Write the filtered data to the output file
|
110 |
+
write(output_file, sr, np.int16(filtered_data))
|
111 |
+
|
112 |
+
return "Filtered Audio Generated"
|
113 |
+
|
114 |
+
|
115 |
# -----------------Sender----------------- #
|
116 |
|
117 |
def text_to_binary(text):
|
118 |
+
"""
|
119 |
+
This function converts a text string to a binary string.
|
120 |
+
|
121 |
+
Parameters:
|
122 |
+
text (str): The text string.
|
123 |
+
|
124 |
+
Returns:
|
125 |
+
str: The binary string.
|
126 |
+
"""
|
127 |
+
# Convert each character in the text to its ASCII value, format it as an 8-bit binary number, and join them together
|
128 |
binary_string = ''.join(format(ord(char), '08b') for char in text)
|
129 |
return binary_string
|
130 |
|
131 |
|
132 |
def signal_function(frequency, time):
|
133 |
+
"""
|
134 |
+
This function generates a sinusoidal signal with a given frequency and time.
|
135 |
+
|
136 |
+
Parameters:
|
137 |
+
frequency (float): The frequency of the signal.
|
138 |
+
time (array): The time values for the signal.
|
139 |
+
|
140 |
+
Returns:
|
141 |
+
array: The generated signal.
|
142 |
+
"""
|
143 |
+
# Return a sinusoidal signal with the given frequency and time
|
144 |
return np.sin(2 * np.pi * frequency * time)
|
145 |
|
146 |
|
147 |
def generate_silence(duration):
|
148 |
+
"""
|
149 |
+
This function generates a silence signal with a given duration.
|
150 |
+
|
151 |
+
Parameters:
|
152 |
+
duration (float): The duration of the silence.
|
153 |
+
|
154 |
+
Returns:
|
155 |
+
array: The silence signal.
|
156 |
+
"""
|
157 |
+
# Return a zero signal with the length corresponding to the given duration
|
158 |
return np.zeros(int(sample_rate * duration))
|
159 |
|
160 |
|
161 |
def binary_signal(binary_string):
|
162 |
+
"""
|
163 |
+
This function converts a binary string to a signal.
|
164 |
+
|
165 |
+
Parameters:
|
166 |
+
binary_string (str): The binary string.
|
167 |
+
|
168 |
+
Returns:
|
169 |
+
array: The signal.
|
170 |
+
"""
|
171 |
+
# Generate the time values for the signal
|
172 |
t = np.linspace(0, bit_duration, int(sample_rate * bit_duration), False)
|
173 |
signal = []
|
174 |
|
175 |
+
# For each bit in the binary string, generate a signal with the low or high frequency depending on the bit value
|
176 |
for bit in tqdm(binary_string, desc="Generating Signal"):
|
177 |
if bit == '0':
|
178 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(low_frequency, t)))
|
179 |
else:
|
180 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(high_frequency, t)))
|
181 |
|
182 |
+
# Concatenate the generated signals into one signal
|
183 |
return np.concatenate(signal)
|
184 |
|
185 |
|
186 |
def flag_encoding(bit_value):
|
187 |
+
"""
|
188 |
+
This function encodes a bit value into a flag signal.
|
189 |
+
|
190 |
+
Parameters:
|
191 |
+
bit_value (int): The bit value (0 or 1).
|
192 |
+
|
193 |
+
Returns:
|
194 |
+
array: The flag signal.
|
195 |
+
"""
|
196 |
+
# Generate the time values for the flag signal
|
197 |
flag_duration = 6 * 0.0014
|
198 |
t_flag = np.linspace(0, flag_duration, int(sample_rate * flag_duration), False)
|
199 |
signal = []
|
200 |
|
201 |
+
# Depending on the bit value, generate a flag signal with the corresponding binary flag
|
202 |
if bit_value == 0:
|
203 |
binary_flag = "100001"
|
204 |
for bit in binary_flag:
|
|
|
220 |
|
221 |
|
222 |
def encode_rs(binary_string, ecc_bytes):
|
223 |
+
"""
|
224 |
+
This function encodes a binary string using Reed-Solomon encoding.
|
225 |
+
|
226 |
+
Parameters:
|
227 |
+
binary_string (str): The binary string.
|
228 |
+
ecc_bytes (int): The number of error correction bytes used in the encoding.
|
229 |
+
|
230 |
+
Returns:
|
231 |
+
str: The encoded binary string.
|
232 |
+
"""
|
233 |
+
# Convert the binary string to a bytearray
|
234 |
byte_data = bytearray(int(binary_string[i:i + 8], 2) for i in range(0, len(binary_string), 8))
|
235 |
+
|
236 |
+
# Initialize a Reed-Solomon codec
|
237 |
rs = reedsolo.RSCodec(ecc_bytes)
|
238 |
+
|
239 |
+
# Encode the bytearray
|
240 |
encoded_data = rs.encode(byte_data)
|
241 |
+
|
242 |
+
# Convert the encoded bytearray back to a binary string
|
243 |
encoded_binary_string = ''.join(format(byte, '08b') for byte in encoded_data)
|
244 |
return encoded_binary_string
|
245 |
|
246 |
|
247 |
def manchester_encoding(binary_string):
|
248 |
+
"""
|
249 |
+
This function encodes a binary string using Manchester encoding.
|
250 |
+
|
251 |
+
Parameters:
|
252 |
+
binary_string (str): The binary string.
|
253 |
+
|
254 |
+
Returns:
|
255 |
+
array: The Manchester encoded signal.
|
256 |
+
"""
|
257 |
+
# Encode the binary string using Reed-Solomon encoding
|
258 |
encode_binary_string = encode_rs(binary_string, 20)
|
259 |
|
260 |
+
# Generate the time values for the signal
|
261 |
t = np.linspace(0, bit_duration, int(sample_rate * bit_duration), False)
|
262 |
signal = []
|
263 |
|
264 |
+
# For each bit in the encoded binary string, generate a Manchester encoded signal
|
265 |
for bit in tqdm(encode_binary_string, desc="Generating Signal"):
|
266 |
if bit == '0':
|
267 |
signal.append(amplitude_scaling_factor * np.sign(signal_function(low_frequency, t)))
|
|
|
274 |
|
275 |
|
276 |
def binary_to_signal(binary_string):
|
277 |
+
"""
|
278 |
+
This function converts a binary string to a signal.
|
279 |
+
|
280 |
+
Parameters:
|
281 |
+
binary_string (str): The binary string.
|
282 |
+
|
283 |
+
Returns:
|
284 |
+
array: The signal.
|
285 |
+
"""
|
286 |
+
# Generate the start and end flags and the silence signals
|
287 |
flag_start = flag_encoding(0)
|
288 |
flag_end = flag_encoding(1)
|
289 |
silence_duration = 0.1
|
290 |
silence_before = generate_silence(silence_duration)
|
291 |
silence_after = generate_silence(silence_duration)
|
292 |
|
293 |
+
# Concatenate the silence signals, the start and end flags, and the Manchester encoded signal into one signal
|
294 |
signal = np.concatenate([silence_before, flag_start, manchester_encoding(binary_string), flag_end, silence_after])
|
295 |
|
296 |
return signal
|
297 |
|
298 |
|
299 |
def encode_and_generate_audio(text):
|
300 |
+
"""
|
301 |
+
This function encodes a text string into a binary string, converts the binary string to a signal, and writes the signal to an audio file.
|
302 |
+
|
303 |
+
Parameters:
|
304 |
+
text (str): The text string.
|
305 |
+
|
306 |
+
Returns:
|
307 |
+
str: A success message if the audio file is generated correctly, otherwise an error message.
|
308 |
+
"""
|
309 |
+
try:
|
310 |
+
# Delete the input and output files if they exist
|
311 |
+
delete_file(input_file)
|
312 |
+
delete_file(output_file)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
313 |
|
314 |
+
# Convert the text to a binary string
|
315 |
+
binary_string_to_send = text_to_binary(text)
|
316 |
+
|
317 |
+
# Convert the binary string to a signal
|
318 |
+
signal = binary_to_signal(binary_string_to_send)
|
319 |
+
|
320 |
+
# Write the signal to an audio file
|
321 |
+
write('output_text.wav', 44100, signal.astype(np.int16))
|
322 |
+
|
323 |
+
# Apply the bandpass filter to the audio data and write the filtered data to an output file
|
324 |
+
filtered()
|
325 |
+
|
326 |
+
return "WAV file generated and ready to be sent."
|
327 |
except Exception as e:
|
328 |
+
# If an error occurs, return an error message
|
329 |
return f"Error: {str(e)}"
|
330 |
|
331 |
|
332 |
# -----------------Player----------------- #
|
333 |
|
334 |
def play_sound():
|
335 |
+
return gr.Audio(output_file, autoplay=True)
|
336 |
|
337 |
|
338 |
+
# -----------------Interface-----------------#
|
339 |
|
340 |
+
# Start a Gradio Blocks interface
|
341 |
with gr.Blocks() as demo:
|
342 |
name = gr.Textbox(label="Your Text")
|
343 |
output = gr.Textbox(label="Output")
|