import glob
import ntpath
import os
import time
from typing import List, Optional
from .client import ApiClient

# This client works with batches of PDF files of the size given in config["batch_size"]
# (default 1000 entries), so that the list of inputs held in memory stays bounded. We move
# from one batch to the next only once the current one is entirely processed, which is
# slightly sub-optimal but scales better than acquiring all inputs at once. Within a batch,
# files are currently processed one by one; the max_workers setting is kept in the config
# with a parallel executor in mind. Listing millions of files across directories would also
# require something scalable, which is not implemented for the moment. A usage sketch
# appears at the bottom of this file.

DEFAULT_GROBID_CONFIG = {
"grobid_server": "localhost",
"grobid_port": "8070",
"batch_size": 1000,
"sleep_time": 5,
"generateIDs": False,
"consolidate_header": False,
"consolidate_citations": False,
"include_raw_citations": True,
"include_raw_affiliations": False,
"max_workers": 2,
}
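
# The defaults above target a local GROBID server; to point the client elsewhere, pass a
# modified copy of this dict (the host and batch size below are placeholder values):
#   GrobidClient(config={**DEFAULT_GROBID_CONFIG, "grobid_server": "grobid.example.org", "batch_size": 100})
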
class GrobidClient(ApiClient):
def __init__(self, config=None):
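        """Initialize client settings from a config dict, defaulting to DEFAULT_GROBID_CONFIG."""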
self.config = config or DEFAULT_GROBID_CONFIG
self.generate_ids = self.config["generateIDs"]
self.consolidate_header = self.config["consolidate_header"]
self.consolidate_citations = self.config["consolidate_citations"]
self.include_raw_citations = self.config["include_raw_citations"]
self.include_raw_affiliations = self.config["include_raw_affiliations"]
self.max_workers = self.config["max_workers"]
self.grobid_server = self.config["grobid_server"]
self.grobid_port = str(self.config["grobid_port"])
self.sleep_time = self.config["sleep_time"]

    def process(self, input_path: str, output: str, service: str) -> None:
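        """Find all PDF files under input_path and run them through the given GROBID
        service in batches of config["batch_size"], writing results to output."""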
batch_size_pdf = self.config["batch_size"]
pdf_files = []
        for pdf_file in glob.glob(os.path.join(input_path, "*.pdf")):
pdf_files.append(pdf_file)
if len(pdf_files) == batch_size_pdf:
self.process_batch(pdf_files, output, service)
pdf_files = []
# last batch
if len(pdf_files) > 0:
self.process_batch(pdf_files, output, service)

    def process_batch(self, pdf_files: List[str], output: str, service: str) -> None:
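        """Process a batch of PDF files sequentially through the given GROBID service."""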
print(len(pdf_files), "PDF files to process")
for pdf_file in pdf_files:
self.process_pdf(pdf_file, output, service)

    def process_pdf_stream(self, pdf_file: str, pdf_strm: bytes, output: str, service: str) -> str:
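        """POST an in-memory PDF to the GROBID service and return the TEI XML text,
        retrying after sleep_time on 503 and returning "" on any other error."""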
        files = {"input": (pdf_file, pdf_strm, "application/pdf", {"Expires": "0"})}
        the_url = f"http://{self.grobid_server}:{self.grobid_port}/api/{service}"
        # set the GROBID parameters, mapping booleans to the "0"/"1" flags the API expects
        the_data = {
            "generateIDs": "1" if self.generate_ids else "0",
            "consolidateHeader": "1" if self.consolidate_header else "0",
            "consolidateCitations": "1" if self.consolidate_citations else "0",
            "includeRawAffiliations": "1" if self.include_raw_affiliations else "0",
            "includeRawCitations": "1" if self.include_raw_citations else "0",
        }
res, status = self.post(
url=the_url, files=files, data=the_data, headers={"Accept": "text/plain"}
)
        if status == 503:
            # server busy: wait, then retry the same request with the same arguments
            time.sleep(self.sleep_time)
            return self.process_pdf_stream(
                pdf_file=pdf_file, pdf_strm=pdf_strm, output=output, service=service
            )
        elif status != 200:
            with open(os.path.join(output, "failed.log"), "a+") as failed:
                # strip the extension, not the character set ".pdf"
                failed.write(os.path.splitext(pdf_file)[0] + "\n")
            print("Processing failed with error " + str(status))
            return ""
else:
return res.text

    def process_pdf(self, pdf_file: str, output: str, service: str) -> None:
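        """Process a single PDF file and write the TEI XML result to the output
        directory; skips files for which a .tei.xml result already exists."""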
        # use ntpath for the basename so this also works with Windows-style paths
pdf_file_name = ntpath.basename(pdf_file)
filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + ".tei.xml")
if os.path.isfile(filename):
return
print(pdf_file)
        with open(pdf_file, "rb") as pdf_handle:
            pdf_strm = pdf_handle.read()
tei_text = self.process_pdf_stream(pdf_file, pdf_strm, output, service)
# writing TEI file
if tei_text:
        with open(filename, "w", encoding="utf8") as tei_file:
tei_file.write(tei_text)

    def process_citation(self, bib_string: str, log_file: str) -> Optional[str]:
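        """Parse a raw citation string via /api/processCitation and return the
        response text, or None after five failed attempts (logged to log_file)."""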
        the_data = {"citations": bib_string, "consolidateCitations": "0"}
        the_url = f"http://{self.grobid_server}:{self.grobid_port}/api/processCitation"
for _ in range(5):
try:
res, status = self.post(
url=the_url, data=the_data, headers={"Accept": "text/plain"}
)
if status == 503:
time.sleep(self.sleep_time)
continue
elif status != 200:
with open(log_file, "a+") as failed:
failed.write("-- BIBSTR --\n")
failed.write(bib_string + "\n\n")
break
else:
return res.text
except Exception:
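                # request failed (e.g. connection error): try again until attempts run out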
continue
return None

    def process_header_names(self, header_string: str, log_file: str) -> Optional[str]:
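        """Parse author names from a header string via /api/processHeaderNames and
        return the response text, or None on failure (logged to log_file)."""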
        the_data = {"names": header_string}
        the_url = f"http://{self.grobid_server}:{self.grobid_port}/api/processHeaderNames"
res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})
if status == 503:
time.sleep(self.sleep_time)
return self.process_header_names(header_string, log_file)
elif status != 200:
with open(log_file, "a+") as failed:
failed.write("-- AUTHOR --\n")
failed.write(header_string + "\n\n")
else:
return res.text
return None

    def process_affiliations(self, aff_string: str, log_file: str) -> Optional[str]:
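        """Parse an affiliation string via /api/processAffiliations and return the
        response text, or None on failure (logged to log_file)."""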
        the_data = {"affiliations": aff_string}
        the_url = f"http://{self.grobid_server}:{self.grobid_port}/api/processAffiliations"
res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})
if status == 503:
time.sleep(self.sleep_time)
return self.process_affiliations(aff_string, log_file)
elif status != 200:
with open(log_file, "a+") as failed:
failed.write("-- AFFILIATION --\n")
failed.write(aff_string + "\n\n")
else:
return res.text
return None
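

if __name__ == "__main__":
    # Minimal usage sketch. Because of the relative import above, run this as a
    # module (python -m <package>.<module>) rather than as a plain script. It
    # assumes a GROBID server is reachable at the configured host/port (the
    # defaults above) and that the directories given exist; the paths below are
    # placeholders, not part of this module.
    client = GrobidClient()
    client.process(
        input_path="./pdfs",  # directory containing the PDF files to process
        output="./tei",  # where .tei.xml results and failed.log are written
        service="processFulltextDocument",  # standard GROBID full-text service
    )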