mirror of
https://github.com/IRBorisov/ConceptPortal.git
synced 2025-11-15 17:21:38 +03:00
F: Improve change propagation for attribution
This commit is contained in:
parent
0a34a5ab84
commit
b757fc5ec0
|
|
@ -1,6 +1,8 @@
|
|||
''' Models: Synthesis Inheritance. '''
|
||||
from django.db.models import CASCADE, ForeignKey, Model
|
||||
|
||||
from .Substitution import Substitution
|
||||
|
||||
|
||||
class Inheritance(Model):
|
||||
''' Inheritance links parent and child constituents in synthesis operation.'''
|
||||
|
|
@ -32,3 +34,32 @@ class Inheritance(Model):
|
|||
|
||||
def __str__(self) -> str:
|
||||
return f'{self.parent} -> {self.child}'
|
||||
|
||||
@staticmethod
|
||||
def check_share_origin(cst1: int, cst2: int) -> bool:
|
||||
''' Check if two constituents share origin. '''
|
||||
inheritance1 = Inheritance.objects.filter(child_id=cst1).first()
|
||||
if not inheritance1:
|
||||
return False
|
||||
inheritance2 = Inheritance.objects.filter(child_id=cst2).first()
|
||||
if not inheritance2:
|
||||
return False
|
||||
|
||||
parent1 = inheritance1.parent
|
||||
parent2 = inheritance2.parent
|
||||
|
||||
origins1 = list(
|
||||
Substitution.objects.filter(
|
||||
substitution=parent1).values_list(
|
||||
'original__schema_id',
|
||||
flat=True))
|
||||
origins1.append(parent1.schema_id)
|
||||
|
||||
origins2 = list(
|
||||
Substitution.objects.filter(
|
||||
substitution=parent2).values_list(
|
||||
'original__schema_id',
|
||||
flat=True))
|
||||
origins2.append(parent2.schema_id)
|
||||
|
||||
return any(x in origins1 for x in origins2)
|
||||
|
|
|
|||
|
|
@ -95,6 +95,16 @@ class OssCache:
|
|||
return self.get_inheritor(sub.substitution_id, operation)
|
||||
return self.get_inheritor(parent_cst, operation)
|
||||
|
||||
def get_substitution_partners(self, cst: int, operation: int) -> list[int]:
|
||||
''' Get originals or substitutes for target constituent in target operation. '''
|
||||
result = []
|
||||
for sub in self.substitutions[operation]:
|
||||
if sub.original_id == cst:
|
||||
result.append(sub.substitution_id)
|
||||
elif sub.substitution_id == cst:
|
||||
result.append(sub.original_id)
|
||||
return result
|
||||
|
||||
def insert_argument(self, argument: Argument) -> None:
|
||||
''' Insert new argument. '''
|
||||
self.graph.add_edge(argument.argument_id, argument.operation_id)
|
||||
|
|
|
|||
|
|
@ -205,16 +205,28 @@ class PropagationEngine:
|
|||
new_attribute = self.cache.get_successor(attr.attribute_id, child_id)
|
||||
if new_container is None or new_attribute is None:
|
||||
continue
|
||||
deleted_assoc = Attribution.objects.filter(
|
||||
deleted_attr = Attribution.objects.filter(
|
||||
container=new_container,
|
||||
attribute=new_attribute
|
||||
)
|
||||
if deleted_assoc.exists():
|
||||
deleted.append(deleted_assoc[0])
|
||||
).first()
|
||||
if not deleted_attr:
|
||||
continue
|
||||
|
||||
if not self._has_alternative_attribution(child_id, attr.container_id, attr.attribute_id):
|
||||
deleted.append(deleted_attr)
|
||||
|
||||
if deleted:
|
||||
self.on_delete_attribution(child_id, deleted)
|
||||
Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete()
|
||||
|
||||
def _has_alternative_attribution(self, operationID: int, container: int, attribute: int) -> bool:
|
||||
''' Check if there is an alternative attribution among substitutions. '''
|
||||
container_partners = self.cache.get_substitution_partners(container, operationID)
|
||||
attribute_partners = self.cache.get_substitution_partners(attribute, operationID)
|
||||
if not container_partners or not attribute_partners:
|
||||
return False
|
||||
return Attribution.objects.filter(container__in=container_partners, attribute__in=attribute_partners).exists()
|
||||
|
||||
def on_delete_inherited(self, operationID: int, target: list[int]) -> None:
|
||||
''' Trigger cascade resolutions when Constituenta inheritance is deleted. '''
|
||||
children = self.cache.extend_graph.outputs[operationID]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
''' Testing API: Change constituents in OSS. '''
|
||||
|
||||
from apps.oss.models import OperationSchema, OperationType
|
||||
from apps.oss.models import OperationSchema, OperationType, PropagationFacade
|
||||
from apps.rsform.models import Attribution, Constituenta, CstType, RSForm
|
||||
from shared.EndpointTester import EndpointTester, decl_endpoint
|
||||
|
||||
|
|
@ -95,14 +95,14 @@ class TestChangeConstituents(EndpointTester):
|
|||
])
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/create-cst', method='post')
|
||||
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
|
||||
def test_create_constituenta(self):
|
||||
data = {
|
||||
'alias': 'X3',
|
||||
'cst_type': CstType.BASE,
|
||||
'definition_formal': 'X4 = X5'
|
||||
}
|
||||
response = self.executeCreated(data, schema=self.ks1.model.pk)
|
||||
response = self.executeCreated(data, item=self.ks1.model.pk)
|
||||
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
|
||||
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
|
||||
self.assertEqual(self.ks1.constituentsQ().count(), 3)
|
||||
|
|
@ -112,7 +112,7 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
|
||||
@decl_endpoint('/api/rsforms/{item}/update-cst', method='patch')
|
||||
def test_update_constituenta(self):
|
||||
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
|
||||
data = {
|
||||
|
|
@ -125,7 +125,7 @@ class TestChangeConstituents(EndpointTester):
|
|||
'crucial': True,
|
||||
}
|
||||
}
|
||||
self.executeOK(data, schema=self.ks1.model.pk)
|
||||
self.executeOK(data, item=self.ks1.model.pk)
|
||||
self.ks1X1.refresh_from_db()
|
||||
d2.refresh_from_db()
|
||||
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
|
||||
|
|
@ -142,10 +142,10 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}')
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/delete-multiple-cst', method='patch')
|
||||
@decl_endpoint('/api/rsforms/{item}/delete-multiple-cst', method='patch')
|
||||
def test_delete_constituenta(self):
|
||||
data = {'items': [self.ks2X1.pk]}
|
||||
self.executeOK(data, schema=self.ks2.model.pk)
|
||||
self.executeOK(data, item=self.ks2.model.pk)
|
||||
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
|
||||
self.ks2D1.refresh_from_db()
|
||||
self.assertEqual(self.ks2.constituentsQ().count(), 1)
|
||||
|
|
@ -154,14 +154,14 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.assertEqual(inherited_cst.definition_formal, r'DEL\DEL')
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/substitute', method='patch')
|
||||
@decl_endpoint('/api/rsforms/{item}/substitute', method='patch')
|
||||
def test_substitute(self):
|
||||
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_formal=r'X1\X2\X3')
|
||||
data = {'substitutions': [{
|
||||
'original': self.ks1X1.pk,
|
||||
'substitution': self.ks1X2.pk
|
||||
}]}
|
||||
self.executeOK(data, schema=self.ks1.model.pk)
|
||||
self.executeOK(data, item=self.ks1.model.pk)
|
||||
self.ks1X2.refresh_from_db()
|
||||
d2.refresh_from_db()
|
||||
self.assertEqual(self.ks1.constituentsQ().count(), 1)
|
||||
|
|
@ -170,16 +170,25 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.assertEqual(d2.definition_formal, r'X2\X2\X3')
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post')
|
||||
@decl_endpoint('/api/rsforms/{item}/create-attribution', method='post')
|
||||
def test_create_attribution(self):
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeCreated(data, schema=self.ks1.model.pk)
|
||||
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
|
||||
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
|
||||
|
||||
data = {'container': x1_child.pk, 'attribute': x2_child.pk}
|
||||
self.executeBadData(data, item=self.ks3.model.pk)
|
||||
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeCreated(data, item=self.ks1.model.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
data = {'container': x1_child.pk, 'attribute': ks2x1_child.pk}
|
||||
self.executeCreated(data, item=self.ks3.model.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=ks2x1_child).exists())
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post')
|
||||
|
||||
@decl_endpoint('/api/rsforms/{item}/create-attribution', method='post')
|
||||
def test_create_attribution_substitution(self):
|
||||
self.operation3.result.delete()
|
||||
self.owned.set_substitutions(self.operation3.pk, [{
|
||||
|
|
@ -189,9 +198,120 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.owned.execute_operation(self.operation3)
|
||||
self.operation3.refresh_from_db()
|
||||
self.ks3 = RSForm(self.operation3.result)
|
||||
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
|
||||
ks2d1_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
|
||||
|
||||
data = {'container': ks1x2_child.pk, 'attribute': ks2x1_child.pk}
|
||||
self.executeBadData(data, item=self.ks3.model.pk)
|
||||
|
||||
data = {'container': ks2d1_child.pk, 'attribute': ks2x1_child.pk}
|
||||
self.executeBadData(data, item=self.ks3.model.pk)
|
||||
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeCreated(data, schema=self.ks1.model.pk)
|
||||
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
self.executeCreated(data, item=self.ks1.model.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertTrue(Attribution.objects.filter(container=ks2x1_child, attribute=ks1x2_child).exists())
|
||||
|
||||
self.executeBadData(data, item=self.ks1.model.pk)
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
|
||||
def test_delete_attribution(self):
|
||||
attr = Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
|
||||
PropagationFacade().after_create_attribution(self.ks1.model.pk, [attr])
|
||||
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
|
||||
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
data = {'container': x1_child.pk, 'attribute': x2_child.pk}
|
||||
self.executeBadData(data, item=self.ks3.model.pk)
|
||||
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeOK(data, item=self.ks1.model.pk)
|
||||
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
|
||||
self.executeBadData(data, item=self.ks3.model.pk)
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
|
||||
def test_delete_attribution_diamond_right(self):
|
||||
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
|
||||
Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1)
|
||||
self.operation3.result.delete()
|
||||
self.owned.set_substitutions(self.operation3.pk, [{
|
||||
'original': self.ks1X1,
|
||||
'substitution': self.ks2X1
|
||||
}, {
|
||||
'original': self.ks1X2,
|
||||
'substitution': self.ks2D1
|
||||
}])
|
||||
self.owned.execute_operation(self.operation3)
|
||||
self.operation3.refresh_from_db()
|
||||
self.ks3 = RSForm(self.operation3.result)
|
||||
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk}
|
||||
self.executeOK(data, item=self.ks2.model.pk)
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
|
||||
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeOK(data, item=self.ks1.model.pk)
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
|
||||
def test_delete_attribution_diamond_left(self):
|
||||
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
|
||||
Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1)
|
||||
self.operation3.result.delete()
|
||||
self.owned.set_substitutions(self.operation3.pk, [{
|
||||
'original': self.ks1X1,
|
||||
'substitution': self.ks2X1
|
||||
}, {
|
||||
'original': self.ks1X2,
|
||||
'substitution': self.ks2D1
|
||||
}])
|
||||
self.owned.execute_operation(self.operation3)
|
||||
self.operation3.refresh_from_db()
|
||||
self.ks3 = RSForm(self.operation3.result)
|
||||
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
|
||||
self.executeOK(data, item=self.ks1.model.pk)
|
||||
self.assertTrue(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk}
|
||||
self.executeOK(data, item=self.ks2.model.pk)
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{item}/clear-attributions', method='patch')
|
||||
def test_clear_attributions(self):
|
||||
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
|
||||
self.operation3.result.delete()
|
||||
self.owned.execute_operation(self.operation3)
|
||||
self.operation3.refresh_from_db()
|
||||
self.ks3 = RSForm(self.operation3.result)
|
||||
ks1x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
|
||||
ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
|
||||
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
|
||||
Attribution.objects.create(container=ks1x1_child, attribute=ks2x1_child)
|
||||
data = {'target': ks1x1_child.pk}
|
||||
self.executeOK(data, item=self.ks3.model.pk)
|
||||
|
||||
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
|
||||
self.assertTrue(Attribution.objects.filter(container=ks1x1_child, attribute=ks1x2_child).exists())
|
||||
self.assertFalse(Attribution.objects.filter(container=ks1x1_child, attribute=ks2x1_child).exists())
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ from rest_framework.serializers import ValidationError
|
|||
|
||||
from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead
|
||||
from apps.library.serializers import LibraryItemSerializer
|
||||
from apps.oss.models import PropagationFacade
|
||||
from apps.oss.models import Inheritance, PropagationFacade
|
||||
from apps.users.models import User
|
||||
from shared import messages as msg
|
||||
from shared import permissions, utility
|
||||
|
|
@ -308,6 +308,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
attribute = serializer.validated_data['attribute']
|
||||
|
||||
with transaction.atomic():
|
||||
if Inheritance.check_share_origin(container.pk, attribute.pk):
|
||||
raise ValidationError({
|
||||
'container': msg.deleteInheritedAttribution()
|
||||
})
|
||||
|
||||
new_attribution = m.Attribution.objects.create(
|
||||
container=container,
|
||||
attribute=attribute
|
||||
|
|
@ -321,7 +326,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
)
|
||||
|
||||
@extend_schema(
|
||||
summary='delete Association',
|
||||
summary='delete Attribution',
|
||||
tags=['RSForm'],
|
||||
request=s.AttributionDataSerializer,
|
||||
responses={
|
||||
|
|
@ -339,17 +344,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
with transaction.atomic():
|
||||
target = list(m.Attribution.objects.filter(
|
||||
target_query = m.Attribution.objects.filter(
|
||||
container=serializer.validated_data['container'],
|
||||
attribute=serializer.validated_data['attribute']
|
||||
))
|
||||
if not target:
|
||||
)
|
||||
attr = target_query.first()
|
||||
if not attr:
|
||||
raise ValidationError({
|
||||
'container': msg.invalidAssociation()
|
||||
'container': msg.missingAttribution()
|
||||
})
|
||||
if Inheritance.check_share_origin(request.data['container'], request.data['attribute']):
|
||||
raise ValidationError({
|
||||
'container': msg.deleteInheritedAttribution()
|
||||
})
|
||||
|
||||
PropagationFacade().before_delete_attribution(item.pk, target)
|
||||
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete()
|
||||
PropagationFacade().before_delete_attribution(item.pk, [attr])
|
||||
attr.delete()
|
||||
item.save(update_fields=['time_update'])
|
||||
|
||||
return Response(
|
||||
|
|
@ -376,10 +386,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
with transaction.atomic():
|
||||
target = list(m.Attribution.objects.filter(container=serializer.validated_data['target']))
|
||||
if target:
|
||||
PropagationFacade().before_delete_attribution(item.pk, target)
|
||||
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete()
|
||||
attributions = list(m.Attribution.objects.filter(container=serializer.validated_data['target']))
|
||||
to_delete: list[m.Attribution] = []
|
||||
for attrib in attributions:
|
||||
if not Inheritance.check_share_origin(attrib.container.pk, attrib.attribute.pk):
|
||||
to_delete.append(attrib)
|
||||
if to_delete:
|
||||
PropagationFacade().before_delete_attribution(item.pk, to_delete)
|
||||
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in to_delete]).delete()
|
||||
item.save(update_fields=['time_update'])
|
||||
|
||||
return Response(
|
||||
|
|
|
|||
|
|
@ -150,8 +150,16 @@ def typificationInvalidStr():
|
|||
return 'Invalid typification string'
|
||||
|
||||
|
||||
def invalidAssociation():
|
||||
return f'Ассоциация не найдена'
|
||||
def missingAttribution():
|
||||
return f'Атрибутирование не найдено'
|
||||
|
||||
|
||||
def deleteInheritedAttribution():
|
||||
return f'Попытка удалить наследованное атрибутирование'
|
||||
|
||||
|
||||
def createdInheritedAttribution():
|
||||
return f'Попытка установить атрибутирование между наследниками из одной КС'
|
||||
|
||||
|
||||
def exteorFileVersionNotSupported():
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user