Compare commits

...

4 Commits

Author SHA1 Message Date
Ivan
a9edf842d8 M: Propagate cst_update
Some checks failed
Frontend CI / build (22.x) (push) Waiting to run
Backend CI / build (3.12) (push) Has been cancelled
2024-08-10 11:46:08 +03:00
Ivan
1600c8abd2 M: Change propogation 2024-08-09 20:57:03 +03:00
Ivan
5c4c0b38d5 R: Cleanup model API: move logic to view/serializer 2024-08-08 15:31:32 +03:00
Ivan
1647b3c0e8 R: RSForm cache and transaction.atomic 2024-08-07 21:54:14 +03:00
40 changed files with 1094 additions and 496 deletions

View File

@ -1,7 +1,6 @@
''' Models: LibraryItem. ''' ''' Models: LibraryItem. '''
import re import re
from django.db import transaction
from django.db.models import ( from django.db.models import (
SET_NULL, SET_NULL,
BooleanField, BooleanField,
@ -16,7 +15,6 @@ from django.db.models import (
from apps.users.models import User from apps.users.models import User
from .Subscription import Subscription
from .Version import Version from .Version import Version
@ -125,34 +123,3 @@ class LibraryItem(Model):
def versions(self) -> QuerySet[Version]: def versions(self) -> QuerySet[Version]:
''' Get all Versions of this item. ''' ''' Get all Versions of this item. '''
return Version.objects.filter(item=self.pk).order_by('-time_create') return Version.objects.filter(item=self.pk).order_by('-time_create')
# TODO: move to View layer
@transaction.atomic
def save(self, *args, **kwargs):
''' Save updating subscriptions and connected operations. '''
if not self._state.adding:
self._update_connected_operations()
subscribe = self._state.adding and self.owner
super().save(*args, **kwargs)
if subscribe:
Subscription.subscribe(user=self.owner_id, item=self.pk)
def _update_connected_operations(self):
# using method level import to prevent circular dependency
from apps.oss.models import Operation # pylint: disable=import-outside-toplevel
operations = Operation.objects.filter(result__pk=self.pk)
if not operations.exists():
return
for operation in operations:
changed = False
if operation.alias != self.alias:
operation.alias = self.alias
changed = True
if operation.title != self.title:
operation.title = self.title
changed = True
if operation.comment != self.comment:
operation.comment = self.comment
changed = True
if changed:
operation.save()

View File

@ -36,7 +36,7 @@ class LibraryItemSerializer(serializers.ModelSerializer):
class LibraryItemCloneSerializer(serializers.ModelSerializer): class LibraryItemCloneSerializer(serializers.ModelSerializer):
''' Serializer: LibraryItem cloning. ''' ''' Serializer: LibraryItem cloning. '''
items = PKField(many=True, required=False, queryset=Constituenta.objects.all()) items = PKField(many=True, required=False, queryset=Constituenta.objects.all().only('pk'))
class Meta: class Meta:
''' serializer metadata. ''' ''' serializer metadata. '''

View File

@ -68,7 +68,6 @@ class TestLibraryItem(TestCase):
self.assertEqual(item.alias, 'KS1') self.assertEqual(item.alias, 'KS1')
self.assertEqual(item.comment, 'Test comment') self.assertEqual(item.comment, 'Test comment')
self.assertEqual(item.location, LocationHead.COMMON) self.assertEqual(item.location, LocationHead.COMMON)
self.assertTrue(Subscription.objects.filter(user=item.owner, item=item).exists())
class TestLocation(TestCase): class TestLocation(TestCase):

View File

@ -21,9 +21,7 @@ class TestSubscription(TestCase):
def test_default(self): def test_default(self):
subs = list(Subscription.objects.filter(item=self.item)) subs = list(Subscription.objects.filter(item=self.item))
self.assertEqual(len(subs), 1) self.assertEqual(len(subs), 0)
self.assertEqual(subs[0].item, self.item)
self.assertEqual(subs[0].user, self.user1)
def test_str(self): def test_str(self):

View File

@ -49,6 +49,7 @@ class TestLibraryViewset(EndpointTester):
self.assertEqual(response.data['item_type'], LibraryItemType.RSFORM) self.assertEqual(response.data['item_type'], LibraryItemType.RSFORM)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], data['alias']) self.assertEqual(response.data['alias'], data['alias'])
self.assertTrue(Subscription.objects.filter(user=self.user, item_id=response.data['id']).exists())
data = { data = {
'item_type': LibraryItemType.OPERATION_SCHEMA, 'item_type': LibraryItemType.OPERATION_SCHEMA,
@ -74,7 +75,7 @@ class TestLibraryViewset(EndpointTester):
@decl_endpoint('/api/library/{item}', method='patch') @decl_endpoint('/api/library/{item}', method='patch')
def test_update(self): def test_update(self):
data = {'id': self.unowned.pk, 'title': 'New Title'} data = {'title': 'New Title'}
self.executeNotFound(data=data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
@ -86,13 +87,12 @@ class TestLibraryViewset(EndpointTester):
self.unowned.save() self.unowned.save()
self.executeForbidden(data=data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
data = {'id': self.owned.pk, 'title': 'New Title'} data = {'title': 'New Title'}
response = self.executeOK(data=data, item=self.owned.pk) response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], self.owned.alias) self.assertEqual(response.data['alias'], self.owned.alias)
data = { data = {
'id': self.owned.pk,
'title': 'Another Title', 'title': 'Another Title',
'owner': self.user2.pk, 'owner': self.user2.pk,
'access_policy': AccessPolicy.PROTECTED, 'access_policy': AccessPolicy.PROTECTED,

View File

@ -142,7 +142,7 @@ class TestVersionViews(EndpointTester):
version_id = self._create_version(data=data) version_id = self._create_version(data=data)
invalid_id = version_id + 1337 invalid_id = version_id + 1337
d1.delete() self.owned.delete_cst([d1])
x3 = self.owned.insert_new('X3') x3 = self.owned.insert_new('X3')
x1.order = x3.order x1.order = x3.order
x1.convention = 'Test2' x1.convention = 'Test2'

View File

@ -4,6 +4,7 @@ from typing import cast
from django.db import transaction from django.db import transaction
from django.db.models import Q from django.db.models import Q
from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics from rest_framework import generics
from rest_framework import status as c from rest_framework import status as c
@ -12,7 +13,7 @@ from rest_framework.decorators import action
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from apps.oss.models import OperationSchema from apps.oss.models import Operation, OperationSchema
from apps.rsform.models import RSForm from apps.rsform.models import RSForm
from apps.rsform.serializers import RSFormParseSerializer from apps.rsform.serializers import RSFormParseSerializer
from apps.users.models import User from apps.users.models import User
@ -36,11 +37,35 @@ class LibraryViewSet(viewsets.ModelViewSet):
return s.LibraryItemBaseSerializer return s.LibraryItemBaseSerializer
return s.LibraryItemSerializer return s.LibraryItemSerializer
def perform_create(self, serializer): def perform_create(self, serializer) -> None:
if not self.request.user.is_anonymous and 'owner' not in self.request.POST: if not self.request.user.is_anonymous and 'owner' not in self.request.POST:
return serializer.save(owner=self.request.user) instance = serializer.save(owner=self.request.user)
else: else:
return serializer.save() instance = serializer.save()
if instance.owner:
m.Subscription.subscribe(user=instance.owner_id, item=instance.pk)
def perform_update(self, serializer) -> None:
instance = serializer.save()
operations = Operation.objects.filter(result__pk=instance.pk)
if not operations.exists():
return
update_list: list[Operation] = []
for operation in operations:
changed = False
if operation.alias != instance.alias:
operation.alias = instance.alias
changed = True
if operation.title != instance.title:
operation.title = instance.title
changed = True
if operation.comment != instance.comment:
operation.comment = instance.comment
changed = True
if changed:
update_list.append(operation)
if update_list:
Operation.objects.bulk_update(update_list, ['alias', 'title', 'comment'])
def get_permissions(self): def get_permissions(self):
if self.action in ['update', 'partial_update']: if self.action in ['update', 'partial_update']:
@ -79,7 +104,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
} }
) )
@action(detail=True, methods=['post'], url_path='clone') @action(detail=True, methods=['post'], url_path='clone')
def clone(self, request: Request, pk): def clone(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Create deep copy of library item. ''' ''' Endpoint: Create deep copy of library item. '''
serializer = s.LibraryItemCloneSerializer(data=request.data) serializer = s.LibraryItemCloneSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -139,7 +164,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
}, },
) )
@action(detail=True, methods=['delete']) @action(detail=True, methods=['delete'])
def unsubscribe(self, request: Request, pk): def unsubscribe(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Unsubscribe current user from item. ''' ''' Endpoint: Unsubscribe current user from item. '''
item = self._get_item() item = self._get_item()
m.Subscription.unsubscribe(user=cast(int, self.request.user.pk), item=item.pk) m.Subscription.unsubscribe(user=cast(int, self.request.user.pk), item=item.pk)
@ -156,7 +181,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
} }
) )
@action(detail=True, methods=['patch'], url_path='set-owner') @action(detail=True, methods=['patch'], url_path='set-owner')
def set_owner(self, request: Request, pk): def set_owner(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item owner. ''' ''' Endpoint: Set item owner. '''
item = self._get_item() item = self._get_item()
serializer = s.UserTargetSerializer(data=request.data) serializer = s.UserTargetSerializer(data=request.data)
@ -188,7 +213,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
} }
) )
@action(detail=True, methods=['patch'], url_path='set-location') @action(detail=True, methods=['patch'], url_path='set-location')
def set_location(self, request: Request, pk): def set_location(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item location. ''' ''' Endpoint: Set item location. '''
item = self._get_item() item = self._get_item()
serializer = s.LocationSerializer(data=request.data) serializer = s.LocationSerializer(data=request.data)
@ -222,7 +247,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
} }
) )
@action(detail=True, methods=['patch'], url_path='set-access-policy') @action(detail=True, methods=['patch'], url_path='set-access-policy')
def set_access_policy(self, request: Request, pk): def set_access_policy(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set item AccessPolicy. ''' ''' Endpoint: Set item AccessPolicy. '''
item = self._get_item() item = self._get_item()
serializer = s.AccessPolicySerializer(data=request.data) serializer = s.AccessPolicySerializer(data=request.data)
@ -253,7 +278,7 @@ class LibraryViewSet(viewsets.ModelViewSet):
} }
) )
@action(detail=True, methods=['patch'], url_path='set-editors') @action(detail=True, methods=['patch'], url_path='set-editors')
def set_editors(self, request: Request, pk): def set_editors(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Set list of editors for item. ''' ''' Endpoint: Set list of editors for item. '''
item = self._get_item() item = self._get_item()
serializer = s.UsersListSerializer(data=request.data) serializer = s.UsersListSerializer(data=request.data)

View File

@ -1,6 +1,7 @@
''' Endpoints for versions. ''' ''' Endpoints for versions. '''
from typing import cast from typing import cast
from django.db import transaction
from django.http import HttpResponse from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics from rest_framework import generics
@ -40,11 +41,12 @@ class VersionViewset(
} }
) )
@action(detail=True, methods=['patch'], url_path='restore') @action(detail=True, methods=['patch'], url_path='restore')
def restore(self, request: Request, pk): def restore(self, request: Request, pk) -> HttpResponse:
''' Restore version data into current item. ''' ''' Restore version data into current item. '''
version = cast(m.Version, self.get_object()) version = cast(m.Version, self.get_object())
item = cast(m.LibraryItem, version.item) item = cast(m.LibraryItem, version.item)
RSFormSerializer(item).restore_from_version(version.data) with transaction.atomic():
RSFormSerializer(item).restore_from_version(version.data)
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=RSFormParseSerializer(item).data data=RSFormParseSerializer(item).data
@ -61,7 +63,7 @@ class VersionViewset(
} }
) )
@api_view(['GET']) @api_view(['GET'])
def export_file(request: Request, pk: int): def export_file(request: Request, pk: int) -> HttpResponse:
''' Endpoint: Download Exteor compatible file for versioned data. ''' ''' Endpoint: Download Exteor compatible file for versioned data. '''
try: try:
version = m.Version.objects.get(pk=pk) version = m.Version.objects.get(pk=pk)
@ -88,7 +90,7 @@ def export_file(request: Request, pk: int):
) )
@api_view(['POST']) @api_view(['POST'])
@permission_classes([permissions.GlobalUser]) @permission_classes([permissions.GlobalUser])
def create_version(request: Request, pk_item: int): def create_version(request: Request, pk_item: int) -> HttpResponse:
''' Endpoint: Create new version for RSForm copying current content. ''' ''' Endpoint: Create new version for RSForm copying current content. '''
try: try:
item = m.LibraryItem.objects.get(pk=pk_item) item = m.LibraryItem.objects.get(pk=pk_item)
@ -125,7 +127,7 @@ def create_version(request: Request, pk_item: int):
} }
) )
@api_view(['GET']) @api_view(['GET'])
def retrieve_version(request: Request, pk_item: int, pk_version: int): def retrieve_version(request: Request, pk_item: int, pk_version: int) -> HttpResponse:
''' Endpoint: Retrieve version for RSForm. ''' ''' Endpoint: Retrieve version for RSForm. '''
try: try:
item = m.LibraryItem.objects.get(pk=pk_item) item = m.LibraryItem.objects.get(pk=pk_item)

View File

@ -0,0 +1,20 @@
# Generated by Django 5.0.7 on 2024-08-09 13:56
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('library', '0003_alter_librarytemplate_lib_source'),
('oss', '0005_inheritance_operation'),
]
operations = [
migrations.AlterField(
model_name='operation',
name='oss',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='operations', to='library.libraryitem', verbose_name='Схема синтеза'),
),
]

View File

