ConceptPortal-public/rsconcept/backend/cctext/ruparser.py

489 lines
19 KiB
Python
Raw Normal View History

2023-08-16 21:26:59 +03:00
''' Parsing russian language using pymorphy2 and natasha libraries. '''
from __future__ import annotations
from typing import Optional
2023-08-16 21:26:59 +03:00
from razdel.substring import Substring as Segment
from pymorphy2.analyzer import Parse as WordParse
2023-08-16 21:26:59 +03:00
from .syntax import RuSyntax, Capitalization
from .rumodel import SemanticRole, Morphology, WordTag, morpho, Grammemes
2023-08-16 21:26:59 +03:00
INDEX_NONE = -1
NO_COORDINATION = -1
WORD_NONE = -1
class WordToken:
2023-08-17 21:23:54 +03:00
''' Atomic text token. '''
def __init__(self, segment: Segment, parse: list[WordParse], main_parse: int = 0):
2023-08-16 21:26:59 +03:00
self.segment: Segment = segment
self.forms: list[WordParse] = parse
self.main: int = main_parse
2023-08-16 21:26:59 +03:00
def get_morpho(self) -> Morphology:
''' Return morphology for current token. '''
return Morphology(self.get_parse().tag)
2023-08-16 21:26:59 +03:00
def get_parse(self) -> WordParse:
2023-08-16 21:26:59 +03:00
''' Access main form. '''
return self.forms[self.main]
def inflect(self, inflection_grams: set[str]) -> Optional[WordParse]:
2023-08-16 21:26:59 +03:00
''' Apply inflection to segment text. Does not modify forms '''
inflected = self.get_parse().inflect(inflection_grams)
2023-08-16 21:26:59 +03:00
if not inflected:
return None
self.segment.text = Capitalization.from_text(self.segment.text).apply_to(inflected.word)
return inflected
class Collation:
''' Parsed data for input coordinated text. '''
def __init__(self, text: str):
self.text = text
2023-08-17 21:23:54 +03:00
self.words: list[WordToken] = []
self.coordination: list[int] = []
2023-08-16 21:26:59 +03:00
self.main_word: int = WORD_NONE
2023-08-17 21:23:54 +03:00
def is_valid(self) -> bool:
''' Check if data is parsed correctly '''
return self.main_word != WORD_NONE
2023-08-16 21:26:59 +03:00
def get_form(self) -> WordParse:
''' Access WordParse. '''
return self.words[self.main_word].get_parse()
2023-08-16 21:26:59 +03:00
def get_morpho(self) -> Morphology:
2023-12-26 14:23:51 +03:00
''' Access parsed main morphology. '''
2023-08-16 21:26:59 +03:00
return self.words[self.main_word].get_morpho()
def add_word(self, segment, forms: list, main_form: int, need_coordination: bool = True):
''' Add word information. '''
self.words.append(WordToken(segment, forms, main_form))
self.coordination.append(NO_COORDINATION if not need_coordination else 0)
def inflect(self, target_grams: Grammemes) -> str:
2023-08-16 21:26:59 +03:00
''' Inflect text to match required tags. '''
2023-08-17 21:23:54 +03:00
if self.is_valid():
origin = self.get_morpho()
if not origin.tag.grammemes.issuperset(target_grams):
if self._apply_inflection(origin, target_grams):
2023-08-17 21:23:54 +03:00
return self._generate_text()
return self.text
2023-08-16 21:26:59 +03:00
def inflect_like(self, base_model: Collation) -> str:
''' Create inflection to substitute base_model form. '''
2023-08-17 21:23:54 +03:00
if self.is_valid():
morph = base_model.get_morpho()
if morph.effective_POS:
tags = set()
tags.add(morph.effective_POS)
tags = morph.complete_grams(tags)
2023-08-17 21:23:54 +03:00
return self.inflect(tags)
return self.text
2023-08-16 21:26:59 +03:00
def inflect_dependant(self, master_model: Collation) -> str:
''' Create inflection to coordinate with master_model form. '''
2023-08-17 21:23:54 +03:00
assert self.is_valid()
2023-08-16 21:26:59 +03:00
morph = master_model.get_morpho()
tags = morph.coordination_grams()
tags = self.get_morpho().complete_grams(tags)
2023-08-16 21:26:59 +03:00
return self.inflect(tags)
def normal_form(self) -> str:
''' Generate normal form. '''
2023-08-17 21:23:54 +03:00
if self.is_valid():
main_form = self.get_form()
new_morpho = Morphology(main_form.normalized.tag)
new_grams = new_morpho.complete_grams(frozenset())
return self.inflect(new_grams)
2023-08-17 21:23:54 +03:00
return self.text
2023-08-16 21:26:59 +03:00
def _iterate_coordinated(self):
words_count = len(self.words)
current_word = self.coordination[words_count]
while current_word != words_count:
yield self.words[current_word]
current_word += self.coordination[current_word]
def _inflect_main_word(self, origin: Morphology, target_grams: Grammemes) -> Optional[Morphology]:
full_grams = origin.complete_grams(target_grams)
inflected = self.words[self.main_word].inflect(full_grams)
2023-08-16 21:26:59 +03:00
if not inflected:
return None
return Morphology(inflected.tag)
def _apply_inflection(self, origin: Morphology, target_grams: Grammemes) -> bool:
new_moprho = self._inflect_main_word(origin, target_grams)
2023-08-16 21:26:59 +03:00
if not new_moprho:
return False
inflection_grams = new_moprho.coordination_grams()
if len(inflection_grams) == 0:
2023-08-16 21:26:59 +03:00
return True
for word in self._iterate_coordinated():
word.inflect(inflection_grams)
2023-08-16 21:26:59 +03:00
return True
def _generate_text(self) -> str:
current_pos = 0
result = ''
for token in self.words:
if token.segment.start > current_pos:
result += self.text[current_pos: token.segment.start]
result += token.segment.text
current_pos = token.segment.stop
if current_pos + 1 < len(self.text):
result += self.text[current_pos:]
return result
class PhraseParser:
2023-08-16 21:26:59 +03:00
''' Russian grammar parser. '''
def __init__(self):
pass
def __del__(self):
pass
_FILTER_SCORE = 0.005
_SINGLE_SCORE_SEARCH = 0.2
_PRIORITY_NONE = NO_COORDINATION
_MAIN_WAIT_LIMIT = 10 # count words until fixing main
2023-08-16 21:26:59 +03:00
_MAIN_MAX_FOLLOWERS = 3 # count words after main as coordination candidates
2023-08-17 21:23:54 +03:00
def parse(self, text: str,
require_index: int = INDEX_NONE,
require_grams: Optional[Grammemes] = None) -> Optional[Collation]:
2023-08-17 21:23:54 +03:00
'''
Determine morpho tags for input text.
::returns:: Morphology of a text or None if no suitable form is available
'''
2023-08-16 21:26:59 +03:00
segments = list(RuSyntax.tokenize(text))
2023-08-17 21:23:54 +03:00
if len(segments) == 0:
return None
elif len(segments) == 1:
return self._parse_single(segments[0], require_index, require_grams)
2023-08-16 21:26:59 +03:00
else:
return self._parse_multiword(text, segments, require_index, require_grams)
2023-08-16 21:26:59 +03:00
def normalize(self, text: str):
''' Get normal form for target text. '''
processed = self.parse(text)
2023-08-17 21:23:54 +03:00
if processed:
return processed.normal_form()
return text
2023-08-16 21:26:59 +03:00
def find_substr(self, text: str, sub: str) -> tuple[int, int]:
''' Search for substring position in text regardless of morphology. '''
if not text or not sub:
return (0, 0)
query = [self.normalize(elem.text) for elem in RuSyntax.tokenize(sub)]
query_len = len(query)
start = 0
current_index = 0
for token in RuSyntax.tokenize(text):
text_word = self.normalize(token.text)
if text_word != query[current_index]:
current_index = 0
else:
if current_index == 0:
start = token.start
current_index += 1
if current_index == query_len:
return (start, token.stop)
return (0, 0)
def inflect_context(self, text: str, before: str = '', after: str = '') -> str:
2023-08-16 21:26:59 +03:00
''' Inflect text in accordance to context before and after. '''
target = self.parse(text)
if not target:
return text
target_morpho = target.get_morpho()
if not target_morpho or not target_morpho.can_coordinate:
return text
model_after = self.parse(after)
model_before = self.parse(before)
etalon = PhraseParser._choose_context_etalon(target_morpho, model_before, model_after)
2023-08-16 21:26:59 +03:00
if not etalon:
return text
etalon_moprho = etalon.get_morpho()
if not etalon_moprho.can_coordinate:
return text
new_form = PhraseParser._combine_morpho(target_morpho, etalon_moprho.tag)
2023-08-16 21:26:59 +03:00
return target.inflect(new_form)
def inflect_substitute(self, substitute_normal: str, original: str) -> str:
''' Inflect substitute to match original form. '''
original_model = self.parse(original)
if not original_model:
return substitute_normal
substitute_model = self.parse(substitute_normal)
if not substitute_model:
return substitute_normal
return substitute_model.inflect_like(original_model)
def inflect_dependant(self, dependant_normal: str, master: str) -> str:
''' Inflect dependant to coordinate with master text. '''
master_model = self.parse(master)
if not master_model:
return dependant_normal
dependant_model = self.parse(dependant_normal)
if not dependant_model:
return dependant_normal
return dependant_model.inflect_dependant(master_model)
def _parse_single(self, segment, require_index: int, require_grams: Optional[Grammemes]) -> Optional[Collation]:
2023-08-16 21:26:59 +03:00
forms = list(self._filtered_parse(segment.text))
parse_index = INDEX_NONE
if len(forms) == 0 or require_index >= len(forms):
return None
if require_index != INDEX_NONE:
tags = forms[require_index].tag
if require_grams and not tags.grammemes.issuperset(require_grams):
2023-08-16 21:26:59 +03:00
return None
parse_index = require_index
else:
current_score = 0
for (index, form) in enumerate(forms):
if not require_grams or form.tag.grammemes.issuperset(require_grams):
2023-08-16 21:26:59 +03:00
if form.tag.case == 'nomn':
parse_index = index
break
elif parse_index == INDEX_NONE:
current_score = form.score
parse_index = index
elif form.score / current_score < self._SINGLE_SCORE_SEARCH:
break
if parse_index == INDEX_NONE:
return None
result = Collation(segment.text)
result.add_word(segment, [forms[parse_index]], main_form=0, need_coordination=False)
result.coordination.append(len(result.words))
result.main_word = 0
return result
2023-08-17 21:23:54 +03:00
def _parse_multiword(self, text: str, segments: list, require_index: int,
require_grams: Optional[Grammemes]) -> Optional[Collation]:
2023-08-16 21:26:59 +03:00
result = Collation(text)
2023-08-17 21:23:54 +03:00
priority_main: float = self._PRIORITY_NONE
2023-08-16 21:26:59 +03:00
segment_index = 0
main_wait = 0
word_index = 0
for segment in segments:
if main_wait > PhraseParser._MAIN_WAIT_LIMIT:
2023-08-16 21:26:59 +03:00
break
segment_index += 1
priority = self._parse_segment(result, segment, require_index, require_grams)
2023-08-16 21:26:59 +03:00
if priority is None:
continue # skip non-parsable entities
main_wait += 1
if priority > priority_main:
result.main_word = word_index
priority_main = priority
word_index += 1
if result.main_word == INDEX_NONE:
return None
self._finalize_coordination(result)
if segment_index < len(segments):
pass # finish to parse segments after main if needed
return result
def _parse_segment(self,
output: Collation,
segment: Segment,
require_index: int,
require_grams: Optional[Grammemes]) -> Optional[float]:
2023-08-16 21:26:59 +03:00
''' Return priority for this can be a new main word '''
forms = list(self._filtered_parse(segment.text))
if len(forms) == 0:
return None
2023-08-17 21:23:54 +03:00
main_index: int = INDEX_NONE
segment_score: float = self._PRIORITY_NONE
2023-08-16 21:26:59 +03:00
needs_coordination = False
2023-08-17 21:23:54 +03:00
local_sum: float = 0
score_sum: float = 0
2023-08-16 21:26:59 +03:00
if require_index != INDEX_NONE:
form = forms[require_index]
if not require_grams or form.tag.grammemes.issuperset(require_grams):
2023-08-17 21:23:54 +03:00
(local_max, segment_score) = PhraseParser._get_priorities_for(form.tag)
2023-08-16 21:26:59 +03:00
main_index = require_index
needs_coordination = Morphology.is_dependable(form.tag.POS)
else:
local_max = self._PRIORITY_NONE
for (index, form) in enumerate(forms):
if require_grams and not form.tag.grammemes.issuperset(require_grams):
2023-08-16 21:26:59 +03:00
continue
2023-08-17 21:23:54 +03:00
(local_priority, global_priority) = PhraseParser._get_priorities_for(form.tag)
2023-08-16 21:26:59 +03:00
needs_coordination = needs_coordination or Morphology.is_dependable(form.tag.POS)
local_sum += global_priority * form.score
score_sum += form.score
if local_priority > local_max:
local_max = local_priority
# segment_score = global_priority
main_index = index
if score_sum == 0:
return None
segment_score = local_sum / score_sum
output.add_word(segment, forms, main_index, needs_coordination)
return segment_score
# Alternative: return segment_score
2023-12-26 14:23:51 +03:00
# penalty_suspicious = 0 if local_max == 0 else (1 - local_sum / local_max) * self._PRIORITY_PENALTY
# return segment_score - penalty_suspicious
2023-08-16 21:26:59 +03:00
@classmethod
def _finalize_coordination(cls, target: Collation):
main_morpho: Morphology = target.get_morpho()
main_coordinate = main_morpho.can_coordinate
target.coordination[target.main_word] = NO_COORDINATION
first_change = INDEX_NONE
current_len = 0
for (index, word) in enumerate(target.words):
if target.coordination[index] == NO_COORDINATION or index - target.main_word > cls._MAIN_MAX_FOLLOWERS:
needs_change = False
if index != target.main_word:
word.main = INDEX_NONE
else:
word.main = PhraseParser._find_coordination(word.forms, main_morpho.tag, index < target.main_word)
2023-08-16 21:26:59 +03:00
needs_change = word.main != INDEX_NONE
if not needs_change or not main_coordinate:
target.coordination[index] = NO_COORDINATION
current_len += 1
if needs_change and main_coordinate:
target.coordination[index] = current_len
current_len = 0
if first_change == INDEX_NONE:
first_change = index
if first_change == INDEX_NONE:
target.coordination.append(len(target.words))
return
previous_reference = first_change
current_word = len(target.words)
target.coordination.append(current_len + 1)
while target.coordination[current_word] != INDEX_NONE:
previous_word = current_word - target.coordination[current_word]
target.coordination[current_word] = previous_reference
previous_reference = current_word - previous_word
current_word = previous_word
if previous_reference == 0 or current_word < 0:
break
@staticmethod
def _find_coordination(forms: list, main_tag: WordTag, before_main: bool) -> int:
for (index, form) in enumerate(forms):
pos = form.tag.POS
case = form.tag.case
if pos not in ['ADJF', 'ADJS', 'PRTF', 'PRTS']:
continue
if SemanticRole.from_POS(pos) == SemanticRole.term and case == 'gent':
2023-08-16 21:26:59 +03:00
if before_main:
continue
else:
return INDEX_NONE
if case == main_tag.case:
return index
elif main_tag.case in ['accs', 'gent'] and case in ['accs', 'gent']:
return index
return INDEX_NONE
@staticmethod
def _filtered_parse(text: str):
capital = Capitalization.from_text(text)
score_filter = PhraseParser._filter_score(morpho.parse(text))
for form in PhraseParser._filter_capital(score_filter, capital):
2023-08-16 21:26:59 +03:00
yield form
@staticmethod
def _filter_score(generator):
for form in generator:
if form.score < PhraseParser._FILTER_SCORE:
2023-08-16 21:26:59 +03:00
break
yield form
@staticmethod
def _filter_capital(generator, capital: Capitalization):
if capital in [Capitalization.upper_case, Capitalization.mixed]:
for form in generator:
if 'Abbr' not in form.tag.grammemes:
continue
yield form
else:
for form in generator:
yield form
@staticmethod
2023-08-17 21:23:54 +03:00
def _parse_word(text: str, require_index: int = INDEX_NONE,
require_grams: Optional[Grammemes] = None) -> Optional[Morphology]:
2023-08-16 21:26:59 +03:00
parsed_variants = morpho.parse(text)
if not parsed_variants or require_index >= len(parsed_variants):
return None
if require_index != INDEX_NONE:
tags = parsed_variants[require_index].tag
if not require_grams or tags.grammemes.issuperset(require_grams):
2023-08-16 21:26:59 +03:00
return Morphology(tags)
else:
return None
else:
for variant in parsed_variants:
tags = variant.tag
if not require_grams or tags.grammemes.issuperset(require_grams):
2023-08-16 21:26:59 +03:00
return Morphology(tags)
return None
@staticmethod
2023-08-17 21:23:54 +03:00
def _get_priorities_for(tag: WordTag) -> tuple[float, float]:
2023-08-16 21:26:59 +03:00
''' Return pair of local and global priorities. '''
if tag.POS in ['VERB', 'INFN']:
return (9, 10)
if tag.POS in ['NOUN', 'NPRO']:
return (10, 9) if 'nomn' in tag.grammemes and 'Fixd' not in tag.grammemes else (8, 8)
if tag.POS in ['PRTF', 'PRTS']:
return (6, 6)
if tag.POS in ['ADJF', 'ADJS']:
return (5, 5)
if tag.POS == 'ADVB':
return (7, 4)
return (0, 0)
@staticmethod
2023-08-17 21:23:54 +03:00
def _choose_context_etalon(target: Morphology,
before: Optional[Collation],
after: Optional[Collation]) -> Optional[Collation]:
2023-08-16 21:26:59 +03:00
if not before or not before.get_morpho().can_coordinate:
return after
if not after or not after.get_morpho().can_coordinate:
return before
before_semantic = before.get_morpho().semantic
after_semantic = after.get_morpho().semantic
if target.semantic == SemanticRole.definition:
if after_semantic == SemanticRole.term:
return after
if before_semantic == SemanticRole.term:
return before
if before_semantic == SemanticRole.definition:
return before
return after
if target.semantic == SemanticRole.term:
if before_semantic == SemanticRole.definition:
return before
if after_semantic == SemanticRole.definition:
return after
return before
@staticmethod
2023-08-17 21:23:54 +03:00
def _combine_morpho(target: Morphology, etalon: WordTag) -> frozenset[str]:
2023-08-16 21:26:59 +03:00
part_of_speech = target.tag.POS
number = etalon.number
if number == 'plur':
return frozenset([part_of_speech, number, etalon.case])
else:
gender = etalon.gender if target.semantic != SemanticRole.term else target.tag.gender
return frozenset([part_of_speech, number, gender, etalon.case])