F: Copy attributions when cloning schema for import

This commit is contained in:
Ivan 2025-11-05 19:18:01 +03:00
parent dbe627a394
commit 30d24f4fb6
2 changed files with 42 additions and 16 deletions

View File

@ -1,7 +1,7 @@
''' Testing API: Operation Schema - operations manipulation. ''' ''' Testing API: Operation Schema - operations manipulation. '''
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType from apps.library.models import Editor, LibraryItem
from apps.oss.models import Argument, Operation, OperationSchema, OperationType, Replica from apps.oss.models import Argument, Operation, OperationSchema, OperationType, Replica
from apps.rsform.models import Constituenta, RSForm from apps.rsform.models import Attribution, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint from shared.EndpointTester import EndpointTester, decl_endpoint
@ -627,6 +627,9 @@ class TestOssOperations(EndpointTester):
title='Target', title='Target',
owner=self.user owner=self.user
) )
x1 = target_ks.insert_last('X1')
x2 = target_ks.insert_last('X2')
Attribution.objects.create(container=x1, attribute=x2)
data = { data = {
'item_data': { 'item_data': {
'alias': 'ImportedAlias', 'alias': 'ImportedAlias',
@ -649,24 +652,30 @@ class TestOssOperations(EndpointTester):
new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id) new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id)
layout = response.data['oss']['layout'] layout = response.data['oss']['layout']
operation_node = [item for item in layout if item['nodeID'] == 'o' + str(new_operation_id)][0] operation_node = [item for item in layout if item['nodeID'] == 'o' + str(new_operation_id)][0]
schema = LibraryItem.objects.get(pk=new_operation['result']) new_item = LibraryItem.objects.get(pk=new_operation['result'])
new_schema = RSForm(new_item)
attributions = Attribution.objects.filter(container__schema=new_item)
self.assertEqual(new_schema.constituentsQ().count(), target_ks.constituentsQ().count())
self.assertEqual(len(attributions), 1)
self.assertEqual(new_operation['alias'], data['item_data']['alias']) self.assertEqual(new_operation['alias'], data['item_data']['alias'])
self.assertEqual(new_operation['title'], data['item_data']['title']) self.assertEqual(new_operation['title'], data['item_data']['title'])
self.assertEqual(new_operation['description'], data['item_data']['description']) self.assertEqual(new_operation['description'], data['item_data']['description'])
self.assertEqual(new_operation['operation_type'], OperationType.INPUT) self.assertEqual(new_operation['operation_type'], OperationType.INPUT)
self.assertEqual(schema.pk, target_ks.model.pk) # Not a clone self.assertEqual(new_item.pk, target_ks.model.pk) # Not a clone
self.assertEqual(operation_node['x'], data['position']['x']) self.assertEqual(operation_node['x'], data['position']['x'])
self.assertEqual(operation_node['y'], data['position']['y']) self.assertEqual(operation_node['y'], data['position']['y'])
self.assertEqual(operation_node['width'], data['position']['width']) self.assertEqual(operation_node['width'], data['position']['width'])
self.assertEqual(operation_node['height'], data['position']['height']) self.assertEqual(operation_node['height'], data['position']['height'])
self.assertEqual(schema.visible, target_ks.model.visible) self.assertEqual(new_item.visible, target_ks.model.visible)
self.assertEqual(schema.access_policy, target_ks.model.access_policy) self.assertEqual(new_item.access_policy, target_ks.model.access_policy)
self.assertEqual(schema.location, target_ks.model.location) self.assertEqual(new_item.location, target_ks.model.location)
@decl_endpoint('/api/oss/{item}/import-schema', method='post') @decl_endpoint('/api/oss/{item}/import-schema', method='post')
def test_import_schema_clone(self): def test_import_schema_clone(self):
self.populateData() self.populateData()
# Use ks2 as the source RSForm # Use ks2 as the source RSForm
x3 = self.ks2.insert_last('X3')
Attribution.objects.create(container=self.ks2X1, attribute=x3)
data = { data = {
'item_data': { 'item_data': {
'alias': 'ClonedAlias', 'alias': 'ClonedAlias',
@ -689,22 +698,26 @@ class TestOssOperations(EndpointTester):
new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id) new_operation = next(op for op in response.data['oss']['operations'] if op['id'] == new_operation_id)
layout = response.data['oss']['layout'] layout = response.data['oss']['layout']
operation_node = [item for item in layout if item['nodeID'] == 'o' + str(new_operation_id)][0] operation_node = [item for item in layout if item['nodeID'] == 'o' + str(new_operation_id)][0]
schema = LibraryItem.objects.get(pk=new_operation['result']) new_item = LibraryItem.objects.get(pk=new_operation['result'])
new_schema = RSForm(new_item)
attributions = Attribution.objects.filter(container__schema=new_item)
self.assertEqual(new_schema.constituentsQ().count(), self.ks2.constituentsQ().count())
self.assertEqual(len(attributions), 1)
self.assertEqual(new_operation['alias'], data['item_data']['alias']) self.assertEqual(new_operation['alias'], data['item_data']['alias'])
self.assertEqual(new_operation['title'], data['item_data']['title']) self.assertEqual(new_operation['title'], data['item_data']['title'])
self.assertEqual(new_operation['description'], data['item_data']['description']) self.assertEqual(new_operation['description'], data['item_data']['description'])
self.assertEqual(new_operation['operation_type'], OperationType.INPUT) self.assertEqual(new_operation['operation_type'], OperationType.INPUT)
self.assertNotEqual(schema.pk, self.ks2.model.pk) # Should be a clone self.assertNotEqual(new_item.pk, self.ks2.model.pk) # Should be a clone
self.assertEqual(schema.alias, data['item_data']['alias']) self.assertEqual(new_item.alias, data['item_data']['alias'])
self.assertEqual(schema.title, data['item_data']['title']) self.assertEqual(new_item.title, data['item_data']['title'])
self.assertEqual(schema.description, data['item_data']['description']) self.assertEqual(new_item.description, data['item_data']['description'])
self.assertEqual(operation_node['x'], data['position']['x']) self.assertEqual(operation_node['x'], data['position']['x'])
self.assertEqual(operation_node['y'], data['position']['y']) self.assertEqual(operation_node['y'], data['position']['y'])
self.assertEqual(operation_node['width'], data['position']['width']) self.assertEqual(operation_node['width'], data['position']['width'])
self.assertEqual(operation_node['height'], data['position']['height']) self.assertEqual(operation_node['height'], data['position']['height'])
self.assertEqual(schema.visible, False) self.assertEqual(new_item.visible, False)
self.assertEqual(schema.access_policy, self.owned.model.access_policy) self.assertEqual(new_item.access_policy, self.owned.model.access_policy)
self.assertEqual(schema.location, self.owned.model.location) self.assertEqual(new_item.location, self.owned.model.location)
@decl_endpoint('/api/oss/{item}/import-schema', method='post') @decl_endpoint('/api/oss/{item}/import-schema', method='post')

View File

@ -14,7 +14,7 @@ from rest_framework.response import Response
from apps.library.models import LibraryItem, LibraryItemType from apps.library.models import LibraryItem, LibraryItemType
from apps.library.serializers import LibraryItemSerializer from apps.library.serializers import LibraryItemSerializer
from apps.rsform.models import Constituenta, RSFormCached from apps.rsform.models import Attribution, Constituenta, RSFormCached
from apps.rsform.serializers import CstTargetSerializer from apps.rsform.serializers import CstTargetSerializer
from shared import messages as msg from shared import messages as msg
from shared import permissions from shared import permissions
@ -36,11 +36,24 @@ def _create_clone(prototype: LibraryItem, operation: m.Operation, oss: LibraryIt
clone.access_policy = oss.access_policy clone.access_policy = oss.access_policy
clone.location = oss.location clone.location = oss.location
clone.save() clone.save()
cst_map: dict[int, int] = {}
cst_list: list[int] = []
for cst in Constituenta.objects.filter(schema_id=prototype.pk): for cst in Constituenta.objects.filter(schema_id=prototype.pk):
old_pk = cst.pk
cst_copy = deepcopy(cst) cst_copy = deepcopy(cst)
cst_copy.pk = None cst_copy.pk = None
cst_copy.schema = clone cst_copy.schema = clone
cst_copy.save() cst_copy.save()
cst_map[old_pk] = cst_copy.pk
cst_list.append(old_pk)
for attr in Attribution.objects.filter(container__in=cst_list, attribute__in=cst_list):
attr.pk = None
attr.container_id = cst_map[attr.container_id]
attr.attribute_id = cst_map[attr.attribute_id]
attr.save()
return clone return clone