M: Change propogation

This commit is contained in:
Ivan 2024-08-09 20:57:03 +03:00
parent 5c4c0b38d5
commit 1600c8abd2
21 changed files with 466 additions and 111 deletions

View File

@ -0,0 +1,20 @@
# Generated by Django 5.0.7 on 2024-08-09 13:56
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('library', '0003_alter_librarytemplate_lib_source'),
('oss', '0005_inheritance_operation'),
]
operations = [
migrations.AlterField(
model_name='operation',
name='oss',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='operations', to='library.libraryitem', verbose_name='Схема синтеза'),
),
]

View File

@ -0,0 +1,194 @@
''' Models: Change propagation manager. '''
from typing import Optional, cast
from apps.library.models import LibraryItem
from apps.rsform.graph import Graph
from apps.rsform.models import INSERT_LAST, Constituenta, CstType, RSForm
from .Inheritance import Inheritance
from .Operation import Operation
from .OperationSchema import OperationSchema
from .Substitution import Substitution
AliasMapping = dict[str, Constituenta]
# TODO: add more variety tests for cascade resolutions model
class ChangeManager:
''' Change propagation API. '''
class Cache:
''' Cache for RSForm constituents. '''
def __init__(self, oss: OperationSchema):
self._oss = oss
self._schemas: list[RSForm] = []
self._schema_by_id: dict[int, RSForm] = {}
self.operations = list(oss.operations().only('result_id'))
self.operation_by_id = {operation.pk: operation for operation in self.operations}
self.graph = Graph[int]()
for operation in self.operations:
self.graph.add_node(operation.pk)
for argument in self._oss.arguments().only('operation_id', 'argument_id'):
self.graph.add_edge(argument.argument_id, argument.operation_id)
self.is_loaded = False
self.substitutions: list[Substitution] = []
self.inheritance: dict[int, list[tuple[int, int]]] = {}
def insert(self, schema: RSForm) -> None:
''' Insert new schema. '''
if not self._schema_by_id.get(schema.model.pk):
self._insert_new(schema)
def get_schema(self, operation: Operation) -> Optional[RSForm]:
''' Get schema by Operation. '''
if operation.result_id is None:
return None
if operation.result_id in self._schema_by_id:
return self._schema_by_id[operation.result_id]
else:
schema = RSForm.from_id(operation.result_id)
schema.cache.ensure_loaded()
self._insert_new(schema)
return schema
def get_operation(self, schema: RSForm) -> Optional[Operation]:
''' Get operation by schema. '''
for operation in self.operations:
if operation.result_id == schema.model.pk:
return operation
return None
def ensure_loaded(self) -> None:
''' Ensure propagation of changes. '''
if self.is_loaded:
return
self.is_loaded = True
self.substitutions = list(self._oss.substitutions().only('operation_id', 'original_id', 'substitution_id'))
for operation in self.operations:
self.inheritance[operation.pk] = []
for item in self._oss.inheritance().only('operation_id', 'parent_id', 'child_id'):
self.inheritance[item.operation_id].append((item.parent_id, item.child_id))
def get_successor_for(self, parent_cst: int, operation: int,
ignore_substitution: bool = False) -> Optional[int]:
''' Get child for parent inside target RSFrom. '''
if not ignore_substitution:
for sub in self.substitutions:
if sub.operation_id == operation and sub.original_id == parent_cst:
return sub.substitution_id
for item in self.inheritance[operation]:
if item[0] == parent_cst:
return item[1]
return None
def insert_inheritance(self, inheritance: Inheritance) -> None:
''' Insert new inheritance. '''
self.inheritance[inheritance.operation_id].append((inheritance.parent_id, inheritance.child_id))
def _insert_new(self, schema: RSForm) -> None:
self._schemas.append(schema)
self._schema_by_id[schema.model.pk] = schema
def __init__(self, model: LibraryItem):
self.oss = OperationSchema(model)
self.cache = ChangeManager.Cache(self.oss)
def on_create_cst(self, new_cst: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when new constituent is created. '''
self.cache.insert(source)
depend_aliases = new_cst.extract_references()
alias_mapping: AliasMapping = {}
for alias in depend_aliases:
cst = source.cache.by_alias.get(alias)
if cst is not None:
alias_mapping[alias] = cst
operation = self.cache.get_operation(source)
if operation is None:
return
self._create_cst_cascade(new_cst, operation, alias_mapping)
def on_change_cst_type(self, target: Constituenta, source: RSForm) -> None:
''' Trigger cascade resolutions when new constituent type is changed. '''
self.cache.insert(source)
operation = self.cache.get_operation(source)
if operation is None:
return
self._change_cst_type_cascade(target.pk, target.cst_type, operation)
def _change_cst_type_cascade(self, cst_id: int, ctype: CstType, operation: Operation) -> None:
children = self.cache.graph.outputs[operation.pk]
if len(children) == 0:
return
self.cache.ensure_loaded()
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
successor_id = self.cache.get_successor_for(cst_id, child_id, ignore_substitution=True)
if successor_id is None:
continue
child_schema = self.cache.get_schema(child_operation)
if child_schema is not None and child_schema.change_cst_type(successor_id, ctype):
self._change_cst_type_cascade(successor_id, ctype, child_operation)
def _create_cst_cascade(self, prototype: Constituenta, source: Operation, mapping: AliasMapping) -> None:
children = self.cache.graph.outputs[source.pk]
if len(children) == 0:
return
source_schema = self.cache.get_schema(source)
assert source_schema is not None
for child_id in children:
child_operation = self.cache.operation_by_id[child_id]
child_schema = self.cache.get_schema(child_operation)
if child_schema is None:
continue
self.cache.ensure_loaded()
new_mapping = self._transform_mapping(mapping, child_operation, child_schema)
alias_mapping = {alias: cst.alias for alias, cst in new_mapping.items()}
insert_where = self._determine_insert_position(prototype, child_operation, source_schema, child_schema)
new_cst = child_schema.insert_copy([prototype], insert_where, alias_mapping)[0]
new_inheritance = Inheritance.objects.create(
operation=child_operation,
child=new_cst,
parent=prototype
)
self.cache.insert_inheritance(new_inheritance)
new_mapping = {alias_mapping[alias]: cst for alias, cst in new_mapping.items()}
self._create_cst_cascade(new_cst, child_operation, new_mapping)
def _transform_mapping(self, mapping: AliasMapping, operation: Operation, schema: RSForm) -> AliasMapping:
if len(mapping) == 0:
return mapping
result: AliasMapping = {}
for alias, cst in mapping.items():
successor_id = self.cache.get_successor_for(cst.pk, operation.pk)
if successor_id is None:
continue
successor = schema.cache.by_id.get(successor_id)
if successor is None:
continue
result[alias] = successor
return result
def _determine_insert_position(
self, prototype: Constituenta,
operation: Operation,
source: RSForm,
destination: RSForm
) -> int:
''' Determine insert_after for new constituenta. '''
if prototype.order == 1:
return 1
prev_cst = source.cache.constituents[prototype.order - 2]
inherited_prev_id = self.cache.get_successor_for(
source.cache.constituents[prototype.order - 2].pk, operation.pk)
if inherited_prev_id is None:
return INSERT_LAST
prev_cst = destination.cache.by_id[inherited_prev_id]
return cast(int, prev_cst.order) + 1

View File

@ -27,7 +27,7 @@ class Operation(Model):
verbose_name='Схема синтеза',
to='library.LibraryItem',
on_delete=CASCADE,
related_name='items'
related_name='operations'
)
operation_type: CharField = CharField(
verbose_name='Тип',

View File

@ -50,6 +50,10 @@ class OperationSchema:
''' Operation substitutions. '''
return Substitution.objects.filter(operation__oss=self.model)
def inheritance(self) -> QuerySet[Inheritance]:
''' Operation inheritances. '''
return Inheritance.objects.filter(operation__oss=self.model)
def owned_schemas(self) -> QuerySet[LibraryItem]:
''' Get QuerySet containing all result schemas owned by current OSS. '''
return LibraryItem.objects.filter(

View File

@ -1,6 +1,7 @@
''' Django: Models. '''
from .Argument import Argument
from .ChangeManager import ChangeManager
from .Inheritance import Inheritance
from .Operation import Operation, OperationType
from .OperationSchema import OperationSchema

View File

@ -14,9 +14,9 @@ class TestSynthesisSubstitution(TestCase):
self.oss = OperationSchema.create(alias='T1')
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks1X1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.ks2X1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = Operation.objects.create(
oss=self.oss.model,
@ -46,13 +46,13 @@ class TestSynthesisSubstitution(TestCase):
self.substitution = Substitution.objects.create(
operation=self.operation3,
original=self.ks1x1,
substitution=self.ks2x1
original=self.ks1X1,
substitution=self.ks2X1
)
def test_str(self):
testStr = f'{self.ks1x1} -> {self.ks2x1}'
testStr = f'{self.ks1X1} -> {self.ks2X1}'
self.assertEqual(str(self.substitution), testStr)
@ -64,11 +64,11 @@ class TestSynthesisSubstitution(TestCase):
def test_cascade_delete_original(self):
self.assertEqual(Substitution.objects.count(), 1)
self.ks1x1.delete()
self.ks1X1.delete()
self.assertEqual(Substitution.objects.count(), 0)
def test_cascade_delete_substitution(self):
self.assertEqual(Substitution.objects.count(), 1)
self.ks2x1.delete()
self.ks2X1.delete()
self.assertEqual(Substitution.objects.count(), 0)

View File

@ -1,3 +1,4 @@
''' Tests for REST API. '''
from .t_change_attributes import *
from .t_change_constituents import *
from .t_oss import *

View File

@ -1,7 +1,4 @@
''' Testing API: Change attributes of OSS and RSForms. '''
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LocationHead
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm

View File

@ -0,0 +1,86 @@
''' Testing API: Change constituents in OSS. '''
from apps.oss.models import OperationSchema, OperationType
from apps.rsform.models import Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestChangeConstituents(EndpointTester):
''' Testing Constituents change propagation in OSS. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(
title='Test',
alias='T1',
owner=self.user
)
self.owned_id = self.owned.model.pk
self.ks1 = RSForm.create(
alias='KS1',
title='Test1',
owner=self.user
)
self.ks1X1 = self.ks1.insert_new('X4')
self.ks1X2 = self.ks1.insert_new('X5')
self.ks2 = RSForm.create(
alias='KS2',
title='Test2',
owner=self.user
)
self.ks2X1 = self.ks2.insert_new('X1')
self.ks2D1 = self.ks2.insert_new(
alias='D1',
definition_formal=r'X1\X1'
)
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.model
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.model
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.execute_operation(self.operation3)
self.operation3.refresh_from_db()
self.ks3 = RSForm(self.operation3.result)
self.assertEqual(self.ks3.constituents().count(), 4)
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
def test_create_constituenta(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'definition_formal': 'X4 = X5'
}
response = self.executeCreated(data=data, item=self.ks1.model.pk)
new_cst = Constituenta.objects.get(pk=response.data['new_cst']['id'])
inherited_cst = Constituenta.objects.get(as_child__parent_id=new_cst.pk)
self.assertEqual(self.ks1.constituents().count(), 3)
self.assertEqual(self.ks3.constituents().count(), 5)
self.assertEqual(inherited_cst.alias, 'X4')
self.assertEqual(inherited_cst.order, 3)
self.assertEqual(inherited_cst.definition_formal, 'X1 = X2')
@decl_endpoint('/api/rsforms/{item}/rename-cst', method='patch')
def test_rename_constituenta(self):
data = {'target': self.ks1X1.pk, 'alias': 'D21', 'cst_type': CstType.TERM}
response = self.executeOK(data=data, item=self.ks1.model.pk)
self.ks1X1.refresh_from_db()
inherited_cst = Constituenta.objects.get(as_child__parent_id=self.ks1X1.pk)
self.assertEqual(self.ks1X1.alias, data['alias'])
self.assertEqual(self.ks1X1.cst_type, data['cst_type'])
self.assertEqual(inherited_cst.alias, 'D2')
self.assertEqual(inherited_cst.cst_type, data['cst_type'])

View File

@ -1,8 +1,5 @@
''' Testing API: Operation Schema. '''
from rest_framework import status
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType, LocationHead
from apps.library.models import AccessPolicy, Editor, LibraryItem, LibraryItemType
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
@ -28,7 +25,7 @@ class TestOssViewset(EndpointTester):
title='Test1',
owner=self.user
)
self.ks1x1 = self.ks1.insert_new(
self.ks1X1 = self.ks1.insert_new(
'X1',
term_raw='X1_1',
term_resolved='X1_1'
@ -38,7 +35,7 @@ class TestOssViewset(EndpointTester):
title='Test2',
owner=self.user
)
self.ks2x1 = self.ks2.insert_new(
self.ks2X1 = self.ks2.insert_new(
'X2',
term_raw='X1_2',
term_resolved='X1_2'
@ -60,8 +57,8 @@ class TestOssViewset(EndpointTester):
)
self.owned.set_arguments(self.operation3, [self.operation1, self.operation2])
self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1,
'substitution': self.ks2x1
'original': self.ks1X1,
'substitution': self.ks2X1
}])
@decl_endpoint('/api/oss/{item}/details', method='get')
@ -85,12 +82,12 @@ class TestOssViewset(EndpointTester):
self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk)
self.assertEqual(sub['original_alias'], self.ks1x1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved)
self.assertEqual(sub['original'], self.ks1X1.pk)
self.assertEqual(sub['substitution'], self.ks2X1.pk)
self.assertEqual(sub['original_alias'], self.ks1X1.alias)
self.assertEqual(sub['original_term'], self.ks1X1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2X1.alias)
self.assertEqual(sub['substitution_term'], self.ks2X1.term_resolved)
arguments = response.data['arguments']
self.assertEqual(len(arguments), 2)
@ -369,14 +366,14 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [
{
'original': self.ks1x1.pk,
'original': self.ks1X1.pk,
'substitution': ks3x1.pk
}
]
}
self.executeBadData(data=data)
data['substitutions'][0]['substitution'] = self.ks2x1.pk
data['substitutions'][0]['substitution'] = self.ks2X1.pk
self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id)
self.logout()
@ -421,7 +418,7 @@ class TestOssViewset(EndpointTester):
def test_update_operation_invalid_substitution(self):
self.populateData()
self.ks1x2 = self.ks1.insert_new('X2')
self.ks1X2 = self.ks1.insert_new('X2')
data = {
'target': self.operation3.pk,
@ -434,12 +431,12 @@ class TestOssViewset(EndpointTester):
'arguments': [self.operation1.pk, self.operation2.pk],
'substitutions': [
{
'original': self.ks1x1.pk,
'substitution': self.ks2x1.pk
'original': self.ks1X1.pk,
'substitution': self.ks2X1.pk
},
{
'original': self.ks2x1.pk,
'substitution': self.ks1x2.pk
'original': self.ks2X1.pk,
'substitution': self.ks1X2.pk
}
]
}
@ -473,4 +470,4 @@ class TestOssViewset(EndpointTester):
items = list(RSForm(schema).constituents())
self.assertEqual(len(items), 1)
self.assertEqual(items[0].alias, 'X1')
self.assertEqual(items[0].term_resolved, self.ks2x1.term_resolved)
self.assertEqual(items[0].term_resolved, self.ks2X1.term_resolved)

