# File size: 9,506 Bytes
# 62da328
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import Any, Callable, List, Optional, cast

from camel.toolkits import FunctionTool
from camel.toolkits.base import BaseToolkit


def get_plain_text_from_rich_text(rich_text: List[dict]) -> str:
    r"""Concatenates the plain text of a sequence of rich text elements.

    Args:
        rich_text: The rich text elements to read, given as a list of
            dictionaries. The value stored under each element's
            "plain_text" key is used; an element without that key
            contributes an empty string.

    Returns:
        str: The plain text of every element, concatenated in order with
            no separator.
    """
    return "".join(
        fragment.get("plain_text", "") for fragment in rich_text
    )


def get_media_source_text(block: dict) -> str:
    r"""Builds a text description of a Notion media block from its
    source URL and, when present, its caption.

    Args:
        block: A dictionary representing a Notion media block. The media
            payload is read from the key named by the block's "type".

    Returns:
        The source URL on its own when the block has no caption,
            otherwise the caption followed by a colon and the source URL.
            If no URL can be found, a placeholder naming the block type
            is used in place of the URL.
    """
    media_kind = block.get("type", "Unknown Type")
    payload = block.get(media_kind, {})

    # Try the possible URL locations in order of preference, falling
    # through to the next one only while nothing usable has been found.
    source = payload.get("external", {}).get("url")
    if not source:
        source = payload.get("file", {}).get("url")
    if not source:
        source = payload.get(
            "url", "[Missing case for media block types]: " + media_kind
        )

    # Without a caption the source is returned unchanged.
    caption_parts = payload.get("caption", [])
    if not caption_parts:
        return source

    caption = get_plain_text_from_rich_text(caption_parts)
    return f"{caption}: {source}"


