Compare commits

...

5 Commits

Author SHA1 Message Date
Ivan
a256582a65 Implementing oss backend pt2
Some checks are pending
Backend CI / build (3.12) (push) Waiting to run
2024-07-19 19:28:55 +03:00
Ivan
86033ccd46 Update RunServer.ps1 2024-07-19 11:19:40 +03:00
Ivan
da7113ce7e Refactoring: preparing backend for oss pt1 2024-07-17 12:56:01 +03:00
Ivan
aed899ba70 Refactor RSForm pt2 2024-07-16 12:17:40 +03:00
Ivan
e9f77057d9 Refactor RSForm backend and tests 2024-07-15 12:35:47 +03:00
59 changed files with 1297 additions and 305 deletions

View File

@ -11,7 +11,9 @@
"--multi-line",
"3",
"--project",
"apps"
"apps",
"--project",
"shared"
],
"autopep8.args": [
"--max-line-length",

View File

@ -69,6 +69,7 @@ This readme file is used mostly to document project dependencies
- Backticks
- Svg Preview
- TODO Highlight v2
- Prettier
</pre>
</details>
<details>
@ -114,8 +115,10 @@ This readme file is used mostly to document project dependencies
<pre>
- Pylance
- Pylint
- Django
- autopep8
- isort
- Django
- SQLite
</pre>
</details>

View File

View File

@ -0,0 +1,14 @@
''' Admin view: OperationSchema. '''
from django.contrib import admin
from . import models
class OperationAdmin(admin.ModelAdmin):
''' Admin model: Operation. '''
ordering = ['oss']
list_display = ['oss', 'operation_type', 'result', 'alias', 'title', 'comment', 'position_x', 'position_y']
search_fields = ['operation_type', 'title', 'alias']
admin.site.register(models.Operation, OperationAdmin)

View File

@ -0,0 +1,8 @@
''' Application: Operation Schema. '''
from django.apps import AppConfig
class RsformConfig(AppConfig):
''' Application config. '''
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.oss'

View File

@ -0,0 +1,69 @@
# Generated by Django 5.0.7 on 2024-07-17 09:51
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('rsform', '0008_alter_libraryitem_item_type'),
]
operations = [
migrations.CreateModel(
name='Operation',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('operation_type', models.CharField(choices=[
('input', 'Input'), ('synthesis', 'Synthesis')], default='input', max_length=10, verbose_name='Тип')),
('alias', models.CharField(blank=True, max_length=255, verbose_name='Шифр')),
('title', models.TextField(blank=True, verbose_name='Название')),
('comment', models.TextField(blank=True, verbose_name='Комментарий')),
('position_x', models.FloatField(default=0, verbose_name='Положение по горизонтали')),
('position_y', models.FloatField(default=0, verbose_name='Положение по вертикали')),
('oss', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='items', to='rsform.libraryitem', verbose_name='Схема синтеза')),
('result', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL,
related_name='producer', to='rsform.libraryitem', verbose_name='Связанная КС')),
],
options={
'verbose_name': 'Операция',
'verbose_name_plural': 'Операции',
},
),
migrations.CreateModel(
name='SynthesisSubstitution',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('transfer_term', models.BooleanField(default=False, verbose_name='Перенос термина')),
('operation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
to='oss.operation', verbose_name='Операция')),
('original', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='as_original', to='rsform.constituenta', verbose_name='Удаляемая конституента')),
('substitution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='as_substitute', to='rsform.constituenta', verbose_name='Замещающая конституента')),
],
options={
'verbose_name': 'Отождествление синтеза',
'verbose_name_plural': 'Таблицы отождествлений',
},
),
migrations.CreateModel(
name='Argument',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('argument', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='descendants', to='oss.operation', verbose_name='Аргумент')),
('operation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='arguments', to='oss.operation', verbose_name='Операция')),
],
options={
'verbose_name': 'Аргумент',
'verbose_name_plural': 'Аргументы операций',
'unique_together': {('operation', 'argument')},
},
),
]

View File

@ -0,0 +1,27 @@
''' Models: Operation Argument in OSS. '''
from django.db.models import CASCADE, ForeignKey, Model
class Argument(Model):
''' Operation Argument.'''
operation: ForeignKey = ForeignKey(
verbose_name='Операция',
to='oss.Operation',
on_delete=CASCADE,
related_name='arguments'
)
argument: ForeignKey = ForeignKey(
verbose_name='Аргумент',
to='oss.Operation',
on_delete=CASCADE,
related_name='descendants'
)
class Meta:
''' Model metadata. '''
verbose_name = 'Аргумент'
verbose_name_plural = 'Аргументы операций'
unique_together = [['operation', 'argument']]
def __str__(self) -> str:
return f'{self.argument.pk} -> {self.operation.pk}'

View File

@ -0,0 +1,71 @@
''' Models: Operation in OSS. '''
from django.db.models import (
CASCADE,
SET_NULL,
CharField,
FloatField,
ForeignKey,
Model,
TextChoices,
TextField
)
class OperationType(TextChoices):
''' Type of operation. '''
INPUT = 'input'
SYNTHESIS = 'synthesis'
class Operation(Model):
''' Operational schema Unit.'''
oss: ForeignKey = ForeignKey(
verbose_name='Схема синтеза',
to='rsform.LibraryItem',
on_delete=CASCADE,
related_name='items'
)
operation_type: CharField = CharField(
verbose_name='Тип',
max_length=10,
choices=OperationType.choices,
default=OperationType.INPUT
)
result: ForeignKey = ForeignKey(
verbose_name='Связанная КС',
to='rsform.LibraryItem',
null=True,
on_delete=SET_NULL,
related_name='producer'
)
alias: CharField = CharField(
verbose_name='Шифр',
max_length=255,
blank=True
)
title: TextField = TextField(
verbose_name='Название',
blank=True
)
comment: TextField = TextField(
verbose_name='Комментарий',
blank=True
)
position_x: FloatField = FloatField(
verbose_name='Положение по горизонтали',
default=0
)
position_y: FloatField = FloatField(
verbose_name='Положение по вертикали',
default=0
)
class Meta:
''' Model metadata. '''
verbose_name = 'Операция'
verbose_name_plural = 'Операции'
def __str__(self) -> str:
return f'Операция {self.alias}'

View File

@ -0,0 +1,36 @@
''' Models: SynthesisSubstitution. '''
from django.db.models import CASCADE, BooleanField, ForeignKey, Model
class SynthesisSubstitution(Model):
''' Substitutions as part of Synthesis operation in OSS.'''
operation: ForeignKey = ForeignKey(
verbose_name='Операция',
to='oss.Operation',
on_delete=CASCADE
)
original: ForeignKey = ForeignKey(
verbose_name='Удаляемая конституента',
to='rsform.Constituenta',
on_delete=CASCADE,
related_name='as_original'
)
substitution: ForeignKey = ForeignKey(
verbose_name='Замещающая конституента',
to='rsform.Constituenta',
on_delete=CASCADE,
related_name='as_substitute'
)
transfer_term: BooleanField = BooleanField(
verbose_name='Перенос термина',
default=False
)
class Meta:
''' Model metadata. '''
verbose_name = 'Отождествление синтеза'
verbose_name_plural = 'Таблицы отождествлений'
def __str__(self) -> str:
return f'{self.original.pk} -> {self.substitution.pk}'

View File

@ -0,0 +1,8 @@
''' Django: Models. '''
from apps.rsform.models import LibraryItem, LibraryItemType
from .api_OSS import OperationSchema
from .Argument import Argument
from .Operation import Operation, OperationType
from .SynthesisSubstitution import SynthesisSubstitution

View File

