R: Refactoring cache models pt1

This commit is contained in:
Ivan 2025-08-03 11:37:27 +03:00
parent 601ab8ce7b
commit 4794d36b6d
21 changed files with 696 additions and 756 deletions

View File

@ -67,7 +67,7 @@ class TestVersionViews(EndpointTester):
self.executeNotFound(schema=self.unowned_id, version=version_id)
self.owned.model.alias = 'NewName'
self.owned.save()
self.owned.model.save()
self.x1.alias = 'X33'
self.x1.save()
@ -160,7 +160,7 @@ class TestVersionViews(EndpointTester):
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
self.owned.delete_cst([d1])
Constituenta.objects.get(pk=d1.pk).delete()
x3 = self.owned.insert_last('X3')
x1.order = x3.order
x1.convention = 'Test2'

View File

@ -47,6 +47,7 @@ class VersionViewset(
item = version.item
with transaction.atomic():
RSFormSerializer(item).restore_from_version(version.data)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=RSFormParseSerializer(item).data

View File

@ -4,7 +4,7 @@
from django.db.models import QuerySet
from apps.library.models import Editor, LibraryItem, LibraryItemType
from apps.rsform.models import Constituenta, RSFormCached
from apps.rsform.models import Constituenta, OrderManager, RSFormCached
from .Argument import Argument
from .Block import Block
@ -158,6 +158,6 @@ class OperationSchema:
parent=parent
)
receiver.restore_order()
OrderManager(receiver).restore_order()
receiver.reset_aliases()
receiver.resolve_all_text()

View File