class NotionToolkit(BaseToolkit):
    r"""A toolkit for retrieving information from the user's notion pages.

    Attributes:
        notion_token (Optional[str], optional): The notion_token used to
            interact with notion APIs.(default: :obj:`None`)
        notion_client (Client): The `notion_client.Client` instance used
            for interacting with the notion APIs.
    """

    def __init__(
        self,
        notion_token: Optional[str] = None,
    ) -> None:
        r"""Initializes the NotionToolkit.

        Args:
            notion_token (Optional[str], optional): The optional notion_token
                used to interact with notion APIs. When omitted, the value
                of the `NOTION_TOKEN` environment variable is used.
                (default: :obj:`None`)
        """
        # Imported here rather than at module level, so `notion_client`
        # is only required when the toolkit is actually instantiated.
        from notion_client import Client

        self.notion_token = notion_token or os.environ.get("NOTION_TOKEN")
        self.notion_client = Client(auth=self.notion_token)

    @staticmethod
    def _fetch_all_results(
        endpoint: Callable[..., Any], **params: Any
    ) -> List[dict]:
        r"""Collects the results of every page of a paginated endpoint.

        Args:
            endpoint (Callable[..., Any]): A callable that accepts a
                `start_cursor` keyword argument and returns a response
                with "results", "has_more" and "next_cursor" entries.
            **params (Any): Extra keyword arguments forwarded unchanged to
                every call of `endpoint`.

        Returns:
            List[dict]: The "results" of all responses, in the order
                received.
        """
        collected: List[dict] = []
        cursor = None

        while True:
            response = cast(dict, endpoint(start_cursor=cursor, **params))
            collected.extend(response.get("results", []))

            cursor = response.get("next_cursor")
            # Also stop when no cursor is supplied: re-requesting with an
            # empty cursor would fetch the first page again forever.
            if not response.get("has_more") or not cursor:
                break

        return collected

    @staticmethod
    def _extract_page_title(page: dict) -> Optional[str]:
        r"""Extracts the title of a page object.

        Args:
            page (dict): A dictionary representing a Notion page.

        Returns:
            Optional[str]: The text of the page title, or :obj:`None` if
                the page has no title content.
        """
        properties = page.get("properties") or {}

        # The title property is not always stored under the key "title"
        # (pages inside a database use the column name instead), so it is
        # located by its property type first.
        title_property = next(
            (
                prop
                for prop in properties.values()
                if isinstance(prop, dict) and prop.get("type") == "title"
            ),
            None,
        )
        if title_property is None:
            title_property = properties.get("title") or {}

        fragments = title_property.get("title") or []
        if not fragments:
            return None

        # Join every fragment so that titles split into several rich text
        # elements (for example by formatting) are not truncated.
        return "".join(
            fragment.get("plain_text")
            or (fragment.get("text") or {}).get("content")
            or ""
            for fragment in fragments
        )

    def list_all_users(self) -> List[dict]:
        r"""Lists all users via the Notion integration.

        Returns:
            List[dict]: A list of user objects with type, name, and workspace.
                The type and name are :obj:`None` when the API response
                does not include them, and the workspace is an empty
                string when no workspace name is available.
        """
        all_users_info = self._fetch_all_results(self.notion_client.users.list)

        formatted_users = []
        for user in all_users_info:
            user_type = user.get("type")
            # Type-specific details live under a key named after the type.
            type_details = user.get(user_type) or {}
            formatted_users.append(
                {
                    "type": user_type,
                    "name": user.get("name"),
                    "workspace": type_details.get("workspace_name", ""),
                }
            )

        return formatted_users

    def list_all_pages(self) -> List[dict]:
        r"""Lists all pages in the Notion workspace.

        Returns:
            List[dict]: A list of page objects with title and id. The
                title is :obj:`None` for pages without title content.
        """
        all_pages_info = self._fetch_all_results(
            self.notion_client.search,
            filter={"property": "object", "value": "page"},
        )

        return [
            {
                "id": page.get("id"),
                "title": self._extract_page_title(page),
            }
            for page in all_pages_info
        ]

    def get_notion_block_text_content(self, block_id: str) -> str:
        r"""Retrieves the text content of a Notion block.

        Args:
            block_id (str): The ID of the Notion block to retrieve.

        Returns:
            str: The text content of a Notion block, containing all
                the sub blocks, separated by single spaces.
        """
        blocks = self._fetch_all_results(
            self.notion_client.blocks.children.list, block_id=block_id
        )

        return " ".join(
            self.get_text_from_block(sub_block) for sub_block in blocks
        )

    def get_text_from_block(self, block: dict) -> str:
        r"""Extracts plain text from a Notion block based on its type.

        Args:
            block (dict): A dictionary representing a Notion block.

        Returns:
            str: A string containing the extracted plain text of the block,
                followed by the text of its children when it has any.
        """
        block_type = block.get("type")
        content = block.get(block_type) or {}

        if "rich_text" in content:
            # Checked by key presence rather than truthiness so that an
            # empty line yields an empty string instead of a placeholder.
            text = get_plain_text_from_rich_text(content["rich_text"] or [])
        elif block_type == "unsupported":
            text = "[Unsupported block type]"
        elif block_type in ("bookmark", "link_preview"):
            text = content["url"]
        elif block_type in ("child_database", "child_page"):
            # Only the title is available here; the database or page
            # contents must be fetched through other API endpoints.
            text = content["title"]
        elif block_type in ("embed", "video", "file", "image", "pdf"):
            text = get_media_source_text(block)
        elif block_type == "equation":
            text = content["expression"]
        elif block_type == "synced_block":
            synced_from = content.get("synced_from")
            if synced_from:
                # The original block's ID is stored under a key named by
                # the "type" of the `synced_from` reference.
                source_id = synced_from[synced_from["type"]]
                text = (
                    f"This block is synced with a block with ID: {source_id}"
                )
            else:
                text = "Source sync block that another block is synced with."
        elif block_type == "table":
            # Only the width is reported; the rows are child blocks.
            text = f"Table width: {content['table_width']}"
        elif block_type == "table_of_contents":
            text = f"ToC color: {content['color']}"
        elif block_type in ("breadcrumb", "column_list", "divider"):
            text = "No text available"
        else:
            text = "[Needs case added]"

        # Query children for blocks with children
        if block.get("has_children"):
            children_text = self.get_notion_block_text_content(block["id"])
            # Separate parent and children text so their words do not fuse.
            text = " ".join(part for part in (text, children_text) if part)

        return text

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects
                representing the functions in the toolkit.
        """
        return [
            FunctionTool(self.list_all_pages),
            FunctionTool(self.list_all_users),
            FunctionTool(self.get_notion_block_text_content),
        ]