ConceptPortal-public/rsconcept/backend/apps/oss/models/PropagationFacade.py
Ivan 3f97562fae
Some checks failed
Frontend CI / build (22.x) (push) Waiting to run
Backend CI / build (3.12) (push) Has been cancelled
F: Propagate operation changes to OSS
2024-08-16 21:05:06 +03:00

62 lines
2.5 KiB
Python

''' Models: Change propagation facade - managing all changes in OSS. '''
from apps.library.models import LibraryItem, LibraryItemType
from apps.rsform.models import Constituenta, RSForm
from .OperationSchema import CstSubstitution, OperationSchema
def _get_oss_hosts(item: LibraryItem) -> list[LibraryItem]:
    ''' Collect library items having an operation whose result is the given item. '''
    # Only the primary key is requested up front via only('pk').
    hosts_query = LibraryItem.objects.filter(operations__result=item)
    return list(hosts_query.only('pk'))
class PropagationFacade:
    ''' Entry points that forward schema changes to every hosting OperationSchema. '''

    @staticmethod
    def after_create_cst(source: RSForm, new_cst: list[Constituenta]) -> None:
        ''' Forward newly created constituents to each OSS hosting the source schema. '''
        for oss_item in _get_oss_hosts(source.model):
            OperationSchema(oss_item).after_create_cst(source, new_cst)

    @staticmethod
    def after_change_cst_type(source: RSForm, target: Constituenta) -> None:
        ''' Forward a constituenta type change to each OSS hosting the source schema. '''
        for oss_item in _get_oss_hosts(source.model):
            OperationSchema(oss_item).after_change_cst_type(source, target)

    @staticmethod
    def after_update_cst(source: RSForm, target: Constituenta, data: dict, old_data: dict) -> None:
        ''' Forward updated constituenta data (new and old) to each OSS hosting the source schema. '''
        for oss_item in _get_oss_hosts(source.model):
            OperationSchema(oss_item).after_update_cst(source, target, data, old_data)

    @staticmethod
    def before_delete_cst(source: RSForm, target: list[Constituenta]) -> None:
        ''' Forward the constituents about to be deleted to each OSS hosting the source schema. '''
        for oss_item in _get_oss_hosts(source.model):
            OperationSchema(oss_item).before_delete_cst(source, target)

    @staticmethod
    def before_substitute(source: RSForm, substitutions: CstSubstitution) -> None:
        ''' Forward pending substitutions to each OSS hosting the source schema. '''
        for oss_item in _get_oss_hosts(source.model):
            OperationSchema(oss_item).before_substitute(source, substitutions)

    @staticmethod
    def before_delete_schema(item: LibraryItem) -> None:
        ''' Before an RSFORM item is deleted, report all of its constituents as being deleted.

        Does nothing for items of another type or for items without hosts.
        '''
        # Short-circuit keeps the host lookup from running for non-RSFORM items.
        if item.item_type == LibraryItemType.RSFORM and len(_get_oss_hosts(item)) != 0:
            schema = RSForm(item)
            doomed = list(schema.constituents())
            # NOTE(review): before_delete_cst looks the hosts up again through schema.model.
            PropagationFacade.before_delete_cst(schema, doomed)