diff --git a/rsconcept/backend/apps/oss/models/Inheritance.py b/rsconcept/backend/apps/oss/models/Inheritance.py index 761fe58f..cfd27718 100644 --- a/rsconcept/backend/apps/oss/models/Inheritance.py +++ b/rsconcept/backend/apps/oss/models/Inheritance.py @@ -1,6 +1,8 @@ ''' Models: Synthesis Inheritance. ''' from django.db.models import CASCADE, ForeignKey, Model +from .Substitution import Substitution + class Inheritance(Model): ''' Inheritance links parent and child constituents in synthesis operation.''' @@ -32,3 +34,32 @@ class Inheritance(Model): def __str__(self) -> str: return f'{self.parent} -> {self.child}' + + @staticmethod + def check_share_origin(cst1: int, cst2: int) -> bool: + ''' Check if two constituents share origin. ''' + inheritance1 = Inheritance.objects.filter(child_id=cst1).first() + if not inheritance1: + return False + inheritance2 = Inheritance.objects.filter(child_id=cst2).first() + if not inheritance2: + return False + + parent1 = inheritance1.parent + parent2 = inheritance2.parent + + origins1 = list( + Substitution.objects.filter( + substitution=parent1).values_list( + 'original__schema_id', + flat=True)) + origins1.append(parent1.schema_id) + + origins2 = list( + Substitution.objects.filter( + substitution=parent2).values_list( + 'original__schema_id', + flat=True)) + origins2.append(parent2.schema_id) + + return any(x in origins1 for x in origins2) diff --git a/rsconcept/backend/apps/oss/models/OssCache.py b/rsconcept/backend/apps/oss/models/OssCache.py index 8ce62845..2e39f632 100644 --- a/rsconcept/backend/apps/oss/models/OssCache.py +++ b/rsconcept/backend/apps/oss/models/OssCache.py @@ -95,6 +95,16 @@ class OssCache: return self.get_inheritor(sub.substitution_id, operation) return self.get_inheritor(parent_cst, operation) + def get_substitution_partners(self, cst: int, operation: int) -> list[int]: + ''' Get originals or substitutes for target constituent in target operation. ''' + result = [] + for sub in self.substitutions[operation]: + if sub.original_id == cst: + result.append(sub.substitution_id) + elif sub.substitution_id == cst: + result.append(sub.original_id) + return result + def insert_argument(self, argument: Argument) -> None: ''' Insert new argument. ''' self.graph.add_edge(argument.argument_id, argument.operation_id) diff --git a/rsconcept/backend/apps/oss/models/PropagationEngine.py b/rsconcept/backend/apps/oss/models/PropagationEngine.py index 79af2863..645b6c5f 100644 --- a/rsconcept/backend/apps/oss/models/PropagationEngine.py +++ b/rsconcept/backend/apps/oss/models/PropagationEngine.py @@ -205,16 +205,28 @@ class PropagationEngine: new_attribute = self.cache.get_successor(attr.attribute_id, child_id) if new_container is None or new_attribute is None: continue - deleted_assoc = Attribution.objects.filter( + deleted_attr = Attribution.objects.filter( container=new_container, attribute=new_attribute - ) - if deleted_assoc.exists(): - deleted.append(deleted_assoc[0]) + ).first() + if not deleted_attr: + continue + + if not self._has_alternative_attribution(child_id, attr.container_id, attr.attribute_id): + deleted.append(deleted_attr) + if deleted: self.on_delete_attribution(child_id, deleted) Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete() + def _has_alternative_attribution(self, operationID: int, container: int, attribute: int) -> bool: + ''' Check if there is an alternative attribution among substitutions. ''' + container_partners = self.cache.get_substitution_partners(container, operationID) + attribute_partners = self.cache.get_substitution_partners(attribute, operationID) + if not container_partners or not attribute_partners: + return False + return Attribution.objects.filter(container__in=container_partners, attribute__in=attribute_partners).exists() + def on_delete_inherited(self, operationID: int, target: list[int]) -> None: ''' Trigger cascade resolutions when Constituenta inheritance is deleted. ''' children = self.cache.extend_graph.outputs[operationID] diff --git a/rsconcept/backend/apps/oss/tests/s_propagation/t_constituents.py b/rsconcept/backend/apps/oss/tests/s_propagation/t_constituents.py index 79996245..84732cf0 100644 --- a/rsconcept/backend/apps/oss/tests/s_propagation/t_constituents.py +++ b/rsconcept/backend/apps/oss/tests/s_propagation/t_constituents.py @@ -1,6 +1,6 @@ ''' Testing API: Change constituents in OSS. ''' -from apps.oss.models import OperationSchema, OperationType +from apps.oss.models import OperationSchema, OperationType, PropagationFacade from apps.rsform.models import Attribution, Constituenta, CstType, RSForm from shared.EndpointTester import EndpointTester, decl_endpoint @@ -95,14 +95,14 @@ class TestChangeConstituents(EndpointTester): ]) - @decl_endpoint('/api/rsforms/{schema}/create-cst', method='post') + @decl_endpoint('/api/rsforms/{item}/create-cst', method='post') def test_create_constituenta(self): data = { 'alias': 'X3', 'cst_type': CstType.BASE, 'definition_formal': 'X4 = X5' } - response = self.executeCreated(data, schema=self.ks1.model.pk) + response = self.executeCreated(data, item=self.ks1.model.pk) new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id']) inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk) self.assertEqual(self.ks1.constituentsQ().count(), 3) @@ -112,7 +112,7 @@ class TestChangeConstituents(EndpointTester): self.assertEqual(inherited_cst.definition_formal, 'X1 = X2') - @decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch') + @decl_endpoint('/api/rsforms/{item}/update-cst', method='patch') def test_update_constituenta(self): d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}') data = { @@ -125,7 +125,7 @@ class TestChangeConstituents(EndpointTester): 'crucial': True, } } - self.executeOK(data, schema=self.ks1.model.pk) + self.executeOK(data, item=self.ks1.model.pk) self.ks1X1.refresh_from_db() d2.refresh_from_db() inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk) @@ -142,10 +142,10 @@ class TestChangeConstituents(EndpointTester): self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}') - @decl_endpoint('/api/rsforms/{schema}/delete-multiple-cst', method='patch') + @decl_endpoint('/api/rsforms/{item}/delete-multiple-cst', method='patch') def test_delete_constituenta(self): data = {'items': [self.ks2X1.pk]} - self.executeOK(data, schema=self.ks2.model.pk) + self.executeOK(data, item=self.ks2.model.pk) inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk) self.ks2D1.refresh_from_db() self.assertEqual(self.ks2.constituentsQ().count(), 1) @@ -154,14 +154,14 @@ class TestChangeConstituents(EndpointTester): self.assertEqual(inherited_cst.definition_formal, r'DEL\DEL') - @decl_endpoint('/api/rsforms/{schema}/substitute', method='patch') + @decl_endpoint('/api/rsforms/{item}/substitute', method='patch') def test_substitute(self): d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_formal=r'X1\X2\X3') data = {'substitutions': [{ 'original': self.ks1X1.pk, 'substitution': self.ks1X2.pk }]} - self.executeOK(data, schema=self.ks1.model.pk) + self.executeOK(data, item=self.ks1.model.pk) self.ks1X2.refresh_from_db() d2.refresh_from_db() self.assertEqual(self.ks1.constituentsQ().count(), 1) @@ -170,16 +170,25 @@ class TestChangeConstituents(EndpointTester): self.assertEqual(d2.definition_formal, r'X2\X2\X3') - @decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post') + @decl_endpoint('/api/rsforms/{item}/create-attribution', method='post') def test_create_attribution(self): - data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} - self.executeCreated(data, schema=self.ks1.model.pk) x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk) x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk) + + data = {'container': x1_child.pk, 'attribute': x2_child.pk} + self.executeBadData(data, item=self.ks3.model.pk) + + data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} + self.executeCreated(data, item=self.ks1.model.pk) self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + data = {'container': x1_child.pk, 'attribute': ks2x1_child.pk} + self.executeCreated(data, item=self.ks3.model.pk) + self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=ks2x1_child).exists()) - @decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post') + + @decl_endpoint('/api/rsforms/{item}/create-attribution', method='post') def test_create_attribution_substitution(self): self.operation3.result.delete() self.owned.set_substitutions(self.operation3.pk, [{ @@ -189,9 +198,120 @@ class TestChangeConstituents(EndpointTester): self.owned.execute_operation(self.operation3) self.operation3.refresh_from_db() self.ks3 = RSForm(self.operation3.result) + ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk) + ks2d1_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk) + + data = {'container': ks1x2_child.pk, 'attribute': ks2x1_child.pk} + self.executeBadData(data, item=self.ks3.model.pk) + + data = {'container': ks2d1_child.pk, 'attribute': ks2x1_child.pk} + self.executeBadData(data, item=self.ks3.model.pk) data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} - self.executeCreated(data, schema=self.ks1.model.pk) - x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + self.executeCreated(data, item=self.ks1.model.pk) + self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertTrue(Attribution.objects.filter(container=ks2x1_child, attribute=ks1x2_child).exists()) + + self.executeBadData(data, item=self.ks1.model.pk) + + + @decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch') + def test_delete_attribution(self): + attr = Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2) + PropagationFacade().after_create_attribution(self.ks1.model.pk, [attr]) + x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk) x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk) self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + data = {'container': x1_child.pk, 'attribute': x2_child.pk} + self.executeBadData(data, item=self.ks3.model.pk) + + data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} + self.executeOK(data, item=self.ks1.model.pk) + self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + + self.executeBadData(data, item=self.ks3.model.pk) + + + @decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch') + def test_delete_attribution_diamond_right(self): + Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2) + Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1) + self.operation3.result.delete() + self.owned.set_substitutions(self.operation3.pk, [{ + 'original': self.ks1X1, + 'substitution': self.ks2X1 + }, { + 'original': self.ks1X2, + 'substitution': self.ks2D1 + }]) + self.owned.execute_operation(self.operation3) + self.operation3.refresh_from_db() + self.ks3 = RSForm(self.operation3.result) + x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk) + self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk} + self.executeOK(data, item=self.ks2.model.pk) + self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists()) + self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} + self.executeOK(data, item=self.ks1.model.pk) + self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + + @decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch') + def test_delete_attribution_diamond_left(self): + Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2) + Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1) + self.operation3.result.delete() + self.owned.set_substitutions(self.operation3.pk, [{ + 'original': self.ks1X1, + 'substitution': self.ks2X1 + }, { + 'original': self.ks1X2, + 'substitution': self.ks2D1 + }]) + self.owned.execute_operation(self.operation3) + self.operation3.refresh_from_db() + self.ks3 = RSForm(self.operation3.result) + x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk) + self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk} + self.executeOK(data, item=self.ks1.model.pk) + self.assertTrue(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists()) + self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk} + self.executeOK(data, item=self.ks2.model.pk) + self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists()) + self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists()) + + + @decl_endpoint('/api/rsforms/{item}/clear-attributions', method='patch') + def test_clear_attributions(self): + Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2) + self.operation3.result.delete() + self.owned.execute_operation(self.operation3) + self.operation3.refresh_from_db() + self.ks3 = RSForm(self.operation3.result) + ks1x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk) + ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk) + ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) + Attribution.objects.create(container=ks1x1_child, attribute=ks2x1_child) + data = {'target': ks1x1_child.pk} + self.executeOK(data, item=self.ks3.model.pk) + + self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists()) + self.assertTrue(Attribution.objects.filter(container=ks1x1_child, attribute=ks1x2_child).exists()) + self.assertFalse(Attribution.objects.filter(container=ks1x1_child, attribute=ks2x1_child).exists()) diff --git a/rsconcept/backend/apps/rsform/views/rsforms.py b/rsconcept/backend/apps/rsform/views/rsforms.py index 89af3d05..920c399f 100644 --- a/rsconcept/backend/apps/rsform/views/rsforms.py +++ b/rsconcept/backend/apps/rsform/views/rsforms.py @@ -16,7 +16,7 @@ from rest_framework.serializers import ValidationError from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead from apps.library.serializers import LibraryItemSerializer -from apps.oss.models import PropagationFacade +from apps.oss.models import Inheritance, PropagationFacade from apps.users.models import User from shared import messages as msg from shared import permissions, utility @@ -308,6 +308,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr attribute = serializer.validated_data['attribute'] with transaction.atomic(): + if Inheritance.check_share_origin(container.pk, attribute.pk): + raise ValidationError({ + 'container': msg.deleteInheritedAttribution() + }) + new_attribution = m.Attribution.objects.create( container=container, attribute=attribute @@ -321,7 +326,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr ) @extend_schema( - summary='delete Association', + summary='delete Attribution', tags=['RSForm'], request=s.AttributionDataSerializer, responses={ @@ -339,17 +344,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr serializer.is_valid(raise_exception=True) with transaction.atomic(): - target = list(m.Attribution.objects.filter( + target_query = m.Attribution.objects.filter( container=serializer.validated_data['container'], attribute=serializer.validated_data['attribute'] - )) - if not target: + ) + attr = target_query.first() + if not attr: raise ValidationError({ - 'container': msg.invalidAssociation() + 'container': msg.missingAttribution() + }) + if Inheritance.check_share_origin(request.data['container'], request.data['attribute']): + raise ValidationError({ + 'container': msg.deleteInheritedAttribution() }) - PropagationFacade().before_delete_attribution(item.pk, target) - m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete() + PropagationFacade().before_delete_attribution(item.pk, [attr]) + attr.delete() item.save(update_fields=['time_update']) return Response( @@ -376,10 +386,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr serializer.is_valid(raise_exception=True) with transaction.atomic(): - target = list(m.Attribution.objects.filter(container=serializer.validated_data['target'])) - if target: - PropagationFacade().before_delete_attribution(item.pk, target) - m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete() + attributions = list(m.Attribution.objects.filter(container=serializer.validated_data['target'])) + to_delete: list[m.Attribution] = [] + for attrib in attributions: + if not Inheritance.check_share_origin(attrib.container.pk, attrib.attribute.pk): + to_delete.append(attrib) + if to_delete: + PropagationFacade().before_delete_attribution(item.pk, to_delete) + m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in to_delete]).delete() item.save(update_fields=['time_update']) return Response( diff --git a/rsconcept/backend/shared/messages.py b/rsconcept/backend/shared/messages.py index 10262384..94236151 100644 --- a/rsconcept/backend/shared/messages.py +++ b/rsconcept/backend/shared/messages.py @@ -150,8 +150,16 @@ def typificationInvalidStr(): return 'Invalid typification string' -def invalidAssociation(): - return f'Ассоциация не найдена' +def missingAttribution(): + return f'Атрибутирование не найдено' + + +def deleteInheritedAttribution(): + return f'Попытка удалить наследованное атрибутирование' + + +def createdInheritedAttribution(): + return f'Попытка установить атрибутирование между наследниками из одной КС' def exteorFileVersionNotSupported():