Source code for mwptoolkit.utils.preprocess_tool.sentence_operator

import re

import nltk
import stanza

from mwptoolkit.utils.data_structure import DependencyTree
from mwptoolkit.utils.utils import write_json_data, read_json_data
from mwptoolkit.utils.enum_type import EPT

def deprel_tree_to_file(train_datas, valid_datas, test_datas, path, language, use_gpu):
    """Save deprel (dependency relation) tree information to file."""
    nlp = stanza.Pipeline(language, processors='depparse,tokenize,pos,lemma',
                          tokenize_pretokenized=True, logging_level='error', use_gpu=use_gpu)
    new_datas = []
    # The three splits are processed identically: parse every question and
    # store its token-level dependency information keyed by problem id.
    for dataset in (train_datas, valid_datas, test_datas):
        for data in dataset:
            doc = nlp(data["ques source 1"])
            token_list = doc.to_dict()[0]
            new_datas.append({'id': data['id'], 'deprel': token_list})
    write_json_data(new_datas, path)

def get_group_nums_(train_datas, valid_datas, test_datas, path):
    """Get group nums information from file."""
    deprel_datas = read_json_data(path)
    for dataset in (train_datas, valid_datas, test_datas):
        for data in dataset:
            group_nums = []
            num_pos = data["number position"]
            sent_len = len(data["question"])
            for deprel_data in deprel_datas:
                if data['id'] != deprel_data['id']:
                    continue
                token_list = deprel_data['deprel']
                for n_pos in num_pos:
                    pos_stack = []
                    group_num = []
                    # Seed the queue with the number token itself and with all
                    # of its siblings (tokens sharing the same head).
                    pos_stack.append([n_pos, token_list[n_pos]["deprel"]])
                    head_pos = token_list[n_pos]['head']
                    for i, x in enumerate(token_list):
                        if x['head'] == head_pos and n_pos != i:
                            pos_stack.append([i, x["deprel"]])
                    # Breadth-first walk towards the root, keeping tokens whose
                    # UPOS/deprel combination marks quantity-related context.
                    while pos_stack:
                        pos, dep = pos_stack.pop(0)
                        upos = token_list[pos]['upos']
                        head_pos = token_list[pos]['head'] - 1
                        if upos not in ['NOUN', 'NUM', 'ADJ', 'VERB', 'DET', 'SYM']:
                            continue
                        elif upos == 'NOUN' and dep not in ['compound', 'nsubj:pass', 'nsubj']:
                            continue
                        elif upos == 'VERB' and dep not in ['conj', 'root']:
                            continue
                        elif upos == 'ADJ' and dep not in ['amod']:
                            continue
                        elif upos == 'DET' and dep not in ['advmod']:
                            continue
                        elif upos == 'SYM' and dep not in ['obl']:
                            continue
                        else:
                            group_num.append(pos)
                            if head_pos >= 0:
                                head_dep = token_list[head_pos]['deprel']
                                if [head_pos, head_dep] not in pos_stack:
                                    pos_stack.append([head_pos, head_dep])
                    if not group_num:
                        group_num.append(n_pos)
                    if len(group_num) == 1:
                        # Fall back to the immediate neighbours of the number token.
                        if n_pos - 1 >= 0:
                            group_num.append(n_pos - 1)
                        if n_pos + 1 < sent_len:  # strict bound keeps the index in range
                            group_num.append(n_pos + 1)
                    group_nums.append(group_num)
                deprel_datas.remove(deprel_data)
                break
            data["group nums"] = group_nums
    return train_datas, valid_datas, test_datas

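# A minimal usage sketch of the file-backed pipeline above. Assumptions (not
# part of the library's documented API): each dataset entry already carries
# the keys these functions read -- 'id', 'ques source 1', 'question' and
# 'number position' -- and 'deprel.json' is a hypothetical cache path.
#
#     deprel_tree_to_file(train_datas, valid_datas, test_datas,
#                         path='deprel.json', language='en', use_gpu=False)
#     train_datas, valid_datas, test_datas = get_group_nums_(
#         train_datas, valid_datas, test_datas, path='deprel.json')
#     # Each entry now has data['group nums']: one list of context-token
#     # positions per number listed in data['number position'].
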
def span_level_deprel_tree_to_file(train_datas, valid_datas, test_datas, path, language, use_gpu):
    """Save span-level dependency information (one parse per sentence) to file."""
    nlp = stanza.Pipeline(language, processors='depparse,tokenize,pos,lemma',
                          tokenize_pretokenized=True, logging_level='error', use_gpu=use_gpu)
    new_datas = []
    for dataset in (train_datas, valid_datas, test_datas):
        for data in dataset:
            sentences = split_sentence(data["ques source 1"])
            dependency_infos = []
            for sentence in sentences:
                dependency_info = []
                doc = nlp(sentence)
                token_list = doc.to_dict()[0]
                for token in token_list:
                    deprel = token['deprel']
                    father_idx = token['head'] - 1  # stanza heads are 1-based; root becomes -1
                    child_idx = token['id'] - 1
                    dependency_info.append([deprel, child_idx, father_idx])
                dependency_infos.append(dependency_info)
            new_datas.append({'id': data['id'],
                              'split sentences source': sentences,
                              'dependency info': dependency_infos})
    write_json_data(new_datas, path)

def get_span_level_deprel_tree_(train_datas, valid_datas, test_datas, path):
    """Get span-level deprel tree information from file."""
    deprel_datas = read_json_data(path)
    max_span_size = 0
    for dataset in (train_datas, valid_datas, test_datas):
        for data in dataset:
            for deprel_data in deprel_datas:
                if data['id'] != deprel_data['id']:
                    continue
                masked_sentences = split_sentence(' '.join(data['question']))
                span_size = len(masked_sentences)
                if span_size > max_span_size:
                    max_span_size = span_size
                deprel_trees = []
                # Rebuild one DependencyTree per sentence from the cached parse.
                for sentence, dependency_info in zip(deprel_data['split sentences source'],
                                                     deprel_data['dependency info']):
                    tree = DependencyTree()
                    tree.sentence2tree(sentence.split(' '), dependency_info)
                    deprel_trees.append(tree)
                data['split sentences'] = [sentence.split(' ') for sentence in masked_sentences]
                data['split sentences source'] = deprel_data['split sentences source']
                data['dependency info'] = deprel_data['dependency info']
                data['deprel tree'] = deprel_trees
                deprel_datas.remove(deprel_data)
                break
    return train_datas, valid_datas, test_datas, max_span_size

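# Analogous sketch for the span-level variant (same assumptions as above;
# 'span_deprel.json' is a hypothetical cache path):
#
#     span_level_deprel_tree_to_file(train_datas, valid_datas, test_datas,
#                                    path='span_deprel.json', language='en',
#                                    use_gpu=False)
#     train_datas, valid_datas, test_datas, max_span_size = \
#         get_span_level_deprel_tree_(train_datas, valid_datas, test_datas,
#                                     path='span_deprel.json')
#     # Each entry now carries 'split sentences', 'dependency info' and one
#     # DependencyTree per sentence under 'deprel tree'.
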
def get_deprel_tree_(train_datas, valid_datas, test_datas, path):
    """Get deprel tree information from file."""
    deprel_datas = read_json_data(path)
    deprel_tokens = []
    for dataset in (train_datas, valid_datas, test_datas):
        for data in dataset:
            group_nums = []
            deprel_token = []
            length = len(data["question"])
            for deprel_data in deprel_datas:
                if data['id'] != deprel_data['id']:
                    continue
                token_list = deprel_data['deprel']
                for i, x in enumerate(token_list):
                    token = x['deprel']
                    if token not in deprel_token:
                        deprel_token.append(token)
                    deprel_idx = deprel_token.index(token) + length
                    # Connect head -> relation node and relation node -> child.
                    group_nums.append([x['head'] - 1, deprel_idx])
                    group_nums.append([deprel_idx, i])
                data["group nums"] = group_nums
                data["question"] = data["question"] + deprel_token
                # The returned deprel vocabulary is built from the training split only.
                if dataset is train_datas:
                    for token in deprel_token:
                        if token not in deprel_tokens:
                            deprel_tokens.append(token)
                deprel_datas.remove(deprel_data)
                break
    return train_datas, valid_datas, test_datas, deprel_tokens

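# Illustration of the edge encoding produced by get_deprel_tree_ (and by
# get_deprel_tree below); the indices are invented for the example. For a
# five-token question whose token at position 2 has its head at position 1
# (0-based) via relation 'nsubj', the relation is appended as an extra
# question token (position 5 if 'nsubj' is the first relation encountered)
# and two directed edges are recorded:
#
#     data['question']   -> [...5 original tokens..., 'nsubj']
#     data['group nums'] -> [..., [1, 5], [5, 2], ...]  # head -> relation -> child
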
def get_group_nums(datas, language, use_gpu):
    """Get group nums by dependency-parsing each question directly (no cache file)."""
    nlp = stanza.Pipeline(language, processors='depparse,tokenize,pos,lemma',
                          tokenize_pretokenized=True, logging_level='error', use_gpu=use_gpu)
    new_datas = []
    for data in datas:
        group_nums = []
        num_pos = data["number position"]
        sent_len = len(data["question"])
        doc = nlp(data["ques source 1"])
        token_list = doc.to_dict()[0]
        for n_pos in num_pos:
            pos_stack = []
            group_num = []
            # Seed the queue with the number token and its siblings.
            pos_stack.append([n_pos, token_list[n_pos]["deprel"]])
            head_pos = token_list[n_pos]['head']
            for i, x in enumerate(token_list):
                if x['head'] == head_pos and n_pos != i:
                    pos_stack.append([i, x["deprel"]])
            # Breadth-first walk towards the root, keeping quantity-related tokens.
            while pos_stack:
                pos, dep = pos_stack.pop(0)
                upos = token_list[pos]['upos']
                head_pos = token_list[pos]['head'] - 1
                if upos not in ['NOUN', 'NUM', 'ADJ', 'VERB', 'DET', 'SYM']:
                    continue
                elif upos == 'NOUN' and dep not in ['compound', 'nsubj:pass', 'nsubj']:
                    continue
                elif upos == 'VERB' and dep not in ['conj', 'root']:
                    continue
                elif upos == 'ADJ' and dep not in ['amod']:
                    continue
                elif upos == 'DET' and dep not in ['advmod']:
                    continue
                elif upos == 'SYM' and dep not in ['obl']:
                    continue
                else:
                    group_num.append(pos)
                    if head_pos >= 0:
                        head_dep = token_list[head_pos]['deprel']
                        if [head_pos, head_dep] not in pos_stack:
                            pos_stack.append([head_pos, head_dep])
            if not group_num:
                group_num.append(n_pos)
            if len(group_num) == 1:
                # Fall back to the immediate neighbours of the number token.
                if n_pos - 1 >= 0:
                    group_num.append(n_pos - 1)
                if n_pos + 1 < sent_len:
                    group_num.append(n_pos + 1)
            group_nums.append(group_num)
        data["group nums"] = group_nums
        new_datas.append(data)
    return new_datas

def get_deprel_tree(datas, language):
    """Get deprel tree information by dependency-parsing each question directly."""
    nlp = stanza.Pipeline(language, processors='depparse,tokenize,pos,lemma',
                          tokenize_pretokenized=True, logging_level='error')
    new_datas = []
    deprel_tokens = []
    for data in datas:
        group_nums = []
        deprel_token = []
        doc = nlp(data["ques source 1"])
        token_list = doc.to_dict()[0]
        length = len(data["question"])
        for i, x in enumerate(token_list):
            token = x['deprel']
            if token not in deprel_token:
                deprel_token.append(token)
            deprel_idx = deprel_token.index(token) + length
            group_nums.append([x['head'] - 1, deprel_idx])
            group_nums.append([deprel_idx, i])
        data["group nums"] = group_nums
        data["question"] = data["question"] + deprel_token
        new_datas.append(data)
        for token in deprel_token:
            if token not in deprel_tokens:
                deprel_tokens.append(token)
    return new_datas, deprel_tokens

def get_span_level_deprel_tree(datas, language):
    """Get span-level deprel trees by dependency-parsing each sentence directly."""
    nlp = stanza.Pipeline(language, processors='depparse,tokenize,pos,lemma',
                          tokenize_pretokenized=True, logging_level='error')
    new_datas = []
    max_span_size = 0
    for data in datas:
        sentences = split_sentence(data["ques source 1"])
        masked_sentences = split_sentence(' '.join(data['question']))
        span_size = len(masked_sentences)
        if span_size > max_span_size:
            max_span_size = span_size
        dependency_infos = []
        deprel_trees = []
        for sentence in sentences:
            dependency_info = []
            doc = nlp(sentence)
            token_list = doc.to_dict()[0]
            for token in token_list:
                deprel = token['deprel']
                father_idx = token['head'] - 1
                child_idx = token['id'] - 1
                dependency_info.append([deprel, child_idx, father_idx])
            tree = DependencyTree()
            tree.sentence2tree(sentence.split(' '), dependency_info)
            dependency_infos.append(dependency_info)
            deprel_trees.append(tree)
        data['split sentences'] = [sentence.split(' ') for sentence in masked_sentences]
        data['split sentences source'] = sentences
        data['deprel tree'] = deprel_trees
        data['dependency info'] = dependency_infos
        new_datas.append(data)
    return new_datas, max_span_size

def split_sentence(text):
    """Split a sentence into clause-level spans by punctuation."""
    sentences = nltk.tokenize.sent_tokenize(text)
    spans_posts = []
    seps = ",。.;?!!"
    sep_pattern = re.compile(f"([{seps}])")
    for sentence in sentences:
        spans = re.split(sep_pattern, sentence)
        spans = [span.strip() for span in spans if span.strip() != '']
        spans_post = []
        for i, span in enumerate(spans):
            if span in seps:
                if i > 0 and spans[i - 1] not in seps:
                    # Attach the separator to the preceding span.
                    spans_post[-1] += ' ' + span
            else:
                spans_post.append(span)
        spans_posts += spans_post
    return spans_posts

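# Example (illustrative only; the sentence split depends on NLTK's punkt
# model, which must be available, e.g. via nltk.download('punkt')):
#
#     split_sentence("Tom has 3 apples , and 2 pears . How many fruits ?")
#     # -> ['Tom has 3 apples ,', 'and 2 pears .', 'How many fruits ?']
#
# Separators are merged back into the preceding span, so every clause keeps
# its trailing punctuation mark.
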
def find_ept_numbers_in_text(text: str, append_number_token: bool = False):
    numbers = []
    new_text = []

    # Replace "[NON-DIGIT][SPACEs].[DIGIT]" with "[NON-DIGIT][SPACEs]0.[DIGIT]".
    text = re.sub("([^\\d.,]+\\s*)(\\.\\d+)", "\\g<1>0\\g<2>", text)
    # Replace a space between digits, or between a digit and special characters
    # like ',.', with "⌒" (to preserve the original token ids).
    text = re.sub("(\\d+)\\s+(\\.\\d+|,\\d{3}|\\d{3})", "\\1⌒\\2", text)

    # Original token index
    i = 0
    prev_token = None
    for token in text.split(' '):
        # Advance the token id and record the original token indices.
        token_index = [i + j for j in range(token.count('⌒') + 1)]
        i = max(token_index) + 1

        # First, find the digit-based number patterns in the token.
        token = token.replace('⌒', '')
        number_patterns = EPT.NUMBER_AND_FRACTION_PATTERN.findall(token)
        if number_patterns:
            for pattern in number_patterns:
                # Matched patterns, listed by order of occurrence.
                surface_form = pattern[0]
                surface_form = surface_form.replace(',', '')

                # Normalize the form: use a decimal representation with 15
                # digits after the decimal point.
                is_fraction = '/' in surface_form
                value = eval(surface_form)
                if type(value) is float:
                    surface_form = EPT.FOLLOWING_ZERO_PATTERN.sub('\\1', '%.15f' % value)

                numbers.append(dict(token=token_index, value=surface_form,
                                    is_text=False, is_integer='.' not in surface_form,
                                    is_ordinal=False, is_fraction=is_fraction,
                                    is_single_multiple=False,
                                    is_combined_multiple=False))
            new_text.append(EPT.NUMBER_AND_FRACTION_PATTERN.sub(' \\1 %s ' % EPT.NUM_TOKEN, token))
        else:
            # If there are no digit patterns in the token, look for textual
            # numbers. Append the token first.
            new_text.append(token)

            # Type indicators
            is_ordinal = False
            is_fraction = False
            is_single_multiple = False
            is_combined_multiple = False

            # Split hyphen-concatenated tokens like "twenty-three".
            subtokens = re.split('[^a-zA-Z0-9]+', token.lower())
            token_value = None
            for subtoken in subtokens:
                if not subtoken:
                    continue

                # Convert plural forms to singular nouns.
                for plural, singular in EPT.PLURAL_FORMS:
                    if subtoken.endswith(plural):
                        subtoken = subtoken[:-len(plural)] + singular
                        break

                if subtoken in EPT.NUMBER_READINGS:
                    if not token_value:
                        # If this is the first value in the token, set it as is.
                        token_value = EPT.NUMBER_READINGS[subtoken]
                        is_ordinal = subtoken[-2:] in ['rd', 'th']
                        is_single_multiple = subtoken in EPT.MULTIPLES

                        if is_ordinal and prev_token == 'a':
                            # Cases like 'a third'
                            token_value = 1 / token_value
                    else:
                        # If a value was set before reading this subtoken, treat
                        # it as a multiple (e.g. one-million, three-fifths, etc.).
                        followed_value = EPT.NUMBER_READINGS[subtoken]
                        is_single_multiple = False
                        is_ordinal = False

                        if followed_value >= 100 or subtoken == 'half':
                            # Case of a unit
                            token_value *= followed_value
                            is_combined_multiple = True
                        elif subtoken[-2:] in ['rd', 'th']:
                            # Case of fractions
                            token_value /= followed_value
                            is_fraction = True
                        else:
                            token_value += followed_value

            # If a textual number was found, record it.
            if token_value is not None:
                if type(token_value) is float:
                    surface_form = EPT.FOLLOWING_ZERO_PATTERN.sub('\\1', '%.15f' % token_value)
                else:
                    surface_form = str(token_value)

                numbers.append(dict(token=token_index, value=surface_form,
                                    is_text=True, is_integer='.' not in surface_form,
                                    is_ordinal=is_ordinal, is_fraction=is_fraction,
                                    is_single_multiple=is_single_multiple,
                                    is_combined_multiple=is_combined_multiple))
                new_text.append(EPT.NUM_TOKEN)

        prev_token = token

    if append_number_token:
        text = ' '.join(new_text)

    return text, numbers

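# Example (illustrative; EPT.NUM_TOKEN, the regex patterns and the number
# readings are defined in mwptoolkit.utils.enum_type, and the exact output
# shown here is an assumption based on the logic above):
#
#     text, numbers = find_ept_numbers_in_text(
#         'Mary bought 3 apples and twenty-three pears .',
#         append_number_token=True)
#     # numbers[0]['value'] == '3'   (digit pattern, is_text=False)
#     # numbers[1]['value'] == '23'  (textual number, is_text=True)
#     # In `text`, each recognized number is followed by EPT.NUM_TOKEN.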