File size: 4,148 Bytes
1897f56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from langchain_community.document_loaders.parsers.pdf import PyPDFParser
from langchain_community.document_loaders.generic import GenericLoader
from langchain_core.document_loaders.blob_loaders import BlobLoader
from io import BytesIO
from starlette.datastructures import UploadFile
from typing import List, Iterable, ByteString
# Ensure this is the correct path for your custom loader
from custon_generic_loader import CustomGenericLoader
from langchain_core.documents import Document
from langchain_community.document_loaders.blob_loaders.schema import Blob
from parser.msword_parser import MsWordParser
from parser.pptx_parser import PptxParser
from parser.xlsx_parser import XlsxParser
from parser.txt_parser import TxtParser
from parser.audio_parser import AudioParser
from parser.video_parser import VideoParser


class InMemoryBlobLoader(BlobLoader):
    def __init__(self, upload_file: UploadFile):
        self.upload_file = upload_file

    async def yield_blobs(self) -> Iterable[ByteString]:
        data = await self.upload_file.read()
        yield Blob.from_data(data, mime_type=self.upload_file.content_type, metadata={
            'name': self.upload_file.filename,
            'size': self.upload_file.size,
            'source': self.upload_file.filename
        })


async def load_document(upload_file: UploadFile) -> List[Document]:
    blob_loader = InMemoryBlobLoader(upload_file)

    if upload_file.content_type == 'application/pdf':
        blob_parser = PyPDFParser()
        print(f'Loading PDF: {upload_file.filename}')
    elif upload_file.content_type in [
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    ]:
        blob_parser = MsWordParser()
        print(f'Loading Word Document: {upload_file.filename}')
    elif upload_file.content_type in [
        'application/vnd.ms-powerpoint',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation'
    ]:
        blob_parser = PptxParser()
        print(f'Loading PowerPoint: {upload_file.filename}')
    elif upload_file.content_type == 'text/plain':
        blob_parser = TxtParser()
        print(f'Loading Text File: {upload_file.filename}')
    elif upload_file.content_type.startswith('audio/'):
        blob_parser = AudioParser()
        print(f'Loading Audio File: {upload_file.filename}')
    elif upload_file.content_type.startswith('video/'):
        blob_parser = VideoParser()
        print(f'Loading Video File: {upload_file.filename}')

# Suggested code may be subject to a license. Learn more: ~LicenseLog:3330720155.
    elif upload_file.content_type in [
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.ms-excel'
    ]:
        blob_parser = XlsxParser()
        print(f'Loading Excel File: {upload_file.filename}')
    
    else:
        raise ValueError(f"Unsupported file type: {upload_file.content_type}")

    loader = CustomGenericLoader(blob_loader, blob_parser)
    documents = []
    # async for document in loader.lazy_load():
    #     documents.append(document)

    document = await loader.load_all()
    documents.append(document)

    if not documents:
        raise ValueError(
            f"No documents were loaded for file: {upload_file.filename}")

    return documents


async def load_all_documents(upload_files: List[UploadFile]) -> List[List[Document]]:
    all_documents = []
    for upload_file in upload_files:
        try:
            documents = await load_document(upload_file)
            all_documents.extend(documents)
        except ValueError as e:
            print(f"Error loading {upload_file.filename}: {e}")

    if not all_documents:
        raise ValueError("No documents were loaded from the provided files.")

    return all_documents

# Example usage:
# Note: You would typically run this inside an async function or an async event loop.
# Example:
# upload_files = [UploadFile1, UploadFile2, ...]
# documents = await load_all_documents(upload_files)