@ -0,0 +1,279 @@
''' Models: Change propagation manager. '''
from typing import Optional, cast
from cctext import extract_entities
from apps.library.models import LibraryItem
from apps.rsform.graph import Graph
from apps.rsform.models import (
INSERT_LAST,
Constituenta,
CstType,
RSForm,
extract_globals,
replace_entities,
replace_globals
)
from .Inheritance import Inheritance
from .Operation import Operation
from .OperationSchema import OperationSchema
from .Substitution import Substitution
CstMapping = dict[str, Constituenta]
# TODO: add more variety tests for cascade resolutions model
class ChangeManager:
''' Change propagation API. '''
class Cache:
''' Cache for RSForm constituents. '''
def __init__(self, oss: OperationSchema):
self._oss = oss
self._schemas: list[RSForm] = []
self._schema_by_id: dict[int, RSForm] = {}
self.operations = list(oss.operations().only('result_id'))
self.operation_by_id = {operation.pk: operation for operation in self.operations}
self.graph = Graph[int]()
for operation in self.operations:
self.graph.add_node(operation.pk)
for argument in self._oss.arguments().only('operation_id', 'argument_id'):
self.graph.add_edge(argument.argument_id, argument.operation_id)
self.is_loaded = False
self.substitutions: list[Substitution] = []
self.inheritance: dict[int, list[tuple[int, int]]] = {}
def insert(self, schema: RSForm) -> None:
''' Insert new schema. '''
if not self._schema_by_id.get(schema.model.pk):
self._insert_new(schema)
def get_schema(self, operation: Operation) -> Optional[RSForm]:
''' Get schema by Operation. '''
if operation.result_id is None:
return None
if operation.result_id in self._schema_by_id:
return self._schema_by_id[operation.result_id]
else:
schema = RSForm.from_id(operation.result_id)
schema.cache.ensure_loaded()
self._insert_new(schema)
return schema
def get_operation(self, schema: RSForm) -> Operation:
''' Get operation by schema. '''
for operation in self.operations:
if operation.result_id == schema.model.pk:
return operation
raise ValueError(f'Operation for schema {schema.model.pk} not found')
def ensure_loaded(self) -> None:
''' Ensure propagation of changes. '''
if self.is_loaded:
return
self.is_loaded = True
self.substitutions = list(self._oss.substitutions().only('operation_id', 'original_id', 'substitution_id'))
for operation in self.operations:
self.inheritance[operation.pk] = []
for item in self._oss.inheritance().only('operation_id', 'parent_id', 'child_id'):
self.inheritance[item.operation_id].append((item.parent_id, item.child_id))
def get_successor_for(
self,
parent_cst: int,
operation: int,
ignore_substitution: bool = False
) -> Optional[int]:
''' Get child for parent inside target RSFrom. '''
if not ignore_substitution:
for sub in self.substitutions:
if sub.operation_id == operation and sub.original_id == parent_cst:
return sub.substitution_id
for item in self.inheritance[operation]:
if item[0] == parent_cst:
return item[1]
return None
def insert_inheritance(self, inheritance: Inheritance) -> None:
''' Insert new inheritance. '''
self.inheritance[inheritance.operation_id].append((inheritance.parent_id, inheritance.child_id))
def _insert_new(self, schema: RSForm) -> None:
self._schemas.append(schema)
self._schema_by_id[schema.model.pk] = schema
def __init__(self, model: LibraryItem):
self.oss = OperationSchema(model)
self.cache = ChangeManager.Cache(self.oss)
def on_create_cst(self, new_cst: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when new constituent is created. '''
self.cache.insert(source)
depend_aliases = new_cst.extract_references()
alias_mapping: CstMapping = {}
for alias in depend_aliases:
cst = source.cache.by_alias.get(alias)
if cst is not None:
alias_mapping[alias] = cst
operation = self.cache.get_operation(source)
self._cascade_create_cst(new_cst, operation, alias_mapping)
def on_change_cst_type(self, target: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when constituenta type is changed. '''
self.cache.insert(source)
operation = self.cache.get_operation(source)
self._cascade_change_cst_type(target.pk, target.cst_type, operation)
def on_update_cst(self, target: Constituenta, data: dict, old_data: dict, source: RSForm) -> None:
''' Trigger cascade resolutions when constituenta data is changed. '''
self.cache.insert(source)
operation = self.cache.get_operation(source)
depend_aliases = self._extract_data_references(data, old_data)
alias_mapping: CstMapping = {}
for alias in depend_aliases:
cst = source.cache.by_alias.get(alias)
if cst is not None:
alias_mapping[alias] = cst
self._cascade_update_cst(target.pk, operation, data, old_data, alias_mapping)
def _cascade_create_cst(self, prototype: Constituenta, operation: Operation, mapping: CstMapping) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
source_schema = self.cache.get_schema(operation)
assert source_schema is not None
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_schema(child_operation)
if child_schema is None:
continue
# TODO: update substitutions for diamond synthesis (if needed)
self.cache.ensure_loaded()
new_mapping = self._transform_mapping(mapping, child_operation, child_schema)
alias_mapping = {alias: cst.alias for alias, cst in new_mapping.items()}
insert_where = self._determine_insert_position(prototype, child_operation, source_schema, child_schema)
new_cst = child_schema.insert_copy([prototype], insert_where, alias_mapping)[0]
new_inheritance = Inheritance.objects.create(
operation=child_operation,
child=new_cst,
parent=prototype
)
self.cache.insert_inheritance(new_inheritance)
new_mapping = {alias_mapping[alias]: cst for alias, cst in new_mapping.items()}
self._cascade_create_cst(new_cst, child_operation, new_mapping)
def _cascade_change_cst_type(self, cst_id: int, ctype: CstType, operation: Operation) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_successor_for(cst_id, child_id, ignore_substitution=True)
if successor_id is None:
continue
child_schema = self.cache.get_schema(child_operation)
if child_schema is not None and child_schema.change_cst_type(successor_id, ctype):
self._cascade_change_cst_type(successor_id, ctype, child_operation)
# pylint: disable=too-many-arguments
def _cascade_update_cst(
self,
cst_id: int, operation: Operation,
data: dict, old_data: dict,
mapping: CstMapping
) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_successor_for(cst_id, child_id, ignore_substitution=True)
if successor_id is None:
continue
child_schema = self.cache.get_schema(child_operation)
assert child_schema is not None
new_mapping = self._transform_mapping(mapping, child_operation, child_schema)
alias_mapping = {alias: cst.alias for alias, cst in new_mapping.items()}
successor = child_schema.cache.by_id.get(successor_id)
if successor is None:
continue
new_data = self._prepare_update_data(successor, data, old_data, alias_mapping)
if len(new_data) == 0:
continue
new_old_data = child_schema.update_cst(successor, new_data)
if len(new_old_data) == 0:
continue
new_mapping = {alias_mapping[alias]: cst for alias, cst in new_mapping.items()}
self._cascade_update_cst(successor_id, child_operation, new_data, new_old_data, new_mapping)
def _transform_mapping(self, mapping: CstMapping, operation: Operation, schema: RSForm) -> CstMapping:
if len(mapping) == 0:
return mapping
result: CstMapping = {}
for alias, cst in mapping.items():
successor_id = self.cache.get_successor_for(cst.pk, operation.pk)
if successor_id is None:
continue
successor = schema.cache.by_id.get(successor_id)
if successor is None:
continue
result[alias] = successor
return result
def _determine_insert_position(
self, prototype: Constituenta,
operation: Operation,
source: RSForm,
destination: RSForm
) -> int:
''' Determine insert_after for new constituenta. '''
if prototype.order == 1:
return 1
prev_cst = source.cache.constituents[prototype.order - 2]
inherited_prev_id = self.cache.get_successor_for(
source.cache.constituents[prototype.order - 2].pk, operation.pk)
if inherited_prev_id is None:
return INSERT_LAST
prev_cst = destination.cache.by_id[inherited_prev_id]
return cast(int, prev_cst.order) + 1
def _extract_data_references(self, data: dict, old_data: dict) -> set[str]:
result: set[str] = set()
if 'definition_formal' in data:
result.update(extract_globals(data['definition_formal']))
result.update(extract_globals(old_data['definition_formal']))
if 'term_raw' in data:
result.update(extract_entities(data['term_raw']))
result.update(extract_entities(old_data['term_raw']))
if 'definition_raw' in data:
result.update(extract_entities(data['definition_raw']))
result.update(extract_entities(old_data['definition_raw']))
return result
def _prepare_update_data(self, cst: Constituenta, data: dict, old_data: dict, mapping: dict[str, str]) -> dict:
new_data = {}
if 'term_forms' in data:
if old_data['term_forms'] == cst.term_forms:
new_data['term_forms'] = data['term_forms']
if 'convention' in data:
if old_data['convention'] == cst.convention:
new_data['convention'] = data['convention']
if 'definition_formal' in data:
new_data['definition_formal'] = replace_globals(data['definition_formal'], mapping)
if 'term_raw' in data:
if replace_entities(old_data['term_raw'], mapping) == cst.term_raw:
new_data['term_raw'] = replace_entities(data['term_raw'], mapping)
if 'definition_raw' in data:
if replace_entities(old_data['definition_raw'], mapping) == cst.definition_raw:
new_data['definition_raw'] = replace_entities(data['definition_raw'], mapping)
return new_data

View File

@ -27,7 +27,7 @@ class Operation(Model):
verbose_name='Схема синтеза', verbose_name='Схема синтеза',
to='library.LibraryItem', to='library.LibraryItem',
on_delete=CASCADE, on_delete=CASCADE,
related_name='items' related_name='operations'
) )
operation_type: CharField = CharField( operation_type: CharField = CharField(
verbose_name='Тип', verbose_name='Тип',

View File

@ -1,11 +1,10 @@
''' Models: OSS API. ''' ''' Models: OSS API. '''
from typing import Optional from typing import Optional
from django.db import transaction
from django.db.models import QuerySet from django.db.models import QuerySet
from apps.library.models import Editor, LibraryItem, LibraryItemType from apps.library.models import Editor, LibraryItem, LibraryItemType
from apps.rsform.models import RSForm from apps.rsform.models import Constituenta, RSForm
from .Argument import Argument from .Argument import Argument
from .Inheritance import Inheritance from .Inheritance import Inheritance
@ -31,11 +30,11 @@ class OperationSchema:
model = LibraryItem.objects.get(pk=pk) model = LibraryItem.objects.get(pk=pk)
return OperationSchema(model) return OperationSchema(model)
def save(self, *args, **kwargs): def save(self, *args, **kwargs) -> None:
''' Save wrapper. ''' ''' Save wrapper. '''
self.model.save(*args, **kwargs) self.model.save(*args, **kwargs)
def refresh_from_db(self): def refresh_from_db(self) -> None:
''' Model wrapper. ''' ''' Model wrapper. '''
self.model.refresh_from_db() self.model.refresh_from_db()
@ -51,6 +50,10 @@ class OperationSchema:
''' Operation substitutions. ''' ''' Operation substitutions. '''
return Substitution.objects.filter(operation__oss=self.model) return Substitution.objects.filter(operation__oss=self.model)
def inheritance(self) -> QuerySet[Inheritance]:
''' Operation inheritances. '''
return Inheritance.objects.filter(operation__oss=self.model)
def owned_schemas(self) -> QuerySet[LibraryItem]: def owned_schemas(self) -> QuerySet[LibraryItem]:
''' Get QuerySet containing all result schemas owned by current OSS. ''' ''' Get QuerySet containing all result schemas owned by current OSS. '''
return LibraryItem.objects.filter( return LibraryItem.objects.filter(
@ -59,7 +62,7 @@ class OperationSchema:
location=self.model.location location=self.model.location
) )
def update_positions(self, data: list[dict]): def update_positions(self, data: list[dict]) -> None:
''' Update positions. ''' ''' Update positions. '''
lookup = {x['id']: x for x in data} lookup = {x['id']: x for x in data}
operations = self.operations() operations = self.operations()
@ -69,7 +72,6 @@ class OperationSchema:
item.position_y = lookup[item.pk]['position_y'] item.position_y = lookup[item.pk]['position_y']
Operation.objects.bulk_update(operations, ['position_x', 'position_y']) Operation.objects.bulk_update(operations, ['position_x', 'position_y'])
@transaction.atomic
def create_operation(self, **kwargs) -> Operation: def create_operation(self, **kwargs) -> Operation:
''' Insert new operation. ''' ''' Insert new operation. '''
result = Operation.objects.create(oss=self.model, **kwargs) result = Operation.objects.create(oss=self.model, **kwargs)
@ -77,7 +79,6 @@ class OperationSchema:
result.refresh_from_db() result.refresh_from_db()
return result return result
@transaction.atomic
def delete_operation(self, operation: Operation): def delete_operation(self, operation: Operation):
''' Delete operation. ''' ''' Delete operation. '''
operation.delete() operation.delete()
@ -87,8 +88,7 @@ class OperationSchema:
self.save() self.save()
@transaction.atomic def set_input(self, target: Operation, schema: Optional[LibraryItem]) -> None:
def set_input(self, target: Operation, schema: Optional[LibraryItem]):
''' Set input schema for operation. ''' ''' Set input schema for operation. '''
if schema == target.result: if schema == target.result:
return return
@ -104,8 +104,7 @@ class OperationSchema:
self.save() self.save()
@transaction.atomic def set_arguments(self, operation: Operation, arguments: list[Operation]) -> None:
def set_arguments(self, operation: Operation, arguments: list[Operation]):
''' Set arguments to operation. ''' ''' Set arguments to operation. '''
processed: list[Operation] = [] processed: list[Operation] = []
changed = False changed = False
@ -125,8 +124,7 @@ class OperationSchema:
# TODO: trigger on_change effects # TODO: trigger on_change effects
self.save() self.save()
@transaction.atomic def set_substitutions(self, target: Operation, substitutes: list[dict]) -> None:
def set_substitutions(self, target: Operation, substitutes: list[dict]):
''' Clear all arguments for operation. ''' ''' Clear all arguments for operation. '''
processed: list[dict] = [] processed: list[dict] = []
changed = False changed = False
@ -157,7 +155,6 @@ class OperationSchema:
self.save() self.save()
@transaction.atomic
def create_input(self, operation: Operation) -> RSForm: def create_input(self, operation: Operation) -> RSForm:
''' Create input RSForm. ''' ''' Create input RSForm. '''
schema = RSForm.create( schema = RSForm.create(
@ -175,7 +172,6 @@ class OperationSchema:
self.save() self.save()
return schema return schema
@transaction.atomic
def execute_operation(self, operation: Operation) -> bool: def execute_operation(self, operation: Operation) -> bool:
''' Execute target operation. ''' ''' Execute target operation. '''
schemas: list[LibraryItem] = [arg.argument.result for arg in operation.getArguments()] schemas: list[LibraryItem] = [arg.argument.result for arg in operation.getArguments()]
@ -194,10 +190,12 @@ class OperationSchema:
parents[cst.pk] = items[i] parents[cst.pk] = items[i]
children[items[i].pk] = cst children[items[i].pk] = cst
translated_substitutions: list[tuple[Constituenta, Constituenta]] = []
for sub in substitutions: for sub in substitutions:
original = children[sub.original.pk] original = children[sub.original.pk]
replacement = children[sub.substitution.pk] replacement = children[sub.substitution.pk]
receiver.substitute(original, replacement) translated_substitutions.append((original, replacement))
receiver.substitute(translated_substitutions)
# TODO: remove duplicates from diamond # TODO: remove duplicates from diamond

View File

@ -1,6 +1,7 @@
''' Django: Models. ''' ''' Django: Models. '''
from .Argument import Argument from .Argument import Argument
from .ChangeManager import ChangeManager
from .Inheritance import Inheritance from .Inheritance import Inheritance
from .Operation import Operation, OperationType from .Operation import Operation, OperationType
from .OperationSchema import OperationSchema from .OperationSchema import OperationSchema

View File

@ -57,9 +57,9 @@ class OperationCreateSerializer(serializers.Serializer):
class OperationUpdateSerializer(serializers.Serializer): class OperationUpdateSerializer(serializers.Serializer):
''' Serializer: Operation creation. ''' ''' Serializer: Operation update. '''
class OperationUpdateData(serializers.ModelSerializer): class OperationUpdateData(serializers.ModelSerializer):
''' Serializer: Operation creation data. ''' ''' Serializer: Operation update data. '''
class Meta: class Meta:
''' serializer metadata. ''' ''' serializer metadata. '''
model = Operation model = Operation

View File

@ -31,36 +31,3 @@ class TestOperation(TestCase):
self.assertEqual(self.operation.comment, '') self.assertEqual(self.operation.comment, '')
self.assertEqual(self.operation.position_x, 0) self.assertEqual(self.operation.position_x, 0)
self.assertEqual(self.operation.position_y, 0) self.assertEqual(self.operation.position_y, 0)
def test_sync_from_result(self):
schema = RSForm.create(alias=self.operation.alias)
self.operation.result = schema.model
self.operation.save()
schema.model.alias = 'KS2'
schema.model.comment = 'Comment'
schema.model.title = 'Title'
schema.save()
self.operation.refresh_from_db()
self.assertEqual(self.operation.result, schema.model)
self.assertEqual(self.operation.alias, schema.model.alias)
self.assertEqual(self.operation.title, schema.model.title)
self.assertEqual(self.operation.comment, schema.model.comment)
def test_sync_from_library_item(self):
schema = LibraryItem.objects.create(alias=self.operation.alias, item_type=LibraryItemType.RSFORM)
self.operation.result = schema
self.operation.save()
schema.alias = 'KS2'
schema.comment = 'Comment'
schema.title = 'Title'
schema.save()
self.operation.refresh_from_db()
self.assertEqual(self.operation.result, schema)
self.assertEqual(self.operation.alias, schema.alias)
self.assertEqual(self.operation.title, schema.title)
self.assertEqual(self.operation.comment, schema.comment)

View File

@ -14,9 +14,9 @@ class TestSynthesisSubstitution(TestCase):
self.oss = OperationSchema.create(alias='T1') self.oss = OperationSchema.create(alias='T1')
self.ks1 = RSForm.create(alias='KS1', title='Test1') self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1') self.ks1X1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2') self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2') self.ks2X1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = Operation.objects.create( self.operation1 = Operation.objects.create(
oss=self.oss.model, oss=self.oss.model,
@ -46,13 +46,13 @@ class TestSynthesisSubstitution(TestCase):
self.substitution = Substitution.objects.create( self.substitution = Substitution.objects.create(
operation=self.operation3, operation=self.operation3,
original=self.ks1x1, original=self.ks1X1,
substitution=self.ks2x1 substitution=self.ks2X1
) )
def test_str(self): def test_str(self):
testStr = f'{self.ks1x1} -> {self.ks2x1}' testStr = f'{self.ks1X1} -> {self.ks2X1}'
self.assertEqual(str(self.substitution), testStr) self.assertEqual(str(self.substitution), testStr)
@ -64,11 +64,11 @@ class TestSynthesisSubstitution(TestCase):
def test_cascade_delete_original(self): def test_cascade_delete_original(self):
self.assertEqual(Substitution.objects.count(), 1) self.assertEqual(Substitution.objects.count(), 1)
self.ks1x1.delete() self.ks1X1.delete()
self.assertEqual(Substitution.objects.count(), 0) self.assertEqual(Substitution.objects.count(), 0)
def test_cascade_delete_substitution(self): def test_cascade_delete_substitution(self):
self.assertEqual(Substitution.objects.count(), 1) self.assertEqual(Substitution.objects.count(), 1)
self.ks2x1.delete() self.ks2X1.delete()
self.assertEqual(Substitution.objects.count(), 0) self.assertEqual(Substitution.objects.count(), 0)

View File

@ -1,3 +1,4 @@
''' Tests for REST API. ''' ''' Tests for REST API. '''
from .t_change_attributes import * from .t_change_attributes import *
from .t_change_constituents import *
from .t_oss import * from .t_oss import *

View File

@ -1,7 +1,4 @@
''' Testing API: Change attributes of OSS and RSForms. ''' ''' Testing API: Change attributes of OSS and RSForms. '''
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LocationHead from apps.library.models import AccessPolicy, Editor, LocationHead
from apps.oss.models import Operation, OperationSchema, OperationType from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm from apps.rsform.models import RSForm
@ -123,3 +120,33 @@ class TestChangeAttributes(EndpointTester):
self.assertEqual(list(self.ks1.model.editors()), [self.user, self.user2]) self.assertEqual(list(self.ks1.model.editors()), [self.user, self.user2])
self.assertEqual(list(self.ks2.model.editors()), []) self.assertEqual(list(self.ks2.model.editors()), [])
self.assertEqual(set(self.ks3.editors()), set([self.user, self.user3])) self.assertEqual(set(self.ks3.editors()), set([self.user, self.user3]))
@decl_endpoint('/api/library/{item}', method='patch')
def test_sync_from_result(self):
data = {'alias': 'KS111', 'title': 'New Title', 'comment': 'New Comment'}
self.executeOK(data=data, item=self.ks1.model.pk)
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.result, self.ks1.model)
self.assertEqual(self.operation1.alias, data['alias'])
self.assertEqual(self.operation1.title, data['title'])
self.assertEqual(self.operation1.comment, data['comment'])
@decl_endpoint('/api/oss/{item}/update-operation', method='patch')
def test_sync_from_operation(self):
data = {
'target': self.operation3.pk,
'item_data': {
'alias': 'Test3 mod',
'title': 'Test title mod',
'comment': 'Comment mod'
},
'positions': [],
}
response = self.executeOK(data=data, item=self.owned_id)
self.ks3.refresh_from_db()
self.assertEqual(self.ks3.alias, data['item_data']['alias'])
self.assertEqual(self.ks3.title, data['item_data']['title'])
self.assertEqual(self.ks3.comment, data['item_data']['comment'])

View File

@ -0,0 +1,109 @@
''' Testing API: Change constituents in OSS. '''
from apps.oss.models import OperationSchema, OperationType
from apps.rsform.models import Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestChangeConstituents(EndpointTester):
''' Testing Constituents change propagation in OSS. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(
title='Test',
alias='T1',
owner=self.user
)
self.owned_id = self.owned.model.pk
self.ks1 = RSForm.create(
alias='KS1',
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X4')
self.ks1X2 = self.ks1.insert_new('X5')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1')
self.ks2D1 = self.ks2.insert_new(
alias='D1',
definition_formal=r'X1\X1'
)
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.model
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.model
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
self.assertEqual(self.ks3.constituents().count(), 4)
@decl_endpoint('/api/rsforms/{schema}/create-cst', method='post')
def test_create_constituenta(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'definition_formal': 'X4 = X5'
}
response = self.executeCreated(data=data, schema=self.ks1.model.pk)
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 5)
self.assertEqual(inherited_cst.alias, 'X4')
self.assertEqual(inherited_cst.order, 3)
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
@decl_endpoint('/api/rsforms/{schema}/rename-cst', method='patch')
def test_rename_constituenta(self):
data = {'target': self.ks1X1.pk, 'alias': 'D21', 'cst_type': CstType.TERM}
response = self.executeOK(data=data, schema=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertEqual(self.ks1X1.alias, data['alias'])
self.assertEqual(self.ks1X1.cst_type, data['cst_type'])
self.assertEqual(inherited_cst.alias, 'D2')
self.assertEqual(inherited_cst.cst_type, data['cst_type'])
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_constituenta(self):
d2 = self.ks3.insert_new('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
data = {
'target': self.ks1X1.pk,
'item_data': {
'term_raw': 'Test1',
'definition_formal': r'X4\X4',
'definition_raw': '@{X5|sing,datv}'
}
}
response = self.executeOK(data=data, schema=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
d2.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertEqual(self.ks1X1.term_raw, data['item_data']['term_raw'])
self.assertEqual(self.ks1X1.definition_formal, data['item_data']['definition_formal'])
self.assertEqual(self.ks1X1.definition_raw, data['item_data']['definition_raw'])
self.assertEqual(d2.definition_resolved, data['item_data']['term_raw'])
self.assertEqual(inherited_cst.term_raw, data['item_data']['term_raw'])
self.assertEqual(inherited_cst.definition_formal, r'X1\X1')
self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}')

View File

@ -1,8 +1,5 @@
''' Testing API: Operation Schema. ''' ''' Testing API: Operation Schema. '''
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType, LocationHead
from apps.oss.models import Operation, OperationSchema, OperationType from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm from apps.rsform.models import RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint from shared.EndpointTester import EndpointTester, decl_endpoint
@ -28,7 +25,7 @@ class TestOssViewset(EndpointTester):
title='Test1', title='Test1',
owner=self.user owner=self.user
) )
self.ks1x1 = self.ks1.insert_new( self.ks1X1 = self.ks1.insert_new(
'X1', 'X1',
term_raw='X1_1', term_raw='X1_1',
term_resolved='X1_1' term_resolved='X1_1'
@ -38,7 +35,7 @@ class TestOssViewset(EndpointTester):
title='Test2', title='Test2',
owner=self.user owner=self.user
) )
self.ks2x1 = self.ks2.insert_new( self.ks2X1 = self.ks2.insert_new(
'X2', 'X2',
term_raw='X1_2', term_raw='X1_2',
term_resolved='X1_2' term_resolved='X1_2'
@ -60,8 +57,8 @@ class TestOssViewset(EndpointTester):
) )
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2]) self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.set_substitutions(self.operation3, [{ self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1, 'original': self.ks1X1,
'substitution': self.ks2x1 'substitution': self.ks2X1
}]) }])
@decl_endpoint('/api/oss/{item}/details', method='get') @decl_endpoint('/api/oss/{item}/details', method='get')
@ -85,12 +82,12 @@ class TestOssViewset(EndpointTester):
self.assertEqual(len(response.data['substitutions']), 1) self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0] sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk) self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk) self.assertEqual(sub['original'], self.ks1X1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk) self.assertEqual(sub['substitution'], self.ks2X1.pk)
self.assertEqual(sub['original_alias'], self.ks1x1.alias) self.assertEqual(sub['original_alias'], self.ks1X1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved) self.assertEqual(sub['original_term'], self.ks1X1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias) self.assertEqual(sub['substitution_alias'], self.ks2X1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved) self.assertEqual(sub['substitution_term'], self.ks2X1.term_resolved)
arguments = response.data['arguments'] arguments = response.data['arguments']
self.assertEqual(len(arguments), 2) self.assertEqual(len(arguments), 2)
@ -369,14 +366,14 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk], 'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [ 'substitutions': [
{ {
'original': self.ks1x1.pk, 'original': self.ks1X1.pk,
'substitution': ks3x1.pk 'substitution': ks3x1.pk
} }
] ]
} }
self.executeBadData(data=data) self.executeBadData(data=data)
data['substitutions'][0]['substitution'] = self.ks2x1.pk data['substitutions'][0]['substitution'] = self.ks2X1.pk
self.toggle_admin(True) self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id) self.executeBadData(data=data, item=self.unowned_id)
self.logout() self.logout()
@ -421,7 +418,7 @@ class TestOssViewset(EndpointTester):
def test_update_operation_invalid_substitution(self): def test_update_operation_invalid_substitution(self):
self.populateData() self.populateData()
self.ks1x2 = self.ks1.insert_new('X2') self.ks1X2 = self.ks1.insert_new('X2')
data = { data = {
'target': self.operation3.pk, 'target': self.operation3.pk,
@ -434,12 +431,12 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk], 'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [ 'substitutions': [
{ {
'original': self.ks1x1.pk, 'original': self.ks1X1.pk,
'substitution': self.ks2x1.pk 'substitution': self.ks2X1.pk
}, },
{ {
'original': self.ks2x1.pk, 'original': self.ks2X1.pk,
'substitution': self.ks1x2.pk 'substitution': self.ks1X2.pk
} }
] ]
} }
@ -473,4 +470,4 @@ class TestOssViewset(EndpointTester):
items = list(RSForm(schema).constituents()) items = list(RSForm(schema).constituents())
self.assertEqual(len(items), 1) self.assertEqual(len(items), 1)
self.assertEqual(items[0].alias, 'X1') self.assertEqual(items[0].alias, 'X1')
self.assertEqual(items[0].term_resolved, self.ks2x1.term_resolved) self.assertEqual(items[0].term_resolved, self.ks2X1.term_resolved)

View File

@ -2,6 +2,7 @@
from typing import cast from typing import cast
from django.db import transaction from django.db import transaction
from django.http import HttpResponse
from drf_spectacular.utils import extend_schema, extend_schema_view from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics, serializers from rest_framework import generics, serializers
from rest_framework import status as c from rest_framework import status as c
@ -61,7 +62,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['get'], url_path='details') @action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk): def details(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Detailed OSS data. ''' ''' Endpoint: Detailed OSS data. '''
serializer = s.OperationSchemaSerializer(self._get_item()) serializer = s.OperationSchemaSerializer(self._get_item())
return Response( return Response(
@ -80,7 +81,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['patch'], url_path='update-positions') @action(detail=True, methods=['patch'], url_path='update-positions')
def update_positions(self, request: Request, pk): def update_positions(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Update operations positions. ''' ''' Endpoint: Update operations positions. '''
serializer = s.PositionsSerializer(data=request.data) serializer = s.PositionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -99,7 +100,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['post'], url_path='create-operation') @action(detail=True, methods=['post'], url_path='create-operation')
def create_operation(self, request: Request, pk): def create_operation(self, request: Request, pk) -> HttpResponse:
''' Create new operation. ''' ''' Create new operation. '''
serializer = s.OperationCreateSerializer(data=request.data) serializer = s.OperationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -135,7 +136,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['patch'], url_path='delete-operation') @action(detail=True, methods=['patch'], url_path='delete-operation')
def delete_operation(self, request: Request, pk): def delete_operation(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete operation. ''' ''' Endpoint: Delete operation. '''
serializer = s.OperationTargetSerializer( serializer = s.OperationTargetSerializer(
data=request.data, data=request.data,
@ -165,7 +166,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['patch'], url_path='create-input') @action(detail=True, methods=['patch'], url_path='create-input')
def create_input(self, request: Request, pk): def create_input(self, request: Request, pk) -> HttpResponse:
''' Create new input RSForm. ''' ''' Create new input RSForm. '''
serializer = s.OperationTargetSerializer( serializer = s.OperationTargetSerializer(
data=request.data, data=request.data,
@ -208,7 +209,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['patch'], url_path='set-input') @action(detail=True, methods=['patch'], url_path='set-input')
def set_input(self, request: Request, pk): def set_input(self, request: Request, pk) -> HttpResponse:
''' Set input schema for target operation. ''' ''' Set input schema for target operation. '''
serializer = s.SetOperationInputSerializer( serializer = s.SetOperationInputSerializer(
data=request.data, data=request.data,
@ -238,7 +239,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['patch'], url_path='update-operation') @action(detail=True, methods=['patch'], url_path='update-operation')
def update_operation(self, request: Request, pk): def update_operation(self, request: Request, pk) -> HttpResponse:
''' Update operation arguments and parameters. ''' ''' Update operation arguments and parameters. '''
serializer = s.OperationUpdateSerializer( serializer = s.OperationUpdateSerializer(
data=request.data, data=request.data,
@ -253,7 +254,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
operation.alias = serializer.validated_data['item_data']['alias'] operation.alias = serializer.validated_data['item_data']['alias']
operation.title = serializer.validated_data['item_data']['title'] operation.title = serializer.validated_data['item_data']['title']
operation.comment = serializer.validated_data['item_data']['comment'] operation.comment = serializer.validated_data['item_data']['comment']
operation.save() operation.save(update_fields=['alias', 'title', 'comment'])
if operation.result is not None: if operation.result is not None:
can_edit = permissions.can_edit_item(request.user, operation.result) can_edit = permissions.can_edit_item(request.user, operation.result)
@ -283,7 +284,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=True, methods=['post'], url_path='execute-operation') @action(detail=True, methods=['post'], url_path='execute-operation')
def execute_operation(self, request: Request, pk): def execute_operation(self, request: Request, pk) -> HttpResponse:
''' Execute operation. ''' ''' Execute operation. '''
serializer = s.OperationTargetSerializer( serializer = s.OperationTargetSerializer(
data=request.data, data=request.data,
@ -323,7 +324,7 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
} }
) )
@action(detail=False, methods=['post'], url_path='get-predecessor') @action(detail=False, methods=['post'], url_path='get-predecessor')
def get_predecessor(self, request: Request): def get_predecessor(self, request: Request) -> HttpResponse:
''' Get predecessor. ''' ''' Get predecessor. '''
# TODO: add tests for this method # TODO: add tests for this method
serializer = CstTargetSerializer(data=request.data) serializer = CstTargetSerializer(data=request.data)

View File

@ -1,6 +1,7 @@
''' Models: Constituenta. ''' ''' Models: Constituenta. '''
import re import re
from cctext import extract_entities
from django.core.validators import MinValueValidator from django.core.validators import MinValueValidator
from django.db.models import ( from django.db.models import (
CASCADE, CASCADE,
@ -15,10 +16,26 @@ from django.db.models import (
from ..utils import apply_pattern from ..utils import apply_pattern
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_REF_ENTITY_PATTERN = re.compile(r'@{([^0-9\-].*?)\|.*?}') _REF_ENTITY_PATTERN = re.compile(r'@{([^0-9\-].*?)\|.*?}')
_GLOBAL_ID_PATTERN = re.compile(r'([XCSADFPT][0-9]+)') # cspell:disable-line _GLOBAL_ID_PATTERN = re.compile(r'([XCSADFPT][0-9]+)') # cspell:disable-line
def extract_globals(expression: str) -> set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
def replace_globals(expression: str, mapping: dict[str, str]) -> str:
''' Replace all global aliases in expression. '''
return apply_pattern(expression, mapping, _GLOBAL_ID_PATTERN)
def replace_entities(expression: str, mapping: dict[str, str]) -> str:
''' Replace all entity references in expression. '''
return apply_pattern(expression, mapping, _REF_ENTITY_PATTERN)
class CstType(TextChoices): class CstType(TextChoices):
''' Type of constituenta. ''' ''' Type of constituenta. '''
BASE = 'basic' BASE = 'basic'
@ -99,8 +116,6 @@ class Constituenta(Model):
def set_term_resolved(self, new_term: str): def set_term_resolved(self, new_term: str):
''' Set term and reset forms if needed. ''' ''' Set term and reset forms if needed. '''
if new_term == self.term_resolved:
return
self.term_resolved = new_term self.term_resolved = new_term
self.term_forms = [] self.term_forms = []
@ -109,20 +124,23 @@ class Constituenta(Model):
if change_aliases and self.alias in mapping: if change_aliases and self.alias in mapping:
modified = True modified = True
self.alias = mapping[self.alias] self.alias = mapping[self.alias]
expression = apply_pattern(self.definition_formal, mapping, _GLOBAL_ID_PATTERN) expression = replace_globals(self.definition_formal, mapping)
if expression != self.definition_formal: if expression != self.definition_formal:
modified = True modified = True
self.definition_formal = expression self.definition_formal = expression
convention = apply_pattern(self.convention, mapping, _GLOBAL_ID_PATTERN) term = replace_entities(self.term_raw, mapping)
if convention != self.convention:
modified = True
self.convention = convention
term = apply_pattern(self.term_raw, mapping, _REF_ENTITY_PATTERN)
if term != self.term_raw: if term != self.term_raw:
modified = True modified = True
self.term_raw = term self.term_raw = term
definition = apply_pattern(self.definition_raw, mapping, _REF_ENTITY_PATTERN) definition = replace_entities(self.definition_raw, mapping)
if definition != self.definition_raw: if definition != self.definition_raw:
modified = True modified = True
self.definition_raw = definition self.definition_raw = definition
return modified return modified
def extract_references(self) -> set[str]:
''' Extract all references from term and definition. '''
result: set[str] = extract_globals(self.definition_formal)
result.update(extract_entities(self.term_raw))
result.update(extract_entities(self.definition_raw))
return result

View File

@ -1,10 +1,9 @@
''' Models: RSForm API. ''' ''' Models: RSForm API. '''
from copy import deepcopy from copy import deepcopy
from typing import Optional, cast from typing import Iterable, Optional, cast
from cctext import Entity, Resolver, TermForm, extract_entities, split_grams from cctext import Entity, Resolver, TermForm, extract_entities, split_grams
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet from django.db.models import QuerySet
from apps.library.models import LibraryItem, LibraryItemType, Version from apps.library.models import LibraryItem, LibraryItemType, Version
@ -12,7 +11,6 @@ from shared import messages as msg
from ..graph import Graph from ..graph import Graph
from .api_RSLanguage import ( from .api_RSLanguage import (
extract_globals,
generate_structure, generate_structure,
get_type_prefix, get_type_prefix,
guess_type, guess_type,
@ -22,16 +20,79 @@ from .api_RSLanguage import (
is_simple_expression, is_simple_expression,
split_template split_template
) )
from .Constituenta import Constituenta, CstType from .Constituenta import Constituenta, CstType, extract_globals
_INSERT_LAST: int = -1 INSERT_LAST: int = -1
class RSForm: class RSForm:
''' RSForm is math form of conceptual schema. ''' ''' RSForm is math form of conceptual schema. '''
class Cache:
''' Cache for RSForm constituents. '''
def __init__(self, schema: 'RSForm'):
self._schema = schema
self.constituents: list[Constituenta] = []
self.by_id: dict[int, Constituenta] = {}
self.by_alias: dict[str, Constituenta] = {}
self.is_loaded = False
def reload(self) -> None:
self.constituents = list(
self._schema.constituents().only(
'order',
'alias',
'cst_type',
'definition_formal',
'term_raw',
'definition_raw'
).order_by('order')
)
self.by_id = {cst.pk: cst for cst in self.constituents}
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
def clear(self) -> None:
self.constituents = []
self.by_id = {}
self.by_alias = {}
self.is_loaded = False
def insert(self, cst: Constituenta) -> None:
if self.is_loaded:
self.constituents.insert(cst.order - 1, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def insert_multi(self, items: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in items:
self.constituents.insert(cst.order - 1, cst)
self.by_id[cst.pk] = cst
self.by_alias[cst.alias] = cst
def remove(self, target: Constituenta) -> None:
if self.is_loaded:
self.constituents.remove(self.by_id[target.pk])
del self.by_id[target.pk]
del self.by_alias[target.alias]
def remove_multi(self, target: Iterable[Constituenta]) -> None:
if self.is_loaded:
for cst in target:
self.constituents.remove(self.by_id[cst.pk])
del self.by_id[cst.pk]
del self.by_alias[cst.alias]
def __init__(self, model: LibraryItem): def __init__(self, model: LibraryItem):
self.model = model self.model = model
self.cache: RSForm.Cache = RSForm.Cache(self)
@staticmethod @staticmethod
def create(**kwargs) -> 'RSForm': def create(**kwargs) -> 'RSForm':
@ -45,11 +106,11 @@ class RSForm:
model = LibraryItem.objects.get(pk=pk) model = LibraryItem.objects.get(pk=pk)
return RSForm(model) return RSForm(model)
def save(self, *args, **kwargs): def save(self, *args, **kwargs) -> None:
''' Model wrapper. ''' ''' Model wrapper. '''
self.model.save(*args, **kwargs) self.model.save(*args, **kwargs)
def refresh_from_db(self): def refresh_from_db(self) -> None:
''' Model wrapper. ''' ''' Model wrapper. '''
self.model.refresh_from_db() self.model.refresh_from_db()
@ -60,7 +121,7 @@ class RSForm:
def resolver(self) -> Resolver: def resolver(self) -> Resolver:
''' Create resolver for text references based on schema terms. ''' ''' Create resolver for text references based on schema terms. '''
result = Resolver({}) result = Resolver({})
for cst in self.constituents(): for cst in self.constituents().only('alias', 'term_resolved', 'term_forms'):
entity = Entity( entity = Entity(
alias=cst.alias, alias=cst.alias,
nominal=cst.term_resolved, nominal=cst.term_resolved,
@ -76,53 +137,57 @@ class RSForm:
''' Access semantic information on constituents. ''' ''' Access semantic information on constituents. '''
return SemanticInfo(self) return SemanticInfo(self)
@transaction.atomic def on_term_change(self, changed: list[int]) -> None:
def on_term_change(self, changed: list[int]):
''' Trigger cascade resolutions when term changes. ''' ''' Trigger cascade resolutions when term changes. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term() graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed) expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion expanded_change = changed + expansion
update_list: list[Constituenta] = []
resolver = self.resolver() resolver = self.resolver()
if len(expansion) > 0: if len(expansion) > 0:
for cst_id in graph_terms.topological_order(): for cst_id in graph_terms.topological_order():
if cst_id not in expansion: if cst_id not in expansion:
continue continue
cst = self.constituents().get(id=cst_id) cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw) resolved = resolver.resolve(cst.term_raw)
if resolved == cst.term_resolved: if resolved == resolver.context[cst.alias].get_nominal():
continue continue
cst.set_term_resolved(resolved) cst.set_term_resolved(resolved)
cst.save() update_list.append(cst)
resolver.context[cst.alias] = Entity(cst.alias, resolved) resolver.context[cst.alias] = Entity(cst.alias, resolved)
Constituenta.objects.bulk_update(update_list, ['term_resolved'])
graph_defs = self._graph_text() graph_defs = self._graph_text()
update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed) update_defs = set(expansion + graph_defs.expand_outputs(expanded_change)).union(changed)
update_list = []
if len(update_defs) == 0: if len(update_defs) == 0:
return return
for cst_id in update_defs: for cst_id in update_defs:
cst = self.constituents().get(id=cst_id) cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.definition_raw) resolved = resolver.resolve(cst.definition_raw)
if resolved == cst.definition_resolved:
continue
cst.definition_resolved = resolved cst.definition_resolved = resolved
cst.save() update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['definition_resolved'])
def get_max_index(self, cst_type: CstType) -> int: def get_max_index(self, cst_type: CstType) -> int:
''' Get maximum alias index for specific CstType. ''' ''' Get maximum alias index for specific CstType. '''
result: int = 0 result: int = 0
items = Constituenta.objects \ cst_list: Iterable[Constituenta] = []
.filter(schema=self.model, cst_type=cst_type) \ if not self.cache.is_loaded:
.order_by('-alias') \ cst_list = Constituenta.objects \
.values_list('alias', flat=True) .filter(schema=self.model, cst_type=cst_type) \
for alias in items: .only('alias')
result = max(result, int(alias[1:])) else:
cst_list = [cst for cst in self.cache.constituents if cst.cst_type == cst_type]
for cst in cst_list:
result = max(result, int(cst.alias[1:]))
return result return result
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta: def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create new cst from data. ''' ''' Create new cst from data. '''
if insert_after is None: if insert_after is None:
position = _INSERT_LAST position = INSERT_LAST
else: else:
position = insert_after.order + 1 position = insert_after.order + 1
result = self.insert_new(data['alias'], data['cst_type'], position) result = self.insert_new(data['alias'], data['cst_type'], position)
@ -142,16 +207,16 @@ class RSForm:
result.definition_resolved = resolver.resolve(result.definition_raw) result.definition_resolved = resolver.resolve(result.definition_raw)
result.save() result.save()
self.cache.insert(result)
self.on_term_change([result.pk]) self.on_term_change([result.pk])
result.refresh_from_db() result.refresh_from_db()
return result return result
@transaction.atomic
def insert_new( def insert_new(
self, self,
alias: str, alias: str,
cst_type: Optional[CstType] = None, cst_type: Optional[CstType] = None,
position: int = _INSERT_LAST, position: int = INSERT_LAST,
**kwargs **kwargs
) -> Constituenta: ) -> Constituenta:
''' Insert new constituenta at given position. ''' Insert new constituenta at given position.
@ -169,17 +234,18 @@ class RSForm:
cst_type=cst_type, cst_type=cst_type,
**kwargs **kwargs
) )
self.cache.insert(result)
self.save() self.save()
result.refresh_from_db()
return result return result
@transaction.atomic def insert_copy(self, items: list[Constituenta], position: int = INSERT_LAST,
def insert_copy(self, items: list[Constituenta], position: int = _INSERT_LAST) -> list[Constituenta]: initial_mapping: Optional[dict[str, str]] = None) -> list[Constituenta]:
''' Insert copy of target constituents updating references. ''' ''' Insert copy of target constituents updating references. '''
count = len(items) count = len(items)
if count == 0: if count == 0:
return [] return []
self.cache.ensure_loaded()
position = self._get_insert_position(position) position = self._get_insert_position(position)
self._shift_positions(position, count) self._shift_positions(position, count)
@ -187,7 +253,7 @@ class RSForm:
for (value, _) in CstType.choices: for (value, _) in CstType.choices:
indices[value] = self.get_max_index(cast(CstType, value)) indices[value] = self.get_max_index(cast(CstType, value))
mapping: dict[str, str] = {} mapping: dict[str, str] = initial_mapping.copy() if initial_mapping else {}
for cst in items: for cst in items:
indices[cst.cst_type] = indices[cst.cst_type] + 1 indices[cst.cst_type] = indices[cst.cst_type] + 1
newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}' newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}'
@ -200,62 +266,115 @@ class RSForm:
cst.order = position cst.order = position
cst.alias = mapping[cst.alias] cst.alias = mapping[cst.alias]
cst.apply_mapping(mapping) cst.apply_mapping(mapping)
cst.save()
position = position + 1 position = position + 1
new_cst = Constituenta.objects.bulk_create(result)
self.cache.insert_multi(new_cst)
self.save() self.save()
return result return result
@transaction.atomic # pylint: disable=too-many-branches
def move_cst(self, listCst: list[Constituenta], target: int): def update_cst(self, target: Constituenta, data: dict) -> dict:
''' Update persistent attributes of a given constituenta. Return old values. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target.pk)
if cst is None:
raise ValidationError(msg.constituentaNotInRSform(target.alias))
old_data = {}
term_changed = False
if 'convention' in data:
cst.convention = data['convention']
if 'definition_formal' in data:
if cst.definition_formal == data['definition_formal']:
del data['definition_formal']
else:
old_data['definition_formal'] = cst.definition_formal
cst.definition_formal = data['definition_formal']
if 'term_forms' in data:
term_changed = True
old_data['term_forms'] = cst.term_forms
cst.term_forms = data['term_forms']
if 'definition_raw' in data or 'term_raw' in data:
resolver = self.resolver()
if 'term_raw' in data:
if cst.term_raw == data['term_raw']:
del data['term_raw']
else:
term_changed = True
old_data['term_raw'] = cst.term_raw
cst.term_raw = data['term_raw']
cst.term_resolved = resolver.resolve(cst.term_raw)
if 'term_forms' not in data:
cst.term_forms = []
resolver.context[cst.alias] = Entity(cst.alias, cst.term_resolved, manual_forms=cst.term_forms)
if 'definition_raw' in data:
if cst.definition_raw == data['definition_raw']:
del data['definition_raw']
else:
old_data['definition_raw'] = cst.definition_raw
cst.definition_raw = data['definition_raw']
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
if term_changed:
self.on_term_change([cst.pk])
self.save()
return old_data
def move_cst(self, target: list[Constituenta], destination: int) -> None:
''' Move list of constituents to specific position ''' ''' Move list of constituents to specific position '''
count_moved = 0 count_moved = 0
count_top = 0 count_top = 0
count_bot = 0 count_bot = 0
size = len(listCst) size = len(target)
update_list = []
for cst in self.constituents().only('order').order_by('order'): cst_list: Iterable[Constituenta] = []
if cst not in listCst: if not self.cache.is_loaded:
if count_top + 1 < target: cst_list = self.constituents().only('order').order_by('order')
cst.order = count_top + 1 else:
count_top += 1 cst_list = self.cache.constituents
else: for cst in cst_list:
cst.order = target + size + count_bot if cst in target:
count_bot += 1 cst.order = destination + count_moved
else:
cst.order = target + count_moved
count_moved += 1 count_moved += 1
update_list.append(cst) elif count_top + 1 < destination:
Constituenta.objects.bulk_update(update_list, ['order']) cst.order = count_top + 1
count_top += 1
else:
cst.order = destination + size + count_bot
count_bot += 1
Constituenta.objects.bulk_update(cst_list, ['order'])
self.save() self.save()
@transaction.atomic def delete_cst(self, target: Iterable[Constituenta]) -> None:
def delete_cst(self, listCst):
''' Delete multiple constituents. Do not check if listCst are from this schema. ''' ''' Delete multiple constituents. Do not check if listCst are from this schema. '''
for cst in listCst: self.cache.remove_multi(target)
cst.delete() Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
self._reset_order() self._reset_order()
self.resolve_all_text() self.resolve_all_text()
self.save() self.save()
@transaction.atomic def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
def substitute(
self,
original: Constituenta,
substitution: Constituenta
):
''' Execute constituenta substitution. ''' ''' Execute constituenta substitution. '''
assert original.pk != substitution.pk mapping = {}
mapping = {original.alias: substitution.alias} deleted: list[Constituenta] = []
replacements: list[Constituenta] = []
for original, substitution in substitutions:
assert original.pk != substitution.pk
mapping[original.alias] = substitution.alias
deleted.append(original)
replacements.append(substitution)
self.cache.remove_multi(deleted)
Constituenta.objects.filter(pk__in=[cst.pk for cst in deleted]).delete()
self.apply_mapping(mapping) self.apply_mapping(mapping)
original.delete() self.on_term_change([substitution.pk for substitution in replacements])
self.on_term_change([substitution.pk])
def restore_order(self): def restore_order(self) -> None:
''' Restore order based on types and term graph. ''' ''' Restore order based on types and term graph. '''
manager = _OrderManager(self) manager = _OrderManager(self)
manager.restore_order() manager.restore_order()
def reset_aliases(self): def reset_aliases(self) -> None:
''' Recreate all aliases based on constituents order. ''' ''' Recreate all aliases based on constituents order. '''
mapping = self._create_reset_mapping() mapping = self._create_reset_mapping()
self.apply_mapping(mapping, change_aliases=True) self.apply_mapping(mapping, change_aliases=True)
@ -273,33 +392,50 @@ class RSForm:
mapping[cst.alias] = alias mapping[cst.alias] = alias
return mapping return mapping
@transaction.atomic def change_cst_type(self, target: int, new_type: CstType) -> bool:
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False): ''' Change type of constituenta generating alias automatically. '''
''' Apply rename mapping. ''' self.cache.ensure_loaded()
cst_list = self.constituents().order_by('order') cst = self.cache.by_id.get(target)
for cst in cst_list: if cst is None:
if cst.apply_mapping(mapping, change_aliases): return False
cst.save() newAlias = f'{get_type_prefix(new_type)}{self.get_max_index(new_type) + 1}'
mapping = {cst.alias: newAlias}
cst.cst_type = new_type
cst.alias = newAlias
cst.save(update_fields=['cst_type', 'alias'])
self.apply_mapping(mapping)
return True
@transaction.atomic def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
def resolve_all_text(self): ''' Apply rename mapping. '''
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.apply_mapping(mapping, change_aliases):
update_list.append(cst)
Constituenta.objects.bulk_update(update_list, ['alias', 'definition_formal', 'term_raw', 'definition_raw'])
self.save()
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. ''' ''' Trigger reference resolution for all texts. '''
self.cache.ensure_loaded()
graph_terms = self._graph_term() graph_terms = self._graph_term()
resolver = Resolver({}) resolver = Resolver({})
update_list: list[Constituenta] = []
for cst_id in graph_terms.topological_order(): for cst_id in graph_terms.topological_order():
cst = self.constituents().get(id=cst_id) cst = self.cache.by_id[cst_id]
resolved = resolver.resolve(cst.term_raw) resolved = resolver.resolve(cst.term_raw)
resolver.context[cst.alias] = Entity(cst.alias, resolved) resolver.context[cst.alias] = Entity(cst.alias, resolved)
if resolved != cst.term_resolved: cst.term_resolved = resolved
cst.term_resolved = resolved update_list.append(cst)
cst.save() Constituenta.objects.bulk_update(update_list, ['term_resolved'])
for cst in self.constituents():
resolved = resolver.resolve(cst.definition_raw) for cst in self.cache.constituents:
if resolved != cst.definition_resolved: resolved = resolver.resolve(cst.definition_raw)
cst.definition_resolved = resolved cst.definition_resolved = resolved
cst.save() Constituenta.objects.bulk_update(self.cache.constituents, ['definition_resolved'])
@transaction.atomic
def create_version(self, version: str, description: str, data) -> Version: def create_version(self, version: str, description: str, data) -> Version:
''' Creates version for current state. ''' ''' Creates version for current state. '''
return Version.objects.create( return Version.objects.create(
@ -309,7 +445,6 @@ class RSForm:
data=data data=data
) )
@transaction.atomic
def produce_structure(self, target: Constituenta, parse: dict) -> list[int]: def produce_structure(self, target: Constituenta, parse: dict) -> list[int]:
''' Add constituents for each structural element of the target. ''' ''' Add constituents for each structural element of the target. '''
expressions = generate_structure( expressions = generate_structure(
@ -320,9 +455,10 @@ class RSForm:
count_new = len(expressions) count_new = len(expressions)
if count_new == 0: if count_new == 0:
return [] return []
position = target.order + 1
self._shift_positions(position, count_new)
position = target.order + 1
self.cache.ensure_loaded()
self._shift_positions(position, count_new)
result = [] result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self.get_max_index(cst_type) + 1 free_index = self.get_max_index(cst_type) + 1
@ -339,97 +475,86 @@ class RSForm:
free_index = free_index + 1 free_index = free_index + 1
position = position + 1 position = position + 1
self.cache.clear()
self.save() self.save()
return result return result
def _shift_positions(self, start: int, shift: int): def _shift_positions(self, start: int, shift: int) -> None:
if shift == 0: if shift == 0:
return return
update_list = \ update_list: Iterable[Constituenta] = []
Constituenta.objects \ if not self.cache.is_loaded:
.only('order') \ update_list = Constituenta.objects \
.filter(schema=self.model, order__gte=start) .only('order') \
.filter(schema=self.model, order__gte=start)
else:
update_list = [cst for cst in self.cache.constituents if cst.order >= start]
for cst in update_list: for cst in update_list:
cst.order += shift cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order']) Constituenta.objects.bulk_update(update_list, ['order'])
def _get_last_position(self):
if self.constituents().exists():
return self.constituents().count()
else:
return 0
def _get_insert_position(self, position: int) -> int: def _get_insert_position(self, position: int) -> int:
if position <= 0 and position != _INSERT_LAST: if position <= 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition()) raise ValidationError(msg.invalidPosition())
lastPosition = self._get_last_position() lastPosition = self.constituents().count()
if position == _INSERT_LAST: if position == INSERT_LAST:
position = lastPosition + 1 position = lastPosition + 1
else: else:
position = max(1, min(position, lastPosition + 1)) position = max(1, min(position, lastPosition + 1))
return position return position
@transaction.atomic def _reset_order(self) -> None:
def _reset_order(self):
order = 1 order = 1
for cst in self.constituents().only('order').order_by('order'): changed: list[Constituenta] = []
cst_list: Iterable[Constituenta] = []
if not self.cache.is_loaded:
cst_list = self.constituents().only('order').order_by('order')
else:
cst_list = self.cache.constituents
for cst in cst_list:
if cst.order != order: if cst.order != order:
cst.order = order cst.order = order
cst.save() changed.append(cst)
order += 1 order += 1
Constituenta.objects.bulk_update(changed, ['order'])
def _graph_formal(self) -> Graph[int]: def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. ''' ''' Graph based on formal definitions. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph() result: Graph[int] = Graph()
cst_list = \ for cst in self.cache.constituents:
self.constituents() \
.only('alias', 'definition_formal') \
.order_by('order')
for cst in cst_list:
result.add_node(cst.pk) result.add_node(cst.pk)
for cst in cst_list: for cst in self.cache.constituents:
for alias in extract_globals(cst.definition_formal): for alias in extract_globals(cst.definition_formal):
try: child = self.cache.by_alias.get(alias)
child = cst_list.get(alias=alias) if child is not None:
result.add_edge(src=child.pk, dest=cst.pk) result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result return result
def _graph_term(self) -> Graph[int]: def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. ''' ''' Graph based on term texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph() result: Graph[int] = Graph()
cst_list = \ for cst in self.cache.constituents:
self.constituents() \
.only('alias', 'term_raw') \
.order_by('order')
for cst in cst_list:
result.add_node(cst.pk) result.add_node(cst.pk)
for cst in cst_list: for cst in self.cache.constituents:
for alias in extract_entities(cst.term_raw): for alias in extract_entities(cst.term_raw):
try: child = self.cache.by_alias.get(alias)
child = cst_list.get(alias=alias) if child is not None:
result.add_edge(src=child.pk, dest=cst.pk) result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result return result
def _graph_text(self) -> Graph[int]: def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. ''' ''' Graph based on definition texts. '''
self.cache.ensure_loaded()
result: Graph[int] = Graph() result: Graph[int] = Graph()
cst_list = \ for cst in self.cache.constituents:
self.constituents() \
.only('alias', 'definition_raw') \
.order_by('order')
for cst in cst_list:
result.add_node(cst.pk) result.add_node(cst.pk)
for cst in cst_list: for cst in self.cache.constituents:
for alias in extract_entities(cst.definition_raw): for alias in extract_entities(cst.definition_raw):
try: child = self.cache.by_alias.get(alias)
child = cst_list.get(alias=alias) if child is not None:
result.add_edge(src=child.pk, dest=cst.pk) result.add_edge(src=child.pk, dest=cst.pk)
except Constituenta.DoesNotExist:
pass
return result return result
@ -437,14 +562,11 @@ class SemanticInfo:
''' Semantic information derived from constituents. ''' ''' Semantic information derived from constituents. '''
def __init__(self, schema: RSForm): def __init__(self, schema: RSForm):
schema.cache.ensure_loaded()
self._graph = schema._graph_formal() self._graph = schema._graph_formal()
self._items = list( self._items = schema.cache.constituents
schema.constituents() self._cst_by_ID = schema.cache.by_id
.only('alias', 'cst_type', 'definition_formal') self._cst_by_alias = schema.cache.by_alias
.order_by('order')
)
self._cst_by_alias = {cst.alias: cst for cst in self._items}
self._cst_by_ID = {cst.pk: cst for cst in self._items}
self.info = { self.info = {
cst.pk: { cst.pk: {
'is_simple': False, 'is_simple': False,
@ -452,7 +574,7 @@ class SemanticInfo:
'parent': cst.pk, 'parent': cst.pk,
'children': [] 'children': []
} }
for cst in self._items for cst in schema.cache.constituents
} }
self._calculate_attributes() self._calculate_attributes()
@ -475,7 +597,7 @@ class SemanticInfo:
''' Access "children" attribute. ''' ''' Access "children" attribute. '''
return cast(list[int], self.info[target]['children']) return cast(list[int], self.info[target]['children'])
def _calculate_attributes(self): def _calculate_attributes(self) -> None:
for cst_id in self._graph.topological_order(): for cst_id in self._graph.topological_order():
cst = self._cst_by_ID[cst_id] cst = self._cst_by_ID[cst_id]
self.info[cst_id]['is_template'] = infer_template(cst.definition_formal) self.info[cst_id]['is_template'] = infer_template(cst.definition_formal)
@ -485,7 +607,7 @@ class SemanticInfo:
parent = self._infer_parent(cst) parent = self._infer_parent(cst)
self.info[cst_id]['parent'] = parent self.info[cst_id]['parent'] = parent
if parent != cst_id: if parent != cst_id:
self.info[parent]['children'].append(cst_id) cast(list[int], self.info[parent]['children']).append(cst_id)
def _infer_simple_expression(self, target: Constituenta) -> bool: def _infer_simple_expression(self, target: Constituenta) -> bool:
if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type): if target.cst_type == CstType.STRUCTURED or is_base_set(target.cst_type):
@ -565,12 +687,8 @@ class _OrderManager:
def __init__(self, schema: RSForm): def __init__(self, schema: RSForm):
self._semantic = schema.semantic() self._semantic = schema.semantic()
self._graph = schema._graph_formal() self._graph = schema._graph_formal()
self._items = list( self._items = schema.cache.constituents
schema.constituents() self._cst_by_ID = schema.cache.by_id
.only('order', 'alias', 'cst_type', 'definition_formal')
.order_by('order')
)
self._cst_by_ID = {cst.pk: cst for cst in self._items}
def restore_order(self) -> None: def restore_order(self) -> None:
''' Implement order restoration process. ''' ''' Implement order restoration process. '''
@ -615,10 +733,9 @@ class _OrderManager:
result.append(child) result.append(child)
self._items = result self._items = result
@transaction.atomic
def _save_order(self) -> None: def _save_order(self) -> None:
order = 1 order = 1
for cst in self._items: for cst in self._items:
cst.order = order cst.order = order
cst.save()
order += 1 order += 1
Constituenta.objects.bulk_update(self._items, ['order'])

View File

@ -1,4 +1,4 @@
''' Django: Models. ''' ''' Django: Models. '''
from .Constituenta import Constituenta, CstType from .Constituenta import Constituenta, CstType, extract_globals, replace_entities, replace_globals
from .RSForm import RSForm from .RSForm import INSERT_LAST, RSForm

View File

@ -2,7 +2,7 @@
import json import json
import re import re
from enum import IntEnum, unique from enum import IntEnum, unique
from typing import Set, Tuple, cast from typing import Tuple, cast
import pyconcept import pyconcept
@ -10,7 +10,6 @@ from shared import messages as msg
from .Constituenta import CstType from .Constituenta import CstType
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_RE_TEMPLATE = r'R\d+' _RE_TEMPLATE = r'R\d+'
_RE_COMPLEX_SYMBOLS = r'[∀∃×ℬ;|:]' _RE_COMPLEX_SYMBOLS = r'[∀∃×ℬ;|:]'
@ -67,11 +66,6 @@ def is_functional(cst_type: CstType) -> bool:
] ]
def extract_globals(expression: str) -> Set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
def guess_type(alias: str) -> CstType: def guess_type(alias: str) -> CstType:
''' Get CstType for alias. ''' ''' Get CstType for alias. '''
prefix = alias[0] prefix = alias[0]

View File

@ -17,6 +17,7 @@ from .data_access import (
CstSerializer, CstSerializer,
CstSubstituteSerializer, CstSubstituteSerializer,
CstTargetSerializer, CstTargetSerializer,
CstUpdateSerializer,
InlineSynthesisSerializer, InlineSynthesisSerializer,
RSFormParseSerializer, RSFormParseSerializer,
RSFormSerializer, RSFormSerializer,

View File

@ -1,9 +1,8 @@
''' Serializers for persistent data manipulation. ''' ''' Serializers for persistent data manipulation. '''
from typing import Optional, cast from typing import cast
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.db.models import Q from django.db.models import Q
from rest_framework import serializers from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField from rest_framework.serializers import PrimaryKeyRelatedField as PKField
@ -39,25 +38,30 @@ class CstSerializer(serializers.ModelSerializer):
fields = '__all__' fields = '__all__'
read_only_fields = ('id', 'schema', 'order', 'alias', 'cst_type', 'definition_resolved', 'term_resolved') read_only_fields = ('id', 'schema', 'order', 'alias', 'cst_type', 'definition_resolved', 'term_resolved')
def update(self, instance: Constituenta, validated_data) -> Constituenta:
data = validated_data # Note: use alias for better code readability class CstUpdateSerializer(serializers.Serializer):
definition: Optional[str] = data['definition_raw'] if 'definition_raw' in data else None ''' Serializer: Constituenta update. '''
term: Optional[str] = data['term_raw'] if 'term_raw' in data else None class ConstituentaUpdateData(serializers.ModelSerializer):
term_changed = 'term_forms' in data ''' Serializer: Operation creation data. '''
schema = RSForm(instance.schema) class Meta:
if definition is not None and definition != instance.definition_raw: ''' serializer metadata. '''
data['definition_resolved'] = schema.resolver().resolve(definition) model = Constituenta
if term is not None and term != instance.term_raw: fields = 'convention', 'definition_formal', 'definition_raw', 'term_raw', 'term_forms'
data['term_resolved'] = schema.resolver().resolve(term)
if data['term_resolved'] != instance.term_resolved and 'term_forms' not in data: target = PKField(
data['term_forms'] = [] many=False,
term_changed = data['term_resolved'] != instance.term_resolved queryset=Constituenta.objects.all().only('convention', 'definition_formal', 'definition_raw', 'term_raw')
result: Constituenta = super().update(instance, data) )
if term_changed: item_data = ConstituentaUpdateData()
schema.on_term_change([result.pk])
result.refresh_from_db() def validate(self, attrs):
schema.save() schema = cast(LibraryItem, self.context['schema'])
return result cst = cast(Constituenta, attrs['target'])
if schema and cst.schema_id != schema.pk:
raise serializers.ValidationError({
f'{cst.pk}': msg.constituentaNotInRSform(schema.title)
})
return attrs
class CstDetailsSerializer(serializers.ModelSerializer): class CstDetailsSerializer(serializers.ModelSerializer):
@ -72,7 +76,12 @@ class CstDetailsSerializer(serializers.ModelSerializer):
class CstCreateSerializer(serializers.ModelSerializer): class CstCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. ''' ''' Serializer: Constituenta creation. '''
insert_after = serializers.IntegerField(required=False, allow_null=True) insert_after = PKField(
many=False,
allow_null=True,
required=False,
queryset=Constituenta.objects.all().only('schema_id', 'order')
)
alias = serializers.CharField(max_length=8) alias = serializers.CharField(max_length=8)
cst_type = serializers.ChoiceField(CstType.choices) cst_type = serializers.ChoiceField(CstType.choices)
@ -117,7 +126,7 @@ class RSFormSerializer(serializers.ModelSerializer):
for link in Inheritance.objects.filter(Q(child__schema=instance) | Q(parent__schema=instance)): for link in Inheritance.objects.filter(Q(child__schema=instance) | Q(parent__schema=instance)):
result['inheritance'].append([link.child.pk, link.parent.pk]) result['inheritance'].append([link.child.pk, link.parent.pk])
result['oss'] = [] result['oss'] = []
for oss in LibraryItem.objects.filter(items__result=instance).only('alias'): for oss in LibraryItem.objects.filter(operations__result=instance).only('alias'):
result['oss'].append({ result['oss'].append({
'id': oss.pk, 'id': oss.pk,
'alias': oss.alias 'alias': oss.alias
@ -149,7 +158,6 @@ class RSFormSerializer(serializers.ModelSerializer):
result['version'] = version result['version'] = version
return result | data return result | data
@transaction.atomic
def restore_from_version(self, data: dict): def restore_from_version(self, data: dict):
''' Load data from version. ''' ''' Load data from version. '''
schema = RSForm(cast(LibraryItem, self.instance)) schema = RSForm(cast(LibraryItem, self.instance))
@ -243,7 +251,7 @@ class CstTargetSerializer(serializers.Serializer):
class CstRenameSerializer(serializers.Serializer): class CstRenameSerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. ''' ''' Serializer: Constituenta renaming. '''
target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'schema')) target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'cst_type', 'schema'))
alias = serializers.CharField() alias = serializers.CharField()
cst_type = serializers.CharField() cst_type = serializers.CharField()
@ -312,9 +320,9 @@ class CstSubstituteSerializer(serializers.Serializer):
raise serializers.ValidationError({ raise serializers.ValidationError({
f'{original_cst.pk}': msg.substituteDouble(original_cst.alias) f'{original_cst.pk}': msg.substituteDouble(original_cst.alias)
}) })
if original_cst.alias == substitution_cst.alias: if original_cst.pk == substitution_cst.pk:
raise serializers.ValidationError({ raise serializers.ValidationError({
'alias': msg.substituteTrivial(original_cst.alias) 'original': msg.substituteTrivial(original_cst.alias)
}) })
if original_cst.schema_id != schema.pk: if original_cst.schema_id != schema.pk:
raise serializers.ValidationError({ raise serializers.ValidationError({

View File

@ -58,3 +58,28 @@ class TestConstituenta(TestCase):
self.assertEqual(cst.term_forms, []) self.assertEqual(cst.term_forms, [])
self.assertEqual(cst.definition_resolved, '') self.assertEqual(cst.definition_resolved, '')
self.assertEqual(cst.definition_raw, '') self.assertEqual(cst.definition_raw, '')
def test_extract_references(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 X2',
term_raw='@{X3|sing} is a @{X4|sing}',
definition_raw='@{X5|sing}'
)
self.assertEqual(cst.extract_references(), set(['X1', 'X2', 'X3', 'X4', 'X5']))
def text_apply_mapping(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 = X2',
term_raw='@{X1|sing}',
definition_raw='@{X2|sing}'
)
cst.apply_mapping({'X1': 'X3', 'X2': 'X4'})
self.assertEqual(cst.definition_formal, 'X3 = X4')
self.assertEqual(cst.term_raw, '@{X3|sing}')
self.assertEqual(cst.definition_raw, '@{X4|sing}')

View File

@ -1,15 +1,16 @@
''' Testing models: api_RSForm. ''' ''' Testing models: api_RSForm. '''
from django.forms import ValidationError from django.forms import ValidationError
from django.test import TestCase
from apps.rsform.models import Constituenta, CstType, RSForm from apps.rsform.models import Constituenta, CstType, RSForm
from apps.users.models import User from apps.users.models import User
from shared.DBTester import DBTester
class TestRSForm(TestCase): class TestRSForm(DBTester):
''' Testing RSForm wrapper. ''' ''' Testing RSForm wrapper. '''
def setUp(self): def setUp(self):
super().setUp()
self.user1 = User.objects.create(username='User1') self.user1 = User.objects.create(username='User1')
self.user2 = User.objects.create(username='User2') self.user2 = User.objects.create(username='User2')
self.schema = RSForm.create(title='Test') self.schema = RSForm.create(title='Test')
@ -180,7 +181,6 @@ class TestRSForm(TestCase):
alias='D1', alias='D1',
definition_formal='X1 = X11 = X2', definition_formal='X1 = X11 = X2',
definition_raw='@{X11|sing}', definition_raw='@{X11|sing}',
convention='X1',
term_raw='@{X1|plur}' term_raw='@{X1|plur}'
) )
@ -188,7 +188,6 @@ class TestRSForm(TestCase):
d1.refresh_from_db() d1.refresh_from_db()
self.assertEqual(d1.definition_formal, 'X3 = X4 = X2', msg='Map IDs in expression') self.assertEqual(d1.definition_formal, 'X3 = X4 = X2', msg='Map IDs in expression')
self.assertEqual(d1.definition_raw, '@{X4|sing}', msg='Map IDs in definition') self.assertEqual(d1.definition_raw, '@{X4|sing}', msg='Map IDs in definition')
self.assertEqual(d1.convention, 'X3', msg='Map IDs in convention')
self.assertEqual(d1.term_raw, '@{X3|plur}', msg='Map IDs in term') self.assertEqual(d1.term_raw, '@{X3|plur}', msg='Map IDs in term')
self.assertEqual(d1.term_resolved, '', msg='Do not run resolve on mapping') self.assertEqual(d1.term_resolved, '', msg='Do not run resolve on mapping')
self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping') self.assertEqual(d1.definition_resolved, '', msg='Do not run resolve on mapping')
@ -208,7 +207,7 @@ class TestRSForm(TestCase):
definition_formal=x1.alias definition_formal=x1.alias
) )
self.schema.substitute(x1, x2) self.schema.substitute([(x1, x2)])
x2.refresh_from_db() x2.refresh_from_db()
d1.refresh_from_db() d1.refresh_from_db()
self.assertEqual(self.schema.constituents().count(), 2) self.assertEqual(self.schema.constituents().count(), 2)
@ -320,7 +319,6 @@ class TestRSForm(TestCase):
x2 = self.schema.insert_new('X21') x2 = self.schema.insert_new('X21')
d1 = self.schema.insert_new( d1 = self.schema.insert_new(
alias='D11', alias='D11',
convention='D11 - cool',
definition_formal='X21=X21', definition_formal='X21=X21',
term_raw='@{X21|sing}', term_raw='@{X21|sing}',
definition_raw='@{X11|datv}', definition_raw='@{X11|datv}',
@ -335,7 +333,6 @@ class TestRSForm(TestCase):
self.assertEqual(x1.alias, 'X1') self.assertEqual(x1.alias, 'X1')
self.assertEqual(x2.alias, 'X2') self.assertEqual(x2.alias, 'X2')
self.assertEqual(d1.alias, 'D1') self.assertEqual(d1.alias, 'D1')
self.assertEqual(d1.convention, 'D1 - cool')
self.assertEqual(d1.term_raw, '@{X2|sing}') self.assertEqual(d1.term_raw, '@{X2|sing}')
self.assertEqual(d1.definition_raw, '@{X1|datv}') self.assertEqual(d1.definition_raw, '@{X1|datv}')
self.assertEqual(d1.definition_resolved, 'test') self.assertEqual(d1.definition_resolved, 'test')

View File

@ -100,7 +100,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(response.data['items'][1]['id'], x2.pk) self.assertEqual(response.data['items'][1]['id'], x2.pk)
self.assertEqual(response.data['items'][1]['term_raw'], x2.term_raw) self.assertEqual(response.data['items'][1]['term_raw'], x2.term_raw)
self.assertEqual(response.data['items'][1]['term_resolved'], x2.term_resolved) self.assertEqual(response.data['items'][1]['term_resolved'], x2.term_resolved)
self.assertEqual(response.data['subscribers'], [self.user.pk]) self.assertEqual(response.data['subscribers'], [])
self.assertEqual(response.data['editors'], []) self.assertEqual(response.data['editors'], [])
self.assertEqual(response.data['inheritance'], []) self.assertEqual(response.data['inheritance'], [])
self.assertEqual(response.data['oss'], []) self.assertEqual(response.data['oss'], [])
@ -183,9 +183,10 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post') @decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
def test_create_constituenta(self): def test_create_constituenta(self):
data = {'alias': 'X3'} data = {'alias': 'X3', 'cst_type': CstType.BASE}
self.executeForbidden(data=data, item=self.unowned_id) self.executeForbidden(data=data, item=self.unowned_id)
data = {'alias': 'X3'}
self.owned.insert_new('X1') self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2') x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id) self.executeBadData(item=self.owned_id)
@ -524,7 +525,7 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch') @decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_partial_update(self): def test_partial_update(self):
data = {'id': self.cst1.pk, 'convention': 'tt'} data = {'target': self.cst1.pk, 'item_data': {'convention': 'tt'}}
self.executeForbidden(data=data, schema=self.rsform_unowned.model.pk) self.executeForbidden(data=data, schema=self.rsform_unowned.model.pk)
self.logout() self.logout()
@ -542,9 +543,11 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch') @decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_resolved_no_refs(self): def test_update_resolved_no_refs(self):
data = { data = {
'id': self.cst3.pk, 'target': self.cst3.pk,
'term_raw': 'New term', 'item_data': {
'definition_raw': 'New def' 'term_raw': 'New term',
'definition_raw': 'New def'
}
} }
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk) response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db() self.cst3.refresh_from_db()
@ -557,9 +560,11 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch') @decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_resolved_refs(self): def test_update_resolved_refs(self):
data = { data = {
'id': self.cst3.pk, 'target': self.cst3.pk,
'term_raw': '@{X1|nomn,sing}', 'item_data': {
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}' 'term_raw': '@{X1|nomn,sing}',
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}'
}
} }
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk) response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db() self.cst3.refresh_from_db()
@ -568,13 +573,31 @@ class TestConstituentaAPI(EndpointTester):
self.assertEqual(self.cst3.definition_resolved, f'{self.cst1.term_resolved} form1') self.assertEqual(self.cst3.definition_resolved, f'{self.cst1.term_resolved} form1')
self.assertEqual(response.data['definition_resolved'], f'{self.cst1.term_resolved} form1') self.assertEqual(response.data['definition_resolved'], f'{self.cst1.term_resolved} form1')
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_update_term_forms(self):
data = {
'target': self.cst3.pk,
'item_data': {
'definition_raw': '@{X3|sing,datv}',
'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}]
}
}
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.cst3.refresh_from_db()
self.assertEqual(self.cst3.definition_resolved, 'form1')
self.assertEqual(response.data['definition_resolved'], 'form1')
self.assertEqual(self.cst3.term_forms, data['item_data']['term_forms'])
self.assertEqual(response.data['term_forms'], data['item_data']['term_forms'])
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch') @decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
def test_readonly_cst_fields(self): def test_readonly_cst_fields(self):
data = { data = {
'id': self.cst1.pk, 'target': self.cst1.pk,
'alias': 'X33', 'item_data': {
'order': 10 'alias': 'X33',
'order': 10
}
} }
response = self.executeOK(data=data, schema=self.rsform_owned.model.pk) response = self.executeOK(data=data, schema=self.rsform_owned.model.pk)
self.assertEqual(response.data['alias'], 'X1') self.assertEqual(response.data['alias'], 'X1')

View File

@ -16,6 +16,7 @@ from rest_framework.serializers import ValidationError
from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead
from apps.library.serializers import LibraryItemSerializer from apps.library.serializers import LibraryItemSerializer
from apps.oss.models import ChangeManager
from apps.users.models import User from apps.users.models import User
from shared import messages as msg from shared import messages as msg
from shared import permissions, utility from shared import permissions, utility
@ -40,14 +41,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
if self.action in [ if self.action in [
'load_trs', 'load_trs',
'create_cst', 'create_cst',
'delete_multiple_cst',
'rename_cst', 'rename_cst',
'update_cst',
'move_cst', 'move_cst',
'delete_multiple_cst',
'substitute', 'substitute',
'restore_order', 'restore_order',
'reset_aliases', 'reset_aliases',
'produce_structure', 'produce_structure',
'update_cst'
]: ]:
permission_list = [permissions.ItemEditor] permission_list = [permissions.ItemEditor]
elif self.action in [ elif self.action in [
@ -74,34 +75,36 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['post'], url_path='create-cst') @action(detail=True, methods=['post'], url_path='create-cst')
def create_cst(self, request: Request, pk): def create_cst(self, request: Request, pk) -> HttpResponse:
''' Create new constituenta. ''' ''' Create new constituenta. '''
schema = self._get_item()
serializer = s.CstCreateSerializer(data=request.data) serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
data = serializer.validated_data data = serializer.validated_data
if 'insert_after' in data and data['insert_after'] is not None: if 'insert_after' not in data:
try:
insert_after = m.Constituenta.objects.get(pk=data['insert_after'])
except LibraryItem.DoesNotExist:
return Response(status=c.HTTP_404_NOT_FOUND)
else:
insert_after = None insert_after = None
new_cst = m.RSForm(schema).create_cst(data, insert_after) else:
insert_after = data['insert_after']
schema = m.RSForm(self._get_item())
with transaction.atomic():
new_cst = schema.create_cst(data, insert_after)
hosts = LibraryItem.objects.filter(operations__result=schema.model)
for host in hosts:
ChangeManager(host).on_create_cst(new_cst, schema)
schema.refresh_from_db()
return Response( return Response(
status=c.HTTP_201_CREATED, status=c.HTTP_201_CREATED,
data={ data={
'new_cst': s.CstSerializer(new_cst).data, 'new_cst': s.CstSerializer(new_cst).data,
'schema': s.RSFormParseSerializer(schema).data 'schema': s.RSFormParseSerializer(schema.model).data
} }
) )
@extend_schema( @extend_schema(
summary='update persistent attributes of a given constituenta', summary='update persistent attributes of a given constituenta',
tags=['RSForm'], tags=['RSForm'],
request=s.CstSerializer, request=s.CstUpdateSerializer,
responses={ responses={
c.HTTP_200_OK: s.CstSerializer, c.HTTP_200_OK: s.CstSerializer,
c.HTTP_400_BAD_REQUEST: None, c.HTTP_400_BAD_REQUEST: None,
@ -110,22 +113,24 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='update-cst') @action(detail=True, methods=['patch'], url_path='update-cst')
def update_cst(self, request: Request, pk): def update_cst(self, request: Request, pk) -> HttpResponse:
''' Update persistent attributes of a given constituenta. ''' ''' Update persistent attributes of a given constituenta. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstSerializer(data=request.data, partial=True) serializer = s.CstUpdateSerializer(data=request.data, partial=True, context={'schema': model})
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
cst = m.Constituenta.objects.get(pk=request.data['id']) cst = cast(m.Constituenta, serializer.validated_data['target'])
if cst.schema != schema: schema = m.RSForm(model)
raise ValidationError({ data = serializer.validated_data['item_data']
'schema': msg.constituentaNotInRSform(schema.title) with transaction.atomic():
}) hosts = LibraryItem.objects.filter(operations__result=model)
serializer.update(instance=cst, validated_data=serializer.validated_data) old_data = schema.update_cst(cst, data)
for host in hosts:
ChangeManager(host).on_update_cst(cst, data, old_data, schema)
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.CstSerializer(cst).data data=s.CstSerializer(m.Constituenta.objects.get(pk=request.data['target'])).data
) )
@extend_schema( @extend_schema(
@ -140,11 +145,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='produce-structure') @action(detail=True, methods=['patch'], url_path='produce-structure')
def produce_structure(self, request: Request, pk): def produce_structure(self, request: Request, pk) -> HttpResponse:
''' Produce a term for every element of the target constituenta typification. ''' ''' Produce a term for every element of the target constituenta typification. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstTargetSerializer(data=request.data, context={'schema': schema}) serializer = s.CstTargetSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target']) cst = cast(m.Constituenta, serializer.validated_data['target'])
if cst.cst_type not in [m.CstType.FUNCTION, m.CstType.STRUCTURED, m.CstType.TERM]: if cst.cst_type not in [m.CstType.FUNCTION, m.CstType.STRUCTURED, m.CstType.TERM]:
@ -152,20 +157,20 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
f'{cst.pk}': msg.constituentaNoStructure() f'{cst.pk}': msg.constituentaNoStructure()
}) })
schema_details = s.RSFormParseSerializer(schema).data['items'] schema_details = s.RSFormParseSerializer(model).data['items']
cst_parse = next(item for item in schema_details if item['id'] == cst.pk)['parse'] cst_parse = next(item for item in schema_details if item['id'] == cst.pk)['parse']
if not cst_parse['typification']: if not cst_parse['typification']:
return Response( return Response(
status=c.HTTP_400_BAD_REQUEST, status=c.HTTP_400_BAD_REQUEST,
data={f'{cst.pk}': msg.constituentaNoStructure()} data={f'{cst.pk}': msg.constituentaNoStructure()}
) )
with transaction.atomic():
result = m.RSForm(schema).produce_structure(cst, cst_parse) result = m.RSForm(model).produce_structure(cst, cst_parse)
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data={ data={
'cst_list': result, 'cst_list': result,
'schema': s.RSFormParseSerializer(schema).data 'schema': s.RSFormParseSerializer(model).data
} }
) )
@ -181,29 +186,33 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='rename-cst') @action(detail=True, methods=['patch'], url_path='rename-cst')
def rename_cst(self, request: Request, pk): def rename_cst(self, request: Request, pk) -> HttpResponse:
''' Rename constituenta possibly changing type. ''' ''' Rename constituenta possibly changing type. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': schema}) serializer = s.CstRenameSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target']) cst = cast(m.Constituenta, serializer.validated_data['target'])
old_alias = cst.alias changed_type = cst.cst_type != serializer.validated_data['cst_type']
mapping = {cst.alias: serializer.validated_data['alias']}
cst.alias = serializer.validated_data['alias'] cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type'] cst.cst_type = serializer.validated_data['cst_type']
schema = m.RSForm(model)
with transaction.atomic(): with transaction.atomic():
cst.save() cst.save()
m.RSForm(schema).apply_mapping(mapping={old_alias: cst.alias}, change_aliases=False) schema.apply_mapping(mapping=mapping, change_aliases=False)
cst.refresh_from_db()
if changed_type:
hosts = LibraryItem.objects.filter(operations__result=model)
for host in hosts:
ChangeManager(host).on_change_cst_type(cst, schema)
schema.refresh_from_db()
cst.refresh_from_db()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data={ data={
'new_cst': s.CstSerializer(cst).data, 'new_cst': s.CstSerializer(cst).data,
'schema': s.RSFormParseSerializer(schema).data 'schema': s.RSFormParseSerializer(model).data
} }
) )
@ -219,25 +228,27 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='substitute') @action(detail=True, methods=['patch'], url_path='substitute')
def substitute(self, request: Request, pk): def substitute(self, request: Request, pk) -> HttpResponse:
''' Substitute occurrences of constituenta with another one. ''' ''' Substitute occurrences of constituenta with another one. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstSubstituteSerializer( serializer = s.CstSubstituteSerializer(
data=request.data, data=request.data,
context={'schema': schema} context={'schema': model}
) )
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
with transaction.atomic(): with transaction.atomic():
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
for substitution in serializer.validated_data['substitutions']: for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original']) original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution']) replacement = cast(m.Constituenta, substitution['substitution'])
m.RSForm(schema).substitute(original, replacement) substitutions.append((original, replacement))
m.RSForm(model).substitute(substitutions)
schema.refresh_from_db() model.refresh_from_db()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data data=s.RSFormParseSerializer(model).data
) )
@extend_schema( @extend_schema(
@ -252,20 +263,19 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='delete-multiple-cst') @action(detail=True, methods=['patch'], url_path='delete-multiple-cst')
def delete_multiple_cst(self, request: Request, pk): def delete_multiple_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete multiple constituents. ''' ''' Endpoint: Delete multiple constituents. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstListSerializer( serializer = s.CstListSerializer(
data=request.data, data=request.data,
context={'schema': schema} context={'schema': model}
) )
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
m.RSForm(schema).delete_cst(serializer.validated_data['items']) with transaction.atomic():
m.RSForm(model).delete_cst(serializer.validated_data['items'])
schema.refresh_from_db()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data data=s.RSFormParseSerializer(model).data
) )
@extend_schema( @extend_schema(
@ -280,21 +290,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='move-cst') @action(detail=True, methods=['patch'], url_path='move-cst')
def move_cst(self, request: Request, pk): def move_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Move multiple constituents. ''' ''' Endpoint: Move multiple constituents. '''
schema = self._get_item() model = self._get_item()
serializer = s.CstMoveSerializer( serializer = s.CstMoveSerializer(
data=request.data, data=request.data,
context={'schema': schema} context={'schema': model}
) )
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
m.RSForm(schema).move_cst( with transaction.atomic():
listCst=serializer.validated_data['items'], m.RSForm(model).move_cst(
target=serializer.validated_data['move_to'] target=serializer.validated_data['items'],
) destination=serializer.validated_data['move_to']
)
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data data=s.RSFormParseSerializer(model).data
) )
@extend_schema( @extend_schema(
@ -308,13 +319,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='reset-aliases') @action(detail=True, methods=['patch'], url_path='reset-aliases')
def reset_aliases(self, request: Request, pk): def reset_aliases(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Recreate all aliases based on order. ''' ''' Endpoint: Recreate all aliases based on order. '''
schema = self._get_item() model = self._get_item()
m.RSForm(schema).reset_aliases() m.RSForm(model).reset_aliases()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data data=s.RSFormParseSerializer(model).data
) )
@extend_schema( @extend_schema(
@ -328,13 +339,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='restore-order') @action(detail=True, methods=['patch'], url_path='restore-order')
def restore_order(self, request: Request, pk): def restore_order(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Restore order based on types and term graph. ''' ''' Endpoint: Restore order based on types and term graph. '''
schema = self._get_item() model = self._get_item()
m.RSForm(schema).restore_order() m.RSForm(model).restore_order()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data data=s.RSFormParseSerializer(model).data
) )
@extend_schema( @extend_schema(
@ -349,15 +360,15 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['patch'], url_path='load-trs') @action(detail=True, methods=['patch'], url_path='load-trs')
def load_trs(self, request: Request, pk): def load_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Load data from file and replace current schema. ''' ''' Endpoint: Load data from file and replace current schema. '''
input_serializer = s.RSFormUploadSerializer(data=request.data) input_serializer = s.RSFormUploadSerializer(data=request.data)
input_serializer.is_valid(raise_exception=True) input_serializer.is_valid(raise_exception=True)
schema = self._get_item() model = self._get_item()
load_metadata = input_serializer.validated_data['load_metadata'] load_metadata = input_serializer.validated_data['load_metadata']
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME) data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
data['id'] = schema.pk data['id'] = model.pk
serializer = s.RSFormTRSSerializer( serializer = s.RSFormTRSSerializer(
data=data, data=data,
@ -380,7 +391,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['get'], url_path='contents') @action(detail=True, methods=['get'], url_path='contents')
def contents(self, request: Request, pk): def contents(self, request: Request, pk) -> HttpResponse:
''' Endpoint: View schema db contents (including constituents). ''' ''' Endpoint: View schema db contents (including constituents). '''
serializer = s.RSFormSerializer(self.get_object()) serializer = s.RSFormSerializer(self.get_object())
return Response( return Response(
@ -398,7 +409,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['get'], url_path='details') @action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk): def details(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Detailed schema view including statuses and parse. ''' ''' Endpoint: Detailed schema view including statuses and parse. '''
serializer = s.RSFormParseSerializer(self.get_object()) serializer = s.RSFormParseSerializer(self.get_object())
return Response( return Response(
@ -416,7 +427,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
}, },
) )
@action(detail=True, methods=['post'], url_path='check') @action(detail=True, methods=['post'], url_path='check')
def check(self, request: Request, pk): def check(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Check RSLang expression against schema context. ''' ''' Endpoint: Check RSLang expression against schema context. '''
serializer = s.ExpressionSerializer(data=request.data) serializer = s.ExpressionSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -438,7 +449,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['post'], url_path='resolve') @action(detail=True, methods=['post'], url_path='resolve')
def resolve(self, request: Request, pk): def resolve(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Resolve references in text against schema terms context. ''' ''' Endpoint: Resolve references in text against schema terms context. '''
serializer = s.TextSerializer(data=request.data) serializer = s.TextSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -460,12 +471,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
} }
) )
@action(detail=True, methods=['get'], url_path='export-trs') @action(detail=True, methods=['get'], url_path='export-trs')
def export_trs(self, request: Request, pk): def export_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Download Exteor compatible file. ''' ''' Endpoint: Download Exteor compatible file. '''
schema = self._get_item() model = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(schema)).data data = s.RSFormTRSSerializer(m.RSForm(model)).data
file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME) file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
filename = utils.filename_for_schema(schema.alias) filename = utils.filename_for_schema(model.alias)
response = HttpResponse(file, content_type='application/zip') response = HttpResponse(file, content_type='application/zip')
response['Content-Disposition'] = f'attachment; filename={filename}' response['Content-Disposition'] = f'attachment; filename={filename}'
return response return response
@ -485,7 +496,7 @@ class TrsImportView(views.APIView):
c.HTTP_403_FORBIDDEN: None c.HTTP_403_FORBIDDEN: None
} }
) )
def post(self, request: Request): def post(self, request: Request) -> HttpResponse:
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME) data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
owner = cast(User, self.request.user) owner = cast(User, self.request.user)
_prepare_rsform_data(data, request, owner) _prepare_rsform_data(data, request, owner)
@ -512,7 +523,7 @@ class TrsImportView(views.APIView):
} }
) )
@api_view(['POST']) @api_view(['POST'])
def create_rsform(request: Request): def create_rsform(request: Request) -> HttpResponse:
''' Endpoint: Create RSForm from user input and/or trs file. ''' ''' Endpoint: Create RSForm from user input and/or trs file. '''
owner = cast(User, request.user) if not request.user.is_anonymous else None owner = cast(User, request.user) if not request.user.is_anonymous else None
if 'file' not in request.FILES: if 'file' not in request.FILES:
@ -564,7 +575,7 @@ def _prepare_rsform_data(data: dict, request: Request, owner: Union[User, None])
responses={c.HTTP_200_OK: s.RSFormParseSerializer} responses={c.HTTP_200_OK: s.RSFormParseSerializer}
) )
@api_view(['PATCH']) @api_view(['PATCH'])
def inline_synthesis(request: Request): def inline_synthesis(request: Request) -> HttpResponse:
''' Endpoint: Inline synthesis. ''' ''' Endpoint: Inline synthesis. '''
serializer = s.InlineSynthesisSerializer( serializer = s.InlineSynthesisSerializer(
data=request.data, data=request.data,
@ -577,16 +588,18 @@ def inline_synthesis(request: Request):
with transaction.atomic(): with transaction.atomic():
new_items = receiver.insert_copy(items) new_items = receiver.insert_copy(items)
substitutions: list[tuple[m.Constituenta, m.Constituenta]] = []
for substitution in serializer.validated_data['substitutions']: for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original']) original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution']) replacement = cast(m.Constituenta, substitution['substitution'])
if original in items: if original in items:
index = next(i for (i, cst) in enumerate(items) if cst == original) index = next(i for (i, cst) in enumerate(items) if cst.pk == original.pk)
original = new_items[index] original = new_items[index]
else: else:
index = next(i for (i, cst) in enumerate(items) if cst == replacement) index = next(i for (i, cst) in enumerate(items) if cst.pk == replacement.pk)
replacement = new_items[index] replacement = new_items[index]
receiver.substitute(original, replacement) substitutions.append((original, replacement))
receiver.substitute(substitutions)
receiver.restore_order() receiver.restore_order()
return Response( return Response(

View File

@ -134,7 +134,6 @@ class TestUserUserProfileAPIView(EndpointTester):
def test_password_reset_request(self): def test_password_reset_request(self):
self.executeBadData({'email': 'invalid@mail.ru'}) self.executeBadData({'email': 'invalid@mail.ru'})
self.executeOK({'email': self.user.email}) self.executeOK({'email': self.user.email})
# TODO: check if mail server actually sent email and if reset procedure works
class TestSignupAPIView(EndpointTester): class TestSignupAPIView(EndpointTester):

View File

@ -0,0 +1,23 @@
''' Utils: tester for database operations. '''
import logging
from django.db import connection
from rest_framework.test import APITestCase
class DBTester(APITestCase):
''' Abstract base class for Testing database. '''
def setUp(self):
self.logger = logging.getLogger('django.db.backends')
self.logger.setLevel(logging.DEBUG)
def start_db_log(self):
''' Warning! Do not use this second time before calling stop_db_log. '''
''' Warning! Do not forget to enable global logging in settings. '''
logging.disable(logging.NOTSET)
connection.force_debug_cursor = True
def stop_db_log(self):
connection.force_debug_cursor = False
logging.disable(logging.CRITICAL)

View File

@ -1,13 +1,12 @@
''' Utils: base tester class for endpoints. ''' ''' Utils: base tester class for endpoints. '''
import logging
from django.db import connection
from rest_framework import status from rest_framework import status
from rest_framework.test import APIClient, APIRequestFactory, APITestCase from rest_framework.test import APIClient, APIRequestFactory
from apps.library.models import Editor, LibraryItem from apps.library.models import Editor, LibraryItem
from apps.users.models import User from apps.users.models import User
from .DBTester import DBTester
def decl_endpoint(endpoint: str, method: str): def decl_endpoint(endpoint: str, method: str):
''' Decorator for EndpointTester methods to provide API attributes. ''' ''' Decorator for EndpointTester methods to provide API attributes. '''
@ -25,10 +24,11 @@ def decl_endpoint(endpoint: str, method: str):
return set_endpoint_inner return set_endpoint_inner
class EndpointTester(APITestCase): class EndpointTester(DBTester):
''' Abstract base class for Testing endpoints. ''' ''' Abstract base class for Testing endpoints. '''
def setUp(self): def setUp(self):
super().setUp()
self.factory = APIRequestFactory() self.factory = APIRequestFactory()
self.user = User.objects.create( self.user = User.objects.create(
username='UserTest', username='UserTest',
@ -43,9 +43,6 @@ class EndpointTester(APITestCase):
self.client = APIClient() self.client = APIClient()
self.client.force_authenticate(user=self.user) self.client.force_authenticate(user=self.user)
self.logger = logging.getLogger('django.db.backends')
self.logger.setLevel(logging.DEBUG)
def setUpFullUsers(self): def setUpFullUsers(self):
self.factory = APIRequestFactory() self.factory = APIRequestFactory()
self.user = User.objects.create_user( self.user = User.objects.create_user(
@ -77,16 +74,6 @@ class EndpointTester(APITestCase):
def logout(self): def logout(self):
self.client.logout() self.client.logout()
def start_db_log(self):
''' Warning! Do not use this second time before calling stop_db_log. '''
''' Warning! Do not forget to enable global logging in settings. '''
logging.disable(logging.NOTSET)
connection.force_debug_cursor = True
def stop_db_log(self):
connection.force_debug_cursor = False
logging.disable(logging.CRITICAL)
def set_params(self, **kwargs): def set_params(self, **kwargs):
''' Given named argument values resolve current endpoint_mask. ''' ''' Given named argument values resolve current endpoint_mask. '''
if self.endpoint_mask and len(kwargs) > 0: if self.endpoint_mask and len(kwargs) > 0:

View File

@ -25,7 +25,7 @@ function TextArea({
<div <div
className={clsx( className={clsx(
{ {
'flex flex-col flex-grow gap-2': !dense, 'flex flex-col gap-2': !dense,
'flex flex-grow items-center gap-3': dense 'flex flex-grow items-center gap-3': dense
}, },
dense && className dense && className

View File

@ -1,7 +1,5 @@
import { FolderTree } from './FolderTree'; import { FolderTree } from './FolderTree';
// TODO: test FolderNode and FolderTree exhaustively
describe('Testing Tree construction', () => { describe('Testing Tree construction', () => {
test('empty Tree should be empty', () => { test('empty Tree should be empty', () => {
const tree = new FolderTree(); const tree = new FolderTree();

View File

@ -152,14 +152,12 @@ export interface ICstMovetoData extends IConstituentaList {
/** /**
* Represents data, used in updating persistent attributes in {@link IConstituenta}. * Represents data, used in updating persistent attributes in {@link IConstituenta}.
*/ */
export interface ICstUpdateData export interface ICstUpdateData {
extends Pick<IConstituentaMeta, 'id'>, target: ConstituentaID;
Partial< item_data: Partial<
Pick< Pick<IConstituentaMeta, 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'>
IConstituentaMeta, >;
'alias' | 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms' }
>
> {}
/** /**
* Represents data, used in renaming {@link IConstituenta}. * Represents data, used in renaming {@link IConstituenta}.

View File

@ -55,7 +55,6 @@ function FormConstituenta({
}: FormConstituentaProps) { }: FormConstituentaProps) {
const { schema, cstUpdate, processing } = useRSForm(); const { schema, cstUpdate, processing } = useRSForm();
const [alias, setAlias] = useState('');
const [term, setTerm] = useState(''); const [term, setTerm] = useState('');
const [textDefinition, setTextDefinition] = useState(''); const [textDefinition, setTextDefinition] = useState('');
const [expression, setExpression] = useState(''); const [expression, setExpression] = useState('');
@ -98,7 +97,6 @@ function FormConstituenta({
useLayoutEffect(() => { useLayoutEffect(() => {
if (state) { if (state) {
setAlias(state.alias);
setConvention(state.convention || ''); setConvention(state.convention || '');
setTerm(state.term_raw || ''); setTerm(state.term_raw || '');
setTextDefinition(state.definition_raw || ''); setTextDefinition(state.definition_raw || '');
@ -116,13 +114,21 @@ function FormConstituenta({
return; return;
} }
const data: ICstUpdateData = { const data: ICstUpdateData = {
id: state.id, target: state.id,
alias: alias, item_data: {}
term_raw: term,
definition_formal: expression,
definition_raw: textDefinition,
convention: convention
}; };
if (state.term_raw !== term) {
data.item_data.term_raw = term;
}
if (state.definition_formal !== expression) {
data.item_data.definition_formal = expression;
}
if (state.definition_raw !== textDefinition) {
data.item_data.definition_raw = textDefinition;
}
if (state.convention !== convention) {
data.item_data.convention = convention;
}
cstUpdate(data, () => toast.success(information.changesSaved)); cstUpdate(data, () => toast.success(information.changesSaved));
} }
@ -219,7 +225,7 @@ function FormConstituenta({
onChange={event => setConvention(event.target.value)} onChange={event => setConvention(event.target.value)}
/> />
</AnimateFade> </AnimateFade>
{!showConvention && (!disabled || processing) ? ( <AnimateFade key='cst_convention_button' hideContent={showConvention || (disabled && !processing)}>
<button <button
key='cst_disable_comment' key='cst_disable_comment'
id='cst_disable_comment' id='cst_disable_comment'
@ -230,7 +236,7 @@ function FormConstituenta({
> >
Добавить комментарий Добавить комментарий
</button> </button>
) : null} </AnimateFade>
{!disabled || processing ? ( {!disabled || processing ? (
<div className='self-center flex'> <div className='self-center flex'>

View File

@ -323,8 +323,8 @@ export const RSEditState = ({
return; return;
} }
const data: ICstUpdateData = { const data: ICstUpdateData = {
id: activeCst.id, target: activeCst.id,
term_forms: forms item_data: { term_forms: forms }
}; };
model.cstUpdate(data, () => toast.success(information.changesSaved)); model.cstUpdate(data, () => toast.success(information.changesSaved));
}, },

View File

@ -57,7 +57,7 @@ function ViewConstituents({ expression, schema, activeCst, isBottom, onOpenEdit
className={clsx( className={clsx(
'border overflow-visible', // prettier: split-lines 'border overflow-visible', // prettier: split-lines
{ {
'mt-[2.2rem] rounded-l-md rounded-r-none': !isBottom, 'mt-[2.2rem] rounded-l-md rounded-r-none h-fit': !isBottom,
'mt-3 mx-6 rounded-md md:w-[45.8rem]': isBottom 'mt-3 mx-6 rounded-md md:w-[45.8rem]': isBottom
} }
)} )}