M: Propagate cst_delete
This commit is contained in:
parent
79ad54ed84
commit
6a498ed2de
|
@ -102,6 +102,15 @@ class ChangeManager:
|
|||
''' Insert new inheritance. '''
|
||||
self.inheritance[inheritance.operation_id].append((inheritance.parent_id, inheritance.child_id))
|
||||
|
||||
def remove_cst(self, target: list[int], operation: int) -> None:
|
||||
''' Remove constituents from operation. '''
|
||||
subs = [sub for sub in self.substitutions if sub.original_id in target or sub.substitution_id in target]
|
||||
for sub in subs:
|
||||
self.substitutions.remove(sub)
|
||||
to_delete = [item for item in self.inheritance[operation] if item[1] in target]
|
||||
for item in to_delete:
|
||||
self.inheritance[operation].remove(item)
|
||||
|
||||
def _insert_new(self, schema: RSForm) -> None:
|
||||
self._schemas.append(schema)
|
||||
self._schema_by_id[schema.model.pk] = schema
|
||||
|
@ -142,6 +151,37 @@ class ChangeManager:
|
|||
alias_mapping[alias] = cst
|
||||
self._cascade_update_cst(target.pk, operation, data, old_data, alias_mapping)
|
||||
|
||||
def before_delete(self, target: list[Constituenta], source: RSForm) -> None:
|
||||
''' Trigger cascade resolutions before constituents are deleted. '''
|
||||
self.cache.insert(source)
|
||||
operation = self.cache.get_operation(source)
|
||||
self._cascade_before_delete(target, operation)
|
||||
|
||||
def _cascade_before_delete(self, target: list[Constituenta], operation: Operation) -> None:
|
||||
children = self.cache.graph.outputs[operation.pk]
|
||||
if len(children) == 0:
|
||||
return
|
||||
self.cache.ensure_loaded()
|
||||
for child_id in children:
|
||||
child_operation = self.cache.operation_by_id[child_id]
|
||||
child_schema = self.cache.get_schema(child_operation)
|
||||
if child_schema is None:
|
||||
continue
|
||||
child_schema.cache.ensure_loaded()
|
||||
|
||||
# TODO: check if substitutions are affected. Undo substitutions before deletion
|
||||
|
||||
child_target_cst = []
|
||||
child_target_ids = []
|
||||
for cst in target:
|
||||
successor_id = self.cache.get_successor_for(cst.pk, child_id, ignore_substitution=True)
|
||||
if successor_id is not None:
|
||||
child_target_ids.append(successor_id)
|
||||
child_target_cst.append(child_schema.cache.by_id[successor_id])
|
||||
self._cascade_before_delete(child_target_cst, child_operation)
|
||||
self.cache.remove_cst(child_target_ids, child_id)
|
||||
child_schema.delete_cst(child_target_cst)
|
||||
|
||||
def _cascade_create_cst(self, prototype: Constituenta, operation: Operation, mapping: CstMapping) -> None:
|
||||
children = self.cache.graph.outputs[operation.pk]
|
||||
if len(children) == 0:
|
||||
|
|
|
@ -35,8 +35,8 @@ class PropagationFacade:
|
|||
ChangeManager(host).on_update_cst(target, data, old_data, source)
|
||||
|
||||
@classmethod
|
||||
def before_delete(cls, delete_cst: list[Constituenta], source: RSForm) -> None:
|
||||
def before_delete(cls, target: list[Constituenta], source: RSForm) -> None:
|
||||
''' Trigger cascade resolutions before constituents are deleted. '''
|
||||
# hosts = _get_oss_hosts(source.model)
|
||||
# for host in hosts:
|
||||
# ChangeManager(host).before_delete(delete_cst, source)
|
||||
hosts = _get_oss_hosts(source.model)
|
||||
for host in hosts:
|
||||
ChangeManager(host).before_delete(target, source)
|
||||
|
|
|
@ -57,7 +57,6 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.ks3 = RSForm(self.operation3.result)
|
||||
self.assertEqual(self.ks3.constituents().count(), 4)
|
||||
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/create-cst', method='post')
|
||||
def test_create_constituenta(self):
|
||||
data = {
|
||||
|
@ -107,3 +106,14 @@ class TestChangeConstituents(EndpointTester):
|
|||
self.assertEqual(inherited_cst.term_raw, data['item_data']['term_raw'])
|
||||
self.assertEqual(inherited_cst.definition_formal, r'X1\X1')
|
||||
self.assertEqual(inherited_cst.definition_raw, r'@{X2|sing,datv}')
|
||||
|
||||
@decl_endpoint('/api/rsforms/{schema}/delete-multiple-cst', method='patch')
|
||||
def test_delete_constituenta(self):
|
||||
data = {'items': [self.ks2X1.pk]}
|
||||
response = self.executeOK(data=data, schema=self.ks2.model.pk)
|
||||
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks2D1.pk)
|
||||
self.ks2D1.refresh_from_db()
|
||||
self.assertEqual(self.ks2.constituents().count(), 1)
|
||||
self.assertEqual(self.ks3.constituents().count(), 3)
|
||||
self.assertEqual(self.ks2D1.definition_formal, r'DEL\DEL')
|
||||
self.assertEqual(inherited_cst.definition_formal, r'DEL\DEL')
|
||||
|
|
|
@ -23,6 +23,7 @@ from .api_RSLanguage import (
|
|||
from .Constituenta import Constituenta, CstType, extract_globals
|
||||
|
||||
INSERT_LAST: int = -1
|
||||
DELETED_ALIAS = 'DEL'
|
||||
|
||||
|
||||
class RSForm:
|
||||
|
@ -348,10 +349,12 @@ class RSForm:
|
|||
|
||||
def delete_cst(self, target: Iterable[Constituenta]) -> None:
|
||||
''' Delete multiple constituents. Do not check if listCst are from this schema. '''
|
||||
mapping = {cst.alias: DELETED_ALIAS for cst in target}
|
||||
self.cache.ensure_loaded()
|
||||
self.cache.remove_multi(target)
|
||||
self.apply_mapping(mapping)
|
||||
Constituenta.objects.filter(pk__in=[cst.pk for cst in target]).delete()
|
||||
self._reset_order()
|
||||
self.resolve_all_text()
|
||||
self.save()
|
||||
|
||||
def substitute(self, substitutions: list[tuple[Constituenta, Constituenta]]) -> None:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
''' Django: Models. '''
|
||||
|
||||
from .Constituenta import Constituenta, CstType, extract_globals, replace_entities, replace_globals
|
||||
from .RSForm import INSERT_LAST, RSForm
|
||||
from .RSForm import DELETED_ALIAS, INSERT_LAST, RSForm, SemanticInfo
|
||||
|
|
|
@ -174,6 +174,26 @@ class TestRSForm(DBTester):
|
|||
self.assertEqual(s2.definition_raw, '@{X11|plur}')
|
||||
|
||||
|
||||
def test_delete_cst(self):
|
||||
x1 = self.schema.insert_new('X1')
|
||||
x2 = self.schema.insert_new('X2')
|
||||
d1 = self.schema.insert_new(
|
||||
alias='D1',
|
||||
definition_formal='X1 = X2',
|
||||
definition_raw='@{X1|sing}',
|
||||
term_raw='@{X2|plur}'
|
||||
)
|
||||
|
||||
self.schema.delete_cst([x1])
|
||||
x2.refresh_from_db()
|
||||
d1.refresh_from_db()
|
||||
self.assertEqual(self.schema.constituents().count(), 2)
|
||||
self.assertEqual(x2.order, 1)
|
||||
self.assertEqual(d1.order, 2)
|
||||
self.assertEqual(d1.definition_formal, 'DEL = X2')
|
||||
self.assertEqual(d1.definition_raw, '@{DEL|sing}')
|
||||
self.assertEqual(d1.term_raw, '@{X2|plur}')
|
||||
|
||||
def test_apply_mapping(self):
|
||||
x1 = self.schema.insert_new('X1')
|
||||
x2 = self.schema.insert_new('X11')
|
||||
|
|
Loading…
Reference in New Issue
Block a user