From 2c3bcd3424db69de01fc963081f01ebc75cf2a13 Mon Sep 17 00:00:00 2001 From: Ivan <8611739+IRBorisov@users.noreply.github.com> Date: Wed, 12 Nov 2025 12:58:25 +0300 Subject: [PATCH] F: Improve attribution propagation on sub change --- .../apps/oss/models/PropagationEngine.py | 93 +++++++++++-------- .../oss/tests/s_propagation/t_operations.py | 39 +++++++- 2 files changed, 90 insertions(+), 42 deletions(-) diff --git a/rsconcept/backend/apps/oss/models/PropagationEngine.py b/rsconcept/backend/apps/oss/models/PropagationEngine.py index 645b6c5f..fc564d2d 100644 --- a/rsconcept/backend/apps/oss/models/PropagationEngine.py +++ b/rsconcept/backend/apps/oss/models/PropagationEngine.py @@ -1,6 +1,7 @@ ''' Models: Change propagation engine. ''' from typing import Optional +from django.db.models import Q from rest_framework.serializers import ValidationError from apps.rsform.models import Attribution, Constituenta, CstType, RSFormCached @@ -90,9 +91,7 @@ class PropagationEngine: # pylint: disable=too-many-arguments, too-many-positional-arguments def on_update_cst( - self, - operation: int, - cst_id: int, + self, operation: int, cst_id: int, data: dict, old_data: dict, mapping: CstMapping ) -> None: @@ -128,18 +127,20 @@ class PropagationEngine: mapping=new_mapping ) - def on_inherit_attribution(self, operationID: int, - items: list[Attribution], - exclude: Optional[list[int]] = None) -> None: + def on_inherit_attribution( + self, operationID: int, + items: list[Attribution], + exclude: Optional[list[int]] = None + ) -> None: ''' Trigger cascade resolutions when Attribution is inherited. ''' children = self.cache.extend_graph.outputs[operationID] if not children: return for child_id in children: if not exclude or child_id not in exclude: - self.inherit_attribution(child_id, items) + self.inherit_attributions(child_id, items) - def inherit_attribution(self, target: int, items: list[Attribution]) -> None: + def inherit_attributions(self, target: int, items: list[Attribution]) -> None: ''' Execute inheritance of Attributions. ''' operation = self.cache.operation_by_id[target] if operation.result is None or not items: @@ -194,30 +195,33 @@ class PropagationEngine: return self.cache.ensure_loaded_subs() for child_id in children: - child_operation = self.cache.operation_by_id[child_id] - child_schema = self.cache.get_result(child_operation) - if child_schema is None: + self._delete_child_attributions(child_id, attributions) + + def _delete_child_attributions(self, operationID: int, attributions: list[Attribution]) -> None: + child_operation = self.cache.operation_by_id[operationID] + child_schema = self.cache.get_result(child_operation) + if child_schema is None: + return + + deleted: list[Attribution] = [] + for attr in attributions: + new_container = self.cache.get_successor(attr.container_id, operationID) + new_attribute = self.cache.get_successor(attr.attribute_id, operationID) + if new_container is None or new_attribute is None: + continue + deleted_attr = Attribution.objects.filter( + container=new_container, + attribute=new_attribute + ).first() + if not deleted_attr: continue - deleted: list[Attribution] = [] - for attr in attributions: - new_container = self.cache.get_successor(attr.container_id, child_id) - new_attribute = self.cache.get_successor(attr.attribute_id, child_id) - if new_container is None or new_attribute is None: - continue - deleted_attr = Attribution.objects.filter( - container=new_container, - attribute=new_attribute - ).first() - if not deleted_attr: - continue + if not self._has_alternative_attribution(operationID, attr.container_id, attr.attribute_id): + deleted.append(deleted_attr) - if not self._has_alternative_attribution(child_id, attr.container_id, attr.attribute_id): - deleted.append(deleted_attr) - - if deleted: - self.on_delete_attribution(child_id, deleted) - Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete() + if deleted: + self.on_delete_attribution(operationID, deleted) + Attribution.objects.filter(pk__in=[attrib.pk for attrib in deleted]).delete() def _has_alternative_attribution(self, operationID: int, container: int, attribute: int) -> bool: ''' Check if there is an alternative attribution among substitutions. ''' @@ -249,7 +253,10 @@ class PropagationEngine: self.cache.remove_cst(operationID, target_ids) schema.delete_cst(target_ids) - def undo_substitutions_cst(self, target_ids: list[int], operation: Operation, schema: RSFormCached) -> None: + def undo_substitutions_cst( + self, target_ids: list[int], + operation: Operation, schema: RSFormCached + ) -> None: ''' Undo substitutions for Constituents. ''' to_process = [] for sub in self.cache.substitutions[operation.pk]: @@ -259,17 +266,23 @@ class PropagationEngine: self.undo_substitution(schema, sub, target_ids) def undo_substitution( - self, - schema: RSFormCached, - target: Substitution, - ignore_parents: Optional[list[int]] = None + self, schema: RSFormCached, target: Substitution, + ignore_parents: Optional[list[int]] = None ) -> None: ''' Undo target substitution. ''' if ignore_parents is None: ignore_parents = [] operation_id = target.operation_id - original_schema = self.context.get_schema(target.original.schema_id) + + original_attributions = list(Attribution.objects.filter( + Q(container=target.original_id) | + Q(attribute=target.original_id) + )) + if original_attributions: + self._delete_child_attributions(operation_id, original_attributions) + dependant = [] + original_schema = self.context.get_schema(target.original.schema_id) for cst_id in original_schema.get_dependant([target.original_id]): if cst_id not in ignore_parents: inheritor_id = self.cache.get_inheritor(cst_id, operation_id) @@ -284,6 +297,9 @@ class PropagationEngine: full_cst = Constituenta.objects.get(pk=target.original_id) cst_mapping = create_dependant_mapping(original_schema, [full_cst]) self.inherit_cst(operation_id, original_schema, [full_cst], cst_mapping) + if original_attributions: + self.inherit_attributions(operation_id, original_attributions) + new_original_id = self.cache.get_inheritor(target.original_id, operation_id) assert new_original_id is not None new_original = schema.cache.by_id[new_original_id] @@ -295,6 +311,7 @@ class PropagationEngine: mapping = {substitution_inheritor.alias: new_original} self._on_partial_mapping(mapping, dependant, operation_id, schema) + def _determine_insert_position( self, prototype_id: int, operation: Operation, @@ -332,8 +349,7 @@ class PropagationEngine: return result def _transform_substitutions( - self, - target: CstSubstitution, + self, target: CstSubstitution, operation: int, schema: RSFormCached ) -> CstSubstitution: @@ -373,8 +389,7 @@ class PropagationEngine: return result def _on_partial_mapping( - self, - mapping: CstMapping, + self, mapping: CstMapping, target: list[int], operation: int, schema: RSFormCached diff --git a/rsconcept/backend/apps/oss/tests/s_propagation/t_operations.py b/rsconcept/backend/apps/oss/tests/s_propagation/t_operations.py index 326fe829..fdec9d02 100644 --- a/rsconcept/backend/apps/oss/tests/s_propagation/t_operations.py +++ b/rsconcept/backend/apps/oss/tests/s_propagation/t_operations.py @@ -1,7 +1,7 @@ ''' Testing API: Change substitutions in OSS. ''' from apps.oss.models import OperationSchema, OperationType -from apps.rsform.models import Constituenta, RSForm +from apps.rsform.models import Attribution, Constituenta, RSForm from shared.EndpointTester import EndpointTester, decl_endpoint @@ -25,6 +25,7 @@ class TestChangeOperations(EndpointTester): self.ks1X1 = self.ks1.insert_last('X1', convention='KS1X1') self.ks1X2 = self.ks1.insert_last('X2', convention='KS1X2') self.ks1D1 = self.ks1.insert_last('D1', definition_formal='X1 X2', convention='KS1D1') + Attribution.objects.create(container=self.ks1X1, attribute=self.ks1X2) self.ks2 = RSForm.create( alias='KS2', @@ -38,6 +39,7 @@ class TestChangeOperations(EndpointTester): definition_formal=r'X1', convention='KS2S1' ) + Attribution.objects.create(container=self.ks2S1, attribute=self.ks2X1) self.ks3 = RSForm.create( alias='KS3', @@ -80,6 +82,7 @@ class TestChangeOperations(EndpointTester): self.operation4.refresh_from_db() self.ks4 = RSForm(self.operation4.result) self.ks4X1 = Constituenta.objects.get(as_child__parent_id=self.ks1X2.pk) + self.ks4X2 = Constituenta.objects.get(as_child__parent_id=self.ks2X1.pk) self.ks4S1 = Constituenta.objects.get(as_child__parent_id=self.ks2S1.pk) self.ks4D1 = Constituenta.objects.get(as_child__parent_id=self.ks1D1.pk) self.ks4D2 = self.ks4.insert_last( @@ -87,6 +90,7 @@ class TestChangeOperations(EndpointTester): definition_formal=r'X1 X2 X3 S1 D1', convention='KS4D2' ) + Attribution.objects.create(container=self.ks4S1, attribute=self.ks4D2) self.operation5 = self.owned.create_operation( alias='5', @@ -179,8 +183,8 @@ class TestChangeOperations(EndpointTester): title='Test6', owner=self.user ) - ks6X1 = ks6.insert_last('X1', convention='KS6X1') - ks6X2 = ks6.insert_last('X2', convention='KS6X2') + ks6.insert_last('X1', convention='KS6X1') + ks6.insert_last('X2', convention='KS6X2') ks6D1 = ks6.insert_last('D1', definition_formal='X1 X2', convention='KS6D1') data = { @@ -330,6 +334,35 @@ class TestChangeOperations(EndpointTester): self.assertEqual(self.ks5D4.definition_formal, r'D1 X2 X3 S1 D1 D2 D3') + @decl_endpoint('/api/oss/{item}/update-operation', method='patch') + def test_change_substitutions_attribution(self): + self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X1).exists()) + self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X2).exists()) + self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4D2).exists()) + + data = { + 'target': self.operation4.pk, + 'item_data': { + 'alias': 'Test4 mod', + 'title': 'Test title mod', + 'description': 'Comment mod' + }, + 'layout': self.layout_data, + 'arguments': [self.operation1.pk, self.operation2.pk], + 'substitutions': [] + } + + self.executeOK(data, item=self.owned_id) + self.assertEqual(self.operation4.getQ_substitutions().count(), 0) + x3 = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk) + self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X2).exists()) + self.assertFalse(Attribution.objects.filter(container=x3, attribute=self.ks4X2).exists()) + self.assertTrue(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4D2).exists()) + self.assertFalse(Attribution.objects.filter(container=x3, attribute=self.ks4D2).exists()) + self.assertFalse(Attribution.objects.filter(container=self.ks4S1, attribute=self.ks4X1).exists()) + self.assertTrue(Attribution.objects.filter(container=x3, attribute=self.ks4X1).exists()) + + @decl_endpoint('/api/oss/{item}/update-operation', method='patch') def test_change_arguments(self): data = {