File size: 7,925 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, List, Optional, Union

from langchain_core.chat_loaders import BaseChatLoader
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage

if TYPE_CHECKING:
    import sqlite3


def nanoseconds_from_2001_to_datetime(nanoseconds: int) -> datetime:
    """Convert nanoseconds since 2001-01-01 00:00:00 into a naive datetime.

    Apple's reference date is 2001-01-01 00:00:00 UTC, so the returned value
    is the UTC wall-clock time of the instant, expressed as a naive
    ``datetime`` (``tzinfo`` is ``None``).

    The conversion is done with plain datetime arithmetic rather than a
    round trip through ``timestamp()`` / ``fromtimestamp()``. That round trip
    interprets the epoch in the machine's local time zone and converts back
    with the offset in effect at the *target* instant, which skews the result
    by the DST difference and makes the output depend on the host's time zone.

    Args:
        nanoseconds: Nanoseconds elapsed since the 2001-01-01 reference date.

    Returns:
        Naive datetime equal to the reference date plus the elapsed time,
        rounded to microsecond resolution.
    """
    # Function-scope import: the module only imports `datetime` at the top.
    from datetime import timedelta

    reference_date = datetime(2001, 1, 1)

    # 1 second = 1e9 nanoseconds; timedelta rounds to whole microseconds.
    return reference_date + timedelta(seconds=nanoseconds / 1e9)