@ -13,6 +13,7 @@ from apps.rsform.models import (
INSERT_LAST,
Constituenta,
CstType,
OrderManager,
RSFormCached,
extract_globals,
replace_entities,
@ -21,7 +22,7 @@ from apps.rsform.models import (
from .Argument import Argument
from .Inheritance import Inheritance
from .Operation import Operation
from .Operation import Operation, OperationType
from .Substitution import Substitution
CstMapping = dict[str, Optional[Constituenta]]
@ -53,7 +54,7 @@ class OperationSchemaCached:
def delete_operation(self, target: int, keep_constituents: bool = False):
''' Delete Operation. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
operation = self.cache.operation_by_id[target]
schema = self.cache.get_schema(operation)
children = self.cache.graph.outputs[target]
@ -84,7 +85,8 @@ class OperationSchemaCached:
operation = self.cache.operation_by_id[target]
has_children = len(self.cache.graph.outputs[target]) > 0
old_schema = self.cache.get_schema(operation)
if schema == old_schema:
if schema is None and old_schema is None or \
(schema is not None and old_schema is not None and schema.pk == old_schema.model.pk):
return
if old_schema is not None:
@ -105,7 +107,7 @@ class OperationSchemaCached:
def set_arguments(self, target: int, arguments: list[Operation]) -> None:
''' Set arguments of target Operation. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
operation = self.cache.operation_by_id[target]
processed: list[Operation] = []
updated: list[Argument] = []
@ -136,7 +138,7 @@ class OperationSchemaCached:
def set_substitutions(self, target: int, substitutes: list[dict]) -> None:
''' Clear all arguments for target Operation. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
operation = self.cache.operation_by_id[target]
schema = self.cache.get_schema(operation)
processed: list[dict] = []
@ -226,18 +228,19 @@ class OperationSchemaCached:
parent=parent
)
receiver.restore_order()
OrderManager(receiver).restore_order()
receiver.reset_aliases()
receiver.resolve_all_text()
if len(self.cache.graph.outputs[operation.pk]) > 0:
receiver_items = list(Constituenta.objects.filter(schema=receiver.model).order_by('order'))
self.after_create_cst(receiver, receiver_items)
receiver.model.save(update_fields=['time_update'])
return True
def relocate_down(self, source: RSFormCached, destination: RSFormCached, items: list[Constituenta]):
''' Move list of Constituents to destination Schema inheritor. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
self.cache.insert_schema(source)
self.cache.insert_schema(destination)
operation = self.cache.get_operation(destination.model.pk)
@ -252,7 +255,7 @@ class OperationSchemaCached:
def relocate_up(self, source: RSFormCached, destination: RSFormCached,
items: list[Constituenta]) -> list[Constituenta]:
''' Move list of Constituents upstream to destination Schema. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
self.cache.insert_schema(source)
self.cache.insert_schema(destination)
@ -273,7 +276,7 @@ class OperationSchemaCached:
)
self.cache.insert_inheritance(new_inheritance)
self.after_create_cst(destination, new_items, exclude=[operation.pk])
destination.model.save(update_fields=['time_update'])
return new_items
def after_create_cst(
@ -296,10 +299,9 @@ class OperationSchemaCached:
operation = self.cache.get_operation(source.model.pk)
self._cascade_inherit_cst(operation.pk, source, cst_list, alias_mapping, exclude)
def after_change_cst_type(self, source: RSFormCached, target: Constituenta) -> None:
def after_change_cst_type(self, target: Constituenta) -> None:
''' Trigger cascade resolutions when Constituenta type is changed. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
operation = self.cache.get_operation(target.schema.pk)
self._cascade_change_cst_type(operation.pk, target.pk, cast(CstType, target.cst_type))
def after_update_cst(self, source: RSFormCached, target: Constituenta, data: dict, old_data: dict) -> None:
@ -326,10 +328,9 @@ class OperationSchemaCached:
operation = self.cache.get_operation(source.model.pk)
self._cascade_delete_inherited(operation.pk, target)
def before_substitute(self, source: RSFormCached, substitutions: CstSubstitution) -> None:
def before_substitute(self, schemaID: int, substitutions: CstSubstitution) -> None:
''' Trigger cascade resolutions before Constituents are substituted. '''
self.cache.insert_schema(source)
operation = self.cache.get_operation(source.model.pk)
operation = self.cache.get_operation(schemaID)
self._cascade_before_substitute(substitutions, operation)
def before_delete_arguments(self, target: Operation, arguments: list[Operation]) -> None:
@ -384,7 +385,7 @@ class OperationSchemaCached:
if destination is None:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
new_mapping = self._transform_mapping(mapping, operation, destination)
alias_mapping = OperationSchemaCached._produce_alias_mapping(new_mapping)
insert_where = self._determine_insert_position(items[0].pk, operation, source, destination)
@ -403,7 +404,7 @@ class OperationSchemaCached:
children = self.cache.graph.outputs[operation_id]
if len(children) == 0:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_inheritor(cst_id, child_id)
@ -426,7 +427,7 @@ class OperationSchemaCached:
children = self.cache.graph.outputs[operation]
if len(children) == 0:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_inheritor(cst_id, child_id)
@ -458,7 +459,7 @@ class OperationSchemaCached:
children = self.cache.graph.outputs[operation]
if len(children) == 0:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
for child_id in children:
self._execute_delete_inherited(child_id, target)
@ -479,7 +480,7 @@ class OperationSchemaCached:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_schema(child_operation)
@ -503,7 +504,7 @@ class OperationSchemaCached:
children = self.cache.graph.outputs[operation]
if len(children) == 0:
return
self.cache.ensure_loaded()
self.cache.ensure_loaded_subs()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_schema(child_operation)
@ -697,7 +698,7 @@ class OperationSchemaCached:
original_cst = schema.cache.by_id[original_id]
substitution_cst = schema.cache.by_id[substitution_id]
cst_mapping.append((original_cst, substitution_cst))
self.before_substitute(schema, cst_mapping)
self.before_substitute(schema.model.pk, cst_mapping)
schema.substitute(cst_mapping)
for sub in added:
self.cache.insert_substitution(sub)
@ -723,15 +724,15 @@ class OssCache:
for argument in arguments:
self.graph.add_edge(argument.argument_id, argument.operation_id)
self.is_loaded = False
self.is_loaded_subs = False
self.substitutions: dict[int, list[Substitution]] = {}
self.inheritance: dict[int, list[Inheritance]] = {}
def ensure_loaded(self) -> None:
def ensure_loaded_subs(self) -> None:
''' Ensure cache is fully loaded. '''
if self.is_loaded:
if self.is_loaded_subs:
return
self.is_loaded = True
self.is_loaded_subs = True
for operation in self.operations:
self.inheritance[operation.pk] = []
self.substitutions[operation.pk] = []
@ -754,12 +755,12 @@ class OssCache:
self._insert_new(schema)
return schema
def get_operation(self, schema: int) -> Operation:
def get_operation(self, schemaID: int) -> Operation:
''' Get operation by schema. '''
for operation in self.operations:
if operation.result_id == schema:
if operation.result_id == schemaID and operation.operation_type != OperationType.REFERENCE:
return operation
raise ValueError(f'Operation for schema {schema} not found')
raise ValueError(f'Operation for schema {schemaID} not found')
def get_inheritor(self, parent_cst: int, operation: int) -> Optional[int]:
''' Get child for parent inside target RSFrom. '''
@ -794,7 +795,7 @@ class OssCache:
self.operations.append(operation)
self.operation_by_id[operation.pk] = operation
self.graph.add_node(operation.pk)
if self.is_loaded:
if self.is_loaded_subs:
self.substitutions[operation.pk] = []
self.inheritance[operation.pk] = []
@ -836,7 +837,7 @@ class OssCache:
del self._schema_by_id[target.result_id]
self.operations.remove(self.operation_by_id[operation])
del self.operation_by_id[operation]
if self.is_loaded:
if self.is_loaded_subs:
del self.substitutions[operation]
del self.inheritance[operation]

View File

@ -7,9 +7,9 @@ from apps.rsform.models import Constituenta, RSFormCached
from .OperationSchemaCached import CstSubstitution, OperationSchemaCached
def _get_oss_hosts(item: LibraryItem) -> list[LibraryItem]:
def _get_oss_hosts(schemaID: int) -> list[LibraryItem]:
''' Get all hosts for LibraryItem. '''
return list(LibraryItem.objects.filter(operations__result=item).only('pk'))
return list(LibraryItem.objects.filter(operations__result_id=schemaID).only('pk').distinct())
class PropagationFacade:
@ -19,18 +19,18 @@ class PropagationFacade:
def after_create_cst(source: RSFormCached, new_cst: list[Constituenta],
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions when new constituenta is created. '''
hosts = _get_oss_hosts(source.model)
hosts = _get_oss_hosts(source.model.pk)
for host in hosts:
if exclude is None or host.pk not in exclude:
OperationSchemaCached(host).after_create_cst(source, new_cst)
@staticmethod
def after_change_cst_type(source: RSFormCached, target: Constituenta, exclude: Optional[list[int]] = None) -> None:
def after_change_cst_type(target: Constituenta, exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions when constituenta type is changed. '''
hosts = _get_oss_hosts(source.model)
hosts = _get_oss_hosts(target.schema.pk)
for host in hosts:
if exclude is None or host.pk not in exclude:
OperationSchemaCached(host).after_change_cst_type(source, target)
OperationSchemaCached(host).after_change_cst_type(target)
@staticmethod
def after_update_cst(
@ -41,7 +41,7 @@ class PropagationFacade:
exclude: Optional[list[int]] = None
) -> None:
''' Trigger cascade resolutions when constituenta data is changed. '''
hosts = _get_oss_hosts(source.model)
hosts = _get_oss_hosts(source.model.pk)
for host in hosts:
if exclude is None or host.pk not in exclude:
OperationSchemaCached(host).after_update_cst(source, target, data, old_data)
@ -50,26 +50,28 @@ class PropagationFacade:
def before_delete_cst(source: RSFormCached, target: list[Constituenta],
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions before constituents are deleted. '''
hosts = _get_oss_hosts(source.model)
hosts = _get_oss_hosts(source.model.pk)
for host in hosts:
if exclude is None or host.pk not in exclude:
OperationSchemaCached(host).before_delete_cst(source, target)
@staticmethod
def before_substitute(source: RSFormCached, substitutions: CstSubstitution,
def before_substitute(sourceID: int, substitutions: CstSubstitution,
exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions before constituents are substituted. '''
hosts = _get_oss_hosts(source.model)
if len(substitutions) == 0:
return
hosts = _get_oss_hosts(sourceID)
for host in hosts:
if exclude is None or host.pk not in exclude:
OperationSchemaCached(host).before_substitute(source, substitutions)
OperationSchemaCached(host).before_substitute(sourceID, substitutions)
@staticmethod
def before_delete_schema(item: LibraryItem, exclude: Optional[list[int]] = None) -> None:
''' Trigger cascade resolutions before schema is deleted. '''
if item.item_type != LibraryItemType.RSFORM:
return
hosts = _get_oss_hosts(item)
hosts = _get_oss_hosts(item.pk)
if len(hosts) == 0:
return

View File

@ -75,10 +75,10 @@ class TestChangeAttributes(EndpointTester):
self.executeOK(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.ks1.refresh_from_db()
self.ks2.refresh_from_db()
self.ks3.refresh_from_db()
self.owned.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks2.model.refresh_from_db()
self.ks3.model.refresh_from_db()
self.assertEqual(self.owned.model.owner, self.user3)
self.assertEqual(self.ks1.model.owner, self.user)
self.assertEqual(self.ks2.model.owner, self.user2)
@ -91,10 +91,10 @@ class TestChangeAttributes(EndpointTester):
self.executeOK(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.ks1.refresh_from_db()
self.ks2.refresh_from_db()
self.ks3.refresh_from_db()
self.owned.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks2.model.refresh_from_db()
self.ks3.model.refresh_from_db()
self.assertEqual(self.owned.model.location, data['location'])
self.assertNotEqual(self.ks1.model.location, data['location'])
self.assertNotEqual(self.ks2.model.location, data['location'])
@ -107,10 +107,10 @@ class TestChangeAttributes(EndpointTester):
self.executeOK(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.ks1.refresh_from_db()
self.ks2.refresh_from_db()
self.ks3.refresh_from_db()
self.owned.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks2.model.refresh_from_db()
self.ks3.model.refresh_from_db()
self.assertEqual(self.owned.model.access_policy, data['access_policy'])
self.assertNotEqual(self.ks1.model.access_policy, data['access_policy'])
self.assertNotEqual(self.ks2.model.access_policy, data['access_policy'])
@ -126,10 +126,10 @@ class TestChangeAttributes(EndpointTester):
self.executeOK(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.ks1.refresh_from_db()
self.ks2.refresh_from_db()
self.ks3.refresh_from_db()
self.owned.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks2.model.refresh_from_db()
self.ks3.model.refresh_from_db()
self.assertEqual(list(self.owned.model.getQ_editors()), [self.user3])
self.assertEqual(list(self.ks1.model.getQ_editors()), [self.user, self.user2])
self.assertEqual(list(self.ks2.model.getQ_editors()), [])
@ -162,7 +162,7 @@ class TestChangeAttributes(EndpointTester):
}
response = self.executeOK(data=data, item=self.owned_id)
self.ks3.refresh_from_db()
self.ks3.model.refresh_from_db()
self.assertEqual(self.ks3.model.alias, data['item_data']['alias'])
self.assertEqual(self.ks3.model.title, data['item_data']['title'])
self.assertEqual(self.ks3.model.description, data['item_data']['description'])

View File

@ -279,7 +279,7 @@ class TestChangeOperations(EndpointTester):
}
self.executeOK(data=data, item=self.owned_id)
self.ks1.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks4D2.refresh_from_db()
self.ks5D4.refresh_from_db()
subs1_2 = self.operation4.getQ_substitutions()
@ -373,7 +373,7 @@ class TestChangeOperations(EndpointTester):
def test_execute_middle_operation(self):
self.client.delete(f'/api/library/{self.ks4.model.pk}')
self.operation4.refresh_from_db()
self.ks5.refresh_from_db()
self.ks5.model.refresh_from_db()
self.assertEqual(self.operation4.result, None)
self.assertEqual(self.ks5.constituentsQ().count(), 3)
@ -383,7 +383,7 @@ class TestChangeOperations(EndpointTester):
}
self.executeOK(data=data, item=self.owned_id)
self.operation4.refresh_from_db()
self.ks5.refresh_from_db()
self.ks5.model.refresh_from_db()
self.assertNotEqual(self.operation4.result, None)
self.assertEqual(self.ks5.constituentsQ().count(), 8)
@ -409,9 +409,9 @@ class TestChangeOperations(EndpointTester):
}
self.executeOK(data=data)
ks6.refresh_from_db()
self.ks1.refresh_from_db()
self.ks4.refresh_from_db()
ks6.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks4.model.refresh_from_db()
self.assertEqual(ks6.constituentsQ().count(), ks6_old_count)
self.assertEqual(self.ks1.constituentsQ().count(), ks1_old_count + 1)
@ -439,9 +439,9 @@ class TestChangeOperations(EndpointTester):
}
self.executeOK(data=data)
ks6.refresh_from_db()
self.ks1.refresh_from_db()
self.ks4.refresh_from_db()
ks6.model.refresh_from_db()
self.ks1.model.refresh_from_db()
self.ks4.model.refresh_from_db()
self.ks4D2.refresh_from_db()
self.ks5D4.refresh_from_db()

View File

@ -229,7 +229,7 @@ class TestOssOperations(EndpointTester):
data['target'] = self.operation1.pk
response = self.executeCreated(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.owned.model.refresh_from_db()
new_operation_id = response.data['new_operation']
new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id)
self.assertEqual(new_operation['operation_type'], OperationType.REFERENCE)
@ -261,7 +261,7 @@ class TestOssOperations(EndpointTester):
'substitutions': []
}
response = self.executeCreated(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.owned.model.refresh_from_db()
new_operation_id = response.data['new_operation']
new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id)
arguments = Argument.objects.filter(operation__oss=self.owned.model)
@ -396,7 +396,7 @@ class TestOssOperations(EndpointTester):
self.ks1.model.alias = 'Test42'
self.ks1.model.title = 'Test421'
self.ks1.model.description = 'TestComment42'
self.ks1.save()
self.ks1.model.save()
response = self.executeOK(data=data)
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.result, self.ks1.model)

View File

@ -138,7 +138,7 @@ class TestOssViewset(EndpointTester):
self.toggle_admin(False)
self.executeOK(data=data, item=self.owned_id)
self.owned.refresh_from_db()
self.owned.model.refresh_from_db()
self.assertEqual(OperationSchema.layoutQ(self.owned_id).data, data['data'])
self.executeForbidden(data=data, item=self.unowned_id)

View File

@ -0,0 +1,64 @@
''' Models: RSForm order manager. '''
from .Constituenta import Constituenta, CstType
from .RSFormCached import RSFormCached
from .SemanticInfo import SemanticInfo
class OrderManager:
''' Ordering helper class '''
def __init__(self, schema: RSFormCached):
self._semantic = SemanticInfo(schema)
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
def restore_order(self) -> None:
''' Implement order restoration process. '''
if len(self._items) <= 1:
return
self._fix_kernel()
self._fix_topological()
self._fix_semantic_children()
self._override_order()
def _fix_topological(self) -> None:
sorted_ids = self._semantic.graph.sort_stable([cst.pk for cst in self._items])
sorted_items = [next(cst for cst in self._items if cst.pk == id) for id in sorted_ids]
self._items = sorted_items
def _fix_kernel(self) -> None:
result = [cst for cst in self._items if cst.cst_type == CstType.BASE]
result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT]
kernel = [
cst.pk for cst in self._items if
cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or
self._cst_by_ID[self._semantic.parent(cst.pk)].cst_type == CstType.STRUCTURED
]
kernel = kernel + self._semantic.graph.expand_inputs(kernel)
result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.pk in kernel]
result = result + [cst for cst in self._items if result.count(cst) == 0]
self._items = result
def _fix_semantic_children(self) -> None:
result: list[Constituenta] = []
marked: set[Constituenta] = set()
for cst in self._items:
if cst in marked:
continue
result.append(cst)
children = self._semantic[cst.pk]['children']
if len(children) == 0:
continue
for child in self._items:
if child.pk in children:
marked.add(child)
result.append(child)
self._items = result
def _override_order(self) -> None:
order = 0
for cst in self._items:
cst.order = order
order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -1,7 +1,7 @@
''' Models: RSForm API. '''
# pylint: disable=duplicate-code
from typing import Iterable, Optional
from typing import Iterable, Optional, cast
from cctext import Entity, Resolver, TermForm, split_grams
from django.core.exceptions import ValidationError
@ -10,8 +10,9 @@ from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType, Version
from shared import messages as msg
from .api_RSLanguage import guess_type
from .Constituenta import Constituenta, CstType
from ..graph import Graph
from .api_RSLanguage import get_type_prefix, guess_type
from .Constituenta import Constituenta, CstType, extract_entities, extract_globals
INSERT_LAST: int = -1
DELETED_ALIAS = 'DEL'
@ -31,7 +32,7 @@ class RSForm:
return RSForm(model)
@staticmethod
def spawn_resolver(schemaID: int) -> Resolver:
def resolver_from_schema(schemaID: int) -> Resolver:
''' Create resolver for text references based on schema terms. '''
result = Resolver({})
constituents = Constituenta.objects.filter(schema_id=schemaID).only('alias', 'term_resolved', 'term_forms')
@ -47,13 +48,145 @@ class RSForm:
result.context[cst.alias] = entity
return result
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
@staticmethod
def resolver_from_list(cst_list: Iterable[Constituenta]) -> Resolver:
''' Create resolver for text references based on list of constituents. '''
result = Resolver({})
for cst in cst_list:
entity = Entity(
alias=cst.alias,
nominal=cst.term_resolved,
manual_forms=[
TermForm(text=form['text'], grams=split_grams(form['tags']))
for form in cst.term_forms
]
)
result.context[cst.alias] = entity
return result
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
@staticmethod
def graph_formal(cst_list: Iterable[Constituenta],
cst_by_alias: Optional[dict[str, Constituenta]] = None) -> Graph[int]:
''' Graph based on formal definitions. '''
result: Graph[int] = Graph()
if cst_by_alias is None:
cst_by_alias = {cst.alias: cst for cst in cst_list}
for cst in cst_list:
result.add_node(cst.pk)
for cst in cst_list:
for alias in extract_globals(cst.definition_formal):
child = cst_by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
@staticmethod
def graph_term(cst_list: Iterable[Constituenta],
cst_by_alias: Optional[dict[str, Constituenta]] = None) -> Graph[int]:
''' Graph based on term texts. '''
result: Graph[int] = Graph()
if cst_by_alias is None:
cst_by_alias = {cst.alias: cst for cst in cst_list}
for cst in cst_list:
result.add_node(cst.pk)
for cst in cst_list:
for alias in extract_entities(cst.term_raw):
child = cst_by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
@staticmethod
def graph_text(cst_list: Iterable[Constituenta],
cst_by_alias: Optional[Optional[dict[str, Constituenta]]] = None) -> Graph[int]:
''' Graph based on definition texts. '''
result: Graph[int] = Graph()
if cst_by_alias is None:
cst_by_alias = {cst.alias: cst for cst in cst_list}
for cst in cst_list:
result.add_node(cst.pk)
for cst in cst_list:
for alias in extract_entities(cst.definition_raw):
child = cst_by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
@staticmethod
def save_order(cst_list: Iterable[Constituenta]) -> None:
''' Save order for constituents list. '''
order = 0
changed: list[Constituenta] = []
for cst in cst_list:
if cst.order != order:
cst.order = order
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])
@staticmethod
def shift_positions(start: int, shift: int, cst_list: list[Constituenta]) -> None:
''' Shift positions of constituents. '''
if shift == 0:
return
update_list = cst_list[start:]
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
@staticmethod
def apply_mapping(mapping: dict[str, str], cst_list: Iterable[Constituenta],
change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
update_list: list[Constituenta] = []
for cst in cst_list:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
@staticmethod
def resolve_term_change(cst_list: Iterable[Constituenta], changed: list[int],
cst_by_alias: Optional[Optional[dict[str, Constituenta]]] = None,
cst_by_id: Optional[Optional[dict[int, Constituenta]]] = None,
resolver: Optional[Resolver] = None) -> None:
''' Trigger cascade resolutions when term changes. '''
if cst_by_alias is None:
cst_by_alias = {cst.alias: cst for cst in cst_list}
if cst_by_id is None:
cst_by_id = {cst.pk: cst for cst in cst_list}
graph_terms = RSForm.graph_term(cst_list, cst_by_alias)
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
update_list: list[Constituenta] = []
if resolver is None:
resolver = RSForm.resolver_from_list(cst_list)
if len(expansion) > 0:
for cst_id in graph_terms.topological_order():
if cst_id not in expansion:
continue
cst = cst_by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
if resolved == resolver.context[cst.alias].get_nominal():
continue
cst.set_term_resolved(resolved)
update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
graph_defs = RSForm.graph_text(cst_list, cst_by_alias)
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0:
return
for cst_id in update_defs:
cst = cst_by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def constituentsQ(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
@ -70,7 +203,7 @@ class RSForm:
raise ValidationError(msg.aliasTaken(alias))
if cst_type is None:
cst_type = guess_type(alias)
position = self.constituentsQ().count()
position = Constituenta.objects.filter(schema=self.model).count()
result = Constituenta.objects.create(
schema=self.model,
order=position,
@ -78,7 +211,6 @@ class RSForm:
cst_type=cst_type,
**kwargs
)
self.model.save(update_fields=['time_update'])
return result
def move_cst(self, target: list[Constituenta], destination: int) -> None:
@ -100,25 +232,43 @@ class RSForm:
cst.order = destination + size + count_bot
count_bot += 1
Constituenta.objects.bulk_update(cst_list, ['order'])
self.save(update_fields=['time_update'])
def delete_cst(self, target: Iterable[Constituenta]) -> None:
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
mapping = {cst.alias: DELETED_ALIAS for cst in target}
self.apply_mapping(mapping)
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order()
self.save(update_fields=['time_update'])
def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. '''
bases = cast(dict[str, int], {})
mapping = cast(dict[str, str], {})
for cst_type in CstType.values:
bases[cst_type] = 1
cst_list = Constituenta.objects.filter(schema=self.model).only(
'alias', 'cst_type', 'definition_formal',
'term_raw', 'definition_raw'
).order_by('order')
for cst in cst_list:
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
RSForm.apply_mapping(mapping, cst_list, change_aliases=True)
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
update_list: list[Constituenta] = []
constituents = self.constituentsQ().only('alias', 'definition_formal', 'term_raw', 'definition_raw')
for cst in constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
''' Execute constituenta substitution. '''
if len(substitutions) < 1:
return
mapping = {}
deleted: list[int] = []
replacements: list[int] = []
for original, substitution in substitutions:
mapping[original.alias] = substitution.alias
deleted.append(original.pk)
replacements.append(substitution.pk)
Constituenta.objects.filter(pk__in=deleted).delete()
cst_list = Constituenta.objects.filter(schema=self.model).only(
'alias', 'cst_type', 'definition_formal',
'term_raw', 'definition_raw', 'order', 'term_forms', 'term_resolved'
).order_by('order')
RSForm.save_order(cst_list)
RSForm.apply_mapping(mapping, cst_list, change_aliases=False)
RSForm.resolve_term_change(cst_list, replacements)
def create_version(self, version: str, description: str, data) -> Version:
''' Creates version for current state. '''
@ -128,14 +278,3 @@ class RSForm:
description=description,
data=data
)
def _reset_order(self) -> None:
order = 0
changed: list[Constituenta] = []
cst_list = self.constituentsQ().only('order').order_by('order')
for cst in cst_list:
if cst.order != order:
cst.order = order
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])

View File

@ -4,25 +4,15 @@
from copy import deepcopy
from typing import Iterable, Optional, cast
from cctext import Entity, Resolver, extract_entities
from cctext import Entity, Resolver
from django.core.exceptions import ValidationError
from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType
from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
generate_structure,
get_type_prefix,
guess_type,
infer_template,
is_base_set,
is_functional,
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType, extract_globals
from .api_RSLanguage import generate_structure, get_type_prefix, guess_type
from .Constituenta import Constituenta, CstType
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm
@ -47,83 +37,65 @@ class RSFormCached:
def get_dependant(self, target: Iterable[int]) -> set[int]:
''' Get list of constituents depending on target (only 1st degree). '''
self.cache.ensure_loaded()
result: set[int] = set()
terms = self._graph_term()
formal = self._graph_formal()
definitions = self._graph_text()
terms = RSForm.graph_term(self.cache.constituents, self.cache.by_alias)
formal = RSForm.graph_formal(self.cache.constituents, self.cache.by_alias)
definitions = RSForm.graph_text(self.cache.constituents, self.cache.by_alias)
for cst_id in target:
result.update(formal.outputs[cst_id])
result.update(terms.outputs[cst_id])
result.update(definitions.outputs[cst_id])
return result
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
self.cache.is_loaded = False
def constituentsQ(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.model)
def semantic(self) -> 'SemanticInfo':
''' Access semantic information on constituents. '''
return SemanticInfo(self)
def after_term_change(self, changed: list[int]) -> None:
''' Trigger cascade resolutions when term changes. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
update_list: list[Constituenta] = []
resolver = RSForm.spawn_resolver(self.model.pk)
if len(expansion) > 0:
for cst_id in graph_terms.topological_order():
if cst_id not in expansion:
continue
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
if resolved == resolver.context[cst.alias].get_nominal():
continue
cst.set_term_resolved(resolved)
update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
graph_defs = self._graph_text()
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0:
return
for cst_id in update_defs:
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def insert_last(
self,
alias: str,
cst_type: Optional[CstType] = None,
**kwargs
) -> Constituenta:
''' Insert new constituenta at last position. '''
if cst_type is None:
cst_type = guess_type(alias)
position = Constituenta.objects.filter(schema=self.model).count()
result = Constituenta.objects.create(
schema=self.model,
order=position,
alias=alias,
cst_type=cst_type,
**kwargs
)
self.cache.is_loaded = False
return result
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create constituenta from data. '''
if insert_after is None:
position = INSERT_LAST
self.cache.ensure_loaded_terms()
if insert_after is not None:
position = self.cache.by_id[insert_after.pk].order + 1
else:
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[insert_after.pk]) + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
result.crucial = data.get('crucial', False)
result.convention = data.get('convention', '')
result.definition_formal = data.get('definition_formal', '')
result.term_forms = data.get('term_forms', [])
result.term_raw = data.get('term_raw', '')
result.definition_raw = data.get('definition_raw', '')
position = len(self.cache.constituents)
RSForm.shift_positions(position, 1, self.cache.constituents)
result = Constituenta.objects.create(
schema=self.model,
order=position,
alias=data['alias'],
cst_type=data['cst_type'],
crucial=data.get('crucial', False),
convention=data.get('convention', ''),
definition_formal=data.get('definition_formal', ''),
term_forms=data.get('term_forms', []),
term_raw=data.get('term_raw', ''),
definition_raw=data.get('definition_raw', '')
)
if result.term_raw != '' or result.definition_raw != '':
resolver = RSForm.spawn_resolver(self.model.pk)
resolver = RSForm.resolver_from_list(self.cache.constituents)
if result.term_raw != '':
resolved = resolver.resolve(result.term_raw)
result.term_resolved = resolved
@ -133,33 +105,7 @@ class RSFormCached:
result.save()
self.cache.insert(result)
self.after_term_change([result.pk])
result.refresh_from_db()
return result
def insert_new(
self,
alias: str,
cst_type: Optional[CstType] = None,
position: int = INSERT_LAST,
**kwargs
) -> Constituenta:
''' Insert new constituenta at given position. '''
if Constituenta.objects.filter(schema=self.model, alias=alias):
raise ValidationError(msg.aliasTaken(alias))
position = self._get_insert_position(position)
if cst_type is None:
cst_type = guess_type(alias)
self._shift_positions(position, 1)
result = Constituenta.objects.create(
schema=self.model,
order=position,
alias=alias,
cst_type=cst_type,
**kwargs
)
self.cache.insert(result)
self.save(update_fields=['time_update'])
RSForm.resolve_term_change(self.cache.constituents, [result.pk], self.cache.by_alias, self.cache.by_id)
return result
def insert_copy(
@ -174,8 +120,12 @@ class RSFormCached:
return []
self.cache.ensure_loaded()
position = self._get_insert_position(position)
self._shift_positions(position, count)
lastPosition = len(self.cache.constituents)
if position == INSERT_LAST:
position = lastPosition
else:
position = max(0, min(position, lastPosition))
RSForm.shift_positions(position, count, self.cache.constituents)
indices: dict[str, int] = {}
for (value, _) in CstType.choices:
@ -200,13 +150,12 @@ class RSFormCached:
new_cst = Constituenta.objects.bulk_create(result)
self.cache.insert_multi(new_cst)
self.save(update_fields=['time_update'])
return result
# pylint: disable=too-many-branches
def update_cst(self, target: Constituenta, data: dict) -> dict:
''' Update persistent attributes of a given constituenta. Return old values. '''
self.cache.ensure_loaded()
self.cache.ensure_loaded_terms()
cst = self.cache.by_id.get(target.pk)
if cst is None:
raise ValidationError(msg.constituentaNotInRSform(target.alias))
@ -232,8 +181,10 @@ class RSFormCached:
term_changed = True
old_data['term_forms'] = cst.term_forms
cst.term_forms = data['term_forms']
resolver: Optional[Resolver] = None
if 'definition_raw' in data or 'term_raw' in data:
resolver = RSForm.spawn_resolver(self.model.pk)
resolver = RSForm.resolver_from_list(self.cache.constituents)
if 'term_raw' in data:
if cst.term_raw == data['term_raw']:
del data['term_raw']
@ -254,43 +205,51 @@ class RSFormCached:
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
if term_changed:
self.after_term_change([cst.pk])
self.save(update_fields=['time_update'])
RSForm.resolve_term_change(
self.cache.constituents, [cst.pk],
self.cache.by_alias, self.cache.by_id, resolver
)
return old_data
def delete_cst(self, target: Iterable[Constituenta]) -> None:
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
''' Delete multiple constituents. '''
mapping = {cst.alias: DELETED_ALIAS for cst in target}
self.cache.ensure_loaded()
self.cache.remove_multi(target)
self.apply_mapping(mapping)
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order()
self.save(update_fields=['time_update'])
RSForm.save_order(self.cache.constituents)
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
''' Execute constituenta substitution. '''
if len(substitutions) < 1:
return
self.cache.ensure_loaded_terms()
mapping = {}
deleted: list[Constituenta] = []
replacements: list[Constituenta] = []
replacements: list[int] = []
for original, substitution in substitutions:
mapping[original.alias] = substitution.alias
deleted.append(original)
replacements.append(substitution)
replacements.append(substitution.pk)
self.cache.remove_multi(deleted)
Constituenta.objects.filter(pk__in=[cst.pk for cst in deleted]).delete()
self._reset_order()
RSForm.save_order(self.cache.constituents)
self.apply_mapping(mapping)
self.after_term_change([substitution.pk for substitution in replacements])
def restore_order(self) -> None:
''' Restore order based on types and term graph. '''
manager = _OrderManager(self)
manager.restore_order()
RSForm.resolve_term_change(self.cache.constituents, replacements, self.cache.by_alias, self.cache.by_id)
def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. '''
mapping = self._create_reset_mapping()
self.cache.ensure_loaded()
bases = cast(dict[str, int], {})
mapping = cast(dict[str, str], {})
for cst_type in CstType.values:
bases[cst_type] = 1
for cst in self.cache.constituents:
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
self.apply_mapping(mapping, change_aliases=True)
def change_cst_type(self, target: int, new_type: CstType) -> bool:
@ -310,14 +269,9 @@ class RSFormCached:
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
RSForm.apply_mapping(mapping, self.cache.constituents, change_aliases)
if change_aliases:
self.cache.reset_aliases()
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
self.cache.reload_aliases()
def apply_partial_mapping(self, mapping: dict[str, str], target: list[int]) -> None:
''' Apply rename mapping to target constituents. '''
@ -328,12 +282,11 @@ class RSFormCached:
if cst.apply_mapping(mapping):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_formal', 'term_raw', 'definition_raw'])
self.save(update_fields=['time_update'])
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
graph_terms = RSForm.graph_term(self.cache.constituents, self.cache.by_alias)
resolver = Resolver({})
update_list: list[Constituenta] = []
for cst_id in graph_terms.topological_order():
@ -363,7 +316,8 @@ class RSFormCached:
self.cache.ensure_loaded()
position = self.cache.constituents.index(self.cache.by_id[target.id]) + 1
self._shift_positions(position, count_new)
RSForm.shift_positions(position, count_new, self.cache.constituents)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self._get_max_index(cst_type) + 1
@ -381,12 +335,10 @@ class RSFormCached:
position = position + 1
self.cache.insert_multi(result)
self.save(update_fields=['time_update'])
return result
def _get_max_index(self, cst_type: str) -> int:
''' Get maximum alias index for specific CstType. '''
result: int = 0
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = Constituenta.objects \
@ -394,95 +346,12 @@ class RSFormCached:
.only('alias')
else:
cst_list = [cst for cst in self.cache.constituents if cst.cst_type == cst_type]
result: int = 0
for cst in cst_list:
result = max(result, int(cst.alias[1:]))
return result
def _create_reset_mapping(self) -> dict[str, str]:
bases = cast(dict[str, int], {})
mapping = cast(dict[str, str], {})
for cst_type in CstType.values:
bases[cst_type] = 1
cst_list = self.constituentsQ().order_by('order')
for cst in cst_list:
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
return mapping
def _shift_positions(self, start: int, shift: int) -> None:
if shift == 0:
return
self.cache.ensure_loaded()
update_list = self.cache.constituents[start:]
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
def _get_insert_position(self, position: int) -> int:
if position < 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition())
lastPosition = self.constituentsQ().count()
if position == INSERT_LAST:
return lastPosition
else:
return max(0, min(position, lastPosition))
def _reset_order(self) -> None:
order = 0
changed: list[Constituenta] = []
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituentsQ().only('order').order_by('order')
else:
cst_list = self.cache.constituents
for cst in cst_list:
if cst.order != order:
cst.order = order
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_globals(cst.definition_formal):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.term_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in self.cache.constituents:
for alias in extract_entities(cst.definition_raw):
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
return result
class _RSFormCache:
''' Cache for RSForm constituents. '''
@ -493,27 +362,45 @@ class _RSFormCache:
self.by_id: dict[int, Constituenta] = {}
self.by_alias: dict[str, Constituenta] = {}
self.is_loaded = False
def reload(self) -> None:
self.constituents = list(
self._schema.constituentsQ().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
self.is_loaded_terms = False
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
self.constituents = list(
self._schema.constituentsQ().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
self.is_loaded_terms = False
def reset_aliases(self) -> None:
def ensure_loaded_terms(self) -> None:
if not self.is_loaded_terms:
self.constituents = list(
self._schema.constituentsQ().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw',
'term_forms',
'term_resolved'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
self.is_loaded_terms = True
def reload_aliases(self) -> None:
self.by_alias = {cst.alias: cst for cst in self.constituents}
def clear(self) -> None:
@ -521,6 +408,7 @@ class _RSFormCache:
self.by_id = {}
self.by_alias = {}
self.is_loaded = False
self.is_loaded_terms = False
def insert(self, cst: Constituenta) -> None:
if self.is_loaded:
@ -547,186 +435,3 @@ class _RSFormCache:
self.constituents.remove(self.by_id[cst.pk])
del self.by_id[cst.pk]
del self.by_alias[cst.alias]
class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSFormCached):
schema.cache.ensure_loaded()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
self._cst_by_alias = schema.cache.by_alias
self.info = {
cst.pk: {
'is_simple': False,
'is_template': False,
'parent': cst.pk,
'children': []
}
for cst in schema.cache.constituents
}
self._calculate_attributes()
def __getitem__(self, key: int) -> dict:
return self.info[key]
def is_simple_expression(self, target: int) -> bool:
''' Access "is_simple" attribute. '''
return cast(bool, self.info[target]['is_simple'])
def is_template(self, target: int) -> bool:
''' Access "is_template" attribute. '''
return cast(bool, self.info[target]['is_template'])
def parent(self, target: int) -> int:
''' Access "parent" attribute. '''
return cast(int, self.info[target]['parent'])
def children(self, target: int) -> list[int]:
''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self) -> None:
for cst_id in self._graph.topological_order():
cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst)
if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED:
continue
parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent
if parent != cst_id:
cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
return False
dependencies = self._graph.inputs[target.pk]
has_complex_dependency = any(
self.is_template(cst_id) and
not self.is_simple_expression(cst_id) for cst_id in dependencies
)
if has_complex_dependency:
return False
if is_functional(target.cst_type):
return is_simple_expression(split_template(target.definition_formal)['body'])
else:
return is_simple_expression(target.definition_formal)
def _infer_parent(self, target: Constituenta) -> int:
sources = self._extract_sources(target)
if len(sources) != 1:
return target.pk
parent_id = next(iter(sources))
parent = self._cst_by_ID[parent_id]
if is_base_set(parent.cst_type):
return target.pk
return parent_id
def _extract_sources(self, target: Constituenta) -> set[int]:
sources: set[int] = set()
if not is_functional(target.cst_type):
for parent_id in self._graph.inputs[target.pk]:
parent_info = self[parent_id]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
return sources
expression = split_template(target.definition_formal)
body_dependencies = extract_globals(expression['body'])
for alias in body_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
if self._need_check_head(sources, expression['head']):
head_dependencies = extract_globals(expression['head'])
for alias in head_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not is_base_set(parent.cst_type) and \
(not parent_info['is_template'] or not parent_info['is_simple']):
sources.add(parent_info['parent'])
return sources
def _need_check_head(self, sources: set[int], head: str) -> bool:
if len(sources) == 0:
return True
elif len(sources) != 1:
return False
else:
base = self._cst_by_ID[next(iter(sources))]
return not is_functional(base.cst_type) or \
split_template(base.definition_formal)['head'] != head
class _OrderManager:
''' Ordering helper class '''
def __init__(self, schema: RSFormCached):
self._semantic = schema.semantic()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
def restore_order(self) -> None:
''' Implement order restoration process. '''
if len(self._items) <= 1:
return
self._fix_kernel()
self._fix_topological()
self._fix_semantic_children()
self._save_order()
def _fix_topological(self) -> None:
sorted_ids = self._graph.sort_stable([cst.pk for cst in self._items])
sorted_items = [next(cst for cst in self._items if cst.pk == id) for id in sorted_ids]
self._items = sorted_items
def _fix_kernel(self) -> None:
result = [cst for cst in self._items if cst.cst_type == CstType.BASE]
result = result + [cst for cst in self._items if cst.cst_type == CstType.CONSTANT]
kernel = [
cst.pk for cst in self._items if
cst.cst_type in [CstType.STRUCTURED, CstType.AXIOM] or
self._cst_by_ID[self._semantic.parent(cst.pk)].cst_type == CstType.STRUCTURED
]
kernel = kernel + self._graph.expand_inputs(kernel)
result = result + [cst for cst in self._items if result.count(cst) == 0 and cst.pk in kernel]
result = result + [cst for cst in self._items if result.count(cst) == 0]
self._items = result
def _fix_semantic_children(self) -> None:
result: list[Constituenta] = []
marked: set[Constituenta] = set()
for cst in self._items:
if cst in marked:
continue
result.append(cst)
children = self._semantic[cst.pk]['children']
if len(children) == 0:
continue
for child in self._items:
if child.pk in children:
marked.add(child)
result.append(child)
self._items = result
def _save_order(self) -> None:
order = 0
for cst in self._items:
cst.order = order
order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -0,0 +1,136 @@
''' Models: RSForm semantic information. '''
from typing import cast
from .api_RSLanguage import (
infer_template,
is_base_set,
is_functional,
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType, extract_globals
from .RSForm import RSForm
from .RSFormCached import RSFormCached
class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSFormCached):
schema.cache.ensure_loaded()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
self._cst_by_alias = schema.cache.by_alias
self.graph = RSForm.graph_formal(schema.cache.constituents, schema.cache.by_alias)
self.info = {
cst.pk: {
'is_simple': False,
'is_template': False,
'parent': cst.pk,
'children': []
}
for cst in schema.cache.constituents
}
self._calculate_attributes()
def __getitem__(self, key: int) -> dict:
return self.info[key]
def is_simple_expression(self, target: int) -> bool:
''' Access "is_simple" attribute. '''
return cast(bool, self.info[target]['is_simple'])
def is_template(self, target: int) -> bool:
''' Access "is_template" attribute. '''
return cast(bool, self.info[target]['is_template'])
def parent(self, target: int) -> int:
''' Access "parent" attribute. '''
return cast(int, self.info[target]['parent'])
def children(self, target: int) -> list[int]:
''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self) -> None:
for cst_id in self.graph.topological_order():
cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
self.info[cst_id]['is_simple'] = self._infer_simple_expression(cst)
if not self.info[cst_id]['is_simple'] or cst.cst_type == CstType.STRUCTURED:
continue
parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent
if parent != cst_id:
cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
return False
dependencies = self.graph.inputs[target.pk]
has_complex_dependency = any(
self.is_template(cst_id) and
not self.is_simple_expression(cst_id) for cst_id in dependencies
)
if has_complex_dependency:
return False
if is_functional(target.cst_type):
return is_simple_expression(split_template(target.definition_formal)['body'])
else:
return is_simple_expression(target.definition_formal)
def _infer_parent(self, target: Constituenta) -> int:
sources = self._extract_sources(target)
if len(sources) != 1:
return target.pk
parent_id = next(iter(sources))
parent = self._cst_by_ID[parent_id]
if is_base_set(parent.cst_type):
return target.pk
return parent_id
def _extract_sources(self, target: Constituenta) -> set[int]:
sources: set[int] = set()
if not is_functional(target.cst_type):
for parent_id in self.graph.inputs[target.pk]:
parent_info = self[parent_id]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
return sources
expression = split_template(target.definition_formal)
body_dependencies = extract_globals(expression['body'])
for alias in body_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not parent_info['is_template'] or not parent_info['is_simple']:
sources.add(parent_info['parent'])
if self._need_check_head(sources, expression['head']):
head_dependencies = extract_globals(expression['head'])
for alias in head_dependencies:
parent = self._cst_by_alias.get(alias)
if not parent:
continue
parent_info = self[parent.pk]
if not is_base_set(parent.cst_type) and \
(not parent_info['is_template'] or not parent_info['is_simple']):
sources.add(parent_info['parent'])
return sources
def _need_check_head(self, sources: set[int], head: str) -> bool:
if len(sources) == 0:
return True
elif len(sources) != 1:
return False
else:
base = self._cst_by_ID[next(iter(sources))]
return not is_functional(base.cst_type) or \
split_template(base.definition_formal)['head'] != head

