import re
from typing import Dict, List, Tuple

import bs4


def replace_refspans(
    spans_to_replace: List[Tuple[int, int, str, str]],
    full_string: str,
    pre_padding: str = "",
    post_padding: str = "",
    btwn_padding: str = ", ",
) -> str:
    """
    For each span within the full string, replace that span with new text
    :param spans_to_replace: list of tuples of form (start_ind, end_ind, span_text, new_substring)
    :param full_string:
    :param pre_padding:
    :param post_padding:
    :param btwn_padding:
    :return:
    """
    # assert all spans are equal to full_text span
    assert all([full_string[start:end] == span for start, end, span, _ in spans_to_replace])

    # assert none of the spans start with the same start ind
    start_inds = [rep[0] for rep in spans_to_replace]
    assert len(set(start_inds)) == len(start_inds)

    # sort by start index
    spans_to_replace.sort(key=lambda x: x[0])

    # form strings for each span group
    for i, entry in enumerate(spans_to_replace):
        start, end, span, new_string = entry

        # skip empties
        if end <= 0:
            continue

        # compute shift amount
        shift_amount = len(new_string) - len(span) + len(pre_padding) + len(post_padding)

        # shift remaining appropriately
        for ind in range(i + 1, len(spans_to_replace)):
            next_start, next_end, next_span, next_string = spans_to_replace[ind]
            # skip empties
            if next_end <= 0:
                continue
            # if overlap between ref span and current ref span, remove from replacement
            if next_start < end:
                next_start = 0
                next_end = 0
                next_string = ""
            # if ref span abuts previous reference span
            elif next_start == end:
                next_start += shift_amount
                next_end += shift_amount
                next_string = btwn_padding + pre_padding + next_string + post_padding
            # if ref span starts after, shift starts and ends
            elif next_start > end:
                next_start += shift_amount
                next_end += shift_amount
                next_string = pre_padding + next_string + post_padding
            # save adjusted span
            spans_to_replace[ind] = (next_start, next_end, next_span, next_string)

    spans_to_replace = [entry for entry in spans_to_replace if entry[1] > 0]
    spans_to_replace.sort(key=lambda x: x[0])

    # apply shifts in series
    for start, end, span, new_string in spans_to_replace:
        assert full_string[start:end] == span
        full_string = full_string[:start] + new_string + full_string[end:]

    return full_string


BRACKET_REGEX = re.compile(r"\[[1-9]\d{0,2}([,;\-\s]+[1-9]\d{0,2})*;?\]")
BRACKET_STYLE_THRESHOLD = 5

SINGLE_BRACKET_REGEX = re.compile(r"\[([1-9]\d{0,2})\]")
EXPANSION_CHARS = {"-", "–"}

REPLACE_TABLE_TOKS = {
    "<row>": "<tr>",
    "<row/>": "<tr/>",
    "</row>": "</tr>",
    "<cell>": "<td>",
    "<cell/>": "<td/>",
    "</cell>": "</td>",
    "<cell ": "<td ",
    "cols=": "colspan=",
}


def span_already_added(sub_start: int, sub_end: int, span_indices: List[Tuple[int, int]]) -> bool:
    """
    Check if span is a subspan of an existing span
    :param sub_start:
    :param sub_end:
    :param span_indices:
    :return:
    """
    for span_start, span_end in span_indices:
        if sub_start >= span_start and sub_end <= span_end:
            return True
    return False


def is_expansion_string(between_string: str) -> bool:
    """
    Check if the string between two refs is an expansion string
    :param between_string:
    :return:
    """
    if (
        len(between_string) <= 2
        and any([c in EXPANSION_CHARS for c in between_string])
        and all([c in EXPANSION_CHARS.union({" "}) for c in between_string])
    ):
        return True
    return False
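
# Usage sketch for replace_refspans (illustrative values, not from the original
# module). Later spans are shifted to account for earlier replacements:
#
#   >>> replace_refspans([(0, 5, "Hello", "Hi"), (6, 11, "world", "there")], "Hello world")
#   'Hi there'
#
# is_expansion_string recognizes dash-like separators between two citation refs:
#
#   >>> is_expansion_string("-"), is_expansion_string(", ")
#   (True, False)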
""" # stripping empties clean_authors_list = [] for author in authors: clean_first = author["first"].strip() clean_last = author["last"].strip() clean_middle = [m.strip() for m in author["middle"]] clean_suffix = author["suffix"].strip() if clean_first or clean_last or clean_middle: author["first"] = clean_first author["last"] = clean_last author["middle"] = clean_middle author["suffix"] = clean_suffix clean_authors_list.append(author) # combining duplicates (preserve first occurrence of author name as position) key_to_author_blobs = {} ordered_keys_by_author_pos = [] for author in clean_authors_list: key = ( author["first"], author["last"], " ".join(author["middle"]), author["suffix"], ) if key not in key_to_author_blobs: key_to_author_blobs[key] = author ordered_keys_by_author_pos.append(key) else: if author["email"]: key_to_author_blobs[key]["email"] = author["email"] if author["affiliation"] and ( author["affiliation"]["institution"] or author["affiliation"]["laboratory"] or author["affiliation"]["location"] ): key_to_author_blobs[key]["affiliation"] = author["affiliation"] dedup_authors_list = [key_to_author_blobs[key] for key in ordered_keys_by_author_pos] return dedup_authors_list def sub_spans_and_update_indices( spans_to_replace: List[Tuple[int, int, str, str]], full_string: str ) -> Tuple[str, List]: """ Replace all spans and recompute indices :param spans_to_replace: :param full_string: :return: """ # TODO: check no spans overlapping # TODO: check all spans well-formed # assert all spans are equal to full_text span assert all([full_string[start:end] == token for start, end, token, _ in spans_to_replace]) # assert none of the spans start with the same start ind start_inds = [rep[0] for rep in spans_to_replace] assert len(set(start_inds)) == len(start_inds) # sort by start index spans_to_replace.sort(key=lambda x: x[0]) # compute offsets for each span new_spans = [ (start, end, token, surface, 0) for start, end, token, surface in spans_to_replace ] for i, entry in enumerate(spans_to_replace): start, end, token, surface = entry new_end = start + len(surface) offset = new_end - end # new_spans[i][1] += offset new_spans[i] = ( new_spans[i][0], new_spans[i][1] + offset, new_spans[i][2], new_spans[i][3], new_spans[i][4], ) # for new_span_entry in new_spans[i + 1 :]: # new_span_entry[4] += offset for j in range(i + 1, len(new_spans)): new_spans[j] = ( new_spans[j][0], new_spans[j][1], new_spans[j][2], new_spans[j][3], new_spans[j][4] + offset, ) # generate new text and create final spans new_text = replace_refspans(spans_to_replace, full_string, btwn_padding="") result = [ (start + offset, end + offset, token, surface) for start, end, token, surface, offset in new_spans ] return new_text, result class UniqTokenGenerator: """ Generate unique token """ def __init__(self, tok_string): self.tok_string = tok_string self.ind = 0 def __iter__(self): return self def __next__(self): return self.next() def next(self): new_token = f"{self.tok_string}{self.ind}" self.ind += 1 return new_token def normalize_grobid_id(grobid_id: str): """ Normalize grobid object identifiers :param grobid_id: :return: """ str_norm = grobid_id.upper().replace("_", "").replace("#", "") if str_norm.startswith("B"): return str_norm.replace("B", "BIBREF") if str_norm.startswith("TAB"): return str_norm.replace("TAB", "TABREF") if str_norm.startswith("FIG"): return str_norm.replace("FIG", "FIGREF") if str_norm.startswith("FORMULA"): return str_norm.replace("FORMULA", "EQREF") return str_norm def 


def extract_formulas_from_tei_xml(sp: bs4.BeautifulSoup) -> None:
    """
    Replace all formulas with the text
    :param sp:
    :return:
    """
    for eq in sp.find_all("formula"):
        eq.replace_with(sp.new_string(eq.text.strip()))


def table_to_html(table: bs4.element.Tag) -> str:
    """
    Sub table tags with html table tags
    :param table:
    :return:
    """
    for tag in table:
        if tag.name != "row":
            print(f"Unknown table subtag: {tag.name}")
            tag.decompose()
    table_str = str(table)
    for token, subtoken in REPLACE_TABLE_TOKS.items():
        table_str = table_str.replace(token, subtoken)
    return table_str


def extract_figures_and_tables_from_tei_xml(sp: bs4.BeautifulSoup) -> Dict[str, Dict]:
    """
    Generate figure and table dicts
    :param sp:
    :return:
    """
    ref_map = dict()

    for fig in sp.find_all("figure"):
        try:
            if fig.name and fig.get("xml:id"):
                if fig.get("type") == "table":
                    ref_map[normalize_grobid_id(fig.get("xml:id"))] = {
                        "text": (
                            fig.figDesc.text.strip()
                            if fig.figDesc
                            else fig.head.text.strip()
                            if fig.head
                            else ""
                        ),
                        "latex": None,
                        "type": "table",
                        "content": table_to_html(fig.table),
                        "fig_num": fig.get("xml:id"),
                    }
                else:
                    # extract figure number from the label if it contains a digit string
                    if True in [char.isdigit() for char in fig.findNext("head").findNext("label")]:
                        fig_num = fig.findNext("head").findNext("label").contents[0]
                    else:
                        fig_num = None
                    ref_map[normalize_grobid_id(fig.get("xml:id"))] = {
                        "text": fig.figDesc.text.strip() if fig.figDesc else "",
                        "latex": None,
                        "type": "figure",
                        "content": "",
                        "fig_num": fig_num,
                    }
        except AttributeError:
            continue
        fig.decompose()

    return ref_map


def check_if_citations_are_bracket_style(sp: bs4.BeautifulSoup) -> bool:
    """
    Check if the document has bracket style citations
    :param sp:
    :return:
    """
    cite_strings = []
    if sp.body:
        for div in sp.body.find_all("div"):
            if div.head:
                continue
            for rtag in div.find_all("ref"):
                ref_type = rtag.get("type")
                if ref_type == "bibr":
                    cite_strings.append(rtag.text.strip())

        # check how many match bracket style
        bracket_style = [bool(BRACKET_REGEX.match(cite_str)) for cite_str in cite_strings]

        # return true if more than threshold number of citations are bracket style
        if sum(bracket_style) > BRACKET_STYLE_THRESHOLD:
            return True

    return False


def sub_all_note_tags(sp: bs4.BeautifulSoup) -> bs4.BeautifulSoup:
    """
    Sub all note tags with p tags
    :param sp:
    :return:
    """
    for ntag in sp.find_all("note"):
        p_tag = sp.new_tag("p")
        p_tag.string = ntag.text.strip()
        ntag.replace_with(p_tag)
    return sp


def process_formulas_in_paragraph(para_el: bs4.BeautifulSoup, sp: bs4.BeautifulSoup) -> None:
    """
    Process all formulas in paragraph and replace with text and label
    :param para_el:
    :param sp:
    :return:
    """
    for ftag in para_el.find_all("formula"):
        # get label if exists and insert a space between formula and label
        if ftag.label:
            label = " " + ftag.label.text
            ftag.label.decompose()
        else:
            label = ""
        ftag.replace_with(sp.new_string(f"{ftag.text.strip()}{label}"))
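
# A minimal behavior sketch for process_formulas_in_paragraph (made-up TEI
# snippet; assumes an XML-capable parser is available for bs4):
#
#   >>> soup = bs4.BeautifulSoup("<p>energy <formula>E = mc2<label>(1)</label></formula></p>", "xml")
#   >>> process_formulas_in_paragraph(soup.p, soup)
#   >>> soup.p.text
#   'energy E = mc2 (1)'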


def process_references_in_paragraph(
    para_el: bs4.BeautifulSoup, sp: bs4.BeautifulSoup, refs: Dict
) -> Dict:
    """
    Process all references in paragraph and generate a dict that contains
    (type, ref_id, surface_form)
    :param para_el:
    :param sp:
    :param refs:
    :return:
    """
    tokgen = UniqTokenGenerator("REFTOKEN")
    ref_dict = dict()
    for rtag in para_el.find_all("ref"):
        try:
            ref_type = rtag.get("type")
            # skip if citation
            if ref_type == "bibr":
                continue
            if ref_type == "table" or ref_type == "figure":
                ref_id = rtag.get("target")
                if ref_id and normalize_grobid_id(ref_id) in refs:
                    # normalize reference string
                    rtag_string = normalize_grobid_id(ref_id)
                else:
                    rtag_string = None
                # add to ref set
                ref_key = tokgen.next()
                ref_dict[ref_key] = (rtag_string, rtag.text.strip(), ref_type)
                rtag.replace_with(sp.new_string(f" {ref_key} "))
            else:
                # replace with surface form
                rtag.replace_with(sp.new_string(rtag.text.strip()))
        except AttributeError:
            continue
    return ref_dict
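
# Behavior sketch for process_references_in_paragraph (hypothetical snippet;
# the refs dict would normally come from extract_figures_and_tables_from_tei_xml):
#
#   >>> soup = bs4.BeautifulSoup('<p>See <ref type="figure" target="#fig_0">Fig. 1</ref>.</p>', "xml")
#   >>> process_references_in_paragraph(soup.p, soup, {"FIGREF0": {}})
#   {'REFTOKEN0': ('FIGREF0', 'Fig. 1', 'figure')}
#   >>> soup.p.text
#   'See  REFTOKEN0 .'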


def process_citations_in_paragraph(
    para_el: bs4.BeautifulSoup, sp: bs4.BeautifulSoup, bibs: Dict, bracket: bool
) -> Dict:
    """
    Process all citations in paragraph and generate a dict for surface forms
    :param para_el:
    :param sp:
    :param bibs:
    :param bracket:
    :return:
    """

    # CHECK if range between two surface forms is appropriate for bracket style expansion
    def _get_surface_range(start_surface, end_surface):
        span1_match = SINGLE_BRACKET_REGEX.match(start_surface)
        span2_match = SINGLE_BRACKET_REGEX.match(end_surface)
        if span1_match and span2_match:
            # get numbers corresponding to citations
            span1_num = int(span1_match.group(1))
            span2_num = int(span2_match.group(1))
            # expand if range is between 1 and 20
            if 1 < span2_num - span1_num < 20:
                return span1_num, span2_num
        return None

    # CREATE BIBREF range between two reference ids, e.g. BIBREF1-BIBREF4 -> BIBREF1 BIBREF2 BIBREF3 BIBREF4
    def _create_ref_id_range(start_ref_id, end_ref_id):
        start_ref_num = int(start_ref_id[6:])
        end_ref_num = int(end_ref_id[6:])
        return [
            f"BIBREF{curr_ref_num}"
            for curr_ref_num in range(start_ref_num, end_ref_num + 1)
        ]

    # CREATE surface form range between two bracket strings, e.g. [1]-[4] -> [1] [2] [3] [4]
    def _create_surface_range(start_number, end_number):
        return [f"[{n}]" for n in range(start_number, end_number + 1)]

    # create citation dict with keywords
    cite_map = dict()
    tokgen = UniqTokenGenerator("CITETOKEN")

    for rtag in para_el.find_all("ref"):
        try:
            # get surface span, e.g. [3]
            surface_span = rtag.text.strip()

            # check if target is available (#b2 -> BID2)
            if rtag.get("target"):
                # normalize reference string
                rtag_ref_id = normalize_grobid_id(rtag.get("target"))

                # skip if rtag ref_id not in bibliography
                if rtag_ref_id not in bibs:
                    cite_key = tokgen.next()
                    rtag.replace_with(sp.new_string(f" {cite_key} "))
                    cite_map[cite_key] = (None, surface_span)
                    continue

                # if bracket style, only keep if surface form is bracket
                if bracket:
                    # valid bracket span
                    if surface_span and (
                        surface_span[0] == "["
                        or surface_span[-1] == "]"
                        or surface_span[-1] == ","
                    ):
                        pass
                    # invalid, replace tag with surface form and continue to next ref tag
                    else:
                        rtag.replace_with(sp.new_string(f" {surface_span} "))
                        continue
                # not bracket, add cite span and move on
                else:
                    cite_key = tokgen.next()
                    rtag.replace_with(sp.new_string(f" {cite_key} "))
                    cite_map[cite_key] = (rtag_ref_id, surface_span)
                    continue

                # EXTRA PROCESSING FOR BRACKET STYLE CITATIONS; EXPAND RANGES ###
                # look backward for range marker, e.g. [1]-*[3]*
                backward_between_span = ""
                for sib in rtag.previous_siblings:
                    if sib.name == "ref":
                        break
                    elif type(sib) is bs4.NavigableString:
                        backward_between_span += sib
                    else:
                        break

                # check if there's a backwards expansion, e.g. need to expand [1]-[3] -> [1] [2] [3]
                if is_expansion_string(backward_between_span):
                    # get surface number range
                    surface_num_range = _get_surface_range(
                        rtag.find_previous_sibling("ref").text.strip(), surface_span
                    )
                    # if the surface number range is reasonable (range < 20, in order), EXPAND
                    if surface_num_range:
                        # delete previous ref tag and anything in between (i.e. delete "-" and extra spaces)
                        for sib in rtag.previous_siblings:
                            if sib.name == "ref":
                                break
                            elif type(sib) is bs4.NavigableString:
                                sib.replace_with(sp.new_string(""))
                            else:
                                break

                        # get ref id of previous ref, e.g. [1] (#b0 -> BID0)
                        previous_rtag = rtag.find_previous_sibling("ref")
                        previous_rtag_ref_id = normalize_grobid_id(previous_rtag.get("target"))
                        previous_rtag.decompose()

                        # replace this ref tag with the full range expansion, e.g. [3] (#b2 -> BID1 BID2)
                        id_range = _create_ref_id_range(previous_rtag_ref_id, rtag_ref_id)
                        surface_range = _create_surface_range(
                            surface_num_range[0], surface_num_range[1]
                        )
                        replace_string = ""
                        for range_ref_id, range_surface_form in zip(id_range, surface_range):
                            # only replace if ref id is in bibliography, else add none
                            if range_ref_id in bibs:
                                cite_key = tokgen.next()
                                cite_map[cite_key] = (range_ref_id, range_surface_form)
                            else:
                                cite_key = tokgen.next()
                                cite_map[cite_key] = (None, range_surface_form)
                            replace_string += cite_key + " "
                        rtag.replace_with(sp.new_string(f" {replace_string} "))
                    # ELSE do not expand backwards and replace previous and current rtag with appropriate ref id
                    else:
                        # add mapping between ref id and surface form for previous ref tag
                        previous_rtag = rtag.find_previous_sibling("ref")
                        previous_rtag_ref_id = normalize_grobid_id(previous_rtag.get("target"))
                        previous_rtag_surface = previous_rtag.text.strip()
                        cite_key = tokgen.next()
                        previous_rtag.replace_with(sp.new_string(f" {cite_key} "))
                        cite_map[cite_key] = (
                            previous_rtag_ref_id,
                            previous_rtag_surface,
                        )

                        # add mapping between ref id and surface form for current reftag
                        cite_key = tokgen.next()
                        rtag.replace_with(sp.new_string(f" {cite_key} "))
                        cite_map[cite_key] = (rtag_ref_id, surface_span)
                else:
                    # look forward and see if expansion string, e.g. *[1]*-[3]
                    forward_between_span = ""
                    for sib in rtag.next_siblings:
                        if sib.name == "ref":
                            break
                        elif type(sib) is bs4.NavigableString:
                            forward_between_span += sib
                        else:
                            break
                    # look forward for range marker (if is a range, continue -- range will be expanded
                    # when we get to the second value)
                    if is_expansion_string(forward_between_span):
                        continue
                    # else treat like normal reference
                    else:
                        cite_key = tokgen.next()
                        rtag.replace_with(sp.new_string(f" {cite_key} "))
                        cite_map[cite_key] = (rtag_ref_id, surface_span)
            else:
                cite_key = tokgen.next()
                rtag.replace_with(sp.new_string(f" {cite_key} "))
                cite_map[cite_key] = (None, surface_span)
        except AttributeError:
            continue

    return cite_map
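
# Behavior sketch for process_citations_in_paragraph with non-bracket citations
# (hypothetical snippet; bibs keys are normalized BIBREF ids):
#
#   >>> soup = bs4.BeautifulSoup('<p>As shown in <ref type="bibr" target="#b0">(Smith, 2020)</ref>.</p>', "xml")
#   >>> process_citations_in_paragraph(soup.p, soup, {"BIBREF0": {}}, bracket=False)
#   {'CITETOKEN0': ('BIBREF0', '(Smith, 2020)')}
#
# With bracket=True, surface forms like "[1]-[3]" separated by an
# EXPANSION_CHARS marker are expanded into one CITETOKEN per bracketed number.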


def process_paragraph(
    sp: bs4.BeautifulSoup,
    para_el: bs4.element.Tag,
    section_names: List[Tuple],
    bib_dict: Dict,
    ref_dict: Dict,
    bracket: bool,
) -> Dict:
    """
    Process one paragraph
    :param sp:
    :param para_el:
    :param section_names:
    :param bib_dict:
    :param ref_dict:
    :param bracket: if bracket style, expand and clean up citations
    :return:
    """
    # return empty paragraph if no text
    if not para_el.text:
        return {
            "text": "",
            "cite_spans": [],
            "ref_spans": [],
            "eq_spans": [],
            "section": section_names,
        }

    # replace formulas with formula text
    process_formulas_in_paragraph(para_el, sp)

    # get references to tables and figures
    ref_map = process_references_in_paragraph(para_el, sp, ref_dict)

    # generate citation map for paragraph element (keep only cite spans with bib entry or unlinked)
    cite_map = process_citations_in_paragraph(para_el, sp, bib_dict, bracket)

    # substitute space characters
    para_text = re.sub(r"\s+", " ", para_el.text)
    para_text = re.sub(r"\s", " ", para_text)

    # get all cite and ref spans
    all_spans_to_replace = []
    for span in re.finditer(r"(CITETOKEN\d+)", para_text):
        uniq_token = span.group()
        ref_id, surface_text = cite_map[uniq_token]
        all_spans_to_replace.append(
            (span.start(), span.start() + len(uniq_token), uniq_token, surface_text)
        )
    for span in re.finditer(r"(REFTOKEN\d+)", para_text):
        uniq_token = span.group()
        ref_id, surface_text, ref_type = ref_map[uniq_token]
        all_spans_to_replace.append(
            (span.start(), span.start() + len(uniq_token), uniq_token, surface_text)
        )

    # replace cite and ref spans and create json blobs
    para_text, all_spans_to_replace = sub_spans_and_update_indices(all_spans_to_replace, para_text)

    cite_span_blobs = [
        {"start": start, "end": end, "text": surface, "ref_id": cite_map[token][0]}
        for start, end, token, surface in all_spans_to_replace
        if token.startswith("CITETOKEN")
    ]

    ref_span_blobs = [
        {"start": start, "end": end, "text": surface, "ref_id": ref_map[token][0]}
        for start, end, token, surface in all_spans_to_replace
        if token.startswith("REFTOKEN")
    ]

    for cite_blob in cite_span_blobs:
        assert para_text[cite_blob["start"] : cite_blob["end"]] == cite_blob["text"]

    for ref_blob in ref_span_blobs:
        assert para_text[ref_blob["start"] : ref_blob["end"]] == ref_blob["text"]

    return {
        "text": para_text,
        "cite_spans": cite_span_blobs,
        "ref_spans": ref_span_blobs,
        "eq_spans": [],
        "section": section_names,
    }
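
# End-to-end sketch for process_paragraph (hypothetical snippet; empty ref_dict
# since there are no figure/table refs here). Note how the CITETOKEN placeholder
# round-trips back to the surface form with recomputed span indices:
#
#   >>> soup = bs4.BeautifulSoup('<p>Results in <ref type="bibr" target="#b0">[1]</ref> hold.</p>', "xml")
#   >>> blob = process_paragraph(soup, soup.p, [(None, "Intro")], {"BIBREF0": {}}, {}, False)
#   >>> blob["text"]
#   'Results in [1] hold.'
#   >>> blob["cite_spans"]
#   [{'start': 11, 'end': 14, 'text': '[1]', 'ref_id': 'BIBREF0'}]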


def extract_abstract_from_tei_xml(
    sp: bs4.BeautifulSoup, bib_dict: Dict, ref_dict: Dict, cleanup_bracket: bool
) -> List[Dict]:
    """
    Parse abstract from soup
    :param sp:
    :param bib_dict:
    :param ref_dict:
    :param cleanup_bracket:
    :return:
    """
    abstract_text = []
    if sp.abstract:
        # process all divs
        if sp.abstract.div:
            for div in sp.abstract.find_all("div"):
                if div.text:
                    if div.p:
                        for para in div.find_all("p"):
                            if para.text:
                                abstract_text.append(
                                    process_paragraph(
                                        sp,
                                        para,
                                        [(None, "Abstract")],
                                        bib_dict,
                                        ref_dict,
                                        cleanup_bracket,
                                    )
                                )
                    else:
                        if div.text:
                            abstract_text.append(
                                process_paragraph(
                                    sp,
                                    div,
                                    [(None, "Abstract")],
                                    bib_dict,
                                    ref_dict,
                                    cleanup_bracket,
                                )
                            )
        # process all paragraphs
        elif sp.abstract.p:
            for para in sp.abstract.find_all("p"):
                if para.text:
                    abstract_text.append(
                        process_paragraph(
                            sp,
                            para,
                            [(None, "Abstract")],
                            bib_dict,
                            ref_dict,
                            cleanup_bracket,
                        )
                    )
        # else just try to get the text
        else:
            if sp.abstract.text:
                abstract_text.append(
                    process_paragraph(
                        sp,
                        sp.abstract,
                        [(None, "Abstract")],
                        bib_dict,
                        ref_dict,
                        cleanup_bracket,
                    )
                )
        sp.abstract.decompose()
    return abstract_text


def extract_body_text_from_div(
    sp: bs4.BeautifulSoup,
    div: bs4.element.Tag,
    sections: List[Tuple],
    bib_dict: Dict,
    ref_dict: Dict,
    cleanup_bracket: bool,
) -> List[Dict]:
    """
    Parse body text from soup
    :param sp:
    :param div:
    :param sections:
    :param bib_dict:
    :param ref_dict:
    :param cleanup_bracket:
    :return:
    """
    chunks = []
    # check if nested divs; recursively process
    if div.div:
        for subdiv in div.find_all("div"):
            # has header, add to section list and process
            if subdiv.head:
                chunks += extract_body_text_from_div(
                    sp,
                    subdiv,
                    sections + [(subdiv.head.get("n", None), subdiv.head.text.strip())],
                    bib_dict,
                    ref_dict,
                    cleanup_bracket,
                )
                subdiv.head.decompose()
            # no header, process with same section list
            else:
                chunks += extract_body_text_from_div(
                    sp, subdiv, sections, bib_dict, ref_dict, cleanup_bracket
                )
    # process tags individually
    for tag in div:
        try:
            if tag.name == "p":
                if tag.text:
                    chunks.append(
                        process_paragraph(sp, tag, sections, bib_dict, ref_dict, cleanup_bracket)
                    )
            elif tag.name == "formula":
                # e.g. <formula>Y = W T X.</formula>
                label = tag.label.text
                tag.label.decompose()
                eq_text = tag.text
                chunks.append(
                    {
                        "text": "EQUATION",
                        "cite_spans": [],
                        "ref_spans": [],
                        "eq_spans": [
                            {
                                "start": 0,
                                "end": 8,
                                "text": "EQUATION",
                                "ref_id": "EQREF",
                                "raw_str": eq_text,
                                "eq_num": label,
                            }
                        ],
                        "section": sections,
                    }
                )
        except AttributeError:
            if tag.text:
                chunks.append(
                    process_paragraph(sp, tag, sections, bib_dict, ref_dict, cleanup_bracket)
                )

    return chunks


def extract_body_text_from_tei_xml(
    sp: bs4.BeautifulSoup, bib_dict: Dict, ref_dict: Dict, cleanup_bracket: bool
) -> List[Dict]:
    """
    Parse body text from soup
    :param sp:
    :param bib_dict:
    :param ref_dict:
    :param cleanup_bracket:
    :return:
    """
    body_text = []
    if sp.body:
        body_text = extract_body_text_from_div(
            sp, sp.body, [], bib_dict, ref_dict, cleanup_bracket
        )
        sp.body.decompose()
    return body_text


def extract_back_matter_from_tei_xml(
    sp: bs4.BeautifulSoup, bib_dict: Dict, ref_dict: Dict, cleanup_bracket: bool
) -> List[Dict]:
    """
    Parse back matter from soup
    :param sp:
    :param bib_dict:
    :param ref_dict:
    :param cleanup_bracket:
    :return:
    """
    back_text = []
    if sp.back:
        for div in sp.back.find_all("div"):
            if div.get("type"):
                section_type = div.get("type")
            else:
                section_type = ""

            for child_div in div.find_all("div"):
                if child_div.head:
                    section_title = child_div.head.text.strip()
                    section_num = child_div.head.get("n", None)
                    child_div.head.decompose()
                else:
                    section_title = section_type
                    section_num = None
                if child_div.text:
                    back_text.append(
                        process_paragraph(
                            sp,
                            child_div,
                            [(section_num, section_title)],
                            bib_dict,
                            ref_dict,
                            cleanup_bracket,
                        )
                    )
        sp.back.decompose()
    return back_text
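
# Rough composition sketch (assumed driver, not part of this module). A caller
# with a GROBID TEI document would use the extract_* functions in roughly this
# order; bib_dict construction from the TEI bibliography is out of scope here:
#
#   >>> sp = bs4.BeautifulSoup(tei_xml_string, "xml")
#   >>> sp = sub_all_note_tags(sp)
#   >>> ref_dict = extract_figures_and_tables_from_tei_xml(sp)
#   >>> bracket = check_if_citations_are_bracket_style(sp)
#   >>> bib_dict = {}  # placeholder; normally built from the bibliography
#   >>> abstract = extract_abstract_from_tei_xml(sp, bib_dict, ref_dict, bracket)
#   >>> body = extract_body_text_from_tei_xml(sp, bib_dict, ref_dict, bracket)
#   >>> back = extract_back_matter_from_tei_xml(sp, bib_dict, ref_dict, bracket)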