mirror of
https://github.com/IRBorisov/ConceptPortal.git
synced 2025-06-26 04:50:36 +03:00
Implement backed for RestoreOrder functionality
This commit is contained in:
parent
8104b57097
commit
b07f0a8cd0
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
|
@ -94,6 +94,7 @@
|
|||
"Quantor",
|
||||
"razdel",
|
||||
"reagraph",
|
||||
"redef",
|
||||
"Reindex",
|
||||
"rsconcept",
|
||||
"rsedit",
|
||||
|
|
|
@ -1,42 +1,54 @@
|
|||
''' Utility: Graph implementation. '''
|
||||
from typing import Iterable, Optional, cast
|
||||
import copy
|
||||
from typing import Generic, Iterable, Optional, TypeVar
|
||||
|
||||
|
||||
class Graph:
|
||||
ItemType = TypeVar("ItemType")
|
||||
|
||||
|
||||
class Graph(Generic[ItemType]):
|
||||
''' Directed graph. '''
|
||||
def __init__(self, graph: Optional[dict[str, list[str]]]=None):
|
||||
def __init__(self, graph: Optional[dict[ItemType, list[ItemType]]]=None):
|
||||
if graph is None:
|
||||
self._graph = cast(dict[str, list[str]], {})
|
||||
self.outputs: dict[ItemType, list[ItemType]] = {}
|
||||
self.inputs: dict[ItemType, list[ItemType]] = {}
|
||||
else:
|
||||
self._graph = graph
|
||||
self.outputs = graph
|
||||
self.inputs: dict[ItemType, list[ItemType]] = {id : [] for id in graph.keys()} #type: ignore[no-redef]
|
||||
for parent in graph.keys():
|
||||
for child in graph[parent]:
|
||||
self.inputs[child].append(parent)
|
||||
|
||||
def contains(self, node_id: str) -> bool:
|
||||
def contains(self, node_id: ItemType) -> bool:
|
||||
''' Check if node is in graph. '''
|
||||
return node_id in self._graph
|
||||
return node_id in self.outputs
|
||||
|
||||
def has_edge(self, id_from: str, id_to: str) -> bool:
|
||||
def has_edge(self, src: ItemType, dest: ItemType) -> bool:
|
||||
''' Check if edge is in graph. '''
|
||||
return self.contains(id_from) and id_to in self._graph[id_from]
|
||||
return self.contains(src) and dest in self.outputs[src]
|
||||
|
||||
def add_node(self, node_id: str):
|
||||
def add_node(self, node_id: ItemType):
|
||||
''' Add node to graph. '''
|
||||
if not self.contains(node_id):
|
||||
self._graph[node_id] = []
|
||||
self.outputs[node_id] = []
|
||||
self.inputs[node_id] = []
|
||||
|
||||
def add_edge(self, id_from: str, id_to: str):
|
||||
def add_edge(self, src: ItemType, dest: ItemType):
|
||||
''' Add edge to graph. '''
|
||||
self.add_node(id_from)
|
||||
self.add_node(id_to)
|
||||
if id_to not in self._graph[id_from]:
|
||||
self._graph[id_from].append(id_to)
|
||||
self.add_node(src)
|
||||
self.add_node(dest)
|
||||
if dest not in self.outputs[src]:
|
||||
self.outputs[src].append(dest)
|
||||
if src not in self.inputs[dest]:
|
||||
self.inputs[dest].append(src)
|
||||
|
||||
def expand_outputs(self, origin: Iterable[str]) -> list[str]:
|
||||
def expand_inputs(self, origin: Iterable[ItemType]) -> list[ItemType]:
|
||||
''' Expand origin nodes forward through graph edges. '''
|
||||
result: list[str] = []
|
||||
marked: set[str] = set(origin)
|
||||
result: list[ItemType] = []
|
||||
marked: set[ItemType] = set(origin)
|
||||
for node_id in origin:
|
||||
if self.contains(node_id):
|
||||
for child_id in self._graph[node_id]:
|
||||
for child_id in self.inputs[node_id]:
|
||||
if child_id not in marked and child_id not in result:
|
||||
result.append(child_id)
|
||||
position: int = 0
|
||||
|
@ -45,19 +57,51 @@ class Graph:
|
|||
position += 1
|
||||
if node_id not in marked:
|
||||
marked.add(node_id)
|
||||
for child_id in self._graph[node_id]:
|
||||
for child_id in self.inputs[node_id]:
|
||||
if child_id not in marked and child_id not in result:
|
||||
result.append(child_id)
|
||||
return result
|
||||
|
||||
def topological_order(self) -> list[str]:
|
||||
''' Return nodes in topological order. '''
|
||||
result: list[str] = []
|
||||
marked: set[str] = set()
|
||||
for node_id in self._graph.keys():
|
||||
def expand_outputs(self, origin: Iterable[ItemType]) -> list[ItemType]:
|
||||
''' Expand origin nodes forward through graph edges. '''
|
||||
result: list[ItemType] = []
|
||||
marked: set[ItemType] = set(origin)
|
||||
for node_id in origin:
|
||||
if self.contains(node_id):
|
||||
for child_id in self.outputs[node_id]:
|
||||
if child_id not in marked and child_id not in result:
|
||||
result.append(child_id)
|
||||
position: int = 0
|
||||
while position < len(result):
|
||||
node_id = result[position]
|
||||
position += 1
|
||||
if node_id not in marked:
|
||||
marked.add(node_id)
|
||||
for child_id in self.outputs[node_id]:
|
||||
if child_id not in marked and child_id not in result:
|
||||
result.append(child_id)
|
||||
return result
|
||||
|
||||
def transitive_closure(self) -> dict[ItemType, list[ItemType]]:
|
||||
''' Generate transitive closure - list of reachable nodes for each node. '''
|
||||
result = copy.deepcopy(self.outputs)
|
||||
order = self.topological_order()
|
||||
order.reverse()
|
||||
for node_id in order:
|
||||
if len(self.inputs[node_id]) == 0:
|
||||
continue
|
||||
for parent in self.inputs[node_id]:
|
||||
result[parent] = result[parent] + [id for id in result[node_id] if not id in result[parent]]
|
||||
return result
|
||||
|
||||
def topological_order(self) -> list[ItemType]:
|
||||
''' Return nodes in SOME topological order. '''
|
||||
result: list[ItemType] = []
|
||||
marked: set[ItemType] = set()
|
||||
for node_id in self.outputs.keys():
|
||||
if node_id in marked:
|
||||
continue
|
||||
to_visit: list[str] = [node_id]
|
||||
to_visit: list[ItemType] = [node_id]
|
||||
while len(to_visit) > 0:
|
||||
node = to_visit[-1]
|
||||
if node in marked:
|
||||
|
@ -66,10 +110,33 @@ class Graph:
|
|||
to_visit.remove(node)
|
||||
else:
|
||||
marked.add(node)
|
||||
if len(self._graph[node]) <= 0:
|
||||
if len(self.outputs[node]) <= 0:
|
||||
continue
|
||||
for child_id in self._graph[node]:
|
||||
for child_id in self.outputs[node]:
|
||||
if child_id not in marked:
|
||||
to_visit.append(child_id)
|
||||
result.reverse()
|
||||
return result
|
||||
|
||||
def sort_stable(self, target: list[ItemType]) -> list[ItemType]:
|
||||
''' Returns target stable sorted in topological order based on minimal modifications. '''
|
||||
if len(target) <= 1:
|
||||
return target
|
||||
reachable = self.transitive_closure()
|
||||
test_set: set[ItemType] = set()
|
||||
result: list[ItemType] = []
|
||||
for node_id in reversed(target):
|
||||
need_move = node_id in test_set
|
||||
test_set = test_set.union(reachable[node_id])
|
||||
if not need_move:
|
||||
result.append(node_id)
|
||||
continue
|
||||
for (index, parent) in enumerate(result):
|
||||
if node_id in reachable[parent]:
|
||||
if parent in reachable[node_id]:
|
||||
result.append(node_id)
|
||||
else:
|
||||
result.insert(index, node_id)
|
||||
break
|
||||
result.reverse()
|
||||
return result
|
||||
|
|
|
@ -2,13 +2,12 @@
|
|||
import re
|
||||
|
||||
from django.db.models import (
|
||||
CASCADE, ForeignKey, Model, PositiveIntegerField,
|
||||
CASCADE, ForeignKey, Model, PositiveIntegerField, TextChoices,
|
||||
TextField, CharField, JSONField
|
||||
)
|
||||
from django.core.validators import MinValueValidator
|
||||
from django.urls import reverse
|
||||
|
||||
from .api_RSLanguage import CstType
|
||||
from ..utils import apply_pattern
|
||||
|
||||
|
||||
|
@ -20,8 +19,20 @@ def _empty_forms():
|
|||
return []
|
||||
|
||||
|
||||
class CstType(TextChoices):
|
||||
''' Type of constituenta. '''
|
||||
BASE = 'basic'
|
||||
CONSTANT = 'constant'
|
||||
STRUCTURED = 'structure'
|
||||
AXIOM = 'axiom'
|
||||
TERM = 'term'
|
||||
FUNCTION = 'function'
|
||||
PREDICATE = 'predicate'
|
||||
THEOREM = 'theorem'
|
||||
|
||||
|
||||
class Constituenta(Model):
|
||||
''' Constituenta is the base unit for every conceptual schema '''
|
||||
''' Constituenta is the base unit for every conceptual schema. '''
|
||||
schema: ForeignKey = ForeignKey(
|
||||
verbose_name='Концептуальная схема',
|
||||
to='rsform.LibraryItem',
|
||||
|
|
|
@ -1,13 +1,29 @@
|
|||
''' Models: RSForm API. '''
|
||||
from copy import deepcopy
|
||||
from typing import Iterable, Optional, Union, cast
|
||||
from typing import Optional, Union, cast
|
||||
|
||||
from django.db import transaction
|
||||
from django.db.models import QuerySet
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from cctext import Resolver, Entity, extract_entities, split_grams, TermForm
|
||||
from .api_RSLanguage import get_type_prefix, generate_structure, guess_type
|
||||
from cctext import (
|
||||
Resolver,
|
||||
Entity,
|
||||
extract_entities,
|
||||
split_grams,
|
||||
TermForm
|
||||
)
|
||||
from .api_RSLanguage import (
|
||||
extract_globals,
|
||||
get_type_prefix,
|
||||
generate_structure,
|
||||
guess_type,
|
||||
infer_template,
|
||||
is_base_set,
|
||||
is_functional,
|
||||
is_simple_expression,
|
||||
split_template
|
||||
)
|
||||
from .LibraryItem import LibraryItem, LibraryItemType
|
||||
from .Constituenta import CstType, Constituenta
|
||||
from .Version import Version
|
||||
|
@ -49,18 +65,22 @@ class RSForm:
|
|||
result.context[cst.alias] = entity
|
||||
return result
|
||||
|
||||
def semantic(self) -> 'SemanticInfo':
|
||||
''' Access semantic information on constituents. '''
|
||||
return SemanticInfo(self)
|
||||
|
||||
@transaction.atomic
|
||||
def on_term_change(self, changed: Iterable[str]):
|
||||
def on_term_change(self, changed: list[int]):
|
||||
''' Trigger cascade resolutions when term changes. '''
|
||||
graph_terms = self._term_graph()
|
||||
graph_terms = self._graph_term()
|
||||
expansion = graph_terms.expand_outputs(changed)
|
||||
expanded_change = list(changed) + expansion
|
||||
expanded_change = changed + expansion
|
||||
resolver = self.resolver()
|
||||
if len(expansion) > 0:
|
||||
for alias in graph_terms.topological_order():
|
||||
if alias not in expansion:
|
||||
for cst_id in graph_terms.topological_order():
|
||||
if cst_id not in expansion:
|
||||
continue
|
||||
cst = self.constituents().get(alias=alias)
|
||||
cst = self.constituents().get(id=cst_id)
|
||||
resolved = resolver.resolve(cst.term_raw)
|
||||
if resolved == cst.term_resolved:
|
||||
continue
|
||||
|
@ -68,12 +88,12 @@ class RSForm:
|
|||
cst.save()
|
||||
resolver.context[cst.alias] = Entity(cst.alias, resolved)
|
||||
|
||||
graph_defs = self._definition_graph()
|
||||
graph_defs = self._graph_text()
|
||||
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
|
||||
if len(update_defs) == 0:
|
||||
return
|
||||
for alias in update_defs:
|
||||
cst = self.constituents().get(alias=alias)
|
||||
for cst_id in update_defs:
|
||||
cst = self.constituents().get(id=cst_id)
|
||||
resolved = resolver.resolve(cst.definition_raw)
|
||||
if resolved == cst.definition_resolved:
|
||||
continue
|
||||
|
@ -199,7 +219,7 @@ class RSForm:
|
|||
if cst.definition_raw != '':
|
||||
cst.definition_resolved = resolver.resolve(cst.definition_raw)
|
||||
cst.save()
|
||||
self.on_term_change([cst.alias])
|
||||
self.on_term_change([cst.id])
|
||||
cst.refresh_from_db()
|
||||
return cst
|
||||
|
||||
|
@ -220,7 +240,12 @@ class RSForm:
|
|||
substitution.term_resolved = original.term_resolved
|
||||
substitution.save()
|
||||
original.delete()
|
||||
self.on_term_change([substitution.alias])
|
||||
self.on_term_change([substitution.id])
|
||||
|
||||
def restore_order(self):
|
||||
''' Restore order based on types and term graph. '''
|
||||
manager = _OrderManager(self)
|
||||
manager.restore_order()
|
||||
|
||||
def reset_aliases(self):
|
||||
''' Recreate all aliases based on constituents order. '''
|
||||
|
@ -251,10 +276,10 @@ class RSForm:
|
|||
@transaction.atomic
|
||||
def resolve_all_text(self):
|
||||
''' Trigger reference resolution for all texts. '''
|
||||
graph_terms = self._term_graph()
|
||||
graph_terms = self._graph_term()
|
||||
resolver = Resolver({})
|
||||
for alias in graph_terms.topological_order():
|
||||
cst = self.constituents().get(alias=alias)
|
||||
for cst_id in graph_terms.topological_order():
|
||||
cst = self.constituents().get(id=cst_id)
|
||||
resolved = resolver.resolve(cst.term_raw)
|
||||
resolver.context[cst.alias] = Entity(cst.alias, resolved)
|
||||
if resolved != cst.term_resolved:
|
||||
|
@ -352,30 +377,241 @@ class RSForm:
|
|||
else:
|
||||
return self.insert_new(data['alias'], data['cst_type'])
|
||||
|
||||
def _term_graph(self) -> Graph:
|
||||
result = Graph()
|
||||
def _graph_formal(self) -> Graph[int]:
|
||||
''' Graph based on formal definitions. '''
|
||||
result: Graph[int] = Graph()
|
||||
cst_list = \
|
||||
self.constituents() \
|
||||
.only('order', 'alias', 'term_raw') \
|
||||
.only('id', 'order', 'alias', 'definition_formal') \
|
||||
.order_by('order')
|
||||
for cst in cst_list:
|
||||
result.add_node(cst.alias)
|
||||
result.add_node(cst.id)
|
||||
for cst in cst_list:
|
||||
for alias in extract_entities(cst.term_raw):
|
||||
if result.contains(alias):
|
||||
result.add_edge(id_from=alias, id_to=cst.alias)
|
||||
for alias in extract_globals(cst.definition_formal):
|
||||
try:
|
||||
child = cst_list.get(alias=alias)
|
||||
result.add_edge(src=child.id, dest=cst.id)
|
||||
except Constituenta.DoesNotExist:
|
||||
pass
|
||||
return result
|
||||
|
||||
def _definition_graph(self) -> Graph:
|
||||
result = Graph()
|
||||
def _graph_term(self) -> Graph[int]:
|
||||
''' Graph based on term texts. '''
|
||||
result: Graph[int] = Graph()
|
||||
cst_list = \
|
||||
self.constituents() \
|
||||
.only('order', 'alias', 'definition_raw') \
|
||||
.only('id', 'order', 'alias', 'term_raw') \
|
||||
.order_by('order')
|
||||
for cst in cst_list:
|
||||
result.add_node(cst.alias)
|
||||
result.add_node(cst.id)
|
||||
for cst in cst_list:
|
||||
for alias in extract_entities(cst.term_raw):
|
||||
try:
|
||||
child = cst_list.get(alias=alias)
|
||||
result.add_edge(src=child.id, dest=cst.id)
|
||||
except Constituenta.DoesNotExist:
|
||||
pass
|
||||
return result
|
||||
|
||||
def _graph_text(self) -> Graph[int]:
|
||||
''' Graph based on definition texts. '''
|
||||
result: Graph[int] = Graph()
|
||||
cst_list = \
|
||||
self.constituents() \
|
||||
.only('id', 'order', 'alias', 'definition_raw') \
|
||||
.order_by('order')
|
||||
for cst in cst_list:
|
||||
result.add_node(cst.id)
|
||||
for cst in cst_list:
|
||||
for alias in extract_entities(cst.definition_raw):
|
||||
if result.contains(alias):
|
||||
result.add_edge(id_from=alias, id_to=cst.alias)
|
||||
try:
|
||||
child = cst_list.get(alias=alias)
|
||||
result.add_edge(src=child.id, dest=cst.id)
|
||||
except Constituenta.DoesNotExist:
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
class SemanticInfo:
|
||||
''' Semantic information derived from constituents. '''
|
||||
def __init__(self, schema: RSForm):
|
||||
self._graph = schema._graph_formal()
|
||||
self._items = list(
|
||||
schema.constituents() \
|
||||
.only('id', 'alias', 'cst_type', 'definition_formal') \
|
||||
.order_by('order')
|
||||
)
|
||||
self._cst_by_alias = { cst.alias : cst for cst in self._items }
|
||||
self._cst_by_ID = { cst.id : cst for cst in self._items }
|
||||
self.info = {
|
||||
cst.id: {
|
||||
'is_simple' : False, \
|
||||
'is_template' : False, \
|
||||
'parent' : cst.id, \
|
||||
'children' : []
|
||||
}
|
||||
for cst in self._items
|
||||
}
|
||||
self._calculate_attributes()
|
||||
|
||||
def __getitem__(self, key: int) -> dict:
|
||||
return self.info[key]
|
||||
|
||||
def is_simple_expression(self, target: int) -> bool:
|
||||
''' Access "is_simple" attribute. '''
|
||||
return cast(bool, self.info[target]['is_simple'])
|
||||
|
||||
def is_template(self, target: int) -> bool:
|
||||
''' Access "is_template" attribute. '''
|
||||
return cast(bool, self.info[target]['is_template'])
|
||||
|
||||
def parent(self, target: int) -> int:
|
||||
''' Access "parent" attribute. '''
|
||||
return cast(int, self.info[target]['parent'])
|
||||
|
||||
def children(self, target: int) -> list[int]:
|
||||
''' Access "children" attribute. '''
|
||||
return cast(list[int], self.info[target]['children'])
|
||||
|
||||
def _calculate_attributes(self):
|
||||
for cst_id in self._graph.topological_order():
|
||||
cst = self._cst_by_ID[cst_id]
|
||||
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
|
||||
self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst)
|
||||
if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED:
|
||||
continue
|
||||
parent = self._infer_parent(cst)
|
||||
self.info[cst_id]['parent'] = parent
|
||||
if parent != cst_id:
|
||||
self.info[parent]['children'].append(cst_id)
|
||||
|
||||
def _infer_simple_expression(self, target: Constituenta) -> bool:
|
||||
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
|
||||
return False
|
||||
|
||||
dependencies = self._graph.inputs[target.id]
|
||||
has_complex_dependency = any(
|
||||
self.is_template(cst_id) and \
|
||||
not self.is_simple_expression(cst_id) for cst_id in dependencies
|
||||
)
|
||||
if has_complex_dependency:
|
||||
return False
|
||||
|
||||
if is_functional(target.cst_type):
|
||||
return is_simple_expression(split_template(target.definition_formal)['body'])
|
||||
else:
|
||||
return is_simple_expression(target.definition_formal)
|
||||
|
||||
def _infer_parent(self, target: Constituenta) -> int:
|
||||
sources = self._extract_sources(target)
|
||||
if len(sources) != 1:
|
||||
return target.id
|
||||
return next(iter(sources))
|
||||
|
||||
def _extract_sources(self, target: Constituenta) -> set[int]:
|
||||
sources: set[int] = set()
|
||||
if not is_functional(target.cst_type):
|
||||
for parent_id in self._graph.inputs[target.id]:
|
||||
parent_info = self[parent_id]
|
||||
if not parent_info['is_template'] or not parent_info['is_simple']:
|
||||
sources.add(parent_info['parent'])
|
||||
return sources
|
||||
|
||||
expression = split_template(target.definition_formal)
|
||||
body_dependencies = extract_globals(expression['body'])
|
||||
for alias in body_dependencies:
|
||||
parent = self._cst_by_alias.get(alias)
|
||||
if not parent:
|
||||
continue
|
||||
|
||||
parent_info = self[parent.id]
|
||||
if not parent_info['is_template'] or not parent_info['is_simple']:
|
||||
sources.add(parent_info['parent'])
|
||||
|
||||
if self._need_check_head(sources, expression['head']):
|
||||
head_dependencies = extract_globals(expression['head'])
|
||||
for alias in head_dependencies:
|
||||
parent = self._cst_by_alias.get(alias)
|
||||
if not parent:
|
||||
continue
|
||||
|
||||
parent_info = self[parent.id]
|
||||
if not is_base_set(parent.cst_type) and \
|
||||
(not parent_info['is_template'] or not parent_info['is_simple']):
|
||||
sources.add(parent_info['parent'])
|
||||
return sources
|
||||
|
||||
def _need_check_head(self, sources: set[int], head: str)-> bool:
|
||||
if len(sources) == 0:
|
||||
return True
|
||||
elif len(sources) != 1:
|
||||
return False
|
||||
else:
|
||||
base = self._cst_by_ID[next(iter(sources))]
|
||||
return not is_functional(base.cst_type) or \
|
||||
split_template(base.definition_formal)['head'] != head
|
||||
|
||||
|
||||
class _OrderManager:
|
||||
''' Ordering helper class '''
|
||||
def __init__(self, schema: RSForm):
|
||||
self._semantic = schema.semantic()
|
||||
self._graph = schema._graph_formal()
|
||||
self._items = list(
|
||||
schema.constituents() \
|
||||
.only('id', 'order', 'alias', 'cst_type', 'definition_formal') \
|
||||
.order_by('order')
|
||||
)
|
||||
self._cst_by_ID = { cst.id : cst for cst in self._items }
|
||||
|
||||
def restore_order(self) -> None:
|
||||
''' Implement order restoration process. '''
|
||||
if len(self._items) <= 1:
|
||||
return
|
||||
self._fix_kernel()
|
||||
self._fix_topological()
|
||||
self._fix_semantic_children()
|
||||
self._save_order()
|
||||
|
||||
def _fix_topological(self) -> None:
|
||||
sorted_ids = self._graph.sort_stable([cst.id for cst in self._items])
|
||||
sorted_items = [next(cst for cst in self._items if cst.id == id) for id in sorted_ids]
|
||||
self._items = sorted_items
|
||||
|
||||
def _fix_kernel(self) -> None:
|
||||
result = [cst for cst in self._items if cst.cst_type == CstType.BASE]
|
||||
result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT]
|
||||
kernel = [
|
||||
cst.id for cst in self._items if \
|
||||
cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or \
|
||||
self._cst_by_ID[self._semantic.parent(cst.id)].cst_type == CstType.STRUCTURED
|
||||
]
|
||||
kernel = kernel + self._graph.expand_inputs(kernel)
|
||||
result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.id in kernel]
|
||||
result = result + [cst for cst in self._items if result.count(cst) == 0]
|
||||
self._items = result
|
||||
|
||||
def _fix_semantic_children(self) -> None:
|
||||
result: list[Constituenta] = []
|
||||
marked: set[Constituenta] = set()
|
||||
for cst in self._items:
|
||||
if cst in marked:
|
||||
continue
|
||||
result.append(cst)
|
||||
children = self._semantic[cst.id]['children']
|
||||
if len(children) == 0:
|
||||
continue
|
||||
for child in self._items:
|
||||
if child.id in children:
|
||||
marked.add(child)
|
||||
result.append(child)
|
||||
self._items = result
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def _save_order(self) -> None:
|
||||
order = 1
|
||||
for cst in self._items:
|
||||
cst.order = order
|
||||
cst.save()
|
||||
order += 1
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
''' Models: Definitions and utility function for RSLanguage. '''
|
||||
import json
|
||||
from typing import Tuple, cast
|
||||
import re
|
||||
from typing import Set, Tuple, cast
|
||||
from enum import IntEnum , unique
|
||||
|
||||
from django.db.models import TextChoices
|
||||
|
||||
import pyconcept
|
||||
|
||||
from .. import messages as msg
|
||||
from .Constituenta import CstType
|
||||
|
||||
|
||||
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
|
||||
_RE_TEMPLATE = r'R\d+'
|
||||
_RE_COMPLEX_SYMBOLS = r'[∀∃×ℬ;|:]'
|
||||
|
||||
|
||||
@unique
|
||||
|
@ -22,38 +27,51 @@ class TokenType(IntEnum):
|
|||
REDUCE = 299
|
||||
|
||||
|
||||
class CstType(TextChoices):
|
||||
''' Type of constituenta '''
|
||||
BASE = 'basic'
|
||||
CONSTANT = 'constant'
|
||||
STRUCTURED = 'structure'
|
||||
AXIOM = 'axiom'
|
||||
TERM = 'term'
|
||||
FUNCTION = 'function'
|
||||
PREDICATE = 'predicate'
|
||||
THEOREM = 'theorem'
|
||||
|
||||
|
||||
def get_type_prefix(cst_type: CstType) -> str:
|
||||
''' Get alias prefix. '''
|
||||
if cst_type == CstType.BASE:
|
||||
return 'X'
|
||||
if cst_type == CstType.CONSTANT:
|
||||
return 'C'
|
||||
if cst_type == CstType.STRUCTURED:
|
||||
return 'S'
|
||||
if cst_type == CstType.AXIOM:
|
||||
return 'A'
|
||||
if cst_type == CstType.TERM:
|
||||
return 'D'
|
||||
if cst_type == CstType.FUNCTION:
|
||||
return 'F'
|
||||
if cst_type == CstType.PREDICATE:
|
||||
return 'P'
|
||||
if cst_type == CstType.THEOREM:
|
||||
return 'T'
|
||||
match cst_type:
|
||||
case CstType.BASE: return 'X'
|
||||
case CstType.CONSTANT: return 'C'
|
||||
case CstType.STRUCTURED: return 'S'
|
||||
case CstType.AXIOM: return 'A'
|
||||
case CstType.TERM: return 'D'
|
||||
case CstType.FUNCTION: return 'F'
|
||||
case CstType.PREDICATE: return 'P'
|
||||
case CstType.THEOREM: return 'T'
|
||||
return 'X'
|
||||
|
||||
|
||||
def is_basic_concept(cst_type: CstType) -> bool:
|
||||
''' Evaluate if CstType is basic concept.'''
|
||||
return cst_type in [
|
||||
CstType.BASE,
|
||||
CstType.CONSTANT,
|
||||
CstType.STRUCTURED,
|
||||
CstType.AXIOM
|
||||
]
|
||||
|
||||
|
||||
def is_base_set(cst_type: CstType) -> bool:
|
||||
''' Evaluate if CstType is base set or constant set.'''
|
||||
return cst_type in [
|
||||
CstType.BASE,
|
||||
CstType.CONSTANT
|
||||
]
|
||||
|
||||
|
||||
def is_functional(cst_type: CstType) -> bool:
|
||||
''' Evaluate if CstType is function.'''
|
||||
return cst_type in [
|
||||
CstType.FUNCTION,
|
||||
CstType.PREDICATE
|
||||
]
|
||||
|
||||
|
||||
def extract_globals(expression: str) -> Set[str]:
|
||||
''' Extract all global aliases from expression. '''
|
||||
return set(re.findall(_RE_GLOBALS, expression))
|
||||
|
||||
|
||||
def guess_type(alias: str) -> CstType:
|
||||
''' Get CstType for alias. '''
|
||||
prefix = alias[0]
|
||||
|
@ -62,6 +80,7 @@ def guess_type(alias: str) -> CstType:
|
|||
return cast(CstType, value)
|
||||
return CstType.BASE
|
||||
|
||||
|
||||
def _get_structure_prefix(alias: str, expression: str, parse: dict) -> Tuple[str, str]:
|
||||
''' Generate prefix and alias for structure generation. '''
|
||||
args = parse['args']
|
||||
|
@ -71,6 +90,43 @@ def _get_structure_prefix(alias: str, expression: str, parse: dict) -> Tuple[str
|
|||
newAlias = alias + '[' + ','.join([arg['alias'] for arg in args]) + ']'
|
||||
return (newAlias, prefix)
|
||||
|
||||
|
||||
def infer_template(expression: str) -> bool:
|
||||
''' Checks if given expression is a template. '''
|
||||
return bool(re.search(_RE_TEMPLATE, expression))
|
||||
|
||||
|
||||
def is_simple_expression(expression: str) -> bool:
|
||||
''' Checks if given expression is "simple". '''
|
||||
return not bool(re.search(_RE_COMPLEX_SYMBOLS, expression))
|
||||
|
||||
|
||||
def split_template(expression: str):
|
||||
''' Splits a string containing a template definition into its head and body parts. '''
|
||||
start = 0
|
||||
for index, char in enumerate(expression):
|
||||
start = index
|
||||
if char == '[':
|
||||
break
|
||||
if start < len(expression):
|
||||
counter = 0
|
||||
for end in range(start + 1, len(expression)):
|
||||
if expression[end] == '[':
|
||||
counter += 1
|
||||
elif expression[end] == ']':
|
||||
if counter != 0:
|
||||
counter -= 1
|
||||
else:
|
||||
return {
|
||||
'head': expression[start + 1:end].strip(),
|
||||
'body': expression[end + 1:].strip()
|
||||
}
|
||||
return {
|
||||
'head': '',
|
||||
'body': expression
|
||||
}
|
||||
|
||||
|
||||
def generate_structure(alias: str, expression: str, parse: dict) -> list:
|
||||
''' Generate list of expressions for target structure. '''
|
||||
ast = json.loads(pyconcept.parse_expression(parse['typification']))['ast']
|
||||
|
|
|
@ -89,7 +89,7 @@ class ConstituentaSerializer(serializers.ModelSerializer):
|
|||
term_changed = data['term_resolved'] != instance.term_resolved
|
||||
result: Constituenta = super().update(instance, data)
|
||||
if term_changed:
|
||||
schema.on_term_change([result.alias])
|
||||
schema.on_term_change([result.id])
|
||||
result.refresh_from_db()
|
||||
schema.item.save()
|
||||
return result
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
''' Tests. '''
|
||||
from .t_imports import *
|
||||
from .s_views import *
|
||||
from .t_models import *
|
||||
from .s_models.t_RSForm import *
|
||||
from .t_serializers import *
|
||||
from .t_graph import *
|
||||
from .t_utils import *
|
||||
|
|
4
rsconcept/backend/apps/rsform/tests/s_models/__init__.py
Normal file
4
rsconcept/backend/apps/rsform/tests/s_models/__init__.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
''' Tests for REST API. '''
|
||||
from .t_Constituenta import *
|
||||
from .t_LibraryItem import *
|
||||
from .t_RSForm import *
|
|
@ -0,0 +1,68 @@
|
|||
''' Testing models: Constituenta. '''
|
||||
from django.test import TestCase
|
||||
from django.db.utils import IntegrityError
|
||||
from django.forms import ValidationError
|
||||
|
||||
from apps.rsform.models import (
|
||||
Constituenta, CstType,
|
||||
LibraryItem, LibraryItemType
|
||||
)
|
||||
|
||||
|
||||
class TestConstituenta(TestCase):
|
||||
''' Testing Constituenta model. '''
|
||||
def setUp(self):
|
||||
self.schema1 = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test1')
|
||||
self.schema2 = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test2')
|
||||
|
||||
|
||||
def test_str(self):
|
||||
testStr = 'X1'
|
||||
cst = Constituenta.objects.create(alias=testStr, schema=self.schema1, order=1, convention='Test')
|
||||
self.assertEqual(str(cst), testStr)
|
||||
|
||||
|
||||
def test_url(self):
|
||||
testStr = 'X1'
|
||||
cst = Constituenta.objects.create(alias=testStr, schema=self.schema1, order=1, convention='Test')
|
||||
self.assertEqual(cst.get_absolute_url(), f'/api/constituents/{cst.id}')
|
||||
|
||||
|
||||
def test_order_not_null(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', schema=self.schema1)
|
||||
|
||||
|
||||
def test_order_positive_integer(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', schema=self.schema1, order=-1)
|
||||
|
||||
|
||||
def test_order_min_value(self):
|
||||
with self.assertRaises(ValidationError):
|
||||
cst = Constituenta.objects.create(alias='X1', schema=self.schema1, order=0)
|
||||
cst.full_clean()
|
||||
|
||||
|
||||
def test_schema_not_null(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', order=1)
|
||||
|
||||
|
||||
def test_create_default(self):
|
||||
cst = Constituenta.objects.create(
|
||||
alias='X1',
|
||||
schema=self.schema1,
|
||||
order=1
|
||||
)
|
||||
self.assertEqual(cst.schema, self.schema1)
|
||||
self.assertEqual(cst.order, 1)
|
||||
self.assertEqual(cst.alias, 'X1')
|
||||
self.assertEqual(cst.cst_type, CstType.BASE)
|
||||
self.assertEqual(cst.convention, '')
|
||||
self.assertEqual(cst.definition_formal, '')
|
||||
self.assertEqual(cst.term_raw, '')
|
||||
self.assertEqual(cst.term_resolved, '')
|
||||
self.assertEqual(cst.term_forms, [])
|
||||
self.assertEqual(cst.definition_resolved, '')
|
||||
self.assertEqual(cst.definition_raw, '')
|
100
rsconcept/backend/apps/rsform/tests/s_models/t_LibraryItem.py
Normal file
100
rsconcept/backend/apps/rsform/tests/s_models/t_LibraryItem.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
''' Testing models: LibraryItem. '''
|
||||
from django.test import TestCase
|
||||
|
||||
from apps.rsform.models import (
|
||||
LibraryItem, LibraryItemType, Subscription,
|
||||
User
|
||||
)
|
||||
|
||||
|
||||
class TestLibraryItem(TestCase):
|
||||
''' Testing LibraryItem model. '''
|
||||
def setUp(self):
|
||||
self.user1 = User.objects.create(username='User1')
|
||||
self.user2 = User.objects.create(username='User2')
|
||||
self.assertNotEqual(self.user1, self.user2)
|
||||
|
||||
|
||||
def test_str(self):
|
||||
testStr = 'Test123'
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title=testStr,
|
||||
owner=self.user1,
|
||||
alias='КС1'
|
||||
)
|
||||
self.assertEqual(str(item), testStr)
|
||||
|
||||
|
||||
def test_url(self):
|
||||
testStr = 'Test123'
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title=testStr,
|
||||
owner=self.user1,
|
||||
alias='КС1'
|
||||
)
|
||||
self.assertEqual(item.get_absolute_url(), f'/api/library/{item.id}')
|
||||
|
||||
|
||||
def test_create_default(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertIsNone(item.owner)
|
||||
self.assertEqual(item.title, 'Test')
|
||||
self.assertEqual(item.alias, '')
|
||||
self.assertEqual(item.comment, '')
|
||||
self.assertEqual(item.is_common, False)
|
||||
self.assertEqual(item.is_canonical, False)
|
||||
|
||||
|
||||
def test_create(self):
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title='Test',
|
||||
owner=self.user1,
|
||||
alias='KS1',
|
||||
comment='Test comment',
|
||||
is_common=True,
|
||||
is_canonical=True
|
||||
)
|
||||
self.assertEqual(item.owner, self.user1)
|
||||
self.assertEqual(item.title, 'Test')
|
||||
self.assertEqual(item.alias, 'KS1')
|
||||
self.assertEqual(item.comment, 'Test comment')
|
||||
self.assertEqual(item.is_common, True)
|
||||
self.assertEqual(item.is_canonical, True)
|
||||
self.assertTrue(Subscription.objects.filter(user=item.owner, item=item).exists())
|
||||
|
||||
|
||||
def test_subscribe(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertEqual(len(item.subscribers()), 0)
|
||||
|
||||
self.assertTrue(Subscription.subscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
self.assertTrue(self.user1 in item.subscribers())
|
||||
|
||||
self.assertFalse(Subscription.subscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
|
||||
self.assertTrue(Subscription.subscribe(self.user2, item))
|
||||
self.assertEqual(len(item.subscribers()), 2)
|
||||
self.assertTrue(self.user1 in item.subscribers())
|
||||
self.assertTrue(self.user2 in item.subscribers())
|
||||
|
||||
self.user1.delete()
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
|
||||
|
||||
def test_unsubscribe(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertFalse(Subscription.unsubscribe(self.user1, item))
|
||||
Subscription.subscribe(self.user1, item)
|
||||
Subscription.subscribe(self.user2, item)
|
||||
self.assertEqual(len(item.subscribers()), 2)
|
||||
|
||||
self.assertTrue(Subscription.unsubscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
self.assertTrue(self.user2 in item.subscribers())
|
||||
|
||||
self.assertFalse(Subscription.unsubscribe(self.user1, item))
|
|
@ -1,167 +1,13 @@
|
|||
''' Testing models '''
|
||||
import json
|
||||
''' Testing models: api_RSForm. '''
|
||||
from django.test import TestCase
|
||||
from django.db.utils import IntegrityError
|
||||
from django.forms import ValidationError
|
||||
|
||||
from apps.rsform.models import (
|
||||
RSForm, Constituenta, CstType,
|
||||
User,
|
||||
LibraryItem, LibraryItemType, Subscription
|
||||
User
|
||||
)
|
||||
|
||||
|
||||
class TestConstituenta(TestCase):
|
||||
''' Testing Constituenta model. '''
|
||||
def setUp(self):
|
||||
self.schema1 = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test1')
|
||||
self.schema2 = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test2')
|
||||
|
||||
def test_str(self):
|
||||
testStr = 'X1'
|
||||
cst = Constituenta.objects.create(alias=testStr, schema=self.schema1, order=1, convention='Test')
|
||||
self.assertEqual(str(cst), testStr)
|
||||
|
||||
|
||||
def test_url(self):
|
||||
testStr = 'X1'
|
||||
cst = Constituenta.objects.create(alias=testStr, schema=self.schema1, order=1, convention='Test')
|
||||
self.assertEqual(cst.get_absolute_url(), f'/api/constituents/{cst.id}')
|
||||
|
||||
|
||||
def test_order_not_null(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', schema=self.schema1)
|
||||
|
||||
|
||||
def test_order_positive_integer(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', schema=self.schema1, order=-1)
|
||||
|
||||
|
||||
def test_order_min_value(self):
|
||||
with self.assertRaises(ValidationError):
|
||||
cst = Constituenta.objects.create(alias='X1', schema=self.schema1, order=0)
|
||||
cst.full_clean()
|
||||
|
||||
|
||||
def test_schema_not_null(self):
|
||||
with self.assertRaises(IntegrityError):
|
||||
Constituenta.objects.create(alias='X1', order=1)
|
||||
|
||||
|
||||
def test_create_default(self):
|
||||
cst = Constituenta.objects.create(
|
||||
alias='X1',
|
||||
schema=self.schema1,
|
||||
order=1
|
||||
)
|
||||
self.assertEqual(cst.schema, self.schema1)
|
||||
self.assertEqual(cst.order, 1)
|
||||
self.assertEqual(cst.alias, 'X1')
|
||||
self.assertEqual(cst.cst_type, CstType.BASE)
|
||||
self.assertEqual(cst.convention, '')
|
||||
self.assertEqual(cst.definition_formal, '')
|
||||
self.assertEqual(cst.term_raw, '')
|
||||
self.assertEqual(cst.term_resolved, '')
|
||||
self.assertEqual(cst.term_forms, [])
|
||||
self.assertEqual(cst.definition_resolved, '')
|
||||
self.assertEqual(cst.definition_raw, '')
|
||||
|
||||
|
||||
class TestLibraryItem(TestCase):
|
||||
''' Testing LibraryItem model. '''
|
||||
def setUp(self):
|
||||
self.user1 = User.objects.create(username='User1')
|
||||
self.user2 = User.objects.create(username='User2')
|
||||
self.assertNotEqual(self.user1, self.user2)
|
||||
|
||||
|
||||
def test_str(self):
|
||||
testStr = 'Test123'
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title=testStr,
|
||||
owner=self.user1,
|
||||
alias='КС1'
|
||||
)
|
||||
self.assertEqual(str(item), testStr)
|
||||
|
||||
|
||||
def test_url(self):
|
||||
testStr = 'Test123'
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title=testStr,
|
||||
owner=self.user1,
|
||||
alias='КС1'
|
||||
)
|
||||
self.assertEqual(item.get_absolute_url(), f'/api/library/{item.id}')
|
||||
|
||||
|
||||
def test_create_default(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertIsNone(item.owner)
|
||||
self.assertEqual(item.title, 'Test')
|
||||
self.assertEqual(item.alias, '')
|
||||
self.assertEqual(item.comment, '')
|
||||
self.assertEqual(item.is_common, False)
|
||||
self.assertEqual(item.is_canonical, False)
|
||||
|
||||
|
||||
def test_create(self):
|
||||
item = LibraryItem.objects.create(
|
||||
item_type=LibraryItemType.RSFORM,
|
||||
title='Test',
|
||||
owner=self.user1,
|
||||
alias='KS1',
|
||||
comment='Test comment',
|
||||
is_common=True,
|
||||
is_canonical=True
|
||||
)
|
||||
self.assertEqual(item.owner, self.user1)
|
||||
self.assertEqual(item.title, 'Test')
|
||||
self.assertEqual(item.alias, 'KS1')
|
||||
self.assertEqual(item.comment, 'Test comment')
|
||||
self.assertEqual(item.is_common, True)
|
||||
self.assertEqual(item.is_canonical, True)
|
||||
self.assertTrue(Subscription.objects.filter(user=item.owner, item=item).exists())
|
||||
|
||||
|
||||
def test_subscribe(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertEqual(len(item.subscribers()), 0)
|
||||
|
||||
self.assertTrue(Subscription.subscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
self.assertTrue(self.user1 in item.subscribers())
|
||||
|
||||
self.assertFalse(Subscription.subscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
|
||||
self.assertTrue(Subscription.subscribe(self.user2, item))
|
||||
self.assertEqual(len(item.subscribers()), 2)
|
||||
self.assertTrue(self.user1 in item.subscribers())
|
||||
self.assertTrue(self.user2 in item.subscribers())
|
||||
|
||||
self.user1.delete()
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
|
||||
|
||||
def test_unsubscribe(self):
|
||||
item = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, title='Test')
|
||||
self.assertFalse(Subscription.unsubscribe(self.user1, item))
|
||||
Subscription.subscribe(self.user1, item)
|
||||
Subscription.subscribe(self.user2, item)
|
||||
self.assertEqual(len(item.subscribers()), 2)
|
||||
|
||||
self.assertTrue(Subscription.unsubscribe(self.user1, item))
|
||||
self.assertEqual(len(item.subscribers()), 1)
|
||||
self.assertTrue(self.user2 in item.subscribers())
|
||||
|
||||
self.assertFalse(Subscription.unsubscribe(self.user1, item))
|
||||
|
||||
|
||||
class TestRSForm(TestCase):
|
||||
''' Testing RSForm wrapper. '''
|
||||
def setUp(self):
|
||||
|
@ -377,6 +223,69 @@ class TestRSForm(TestCase):
|
|||
self.assertEqual(x2.order, 1)
|
||||
|
||||
|
||||
def test_restore_order(self):
|
||||
d2 = self.schema.insert_new(
|
||||
alias='D2',
|
||||
definition_formal=r'D{ξ∈S1 | 1=1}',
|
||||
)
|
||||
d1 = self.schema.insert_new(
|
||||
alias='D1',
|
||||
definition_formal=r'Pr1(S1)\X1',
|
||||
)
|
||||
x1 = self.schema.insert_new('X1')
|
||||
x2 = self.schema.insert_new('X2')
|
||||
s1 = self.schema.insert_new(
|
||||
alias='S1',
|
||||
definition_formal='ℬ(X1×X1)'
|
||||
)
|
||||
c1 = self.schema.insert_new('C1')
|
||||
s2 = self.schema.insert_new(
|
||||
alias='S2',
|
||||
definition_formal='ℬ(X2×D1)'
|
||||
)
|
||||
a1 = self.schema.insert_new(
|
||||
alias='A1',
|
||||
definition_formal=r'D3=∅',
|
||||
)
|
||||
d3 = self.schema.insert_new(
|
||||
alias='D3',
|
||||
definition_formal=r'Pr2(S2)',
|
||||
)
|
||||
f1 = self.schema.insert_new(
|
||||
alias='F1',
|
||||
definition_formal=r'[α∈ℬ(X1)] D{σ∈S1 | α⊆pr1(σ)}',
|
||||
)
|
||||
d4 = self.schema.insert_new(
|
||||
alias='D4',
|
||||
definition_formal=r'Pr2(D3)',
|
||||
)
|
||||
|
||||
self.schema.restore_order()
|
||||
x1.refresh_from_db()
|
||||
x2.refresh_from_db()
|
||||
c1.refresh_from_db()
|
||||
s1.refresh_from_db()
|
||||
s2.refresh_from_db()
|
||||
d1.refresh_from_db()
|
||||
d2.refresh_from_db()
|
||||
d3.refresh_from_db()
|
||||
d4.refresh_from_db()
|
||||
f1.refresh_from_db()
|
||||
a1.refresh_from_db()
|
||||
|
||||
self.assertEqual(x1.order, 1)
|
||||
self.assertEqual(x2.order, 2)
|
||||
self.assertEqual(c1.order, 3)
|
||||
self.assertEqual(s1.order, 4)
|
||||
self.assertEqual(d1.order, 5)
|
||||
self.assertEqual(s2.order, 6)
|
||||
self.assertEqual(d3.order, 7)
|
||||
self.assertEqual(a1.order, 8)
|
||||
self.assertEqual(d4.order, 9)
|
||||
self.assertEqual(d2.order, 10)
|
||||
self.assertEqual(f1.order, 11)
|
||||
|
||||
|
||||
def test_reset_aliases(self):
|
||||
x1 = self.schema.insert_new(
|
||||
alias='X11',
|
||||
|
@ -437,7 +346,7 @@ class TestRSForm(TestCase):
|
|||
x1.term_resolved='слон'
|
||||
x1.save()
|
||||
|
||||
self.schema.on_term_change([x1.alias])
|
||||
self.schema.on_term_change([x1.id])
|
||||
x1.refresh_from_db()
|
||||
x2.refresh_from_db()
|
||||
x3.refresh_from_db()
|
|
@ -4,7 +4,7 @@ from rest_framework import status
|
|||
from apps.users.models import User
|
||||
from apps.rsform.models import LibraryItem, LibraryItemType, Subscription, LibraryTemplate, RSForm
|
||||
|
||||
from ..utils import response_contains
|
||||
from ..testing_utils import response_contains
|
||||
|
||||
from .EndpointTester import decl_endpoint, EndpointTester
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ from apps.rsform.models import (
|
|||
)
|
||||
|
||||
from cctext import ReferenceType
|
||||
from ..utils import response_contains
|
||||
from ..testing_utils import response_contains
|
||||
|
||||
from .EndpointTester import decl_endpoint, EndpointTester
|
||||
|
||||
|
|
|
@ -9,56 +9,107 @@ class TestGraph(unittest.TestCase):
|
|||
|
||||
def test_construction(self):
|
||||
graph = Graph()
|
||||
self.assertFalse(graph.contains('X1'))
|
||||
self.assertFalse(graph.contains(1))
|
||||
|
||||
graph.add_node('X1')
|
||||
self.assertTrue(graph.contains('X1'))
|
||||
graph.add_node(1)
|
||||
self.assertTrue(graph.contains(1))
|
||||
|
||||
graph.add_edge('X2', 'X3')
|
||||
self.assertTrue(graph.contains('X2'))
|
||||
self.assertTrue(graph.contains('X3'))
|
||||
self.assertTrue(graph.has_edge('X2', 'X3'))
|
||||
self.assertFalse(graph.has_edge('X3', 'X2'))
|
||||
graph.add_edge(2, 3)
|
||||
self.assertTrue(graph.contains(2))
|
||||
self.assertTrue(graph.contains(3))
|
||||
self.assertTrue(graph.has_edge(2, 3))
|
||||
self.assertFalse(graph.has_edge(3, 2))
|
||||
|
||||
graph = Graph({'X1': ['X3', 'X4'], 'X2': ['X1'], 'X3': [], 'X4': [], 'X5': []})
|
||||
self.assertTrue(graph.contains('X1'))
|
||||
self.assertTrue(graph.contains('X5'))
|
||||
self.assertTrue(graph.has_edge('X1', 'X3'))
|
||||
self.assertTrue(graph.has_edge('X2', 'X1'))
|
||||
graph = Graph({1: [3, 4], 2: [1], 3: [], 4: [], 5: []})
|
||||
self.assertTrue(graph.contains(1))
|
||||
self.assertTrue(graph.contains(5))
|
||||
self.assertTrue(graph.has_edge(1, 3))
|
||||
self.assertTrue(graph.has_edge(2, 1))
|
||||
|
||||
|
||||
def test_expand_outputs(self):
|
||||
graph = Graph({
|
||||
'X1': ['X2'],
|
||||
'X2': ['X3', 'X5'],
|
||||
'X3': [],
|
||||
'X5': ['X6'],
|
||||
'X6': ['X1'],
|
||||
'X7': []
|
||||
1: [2],
|
||||
2: [3, 5],
|
||||
3: [],
|
||||
5: [6],
|
||||
6: [1],
|
||||
7: []
|
||||
})
|
||||
self.assertEqual(graph.expand_outputs([]), [])
|
||||
self.assertEqual(graph.expand_outputs(['X3']), [])
|
||||
self.assertEqual(graph.expand_outputs(['X7']), [])
|
||||
self.assertEqual(graph.expand_outputs(['X2', 'X5']), ['X3', 'X6', 'X1'])
|
||||
self.assertEqual(graph.expand_outputs([3]), [])
|
||||
self.assertEqual(graph.expand_outputs([7]), [])
|
||||
self.assertEqual(graph.expand_outputs([2, 5]), [3, 6, 1])
|
||||
|
||||
def test_expand_inputs(self):
|
||||
graph = Graph({
|
||||
1: [2],
|
||||
2: [3, 5],
|
||||
3: [],
|
||||
5: [6],
|
||||
6: [1],
|
||||
7: []
|
||||
})
|
||||
self.assertEqual(graph.expand_inputs([]), [])
|
||||
self.assertEqual(graph.expand_inputs([1]), [6, 5, 2])
|
||||
self.assertEqual(graph.expand_inputs([7]), [])
|
||||
self.assertEqual(graph.expand_inputs([3]), [2, 1, 6, 5])
|
||||
self.assertEqual(graph.expand_inputs([2, 5]), [1, 6])
|
||||
|
||||
|
||||
def test_transitive_closure(self):
|
||||
graph = Graph({
|
||||
1: [2],
|
||||
2: [3, 5],
|
||||
3: [],
|
||||
5: [6],
|
||||
6: [],
|
||||
7: [6]
|
||||
})
|
||||
self.assertEqual(graph.transitive_closure(), {
|
||||
1: [2, 3, 5, 6],
|
||||
2: [3, 5, 6],
|
||||
3: [],
|
||||
5: [6],
|
||||
6: [],
|
||||
7: [6]
|
||||
})
|
||||
|
||||
def test_topological_order(self):
|
||||
self.assertEqual(Graph().topological_order(), [])
|
||||
graph = Graph({
|
||||
'X1': [],
|
||||
'X2': ['X1'],
|
||||
'X3': [],
|
||||
'X4': ['X3'],
|
||||
'X5': ['X6'],
|
||||
'X6': ['X1', 'X2']
|
||||
1: [],
|
||||
2: [1],
|
||||
3: [],
|
||||
4: [3],
|
||||
5: [6],
|
||||
6: [1, 2]
|
||||
})
|
||||
self.assertEqual(graph.topological_order(), ['X5', 'X6', 'X4', 'X3', 'X2', 'X1'])
|
||||
self.assertEqual(graph.topological_order(), [5, 6, 4, 3, 2, 1])
|
||||
|
||||
graph = Graph({
|
||||
'X1': ['X1'],
|
||||
'X2': ['X4'],
|
||||
'X3': ['X2'],
|
||||
'X4': [],
|
||||
'X5': ['X2'],
|
||||
1: [1],
|
||||
2: [4],
|
||||
3: [2],
|
||||
4: [],
|
||||
5: [2],
|
||||
})
|
||||
self.assertEqual(graph.topological_order(), ['X5', 'X3', 'X2', 'X4', 'X1'])
|
||||
self.assertEqual(graph.topological_order(), [5, 3, 2, 4, 1])
|
||||
|
||||
def test_sort_stable(self):
|
||||
graph = Graph({
|
||||
1: [2],
|
||||
2: [3, 5],
|
||||
3: [],
|
||||
5: [6],
|
||||
6: [],
|
||||
7: [6]
|
||||
})
|
||||
self.assertEqual(graph.sort_stable([]), [])
|
||||
self.assertEqual(graph.sort_stable([1]), [1])
|
||||
self.assertEqual(graph.sort_stable([1, 2]), [1, 2])
|
||||
self.assertEqual(graph.sort_stable([7, 2, 1]), [7, 1, 2])
|
||||
self.assertEqual(graph.sort_stable([2, 1, 7]), [1, 2, 7])
|
||||
self.assertEqual(graph.sort_stable([1, 2, 7]), [1, 2, 7])
|
||||
self.assertEqual(graph.sort_stable([2, 1, 3, 6, 7]), [1, 2, 3, 7, 6])
|
||||
self.assertEqual(graph.sort_stable([2, 1, 6, 7, 3]), [1, 2, 7, 6, 3])
|
||||
|
|
|
@ -9,6 +9,9 @@ from rest_framework.permissions import BasePermission, IsAuthenticated
|
|||
# Name for JSON inside Exteor files archive
|
||||
EXTEOR_INNER_FILENAME = 'document.json'
|
||||
|
||||
# Old style reference pattern
|
||||
_REF_OLD_PATTERN = re.compile(r'@{([^0-9\-][^\}\|\{]*?)\|([^\}\|\{]*?)\|([^\}\|\{]*?)}')
|
||||
|
||||
|
||||
class ObjectOwnerOrAdmin(BasePermission):
|
||||
''' Permission for object ownership restriction '''
|
||||
|
@ -47,6 +50,7 @@ class ItemOwnerOrAdmin(BasePermission):
|
|||
return False
|
||||
return request.user.is_staff # type: ignore
|
||||
|
||||
|
||||
def read_zipped_json(data, json_filename: str) -> dict:
|
||||
''' Read JSON from zipped data '''
|
||||
with ZipFile(data, 'r') as archive:
|
||||
|
@ -63,6 +67,7 @@ def write_zipped_json(json_data: dict, json_filename: str) -> bytes:
|
|||
archive.writestr(json_filename, data=data)
|
||||
return content.getvalue()
|
||||
|
||||
|
||||
def apply_pattern(text: str, mapping: dict[str, str], pattern: re.Pattern[str]) -> str:
|
||||
''' Apply mapping to matching in regular expression pattern subgroup 1 '''
|
||||
if text == '' or pattern == '':
|
||||
|
@ -79,7 +84,6 @@ def apply_pattern(text: str, mapping: dict[str, str], pattern: re.Pattern[str])
|
|||
output += text[pos_input : len(text)]
|
||||
return output
|
||||
|
||||
_REF_OLD_PATTERN = re.compile(r'@{([^0-9\-][^\}\|\{]*?)\|([^\}\|\{]*?)\|([^\}\|\{]*?)}')
|
||||
|
||||
def fix_old_references(text: str) -> str:
|
||||
''' Fix reference format: @{X1|nomn|sing} -> {X1|nomn,sing} '''
|
||||
|
@ -94,6 +98,7 @@ def fix_old_references(text: str) -> str:
|
|||
output += text[pos_input : len(text)]
|
||||
return output
|
||||
|
||||
|
||||
def filename_for_schema(alias: str) -> str:
|
||||
''' Generate filename for schema from alias. '''
|
||||
if alias == '' or not alias.isascii():
|
||||
|
|
|
@ -246,7 +246,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
status=c.HTTP_200_OK,
|
||||
data=s.RSFormParseSerializer(schema.item).data
|
||||
)
|
||||
|
||||
|
||||
@extend_schema(
|
||||
summary='restore order based on types and term graph',
|
||||
tags=['RSForm'],
|
||||
|
@ -261,8 +261,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
def restore_order(self, request: Request, pk):
|
||||
''' Endpoint: Restore order based on types and term graph. '''
|
||||
schema = self._get_schema()
|
||||
# TODO: implement reordering
|
||||
# schema.reset_aliases()
|
||||
schema.restore_order()
|
||||
return Response(
|
||||
status=c.HTTP_200_OK,
|
||||
data=s.RSFormParseSerializer(schema.item).data
|
||||
|
|
Loading…
Reference in New Issue
Block a user