R: Splitting Cached models pt1
Some checks failed
Backend CI / build (3.12) (push) Has been cancelled
Backend CI / notify-failure (push) Has been cancelled

This commit is contained in:
Ivan 2025-08-01 10:55:53 +03:00
parent a7428a4af4
commit ee3c3e59b5
28 changed files with 1437 additions and 1269 deletions

View File

@ -334,12 +334,12 @@ class TestLibraryViewset(EndpointTester):
@decl_endpoint('/api/library/{item}/clone', method='post')
def test_clone_rsform(self):
schema = RSForm(self.owned)
x12 = schema.insert_new(
x12 = schema.insert_last(
alias='X12',
term_raw='человек',
term_resolved='человек'
)
d2 = schema.insert_new(
d2 = schema.insert_last(
alias='D2',
term_raw='@{X12|plur}',
term_resolved='люди'

View File

@ -20,7 +20,7 @@ class TestVersionViews(EndpointTester):
self.owned_id = self.owned.model.pk
self.unowned = RSForm.create(title='Test2', alias='T2')
self.unowned_id = self.unowned.model.pk
self.x1 = self.owned.insert_new(
self.x1 = self.owned.insert_last(
alias='X1',
convention='testStart'
)
@ -44,7 +44,7 @@ class TestVersionViews(EndpointTester):
@decl_endpoint('/api/library/{schema}/create-version', method='post')
def test_create_version_filter(self):
x2 = self.owned.insert_new('X2')
x2 = self.owned.insert_last('X2')
data = {'version': '1.0.0', 'description': 'test', 'items': [x2.pk]}
response = self.executeCreated(data=data, schema=self.owned_id)
version = Version.objects.get(pk=response.data['version'])
@ -154,14 +154,14 @@ class TestVersionViews(EndpointTester):
@decl_endpoint('/api/versions/{version}/restore', method='patch')
def test_restore_version(self):
x1 = self.x1
x2 = self.owned.insert_new('X2')
d1 = self.owned.insert_new('D1', term_raw='TestTerm')
x2 = self.owned.insert_last('X2')
d1 = self.owned.insert_last('D1', term_raw='TestTerm')
data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
self.owned.delete_cst([d1])
x3 = self.owned.insert_new('X3')
x3 = self.owned.insert_last('X3')
x1.order = x3.order
x1.convention = 'Test2'
x1.term_raw = 'Test'

View File

@ -14,7 +14,7 @@ from rest_framework.request import Request
from rest_framework.response import Response
from apps.oss.models import Layout, Operation, OperationSchema, PropagationFacade
from apps.rsform.models import RSForm
from apps.rsform.models import RSFormCached
from apps.rsform.serializers import RSFormParseSerializer
from apps.users.models import User
from shared import permissions
@ -172,7 +172,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
clone.location = data.get('location', m.LocationHead.USER)
clone.save()
need_filter = 'items' in request.data and len(request.data['items']) > 0
for cst in RSForm(item).constituents():
for cst in RSFormCached(item).constituentsQ():
if not need_filter or cst.pk in request.data['items']:
cst.pk = None
cst.schema = clone

View File

@ -69,7 +69,7 @@ def export_file(request: Request, pk: int) -> HttpResponse:
version = m.Version.objects.get(pk=pk)
except m.Version.DoesNotExist:
return Response(status=c.HTTP_404_NOT_FOUND)
data = RSFormTRSSerializer(version.item).from_versioned_data(version.data)
data = RSFormTRSSerializer.load_versioned_data(version.data)
file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
filename = utils.filename_for_schema(data['alias'])
response = HttpResponse(file, content_type='application/zip')

View File

@ -12,7 +12,7 @@ from apps.rsform.models import (
INSERT_LAST,
Constituenta,
CstType,
RSForm,
RSFormCached,
extract_globals,
replace_entities,
replace_globals
@ -44,12 +44,6 @@ class OperationSchema:
Layout.objects.create(oss=model, data=[])
return OperationSchema(model)
@staticmethod
def from_id(pk: int) -> 'OperationSchema':
''' Get LibraryItem by pk. '''
model = LibraryItem.objects.get(pk=pk)
return OperationSchema(model)
def save(self, *args, **kwargs) -> None:
''' Save wrapper. '''
self.model.save(*args, **kwargs)
@ -205,8 +199,8 @@ class OperationSchema:
operation.save(update_fields=['alias', 'title', 'description'])
if schema is not None and has_children:
rsform = RSForm(schema)
self.after_create_cst(rsform, list(rsform.constituents().order_by('order')))
rsform = RSFormCached(schema)
self.after_create_cst(rsform, list(rsform.constituentsQ().order_by('order')))
self.save(update_fields=['time_update'])
def set_arguments(self, target: int, arguments: list[Operation]) -> None:
@ -281,9 +275,9 @@ class OperationSchema:
if len(added) > 0 or len(deleted) > 0:
self.save(update_fields=['time_update'])
def create_input(self, operation: Operation) -> RSForm:
def create_input(self, operation: Operation) -> RSFormCached:
''' Create input RSForm for given Operation. '''
schema = RSForm.create(
schema = RSFormCached.create(
owner=self.model.owner,
alias=operation.alias,
title=operation.title,
@ -312,8 +306,8 @@ class OperationSchema:
parents: dict = {}
children: dict = {}
for operand in schemas:
schema = RSForm(operand)
items = list(schema.constituents().order_by('order'))
schema = RSFormCached(operand)
items = list(schema.constituentsQ().order_by('order'))
new_items = receiver.insert_copy(items)
for (i, cst) in enumerate(new_items):
parents[cst.pk] = items[i]
@ -326,7 +320,7 @@ class OperationSchema:
translated_substitutions.append((original, replacement))
receiver.substitute(translated_substitutions)
for cst in receiver.constituents().order_by('order'):
for cst in receiver.constituentsQ().order_by('order'):
parent = parents.get(cst.pk)
assert parent is not None
Inheritance.objects.create(
@ -340,11 +334,11 @@ class OperationSchema:
receiver.resolve_all_text()
if len(self.cache.graph.outputs[operation.pk]) > 0:
self.after_create_cst(receiver, list(receiver.constituents().order_by('order')))
self.after_create_cst(receiver, list(receiver.constituentsQ().order_by('order')))
self.save(update_fields=['time_update'])
return True
def relocate_down(self, source: RSForm, destination: RSForm, items: list[Constituenta]):
def relocate_down(self, source: RSFormCached, destination: RSFormCached, items: list[Constituenta]):
''' Move list of Constituents to destination Schema inheritor. '''
self.cache.ensure_loaded()
self.cache.insert_schema(source)
@ -358,7 +352,8 @@ class OperationSchema:
self.cache.remove_inheritance(item)
Inheritance.objects.filter(operation_id=operation.pk, parent__in=items).delete()
def relocate_up(self, source: RSForm, destination: RSForm, items: list[Constituenta]) -> list[Constituenta]:
def relocate_up(self, source: RSFormCached, destination: RSFormCached,
items: list[Constituenta]) -> list[Constituenta]:
''' Move list of Constituents upstream to destination Schema. '''
self.cache.ensure_loaded()
self.cache.insert_schema(source)
@ -385,7 +380,7 @@ class OperationSchema:
return new_items
def after_create_cst(
self, source: RSForm,
self, source: RSFormCached,
cst_list: list[Constituenta],
exclude: Optional[list[int]] = None
) -> None:
@ -404,13 +399,13 @@ class OperationSchema:
operation = self.cache.get_operation(source.model.pk)
self._cascade_inherit_cst(operation.pk, source, cst_list, alias_mapping, exclude)
def after_change_cst_type(self, source: RSForm, target: Constituenta) -> None:
def after_change_cst_type(self, source: RSFormCached, target: Constituenta) -> None:
''' Trigger cascade resolutions when Constituenta type is changed. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
self._cascade_change_cst_type(operation.pk, target.pk, cast(CstType, target.cst_type))
def after_update_cst(self, source: RSForm, target: Constituenta, data: dict, old_data: dict) -> None:
def after_update_cst(self, source: RSFormCached, target: Constituenta, data: dict, old_data: dict) -> None:
''' Trigger cascade resolutions when Constituenta data is changed. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
@ -428,13 +423,13 @@ class OperationSchema:
mapping=alias_mapping
)
def before_delete_cst(self, source: RSForm, target: list[Constituenta]) -> None:
def before_delete_cst(self, source: RSFormCached, target: list[Constituenta]) -> None:
''' Trigger cascade resolutions before Constituents are deleted. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
self._cascade_delete_inherited(operation.pk, target)
def before_substitute(self, source: RSForm, substitutions: CstSubstitution) -> None:
def before_substitute(self, source: RSFormCached, substitutions: CstSubstitution) -> None:
''' Trigger cascade resolutions before Constituents are substituted. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
@ -461,14 +456,14 @@ class OperationSchema:
self._execute_inherit_cst(
target_operation=target.pk,
source=parent_schema,
items=list(parent_schema.constituents().order_by('order')),
items=list(parent_schema.constituentsQ().order_by('order')),
mapping={}
)
# pylint: disable=too-many-arguments, too-many-positional-arguments
def _cascade_inherit_cst(
self, target_operation: int,
source: RSForm,
source: RSFormCached,
items: list[Constituenta],
mapping: CstMapping,
exclude: Optional[list[int]] = None
@ -483,7 +478,7 @@ class OperationSchema:
def _execute_inherit_cst(
self,
target_operation: int,
source: RSForm,
source: RSFormCached,
items: list[Constituenta],
mapping: CstMapping
) -> None:
@ -604,7 +599,7 @@ class OperationSchema:
mapping: CstMapping,
target: list[int],
operation: int,
schema: RSForm
schema: RSFormCached
) -> None:
alias_mapping = OperationSchema._produce_alias_mapping(mapping)
schema.apply_partial_mapping(alias_mapping, target)
@ -635,7 +630,7 @@ class OperationSchema:
result[alias] = cst.alias
return result
def _transform_mapping(self, mapping: CstMapping, operation: Operation, schema: RSForm) -> CstMapping:
def _transform_mapping(self, mapping: CstMapping, operation: Operation, schema: RSFormCached) -> CstMapping:
if len(mapping) == 0:
return mapping
result: CstMapping = {}
@ -655,8 +650,8 @@ class OperationSchema:
def _determine_insert_position(
self, prototype_id: int,
operation: Operation,
source: RSForm,
destination: RSForm
source: RSFormCached,
destination: RSFormCached
) -> int:
''' Determine insert_after for new constituenta. '''
prototype = source.cache.by_id[prototype_id]
@ -705,7 +700,7 @@ class OperationSchema:
self,
target: CstSubstitution,
operation: int,
schema: RSForm
schema: RSFormCached
) -> CstSubstitution:
result: CstSubstitution = []
for current_sub in target:
@ -742,7 +737,7 @@ class OperationSchema:
result.append((schema.cache.by_id[new_original_id], schema.cache.by_id[new_substitution_id]))
return result
def _undo_substitutions_cst(self, target: list[Constituenta], operation: Operation, schema: RSForm) -> None:
def _undo_substitutions_cst(self, target: list[Constituenta], operation: Operation, schema: RSFormCached) -> None:
target_ids = [cst.pk for cst in target]
to_process = []
for sub in self.cache.substitutions[operation.pk]:
@ -753,7 +748,7 @@ class OperationSchema:
def _undo_substitution(
self,
schema: RSForm,
schema: RSFormCached,
target: Substitution,
ignore_parents: Optional[list[int]] = None
) -> None:
@ -788,7 +783,7 @@ class OperationSchema:
mapping = {substitution_inheritor.alias: new_original}
self._cascade_partial_mapping(mapping, dependant, operation_id, schema)
def _process_added_substitutions(self, schema: Optional[RSForm], added: list[Substitution]) -> None:
def _process_added_substitutions(self, schema: Optional[RSFormCached], added: list[Substitution]) -> None:
if len(added) == 0:
return
if schema is None:
@ -816,8 +811,8 @@ class OssCache:
def __init__(self, oss: OperationSchema):
self._oss = oss
self._schemas: list[RSForm] = []
self._schema_by_id: dict[int, RSForm] = {}
self._schemas: list[RSFormCached] = []
self._schema_by_id: dict[int, RSFormCached] = {}
self.operations = list(oss.operations().only('result_id'))
self.operation_by_id = {operation.pk: operation for operation in self.operations}
@ -844,14 +839,14 @@ class OssCache:
for item in self._oss.inheritance().only('operation_id', 'parent_id', 'child_id'):
self.inheritance[item.operation_id].append(item)
def get_schema(self, operation: Operation) -> Optional[RSForm]:
def get_schema(self, operation: Operation) -> Optional[RSFormCached]:
''' Get schema by Operation. '''
if operation.result_id is None:
return None
if operation.result_id in self._schema_by_id:
return self._schema_by_id[operation.result_id]
else:
schema = RSForm.from_id(operation.result_id)
schema = RSFormCached.from_id(operation.result_id)
schema.cache.ensure_loaded()
self._insert_new(schema)
return schema
@ -885,7 +880,7 @@ class OssCache:
return self.get_inheritor(sub.substitution_id, operation)
return self.get_inheritor(parent_cst, operation)
def insert_schema(self, schema: RSForm) -> None:
def insert_schema(self, schema: RSFormCached) -> None:
''' Insert new schema. '''
if not self._schema_by_id.get(schema.model.pk):
schema.cache.ensure_loaded()
@ -924,7 +919,7 @@ class OssCache:
for item in inherit_to_delete:
self.inheritance[operation].remove(item)
def remove_schema(self, schema: RSForm) -> None:
def remove_schema(self, schema: RSFormCached) -> None:
''' Remove schema from cache. '''
self._schemas.remove(schema)
del self._schema_by_id[schema.model.pk]
@ -954,7 +949,7 @@ class OssCache:
''' Remove inheritance from cache. '''
self.inheritance[target.operation_id].remove(target)
def unfold_sub(self, sub: Substitution) -> tuple[RSForm, RSForm, Constituenta, Constituenta]:
def unfold_sub(self, sub: Substitution) -> tuple[RSFormCached, RSFormCached, Constituenta, Constituenta]:
''' Unfold substitution into original and substitution forms. '''
operation = self.operation_by_id[sub.operation_id]
parents = self.graph.inputs[operation.pk]
@ -976,6 +971,6 @@ class OssCache:
raise ValueError(f'Parent schema for Substitution-{sub.pk} not found.')
return original_schema, substitution_schema, original_cst, substitution_cst
def _insert_new(self, schema: RSForm) -> None:
def _insert_new(self, schema: RSFormCached) -> None:
self._schemas.append(schema)
self._schema_by_id[schema.model.pk] = schema

View File

@ -2,7 +2,7 @@
from typing import Optional
from apps.library.models import LibraryItem, LibraryItemType
from apps.rsform.models import Constituenta, RSForm
from apps.rsform.models import Constituenta, RSFormCached
from .OperationSchema import CstSubstitution, OperationSchema
@ -16,7 +16,8 @@ class PropagationFacade:
''' Change propagation API. '''
@staticmethod
def after_create_cst(source: RSForm, new_cst: list[Constituenta], exclude: Optional[list[int]] = None) -> None:
def after_create_cst(source: RSFormCached, new_cst: list[Constituenta],
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions when new constituenta is created. '''
hosts = _get_oss_hosts(source.model)
for host in hosts:
@ -24,7 +25,7 @@ class PropagationFacade:
OperationSchema(host).after_create_cst(source, new_cst)
@staticmethod
def after_change_cst_type(source: RSForm, target: Constituenta, exclude: Optional[list[int]] = None) -> None:
def after_change_cst_type(source: RSFormCached, target: Constituenta, exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions when constituenta type is changed. '''
hosts = _get_oss_hosts(source.model)
for host in hosts:
@ -33,7 +34,7 @@ class PropagationFacade:
@staticmethod
def after_update_cst(
source: RSForm,
source: RSFormCached,
target: Constituenta,
data: dict,
old_data: dict,
@ -46,7 +47,8 @@ class PropagationFacade:
OperationSchema(host).after_update_cst(source, target, data, old_data)
@staticmethod
def before_delete_cst(source: RSForm, target: list[Constituenta], exclude: Optional[list[int]] = None) -> None:
def before_delete_cst(source: RSFormCached, target: list[Constituenta],
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions before constituents are deleted. '''
hosts = _get_oss_hosts(source.model)
for host in hosts:
@ -54,7 +56,8 @@ class PropagationFacade:
OperationSchema(host).before_delete_cst(source, target)
@staticmethod
def before_substitute(source: RSForm, substitutions: CstSubstitution, exclude: Optional[list[int]] = None) -> None:
def before_substitute(source: RSFormCached, substitutions: CstSubstitution,
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions before constituents are substituted. '''
hosts = _get_oss_hosts(source.model)
for host in hosts:
@ -70,5 +73,5 @@ class PropagationFacade:
if len(hosts) == 0:
return
schema = RSForm(item)
PropagationFacade.before_delete_cst(schema, list(schema.constituents().order_by('order')), exclude)
schema = RSFormCached(item)
PropagationFacade.before_delete_cst(schema, list(schema.constituentsQ().order_by('order')), exclude)

View File

@ -20,8 +20,8 @@ class TestInheritance(TestCase):
operation_type=OperationType.INPUT,
result=self.ks1.model
)
self.ks1_x1 = self.ks1.insert_new('X1')
self.ks2_x1 = self.ks2.insert_new('X1')
self.ks1_x1 = self.ks1.insert_last('X1')
self.ks2_x1 = self.ks2.insert_last('X1')
self.inheritance = Inheritance.objects.create(
operation=self.operation,
parent=self.ks1_x1,

View File

@ -3,7 +3,6 @@ from django.test import TestCase
from apps.library.models import LibraryItem, LibraryItemType
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm
class TestOperation(TestCase):

View File

@ -15,9 +15,9 @@ class TestSynthesisSubstitution(TestCase):
self.oss = OperationSchema.create(alias='T1')
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1X1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks1X1 = self.ks1.insert_last('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2X1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.ks2X1 = self.ks2.insert_last('X2', term_resolved='X1_2')
self.operation1 = Operation.objects.create(
oss=self.oss.model,

View File

@ -22,16 +22,16 @@ class TestChangeConstituents(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X4')
self.ks1X2 = self.ks1.insert_new('X5')
self.ks1X1 = self.ks1.insert_last('X4')
self.ks1X2 = self.ks1.insert_last('X5')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1')
self.ks2D1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_last('X1')
self.ks2D1 = self.ks2.insert_last(
alias='D1',
definition_formal=r'X1\X1'
)
@ -55,7 +55,7 @@ class TestChangeConstituents(EndpointTester):
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
self.assertEqual(self.ks3.constituents().count(), 4)
self.assertEqual(self.ks3.constituentsQ().count(), 4)
self.layout_data = [
{'nodeID': 'o' + str(self.operation1.pk), 'x': 0, 'y': 0, 'width': 150, 'height': 40},
@ -105,8 +105,8 @@ class TestChangeConstituents(EndpointTester):
response = self.executeCreated(data=data, schema=self.ks1.model.pk)
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 5)
self.assertEqual(self.ks1.constituentsQ().count(), 3)
self.assertEqual(self.ks3.constituentsQ().count(), 5)
self.assertEqual(inherited_cst.alias, 'X4')
self.assertEqual(inherited_cst.order, 2)
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
@ -114,7 +114,7 @@ class TestChangeConstituents(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_constituenta(self):
d2 = self.ks3.insert_new('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
data = {
'target': self.ks1X1.pk,
'item_data': {
@ -148,15 +148,15 @@ class TestChangeConstituents(EndpointTester):
response = self.executeOK(data=data, schema=self.ks2.model.pk)
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
self.ks2D1.refresh_from_db()
self.assertEqual(self.ks2.constituents().count(), 1)
self.assertEqual(self.ks3.constituents().count(), 3)
self.assertEqual(self.ks2.constituentsQ().count(), 1)
self.assertEqual(self.ks3.constituentsQ().count(), 3)
self.assertEqual(self.ks2D1.definition_formal, r'DEL\DEL')
self.assertEqual(inherited_cst.definition_formal, r'DEL\DEL')
@decl_endpoint('/api/rsforms/{schema}/substitute', method='patch')
def test_substitute(self):
d2 = self.ks3.insert_new('D2', cst_type=CstType.TERM, definition_formal=r'X1\X2\X3')
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_formal=r'X1\X2\X3')
data = {'substitutions': [{
'original': self.ks1X1.pk,
'substitution': self.ks1X2.pk
@ -164,7 +164,7 @@ class TestChangeConstituents(EndpointTester):
self.executeOK(data=data, schema=self.ks1.model.pk)
self.ks1X2.refresh_from_db()
d2.refresh_from_db()
self.assertEqual(self.ks1.constituents().count(), 1)
self.assertEqual(self.ks3.constituents().count(), 4)
self.assertEqual(self.ks1.constituentsQ().count(), 1)
self.assertEqual(self.ks3.constituentsQ().count(), 4)
self.assertEqual(self.ks1X2.order, 0)
self.assertEqual(d2.definition_formal, r'X2\X2\X3')

View File

@ -22,18 +22,18 @@ class TestChangeOperations(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X1', convention='KS1X1')
self.ks1X2 = self.ks1.insert_new('X2', convention='KS1X2')
self.ks1D1 = self.ks1.insert_new('D1', definition_formal='X1 X2', convention='KS1D1')
self.ks1X1 = self.ks1.insert_last('X1', convention='KS1X1')
self.ks1X2 = self.ks1.insert_last('X2', convention='KS1X2')
self.ks1D1 = self.ks1.insert_last('D1', definition_formal='X1 X2', convention='KS1D1')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1', convention='KS2X1')
self.ks2X2 = self.ks2.insert_new('X2', convention='KS2X2')
self.ks2S1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_last('X1', convention='KS2X1')
self.ks2X2 = self.ks2.insert_last('X2', convention='KS2X2')
self.ks2S1 = self.ks2.insert_last(
alias='S1',
definition_formal=r'X1',
convention='KS2S1'
@ -44,8 +44,8 @@ class TestChangeOperations(EndpointTester):
title='Test3',
owner=self.user
)
self.ks3X1 = self.ks3.insert_new('X1', convention='KS3X1')
self.ks3D1 = self.ks3.insert_new(
self.ks3X1 = self.ks3.insert_last('X1', convention='KS3X1')
self.ks3D1 = self.ks3.insert_last(
alias='D1',
definition_formal='X1 X1',
convention='KS3D1'
@ -82,7 +82,7 @@ class TestChangeOperations(EndpointTester):
self.ks4X1 = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
self.ks4S1 = Constituenta.objects.get(as_child__parent_id=self.ks2S1.pk)
self.ks4D1 = Constituenta.objects.get(as_child__parent_id=self.ks1D1.pk)
self.ks4D2 = self.ks4.insert_new(
self.ks4D2 = self.ks4.insert_last(
alias='D2',
definition_formal=r'X1 X2 X3 S1 D1',
convention='KS4D2'
@ -100,7 +100,7 @@ class TestChangeOperations(EndpointTester):
self.owned.execute_operation(self.operation5)
self.operation5.refresh_from_db()
self.ks5 = RSForm(self.operation5.result)
self.ks5D4 = self.ks5.insert_new(
self.ks5D4 = self.ks5.insert_last(
alias='D4',
definition_formal=r'X1 X2 X3 S1 D1 D2 D3',
convention='KS5D4'
@ -119,11 +119,11 @@ class TestChangeOperations(EndpointTester):
def test_oss_setup(self):
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks2.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 2)
self.assertEqual(self.ks4.constituents().count(), 6)
self.assertEqual(self.ks5.constituents().count(), 8)
self.assertEqual(self.ks1.constituentsQ().count(), 3)
self.assertEqual(self.ks2.constituentsQ().count(), 3)
self.assertEqual(self.ks3.constituentsQ().count(), 2)
self.assertEqual(self.ks4.constituentsQ().count(), 6)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
self.assertEqual(self.ks4D1.definition_formal, 'S1 X1')
@ -141,8 +141,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 4)
self.assertEqual(self.ks5.constituents().count(), 6)
self.assertEqual(self.ks4.constituentsQ().count(), 4)
self.assertEqual(self.ks5.constituentsQ().count(), 6)
self.assertEqual(self.ks4D1.definition_formal, r'X4 X1')
self.assertEqual(self.ks4D2.definition_formal, r'X1 DEL DEL DEL D1')
self.assertEqual(self.ks5D4.definition_formal, r'DEL DEL X3 DEL D1 D2 D3')
@ -165,8 +165,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 4)
self.assertEqual(self.ks5.constituents().count(), 6)
self.assertEqual(self.ks4.constituentsQ().count(), 4)
self.assertEqual(self.ks5.constituentsQ().count(), 6)
self.assertEqual(self.ks4D1.definition_formal, r'X4 X1')
self.assertEqual(self.ks4D2.definition_formal, r'X1 DEL DEL DEL D1')
self.assertEqual(self.ks5D4.definition_formal, r'DEL DEL X3 DEL D1 D2 D3')
@ -179,9 +179,9 @@ class TestChangeOperations(EndpointTester):
title='Test6',
owner=self.user
)
ks6X1 = ks6.insert_new('X1', convention='KS6X1')
ks6X2 = ks6.insert_new('X2', convention='KS6X2')
ks6D1 = ks6.insert_new('D1', definition_formal='X1 X2', convention='KS6D1')
ks6X1 = ks6.insert_last('X1', convention='KS6X1')
ks6X2 = ks6.insert_last('X2', convention='KS6X2')
ks6D1 = ks6.insert_last('D1', definition_formal='X1 X2', convention='KS6D1')
data = {
'layout': self.layout_data,
@ -200,8 +200,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 7)
self.assertEqual(self.ks5.constituents().count(), 9)
self.assertEqual(self.ks4.constituentsQ().count(), 7)
self.assertEqual(self.ks5.constituentsQ().count(), 9)
self.assertEqual(ks4Dks6.definition_formal, r'X5 X6')
self.assertEqual(self.ks4D1.definition_formal, r'X4 X1')
self.assertEqual(self.ks4D2.definition_formal, r'X1 DEL DEL DEL D1')
@ -219,8 +219,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 0)
self.assertEqual(self.ks4.constituents().count(), 4)
self.assertEqual(self.ks5.constituents().count(), 7)
self.assertEqual(self.ks4.constituentsQ().count(), 4)
self.assertEqual(self.ks5.constituentsQ().count(), 7)
self.assertEqual(self.ks4D2.definition_formal, r'DEL X2 X3 S1 DEL')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 DEL D2 D3')
@ -241,8 +241,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 0)
self.assertEqual(self.ks4.constituents().count(), 4)
self.assertEqual(self.ks5.constituents().count(), 7)
self.assertEqual(self.ks4.constituentsQ().count(), 4)
self.assertEqual(self.ks5.constituentsQ().count(), 7)
self.assertEqual(self.ks4D2.definition_formal, r'DEL X2 X3 S1 DEL')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 DEL D2 D3')
@ -263,8 +263,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 6)
self.assertEqual(self.ks5.constituents().count(), 8)
self.assertEqual(self.ks4.constituentsQ().count(), 6)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
self.assertEqual(self.ks4D2.definition_formal, r'X1 X2 X3 S1 D1')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 D1 D2 D3')
@ -286,9 +286,9 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks4.constituents().count(), 6)
self.assertEqual(self.ks5.constituents().count(), 8)
self.assertEqual(self.ks1.constituentsQ().count(), 3)
self.assertEqual(self.ks4.constituentsQ().count(), 6)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
self.assertEqual(self.ks4D2.definition_formal, r'X1 X2 X3 S1 D1')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 D1 D2 D3')
self.assertFalse(Constituenta.objects.filter(as_child__parent_id=self.ks1D1.pk).exists())
@ -324,8 +324,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 2)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 5)
self.assertEqual(self.ks5.constituents().count(), 7)
self.assertEqual(self.ks4.constituentsQ().count(), 5)
self.assertEqual(self.ks5.constituentsQ().count(), 7)
self.assertEqual(self.ks4D2.definition_formal, r'X1 D1 X3 S1 D1')
self.assertEqual(self.ks5D4.definition_formal, r'D1 X2 X3 S1 D1 D2 D3')
@ -350,8 +350,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 4)
self.assertEqual(self.ks5.constituents().count(), 6)
self.assertEqual(self.ks4.constituentsQ().count(), 4)
self.assertEqual(self.ks5.constituentsQ().count(), 6)
self.assertEqual(self.ks4D2.definition_formal, r'X1 DEL DEL DEL D1')
self.assertEqual(self.ks5D4.definition_formal, r'DEL DEL X3 DEL D1 D2 D3')
@ -363,8 +363,8 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks4.constituents().count(), 7)
self.assertEqual(self.ks5.constituents().count(), 9)
self.assertEqual(self.ks4.constituentsQ().count(), 7)
self.assertEqual(self.ks5.constituentsQ().count(), 9)
self.assertEqual(self.ks4D2.definition_formal, r'X1 DEL DEL DEL D1')
self.assertEqual(self.ks5D4.definition_formal, r'DEL DEL X3 DEL D1 D2 D3')
@ -375,7 +375,7 @@ class TestChangeOperations(EndpointTester):
self.operation4.refresh_from_db()
self.ks5.refresh_from_db()
self.assertEqual(self.operation4.result, None)
self.assertEqual(self.ks5.constituents().count(), 3)
self.assertEqual(self.ks5.constituentsQ().count(), 3)
data = {
'target': self.operation4.pk,
@ -385,13 +385,13 @@ class TestChangeOperations(EndpointTester):
self.operation4.refresh_from_db()
self.ks5.refresh_from_db()
self.assertNotEqual(self.operation4.result, None)
self.assertEqual(self.ks5.constituents().count(), 8)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
@decl_endpoint('/api/oss/relocate-constituents', method='post')
def test_relocate_constituents_up(self):
ks1_old_count = self.ks1.constituents().count()
ks4_old_count = self.ks4.constituents().count()
ks1_old_count = self.ks1.constituentsQ().count()
ks4_old_count = self.ks4.constituentsQ().count()
operation6 = self.owned.create_operation(
alias='6',
operation_type=OperationType.SYNTHESIS
@ -400,8 +400,8 @@ class TestChangeOperations(EndpointTester):
self.owned.execute_operation(operation6)
operation6.refresh_from_db()
ks6 = RSForm(operation6.result)
ks6A1 = ks6.insert_new('A1')
ks6_old_count = ks6.constituents().count()
ks6A1 = ks6.insert_last('A1')
ks6_old_count = ks6.constituentsQ().count()
data = {
'destination': self.ks1.model.pk,
@ -413,15 +413,15 @@ class TestChangeOperations(EndpointTester):
self.ks1.refresh_from_db()
self.ks4.refresh_from_db()
self.assertEqual(ks6.constituents().count(), ks6_old_count)
self.assertEqual(self.ks1.constituents().count(), ks1_old_count + 1)
self.assertEqual(self.ks4.constituents().count(), ks4_old_count + 1)
self.assertEqual(ks6.constituentsQ().count(), ks6_old_count)
self.assertEqual(self.ks1.constituentsQ().count(), ks1_old_count + 1)
self.assertEqual(self.ks4.constituentsQ().count(), ks4_old_count + 1)
@decl_endpoint('/api/oss/relocate-constituents', method='post')
def test_relocate_constituents_down(self):
ks1_old_count = self.ks1.constituents().count()
ks4_old_count = self.ks4.constituents().count()
ks1_old_count = self.ks1.constituentsQ().count()
ks4_old_count = self.ks4.constituentsQ().count()
operation6 = self.owned.create_operation(
alias='6',
@ -431,7 +431,7 @@ class TestChangeOperations(EndpointTester):
self.owned.execute_operation(operation6)
operation6.refresh_from_db()
ks6 = RSForm(operation6.result)
ks6_old_count = ks6.constituents().count()
ks6_old_count = ks6.constituentsQ().count()
data = {
'destination': ks6.model.pk,
@ -445,8 +445,8 @@ class TestChangeOperations(EndpointTester):
self.ks4D2.refresh_from_db()
self.ks5D4.refresh_from_db()
self.assertEqual(ks6.constituents().count(), ks6_old_count)
self.assertEqual(self.ks1.constituents().count(), ks1_old_count - 1)
self.assertEqual(self.ks4.constituents().count(), ks4_old_count - 1)
self.assertEqual(ks6.constituentsQ().count(), ks6_old_count)
self.assertEqual(self.ks1.constituentsQ().count(), ks1_old_count - 1)
self.assertEqual(self.ks4.constituentsQ().count(), ks4_old_count - 1)
self.assertEqual(self.ks4D2.definition_formal, r'DEL X2 X3 S1 D1')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 D1 D2 D3')

View File

@ -23,18 +23,18 @@ class TestChangeSubstitutions(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X1', convention='KS1X1')
self.ks1X2 = self.ks1.insert_new('X2', convention='KS1X2')
self.ks1D1 = self.ks1.insert_new('D1', definition_formal='X1 X2', convention='KS1D1')
self.ks1X1 = self.ks1.insert_last('X1', convention='KS1X1')
self.ks1X2 = self.ks1.insert_last('X2', convention='KS1X2')
self.ks1D1 = self.ks1.insert_last('D1', definition_formal='X1 X2', convention='KS1D1')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1', convention='KS2X1')
self.ks2X2 = self.ks2.insert_new('X2', convention='KS2X2')
self.ks2S1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_last('X1', convention='KS2X1')
self.ks2X2 = self.ks2.insert_last('X2', convention='KS2X2')
self.ks2S1 = self.ks2.insert_last(
alias='S1',
definition_formal=r'X1',
convention='KS2S1'
@ -45,8 +45,8 @@ class TestChangeSubstitutions(EndpointTester):
title='Test3',
owner=self.user
)
self.ks3X1 = self.ks3.insert_new('X1', convention='KS3X1')
self.ks3D1 = self.ks3.insert_new(
self.ks3X1 = self.ks3.insert_last('X1', convention='KS3X1')
self.ks3D1 = self.ks3.insert_last(
alias='D1',
definition_formal='X1 X1',
convention='KS3D1'
@ -83,7 +83,7 @@ class TestChangeSubstitutions(EndpointTester):
self.ks4X1 = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
self.ks4S1 = Constituenta.objects.get(as_child__parent_id=self.ks2S1.pk)
self.ks4D1 = Constituenta.objects.get(as_child__parent_id=self.ks1D1.pk)
self.ks4D2 = self.ks4.insert_new(
self.ks4D2 = self.ks4.insert_last(
alias='D2',
definition_formal=r'X1 X2 X3 S1 D1',
convention='KS4D2'
@ -101,7 +101,7 @@ class TestChangeSubstitutions(EndpointTester):
self.owned.execute_operation(self.operation5)
self.operation5.refresh_from_db()
self.ks5 = RSForm(self.operation5.result)
self.ks5D4 = self.ks5.insert_new(
self.ks5D4 = self.ks5.insert_last(
alias='D4',
definition_formal=r'X1 X2 X3 S1 D1 D2 D3',
convention='KS5D4'
@ -120,11 +120,11 @@ class TestChangeSubstitutions(EndpointTester):
def test_oss_setup(self):
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks2.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 2)
self.assertEqual(self.ks4.constituents().count(), 6)
self.assertEqual(self.ks5.constituents().count(), 8)
self.assertEqual(self.ks1.constituentsQ().count(), 3)
self.assertEqual(self.ks2.constituentsQ().count(), 3)
self.assertEqual(self.ks3.constituentsQ().count(), 2)
self.assertEqual(self.ks4.constituentsQ().count(), 6)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
self.assertEqual(self.ks4D1.definition_formal, 'S1 X1')
@ -186,7 +186,7 @@ class TestChangeSubstitutions(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks5.constituents().count(), 7)
self.assertEqual(self.ks5.constituentsQ().count(), 7)
self.assertEqual(self.ks4D2.definition_formal, r'X1 X2 X3 S1 DEL')
self.assertEqual(self.ks5D4.definition_formal, r'X1 X2 X3 S1 DEL D2 D3')
@ -202,7 +202,7 @@ class TestChangeSubstitutions(EndpointTester):
self.assertEqual(subs1_2.count(), 0)
subs3_4 = self.operation5.getQ_substitutions()
self.assertEqual(subs3_4.count(), 1)
self.assertEqual(self.ks5.constituents().count(), 7)
self.assertEqual(self.ks5.constituentsQ().count(), 7)
self.assertEqual(self.ks4D1.definition_formal, r'X4 X1')
self.assertEqual(self.ks4D2.definition_formal, r'X1 X2 DEL DEL D1')
self.assertEqual(self.ks5D4.definition_formal, r'X1 DEL X3 DEL D1 D2 D3')

View File

@ -1,7 +1,6 @@
''' Testing API: Operation Schema - blocks manipulation. '''
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import Constituenta, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint

View File

@ -24,7 +24,7 @@ class TestOssOperations(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new(
self.ks1X1 = self.ks1.insert_last(
'X1',
term_raw='X1_1',
term_resolved='X1_1'
@ -34,7 +34,7 @@ class TestOssOperations(EndpointTester):
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_last(
'X2',
term_raw='X1_2',
term_resolved='X1_2'
@ -167,7 +167,7 @@ class TestOssOperations(EndpointTester):
self.assertEqual(new_schema.alias, new_operation['alias'])
self.assertEqual(new_schema.title, new_operation['title'])
self.assertEqual(new_schema.description, new_operation['description'])
self.assertEqual(self.ks1.constituents().count(), RSForm(new_schema).constituents().count())
self.assertEqual(self.ks1.constituentsQ().count(), RSForm(new_schema).constituentsQ().count())
unrelated_data = dict(data)
unrelated_data['source_operation'] = self.unowned_operation.pk
@ -446,7 +446,7 @@ class TestOssOperations(EndpointTester):
self.executeBadData(item=self.owned_id)
ks3 = RSForm.create(alias='KS3', title='Test3', owner=self.user)
ks3x1 = ks3.insert_new('X1', term_resolved='X1_1')
ks3x1 = ks3.insert_last('X1', term_resolved='X1_1')
data = {
'target': self.operation3.pk,
@ -530,7 +530,7 @@ class TestOssOperations(EndpointTester):
def test_update_operation_invalid_substitution(self):
self.populateData()
self.ks1X2 = self.ks1.insert_new('X2')
self.ks1X2 = self.ks1.insert_last('X2')
data = {
'target': self.operation3.pk,
@ -583,7 +583,7 @@ class TestOssOperations(EndpointTester):
self.assertEqual(schema.description, self.operation3.description)
self.assertEqual(schema.title, self.operation3.title)
self.assertEqual(schema.visible, False)
items = list(RSForm(schema).constituents())
items = list(RSForm(schema).constituentsQ())
self.assertEqual(len(items), 1)
self.assertEqual(items[0].alias, 'X1')
self.assertEqual(items[0].term_resolved, self.ks2X1.term_resolved)

View File

@ -25,7 +25,7 @@ class TestOssViewset(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new(
self.ks1X1 = self.ks1.insert_last(
'X1',
term_raw='X1_1',
term_resolved='X1_1'
@ -35,7 +35,7 @@ class TestOssViewset(EndpointTester):
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_last(
'X2',
term_raw='X1_2',
term_resolved='X1_2'
@ -148,7 +148,7 @@ class TestOssViewset(EndpointTester):
@decl_endpoint('/api/oss/get-predecessor', method='post')
def test_get_predecessor(self):
self.populateData()
self.ks1X2 = self.ks1.insert_new('X2')
self.ks1X2 = self.ks1.insert_last('X2')
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
@ -223,13 +223,13 @@ class TestOssViewset(EndpointTester):
@decl_endpoint('/api/oss/relocate-constituents', method='post')
def test_relocate_constituents(self):
self.populateData()
self.ks1X2 = self.ks1.insert_new('X2', convention='test')
self.ks1X2 = self.ks1.insert_last('X2', convention='test')
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
self.ks3X2 = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
self.ks3X10 = self.ks3.insert_new('X10', convention='test2')
self.ks3X10 = self.ks3.insert_last('X10', convention='test2')
# invalid destination
data = {

View File

@ -14,7 +14,7 @@ from rest_framework.response import Response
from apps.library.models import LibraryItem, LibraryItemType
from apps.library.serializers import LibraryItemSerializer
from apps.rsform.models import Constituenta, RSForm
from apps.rsform.models import Constituenta, RSFormCached
from apps.rsform.serializers import CstTargetSerializer
from shared import messages as msg
from shared import permissions
@ -25,7 +25,6 @@ from .. import serializers as s
def _create_clone(prototype: LibraryItem, operation: m.Operation, oss: LibraryItem) -> LibraryItem:
''' Create clone of prototype schema for operation. '''
prototype_schema = RSForm(prototype)
clone = deepcopy(prototype)
clone.pk = None
clone.owner = oss.owner
@ -37,7 +36,7 @@ def _create_clone(prototype: LibraryItem, operation: m.Operation, oss: LibraryIt
clone.access_policy = oss.access_policy
clone.location = oss.location
clone.save()
for cst in prototype_schema.constituents():
for cst in Constituenta.objects.filter(schema_id=prototype.pk):
cst_copy = deepcopy(cst)
cst_copy.pk = None
cst_copy.schema = clone
@ -359,16 +358,17 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
source = cast(m.Operation, serializer.validated_data['source_operation'])
alias = '+' + source.alias
title = '+' + source.title
source_schema = RSForm(cast(LibraryItem, source.result))
source_schema = cast(LibraryItem, source.result)
constituents = Constituenta.objects.filter(schema_id=source_schema.pk)
new_schema = deepcopy(source_schema.model)
new_schema = source_schema
new_schema.pk = None
new_schema.owner = oss.model.owner
new_schema.title = title
new_schema.alias = alias
new_schema.save()
for cst in source_schema.constituents():
for cst in constituents:
cst.pk = None
cst.schema = new_schema
cst.save()
@ -862,8 +862,8 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
data = serializer.validated_data
oss = m.OperationSchema(LibraryItem.objects.get(pk=data['oss']))
source = RSForm(LibraryItem.objects.get(pk=data['source']))
destination = RSForm(LibraryItem.objects.get(pk=data['destination']))
source = RSFormCached(LibraryItem.objects.get(pk=data['source']))
destination = RSFormCached(LibraryItem.objects.get(pk=data['destination']))
with transaction.atomic():
if data['move_down']:

View File

@ -1,37 +1,28 @@
''' Models: RSForm API. '''
from copy import deepcopy
from typing import Iterable, Optional, cast
# pylint: disable=duplicate-code
from cctext import Entity, Resolver, TermForm, extract_entities, split_grams
from typing import Iterable, Optional
from cctext import Entity, Resolver, TermForm, split_grams
from django.core.exceptions import ValidationError
from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType, Version
from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
generate_structure,
get_type_prefix,
guess_type,
infer_template,
is_base_set,
is_functional,
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType, extract_globals
from .api_RSLanguage import guess_type
from .Constituenta import Constituenta, CstType
INSERT_LAST: int = -1
DELETED_ALIAS = 'DEL'
class RSForm:
''' RSForm is math form of conceptual schema. '''
''' RSForm wrapper. No caching, each mutation requires querying. '''
def __init__(self, model: LibraryItem):
assert model.item_type == LibraryItemType.RSFORM
self.model = model
self.cache: RSFormCache = RSFormCache(self)
@staticmethod
def create(**kwargs) -> 'RSForm':
@ -40,40 +31,11 @@ class RSForm:
return RSForm(model)
@staticmethod
def from_id(pk: int) -> 'RSForm':
''' Get LibraryItem by pk. '''
model = LibraryItem.objects.get(pk=pk)
return RSForm(model)
def get_dependant(self, target: Iterable[int]) -> set[int]:
''' Get list of constituents depending on target (only 1st degree). '''
result: set[int] = set()
terms = self._graph_term()
formal = self._graph_formal()
definitions = self._graph_text()
for cst_id in target:
result.update(formal.outputs[cst_id])
result.update(terms.outputs[cst_id])
result.update(definitions.outputs[cst_id])
return result
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
self.cache.is_loaded = False
def constituents(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.model)
def resolver(self) -> Resolver:
def spawn_resolver(schemaID: int) -> Resolver:
''' Create resolver for text references based on schema terms. '''
result = Resolver({})
for cst in self.constituents().only('alias', 'term_resolved', 'term_forms'):
constituents = Constituenta.objects.filter(schema_id=schemaID).only('alias', 'term_resolved', 'term_forms')
for cst in constituents:
entity = Entity(
alias=cst.alias,
nominal=cst.term_resolved,
@ -85,101 +47,30 @@ class RSForm:
result.context[cst.alias] = entity
return result
def semantic(self) -> 'SemanticInfo':
''' Access semantic information on constituents. '''
return SemanticInfo(self)
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
def after_term_change(self, changed: list[int]) -> None:
''' Trigger cascade resolutions when term changes. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
update_list: list[Constituenta] = []
resolver = self.resolver()
if len(expansion) > 0:
for cst_id in graph_terms.topological_order():
if cst_id not in expansion:
continue
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
if resolved == resolver.context[cst.alias].get_nominal():
continue
cst.set_term_resolved(resolved)
update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
graph_defs = self._graph_text()
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0:
return
for cst_id in update_defs:
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def constituentsQ(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.model)
def get_max_index(self, cst_type: str) -> int:
''' Get maximum alias index for specific CstType. '''
result: int = 0
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = Constituenta.objects \
.filter(schema=self.model, cst_type=cst_type) \
.only('alias')
else:
cst_list = [cst for cst in self.cache.constituents if cst.cst_type == cst_type]
for cst in cst_list:
result = max(result, int(cst.alias[1:]))
return result
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create constituenta from data. '''
if insert_after is None:
position = INSERT_LAST
else:
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[insert_after.pk]) + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
result.crucial = data.get('crucial', False)
result.convention = data.get('convention', '')
result.definition_formal = data.get('definition_formal', '')
result.term_forms = data.get('term_forms', [])
result.term_raw = data.get('term_raw', '')
result.definition_raw = data.get('definition_raw', '')
if result.term_raw != '' or result.definition_raw != '':
resolver = self.resolver()
if result.term_raw != '':
resolved = resolver.resolve(result.term_raw)
result.term_resolved = resolved
resolver.context[result.alias] = Entity(result.alias, resolved)
if result.definition_raw != '':
result.definition_resolved = resolver.resolve(result.definition_raw)
result.save()
self.cache.insert(result)
self.after_term_change([result.pk])
result.refresh_from_db()
return result
def insert_new(
def insert_last(
self,
alias: str,
cst_type: Optional[CstType] = None,
position: int = INSERT_LAST,
**kwargs
) -> Constituenta:
''' Insert new constituenta at given position. '''
if self.constituents().filter(alias=alias).exists():
''' Insert new constituenta at last position. '''
if Constituenta.objects.filter(schema=self.model, alias=alias).exists():
raise ValidationError(msg.aliasTaken(alias))
position = self._get_insert_position(position)
if cst_type is None:
cst_type = guess_type(alias)
self._shift_positions(position, 1)
position = self.constituentsQ().count()
result = Constituenta.objects.create(
schema=self.model,
order=position,
@ -187,118 +78,17 @@ class RSForm:
cst_type=cst_type,
**kwargs
)
self.cache.insert(result)
self.save(update_fields=['time_update'])
self.model.save(update_fields=['time_update'])
return result
def insert_copy(
self,
items: list[Constituenta],
position: int = INSERT_LAST,
initial_mapping: Optional[dict[str, str]] = None
) -> list[Constituenta]:
''' Insert copy of target constituents updating references. '''
count = len(items)
if count == 0:
return []
self.cache.ensure_loaded()
position = self._get_insert_position(position)
self._shift_positions(position, count)
indices: dict[str, int] = {}
for (value, _) in CstType.choices:
indices[value] = -1
mapping: dict[str, str] = initial_mapping.copy() if initial_mapping else {}
for cst in items:
if indices[cst.cst_type] == -1:
indices[cst.cst_type] = self.get_max_index(cst.cst_type)
indices[cst.cst_type] = indices[cst.cst_type] + 1
newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}'
mapping[cst.alias] = newAlias
result = deepcopy(items)
for cst in result:
cst.pk = None
cst.schema = self.model
cst.order = position
cst.alias = mapping[cst.alias]
cst.apply_mapping(mapping)
position = position + 1
new_cst = Constituenta.objects.bulk_create(result)
self.cache.insert_multi(new_cst)
self.save(update_fields=['time_update'])
return result
# pylint: disable=too-many-branches
def update_cst(self, target: Constituenta, data: dict) -> dict:
''' Update persistent attributes of a given constituenta. Return old values. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target.pk)
if cst is None:
raise ValidationError(msg.constituentaNotInRSform(target.alias))
old_data = {}
term_changed = False
if 'convention' in data:
if cst.convention == data['convention']:
del data['convention']
else:
old_data['convention'] = cst.convention
cst.convention = data['convention']
if 'crucial' in data:
cst.crucial = data['crucial']
del data['crucial']
if 'definition_formal' in data:
if cst.definition_formal == data['definition_formal']:
del data['definition_formal']
else:
old_data['definition_formal'] = cst.definition_formal
cst.definition_formal = data['definition_formal']
if 'term_forms' in data:
term_changed = True
old_data['term_forms'] = cst.term_forms
cst.term_forms = data['term_forms']
if 'definition_raw' in data or 'term_raw' in data:
resolver = self.resolver()
if 'term_raw' in data:
if cst.term_raw == data['term_raw']:
del data['term_raw']
else:
term_changed = True
old_data['term_raw'] = cst.term_raw
cst.term_raw = data['term_raw']
cst.term_resolved = resolver.resolve(cst.term_raw)
if 'term_forms' not in data:
cst.term_forms = []
resolver.context[cst.alias] = Entity(cst.alias, cst.term_resolved, manual_forms=cst.term_forms)
if 'definition_raw' in data:
if cst.definition_raw == data['definition_raw']:
del data['definition_raw']
else:
old_data['definition_raw'] = cst.definition_raw
cst.definition_raw = data['definition_raw']
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
if term_changed:
self.after_term_change([cst.pk])
self.save(update_fields=['time_update'])
return old_data
def move_cst(self, target: list[Constituenta], destination: int) -> None:
''' Move list of constituents to specific position '''
''' Move list of constituents to specific position. '''
count_moved = 0
count_top = 0
count_bot = 0
size = len(target)
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituents().only('order').order_by('order')
else:
cst_list = self.cache.constituents
cst_list = Constituenta.objects.filter(schema=self.model).only('order').order_by('order')
for cst in cst_list:
if cst in target:
cst.order = destination + count_moved
@ -315,95 +105,21 @@ class RSForm:
def delete_cst(self, target: Iterable[Constituenta]) -> None:
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
mapping = {cst.alias: DELETED_ALIAS for cst in target}
self.cache.ensure_loaded()
self.cache.remove_multi(target)
self.apply_mapping(mapping)
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order()
self.save(update_fields=['time_update'])
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
''' Execute constituenta substitution. '''
mapping = {}
deleted: list[Constituenta] = []
replacements: list[Constituenta] = []
for original, substitution in substitutions:
mapping[original.alias] = substitution.alias
deleted.append(original)
replacements.append(substitution)
self.cache.remove_multi(deleted)
Constituenta.objects.filter(pk__in=[cst.pk for cst in deleted]).delete()
self._reset_order()
self.apply_mapping(mapping)
self.after_term_change([substitution.pk for substitution in replacements])
def restore_order(self) -> None:
''' Restore order based on types and term graph. '''
manager = _OrderManager(self)
manager.restore_order()
def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. '''
mapping = self._create_reset_mapping()
self.apply_mapping(mapping, change_aliases=True)
def change_cst_type(self, target: int, new_type: CstType) -> bool:
''' Change type of constituenta generating alias automatically. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target)
if cst is None:
return False
newAlias = f'{get_type_prefix(new_type)}{self.get_max_index(new_type) + 1}'
mapping = {cst.alias: newAlias}
cst.cst_type = new_type
cst.alias = newAlias
cst.save(update_fields=['cst_type', 'alias'])
self.apply_mapping(mapping)
return True
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
constituents = self.constituentsQ().only('alias', 'definition_formal', 'term_raw', 'definition_raw')
for cst in constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
if change_aliases:
self.cache.reset_aliases()
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def apply_partial_mapping(self, mapping: dict[str, str], target: list[int]) -> None:
''' Apply rename mapping to target constituents. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.pk in target:
if cst.apply_mapping(mapping):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
resolver = Resolver({})
update_list: list[Constituenta] = []
for cst_id in graph_terms.topological_order():
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
cst.term_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
for cst in self.cache.constituents:
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
Constituenta.objects.bulk_update(self.cache.constituents, ['definition_resolved'])
def create_version(self, version: str, description: str, data) -> Version:
''' Creates version for current state. '''
return Version.objects.create(
@ -413,369 +129,13 @@ class RSForm:
data=data
)
def produce_structure(self, target: Constituenta, parse: dict) -> list[Constituenta]:
''' Add constituents for each structural element of the target. '''
expressions = generate_structure(
alias=target.alias,
expression=target.definition_formal,
parse=parse
)
count_new = len(expressions)
if count_new == 0:
return []
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[target.id]) + 1
self._shift_positions(position, count_new)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self.get_max_index(cst_type) + 1
prefix = get_type_prefix(cst_type)
for text in expressions:
new_item = Constituenta.objects.create(
schema=self.model,
order=position,
alias=f'{prefix}{free_index}',
definition_formal=text,
cst_type=cst_type
)
result.append(new_item)
free_index = free_index + 1
position = position + 1
self.cache.insert_multi(result)
self.save(update_fields=['time_update'])
return result
def _create_reset_mapping(self) -> dict[str, str]:
bases = cast(dict[str, int], {})
mapping = cast(dict[str, str], {})
for cst_type in CstType.values:
bases[cst_type] = 1
cst_list = self.constituents().order_by('order')
for cst in cst_list:
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
return mapping
def _shift_positions(self, start: int, shift: int) -> None:
if shift == 0:
return
self.cache.ensure_loaded()
update_list = self.cache.constituents[start:]
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
def _get_insert_position(self, position: int) -> int:
if position < 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition())
lastPosition = self.constituents().count()
if position == INSERT_LAST:
return lastPosition
else:
return max(0, min(position, lastPosition))
def _reset_order(self) -> None:
order = 0
changed: list[Constituenta] = []
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituents().only('order').order_by('order')
else:
cst_list = self.cache.constituents
cst_list = self.constituentsQ().only('order').order_by('order')
for cst in cst_list:
if cst.order != order:
cst.order = order
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_globals(cst.definition_formal):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.term_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.definition_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
class RSFormCache:
''' Cache for RSForm constituents. '''
def __init__(self, schema: 'RSForm'):
self._schema = schema
self.constituents: list[Constituenta] = []
self.by_id: dict[int, Constituenta] = {}
self.by_alias: dict[str, Constituenta] = {}
self.is_loaded = False
def reload(self) -> None:
self.constituents = list(
self._schema.constituents().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
def reset_aliases(self) -> None:
self.by_alias = {cst.alias: cst for cst in self.constituents}
def clear(self) -> None:
self.constituents = []
self.by_id = {}
self.by_alias = {}
self.is_loaded = False
def insert(self, cst: Constituenta) -> None:
if self.is_loaded:
self.constituents.insert(cst.order, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def insert_multi(self, items: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in items:
self.constituents.insert(cst.order, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def remove(self, target: Constituenta) -> None:
if self.is_loaded:
self.constituents.remove(self.by_id[target.pk])
del self.by_id[target.pk]
del self.by_alias[target.alias]
def remove_multi(self, target: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in target:
self.constituents.remove(self.by_id[cst.pk])
del self.by_id[cst.pk]
del self.by_alias[cst.alias]
class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSForm):
schema.cache.ensure_loaded()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
self._cst_by_alias = schema.cache.by_alias
self.info = {
cst.pk: {
'is_simple': False,
'is_template': False,
'parent': cst.pk,
'children': []
}
for cst in schema.cache.constituents
}
self._calculate_attributes()
def __getitem__(self, key: int) -> dict:
return self.info[key]
def is_simple_expression(self, target: int) -> bool:
''' Access "is_simple" attribute. '''
return cast(bool, self.info[target]['is_simple'])
def is_template(self, target: int) -> bool:
''' Access "is_template" attribute. '''
return cast(bool, self.info[target]['is_template'])
def parent(self, target: int) -> int:
''' Access "parent" attribute. '''
return cast(int, self.info[target]['parent'])
def children(self, target: int) -> list[int]:
''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self) -> None:
for cst_id in self._graph.topological_order():
cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst)
if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED:
continue
parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent
if parent != cst_id:
cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
return False
dependencies = self._graph.inputs[target.pk]
has_complex_dependency = any(
self.is_template(cst_id) and
not self.is_simple_expression(cst_id) for cst_id in dependencies
)
if has_complex_dependency:
return False
if is_functional(target.cst_type):
return is_simple_expression(split_template(target.definition_formal)['body'])
else:
return is_simple_expression(target.definition_formal)
def _infer_parent(self, target: Constituenta) -> int:
sources = self._extract_sources(target)
if len(sources) != 1:
return target.pk
parent_id = next(iter(sources))
parent = self._cst_by_ID[parent_id]
if is_base_set(parent.cst_type):
return target.pk
return parent_id
def _extract_sources(self, target: Constituenta) -> set[int]:
sources: set[int] = set()
if not is_functional(target.cst_type):
for parent_id in self._graph.inputs[target.pk]:
parent_info = self[parent_id]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
return sources
expression = split_template(target.definition_formal)
body_dependencies = extract_globals(expression['body'])
for alias in body_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
if self._need_check_head(sources, expression['head']):
head_dependencies = extract_globals(expression['head'])
for alias in head_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not is_base_set(parent.cst_type) and \
(not parent_info['is_template'] or not parent_info['is_simple']):
sources.add(parent_info['parent'])
return sources
def _need_check_head(self, sources: set[int], head: str) -> bool:
if len(sources) == 0:
return True
elif len(sources) != 1:
return False
else:
base = self._cst_by_ID[next(iter(sources))]
return not is_functional(base.cst_type) or \
split_template(base.definition_formal)['head'] != head
class _OrderManager:
''' Ordering helper class '''
def __init__(self, schema: RSForm):
self._semantic = schema.semantic()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
def restore_order(self) -> None:
''' Implement order restoration process. '''
if len(self._items) <= 1:
return
self._fix_kernel()
self._fix_topological()
self._fix_semantic_children()
self._save_order()
def _fix_topological(self) -> None:
sorted_ids = self._graph.sort_stable([cst.pk for cst in self._items])
sorted_items = [next(cst for cst in self._items if cst.pk == id) for id in sorted_ids]
self._items = sorted_items
def _fix_kernel(self) -> None:
result = [cst for cst in self._items if cst.cst_type == CstType.BASE]
result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT]
kernel = [
cst.pk for cst in self._items if
cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or
self._cst_by_ID[self._semantic.parent(cst.pk)].cst_type == CstType.STRUCTURED
]
kernel = kernel + self._graph.expand_inputs(kernel)
result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.pk in kernel]
result = result + [cst for cst in self._items if result.count(cst) == 0]
self._items = result
def _fix_semantic_children(self) -> None:
result: list[Constituenta] = []
marked: set[Constituenta] = set()
for cst in self._items:
if cst in marked:
continue
result.append(cst)
children = self._semantic[cst.pk]['children']
if len(children) == 0:
continue
for child in self._items:
if child.pk in children:
marked.add(child)
result.append(child)
self._items = result
def _save_order(self) -> None:
order = 0
for cst in self._items:
cst.order = order
order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -0,0 +1,732 @@
''' Models: RSForm API. '''
# pylint: disable=duplicate-code
from copy import deepcopy
from typing import Iterable, Optional, cast
from cctext import Entity, Resolver, extract_entities
from django.core.exceptions import ValidationError
from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType
from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
generate_structure,
get_type_prefix,
guess_type,
infer_template,
is_base_set,
is_functional,
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType, extract_globals
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm
class RSFormCached:
''' RSForm cached. Caching allows to avoid querying for each method call. '''
def __init__(self, model: LibraryItem):
self.model = model
self.cache: _RSFormCache = _RSFormCache(self)
@staticmethod
def create(**kwargs) -> 'RSFormCached':
''' Create LibraryItem via RSForm. '''
model = LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, **kwargs)
return RSFormCached(model)
@staticmethod
def from_id(pk: int) -> 'RSFormCached':
''' Get LibraryItem by pk. '''
model = LibraryItem.objects.get(pk=pk)
return RSFormCached(model)
def get_dependant(self, target: Iterable[int]) -> set[int]:
''' Get list of constituents depending on target (only 1st degree). '''
result: set[int] = set()
terms = self._graph_term()
formal = self._graph_formal()
definitions = self._graph_text()
for cst_id in target:
result.update(formal.outputs[cst_id])
result.update(terms.outputs[cst_id])
result.update(definitions.outputs[cst_id])
return result
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
self.cache.is_loaded = False
def constituentsQ(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.model)
def semantic(self) -> 'SemanticInfo':
''' Access semantic information on constituents. '''
return SemanticInfo(self)
def after_term_change(self, changed: list[int]) -> None:
''' Trigger cascade resolutions when term changes. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
update_list: list[Constituenta] = []
resolver = RSForm.spawn_resolver(self.model.pk)
if len(expansion) > 0:
for cst_id in graph_terms.topological_order():
if cst_id not in expansion:
continue
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
if resolved == resolver.context[cst.alias].get_nominal():
continue
cst.set_term_resolved(resolved)
update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
graph_defs = self._graph_text()
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0:
return
for cst_id in update_defs:
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create constituenta from data. '''
if insert_after is None:
position = INSERT_LAST
else:
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[insert_after.pk]) + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
result.crucial = data.get('crucial', False)
result.convention = data.get('convention', '')
result.definition_formal = data.get('definition_formal', '')
result.term_forms = data.get('term_forms', [])
result.term_raw = data.get('term_raw', '')
result.definition_raw = data.get('definition_raw', '')
if result.term_raw != '' or result.definition_raw != '':
resolver = RSForm.spawn_resolver(self.model.pk)
if result.term_raw != '':
resolved = resolver.resolve(result.term_raw)
result.term_resolved = resolved
resolver.context[result.alias] = Entity(result.alias, resolved)
if result.definition_raw != '':
result.definition_resolved = resolver.resolve(result.definition_raw)
result.save()
self.cache.insert(result)
self.after_term_change([result.pk])
result.refresh_from_db()
return result
def insert_new(
self,
alias: str,
cst_type: Optional[CstType] = None,
position: int = INSERT_LAST,
**kwargs
) -> Constituenta:
''' Insert new constituenta at given position. '''
if Constituenta.objects.filter(schema=self.model, alias=alias):
raise ValidationError(msg.aliasTaken(alias))
position = self._get_insert_position(position)
if cst_type is None:
cst_type = guess_type(alias)
self._shift_positions(position, 1)
result = Constituenta.objects.create(
schema=self.model,
order=position,
alias=alias,
cst_type=cst_type,
**kwargs
)
self.cache.insert(result)
self.save(update_fields=['time_update'])
return result
def insert_copy(
self,
items: list[Constituenta],
position: int = INSERT_LAST,
initial_mapping: Optional[dict[str, str]] = None
) -> list[Constituenta]:
''' Insert copy of target constituents updating references. '''
count = len(items)
if count == 0:
return []
self.cache.ensure_loaded()
position = self._get_insert_position(position)
self._shift_positions(position, count)
indices: dict[str, int] = {}
for (value, _) in CstType.choices:
indices[value] = -1
mapping: dict[str, str] = initial_mapping.copy() if initial_mapping else {}
for cst in items:
if indices[cst.cst_type] == -1:
indices[cst.cst_type] = self._get_max_index(cst.cst_type)
indices[cst.cst_type] = indices[cst.cst_type] + 1
newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}'
mapping[cst.alias] = newAlias
result = deepcopy(items)
for cst in result:
cst.pk = None
cst.schema = self.model
cst.order = position
cst.alias = mapping[cst.alias]
cst.apply_mapping(mapping)
position = position + 1
new_cst = Constituenta.objects.bulk_create(result)
self.cache.insert_multi(new_cst)
self.save(update_fields=['time_update'])
return result
# pylint: disable=too-many-branches
def update_cst(self, target: Constituenta, data: dict) -> dict:
''' Update persistent attributes of a given constituenta. Return old values. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target.pk)
if cst is None:
raise ValidationError(msg.constituentaNotInRSform(target.alias))
old_data = {}
term_changed = False
if 'convention' in data:
if cst.convention == data['convention']:
del data['convention']
else:
old_data['convention'] = cst.convention
cst.convention = data['convention']
if 'crucial' in data:
cst.crucial = data['crucial']
del data['crucial']
if 'definition_formal' in data:
if cst.definition_formal == data['definition_formal']:
del data['definition_formal']
else:
old_data['definition_formal'] = cst.definition_formal
cst.definition_formal = data['definition_formal']
if 'term_forms' in data:
term_changed = True
old_data['term_forms'] = cst.term_forms
cst.term_forms = data['term_forms']
if 'definition_raw' in data or 'term_raw' in data:
resolver = RSForm.spawn_resolver(self.model.pk)
if 'term_raw' in data:
if cst.term_raw == data['term_raw']:
del data['term_raw']
else:
term_changed = True
old_data['term_raw'] = cst.term_raw
cst.term_raw = data['term_raw']
cst.term_resolved = resolver.resolve(cst.term_raw)
if 'term_forms' not in data:
cst.term_forms = []
resolver.context[cst.alias] = Entity(cst.alias, cst.term_resolved, manual_forms=cst.term_forms)
if 'definition_raw' in data:
if cst.definition_raw == data['definition_raw']:
del data['definition_raw']
else:
old_data['definition_raw'] = cst.definition_raw
cst.definition_raw = data['definition_raw']
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
if term_changed:
self.after_term_change([cst.pk])
self.save(update_fields=['time_update'])
return old_data
def delete_cst(self, target: Iterable[Constituenta]) -> None:
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
mapping = {cst.alias: DELETED_ALIAS for cst in target}
self.cache.ensure_loaded()
self.cache.remove_multi(target)
self.apply_mapping(mapping)
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order()
self.save(update_fields=['time_update'])
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
''' Execute constituenta substitution. '''
mapping = {}
deleted: list[Constituenta] = []
replacements: list[Constituenta] = []
for original, substitution in substitutions:
mapping[original.alias] = substitution.alias
deleted.append(original)
replacements.append(substitution)
self.cache.remove_multi(deleted)
Constituenta.objects.filter(pk__in=[cst.pk for cst in deleted]).delete()
self._reset_order()
self.apply_mapping(mapping)
self.after_term_change([substitution.pk for substitution in replacements])
def restore_order(self) -> None:
''' Restore order based on types and term graph. '''
manager = _OrderManager(self)
manager.restore_order()
def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. '''
mapping = self._create_reset_mapping()
self.apply_mapping(mapping, change_aliases=True)
def change_cst_type(self, target: int, new_type: CstType) -> bool:
''' Change type of constituenta generating alias automatically. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target)
if cst is None:
return False
newAlias = f'{get_type_prefix(new_type)}{self._get_max_index(new_type) + 1}'
mapping = {cst.alias: newAlias}
cst.cst_type = new_type
cst.alias = newAlias
cst.save(update_fields=['cst_type', 'alias'])
self.apply_mapping(mapping)
return True
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
if change_aliases:
self.cache.reset_aliases()
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def apply_partial_mapping(self, mapping: dict[str, str], target: list[int]) -> None:
''' Apply rename mapping to target constituents. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.pk in target:
if cst.apply_mapping(mapping):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
resolver = Resolver({})
update_list: list[Constituenta] = []
for cst_id in graph_terms.topological_order():
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
cst.term_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
for cst in self.cache.constituents:
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
Constituenta.objects.bulk_update(self.cache.constituents, ['definition_resolved'])
def produce_structure(self, target: Constituenta, parse: dict) -> list[Constituenta]:
''' Add constituents for each structural element of the target. '''
expressions = generate_structure(
alias=target.alias,
expression=target.definition_formal,
parse=parse
)
count_new = len(expressions)
if count_new == 0:
return []
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[target.id]) + 1
self._shift_positions(position, count_new)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self._get_max_index(cst_type) + 1
prefix = get_type_prefix(cst_type)
for text in expressions:
new_item = Constituenta.objects.create(
schema=self.model,
order=position,
alias=f'{prefix}{free_index}',
definition_formal=text,
cst_type=cst_type
)
result.append(new_item)
free_index = free_index + 1
position = position + 1
self.cache.insert_multi(result)
self.save(update_fields=['time_update'])
return result
def _get_max_index(self, cst_type: str) -> int:
''' Get maximum alias index for specific CstType. '''
result: int = 0
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = Constituenta.objects \
.filter(schema=self.model, cst_type=cst_type) \
.only('alias')
else:
cst_list = [cst for cst in self.cache.constituents if cst.cst_type == cst_type]
for cst in cst_list:
result = max(result, int(cst.alias[1:]))
return result
def _create_reset_mapping(self) -> dict[str, str]:
bases = cast(dict[str, int], {})
mapping = cast(dict[str, str], {})
for cst_type in CstType.values:
bases[cst_type] = 1
cst_list = self.constituentsQ().order_by('order')
for cst in cst_list:
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
return mapping
def _shift_positions(self, start: int, shift: int) -> None:
if shift == 0:
return
self.cache.ensure_loaded()
update_list = self.cache.constituents[start:]
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
def _get_insert_position(self, position: int) -> int:
if position < 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition())
lastPosition = self.constituentsQ().count()
if position == INSERT_LAST:
return lastPosition
else:
return max(0, min(position, lastPosition))
def _reset_order(self) -> None:
order = 0
changed: list[Constituenta] = []
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituentsQ().only('order').order_by('order')
else:
cst_list = self.cache.constituents
for cst in cst_list:
if cst.order != order:
cst.order = order
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_globals(cst.definition_formal):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.term_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.definition_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
class _RSFormCache:
''' Cache for RSForm constituents. '''
def __init__(self, schema: 'RSFormCached'):
self._schema = schema
self.constituents: list[Constituenta] = []
self.by_id: dict[int, Constituenta] = {}
self.by_alias: dict[str, Constituenta] = {}
self.is_loaded = False
def reload(self) -> None:
self.constituents = list(
self._schema.constituentsQ().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
def reset_aliases(self) -> None:
self.by_alias = {cst.alias: cst for cst in self.constituents}
def clear(self) -> None:
self.constituents = []
self.by_id = {}
self.by_alias = {}
self.is_loaded = False
def insert(self, cst: Constituenta) -> None:
if self.is_loaded:
self.constituents.insert(cst.order, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def insert_multi(self, items: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in items:
self.constituents.insert(cst.order, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def remove(self, target: Constituenta) -> None:
if self.is_loaded:
self.constituents.remove(self.by_id[target.pk])
del self.by_id[target.pk]
del self.by_alias[target.alias]
def remove_multi(self, target: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in target:
self.constituents.remove(self.by_id[cst.pk])
del self.by_id[cst.pk]
del self.by_alias[cst.alias]
class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSFormCached):
schema.cache.ensure_loaded()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
self._cst_by_alias = schema.cache.by_alias
self.info = {
cst.pk: {
'is_simple': False,
'is_template': False,
'parent': cst.pk,
'children': []
}
for cst in schema.cache.constituents
}
self._calculate_attributes()
def __getitem__(self, key: int) -> dict:
return self.info[key]
def is_simple_expression(self, target: int) -> bool:
''' Access "is_simple" attribute. '''
return cast(bool, self.info[target]['is_simple'])
def is_template(self, target: int) -> bool:
''' Access "is_template" attribute. '''
return cast(bool, self.info[target]['is_template'])
def parent(self, target: int) -> int:
''' Access "parent" attribute. '''
return cast(int, self.info[target]['parent'])
def children(self, target: int) -> list[int]:
''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self) -> None:
for cst_id in self._graph.topological_order():
cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst)
if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED:
continue
parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent
if parent != cst_id:
cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
return False
dependencies = self._graph.inputs[target.pk]
has_complex_dependency = any(
self.is_template(cst_id) and
not self.is_simple_expression(cst_id) for cst_id in dependencies
)
if has_complex_dependency:
return False
if is_functional(target.cst_type):
return is_simple_expression(split_template(target.definition_formal)['body'])
else:
return is_simple_expression(target.definition_formal)
def _infer_parent(self, target: Constituenta) -> int:
sources = self._extract_sources(target)
if len(sources) != 1:
return target.pk
parent_id = next(iter(sources))
parent = self._cst_by_ID[parent_id]
if is_base_set(parent.cst_type):
return target.pk
return parent_id
def _extract_sources(self, target: Constituenta) -> set[int]:
sources: set[int] = set()
if not is_functional(target.cst_type):
for parent_id in self._graph.inputs[target.pk]:
parent_info = self[parent_id]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
return sources
expression = split_template(target.definition_formal)
body_dependencies = extract_globals(expression['body'])
for alias in body_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
if self._need_check_head(sources, expression['head']):
head_dependencies = extract_globals(expression['head'])
for alias in head_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not is_base_set(parent.cst_type) and \
(not parent_info['is_template'] or not parent_info['is_simple']):
sources.add(parent_info['parent'])
return sources
def _need_check_head(self, sources: set[int], head: str) -> bool:
if len(sources) == 0:
return True
elif len(sources) != 1:
return False
else:
base = self._cst_by_ID[next(iter(sources))]
return not is_functional(base.cst_type) or \
split_template(base.definition_formal)['head'] != head
class _OrderManager:
''' Ordering helper class '''
def __init__(self, schema: RSFormCached):
self._semantic = schema.semantic()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
def restore_order(self) -> None:
''' Implement order restoration process. '''
if len(self._items) <= 1:
return
self._fix_kernel()
self._fix_topological()
self._fix_semantic_children()
self._save_order()
def _fix_topological(self) -> None:
sorted_ids = self._graph.sort_stable([cst.pk for cst in self._items])
sorted_items = [next(cst for cst in self._items if cst.pk == id) for id in sorted_ids]
self._items = sorted_items
def _fix_kernel(self) -> None:
result = [cst for cst in self._items if cst.cst_type == CstType.BASE]
result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT]
kernel = [
cst.pk for cst in self._items if
cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or
self._cst_by_ID[self._semantic.parent(cst.pk)].cst_type == CstType.STRUCTURED
]
kernel = kernel + self._graph.expand_inputs(kernel)
result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.pk in kernel]
result = result + [cst for cst in self._items if result.count(cst) == 0]
self._items = result
def _fix_semantic_children(self) -> None:
result: list[Constituenta] = []
marked: set[Constituenta] = set()
for cst in self._items:
if cst in marked:
continue
result.append(cst)
children = self._semantic[cst.pk]['children']
if len(children) == 0:
continue
for child in self._items:
if child.pk in children:
marked.add(child)
result.append(child)
self._items = result
def _save_order(self) -> None:
order = 0
for cst in self._items:
cst.order = order
order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -1,4 +1,5 @@
''' Django: Models. '''
from .Constituenta import Constituenta, CstType, extract_globals, replace_entities, replace_globals
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm, SemanticInfo
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm
from .RSFormCached import RSFormCached, SemanticInfo

View File

@ -25,6 +25,6 @@ from .data_access import (
RSFormSerializer,
SubstitutionSerializerBase
)
from .io_files import FileSerializer, RSFormTRSSerializer, RSFormUploadSerializer
from .io_files import FileSerializer, RSFormTRSSerializer, RSFormUploadSerializer, generate_trs
from .io_pyconcept import PyConceptAdapter
from .responses import NewCstResponse, NewMultiCstResponse, ResultTextResponse

View File

@ -52,7 +52,10 @@ class CstUpdateSerializer(StrictSerializer):
target = PKField(
many=False,
queryset=Constituenta.objects.all().only(
'alias', 'cst_type', 'convention', 'crucial', 'definition_formal', 'definition_raw', 'term_raw')
'schema_id',
'alias', 'cst_type', 'convention', 'crucial',
'definition_formal', 'definition_raw', 'term_raw'
)
)
item_data = ConstituentaUpdateData()
@ -65,7 +68,7 @@ class CstUpdateSerializer(StrictSerializer):
})
if 'alias' in attrs['item_data']:
new_alias = attrs['item_data']['alias']
if cst.alias != new_alias and RSForm(schema).constituents().filter(alias=new_alias).exists():
if cst.alias != new_alias and Constituenta.objects.filter(schema=schema, alias=new_alias).exists():
raise serializers.ValidationError({
'alias': msg.aliasTaken(new_alias)
})
@ -161,7 +164,7 @@ class RSFormSerializer(StrictModelSerializer):
result['items'] = []
result['oss'] = []
result['inheritance'] = []
for cst in RSForm(instance).constituents().defer('order').order_by('order'):
for cst in Constituenta.objects.filter(schema=instance).defer('order').order_by('order'):
result['items'].append(CstInfoSerializer(cst).data)
for oss in LibraryItem.objects.filter(operations__result=instance).only('alias'):
result['oss'].append({
@ -199,7 +202,7 @@ class RSFormSerializer(StrictModelSerializer):
ids: list[int] = [item['id'] for item in items]
processed: list[int] = []
for cst in schema.constituents():
for cst in schema.constituentsQ():
if not cst.pk in ids:
cst.delete()
else:
@ -216,7 +219,7 @@ class RSFormSerializer(StrictModelSerializer):
for cst_data in items:
if cst_data['id'] not in processed:
cst = schema.insert_new(cst_data['alias'])
cst = schema.insert_last(cst_data['alias'])
old_id = cst_data['id']
cst_data['id'] = cst.pk
cst_data['schema'] = cast(LibraryItem, self.instance).pk

View File

@ -6,7 +6,7 @@ from apps.library.models import LibraryItem
from shared import messages as msg
from shared.serializers import StrictSerializer
from ..models import Constituenta, RSForm
from ..models import Constituenta, RSFormCached
from ..utils import fix_old_references
_CST_TYPE = 'constituenta'
@ -27,53 +27,49 @@ class RSFormUploadSerializer(StrictSerializer):
load_metadata = serializers.BooleanField()
def generate_trs(schema: LibraryItem) -> dict:
''' Generate TRS file for RSForm. '''
items = []
for cst in Constituenta.objects.filter(schema=schema).order_by('order'):
items.append(
{
'entityUID': cst.pk,
'type': _CST_TYPE,
'cstType': cst.cst_type,
'alias': cst.alias,
'convention': cst.convention,
'term': {
'raw': cst.term_raw,
'resolved': cst.term_resolved,
'forms': cst.term_forms
},
'definition': {
'formal': cst.definition_formal,
'text': {
'raw': cst.definition_raw,
'resolved': cst.definition_resolved
},
},
}
)
return {
'type': _TRS_TYPE,
'title': schema.title,
'alias': schema.alias,
'comment': schema.description,
'items': items,
'claimed': False,
'selection': [],
'version': _TRS_VERSION,
'versionInfo': _TRS_HEADER
}
class RSFormTRSSerializer(serializers.Serializer):
''' Serializer: TRS file production and loading for RSForm. '''
def to_representation(self, instance: RSForm) -> dict:
result = self._prepare_json_rsform(instance.model)
items = instance.constituents().order_by('order')
for cst in items:
result['items'].append(self._prepare_json_constituenta(cst))
return result
@staticmethod
def _prepare_json_rsform(schema: LibraryItem) -> dict:
return {
'type': _TRS_TYPE,
'title': schema.title,
'alias': schema.alias,
'comment': schema.description,
'items': [],
'claimed': False,
'selection': [],
'version': _TRS_VERSION,
'versionInfo': _TRS_HEADER
}
@staticmethod
def _prepare_json_constituenta(cst: Constituenta) -> dict:
return {
'entityUID': cst.pk,
'type': _CST_TYPE,
'cstType': cst.cst_type,
'alias': cst.alias,
'convention': cst.convention,
'term': {
'raw': cst.term_raw,
'resolved': cst.term_resolved,
'forms': cst.term_forms
},
'definition': {
'formal': cst.definition_formal,
'text': {
'raw': cst.definition_raw,
'resolved': cst.definition_resolved
},
},
}
def from_versioned_data(self, data: dict) -> dict:
def load_versioned_data(data: dict) -> dict:
''' Load data from version. '''
result = {
'type': _TRS_TYPE,
@ -127,7 +123,7 @@ class RSFormTRSSerializer(serializers.Serializer):
result['description'] = data.get('description', '')
if 'id' in data:
result['id'] = data['id']
self.instance = RSForm.from_id(result['id'])
self.instance = RSFormCached.from_id(result['id'])
return result
def validate(self, attrs: dict):
@ -140,8 +136,8 @@ class RSFormTRSSerializer(serializers.Serializer):
return attrs
@transaction.atomic
def create(self, validated_data: dict) -> RSForm:
self.instance: RSForm = RSForm.create(
def create(self, validated_data: dict) -> RSFormCached:
self.instance: RSFormCached = RSFormCached.create(
owner=validated_data.get('owner', None),
alias=validated_data['alias'],
title=validated_data['title'],
@ -167,7 +163,7 @@ class RSFormTRSSerializer(serializers.Serializer):
return self.instance
@transaction.atomic
def update(self, instance: RSForm, validated_data) -> RSForm:
def update(self, instance: RSFormCached, validated_data) -> RSFormCached:
if 'alias' in validated_data:
instance.model.alias = validated_data['alias']
if 'title' in validated_data:
@ -176,7 +172,7 @@ class RSFormTRSSerializer(serializers.Serializer):
instance.model.description = validated_data['description']
order = 0
prev_constituents = instance.constituents()
prev_constituents = instance.constituentsQ()
loaded_ids = set()
for cst_data in validated_data['items']:
uid = int(cst_data['entityUID'])

View File

@ -6,20 +6,20 @@ import pyconcept
from shared import messages as msg
from ..models import RSForm
from ..models import Constituenta
class PyConceptAdapter:
''' RSForm adapter for interacting with pyconcept module. '''
def __init__(self, data: Union[RSForm, dict]):
def __init__(self, data: Union[int, dict]):
try:
if 'items' in cast(dict, data):
self.data = self._prepare_request_raw(cast(dict, data))
else:
self.data = self._prepare_request(cast(RSForm, data))
self.data = self._prepare_request(cast(int, data))
except TypeError:
self.data = self._prepare_request(cast(RSForm, data))
self.data = self._prepare_request(cast(int, data))
self._checked_data: Optional[dict] = None
def parse(self) -> dict:
@ -30,11 +30,11 @@ class PyConceptAdapter:
raise ValueError(msg.pyconceptFailure())
return self._checked_data
def _prepare_request(self, schema: RSForm) -> dict:
def _prepare_request(self, schemaID: int) -> dict:
result: dict = {
'items': []
}
items = schema.constituents().order_by('order')
items = Constituenta.objects.filter(schema_id=schemaID).order_by('order')
for cst in items:
result['items'].append({
'entityUID': cst.pk,

View File

@ -1,3 +1,4 @@
''' Tests for Django Models. '''
from .t_Constituenta import *
from .t_RSForm import *
from .t_RSFormCached import *

View File

@ -14,172 +14,44 @@ class TestRSForm(DBTester):
super().setUp()
self.user1 = User.objects.create(username='User1')
self.user2 = User.objects.create(username='User2')
self.schema = RSForm.create(title='Test')
self.assertNotEqual(self.user1, self.user2)
self.schema = RSForm.create(title='Test')
def test_constituents(self):
schema1 = RSForm.create(title='Test1')
schema2 = RSForm.create(title='Test2')
self.assertFalse(schema1.constituents().exists())
self.assertFalse(schema2.constituents().exists())
self.assertFalse(schema1.constituentsQ().exists())
self.assertFalse(schema2.constituentsQ().exists())
Constituenta.objects.create(alias='X1', schema=schema1.model, order=0)
Constituenta.objects.create(alias='X2', schema=schema1.model, order=1)
self.assertTrue(schema1.constituents().exists())
self.assertFalse(schema2.constituents().exists())
self.assertEqual(schema1.constituents().count(), 2)
def test_get_max_index(self):
schema1 = RSForm.create(title='Test1')
Constituenta.objects.create(alias='X1', schema=schema1.model, order=0)
Constituenta.objects.create(alias='D2', cst_type=CstType.TERM, schema=schema1.model, order=1)
self.assertEqual(schema1.get_max_index(CstType.BASE), 1)
self.assertEqual(schema1.get_max_index(CstType.TERM), 2)
self.assertEqual(schema1.get_max_index(CstType.AXIOM), 0)
def test_insert_at(self):
schema = RSForm.create(title='Test')
x1 = schema.insert_new('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, schema.model)
x2 = schema.insert_new('X2', position=0)
x1.refresh_from_db()
self.assertEqual(x2.order, 0)
self.assertEqual(x2.schema, schema.model)
self.assertEqual(x1.order, 1)
x3 = schema.insert_new('X3', position=3)
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x3.order, 2)
self.assertEqual(x3.schema, schema.model)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
x4 = schema.insert_new('X4', position=2)
x3.refresh_from_db()
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x4.order, 2)
self.assertEqual(x4.schema, schema.model)
self.assertEqual(x3.order, 3)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
def test_insert_at_invalid_position(self):
with self.assertRaises(ValidationError):
self.schema.insert_new('X5', position=-2)
self.assertTrue(schema1.constituentsQ().exists())
self.assertFalse(schema2.constituentsQ().exists())
self.assertEqual(schema1.constituentsQ().count(), 2)
def test_insert_at_invalid_alias(self):
self.schema.insert_new('X1')
self.schema.insert_last('X1')
with self.assertRaises(ValidationError):
self.schema.insert_new('X1')
def test_insert_at_reorder(self):
self.schema.insert_new('X1')
d1 = self.schema.insert_new('D1')
d2 = self.schema.insert_new('D2', position=0)
d1.refresh_from_db()
self.assertEqual(d1.order, 2)
self.assertEqual(d2.order, 0)
x2 = self.schema.insert_new('X2', position=3)
self.assertEqual(x2.order, 3)
self.schema.insert_last('X1')
def test_insert_last(self):
x1 = self.schema.insert_new('X1')
x1 = self.schema.insert_last('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, self.schema.model)
x2 = self.schema.insert_new('X2')
x2 = self.schema.insert_last('X2')
self.assertEqual(x2.order, 1)
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 0)
def test_create_cst(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': 'test',
'convention': 'convention'
}
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x3 = self.schema.create_cst(data=data, insert_after=x1)
x2.refresh_from_db()
self.assertEqual(x3.alias, data['alias'])
self.assertEqual(x3.term_raw, data['term_raw'])
self.assertEqual(x3.definition_raw, data['definition_raw'])
self.assertEqual(x2.order, 2)
self.assertEqual(x3.order, 1)
def test_create_cst_resolve(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='@{X2|datv}',
definition_raw='@{X1|datv} @{X2|datv}'
)
x2 = self.schema.create_cst({
'alias': 'X2',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': '@{X1|plur} @{X2|plur}'
})
x1.refresh_from_db()
self.assertEqual(x1.term_resolved, 'слону')
self.assertEqual(x1.definition_resolved, 'слону слону')
self.assertEqual(x2.term_resolved, 'слон')
self.assertEqual(x2.definition_resolved, 'слонам слоны')
def test_insert_copy(self):
x1 = self.schema.insert_new(
alias='X10',
convention='Test'
)
s1 = self.schema.insert_new(
alias='S11',
definition_formal=x1.alias,
definition_raw='@{X10|plur}'
)
result = self.schema.insert_copy([s1, x1], 1)
self.assertEqual(len(result), 2)
s1.refresh_from_db()
self.assertEqual(s1.order, 3)
x2 = result[1]
self.assertEqual(x2.order, 2)
self.assertEqual(x2.alias, 'X11')
self.assertEqual(x2.cst_type, CstType.BASE)
self.assertEqual(x2.convention, x1.convention)
s2 = result[0]
self.assertEqual(s2.order, 1)
self.assertEqual(s2.alias, 'S12')
self.assertEqual(s2.cst_type, CstType.STRUCTURED)
self.assertEqual(s2.definition_formal, x2.alias)
self.assertEqual(s2.definition_raw, '@{X11|plur}')
def test_delete_cst(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new(
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X2',
definition_raw='@{X1|sing}',
@ -189,7 +61,7 @@ class TestRSForm(DBTester):
self.schema.delete_cst([x1])
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituents().count(), 2)
self.assertEqual(self.schema.constituentsQ().count(), 2)
self.assertEqual(x2.order, 0)
self.assertEqual(d1.order, 1)
self.assertEqual(d1.definition_formal, 'DEL = X2')
@ -198,9 +70,9 @@ class TestRSForm(DBTester):
def test_apply_mapping(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X11')
d1 = self.schema.insert_new(
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X11')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}',
@ -216,33 +88,11 @@ class TestRSForm(DBTester):
self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping')
def test_substitute(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='Test'
)
x2 = self.schema.insert_new(
alias='X2',
term_raw='Test2'
)
d1 = self.schema.insert_new(
alias='D1',
definition_formal=x1.alias
)
self.schema.substitute([(x1, x2)])
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituents().count(), 2)
self.assertEqual(x2.term_raw, 'Test2')
self.assertEqual(d1.definition_formal, x2.alias)
def test_move_cst(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new('D1')
d2 = self.schema.insert_new('D2')
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
d1 = self.schema.insert_last('D1')
d2 = self.schema.insert_last('D2')
self.schema.move_cst([x2, d2], 0)
x1.refresh_from_db()
x2.refresh_from_db()
@ -255,151 +105,10 @@ class TestRSForm(DBTester):
def test_move_cst_down(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
self.schema.move_cst([x1], 1)
x1.refresh_from_db()
x2.refresh_from_db()
self.assertEqual(x1.order, 1)
self.assertEqual(x2.order, 0)
def test_restore_order(self):
d2 = self.schema.insert_new(
alias='D2',
definition_formal=r'D{ξ∈S1 | 1=1}',
)
d1 = self.schema.insert_new(
alias='D1',
definition_formal=r'Pr1(S1)\X1',
)
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
s1 = self.schema.insert_new(
alias='S1',
definition_formal='(X1×X1)'
)
c1 = self.schema.insert_new('C1')
s2 = self.schema.insert_new(
alias='S2',
definition_formal='(X2×D1)'
)
a1 = self.schema.insert_new(
alias='A1',
definition_formal=r'D3=∅',
)
d3 = self.schema.insert_new(
alias='D3',
definition_formal=r'Pr2(S2)',
)
f1 = self.schema.insert_new(
alias='F1',
definition_formal=r'[α∈ℬ(X1)] D{σ∈S1 | α⊆pr1(σ)}',
)
d4 = self.schema.insert_new(
alias='D4',
definition_formal=r'Pr2(D3)',
)
f2 = self.schema.insert_new(
alias='F2',
definition_formal=r'[α∈ℬ(X1)] X1\α',
)
self.schema.restore_order()
x1.refresh_from_db()
x2.refresh_from_db()
c1.refresh_from_db()
s1.refresh_from_db()
s2.refresh_from_db()
d1.refresh_from_db()
d2.refresh_from_db()
d3.refresh_from_db()
d4.refresh_from_db()
f1.refresh_from_db()
f2.refresh_from_db()
a1.refresh_from_db()
self.assertEqual(x1.order, 0)
self.assertEqual(x2.order, 1)
self.assertEqual(c1.order, 2)
self.assertEqual(s1.order, 3)
self.assertEqual(d1.order, 4)
self.assertEqual(s2.order, 5)
self.assertEqual(d3.order, 6)
self.assertEqual(a1.order, 7)
self.assertEqual(d4.order, 8)
self.assertEqual(d2.order, 9)
self.assertEqual(f1.order, 10)
self.assertEqual(f2.order, 11)
def test_reset_aliases(self):
x1 = self.schema.insert_new(
alias='X11',
term_raw='человек',
term_resolved='человек'
)
x2 = self.schema.insert_new('X21')
d1 = self.schema.insert_new(
alias='D11',
definition_formal='X21=X21',
term_raw='@{X21|sing}',
definition_raw='@{X11|datv}',
definition_resolved='test'
)
self.schema.reset_aliases()
x1.refresh_from_db()
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(x1.alias, 'X1')
self.assertEqual(x2.alias, 'X2')
self.assertEqual(d1.alias, 'D1')
self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test')
def test_on_term_change(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='человек',
term_resolved='человек',
definition_raw='одному @{X1|datv}',
definition_resolved='одному человеку',
)
x2 = self.schema.insert_new(
alias='X2',
term_raw='сильный @{X1|sing}',
term_resolved='сильный человек',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
x3 = self.schema.insert_new(
alias='X3',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
d1 = self.schema.insert_new(
alias='D1',
definition_raw='очень @{X2|sing}',
definition_resolved='очень сильный человек'
)
x1.term_raw = 'слон'
x1.term_resolved = 'слон'
x1.save()
self.schema.after_term_change([x1.pk])
x1.refresh_from_db()
x2.refresh_from_db()
x3.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(x1.term_raw, 'слон')
self.assertEqual(x1.term_resolved, 'слон')
self.assertEqual(x1.definition_resolved, 'одному слону')
self.assertEqual(x2.definition_resolved, x1.definition_resolved)
self.assertEqual(x3.definition_resolved, x1.definition_resolved)
self.assertEqual(d1.definition_resolved, 'очень сильный слон')

View File

@ -0,0 +1,369 @@
''' Testing models: api_RSForm. '''
from django.forms import ValidationError
from apps.rsform.models import Constituenta, CstType, RSFormCached
from apps.users.models import User
from shared.DBTester import DBTester
class TestRSFormCached(DBTester):
''' Testing RSForm Cached wrapper. '''
def setUp(self):
super().setUp()
self.user1 = User.objects.create(username='User1')
self.user2 = User.objects.create(username='User2')
self.assertNotEqual(self.user1, self.user2)
self.schema = RSFormCached.create(title='Test')
def test_constituents(self):
schema1 = RSFormCached.create(title='Test1')
schema2 = RSFormCached.create(title='Test2')
self.assertFalse(schema1.constituentsQ().exists())
self.assertFalse(schema2.constituentsQ().exists())
Constituenta.objects.create(alias='X1', schema=schema1.model, order=0)
Constituenta.objects.create(alias='X2', schema=schema1.model, order=1)
self.assertTrue(schema1.constituentsQ().exists())
self.assertFalse(schema2.constituentsQ().exists())
self.assertEqual(schema1.constituentsQ().count(), 2)
def test_insert_at(self):
x1 = self.schema.insert_new('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, self.schema.model)
x2 = self.schema.insert_new('X2', position=0)
x1.refresh_from_db()
self.assertEqual(x2.order, 0)
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 1)
x3 = self.schema.insert_new('X3', position=3)
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x3.order, 2)
self.assertEqual(x3.schema, self.schema.model)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
x4 = self.schema.insert_new('X4', position=2)
x3.refresh_from_db()
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x4.order, 2)
self.assertEqual(x4.schema, self.schema.model)
self.assertEqual(x3.order, 3)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
def test_insert_at_invalid_position(self):
with self.assertRaises(ValidationError):
self.schema.insert_new('X5', position=-2)
def test_insert_at_invalid_alias(self):
self.schema.insert_new('X1')
with self.assertRaises(ValidationError):
self.schema.insert_new('X1')
def test_insert_at_reorder(self):
self.schema.insert_new('X1')
d1 = self.schema.insert_new('D1')
d2 = self.schema.insert_new('D2', position=0)
d1.refresh_from_db()
self.assertEqual(d1.order, 2)
self.assertEqual(d2.order, 0)
x2 = self.schema.insert_new('X2', position=3)
self.assertEqual(x2.order, 3)
def test_insert_last(self):
x1 = self.schema.insert_new('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, self.schema.model)
x2 = self.schema.insert_new('X2')
self.assertEqual(x2.order, 1)
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 0)
def test_create_cst(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': 'test',
'convention': 'convention'
}
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x3 = self.schema.create_cst(data=data, insert_after=x1)
x2.refresh_from_db()
self.assertEqual(x3.alias, data['alias'])
self.assertEqual(x3.term_raw, data['term_raw'])
self.assertEqual(x3.definition_raw, data['definition_raw'])
self.assertEqual(x2.order, 2)
self.assertEqual(x3.order, 1)
def test_create_cst_resolve(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='@{X2|datv}',
definition_raw='@{X1|datv} @{X2|datv}'
)
x2 = self.schema.create_cst({
'alias': 'X2',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': '@{X1|plur} @{X2|plur}'
})
x1.refresh_from_db()
self.assertEqual(x1.term_resolved, 'слону')
self.assertEqual(x1.definition_resolved, 'слону слону')
self.assertEqual(x2.term_resolved, 'слон')
self.assertEqual(x2.definition_resolved, 'слонам слоны')
def test_insert_copy(self):
x1 = self.schema.insert_new(
alias='X10',
convention='Test'
)
s1 = self.schema.insert_new(
alias='S11',
definition_formal=x1.alias,
definition_raw='@{X10|plur}'
)
result = self.schema.insert_copy([s1, x1], 1)
self.assertEqual(len(result), 2)
s1.refresh_from_db()
self.assertEqual(s1.order, 3)
x2 = result[1]
self.assertEqual(x2.order, 2)
self.assertEqual(x2.alias, 'X11')
self.assertEqual(x2.cst_type, CstType.BASE)
self.assertEqual(x2.convention, x1.convention)
s2 = result[0]
self.assertEqual(s2.order, 1)
self.assertEqual(s2.alias, 'S12')
self.assertEqual(s2.cst_type, CstType.STRUCTURED)
self.assertEqual(s2.definition_formal, x2.alias)
self.assertEqual(s2.definition_raw, '@{X11|plur}')
def test_delete_cst(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new(
alias='D1',
definition_formal='X1 = X2',
definition_raw='@{X1|sing}',
term_raw='@{X2|plur}'
)
self.schema.delete_cst([x1])
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituentsQ().count(), 2)
self.assertEqual(x2.order, 0)
self.assertEqual(d1.order, 1)
self.assertEqual(d1.definition_formal, 'DEL = X2')
self.assertEqual(d1.definition_raw, '@{DEL|sing}')
self.assertEqual(d1.term_raw, '@{X2|plur}')
def test_apply_mapping(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X11')
d1 = self.schema.insert_new(
alias='D1',
definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}',
term_raw='@{X1|plur}'
)
self.schema.apply_mapping({x1.alias: 'X3', x2.alias: 'X4'})
d1.refresh_from_db()
self.assertEqual(d1.definition_formal, 'X3 = X4 = X2', msg='Map IDs in expression')
self.assertEqual(d1.definition_raw, '@{X4|sing}', msg='Map IDs in definition')
self.assertEqual(d1.term_raw, '@{X3|plur}', msg='Map IDs in term')
self.assertEqual(d1.term_resolved, '', msg='Do not run resolve on mapping')
self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping')
def test_substitute(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='Test'
)
x2 = self.schema.insert_new(
alias='X2',
term_raw='Test2'
)
d1 = self.schema.insert_new(
alias='D1',
definition_formal=x1.alias
)
self.schema.substitute([(x1, x2)])
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituentsQ().count(), 2)
self.assertEqual(x2.term_raw, 'Test2')
self.assertEqual(d1.definition_formal, x2.alias)
def test_restore_order(self):
d2 = self.schema.insert_new(
alias='D2',
definition_formal=r'D{ξ∈S1 | 1=1}',
)
d1 = self.schema.insert_new(
alias='D1',
definition_formal=r'Pr1(S1)\X1',
)
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
s1 = self.schema.insert_new(
alias='S1',
definition_formal='(X1×X1)'
)
c1 = self.schema.insert_new('C1')
s2 = self.schema.insert_new(
alias='S2',
definition_formal='(X2×D1)'
)
a1 = self.schema.insert_new(
alias='A1',
definition_formal=r'D3=∅',
)
d3 = self.schema.insert_new(
alias='D3',
definition_formal=r'Pr2(S2)',
)
f1 = self.schema.insert_new(
alias='F1',
definition_formal=r'[α∈ℬ(X1)] D{σ∈S1 | α⊆pr1(σ)}',
)
d4 = self.schema.insert_new(
alias='D4',
definition_formal=r'Pr2(D3)',
)
f2 = self.schema.insert_new(
alias='F2',
definition_formal=r'[α∈ℬ(X1)] X1\α',
)
self.schema.restore_order()
x1.refresh_from_db()
x2.refresh_from_db()
c1.refresh_from_db()
s1.refresh_from_db()
s2.refresh_from_db()
d1.refresh_from_db()
d2.refresh_from_db()
d3.refresh_from_db()
d4.refresh_from_db()
f1.refresh_from_db()
f2.refresh_from_db()
a1.refresh_from_db()
self.assertEqual(x1.order, 0)
self.assertEqual(x2.order, 1)
self.assertEqual(c1.order, 2)
self.assertEqual(s1.order, 3)
self.assertEqual(d1.order, 4)
self.assertEqual(s2.order, 5)
self.assertEqual(d3.order, 6)
self.assertEqual(a1.order, 7)
self.assertEqual(d4.order, 8)
self.assertEqual(d2.order, 9)
self.assertEqual(f1.order, 10)
self.assertEqual(f2.order, 11)
def test_reset_aliases(self):
x1 = self.schema.insert_new(
alias='X11',
term_raw='человек',
term_resolved='человек'
)
x2 = self.schema.insert_new('X21')
d1 = self.schema.insert_new(
alias='D11',
definition_formal='X21=X21',
term_raw='@{X21|sing}',
definition_raw='@{X11|datv}',
definition_resolved='test'
)
self.schema.reset_aliases()
x1.refresh_from_db()
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(x1.alias, 'X1')
self.assertEqual(x2.alias, 'X2')
self.assertEqual(d1.alias, 'D1')
self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test')
def test_on_term_change(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='человек',
term_resolved='человек',
definition_raw='одному @{X1|datv}',
definition_resolved='одному человеку',
)
x2 = self.schema.insert_new(
alias='X2',
term_raw='сильный @{X1|sing}',
term_resolved='сильный человек',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
x3 = self.schema.insert_new(
alias='X3',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
d1 = self.schema.insert_new(
alias='D1',
definition_raw='очень @{X2|sing}',
definition_resolved='очень сильный человек'
)
x1.term_raw = 'слон'
x1.term_resolved = 'слон'
x1.save()
self.schema.after_term_change([x1.pk])
x1.refresh_from_db()
x2.refresh_from_db()
x3.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(x1.term_raw, 'слон')
self.assertEqual(x1.term_resolved, 'слон')
self.assertEqual(x1.definition_resolved, 'одному слону')
self.assertEqual(x2.definition_resolved, x1.definition_resolved)
self.assertEqual(x3.definition_resolved, x1.definition_resolved)
self.assertEqual(d1.definition_resolved, 'очень сильный слон')

View File

@ -73,12 +73,12 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/details', method='get')
def test_details(self):
x1 = self.owned.insert_new(
x1 = self.owned.insert_last(
alias='X1',
term_raw='человек',
term_resolved='человек'
)
x2 = self.owned.insert_new(
x2 = self.owned.insert_last(
alias='X2',
term_raw='@{X1|plur}',
term_resolved='люди'
@ -115,7 +115,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/check-expression', method='post')
def test_check_expression(self):
self.owned.insert_new('X1')
self.owned.insert_last('X1')
data = {'expression': 'X1=X1'}
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], True)
@ -129,7 +129,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/check-constituenta', method='post')
def test_check_constituenta(self):
self.owned.insert_new('X1')
self.owned.insert_last('X1')
data = {'definition_formal': 'X1=X1', 'alias': 'A111', 'cst_type': CstType.AXIOM}
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], True)
@ -141,7 +141,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/check-constituenta', method='post')
def test_check_constituenta_error(self):
self.owned.insert_new('X1')
self.owned.insert_last('X1')
data = {'definition_formal': 'X1=X1', 'alias': 'D111', 'cst_type': CstType.TERM}
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], False)
@ -149,7 +149,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/resolve', method='post')
def test_resolve(self):
x1 = self.owned.insert_new(
x1 = self.owned.insert_last(
alias='X1',
term_resolved='синий слон'
)
@ -191,7 +191,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/export-trs', method='get')
def test_export_trs(self):
schema = RSForm.create(title='Test')
schema.insert_new('X1')
schema.insert_last('X1')
response = self.executeOK(item=schema.model.pk)
self.assertEqual(response.headers['Content-Disposition'], 'attachment; filename=Schema.trs')
with io.BytesIO(response.content) as stream:
@ -206,8 +206,8 @@ class TestRSFormViewset(EndpointTester):
self.executeForbidden(data=data, item=self.unowned_id)
data = {'alias': 'X3'}
self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
self.owned.insert_last('X1')
x2 = self.owned.insert_last('X2')
self.executeBadData(item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
@ -251,11 +251,11 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/substitute', method='patch')
def test_substitute_multiple(self):
self.set_params(item=self.owned_id)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
d1 = self.owned.insert_new('D1')
d2 = self.owned.insert_new('D2')
d3 = self.owned.insert_new(
x1 = self.owned.insert_last('X1')
x2 = self.owned.insert_last('X2')
d1 = self.owned.insert_last('D1')
d2 = self.owned.insert_last('D2')
d3 = self.owned.insert_last(
alias='D3',
definition_formal=r'X1 \ X2'
)
@ -318,19 +318,19 @@ class TestRSFormViewset(EndpointTester):
data = {'items': [1337]}
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
x1 = self.owned.insert_last('X1')
x2 = self.owned.insert_last('X2')
data = {'items': [x1.pk]}
response = self.executeOK(data=data)
x2.refresh_from_db()
self.owned.refresh_from_db()
self.assertEqual(len(response.data['items']), 1)
self.assertEqual(self.owned.constituents().count(), 1)
self.assertEqual(self.owned.constituentsQ().count(), 1)
self.assertEqual(x2.alias, 'X2')
self.assertEqual(x2.order, 0)
x3 = self.unowned.insert_new('X1')
x3 = self.unowned.insert_last('X1')
data = {'items': [x3.pk]}
self.executeBadData(data=data, item=self.owned_id)
@ -342,8 +342,8 @@ class TestRSFormViewset(EndpointTester):
data = {'items': [1337], 'move_to': 0}
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
x1 = self.owned.insert_last('X1')
x2 = self.owned.insert_last('X2')
data = {'items': [x2.pk], 'move_to': 0}
response = self.executeOK(data=data)
@ -353,7 +353,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(x1.order, 1)
self.assertEqual(x2.order, 0)
x3 = self.unowned.insert_new('X1')
x3 = self.unowned.insert_last('X1')
data = {'items': [x3.pk], 'move_to': 0}
self.executeBadData(data=data)
@ -365,9 +365,9 @@ class TestRSFormViewset(EndpointTester):
response = self.executeOK()
self.assertEqual(response.data['id'], self.owned_id)
x2 = self.owned.insert_new('X2')
x1 = self.owned.insert_new('X1')
d11 = self.owned.insert_new('D11')
x2 = self.owned.insert_last('X2')
x1 = self.owned.insert_last('X1')
d11 = self.owned.insert_last('D11')
response = self.executeOK()
x1.refresh_from_db()
@ -388,7 +388,7 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id)
self.owned.model.title = 'Test11'
self.owned.save()
x1 = self.owned.insert_new('X1')
x1 = self.owned.insert_last('X1')
work_dir = os.path.dirname(os.path.abspath(__file__))
with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file:
data = {'file': file, 'load_metadata': False}
@ -397,31 +397,31 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(self.owned.model.title, 'Test11')
self.assertEqual(len(response.data['items']), 25)
self.assertEqual(self.owned.constituents().count(), 25)
self.assertEqual(self.owned.constituentsQ().count(), 25)
self.assertFalse(Constituenta.objects.filter(pk=x1.pk).exists())
@decl_endpoint('/api/rsforms/{item}/produce-structure', method='patch')
def test_produce_structure(self):
self.set_params(item=self.owned_id)
x1 = self.owned.insert_new('X1')
s1 = self.owned.insert_new(
x1 = self.owned.insert_last('X1')
s1 = self.owned.insert_last(
alias='S1',
definition_formal='(X1×X1)'
)
s2 = self.owned.insert_new(
s2 = self.owned.insert_last(
alias='S2',
definition_formal='invalid'
)
s3 = self.owned.insert_new(
s3 = self.owned.insert_last(
alias='S3',
definition_formal='X1×(X1×(X1))×(X1×X1)'
)
a1 = self.owned.insert_new(
a1 = self.owned.insert_last(
alias='A1',
definition_formal='1=1'
)
f1 = self.owned.insert_new(
f1 = self.owned.insert_last(
alias='F10',
definition_formal='[α∈X1, β∈X1] Fi1[{α,β}](S1)'
)
@ -515,7 +515,7 @@ class TestConstituentaAPI(EndpointTester):
data = {'target': self.cst1.pk, 'item_data': {'alias': self.cst3.alias}}
self.executeBadData(data=data, schema=self.owned_id)
d1 = self.owned.insert_new(
d1 = self.owned.insert_last(
alias='D1',
term_raw='@{X1|plur}',
definition_formal='X1'
@ -629,15 +629,15 @@ class TestInlineSynthesis(EndpointTester):
def test_inline_synthesis(self):
ks1_x1 = self.schema1.insert_new('X1', term_raw='KS1X1') # -> delete
ks1_x2 = self.schema1.insert_new('X2', term_raw='KS1X2') # -> X2
ks1_s1 = self.schema1.insert_new('S1', definition_formal='X2', term_raw='KS1S1') # -> S1
ks1_d1 = self.schema1.insert_new('D1', definition_formal=r'S1\X1\X2') # -> D1
ks2_x1 = self.schema2.insert_new('X1', term_raw='KS2X1') # -> delete
ks2_x2 = self.schema2.insert_new('X2', term_raw='KS2X2') # -> X4
ks2_s1 = self.schema2.insert_new('S1', definition_formal='X2×X2', term_raw='KS2S1') # -> S2
ks2_d1 = self.schema2.insert_new('D1', definition_formal=r'S1\X1\X2') # -> D2
ks2_a1 = self.schema2.insert_new('A1', definition_formal='1=1') # -> not included in items
ks1_x1 = self.schema1.insert_last('X1', term_raw='KS1X1') # -> delete
ks1_x2 = self.schema1.insert_last('X2', term_raw='KS1X2') # -> X2
ks1_s1 = self.schema1.insert_last('S1', definition_formal='X2', term_raw='KS1S1') # -> S1
ks1_d1 = self.schema1.insert_last('D1', definition_formal=r'S1\X1\X2') # -> D1
ks2_x1 = self.schema2.insert_last('X1', term_raw='KS2X1') # -> delete
ks2_x2 = self.schema2.insert_last('X2', term_raw='KS2X2') # -> X4
ks2_s1 = self.schema2.insert_last('S1', definition_formal='X2×X2', term_raw='KS2S1') # -> S2
ks2_d1 = self.schema2.insert_last('D1', definition_formal=r'S1\X1\X2') # -> D2
ks2_a1 = self.schema2.insert_last('A1', definition_formal='1=1') # -> not included in items
data = {
'receiver': self.schema1.model.pk,

View File

@ -85,7 +85,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
insert_after = None
else:
insert_after = data['insert_after']
schema = m.RSForm(self._get_item())
schema = m.RSFormCached(self._get_item())
with transaction.atomic():
new_cst = schema.create_cst(data, insert_after)
PropagationFacade.after_create_cst(schema, [new_cst])
@ -115,7 +115,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.CstUpdateSerializer(data=request.data, partial=True, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
schema = m.RSForm(model)
schema = m.RSFormCached(model)
data = serializer.validated_data['item_data']
with transaction.atomic():
old_data = schema.update_cst(cst, data)
@ -199,7 +199,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
status=c.HTTP_400_BAD_REQUEST,
data={f'{cst.pk}': msg.constituentaNoStructure()}
)
schema = m.RSForm(model)
schema = m.RSFormCached(model)
with transaction.atomic():
new_cst = schema.produce_structure(cst, cst_parse)
@ -233,7 +233,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
schema = m.RSForm(model)
schema = m.RSFormCached(model)
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
with transaction.atomic():
for substitution in serializer.validated_data['substitutions']:
@ -268,7 +268,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
)
serializer.is_valid(raise_exception=True)
cst_list: list[m.Constituenta] = serializer.validated_data['items']
schema = m.RSForm(model)
schema = m.RSFormCached(model)
with transaction.atomic():
PropagationFacade.before_delete_cst(schema, cst_list)
schema.delete_cst(cst_list)
@ -321,7 +321,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def reset_aliases(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Recreate all aliases based on order. '''
model = self._get_item()
schema = m.RSForm(model)
schema = m.RSFormCached(model)
schema.reset_aliases()
return Response(
status=c.HTTP_200_OK,
@ -342,7 +342,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def restore_order(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Restore order based on types and Term graph. '''
model = self._get_item()
m.RSForm(model).restore_order()
m.RSFormCached(model).restore_order()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(model).data
@ -437,7 +437,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.ExpressionSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
expression = serializer.validated_data['expression']
pySchema = s.PyConceptAdapter(m.RSForm(self.get_object()))
pySchema = s.PyConceptAdapter(pk)
result = pyconcept.check_expression(json.dumps(pySchema.data), expression)
return Response(
status=c.HTTP_200_OK,
@ -462,7 +462,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
alias = serializer.validated_data['alias']
cst_type = cast(m.CstType, serializer.validated_data['cst_type'])
pySchema = s.PyConceptAdapter(m.RSForm(self.get_object()))
pySchema = s.PyConceptAdapter(pk)
result = pyconcept.check_constituenta(json.dumps(pySchema.data), alias, expression, cst_type)
return Response(
status=c.HTTP_200_OK,
@ -484,7 +484,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.TextSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
text = serializer.validated_data['text']
resolver = m.RSForm(self.get_object()).resolver()
resolver = m.RSForm.spawn_resolver(pk)
resolver.resolve(text)
return Response(
status=c.HTTP_200_OK,
@ -504,7 +504,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def export_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Download Exteor compatible file. '''
model = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(model)).data
data = s.generate_trs(model)
file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
filename = utils.filename_for_schema(model.alias)
response = HttpResponse(file, content_type='application/zip')
@ -624,10 +624,11 @@ def inline_synthesis(request: Request) -> HttpResponse:
)
serializer.is_valid(raise_exception=True)
receiver = m.RSForm(serializer.validated_data['receiver'])
receiver = m.RSFormCached(serializer.validated_data['receiver'])
items = cast(list[m.Constituenta], serializer.validated_data['items'])
if len(items) == 0:
items = list(m.RSForm(serializer.validated_data['source']).constituents().order_by('order'))
source = cast(LibraryItem, serializer.validated_data['source'])
items = list(m.Constituenta.objects.filter(schema=source).order_by('order'))
with transaction.atomic():
new_items = receiver.insert_copy(items)