Spaces:
Sleeping
Sleeping
from typing import Iterator | |
from langchain_core.documents import Document | |
from langchain_community.document_loaders.base import BaseBlobParser | |
from langchain_community.document_loaders.blob_loaders import Blob | |
import io | |
# import ffmpeg | |
import speech_recognition as sr | |
from pydub import AudioSegment | |
class VideoParser(BaseBlobParser):
    """Parse video files from a blob by transcribing their audio track."""

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parse a video file into the Document iterator.

        Args:
            blob: The blob to parse. Its mimetype must start with ``video/``.

        Returns:
            An iterator yielding a single Document whose ``page_content`` is
            the transcribed audio (or an error message, see
            ``extract_audio_text``) and whose metadata holds ``source`` and
            ``size``.

        Raises:
            ValueError: If the blob has no mimetype or a non-video mimetype.
        """
        # A blob without a mimetype is treated as unsupported instead of
        # failing with AttributeError on ``None.startswith``.
        mimetype = blob.mimetype or ""
        if not mimetype.startswith('video/'):
            raise ValueError("This blob type is not supported for this parser.")

        with blob.as_bytes_io() as video_bytes_io:
            # Prefer a ``size`` attribute when the blob provides one;
            # otherwise measure the stream by seeking to its end.
            size = getattr(blob, "size", None)
            if size is None:
                size = video_bytes_io.seek(0, io.SEEK_END)
            video_bytes_io.seek(0)
            audio_text = self.extract_audio_text(video_bytes_io)

        metadata = {"source": blob.source, 'size': size}
        yield Document(page_content=audio_text, metadata=metadata)

    def extract_audio_text(self, video_bytes_io: io.BytesIO) -> str:
        """Extract text from video audio.

        The audio track is decoded with pydub, normalised to mono, 16 kHz,
        16-bit PCM, re-encoded as WAV in memory and passed to
        ``recognize_google`` for transcription.

        Args:
            video_bytes_io: The in-memory video bytes.

        Returns:
            A string representing the transcribed audio text. If any step
            fails, a string of the form ``"Error transcribing audio: ..."``
            is returned instead of raising, so parsing never aborts here.
        """
        try:
            video_bytes_io.seek(0)
            # Let pydub decode the container directly; no format is forced so
            # the decoder can detect it from the data.
            audio_segment = AudioSegment.from_file(video_bytes_io)
            # Mono / 16 kHz / 16-bit samples, the settings the recognizer
            # input was originally prepared with.
            audio_segment = (
                audio_segment.set_channels(1)
                .set_frame_rate(16000)
                .set_sample_width(2)
            )

            recognizer = sr.Recognizer()
            # Convert audio to WAV bytes compatible with the recognizer; the
            # buffer is closed as soon as the audio has been read back.
            with io.BytesIO() as audio_stream:
                audio_segment.export(audio_stream, format="wav")
                audio_stream.seek(0)
                with sr.AudioFile(audio_stream) as source:
                    audio_data = recognizer.record(source)

            return recognizer.recognize_google(audio_data)
        except Exception as e:
            # Best-effort boundary: callers receive the failure as text.
            return f"Error transcribing audio: {e}"