from smolagents.tools import Tool
from typing import Optional, Union, Dict, Any
import os
import time
import requests
import io
from PIL import Image
from pytubefix import YouTube
import docx
from docx.shared import Pt, RGBColor, Inches
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
import google.generativeai as genai
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


class TranscriptSummarizer(Tool):
    description = "Summarizes a transcript and generates blog content using Google's Gemini model for summarization and Hugging Face API for image generation."
    name = "transcript_summarizer"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to summarize.'},
        'language': {'type': 'string', 'description': 'The language of the transcript.', 'nullable': True}
    }
    output_type = "string"

    def __init__(self, *args, hf_api_key: Optional[str] = None, **kwargs):
        super().__init__(*args, **kwargs)
        # Get the Gemini API key from environment variables
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if gemini_api_key:
            # Configure the Gemini API and set up the model
            genai.configure(api_key=gemini_api_key)
            self.gemini_model = genai.GenerativeModel('gemini-2.0-flash')
        else:
            self.gemini_model = None

        # Set up the Hugging Face Inference API for image generation
        self.api_url = "https://api-inference.huggingface.co/models/ZB-Tech/Text-to-Image"
        self.hf_api_key = hf_api_key
        self.headers = {"Authorization": f"Bearer {self.hf_api_key}"}

    def query_image_api(self, payload):
        response = requests.post(self.api_url, headers=self.headers, json=payload)
        return response.content

    def summarize_with_gemini(self, text, language='en', max_tokens=1000):
        """Use Gemini to summarize text in the specified language."""
        # Map language codes to full language names for better prompting
        language_map = {
            'en': 'English', 'hi': 'Hindi', 'es': 'Spanish', 'fr': 'French',
            'de': 'German', 'it': 'Italian', 'ja': 'Japanese', 'ko': 'Korean',
            'pt': 'Portuguese', 'ru': 'Russian', 'zh': 'Chinese', 'ar': 'Arabic',
            'bn': 'Bengali', 'ta': 'Tamil', 'te': 'Telugu', 'mr': 'Marathi',
            'gu': 'Gujarati', 'kn': 'Kannada', 'ml': 'Malayalam', 'pa': 'Punjabi',
            'ur': 'Urdu'
            # Add more languages as needed
        }
        language_name = language_map.get(language, language)

        prompt = f"""
        Please summarize the following transcript in a concise but comprehensive way.
        Focus on the main points and key information.

        IMPORTANT: The transcript is in {language_name}. Please provide the summary in the SAME LANGUAGE ({language_name}).
        Do not translate to any other language. Keep the summary in the original language of the transcript.

        Transcript:
        {text}
        """

        generation_config = {
            "temperature": 0.4,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": max_tokens,
        }

        response = self.gemini_model.generate_content(
            prompt,
            generation_config=generation_config
        )
        return response.text

    def forward(self, transcript: str, language: str = 'en') -> str:
        try:
            if not self.hf_api_key:
                return "Hugging Face API key is required for image generation. Please provide it in the input field."
            if not self.gemini_model:
                return "Gemini API key is required for summarization. Please add it to your .env file."

            transcript_length = len(transcript)

            # Check if the transcript is too short
            if transcript_length < 100:
                return "Transcript is too short to summarize."
            # For longer transcripts, split into chunks to handle context window limitations
            if transcript_length > 30000:  # Gemini has a context window limit
                chunk_size = 25000
                transcript_chunks = [transcript[i:i + chunk_size] for i in range(0, len(transcript), chunk_size)]

                # Summarize each chunk
                chunk_summaries = []
                for chunk in transcript_chunks:
                    chunk_summary = self.summarize_with_gemini(chunk, language=language, max_tokens=1000)
                    chunk_summaries.append(chunk_summary)

                # Combine chunk summaries and create a final summary
                combined_summary = "\n\n".join(chunk_summaries)
                if len(combined_summary) > 25000:
                    full_summary = self.summarize_with_gemini(combined_summary, language=language, max_tokens=2000)
                else:
                    full_summary = combined_summary
            else:
                # For shorter transcripts, summarize directly
                full_summary = self.summarize_with_gemini(transcript, language=language, max_tokens=2000)

            # Generate image based on summary
            try:
                key_entities = full_summary.split()[:15]  # Extract first 15 words as key entities
                image_prompt = f"Generate an image related to: {' '.join(key_entities)}, cartoon style"
                image_bytes = self.query_image_api({"inputs": image_prompt})

                # Check if the response is valid
                if not image_bytes or len(image_bytes) < 100:
                    print("Warning: Received invalid or empty image response")
                    return full_summary  # Return just the summary without image

                try:
                    # Try to open the image
                    image = Image.open(io.BytesIO(image_bytes))

                    # Save the image
                    image_folder = "Image"
                    if not os.path.exists(image_folder):
                        os.makedirs(image_folder)
                    image_url = os.path.join(image_folder, f"image_{int(time.time())}.jpg")  # Use timestamp for unique filename
                    image.save(image_url)
                    return f"{full_summary}\n\nImage URL: {image_url}"  # Return the file path with summary
                except Exception as img_error:
                    print(f"Error processing image: {str(img_error)}")
                    # Return just the summary if image processing fails
                    return full_summary
            except Exception as img_gen_error:
                print(f"Error generating image: {str(img_gen_error)}")
                # Return just the summary if image generation fails
                return full_summary
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"


class YouTubeTranscriptExtractor(Tool):
    description = "Extracts the transcript from a YouTube video."
    name = "youtube_transcript_extractor"
    inputs = {'video_url': {'type': 'string', 'description': 'The URL of the YouTube video.'}}
    output_type = "string"  # Keep as string for compatibility with smolagents

    def forward(self, video_url: str) -> str:
        try:
            # Create a YouTube object
            yt = YouTube(video_url)
            lang = 'en'  # Default language

            # Get the video transcript
            try:
                if 'en' in yt.captions:
                    transcript = yt.captions['en'].generate_srt_captions()
                    lang = 'en'
                else:
                    # Get the first available caption
                    if len(yt.captions.all()) > 0:
                        caption = yt.captions.all()[0]
                        transcript = caption.generate_srt_captions()
                        lang = caption.code
                    else:
                        return f"LANGUAGE:{lang}||No transcript available for this video."
            except StopIteration:
                return f"LANGUAGE:{lang}||No transcript available for this video."
            except Exception as e:
                return f"LANGUAGE:{lang}||An unexpected error occurred while accessing captions: {str(e)}"

            # Clean up the transcript by removing timestamps and line numbers
            cleaned_transcript = ""
            for line in transcript.splitlines():
                if not line.strip().isdigit() and "-->" not in line:
                    cleaned_transcript += line + "\n"

            print(f"Transcript language detected: {lang}")
            print("Transcript sample: ",
                  cleaned_transcript[:200] + "..." if len(cleaned_transcript) > 200 else cleaned_transcript)
            # Return both the transcript and the language as a formatted string
            # Format: "LANGUAGE:lang||transcript_text"
            return f"LANGUAGE:{lang}||{cleaned_transcript}"
        except Exception as e:
            return f"LANGUAGE:en||An unexpected error occurred: {str(e)}"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_initialized = False


class TranscriptToDocx(Tool):
    description = "Creates or updates a DOCX file with YouTube transcript and summary."
    name = "transcript_to_docx"
    inputs = {
        'transcript': {'type': 'string', 'description': 'The transcript to include in the document.'},
        'summary': {'type': 'string', 'description': 'The summary to include in the document.'},
        'video_title': {'type': 'string', 'description': 'The title of the YouTube video.'},
        'image_path': {'type': 'string', 'description': 'Path to the image to include in the document.', 'nullable': True},
        'existing_docx_path': {'type': 'string', 'description': 'Path to an existing DOCX file to update.', 'nullable': True}
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.docx_folder = "Documents"
        if not os.path.exists(self.docx_folder):
            os.makedirs(self.docx_folder)

    def forward(self, transcript: str, summary: str, video_title: str,
                image_path: Optional[str] = None, existing_docx_path: Optional[str] = None) -> str:
        try:
            # Determine if we're creating a new document or updating an existing one
            if existing_docx_path and os.path.exists(existing_docx_path):
                doc = docx.Document(existing_docx_path)
                # Add a page break before adding new content
                doc.add_paragraph().add_run().add_break(docx.enum.text.WD_BREAK.PAGE)
            else:
                doc = docx.Document()
                # Set document properties
                doc.core_properties.title = f"YouTube Transcript: {video_title}"
                doc.core_properties.author = "YouTube Transcript Tool"

            # Add title
            title = doc.add_heading(video_title, level=1)
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            # Add summary section
            doc.add_heading("Summary", level=2)
            summary_para = doc.add_paragraph(summary)

            # Add image if provided
            if image_path and os.path.exists(image_path):
                try:
                    doc.add_picture(image_path, width=Inches(6))
                    # Add a caption for the image
                    caption = doc.add_paragraph("Generated image based on transcript content")
                    caption.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
                    caption.runs[0].italic = True
                except Exception as img_error:
                    # If there's an error adding the image, just log it and continue
                    print(f"Error adding image to document: {str(img_error)}")

            # Add transcript section
            doc.add_heading("Full Transcript", level=2)
            transcript_para = doc.add_paragraph(transcript)

            # Clean the video title for use as a filename
            safe_title = ''.join(c for c in video_title if c.isalnum() or c in ' _-')
            safe_title = safe_title.replace(' ', '_')

            # Save the document
            output_filename = f"{safe_title}.docx"
            output_path = os.path.join(self.docx_folder, output_filename)

            try:
                doc.save(output_path)
                print(f"Document saved successfully at: {output_path}")
                return output_path
            except Exception as save_error:
                error_msg = f"Error saving document: {str(save_error)}"
                print(error_msg)
                # Try a simpler filename as a fallback
                try:
                    fallback_path = os.path.join(self.docx_folder, f"youtube_transcript_{int(time.time())}.docx")
                    doc.save(fallback_path)
                    print(f"Document saved with fallback name at: {fallback_path}")
                    return fallback_path
                except Exception:
                    return error_msg
        except Exception as e:
            return f"An error occurred while creating the DOCX file: {str(e)}"
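

# --- Usage sketch (illustrative, not part of the original tool definitions) ---
# A minimal example of chaining the three tools directly through their forward()
# methods. Assumptions: GEMINI_API_KEY is set in .env, a Hugging Face token is
# exposed via an HF_API_KEY environment variable (hypothetical name), and the
# video URL below is a placeholder for a video with captions enabled. Parsing
# the "Image URL:" line out of the summary is omitted for brevity.
if __name__ == "__main__":
    extractor = YouTubeTranscriptExtractor()
    summarizer = TranscriptSummarizer(hf_api_key=os.getenv("HF_API_KEY", ""))
    docx_writer = TranscriptToDocx()

    # The extractor returns "LANGUAGE:<lang>||<transcript_text>"
    result = extractor.forward("https://www.youtube.com/watch?v=VIDEO_ID")
    lang_part, _, transcript_text = result.partition("||")
    lang = lang_part.replace("LANGUAGE:", "")

    # Summarize in the detected language, then write the DOCX report
    summary = summarizer.forward(transcript_text, language=lang)
    output_path = docx_writer.forward(
        transcript=transcript_text,
        summary=summary,
        video_title="Example Video",
    )
    print(f"Document written to: {output_path}")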