File size: 4,802 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from typing import Any, Iterator, List, Optional, Sequence, Tuple, cast

from langchain_core.stores import ByteStore

from langchain_community.utilities.redis import get_client


class RedisStore(ByteStore):
    """BaseStore implementation using Redis as the underlying store.

    Examples:
        Create a RedisStore instance and perform operations on it:

        .. code-block:: python

            # Instantiate the RedisStore with a Redis connection
            from langchain_community.storage import RedisStore
            from langchain_community.utilities.redis import get_client

            client = get_client('redis://localhost:6379')
            redis_store = RedisStore(client)

            # Set values for keys
            redis_store.mset([("key1", b"value1"), ("key2", b"value2")])

            # Get values for keys
            values = redis_store.mget(["key1", "key2"])
            # [b"value1", b"value2"]

            # Delete keys
            redis_store.mdelete(["key1"])

            # Iterate over keys
            for key in redis_store.yield_keys():
                print(key)  # noqa: T201
    """

    def __init__(
        self,
        *,
        client: Any = None,
        redis_url: Optional[str] = None,
        client_kwargs: Optional[dict] = None,
        ttl: Optional[int] = None,
        namespace: Optional[str] = None,
    ) -> None:
        """Initialize the RedisStore with a Redis connection.

        Must provide either a Redis client or a redis_url with optional client_kwargs.

        Args:
            client: A Redis connection instance
            redis_url: redis url
            client_kwargs: Keyword arguments to pass to the Redis client
            ttl: time to expire keys in seconds if provided,
                 if None keys will never expire
            namespace: if provided, all keys will be prefixed with this namespace
        """
        try:
            from redis import Redis
        except ImportError as e:
            raise ImportError(
                "The RedisStore requires the redis library to be installed. "
                "pip install redis"
            ) from e

        if client and redis_url or client and client_kwargs:
            raise ValueError(
                "Either a Redis client or a redis_url with optional client_kwargs "
                "must be provided, but not both."
            )

        if client:
            if not isinstance(client, Redis):
                raise TypeError(
                    f"Expected Redis client, got {type(client).__name__} instead."
                )
            _client = client
        else:
            if not redis_url:
                raise ValueError(
                    "Either a Redis client or a redis_url must be provided."
                )
            _client = get_client(redis_url, **(client_kwargs or {}))

        self.client = _client

        if not isinstance(ttl, int) and ttl is not None:
            raise TypeError(f"Expected int or None, got {type(ttl)} instead.")

        self.ttl = ttl
        self.namespace = namespace

    def _get_prefixed_key(self, key: str) -> str:
        """Get the key with the namespace prefix.

        Args:
            key (str): The original key.

        Returns:
            str: The key with the namespace prefix.
        """
        delimiter = "/"
        if self.namespace:
            return f"{self.namespace}{delimiter}{key}"
        return key

    def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        """Get the values associated with the given keys."""
        return cast(
            List[Optional[bytes]],
            self.client.mget([self._get_prefixed_key(key) for key in keys]),
        )

    def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None:
        """Set the given key-value pairs."""
        pipe = self.client.pipeline()

        for key, value in key_value_pairs:
            pipe.set(self._get_prefixed_key(key), value, ex=self.ttl)
        pipe.execute()

    def mdelete(self, keys: Sequence[str]) -> None:
        """Delete the given keys."""
        _keys = [self._get_prefixed_key(key) for key in keys]
        self.client.delete(*_keys)

    def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]:
        """Yield keys in the store."""
        if prefix:
            pattern = self._get_prefixed_key(prefix)
        else:
            pattern = self._get_prefixed_key("*")
        scan_iter = cast(Iterator[bytes], self.client.scan_iter(match=pattern))
        for key in scan_iter:
            decoded_key = key.decode("utf-8")
            if self.namespace:
                relative_key = decoded_key[len(self.namespace) + 1 :]
                yield relative_key
            else:
                yield decoded_key