Compare commits

...

5 Commits

Author SHA1 Message Date
Ivan
a256582a65 Implementing oss backend pt2
Some checks are pending
Backend CI / build (3.12) (push) Waiting to run
2024-07-19 19:28:55 +03:00
Ivan
86033ccd46 Update RunServer.ps1 2024-07-19 11:19:40 +03:00
Ivan
da7113ce7e Refactoring: preparing backend for oss pt1 2024-07-17 12:56:01 +03:00
Ivan
aed899ba70 Refactor RSForm pt2 2024-07-16 12:17:40 +03:00
Ivan
e9f77057d9 Refactor RSForm backend and tests 2024-07-15 12:35:47 +03:00
59 changed files with 1297 additions and 305 deletions

View File

@ -11,7 +11,9 @@
"--multi-line", "--multi-line",
"3", "3",
"--project", "--project",
"apps" "apps",
"--project",
"shared"
], ],
"autopep8.args": [ "autopep8.args": [
"--max-line-length", "--max-line-length",

View File

@ -69,6 +69,7 @@ This readme file is used mostly to document project dependencies
- Backticks - Backticks
- Svg Preview - Svg Preview
- TODO Highlight v2 - TODO Highlight v2
- Prettier
</pre> </pre>
</details> </details>
<details> <details>
@ -114,8 +115,10 @@ This readme file is used mostly to document project dependencies
<pre> <pre>
- Pylance - Pylance
- Pylint - Pylint
- Django
- autopep8 - autopep8
- isort
- Django
- SQLite
</pre> </pre>
</details> </details>

View File

View File

@ -0,0 +1,14 @@
''' Admin view: OperationSchema. '''
from django.contrib import admin
from . import models
class OperationAdmin(admin.ModelAdmin):
''' Admin model: Operation. '''
ordering = ['oss']
list_display = ['oss', 'operation_type', 'result', 'alias', 'title', 'comment', 'position_x', 'position_y']
search_fields = ['operation_type', 'title', 'alias']
admin.site.register(models.Operation, OperationAdmin)

View File

@ -0,0 +1,8 @@
''' Application: Operation Schema. '''
from django.apps import AppConfig
class RsformConfig(AppConfig):
''' Application config. '''
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.oss'

View File

@ -0,0 +1,69 @@
# Generated by Django 5.0.7 on 2024-07-17 09:51
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('rsform', '0008_alter_libraryitem_item_type'),
]
operations = [
migrations.CreateModel(
name='Operation',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('operation_type', models.CharField(choices=[
('input', 'Input'), ('synthesis', 'Synthesis')], default='input', max_length=10, verbose_name='Тип')),
('alias', models.CharField(blank=True, max_length=255, verbose_name='Шифр')),
('title', models.TextField(blank=True, verbose_name='Название')),
('comment', models.TextField(blank=True, verbose_name='Комментарий')),
('position_x', models.FloatField(default=0, verbose_name='Положение по горизонтали')),
('position_y', models.FloatField(default=0, verbose_name='Положение по вертикали')),
('oss', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='items', to='rsform.libraryitem', verbose_name='Схема синтеза')),
('result', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL,
related_name='producer', to='rsform.libraryitem', verbose_name='Связанная КС')),
],
options={
'verbose_name': 'Операция',
'verbose_name_plural': 'Операции',
},
),
migrations.CreateModel(
name='SynthesisSubstitution',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('transfer_term', models.BooleanField(default=False, verbose_name='Перенос термина')),
('operation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
to='oss.operation', verbose_name='Операция')),
('original', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='as_original', to='rsform.constituenta', verbose_name='Удаляемая конституента')),
('substitution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='as_substitute', to='rsform.constituenta', verbose_name='Замещающая конституента')),
],
options={
'verbose_name': 'Отождествление синтеза',
'verbose_name_plural': 'Таблицы отождествлений',
},
),
migrations.CreateModel(
name='Argument',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('argument', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='descendants', to='oss.operation', verbose_name='Аргумент')),
('operation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE,
related_name='arguments', to='oss.operation', verbose_name='Операция')),
],
options={
'verbose_name': 'Аргумент',
'verbose_name_plural': 'Аргументы операций',
'unique_together': {('operation', 'argument')},
},
),
]

View File

@ -0,0 +1,27 @@
''' Models: Operation Argument in OSS. '''
from django.db.models import CASCADE, ForeignKey, Model
class Argument(Model):
''' Operation Argument.'''
operation: ForeignKey = ForeignKey(
verbose_name='Операция',
to='oss.Operation',
on_delete=CASCADE,
related_name='arguments'
)
argument: ForeignKey = ForeignKey(
verbose_name='Аргумент',
to='oss.Operation',
on_delete=CASCADE,
related_name='descendants'
)
class Meta:
''' Model metadata. '''
verbose_name = 'Аргумент'
verbose_name_plural = 'Аргументы операций'
unique_together = [['operation', 'argument']]
def __str__(self) -> str:
return f'{self.argument.pk} -> {self.operation.pk}'

View File

@ -0,0 +1,71 @@
''' Models: Operation in OSS. '''
from django.db.models import (
CASCADE,
SET_NULL,
CharField,
FloatField,
ForeignKey,
Model,
TextChoices,
TextField
)
class OperationType(TextChoices):
''' Type of operation. '''
INPUT = 'input'
SYNTHESIS = 'synthesis'
class Operation(Model):
''' Operational schema Unit.'''
oss: ForeignKey = ForeignKey(
verbose_name='Схема синтеза',
to='rsform.LibraryItem',
on_delete=CASCADE,
related_name='items'
)
operation_type: CharField = CharField(
verbose_name='Тип',
max_length=10,
choices=OperationType.choices,
default=OperationType.INPUT
)
result: ForeignKey = ForeignKey(
verbose_name='Связанная КС',
to='rsform.LibraryItem',
null=True,
on_delete=SET_NULL,
related_name='producer'
)
alias: CharField = CharField(
verbose_name='Шифр',
max_length=255,
blank=True
)
title: TextField = TextField(
verbose_name='Название',
blank=True
)
comment: TextField = TextField(
verbose_name='Комментарий',
blank=True
)
position_x: FloatField = FloatField(
verbose_name='Положение по горизонтали',
default=0
)
position_y: FloatField = FloatField(
verbose_name='Положение по вертикали',
default=0
)
class Meta:
''' Model metadata. '''
verbose_name = 'Операция'
verbose_name_plural = 'Операции'
def __str__(self) -> str:
return f'Операция {self.alias}'

View File

@ -0,0 +1,36 @@
''' Models: SynthesisSubstitution. '''
from django.db.models import CASCADE, BooleanField, ForeignKey, Model
class SynthesisSubstitution(Model):
''' Substitutions as part of Synthesis operation in OSS.'''
operation: ForeignKey = ForeignKey(
verbose_name='Операция',
to='oss.Operation',
on_delete=CASCADE
)
original: ForeignKey = ForeignKey(
verbose_name='Удаляемая конституента',
to='rsform.Constituenta',
on_delete=CASCADE,
related_name='as_original'
)
substitution: ForeignKey = ForeignKey(
verbose_name='Замещающая конституента',
to='rsform.Constituenta',
on_delete=CASCADE,
related_name='as_substitute'
)
transfer_term: BooleanField = BooleanField(
verbose_name='Перенос термина',
default=False
)
class Meta:
''' Model metadata. '''
verbose_name = 'Отождествление синтеза'
verbose_name_plural = 'Таблицы отождествлений'
def __str__(self) -> str:
return f'{self.original.pk} -> {self.substitution.pk}'

View File

@ -0,0 +1,8 @@
''' Django: Models. '''
from apps.rsform.models import LibraryItem, LibraryItemType
from .api_OSS import OperationSchema
from .Argument import Argument
from .Operation import Operation, OperationType
from .SynthesisSubstitution import SynthesisSubstitution

View File

@ -0,0 +1,128 @@
''' Models: OSS API. '''
from typing import Optional
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet
from apps.rsform.models import LibraryItem, LibraryItemType
from shared import messages as msg
from .Argument import Argument
from .Operation import Operation
from .SynthesisSubstitution import SynthesisSubstitution
class OperationSchema:
''' Operations schema API. '''
def __init__(self, item: LibraryItem):
if item.item_type != LibraryItemType.OPERATION_SCHEMA:
raise ValueError(msg.libraryTypeUnexpected())
self.item = item
@staticmethod
def create(**kwargs) -> 'OperationSchema':
item = LibraryItem.objects.create(item_type=LibraryItemType.OPERATION_SCHEMA, **kwargs)
return OperationSchema(item=item)
def operations(self) -> QuerySet[Operation]:
''' Get QuerySet containing all operations of current OSS. '''
return Operation.objects.filter(oss=self.item)
def arguments(self) -> QuerySet[Argument]:
''' Operation arguments. '''
return Argument.objects.filter(operation__oss=self.item)
def substitutions(self) -> QuerySet[SynthesisSubstitution]:
''' Operation substitutions. '''
return SynthesisSubstitution.objects.filter(operation__oss=self.item)
def update_positions(self, data: list[dict]):
''' Update positions. '''
lookup = {x['id']: x for x in data}
operations = self.operations()
for item in operations:
if item.pk in lookup:
item.position_x = lookup[item.pk]['position_x']
item.position_y = lookup[item.pk]['position_y']
Operation.objects.bulk_update(operations, ['position_x', 'position_y'])
@transaction.atomic
def create_operation(self, **kwargs) -> Operation:
''' Insert new operation. '''
if kwargs['alias'] != '' and self.operations().filter(alias=kwargs['alias']).exists():
raise ValidationError(msg.aliasTaken(kwargs['alias']))
result = Operation.objects.create(
oss=self.item,
**kwargs
)
self.item.save()
result.refresh_from_db()
return result
@transaction.atomic
def delete_operation(self, operation: Operation):
''' Delete operation. '''
operation.delete()
# deal with attached schema
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_input(self, target: Operation, schema: Optional[LibraryItem]):
''' Set input schema for operation. '''
if schema == target.result:
return
if schema:
target.result = schema
target.alias = schema.alias
target.title = schema.title
target.comment = schema.comment
else:
target.result = None
target.save()
# trigger on_change effects
self.item.save()
@transaction.atomic
def add_argument(self, operation: Operation, argument: Operation) -> Optional[Argument]:
''' Add Argument to operation. '''
if Argument.objects.filter(operation=operation, argument=argument).exists():
return None
result = Argument.objects.create(operation=operation, argument=argument)
self.item.save()
return result
@transaction.atomic
def clear_arguments(self, target: Operation):
''' Clear all arguments for operation. '''
if not Argument.objects.filter(operation=target).exists():
return
Argument.objects.filter(operation=target).delete()
SynthesisSubstitution.objects.filter(operation=target).delete()
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_substitutions(self, target: Operation, substitutes: list[dict]):
''' Clear all arguments for operation. '''
SynthesisSubstitution.objects.filter(operation=target).delete()
for sub in substitutes:
SynthesisSubstitution.objects.create(
operation=target,
original=sub['original'],
substitution=sub['substitution'],
transfer_term=sub['transfer_term']
)
# trigger on_change effects
self.item.save()

View File

@ -0,0 +1,13 @@
''' REST API: Serializers. '''
from apps.rsform.serializers import LibraryItemSerializer
from .basics import OperationPositionSerializer, PositionsSerializer
from .data_access import (
ArgumentSerializer,
OperationCreateSerializer,
OperationDeleteSerializer,
OperationSchemaSerializer,
OperationSerializer
)
from .schema_typing import NewOperationResponse

View File