View File

@ -1,5 +1,6 @@
''' Django: Models. '''
from .Constituenta import Constituenta, CstType, extract_globals, replace_entities, replace_globals
from .OrderManager import OrderManager
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm
from .RSFormCached import RSFormCached, SemanticInfo
from .RSFormCached import RSFormCached

View File

@ -197,7 +197,8 @@ class RSFormSerializer(StrictModelSerializer):
def restore_from_version(self, data: dict):
''' Load data from version. '''
schema = RSForm(cast(LibraryItem, self.instance))
instance = cast(LibraryItem, self.instance)
schema = RSForm(instance)
items: list[dict] = data['items']
ids: list[int] = [item['id'] for item in items]
processed: list[int] = []
@ -207,7 +208,7 @@ class RSFormSerializer(StrictModelSerializer):
cst.delete()
else:
cst_data = next(x for x in items if x['id'] == cst.pk)
cst_data['schema'] = cast(LibraryItem, self.instance).pk
cst_data['schema'] = instance.pk
new_cst = CstBaseSerializer(data=cst_data)
new_cst.is_valid(raise_exception=True)
new_cst.validated_data['order'] = ids.index(cst.pk)
@ -222,7 +223,7 @@ class RSFormSerializer(StrictModelSerializer):
cst = schema.insert_last(cst_data['alias'])
old_id = cst_data['id']
cst_data['id'] = cst.pk
cst_data['schema'] = cast(LibraryItem, self.instance).pk
cst_data['schema'] = instance.pk
new_cst = CstBaseSerializer(data=cst_data)
new_cst.is_valid(raise_exception=True)
new_cst.validated_data['order'] = ids.index(old_id)

