F: Improve attribution propagation on sub change
Some checks failed
Backend CI / build (3.12) (push) Has been cancelled
Frontend CI / build (22.x) (push) Has been cancelled
Backend CI / notify-failure (push) Has been cancelled
Frontend CI / notify-failure (push) Has been cancelled

This commit is contained in:
Ivan 2025-11-12 12:58:25 +03:00
parent 7ed13cccb9
commit 2c3bcd3424
2 changed files with 90 additions and 42 deletions

View File

@ -1,6 +1,7 @@
''' Models: Change propagation engine. '''
from typing import Optional
from django.db.models import Q
from rest_framework.serializers import ValidationError
from apps.rsform.models import Attribution, Constituenta, CstType, RSFormCached
@ -90,9 +91,7 @@ class PropagationEngine:
# pylint: disable=too-many-arguments, too-many-positional-arguments
def on_update_cst(
self,
operation: int,
cst_id: int,
self, operation: int, cst_id: int,
data: dict, old_data: dict,
mapping: CstMapping
) -> None:
@ -128,18 +127,20 @@ class PropagationEngine:
mapping=new_mapping
)
def on_inherit_attribution(self, operationID: int,
items: list[Attribution],
exclude: Optional[list[int]] = None) -> None:
def on_inherit_attribution(
self, operationID: int,
items: list[Attribution],
exclude: Optional[list[int]] = None
) -> None:
''' Trigger cascade resolutions when Attribution is inherited. '''
children = self.cache.extend_graph.outputs[operationID]
if not children:
return
for child_id in children:
if not exclude or child_id not in exclude:
self.inherit_attribution(child_id, items)
self.inherit_attributions(child_id, items)
def inherit_attribution(self, target: int, items: list[Attribution]) -> None:
def inherit_attributions(self, target: int, items: list[Attribution]) -> None:
''' Execute inheritance of Attributions. '''
operation = self.cache.operation_by_id[target]
if operation.result is None or not items:
@ -194,30 +195,33 @@ class PropagationEngine:
return
self.cache.ensure_loaded_subs()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_result(child_operation)
if child_schema is None:
self._delete_child_attributions(child_id, attributions)
def _delete_child_attributions(self, operationID: int, attributions: list[Attribution]) -> None:
child_operation = self.cache.operation_by_id[operationID]
child_schema = self.cache.get_result(child_operation)
if child_schema is None:
return
deleted: list[Attribution] = []
for attr in attributions:
new_container = self.cache.get_successor(attr.container_id, operationID)
new_attribute = self.cache.get_successor(attr.attribute_id, operationID)
if new_container is None or new_attribute is None:
continue
deleted_attr = Attribution.objects.filter(
container=new_container,
attribute=new_attribute
).first()
if not deleted_attr:
continue
deleted: list[Attribution] = []
for attr in attributions:
new_container = self.cache.get_successor(attr.container_id, child_id)
new_attribute = self.cache.get_successor(attr.attribute_id, child_id)
if new_container is None or new_attribute is None:
continue
deleted_attr = Attribution.objects.filter(
container=new_container,
attribute=new_attribute
).first()
if not deleted_attr:
continue
if not self._has_alternative_attribution(operationID, attr.container_id, attr.attribute_id):
deleted.append(deleted_attr)
if not self._has_alternative_attribution(child_id, attr.container_id, attr.attribute_id):
deleted.append(deleted_attr)
if deleted:
self.on_delete_attribution(child_id, deleted)
Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete()
if deleted:
self.on_delete_attribution(operationID, deleted)
Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete()
def _has_alternative_attribution(self, operationID: int, container: int, attribute: int) -> bool:
''' Check if there is an alternative attribution among substitutions. '''
@ -249,7 +253,10 @@ class PropagationEngine:
self.cache.remove_cst(operationID, target_ids)
schema.delete_cst(target_ids)
def undo_substitutions_cst(self, target_ids: list[int], operation: Operation, schema: RSFormCached) -> None:
def undo_substitutions_cst(
self, target_ids: list[int],
operation: Operation, schema: RSFormCached
) -> None:
''' Undo substitutions for Constituents. '''
to_process = []
for sub in self.cache.substitutions[operation.pk]:
@ -259,17 +266,23 @@ class PropagationEngine:
self.undo_substitution(schema, sub, target_ids)
def undo_substitution(
self,
schema: RSFormCached,
target: Substitution,
ignore_parents: Optional[list[int]] = None
self, schema: RSFormCached, target: Substitution,
ignore_parents: Optional[list[int]] = None
) -> None:
''' Undo target substitution. '''
if ignore_parents is None:
ignore_parents = []
operation_id = target.operation_id
original_schema = self.context.get_schema(target.original.schema_id)
original_attributions = list(Attribution.objects.filter(
Q(container=target.original_id) |
Q(attribute=target.original_id)
))
if original_attributions:
self._delete_child_attributions(operation_id, original_attributions)
dependant = []
original_schema = self.context.get_schema(target.original.schema_id)
for cst_id in original_schema.get_dependant([target.original_id]):
if cst_id not in ignore_parents:
inheritor_id = self.cache.get_inheritor(cst_id, operation_id)
@ -284,6 +297,9 @@ class PropagationEngine:
full_cst = Constituenta.objects.get(pk=target.original_id)
cst_mapping = create_dependant_mapping(original_schema, [full_cst])
self.inherit_cst(operation_id, original_schema, [full_cst], cst_mapping)
if original_attributions:
self.inherit_attributions(operation_id, original_attributions)
new_original_id = self.cache.get_inheritor(target.original_id, operation_id)
assert new_original_id is not None
new_original = schema.cache.by_id[new_original_id]
@ -295,6 +311,7 @@ class PropagationEngine:
mapping = {substitution_inheritor.alias: new_original}
self._on_partial_mapping(mapping, dependant, operation_id, schema)
def _determine_insert_position(
self, prototype_id: int,
operation: Operation,
@ -332,8 +349,7 @@ class PropagationEngine:
return result
def _transform_substitutions(
self,
target: CstSubstitution,
self, target: CstSubstitution,
operation: int,
schema: RSFormCached
) -> CstSubstitution:
@ -373,8 +389,7 @@ class PropagationEngine:
return result
def _on_partial_mapping(
self,
mapping: CstMapping,
self, mapping: CstMapping,
target: list[int],
operation: int,
schema: RSFormCached

View File

@ -1,7 +1,7 @@
''' Testing API: Change substitutions in OSS. '''
from apps.oss.models import OperationSchema, OperationType
from apps.rsform.models import Constituenta, RSForm
from apps.rsform.models import Attribution, Constituenta, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
@ -25,6 +25,7 @@ class TestChangeOperations(EndpointTester):
self.ks1X1 = self.ks1.insert_last('X1', convention='KS1X1')
self.ks1X2 = self.ks1.insert_last('X2', convention='KS1X2')
self.ks1D1 = self.ks1.insert_last('D1', definition_formal='X1 X2', convention='KS1D1')
Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2)
self.ks2 = RSForm.create(
alias='KS2',
@ -38,6 +39,7 @@ class TestChangeOperations(EndpointTester):
definition_formal=r'X1',
convention='KS2S1'
)
Attribution.objects.create(container=self.ks2S1, attribute=self.ks2X1)
self.ks3 = RSForm.create(
alias='KS3',
@ -80,6 +82,7 @@ class TestChangeOperations(EndpointTester):
self.operation4.refresh_from_db()
self.ks4 = RSForm(self.operation4.result)
self.ks4X1 = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk)
self.ks4X2 = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk)
self.ks4S1 = Constituenta.objects.get(as_child__parent_id=self.ks2S1.pk)
self.ks4D1 = Constituenta.objects.get(as_child__parent_id=self.ks1D1.pk)
self.ks4D2 = self.ks4.insert_last(
@ -87,6 +90,7 @@ class TestChangeOperations(EndpointTester):
definition_formal=r'X1 X2 X3 S1 D1',
convention='KS4D2'
)
Attribution.objects.create(container=self.ks4S1, attribute=self.ks4D2)
self.operation5 = self.owned.create_operation(
alias='5',
@ -179,8 +183,8 @@ class TestChangeOperations(EndpointTester):
title='Test6',
owner=self.user
)
ks6X1 = ks6.insert_last('X1', convention='KS6X1')
ks6X2 = ks6.insert_last('X2', convention='KS6X2')
ks6.insert_last('X1', convention='KS6X1')
ks6.insert_last('X2', convention='KS6X2')
ks6D1 = ks6.insert_last('D1', definition_formal='X1 X2', convention='KS6D1')
data = {
@ -330,6 +334,35 @@ class TestChangeOperations(EndpointTester):
self.assertEqual(self.ks5D4.definition_formal, r'D1 X2 X3 S1 D1 D2 D3')
@decl_endpoint('/api/oss/{item}/update-operation', method='patch')
def test_change_substitutions_attribution(self):
self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X1).exists())
self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X2).exists())
self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4D2).exists())
data = {
'target': self.operation4.pk,
'item_data': {
'alias': 'Test4 mod',
'title': 'Test title mod',
'description': 'Comment mod'
},
'layout': self.layout_data,
'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': []
}
self.executeOK(data, item=self.owned_id)
self.assertEqual(self.operation4.getQ_substitutions().count(), 0)
x3 = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X2).exists())
self.assertFalse(Attribution.objects.filter(container=x3, attribute=self.ks4X2).exists())
self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4D2).exists())
self.assertFalse(Attribution.objects.filter(container=x3, attribute=self.ks4D2).exists())
self.assertFalse(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X1).exists())
self.assertTrue(Attribution.objects.filter(container=x3, attribute=self.ks4X1).exists())
@decl_endpoint('/api/oss/{item}/update-operation', method='patch')
def test_change_arguments(self):
data = {