@ -0,0 +1,16 @@
''' Basic serializers that do not interact with database. '''
from rest_framework import serializers
class OperationPositionSerializer(serializers.Serializer):
''' Operation position. '''
id = serializers.IntegerField()
position_x = serializers.FloatField()
position_y = serializers.FloatField()
class PositionsSerializer(serializers.Serializer):
''' Operations position for OperationSchema. '''
positions = serializers.ListField(
child=OperationPositionSerializer()
)

View File

@ -0,0 +1,108 @@
''' Serializers for persistent data manipulation. '''
from typing import cast
from django.db.models import F
from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField
from apps.rsform.models import LibraryItem
from apps.rsform.serializers import LibraryItemDetailsSerializer
from shared import messages as msg
from ..models import Argument, Operation, OperationSchema, OperationType
from .basics import OperationPositionSerializer
class OperationSerializer(serializers.ModelSerializer):
''' Serializer: Operation data. '''
class Meta:
''' serializer metadata. '''
model = Operation
fields = '__all__'
read_only_fields = ('id', 'oss')
class ArgumentSerializer(serializers.ModelSerializer):
''' Serializer: Operation data. '''
class Meta:
''' serializer metadata. '''
model = Argument
fields = ('operation', 'argument')
class OperationCreateSerializer(serializers.Serializer):
''' Serializer: Operation creation. '''
class OperationData(serializers.ModelSerializer):
''' Serializer: Operation creation data. '''
alias = serializers.CharField()
operation_type = serializers.ChoiceField(OperationType.choices)
class Meta:
''' serializer metadata. '''
model = Operation
fields = \
'alias', 'operation_type', 'title', \
'comment', 'position_x', 'position_y'
item_data = OperationData()
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
class OperationDeleteSerializer(serializers.Serializer):
''' Serializer: Delete operation. '''
target = PKField(many=False, queryset=Operation.objects.all())
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
def validate(self, attrs):
oss = cast(LibraryItem, self.context['oss'])
operation = cast(Operation, attrs['target'])
if oss and operation.oss != oss:
raise serializers.ValidationError({
f'{operation.id}': msg.operationNotOwned(oss.title)
})
self.instance = operation
return attrs
class OperationSchemaSerializer(serializers.ModelSerializer):
''' Serializer: Detailed data for OSS. '''
items = serializers.ListField(
child=OperationSerializer()
)
graph = serializers.ListField(
child=ArgumentSerializer()
)
class Meta:
''' serializer metadata. '''
model = LibraryItem
fields = '__all__'
def to_representation(self, instance: LibraryItem):
result = LibraryItemDetailsSerializer(instance).data
oss = OperationSchema(instance)
result['items'] = []
for operation in oss.operations():
result['items'].append(OperationSerializer(operation).data)
result['graph'] = []
for argument in oss.arguments():
result['graph'].append(ArgumentSerializer(argument).data)
result['substitutions'] = []
for substitution in oss.substitutions().values(
'operation',
'original',
'transfer_term',
'substitution',
original_alias=F('original__alias'),
original_term=F('original__term_resolved'),
substitution_alias=F('substitution__alias'),
substitution_term=F('substitution__term_resolved'),
):
result['substitutions'].append(substitution)
return result

View File

@ -0,0 +1,10 @@
''' Utility serializers for REST API schema - SHOULD NOT BE ACCESSED DIRECTLY. '''
from rest_framework import serializers
from .data_access import OperationSchemaSerializer, OperationSerializer
class NewOperationResponse(serializers.Serializer):
''' Serializer: Create operation response. '''
new_operation = OperationSerializer()
oss = OperationSchemaSerializer()

View File

@ -0,0 +1,3 @@
''' Tests. '''
from .s_models import *
from .s_views import *

View File

@ -0,0 +1 @@
''' Tests for Django Models. '''

View File

@ -0,0 +1,2 @@
''' Tests for REST API. '''
from .t_oss import *

View File

