''' Models: RSForm API. ''' from copy import deepcopy from typing import Optional, cast from cctext import Entity, Resolver, TermForm, extract_entities, split_grams from django.core.exceptions import ValidationError from django.db import transaction from django.db.models import QuerySet from apps.library.models import LibraryItem, LibraryItemType, Version from shared import messages as msg from ..graph import Graph from .api_RSLanguage import ( extract_globals, generate_structure, get_type_prefix, guess_type, infer_template, is_base_set, is_functional, is_simple_expression, split_template ) from .Constituenta import Constituenta, CstType _INSERT_LAST: int = -1 class RSForm: ''' RSForm is math form of conceptual schema. ''' def __init__(self, model: LibraryItem): self.model = model @staticmethod def create(**kwargs) -> 'RSForm': ''' Create LibraryItem via RSForm. ''' model = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, **kwargs) return RSForm(model) @staticmethod def from_id(pk: int) -> 'RSForm': ''' Get LibraryItem by pk. ''' model = LibraryItem.objects.get(pk=pk) return RSForm(model) def save(self, *args, **kwargs): ''' Model wrapper. ''' self.model.save(*args, **kwargs) def refresh_from_db(self): ''' Model wrapper. ''' self.model.refresh_from_db() def constituents(self) -> QuerySet[Constituenta]: ''' Get QuerySet containing all constituents of current RSForm. ''' return Constituenta.objects.filter(schema=self.model) def resolver(self) -> Resolver: ''' Create resolver for text references based on schema terms. ''' result = Resolver({}) for cst in self.constituents(): entity = Entity( alias=cst.alias, nominal=cst.term_resolved, manual_forms=[ TermForm(text=form['text'], grams=split_grams(form['tags'])) for form in cst.term_forms ] ) result.context[cst.alias] = entity return result def semantic(self) -> 'SemanticInfo': ''' Access semantic information on constituents. ''' return SemanticInfo(self) @transaction.atomic def on_term_change(self, changed: list[int]): ''' Trigger cascade resolutions when term changes. ''' graph_terms = self._graph_term() expansion = graph_terms.expand_outputs(changed) expanded_change = changed + expansion resolver = self.resolver() if len(expansion) > 0: for cst_id in graph_terms.topological_order(): if cst_id not in expansion: continue cst = self.constituents().get(id=cst_id) resolved = resolver.resolve(cst.term_raw) if resolved == cst.term_resolved: continue cst.set_term_resolved(resolved) cst.save() resolver.context[cst.alias] = Entity(cst.alias, resolved) graph_defs = self._graph_text() update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed) if len(update_defs) == 0: return for cst_id in update_defs: cst = self.constituents().get(id=cst_id) resolved = resolver.resolve(cst.definition_raw) if resolved == cst.definition_resolved: continue cst.definition_resolved = resolved cst.save() def get_max_index(self, cst_type: CstType) -> int: ''' Get maximum alias index for specific CstType. ''' result: int = 0 items = Constituenta.objects \ .filter(schema=self.model, cst_type=cst_type) \ .order_by('-alias') \ .values_list('alias', flat=True) for alias in items: result = max(result, int(alias[1:])) return result @transaction.atomic def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta: ''' Create new cst from data. ''' if insert_after is None: position = _INSERT_LAST else: position = insert_after.order + 1 result = self.insert_new(data['alias'], data['cst_type'], position) result.convention = data.get('convention', '') result.definition_formal = data.get('definition_formal', '') result.term_forms = data.get('term_forms', []) result.term_raw = data.get('term_raw', '') result.definition_raw = data.get('definition_raw', '') if result.term_raw != '' or result.definition_raw != '': resolver = self.resolver() if result.term_raw != '': resolved = resolver.resolve(result.term_raw) result.term_resolved = resolved resolver.context[result.alias] = Entity(result.alias, resolved) if result.definition_raw != '': result.definition_resolved = resolver.resolve(result.definition_raw) result.save() self.on_term_change([result.id]) result.refresh_from_db() return result @transaction.atomic def insert_new( self, alias: str, cst_type: Optional[CstType] = None, position: int = _INSERT_LAST, **kwargs ) -> Constituenta: ''' Insert new constituenta at given position. All following constituents order is shifted by 1 position. ''' if self.constituents().filter(alias=alias).exists(): raise ValidationError(msg.aliasTaken(alias)) position = self._get_insert_position(position) if cst_type is None: cst_type = guess_type(alias) self._shift_positions(position, 1) result = Constituenta.objects.create( schema=self.model, order=position, alias=alias, cst_type=cst_type, **kwargs ) self.save() result.refresh_from_db() return result @transaction.atomic def insert_copy(self, items: list[Constituenta], position: int = _INSERT_LAST) -> list[Constituenta]: ''' Insert copy of target constituents updating references. ''' count = len(items) if count == 0: return [] position = self._get_insert_position(position) self._shift_positions(position, count) indices: dict[str, int] = {} for (value, _) in CstType.choices: indices[value] = self.get_max_index(cast(CstType, value)) mapping: dict[str, str] = {} for cst in items: indices[cst.cst_type] = indices[cst.cst_type] + 1 newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}' mapping[cst.alias] = newAlias result = deepcopy(items) for cst in result: cst.pk = None cst.schema = self.model cst.order = position cst.alias = mapping[cst.alias] cst.apply_mapping(mapping) cst.save() position = position + 1 self.save() return result @transaction.atomic def move_cst(self, listCst: list[Constituenta], target: int): ''' Move list of constituents to specific position ''' count_moved = 0 count_top = 0 count_bot = 0 size = len(listCst) update_list = [] for cst in self.constituents().only('id', 'order').order_by('order'): if cst not in listCst: if count_top + 1 < target: cst.order = count_top + 1 count_top += 1 else: cst.order = target + size + count_bot count_bot += 1 else: cst.order = target + count_moved count_moved += 1 update_list.append(cst) Constituenta.objects.bulk_update(update_list, ['order']) self.save() @transaction.atomic def delete_cst(self, listCst): ''' Delete multiple constituents. Do not check if listCst are from this schema. ''' for cst in listCst: cst.delete() self._reset_order() self.resolve_all_text() self.save() @transaction.atomic def substitute( self, original: Constituenta, substitution: Constituenta ): ''' Execute constituenta substitution. ''' assert original.pk != substitution.pk mapping = {original.alias: substitution.alias} self.apply_mapping(mapping) original.delete() self.on_term_change([substitution.id]) def restore_order(self): ''' Restore order based on types and term graph. ''' manager = _OrderManager(self) manager.restore_order() def reset_aliases(self): ''' Recreate all aliases based on constituents order. ''' mapping = self._create_reset_mapping() self.apply_mapping(mapping, change_aliases=True) def _create_reset_mapping(self) -> dict[str, str]: bases = cast(dict[str, int], {}) mapping = cast(dict[str, str], {}) for cst_type in CstType.values: bases[cst_type] = 1 cst_list = self.constituents().order_by('order') for cst in cst_list: alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}' bases[cst.cst_type] += 1 if cst.alias != alias: mapping[cst.alias] = alias return mapping @transaction.atomic def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False): ''' Apply rename mapping. ''' cst_list = self.constituents().order_by('order') for cst in cst_list: if cst.apply_mapping(mapping, change_aliases): cst.save() @transaction.atomic def resolve_all_text(self): ''' Trigger reference resolution for all texts. ''' graph_terms = self._graph_term() resolver = Resolver({}) for cst_id in graph_terms.topological_order(): cst = self.constituents().get(id=cst_id) resolved = resolver.resolve(cst.term_raw) resolver.context[cst.alias] = Entity(cst.alias, resolved) if resolved != cst.term_resolved: cst.term_resolved = resolved cst.save() for cst in self.constituents(): resolved = resolver.resolve(cst.definition_raw) if resolved != cst.definition_resolved: cst.definition_resolved = resolved cst.save() @transaction.atomic def create_version(self, version: str, description: str, data) -> Version: ''' Creates version for current state. ''' return Version.objects.create( item=self.model, version=version, description=description, data=data ) @transaction.atomic def produce_structure(self, target: Constituenta, parse: dict) -> list[int]: ''' Add constituents for each structural element of the target. ''' expressions = generate_structure( alias=target.alias, expression=target.definition_formal, parse=parse ) count_new = len(expressions) if count_new == 0: return [] position = target.order + 1 self._shift_positions(position, count_new) result = [] cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION free_index = self.get_max_index(cst_type) + 1 prefix = get_type_prefix(cst_type) for text in expressions: new_item = Constituenta.objects.create( schema=self.model, order=position, alias=f'{prefix}{free_index}', definition_formal=text, cst_type=cst_type ) result.append(new_item.id) free_index = free_index + 1 position = position + 1 self.save() return result def _shift_positions(self, start: int, shift: int): if shift == 0: return update_list = \ Constituenta.objects \ .only('id', 'order', 'schema') \ .filter(schema=self.model, order__gte=start) for cst in update_list: cst.order += shift Constituenta.objects.bulk_update(update_list, ['order']) def _get_last_position(self): if self.constituents().exists(): return self.constituents().count() else: return 0 def _get_insert_position(self, position: int) -> int: if position <= 0 and position != _INSERT_LAST: raise ValidationError(msg.invalidPosition()) lastPosition = self._get_last_position() if position == _INSERT_LAST: position = lastPosition + 1 else: position = max(1, min(position, lastPosition + 1)) return position @transaction.atomic def _reset_order(self): order = 1 for cst in self.constituents().only('id', 'order').order_by('order'): if cst.order != order: cst.order = order cst.save() order += 1 def _graph_formal(self) -> Graph[int]: ''' Graph based on formal definitions. ''' result: Graph[int] = Graph() cst_list = \ self.constituents() \ .only('id', 'order', 'alias', 'definition_formal') \ .order_by('order') for cst in cst_list: result.add_node(cst.id) for cst in cst_list: for alias in extract_globals(cst.definition_formal): try: child = cst_list.get(alias=alias) result.add_edge(src=child.id, dest=cst.id) except Constituenta.DoesNotExist: pass return result def _graph_term(self) -> Graph[int]: ''' Graph based on term texts. ''' result: Graph[int] = Graph() cst_list = \ self.constituents() \ .only('id', 'order', 'alias', 'term_raw') \ .order_by('order') for cst in cst_list: result.add_node(cst.id) for cst in cst_list: for alias in extract_entities(cst.term_raw): try: child = cst_list.get(alias=alias) result.add_edge(src=child.id, dest=cst.id) except Constituenta.DoesNotExist: pass return result def _graph_text(self) -> Graph[int]: ''' Graph based on definition texts. ''' result: Graph[int] = Graph() cst_list = \ self.constituents() \ .only('id', 'order', 'alias', 'definition_raw') \ .order_by('order') for cst in cst_list: result.add_node(cst.id) for cst in cst_list: for alias in extract_entities(cst.definition_raw): try: child = cst_list.get(alias=alias) result.add_edge(src=child.id, dest=cst.id) except Constituenta.DoesNotExist: pass return result class SemanticInfo: ''' Semantic information derived from constituents. ''' def __init__(self, schema: RSForm): self._graph = schema._graph_formal() self._items = list( schema.constituents() .only('id', 'alias', 'cst_type', 'definition_formal') .order_by('order') ) self._cst_by_alias = {cst.alias: cst for cst in self._items} self._cst_by_ID = {cst.id: cst for cst in self._items} self.info = { cst.id: { 'is_simple': False, 'is_template': False, 'parent': cst.id, 'children': [] } for cst in self._items } self._calculate_attributes() def __getitem__(self, key: int) -> dict: return self.info[key] def is_simple_expression(self, target: int) -> bool: ''' Access "is_simple" attribute. ''' return cast(bool, self.info[target]['is_simple']) def is_template(self, target: int) -> bool: ''' Access "is_template" attribute. ''' return cast(bool, self.info[target]['is_template']) def parent(self, target: int) -> int: ''' Access "parent" attribute. ''' return cast(int, self.info[target]['parent']) def children(self, target: int) -> list[int]: ''' Access "children" attribute. ''' return cast(list[int], self.info[target]['children']) def _calculate_attributes(self): for cst_id in self._graph.topological_order(): cst = self._cst_by_ID[cst_id] self.info[cst_id]['is_template'] = infer_template(cst.definition_formal) self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst) if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED: continue parent = self._infer_parent(cst) self.info[cst_id]['parent'] = parent if parent != cst_id: self.info[parent]['children'].append(cst_id) def _infer_simple_expression(self, target: Constituenta) -> bool: if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type): return False dependencies = self._graph.inputs[target.id] has_complex_dependency = any( self.is_template(cst_id) and not self.is_simple_expression(cst_id) for cst_id in dependencies ) if has_complex_dependency: return False if is_functional(target.cst_type): return is_simple_expression(split_template(target.definition_formal)['body']) else: return is_simple_expression(target.definition_formal) def _infer_parent(self, target: Constituenta) -> int: sources = self._extract_sources(target) if len(sources) != 1: return target.id parent_id = next(iter(sources)) parent = self._cst_by_ID[parent_id] if is_base_set(parent.cst_type): return target.id return parent_id def _extract_sources(self, target: Constituenta) -> set[int]: sources: set[int] = set() if not is_functional(target.cst_type): for parent_id in self._graph.inputs[target.id]: parent_info = self[parent_id] if not parent_info['is_template'] or not parent_info['is_simple']: sources.add(parent_info['parent']) return sources expression = split_template(target.definition_formal) body_dependencies = extract_globals(expression['body']) for alias in body_dependencies: parent = self._cst_by_alias.get(alias) if not parent: continue parent_info = self[parent.id] if not parent_info['is_template'] or not parent_info['is_simple']: sources.add(parent_info['parent']) if self._need_check_head(sources, expression['head']): head_dependencies = extract_globals(expression['head']) for alias in head_dependencies: parent = self._cst_by_alias.get(alias) if not parent: continue parent_info = self[parent.id] if not is_base_set(parent.cst_type) and \ (not parent_info['is_template'] or not parent_info['is_simple']): sources.add(parent_info['parent']) return sources def _need_check_head(self, sources: set[int], head: str) -> bool: if len(sources) == 0: return True elif len(sources) != 1: return False else: base = self._cst_by_ID[next(iter(sources))] return not is_functional(base.cst_type) or \ split_template(base.definition_formal)['head'] != head class _OrderManager: ''' Ordering helper class ''' def __init__(self, schema: RSForm): self._semantic = schema.semantic() self._graph = schema._graph_formal() self._items = list( schema.constituents() .only('id', 'order', 'alias', 'cst_type', 'definition_formal') .order_by('order') ) self._cst_by_ID = {cst.id: cst for cst in self._items} def restore_order(self) -> None: ''' Implement order restoration process. ''' if len(self._items) <= 1: return self._fix_kernel() self._fix_topological() self._fix_semantic_children() self._save_order() def _fix_topological(self) -> None: sorted_ids = self._graph.sort_stable([cst.id for cst in self._items]) sorted_items = [next(cst for cst in self._items if cst.id == id) for id in sorted_ids] self._items = sorted_items def _fix_kernel(self) -> None: result = [cst for cst in self._items if cst.cst_type == CstType.BASE] result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT] kernel = [ cst.id for cst in self._items if cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or self._cst_by_ID[self._semantic.parent(cst.id)].cst_type == CstType.STRUCTURED ] kernel = kernel + self._graph.expand_inputs(kernel) result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.id in kernel] result = result + [cst for cst in self._items if result.count(cst) == 0] self._items = result def _fix_semantic_children(self) -> None: result: list[Constituenta] = [] marked: set[Constituenta] = set() for cst in self._items: if cst in marked: continue result.append(cst) children = self._semantic[cst.id]['children'] if len(children) == 0: continue for child in self._items: if child.id in children: marked.add(child) result.append(child) self._items = result @transaction.atomic def _save_order(self) -> None: order = 1 for cst in self._items: cst.order = order cst.save() order += 1