View File

@ -147,7 +147,6 @@ class RSFormTRSSerializer(serializers.Serializer):
access_policy=validated_data['access_policy'],
location=validated_data['location']
)
self.instance.save()
order = 0
for cst_data in validated_data['items']:
cst = Constituenta(
@ -200,7 +199,7 @@ class RSFormTRSSerializer(serializers.Serializer):
prev_cst.delete()
instance.resolve_all_text()
instance.save()
instance.model.save()
return instance
@staticmethod

View File

@ -30,7 +30,6 @@ class TestRSForm(DBTester):
self.assertFalse(schema2.constituentsQ().exists())
self.assertEqual(schema1.constituentsQ().count(), 2)
def test_insert_at_invalid_alias(self):
self.schema.insert_last('X1')
with self.assertRaises(ValidationError):
@ -47,46 +46,32 @@ class TestRSForm(DBTester):
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 0)
def test_delete_cst(self):
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
def test_reset_aliases(self):
x1 = self.schema.insert_last(
alias='X11',
term_raw='человек',
term_resolved='человек'
)
x2 = self.schema.insert_last('X21')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X2',
definition_raw='@{X1|sing}',
term_raw='@{X2|plur}'
alias='D11',
definition_formal='X21=X21',
term_raw='@{X21|sing}',
definition_raw='@{X11|datv}',
definition_resolved='test'
)
self.schema.delete_cst([x1])
self.schema.reset_aliases()
x1.refresh_from_db()
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituentsQ().count(), 2)
self.assertEqual(x2.order, 0)
self.assertEqual(d1.order, 1)
self.assertEqual(d1.definition_formal, 'DEL = X2')
self.assertEqual(d1.definition_raw, '@{DEL|sing}')
self.assertEqual(d1.term_raw, '@{X2|plur}')
def test_apply_mapping(self):
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X11')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}',
term_raw='@{X1|plur}'
)
self.schema.apply_mapping({x1.alias: 'X3', x2.alias: 'X4'})
d1.refresh_from_db()
self.assertEqual(d1.definition_formal, 'X3 = X4 = X2', msg='Map IDs in expression')
self.assertEqual(d1.definition_raw, '@{X4|sing}', msg='Map IDs in definition')
self.assertEqual(d1.term_raw, '@{X3|plur}', msg='Map IDs in term')
self.assertEqual(d1.term_resolved, '', msg='Do not run resolve on mapping')
self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping')
self.assertEqual(x1.alias, 'X1')
self.assertEqual(x2.alias, 'X2')
self.assertEqual(d1.alias, 'D1')
self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test')
def test_move_cst(self):
x1 = self.schema.insert_last('X1')

