|
|
|
|
|
import os |
|
import io |
|
import yt_dlp |
|
import requests |
|
import openai |
|
from fastapi import FastAPI, HTTPException, Request |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel |
|
from pydub import AudioSegment |
|
from huggingface_hub import InferenceClient |
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file (if any) into the process environment
# before the keys below are read.
load_dotenv()

# API credentials; either may be None when the variable is not set.
HF_API_KEY = os.environ.get("HUGGINGFACE_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")

# Point the module-level openai configuration at OpenRouter's endpoint.
openai.api_key = OPENROUTER_API_KEY
openai.api_base = "https://openrouter.ai/api/v1"

# Only report the missing key here; start-up continues regardless.
if not HF_API_KEY:
    print("ERROR: HUGGINGFACE_API_KEY environment variable not found.")

# Model identifiers: speech recognition via Hugging Face, notes via OpenRouter.
ASR_MODEL = "openai/whisper-large-v3"
LLM_MODEL = "microsoft/mai-ds-r1:free"
|
|
|
|
|
|
|
# Build the shared Hugging Face client once at import time. On any failure the
# name is bound to None, which transcribe_audio() checks before every call.
try:
    hf_inference = InferenceClient(token=HF_API_KEY)
    print("Hugging Face Inference Client initialized.")
except Exception as e:
    hf_inference = None
    print(f"ERROR: Failed to initialize Hugging Face Inference Client: {e}")
|
|
|
|
|
# The ASGI application object; the metadata below is passed straight to FastAPI.
app = FastAPI(
    title="Video Note Taker API",
    description="Transcribes videos and generates notes using Hugging Face models.",
    version="0.1.0",
)

# Browser origins allowed to call this API cross-origin.
origins = ["http://localhost:3000"]

# CORS: credentials allowed, any method and any header, but only for the
# origins listed above.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
|
class ProcessRequest(BaseModel):
    # Request body accepted by POST /process-video/.
    # Documented with `#` comments rather than a docstring so the text is not
    # picked up as the model's schema description.

    # URL of the video to process; the field name is the JSON key clients send.
    youtubeUrl: str
