JaganathC committed
Commit 236b4e0 · verified · 1 Parent(s): a3c5aa1

Update app.py

Files changed (1)
  1. app.py +74 -114
app.py CHANGED
@@ -4,169 +4,129 @@ import yt_dlp
  import os
  import subprocess
  import json
  import time
  import langdetect
  import uuid
- from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

- # Load Hugging Face Token
- HF_TOKEN = os.getenv("HF_TOKEN")
-
- print("Starting the program...")
  model_path = "Qwen/Qwen2.5-7B-Instruct"
-
- # **Efficient Model Loading**
- bnb_config = BitsAndBytesConfig(load_in_8bit=True) # Use 8-bit precision to reduce memory usage
-
- device = "cuda" if torch.cuda.is_available() else "cpu"
- print(f"Using device: {device}")
-
  tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
- model = AutoModelForCausalLM.from_pretrained(
-     model_path,
-     torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
-     quantization_config=bnb_config, # Load in 8-bit to save memory
-     trust_remote_code=True
- ).to(device).eval()
  print("Model successfully loaded.")

  def generate_unique_filename(extension):
      return f"{uuid.uuid4()}{extension}"

  def cleanup_files(*files):
      for file in files:
          if file and os.path.exists(file):
              os.remove(file)
              print(f"Removed file: {file}")

  def download_youtube_audio(url):
-     """Downloads audio from a YouTube video and converts it to WAV format."""
-     print(f"Downloading audio from YouTube: {url}")
      output_path = generate_unique_filename(".wav")
-
      ydl_opts = {
          'format': 'bestaudio/best',
-         'postprocessors': [{
-             'key': 'FFmpegExtractAudio',
-             'preferredcodec': 'wav',
-             'preferredquality': '192',
-         }],
-         'outtmpl': output_path[:-4] # Remove .wav to prevent duplication
      }

-     try:
-         with yt_dlp.YoutubeDL(ydl_opts) as ydl:
-             ydl.download([url])
-         return output_path if os.path.exists(output_path) else "Download Failed"
-     except Exception as e:
-         return f"Error downloading audio: {str(e)}"
-
  def transcribe_audio(file_path):
-     """Transcribes audio using `insanely-fast-whisper` and handles large files efficiently."""
-     print(f"Starting transcription of file: {file_path}")
-     temp_audio = None
-
      if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
-         print("Video file detected. Extracting audio using ffmpeg...")
-         temp_audio = generate_unique_filename(".wav")
-         command = ["ffmpeg", "-i", file_path, "-q:a", "0", "-map", "a", temp_audio]
-         subprocess.run(command, check=True)
-         file_path = temp_audio # Use extracted audio file
-
      output_file = generate_unique_filename(".json")
      command = [
-         "insanely-fast-whisper",
-         "--file-name", file_path,
-         "--device-id", "0",
-         "--model-name", "openai/whisper-large-v3",
-         "--task", "transcribe",
-         "--timestamp", "chunk",
          "--transcript-path", output_file
      ]
-
-     try:
-         subprocess.run(command, check=True)
-     except Exception as e:
-         return f"Error in transcription: {str(e)}"

-     # Process the JSON file in chunks to avoid memory overflow
-     result = []
-     try:
-         with open(output_file, "r") as f:
-             data = json.load(f) # Load full JSON safely
-             result = [chunk.get("text", "") for chunk in data]
-     except Exception as e:
-         return f"Error reading transcription file: {str(e)}"
-
-     cleanup_files(output_file)
-     if temp_audio:
-         cleanup_files(temp_audio)

-     return " ".join(result)[:500000] # Limit transcription size
-
- def generate_summary_stream(transcription):
-     """Summarizes the transcription efficiently to avoid memory overflow."""
-     if not transcription:
-         return "No transcription available."
-
-     detected_language = langdetect.detect(transcription[:1000]) # Detect using a smaller portion
-
-     # Use smaller chunks for processing
-     chunk_size = 1000 # Reduce chunk size
-     transcript_chunks = [transcription[i:i+chunk_size] for i in range(0, len(transcription), chunk_size)]
-     summary_result = []
-
-     for chunk in transcript_chunks[:5]: # Process only the first 5 chunks
-         prompt = f"""Summarize the following video transcription in 150-300 words in {detected_language}:\n{chunk}"""
-         try:
-             input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
-             output_ids = model.generate(input_ids, max_length=300) # Limit output size
-             response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
-         except Exception as e:
-             response = f"Error generating summary: {str(e)}"
-         summary_result.append(response)
-
-     return "\n\n".join(summary_result)

  def process_youtube(url):
-     """Handles YouTube video processing: downloads audio, transcribes it, and cleans up."""
      if not url:
-         return "Please enter a YouTube URL.", None
-
      audio_file = download_youtube_audio(url)
-     if "Error" in audio_file or audio_file == "Download Failed":
-         return audio_file, None
-
-     transcription = transcribe_audio(audio_file)
-     cleanup_files(audio_file) # Clean up the downloaded file
-     return transcription, None

  def process_uploaded_video(video_path):
-     """Processes uploaded video file for transcription."""
-     transcription = transcribe_audio(video_path)
-     return transcription, None

- with gr.Blocks(theme=gr.themes.Soft()) as demo:
      gr.Markdown("""
-     # 🎥 Video Transcription and Smart Summary
-     Upload a video or provide a YouTube link to get a transcription and AI-generated summary.
      """)
-
      with gr.Tabs():
          with gr.TabItem("📤 Video Upload"):
-             video_input = gr.Video()
              video_button = gr.Button("🚀 Process Video")

          with gr.TabItem("🔗 YouTube Link"):
-             url_input = gr.Textbox(placeholder="https://www.youtube.com/watch?v=...")
              url_button = gr.Button("🚀 Process URL")
-
      transcription_output = gr.Textbox(label="📝 Transcription", lines=10, show_copy_button=True)
      summary_output = gr.Textbox(label="📊 Summary", lines=10, show_copy_button=True)
      summary_button = gr.Button("📝 Generate Summary")
-
      video_button.click(process_uploaded_video, inputs=[video_input], outputs=[transcription_output, summary_output])
      url_button.click(process_youtube, inputs=[url_input], outputs=[transcription_output, summary_output])
-     summary_button.click(generate_summary_stream, inputs=[transcription_output], outputs=[summary_output])

- demo.launch(share=True, debug=True, queue=True)
 
  import os
  import subprocess
  import json
+ import moviepy.editor as mp
  import time
  import langdetect
  import uuid
+ from transformers import AutoTokenizer, AutoModelForCausalLM

+ # Load Hugging Face Model
+ HF_TOKEN = os.environ.get("HF_TOKEN")
  model_path = "Qwen/Qwen2.5-7B-Instruct"
+ print(f"Loading model {model_path}...")
  tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
+ model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, trust_remote_code=True).cuda()
+ model = model.eval()
  print("Model successfully loaded.")

+ # Generate unique filenames
  def generate_unique_filename(extension):
      return f"{uuid.uuid4()}{extension}"

+ # Cleanup temporary files
  def cleanup_files(*files):
      for file in files:
          if file and os.path.exists(file):
              os.remove(file)
              print(f"Removed file: {file}")

+ # Extract audio from video
+ def extract_audio(video_path):
+     audio_path = generate_unique_filename(".wav")
+     try:
+         video = mp.VideoFileClip(video_path)
+         video.audio.write_audiofile(audio_path)
+         return audio_path
+     except Exception as e:
+         print(f"Error extracting audio: {e}")
+         return None
+
+ # Download YouTube audio
  def download_youtube_audio(url):
      output_path = generate_unique_filename(".wav")
      ydl_opts = {
          'format': 'bestaudio/best',
+         'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}],
+         'outtmpl': output_path,
+         'keepvideo': True,
      }
+     with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+         ydl.download([url])
+     return output_path if os.path.exists(output_path) else None

+ # Transcribe audio using Whisper
  def transcribe_audio(file_path):
      if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
+         file_path = extract_audio(file_path)
+         if not file_path:
+             return "Audio extraction failed.", None
+
      output_file = generate_unique_filename(".json")
      command = [
+         "insanely-fast-whisper", "--file-name", file_path,
+         "--device-id", "cpu", "--model-name", "openai/whisper-large-v3",
+         "--task", "transcribe", "--timestamp", "chunk",
          "--transcript-path", output_file
      ]

+     result = subprocess.run(command, capture_output=True, text=True)
+     if result.returncode != 0:
+         return f"Transcription failed: {result.stderr}", None

+     if not os.path.exists(output_file):
+         return "Transcription file missing.", None
+
+     with open(output_file, "r") as f:
+         transcription = json.load(f)
+
+     text = transcription.get("text", " ".join([chunk["text"] for chunk in transcription.get("chunks", [])]))
+     cleanup_files(output_file, file_path)
+     return text, None
+
+ # Generate summary using Qwen Model
+ def generate_summary(transcription):
+     detected_language = langdetect.detect(transcription)
+     prompt = f"""Summarize the following transcription in 150-300 words:
+     Language: {detected_language}
+     {transcription[:100000]}"""
+
+     response, _ = model.chat(tokenizer, prompt, history=[])
+     return response

+ # Process YouTube video
  def process_youtube(url):
      if not url:
+         return "Please enter a valid YouTube URL.", None
      audio_file = download_youtube_audio(url)
+     return transcribe_audio(audio_file) if audio_file else ("Download failed.", None)

+ # Process uploaded video
  def process_uploaded_video(video_path):
+     return transcribe_audio(video_path)

+ # Gradio Interface
+ demo = gr.Blocks()
+ with demo:
      gr.Markdown("""
+     # 🎥 AI Video Transcription & Summary
+     Upload a video or provide a YouTube link to get a transcription and AI-generated summary.
      """)
+
      with gr.Tabs():
          with gr.TabItem("📤 Video Upload"):
+             video_input = gr.File(label="Upload a video file")
              video_button = gr.Button("🚀 Process Video")

          with gr.TabItem("🔗 YouTube Link"):
+             url_input = gr.Textbox(label="Paste YouTube URL")
              url_button = gr.Button("🚀 Process URL")
+
      transcription_output = gr.Textbox(label="📝 Transcription", lines=10, show_copy_button=True)
      summary_output = gr.Textbox(label="📊 Summary", lines=10, show_copy_button=True)
      summary_button = gr.Button("📝 Generate Summary")
+
      video_button.click(process_uploaded_video, inputs=[video_input], outputs=[transcription_output, summary_output])
      url_button.click(process_youtube, inputs=[url_input], outputs=[transcription_output, summary_output])
+     summary_button.click(generate_summary, inputs=[transcription_output], outputs=[summary_output])

+ demo.launch()
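
Note on the new generate_summary(): Qwen2.5-Instruct checkpoints loaded via AutoModelForCausalLM generally do not expose a chat() helper (that interface belonged to the older remote-code Qwen releases), so model.chat(tokenizer, prompt, history=[]) is likely to raise an AttributeError. Below is a minimal sketch, not part of this commit, of the chat-template plus generate() path; it reuses the model and tokenizer loaded at the top of app.py, and the function name is hypothetical.

# Sketch only (not in this commit): summarization via the tokenizer's chat template.
# Assumes `model`, `tokenizer`, and `langdetect` from app.py are already in scope.
def generate_summary_chat_template(transcription):
    detected_language = langdetect.detect(transcription)
    prompt = f"Summarize the following transcription in 150-300 words in {detected_language}:\n{transcription[:100000]}"
    messages = [
        {"role": "system", "content": "You are a helpful assistant that summarizes video transcripts."},
        {"role": "user", "content": prompt},
    ]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(text, return_tensors="pt").to(model.device)
    output_ids = model.generate(**inputs, max_new_tokens=512)
    # Decode only the newly generated tokens, skipping the echoed prompt.
    return tokenizer.decode(output_ids[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)

Using max_new_tokens rather than max_length also avoids the issue in the pre-change code, where max_length=300 counted the prompt tokens as well and could leave little or no room for the generated summary.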