import sys import os from dotenv import load_dotenv from youtube_transcript_api import YouTubeTranscriptApi from googleapiclient.discovery import build from googleapiclient.errors import HttpError import google_auth_oauthlib.flow import googleapiclient.discovery import googleapiclient.errors import re import logging import ssl import certifi import requests # Set up logging # Configure logging for stdout only logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout ) logger = logging.getLogger(__name__) # Try to load from .env file if it exists, but don't fail if it doesn't try: current_dir = os.path.dirname(os.path.abspath(__file__)) dotenv_path = os.path.join(os.path.dirname(current_dir), '.env') if os.path.exists(dotenv_path): load_dotenv(dotenv_path) logger.info(f"Loaded environment variables from {dotenv_path}") except Exception as e: logger.warning(f"Could not load .env file: {e}") # Get API key from environment variable API_KEY = os.getenv('YOUTUBE_API_KEY') # Safe logging of API key if API_KEY: masked_key = f"{API_KEY[:3]}...{API_KEY[-3:]}" if len(API_KEY) > 6 else "***" logger.info(f"API_KEY found (masked): {masked_key}") else: logger.error("YouTube API key not found in environment variables") raise ValueError("YouTube API key not found. Make sure it's set in your environment variables or .env file.") def get_youtube_client(): try: # Create a custom session with SSL verification session = requests.Session() session.verify = certifi.where() # Create a custom HTTP object http = googleapiclient.http.build_http() http.verify = session.verify # Build the YouTube client with the custom HTTP object youtube = build('youtube', 'v3', developerKey=API_KEY, http=http) logger.info("YouTube API client initialized successfully") return youtube except Exception as e: logger.error(f"Error initializing YouTube API client: {str(e)}") raise # Rest of your existing functions remain the same... def extract_video_id(url): if not url: return None video_id_match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", url) if video_id_match: return video_id_match.group(1) return None def get_video_metadata(video_id): youtube = get_youtube_client() try: request = youtube.videos().list( part="snippet,contentDetails,statistics", id=video_id ) response = request.execute() if 'items' in response and len(response['items']) > 0: video = response['items'][0] snippet = video['snippet'] description = snippet.get('description', '').strip() if not description: description = 'Not Available' return { 'title': snippet['title'], 'author': snippet['channelTitle'], 'upload_date': snippet['publishedAt'], 'view_count': video['statistics'].get('viewCount', '0'), 'like_count': video['statistics'].get('likeCount', '0'), 'comment_count': video['statistics'].get('commentCount', '0'), 'duration': video['contentDetails']['duration'], 'description': description } else: logger.error(f"No video found with id: {video_id}") return None except Exception as e: logger.error(f"An error occurred while fetching metadata for video {video_id}: {str(e)}") return None def get_transcript(video_id): if not video_id: return None try: transcript = YouTubeTranscriptApi.get_transcript(video_id) metadata = get_video_metadata(video_id) logger.info(f"Metadata for video {video_id}: {metadata}") logger.info(f"Transcript length for video {video_id}: {len(transcript)}") if not metadata: return None return { 'transcript': transcript, 'metadata': metadata } except Exception as e: logger.error(f"Error extracting transcript for video {video_id}: {str(e)}") return None def get_channel_videos(channel_url): youtube = get_youtube_client() channel_id = extract_channel_id(channel_url) if not channel_id: logger.error(f"Invalid channel URL: {channel_url}") return [] try: request = youtube.search().list( part="id,snippet", channelId=channel_id, type="video", maxResults=50 ) response = request.execute() videos = [] for item in response['items']: videos.append({ 'video_id': item['id']['videoId'], 'title': item['snippet']['title'], 'description': item['snippet']['description'], 'published_at': item['snippet']['publishedAt'] }) return videos except HttpError as e: logger.error(f"An HTTP error {e.resp.status} occurred: {e.content}") return [] except Exception as e: logger.error(f"An error occurred while fetching channel videos: {str(e)}") return [] def extract_channel_id(url): channel_id_match = re.search(r"(?:channel\/|c\/|@)([a-zA-Z0-9-_]+)", url) if channel_id_match: return channel_id_match.group(1) return None def test_api_key(): try: youtube = get_youtube_client() request = youtube.videos().list(part="snippet", id="dQw4w9WgXcQ") response = request.execute() if 'items' in response: logger.info("API key is valid and working") return True else: logger.error("API request successful but returned unexpected response") return False except Exception as e: logger.error(f"API key test failed: {str(e)}") return False def initialize_youtube_api(): if test_api_key(): logger.info("YouTube API initialized successfully") return True else: logger.error("Failed to initialize YouTube API") return False