|
from smolagents.tools import Tool |
|
from typing import Optional, Union, Dict, Any |
|
import os |
|
import time |
|
import requests |
|
import io |
|
from PIL import Image |
|
from pytubefix import YouTube |
|
import docx |
|
from docx.shared import Pt, RGBColor, Inches |
|
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT |
|
import google.generativeai as genai |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load environment variables (e.g. GEMINI_API_KEY) from a local .env file.
load_dotenv()
|
|
|
class TranscriptSummarizer(Tool):
    """Summarize a transcript with Google Gemini and attach a generated illustration.

    Summarization uses the Gemini model configured from the GEMINI_API_KEY
    environment variable; image generation calls a Hugging Face text-to-image
    inference endpoint authorized by ``hf_api_key``.
    """

    description = "Summarizes a transcript and generates blog content using Google's Gemini model for summarization and Hugging Face API for image generation."
    name = "transcript_summarizer"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to summarize.'},
        'language': {'type': 'string', 'description': 'The language of the transcript.', 'nullable': True}
    }
    output_type = "string"

    def __init__(self, *args, hf_api_key: Optional[str] = None, **kwargs):
        """Configure the Gemini client and the Hugging Face image endpoint.

        Args:
            hf_api_key: Hugging Face API token for the text-to-image endpoint.
                If omitted, ``forward`` returns an explanatory error string
                instead of raising.
        """
        super().__init__(*args, **kwargs)

        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if gemini_api_key:
            genai.configure(api_key=gemini_api_key)
            self.gemini_model = genai.GenerativeModel('gemini-2.0-flash')
        else:
            # Defer the failure: forward() reports the missing key to the caller.
            self.gemini_model = None

        self.api_url = "https://api-inference.huggingface.co/models/ZB-Tech/Text-to-Image"
        self.hf_api_key = hf_api_key
        self.headers = {"Authorization": f"Bearer {self.hf_api_key}"}

    def query_image_api(self, payload: Dict[str, Any]) -> bytes:
        """POST ``payload`` to the Hugging Face inference endpoint and return the raw body.

        No status check is done here: forward() treats short/empty bodies and
        undecodable images as failures and falls back to text-only output.
        """
        # BUGFIX: the original request had no timeout, so a stalled endpoint
        # would block forward() indefinitely.
        response = requests.post(self.api_url, headers=self.headers, json=payload, timeout=60)
        return response.content

    def summarize_with_gemini(self, text: str, language: str = 'en', max_tokens: int = 1000) -> str:
        """Use Gemini to summarize ``text``, answering in the transcript's own language.

        Args:
            text: The transcript (or intermediate summary) to condense.
            language: ISO 639-1 code; mapped to a human-readable name for the
                prompt, falling back to the raw code for unknown languages.
            max_tokens: Upper bound passed as ``max_output_tokens``.

        Returns:
            The model's summary text.

        Raises:
            AttributeError: if called while ``self.gemini_model`` is None
                (forward() guards against this).
        """
        language_map = {
            'en': 'English',
            'hi': 'Hindi',
            'es': 'Spanish',
            'fr': 'French',
            'de': 'German',
            'it': 'Italian',
            'ja': 'Japanese',
            'ko': 'Korean',
            'pt': 'Portuguese',
            'ru': 'Russian',
            'zh': 'Chinese',
            'ar': 'Arabic',
            'bn': 'Bengali',
            'ta': 'Tamil',
            'te': 'Telugu',
            'mr': 'Marathi',
            'gu': 'Gujarati',
            'kn': 'Kannada',
            'ml': 'Malayalam',
            'pa': 'Punjabi',
            'ur': 'Urdu'
        }
        language_name = language_map.get(language, language)

        prompt = f"""
        Please summarize the following transcript in a concise but comprehensive way.
        Focus on the main points and key information.

        IMPORTANT: The transcript is in {language_name}. Please provide the summary in the SAME LANGUAGE ({language_name}).
        Do not translate to any other language. Keep the summary in the original language of the transcript.

        Transcript:
        {text}
        """

        # Low temperature favors faithful, low-variance summaries.
        generation_config = {
            "temperature": 0.4,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": max_tokens,
        }

        response = self.gemini_model.generate_content(
            prompt,
            generation_config=generation_config
        )
        return response.text

    def forward(self, transcript: str, language: str = 'en') -> str:
        """Summarize ``transcript`` and, best-effort, append a generated image path.

        Returns:
            The summary, optionally suffixed with ``"\\n\\nImage URL: <path>"``,
            or a human-readable error string. Never raises.
        """
        try:
            if not self.hf_api_key:
                return "Hugging Face API key is required for image generation. Please provide it in the input field."
            if not self.gemini_model:
                return "Gemini API key is required for summarization. Please add it to your .env file."

            transcript_length = len(transcript)
            if transcript_length < 100:
                return "Transcript is too short to summarize."

            if transcript_length > 30000:
                # Map-reduce: summarize 25k-char chunks, then re-summarize the
                # concatenation if it is still too long for one Gemini call.
                chunk_size = 25000
                transcript_chunks = [transcript[i:i + chunk_size] for i in range(0, len(transcript), chunk_size)]

                chunk_summaries = [
                    self.summarize_with_gemini(chunk, language=language, max_tokens=1000)
                    for chunk in transcript_chunks
                ]

                combined_summary = "\n\n".join(chunk_summaries)
                if len(combined_summary) > 25000:
                    full_summary = self.summarize_with_gemini(combined_summary, language=language, max_tokens=2000)
                else:
                    full_summary = combined_summary
            else:
                full_summary = self.summarize_with_gemini(transcript, language=language, max_tokens=2000)

            # Image generation is best-effort: any failure below returns the
            # text-only summary rather than an error.
            try:
                key_entities = full_summary.split()[:15]
                image_prompt = f"Generate an image related to: {' '.join(key_entities)}, cartoon style"
                image_bytes = self.query_image_api({"inputs": image_prompt})

                # Error payloads / empty responses are far shorter than any real image.
                if not image_bytes or len(image_bytes) < 100:
                    print("Warning: Received invalid or empty image response")
                    return full_summary

                try:
                    image = Image.open(io.BytesIO(image_bytes))

                    image_folder = "Image"
                    if not os.path.exists(image_folder):
                        os.makedirs(image_folder)
                    image_url = os.path.join(image_folder, f"image_{int(time.time())}.jpg")
                    image.save(image_url)

                    return f"{full_summary}\n\nImage URL: {image_url}"
                except Exception as img_error:
                    print(f"Error processing image: {str(img_error)}")
                    return full_summary
            except Exception as img_gen_error:
                print(f"Error generating image: {str(img_gen_error)}")
                return full_summary
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
|
|
|
class YouTubeTranscriptExtractor(Tool):
    """Extract and clean the caption track of a YouTube video via pytubefix.

    Output contract: ``"LANGUAGE:<code>||<text>"`` so callers can recover the
    caption language; error strings use the same prefix.
    """

    description = "Extracts the transcript from a YouTube video."
    name = "youtube_transcript_extractor"
    inputs = {'video_url': {'type': 'string', 'description': 'The URL of the YouTube video.'}}
    output_type = "string"

    def __init__(self, *args, **kwargs):
        # BUGFIX: the original override never called super().__init__(),
        # skipping the smolagents Tool setup that this file's other tools
        # perform via their own super() calls.
        super().__init__(*args, **kwargs)
        self.is_initialized = False

    def forward(self, video_url: str) -> str:
        """Fetch captions for ``video_url``, preferring English, and strip SRT noise.

        Returns:
            ``"LANGUAGE:<code>||<transcript>"`` on success, or the same prefix
            followed by an error message. Never raises.
        """
        try:
            yt = YouTube(video_url)
            lang = 'en'

            try:
                if 'en' in yt.captions:
                    transcript = yt.captions['en'].generate_srt_captions()
                    lang = 'en'
                else:
                    # No English track: fall back to the first available caption.
                    if len(yt.captions.all()) > 0:
                        caption = yt.captions.all()[0]
                        transcript = caption.generate_srt_captions()
                        lang = caption.code
                    else:
                        return f"LANGUAGE:{lang}||No transcript available for this video."
            except StopIteration:
                return f"LANGUAGE:{lang}||No transcript available for this video."
            except Exception as e:
                return f"LANGUAGE:{lang}||An unexpected error occurred while accessing captions: {str(e)}"

            # Drop SRT cue numbers and "start --> end" timestamp lines,
            # keeping only the spoken text.
            cleaned_transcript = ""
            for line in transcript.splitlines():
                if not line.strip().isdigit() and "-->" not in line:
                    cleaned_transcript += line + "\n"

            print(f"Transcript language detected: {lang}")
            print("Transcript sample: ", cleaned_transcript[:200] + "..." if len(cleaned_transcript) > 200 else cleaned_transcript)

            return f"LANGUAGE:{lang}||{cleaned_transcript}"
        except Exception as e:
            return f"LANGUAGE:en||An unexpected error occurred: {str(e)}"
