File size: 6,026 Bytes
62da328
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import json
import logging
import os
import time
from typing import IO, Any, Optional, Union

import requests

from camel.utils import api_keys_required

logger = logging.getLogger(__name__)


class ChunkrReader:
    r"""Chunkr Reader for processing documents and returning content
    in various formats.

    Args:
        api_key (Optional[str], optional): The API key for Chunkr API. If not
            provided, it will be retrieved from the environment variable
            `CHUNKR_API_KEY`. (default: :obj:`None`)
        url (Optional[str], optional): The url to the Chunkr service.
            (default: :obj:`https://api.chunkr.ai/api/v1/task`)
        timeout (int, optional): The maximum time in seconds to wait for the
            API responses. (default: :obj:`30`)
        **kwargs (Any): Additional keyword arguments for request headers.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        url: Optional[str] = "https://api.chunkr.ai/api/v1/task",
        timeout: int = 30,
        **kwargs: Any,
    ) -> None:
        self._api_key = api_key or os.getenv('CHUNKR_API_KEY')
        self._url = os.getenv('CHUNKR_API_URL') or url
        self._headers = {
            "Authorization": f"{self._api_key}",
            **kwargs,
        }
        self.timeout = timeout

    def submit_task(
        self,
        file_path: str,
        model: str = "Fast",
        ocr_strategy: str = "Auto",
        target_chunk_length: str = "512",
    ) -> str:
        r"""Submits a file to the Chunkr API and returns the task ID.

        Args:
            file_path (str): The path to the file to be uploaded.
            model (str, optional): The model to be used for the task.
                (default: :obj:`Fast`)
            ocr_strategy (str, optional): The OCR strategy. Defaults to 'Auto'.
            target_chunk_length (str, optional): The target chunk length.
                (default: :obj:`512`)

        Returns:
            str: The task ID.
        """
        with open(file_path, 'rb') as file:
            files: dict[
                str, Union[tuple[None, IO[bytes]], tuple[None, str]]
            ] = {
                'file': (
                    None,
                    file,
                ),  # Properly pass the file as a binary stream
                'model': (None, model),
                'ocr_strategy': (None, ocr_strategy),
                'target_chunk_length': (None, target_chunk_length),
            }
            try:
                response = requests.post(
                    self._url,  # type: ignore[arg-type]
                    headers=self._headers,
                    files=files,
                    timeout=self.timeout,
                )
                response.raise_for_status()
                task_id = response.json().get('task_id')
                if not task_id:
                    raise ValueError("Task ID not returned in the response.")
                logger.info(f"Task submitted successfully. Task ID: {task_id}")
                return task_id
            except Exception as e:
                logger.error(f"Failed to submit task: {e}")
                raise ValueError(f"Failed to submit task: {e}") from e

    def get_task_output(self, task_id: str, max_retries: int = 5) -> str:
        r"""Polls the Chunkr API to check the task status and returns the task
        result.

        Args:
            task_id (str): The task ID to check the status for.
            max_retries (int, optional): Maximum number of retry attempts.
                (default: :obj:`5`)

        Returns:
            str: The formatted task result in JSON format.

        Raises:
            ValueError: If the task status cannot be retrieved.
            RuntimeError: If the maximum number of retries is reached without
                a successful task completion.
        """
        url_get = f"{self._url}/{task_id}"
        attempts = 0

        while attempts < max_retries:
            try:
                response = requests.get(
                    url_get, headers=self._headers, timeout=self.timeout
                )
                response.raise_for_status()
                task_status = response.json().get('status')

                if task_status == "Succeeded":
                    logger.info(f"Task {task_id} completed successfully.")
                    return self._pretty_print_response(response.json())
                else:
                    logger.info(
                        f"Task {task_id} is still {task_status}. Retrying "
                        "in 5 seconds..."
                    )
            except Exception as e:
                logger.error(f"Failed to retrieve task status: {e}")
                raise ValueError(f"Failed to retrieve task status: {e}") from e

            attempts += 1
            time.sleep(5)

        logger.error(f"Max retries reached for task {task_id}.")
        raise RuntimeError(f"Max retries reached for task {task_id}.")

    def _pretty_print_response(self, response_json: dict) -> str:
        r"""Pretty prints the JSON response.

        Args:
            response_json (dict): The response JSON to pretty print.

        Returns:
            str: Formatted JSON as a string.
        """
        return json.dumps(response_json, indent=4)