|
|
|
def download_audio_bytes(youtube_url: str) -> tuple[bytes, str]:
    """Fetch the audio track of a YouTube video into memory.

    yt-dlp is used only to resolve stream metadata (``download=False``); the
    selected stream is then fetched directly over HTTP with ``requests``.

    Stream selection, in order:
      1. the first audio-only format (``vcodec == 'none'``) whose ``ext`` is
         one of mp3 / opus / m4a / webm;
      2. otherwise the first format of any kind that carries audio.

    Args:
        youtube_url: URL of the video to process.

    Returns:
        ``(audio_bytes, audio_format)`` where ``audio_format`` is the ``ext``
        yt-dlp reports for the chosen stream (``"mp3"`` when none is reported).

    Raises:
        HTTPException: status 500 when metadata extraction fails, no stream
            with a direct URL is found, the HTTP download fails, or the
            downloaded payload is empty.
    """
    print(f"Attempting to download audio for: {youtube_url}")

    # Only metadata extraction is performed, so no output template or ffmpeg
    # post-processor is configured.
    ydl_opts = {
        'format': 'bestaudio/best',
        'noplaylist': True,
        'quiet': True,
        'no_warnings': True,
    }
    preferred_exts = ('mp3', 'opus', 'm4a', 'webm')
    # (connect, read) seconds; without a timeout a stalled stream would block
    # the request indefinitely.
    request_timeout = (10, 60)

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)

        formats = info.get('formats') or []
        with_audio = [f for f in formats if f.get('acodec') != 'none']
        best_audio_format = next(
            (
                f for f in with_audio
                if f.get('vcodec') == 'none' and f.get('ext') in preferred_exts
            ),
            None,
        )
        if not best_audio_format:
            best_audio_format = next(iter(with_audio), None)

        if not best_audio_format or 'url' not in best_audio_format:
            print("Could not find suitable audio stream URL via yt-dlp info. Direct download might fail or require ffmpeg.")
            raise yt_dlp.utils.DownloadError("Could not extract a direct audio URL and ffmpeg may not be available.")

        audio_url = best_audio_format['url']
        format_note = best_audio_format.get('format_note', best_audio_format.get('ext', 'N/A'))
        print(f"Found audio format: {format_note}. Downloading directly from URL...")

        with requests.get(
            audio_url,
            stream=True,
            # Forward the headers yt-dlp associates with this format, if any.
            headers=best_audio_format.get('http_headers'),
            timeout=request_timeout,
        ) as r:
            r.raise_for_status()
            audio_bytes = b"".join(r.iter_content(chunk_size=8192))

        print(f"Audio downloaded successfully: {len(audio_bytes) / (1024*1024):.2f} MB")
        if not audio_bytes:
            raise ValueError("Downloaded audio data is empty.")
        audio_format = best_audio_format.get('ext', 'mp3')
        return audio_bytes, audio_format

    except yt_dlp.utils.DownloadError as e:
        print(f"ERROR: yt-dlp failed to download or process audio: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to download audio from YouTube: {e}") from e
    except requests.exceptions.RequestException as e:
        print(f"ERROR: Failed to download audio stream from URL: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to fetch audio stream: {e}") from e
    except Exception as e:
        print(f"ERROR: Unexpected error during audio download: {e}")
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred during audio processing: {e}") from e
|
|
|
|
|
def chunk_audio(audio_bytes: bytes, audio_format: str = "mp3", chunk_length_ms: int = 30000) -> list[bytes]:
    """Split encoded audio into consecutive fixed-length chunks.

    The input is decoded with pydub and sliced every ``chunk_length_ms``
    milliseconds; the final chunk may be shorter. Every chunk is re-encoded
    as MP3 regardless of the input format.

    Args:
        audio_bytes: Encoded audio data.
        audio_format: Container/codec of ``audio_bytes`` (e.g. 'mp3', 'webm',
            'm4a'), passed to pydub as the decode format.
        chunk_length_ms: Duration of each chunk in milliseconds; must be > 0.

    Returns:
        The MP3-encoded chunks in playback order (empty when the decoded
        audio has zero length).

    Raises:
        ValueError: If ``chunk_length_ms`` is not positive, or the audio
            cannot be decoded in the given format.
    """
    # Validate first: a zero step would surface later as an opaque range()
    # error and a negative one would silently produce no chunks.
    if chunk_length_ms <= 0:
        raise ValueError(f"chunk_length_ms must be a positive integer, got {chunk_length_ms}")

    try:
        audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=audio_format)
    except Exception as e:
        raise ValueError(f"Could not decode audio with format '{audio_format}': {e}") from e

    chunks = []
    for start_ms in range(0, len(audio), chunk_length_ms):
        chunk_buffer = io.BytesIO()
        audio[start_ms:start_ms + chunk_length_ms].export(chunk_buffer, format="mp3")
        chunks.append(chunk_buffer.getvalue())

    return chunks
|
|
|
|
|
def transcribe_audio(audio_bytes: bytes) -> str:
    """Transcribe one piece of audio with the Hugging Face ASR model.

    Args:
        audio_bytes: Encoded audio to send to the inference client.

    Returns:
        The recognised text with surrounding whitespace removed; may be an
        empty string (a warning is printed in that case).

    Raises:
        HTTPException: 503 when the shared client was never initialised or
            the inference call fails.
        ValueError: When ``audio_bytes`` is empty.
    """
    if not hf_inference:
        raise HTTPException(status_code=503, detail="Transcription service client not initialized.")
    if not audio_bytes:
        raise ValueError("Cannot transcribe empty audio data.")

    size_mb = len(audio_bytes) / (1024*1024)
    print(f"Transcribing {size_mb:.2f} MB using {ASR_MODEL}...")

    try:
        # assumes the client returns a mapping with a 'text' key - TODO
        # confirm against the installed huggingface_hub version
        asr_output = hf_inference.automatic_speech_recognition(audio=audio_bytes, model=ASR_MODEL)
        text = asr_output.get('text', '').strip()
        if not text:
            print("Warning: Transcription result was empty.")
        print("Transcription successful.")
        return text
    except Exception as e:
        print(f"ERROR: Hugging Face ASR API call failed: {e}")
        raise HTTPException(status_code=503, detail=f"Transcription service failed: {e}")