View File

@ -1,6 +1,7 @@
''' Models: Constituenta. '''
import re
from cctext import extract_entities
from django.core.validators import MinValueValidator
from django.db.models import (
CASCADE,
@ -15,10 +16,16 @@ from django.db.models import (
from ..utils import apply_pattern
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_REF_ENTITY_PATTERN = re.compile(r'@{([^0-9\-].*?)\|.*?}')
_GLOBAL_ID_PATTERN = re.compile(r'([XCSADFPT][0-9]+)') # cspell:disable-line
def extract_globals(expression: str) -> set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
class CstType(TextChoices):
''' Type of constituenta. '''
BASE = 'basic'
@ -120,3 +127,10 @@ class Constituenta(Model):
modified = True
self.definition_raw = definition
return modified
def extract_references(self) -> set[str]:
''' Extract all references from term and definition. '''
result: set[str] = extract_globals(self.definition_formal)
result.update(extract_entities(self.term_raw))
result.update(extract_entities(self.definition_raw))
return result

View File

@ -11,7 +11,6 @@ from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
extract_globals,
generate_structure,
get_type_prefix,
guess_type,
@ -21,9 +20,9 @@ from .api_RSLanguage import (
is_simple_expression,
split_template
)
from .Constituenta import Constituenta, CstType
from .Constituenta import Constituenta, CstType, extract_globals
_INSERT_LAST: int = -1
INSERT_LAST: int = -1
class RSForm:
@ -54,7 +53,7 @@ class RSForm:
self.by_alias = {cst.alias: cst for cst in self.constituents}
self.is_loaded = True
def ensure(self) -> None:
def ensure_loaded(self) -> None:
if not self.is_loaded:
self.reload()
@ -140,7 +139,7 @@ class RSForm:
def on_term_change(self, changed: list[int]) -> None:
''' Trigger cascade resolutions when term changes. '''
self.cache.ensure()
self.cache.ensure_loaded()
graph_terms = self._graph_term()
expansion = graph_terms.expand_outputs(changed)
expanded_change = changed + expansion
@ -188,7 +187,7 @@ class RSForm:
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create new cst from data. '''
if insert_after is None:
position = _INSERT_LAST
position = INSERT_LAST
else:
position = insert_after.order + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
@ -217,7 +216,7 @@ class RSForm:
self,
alias: str,
cst_type: Optional[CstType] = None,
position: int = _INSERT_LAST,
position: int = INSERT_LAST,
**kwargs
) -> Constituenta:
''' Insert new constituenta at given position.
@ -239,13 +238,14 @@ class RSForm:
self.save()
return result
def insert_copy(self, items: list[Constituenta], position: int = _INSERT_LAST) -> list[Constituenta]:
def insert_copy(self, items: list[Constituenta], position: int = INSERT_LAST,
initial_mapping: Optional[dict[str, str]] = None) -> list[Constituenta]:
''' Insert copy of target constituents updating references. '''
count = len(items)
if count == 0:
return []
self.cache.ensure()
self.cache.ensure_loaded()
position = self._get_insert_position(position)
self._shift_positions(position, count)
@ -253,7 +253,7 @@ class RSForm:
for (value, _) in CstType.choices:
indices[value] = self.get_max_index(cast(CstType, value))
mapping: dict[str, str] = {}
mapping: dict[str, str] = initial_mapping.copy() if initial_mapping else {}
for cst in items:
indices[cst.cst_type] = indices[cst.cst_type] + 1
newAlias = f'{get_type_prefix(cst.cst_type)}{indices[cst.cst_type]}'
@ -344,9 +344,23 @@ class RSForm:
mapping[cst.alias] = alias
return mapping
def change_cst_type(self, target: int, new_type: CstType) -> bool:
''' Change type of constituenta generating alias automatically. '''
self.cache.ensure_loaded()
cst = self.cache.by_id.get(target)
if cst is None:
return False
newAlias = f'{get_type_prefix(new_type)}{self.get_max_index(new_type) + 1}'
mapping = {cst.alias: newAlias}
cst.cst_type = new_type
cst.alias = newAlias
cst.save(update_fields=['cst_type', 'alias'])
self.apply_mapping(mapping)
return True
def apply_mapping(self, mapping: dict[str, str], change_aliases: bool = False) -> None:
''' Apply rename mapping. '''
self.cache.ensure()
self.cache.ensure_loaded()
update_list: list[Constituenta] = []
for cst in self.cache.constituents:
if cst.apply_mapping(mapping, change_aliases):
@ -356,7 +370,7 @@ class RSForm:
def resolve_all_text(self) -> None:
''' Trigger reference resolution for all texts. '''
self.cache.ensure()
self.cache.ensure_loaded()
graph_terms = self._graph_term()
resolver = Resolver({})
update_list: list[Constituenta] = []
@ -395,7 +409,7 @@ class RSForm:
return []
position = target.order + 1
self.cache.ensure()
self.cache.ensure_loaded()
self._shift_positions(position, count_new)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
@ -432,10 +446,10 @@ class RSForm:
Constituenta.objects.bulk_update(update_list, ['order'])
def _get_insert_position(self, position: int) -> int:
if position <= 0 and position != _INSERT_LAST:
if position <= 0 and position != INSERT_LAST:
raise ValidationError(msg.invalidPosition())
lastPosition = self.constituents().count()
if position == _INSERT_LAST:
if position == INSERT_LAST:
position = lastPosition + 1
else:
position = max(1, min(position, lastPosition + 1))
@ -458,7 +472,7 @@ class RSForm:
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
self.cache.ensure()
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
@ -471,7 +485,7 @@ class RSForm:
def _graph_term(self) -> Graph[int]:
''' Graph based on term texts. '''
self.cache.ensure()
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
@ -484,7 +498,7 @@ class RSForm:
def _graph_text(self) -> Graph[int]:
''' Graph based on definition texts. '''
self.cache.ensure()
self.cache.ensure_loaded()
result: Graph[int] = Graph()
for cst in self.cache.constituents:
result.add_node(cst.pk)
@ -500,7 +514,7 @@ class SemanticInfo:
''' Semantic information derived from constituents. '''
def __init__(self, schema: RSForm):
schema.cache.ensure()
schema.cache.ensure_loaded()
self._graph = schema._graph_formal()
self._items = schema.cache.constituents
self._cst_by_ID = schema.cache.by_id

View File

@ -1,4 +1,4 @@
''' Django: Models. '''
from .Constituenta import Constituenta, CstType
from .RSForm import RSForm
from .Constituenta import Constituenta, CstType, extract_globals
from .RSForm import INSERT_LAST, RSForm

View File

@ -2,7 +2,7 @@
import json
import re
from enum import IntEnum, unique
from typing import Set, Tuple, cast
from typing import Tuple, cast
import pyconcept
@ -10,7 +10,6 @@ from shared import messages as msg
from .Constituenta import CstType
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line
_RE_TEMPLATE = r'R\d+'
_RE_COMPLEX_SYMBOLS = r'[∀∃×ℬ;|:]'
@ -67,11 +66,6 @@ def is_functional(cst_type: CstType) -> bool:
]
def extract_globals(expression: str) -> Set[str]:
''' Extract all global aliases from expression. '''
return set(re.findall(_RE_GLOBALS, expression))
def guess_type(alias: str) -> CstType:
''' Get CstType for alias. '''
prefix = alias[0]

View File

@ -121,7 +121,7 @@ class RSFormSerializer(serializers.ModelSerializer):
for link in Inheritance.objects.filter(Q(child__schema=instance) | Q(parent__schema=instance)):
result['inheritance'].append([link.child.pk, link.parent.pk])
result['oss'] = []
for oss in LibraryItem.objects.filter(items__result=instance).only('alias'):
for oss in LibraryItem.objects.filter(operations__result=instance).only('alias'):
result['oss'].append({
'id': oss.pk,
'alias': oss.alias
@ -246,7 +246,7 @@ class CstTargetSerializer(serializers.Serializer):
class CstRenameSerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. '''
target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'schema'))
target = PKField(many=False, queryset=Constituenta.objects.only('alias', 'cst_type', 'schema'))
alias = serializers.CharField()
cst_type = serializers.CharField()

View File

@ -58,3 +58,28 @@ class TestConstituenta(TestCase):
self.assertEqual(cst.term_forms, [])
self.assertEqual(cst.definition_resolved, '')
self.assertEqual(cst.definition_raw, '')
def test_extract_references(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 X2',
term_raw='@{X3|sing} is a @{X4|sing}',
definition_raw='@{X5|sing}'
)
self.assertEqual(cst.extract_references(), set(['X1', 'X2', 'X3', 'X4', 'X5']))
def text_apply_mapping(self):
cst = Constituenta.objects.create(
alias='X1',
order=1,
schema=self.schema1.model,
definition_formal='X1 = X2',
term_raw='@{X1|sing}',
definition_raw='@{X2|sing}'
)
cst.apply_mapping({'X1': 'X3', 'X2': 'X4'})
self.assertEqual(cst.definition_formal, 'X3 = X4')
self.assertEqual(cst.term_raw, '@{X3|sing}')
self.assertEqual(cst.definition_raw, '@{X4|sing}')

View File

@ -183,9 +183,10 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/create-cst', method='post')
def test_create_constituenta(self):
data = {'alias': 'X3'}
data = {'alias': 'X3', 'cst_type': CstType.BASE}
self.executeForbidden(data=data, item=self.unowned_id)
data = {'alias': 'X3'}
self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id)

View File

@ -16,6 +16,7 @@ from rest_framework.serializers import ValidationError
from apps.library.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead
from apps.library.serializers import LibraryItemSerializer
from apps.oss.models import ChangeManager
from apps.users.models import User
from shared import messages as msg
from shared import permissions, utility
@ -76,7 +77,6 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['post'], url_path='create-cst')
def create_cst(self, request: Request, pk) -> HttpResponse:
''' Create new constituenta. '''
schema = self._get_item()
serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
@ -86,13 +86,18 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
insert_after = data['insert_after']
with transaction.atomic():
new_cst = m.RSForm(schema).create_cst(data, insert_after)
schema = m.RSForm(self._get_item())
new_cst = schema.create_cst(data, insert_after)
hosts = LibraryItem.objects.filter(operations__result=schema.model)
for host in hosts:
ChangeManager(host).on_create_cst(new_cst, schema)
return Response(
status=c.HTTP_201_CREATED,
data={
'new_cst': s.CstSerializer(new_cst).data,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(schema.model).data
}
)
@ -119,7 +124,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
raise ValidationError({
'schema': msg.constituentaNotInRSform(schema.title)
})
serializer.update(instance=cst, validated_data=serializer.validated_data)
with transaction.atomic():
serializer.update(instance=cst, validated_data=serializer.validated_data)
# 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'
return Response(
status=c.HTTP_200_OK,
@ -140,9 +148,9 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='produce-structure')
def produce_structure(self, request: Request, pk) -> HttpResponse:
''' Produce a term for every element of the target constituenta typification. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstTargetSerializer(data=request.data, context={'schema': schema})
serializer = s.CstTargetSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
if cst.cst_type not in [m.CstType.FUNCTION, m.CstType.STRUCTURED, m.CstType.TERM]:
@ -150,7 +158,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
f'{cst.pk}': msg.constituentaNoStructure()
})
schema_details = s.RSFormParseSerializer(schema).data['items']
schema_details = s.RSFormParseSerializer(model).data['items']
cst_parse = next(item for item in schema_details if item['id'] == cst.pk)['parse']
if not cst_parse['typification']:
return Response(
@ -158,12 +166,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
data={f'{cst.pk}': msg.constituentaNoStructure()}
)
with transaction.atomic():
result = m.RSForm(schema).produce_structure(cst, cst_parse)
result = m.RSForm(model).produce_structure(cst, cst_parse)
return Response(
status=c.HTTP_200_OK,
data={
'cst_list': result,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(model).data
}
)
@ -181,26 +189,31 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='rename-cst')
def rename_cst(self, request: Request, pk) -> HttpResponse:
''' Rename constituenta possibly changing type. '''
schema = self._get_item()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': schema})
model = self._get_item()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': model})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.validated_data['target'])
changed_type = cst.cst_type != serializer.validated_data['cst_type']
mapping = {cst.alias: serializer.validated_data['alias']}
cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type']
schema = m.RSForm(model)
with transaction.atomic():
cst.save()
m.RSForm(schema).apply_mapping(mapping=mapping, change_aliases=False)
schema.apply_mapping(mapping=mapping, change_aliases=False)
cst.refresh_from_db()
if changed_type:
hosts = LibraryItem.objects.filter(operations__result=model)
for host in hosts:
ChangeManager(host).on_change_cst_type(cst, schema)
schema.refresh_from_db()
cst.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data={
'new_cst': s.CstSerializer(cst).data,
'schema': s.RSFormParseSerializer(schema).data
'schema': s.RSFormParseSerializer(model).data
}
)
@ -218,10 +231,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='substitute')
def substitute(self, request: Request, pk) -> HttpResponse:
''' Substitute occurrences of constituenta with another one. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstSubstituteSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
@ -231,12 +244,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
substitutions.append((original, replacement))
m.RSForm(schema).substitute(substitutions)
m.RSForm(model).substitute(substitutions)
schema.refresh_from_db()
model.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -253,17 +266,17 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='delete-multiple-cst')
def delete_multiple_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Delete multiple constituents. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstListSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
m.RSForm(schema).delete_cst(serializer.validated_data['items'])
m.RSForm(model).delete_cst(serializer.validated_data['items'])
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -280,20 +293,20 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='move-cst')
def move_cst(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Move multiple constituents. '''
schema = self._get_item()
model = self._get_item()
serializer = s.CstMoveSerializer(
data=request.data,
context={'schema': schema}
context={'schema': model}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
m.RSForm(schema).move_cst(
m.RSForm(model).move_cst(
target=serializer.validated_data['items'],
destination=serializer.validated_data['move_to']
)
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -309,11 +322,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='reset-aliases')
def reset_aliases(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Recreate all aliases based on order. '''
schema = self._get_item()
m.RSForm(schema).reset_aliases()
model = self._get_item()
m.RSForm(model).reset_aliases()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -329,11 +342,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['patch'], url_path='restore-order')
def restore_order(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Restore order based on types and term graph. '''
schema = self._get_item()
m.RSForm(schema).restore_order()
model = self._get_item()
m.RSForm(model).restore_order()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema).data
data=s.RSFormParseSerializer(model).data
)
@extend_schema(
@ -353,10 +366,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
input_serializer = s.RSFormUploadSerializer(data=request.data)
input_serializer.is_valid(raise_exception=True)
schema = self._get_item()
model = self._get_item()
load_metadata = input_serializer.validated_data['load_metadata']
data = utility.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
data['id'] = schema.pk
data['id'] = model.pk
serializer = s.RSFormTRSSerializer(
data=data,
@ -461,10 +474,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@action(detail=True, methods=['get'], url_path='export-trs')
def export_trs(self, request: Request, pk) -> HttpResponse:
''' Endpoint: Download Exteor compatible file. '''
schema = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(schema)).data
model = self._get_item()
data = s.RSFormTRSSerializer(m.RSForm(model)).data
file = utility.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
filename = utils.filename_for_schema(schema.alias)
filename = utils.filename_for_schema(model.alias)
response = HttpResponse(file, content_type='application/zip')
response['Content-Disposition'] = f'attachment; filename={filename}'
return response

View File

@ -25,7 +25,7 @@ function TextArea({
<div
className={clsx(
{
'flex flex-col flex-grow gap-2': !dense,
'flex flex-col gap-2': !dense,
'flex flex-grow items-center gap-3': dense
},
dense && className

View File

@ -155,10 +155,7 @@ export interface ICstMovetoData extends IConstituentaList {
export interface ICstUpdateData
extends Pick<IConstituentaMeta, 'id'>,
Partial<
Pick<
IConstituentaMeta,
'alias' | 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'
>
Pick<IConstituentaMeta, 'convention' | 'definition_formal' | 'definition_raw' | 'term_raw' | 'term_forms'>
> {}
/**

View File

@ -55,7 +55,6 @@ function FormConstituenta({
}: FormConstituentaProps) {
const { schema, cstUpdate, processing } = useRSForm();
const [alias, setAlias] = useState('');
const [term, setTerm] = useState('');
const [textDefinition, setTextDefinition] = useState('');
const [expression, setExpression] = useState('');
@ -98,7 +97,6 @@ function FormConstituenta({
useLayoutEffect(() => {
if (state) {
setAlias(state.alias);
setConvention(state.convention || '');
setTerm(state.term_raw || '');
setTextDefinition(state.definition_raw || '');
@ -117,7 +115,6 @@ function FormConstituenta({
}
const data: ICstUpdateData = {
id: state.id,
alias: alias,
term_raw: term,
definition_formal: expression,
definition_raw: textDefinition,