Spaces:
Runtime error
Runtime error
import json | |
import logging | |
import re | |
import zipfile | |
from pathlib import Path | |
from typing import Dict, Iterator, List, Union | |
from langchain_core.chat_loaders import BaseChatLoader | |
from langchain_core.chat_sessions import ChatSession | |
from langchain_core.messages import AIMessage, HumanMessage | |
logger = logging.getLogger(__name__) | |
class SlackChatLoader(BaseChatLoader): | |
"""Load `Slack` conversations from a dump zip file.""" | |
def __init__( | |
self, | |
path: Union[str, Path], | |
): | |
""" | |
Initialize the chat loader with the path to the exported Slack dump zip file. | |
:param path: Path to the exported Slack dump zip file. | |
""" | |
self.zip_path = path if isinstance(path, Path) else Path(path) | |
if not self.zip_path.exists(): | |
raise FileNotFoundError(f"File {self.zip_path} not found") | |
def _load_single_chat_session(self, messages: List[Dict]) -> ChatSession: | |
results: List[Union[AIMessage, HumanMessage]] = [] | |
previous_sender = None | |
for message in messages: | |
if not isinstance(message, dict): | |
continue | |
text = message.get("text", "") | |
timestamp = message.get("ts", "") | |
sender = message.get("user", "") | |
if not sender: | |
continue | |
skip_pattern = re.compile( | |
r"<@U\d+> has joined the channel", flags=re.IGNORECASE | |
) | |
if skip_pattern.match(text): | |
continue | |
if sender == previous_sender: | |
results[-1].content += "\n\n" + text | |
results[-1].additional_kwargs["events"].append( | |
{"message_time": timestamp} | |
) | |
else: | |
results.append( | |
HumanMessage( # type: ignore[call-arg] | |
role=sender, | |
content=text, | |
additional_kwargs={ | |
"sender": sender, | |
"events": [{"message_time": timestamp}], | |
}, | |
) | |
) | |
previous_sender = sender | |
return ChatSession(messages=results) | |
def _read_json(self, zip_file: zipfile.ZipFile, file_path: str) -> List[dict]: | |
"""Read JSON data from a zip subfile.""" | |
with zip_file.open(file_path, "r") as f: | |
data = json.load(f) | |
if not isinstance(data, list): | |
raise ValueError(f"Expected list of dictionaries, got {type(data)}") | |
return data | |
def lazy_load(self) -> Iterator[ChatSession]: | |
""" | |
Lazy load the chat sessions from the Slack dump file and yield them | |
in the required format. | |
:return: Iterator of chat sessions containing messages. | |
""" | |
with zipfile.ZipFile(str(self.zip_path), "r") as zip_file: | |
for file_path in zip_file.namelist(): | |
if file_path.endswith(".json"): | |
messages = self._read_json(zip_file, file_path) | |
yield self._load_single_chat_session(messages) | |