class IMessageChatLoader(BaseChatLoader):
    """Load chat sessions from the `iMessage` chat.db SQLite file.

    It only works on macOS when you have iMessage enabled and have the chat.db file.

    The chat.db file is likely located at ~/Library/Messages/chat.db. However, your
    terminal may not have permission to access this file. To resolve this, you can
    copy the file to a different location, change the permissions of the file, or
    grant full disk access for your terminal emulator
    in System Settings > Security and Privacy > Full Disk Access.
    """

    def __init__(self, path: Optional[Union[str, Path]] = None):
        """
        Initialize the IMessageChatLoader.

        Args:
            path (str or Path, optional): Path to the chat.db SQLite file.
                Defaults to None, in which case the default path
                ~/Library/Messages/chat.db will be used.

        Raises:
            FileNotFoundError: If the resolved database path does not exist.
            ImportError: If the ``sqlite3`` module cannot be imported.
        """
        if path is None:
            path = Path.home() / "Library" / "Messages" / "chat.db"
        # Path() accepts both str and Path, so no isinstance branch is needed.
        self.db_path = Path(path)
        if not self.db_path.exists():
            raise FileNotFoundError(f"File {self.db_path} not found")
        # Fail early, at construction time, if sqlite3 is unavailable.
        try:
            import sqlite3  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "The sqlite3 module is required to load iMessage chats.\n"
                "Please install it with `pip install pysqlite3`"
            ) from e

    def _parse_attributedBody(self, attributedBody: bytes) -> str:
        """
        Parse the attributedBody field of the message table
        for the text content of the message.

        The attributedBody field is a binary blob that contains
        the message content after the byte string b"NSString":

                              5 bytes      1-3 bytes    `len` bytes
        ... | b"NSString" |   preamble   |   `len`   |    contents    | ...

        The 5 preamble bytes are always b"\x01\x94\x84\x01+"

        The size of `len` is either 1 byte or 3 bytes:
        - If the first byte in `len` is b"\x81" then `len` is 3 bytes long.
          So the message length is the 2 bytes after, in little Endian.
        - Otherwise, the size of `len` is 1 byte, and the message length is
          that byte.

        Args:
            attributedBody (bytes): attributedBody field of the message table.
        Return:
            str: Text content of the message, or an empty string when the
                blob has no b"NSString" marker or ends before the length
                field.
        """
        # partition() keeps everything after the FIRST marker, so message text
        # that itself contains the bytes b"NSString" is not cut short the way
        # split(...)[1] would cut it.
        _, marker, tail = attributedBody.partition(b"NSString")
        content = tail[5:]  # drop the 5-byte preamble
        if not marker or not content:
            # Malformed or truncated blob: nothing to extract.
            return ""

        length, start = content[0], 1
        if length == 0x81:  # 3-byte length field: marker + 2-byte little-endian size
            length, start = int.from_bytes(content[1:3], "little"), 3
        return content[start : start + length].decode("utf-8", errors="ignore")

    def _get_session_query(self, use_chat_handle_table: bool) -> str:
        """
        Build the SQL query that selects every message of one chat.

        Args:
            use_chat_handle_table (bool): Whether to resolve the sender handle
                through the chat_handle_join table instead of message.handle_id.

        Returns:
            str: SQL statement with a single ``?`` placeholder for the chat ID,
                ordered by message date ascending.
        """
        # Messages sent pre OSX 12 require a join through the chat_handle_join table
        # However, the table doesn't exist if database created with OSX 12 or above.

        joins_w_chat_handle = """
            JOIN chat_handle_join ON
                 chat_message_join.chat_id = chat_handle_join.chat_id
            JOIN handle ON
                 handle.ROWID = chat_handle_join.handle_id"""

        joins_no_chat_handle = """
            JOIN handle ON message.handle_id = handle.ROWID
        """

        joins = joins_w_chat_handle if use_chat_handle_table else joins_no_chat_handle

        return f"""
            SELECT  message.date,
                    handle.id,
                    message.text,
                    message.is_from_me,
                    message.attributedBody
            FROM message
            JOIN chat_message_join ON
                 message.ROWID = chat_message_join.message_id
            {joins}
            WHERE chat_message_join.chat_id = ?
            ORDER BY message.date ASC;
        """

    def _load_single_chat_session(
        self, cursor: "sqlite3.Cursor", use_chat_handle_table: bool, chat_id: int
    ) -> ChatSession:
        """
        Load a single chat session from the iMessage chat.db.

        Each message's content is taken from the ``text`` column when it is
        non-empty, otherwise it is parsed out of the ``attributedBody`` blob.
        Messages for which neither source yields any content are skipped.

        Args:
            cursor: SQLite cursor object.
            use_chat_handle_table (bool): Whether the query should join through
                the chat_handle_join table (see ``_get_session_query``).
            chat_id (int): ID of the chat session to load.

        Returns:
            ChatSession: Loaded chat session.
        """
        results: List[HumanMessage] = []

        query = self._get_session_query(use_chat_handle_table)
        cursor.execute(query, (chat_id,))

        for date, sender, text, is_from_me, attributedBody in cursor.fetchall():
            if text:
                content = text
            elif attributedBody:
                content = self._parse_attributedBody(attributedBody)
            else:
                content = ""

            if not content:  # Skip messages with no content
                continue

            results.append(
                HumanMessage(  # type: ignore[call-arg]
                    role=sender,
                    content=content,
                    additional_kwargs={
                        "message_time": date,
                        "message_time_as_datetime": nanoseconds_from_2001_to_datetime(
                            date
                        ),
                        "sender": sender,
                        "is_from_me": bool(is_from_me),
                    },
                )
            )

        return ChatSession(messages=results)

    def lazy_load(self) -> Iterator[ChatSession]:
        """
        Lazy load the chat sessions from the iMessage chat.db
        and yield them in the required format.

        The database connection is closed when iteration finishes, when the
        generator is closed early, or when a query raises.

        Yields:
            ChatSession: Loaded chat session.

        Raises:
            ValueError: If the database file cannot be opened.
        """
        import sqlite3

        try:
            conn = sqlite3.connect(self.db_path)
        except sqlite3.OperationalError as e:
            raise ValueError(
                f"Could not open iMessage DB file {self.db_path}.\n"
                "Make sure your terminal emulator has disk access to this file.\n"
                "   You can either copy the DB file to an accessible location"
                " or grant full disk access for your terminal emulator."
                "  You can grant full disk access for your terminal emulator"
                " in System Settings > Security and Privacy > Full Disk Access."
            ) from e

        # try/finally guarantees the connection is released even if the caller
        # stops iterating before the generator is exhausted.
        try:
            cursor = conn.cursor()

            # See if chat_handle_join table exists:
            query = """SELECT name FROM sqlite_master
                   WHERE type='table' AND name='chat_handle_join';"""

            cursor.execute(query)
            # fetchone() returns a row tuple or None; reduce it to a real bool.
            is_chat_handle_join_exists = cursor.fetchone() is not None

            # Fetch the list of chat IDs sorted by time (most recent first)
            query = """SELECT chat_id
        FROM message
        JOIN chat_message_join ON message.ROWID = chat_message_join.message_id
        GROUP BY chat_id
        ORDER BY MAX(date) DESC;"""
            cursor.execute(query)
            chat_ids = [row[0] for row in cursor.fetchall()]

            for chat_id in chat_ids:
                yield self._load_single_chat_session(
                    cursor, is_chat_handle_join_exists, chat_id
                )
        finally:
            conn.close()