Tai Truong
fix readme
d202ada
raw
history blame contribute delete
3.69 kB
import json
from typing import Any
import requests
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.field_typing import Tool
from langflow.inputs import MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class NotionPageCreator(LCToolComponent):
    """Langflow component that creates a page inside a Notion database.

    The component can be executed directly (``run_model``), exposed to an agent
    as a LangChain ``StructuredTool`` (``build_tool``), or invoked like a plain
    function (``__call__``). Invalid input and failed HTTP requests are
    reported as error strings instead of being raised.
    """

    display_name: str = "Create Page "
    description: str = "A component for creating Notion pages."
    documentation: str = "https://docs.langflow.org/integrations/notion/page-create"
    icon = "NotionDirectoryLoader"

    inputs = [
        StrInput(
            name="database_id",
            display_name="Database ID",
            info="The ID of the Notion database.",
        ),
        SecretStrInput(
            name="notion_secret",
            display_name="Notion Secret",
            info="The Notion integration token.",
            required=True,
        ),
        MultilineInput(
            name="properties_json",
            display_name="Properties (JSON)",
            info="The properties of the new page as a JSON string.",
        ),
    ]

    class NotionPageCreatorSchema(BaseModel):
        """Argument schema passed to the ``StructuredTool`` built by ``build_tool``."""

        database_id: str = Field(..., description="The ID of the Notion database.")
        properties_json: str = Field(..., description="The properties of the new page as a JSON string.")

    def run_model(self) -> Data:
        """Create the page from the component's own inputs.

        Returns:
            A ``Data`` object. On failure it carries only the error text; on
            success it carries a text listing of the created page's properties
            plus the decoded API response as ``data``.
        """
        result = self._create_notion_page(self.database_id, self.properties_json)
        if isinstance(result, str):
            # An error occurred, return it as text
            return Data(text=result)
        # Success, return the created page data
        property_lines = [
            f"{prop_name}: {prop_value}\n" for prop_name, prop_value in result.get("properties", {}).items()
        ]
        output = "Created page properties:\n" + "".join(property_lines)
        return Data(text=output, data=result)

    def build_tool(self) -> Tool:
        """Wrap ``_create_notion_page`` as a structured tool named ``create_notion_page``."""
        return StructuredTool.from_function(
            name="create_notion_page",
            description="Create a new page in a Notion database. "
            "IMPORTANT: Use the tool to check the Database properties for more details before using this tool.",
            func=self._create_notion_page,
            args_schema=self.NotionPageCreatorSchema,
        )

    def _create_notion_page(self, database_id: str, properties_json: str) -> dict[str, Any] | str:
        """POST a new page to the Notion API.

        Args:
            database_id: ID of the database that becomes the page's parent.
            properties_json: The page properties as a JSON object string. An
                already-parsed ``dict`` is tolerated and used as-is.

        Returns:
            The decoded JSON response on success, otherwise a human-readable
            error message string.
        """
        if not database_id or not properties_json:
            return "Invalid input. Please provide 'database_id' and 'properties_json'."

        if isinstance(properties_json, dict):
            # Callers going through __call__ may hand over parsed properties;
            # json.loads would raise TypeError on them.
            properties = properties_json
        else:
            try:
                properties = json.loads(properties_json)
            except (json.JSONDecodeError, TypeError) as e:
                # TypeError covers inputs that are neither str, bytes nor bytearray.
                return f"Invalid properties format. Please provide a valid JSON string. Error: {e}"

        if not isinstance(properties, dict):
            # Valid JSON, but not an object: fail fast instead of sending a
            # request that cannot describe page properties.
            return (
                "Invalid properties format. Please provide a JSON object mapping property names to values. "
                f"Got: {type(properties).__name__}"
            )

        headers = {
            "Authorization": f"Bearer {self.notion_secret}",
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }
        data = {
            "parent": {"database_id": database_id},
            "properties": properties,
        }

        try:
            response = requests.post("https://api.notion.com/v1/pages", headers=headers, json=data, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            error_message = f"Failed to create Notion page. Error: {e}"
            # Only exceptions tied to an HTTP response carry a status and body.
            error_response = getattr(e, "response", None)
            if error_response is not None:
                error_message += f" Status code: {error_response.status_code}, Response: {error_response.text}"
            return error_message

    def __call__(self, *args, **kwargs):
        """Forward all arguments to ``_create_notion_page`` and return its result."""
        return self._create_notion_page(*args, **kwargs)