|
|
|
class TranscriptToDocx(Tool):
    """Create (or append to) a DOCX document containing a transcript, summary, and image.

    Documents are written under the ``Documents`` folder, which is created on
    construction if missing.
    """

    description = "Creates or updates a DOCX file with YouTube transcript and summary."
    name = "transcript_to_docx"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to include in the document.'},
        'summary': {'type': 'string', 'description': 'The summary to include in the document.'},
        'video_title': {'type': 'string', 'description': 'The title of the YouTube video.'},
        'image_path': {'type': 'string', 'description': 'Path to the image to include in the document.', 'nullable': True},
        'existing_docx_path': {'type': 'string', 'description': 'Path to an existing DOCX file to update.', 'nullable': True}
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        """Ensure the output folder for generated documents exists."""
        super().__init__(*args, **kwargs)
        self.docx_folder = "Documents"
        if not os.path.exists(self.docx_folder):
            os.makedirs(self.docx_folder)

    def forward(self, transcript: str, summary: str, video_title: str, image_path: Optional[str] = None, existing_docx_path: Optional[str] = None) -> str:
        """Build the document and save it, returning the saved path or an error string.

        Args:
            transcript: Full transcript text for the "Full Transcript" section.
            summary: Summary text for the "Summary" section.
            video_title: Used as the document heading and (sanitized) filename.
            image_path: Optional image inserted after the summary; image errors
                are logged and skipped, never fatal.
            existing_docx_path: If given and the file exists, content is
                appended after a page break instead of creating a new document.
        """
        try:
            if existing_docx_path and os.path.exists(existing_docx_path):
                doc = docx.Document(existing_docx_path)
                # Separate the appended content from the existing pages.
                doc.add_paragraph().add_run().add_break(docx.enum.text.WD_BREAK.PAGE)
            else:
                doc = docx.Document()
                doc.core_properties.title = f"YouTube Transcript: {video_title}"
                doc.core_properties.author = "YouTube Transcript Tool"

            title = doc.add_heading(video_title, level=1)
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            doc.add_heading("Summary", level=2)
            doc.add_paragraph(summary)

            if image_path and os.path.exists(image_path):
                try:
                    doc.add_picture(image_path, width=Inches(6))
                    caption = doc.add_paragraph("Generated image based on transcript content")
                    caption.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                    caption.runs[0].italic = True
                except Exception as img_error:
                    # A bad image must not prevent the document from being written.
                    print(f"Error adding image to document: {str(img_error)}")

            doc.add_heading("Full Transcript", level=2)
            doc.add_paragraph(transcript)

            # Sanitize the title into a filesystem-safe name.
            safe_title = ''.join(c for c in video_title if c.isalnum() or c in ' _-')
            safe_title = safe_title.replace(' ', '_')
            # BUGFIX: a title of only punctuation sanitized to "" and produced
            # a file literally named ".docx"; use a timestamped fallback name.
            if not safe_title:
                safe_title = f"youtube_transcript_{int(time.time())}"

            output_filename = f"{safe_title}.docx"
            output_path = os.path.join(self.docx_folder, output_filename)

            try:
                doc.save(output_path)
                print(f"Document saved successfully at: {output_path}")
                return output_path
            except Exception as save_error:
                error_msg = f"Error saving document: {str(save_error)}"
                print(error_msg)
                # Retry once with a timestamped name (e.g. invalid characters
                # or a locked file at the preferred path).
                try:
                    fallback_path = os.path.join(self.docx_folder, f"youtube_transcript_{int(time.time())}.docx")
                    doc.save(fallback_path)
                    print(f"Document saved with fallback name at: {fallback_path}")
                    return fallback_path
                except Exception:
                    # BUGFIX: was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    return error_msg
        except Exception as e:
            return f"An error occurred while creating the DOCX file: {str(e)}"
|
|