Add backend for ProduceStructure function

This commit is contained in:
IRBorisov 2024-03-14 20:01:19 +03:00
parent cf47d90822
commit 4d0ba713f3
9 changed files with 385 additions and 83 deletions

View File

@ -19,6 +19,9 @@ def renameTaken(name: str):
def pyconceptFailure():
return 'Invalid data response from pyconcept'
def typificationInvalidStr():
return 'Invalid typification string'
def libraryTypeUnexpected():
return 'Attempting to use invalid adaptor for non-RSForm item'
@ -27,3 +30,6 @@ def exteorFileVersionNotSupported():
def positionNegative():
return 'Invalid position: should be positive integer'
def constituentaNoStructure():
return 'Указанная конституента не обладает теоретико-множественной типизацией'

View File

@ -3,11 +3,12 @@ import re
from django.db.models import (
CASCADE, ForeignKey, Model, PositiveIntegerField,
TextChoices, TextField, CharField, JSONField
TextField, CharField, JSONField
)
from django.core.validators import MinValueValidator
from django.urls import reverse
from .api_RSLanguage import CstType
from ..utils import apply_pattern
@ -15,18 +16,6 @@ _REF_ENTITY_PATTERN = re.compile(r'@{([^0-9\-].*?)\|.*?}')
_GLOBAL_ID_PATTERN = re.compile(r'([XCSADFPT][0-9]+)') # cspell:disable-line
class CstType(TextChoices):
''' Type of constituenta '''
BASE = 'basic'
CONSTANT = 'constant'
STRUCTURED = 'structure'
AXIOM = 'axiom'
TERM = 'term'
FUNCTION = 'function'
PREDICATE = 'predicate'
THEOREM = 'theorem'
def _empty_forms():
return []

View File

