File size: 5,709 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
import re
import time
from enum import Enum
from typing import List, Optional

import requests
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class BlockchainType(Enum):
    """Enumerator of the supported blockchains."""

    ETH_MAINNET = "eth-mainnet"
    ETH_GOERLI = "eth-goerli"
    POLYGON_MAINNET = "polygon-mainnet"
    POLYGON_MUMBAI = "polygon-mumbai"


class BlockchainDocumentLoader(BaseLoader):
    """Load elements from a blockchain smart contract.

    The supported blockchains are: Ethereum mainnet, Ethereum Goerli testnet,
    Polygon mainnet, and Polygon Mumbai testnet.

    If no BlockchainType is specified, the default is Ethereum mainnet.

    The Loader uses the Alchemy API to interact with the blockchain.
    ALCHEMY_API_KEY environment variable must be set to use this loader.

    The API returns 100 NFTs per request and can be paginated using the
    startToken parameter.

    If get_all_tokens is set to True, the loader will get all tokens
    on the contract.  Note that for contracts with a large number of tokens,
    this may take a long time (e.g. 10k tokens is 100 requests).
    Default value is false for this reason.

    The max_execution_time (sec) can be set to limit the execution time
    of the loader.

    Future versions of this loader can:
        - Support additional Alchemy APIs (e.g. getTransactions, etc.)
        - Support additional blockain APIs (e.g. Infura, Opensea, etc.)
    """

    def __init__(
        self,
        contract_address: str,
        blockchainType: BlockchainType = BlockchainType.ETH_MAINNET,
        api_key: str = "docs-demo",
        startToken: str = "",
        get_all_tokens: bool = False,
        max_execution_time: Optional[int] = None,
    ):
        """

        Args:
            contract_address: The address of the smart contract.
            blockchainType: The blockchain type.
            api_key: The Alchemy API key.
            startToken: The start token for pagination.
            get_all_tokens: Whether to get all tokens on the contract.
            max_execution_time: The maximum execution time (sec).
        """
        self.contract_address = contract_address
        self.blockchainType = blockchainType.value
        self.api_key = os.environ.get("ALCHEMY_API_KEY") or api_key
        self.startToken = startToken
        self.get_all_tokens = get_all_tokens
        self.max_execution_time = max_execution_time

        if not self.api_key:
            raise ValueError("Alchemy API key not provided.")

        if not re.match(r"^0x[a-fA-F0-9]{40}$", self.contract_address):
            raise ValueError(f"Invalid contract address {self.contract_address}")

    def load(self) -> List[Document]:
        result = []

        current_start_token = self.startToken

        start_time = time.time()

        while True:
            url = (
                f"https://{self.blockchainType}.g.alchemy.com/nft/v2/"
                f"{self.api_key}/getNFTsForCollection?withMetadata="
                f"True&contractAddress={self.contract_address}"
                f"&startToken={current_start_token}"
            )

            response = requests.get(url)

            if response.status_code != 200:
                raise ValueError(
                    f"Request failed with status code {response.status_code}"
                )

            items = response.json()["nfts"]

            if not items:
                break

            for item in items:
                content = str(item)
                tokenId = item["id"]["tokenId"]
                metadata = {
                    "source": self.contract_address,
                    "blockchain": self.blockchainType,
                    "tokenId": tokenId,
                }
                result.append(Document(page_content=content, metadata=metadata))

            # exit after the first API call if get_all_tokens is False
            if not self.get_all_tokens:
                break

            # get the start token for the next API call from the last item in array
            current_start_token = self._get_next_tokenId(result[-1].metadata["tokenId"])

            if (
                self.max_execution_time is not None
                and (time.time() - start_time) > self.max_execution_time
            ):
                raise RuntimeError("Execution time exceeded the allowed time limit.")

        if not result:
            raise ValueError(
                f"No NFTs found for contract address {self.contract_address}"
            )

        return result

    # add one to the tokenId, ensuring the correct tokenId format is used
    def _get_next_tokenId(self, tokenId: str) -> str:
        value_type = self._detect_value_type(tokenId)

        if value_type == "hex_0x":
            value_int = int(tokenId, 16)
        elif value_type == "hex_0xbf":
            value_int = int(tokenId[2:], 16)
        else:
            value_int = int(tokenId)

        result = value_int + 1

        if value_type == "hex_0x":
            return "0x" + format(result, "0" + str(len(tokenId) - 2) + "x")
        elif value_type == "hex_0xbf":
            return "0xbf" + format(result, "0" + str(len(tokenId) - 4) + "x")
        else:
            return str(result)

    # A smart contract can use different formats for the tokenId
    @staticmethod
    def _detect_value_type(tokenId: str) -> str:
        if isinstance(tokenId, int):
            return "int"
        elif tokenId.startswith("0x"):
            return "hex_0x"
        elif tokenId.startswith("0xbf"):
            return "hex_0xbf"
        else:
            return "hex_0xbf"