Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
from datetime import datetime | |
from pathlib import Path | |
from typing import TYPE_CHECKING, Iterator, List, Optional, Union | |
from langchain_core.chat_loaders import BaseChatLoader | |
from langchain_core.chat_sessions import ChatSession | |
from langchain_core.messages import HumanMessage | |
if TYPE_CHECKING: | |
import sqlite3 | |
def nanoseconds_from_2001_to_datetime(nanoseconds: int) -> datetime: | |
# Convert nanoseconds to seconds (1 second = 1e9 nanoseconds) | |
timestamp_in_seconds = nanoseconds / 1e9 | |
# The reference date is January 1, 2001, in Unix time | |
reference_date_seconds = datetime(2001, 1, 1).timestamp() | |
# Calculate the actual timestamp by adding the reference date | |
actual_timestamp = reference_date_seconds + timestamp_in_seconds | |
# Convert to a datetime object | |
return datetime.fromtimestamp(actual_timestamp) | |
class IMessageChatLoader(BaseChatLoader): | |
"""Load chat sessions from the `iMessage` chat.db SQLite file. | |
It only works on macOS when you have iMessage enabled and have the chat.db file. | |
The chat.db file is likely located at ~/Library/Messages/chat.db. However, your | |
terminal may not have permission to access this file. To resolve this, you can | |
copy the file to a different location, change the permissions of the file, or | |
grant full disk access for your terminal emulator | |
in System Settings > Security and Privacy > Full Disk Access. | |
""" | |
def __init__(self, path: Optional[Union[str, Path]] = None): | |
""" | |
Initialize the IMessageChatLoader. | |
Args: | |
path (str or Path, optional): Path to the chat.db SQLite file. | |
Defaults to None, in which case the default path | |
~/Library/Messages/chat.db will be used. | |
""" | |
if path is None: | |
path = Path.home() / "Library" / "Messages" / "chat.db" | |
self.db_path = path if isinstance(path, Path) else Path(path) | |
if not self.db_path.exists(): | |
raise FileNotFoundError(f"File {self.db_path} not found") | |
try: | |
import sqlite3 # noqa: F401 | |
except ImportError as e: | |
raise ImportError( | |
"The sqlite3 module is required to load iMessage chats.\n" | |
"Please install it with `pip install pysqlite3`" | |
) from e | |
def _parse_attributedBody(self, attributedBody: bytes) -> str: | |
""" | |
Parse the attributedBody field of the message table | |
for the text content of the message. | |
The attributedBody field is a binary blob that contains | |
the message content after the byte string b"NSString": | |
5 bytes 1-3 bytes `len` bytes | |
... | b"NSString" | preamble | `len` | contents | ... | |
The 5 preamble bytes are always b"\x01\x94\x84\x01+" | |
The size of `len` is either 1 byte or 3 bytes: | |
- If the first byte in `len` is b"\x81" then `len` is 3 bytes long. | |
So the message length is the 2 bytes after, in little Endian. | |
- Otherwise, the size of `len` is 1 byte, and the message length is | |
that byte. | |
Args: | |
attributedBody (bytes): attributedBody field of the message table. | |
Return: | |
str: Text content of the message. | |
""" | |
content = attributedBody.split(b"NSString")[1][5:] | |
length, start = content[0], 1 | |
if content[0] == 129: | |
length, start = int.from_bytes(content[1:3], "little"), 3 | |
return content[start : start + length].decode("utf-8", errors="ignore") | |
def _get_session_query(self, use_chat_handle_table: bool) -> str: | |
# Messages sent pre OSX 12 require a join through the chat_handle_join table | |
# However, the table doesn't exist if database created with OSX 12 or above. | |
joins_w_chat_handle = """ | |
JOIN chat_handle_join ON | |
chat_message_join.chat_id = chat_handle_join.chat_id | |
JOIN handle ON | |
handle.ROWID = chat_handle_join.handle_id""" | |
joins_no_chat_handle = """ | |
JOIN handle ON message.handle_id = handle.ROWID | |
""" | |
joins = joins_w_chat_handle if use_chat_handle_table else joins_no_chat_handle | |
return f""" | |
SELECT message.date, | |
handle.id, | |
message.text, | |
message.is_from_me, | |
message.attributedBody | |
FROM message | |
JOIN chat_message_join ON | |
message.ROWID = chat_message_join.message_id | |
{joins} | |
WHERE chat_message_join.chat_id = ? | |
ORDER BY message.date ASC; | |
""" | |
def _load_single_chat_session( | |
self, cursor: "sqlite3.Cursor", use_chat_handle_table: bool, chat_id: int | |
) -> ChatSession: | |
""" | |
Load a single chat session from the iMessage chat.db. | |
Args: | |
cursor: SQLite cursor object. | |
chat_id (int): ID of the chat session to load. | |
Returns: | |
ChatSession: Loaded chat session. | |
""" | |
results: List[HumanMessage] = [] | |
query = self._get_session_query(use_chat_handle_table) | |
cursor.execute(query, (chat_id,)) | |
messages = cursor.fetchall() | |
for date, sender, text, is_from_me, attributedBody in messages: | |
if text: | |
content = text | |
elif attributedBody: | |
content = self._parse_attributedBody(attributedBody) | |
else: # Skip messages with no content | |
continue | |
results.append( | |
HumanMessage( # type: ignore[call-arg] | |
role=sender, | |
content=content, | |
additional_kwargs={ | |
"message_time": date, | |
"message_time_as_datetime": nanoseconds_from_2001_to_datetime( | |
date | |
), | |
"sender": sender, | |
"is_from_me": bool(is_from_me), | |
}, | |
) | |
) | |
return ChatSession(messages=results) | |
def lazy_load(self) -> Iterator[ChatSession]: | |
""" | |
Lazy load the chat sessions from the iMessage chat.db | |
and yield them in the required format. | |
Yields: | |
ChatSession: Loaded chat session. | |
""" | |
import sqlite3 | |
try: | |
conn = sqlite3.connect(self.db_path) | |
except sqlite3.OperationalError as e: | |
raise ValueError( | |
f"Could not open iMessage DB file {self.db_path}.\n" | |
"Make sure your terminal emulator has disk access to this file.\n" | |
" You can either copy the DB file to an accessible location" | |
" or grant full disk access for your terminal emulator." | |
" You can grant full disk access for your terminal emulator" | |
" in System Settings > Security and Privacy > Full Disk Access." | |
) from e | |
cursor = conn.cursor() | |
# See if chat_handle_join table exists: | |
query = """SELECT name FROM sqlite_master | |
WHERE type='table' AND name='chat_handle_join';""" | |
cursor.execute(query) | |
is_chat_handle_join_exists = cursor.fetchone() | |
# Fetch the list of chat IDs sorted by time (most recent first) | |
query = """SELECT chat_id | |
FROM message | |
JOIN chat_message_join ON message.ROWID = chat_message_join.message_id | |
GROUP BY chat_id | |
ORDER BY MAX(date) DESC;""" | |
cursor.execute(query) | |
chat_ids = [row[0] for row in cursor.fetchall()] | |
for chat_id in chat_ids: | |
yield self._load_single_chat_session( | |
cursor, is_chat_handle_join_exists, chat_id | |
) | |
conn.close() | |