View File

@ -1,7 +1,7 @@
''' Testing models: api_RSForm. '''
from django.forms import ValidationError
from apps.rsform.models import Constituenta, CstType, RSFormCached
from apps.rsform.models import Constituenta, CstType, OrderManager, RSFormCached
from apps.users.models import User
from shared.DBTester import DBTester
@ -31,69 +31,11 @@ class TestRSFormCached(DBTester):
self.assertEqual(schema1.constituentsQ().count(), 2)
def test_insert_at(self):
x1 = self.schema.insert_new('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, self.schema.model)
x2 = self.schema.insert_new('X2', position=0)
x1.refresh_from_db()
self.assertEqual(x2.order, 0)
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 1)
x3 = self.schema.insert_new('X3', position=3)
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x3.order, 2)
self.assertEqual(x3.schema, self.schema.model)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
x4 = self.schema.insert_new('X4', position=2)
x3.refresh_from_db()
x2.refresh_from_db()
x1.refresh_from_db()
self.assertEqual(x4.order, 2)
self.assertEqual(x4.schema, self.schema.model)
self.assertEqual(x3.order, 3)
self.assertEqual(x2.order, 0)
self.assertEqual(x1.order, 1)
def test_insert_at_invalid_position(self):
with self.assertRaises(ValidationError):
self.schema.insert_new('X5', position=-2)
def test_insert_at_invalid_alias(self):
self.schema.insert_new('X1')
with self.assertRaises(ValidationError):
self.schema.insert_new('X1')
def test_insert_at_reorder(self):
self.schema.insert_new('X1')
d1 = self.schema.insert_new('D1')
d2 = self.schema.insert_new('D2', position=0)
d1.refresh_from_db()
self.assertEqual(d1.order, 2)
self.assertEqual(d2.order, 0)
x2 = self.schema.insert_new('X2', position=3)
self.assertEqual(x2.order, 3)
def test_insert_last(self):
x1 = self.schema.insert_new('X1')
x1 = self.schema.insert_last('X1')
self.assertEqual(x1.order, 0)
self.assertEqual(x1.schema, self.schema.model)
x2 = self.schema.insert_new('X2')
self.assertEqual(x2.order, 1)
self.assertEqual(x2.schema, self.schema.model)
self.assertEqual(x1.order, 0)
def test_create_cst(self):
data = {
@ -104,8 +46,8 @@ class TestRSFormCached(DBTester):
'convention': 'convention'
}
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
x3 = self.schema.create_cst(data=data, insert_after=x1)
x2.refresh_from_db()
@ -117,7 +59,7 @@ class TestRSFormCached(DBTester):
def test_create_cst_resolve(self):
x1 = self.schema.insert_new(
x1 = self.schema.insert_last(
alias='X1',
term_raw='@{X2|datv}',
definition_raw='@{X1|datv} @{X2|datv}'
@ -136,11 +78,11 @@ class TestRSFormCached(DBTester):
def test_insert_copy(self):
x1 = self.schema.insert_new(
x1 = self.schema.insert_last(
alias='X10',
convention='Test'
)
s1 = self.schema.insert_new(
s1 = self.schema.insert_last(
alias='S11',
definition_formal=x1.alias,
definition_raw='@{X10|plur}'
@ -167,9 +109,9 @@ class TestRSFormCached(DBTester):
def test_delete_cst(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new(
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X2',
definition_raw='@{X1|sing}',
@ -188,9 +130,9 @@ class TestRSFormCached(DBTester):
def test_apply_mapping(self):
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X11')
d1 = self.schema.insert_new(
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X11')
d1 = self.schema.insert_last(
alias='D1',
definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}',
@ -207,15 +149,15 @@ class TestRSFormCached(DBTester):
def test_substitute(self):
x1 = self.schema.insert_new(
x1 = self.schema.insert_last(
alias='X1',
term_raw='Test'
)
x2 = self.schema.insert_new(
x2 = self.schema.insert_last(
alias='X2',
term_raw='Test2'
)
d1 = self.schema.insert_new(
d1 = self.schema.insert_last(
alias='D1',
definition_formal=x1.alias
)
@ -229,47 +171,47 @@ class TestRSFormCached(DBTester):
def test_restore_order(self):
d2 = self.schema.insert_new(
d2 = self.schema.insert_last(
alias='D2',
definition_formal=r'D{ξ∈S1 | 1=1}',
)
d1 = self.schema.insert_new(
d1 = self.schema.insert_last(
alias='D1',
definition_formal=r'Pr1(S1)\X1',
)
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
s1 = self.schema.insert_new(
x1 = self.schema.insert_last('X1')
x2 = self.schema.insert_last('X2')
s1 = self.schema.insert_last(
alias='S1',
definition_formal='(X1×X1)'
)
c1 = self.schema.insert_new('C1')
s2 = self.schema.insert_new(
c1 = self.schema.insert_last('C1')
s2 = self.schema.insert_last(
alias='S2',
definition_formal='(X2×D1)'
)
a1 = self.schema.insert_new(
a1 = self.schema.insert_last(
alias='A1',
definition_formal=r'D3=∅',
)
d3 = self.schema.insert_new(
d3 = self.schema.insert_last(
alias='D3',
definition_formal=r'Pr2(S2)',
)
f1 = self.schema.insert_new(
f1 = self.schema.insert_last(
alias='F1',
definition_formal=r'[α∈ℬ(X1)] D{σ∈S1 | α⊆pr1(σ)}',
)
d4 = self.schema.insert_new(
d4 = self.schema.insert_last(
alias='D4',
definition_formal=r'Pr2(D3)',
)
f2 = self.schema.insert_new(
f2 = self.schema.insert_last(
alias='F2',
definition_formal=r'[α∈ℬ(X1)] X1\α',
)
self.schema.restore_order()
OrderManager(self.schema).restore_order()
x1.refresh_from_db()
x2.refresh_from_db()
c1.refresh_from_db()
@ -298,13 +240,13 @@ class TestRSFormCached(DBTester):
def test_reset_aliases(self):
x1 = self.schema.insert_new(
x1 = self.schema.insert_last(
alias='X11',
term_raw='человек',
term_resolved='человек'
)
x2 = self.schema.insert_new('X21')
d1 = self.schema.insert_new(
x2 = self.schema.insert_last('X21')
d1 = self.schema.insert_last(
alias='D11',
definition_formal='X21=X21',
term_raw='@{X21|sing}',
@ -323,47 +265,3 @@ class TestRSFormCached(DBTester):
self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test')
def test_on_term_change(self):
x1 = self.schema.insert_new(
alias='X1',
term_raw='человек',
term_resolved='человек',
definition_raw='одному @{X1|datv}',
definition_resolved='одному человеку',
)
x2 = self.schema.insert_new(
alias='X2',
term_raw='сильный @{X1|sing}',
term_resolved='сильный человек',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
x3 = self.schema.insert_new(
alias='X3',
definition_raw=x1.definition_raw,
definition_resolved=x1.definition_resolved
)
d1 = self.schema.insert_new(
alias='D1',
definition_raw='очень @{X2|sing}',
definition_resolved='очень сильный человек'
)
x1.term_raw = 'слон'
x1.term_resolved = 'слон'
x1.save()
self.schema.after_term_change([x1.pk])
x1.refresh_from_db()
x2.refresh_from_db()
x3.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(x1.term_raw, 'слон')
self.assertEqual(x1.term_resolved, 'слон')
self.assertEqual(x1.definition_resolved, 'одному слону')
self.assertEqual(x2.definition_resolved, x1.definition_resolved)
self.assertEqual(x3.definition_resolved, x1.definition_resolved)
self.assertEqual(d1.definition_resolved, 'очень сильный слон')

View File

@ -324,7 +324,7 @@ class TestRSFormViewset(EndpointTester):
data = {'items': [x1.pk]}
response = self.executeOK(data=data)
x2.refresh_from_db()
self.owned.refresh_from_db()
self.owned.model.refresh_from_db()
self.assertEqual(len(response.data['items']), 1)
self.assertEqual(self.owned.constituentsQ().count(), 1)
self.assertEqual(x2.alias, 'X2')
@ -387,13 +387,13 @@ class TestRSFormViewset(EndpointTester):
def test_load_trs(self):
self.set_params(item=self.owned_id)
self.owned.model.title = 'Test11'
self.owned.save()
self.owned.model.save()
x1 = self.owned.insert_last('X1')
work_dir = os.path.dirname(os.path.abspath(__file__))
with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file:
data = {'file': file, 'load_metadata': False}
response = self.client.patch(self.endpoint, data=data, format='multipart')
self.owned.refresh_from_db()
self.owned.model.refresh_from_db()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(self.owned.model.title, 'Test11')
self.assertEqual(len(response.data['items']), 25)

View File

@ -78,6 +78,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['post'], url_path='create-cst')
def create_cst(self, request: Request, pk) -> HttpResponse:
''' Create Constituenta. '''
item = self._get_item()
serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
@ -85,15 +86,16 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
insert_after = None
else:
insert_after = data['insert_after']
schema = m.RSFormCached(self._get_item())
schema = m.RSFormCached(item)
with transaction.atomic():
new_cst = schema.create_cst(data, insert_after)
PropagationFacade.after_create_cst(schema, [new_cst])
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_201_CREATED,
data={
'new_cst': s.CstInfoSerializer(new_cst).data,
'schema': s.RSFormParseSerializer(schema.model).data
'schema': s.RSFormParseSerializer(item).data
}
)
@ -111,11 +113,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='update-cst')
def update_cst(self, request: Request, pk) -> HttpResponse:
''' Update persistent attributes of a given constituenta. '''
model = self._get_item()
serializer = s.CstUpdateSerializer(data=request.data, partial=True, context={'schema': model})
item = self._get_item()
serializer = s.CstUpdateSerializer(data=request.data, partial=True, context={'schema': item})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
schema = m.RSFormCached(model)
schema = m.RSFormCached(item)
data = serializer.validated_data['item_data']
with transaction.atomic():
old_data = schema.update_cst(cst, data)
@ -129,13 +131,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
cst.cst_type = data['cst_type']
cst.save()
schema.apply_mapping(mapping=mapping, change_aliases=False)
schema.save()
cst.refresh_from_db()
if changed_type:
PropagationFacade.after_change_cst_type(schema, cst)
PropagationFacade.after_change_cst_type(cst)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -152,8 +154,8 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='update-crucial')
def update_crucial(self, request: Request, pk) -> HttpResponse:
''' Update crucial attributes of a given list of constituents. '''
model = self._get_item()
serializer = s.CrucialUpdateSerializer(data=request.data, partial=True, context={'schema': model})
item = self._get_item()
serializer = s.CrucialUpdateSerializer(data=request.data, partial=True, context={'schema': item})
serializer.is_valid(raise_exception=True)
value: bool = serializer.validated_data['value']
@ -161,11 +163,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
for cst in serializer.validated_data['target']:
cst.crucial = value
cst.save(update_fields=['crucial'])
model.save(update_fields=['time_update'])
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -182,9 +184,9 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='produce-structure')
def produce_structure(self, request: Request, pk) -> HttpResponse:
''' Produce a term for every element of the target constituenta typification. '''
model = self._get_item()
item = self._get_item()
serializer = s.CstTargetSerializer(data=request.data, context={'schema': model})
serializer = s.CstTargetSerializer(data=request.data, context={'schema': item})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
if cst.cst_type not in [m.CstType.FUNCTION, m.CstType.STRUCTURED, m.CstType.TERM]:
@ -192,23 +194,24 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
f'{cst.pk}': msg.constituentaNoStructure()
})
schema_details = s.RSFormParseSerializer(model).data['items']
schema_details = s.RSFormParseSerializer(item).data['items']
cst_parse = next(item for item in schema_details if item['id'] == cst.pk)['parse']
if not cst_parse['typification']:
return Response(
status=c.HTTP_400_BAD_REQUEST,
data={f'{cst.pk}': msg.constituentaNoStructure()}
)
schema = m.RSFormCached(model)
schema = m.RSFormCached(item)
with transaction.atomic():
new_cst = schema.produce_structure(cst, cst_parse)
PropagationFacade.after_create_cst(schema, new_cst)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data={
'cst_list': [cst.pk for cst in new_cst],
'schema': s.RSFormParseSerializer(schema.model).data
'schema': s.RSFormParseSerializer(item).data
}
)
@ -227,24 +230,25 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='substitute')
def substitute(self, request: Request, pk) -> HttpResponse:
''' Substitute occurrences of constituenta with another one. '''
model = self._get_item()
item = self._get_item()
serializer = s.CstSubstituteSerializer(
data=request.data,
context={'schema': model}
context={'schema': item}
)
serializer.is_valid(raise_exception=True)
schema = m.RSFormCached(model)
schema = m.RSForm(item)
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
with transaction.atomic():
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
substitutions.append((original, replacement))
PropagationFacade.before_substitute(schema, substitutions)
PropagationFacade.before_substitute(item.pk, substitutions)
schema.substitute(substitutions)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -261,17 +265,18 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='delete-multiple-cst')
def delete_multiple_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete multiple Constituents. '''
model = self._get_item()
item = self._get_item()
serializer = s.CstListSerializer(
data=request.data,
context={'schema': model}
context={'schema': item}
)
serializer.is_valid(raise_exception=True)
cst_list: list[m.Constituenta] = serializer.validated_data['items']
schema = m.RSFormCached(model)
schema = m.RSFormCached(item)
with transaction.atomic():
PropagationFacade.before_delete_cst(schema, cst_list)
schema.delete_cst(cst_list)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.model).data
@ -291,20 +296,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='move-cst')
def move_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Move multiple Constituents. '''
model = self._get_item()
item = self._get_item()
serializer = s.CstMoveSerializer(
data=request.data,
context={'schema': model}
context={'schema': item}
)
serializer.is_valid(raise_exception=True)
schema = m.RSForm(item)
with transaction.atomic():
m.RSForm(model).move_cst(
schema.move_cst(
target=serializer.validated_data['items'],
destination=serializer.validated_data['move_to']
)
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -320,12 +327,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='reset-aliases')
def reset_aliases(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Recreate all aliases based on order. '''
model = self._get_item()
schema = m.RSFormCached(model)
schema.reset_aliases()
item = self._get_item()
schema = m.RSForm(item)
with transaction.atomic():
schema.reset_aliases()
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -341,11 +350,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='restore-order')
def restore_order(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Restore order based on types and Term graph. '''
model = self._get_item()
m.RSFormCached(model).restore_order()
item = self._get_item()
with transaction.atomic():
m.OrderManager(m.RSFormCached(item)).restore_order()
item.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(model).data
data=s.RSFormParseSerializer(item).data
)
@extend_schema(
@ -365,7 +376,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
input_serializer = s.RSFormUploadSerializer(data=request.data)
input_serializer.is_valid(raise_exception=True)
model = self._get_item()
item = self._get_item()
load_metadata = input_serializer.validated_data['load_metadata']
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
if data is None:
@ -373,7 +384,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
status=c.HTTP_400_BAD_REQUEST,
data={'file': msg.exteorFileCorrupted()}
)
data['id'] = model.pk
data['id'] = item.pk
serializer = s.RSFormTRSSerializer(
data=data,
@ -484,7 +495,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.TextSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
text = serializer.validated_data['text']
resolver = m.RSForm.spawn_resolver(pk)
resolver = m.RSForm.resolver_from_schema(pk)
resolver.resolve(text)
return Response(
status=c.HTTP_200_OK,
@ -646,8 +657,9 @@ def inline_synthesis(request: Request) -> HttpResponse:
replacement = new_items[index]
substitutions.append((original, replacement))
PropagationFacade.before_substitute(receiver, substitutions)
PropagationFacade.before_substitute(receiver.model.pk, substitutions)
receiver.substitute(substitutions)
receiver.model.save(update_fields=['time_update'])
return Response(
status=c.HTTP_200_OK,

View File

@ -142,10 +142,6 @@ def exteorFileVersionNotSupported():
return 'Некорректный формат файла Экстеор. Сохраните файл в новой версии'
def invalidPosition():
return 'Invalid position: should be positive integer'
def constituentaNoStructure():
return 'Указанная конституента не обладает теоретико-множественной типизацией'