Spaces:
Runtime error
Runtime error
File size: 7,925 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, List, Optional, Union
from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
if TYPE_CHECKING:
import sqlite3
def nanoseconds_from_2001_to_datetime(nanoseconds: int) -> datetime:
    """Convert a nanosecond offset from January 1, 2001 into a ``datetime``.

    The reference instant is the naive ``datetime(2001, 1, 1)``; its
    ``timestamp()`` is computed as local time, and the result is built with
    ``datetime.fromtimestamp`` without a ``tz`` argument, so the returned
    value is a naive datetime as well.

    Args:
        nanoseconds: Number of nanoseconds elapsed since the reference date.

    Returns:
        The naive ``datetime`` that lies ``nanoseconds`` after the reference.
    """
    # Unix timestamp (in seconds) of the 2001-01-01 reference date.
    epoch_2001 = datetime(2001, 1, 1).timestamp()
    # 1 second = 1e9 nanoseconds.
    offset_seconds = nanoseconds / 1e9
    return datetime.fromtimestamp(epoch_2001 + offset_seconds)
class IMessageChatLoader(BaseChatLoader):
    """Load chat sessions from the `iMessage` chat.db SQLite file.

    It only works on macOS when you have iMessage enabled and have the chat.db file.

    The chat.db file is likely located at ~/Library/Messages/chat.db. However, your
    terminal may not have permission to access this file. To resolve this, you can
    copy the file to a different location, change the permissions of the file, or
    grant full disk access for your terminal emulator
    in System Settings > Security and Privacy > Full Disk Access.
    """

    def __init__(self, path: Optional[Union[str, Path]] = None):
        """
        Initialize the IMessageChatLoader.

        Args:
            path (str or Path, optional): Path to the chat.db SQLite file.
                Defaults to None, in which case the default path
                ~/Library/Messages/chat.db will be used.

        Raises:
            FileNotFoundError: If the resolved database path does not exist.
            ImportError: If the ``sqlite3`` module cannot be imported.
        """
        if path is None:
            path = Path.home() / "Library" / "Messages" / "chat.db"
        self.db_path = path if isinstance(path, Path) else Path(path)
        if not self.db_path.exists():
            raise FileNotFoundError(f"File {self.db_path} not found")
        # Fail early, at construction time, if sqlite3 is unavailable.
        try:
            import sqlite3  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "The sqlite3 module is required to load iMessage chats.\n"
                "Please install it with `pip install pysqlite3`"
            ) from e

    def _parse_attributedBody(self, attributedBody: bytes) -> str:
        r"""
        Parse the attributedBody field of the message table
        for the text content of the message.

        The attributedBody field is a binary blob that contains
        the message content after the byte string b"NSString":

                              5 bytes      1-3 bytes    `len` bytes
        ... | b"NSString" |   preamble   |   `len`   |    contents    | ...

        The 5 preamble bytes are always b"\x01\x94\x84\x01+"

        The size of `len` is either 1 byte or 3 bytes:
        - If the first byte in `len` is b"\x81" then `len` is 3 bytes long.
          So the message length is the 2 bytes after, in little Endian.
        - Otherwise, the size of `len` is 1 byte, and the message length is
          that byte.

        Args:
            attributedBody (bytes): attributedBody field of the message table.

        Return:
            str: Text content of the message. An empty string is returned when
                the blob has no b"NSString" marker or holds nothing after the
                preamble.
        """
        # Split only at the FIRST marker: the message text itself may contain
        # the bytes b"NSString", and the contents are delimited by `len`.
        _, marker, tail = attributedBody.partition(b"NSString")
        content = tail[5:]  # drop the 5 preamble bytes
        if not marker or not content:
            return ""
        length, start = content[0], 1
        if length == 0x81:  # 3-byte `len`: flag byte + 2 little-endian bytes
            length, start = int.from_bytes(content[1:3], "little"), 3
        return content[start : start + length].decode("utf-8", errors="ignore")

    def _get_session_query(self, use_chat_handle_table: bool) -> str:
        """
        Build the SQL query that selects the messages of one chat.

        Args:
            use_chat_handle_table (bool): Whether to resolve the handle through
                the chat_handle_join table instead of message.handle_id.

        Returns:
            str: Query with a single ``?`` placeholder for the chat ID. It
                selects date, handle id, text, is_from_me and attributedBody,
                ordered by ascending message date.
        """
        # Messages sent pre OSX 12 require a join through the chat_handle_join table
        # However, the table doesn't exist if database created with OSX 12 or above.
        # NOTE(review): this variant joins handles on chat_id only, so a chat
        # with several handles may yield one row per handle for each message;
        # confirm against a database that has the chat_handle_join table.
        joins_w_chat_handle = """
            JOIN chat_handle_join ON
                 chat_message_join.chat_id = chat_handle_join.chat_id
            JOIN handle ON
                 handle.ROWID = chat_handle_join.handle_id"""

        joins_no_chat_handle = """
            JOIN handle ON message.handle_id = handle.ROWID
        """

        joins = joins_w_chat_handle if use_chat_handle_table else joins_no_chat_handle

        return f"""
            SELECT  message.date,
                    handle.id,
                    message.text,
                    message.is_from_me,
                    message.attributedBody
            FROM message
            JOIN chat_message_join ON
                 message.ROWID = chat_message_join.message_id
            {joins}
            WHERE chat_message_join.chat_id = ?
            ORDER BY message.date ASC;
        """

    def _load_single_chat_session(
        self, cursor: "sqlite3.Cursor", use_chat_handle_table: bool, chat_id: int
    ) -> ChatSession:
        """
        Load a single chat session from the iMessage chat.db.

        Args:
            cursor: SQLite cursor object.
            use_chat_handle_table (bool): Whether the session query should join
                through the chat_handle_join table.
            chat_id (int): ID of the chat session to load.

        Returns:
            ChatSession: Loaded chat session.
        """
        results: List[HumanMessage] = []

        query = self._get_session_query(use_chat_handle_table)
        cursor.execute(query, (chat_id,))

        for date, sender, text, is_from_me, attributedBody in cursor.fetchall():
            # Prefer the plain text column; fall back to the binary blob.
            if text:
                content = text
            elif attributedBody:
                content = self._parse_attributedBody(attributedBody)
            else:
                content = ""
            if not content:  # Skip messages with no content
                continue

            results.append(
                HumanMessage(  # type: ignore[call-arg]
                    role=sender,
                    content=content,
                    additional_kwargs={
                        "message_time": date,
                        "message_time_as_datetime": nanoseconds_from_2001_to_datetime(
                            date
                        ),
                        "sender": sender,
                        "is_from_me": bool(is_from_me),
                    },
                )
            )

        return ChatSession(messages=results)

    def lazy_load(self) -> Iterator[ChatSession]:
        """
        Lazy load the chat sessions from the iMessage chat.db
        and yield them in the required format.

        The database connection is closed when the generator is exhausted,
        closed early, or interrupted by an exception.

        Yields:
            ChatSession: Loaded chat session.

        Raises:
            ValueError: If the database file cannot be opened.
        """
        import sqlite3

        try:
            conn = sqlite3.connect(self.db_path)
        except sqlite3.OperationalError as e:
            raise ValueError(
                f"Could not open iMessage DB file {self.db_path}.\n"
                "Make sure your terminal emulator has disk access to this file.\n"
                "   You can either copy the DB file to an accessible location"
                " or grant full disk access for your terminal emulator."
                "  You can grant full disk access for your terminal emulator"
                " in System Settings > Security and Privacy > Full Disk Access."
            ) from e

        # try/finally so the connection is released even if the consumer stops
        # iterating early or a query raises.
        try:
            cursor = conn.cursor()

            # See if chat_handle_join table exists:
            query = """SELECT name FROM sqlite_master
                       WHERE type='table' AND name='chat_handle_join';"""
            cursor.execute(query)
            # fetchone() returns a row tuple or None; normalise to a real bool.
            use_chat_handle_table = cursor.fetchone() is not None

            # Fetch the list of chat IDs sorted by time (most recent first)
            query = """SELECT chat_id
            FROM message
            JOIN chat_message_join ON message.ROWID = chat_message_join.message_id
            GROUP BY chat_id
            ORDER BY MAX(date) DESC;"""
            cursor.execute(query)
            chat_ids = [row[0] for row in cursor.fetchall()]

            for chat_id in chat_ids:
                yield self._load_single_chat_session(
                    cursor, use_chat_handle_table, chat_id
                )
        finally:
            conn.close()
|