import glob
import io
import ntpath
import os
import time
from typing import List, Optional

from .client import ApiClient

# This client processes PDFs in batches of the size indicated in the config (default is 1000 entries)
# rather than listing the whole input up front, so memory usage stays bounded. We move from one batch
# to the next only when the previous batch is entirely processed, which is slightly sub-optimal but
# should scale better. Acquiring a list of millions of files in directories would still require
# something more scalable, which is not implemented for the moment.
DEFAULT_GROBID_CONFIG = {
    "grobid_server": "localhost",
    "grobid_port": "8070",
    "batch_size": 1000,
    "sleep_time": 5,
    "generateIDs": False,
    "consolidate_header": False,
    "consolidate_citations": False,
    "include_raw_citations": True,
    "include_raw_affiliations": False,
    "max_workers": 2,
}
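
# A caller can start from DEFAULT_GROBID_CONFIG and override only the entries that differ.
# Sketch (the host name and batch size below are illustrative, not a real deployment):
#
#   config = {**DEFAULT_GROBID_CONFIG, "grobid_server": "grobid.example.org", "batch_size": 100}
#   client = GrobidClient(config=config)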


class GrobidClient(ApiClient):
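    """Simple client for a GROBID server: converts PDFs to TEI XML and parses raw citation,
    author-name, and affiliation strings via the GROBID REST API."""
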
    def __init__(self, config=None):
        self.config = config or DEFAULT_GROBID_CONFIG
        self.generate_ids = self.config["generateIDs"]
        self.consolidate_header = self.config["consolidate_header"]
        self.consolidate_citations = self.config["consolidate_citations"]
        self.include_raw_citations = self.config["include_raw_citations"]
        self.include_raw_affiliations = self.config["include_raw_affiliations"]
        self.max_workers = self.config["max_workers"]
        self.grobid_server = self.config["grobid_server"]
        self.grobid_port = str(self.config["grobid_port"])
        self.sleep_time = self.config["sleep_time"]

    def process(self, input: str, output: str, service: str):
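        # collect *.pdf files from the input directory and send them to GROBID in batches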
        batch_size_pdf = self.config["batch_size"]
        pdf_files = []

        for pdf_file in glob.glob(input + "/*.pdf"):
            pdf_files.append(pdf_file)

            if len(pdf_files) == batch_size_pdf:
                self.process_batch(pdf_files, output, service)
                pdf_files = []

        # last batch
        if len(pdf_files) > 0:
            self.process_batch(pdf_files, output, service)

    def process_batch(self, pdf_files: List[str], output: str, service: str) -> None:
        print(len(pdf_files), "PDF files to process")
        for pdf_file in pdf_files:
            self.process_pdf(pdf_file, output, service)

    def process_pdf_stream(self, pdf_file: str, pdf_strm: bytes, output: str, service: str) -> str:
        # process the stream
        files = {"input": (pdf_file, pdf_strm, "application/pdf", {"Expires": "0"})}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/" + service

        # set the GROBID parameters
        the_data = {
            "generateIDs": "1" if self.generate_ids else "0",
            "consolidateHeader": "1" if self.consolidate_header else "0",
            "consolidateCitations": "1" if self.consolidate_citations else "0",
            "includeRawAffiliations": "1" if self.include_raw_affiliations else "0",
            "includeRawCitations": "1" if self.include_raw_citations else "0",
        }

        res, status = self.post(
            url=the_url, files=files, data=the_data, headers={"Accept": "text/plain"}
        )

        if status == 503:
            time.sleep(self.sleep_time)
            # TODO: check if simply passing output as output is correct
            return self.process_pdf_stream(
                pdf_file=pdf_file, pdf_strm=pdf_strm, service=service, output=output
            )
        elif status != 200:
            with open(os.path.join(output, "failed.log"), "a+") as failed:
                failed.write(os.path.splitext(pdf_file)[0] + "\n")
            print("Processing failed with error " + str(status))
            return ""
        else:
            return res.text

    def process_pdf(self, pdf_file: str, output: str, service: str) -> None:
        # check if TEI file is already produced
        # we use ntpath here to be sure it will work on Windows too
        pdf_file_name = ntpath.basename(pdf_file)
        filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + ".tei.xml")
        if os.path.isfile(filename):
            return

        print(pdf_file)
        with open(pdf_file, "rb") as pdf_fp:
            pdf_strm = pdf_fp.read()
        tei_text = self.process_pdf_stream(pdf_file, pdf_strm, output, service)

        # writing TEI file
        if tei_text:
            with io.open(filename, "w+", encoding="utf8") as tei_file:
                tei_file.write(tei_text)

    def process_citation(self, bib_string: str, log_file: str) -> Optional[str]:
        # process a raw citation string and return the GROBID result as text
        the_data = {"citations": bib_string, "consolidateCitations": "0"}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processCitation"

        for _ in range(5):
            try:
                res, status = self.post(
                    url=the_url, data=the_data, headers={"Accept": "text/plain"}
                )
                if status == 503:
                    time.sleep(self.sleep_time)
                    continue
                elif status != 200:
                    with open(log_file, "a+") as failed:
                        failed.write("-- BIBSTR --\n")
                        failed.write(bib_string + "\n\n")
                    break
                else:
                    return res.text
            except Exception:
                continue

        return None

    def process_header_names(self, header_string: str, log_file: str) -> Optional[str]:
        # process author names from header string
        the_data = {"names": header_string}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processHeaderNames"

        res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})

        if status == 503:
            time.sleep(self.sleep_time)
            return self.process_header_names(header_string, log_file)
        elif status != 200:
            with open(log_file, "a+") as failed:
                failed.write("-- AUTHOR --\n")
                failed.write(header_string + "\n\n")
        else:
            return res.text

        return None

    def process_affiliations(self, aff_string: str, log_file: str) -> Optional[str]:
        # process affiliation from input string
        the_data = {"affiliations": aff_string}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processAffiliations"

        res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})

        if status == 503:
            time.sleep(self.sleep_time)
            return self.process_affiliations(aff_string, log_file)
        elif status != 200:
            with open(log_file, "a+") as failed:
                failed.write("-- AFFILIATION --\n")
                failed.write(aff_string + "\n\n")
        else:
            return res.text

        return None
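

# Minimal usage sketch (not executed on import). It assumes a GROBID server is reachable at the
# host/port from DEFAULT_GROBID_CONFIG and that "processFulltextDocument" is the desired GROBID
# service; the directory paths below are illustrative placeholders, and the output directory
# must already exist.
if __name__ == "__main__":
    client = GrobidClient()
    client.process(
        input="/path/to/pdf_dir",    # directory containing the input *.pdf files
        output="/path/to/tei_dir",   # directory where the *.tei.xml files will be written
        service="processFulltextDocument",
    )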