F: Improve attribution change propagation

This commit is contained in:
Ivan 2025-11-10 21:01:59 +03:00
parent 6489af44d5
commit 11a11bb558
6 changed files with 228 additions and 33 deletions

View File

@ -1,6 +1,8 @@
''' Models: Synthesis Inheritance. '''
from django.db.models import CASCADE, ForeignKey, Model
from .Substitution import Substitution
class Inheritance(Model):
''' Inheritance links parent and child constituents in synthesis operation.'''
@ -32,3 +34,32 @@ class Inheritance(Model):
def __str__(self) -> str:
return f'{self.parent} -> {self.child}'
@staticmethod
def check_share_origin(cst1: int, cst2: int) -> bool:
''' Check if two constituents share origin. '''
inheritance1 = Inheritance.objects.filter(child_id=cst1).first()
if not inheritance1:
return False
inheritance2 = Inheritance.objects.filter(child_id=cst2).first()
if not inheritance2:
return False
parent1 = inheritance1.parent
parent2 = inheritance2.parent
origins1 = list(
Substitution.objects.filter(
substitution=parent1).values_list(
'original__schema_id',
flat=True))
origins1.append(parent1.schema_id)
origins2 = list(
Substitution.objects.filter(
substitution=parent2).values_list(
'original__schema_id',
flat=True))
origins2.append(parent2.schema_id)
return any(x in origins1 for x in origins2)

View File

@ -95,6 +95,16 @@ class OssCache:
return self.get_inheritor(sub.substitution_id, operation)
return self.get_inheritor(parent_cst, operation)
def get_substitution_partners(self, cst: int, operation: int) -> list[int]:
''' Get originals or substitutes for target constituent in target operation. '''
result = []
for sub in self.substitutions[operation]:
if sub.original_id == cst:
result.append(sub.substitution_id)
elif sub.substitution_id == cst:
result.append(sub.original_id)
return result
def insert_argument(self, argument: Argument) -> None:
''' Insert new argument. '''
self.graph.add_edge(argument.argument_id, argument.operation_id)

View File

@ -205,16 +205,28 @@ class PropagationEngine:
new_attribute = self.cache.get_successor(attr.attribute_id, child_id)
if new_container is None or new_attribute is None:
continue
deleted_assoc = Attribution.objects.filter(
deleted_attr = Attribution.objects.filter(
container=new_container,
attribute=new_attribute
)
if deleted_assoc.exists():
deleted.append(deleted_assoc[0])
).first()
if not deleted_attr:
continue
if not self._has_alternative_attribution(child_id, attr.container_id, attr.attribute_id):
deleted.append(deleted_attr)
if deleted:
self.on_delete_attribution(child_id, deleted)
Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete()
def _has_alternative_attribution(self, operationID: int, container: int, attribute: int) -> bool:
''' Check if there is an alternative attribution among substitutions. '''
container_partners = self.cache.get_substitution_partners(container, operationID)
attribute_partners = self.cache.get_substitution_partners(attribute, operationID)
if not container_partners or not attribute_partners:
return False
return Attribution.objects.filter(container__in=container_partners, attribute__in=attribute_partners).exists()
def on_delete_inherited(self, operationID: int, target: list[int]) -> None:
''' Trigger cascade resolutions when Constituenta inheritance is deleted. '''
children = self.cache.extend_graph.outputs[operationID]

View File