@ -6,6 +6,7 @@ from django.db.models import QuerySet
from django.core.exceptions import ValidationError
from cctext import Resolver, Entity, extract_entities, split_grams, TermForm
from .api_RSLanguage import get_type_prefix, generate_structure
from .LibraryItem import LibraryItem, LibraryItemType
from .Constituenta import CstType, Constituenta
from .Version import Version
@ -14,27 +15,6 @@ from ..graph import Graph
from .. import messages as msg
def _get_type_prefix(cst_type: CstType) -> str:
''' Get alias prefix. '''
if cst_type == CstType.BASE:
return 'X'
if cst_type == CstType.CONSTANT:
return 'C'
if cst_type == CstType.STRUCTURED:
return 'S'
if cst_type == CstType.AXIOM:
return 'A'
if cst_type == CstType.TERM:
return 'D'
if cst_type == CstType.FUNCTION:
return 'F'
if cst_type == CstType.PREDICATE:
return 'P'
if cst_type == CstType.THEOREM:
return 'T'
return 'X'
class RSForm:
''' RSForm is math form of conceptual schema. '''
def __init__(self, item: LibraryItem):
@ -95,6 +75,17 @@ class RSForm:
cst.definition_resolved = resolved
cst.save()
def get_max_index(self, cst_type: CstType) -> int:
''' Get maximum alias index for specific CstType '''
result: int = 1
items = Constituenta.objects \
.filter(schema=self.item, cst_type=cst_type) \
.order_by('-alias') \
.values_list('alias', flat=True)
for alias in items:
result = max(result, int(alias[1:]))
return result
@transaction.atomic
def insert_at(self, position: int, alias: str, insert_type: CstType) -> 'Constituenta':
''' Insert new constituenta at given position.
@ -105,13 +96,7 @@ class RSForm:
raise ValidationError(msg.renameTaken(alias))
currentSize = self.constituents().count()
position = max(1, min(position, currentSize + 1))
update_list = \
Constituenta.objects \
.only('id', 'order', 'schema') \
.filter(schema=self.item, order__gte=position)
for cst in update_list:
cst.order += 1
Constituenta.objects.bulk_update(update_list, ['order'])
self._shift_positions(position, 1)
result = Constituenta.objects.create(
schema=self.item,
@ -225,7 +210,7 @@ class RSForm:
bases[cst_type] = 1
cst_list = self.constituents().order_by('order')
for cst in cst_list:
alias = f'{_get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
alias = f'{get_type_prefix(cst.cst_type)}{bases[cst.cst_type]}'
bases[cst.cst_type] += 1
if cst.alias != alias:
mapping[cst.alias] = alias
@ -267,6 +252,50 @@ class RSForm:
data=data
)
@transaction.atomic
def produce_structure(self, target: Constituenta, parse: dict) -> list[int]:
''' Add constituents for each structural element of the target. '''
expressions = generate_structure(
alias=target.alias,
expression=target.definition_formal,
parse=parse
)
count_new = len(expressions)
if count_new == 0:
return []
position = target.order + 1
self._shift_positions(position, count_new)
result = []
cst_type = CstType.TERM if len(parse['args']) == 0 else CstType.FUNCTION
free_index = self.get_max_index(cst_type) + 1
prefix = get_type_prefix(cst_type)
for text in expressions:
new_item = Constituenta.objects.create(
schema=self.item,
order=position,
alias=f'{prefix}{free_index}',
definition_formal=text,
cst_type=cst_type
)
result.append(new_item.id)
free_index = free_index + 1
position = position + 1
self.item.save()
return result
def _shift_positions(self, start: int, shift: int):
if shift == 0:
return
update_list = \
Constituenta.objects \
.only('id', 'order', 'schema') \
.filter(schema=self.item, order__gte=start)
for cst in update_list:
cst.order += shift
Constituenta.objects.bulk_update(update_list, ['order'])
@transaction.atomic
def _reset_order(self):
order = 1

View File

@ -0,0 +1,120 @@
''' Models: Definitions and utility function for RSLanguage. '''
import json
from typing import Tuple
from enum import IntEnum , unique
from django.db.models import TextChoices
import pyconcept
from .. import messages as msg
@unique
class TokenType(IntEnum):
''' Some of grammar token types. Full list seek in frontend / pyconcept '''
ID_GLOBAL = 259
ID_RADICAL = 262
DECART = 287
BOOLEAN = 292
BIGPR = 293
SMALLPR = 294
REDUCE = 299
class CstType(TextChoices):
''' Type of constituenta '''
BASE = 'basic'
CONSTANT = 'constant'
STRUCTURED = 'structure'
AXIOM = 'axiom'
TERM = 'term'
FUNCTION = 'function'
PREDICATE = 'predicate'
THEOREM = 'theorem'
def get_type_prefix(cst_type: CstType) -> str:
''' Get alias prefix. '''
if cst_type == CstType.BASE:
return 'X'
if cst_type == CstType.CONSTANT:
return 'C'
if cst_type == CstType.STRUCTURED:
return 'S'
if cst_type == CstType.AXIOM:
return 'A'
if cst_type == CstType.TERM:
return 'D'
if cst_type == CstType.FUNCTION:
return 'F'
if cst_type == CstType.PREDICATE:
return 'P'
if cst_type == CstType.THEOREM:
return 'T'
return 'X'
def _get_structure_prefix(alias: str, expression: str, parse: dict) -> Tuple[str, str]:
''' Generate prefix and alias for structure generation. '''
args = parse['args']
if len(args) == 0:
return (alias, '')
prefix = expression[0:expression.find(']')] + '] '
newAlias = alias + '[' + ','.join([arg['alias'] for arg in args]) + ']'
return (newAlias, prefix)
def generate_structure(alias: str, expression: str, parse: dict) -> list:
''' Generate list of expressions for target structure. '''
ast = json.loads(pyconcept.parse_expression(parse['typification']))['ast']
if len(ast) == 0:
raise ValueError(msg.typificationInvalidStr())
if len(ast) == 1:
return []
(link, prefix) = _get_structure_prefix(alias, expression, parse)
generated: list = []
arity: list = [1] * len(ast)
for (n, item) in enumerate(ast):
if n == 0:
generated.append({
'text': link, # generated text
'operation': None, # applied operation. None if text should be skipped
'is_boolean': False # is the result of operation has an additional boolean
})
continue
parent_index = item['parent']
parent_type = ast[parent_index]['typeID']
parent_text = generated[parent_index]['text']
parent_is_boolean = generated[parent_index]['is_boolean']
assert(parent_type in [TokenType.BOOLEAN, TokenType.DECART])
if parent_is_boolean:
if parent_type == TokenType.BOOLEAN:
generated.append({
'text': f'red({parent_text})',
'operation': TokenType.REDUCE,
'is_boolean': True
})
if parent_type == TokenType.DECART:
generated.append({
'text': f'Pr{arity[parent_index]}({parent_text})',
'operation': TokenType.BIGPR,
'is_boolean': True
})
arity[parent_index] = arity[parent_index] + 1
else:
if parent_type == TokenType.BOOLEAN:
generated.append({
'text': parent_text,
'operation': None,
'is_boolean': True
})
if parent_type == TokenType.DECART:
generated.append({
'text': f'pr{arity[parent_index]}({parent_text})',
'operation': TokenType.SMALLPR,
'is_boolean': False
})
arity[parent_index] = arity[parent_index] + 1
return [prefix + item['text'] for item in generated if item['operation'] is not None]

View File

@ -16,12 +16,22 @@ from .data_access import (
VersionSerializer,
VersionCreateSerializer,
ConstituentaSerializer,
CstStructuredSerializer,
CstMoveSerializer,
CstSubstituteSerializer,
CstCreateSerializer,
CstRenameSerializer,
CstListSerializer
)
from .schema_typing import (NewCstResponse, NewVersionResponse, ResultTextResponse)
from .schema_typing import (
NewCstResponse,
NewMultiCstResponse,
NewVersionResponse,
ResultTextResponse
)
from .io_pyconcept import PyConceptAdapter
from .io_files import (FileSerializer, RSFormUploadSerializer, RSFormTRSSerializer)
from .io_files import (
FileSerializer,
RSFormUploadSerializer,
RSFormTRSSerializer
)

View File

@ -6,10 +6,24 @@ from .basics import ConstituentaID, CstParseSerializer
from .io_pyconcept import PyConceptAdapter
from ..models import Constituenta, LibraryItem, RSForm, Version
from ..models import Constituenta, LibraryItem, RSForm, Version, CstType
from .. import messages as msg
def _try_access_constituenta(item_id: str, schema: LibraryItem) -> Constituenta:
try:
cst = Constituenta.objects.get(pk=item_id)
except Constituenta.DoesNotExist as exception:
raise serializers.ValidationError({
f'{item_id}': msg.constituentaNotExists()
}) from exception
if cst.schema != schema:
raise serializers.ValidationError({
f'{item_id}': msg.constituentaNotOwned(schema.title)
})
return cst
class LibraryItemSerializer(serializers.ModelSerializer):
''' Serializer: LibraryItem entry. '''
class Meta:
@ -105,6 +119,24 @@ class CstCreateSerializer(serializers.ModelSerializer):
'insert_after', 'term_forms'
class CstStructuredSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta structure production. '''
class Meta:
''' serializer metadata. '''
model = Constituenta
fields = ('id',)
def validate(self, attrs):
schema = cast(LibraryItem, self.context['schema'])
cst = _try_access_constituenta(self.initial_data['id'], schema)
if cst.cst_type not in [CstType.FUNCTION, CstType.STRUCTURED, CstType.TERM]:
raise serializers.ValidationError({
f'{cst.id}': msg.constituentaNoStructure()
})
self.instance = cst
return attrs
class CstRenameSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta renaming. '''
class Meta:
@ -113,23 +145,19 @@ class CstRenameSerializer(serializers.ModelSerializer):
fields = 'id', 'alias', 'cst_type'
def validate(self, attrs):
schema = cast(RSForm, self.context['schema'])
old_cst = Constituenta.objects.get(pk=self.initial_data['id'])
schema = cast(LibraryItem, self.context['schema'])
cst = _try_access_constituenta(self.initial_data['id'], schema)
new_alias = self.initial_data['alias']
if old_cst.schema != schema.item:
raise serializers.ValidationError({
'id': msg.constituentaNotOwned(schema.item.title)
})
if old_cst.alias == new_alias:
if cst.alias == new_alias:
raise serializers.ValidationError({
'alias': msg.renameTrivial(new_alias)
})
if schema.constituents().filter(alias=new_alias).exists():
if RSForm(schema).constituents().filter(alias=new_alias).exists():
raise serializers.ValidationError({
'alias': msg.renameTaken(new_alias)
})
self.instance = old_cst
attrs['schema'] = schema.item
self.instance = cst
attrs['schema'] = schema
attrs['id'] = self.initial_data['id']
return attrs
@ -227,20 +255,20 @@ class CstSubstituteSerializer(serializers.Serializer):
transfer_term = serializers.BooleanField(required=False, default=False)
def validate(self, attrs):
schema = cast(RSForm, self.context['schema'])
schema = cast(LibraryItem, self.context['schema'])
original_cst = Constituenta.objects.get(pk=self.initial_data['original'])
substitution_cst = Constituenta.objects.get(pk=self.initial_data['substitution'])
if original_cst.alias == substitution_cst.alias:
raise serializers.ValidationError({
'alias': msg.substituteTrivial(original_cst.alias)
})
if original_cst.schema != schema.item:
if original_cst.schema != schema:
raise serializers.ValidationError({
'original': msg.constituentaNotOwned(schema.item.title)
'original': msg.constituentaNotOwned(schema.title)
})
if substitution_cst.schema != schema.item:
if substitution_cst.schema != schema:
raise serializers.ValidationError({
'substitution': msg.constituentaNotOwned(schema.item.title)
'substitution': msg.constituentaNotOwned(schema.title)
})
attrs['original'] = original_cst
attrs['substitution'] = substitution_cst
@ -255,19 +283,10 @@ class CstListSerializer(serializers.Serializer):
)
def validate(self, attrs):
schema = self.context['schema']
schema = cast(LibraryItem, self.context['schema'])
cstList = []
for item in attrs['items']:
try:
cst = Constituenta.objects.get(pk=item)
except Constituenta.DoesNotExist as exception:
raise serializers.ValidationError({
f'{item}': msg.constituentaNotExists
}) from exception
if cst.schema != schema.item:
raise serializers.ValidationError({
f'{item}': msg.constituentaNotOwned(schema.item.title)
})
cst = _try_access_constituenta(item, schema)
cstList.append(cst)
attrs['constituents'] = cstList
return attrs

View File

@ -1,7 +1,7 @@
''' Utility serializers for REST API schema - SHOULD NOT BE ACCESSED DIRECTLY. '''
from rest_framework import serializers
from .data_access import ConstituentaSerializer, RSFormParseSerializer
from .data_access import RSFormParseSerializer
class ResultTextResponse(serializers.Serializer):
''' Serializer: Text result of a function call. '''
@ -10,7 +10,14 @@ class ResultTextResponse(serializers.Serializer):
class NewCstResponse(serializers.Serializer):
''' Serializer: Create cst response. '''
new_cst = ConstituentaSerializer()
new_cst = serializers.IntegerField()
schema = RSFormParseSerializer()
class NewMultiCstResponse(serializers.Serializer):
''' Serializer: Create multiple cst response. '''
cst_list = serializers.ListField(
child=serializers.IntegerField()
)
schema = RSFormParseSerializer()
class NewVersionResponse(serializers.Serializer):

View File

@ -384,7 +384,7 @@ class TestRSFormViewset(APITestCase):
)
x2.refresh_from_db()
schema.item.refresh_from_db()
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['items']), 1)
self.assertEqual(schema.constituents().count(), 1)
self.assertEqual(x2.alias, 'X2')
@ -498,3 +498,88 @@ class TestRSFormViewset(APITestCase):
self.assertEqual(response.data['items'][0]['term_resolved'], x1.term_resolved)
self.assertEqual(response.data['items'][1]['term_raw'], d1.term_raw)
self.assertEqual(response.data['items'][1]['term_resolved'], d1.term_resolved)
def test_produce_structure(self):
item = self.owned.item
x1 = Constituenta.objects.create(schema=item, alias='X1', cst_type='basic', order=1)
s1 = Constituenta.objects.create(schema=item, alias='S1', cst_type='structure', order=2)
s2 = Constituenta.objects.create(schema=item, alias='S2', cst_type='structure', order=3)
s3 = Constituenta.objects.create(schema=item, alias='S3', cst_type='structure', order=4)
a1 = Constituenta.objects.create(schema=item, alias='A1', cst_type='axiom', order=5)
f1 = Constituenta.objects.create(schema=item, alias='F10', cst_type='function', order=6)
invalid_id = f1.id + 1
s1.definition_formal = '(X1×X1)' # (X1×X1)
s2.definition_formal = 'invalid'
s3.definition_formal = 'X1×(X1×(X1))×(X1×X1)'
a1.definition_formal = '1=1'
f1.definition_formal = '[α∈X1, β∈X1] Fi1[{α,β}](S1)'
s1.save()
s2.save()
s3.save()
a1.save()
f1.save()
data = {'id': invalid_id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
data = {'id': x1.id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
data = {'id': s2.id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Testing simple structure
s1.refresh_from_db()
data = {'id': s1.id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
result = response.data['schema']
items = [item for item in result['items'] if item['id'] in response.data['cst_list']]
self.assertEqual(len(items), 2)
self.assertEqual(items[0]['order'], s1.order + 1)
self.assertEqual(items[0]['definition_formal'], 'Pr1(S1)')
self.assertEqual(items[1]['order'], s1.order + 2)
self.assertEqual(items[1]['definition_formal'], 'Pr2(S1)')
# Testing complex structure
s3.refresh_from_db()
data = {'id': s3.id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
result = response.data['schema']
items = [item for item in result['items'] if item['id'] in response.data['cst_list']]
self.assertEqual(len(items), 8)
self.assertEqual(items[0]['order'], s3.order + 1)
self.assertEqual(items[0]['definition_formal'], 'pr1(S3)')
# Testing function
f1.refresh_from_db()
data = {'id': f1.id}
response = self.client.patch(
f'/api/rsforms/{item.id}/cst-produce-structure',
data=data, format='json'
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
result = response.data['schema']
items = [item for item in result['items'] if item['id'] in response.data['cst_list']]
self.assertEqual(len(items), 2)
self.assertEqual(items[0]['order'], f1.order + 1)
self.assertEqual(items[0]['definition_formal'], '[α∈X1, β∈X1] Pr1(F10[α,β])')

View File

@ -11,9 +11,11 @@ from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import status as c
import pyconcept
from .. import models as m
from .. import serializers as s
from .. import utils
from .. import messages as msg
@extend_schema(tags=['RSForm'])
@ -63,6 +65,38 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
response['Location'] = new_cst.get_absolute_url()
return response
@extend_schema(
summary='produce the structure of a given constituenta',
tags=['RSForm'],
request=s.CstStructuredSerializer,
responses={c.HTTP_200_OK: s.NewMultiCstResponse}
)
@action(detail=True, methods=['patch'], url_path='cst-produce-structure')
def produce_structure(self, request: Request, pk):
''' Produce a term for every element of the target constituenta typification. '''
schema = self._get_schema()
serializer = s.CstStructuredSerializer(data=request.data, context={'schema': schema.item})
serializer.is_valid(raise_exception=True)
cst = cast(m.Constituenta, serializer.instance)
schema_details = s.RSFormParseSerializer(schema.item).data['items']
cst_parse = next(item for item in schema_details if item['id']==cst.id)['parse']
if not cst_parse['typification']:
return Response(
status=c.HTTP_400_BAD_REQUEST,
data={f'{cst.id}': msg.constituentaNoStructure()}
)
result = schema.produce_structure(cst, cst_parse)
return Response(
status=c.HTTP_200_OK,
data={
'cst_list': result,
'schema': s.RSFormParseSerializer(schema.item).data
}
)
@extend_schema(
summary='rename constituenta',
tags=['Constituenta'],
@ -74,7 +108,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def cst_rename(self, request: Request, pk):
''' Rename constituenta possibly changing type. '''
schema = self._get_schema()
serializer = s.CstRenameSerializer(data=request.data, context={'schema': schema})
serializer = s.CstRenameSerializer(data=request.data, context={'schema': schema.item})
serializer.is_valid(raise_exception=True)
old_alias = m.Constituenta.objects.get(pk=request.data['id']).alias
serializer.save()
@ -92,7 +126,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@extend_schema(
summary='substitute constituenta',
tags=['Constituenta'],
tags=['RSForm'],
request=s.CstSubstituteSerializer,
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@ -101,7 +135,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def cst_substitute(self, request: Request, pk):
''' Substitute occurrences of constituenta with another one. '''
schema = self._get_schema()
serializer = s.CstSubstituteSerializer(data=request.data, context={'schema': schema})
serializer = s.CstSubstituteSerializer(
data=request.data,
context={'schema': schema.item}
)
serializer.is_valid(raise_exception=True)
schema.substitute(
original=serializer.validated_data['original'],
@ -116,9 +153,9 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
@extend_schema(
summary='delete constituents',
tags=['Constituenta'],
tags=['RSForm'],
request=s.CstListSerializer,
responses={c.HTTP_202_ACCEPTED: s.RSFormParseSerializer}
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@action(detail=True, methods=['patch'], url_path='cst-delete-multiple')
def cst_delete_multiple(self, request: Request, pk):
@ -126,19 +163,19 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
schema = self._get_schema()
serializer = s.CstListSerializer(
data=request.data,
context={'schema': schema}
context={'schema': schema.item}
)
serializer.is_valid(raise_exception=True)
schema.delete_cst(serializer.validated_data['constituents'])
schema.item.refresh_from_db()
return Response(
status=c.HTTP_202_ACCEPTED,
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data
)
@extend_schema(
summary='move constituenta',
tags=['Constituenta'],
tags=['RSForm'],
request=s.CstMoveSerializer,
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@ -148,7 +185,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
schema = self._get_schema()
serializer = s.CstMoveSerializer(
data=request.data,
context={'schema': schema}
context={'schema': schema.item}
)
serializer.is_valid(raise_exception=True)
schema.move_cst(