@ -0,0 +1,189 @@
''' Testing API: Operation Schema. '''
from rest_framework import status
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestOssViewset(EndpointTester):
''' Testing OSS view. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(title='Test', alias='T1', owner=self.user)
self.owned_id = self.owned.item.pk
self.unowned = OperationSchema.create(title='Test2', alias='T2')
self.unowned_id = self.unowned.item.pk
self.private = OperationSchema.create(title='Test2', alias='T2', access_policy=AccessPolicy.PRIVATE)
self.private_id = self.private.item.pk
self.invalid_id = self.private.item.pk + 1337
def populateData(self):
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.item
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.item
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.add_argument(self.operation3, self.operation1)
self.owned.add_argument(self.operation3, self.operation2)
self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1,
'substitution': self.ks2x1,
'transfer_term': False
}])
@decl_endpoint('/api/oss/{item}/details', method='get')
def test_details(self):
self.populateData()
response = self.executeOK(item=self.owned_id)
self.assertEqual(response.data['owner'], self.owned.item.owner.pk)
self.assertEqual(response.data['title'], self.owned.item.title)
self.assertEqual(response.data['alias'], self.owned.item.alias)
self.assertEqual(response.data['location'], self.owned.item.location)
self.assertEqual(response.data['access_policy'], self.owned.item.access_policy)
self.assertEqual(response.data['visible'], self.owned.item.visible)
self.assertEqual(response.data['item_type'], LibraryItemType.OPERATION_SCHEMA)
self.assertEqual(len(response.data['items']), 3)
self.assertEqual(response.data['items'][0]['id'], self.operation1.pk)
self.assertEqual(response.data['items'][0]['operation_type'], self.operation1.operation_type)
self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk)
self.assertEqual(sub['transfer_term'], False)
self.assertEqual(sub['original_alias'], self.ks1x1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved)
graph = response.data['graph']
self.assertEqual(len(graph), 2)
self.assertEqual(graph[0]['operation'], self.operation3.pk)
self.assertEqual(graph[0]['argument'], self.operation1.pk)
self.assertEqual(graph[1]['operation'], self.operation3.pk)
self.assertEqual(graph[1]['argument'], self.operation2.pk)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
self.logout()
self.executeOK(item=self.owned_id)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/update-positions', method='patch')
def test_update_positions(self):
self.populateData()
self.executeBadData(item=self.owned_id)
data = {'positions': []}
self.executeOK(data=data)
data = {'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337},
{'id': self.operation2.pk, 'position_x': 36.1, 'position_y': 1437},
{'id': self.invalid_id, 'position_x': 31, 'position_y': 12},
]}
self.toggle_admin(True)
self.executeOK(data=data, item=self.unowned_id)
self.operation1.refresh_from_db()
self.assertNotEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertNotEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.toggle_admin(False)
self.executeOK(data=data, item=self.owned_id)
self.operation1.refresh_from_db()
self.operation2.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.assertEqual(self.operation2.position_x, data['positions'][1]['position_x'])
self.assertEqual(self.operation2.position_y, data['positions'][1]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/create-operation', method='post')
def test_create_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'item_data': {
'alias': 'Test3',
'title': 'Test title',
'comment': 'Тест кириллицы',
'position_x': 1,
'position_y': 1,
},
'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337}
]
}
self.executeBadData(data=data)
data['item_data']['operation_type'] = 'invalid'
self.executeBadData(data=data)
data['item_data']['operation_type'] = OperationType.INPUT
response = self.executeCreated(data=data)
self.assertEqual(len(response.data['oss']['items']), 4)
new_operation = response.data['new_operation']
self.assertEqual(new_operation['alias'], data['item_data']['alias'])
self.assertEqual(new_operation['operation_type'], data['item_data']['operation_type'])
self.assertEqual(new_operation['title'], data['item_data']['title'])
self.assertEqual(new_operation['comment'], data['item_data']['comment'])
self.assertEqual(new_operation['position_x'], data['item_data']['position_x'])
self.assertEqual(new_operation['position_y'], data['item_data']['position_y'])
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.toggle_admin(True)
self.executeCreated(data=data, item=self.unowned_id)
@decl_endpoint('/api/oss/{item}/delete-operation', method='patch')
def test_delete_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'positions': []
}
self.executeBadData(data=data)
data['target'] = self.operation1.pk
self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id)
self.logout()
self.executeForbidden(data=data, item=self.owned_id)
self.login()
response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 2)

View File

@ -0,0 +1,12 @@
''' Routing: Operation Schema. '''
from django.urls import include, path
from rest_framework import routers
from . import views
library_router = routers.SimpleRouter(trailing_slash=False)
library_router.register('oss', views.OssViewSet, 'OSS')
urlpatterns = [
path('', include(library_router.urls)),
]

View File

@ -0,0 +1,2 @@
''' REST API: Endpoint processors. '''
from .oss import OssViewSet

View File

@ -0,0 +1,141 @@
''' Endpoints for OSS. '''
from typing import cast
from django.db import transaction
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
from rest_framework import status as c
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from shared import permissions
from .. import models as m
from .. import serializers as s
@extend_schema(tags=['OSS'])
@extend_schema_view()
class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.RetrieveAPIView):
''' Endpoint: OperationSchema. '''
queryset = m.LibraryItem.objects.filter(item_type=m.LibraryItemType.OPERATION_SCHEMA)
serializer_class = s.LibraryItemSerializer
def _get_schema(self) -> m.OperationSchema:
return m.OperationSchema(cast(m.LibraryItem, self.get_object()))
def get_permissions(self):
''' Determine permission class. '''
if self.action in [
'create_operation',
'delete_operation',
'update_positions'
]:
permission_list = [permissions.ItemEditor]
elif self.action in ['details']:
permission_list = [permissions.ItemAnyone]
else:
permission_list = [permissions.Anyone]
return [permission() for permission in permission_list]
@extend_schema(
summary='get operations data',
tags=['OSS'],
request=None,
responses={
c.HTTP_200_OK: s.OperationSchemaSerializer,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk):
''' Endpoint: Detailed OSS data. '''
serializer = s.OperationSchemaSerializer(cast(m.LibraryItem, self.get_object()))
return Response(
status=c.HTTP_200_OK,
data=serializer.data
)
@extend_schema(
summary='update positions',
tags=['OSS'],
request=s.PositionsSerializer,
responses={
c.HTTP_200_OK: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='update-positions')
def update_positions(self, request: Request, pk):
''' Endpoint: Update operations positions. '''
schema = self._get_schema()
serializer = s.PositionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
schema.update_positions(serializer.validated_data['positions'])
return Response(status=c.HTTP_200_OK)
@extend_schema(
summary='create operation',
tags=['OSS'],
request=s.OperationCreateSerializer(),
responses={
c.HTTP_201_CREATED: s.NewOperationResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['post'], url_path='create-operation')
def create_operation(self, request: Request, pk):
''' Create new operation. '''
schema = self._get_schema()
serializer = s.OperationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
new_operation = schema.create_operation(**serializer.validated_data['item_data'])
schema.item.refresh_from_db()
response = Response(
status=c.HTTP_201_CREATED,
data={
'new_operation': s.OperationSerializer(new_operation).data,
'oss': s.OperationSchemaSerializer(schema.item).data
}
)
return response
@extend_schema(
summary='delete operation',
tags=['OSS'],
request=s.OperationDeleteSerializer,
responses={
c.HTTP_200_OK: s.OperationSchemaSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='delete-operation')
def delete_operation(self, request: Request, pk):
''' Endpoint: Delete operation. '''
schema = self._get_schema()
serializer = s.OperationDeleteSerializer(
data=request.data,
context={'oss': schema.item}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
schema.delete_operation(serializer.validated_data['target'])
schema.item.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.OperationSchemaSerializer(schema.item).data
)

View File

@ -0,0 +1,18 @@
# Generated by Django 5.0.7 on 2024-07-17 09:51
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('rsform', '0007_location_and_flags'),
]
operations = [
migrations.AlterField(
model_name='libraryitem',
name='item_type',
field=models.CharField(choices=[('rsform', 'Rsform'), ('oss', 'Operation Schema')], max_length=50, verbose_name='Тип'),
),
]

View File

@ -9,6 +9,7 @@ from django.db.models import (
DateTimeField, DateTimeField,
ForeignKey, ForeignKey,
Model, Model,
QuerySet,
TextChoices, TextChoices,
TextField TextField
) )
@ -23,7 +24,7 @@ from .Version import Version
class LibraryItemType(TextChoices): class LibraryItemType(TextChoices):
''' Type of library items ''' ''' Type of library items '''
RSFORM = 'rsform' RSFORM = 'rsform'
OPERATIONS_SCHEMA = 'oss' OPERATION_SCHEMA = 'oss'
class AccessPolicy(TextChoices): class AccessPolicy(TextChoices):
@ -113,17 +114,17 @@ class LibraryItem(Model):
def get_absolute_url(self): def get_absolute_url(self):
return f'/api/library/{self.pk}' return f'/api/library/{self.pk}'
def subscribers(self) -> list[Subscription]: def subscribers(self) -> list[User]:
''' Get all subscribers for this item. ''' ''' Get all subscribers for this item. '''
return [subscription.user for subscription in Subscription.objects.filter(item=self.pk)] return [subscription.user for subscription in Subscription.objects.filter(item=self.pk).only('user')]
def versions(self) -> list[Version]: def editors(self) -> list[User]:
''' Get all Versions of this item. '''
return list(Version.objects.filter(item=self.pk).order_by('-time_create'))
def editors(self) -> list[Editor]:
''' Get all Editors of this item. ''' ''' Get all Editors of this item. '''
return [item.editor for item in Editor.objects.filter(item=self.pk)] return [item.editor for item in Editor.objects.filter(item=self.pk).only('editor')]
def versions(self) -> QuerySet[Version]:
''' Get all Versions of this item. '''
return Version.objects.filter(item=self.pk).order_by('-time_create')
@transaction.atomic @transaction.atomic
def save(self, *args, **kwargs): def save(self, *args, **kwargs):

View File

@ -1,13 +1,14 @@
''' Models: RSForm API. ''' ''' Models: RSForm API. '''
from copy import deepcopy from copy import deepcopy
from typing import Optional, Union, cast from typing import Optional, cast
from cctext import Entity, Resolver, TermForm, extract_entities, split_grams from cctext import Entity, Resolver, TermForm, extract_entities, split_grams
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.db import transaction from django.db import transaction
from django.db.models import QuerySet from django.db.models import QuerySet
from .. import messages as msg from shared import messages as msg
from ..graph import Graph from ..graph import Graph
from .api_RSLanguage import ( from .api_RSLanguage import (
extract_globals, extract_globals,
@ -39,7 +40,7 @@ class RSForm:
def create(**kwargs) -> 'RSForm': def create(**kwargs) -> 'RSForm':
return RSForm(LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, **kwargs)) return RSForm(LibraryItem.objects.create(item_type=LibraryItemType.RSFORM, **kwargs))
def constituents(self) -> QuerySet['Constituenta']: def constituents(self) -> QuerySet[Constituenta]:
''' Get QuerySet containing all constituents of current RSForm. ''' ''' Get QuerySet containing all constituents of current RSForm. '''
return Constituenta.objects.filter(schema=self.item) return Constituenta.objects.filter(schema=self.item)
@ -104,11 +105,39 @@ class RSForm:
result = max(result, int(alias[1:])) result = max(result, int(alias[1:]))
return result return result
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[Constituenta] = None) -> Constituenta:
''' Create new cst from data. '''
if insert_after is None:
position = _INSERT_LAST
else:
position = insert_after.order + 1
result = self.insert_new(data['alias'], data['cst_type'], position)
result.convention = data.get('convention', '')
result.definition_formal = data.get('definition_formal', '')
result.term_forms = data.get('term_forms', [])
result.term_raw = data.get('term_raw', '')
result.definition_raw = data.get('definition_raw', '')
if result.term_raw != '' or result.definition_raw != '':
resolver = self.resolver()
if result.term_raw != '':
resolved = resolver.resolve(result.term_raw)
result.term_resolved = resolved
resolver.context[result.alias] = Entity(result.alias, resolved)
if result.definition_raw != '':
result.definition_resolved = resolver.resolve(result.definition_raw)
result.save()
self.on_term_change([result.id])
result.refresh_from_db()
return result
@transaction.atomic @transaction.atomic
def insert_new( def insert_new(
self, self,
alias: str, alias: str,
cst_type: Union[CstType, None] = None, cst_type: Optional[CstType] = None,
position: int = _INSERT_LAST, position: int = _INSERT_LAST,
**kwargs **kwargs
) -> Constituenta: ) -> Constituenta:
@ -195,27 +224,6 @@ class RSForm:
self.resolve_all_text() self.resolve_all_text()
self.item.save() self.item.save()
@transaction.atomic
def create_cst(self, data: dict, insert_after: Optional[str] = None) -> Constituenta:
''' Create new cst from data. '''
resolver = self.resolver()
cst = self._insert_new(data, insert_after)
cst.convention = data.get('convention', '')
cst.definition_formal = data.get('definition_formal', '')
cst.term_forms = data.get('term_forms', [])
cst.term_raw = data.get('term_raw', '')
if cst.term_raw != '':
resolved = resolver.resolve(cst.term_raw)
cst.term_resolved = resolved
resolver.context[cst.alias] = Entity(cst.alias, resolved)
cst.definition_raw = data.get('definition_raw', '')
if cst.definition_raw != '':
cst.definition_resolved = resolver.resolve(cst.definition_raw)
cst.save()
self.on_term_change([cst.id])
cst.refresh_from_db()
return cst
@transaction.atomic @transaction.atomic
def substitute( def substitute(
self, self,
@ -363,13 +371,6 @@ class RSForm:
cst.save() cst.save()
order += 1 order += 1
def _insert_new(self, data: dict, insert_after: Optional[str] = None) -> Constituenta:
if insert_after is not None:
cst_after = Constituenta.objects.get(pk=insert_after)
return self.insert_new(data['alias'], data['cst_type'], cst_after.order + 1)
else:
return self.insert_new(data['alias'], data['cst_type'])
def _graph_formal(self) -> Graph[int]: def _graph_formal(self) -> Graph[int]:
''' Graph based on formal definitions. ''' ''' Graph based on formal definitions. '''
result: Graph[int] = Graph() result: Graph[int] = Graph()

View File

@ -6,7 +6,8 @@ from typing import Set, Tuple, cast
import pyconcept import pyconcept
from .. import messages as msg from shared import messages as msg
from .Constituenta import CstType from .Constituenta import CstType
_RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line _RE_GLOBALS = r'[XCSADFPT]\d+' # cspell:disable-line

View File

@ -22,6 +22,7 @@ from .data_access import (
InlineSynthesisSerializer, InlineSynthesisSerializer,
LibraryItemBaseSerializer, LibraryItemBaseSerializer,
LibraryItemCloneSerializer, LibraryItemCloneSerializer,
LibraryItemDetailsSerializer,
LibraryItemSerializer, LibraryItemSerializer,
RSFormParseSerializer, RSFormParseSerializer,
RSFormSerializer, RSFormSerializer,

View File

@ -4,7 +4,8 @@ from typing import cast
from cctext import EntityReference, Reference, ReferenceType, Resolver, SyntacticReference from cctext import EntityReference, Reference, ReferenceType, Resolver, SyntacticReference
from rest_framework import serializers from rest_framework import serializers
from .. import messages as msg from shared import messages as msg
from ..models import AccessPolicy, validate_location from ..models import AccessPolicy, validate_location
@ -34,7 +35,7 @@ class LocationSerializer(serializers.Serializer):
class AccessPolicySerializer(serializers.Serializer): class AccessPolicySerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. ''' ''' Serializer: Constituenta renaming. '''
access_policy = serializers.CharField(max_length=500) access_policy = serializers.CharField()
def validate(self, attrs): def validate(self, attrs):
attrs = super().validate(attrs) attrs = super().validate(attrs)

View File

