Compare commits

...

4 Commits

Author SHA1 Message Date
Ivan
a9edf842d8 M: Propagate cst_update
Some checks failed
Frontend CI / build (22.x) (push) Waiting to run
Backend CI / build (3.12) (push) Has been cancelled
2024-08-10 11:46:08 +03:00
Ivan
1600c8abd2 M: Change propogation 2024-08-09 20:57:03 +03:00
Ivan
5c4c0b38d5 R: Cleanup model API: move logic to view/serializer 2024-08-08 15:31:32 +03:00
Ivan
1647b3c0e8 R: RSForm cache and transaction.atomic 2024-08-07 21:54:14 +03:00
40 changed files with 1094 additions and 496 deletions

View File

@ -1,7 +1,6 @@
''' Models: LibraryItem. '''
import re
from django.db import transaction
from django.db.models import (
SET_NULL,
BooleanField,
@ -16,7 +15,6 @@ from django.db.models import (
from apps.users.models import User
from .Subscription import Subscription
from .Version import Version
@ -125,34 +123,3 @@ class LibraryItem(Model):
def versions(self) -> QuerySet[Version]:
''' Get all Versions of this item. '''
return Version.objects.filter(item=self.pk).order_by('-time_create')
# TODO: move to View layer
@transaction.atomic
def save(self, *args, **kwargs):
''' Save updating subscriptions and connected operations. '''
if not self._state.adding:
self._update_connected_operations()
subscribe = self._state.adding and self.owner
super().save(*args, **kwargs)
if subscribe:
Subscription.subscribe(user=self.owner_id, item=self.pk)
def _update_connected_operations(self):
# using method level import to prevent circular dependency
from apps.oss.models import Operation # pylint: disable=import-outside-toplevel
operations = Operation.objects.filter(result__pk=self.pk)
if not operations.exists():
return
for operation in operations:
changed = False
if operation.alias != self.alias:
operation.alias = self.alias
changed = True
if operation.title != self.title:
operation.title = self.title
changed = True
if operation.comment != self.comment:
operation.comment = self.comment
changed = True
if changed:
operation.save()

View File

@ -36,7 +36,7 @@ class LibraryItemSerializer(serializers.ModelSerializer):
class LibraryItemCloneSerializer(serializers.ModelSerializer):
''' Serializer: LibraryItem cloning. '''
items = PKField(many=True, required=False, queryset=Constituenta.objects.all())
items = PKField(many=True, required=False, queryset=Constituenta.objects.all().only('pk'))
class Meta:
''' serializer metadata. '''

View File

@ -68,7 +68,6 @@ class TestLibraryItem(TestCase):
self.assertEqual(item.alias, 'KS1')
self.assertEqual(item.comment, 'Test comment')
self.assertEqual(item.location, LocationHead.COMMON)
self.assertTrue(Subscription.objects.filter(user=item.owner, item=item).exists())
class TestLocation(TestCase):

View File

@ -21,9 +21,7 @@ class TestSubscription(TestCase):
def test_default(self):
subs = list(Subscription.objects.filter(item=self.item))
self.assertEqual(len(subs), 1)
self.assertEqual(subs[0].item, self.item)
self.assertEqual(subs[0].user, self.user1)
self.assertEqual(len(subs), 0)
def test_str(self):

View File

@ -49,6 +49,7 @@ class TestLibraryViewset(EndpointTester):
self.assertEqual(response.data['item_type'], LibraryItemType.RSFORM)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], data['alias'])
self.assertTrue(Subscription.objects.filter(user=self.user, item_id=response.data['id']).exists())
data = {
'item_type': LibraryItemType.OPERATION_SCHEMA,
@ -74,7 +75,7 @@ class TestLibraryViewset(EndpointTester):
@decl_endpoint('/api/library/{item}', method='patch')
def test_update(self):
data = {'id': self.unowned.pk, 'title': 'New Title'}
data = {'title': 'New Title'}
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
@ -86,13 +87,12 @@ class TestLibraryViewset(EndpointTester):
self.unowned.save()
self.executeForbidden(data=data, item=self.unowned.pk)
data = {'id': self.owned.pk, 'title': 'New Title'}
data = {'title': 'New Title'}
response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], self.owned.alias)
data = {
'id': self.owned.pk,
'title': 'Another Title',
'owner': self.user2.pk,
'access_policy': AccessPolicy.PROTECTED,

View File

@ -142,7 +142,7 @@ class TestVersionViews(EndpointTester):
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
d1.delete()
self.owned.delete_cst([d1])
x3 = self.owned.insert_new('X3')
x1.order = x3.order
x1.convention = 'Test2'

View File

@ -4,6 +4,7 @@ from typing import cast
from django.db import transaction
from django.db.models import Q
from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
from rest_framework import status as c
@ -12,7 +13,7 @@ from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from apps.oss.models import OperationSchema
from apps.oss.models import Operation, OperationSchema
from apps.rsform.models import RSForm
from apps.rsform.serializers import RSFormParseSerializer
from apps.users.models import User
@ -36,11 +37,35 @@ class LibraryViewSet(viewsets.ModelViewSet):
return s.LibraryItemBaseSerializer
return s.LibraryItemSerializer
def perform_create(self, serializer):
def perform_create(self, serializer) -> None:
if not self.request.user.is_anonymous and 'owner' not in self.request.POST:
return serializer.save(owner=self.request.user)
instance = serializer.save(owner=self.request.user)
else:
return serializer.save()
instance = serializer.save()
if instance.owner:
m.Subscription.subscribe(user=instance.owner_id, item=instance.pk)
def perform_update(self, serializer) -> None:
instance = serializer.save()
operations = Operation.objects.filter(result__pk=instance.pk)
if not operations.exists():
return
update_list: list[Operation] = []
for operation in operations:
changed = False
if operation.alias != instance.alias:
operation.alias = instance.alias
changed = True
if operation.title != instance.title:
operation.title = instance.title
changed = True
if operation.comment != instance.comment:
operation.comment = instance.comment
changed = True
if changed:
update_list.append(operation)
if update_list:
Operation.objects.bulk_update(update_list, ['alias', 'title', 'comment'])
def get_permissions(self):
if self.action in ['update', 'partial_update']:
@ -79,7 +104,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}
)
@action(detail=True, methods=['post'], url_path='clone')
def clone(self, request: Request, pk):
def clone(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Create deep copy of library item. '''
serializer = s.LibraryItemCloneSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@ -139,7 +164,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
},
)
@action(detail=True, methods=['delete'])
def unsubscribe(self, request: Request, pk):
def unsubscribe(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Unsubscribe current user from item. '''
item = self._get_item()
m.Subscription.unsubscribe(user=cast(int, self.request.user.pk), item=item.pk)
@ -156,7 +181,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}
)
@action(detail=True, methods=['patch'], url_path='set-owner')
def set_owner(self, request: Request, pk):
def set_owner(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item owner. '''
item = self._get_item()
serializer = s.UserTargetSerializer(data=request.data)
@ -188,7 +213,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}
)
@action(detail=True, methods=['patch'], url_path='set-location')
def set_location(self, request: Request, pk):
def set_location(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item location. '''
item = self._get_item()
serializer = s.LocationSerializer(data=request.data)
@ -222,7 +247,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}
)
@action(detail=True, methods=['patch'], url_path='set-access-policy')
def set_access_policy(self, request: Request, pk):
def set_access_policy(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item AccessPolicy. '''
item = self._get_item()
serializer = s.AccessPolicySerializer(data=request.data)
@ -253,7 +278,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}
)
@action(detail=True, methods=['patch'], url_path='set-editors')
def set_editors(self, request: Request, pk):
def set_editors(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set list of editors for item. '''
item = self._get_item()
serializer = s.UsersListSerializer(data=request.data)

View File

@ -1,6 +1,7 @@
''' Endpoints for versions. '''
from typing import cast
from django.db import transaction
from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
@ -40,10 +41,11 @@ class VersionViewset(
}
)
@action(detail=True, methods=['patch'], url_path='restore')
def restore(self, request: Request, pk):
def restore(self, request: Request, pk) -> HttpResponse:
''' Restore version data into current item. '''
version = cast(m.Version, self.get_object())
item = cast(m.LibraryItem, version.item)
with transaction.atomic():
RSFormSerializer(item).restore_from_version(version.data)
return Response(
status=c.HTTP_200_OK,
@ -61,7 +63,7 @@ class VersionViewset(
}
)
@api_view(['GET'])
def export_file(request: Request, pk: int):
def export_file(request: Request, pk: int) -> HttpResponse:
''' Endpoint: Download Exteor compatible file for versioned data. '''
try:
version = m.Version.objects.get(pk=pk)
@ -88,7 +90,7 @@ def export_file(request: Request, pk: int):
)
@api_view(['POST'])
@permission_classes([permissions.GlobalUser])
def create_version(request: Request, pk_item: int):
def create_version(request: Request, pk_item: int) -> HttpResponse:
''' Endpoint: Create new version for RSForm copying current content. '''
try:
item = m.LibraryItem.objects.get(pk=pk_item)
@ -125,7 +127,7 @@ def create_version(request: Request, pk_item: int):
}
)
@api_view(['GET'])
def retrieve_version(request: Request, pk_item: int, pk_version: int):
def retrieve_version(request: Request, pk_item: int, pk_version: int) -> HttpResponse:
''' Endpoint: Retrieve version for RSForm. '''
try:
item = m.LibraryItem.objects.get(pk=pk_item)

View File

@ -0,0 +1,20 @@
# Generated by Django 5.0.7 on 2024-08-09 13:56
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('library', '0003_alter_librarytemplate_lib_source'),
('oss', '0005_inheritance_operation'),
]
operations = [
migrations.AlterField(
model_name='operation',
name='oss',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='operations', to='library.libraryitem', verbose_name='Схема синтеза'),
),
]

View File

