import glob
import io
import ntpath
import os
import time
from typing import List, Optional

from .client import ApiClient

DEFAULT_GROBID_CONFIG = {
    "grobid_server": "localhost",
    "grobid_port": "8070",
    "batch_size": 1000,
    "sleep_time": 5,
    "generateIDs": False,
    "consolidate_header": False,
    "consolidate_citations": False,
    "include_raw_citations": True,
    "include_raw_affiliations": False,
    "max_workers": 2,
}

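# Example of overriding individual defaults (a sketch; the alternate port value
# below is hypothetical):
#   client = GrobidClient(config={**DEFAULT_GROBID_CONFIG, "grobid_port": "8071"})
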
class GrobidClient(ApiClient):
    def __init__(self, config=None):
        self.config = config or DEFAULT_GROBID_CONFIG
        self.generate_ids = self.config["generateIDs"]
        self.consolidate_header = self.config["consolidate_header"]
        self.consolidate_citations = self.config["consolidate_citations"]
        self.include_raw_citations = self.config["include_raw_citations"]
        self.include_raw_affiliations = self.config["include_raw_affiliations"]
        self.max_workers = self.config["max_workers"]
        self.grobid_server = self.config["grobid_server"]
        self.grobid_port = str(self.config["grobid_port"])
        self.sleep_time = self.config["sleep_time"]

    def process(self, input: str, output: str, service: str):
        """Process every PDF under `input` in batches, writing TEI XML to `output`."""
        batch_size_pdf = self.config["batch_size"]
        pdf_files = []

        for pdf_file in glob.glob(input + "/*.pdf"):
            pdf_files.append(pdf_file)

            if len(pdf_files) == batch_size_pdf:
                self.process_batch(pdf_files, output, service)
                pdf_files = []

        # Process any remaining files that did not fill a full batch.
        if len(pdf_files) > 0:
            self.process_batch(pdf_files, output, service)

    def process_batch(self, pdf_files: List[str], output: str, service: str) -> None:
        print(len(pdf_files), "PDF files to process")
        for pdf_file in pdf_files:
            self.process_pdf(pdf_file, output, service)

    def process_pdf_stream(self, pdf_file: str, pdf_strm: bytes, output: str, service: str) -> str:
        files = {"input": (pdf_file, pdf_strm, "application/pdf", {"Expires": "0"})}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/" + service

        # GROBID expects these flags as "0"/"1" strings in the form data.
        the_data = {
            "generateIDs": "1" if self.generate_ids else "0",
            "consolidateHeader": "1" if self.consolidate_header else "0",
            "consolidateCitations": "1" if self.consolidate_citations else "0",
            "includeRawAffiliations": "1" if self.include_raw_affiliations else "0",
            "includeRawCitations": "1" if self.include_raw_citations else "0",
        }

        res, status = self.post(
            url=the_url, files=files, data=the_data, headers={"Accept": "text/plain"}
        )

        if status == 503:
            # Server is busy; wait and retry.
            time.sleep(self.sleep_time)
            return self.process_pdf_stream(
                pdf_file=pdf_file, pdf_strm=pdf_strm, service=service, output=output
            )
        elif status != 200:
            with open(os.path.join(output, "failed.log"), "a+") as failed:
                failed.write(os.path.splitext(pdf_file)[0] + "\n")
            print("Processing failed with error " + str(status))
            return ""
        else:
            return res.text

    def process_pdf(self, pdf_file: str, output: str, service: str) -> None:
        # Skip PDFs that already have a TEI XML file in the output directory.
        pdf_file_name = ntpath.basename(pdf_file)
        filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + ".tei.xml")
        if os.path.isfile(filename):
            return

        print(pdf_file)
        with open(pdf_file, "rb") as pdf_handle:
            pdf_strm = pdf_handle.read()
        tei_text = self.process_pdf_stream(pdf_file, pdf_strm, output, service)

        if tei_text:
            with io.open(filename, "w+", encoding="utf8") as tei_file:
                tei_file.write(tei_text)

    def process_citation(self, bib_string: str, log_file: str) -> Optional[str]:
        the_data = {"citations": bib_string, "consolidateCitations": "0"}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processCitation"

        # Retry up to five times; a 503 means the server is busy, so back off first.
        for _ in range(5):
            try:
                res, status = self.post(
                    url=the_url, data=the_data, headers={"Accept": "text/plain"}
                )
                if status == 503:
                    time.sleep(self.sleep_time)
                    continue
                elif status != 200:
                    with open(log_file, "a+") as failed:
                        failed.write("-- BIBSTR --\n")
                        failed.write(bib_string + "\n\n")
                    break
                else:
                    return res.text
            except Exception:
                continue

        return None

    def process_header_names(self, header_string: str, log_file: str) -> Optional[str]:
        the_data = {"names": header_string}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processHeaderNames"

        res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})

        if status == 503:
            time.sleep(self.sleep_time)
            return self.process_header_names(header_string, log_file)
        elif status != 200:
            with open(log_file, "a+") as failed:
                failed.write("-- AUTHOR --\n")
                failed.write(header_string + "\n\n")
        else:
            return res.text

        return None

    def process_affiliations(self, aff_string: str, log_file: str) -> Optional[str]:
        the_data = {"affiliations": aff_string}

        the_url = "http://" + self.grobid_server
        the_url += ":" + self.grobid_port
        the_url += "/api/processAffiliations"

        res, status = self.post(url=the_url, data=the_data, headers={"Accept": "text/plain"})

        if status == 503:
            time.sleep(self.sleep_time)
            return self.process_affiliations(aff_string, log_file)
        elif status != 200:
            with open(log_file, "a+") as failed:
                failed.write("-- AFFILIATION --\n")
                failed.write(aff_string + "\n\n")
        else:
            return res.text

        return None
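
# Minimal usage sketch (not part of the original client): assumes a GROBID
# service is reachable at the host/port in DEFAULT_GROBID_CONFIG and that the
# hypothetical "input_pdfs" and "output_tei" directories exist.
if __name__ == "__main__":
    client = GrobidClient()
    # "processFulltextDocument" is GROBID's standard full-text service name.
    client.process("input_pdfs", "output_tei", "processFulltextDocument")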