@ -7,7 +7,8 @@ from django.db import transaction
from rest_framework import serializers from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField from rest_framework.serializers import PrimaryKeyRelatedField as PKField
from .. import messages as msg from shared import messages as msg
from ..models import Constituenta, CstType, LibraryItem, RSForm, Version from ..models import Constituenta, CstType, LibraryItem, RSForm, Version
from .basics import CstParseSerializer from .basics import CstParseSerializer
from .io_pyconcept import PyConceptAdapter from .io_pyconcept import PyConceptAdapter
@ -140,6 +141,8 @@ class CstDetailsSerializer(serializers.ModelSerializer):
class CstCreateSerializer(serializers.ModelSerializer): class CstCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. ''' ''' Serializer: Constituenta creation. '''
insert_after = serializers.IntegerField(required=False, allow_null=True) insert_after = serializers.IntegerField(required=False, allow_null=True)
alias = serializers.CharField(max_length=8)
cst_type = serializers.ChoiceField(CstType.choices)
class Meta: class Meta:
''' serializer metadata. ''' ''' serializer metadata. '''

View File

@ -2,7 +2,8 @@
from django.db import transaction from django.db import transaction
from rest_framework import serializers from rest_framework import serializers
from .. import messages as msg from shared import messages as msg
from ..models import Constituenta, LibraryItem, RSForm from ..models import Constituenta, LibraryItem, RSForm
from ..utils import fix_old_references from ..utils import fix_old_references

View File

@ -4,7 +4,8 @@ from typing import Optional, Union, cast
import pyconcept import pyconcept
from .. import messages as msg from shared import messages as msg
from ..models import RSForm from ..models import RSForm

View File

@ -1,5 +1,5 @@
''' Tests. ''' ''' Tests. '''
from .s_models.t_RSForm import * from .s_models import *
from .s_views import * from .s_views import *
from .t_graph import * from .t_graph import *
from .t_imports import * from .t_imports import *

View File

@ -1,4 +1,4 @@
''' Tests for REST API. ''' ''' Tests for Django Models. '''
from .t_Constituenta import * from .t_Constituenta import *
from .t_Editor import * from .t_Editor import *
from .t_LibraryItem import * from .t_LibraryItem import *

View File

@ -101,6 +101,26 @@ class TestRSForm(TestCase):
self.assertEqual(x2.schema, self.schema.item) self.assertEqual(x2.schema, self.schema.item)
self.assertEqual(x1.order, 1) self.assertEqual(x1.order, 1)
def test_create_cst(self):
data = {
'alias': 'X3',
'cst_type': CstType.BASE,
'term_raw': 'слон',
'definition_raw': 'test',
'convention': 'convention'
}
x1 = self.schema.insert_new('X1')
x2 = self.schema.insert_new('X2')
x3 = self.schema.create_cst(data=data, insert_after=x1)
x2.refresh_from_db()
self.assertEqual(x3.alias, data['alias'])
self.assertEqual(x3.term_raw, data['term_raw'])
self.assertEqual(x3.definition_raw, data['definition_raw'])
self.assertEqual(x2.order, 3)
self.assertEqual(x3.order, 2)
def test_create_cst_resolve(self): def test_create_cst_resolve(self):
x1 = self.schema.insert_new( x1 = self.schema.insert_new(

View File

@ -1,7 +1,7 @@
''' Testing views ''' ''' Testing views '''
from cctext import split_grams from cctext import split_grams
from ..EndpointTester import EndpointTester, decl_endpoint from shared.EndpointTester import EndpointTester, decl_endpoint
class TestNaturalLanguageViews(EndpointTester): class TestNaturalLanguageViews(EndpointTester):
@ -14,20 +14,20 @@ class TestNaturalLanguageViews(EndpointTester):
@decl_endpoint(endpoint='/api/cctext/parse', method='post') @decl_endpoint(endpoint='/api/cctext/parse', method='post')
def test_parse_text(self): def test_parse_text(self):
data = {'text': 'синим слонам'} data = {'text': 'синим слонам'}
response = self.executeOK(data) response = self.executeOK(data=data)
self._assert_tags(response.data['result'], 'datv,NOUN,plur,anim,masc') self._assert_tags(response.data['result'], 'datv,NOUN,plur,anim,masc')
@decl_endpoint(endpoint='/api/cctext/inflect', method='post') @decl_endpoint(endpoint='/api/cctext/inflect', method='post')
def test_inflect(self): def test_inflect(self):
data = {'text': 'синий слон', 'grams': 'plur,datv'} data = {'text': 'синий слон', 'grams': 'plur,datv'}
response = self.executeOK(data) response = self.executeOK(data=data)
self.assertEqual(response.data['result'], 'синим слонам') self.assertEqual(response.data['result'], 'синим слонам')
@decl_endpoint(endpoint='/api/cctext/generate-lexeme', method='post') @decl_endpoint(endpoint='/api/cctext/generate-lexeme', method='post')
def test_generate_lexeme(self): def test_generate_lexeme(self):
data = {'text': 'синий слон'} data = {'text': 'синий слон'}
response = self.executeOK(data) response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 12) self.assertEqual(len(response.data['items']), 12)
self.assertEqual(response.data['items'][0]['text'], 'синий слон') self.assertEqual(response.data['items'][0]['text'], 'синий слон')

View File

@ -1,7 +1,6 @@
''' Testing API: Constituents. ''' ''' Testing API: Constituents. '''
from apps.rsform.models import Constituenta, CstType, RSForm from apps.rsform.models import Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
from ..EndpointTester import EndpointTester, decl_endpoint
class TestConstituentaAPI(EndpointTester): class TestConstituentaAPI(EndpointTester):
@ -52,18 +51,18 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch') @decl_endpoint('/api/constituents/{item}', method='patch')
def test_partial_update(self): def test_partial_update(self):
data = {'convention': 'tt'} data = {'convention': 'tt'}
self.executeForbidden(data, item=self.cst2.pk) self.executeForbidden(data=data, item=self.cst2.pk)
self.logout() self.logout()
self.executeForbidden(data, item=self.cst1.pk) self.executeForbidden(data=data, item=self.cst1.pk)
self.login() self.login()
response = self.executeOK(data, item=self.cst1.pk) response = self.executeOK(data=data, item=self.cst1.pk)
self.cst1.refresh_from_db() self.cst1.refresh_from_db()
self.assertEqual(response.data['convention'], 'tt') self.assertEqual(response.data['convention'], 'tt')
self.assertEqual(self.cst1.convention, 'tt') self.assertEqual(self.cst1.convention, 'tt')
self.executeOK(data, item=self.cst1.pk) self.executeOK(data=data, item=self.cst1.pk)
@decl_endpoint('/api/constituents/{item}', method='patch') @decl_endpoint('/api/constituents/{item}', method='patch')
@ -72,7 +71,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': 'New term', 'term_raw': 'New term',
'definition_raw': 'New def' 'definition_raw': 'New def'
} }
response = self.executeOK(data, item=self.cst3.pk) response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db() self.cst3.refresh_from_db()
self.assertEqual(response.data['term_resolved'], 'New term') self.assertEqual(response.data['term_resolved'], 'New term')
self.assertEqual(self.cst3.term_resolved, 'New term') self.assertEqual(self.cst3.term_resolved, 'New term')
@ -86,7 +85,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': '@{X1|nomn,sing}', 'term_raw': '@{X1|nomn,sing}',
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}' 'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}'
} }
response = self.executeOK(data, item=self.cst3.pk) response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db() self.cst3.refresh_from_db()
self.assertEqual(self.cst3.term_resolved, self.cst1.term_resolved) self.assertEqual(self.cst3.term_resolved, self.cst1.term_resolved)
self.assertEqual(response.data['term_resolved'], self.cst1.term_resolved) self.assertEqual(response.data['term_resolved'], self.cst1.term_resolved)
@ -97,7 +96,7 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch') @decl_endpoint('/api/constituents/{item}', method='patch')
def test_readonly_cst_fields(self): def test_readonly_cst_fields(self):
data = {'alias': 'X33', 'order': 10} data = {'alias': 'X33', 'order': 10}
response = self.executeOK(data, item=self.cst1.pk) response = self.executeOK(data=data, item=self.cst1.pk)
self.assertEqual(response.data['alias'], 'X1') self.assertEqual(response.data['alias'], 'X1')
self.assertEqual(response.data['alias'], self.cst1.alias) self.assertEqual(response.data['alias'], self.cst1.alias)
self.assertEqual(response.data['order'], self.cst1.order) self.assertEqual(response.data['order'], self.cst1.order)

View File

@ -11,9 +11,8 @@ from apps.rsform.models import (
RSForm, RSForm,
Subscription Subscription
) )
from shared.EndpointTester import EndpointTester, decl_endpoint
from ..EndpointTester import EndpointTester, decl_endpoint from shared.testing_utils import response_contains
from ..testing_utils import response_contains
class TestLibraryViewset(EndpointTester): class TestLibraryViewset(EndpointTester):
@ -49,17 +48,17 @@ class TestLibraryViewset(EndpointTester):
'title': 'Title', 'title': 'Title',
'alias': 'alias', 'alias': 'alias',
} }
self.executeBadData(data) self.executeBadData(data=data)
data = { data = {
'item_type': LibraryItemType.OPERATIONS_SCHEMA, 'item_type': LibraryItemType.OPERATION_SCHEMA,
'title': 'Title', 'title': 'Title',
'alias': 'alias', 'alias': 'alias',
'access_policy': AccessPolicy.PROTECTED, 'access_policy': AccessPolicy.PROTECTED,
'visible': False, 'visible': False,
'read_only': True 'read_only': True
} }
response = self.executeCreated(data) response = self.executeCreated(data=data)
self.assertEqual(response.data['owner'], self.user.pk) self.assertEqual(response.data['owner'], self.user.pk)
self.assertEqual(response.data['item_type'], data['item_type']) self.assertEqual(response.data['item_type'], data['item_type'])
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
@ -70,25 +69,25 @@ class TestLibraryViewset(EndpointTester):
self.logout() self.logout()
data = {'title': 'Title2'} data = {'title': 'Title2'}
self.executeForbidden(data) self.executeForbidden(data=data)
@decl_endpoint('/api/library/{item}', method='patch') @decl_endpoint('/api/library/{item}', method='patch')
def test_update(self): def test_update(self):
data = {'id': self.unowned.pk, 'title': 'New Title'} data = {'id': self.unowned.pk, 'title': 'New Title'}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_editor(self.unowned, True) self.toggle_editor(self.unowned, True)
response = self.executeOK(data, item=self.unowned.pk) response = self.executeOK(data=data, item=self.unowned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.unowned.access_policy = AccessPolicy.PRIVATE self.unowned.access_policy = AccessPolicy.PRIVATE
self.unowned.save() self.unowned.save()
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
data = {'id': self.owned.pk, 'title': 'New Title'} data = {'id': self.owned.pk, 'title': 'New Title'}
response = self.executeOK(data, item=self.owned.pk) response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], self.owned.alias) self.assertEqual(response.data['alias'], self.owned.alias)
@ -99,7 +98,7 @@ class TestLibraryViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED, 'access_policy': AccessPolicy.PROTECTED,
'location': LocationHead.LIBRARY 'location': LocationHead.LIBRARY
} }
response = self.executeOK(data, item=self.owned.pk) response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['owner'], self.owned.owner.pk) self.assertEqual(response.data['owner'], self.owned.owner.pk)
self.assertEqual(response.data['access_policy'], self.owned.access_policy) self.assertEqual(response.data['access_policy'], self.owned.access_policy)
@ -112,22 +111,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'user': self.user.pk} data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user) self.assertEqual(self.owned.owner, self.user)
data = {'user': self.user2.pk} data = {'user': self.user2.pk}
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user2) self.assertEqual(self.owned.owner, self.user2)
self.assertEqual(self.owned.time_update, time_update) self.assertEqual(self.owned.time_update, time_update)
self.executeForbidden(data, item=self.owned.pk) self.executeForbidden(data=data, item=self.owned.pk)
self.toggle_admin(True) self.toggle_admin(True)
data = {'user': self.user.pk} data = {'user': self.user.pk}
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user) self.assertEqual(self.owned.owner, self.user)
@ -136,20 +135,20 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'access_policy': 'invalid'} data = {'access_policy': 'invalid'}
self.executeBadData(data, item=self.owned.pk) self.executeBadData(data=data, item=self.owned.pk)
data = {'access_policy': AccessPolicy.PRIVATE} data = {'access_policy': AccessPolicy.PRIVATE}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.access_policy, data['access_policy']) self.assertEqual(self.owned.access_policy, data['access_policy'])
self.toggle_editor(self.unowned, True) self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True) self.toggle_admin(True)
self.executeOK(data, item=self.unowned.pk) self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db() self.unowned.refresh_from_db()
self.assertEqual(self.unowned.access_policy, data['access_policy']) self.assertEqual(self.unowned.access_policy, data['access_policy'])
@ -158,29 +157,29 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'location': 'invalid'} data = {'location': 'invalid'}
self.executeBadData(data, item=self.owned.pk) self.executeBadData(data=data, item=self.owned.pk)
data = {'location': '/U/temp'} data = {'location': '/U/temp'}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location']) self.assertEqual(self.owned.location, data['location'])
data = {'location': LocationHead.LIBRARY} data = {'location': LocationHead.LIBRARY}
self.executeForbidden(data, item=self.owned.pk) self.executeForbidden(data=data, item=self.owned.pk)
data = {'location': '/U/temp'} data = {'location': '/U/temp'}
self.toggle_editor(self.unowned, True) self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True) self.toggle_admin(True)
data = {'location': LocationHead.LIBRARY} data = {'location': LocationHead.LIBRARY}
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location']) self.assertEqual(self.owned.location, data['location'])
self.executeOK(data, item=self.unowned.pk) self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db() self.unowned.refresh_from_db()
self.assertEqual(self.unowned.location, data['location']) self.assertEqual(self.unowned.location, data['location'])
@ -189,22 +188,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'user': self.invalid_user} data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk) self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk} data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update) self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user]) self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user]) self.assertEqual(self.owned.editors(), [self.user])
data = {'user': self.user2.pk} data = {'user': self.user2.pk}
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user, self.user2])) self.assertEqual(set(self.owned.editors()), set([self.user, self.user2]))
@ -213,25 +212,25 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'user': self.invalid_user} data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk) self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk} data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update) self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), []) self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user) Editor.add(item=self.owned, user=self.user)
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), []) self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user) Editor.add(item=self.owned, user=self.user)
Editor.add(item=self.owned, user=self.user2) Editor.add(item=self.owned, user=self.user2)
data = {'user': self.user2.pk} data = {'user': self.user2.pk}
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user]) self.assertEqual(self.owned.editors(), [self.user])
@ -240,30 +239,30 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update time_update = self.owned.time_update
data = {'users': [self.invalid_user]} data = {'users': [self.invalid_user]}
self.executeBadData(data, item=self.owned.pk) self.executeBadData(data=data, item=self.owned.pk)
data = {'users': [self.user.pk]} data = {'users': [self.user.pk]}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk) self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk) self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db() self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update) self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user]) self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user]) self.assertEqual(self.owned.editors(), [self.user])
data = {'users': [self.user2.pk]} data = {'users': [self.user2.pk]}
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user2]) self.assertEqual(self.owned.editors(), [self.user2])
data = {'users': []} data = {'users': []}
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(self.owned.editors(), []) self.assertEqual(self.owned.editors(), [])
data = {'users': [self.user2.pk, self.user.pk]} data = {'users': [self.user2.pk, self.user.pk]}
self.executeOK(data) self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user2, self.user])) self.assertEqual(set(self.owned.editors()), set([self.user2, self.user]))
@ -294,7 +293,7 @@ class TestLibraryViewset(EndpointTester):
@decl_endpoint('/api/library', method='get') @decl_endpoint('/api/library', method='get')
def test_library_get(self): def test_library_get(self):
non_schema = LibraryItem.objects.create( non_schema = LibraryItem.objects.create(
item_type=LibraryItemType.OPERATIONS_SCHEMA, item_type=LibraryItemType.OPERATION_SCHEMA,
title='Test4' title='Test4'
) )
response = self.executeOK() response = self.executeOK()
@ -376,11 +375,11 @@ class TestLibraryViewset(EndpointTester):
) )
data = {'title': 'Title1337'} data = {'title': 'Title1337'}
self.executeNotFound(data, item=self.invalid_item) self.executeNotFound(data=data, item=self.invalid_item)
self.executeCreated(data, item=self.unowned.pk) self.executeCreated(data=data, item=self.unowned.pk)
data = {'title': 'Title1338'} data = {'title': 'Title1338'}
response = self.executeCreated(data, item=self.owned.pk) response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 2) self.assertEqual(len(response.data['items']), 2)
self.assertEqual(response.data['items'][0]['alias'], x12.alias) self.assertEqual(response.data['items'][0]['alias'], x12.alias)
@ -390,12 +389,12 @@ class TestLibraryViewset(EndpointTester):
self.assertEqual(response.data['items'][1]['term_resolved'], d2.term_resolved) self.assertEqual(response.data['items'][1]['term_resolved'], d2.term_resolved)
data = {'title': 'Title1340', 'items': []} data = {'title': 'Title1340', 'items': []}
response = self.executeCreated(data, item=self.owned.pk) response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 0) self.assertEqual(len(response.data['items']), 0)
data = {'title': 'Title1341', 'items': [x12.pk]} data = {'title': 'Title1341', 'items': [x12.pk]}
response = self.executeCreated(data, item=self.owned.pk) response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title']) self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 1) self.assertEqual(len(response.data['items']), 1)
self.assertEqual(response.data['items'][0]['alias'], x12.alias) self.assertEqual(response.data['items'][0]['alias'], x12.alias)

View File

@ -1,7 +1,6 @@
''' Testing API: Operations. ''' ''' Testing API: Operations. '''
from apps.rsform.models import Constituenta, CstType, RSForm from apps.rsform.models import Constituenta, CstType, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
from ..EndpointTester import EndpointTester, decl_endpoint
class TestInlineSynthesis(EndpointTester): class TestInlineSynthesis(EndpointTester):
@ -24,20 +23,20 @@ class TestInlineSynthesis(EndpointTester):
'items': [], 'items': [],
'substitutions': [] 'substitutions': []
} }
self.executeForbidden(data) self.executeForbidden(data=data)
data['receiver'] = invalid_id data['receiver'] = invalid_id
self.executeBadData(data) self.executeBadData(data=data)
data['receiver'] = self.schema1.item.pk data['receiver'] = self.schema1.item.pk
data['source'] = invalid_id data['source'] = invalid_id
self.executeBadData(data) self.executeBadData(data=data)
data['source'] = self.schema1.item.pk data['source'] = self.schema1.item.pk
self.executeOK(data) self.executeOK(data=data)
data['items'] = [invalid_id] data['items'] = [invalid_id]
self.executeBadData(data) self.executeBadData(data=data)
def test_inline_synthesis(self): def test_inline_synthesis(self):
@ -68,7 +67,7 @@ class TestInlineSynthesis(EndpointTester):
} }
] ]
} }
response = self.executeOK(data) response = self.executeOK(data=data)
result = {item['alias']: item for item in response.data['items']} result = {item['alias']: item for item in response.data['items']}
self.assertEqual(len(result), 6) self.assertEqual(len(result), 6)
self.assertEqual(result['X2']['term_raw'], ks1_x2.term_raw) self.assertEqual(result['X2']['term_raw'], ks1_x2.term_raw)

View File

@ -15,9 +15,8 @@ from apps.rsform.models import (
LocationHead, LocationHead,
RSForm RSForm
) )
from shared.EndpointTester import EndpointTester, decl_endpoint
from ..EndpointTester import EndpointTester, decl_endpoint from shared.testing_utils import response_contains
from ..testing_utils import response_contains
class TestRSFormViewset(EndpointTester): class TestRSFormViewset(EndpointTester):
@ -44,7 +43,7 @@ class TestRSFormViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED, 'access_policy': AccessPolicy.PROTECTED,
'visible': False 'visible': False
} }
self.executeBadData(data) self.executeBadData(data=data)
with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file: with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file:
data['file'] = file data['file'] = file
@ -59,7 +58,7 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms', method='get') @decl_endpoint('/api/rsforms', method='get')
def test_list_rsforms(self): def test_list_rsforms(self):
non_schema = LibraryItem.objects.create( non_schema = LibraryItem.objects.create(
item_type=LibraryItemType.OPERATIONS_SCHEMA, item_type=LibraryItemType.OPERATION_SCHEMA,
title='Test3' title='Test3'
) )
response = self.executeOK() response = self.executeOK()
@ -124,14 +123,14 @@ class TestRSFormViewset(EndpointTester):
def test_check(self): def test_check(self):
self.owned.insert_new('X1') self.owned.insert_new('X1')
data = {'expression': 'X1=X1'} data = {'expression': 'X1=X1'}
response = self.executeOK(data, item=self.owned_id) response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], True) self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math') self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[X1][X1]]') self.assertEqual(response.data['astText'], '[=[X1][X1]]')
self.assertEqual(response.data['typification'], 'LOGIC') self.assertEqual(response.data['typification'], 'LOGIC')
self.assertEqual(response.data['valueClass'], 'value') self.assertEqual(response.data['valueClass'], 'value')
self.executeOK(data, item=self.unowned_id) self.executeOK(data=data, item=self.unowned_id)
@decl_endpoint('/api/rsforms/{item}/resolve', method='post') @decl_endpoint('/api/rsforms/{item}/resolve', method='post')
@ -142,7 +141,7 @@ class TestRSFormViewset(EndpointTester):
) )
data = {'text': '@{1|редкий} @{X1|plur,datv}'} data = {'text': '@{1|редкий} @{X1|plur,datv}'}
response = self.executeOK(data, item=self.owned_id) response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['input'], '@{1|редкий} @{X1|plur,datv}') self.assertEqual(response.data['input'], '@{1|редкий} @{X1|plur,datv}')
self.assertEqual(response.data['output'], 'редким синим слонам') self.assertEqual(response.data['output'], 'редким синим слонам')
self.assertEqual(len(response.data['refs']), 2) self.assertEqual(len(response.data['refs']), 2)
@ -189,13 +188,19 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/cst-create', method='post') @decl_endpoint('/api/rsforms/{item}/cst-create', method='post')
def test_create_constituenta(self): def test_create_constituenta(self):
data = {'alias': 'X3', 'cst_type': CstType.BASE} data = {'alias': 'X3'}
self.executeForbidden(data, item=self.unowned_id) self.executeForbidden(data=data, item=self.unowned_id)
self.owned.insert_new('X1') self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2') x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
response = self.executeCreated(data, item=self.owned_id) data['cst_type'] = 'invalid'
self.executeBadData(data=data, item=self.owned_id)
data['cst_type'] = CstType.BASE
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3') self.assertEqual(response.data['new_cst']['alias'], 'X3')
x3 = Constituenta.objects.get(alias=response.data['new_cst']['alias']) x3 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x3.order, 3) self.assertEqual(x3.order, 3)
@ -207,7 +212,7 @@ class TestRSFormViewset(EndpointTester):
'term_raw': 'test', 'term_raw': 'test',
'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}] 'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}]
} }
response = self.executeCreated(data, item=self.owned_id) response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], data['alias']) self.assertEqual(response.data['new_cst']['alias'], data['alias'])
x4 = Constituenta.objects.get(alias=response.data['new_cst']['alias']) x4 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x4.order, 3) self.assertEqual(x4.order, 3)
@ -234,14 +239,14 @@ class TestRSFormViewset(EndpointTester):
) )
data = {'target': x2_2.pk, 'alias': 'D2', 'cst_type': CstType.TERM} data = {'target': x2_2.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
self.executeForbidden(data, item=self.unowned_id) self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x1.alias, 'cst_type': CstType.TERM} data = {'target': x1.pk, 'alias': x1.alias, 'cst_type': CstType.TERM}
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x3.alias} data = {'target': x1.pk, 'alias': x3.alias}
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new( d1 = self.owned.insert_new(
alias='D1', alias='D1',
@ -253,7 +258,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(x1.cst_type, CstType.BASE) self.assertEqual(x1.cst_type, CstType.BASE)
data = {'target': x1.pk, 'alias': 'D2', 'cst_type': CstType.TERM} data = {'target': x1.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
response = self.executeOK(data, item=self.owned_id) response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'D2') self.assertEqual(response.data['new_cst']['alias'], 'D2')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.TERM) self.assertEqual(response.data['new_cst']['cst_type'], CstType.TERM)
d1.refresh_from_db() d1.refresh_from_db()
@ -280,14 +285,14 @@ class TestRSFormViewset(EndpointTester):
unowned = self.unowned.insert_new('X2') unowned = self.unowned.insert_new('X2')
data = {'substitutions': [{'original': x1.pk, 'substitution': unowned.pk, 'transfer_term': True}]} data = {'substitutions': [{'original': x1.pk, 'substitution': unowned.pk, 'transfer_term': True}]}
self.executeForbidden(data, item=self.unowned_id) self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': unowned.pk, 'substitution': x1.pk, 'transfer_term': True}]} data = {'substitutions': [{'original': unowned.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': x1.pk, 'substitution': x1.pk, 'transfer_term': True}]} data = {'substitutions': [{'original': x1.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new( d1 = self.owned.insert_new(
alias='D1', alias='D1',
@ -295,7 +300,7 @@ class TestRSFormViewset(EndpointTester):
definition_formal='X1' definition_formal='X1'
) )
data = {'substitutions': [{'original': x1.pk, 'substitution': x2.pk, 'transfer_term': True}]} data = {'substitutions': [{'original': x1.pk, 'substitution': x2.pk, 'transfer_term': True}]}
response = self.executeOK(data, item=self.owned_id) response = self.executeOK(data=data, item=self.owned_id)
d1.refresh_from_db() d1.refresh_from_db()
x2.refresh_from_db() x2.refresh_from_db()
self.assertEqual(x2.term_raw, 'Test1') self.assertEqual(x2.term_raw, 'Test1')
@ -315,7 +320,7 @@ class TestRSFormViewset(EndpointTester):
) )
data = {'substitutions': []} data = {'substitutions': []}
self.executeBadData(data) self.executeBadData(data=data)
data = {'substitutions': [ data = {'substitutions': [
{ {
@ -329,7 +334,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True 'transfer_term': True
} }
]} ]}
self.executeBadData(data) self.executeBadData(data=data)
data = {'substitutions': [ data = {'substitutions': [
{ {
@ -343,7 +348,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True 'transfer_term': True
} }
]} ]}
response = self.executeOK(data, item=self.owned_id) response = self.executeOK(data=data, item=self.owned_id)
d3.refresh_from_db() d3.refresh_from_db()
self.assertEqual(d3.definition_formal, r'D1 \ D2') self.assertEqual(d3.definition_formal, r'D1 \ D2')
@ -358,7 +363,7 @@ class TestRSFormViewset(EndpointTester):
'definition_formal': '3', 'definition_formal': '3',
'definition_raw': '4' 'definition_raw': '4'
} }
response = self.executeCreated(data, item=self.owned_id) response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3') self.assertEqual(response.data['new_cst']['alias'], 'X3')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.BASE) self.assertEqual(response.data['new_cst']['cst_type'], CstType.BASE)
self.assertEqual(response.data['new_cst']['convention'], '1') self.assertEqual(response.data['new_cst']['convention'], '1')
@ -374,13 +379,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id) self.set_params(item=self.owned_id)
data = {'items': [1337]} data = {'items': [1337]}
self.executeBadData(data) self.executeBadData(data=data)
x1 = self.owned.insert_new('X1') x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2') x2 = self.owned.insert_new('X2')
data = {'items': [x1.pk]} data = {'items': [x1.pk]}
response = self.executeOK(data) response = self.executeOK(data=data)
x2.refresh_from_db() x2.refresh_from_db()
self.owned.item.refresh_from_db() self.owned.item.refresh_from_db()
self.assertEqual(len(response.data['items']), 1) self.assertEqual(len(response.data['items']), 1)
@ -390,7 +395,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1') x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk]} data = {'items': [x3.pk]}
self.executeBadData(data, item=self.owned_id) self.executeBadData(data=data, item=self.owned_id)
@decl_endpoint('/api/rsforms/{item}/cst-moveto', method='patch') @decl_endpoint('/api/rsforms/{item}/cst-moveto', method='patch')
@ -398,13 +403,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id) self.set_params(item=self.owned_id)
data = {'items': [1337], 'move_to': 1} data = {'items': [1337], 'move_to': 1}
self.executeBadData(data) self.executeBadData(data=data)
x1 = self.owned.insert_new('X1') x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2') x2 = self.owned.insert_new('X2')
data = {'items': [x2.pk], 'move_to': 1} data = {'items': [x2.pk], 'move_to': 1}
response = self.executeOK(data) response = self.executeOK(data=data)
x1.refresh_from_db() x1.refresh_from_db()
x2.refresh_from_db() x2.refresh_from_db()
self.assertEqual(response.data['id'], self.owned_id) self.assertEqual(response.data['id'], self.owned_id)
@ -413,7 +418,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1') x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk], 'move_to': 1} data = {'items': [x3.pk], 'move_to': 1}
self.executeBadData(data) self.executeBadData(data=data)
@decl_endpoint('/api/rsforms/{item}/reset-aliases', method='patch') @decl_endpoint('/api/rsforms/{item}/reset-aliases', method='patch')

View File

@ -1,5 +1,5 @@
''' Testing views ''' ''' Testing views '''
from ..EndpointTester import EndpointTester, decl_endpoint from shared.EndpointTester import EndpointTester, decl_endpoint
class TestRSLanguageViews(EndpointTester): class TestRSLanguageViews(EndpointTester):
@ -8,30 +8,30 @@ class TestRSLanguageViews(EndpointTester):
@decl_endpoint('/api/rslang/to-ascii', method='post') @decl_endpoint('/api/rslang/to-ascii', method='post')
def test_convert_to_ascii(self): def test_convert_to_ascii(self):
data = {'data': '1=1'} data = {'data': '1=1'}
self.executeBadData(data) self.executeBadData(data=data)
data = {'expression': '1=1'} data = {'expression': '1=1'}
response = self.executeOK(data) response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1 \eq 1') self.assertEqual(response.data['result'], r'1 \eq 1')
@decl_endpoint('/api/rslang/to-math', method='post') @decl_endpoint('/api/rslang/to-math', method='post')
def test_convert_to_math(self): def test_convert_to_math(self):
data = {'data': r'1 \eq 1'} data = {'data': r'1 \eq 1'}
self.executeBadData(data) self.executeBadData(data=data)
data = {'expression': r'1 \eq 1'} data = {'expression': r'1 \eq 1'}
response = self.executeOK(data) response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1=1') self.assertEqual(response.data['result'], r'1=1')
@decl_endpoint('/api/rslang/parse-expression', method='post') @decl_endpoint('/api/rslang/parse-expression', method='post')
def test_parse_expression(self): def test_parse_expression(self):
data = {'data': r'1=1'} data = {'data': r'1=1'}
self.executeBadData(data) self.executeBadData(data=data)
data = {'expression': r'1=1'} data = {'expression': r'1=1'}
response = self.executeOK(data) response = self.executeOK(data=data)
self.assertEqual(response.data['parseResult'], True) self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math') self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[1][1]]') self.assertEqual(response.data['astText'], '[=[1][1]]')

View File

@ -7,8 +7,7 @@ from zipfile import ZipFile
from rest_framework import status from rest_framework import status
from apps.rsform.models import Constituenta, RSForm from apps.rsform.models import Constituenta, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
from ..EndpointTester import EndpointTester, decl_endpoint
class TestVersionViews(EndpointTester): class TestVersionViews(EndpointTester):
@ -31,11 +30,11 @@ class TestVersionViews(EndpointTester):
invalid_id = 1338 invalid_id = 1338
data = {'version': '1.0.0', 'description': 'test'} data = {'version': '1.0.0', 'description': 'test'}
self.executeNotFound(data, schema=invalid_id) self.executeNotFound(data=data, schema=invalid_id)
self.executeForbidden(data, schema=self.unowned.pk) self.executeForbidden(data=data, schema=self.unowned.pk)
self.executeBadData(invalid_data, schema=self.owned.pk) self.executeBadData(data=invalid_data, schema=self.owned.pk)
response = self.executeCreated(data, schema=self.owned.pk) response = self.executeCreated(data=data, schema=self.owned.pk)
self.assertTrue('version' in response.data) self.assertTrue('version' in response.data)
self.assertTrue('schema' in response.data) self.assertTrue('schema' in response.data)
self.assertTrue(response.data['version'] in [v['id'] for v in response.data['schema']['versions']]) self.assertTrue(response.data['version'] in [v['id'] for v in response.data['schema']['versions']])
@ -65,7 +64,7 @@ class TestVersionViews(EndpointTester):
@decl_endpoint('/api/versions/{version}', method='get') @decl_endpoint('/api/versions/{version}', method='get')
def test_access_version(self): def test_access_version(self):
data = {'version': '1.0.0', 'description': 'test'} data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data) version_id = self._create_version(data=data)
invalid_id = version_id + 1337 invalid_id = version_id + 1337
self.executeNotFound(version=invalid_id) self.executeNotFound(version=invalid_id)
@ -79,14 +78,14 @@ class TestVersionViews(EndpointTester):
data = {'version': '1.2.0', 'description': 'test1'} data = {'version': '1.2.0', 'description': 'test1'}
self.method = 'patch' self.method = 'patch'
self.executeForbidden(data) self.executeForbidden(data=data)
self.method = 'delete' self.method = 'delete'
self.executeForbidden() self.executeForbidden()
self.client.force_authenticate(user=self.user) self.client.force_authenticate(user=self.user)
self.method = 'patch' self.method = 'patch'
self.executeOK(data) self.executeOK(data=data)
response = self.get() response = self.get()
self.assertEqual(response.data['version'], data['version']) self.assertEqual(response.data['version'], data['version'])
self.assertEqual(response.data['description'], data['description']) self.assertEqual(response.data['description'], data['description'])
@ -139,7 +138,7 @@ class TestVersionViews(EndpointTester):
x2 = self.schema.insert_new('X2') x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new('D1', term_raw='TestTerm') d1 = self.schema.insert_new('D1', term_raw='TestTerm')
data = {'version': '1.0.0', 'description': 'test'} data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data) version_id = self._create_version(data=data)
invalid_id = version_id + 1337 invalid_id = version_id + 1337
d1.delete() d1.delete()

View File

@ -2,8 +2,9 @@
from drf_spectacular.utils import extend_schema, extend_schema_view from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics from rest_framework import generics
from shared import permissions
from .. import models as m from .. import models as m
from .. import permissions
from .. import serializers as s from .. import serializers as s

View File

@ -13,8 +13,9 @@ from rest_framework.decorators import action
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from shared import permissions
from .. import models as m from .. import models as m
from .. import permissions
from .. import serializers as s from .. import serializers as s
@ -42,17 +43,27 @@ class LibraryViewSet(viewsets.ModelViewSet):
def get_permissions(self): def get_permissions(self):
if self.action in ['update', 'partial_update']: if self.action in ['update', 'partial_update']:
permission_list = [permissions.ItemEditor] access_level = permissions.ItemEditor
elif self.action in [ elif self.action in [
'destroy', 'set_owner', 'set_access_policy', 'set_location', 'destroy',
'editors_add', 'editors_remove', 'editors_set' 'set_owner',
'set_access_policy',
'set_location',
'editors_add',
'editors_remove',
'editors_set'
]: ]:
permission_list = [permissions.ItemOwner] access_level = permissions.ItemOwner
elif self.action in ['create', 'clone', 'subscribe', 'unsubscribe']: elif self.action in [
permission_list = [permissions.GlobalUser] 'create',
'clone',
'subscribe',
'unsubscribe'
]:
access_level = permissions.GlobalUser
else: else:
permission_list = [permissions.ItemAnyone] access_level = permissions.ItemAnyone
return [permission() for permission in permission_list] return [access_level()]
def _get_item(self) -> m.LibraryItem: def _get_item(self) -> m.LibraryItem:
return cast(m.LibraryItem, self.get_object()) return cast(m.LibraryItem, self.get_object())
@ -68,7 +79,6 @@ class LibraryViewSet(viewsets.ModelViewSet):
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@transaction.atomic
@action(detail=True, methods=['post'], url_path='clone') @action(detail=True, methods=['post'], url_path='clone')
def clone(self, request: Request, pk): def clone(self, request: Request, pk):
''' Endpoint: Create deep copy of library item. ''' ''' Endpoint: Create deep copy of library item. '''
@ -85,19 +95,20 @@ class LibraryViewSet(viewsets.ModelViewSet):
clone.read_only = False clone.read_only = False
clone.access_policy = serializer.validated_data.get('access_policy', m.AccessPolicy.PUBLIC) clone.access_policy = serializer.validated_data.get('access_policy', m.AccessPolicy.PUBLIC)
clone.location = serializer.validated_data.get('location', m.LocationHead.USER) clone.location = serializer.validated_data.get('location', m.LocationHead.USER)
clone.save()
if clone.item_type == m.LibraryItemType.RSFORM: with transaction.atomic():
need_filter = 'items' in request.data clone.save()
for cst in m.RSForm(item).constituents(): if clone.item_type == m.LibraryItemType.RSFORM:
if not need_filter or cst.pk in request.data['items']: need_filter = 'items' in request.data
cst.pk = None for cst in m.RSForm(item).constituents():
cst.schema = clone if not need_filter or cst.pk in request.data['items']:
cst.save() cst.pk = None
return Response( cst.schema = clone
status=c.HTTP_201_CREATED, cst.save()
data=s.RSFormParseSerializer(clone).data return Response(
) status=c.HTTP_201_CREATED,
data=s.RSFormParseSerializer(clone).data
)
return Response(status=c.HTTP_400_BAD_REQUEST) return Response(status=c.HTTP_400_BAD_REQUEST)
@extend_schema( @extend_schema(
@ -266,24 +277,17 @@ class LibraryActiveView(generics.ListAPIView):
serializer_class = s.LibraryItemSerializer serializer_class = s.LibraryItemSerializer
def get_queryset(self): def get_queryset(self):
common_location = Q(location__startswith=m.LocationHead.COMMON) | Q(location__startswith=m.LocationHead.LIBRARY)
is_public = Q(access_policy=m.AccessPolicy.PUBLIC)
if self.request.user.is_anonymous: if self.request.user.is_anonymous:
return m.LibraryItem.objects.filter( return m.LibraryItem.objects \
Q(access_policy=m.AccessPolicy.PUBLIC), .filter(is_public) \
).filter( .filter(common_location).order_by('-time_update')
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
).order_by('-time_update')
else: else:
user = cast(m.User, self.request.user) user = cast(m.User, self.request.user)
# pylint: disable=unsupported-binary-operation # pylint: disable=unsupported-binary-operation
return m.LibraryItem.objects.filter( return m.LibraryItem.objects.filter(
( (is_public & common_location) |
Q(access_policy=m.AccessPolicy.PUBLIC) &
(
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
)
) |
Q(owner=user) | Q(owner=user) |
Q(editor__editor=user) | Q(editor__editor=user) |
Q(subscription__user=user) Q(subscription__user=user)

View File

@ -18,7 +18,6 @@ from .. import serializers as s
request=s.InlineSynthesisSerializer, request=s.InlineSynthesisSerializer,
responses={c.HTTP_200_OK: s.RSFormParseSerializer} responses={c.HTTP_200_OK: s.RSFormParseSerializer}
) )
@transaction.atomic
@api_view(['PATCH']) @api_view(['PATCH'])
def inline_synthesis(request: Request): def inline_synthesis(request: Request):
''' Endpoint: Inline synthesis. ''' ''' Endpoint: Inline synthesis. '''
@ -30,20 +29,21 @@ def inline_synthesis(request: Request):
schema = m.RSForm(serializer.validated_data['receiver']) schema = m.RSForm(serializer.validated_data['receiver'])
items = cast(list[m.Constituenta], serializer.validated_data['items']) items = cast(list[m.Constituenta], serializer.validated_data['items'])
new_items = schema.insert_copy(items)
for substitution in serializer.validated_data['substitutions']: with transaction.atomic():
original = cast(m.Constituenta, substitution['original']) new_items = schema.insert_copy(items)
replacement = cast(m.Constituenta, substitution['substitution']) for substitution in serializer.validated_data['substitutions']:
if original in items: original = cast(m.Constituenta, substitution['original'])
index = next(i for (i, cst) in enumerate(items) if cst == original) replacement = cast(m.Constituenta, substitution['substitution'])
original = new_items[index] if original in items:
else: index = next(i for (i, cst) in enumerate(items) if cst == original)
index = next(i for (i, cst) in enumerate(items) if cst == replacement) original = new_items[index]
replacement = new_items[index] else:
schema.substitute(original, replacement, substitution['transfer_term']) index = next(i for (i, cst) in enumerate(items) if cst == replacement)
replacement = new_items[index]
schema.substitute(original, replacement, substitution['transfer_term'])
schema.restore_order()
schema.restore_order()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data data=s.RSFormParseSerializer(schema.item).data

View File

@ -13,9 +13,10 @@ from rest_framework.decorators import action, api_view
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from .. import messages as msg from shared import messages as msg
from shared import permissions
from .. import models as m from .. import models as m
from .. import permissions
from .. import serializers as s from .. import serializers as s
from .. import utils from .. import utils
@ -33,11 +34,21 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def get_permissions(self): def get_permissions(self):
''' Determine permission class. ''' ''' Determine permission class. '''
if self.action in [ if self.action in [
'load_trs', 'cst_create', 'cst_delete_multiple', 'load_trs',
'reset_aliases', 'cst_rename', 'cst_substitute' 'reset_aliases',
'cst_create',
'cst_delete_multiple',
'cst_rename',
'cst_substitute'
]: ]:
permission_list = [permissions.ItemEditor] permission_list = [permissions.ItemEditor]
elif self.action in ['contents', 'details', 'export_trs', 'resolve', 'check']: elif self.action in [
'contents',
'details',
'export_trs',
'resolve',
'check'
]:
permission_list = [permissions.ItemAnyone] permission_list = [permissions.ItemAnyone]
else: else:
permission_list = [permissions.Anyone] permission_list = [permissions.Anyone]
@ -49,6 +60,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstCreateSerializer, request=s.CstCreateSerializer,
responses={ responses={
c.HTTP_201_CREATED: s.NewCstResponse, c.HTTP_201_CREATED: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
@ -60,10 +72,15 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
serializer = s.CstCreateSerializer(data=request.data) serializer = s.CstCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
data = serializer.validated_data data = serializer.validated_data
new_cst = schema.create_cst( if 'insert_after' in data:
data=data, try:
insert_after=data['insert_after'] if 'insert_after' in data else None insert_after = m.Constituenta.objects.get(pk=data['insert_after'])
) except m.LibraryItem.DoesNotExist:
return Response(status=c.HTTP_404_NOT_FOUND)
else:
insert_after = None
new_cst = schema.create_cst(data, insert_after)
schema.item.refresh_from_db() schema.item.refresh_from_db()
response = Response( response = Response(
status=c.HTTP_201_CREATED, status=c.HTTP_201_CREATED,
@ -81,6 +98,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstTargetSerializer, request=s.CstTargetSerializer,
responses={ responses={
c.HTTP_200_OK: s.NewMultiCstResponse, c.HTTP_200_OK: s.NewMultiCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
@ -117,11 +135,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstRenameSerializer, request=s.CstRenameSerializer,
responses={ responses={
c.HTTP_200_OK: s.NewCstResponse, c.HTTP_200_OK: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-rename') @action(detail=True, methods=['patch'], url_path='cst-rename')
def cst_rename(self, request: Request, pk): def cst_rename(self, request: Request, pk):
''' Rename constituenta possibly changing type. ''' ''' Rename constituenta possibly changing type. '''
@ -134,12 +152,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
cst.alias = serializer.validated_data['alias'] cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type'] cst.cst_type = serializer.validated_data['cst_type']
cst.save()
mapping = {old_alias: cst.alias} with transaction.atomic():
schema.apply_mapping(mapping, change_aliases=False) cst.save()
schema.item.refresh_from_db() schema.apply_mapping(mapping={old_alias: cst.alias}, change_aliases=False)
cst.refresh_from_db() schema.item.refresh_from_db()
cst.refresh_from_db()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
@ -155,11 +173,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstSubstituteSerializer, request=s.CstSubstituteSerializer,
responses={ responses={
c.HTTP_200_OK: s.RSFormParseSerializer, c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-substitute') @action(detail=True, methods=['patch'], url_path='cst-substitute')
def cst_substitute(self, request: Request, pk): def cst_substitute(self, request: Request, pk):
''' Substitute occurrences of constituenta with another one. ''' ''' Substitute occurrences of constituenta with another one. '''
@ -169,11 +187,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
context={'schema': schema.item} context={'schema': schema.item}
) )
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original']) with transaction.atomic():
replacement = cast(m.Constituenta, substitution['substitution']) for substitution in serializer.validated_data['substitutions']:
schema.substitute(original, replacement, substitution['transfer_term']) original = cast(m.Constituenta, substitution['original'])
schema.item.refresh_from_db() replacement = cast(m.Constituenta, substitution['substitution'])
schema.substitute(original, replacement, substitution['transfer_term'])
schema.item.refresh_from_db()
return Response( return Response(
status=c.HTTP_200_OK, status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data data=s.RSFormParseSerializer(schema.item).data
@ -185,6 +205,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstListSerializer, request=s.CstListSerializer,
responses={ responses={
c.HTTP_200_OK: s.RSFormParseSerializer, c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
@ -211,6 +232,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstMoveSerializer, request=s.CstMoveSerializer,
responses={ responses={
c.HTTP_200_OK: s.RSFormParseSerializer, c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
@ -279,6 +301,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.RSFormUploadSerializer, request=s.RSFormUploadSerializer,
responses={ responses={
c.HTTP_200_OK: s.RSFormParseSerializer, c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
@ -313,7 +336,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@action(detail=True, methods=['get']) @action(detail=True, methods=['get'], url_path='contents')
def contents(self, request: Request, pk): def contents(self, request: Request, pk):
''' Endpoint: View schema db contents (including constituents). ''' ''' Endpoint: View schema db contents (including constituents). '''
schema = s.RSFormSerializer(self.get_object()) schema = s.RSFormSerializer(self.get_object())
@ -331,7 +354,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@action(detail=True, methods=['get']) @action(detail=True, methods=['get'], url_path='details')
def details(self, request: Request, pk): def details(self, request: Request, pk):
''' Endpoint: Detailed schema view including statuses and parse. ''' ''' Endpoint: Detailed schema view including statuses and parse. '''
serializer = s.RSFormParseSerializer(cast(m.LibraryItem, self.get_object())) serializer = s.RSFormParseSerializer(cast(m.LibraryItem, self.get_object()))
@ -349,7 +372,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
}, },
) )
@action(detail=True, methods=['post']) @action(detail=True, methods=['post'], url_path='check')
def check(self, request: Request, pk): def check(self, request: Request, pk):
''' Endpoint: Check RSLang expression against schema context. ''' ''' Endpoint: Check RSLang expression against schema context. '''
serializer = s.ExpressionSerializer(data=request.data) serializer = s.ExpressionSerializer(data=request.data)
@ -371,7 +394,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }
) )
@action(detail=True, methods=['post']) @action(detail=True, methods=['post'], url_path='resolve')
def resolve(self, request: Request, pk): def resolve(self, request: Request, pk):
''' Endpoint: Resolve references in text against schema terms context. ''' ''' Endpoint: Resolve references in text against schema terms context. '''
serializer = s.TextSerializer(data=request.data) serializer = s.TextSerializer(data=request.data)

View File

@ -10,8 +10,9 @@ from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.response import Response from rest_framework.response import Response
from shared import permissions
from .. import models as m from .. import models as m
from .. import permissions
from .. import serializers as s from .. import serializers as s
from .. import utils from .. import utils
@ -54,6 +55,7 @@ class VersionViewset(
request=s.VersionCreateSerializer, request=s.VersionCreateSerializer,
responses={ responses={
c.HTTP_201_CREATED: s.NewVersionResponse, c.HTTP_201_CREATED: s.NewVersionResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None, c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None c.HTTP_404_NOT_FOUND: None
} }

View File

@ -1,14 +0,0 @@
''' Utility: Text messages. '''
# pylint: skip-file
def passwordAuthFailed():
return 'Неизвестное сочетание имени пользователя (email) и пароля'
def passwordsNotMatch():
return 'Введенные пароли не совпадают'
def emailAlreadyTaken():
return 'Пользователь с данным email уже существует'

View File

@ -4,8 +4,8 @@ from django.contrib.auth.password_validation import validate_password
from rest_framework import serializers from rest_framework import serializers
from apps.rsform.models import Editor, Subscription from apps.rsform.models import Editor, Subscription
from shared import messages as msg
from . import messages as msg
from . import models from . import models

View File

@ -1,30 +1,30 @@
''' Testing API: users. ''' ''' Testing API: users. '''
from rest_framework.test import APIClient, APITestCase from rest_framework.test import APIClient, APITestCase
from apps.rsform.tests.EndpointTester import EndpointTester, decl_endpoint
from apps.users.models import User from apps.users.models import User
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestUserAPIViews(EndpointTester): class TestUserAPIViews(EndpointTester):
''' Testing Authentication views. ''' ''' Testing Authentication views. '''
def setUp(self): def setUp(self):
super().setUp() super().setUpFullUsers()
@decl_endpoint('/users/api/login', method='post') @decl_endpoint('/users/api/login', method='post')
def test_login(self): def test_login(self):
self.logout() self.logout()
data = {'username': self.user.username, 'password': 'invalid'} data = {'username': self.user.username, 'password': 'invalid'}
self.executeBadData(data) self.executeBadData(data=data)
data = {'username': self.user.username, 'password': 'password'} data = {'username': self.user.username, 'password': 'password'}
self.executeAccepted(data) self.executeAccepted(data=data)
self.executeAccepted(data) self.executeAccepted(data=data)
self.logout() self.logout()
data = {'username': self.user.email, 'password': 'password'} data = {'username': self.user.email, 'password': 'password'}
self.executeAccepted(data) self.executeAccepted(data=data)
@decl_endpoint('/users/api/logout', method='post') @decl_endpoint('/users/api/logout', method='post')
@ -59,7 +59,7 @@ class TestUserUserProfileAPIView(EndpointTester):
''' Testing User profile views. ''' ''' Testing User profile views. '''
def setUp(self): def setUp(self):
super().setUp() super().setUpFullUsers()
self.user.first_name = 'John' self.user.first_name = 'John'
self.user.second_name = 'Smith' self.user.second_name = 'Smith'
self.user.save() self.user.save()
@ -84,7 +84,7 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'firstName', 'first_name': 'firstName',
'last_name': 'lastName', 'last_name': 'lastName',
} }
response = self.executeOK(data) response = self.executeOK(data=data)
self.user.refresh_from_db() self.user.refresh_from_db()
self.assertEqual(response.data['email'], '123@mail.ru') self.assertEqual(response.data['email'], '123@mail.ru')
self.assertEqual(self.user.email, '123@mail.ru') self.assertEqual(self.user.email, '123@mail.ru')
@ -98,10 +98,10 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'new', 'first_name': 'new',
'last_name': 'new2', 'last_name': 'new2',
} }
self.executeOK(data) self.executeOK(data=data)
data = {'email': self.user2.email} data = {'email': self.user2.email}
self.executeBadData(data) self.executeBadData(data=data)
self.logout() self.logout()
self.executeForbidden() self.executeForbidden()
@ -113,14 +113,14 @@ class TestUserUserProfileAPIView(EndpointTester):
'old_password': 'invalid', 'old_password': 'invalid',
'new_password': 'password2' 'new_password': 'password2'
} }
self.executeBadData(data) self.executeBadData(data=data)
data = { data = {
'old_password': 'password', 'old_password': 'password',
'new_password': 'password2' 'new_password': 'password2'
} }
oldHash = self.user.password oldHash = self.user.password
response = self.executeNoContent(data) response = self.executeNoContent(data=data)
self.user.refresh_from_db() self.user.refresh_from_db()
self.assertNotEqual(self.user.password, oldHash) self.assertNotEqual(self.user.password, oldHash)
self.assertTrue(self.client.login(username=self.user.username, password='password2')) self.assertTrue(self.client.login(username=self.user.username, password='password2'))
@ -154,7 +154,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName', 'first_name': 'firstName',
'last_name': 'lastName' 'last_name': 'lastName'
} }
self.executeBadData(data) self.executeBadData(data=data)
data = { data = {
'username': 'NewUser', 'username': 'NewUser',
@ -164,7 +164,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName', 'first_name': 'firstName',
'last_name': 'lastName' 'last_name': 'lastName'
} }
response = self.executeCreated(data) response = self.executeCreated(data=data)
self.assertTrue('id' in response.data) self.assertTrue('id' in response.data)
self.assertEqual(response.data['username'], data['username']) self.assertEqual(response.data['username'], data['username'])
self.assertEqual(response.data['email'], data['email']) self.assertEqual(response.data['email'], data['email'])
@ -179,7 +179,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName', 'first_name': 'firstName',
'last_name': 'lastName' 'last_name': 'lastName'
} }
self.executeBadData(data) self.executeBadData(data=data)
data = { data = {
'username': 'NewUser2', 'username': 'NewUser2',
@ -189,4 +189,4 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName', 'first_name': 'firstName',
'last_name': 'lastName' 'last_name': 'lastName'
} }
self.executeBadData(data) self.executeBadData(data=data)

View File

@ -1,4 +1,22 @@
[ [
{
"model": "auth.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$720000$gFZkaBswurtL0naQKiUnW7$3b6SUN3fY2Xl1H7erAszVpQl2LpoKusan+yJP7Bp3JA=",
"last_login": "2024-06-03T20:57:02.522Z",
"is_superuser": true,
"username": "admin",
"first_name": "Администратор",
"last_name": "",
"email": "admin@mail.ru",
"is_staff": true,
"is_active": true,
"date_joined": "2024-05-27T18:31:48.913Z",
"groups": [],
"user_permissions": []
}
},
{ {
"model": "auth.user", "model": "auth.user",
"pk": 3, "pk": 3,

View File

@ -74,6 +74,7 @@ INSTALLED_APPS = [
'apps.users', 'apps.users',
'apps.rsform', 'apps.rsform',
'apps.oss',
'drf_spectacular', 'drf_spectacular',
'drf_spectacular_sidecar', 'drf_spectacular_sidecar',

View File

@ -9,6 +9,7 @@ from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, Spec
urlpatterns = [ urlpatterns = [
path('admin', admin.site.urls), path('admin', admin.site.urls),
path('api/', include('apps.rsform.urls')), path('api/', include('apps.rsform.urls')),
path('api/', include('apps.oss.urls')),
path('users/', include('apps.users.urls')), path('users/', include('apps.users.urls')),
path('schema', SpectacularAPIView.as_view(), name='schema'), path('schema', SpectacularAPIView.as_view(), name='schema'),
path('redoc', SpectacularRedocView.as_view()), path('redoc', SpectacularRedocView.as_view()),

View File

@ -26,6 +26,21 @@ class EndpointTester(APITestCase):
''' Abstract base class for Testing endpoints. ''' ''' Abstract base class for Testing endpoints. '''
def setUp(self): def setUp(self):
self.factory = APIRequestFactory()
self.user = User.objects.create(
username='UserTest',
email='blank@test.com',
password='password'
)
self.user2 = User.objects.create(
username='UserTest2',
email='another@test.com',
password='password'
)
self.client = APIClient()
self.client.force_authenticate(user=self.user)
def setUpFullUsers(self):
self.factory = APIRequestFactory() self.factory = APIRequestFactory()
self.user = User.objects.create_user( self.user = User.objects.create_user(
username='UserTest', username='UserTest',

View File

@ -0,0 +1 @@
''' Utilities shared between applications. '''

View File

@ -6,6 +6,10 @@ def constituentaNotOwned(title: str):
return f'Конституента не принадлежит схеме: {title}' return f'Конституента не принадлежит схеме: {title}'
def operationNotOwned(title: str):
return f'Операция не принадлежит схеме: {title}'
def substitutionNotInList(): def substitutionNotInList():
return 'Отождествляемая конституента отсутствует в списке' return 'Отождествляемая конституента отсутствует в списке'
@ -64,3 +68,15 @@ def constituentaNoStructure():
def missingFile(): def missingFile():
return 'Отсутствует прикрепленный файл' return 'Отсутствует прикрепленный файл'
def passwordAuthFailed():
return 'Неизвестное сочетание имени пользователя (email) и пароля'
def passwordsNotMatch():
return 'Введенные пароли не совпадают'
def emailAlreadyTaken():
return 'Пользователь с данным email уже существует'

View File

@ -11,16 +11,27 @@ from rest_framework.permissions import \
from rest_framework.request import Request from rest_framework.request import Request
from rest_framework.views import APIView from rest_framework.views import APIView
from . import models as m from apps.oss.models import Operation
from apps.rsform.models import (
AccessPolicy,
Constituenta,
Editor,
LibraryItem,
Subscription,
Version
)
from apps.users.models import User
def _extract_item(obj: Any) -> m.LibraryItem: def _extract_item(obj: Any) -> LibraryItem:
if isinstance(obj, m.LibraryItem): if isinstance(obj, LibraryItem):
return obj return obj
elif isinstance(obj, m.Constituenta): elif isinstance(obj, Constituenta):
return cast(m.LibraryItem, obj.schema) return cast(LibraryItem, obj.schema)
elif isinstance(obj, (m.Version, m.Subscription, m.Editor)): elif isinstance(obj, Operation):
return cast(m.LibraryItem, obj.item) return cast(LibraryItem, obj.oss)
elif isinstance(obj, (Version, Subscription, Editor)):
return cast(LibraryItem, obj.item)
raise PermissionDenied({ raise PermissionDenied({
'message': 'Invalid type error. Please contact developers', 'message': 'Invalid type error. Please contact developers',
'object_id': obj.id 'object_id': obj.id
@ -60,10 +71,10 @@ class ItemEditor(ItemOwner):
if request.user.is_anonymous: if request.user.is_anonymous:
return False return False
item = _extract_item(obj) item = _extract_item(obj)
if m.Editor.objects.filter( if Editor.objects.filter(
item=item, item=item,
editor=cast(m.User, request.user) editor=cast(User, request.user)
).exists() and item.access_policy != m.AccessPolicy.PRIVATE: ).exists() and item.access_policy != AccessPolicy.PRIVATE:
return True return True
return super().has_object_permission(request, view, obj) return super().has_object_permission(request, view, obj)
@ -76,7 +87,7 @@ class ItemAnyone(ItemEditor):
def has_object_permission(self, request: Request, view: APIView, obj: Any) -> bool: def has_object_permission(self, request: Request, view: APIView, obj: Any) -> bool:
item = _extract_item(obj) item = _extract_item(obj)
if item.access_policy == m.AccessPolicy.PUBLIC: if item.access_policy == AccessPolicy.PUBLIC:
return True return True
return super().has_object_permission(request, view, obj) return super().has_object_permission(request, view, obj)

View File

@ -25,7 +25,6 @@ function BackendRun() {
DoMigrations DoMigrations
PrepareStatic -clearPrevious PrepareStatic -clearPrevious
AddInitialData AddInitialData
AddAdmin
} else { } else {
DoMigrations DoMigrations
PrepareStatic PrepareStatic
@ -48,13 +47,15 @@ function FlushData {
} }
function AddInitialData { function AddInitialData {
& $pyExec manage.py loaddata $initialData if (Test-Path -Path $initialData -PathType Leaf) {
} & $pyExec $djangoSrc flush --noinput
function AddAdmin { & $pyExec $djangoSrc loaddata $initialData
$env:DJANGO_SUPERUSER_USERNAME = 'admin' } else {
$env:DJANGO_SUPERUSER_PASSWORD = '1234' $env:DJANGO_SUPERUSER_USERNAME = 'admin'
$env:DJANGO_SUPERUSER_EMAIL = 'admin@admin.com' $env:DJANGO_SUPERUSER_PASSWORD = '1234'
& $pyExec $djangoSrc createsuperuser --noinput $env:DJANGO_SUPERUSER_EMAIL = 'admin@admin.com'
& $pyExec $djangoSrc createsuperuser --noinput
}
} }
function DoMigrations { function DoMigrations {