@ -0,0 +1,279 @@
''' Models: Change propagation manager. '''
from typing import Optional, cast
from cctext import extract_entities
from apps.library.models import LibraryItem
from apps.rsform.graph import Graph
from apps.rsform.models import (
INSERT_LAST,
Constituenta,
CstType,
RSForm,
extract_globals,
replace_entities,
replace_globals
)
from .Inheritance import Inheritance
from .Operation import Operation
from .OperationSchema import OperationSchema
from .Substitution import Substitution
CstMapping = dict[str, Constituenta]
# TODO: add more variety tests for cascade resolutions model
class ChangeManager:
''' Change propagation API. '''
class Cache:
''' Cache for RSForm constituents. '''
def __init__(self, oss: OperationSchema):
self._oss = oss
self._schemas: list[RSForm] = []
self._schema_by_id: dict[int, RSForm] = {}
self.operations = list(oss.operations().only('result_id'))
self.operation_by_id = {operation.pk: operation for operation in self.operations}
self.graph = Graph[int]()
for operation in self.operations:
self.graph.add_node(operation.pk)
for argument in self._oss.arguments().only('operation_id', 'argument_id'):
self.graph.add_edge(argument.argument_id, argument.operation_id)
self.is_loaded = False
self.substitutions: list[Substitution] = []
self.inheritance: dict[int, list[tuple[int, int]]] = {}
def insert(self, schema: RSForm) -> None:
''' Insert new schema. '''
if not self._schema_by_id.get(schema.model.pk):
self._insert_new(schema)
def get_schema(self, operation: Operation) -> Optional[RSForm]:
''' Get schema by Operation. '''
if operation.result_id is None:
return None
if operation.result_id in self._schema_by_id:
return self._schema_by_id[operation.result_id]
else:
schema = RSForm.from_id(operation.result_id)
schema.cache.ensure_loaded()
self._insert_new(schema)
return schema
def get_operation(self, schema: RSForm) -> Operation:
''' Get operation by schema. '''
for operation in self.operations:
if operation.result_id == schema.model.pk:
return operation
raise ValueError(f'Operation for schema {schema.model.pk} not found')
def ensure_loaded(self) -> None:
''' Ensure propagation of changes. '''
if self.is_loaded:
return
self.is_loaded = True
self.substitutions = list(self._oss.substitutions().only('operation_id', 'original_id', 'substitution_id'))
for operation in self.operations:
self.inheritance[operation.pk] = []
for item in self._oss.inheritance().only('operation_id', 'parent_id', 'child_id'):
self.inheritance[item.operation_id].append((item.parent_id, item.child_id))
def get_successor_for(
self,
parent_cst: int,
operation: int,
ignore_substitution: bool = False
) -> Optional[int]:
''' Get child for parent inside target RSFrom. '''
if not ignore_substitution:
for sub in self.substitutions:
if sub.operation_id == operation and sub.original_id == parent_cst:
return sub.substitution_id
for item in self.inheritance[operation]:
if item[0] == parent_cst:
return item[1]
return None
def insert_inheritance(self, inheritance: Inheritance) -> None:
''' Insert new inheritance. '''
self.inheritance[inheritance.operation_id].append((inheritance.parent_id, inheritance.child_id))
def _insert_new(self, schema: RSForm) -> None:
self._schemas.append(schema)
self._schema_by_id[schema.model.pk] = schema
def __init__(self, model: LibraryItem):
self.oss = OperationSchema(model)
self.cache = ChangeManager.Cache(self.oss)
def on_create_cst(self, new_cst: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when new constituent is created. '''
self.cache.insert(source)
depend_aliases = new_cst.extract_references()
alias_mapping: CstMapping = {}
for alias in depend_aliases:
cst = source.cache.by_alias.get(alias)
if cst is not None:
alias_mapping[alias] = cst
operation = self.cache.get_operation(source)
self._cascade_create_cst(new_cst, operation, alias_mapping)
def on_change_cst_type(self, target: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when constituenta type is changed. '''
self.cache.insert(source)
operation = self.cache.get_operation(source)
self._cascade_change_cst_type(target.pk, target.cst_type, operation)
def on_update_cst(self, target: Constituenta, data: dict, old_data: dict, source: RSForm) -> None:
''' Trigger cascade resolutions when constituenta data is changed. '''
self.cache.insert(source)
operation = self.cache.get_operation(source)
depend_aliases = self._extract_data_references(data, old_data)
alias_mapping: CstMapping = {}
for alias in depend_aliases:
cst = source.cache.by_alias.get(alias)
if cst is not None:
alias_mapping[alias] = cst
self._cascade_update_cst(target.pk, operation, data, old_data, alias_mapping)
def _cascade_create_cst(self, prototype: Constituenta, operation: Operation, mapping: CstMapping) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
source_schema = self.cache.get_schema(operation)
assert source_schema is not None
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_schema(child_operation)
if child_schema is None:
continue
# TODO: update substitutions for diamond synthesis (if needed)
self.cache.ensure_loaded()
new_mapping = self._transform_mapping(mapping, child_operation, child_schema)
alias_mapping = {alias: cst.alias for alias, cst in new_mapping.items()}
insert_where = self._determine_insert_position(prototype, child_operation, source_schema, child_schema)
new_cst = child_schema.insert_copy([prototype], insert_where, alias_mapping)[0]
new_inheritance = Inheritance.objects.create(
operation=child_operation,
child=new_cst,
parent=prototype
)
self.cache.insert_inheritance(new_inheritance)
new_mapping = {alias_mapping[alias]: cst for alias, cst in new_mapping.items()}
self._cascade_create_cst(new_cst, child_operation, new_mapping)
def _cascade_change_cst_type(self, cst_id: int, ctype: CstType, operation: Operation) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_successor_for(cst_id, child_id, ignore_substitution=True)
if successor_id is None:
continue
child_schema = self.cache.get_schema(child_operation)
if child_schema is not None and child_schema.change_cst_type(successor_id, ctype):
self._cascade_change_cst_type(successor_id, ctype, child_operation)
# pylint: disable=too-many-arguments
def _cascade_update_cst(
self,
cst_id: int, operation: Operation,
data: dict, old_data: dict,
mapping: CstMapping
) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_successor_for(cst_id, child_id, ignore_substitution=True)
if successor_id is None:
continue
child_schema = self.cache.get_schema(child_operation)
assert child_schema is not None
new_mapping = self._transform_mapping(mapping, child_operation, child_schema)
alias_mapping = {alias: cst.alias for alias, cst in new_mapping.items()}
successor = child_schema.cache.by_id.get(successor_id)
if successor is None:
continue
new_data = self._prepare_update_data(successor, data, old_data, alias_mapping)
if len(new_data) == 0:
continue
new_old_data = child_schema.update_cst(successor, new_data)
if len(new_old_data) == 0:
continue
new_mapping = {alias_mapping[alias]: cst for alias, cst in new_mapping.items()}
self._cascade_update_cst(successor_id, child_operation, new_data, new_old_data, new_mapping)
def _transform_mapping(self, mapping: CstMapping, operation: Operation, schema: RSForm) -> CstMapping:
if len(mapping) == 0:
return mapping
result: CstMapping = {}
for alias, cst in mapping.items():
successor_id = self.cache.get_successor_for(cst.pk, operation.pk)
if successor_id is None:
continue
successor = schema.cache.by_id.get(successor_id)
if successor is None:
continue
result[alias] = successor
return result
def _determine_insert_position(
self, prototype: Constituenta,
operation: Operation,
source: RSForm,
destination: RSForm
) -> int:
''' Determine insert_after for new constituenta. '''
if prototype.order == 1:
return 1
prev_cst = source.cache.constituents[prototype.order - 2]
inherited_prev_id = self.cache.get_successor_for(
source.cache.constituents[prototype.order - 2].pk, operation.pk)
if inherited_prev_id is None:
return INSERT_LAST
prev_cst = destination.cache.by_id[inherited_prev_id]
return cast(int, prev_cst.order) + 1
def _extract_data_references(self, data: dict, old_data: dict) -> set[str]:
result: set[str] = set()
if 'definition_formal' in data:
result.update(extract_globals(data['definition_formal']))
result.update(extract_globals(old_data['definition_formal']))
if 'term_raw' in data:
result.update(extract_entities(data['term_raw']))
result.update(extract_entities(old_data['term_raw']))
if 'definition_raw' in data:
result.update(extract_entities(data['definition_raw']))
result.update(extract_entities(old_data['definition_raw']))
return result
def _prepare_update_data(self, cst: Constituenta, data: dict, old_data: dict, mapping: dict[str, str]) -> dict:
new_data = {}
if 'term_forms' in data:
if old_data['term_forms'] == cst.term_forms:
new_data['term_forms'] = data['term_forms']
if 'convention' in data:
if old_data['convention'] == cst.convention:
new_data['convention'] = data['convention']
if 'definition_formal' in data:
new_data['definition_formal'] = replace_globals(data['definition_formal'], mapping)
if 'term_raw' in data:
if replace_entities(old_data['term_raw'], mapping) == cst.term_raw:
new_data['term_raw'] = replace_entities(data['term_raw'], mapping)
if 'definition_raw' in data:
if replace_entities(old_data['definition_raw'], mapping) == cst.definition_raw:
new_data['definition_raw'] = replace_entities(data['definition_raw'], mapping)
return new_data

View File

@ -27,7 +27,7 @@ class Operation(Model):
verbose_name='Схема синтеза',
to='library.LibraryItem',
on_delete=CASCADE,
related_name='items'
related_name='operations'
)
operation_type: CharField = CharField(
verbose_name='Тип',

View File

@ -1,11 +1,10 @@
''' Models: OSS API. '''
from typing import Optional
from django.db import transaction
from django.db.models import QuerySet
from apps.library.models import Editor, LibraryItem, LibraryItemType
from apps.rsform.models import RSForm
from apps.rsform.models import Constituenta, RSForm
from .Argument import Argument
from .Inheritance import Inheritance
@ -31,11 +30,11 @@ class OperationSchema:
model = LibraryItem.objects.get(pk=pk)
return OperationSchema(model)
def save(self, *args, **kwargs):
def save(self, *args, **kwargs) -> None:
''' Save wrapper. '''
self.model.save(*args, **kwargs)
def refresh_from_db(self):
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
@ -51,6 +50,10 @@ class OperationSchema:
''' Operation substitutions. '''
return Substitution.objects.filter(operation__oss=self.model)
def inheritance(self) -> QuerySet[Inheritance]:
''' Operation inheritances. '''
return Inheritance.objects.filter(operation__oss=self.model)
def owned_schemas(self) -> QuerySet[LibraryItem]:
''' Get QuerySet containing all result schemas owned by current OSS. '''
return LibraryItem.objects.filter(
@ -59,7 +62,7 @@ class OperationSchema:
location=self.model.location
)
def update_positions(self, data: list[dict]):
def update_positions(self, data: list[dict]) -> None:
''' Update positions. '''
lookup = {x['id']: x for x in data}
operations = self.operations()
@ -69,7 +72,6 @@ class OperationSchema:
item.position_y = lookup[item.pk]['position_y']
Operation.objects.bulk_update(operations, ['position_x', 'position_y'])
@transaction.atomic
def create_operation(self, **kwargs) -> Operation:
''' Insert new operation. '''
result = Operation.objects.create(oss=self.model, **kwargs)
@ -77,7 +79,6 @@ class OperationSchema:
result.refresh_from_db()
return result
@transaction.atomic
def delete_operation(self, operation: Operation):
''' Delete operation. '''
operation.delete()
@ -87,8 +88,7 @@ class OperationSchema:
self.save()
@transaction.atomic
def set_input(self, target: Operation, schema: Optional[LibraryItem]):
def set_input(self, target: Operation, schema: Optional[LibraryItem]) -> None:
''' Set input schema for operation. '''
if schema == target.result:
return
@ -104,8 +104,7 @@ class OperationSchema:
self.save()
@transaction.atomic
def set_arguments(self, operation: Operation, arguments: list[Operation]):
def set_arguments(self, operation: Operation, arguments: list[Operation]) -> None:
''' Set arguments to operation. '''
processed: list[Operation] = []
changed = False
@ -125,8 +124,7 @@ class OperationSchema:
# TODO: trigger on_change effects
self.save()
@transaction.atomic
def set_substitutions(self, target: Operation, substitutes: list[dict]):
def set_substitutions(self, target: Operation, substitutes: list[dict]) -> None:
''' Clear all arguments for operation. '''
processed: list[dict] = []
changed = False
@ -157,7 +155,6 @@ class OperationSchema:
self.save()
@transaction.atomic
def create_input(self, operation: Operation) -> RSForm:
''' Create input RSForm. '''
schema = RSForm.create(
@ -175,7 +172,6 @@ class OperationSchema:
self.save()
return schema
@transaction.atomic
def execute_operation(self, operation: Operation) -> bool:
''' Execute target operation. '''
schemas: list[LibraryItem] = [arg.argument.result for arg in operation.getArguments()]
@ -194,10 +190,12 @@ class OperationSchema:
parents[cst.pk] = items[i]
children[items[i].pk] = cst
translated_substitutions: list[tuple[Constituenta, Constituenta]] = []
for sub in substitutions:
original = children[sub.original.pk]
replacement = children[sub.substitution.pk]
receiver.substitute(original, replacement)
translated_substitutions.append((original, replacement))
receiver.substitute(translated_substitutions)
# TODO: remove duplicates from diamond

View File

@ -1,6 +1,7 @@
''' Django: Models. '''
from .Argument import Argument
from .ChangeManager import ChangeManager
from .Inheritance import Inheritance
from .Operation import Operation, OperationType
from .OperationSchema import OperationSchema

View File

@ -57,9 +57,9 @@ class OperationCreateSerializer(serializers.Serializer):
class OperationUpdateSerializer(serializers.Serializer):
''' Serializer: Operation creation. '''
''' Serializer: Operation update. '''
class OperationUpdateData(serializers.ModelSerializer):
''' Serializer: Operation creation data. '''
''' Serializer: Operation update data. '''
class Meta:
''' serializer metadata. '''
model = Operation

View File

@ -31,36 +31,3 @@ class TestOperation(TestCase):
self.assertEqual(self.operation.comment, '')
self.assertEqual(self.operation.position_x, 0)
self.assertEqual(self.operation.position_y, 0)
def test_sync_from_result(self):
schema = RSForm.create(alias=self.operation.alias)
self.operation.result = schema.model
self.operation.save()
schema.model.alias = 'KS2'
schema.model.comment = 'Comment'
schema.model.title = 'Title'
schema.save()
self.operation.refresh_from_db()
self.assertEqual(self.operation.result, schema.model)
self.assertEqual(self.operation.alias, schema.model.alias)
self.assertEqual(self.operation.title, schema.model.title)
self.assertEqual(self.operation.comment, schema.model.comment)
def test_sync_from_library_item(self):
schema = LibraryItem.objects.create(alias=self.operation.alias, item_type=LibraryItemType.RSFORM)
self.operation.result = schema
self.operation.save()
schema.alias = 'KS2'
schema.comment = 'Comment'
schema.title = 'Title'
schema.save()
self.operation.refresh_from_db()
self.assertEqual(self.operation.result, schema)
self.assertEqual(self.operation.alias, schema.alias)
self.assertEqual(self.operation.title, schema.title)
self.assertEqual(self.operation.comment, schema.comment)

View File

@ -14,9 +14,9 @@ class TestSynthesisSubstitution(TestCase):
self.oss = OperationSchema.create(alias='T1')
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks1X1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.ks2X1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = Operation.objects.create(
oss=self.oss.model,
@ -46,13 +46,13 @@ class TestSynthesisSubstitution(TestCase):
self.substitution = Substitution.objects.create(
operation=self.operation3,
original=self.ks1x1,
substitution=self.ks2x1
original=self.ks1X1,
substitution=self.ks2X1
)
def test_str(self):
testStr = f'{self.ks1x1} -> {self.ks2x1}'
testStr = f'{self.ks1X1} -> {self.ks2X1}'
self.assertEqual(str(self.substitution), testStr)
@ -64,11 +64,11 @@ class TestSynthesisSubstitution(TestCase):
def test_cascade_delete_original(self):
self.assertEqual(Substitution.objects.count(), 1)
self.ks1x1.delete()
self.ks1X1.delete()
self.assertEqual(Substitution.objects.count(), 0)
def test_cascade_delete_substitution(self):
self.assertEqual(Substitution.objects.count(), 1)
self.ks2x1.delete()
self.ks2X1.delete()
self.assertEqual(Substitution.objects.count(), 0)

View File

@ -1,3 +1,4 @@
''' Tests for REST API. '''
from .t_change_attributes import *
from .t_change_constituents import *
from .t_oss import *

View File

@ -1,7 +1,4 @@
''' Testing API: Change attributes of OSS and RSForms. '''
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LocationHead
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm
@ -123,3 +120,33 @@ class TestChangeAttributes(EndpointTester):
self.assertEqual(list(self.ks1.model.editors()), [self.user, self.user2])
self.assertEqual(list(self.ks2.model.editors()), [])
self.assertEqual(set(self.ks3.editors()), set([self.user, self.user3]))
@decl_endpoint('/api/library/{item}', method='patch')
def test_sync_from_result(self):
data = {'alias': 'KS111', 'title': 'New Title', 'comment': 'New Comment'}
self.executeOK(data=data, item=self.ks1.model.pk)
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.result, self.ks1.model)
self.assertEqual(self.operation1.alias, data['alias'])
self.assertEqual(self.operation1.title, data['title'])
self.assertEqual(self.operation1.comment, data['comment'])
@decl_endpoint('/api/oss/{item}/update-operation', method='patch')
def test_sync_from_operation(self):
data = {
'target': self.operation3.pk,
'item_data': {
'alias': 'Test3 mod',
'title': 'Test title mod',
'comment': 'Comment mod'
},
'positions': [],
}
response = self.executeOK(data=data, item=self.owned_id)
self.ks3.refresh_from_db()
self.assertEqual(self.ks3.alias, data['item_data']['alias'])
self.assertEqual(self.ks3.title, data['item_data']['title'])
self.assertEqual(self.ks3.comment, data['item_data']['comment'])

View File

@ -0,0 +1,109 @@
''' Testing API: Change constituents in OSS. '''
from apps.oss.models import OperationSchema, OperationType
from apps.rsform.models import Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestChangeConstituents(EndpointTester):
''' Testing Constituents change propagation in OSS. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(
title='Test',
alias='T1',
owner=self.user
)
self.owned_id = self.owned.model.pk
self.ks1 = RSForm.create(
alias='KS1',
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X4')
self.ks1X2 = self.ks1.insert_new('X5')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1')
self.ks2D1 = self.ks2.insert_new(
alias='D1',
definition_formal=r'X1\X1'
)
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.model
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.model
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
self.assertEqual(self.ks3.constituents().count(), 4)
@decl_endpoint('/api/rsforms/{schema}/create-cst', method='post')
def test_create_constituenta(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'definition_formal': 'X4 = X5'
}
response = self.executeCreated(data=data, schema=self.ks1.model.pk)
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 5)
self.assertEqual(inherited_cst.alias, 'X4')
self.assertEqual(inherited_cst.order, 3)
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
@decl_endpoint('/api/rsforms/{schema}/rename-cst', method='patch')
def test_rename_constituenta(self):
data = {'target': self.ks1X1.pk, 'alias': 'D21', 'cst_type': CstType.TERM}
response = self.executeOK(data=data, schema=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertEqual(self.ks1X1.alias, data['alias'])
self.assertEqual(self.ks1X1.cst_type, data['cst_type'])
self.assertEqual(inherited_cst.alias, 'D2')
self.assertEqual(inherited_cst.cst_type, data['cst_type'])
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_constituenta(self):
d2 = self.ks3.insert_new('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
data = {
'target': self.ks1X1.pk,
'item_data': {
'term_raw': 'Test1',
'definition_formal': r'X4\X4',
'definition_raw': '@{X5|sing,datv}'
}
}
response = self.executeOK(data=data, schema=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
d2.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertEqual(self.ks1X1.term_raw, data['item_data']['term_raw'])
self.assertEqual(self.ks1X1.definition_formal, data['item_data']['definition_formal'])
self.assertEqual(self.ks1X1.definition_raw, data['item_data']['definition_raw'])
self.assertEqual(d2.definition_resolved, data['item_data']['term_raw'])
self.assertEqual(inherited_cst.term_raw, data['item_data']['term_raw'])
self.assertEqual(inherited_cst.definition_formal, r'X1\X1')
self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}')

View File

@ -1,8 +1,5 @@
''' Testing API: Operation Schema. '''
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType, LocationHead
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
@ -28,7 +25,7 @@ class TestOssViewset(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1x1 = self.ks1.insert_new(
self.ks1X1 = self.ks1.insert_new(
'X1',
term_raw='X1_1',
term_resolved='X1_1'
@ -38,7 +35,7 @@ class TestOssViewset(EndpointTester):
title='Test2',
owner=self.user
)
self.ks2x1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_new(
'X2',
term_raw='X1_2',
term_resolved='X1_2'
@ -60,8 +57,8 @@ class TestOssViewset(EndpointTester):
)
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1,
'substitution': self.ks2x1
'original': self.ks1X1,
'substitution': self.ks2X1
}])
@decl_endpoint('/api/oss/{item}/details', method='get')
@ -85,12 +82,12 @@ class TestOssViewset(EndpointTester):
self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk)
self.assertEqual(sub['original_alias'], self.ks1x1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved)
self.assertEqual(sub['original'], self.ks1X1.pk)
self.assertEqual(sub['substitution'], self.ks2X1.pk)
self.assertEqual(sub['original_alias'], self.ks1X1.alias)
self.assertEqual(sub['original_term'], self.ks1X1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2X1.alias)
self.assertEqual(sub['substitution_term'], self.ks2X1.term_resolved)
arguments = response.data['arguments']
self.assertEqual(len(arguments), 2)
@ -369,14 +366,14 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [
{
'original': self.ks1x1.pk,
'original': self.ks1X1.pk,
'substitution': ks3x1.pk
}
]
}
self.executeBadData(data=data)
data['substitutions'][0]['substitution'] = self.ks2x1.pk
data['substitutions'][0]['substitution'] = self.ks2X1.pk
self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id)
self.logout()
@ -421,7 +418,7 @@ class TestOssViewset(EndpointTester):
def test_update_operation_invalid_substitution(self):
self.populateData()
self.ks1x2 = self.ks1.insert_new('X2')
self.ks1X2 = self.ks1.insert_new('X2')
data = {
'target': self.operation3.pk,
@ -434,12 +431,12 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [
{
'original': self.ks1x1.pk,
'substitution': self.ks2x1.pk
'original': self.ks1X1.pk,
'substitution': self.ks2X1.pk
},
{
'original': self.ks2x1.pk,
'substitution': self.ks1x2.pk
'original': self.ks2X1.pk,
'substitution': self.ks1X2.pk
}
]
}
@ -473,4 +470,4 @@ class TestOssViewset(EndpointTester):
items = list(RSForm(schema).constituents())
self.assertEqual(len(items), 1)
self.assertEqual(items[0].alias, 'X1')
self.assertEqual(items[0].term_resolved, self.ks2x1.term_resolved)
self.assertEqual(items[0].term_resolved, self.ks2X1.term_resolved)

View File

@ -2,6 +2,7 @@
from typing import cast
from django.db import transaction
from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics, serializers
from rest_framework import status as c
@ -61,7 +62,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk):
def details(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Detailed OSS data. '''
serializer = s.OperationSchemaSerializer(self._get_item())
return Response(
@ -80,7 +81,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['patch'], url_path='update-positions')
def update_positions(self, request: Request, pk):
def update_positions(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Update operations positions. '''
serializer = s.PositionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@ -99,7 +100,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['post'], url_path='create-operation')
def create_operation(self, request: Request, pk):
def create_operation(self, request: Request, pk) -> HttpResponse:
''' Create new operation. '''
serializer = s.OperationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@ -135,7 +136,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['patch'], url_path='delete-operation')
def delete_operation(self, request: Request, pk):
def delete_operation(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete operation. '''
serializer = s.OperationTargetSerializer(
data=request.data,
@ -165,7 +166,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['patch'], url_path='create-input')
def create_input(self, request: Request, pk):
def create_input(self, request: Request, pk) -> HttpResponse:
''' Create new input RSForm. '''
serializer = s.OperationTargetSerializer(
data=request.data,
@ -208,7 +209,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['patch'], url_path='set-input')
def set_input(self, request: Request, pk):
def set_input(self, request: Request, pk) -> HttpResponse:
''' Set input schema for target operation. '''
serializer = s.SetOperationInputSerializer(
data=request.data,
@ -238,7 +239,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['patch'], url_path='update-operation')
def update_operation(self, request: Request, pk):
def update_operation(self, request: Request, pk) -> HttpResponse:
''' Update operation arguments and parameters. '''
serializer = s.OperationUpdateSerializer(
data=request.data,
@ -253,7 +254,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
operation.alias = serializer.validated_data['item_data']['alias']
operation.title = serializer.validated_data['item_data']['title']
operation.comment = serializer.validated_data['item_data']['comment']
operation.save()
operation.save(update_fields=['alias', 'title', 'comment'])
if operation.result is not None:
can_edit = permissions.can_edit_item(request.user, operation.result)
@ -283,7 +284,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=True, methods=['post'], url_path='execute-operation')
def execute_operation(self, request: Request, pk):
def execute_operation(self, request: Request, pk) -> HttpResponse:
''' Execute operation. '''
serializer = s.OperationTargetSerializer(
data=request.data,
@ -323,7 +324,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
}
)
@action(detail=False, methods=['post'], url_path='get-predecessor')
def get_predecessor(self, request: Request):
def get_predecessor(self, request: Request) -> HttpResponse:
''' Get predecessor. '''
# TODO: add tests for this method
serializer = CstTargetSerializer(data=request.data)

View File

@ -1,6 +1,7 @@
''' Models: Constituenta. '''
import re
from cctext import extract_entities
from django.core.validators import MinValueValidator
from django.db.models import (
CASCADE,
@ -15,10 +16,26 @@ from django.db.models import (
from ..utils import apply_pattern
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_REF_ENTITY_PATTERN = re.compile(r'@{([^0-9\-].*?)\|.*?}')
_GLOBAL_ID_PATTERN = re.compile(r'([XCSADFPT][0-9]+)') # cspell:disable-line
def extract_globals(expression: str) -> set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
def replace_globals(expression: str, mapping: dict[str, str]) -> str:
''' Replace all global aliases in expression. '''
return apply_pattern(expression, mapping, _GLOBAL_ID_PATTERN)
def replace_entities(expression: str, mapping: dict[str, str]) -> str:
''' Replace all entity references in expression. '''
return apply_pattern(expression, mapping, _REF_ENTITY_PATTERN)
class CstType(TextChoices):
''' Type of constituenta. '''
BASE = 'basic'
@ -99,8 +116,6 @@ class Constituenta(Model):
def set_term_resolved(self, new_term: str):
''' Set term and reset forms if needed. '''
if new_term == self.term_resolved:
return
self.term_resolved = new_term
self.term_forms = []
@ -109,20 +124,23 @@ class Constituenta(Model):
if change_aliases and self.alias in mapping:
modified = True
self.alias = mapping[self.alias]
expression = apply_pattern(self.definition_formal, mapping, _GLOBAL_ID_PATTERN)
expression = replace_globals(self.definition_formal, mapping)
if expression != self.definition_formal:
modified = True
self.definition_formal = expression
convention = apply_pattern(self.convention, mapping, _GLOBAL_ID_PATTERN)
if convention != self.convention:
modified = True
self.convention = convention
term = apply_pattern(self.term_raw, mapping, _REF_ENTITY_PATTERN)
term = replace_entities(self.term_raw, mapping)
if term != self.term_raw:
modified = True
self.term_raw = term
definition = apply_pattern(self.definition_raw, mapping, _REF_ENTITY_PATTERN)
definition = replace_entities(self.definition_raw, mapping)
if definition != self.definition_raw:
modified = True
self.definition_raw = definition
return modified
def extract_references(self) -> set[str]:
''' Extract all references from term and definition. '''
result: set[str] = extract_globals(self.definition_formal)
result.update(extract_entities(self.term_raw))
result.update(extract_entities(self.definition_raw))
return result

View File

@ -1,10 +1,9 @@
''' Models: RSForm API. '''
from copy import deepcopy
from typing import Optional, cast
from typing import Iterable, Optional, cast
from cctext import Entity, Resolver, TermForm, extract_entities, split_grams
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType, Version
@ -12,7 +11,6 @@ from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
extract_globals,
generate_structure,
get_type_prefix,
guess_type,
@ -22,16 +20,79 @@ from .api_RSLanguage import (
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType
from .Constituenta import Constituenta, CstType, extract_globals
_INSERT_LAST: int = -1
INSERT_LAST: int = -1
class RSForm:
''' RSForm is math form of conceptual schema. '''
class Cache:
''' Cache for RSForm constituents. '''
def __init__(self, schema: 'RSForm'):
self._schema = schema
self.constituents: list[Constituenta] = []
self.by_id: dict[int, Constituenta] = {}
self.by_alias: dict[str, Constituenta] = {}
self.is_loaded = False
def reload(self) -> None:
self.constituents = list(
self._schema.constituents().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
def clear(self) -> None:
self.constituents = []
self.by_id = {}
self.by_alias = {}
self.is_loaded = False
def insert(self, cst: Constituenta) -> None:
if self.is_loaded:
self.constituents.insert(cst.order - 1, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def insert_multi(self, items: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in items:
self.constituents.insert(cst.order - 1, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def remove(self, target: Constituenta) -> None:
if self.is_loaded:
self.constituents.remove(self.by_id[target.pk])
del self.by_id[target.pk]
del self.by_alias[target.alias]
def remove_multi(self, target: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in target:
self.constituents.remove(self.by_id[cst.pk])
del self.by_id[cst.pk]
del self.by_alias[cst.alias]
def __init__(self, model: LibraryItem):
self.model = model
self.cache: RSForm.Cache = RSForm.Cache(self)
@staticmethod
def create(**kwargs) -> 'RSForm':
@ -45,11 +106,11 @@ class RSForm:
model = LibraryItem.objects.get(pk=pk)
return RSForm(model)
def save(self, *args, **kwargs):
def save(self, *args, **kwargs) -> None:
''' Model wrapper. '''
self.model.save(*args, **kwargs)
def refresh_from_db(self):
def refresh_from_db(self) -> None:
''' Model wrapper. '''
self.model.refresh_from_db()
@ -60,7 +121,7 @@ class RSForm:
def resolver(self) -> Resolver:
''' Create resolver for text references based on schema terms. '''
result = Resolver({})
for cst in self.constituents():
for cst in self.constituents().only('alias', 'term_resolved', 'term_forms'):
entity = Entity(
alias=cst.alias,
nominal=cst.term_resolved,
@ -76,53 +137,57 @@ class RSForm:
''' Access semantic information on constituents. '''
return SemanticInfo(self)
@transaction.atomic
def on_term_change(self, changed: list[int]):
def on_term_change(self, changed: list[int]) -> None:
''' Trigger cascade resolutions when term changes. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
update_list: list[Constituenta] = []
resolver = self.resolver()
if len(expansion) > 0:
for cst_id in graph_terms.topological_order():
if cst_id not in expansion:
continue
cst = self.constituents().get(id=cst_id)
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
if resolved == cst.term_resolved:
if resolved == resolver.context[cst.alias].get_nominal():
continue
cst.set_term_resolved(resolved)
cst.save()
update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
graph_defs = self._graph_text()
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0:
return
for cst_id in update_defs:
cst = self.constituents().get(id=cst_id)
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw)
if resolved == cst.definition_resolved:
continue
cst.definition_resolved = resolved
cst.save()
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def get_max_index(self, cst_type: CstType) -> int:
''' Get maximum alias index for specific CstType. '''
result: int = 0
items = Constituenta.objects \
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = Constituenta.objects \
.filter(schema=self.model, cst_type=cst_type) \
.order_by('-alias') \
.values_list('alias', flat=True)
for alias in items:
result = max(result, int(alias[1:]))
.only('alias')
else:
cst_list = [cst for cst in self.cache.constituents if cst.cst_type == cst_type]
for cst in cst_list:
result = max(result, int(cst.alias[1:]))
return result
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create new cst from data. '''
if insert_after is None:
position = _INSERT_LAST
position = INSERT_LAST
else:
position = insert_after.order + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
@ -142,16 +207,16 @@ class RSForm:
result.definition_resolved = resolver.resolve(result.definition_raw)
result.save()
self.cache.insert(result)
self.on_term_change([result.pk])
result.refresh_from_db()
return result
@transaction.atomic
def insert_new(
self,
alias: str,
cst_type: Optional[CstType] = None,
position: int = _INSERT_LAST,
position: int = INSERT_LAST,
**kwargs
) -> Constituenta:
''' Insert new constituenta at given position.
@ -169,17 +234,18 @@ class RSForm:
cst_type=cst_type,
**kwargs
)
self.cache.insert(result)
self.save()
result.refresh_from_db()
return result
@transaction.atomic
def insert_copy(self, items: list[Constituenta], position: int = _INSERT_LAST) -> list[Constituenta]:
def insert_copy(self, items: list[Constituenta], position: int = INSERT_LAST,
initial_mapping: Optional[dict[str, str]] = None) -> list[Constituenta]:
''' Insert copy of target constituents updating references. '''
count = len(items)
if count == 0:
return []
self.cache.ensure_loaded()
position = self._get_insert_position(position)
self._shift_positions(position, count)
@ -187,7 +253,7 @@ class RSForm:
for (value, _) in CstType.choices:
indices[value] = self.get_max_index(cast(CstType, value))
mapping: dict[str, str] = {}
mapping: dict[str, str] = initial_mapping.copy() if initial_mapping else {}
for cst in items:
indices[cst.cst_type] = indices[cst.cst_type] + 1
newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}'
@ -200,62 +266,115 @@ class RSForm:
cst.order = position
cst.alias = mapping[cst.alias]
cst.apply_mapping(mapping)
cst.save()
position = position + 1
new_cst = Constituenta.objects.bulk_create(result)
self.cache.insert_multi(new_cst)
self.save()
return result
@transaction.atomic
def move_cst(self, listCst: list[Constituenta], target: int):
# pylint: disable=too-many-branches
def update_cst(self, target: Constituenta, data: dict) -> dict:
''' Update persistent attributes of a given constituenta. Return old values. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target.pk)
if cst is None:
raise ValidationError(msg.constituentaNotInRSform(target.alias))
old_data = {}
term_changed = False
if 'convention' in data:
cst.convention = data['convention']
if 'definition_formal' in data:
if cst.definition_formal == data['definition_formal']:
del data['definition_formal']
else:
old_data['definition_formal'] = cst.definition_formal
cst.definition_formal = data['definition_formal']
if 'term_forms' in data:
term_changed = True
old_data['term_forms'] = cst.term_forms
cst.term_forms = data['term_forms']
if 'definition_raw' in data or 'term_raw' in data:
resolver = self.resolver()
if 'term_raw' in data:
if cst.term_raw == data['term_raw']:
del data['term_raw']
else:
term_changed = True
old_data['term_raw'] = cst.term_raw
cst.term_raw = data['term_raw']
cst.term_resolved = resolver.resolve(cst.term_raw)
if 'term_forms' not in data:
cst.term_forms = []
resolver.context[cst.alias] = Entity(cst.alias, cst.term_resolved, manual_forms=cst.term_forms)
if 'definition_raw' in data:
if cst.definition_raw == data['definition_raw']:
del data['definition_raw']
else:
old_data['definition_raw'] = cst.definition_raw
cst.definition_raw = data['definition_raw']
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
if term_changed:
self.on_term_change([cst.pk])
self.save()
return old_data
def move_cst(self, target: list[Constituenta], destination: int) -> None:
''' Move list of constituents to specific position '''
count_moved = 0
count_top = 0
count_bot = 0
size = len(listCst)
update_list = []
for cst in self.constituents().only('order').order_by('order'):
if cst not in listCst:
if count_top + 1 < target:
size = len(target)
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituents().only('order').order_by('order')
else:
cst_list = self.cache.constituents
for cst in cst_list:
if cst in target:
cst.order = destination + count_moved
count_moved += 1
elif count_top + 1 < destination:
cst.order = count_top + 1
count_top += 1
else:
cst.order = target + size + count_bot
cst.order = destination + size + count_bot
count_bot += 1
else:
cst.order = target + count_moved
count_moved += 1
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['order'])
Constituenta.objects.bulk_update(cst_list, ['order'])
self.save()
@transaction.atomic
def delete_cst(self, listCst):
def delete_cst(self, target: Iterable[Constituenta]) -> None:
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
for cst in listCst:
cst.delete()
self.cache.remove_multi(target)
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order()
self.resolve_all_text()
self.save()
@transaction.atomic
def substitute(
self,
original: Constituenta,
substitution: Constituenta
):
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
''' Execute constituenta substitution. '''
mapping = {}
deleted: list[Constituenta] = []
replacements: list[Constituenta] = []
for original, substitution in substitutions:
assert original.pk != substitution.pk
mapping = {original.alias: substitution.alias}
mapping[original.alias] = substitution.alias
deleted.append(original)
replacements.append(substitution)
self.cache.remove_multi(deleted)
Constituenta.objects.filter(pk__in=[cst.pk for cst in deleted]).delete()
self.apply_mapping(mapping)
original.delete()
self.on_term_change([substitution.pk])
self.on_term_change([substitution.pk for substitution in replacements])
def restore_order(self):
def restore_order(self) -> None:
''' Restore order based on types and term graph. '''
manager = _OrderManager(self)
manager.restore_order()
def reset_aliases(self):
def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. '''
mapping = self._create_reset_mapping()
self.apply_mapping(mapping, change_aliases=True)
@ -273,33 +392,50 @@ class RSForm:
mapping[cst.alias] = alias
return mapping
@transaction.atomic
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False):
''' Apply rename mapping. '''
cst_list = self.constituents().order_by('order')
for cst in cst_list:
if cst.apply_mapping(mapping, change_aliases):
cst.save()
def change_cst_type(self, target: int, new_type: CstType) -> bool:
''' Change type of constituenta generating alias automatically. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target)
if cst is None:
return False
newAlias = f'{get_type_prefix(new_type)}{self.get_max_index(new_type) + 1}'
mapping = {cst.alias: newAlias}
cst.cst_type = new_type
cst.alias = newAlias
cst.save(update_fields=['cst_type', 'alias'])
self.apply_mapping(mapping)
return True
@transaction.atomic
def resolve_all_text(self):
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save()
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term()
resolver = Resolver({})
update_list: list[Constituenta] = []
for cst_id in graph_terms.topological_order():
cst = self.constituents().get(id=cst_id)
cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw)
resolver.context[cst.alias] = Entity(cst.alias, resolved)
if resolved != cst.term_resolved:
cst.term_resolved = resolved
cst.save()
for cst in self.constituents():
resolved = resolver.resolve(cst.definition_raw)
if resolved != cst.definition_resolved:
cst.definition_resolved = resolved
cst.save()
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
for cst in self.cache.constituents:
resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved
Constituenta.objects.bulk_update(self.cache.constituents, ['definition_resolved'])
@transaction.atomic
def create_version(self, version: str, description: str, data) -> Version:
''' Creates version for current state. '''
return Version.objects.create(
@ -309,7 +445,6 @@ class RSForm:
data=data
)
@transaction.atomic
def produce_structure(self, target: Constituenta, parse: dict) -> list[int]:
''' Add constituents for each structural element of the target. '''
expressions = generate_structure(
@ -320,9 +455,10 @@ class RSForm:
count_new = len(expressions)
if count_new == 0:
return []
position = target.order + 1
self._shift_positions(position, count_new)
position = target.order + 1
self.cache.ensure_loaded()
self._shift_positions(position, count_new)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self.get_max_index(cst_type) + 1
@ -339,97 +475,86 @@ class RSForm:
free_index = free_index + 1
position = position + 1
self.cache.clear()
self.save()
return result
def _shift_positions(self, start: int, shift: int):
def _shift_positions(self, start: int, shift: int) -> None:
if shift == 0:
return
update_list = \
Constituenta.objects \
update_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
update_list = Constituenta.objects \
.only('order') \
.filter(schema=self.model, order__gte=start)
else:
update_list = [cst for cst in self.cache.constituents if cst.order >= start]
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
def _get_last_position(self):
if self.constituents().exists():
return self.constituents().count()
else:
return 0
def _get_insert_position(self, position: int) -> int:
if position <= 0 and position != _INSERT_LAST:
if position <= 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition())
lastPosition = self._get_last_position()
if position == _INSERT_LAST:
lastPosition = self.constituents().count()
if position == INSERT_LAST:
position = lastPosition + 1
else:
position = max(1, min(position, lastPosition + 1))
return position
@transaction.atomic
def _reset_order(self):
def _reset_order(self) -> None:
order = 1
for cst in self.constituents().only('order').order_by('order'):
changed: list[Constituenta] = []
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituents().only('order').order_by('order')
else:
cst_list = self.cache.constituents
for cst in cst_list:
if cst.order != order:
cst.order = order
cst.save()
changed.append(cst)
order += 1
Constituenta.objects.bulk_update(changed, ['order'])
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
cst_list = \
self.constituents() \
.only('alias', 'definition_formal') \
.order_by('order')
for cst in cst_list:
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in cst_list:
for cst in self.cache.constituents:
for alias in extract_globals(cst.definition_formal):
try:
child = cst_list.get(alias=alias)
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result
def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
cst_list = \
self.constituents() \
.only('alias', 'term_raw') \
.order_by('order')
for cst in cst_list:
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in cst_list:
for cst in self.cache.constituents:
for alias in extract_entities(cst.term_raw):
try:
child = cst_list.get(alias=alias)
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result
def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph()
cst_list = \
self.constituents() \
.only('alias', 'definition_raw') \
.order_by('order')
for cst in cst_list:
for cst in self.cache.constituents:
result.add_node(cst.pk)
for cst in cst_list:
for cst in self.cache.constituents:
for alias in extract_entities(cst.definition_raw):
try:
child = cst_list.get(alias=alias)
child = self.cache.by_alias.get(alias)
if child is not None:
result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result
@ -437,14 +562,11 @@ class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSForm):
schema.cache.ensure_loaded()
self._graph = schema._graph_formal()
self._items = list(
schema.constituents()
.only('alias', 'cst_type', 'definition_formal')
.order_by('order')
)
self._cst_by_alias = {cst.alias: cst for cst in self._items}
self._cst_by_ID = {cst.pk: cst for cst in self._items}
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
self._cst_by_alias = schema.cache.by_alias
self.info = {
cst.pk: {
'is_simple': False,
@ -452,7 +574,7 @@ class SemanticInfo:
'parent': cst.pk,
'children': []
}
for cst in self._items
for cst in schema.cache.constituents
}
self._calculate_attributes()
@ -475,7 +597,7 @@ class SemanticInfo:
''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self):
def _calculate_attributes(self) -> None:
for cst_id in self._graph.topological_order():
cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
@ -485,7 +607,7 @@ class SemanticInfo:
parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent
if parent != cst_id:
self.info[parent]['children'].append(cst_id)
cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
@ -565,12 +687,8 @@ class _OrderManager:
def __init__(self, schema: RSForm):
self._semantic = schema.semantic()
self._graph = schema._graph_formal()
self._items = list(
schema.constituents()
.only('order', 'alias', 'cst_type', 'definition_formal')
.order_by('order')
)
self._cst_by_ID = {cst.pk: cst for cst in self._items}
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id
def restore_order(self) -> None:
''' Implement order restoration process. '''
@ -615,10 +733,9 @@ class _OrderManager:
result.append(child)
self._items = result
@transaction.atomic
def _save_order(self) -> None:
order = 1
for cst in self._items:
cst.order = order
cst.save()
order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -1,4 +1,4 @@
''' Django: Models. '''
from .Constituenta import Constituenta, CstType
from .RSForm import RSForm
from .Constituenta import Constituenta, CstType, extract_globals, replace_entities, replace_globals
from .RSForm import INSERT_LAST, RSForm

View File

@ -2,7 +2,7 @@
import json
import re
from enum import IntEnum, unique
from typing import Set, Tuple, cast
from typing import Tuple, cast
import pyconcept
@ -10,7 +10,6 @@ from shared import messages as msg
from .Constituenta import CstType
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_RE_TEMPLATE = r'R\d+'
_RE_COMPLEX_SYMBOLS = r'[∀∃×ℬ;|:]'
@ -67,11 +66,6 @@ def is_functional(cst_type: CstType) -> bool:
]
def extract_globals(expression: str) -> Set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
def guess_type(alias: str) -> CstType:
''' Get CstType for alias. '''
prefix = alias[0]

View File

@ -17,6 +17,7 @@ from .data_access import (
CstSerializer,
CstSubstituteSerializer,
CstTargetSerializer,
CstUpdateSerializer,
InlineSynthesisSerializer,
RSFormParseSerializer,
RSFormSerializer,

View File

@ -1,9 +1,8 @@
''' Serializers for persistent data manipulation. '''
from typing import Optional, cast
from typing import cast
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.db.models import Q
from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField
@ -39,25 +38,30 @@ class CstSerializer(serializers.ModelSerializer):
fields = '__all__'
read_only_fields = ('id', 'schema', 'order', 'alias', 'cst_type', 'definition_resolved', 'term_resolved')
def update(self, instance: Constituenta, validated_data) -> Constituenta:
data = validated_data # Note: use alias for better code readability
definition: Optional[str] = data['definition_raw'] if 'definition_raw' in data else None
term: Optional[str] = data['term_raw'] if 'term_raw' in data else None
term_changed = 'term_forms' in data
schema = RSForm(instance.schema)
if definition is not None and definition != instance.definition_raw:
data['definition_resolved'] = schema.resolver().resolve(definition)
if term is not None and term != instance.term_raw:
data['term_resolved'] = schema.resolver().resolve(term)
if data['term_resolved'] != instance.term_resolved and 'term_forms' not in data:
data['term_forms'] = []
term_changed = data['term_resolved'] != instance.term_resolved
result: Constituenta = super().update(instance, data)
if term_changed:
schema.on_term_change([result.pk])
result.refresh_from_db()
schema.save()
return result
class CstUpdateSerializer(serializers.Serializer):
''' Serializer: Constituenta update. '''
class ConstituentaUpdateData(serializers.ModelSerializer):
''' Serializer: Operation creation data. '''
class Meta:
''' serializer metadata. '''
model = Constituenta
fields = 'convention', 'definition_formal', 'definition_raw', 'term_raw', 'term_forms'
target = PKField(
many=False,
queryset=Constituenta.objects.all().only('convention', 'definition_formal', 'definition_raw', 'term_raw')
)
item_data = ConstituentaUpdateData()
def validate(self, attrs):
schema = cast(LibraryItem, self.context['schema'])
cst = cast(Constituenta, attrs['target'])
if schema and cst.schema_id != schema.pk:
raise serializers.ValidationError({
f'{cst.pk}': msg.constituentaNotInRSform(schema.title)
})
return attrs
class CstDetailsSerializer(serializers.ModelSerializer):
@ -72,7 +76,12 @@ class CstDetailsSerializer(serializers.ModelSerializer):
class CstCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. '''
insert_after = serializers.IntegerField(required=False, allow_null=True)
insert_after = PKField(
many=False,
allow_null=True,
required=False,
queryset=Constituenta.objects.all().only('schema_id', 'order')
)
alias = serializers.CharField(max_length=8)
cst_type = serializers.ChoiceField(CstType.choices)
@ -117,7 +126,7 @@ class RSFormSerializer(serializers.ModelSerializer):
for link in Inheritance.objects.filter(Q(child__schema=instance) | Q(parent__schema=instance)):
result['inheritance'].append([link.child.pk, link.parent.pk])
result['oss'] = []
for oss in LibraryItem.objects.filter(items__result=instance).only('alias'):
for oss in LibraryItem.objects.filter(operations__result=instance).only('alias'):
result['oss'].append({
'id': oss.pk,
'alias': oss.alias
@ -149,7 +158,6 @@ class RSFormSerializer(serializers.ModelSerializer):
result['version'] = version
return result | data
@transaction.atomic
def restore_from_version(self, data: dict):
''' Load data from version. '''
schema = RSForm(cast(LibraryItem, self.instance))
@ -243,7 +251,7 @@ class CstTargetSerializer(serializers.Serializer):
class CstRenameSerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. '''
target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'schema'))
target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'cst_type', 'schema'))
alias = serializers.CharField()
cst_type = serializers.CharField()
@ -312,9 +320,9 @@ class CstSubstituteSerializer(serializers.Serializer):
raise serializers.ValidationError({
f'{original_cst.pk}': msg.substituteDouble(original_cst.alias)
})
if original_cst.alias == substitution_cst.alias:
if original_cst.pk == substitution_cst.pk:
raise serializers.ValidationError({
'alias': msg.substituteTrivial(original_cst.alias)
'original': msg.substituteTrivial(original_cst.alias)
})
if original_cst.schema_id != schema.pk:
raise serializers.ValidationError({

View File

@ -58,3 +58,28 @@ class TestConstituenta(TestCase):
self.assertEqual(cst.term_forms, [])
self.assertEqual(cst.definition_resolved, '')
self.assertEqual(cst.definition_raw, '')
def test_extract_references(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 X2',
term_raw='@{X3|sing} is a @{X4|sing}',
definition_raw='@{X5|sing}'
)
self.assertEqual(cst.extract_references(), set(['X1', 'X2', 'X3', 'X4', 'X5']))
def text_apply_mapping(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 = X2',
term_raw='@{X1|sing}',
definition_raw='@{X2|sing}'
)
cst.apply_mapping({'X1': 'X3', 'X2': 'X4'})
self.assertEqual(cst.definition_formal, 'X3 = X4')
self.assertEqual(cst.term_raw, '@{X3|sing}')
self.assertEqual(cst.definition_raw, '@{X4|sing}')

View File

@ -1,15 +1,16 @@
''' Testing models: api_RSForm. '''
from django.forms import ValidationError
from django.test import TestCase
from apps.rsform.models import Constituenta, CstType, RSForm
from apps.users.models import User
from shared.DBTester import DBTester
class TestRSForm(TestCase):
class TestRSForm(DBTester):
''' Testing RSForm wrapper. '''
def setUp(self):
super().setUp()
self.user1 = User.objects.create(username='User1')
self.user2 = User.objects.create(username='User2')
self.schema = RSForm.create(title='Test')
@ -180,7 +181,6 @@ class TestRSForm(TestCase):
alias='D1',
definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}',
convention='X1',
term_raw='@{X1|plur}'
)
@ -188,7 +188,6 @@ class TestRSForm(TestCase):
d1.refresh_from_db()
self.assertEqual(d1.definition_formal, 'X3 = X4 = X2', msg='Map IDs in expression')
self.assertEqual(d1.definition_raw, '@{X4|sing}', msg='Map IDs in definition')
self.assertEqual(d1.convention, 'X3', msg='Map IDs in convention')
self.assertEqual(d1.term_raw, '@{X3|plur}', msg='Map IDs in term')
self.assertEqual(d1.term_resolved, '', msg='Do not run resolve on mapping')
self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping')
@ -208,7 +207,7 @@ class TestRSForm(TestCase):
definition_formal=x1.alias
)
self.schema.substitute(x1, x2)
self.schema.substitute([(x1, x2)])
x2.refresh_from_db()
d1.refresh_from_db()
self.assertEqual(self.schema.constituents().count(), 2)
@ -320,7 +319,6 @@ class TestRSForm(TestCase):
x2 = self.schema.insert_new('X21')
d1 = self.schema.insert_new(
alias='D11',
convention='D11 - cool',
definition_formal='X21=X21',
term_raw='@{X21|sing}',
definition_raw='@{X11|datv}',
@ -335,7 +333,6 @@ class TestRSForm(TestCase):
self.assertEqual(x1.alias, 'X1')
self.assertEqual(x2.alias, 'X2')
self.assertEqual(d1.alias, 'D1')
self.assertEqual(d1.convention, 'D1 - cool')
self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test')

View File

@ -100,7 +100,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(response.data['items'][1]['id'], x2.pk)
self.assertEqual(response.data['items'][1]['term_raw'], x2.term_raw)
self.assertEqual(response.data['items'][1]['term_resolved'], x2.term_resolved)
self.assertEqual(response.data['subscribers'], [self.user.pk])
self.assertEqual(response.data['subscribers'], [])
self.assertEqual(response.data['editors'], [])
self.assertEqual(response.data['inheritance'], [])
self.assertEqual(response.data['oss'], [])
@ -183,9 +183,10 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
def test_create_constituenta(self):
data = {'alias': 'X3'}
data = {'alias': 'X3', 'cst_type': CstType.BASE}
self.executeForbidden(data=data, item=self.unowned_id)
data = {'alias': 'X3'}
self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id)
@ -524,7 +525,7 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_partial_update(self):
data = {'id': self.cst1.pk, 'convention': 'tt'}
data = {'target': self.cst1.pk, 'item_data': {'convention': 'tt'}}
self.executeForbidden(data=data, schema=self.rsform_unowned.model.pk)
self.logout()
@ -542,10 +543,12 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_resolved_no_refs(self):
data = {
'id': self.cst3.pk,
'target': self.cst3.pk,
'item_data': {
'term_raw': 'New term',
'definition_raw': 'New def'
}
}
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db()
self.assertEqual(response.data['term_resolved'], 'New term')
@ -557,10 +560,12 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_resolved_refs(self):
data = {
'id': self.cst3.pk,
'target': self.cst3.pk,
'item_data': {
'term_raw': '@{X1|nomn,sing}',
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}'
}
}
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db()
self.assertEqual(self.cst3.term_resolved, self.cst1.term_resolved)
@ -568,14 +573,32 @@ class TestConstituentaAPI(EndpointTester):
self.assertEqual(self.cst3.definition_resolved, f'{self.cst1.term_resolved} form1')
self.assertEqual(response.data['definition_resolved'], f'{self.cst1.term_resolved} form1')
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_term_forms(self):
data = {
'target': self.cst3.pk,
'item_data': {
'definition_raw': '@{X3|sing,datv}',
'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}]
}
}
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db()
self.assertEqual(self.cst3.definition_resolved, 'form1')
self.assertEqual(response.data['definition_resolved'], 'form1')
self.assertEqual(self.cst3.term_forms, data['item_data']['term_forms'])
self.assertEqual(response.data['term_forms'], data['item_data']['term_forms'])
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_readonly_cst_fields(self):
data = {
'id': self.cst1.pk,
'target': self.cst1.pk,
'item_data': {
'alias': 'X33',
'order': 10
}
}
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.assertEqual(response.data['alias'], 'X1')
self.assertEqual(response.data['alias'], self.cst1.alias)