@ -0,0 +1,128 @@
''' Models: OSS API. '''
from typing import Optional
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet
from apps.rsform.models import LibraryItem, LibraryItemType
from shared import messages as msg
from .Argument import Argument
from .Operation import Operation
from .SynthesisSubstitution import SynthesisSubstitution
class OperationSchema:
''' Operations schema API. '''
def __init__(self, item: LibraryItem):
if item.item_type != LibraryItemType.OPERATION_SCHEMA:
raise ValueError(msg.libraryTypeUnexpected())
self.item = item
@staticmethod
def create(**kwargs) -> 'OperationSchema':
item = LibraryItem.objects.create(item_type=LibraryItemType.OPERATION_SCHEMA, **kwargs)
return OperationSchema(item=item)
def operations(self) -> QuerySet[Operation]:
''' Get QuerySet containing all operations of current OSS. '''
return Operation.objects.filter(oss=self.item)
def arguments(self) -> QuerySet[Argument]:
''' Operation arguments. '''
return Argument.objects.filter(operation__oss=self.item)
def substitutions(self) -> QuerySet[SynthesisSubstitution]:
''' Operation substitutions. '''
return SynthesisSubstitution.objects.filter(operation__oss=self.item)
def update_positions(self, data: list[dict]):
''' Update positions. '''
lookup = {x['id']: x for x in data}
operations = self.operations()
for item in operations:
if item.pk in lookup:
item.position_x = lookup[item.pk]['position_x']
item.position_y = lookup[item.pk]['position_y']
Operation.objects.bulk_update(operations, ['position_x', 'position_y'])
@transaction.atomic
def create_operation(self, **kwargs) -> Operation:
''' Insert new operation. '''
if kwargs['alias'] != '' and self.operations().filter(alias=kwargs['alias']).exists():
raise ValidationError(msg.aliasTaken(kwargs['alias']))
result = Operation.objects.create(
oss=self.item,
**kwargs
)
self.item.save()
result.refresh_from_db()
return result
@transaction.atomic
def delete_operation(self, operation: Operation):
''' Delete operation. '''
operation.delete()
# deal with attached schema
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_input(self, target: Operation, schema: Optional[LibraryItem]):
''' Set input schema for operation. '''
if schema == target.result:
return
if schema:
target.result = schema
target.alias = schema.alias
target.title = schema.title
target.comment = schema.comment
else:
target.result = None
target.save()
# trigger on_change effects
self.item.save()
@transaction.atomic
def add_argument(self, operation: Operation, argument: Operation) -> Optional[Argument]:
''' Add Argument to operation. '''
if Argument.objects.filter(operation=operation, argument=argument).exists():
return None
result = Argument.objects.create(operation=operation, argument=argument)
self.item.save()
return result
@transaction.atomic
def clear_arguments(self, target: Operation):
''' Clear all arguments for operation. '''
if not Argument.objects.filter(operation=target).exists():
return
Argument.objects.filter(operation=target).delete()
SynthesisSubstitution.objects.filter(operation=target).delete()
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_substitutions(self, target: Operation, substitutes: list[dict]):
''' Clear all arguments for operation. '''
SynthesisSubstitution.objects.filter(operation=target).delete()
for sub in substitutes:
SynthesisSubstitution.objects.create(
operation=target,
original=sub['original'],
substitution=sub['substitution'],
transfer_term=sub['transfer_term']
)
# trigger on_change effects
self.item.save()

View File

@ -0,0 +1,13 @@
''' REST API: Serializers. '''
from apps.rsform.serializers import LibraryItemSerializer
from .basics import OperationPositionSerializer, PositionsSerializer
from .data_access import (
ArgumentSerializer,
OperationCreateSerializer,
OperationDeleteSerializer,
OperationSchemaSerializer,
OperationSerializer
)
from .schema_typing import NewOperationResponse

View File

@ -0,0 +1,16 @@
''' Basic serializers that do not interact with database. '''
from rest_framework import serializers
class OperationPositionSerializer(serializers.Serializer):
''' Operation position. '''
id = serializers.IntegerField()
position_x = serializers.FloatField()
position_y = serializers.FloatField()
class PositionsSerializer(serializers.Serializer):
''' Operations position for OperationSchema. '''
positions = serializers.ListField(
child=OperationPositionSerializer()
)

View File

@ -0,0 +1,108 @@
''' Serializers for persistent data manipulation. '''
from typing import cast
from django.db.models import F
from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField
from apps.rsform.models import LibraryItem
from apps.rsform.serializers import LibraryItemDetailsSerializer
from shared import messages as msg
from ..models import Argument, Operation, OperationSchema, OperationType
from .basics import OperationPositionSerializer
class OperationSerializer(serializers.ModelSerializer):
''' Serializer: Operation data. '''
class Meta:
''' serializer metadata. '''
model = Operation
fields = '__all__'
read_only_fields = ('id', 'oss')
class ArgumentSerializer(serializers.ModelSerializer):
''' Serializer: Operation data. '''
class Meta:
''' serializer metadata. '''
model = Argument
fields = ('operation', 'argument')
class OperationCreateSerializer(serializers.Serializer):
''' Serializer: Operation creation. '''
class OperationData(serializers.ModelSerializer):
''' Serializer: Operation creation data. '''
alias = serializers.CharField()
operation_type = serializers.ChoiceField(OperationType.choices)
class Meta:
''' serializer metadata. '''
model = Operation
fields = \
'alias', 'operation_type', 'title', \
'comment', 'position_x', 'position_y'
item_data = OperationData()
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
class OperationDeleteSerializer(serializers.Serializer):
''' Serializer: Delete operation. '''
target = PKField(many=False, queryset=Operation.objects.all())
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
def validate(self, attrs):
oss = cast(LibraryItem, self.context['oss'])
operation = cast(Operation, attrs['target'])
if oss and operation.oss != oss:
raise serializers.ValidationError({
f'{operation.id}': msg.operationNotOwned(oss.title)
})
self.instance = operation
return attrs
class OperationSchemaSerializer(serializers.ModelSerializer):
''' Serializer: Detailed data for OSS. '''
items = serializers.ListField(
child=OperationSerializer()
)
graph = serializers.ListField(
child=ArgumentSerializer()
)
class Meta:
''' serializer metadata. '''
model = LibraryItem
fields = '__all__'
def to_representation(self, instance: LibraryItem):
result = LibraryItemDetailsSerializer(instance).data
oss = OperationSchema(instance)
result['items'] = []
for operation in oss.operations():
result['items'].append(OperationSerializer(operation).data)
result['graph'] = []
for argument in oss.arguments():
result['graph'].append(ArgumentSerializer(argument).data)
result['substitutions'] = []
for substitution in oss.substitutions().values(
'operation',
'original',
'transfer_term',
'substitution',
original_alias=F('original__alias'),
original_term=F('original__term_resolved'),
substitution_alias=F('substitution__alias'),
substitution_term=F('substitution__term_resolved'),
):
result['substitutions'].append(substitution)
return result

View File

@ -0,0 +1,10 @@
''' Utility serializers for REST API schema - SHOULD NOT BE ACCESSED DIRECTLY. '''
from rest_framework import serializers
from .data_access import OperationSchemaSerializer, OperationSerializer
class NewOperationResponse(serializers.Serializer):
''' Serializer: Create operation response. '''
new_operation = OperationSerializer()
oss = OperationSchemaSerializer()

View File

@ -0,0 +1,3 @@
''' Tests. '''
from .s_models import *
from .s_views import *

View File

@ -0,0 +1 @@
''' Tests for Django Models. '''

View File

@ -0,0 +1,2 @@
''' Tests for REST API. '''
from .t_oss import *

View File