|
|
|
|
|
def generate_notes_from_transcript(transcript: str) -> str:
    """Ask the OpenRouter chat model for structured notes on a transcript.

    Args:
        transcript: Full transcript text.

    Returns:
        The model's reply, or the literal ``"Transcript was empty."`` when
        ``transcript`` is empty (no request is made in that case).

    Raises:
        HTTPException: 503 when the OpenRouter key is missing or the chat
            completion call fails.
    """
    if not OPENROUTER_API_KEY:
        raise HTTPException(status_code=503, detail="OpenRouter API key not found.")
    if not transcript:
        return "Transcript was empty."

    print(f"Generating notes for transcript (length {len(transcript)}) using {LLM_MODEL}...")

    # The transcript is fenced in triple quotes so the model can tell the
    # instructions apart from the material to summarise.
    prompt = f"""You are an expert note-taking assistant specializing in extracting key information from video transcripts.
Please analyze the following transcript and generate concise, well-structured notes.
Focus on the main topics, key points, important examples, definitions, and any conclusions presented. Use bullet points or numbered lists for clarity.

Transcript:
\"\"\"
{transcript}
\"\"\"

Structured Notes:"""

    chat_messages = [{"role": "user", "content": prompt}]
    try:
        completion = openai.ChatCompletion.create(
            model=LLM_MODEL,
            messages=chat_messages,
            max_tokens=1024,
            temperature=0.7,
        )
        notes = completion.choices[0].message.content
        print("Note generation successful.")
        return notes
    except Exception as e:
        print(f"OpenRouter call failed: {e}")
        raise HTTPException(status_code=503, detail=f"OpenRouter failed: {e}")
|
|
|
|
|
|
|
|
|
@app.get("/")
async def read_root():
    """Health check: GET BASE_URL/ returns a fixed status message."""
    status = {"message": "YouTube Notes API is running!"}
    return status
|
|
|
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve the Swagger UI page for this API at BASE_URL/docs."""
    # NOTE(review): FastAPI normally registers its own /docs route when the
    # app is created - confirm this handler is actually the one being served.
    from fastapi.openapi.docs import get_swagger_ui_html

    page_title = f"{app.title} - Docs"
    return get_swagger_ui_html(openapi_url=app.openapi_url, title=page_title)
|
|
|
@app.get(app.openapi_url, include_in_schema=False)
async def get_open_api_endpoint():
    """Build and return the OpenAPI schema from the app's current routes."""
    # NOTE(review): FastAPI normally serves app.openapi_url itself - confirm
    # this handler is actually the one being reached.
    from fastapi.openapi.utils import get_openapi

    schema = get_openapi(title=app.title, version=app.version, routes=app.routes)
    return schema
|
|
|
|
|
@app.post("/process-video/")
async def process_video(request: ProcessRequest):
    """Turn a YouTube URL into structured notes.

    Pipeline: download the audio, split it into chunks, transcribe each chunk,
    then ask the LLM for notes on the combined transcript. Accessible via
    POST BASE_URL/process-video/.

    Every pipeline step is blocking (network and audio decoding), so each one
    is run in the worker thread pool rather than on the event loop.

    Args:
        request: Body carrying ``youtubeUrl``.

    Returns:
        ``{"notes": <str>}``.

    Raises:
        HTTPException: Propagated unchanged from the pipeline steps; 503 when
            every audio chunk failed to transcribe; 500 for any other error.
    """
    # Imported locally so the block is self-contained; fastapi re-exports
    # Starlette's helper.
    from fastapi.concurrency import run_in_threadpool

    print(f"Received request for URL: {request.youtubeUrl}")
    try:
        audio_bytes, audio_format = await run_in_threadpool(
            download_audio_bytes, request.youtubeUrl
        )
        audio_chunks = await run_in_threadpool(chunk_audio, audio_bytes, audio_format)

        total_chunks = len(audio_chunks)
        transcript_parts = []
        failed_chunks = 0
        for idx, chunk in enumerate(audio_chunks, start=1):
            print(f"Transcribing chunk {idx}/{total_chunks}...")
            try:
                partial = await run_in_threadpool(transcribe_audio, chunk)
            except Exception as e:
                # Best effort: one bad chunk must not discard the whole video.
                failed_chunks += 1
                print(f"Skipping chunk {idx} due to error: {e}")
                continue
            if partial:
                transcript_parts.append(partial)

        # A total outage must not be reported to the client as a successful
        # "empty transcript" result.
        if total_chunks and failed_chunks == total_chunks:
            raise HTTPException(
                status_code=503,
                detail="Transcription failed for every audio chunk.",
            )

        full_transcript = " ".join(transcript_parts)
        notes = await run_in_threadpool(generate_notes_from_transcript, full_transcript)

        print("Processing complete. Returning notes.")
        return {"notes": notes}

    except HTTPException as http_exc:
        # Already carries the intended status code; log and pass it through.
        print(f"HTTP Exception occurred: {http_exc.detail}")
        raise
    except Exception as e:
        print(f"ERROR: Unhandled exception in /process-video/ endpoint: {e}")
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") from e