View File

@ -16,6 +16,7 @@ from rest_framework.serializers import ValidationError
from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead
from apps.library.serializers import LibraryItemSerializer
from apps.oss.models import ChangeManager
from apps.users.models import User
from shared import messages as msg
from shared import permissions, utility
@ -40,14 +41,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
if self.action in [
'load_trs',
'create_cst',
'delete_multiple_cst',
'rename_cst',
'update_cst',
'move_cst',
'delete_multiple_cst',
'substitute',
'restore_order',
'reset_aliases',
'produce_structure',
'update_cst'
]:
permission_list = [permissions.ItemEditor]
elif self.action in [
@ -74,34 +75,36 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['post'], url_path='create-cst')
def create_cst(self, request: Request, pk):
def create_cst(self, request: Request, pk) -> HttpResponse:
''' Create new constituenta. '''
schema = self._get_item()
serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
if 'insert_after' in data and data['insert_after'] is not None:
try:
insert_after = m.Constituenta.objects.get(pk=data['insert_after'])
except LibraryItem.DoesNotExist:
return Response(status=c.HTTP_404_NOT_FOUND)
else:
if 'insert_after' not in data:
insert_after = None
new_cst = m.RSForm(schema).create_cst(data, insert_after)
else:
insert_after = data['insert_after']
schema = m.RSForm(self._get_item())
with transaction.atomic():
new_cst = schema.create_cst(data, insert_after)
hosts = LibraryItem.objects.filter(operations__result=schema.model)
for host in hosts:
ChangeManager(host).on_create_cst(new_cst, schema)
schema.refresh_from_db()
return Response(
status=c.HTTP_201_CREATED,
data={
'new_cst': s.CstSerializer(new_cst).data,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(schema.model).data
}
)
@extend_schema(
summary='update persistent attributes of a given constituenta',
tags=['RSForm'],
request=s.CstSerializer,
request=s.CstUpdateSerializer,
responses={
c.HTTP_200_OK: s.CstSerializer,
c.HTTP_400_BAD_REQUEST: None,
@ -110,22 +113,24 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='update-cst')
def update_cst(self, request: Request, pk):
def update_cst(self, request: Request, pk) -> HttpResponse:
''' Update persistent attributes of a given constituenta. '''
schema = self._get_item()
serializer = s.CstSerializer(data=request.data, partial=True)
model = self._get_item()
serializer = s.CstUpdateSerializer(data=request.data, partial=True, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = m.Constituenta.objects.get(pk=request.data['id'])
if cst.schema != schema:
raise ValidationError({
'schema': msg.constituentaNotInRSform(schema.title)
})
serializer.update(instance=cst, validated_data=serializer.validated_data)
cst = cast(m.Constituenta, serializer.validated_data['target'])
schema = m.RSForm(model)
data = serializer.validated_data['item_data']
with transaction.atomic():
hosts = LibraryItem.objects.filter(operations__result=model)
old_data = schema.update_cst(cst, data)
for host in hosts:
ChangeManager(host).on_update_cst(cst, data, old_data, schema)
return Response(
status=c.HTTP_200_OK,
data=s.CstSerializer(cst).data
data=s.CstSerializer(m.Constituenta.objects.get(pk=request.data['target'])).data
)
@extend_schema(
@ -140,11 +145,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='produce-structure')
def produce_structure(self, request: Request, pk):
def produce_structure(self, request: Request, pk) -> HttpResponse:
''' Produce a term for every element of the target constituenta typification. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstTargetSerializer(data=request.data, context={'schema': schema})
serializer = s.CstTargetSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
if cst.cst_type not in [m.CstType.FUNCTION, m.CstType.STRUCTURED, m.CstType.TERM]:
@ -152,20 +157,20 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
f'{cst.pk}': msg.constituentaNoStructure()
})
schema_details = s.RSFormParseSerializer(schema).data['items']
schema_details = s.RSFormParseSerializer(model).data['items']
cst_parse = next(item for item in schema_details if item['id'] == cst.pk)['parse']
if not cst_parse['typification']:
return Response(
status=c.HTTP_400_BAD_REQUEST,
data={f'{cst.pk}': msg.constituentaNoStructure()}
)
result = m.RSForm(schema).produce_structure(cst, cst_parse)
with transaction.atomic():
result = m.RSForm(model).produce_structure(cst, cst_parse)
return Response(
status=c.HTTP_200_OK,
data={
'cst_list': result,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(model).data
}
)
@ -181,29 +186,33 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='rename-cst')
def rename_cst(self, request: Request, pk):
def rename_cst(self, request: Request, pk) -> HttpResponse:
''' Rename constituenta possibly changing type. '''
schema = self._get_item()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': schema})
model = self._get_item()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
old_alias = cst.alias
changed_type = cst.cst_type != serializer.validated_data['cst_type']
mapping = {cst.alias: serializer.validated_data['alias']}
cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type']
schema = m.RSForm(model)
with transaction.atomic():
cst.save()
m.RSForm(schema).apply_mapping(mapping={old_alias: cst.alias}, change_aliases=False)
schema.refresh_from_db()
schema.apply_mapping(mapping=mapping, change_aliases=False)
cst.refresh_from_db()
if changed_type:
hosts = LibraryItem.objects.filter(operations__result=model)
for host in hosts:
ChangeManager(host).on_change_cst_type(cst, schema)
return Response(
status=c.HTTP_200_OK,
data={
'new_cst': s.CstSerializer(cst).data,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(model).data
}
)
@ -219,25 +228,27 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='substitute')
def substitute(self, request: Request, pk):
def substitute(self, request: Request, pk) -> HttpResponse:
''' Substitute occurrences of constituenta with another one. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstSubstituteSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
m.RSForm(schema).substitute(original, replacement)
substitutions.append((original, replacement))
m.RSForm(model).substitute(substitutions)
schema.refresh_from_db()
model.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -252,20 +263,19 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='delete-multiple-cst')
def delete_multiple_cst(self, request: Request, pk):
def delete_multiple_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete multiple constituents. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstListSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
m.RSForm(schema).delete_cst(serializer.validated_data['items'])
schema.refresh_from_db()
with transaction.atomic():
m.RSForm(model).delete_cst(serializer.validated_data['items'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -280,21 +290,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='move-cst')
def move_cst(self, request: Request, pk):
def move_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Move multiple constituents. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstMoveSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
m.RSForm(schema).move_cst(
listCst=serializer.validated_data['items'],
target=serializer.validated_data['move_to']
with transaction.atomic():
m.RSForm(model).move_cst(
target=serializer.validated_data['items'],
destination=serializer.validated_data['move_to']
)
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -308,13 +319,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='reset-aliases')
def reset_aliases(self, request: Request, pk):
def reset_aliases(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Recreate all aliases based on order. '''
schema = self._get_item()
m.RSForm(schema).reset_aliases()
model = self._get_item()
m.RSForm(model).reset_aliases()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -328,13 +339,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='restore-order')
def restore_order(self, request: Request, pk):
def restore_order(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Restore order based on types and term graph. '''
schema = self._get_item()
m.RSForm(schema).restore_order()
model = self._get_item()
m.RSForm(model).restore_order()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -349,15 +360,15 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['patch'], url_path='load-trs')
def load_trs(self, request: Request, pk):
def load_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Load data from file and replace current schema. '''
input_serializer = s.RSFormUploadSerializer(data=request.data)
input_serializer.is_valid(raise_exception=True)
schema = self._get_item()
model = self._get_item()
load_metadata = input_serializer.validated_data['load_metadata']
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
data['id'] = schema.pk
data['id'] = model.pk
serializer = s.RSFormTRSSerializer(
data=data,
@ -380,7 +391,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['get'], url_path='contents')
def contents(self, request: Request, pk):
def contents(self, request: Request, pk) -> HttpResponse:
''' Endpoint: View schema db contents (including constituents). '''
serializer = s.RSFormSerializer(self.get_object())
return Response(
@ -398,7 +409,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk):
def details(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Detailed schema view including statuses and parse. '''
serializer = s.RSFormParseSerializer(self.get_object())
return Response(
@ -416,7 +427,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
},
)
@action(detail=True, methods=['post'], url_path='check')
def check(self, request: Request, pk):
def check(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Check RSLang expression against schema context. '''
serializer = s.ExpressionSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@ -438,7 +449,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['post'], url_path='resolve')
def resolve(self, request: Request, pk):
def resolve(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Resolve references in text against schema terms context. '''
serializer = s.TextSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@ -460,12 +471,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}
)
@action(detail=True, methods=['get'], url_path='export-trs')
def export_trs(self, request: Request, pk):
def export_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Download Exteor compatible file. '''
schema = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(schema)).data
model = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(model)).data
file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
filename = utils.filename_for_schema(schema.alias)
filename = utils.filename_for_schema(model.alias)
response = HttpResponse(file, content_type='application/zip')
response['Content-Disposition'] = f'attachment; filename={filename}'
return response
@ -485,7 +496,7 @@ class TrsImportView(views.APIView):
c.HTTP_403_FORBIDDEN: None
}
)
def post(self, request: Request):
def post(self, request: Request) -> HttpResponse:
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
owner = cast(User, self.request.user)
_prepare_rsform_data(data, request, owner)
@ -512,7 +523,7 @@ class TrsImportView(views.APIView):
}
)
@api_view(['POST'])
def create_rsform(request: Request):
def create_rsform(request: Request) -> HttpResponse:
''' Endpoint: Create RSForm from user input and/or trs file. '''
owner = cast(User, request.user) if not request.user.is_anonymous else None
if 'file' not in request.FILES:
@ -564,7 +575,7 @@ def _prepare_rsform_data(data: dict, request: Request, owner: Union[User, None])
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@api_view(['PATCH'])
def inline_synthesis(request: Request):
def inline_synthesis(request: Request) -> HttpResponse:
''' Endpoint: Inline synthesis. '''
serializer = s.InlineSynthesisSerializer(
data=request.data,
@ -577,16 +588,18 @@ def inline_synthesis(request: Request):
with transaction.atomic():
new_items = receiver.insert_copy(items)
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
if original in items:
index = next(i for (i, cst) in enumerate(items) if cst == original)
index = next(i for (i, cst) in enumerate(items) if cst.pk == original.pk)
original = new_items[index]
else:
index = next(i for (i, cst) in enumerate(items) if cst == replacement)
index = next(i for (i, cst) in enumerate(items) if cst.pk == replacement.pk)
replacement = new_items[index]
receiver.substitute(original, replacement)
substitutions.append((original, replacement))
receiver.substitute(substitutions)
receiver.restore_order()
return Response(

View File

@ -134,7 +134,6 @@ class TestUserUserProfileAPIView(EndpointTester):
def test_password_reset_request(self):
self.executeBadData({'email': 'invalid@mail.ru'})
self.executeOK({'email': self.user.email})
# TODO: check if mail server actually sent email and if reset procedure works
class TestSignupAPIView(EndpointTester):

View File

@ -0,0 +1,23 @@
''' Utils: tester for database operations. '''
import logging
from django.db import connection
from rest_framework.test import APITestCase
class DBTester(APITestCase):
''' Abstract base class for Testing database. '''
def setUp(self):
self.logger = logging.getLogger('django.db.backends')
self.logger.setLevel(logging.DEBUG)
def start_db_log(self):
''' Warning! Do not use this second time before calling stop_db_log. '''
''' Warning! Do not forget to enable global logging in settings. '''
logging.disable(logging.NOTSET)
connection.force_debug_cursor = True
def stop_db_log(self):
connection.force_debug_cursor = False
logging.disable(logging.CRITICAL)

View File

@ -1,13 +1,12 @@
''' Utils: base tester class for endpoints. '''
import logging
from django.db import connection
from rest_framework import status
from rest_framework.test import APIClient, APIRequestFactory, APITestCase
from rest_framework.test import APIClient, APIRequestFactory
from apps.library.models import Editor, LibraryItem
from apps.users.models import User
from .DBTester import DBTester
def decl_endpoint(endpoint: str, method: str):
''' Decorator for EndpointTester methods to provide API attributes. '''
@ -25,10 +24,11 @@ def decl_endpoint(endpoint: str, method: str):
return set_endpoint_inner
class EndpointTester(APITestCase):
class EndpointTester(DBTester):
''' Abstract base class for Testing endpoints. '''
def setUp(self):
super().setUp()
self.factory = APIRequestFactory()
self.user = User.objects.create(
username='UserTest',
@ -43,9 +43,6 @@ class EndpointTester(APITestCase):
self.client = APIClient()
self.client.force_authenticate(user=self.user)
self.logger = logging.getLogger('django.db.backends')
self.logger.setLevel(logging.DEBUG)
def setUpFullUsers(self):
self.factory = APIRequestFactory()
self.user = User.objects.create_user(
@ -77,16 +74,6 @@ class EndpointTester(APITestCase):
def logout(self):
self.client.logout()
def start_db_log(self):
''' Warning! Do not use this second time before calling stop_db_log. '''
''' Warning! Do not forget to enable global logging in settings. '''
logging.disable(logging.NOTSET)
connection.force_debug_cursor = True
def stop_db_log(self):
connection.force_debug_cursor = False
logging.disable(logging.CRITICAL)
def set_params(self, **kwargs):
''' Given named argument values resolve current endpoint_mask. '''
if self.endpoint_mask and len(kwargs) > 0:

View File

@ -25,7 +25,7 @@ function TextArea({
<div
className={clsx(
{
'flex flex-col flex-grow gap-2': !dense,
'flex flex-col gap-2': !dense,
'flex flex-grow items-center gap-3': dense
},
dense && className

View File

@ -1,7 +1,5 @@
import { FolderTree } from './FolderTree';
// TODO: test FolderNode and FolderTree exhaustively
describe('Testing Tree construction', () => {
test('empty Tree should be empty', () => {
const tree = new FolderTree();

View File

@ -152,14 +152,12 @@ export interface ICstMovetoData extends IConstituentaList {
/**
* Represents data, used in updating persistent attributes in {@link IConstituenta}.
*/
export interface ICstUpdateData
extends Pick<IConstituentaMeta, 'id'>,
Partial<
Pick<
IConstituentaMeta,
'alias' | 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'
>
> {}
export interface ICstUpdateData {
target: ConstituentaID;
item_data: Partial<
Pick<IConstituentaMeta, 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'>
>;
}
/**
* Represents data, used in renaming {@link IConstituenta}.

View File

@ -55,7 +55,6 @@ function FormConstituenta({
}: FormConstituentaProps) {
const { schema, cstUpdate, processing } = useRSForm();
const [alias, setAlias] = useState('');
const [term, setTerm] = useState('');
const [textDefinition, setTextDefinition] = useState('');
const [expression, setExpression] = useState('');
@ -98,7 +97,6 @@ function FormConstituenta({
useLayoutEffect(() => {
if (state) {
setAlias(state.alias);
setConvention(state.convention || '');
setTerm(state.term_raw || '');
setTextDefinition(state.definition_raw || '');
@ -116,13 +114,21 @@ function FormConstituenta({
return;
}
const data: ICstUpdateData = {
id: state.id,
alias: alias,
term_raw: term,
definition_formal: expression,
definition_raw: textDefinition,
convention: convention
target: state.id,
item_data: {}
};
if (state.term_raw !== term) {
data.item_data.term_raw = term;
}
if (state.definition_formal !== expression) {
data.item_data.definition_formal = expression;
}
if (state.definition_raw !== textDefinition) {
data.item_data.definition_raw = textDefinition;
}
if (state.convention !== convention) {
data.item_data.convention = convention;
}
cstUpdate(data, () => toast.success(information.changesSaved));
}
@ -219,7 +225,7 @@ function FormConstituenta({
onChange={event => setConvention(event.target.value)}
/>
</AnimateFade>
{!showConvention && (!disabled || processing) ? (
<AnimateFade key='cst_convention_button' hideContent={showConvention || (disabled && !processing)}>
<button
key='cst_disable_comment'
id='cst_disable_comment'
@ -230,7 +236,7 @@ function FormConstituenta({
>
Добавить комментарий
</button>
) : null}
</AnimateFade>
{!disabled || processing ? (
<div className='self-center flex'>

View File

@ -323,8 +323,8 @@ export const RSEditState = ({
return;
}
const data: ICstUpdateData = {
id: activeCst.id,
term_forms: forms
target: activeCst.id,
item_data: { term_forms: forms }
};
model.cstUpdate(data, () => toast.success(information.changesSaved));
},

View File

@ -57,7 +57,7 @@ function ViewConstituents({ expression, schema, activeCst, isBottom, onOpenEdit
className={clsx(
'border overflow-visible', // prettier: split-lines
{
'mt-[2.2rem] rounded-l-md rounded-r-none': !isBottom,
'mt-[2.2rem] rounded-l-md rounded-r-none h-fit': !isBottom,
'mt-3 mx-6 rounded-md md:w-[45.8rem]': isBottom
}
)}