@ -0,0 +1,189 @@
''' Testing API: Operation Schema. '''
from rest_framework import status
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestOssViewset(EndpointTester):
''' Testing OSS view. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(title='Test', alias='T1', owner=self.user)
self.owned_id = self.owned.item.pk
self.unowned = OperationSchema.create(title='Test2', alias='T2')
self.unowned_id = self.unowned.item.pk
self.private = OperationSchema.create(title='Test2', alias='T2', access_policy=AccessPolicy.PRIVATE)
self.private_id = self.private.item.pk
self.invalid_id = self.private.item.pk + 1337
def populateData(self):
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.item
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.item
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.add_argument(self.operation3, self.operation1)
self.owned.add_argument(self.operation3, self.operation2)
self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1,
'substitution': self.ks2x1,
'transfer_term': False
}])
@decl_endpoint('/api/oss/{item}/details', method='get')
def test_details(self):
self.populateData()
response = self.executeOK(item=self.owned_id)
self.assertEqual(response.data['owner'], self.owned.item.owner.pk)
self.assertEqual(response.data['title'], self.owned.item.title)
self.assertEqual(response.data['alias'], self.owned.item.alias)
self.assertEqual(response.data['location'], self.owned.item.location)
self.assertEqual(response.data['access_policy'], self.owned.item.access_policy)
self.assertEqual(response.data['visible'], self.owned.item.visible)
self.assertEqual(response.data['item_type'], LibraryItemType.OPERATION_SCHEMA)
self.assertEqual(len(response.data['items']), 3)
self.assertEqual(response.data['items'][0]['id'], self.operation1.pk)
self.assertEqual(response.data['items'][0]['operation_type'], self.operation1.operation_type)
self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk)
self.assertEqual(sub['transfer_term'], False)
self.assertEqual(sub['original_alias'], self.ks1x1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved)
graph = response.data['graph']
self.assertEqual(len(graph), 2)
self.assertEqual(graph[0]['operation'], self.operation3.pk)
self.assertEqual(graph[0]['argument'], self.operation1.pk)
self.assertEqual(graph[1]['operation'], self.operation3.pk)
self.assertEqual(graph[1]['argument'], self.operation2.pk)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
self.logout()
self.executeOK(item=self.owned_id)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/update-positions', method='patch')
def test_update_positions(self):
self.populateData()
self.executeBadData(item=self.owned_id)
data = {'positions': []}
self.executeOK(data=data)
data = {'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337},
{'id': self.operation2.pk, 'position_x': 36.1, 'position_y': 1437},
{'id': self.invalid_id, 'position_x': 31, 'position_y': 12},
]}
self.toggle_admin(True)
self.executeOK(data=data, item=self.unowned_id)
self.operation1.refresh_from_db()
self.assertNotEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertNotEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.toggle_admin(False)
self.executeOK(data=data, item=self.owned_id)
self.operation1.refresh_from_db()
self.operation2.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.assertEqual(self.operation2.position_x, data['positions'][1]['position_x'])
self.assertEqual(self.operation2.position_y, data['positions'][1]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/create-operation', method='post')
def test_create_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'item_data': {
'alias': 'Test3',
'title': 'Test title',
'comment': 'Тест кириллицы',
'position_x': 1,
'position_y': 1,
},
'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337}
]
}
self.executeBadData(data=data)
data['item_data']['operation_type'] = 'invalid'
self.executeBadData(data=data)
data['item_data']['operation_type'] = OperationType.INPUT
response = self.executeCreated(data=data)
self.assertEqual(len(response.data['oss']['items']), 4)
new_operation = response.data['new_operation']
self.assertEqual(new_operation['alias'], data['item_data']['alias'])
self.assertEqual(new_operation['operation_type'], data['item_data']['operation_type'])
self.assertEqual(new_operation['title'], data['item_data']['title'])
self.assertEqual(new_operation['comment'], data['item_data']['comment'])
self.assertEqual(new_operation['position_x'], data['item_data']['position_x'])
self.assertEqual(new_operation['position_y'], data['item_data']['position_y'])
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.toggle_admin(True)
self.executeCreated(data=data, item=self.unowned_id)
@decl_endpoint('/api/oss/{item}/delete-operation', method='patch')
def test_delete_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'positions': []
}
self.executeBadData(data=data)
data['target'] = self.operation1.pk
self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id)
self.logout()
self.executeForbidden(data=data, item=self.owned_id)
self.login()
response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 2)

View File

@ -0,0 +1,12 @@
''' Routing: Operation Schema. '''
from django.urls import include, path
from rest_framework import routers
from . import views
library_router = routers.SimpleRouter(trailing_slash=False)
library_router.register('oss', views.OssViewSet, 'OSS')
urlpatterns = [
path('', include(library_router.urls)),
]

View File

@ -0,0 +1,2 @@
''' REST API: Endpoint processors. '''
from .oss import OssViewSet

View File

@ -0,0 +1,141 @@
''' Endpoints for OSS. '''
from typing import cast
from django.db import transaction
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
from rest_framework import status as c
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from shared import permissions
from .. import models as m
from .. import serializers as s
@extend_schema(tags=['OSS'])
@extend_schema_view()
class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.RetrieveAPIView):
''' Endpoint: OperationSchema. '''
queryset = m.LibraryItem.objects.filter(item_type=m.LibraryItemType.OPERATION_SCHEMA)
serializer_class = s.LibraryItemSerializer
def _get_schema(self) -> m.OperationSchema:
return m.OperationSchema(cast(m.LibraryItem, self.get_object()))
def get_permissions(self):
''' Determine permission class. '''
if self.action in [
'create_operation',
'delete_operation',
'update_positions'
]:
permission_list = [permissions.ItemEditor]
elif self.action in ['details']:
permission_list = [permissions.ItemAnyone]
else:
permission_list = [permissions.Anyone]
return [permission() for permission in permission_list]
@extend_schema(
summary='get operations data',
tags=['OSS'],
request=None,
responses={
c.HTTP_200_OK: s.OperationSchemaSerializer,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk):
''' Endpoint: Detailed OSS data. '''
serializer = s.OperationSchemaSerializer(cast(m.LibraryItem, self.get_object()))
return Response(
status=c.HTTP_200_OK,
data=serializer.data
)
@extend_schema(
summary='update positions',
tags=['OSS'],
request=s.PositionsSerializer,
responses={
c.HTTP_200_OK: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='update-positions')
def update_positions(self, request: Request, pk):
''' Endpoint: Update operations positions. '''
schema = self._get_schema()
serializer = s.PositionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
schema.update_positions(serializer.validated_data['positions'])
return Response(status=c.HTTP_200_OK)
@extend_schema(
summary='create operation',
tags=['OSS'],
request=s.OperationCreateSerializer(),
responses={
c.HTTP_201_CREATED: s.NewOperationResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['post'], url_path='create-operation')
def create_operation(self, request: Request, pk):
''' Create new operation. '''
schema = self._get_schema()
serializer = s.OperationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
new_operation = schema.create_operation(**serializer.validated_data['item_data'])
schema.item.refresh_from_db()
response = Response(
status=c.HTTP_201_CREATED,
data={
'new_operation': s.OperationSerializer(new_operation).data,
'oss': s.OperationSchemaSerializer(schema.item).data
}
)
return response
@extend_schema(
summary='delete operation',
tags=['OSS'],
request=s.OperationDeleteSerializer,
responses={
c.HTTP_200_OK: s.OperationSchemaSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='delete-operation')
def delete_operation(self, request: Request, pk):
''' Endpoint: Delete operation. '''
schema = self._get_schema()
serializer = s.OperationDeleteSerializer(
data=request.data,
context={'oss': schema.item}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
schema.delete_operation(serializer.validated_data['target'])
schema.item.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.OperationSchemaSerializer(schema.item).data
)

View File

@ -0,0 +1,18 @@
# Generated by Django 5.0.7 on 2024-07-17 09:51
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('rsform', '0007_location_and_flags'),
]
operations = [
migrations.AlterField(
model_name='libraryitem',
name='item_type',
field=models.CharField(choices=[('rsform', 'Rsform'), ('oss', 'Operation Schema')], max_length=50, verbose_name='Тип'),
),
]

View File

@ -9,6 +9,7 @@ from django.db.models import (
DateTimeField,
ForeignKey,
Model,
QuerySet,
TextChoices,
TextField
)
@ -23,7 +24,7 @@ from .Version import Version
class LibraryItemType(TextChoices):
''' Type of library items '''
RSFORM = 'rsform'
OPERATIONS_SCHEMA = 'oss'
OPERATION_SCHEMA = 'oss'
class AccessPolicy(TextChoices):
@ -113,17 +114,17 @@ class LibraryItem(Model):
def get_absolute_url(self):
return f'/api/library/{self.pk}'
def subscribers(self) -> list[Subscription]:
def subscribers(self) -> list[User]:
''' Get all subscribers for this item. '''
return [subscription.user for subscription in Subscription.objects.filter(item=self.pk)]
return [subscription.user for subscription in Subscription.objects.filter(item=self.pk).only('user')]
def versions(self) -> list[Version]:
''' Get all Versions of this item. '''
return list(Version.objects.filter(item=self.pk).order_by('-time_create'))
def editors(self) -> list[Editor]:
def editors(self) -> list[User]:
''' Get all Editors of this item. '''
return [item.editor for item in Editor.objects.filter(item=self.pk)]
return [item.editor for item in Editor.objects.filter(item=self.pk).only('editor')]
def versions(self) -> QuerySet[Version]:
''' Get all Versions of this item. '''
return Version.objects.filter(item=self.pk).order_by('-time_create')
@transaction.atomic
def save(self, *args, **kwargs):

View File

@ -1,13 +1,14 @@
''' Models: RSForm API. '''
from copy import deepcopy
from typing import Optional, Union, cast
from typing import Optional, cast
from cctext import Entity, Resolver, TermForm, extract_entities, split_grams
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet
from .. import messages as msg
from shared import messages as msg
from ..graph import Graph
from .api_RSLanguage import (
extract_globals,
@ -39,7 +40,7 @@ class RSForm:
def create(**kwargs) -> 'RSForm':
return RSForm(LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, **kwargs))
def constituents(self) -> QuerySet['Constituenta']:
def constituents(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.item)
@ -104,11 +105,39 @@ class RSForm:
result = max(result, int(alias[1:]))
return result
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create new cst from data. '''
if insert_after is None:
position = _INSERT_LAST
else:
position = insert_after.order + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
result.convention = data.get('convention', '')
result.definition_formal = data.get('definition_formal', '')
result.term_forms = data.get('term_forms', [])
result.term_raw = data.get('term_raw', '')
result.definition_raw = data.get('definition_raw', '')
if result.term_raw != '' or result.definition_raw != '':
resolver = self.resolver()
if result.term_raw != '':
resolved = resolver.resolve(result.term_raw)
result.term_resolved = resolved
resolver.context[result.alias] = Entity(result.alias, resolved)
if result.definition_raw != '':
result.definition_resolved = resolver.resolve(result.definition_raw)
result.save()
self.on_term_change([result.id])
result.refresh_from_db()
return result
@transaction.atomic
def insert_new(
self,
alias: str,
cst_type: Union[CstType, None] = None,
cst_type: Optional[CstType] = None,
position: int = _INSERT_LAST,
**kwargs
) -> Constituenta:
@ -195,27 +224,6 @@ class RSForm:
self.resolve_all_text()
self.item.save()
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[str] = None) -> Constituenta:
''' Create new cst from data. '''
resolver = self.resolver()
cst = self._insert_new(data, insert_after)
cst.convention = data.get('convention', '')
cst.definition_formal = data.get('definition_formal', '')
cst.term_forms = data.get('term_forms', [])
cst.term_raw = data.get('term_raw', '')
if cst.term_raw != '':
resolved = resolver.resolve(cst.term_raw)
cst.term_resolved = resolved
resolver.context[cst.alias] = Entity(cst.alias, resolved)
cst.definition_raw = data.get('definition_raw', '')
if cst.definition_raw != '':
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
self.on_term_change([cst.id])
cst.refresh_from_db()
return cst
@transaction.atomic
def substitute(
self,
@ -363,13 +371,6 @@ class RSForm:
cst.save()
order += 1
def _insert_new(self, data: dict, insert_after: Optional[str] = None) -> Constituenta:
if insert_after is not None:
cst_after = Constituenta.objects.get(pk=insert_after)
return self.insert_new(data['alias'], data['cst_type'], cst_after.order + 1)
else:
return self.insert_new(data['alias'], data['cst_type'])
def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. '''
result: Graph[int] = Graph()

View File

@ -6,7 +6,8 @@ from typing import Set, Tuple, cast
import pyconcept
from .. import messages as msg
from shared import messages as msg
from .Constituenta import CstType
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line

View File

@ -22,6 +22,7 @@ from .data_access import (
InlineSynthesisSerializer,
LibraryItemBaseSerializer,
LibraryItemCloneSerializer,
LibraryItemDetailsSerializer,
LibraryItemSerializer,
RSFormParseSerializer,
RSFormSerializer,

View File

@ -4,7 +4,8 @@ from typing import cast
from cctext import EntityReference, Reference, ReferenceType, Resolver, SyntacticReference
from rest_framework import serializers
from .. import messages as msg
from shared import messages as msg
from ..models import AccessPolicy, validate_location
@ -34,7 +35,7 @@ class LocationSerializer(serializers.Serializer):
class AccessPolicySerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. '''
access_policy = serializers.CharField(max_length=500)
access_policy = serializers.CharField()
def validate(self, attrs):
attrs = super().validate(attrs)

View File

@ -7,7 +7,8 @@ from django.db import transaction
from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField
from .. import messages as msg
from shared import messages as msg
from ..models import Constituenta, CstType, LibraryItem, RSForm, Version
from .basics import CstParseSerializer
from .io_pyconcept import PyConceptAdapter
@ -140,6 +141,8 @@ class CstDetailsSerializer(serializers.ModelSerializer):
class CstCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. '''
insert_after = serializers.IntegerField(required=False, allow_null=True)
alias = serializers.CharField(max_length=8)
cst_type = serializers.ChoiceField(CstType.choices)
class Meta:
''' serializer metadata. '''

View File

@ -2,7 +2,8 @@
from django.db import transaction
from rest_framework import serializers
from .. import messages as msg
from shared import messages as msg
from ..models import Constituenta, LibraryItem, RSForm
from ..utils import fix_old_references

View File

@ -4,7 +4,8 @@ from typing import Optional, Union, cast
import pyconcept
from .. import messages as msg
from shared import messages as msg
from ..models import RSForm

View File

@ -1,5 +1,5 @@
''' Tests. '''
from .s_models.t_RSForm import *
from .s_models import *
from .s_views import *
from .t_graph import *
from .t_imports import *

View File

@ -1,4 +1,4 @@
''' Tests for REST API. '''
''' Tests for Django Models. '''
from .t_Constituenta import *
from .t_Editor import *
from .t_LibraryItem import *

View File

@ -101,6 +101,26 @@ class TestRSForm(TestCase):
self.assertEqual(x2.schema, self.schema.item)
self.assertEqual(x1.order, 1)
def test_create_cst(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': 'test',
'convention': 'convention'
}
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x3 = self.schema.create_cst(data=data, insert_after=x1)
x2.refresh_from_db()
self.assertEqual(x3.alias, data['alias'])
self.assertEqual(x3.term_raw, data['term_raw'])
self.assertEqual(x3.definition_raw, data['definition_raw'])
self.assertEqual(x2.order, 3)
self.assertEqual(x3.order, 2)
def test_create_cst_resolve(self):
x1 = self.schema.insert_new(

View File

@ -1,7 +1,7 @@
''' Testing views '''
from cctext import split_grams
from ..EndpointTester import EndpointTester, decl_endpoint
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestNaturalLanguageViews(EndpointTester):
@ -14,20 +14,20 @@ class TestNaturalLanguageViews(EndpointTester):
@decl_endpoint(endpoint='/api/cctext/parse', method='post')
def test_parse_text(self):
data = {'text': 'синим слонам'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self._assert_tags(response.data['result'], 'datv,NOUN,plur,anim,masc')
@decl_endpoint(endpoint='/api/cctext/inflect', method='post')
def test_inflect(self):
data = {'text': 'синий слон', 'grams': 'plur,datv'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], 'синим слонам')
@decl_endpoint(endpoint='/api/cctext/generate-lexeme', method='post')
def test_generate_lexeme(self):
data = {'text': 'синий слон'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 12)
self.assertEqual(response.data['items'][0]['text'], 'синий слон')

View File

@ -1,7 +1,6 @@
''' Testing API: Constituents. '''
from apps.rsform.models import Constituenta, CstType, RSForm
from ..EndpointTester import EndpointTester, decl_endpoint
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestConstituentaAPI(EndpointTester):
@ -52,18 +51,18 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch')
def test_partial_update(self):
data = {'convention': 'tt'}
self.executeForbidden(data, item=self.cst2.pk)
self.executeForbidden(data=data, item=self.cst2.pk)
self.logout()
self.executeForbidden(data, item=self.cst1.pk)
self.executeForbidden(data=data, item=self.cst1.pk)
self.login()
response = self.executeOK(data, item=self.cst1.pk)
response = self.executeOK(data=data, item=self.cst1.pk)
self.cst1.refresh_from_db()
self.assertEqual(response.data['convention'], 'tt')
self.assertEqual(self.cst1.convention, 'tt')
self.executeOK(data, item=self.cst1.pk)
self.executeOK(data=data, item=self.cst1.pk)
@decl_endpoint('/api/constituents/{item}', method='patch')
@ -72,7 +71,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': 'New term',
'definition_raw': 'New def'
}
response = self.executeOK(data, item=self.cst3.pk)
response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db()
self.assertEqual(response.data['term_resolved'], 'New term')
self.assertEqual(self.cst3.term_resolved, 'New term')
@ -86,7 +85,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': '@{X1|nomn,sing}',
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}'
}
response = self.executeOK(data, item=self.cst3.pk)
response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db()
self.assertEqual(self.cst3.term_resolved, self.cst1.term_resolved)
self.assertEqual(response.data['term_resolved'], self.cst1.term_resolved)
@ -97,7 +96,7 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch')
def test_readonly_cst_fields(self):
data = {'alias': 'X33', 'order': 10}
response = self.executeOK(data, item=self.cst1.pk)
response = self.executeOK(data=data, item=self.cst1.pk)
self.assertEqual(response.data['alias'], 'X1')
self.assertEqual(response.data['alias'], self.cst1.alias)
self.assertEqual(response.data['order'], self.cst1.order)

View File

@ -11,9 +11,8 @@ from apps.rsform.models import (
RSForm,
Subscription
)
from ..EndpointTester import EndpointTester, decl_endpoint
from ..testing_utils import response_contains
from shared.EndpointTester import EndpointTester, decl_endpoint
from shared.testing_utils import response_contains
class TestLibraryViewset(EndpointTester):
@ -49,17 +48,17 @@ class TestLibraryViewset(EndpointTester):
'title': 'Title',
'alias': 'alias',
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'item_type': LibraryItemType.OPERATIONS_SCHEMA,
'item_type': LibraryItemType.OPERATION_SCHEMA,
'title': 'Title',
'alias': 'alias',
'access_policy': AccessPolicy.PROTECTED,
'visible': False,
'read_only': True
}
response = self.executeCreated(data)
response = self.executeCreated(data=data)
self.assertEqual(response.data['owner'], self.user.pk)
self.assertEqual(response.data['item_type'], data['item_type'])
self.assertEqual(response.data['title'], data['title'])
@ -70,25 +69,25 @@ class TestLibraryViewset(EndpointTester):
self.logout()
data = {'title': 'Title2'}
self.executeForbidden(data)
self.executeForbidden(data=data)
@decl_endpoint('/api/library/{item}', method='patch')
def test_update(self):
data = {'id': self.unowned.pk, 'title': 'New Title'}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_editor(self.unowned, True)
response = self.executeOK(data, item=self.unowned.pk)
response = self.executeOK(data=data, item=self.unowned.pk)
self.assertEqual(response.data['title'], data['title'])
self.unowned.access_policy = AccessPolicy.PRIVATE
self.unowned.save()
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
data = {'id': self.owned.pk, 'title': 'New Title'}
response = self.executeOK(data, item=self.owned.pk)
response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], self.owned.alias)
@ -99,7 +98,7 @@ class TestLibraryViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED,
'location': LocationHead.LIBRARY
}
response = self.executeOK(data, item=self.owned.pk)
response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['owner'], self.owned.owner.pk)
self.assertEqual(response.data['access_policy'], self.owned.access_policy)
@ -112,22 +111,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user)
data = {'user': self.user2.pk}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user2)
self.assertEqual(self.owned.time_update, time_update)
self.executeForbidden(data, item=self.owned.pk)
self.executeForbidden(data=data, item=self.owned.pk)
self.toggle_admin(True)
data = {'user': self.user.pk}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user)
@ -136,20 +135,20 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'access_policy': 'invalid'}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'access_policy': AccessPolicy.PRIVATE}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.access_policy, data['access_policy'])
self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True)
self.executeOK(data, item=self.unowned.pk)
self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db()
self.assertEqual(self.unowned.access_policy, data['access_policy'])
@ -158,29 +157,29 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'location': 'invalid'}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'location': '/U/temp'}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location'])
data = {'location': LocationHead.LIBRARY}
self.executeForbidden(data, item=self.owned.pk)
self.executeForbidden(data=data, item=self.owned.pk)
data = {'location': '/U/temp'}
self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True)
data = {'location': LocationHead.LIBRARY}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location'])
self.executeOK(data, item=self.unowned.pk)
self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db()
self.assertEqual(self.unowned.location, data['location'])
@ -189,22 +188,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
data = {'user': self.user2.pk}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user, self.user2]))
@ -213,25 +212,25 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user)
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user)
Editor.add(item=self.owned, user=self.user2)
data = {'user': self.user2.pk}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
@ -240,30 +239,30 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'users': [self.invalid_user]}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'users': [self.user.pk]}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
data = {'users': [self.user2.pk]}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user2])
data = {'users': []}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [])
data = {'users': [self.user2.pk, self.user.pk]}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user2, self.user]))
@ -294,7 +293,7 @@ class TestLibraryViewset(EndpointTester):
@decl_endpoint('/api/library', method='get')
def test_library_get(self):
non_schema = LibraryItem.objects.create(
item_type=LibraryItemType.OPERATIONS_SCHEMA,
item_type=LibraryItemType.OPERATION_SCHEMA,
title='Test4'
)
response = self.executeOK()
@ -376,11 +375,11 @@ class TestLibraryViewset(EndpointTester):
)
data = {'title': 'Title1337'}
self.executeNotFound(data, item=self.invalid_item)
self.executeCreated(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeCreated(data=data, item=self.unowned.pk)
data = {'title': 'Title1338'}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 2)
self.assertEqual(response.data['items'][0]['alias'], x12.alias)
@ -390,12 +389,12 @@ class TestLibraryViewset(EndpointTester):
self.assertEqual(response.data['items'][1]['term_resolved'], d2.term_resolved)
data = {'title': 'Title1340', 'items': []}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 0)
data = {'title': 'Title1341', 'items': [x12.pk]}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 1)
self.assertEqual(response.data['items'][0]['alias'], x12.alias)

View File

@ -1,7 +1,6 @@
''' Testing API: Operations. '''
from apps.rsform.models import Constituenta, CstType, RSForm
from ..EndpointTester import EndpointTester, decl_endpoint
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestInlineSynthesis(EndpointTester):
@ -24,20 +23,20 @@ class TestInlineSynthesis(EndpointTester):
'items': [],
'substitutions': []
}
self.executeForbidden(data)
self.executeForbidden(data=data)
data['receiver'] = invalid_id
self.executeBadData(data)
self.executeBadData(data=data)
data['receiver'] = self.schema1.item.pk
data['source'] = invalid_id
self.executeBadData(data)
self.executeBadData(data=data)
data['source'] = self.schema1.item.pk
self.executeOK(data)
self.executeOK(data=data)
data['items'] = [invalid_id]
self.executeBadData(data)
self.executeBadData(data=data)
def test_inline_synthesis(self):
@ -68,7 +67,7 @@ class TestInlineSynthesis(EndpointTester):
}
]
}
response = self.executeOK(data)
response = self.executeOK(data=data)
result = {item['alias']: item for item in response.data['items']}
self.assertEqual(len(result), 6)
self.assertEqual(result['X2']['term_raw'], ks1_x2.term_raw)

View File

@ -15,9 +15,8 @@ from apps.rsform.models import (
LocationHead,
RSForm
)
from ..EndpointTester import EndpointTester, decl_endpoint
from ..testing_utils import response_contains
from shared.EndpointTester import EndpointTester, decl_endpoint
from shared.testing_utils import response_contains
class TestRSFormViewset(EndpointTester):
@ -44,7 +43,7 @@ class TestRSFormViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED,
'visible': False
}
self.executeBadData(data)
self.executeBadData(data=data)
with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file:
data['file'] = file
@ -59,7 +58,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms', method='get')
def test_list_rsforms(self):
non_schema = LibraryItem.objects.create(
item_type=LibraryItemType.OPERATIONS_SCHEMA,
item_type=LibraryItemType.OPERATION_SCHEMA,
title='Test3'
)
response = self.executeOK()
@ -124,14 +123,14 @@ class TestRSFormViewset(EndpointTester):
def test_check(self):
self.owned.insert_new('X1')
data = {'expression': 'X1=X1'}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[X1][X1]]')
self.assertEqual(response.data['typification'], 'LOGIC')
self.assertEqual(response.data['valueClass'], 'value')
self.executeOK(data, item=self.unowned_id)
self.executeOK(data=data, item=self.unowned_id)
@decl_endpoint('/api/rsforms/{item}/resolve', method='post')
@ -142,7 +141,7 @@ class TestRSFormViewset(EndpointTester):
)
data = {'text': '@{1|редкий} @{X1|plur,datv}'}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['input'], '@{1|редкий} @{X1|plur,datv}')
self.assertEqual(response.data['output'], 'редким синим слонам')
self.assertEqual(len(response.data['refs']), 2)
@ -189,13 +188,19 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/cst-create', method='post')
def test_create_constituenta(self):
data = {'alias': 'X3', 'cst_type': CstType.BASE}
self.executeForbidden(data, item=self.unowned_id)
data = {'alias': 'X3'}
self.executeForbidden(data=data, item=self.unowned_id)
self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
response = self.executeCreated(data, item=self.owned_id)
data['cst_type'] = 'invalid'
self.executeBadData(data=data, item=self.owned_id)
data['cst_type'] = CstType.BASE
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3')
x3 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x3.order, 3)
@ -207,7 +212,7 @@ class TestRSFormViewset(EndpointTester):
'term_raw': 'test',
'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}]
}
response = self.executeCreated(data, item=self.owned_id)
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], data['alias'])
x4 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x4.order, 3)
@ -234,14 +239,14 @@ class TestRSFormViewset(EndpointTester):
)
data = {'target': x2_2.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
self.executeForbidden(data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id)
self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x1.alias, 'cst_type': CstType.TERM}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x3.alias}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new(
alias='D1',
@ -253,7 +258,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(x1.cst_type, CstType.BASE)
data = {'target': x1.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'D2')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.TERM)
d1.refresh_from_db()
@ -280,14 +285,14 @@ class TestRSFormViewset(EndpointTester):
unowned = self.unowned.insert_new('X2')
data = {'substitutions': [{'original': x1.pk, 'substitution': unowned.pk, 'transfer_term': True}]}
self.executeForbidden(data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id)
self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': unowned.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': x1.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new(
alias='D1',
@ -295,7 +300,7 @@ class TestRSFormViewset(EndpointTester):
definition_formal='X1'
)
data = {'substitutions': [{'original': x1.pk, 'substitution': x2.pk, 'transfer_term': True}]}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
d1.refresh_from_db()
x2.refresh_from_db()
self.assertEqual(x2.term_raw, 'Test1')
@ -315,7 +320,7 @@ class TestRSFormViewset(EndpointTester):
)
data = {'substitutions': []}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'substitutions': [
{
@ -329,7 +334,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True
}
]}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'substitutions': [
{
@ -343,7 +348,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True
}
]}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
d3.refresh_from_db()
self.assertEqual(d3.definition_formal, r'D1 \ D2')
@ -358,7 +363,7 @@ class TestRSFormViewset(EndpointTester):
'definition_formal': '3',
'definition_raw': '4'
}
response = self.executeCreated(data, item=self.owned_id)
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.BASE)
self.assertEqual(response.data['new_cst']['convention'], '1')
@ -374,13 +379,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id)
data = {'items': [1337]}
self.executeBadData(data)
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
data = {'items': [x1.pk]}
response = self.executeOK(data)
response = self.executeOK(data=data)
x2.refresh_from_db()
self.owned.item.refresh_from_db()
self.assertEqual(len(response.data['items']), 1)
@ -390,7 +395,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
@decl_endpoint('/api/rsforms/{item}/cst-moveto', method='patch')
@ -398,13 +403,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id)
data = {'items': [1337], 'move_to': 1}
self.executeBadData(data)
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
data = {'items': [x2.pk], 'move_to': 1}
response = self.executeOK(data)
response = self.executeOK(data=data)
x1.refresh_from_db()
x2.refresh_from_db()
self.assertEqual(response.data['id'], self.owned_id)
@ -413,7 +418,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk], 'move_to': 1}
self.executeBadData(data)
self.executeBadData(data=data)
@decl_endpoint('/api/rsforms/{item}/reset-aliases', method='patch')

View File

@ -1,5 +1,5 @@
''' Testing views '''
from ..EndpointTester import EndpointTester, decl_endpoint
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestRSLanguageViews(EndpointTester):
@ -8,30 +8,30 @@ class TestRSLanguageViews(EndpointTester):
@decl_endpoint('/api/rslang/to-ascii', method='post')
def test_convert_to_ascii(self):
data = {'data': '1=1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': '1=1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1 \eq 1')
@decl_endpoint('/api/rslang/to-math', method='post')
def test_convert_to_math(self):
data = {'data': r'1 \eq 1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': r'1 \eq 1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1=1')
@decl_endpoint('/api/rslang/parse-expression', method='post')
def test_parse_expression(self):
data = {'data': r'1=1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': r'1=1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[1][1]]')

View File

@ -7,8 +7,7 @@ from zipfile import ZipFile
from rest_framework import status
from apps.rsform.models import Constituenta, RSForm
from ..EndpointTester import EndpointTester, decl_endpoint
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestVersionViews(EndpointTester):
@ -31,11 +30,11 @@ class TestVersionViews(EndpointTester):
invalid_id = 1338
data = {'version': '1.0.0', 'description': 'test'}
self.executeNotFound(data, schema=invalid_id)
self.executeForbidden(data, schema=self.unowned.pk)
self.executeBadData(invalid_data, schema=self.owned.pk)
self.executeNotFound(data=data, schema=invalid_id)
self.executeForbidden(data=data, schema=self.unowned.pk)
self.executeBadData(data=invalid_data, schema=self.owned.pk)
response = self.executeCreated(data, schema=self.owned.pk)
response = self.executeCreated(data=data, schema=self.owned.pk)
self.assertTrue('version' in response.data)
self.assertTrue('schema' in response.data)
self.assertTrue(response.data['version'] in [v['id'] for v in response.data['schema']['versions']])
@ -65,7 +64,7 @@ class TestVersionViews(EndpointTester):
@decl_endpoint('/api/versions/{version}', method='get')
def test_access_version(self):
data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data)
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
self.executeNotFound(version=invalid_id)
@ -79,14 +78,14 @@ class TestVersionViews(EndpointTester):
data = {'version': '1.2.0', 'description': 'test1'}
self.method = 'patch'
self.executeForbidden(data)
self.executeForbidden(data=data)
self.method = 'delete'
self.executeForbidden()
self.client.force_authenticate(user=self.user)
self.method = 'patch'
self.executeOK(data)
self.executeOK(data=data)
response = self.get()
self.assertEqual(response.data['version'], data['version'])
self.assertEqual(response.data['description'], data['description'])
@ -139,7 +138,7 @@ class TestVersionViews(EndpointTester):
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new('D1', term_raw='TestTerm')
data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data)
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
d1.delete()

View File

@ -2,8 +2,9 @@
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
from shared import permissions
from .. import models as m
from .. import permissions
from .. import serializers as s

View File

@ -13,8 +13,9 @@ from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from shared import permissions
from .. import models as m
from .. import permissions
from .. import serializers as s
@ -42,17 +43,27 @@ class LibraryViewSet(viewsets.ModelViewSet):
def get_permissions(self):
if self.action in ['update', 'partial_update']:
permission_list = [permissions.ItemEditor]
access_level = permissions.ItemEditor
elif self.action in [
'destroy', 'set_owner', 'set_access_policy', 'set_location',
'editors_add', 'editors_remove', 'editors_set'
'destroy',
'set_owner',
'set_access_policy',
'set_location',
'editors_add',
'editors_remove',
'editors_set'
]:
permission_list = [permissions.ItemOwner]
elif self.action in ['create', 'clone', 'subscribe', 'unsubscribe']:
permission_list = [permissions.GlobalUser]
access_level = permissions.ItemOwner
elif self.action in [
'create',
'clone',
'subscribe',
'unsubscribe'
]:
access_level = permissions.GlobalUser
else:
permission_list = [permissions.ItemAnyone]
return [permission() for permission in permission_list]
access_level = permissions.ItemAnyone
return [access_level()]
def _get_item(self) -> m.LibraryItem:
return cast(m.LibraryItem, self.get_object())
@ -68,7 +79,6 @@ class LibraryViewSet(viewsets.ModelViewSet):
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['post'], url_path='clone')
def clone(self, request: Request, pk):
''' Endpoint: Create deep copy of library item. '''
@ -85,8 +95,9 @@ class LibraryViewSet(viewsets.ModelViewSet):
clone.read_only = False
clone.access_policy = serializer.validated_data.get('access_policy', m.AccessPolicy.PUBLIC)
clone.location = serializer.validated_data.get('location', m.LocationHead.USER)
clone.save()
with transaction.atomic():
clone.save()
if clone.item_type == m.LibraryItemType.RSFORM:
need_filter = 'items' in request.data
for cst in m.RSForm(item).constituents():
@ -266,24 +277,17 @@ class LibraryActiveView(generics.ListAPIView):
serializer_class = s.LibraryItemSerializer
def get_queryset(self):
common_location = Q(location__startswith=m.LocationHead.COMMON) | Q(location__startswith=m.LocationHead.LIBRARY)
is_public = Q(access_policy=m.AccessPolicy.PUBLIC)
if self.request.user.is_anonymous:
return m.LibraryItem.objects.filter(
Q(access_policy=m.AccessPolicy.PUBLIC),
).filter(
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
).order_by('-time_update')
return m.LibraryItem.objects \
.filter(is_public) \
.filter(common_location).order_by('-time_update')
else:
user = cast(m.User, self.request.user)
# pylint: disable=unsupported-binary-operation
return m.LibraryItem.objects.filter(
(
Q(access_policy=m.AccessPolicy.PUBLIC) &
(
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
)
) |
(is_public & common_location) |
Q(owner=user) |
Q(editor__editor=user) |
Q(subscription__user=user)

View File

@ -18,7 +18,6 @@ from .. import serializers as s
request=s.InlineSynthesisSerializer,
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@transaction.atomic
@api_view(['PATCH'])
def inline_synthesis(request: Request):
''' Endpoint: Inline synthesis. '''
@ -30,8 +29,9 @@ def inline_synthesis(request: Request):
schema = m.RSForm(serializer.validated_data['receiver'])
items = cast(list[m.Constituenta], serializer.validated_data['items'])
new_items = schema.insert_copy(items)
with transaction.atomic():
new_items = schema.insert_copy(items)
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
@ -42,8 +42,8 @@ def inline_synthesis(request: Request):
index = next(i for (i, cst) in enumerate(items) if cst == replacement)
replacement = new_items[index]
schema.substitute(original, replacement, substitution['transfer_term'])
schema.restore_order()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data

View File

@ -13,9 +13,10 @@ from rest_framework.decorators import action, api_view
from rest_framework.request import Request
from rest_framework.response import Response
from .. import messages as msg
from shared import messages as msg
from shared import permissions
from .. import models as m
from .. import permissions
from .. import serializers as s
from .. import utils
@ -33,11 +34,21 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def get_permissions(self):
''' Determine permission class. '''
if self.action in [
'load_trs', 'cst_create', 'cst_delete_multiple',
'reset_aliases', 'cst_rename', 'cst_substitute'
'load_trs',
'reset_aliases',
'cst_create',
'cst_delete_multiple',
'cst_rename',
'cst_substitute'
]:
permission_list = [permissions.ItemEditor]
elif self.action in ['contents', 'details', 'export_trs', 'resolve', 'check']:
elif self.action in [
'contents',
'details',
'export_trs',
'resolve',
'check'
]:
permission_list = [permissions.ItemAnyone]
else:
permission_list = [permissions.Anyone]
@ -49,6 +60,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstCreateSerializer,
responses={
c.HTTP_201_CREATED: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -60,10 +72,15 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
new_cst = schema.create_cst(
data=data,
insert_after=data['insert_after'] if 'insert_after' in data else None
)
if 'insert_after' in data:
try:
insert_after = m.Constituenta.objects.get(pk=data['insert_after'])
except m.LibraryItem.DoesNotExist:
return Response(status=c.HTTP_404_NOT_FOUND)
else:
insert_after = None
new_cst = schema.create_cst(data, insert_after)
schema.item.refresh_from_db()
response = Response(
status=c.HTTP_201_CREATED,
@ -81,6 +98,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstTargetSerializer,
responses={
c.HTTP_200_OK: s.NewMultiCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -117,11 +135,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstRenameSerializer,
responses={
c.HTTP_200_OK: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-rename')
def cst_rename(self, request: Request, pk):
''' Rename constituenta possibly changing type. '''
@ -134,10 +152,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type']
cst.save()
mapping = {old_alias: cst.alias}
schema.apply_mapping(mapping, change_aliases=False)
with transaction.atomic():
cst.save()
schema.apply_mapping(mapping={old_alias: cst.alias}, change_aliases=False)
schema.item.refresh_from_db()
cst.refresh_from_db()
@ -155,11 +173,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstSubstituteSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-substitute')
def cst_substitute(self, request: Request, pk):
''' Substitute occurrences of constituenta with another one. '''
@ -169,6 +187,8 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
context={'schema': schema.item}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
@ -185,6 +205,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstListSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -211,6 +232,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstMoveSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -279,6 +301,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.RSFormUploadSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -313,7 +336,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['get'])
@action(detail=True, methods=['get'], url_path='contents')
def contents(self, request: Request, pk):
''' Endpoint: View schema db contents (including constituents). '''
schema = s.RSFormSerializer(self.get_object())
@ -331,7 +354,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['get'])
@action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk):
''' Endpoint: Detailed schema view including statuses and parse. '''
serializer = s.RSFormParseSerializer(cast(m.LibraryItem, self.get_object()))
@ -349,7 +372,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None
},
)
@action(detail=True, methods=['post'])
@action(detail=True, methods=['post'], url_path='check')
def check(self, request: Request, pk):
''' Endpoint: Check RSLang expression against schema context. '''
serializer = s.ExpressionSerializer(data=request.data)
@ -371,7 +394,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['post'])
@action(detail=True, methods=['post'], url_path='resolve')
def resolve(self, request: Request, pk):
''' Endpoint: Resolve references in text against schema terms context. '''
serializer = s.TextSerializer(data=request.data)

View File

@ -10,8 +10,9 @@ from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.request import Request
from rest_framework.response import Response
from shared import permissions
from .. import models as m
from .. import permissions
from .. import serializers as s
from .. import utils
@ -54,6 +55,7 @@ class VersionViewset(
request=s.VersionCreateSerializer,
responses={
c.HTTP_201_CREATED: s.NewVersionResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}

View File

@ -1,14 +0,0 @@
''' Utility: Text messages. '''
# pylint: skip-file
def passwordAuthFailed():
return 'Неизвестное сочетание имени пользователя (email) и пароля'
def passwordsNotMatch():
return 'Введенные пароли не совпадают'
def emailAlreadyTaken():
return 'Пользователь с данным email уже существует'

View File

@ -4,8 +4,8 @@ from django.contrib.auth.password_validation import validate_password
from rest_framework import serializers
from apps.rsform.models import Editor, Subscription
from shared import messages as msg
from . import messages as msg
from . import models

View File

@ -1,30 +1,30 @@
''' Testing API: users. '''
from rest_framework.test import APIClient, APITestCase
from apps.rsform.tests.EndpointTester import EndpointTester, decl_endpoint
from apps.users.models import User
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestUserAPIViews(EndpointTester):
''' Testing Authentication views. '''
def setUp(self):
super().setUp()
super().setUpFullUsers()
@decl_endpoint('/users/api/login', method='post')
def test_login(self):
self.logout()
data = {'username': self.user.username, 'password': 'invalid'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'username': self.user.username, 'password': 'password'}
self.executeAccepted(data)
self.executeAccepted(data)
self.executeAccepted(data=data)
self.executeAccepted(data=data)
self.logout()
data = {'username': self.user.email, 'password': 'password'}
self.executeAccepted(data)
self.executeAccepted(data=data)
@decl_endpoint('/users/api/logout', method='post')
@ -59,7 +59,7 @@ class TestUserUserProfileAPIView(EndpointTester):
''' Testing User profile views. '''
def setUp(self):
super().setUp()
super().setUpFullUsers()
self.user.first_name = 'John'
self.user.second_name = 'Smith'
self.user.save()
@ -84,7 +84,7 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName',
}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.user.refresh_from_db()
self.assertEqual(response.data['email'], '123@mail.ru')
self.assertEqual(self.user.email, '123@mail.ru')
@ -98,10 +98,10 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'new',
'last_name': 'new2',
}
self.executeOK(data)
self.executeOK(data=data)
data = {'email': self.user2.email}
self.executeBadData(data)
self.executeBadData(data=data)
self.logout()
self.executeForbidden()
@ -113,14 +113,14 @@ class TestUserUserProfileAPIView(EndpointTester):
'old_password': 'invalid',
'new_password': 'password2'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'old_password': 'password',
'new_password': 'password2'
}
oldHash = self.user.password
response = self.executeNoContent(data)
response = self.executeNoContent(data=data)
self.user.refresh_from_db()
self.assertNotEqual(self.user.password, oldHash)
self.assertTrue(self.client.login(username=self.user.username, password='password2'))
@ -154,7 +154,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'username': 'NewUser',
@ -164,7 +164,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
response = self.executeCreated(data)
response = self.executeCreated(data=data)
self.assertTrue('id' in response.data)
self.assertEqual(response.data['username'], data['username'])
self.assertEqual(response.data['email'], data['email'])
@ -179,7 +179,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'username': 'NewUser2',
@ -189,4 +189,4 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)

View File

@ -1,4 +1,22 @@
[
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$720000$gFZkaBswurtL0naQKiUnW7$3b6SUN3fY2Xl1H7erAszVpQl2LpoKusan+yJP7Bp3JA=",
"last_login": "2024-06-03T20:57:02.522Z",
"is_superuser": true,
"username": "admin",
"first_name": "Администратор",
"last_name": "",
"email": "admin@mail.ru",
"is_staff": true,
"is_active": true,
"date_joined": "2024-05-27T18:31:48.913Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "auth.user",
"pk": 3,

View File

@ -74,6 +74,7 @@ INSTALLED_APPS = [
'apps.users',
'apps.rsform',
'apps.oss',
'drf_spectacular',
'drf_spectacular_sidecar',

View File

@ -9,6 +9,7 @@ from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, Spec
urlpatterns = [
path('admin', admin.site.urls),
path('api/', include('apps.rsform.urls')),
path('api/', include('apps.oss.urls')),
path('users/', include('apps.users.urls')),
path('schema', SpectacularAPIView.as_view(), name='schema'),
path('redoc', SpectacularRedocView.as_view()),

View File

@ -26,6 +26,21 @@ class EndpointTester(APITestCase):
''' Abstract base class for Testing endpoints. '''
def setUp(self):
self.factory = APIRequestFactory()
self.user = User.objects.create(
username='UserTest',
email='blank@test.com',
password='password'
)
self.user2 = User.objects.create(
username='UserTest2',
email='another@test.com',
password='password'
)
self.client = APIClient()
self.client.force_authenticate(user=self.user)
def setUpFullUsers(self):
self.factory = APIRequestFactory()
self.user = User.objects.create_user(
username='UserTest',

View File

@ -0,0 +1 @@
''' Utilities shared between applications. '''

View File

@ -6,6 +6,10 @@ def constituentaNotOwned(title: str):
return f'Конституента не принадлежит схеме: {title}'
def operationNotOwned(title: str):
return f'Операция не принадлежит схеме: {title}'
def substitutionNotInList():
return 'Отождествляемая конституента отсутствует в списке'
@ -64,3 +68,15 @@ def constituentaNoStructure():
def missingFile():
return 'Отсутствует прикрепленный файл'
def passwordAuthFailed():
return 'Неизвестное сочетание имени пользователя (email) и пароля'
def passwordsNotMatch():
return 'Введенные пароли не совпадают'
def emailAlreadyTaken():
return 'Пользователь с данным email уже существует'

View File

@ -11,16 +11,27 @@ from rest_framework.permissions import \
from rest_framework.request import Request
from rest_framework.views import APIView
from . import models as m
from apps.oss.models import Operation
from apps.rsform.models import (
AccessPolicy,
Constituenta,
Editor,
LibraryItem,
Subscription,
Version
)
from apps.users.models import User
def _extract_item(obj: Any) -> m.LibraryItem:
if isinstance(obj, m.LibraryItem):
def _extract_item(obj: Any) -> LibraryItem:
if isinstance(obj, LibraryItem):
return obj
elif isinstance(obj, m.Constituenta):
return cast(m.LibraryItem, obj.schema)
elif isinstance(obj, (m.Version, m.Subscription, m.Editor)):
return cast(m.LibraryItem, obj.item)
elif isinstance(obj, Constituenta):
return cast(LibraryItem, obj.schema)
elif isinstance(obj, Operation):
return cast(LibraryItem, obj.oss)
elif isinstance(obj, (Version, Subscription, Editor)):
return cast(LibraryItem, obj.item)
raise PermissionDenied({
'message': 'Invalid type error. Please contact developers',
'object_id': obj.id
@ -60,10 +71,10 @@ class ItemEditor(ItemOwner):
if request.user.is_anonymous:
return False
item = _extract_item(obj)
if m.Editor.objects.filter(
if Editor.objects.filter(
item=item,
editor=cast(m.User, request.user)
).exists() and item.access_policy != m.AccessPolicy.PRIVATE:
editor=cast(User, request.user)
).exists() and item.access_policy != AccessPolicy.PRIVATE:
return True
return super().has_object_permission(request, view, obj)
@ -76,7 +87,7 @@ class ItemAnyone(ItemEditor):
def has_object_permission(self, request: Request, view: APIView, obj: Any) -> bool:
item = _extract_item(obj)
if item.access_policy == m.AccessPolicy.PUBLIC:
if item.access_policy == AccessPolicy.PUBLIC:
return True
return super().has_object_permission(request, view, obj)

View File

@ -25,7 +25,6 @@ function BackendRun() {
DoMigrations
PrepareStatic -clearPrevious
AddInitialData
AddAdmin
} else {
DoMigrations
PrepareStatic
@ -48,13 +47,15 @@ function FlushData {
}
function AddInitialData {
& $pyExec manage.py loaddata $initialData
}
function AddAdmin {
if (Test-Path -Path $initialData -PathType Leaf) {
& $pyExec $djangoSrc flush --noinput
& $pyExec $djangoSrc loaddata $initialData
} else {
$env:DJANGO_SUPERUSER_USERNAME = 'admin'
$env:DJANGO_SUPERUSER_PASSWORD = '1234'
$env:DJANGO_SUPERUSER_EMAIL = 'admin@admin.com'
& $pyExec $djangoSrc createsuperuser --noinput
}
}
function DoMigrations {