File size: 3,515 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
from typing import Iterator, List, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)


class CouchbaseLoader(BaseLoader):
    """Load documents from `Couchbase`.

    Each document represents one row of the result. The `page_content_fields` are
    written into the `page_content`of the document. The `metadata_fields` are written
    into the `metadata` of the document. By default, all columns are written into
    the `page_content` and none into the `metadata`.
    """

    def __init__(
        self,
        connection_string: str,
        db_username: str,
        db_password: str,
        query: str,
        *,
        page_content_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> None:
        """Initialize Couchbase document loader.

        Args:
            connection_string (str): The connection string to the Couchbase cluster.
            db_username (str): The username to connect to the Couchbase cluster.
            db_password (str): The password to connect to the Couchbase cluster.
            query (str): The SQL++ query to execute.
            page_content_fields (Optional[List[str]]): The columns to write into the
                `page_content` field of the document. By default, all columns are
                written.
            metadata_fields (Optional[List[str]]): The columns to write into the
                `metadata` field of the document. By default, no columns are written.
        """
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            raise ImportError(
                "Could not import couchbase package."
                "Please install couchbase SDK with `pip install couchbase`."
            ) from e
        if not connection_string:
            raise ValueError("connection_string must be provided.")

        if not db_username:
            raise ValueError("db_username must be provided.")

        if not db_password:
            raise ValueError("db_password must be provided.")

        auth = PasswordAuthenticator(
            db_username,
            db_password,
        )

        self.cluster: Cluster = Cluster(connection_string, ClusterOptions(auth))
        self.query = query
        self.page_content_fields = page_content_fields
        self.metadata_fields = metadata_fields

    def lazy_load(self) -> Iterator[Document]:
        """Load Couchbase data into Document objects lazily."""
        from datetime import timedelta

        # Ensure connection to Couchbase cluster
        self.cluster.wait_until_ready(timedelta(seconds=5))

        # Run SQL++ Query
        result = self.cluster.query(self.query)
        for row in result:
            metadata_fields = self.metadata_fields
            page_content_fields = self.page_content_fields

            if not page_content_fields:
                page_content_fields = list(row.keys())

            if not metadata_fields:
                metadata_fields = []

            metadata = {field: row[field] for field in metadata_fields}

            document = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in page_content_fields
            )

            yield (Document(page_content=document, metadata=metadata))