import io
import logging
import os
import shutil
import tempfile
from typing import BinaryIO, Iterator, Optional

import speech_recognition as sr
from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
from langchain_core.documents import Document
from pydub import AudioSegment

logger = logging.getLogger(__name__)


class VideoParser(BaseBlobParser):
    """Parse video blobs into Documents holding the transcribed audio track."""

    def __init__(self, *, debug_audio_path: Optional[str] = None) -> None:
        """Create a parser.

        Args:
            debug_audio_path: Optional file path. When set, the WAV audio
                extracted from each video is also written there so it can be
                inspected. Nothing is written to disk when left as ``None``.
        """
        super().__init__()
        self._debug_audio_path = debug_audio_path

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a video file into the Document iterator.

        Args:
            blob: The blob to parse. Its mimetype must start with ``video/``.

        Yields:
            A single Document whose ``page_content`` is the transcription
            (or an ``"Error transcribing audio: ..."`` message when the
            transcription failed) and whose metadata carries ``source`` and
            ``size``.

        Raises:
            ValueError: If the blob has no mimetype or it is not a video type.
        """
        # ``mimetype`` may be None; treat that as "not a video" instead of
        # failing with AttributeError on ``None.startswith``.
        if not (blob.mimetype or "").startswith("video/"):
            raise ValueError("This blob type is not supported for this parser.")

        with blob.as_bytes_io() as video_bytes_io:
            # Prefer a ``size`` attribute when the blob provides one; otherwise
            # measure the stream so a missing attribute does not abort parsing.
            size = getattr(blob, "size", None)
            if size is None:
                size = video_bytes_io.seek(0, io.SEEK_END)
            video_bytes_io.seek(0)
            audio_text = self.extract_audio_text(video_bytes_io)

        metadata = {"source": blob.source, "size": size}
        yield Document(page_content=audio_text, metadata=metadata)

    def extract_audio_text(self, video_bytes_io: BinaryIO) -> str:
        """Extract text from video audio.

        Args:
            video_bytes_io: Binary stream positioned at the start of the video.

        Returns:
            The transcribed audio text. If any step fails, the failure is
            logged and a string of the form ``"Error transcribing audio: ..."``
            is returned instead of raising.
        """
        try:
            with self._extract_wav(video_bytes_io) as wav_stream:
                if self._debug_audio_path is not None:
                    with open(self._debug_audio_path, "wb") as debug_file:
                        debug_file.write(wav_stream.getvalue())

                recognizer = sr.Recognizer()
                with sr.AudioFile(wav_stream) as source:
                    audio_data = recognizer.record(source)
                return recognizer.recognize_google(audio_data)
        except Exception as e:
            # Best-effort contract: callers receive an error string rather
            # than an exception, but the failure is no longer silent.
            logger.exception("Failed to transcribe audio from video")
            return f"Error transcribing audio: {e}"

    def _extract_wav(self, video_bytes_io: BinaryIO) -> io.BytesIO:
        """Decode the video's audio track into an in-memory 16 kHz mono WAV."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Decode from a real file rather than a pipe so the decoder can
            # seek; some containers keep their index at the end of the file.
            video_path = os.path.join(tmp_dir, "input_video")
            with open(video_path, "wb") as video_file:
                shutil.copyfileobj(video_bytes_io, video_file)
            segment = AudioSegment.from_file(video_path)

        # Mono, 16 kHz, 16-bit PCM: the settings the original (disabled)
        # ffmpeg invocation requested.
        segment = segment.set_channels(1).set_frame_rate(16000).set_sample_width(2)

        wav_stream = io.BytesIO()
        segment.export(wav_stream, format="wav")
        wav_stream.seek(0)
        return wav_stream