@ -1,6 +1,6 @@
''' Testing API: Change constituents in OSS. '''
from apps.oss.models import OperationSchema, OperationType
from apps.oss.models import OperationSchema, OperationType, PropagationFacade
from apps.rsform.models import Attribution, Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
@ -95,14 +95,14 @@ class TestChangeConstituents(EndpointTester):
])
@decl_endpoint('/api/rsforms/{schema}/create-cst', method='post')
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
def test_create_constituenta(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'definition_formal': 'X4 = X5'
}
response = self.executeCreated(data, schema=self.ks1.model.pk)
response = self.executeCreated(data, item=self.ks1.model.pk)
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
self.assertEqual(self.ks1.constituentsQ().count(), 3)
@ -112,7 +112,7 @@ class TestChangeConstituents(EndpointTester):
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
@decl_endpoint('/api/rsforms/{schema}/update-cst', method='patch')
@decl_endpoint('/api/rsforms/{item}/update-cst', method='patch')
def test_update_constituenta(self):
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_raw='@{X1|sing,nomn}')
data = {
@ -125,7 +125,7 @@ class TestChangeConstituents(EndpointTester):
'crucial': True,
}
}
self.executeOK(data, schema=self.ks1.model.pk)
self.executeOK(data, item=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
d2.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
@ -142,10 +142,10 @@ class TestChangeConstituents(EndpointTester):
self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}')
@decl_endpoint('/api/rsforms/{schema}/delete-multiple-cst', method='patch')
@decl_endpoint('/api/rsforms/{item}/delete-multiple-cst', method='patch')
def test_delete_constituenta(self):
data = {'items': [self.ks2X1.pk]}
self.executeOK(data, schema=self.ks2.model.pk)
self.executeOK(data, item=self.ks2.model.pk)
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
self.ks2D1.refresh_from_db()
self.assertEqual(self.ks2.constituentsQ().count(), 1)
@ -154,14 +154,14 @@ class TestChangeConstituents(EndpointTester):
self.assertEqual(inherited_cst.definition_formal, r'DEL\DEL')
@decl_endpoint('/api/rsforms/{schema}/substitute', method='patch')
@decl_endpoint('/api/rsforms/{item}/substitute', method='patch')
def test_substitute(self):
d2 = self.ks3.insert_last('D2', cst_type=CstType.TERM, definition_formal=r'X1\X2\X3')
data = {'substitutions': [{
'original': self.ks1X1.pk,
'substitution': self.ks1X2.pk
}]}
self.executeOK(data, schema=self.ks1.model.pk)
self.executeOK(data, item=self.ks1.model.pk)
self.ks1X2.refresh_from_db()
d2.refresh_from_db()
self.assertEqual(self.ks1.constituentsQ().count(), 1)
@ -170,16 +170,25 @@ class TestChangeConstituents(EndpointTester):
self.assertEqual(d2.definition_formal, r'X2\X2\X3')
@decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post')
@decl_endpoint('/api/rsforms/{item}/create-attribution', method='post')
def test_create_attribution(self):
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeCreated(data, schema=self.ks1.model.pk)
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
data = {'container': x1_child.pk, 'attribute': x2_child.pk}
self.executeBadData(data, item=self.ks3.model.pk)
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeCreated(data, item=self.ks1.model.pk)
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
data = {'container': x1_child.pk, 'attribute': ks2x1_child.pk}
self.executeCreated(data, item=self.ks3.model.pk)
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=ks2x1_child).exists())
@decl_endpoint('/api/rsforms/{schema}/create-attribution', method='post')
@decl_endpoint('/api/rsforms/{item}/create-attribution', method='post')
def test_create_attribution_substitution(self):
self.operation3.result.delete()
self.owned.set_substitutions(self.operation3.pk, [{
@ -189,9 +198,120 @@ class TestChangeConstituents(EndpointTester):
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
ks2d1_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
data = {'container': ks1x2_child.pk, 'attribute': ks2x1_child.pk}
self.executeBadData(data, item=self.ks3.model.pk)
data = {'container': ks2d1_child.pk, 'attribute': ks2x1_child.pk}
self.executeBadData(data, item=self.ks3.model.pk)
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeCreated(data, schema=self.ks1.model.pk)
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
self.executeCreated(data, item=self.ks1.model.pk)
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertTrue(Attribution.objects.filter(container=ks2x1_child, attribute=ks1x2_child).exists())
self.executeBadData(data, item=self.ks1.model.pk)
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
def test_delete_attribution(self):
attr = Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
PropagationFacade().after_create_attribution(self.ks1.model.pk, [attr])
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
data = {'container': x1_child.pk, 'attribute': x2_child.pk}
self.executeBadData(data, item=self.ks3.model.pk)
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeOK(data, item=self.ks1.model.pk)
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.executeBadData(data, item=self.ks3.model.pk)
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
def test_delete_attribution_diamond_right(self):
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1)
self.operation3.result.delete()
self.owned.set_substitutions(self.operation3.pk, [{
'original': self.ks1X1,
'substitution': self.ks2X1
}, {
'original': self.ks1X2,
'substitution': self.ks2D1
}])
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk}
self.executeOK(data, item=self.ks2.model.pk)
self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeOK(data, item=self.ks1.model.pk)
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
@decl_endpoint('/api/rsforms/{item}/delete-attribution', method='patch')
def test_delete_attribution_diamond_left(self):
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
Attribution.objects.create(container=self.ks2X1, attribute=self.ks2D1)
self.operation3.result.delete()
self.owned.set_substitutions(self.operation3.pk, [{
'original': self.ks1X1,
'substitution': self.ks2X1
}, {
'original': self.ks1X2,
'substitution': self.ks2D1
}])
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
x2_child = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
data = {'container': self.ks1X1.pk, 'attribute': self.ks1X2.pk}
self.executeOK(data, item=self.ks1.model.pk)
self.assertTrue(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertTrue(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
data = {'container': self.ks2X1.pk, 'attribute': self.ks2D1.pk}
self.executeOK(data, item=self.ks2.model.pk)
self.assertFalse(Attribution.objects.filter(container=self.ks2X1, attribute=self.ks2D1).exists())
self.assertFalse(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertFalse(Attribution.objects.filter(container=x1_child, attribute=x2_child).exists())
@decl_endpoint('/api/rsforms/{item}/clear-attributions', method='patch')
def test_clear_attributions(self):
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
self.operation3.result.delete()
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
ks1x1_child = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
ks1x2_child = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
ks2x1_child = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
Attribution.objects.create(container=ks1x1_child, attribute=ks2x1_child)
data = {'target': ks1x1_child.pk}
self.executeOK(data, item=self.ks3.model.pk)
self.assertTrue(Attribution.objects.filter(container=self.ks1X1, attribute=self.ks1X2).exists())
self.assertTrue(Attribution.objects.filter(container=ks1x1_child, attribute=ks1x2_child).exists())
self.assertFalse(Attribution.objects.filter(container=ks1x1_child, attribute=ks2x1_child).exists())

View File

@ -16,7 +16,7 @@ from rest_framework.serializers import ValidationError
from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead
from apps.library.serializers import LibraryItemSerializer
from apps.oss.models import PropagationFacade
from apps.oss.models import Inheritance, PropagationFacade
from apps.users.models import User
from shared import messages as msg
from shared import permissions, utility
@ -308,6 +308,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
attribute = serializer.validated_data['attribute']
with transaction.atomic():
if Inheritance.check_share_origin(container.pk, attribute.pk):
raise ValidationError({
'container': msg.deleteInheritedAttribution()
})
new_attribution = m.Attribution.objects.create(
container=container,
attribute=attribute
@ -321,7 +326,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
)
@extend_schema(
summary='delete Association',
summary='delete Attribution',
tags=['RSForm'],
request=s.AttributionDataSerializer,
responses={
@ -339,17 +344,22 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer.is_valid(raise_exception=True)
with transaction.atomic():
target = list(m.Attribution.objects.filter(
target_query = m.Attribution.objects.filter(
container=serializer.validated_data['container'],
attribute=serializer.validated_data['attribute']
))
if not target:
)
attr = target_query.first()
if not attr:
raise ValidationError({
'container': msg.invalidAssociation()
'container': msg.missingAttribution()
})
if Inheritance.check_share_origin(request.data['container'], request.data['attribute']):
raise ValidationError({
'container': msg.deleteInheritedAttribution()
})
PropagationFacade().before_delete_attribution(item.pk, target)
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete()
PropagationFacade().before_delete_attribution(item.pk, [attr])
attr.delete()
item.save(update_fields=['time_update'])
return Response(
@ -376,10 +386,14 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer.is_valid(raise_exception=True)
with transaction.atomic():
target = list(m.Attribution.objects.filter(container=serializer.validated_data['target']))
if target:
PropagationFacade().before_delete_attribution(item.pk, target)
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in target]).delete()
attributions = list(m.Attribution.objects.filter(container=serializer.validated_data['target']))
to_delete: list[m.Attribution] = []
for attrib in attributions:
if not Inheritance.check_share_origin(attrib.container.pk, attrib.attribute.pk):
to_delete.append(attrib)
if to_delete:
PropagationFacade().before_delete_attribution(item.pk, to_delete)
m.Attribution.objects.filter(pk__in=[attrib.pk for attrib in to_delete]).delete()
item.save(update_fields=['time_update'])
return Response(

View File

@ -150,8 +150,16 @@ def typificationInvalidStr():
return 'Invalid typification string'
def invalidAssociation():
return f'Ассоциация не найдена'
def missingAttribution():
return f'Атрибутирование не найдено'
def deleteInheritedAttribution():
return f'Попытка удалить наследованное атрибутирование'
def createdInheritedAttribution():
return f'Попытка установить атрибутирование между наследниками из одной КС'
def exteorFileVersionNotSupported():