Implementing oss backend pt2
Some checks are pending
Backend CI / build (3.12) (push) Waiting to run

This commit is contained in:
Ivan 2024-07-19 19:28:55 +03:00
parent 86033ccd46
commit a256582a65
29 changed files with 621 additions and 263 deletions

View File

@ -4,5 +4,5 @@ from apps.rsform.models import LibraryItem, LibraryItemType
from .api_OSS import OperationSchema
from .Argument import Argument
from .Operation import Operation
from .Operation import Operation, OperationType
from .SynthesisSubstitution import SynthesisSubstitution

View File

@ -1,4 +1,6 @@
''' Models: OSS API. '''
from typing import Optional
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import QuerySet
@ -7,7 +9,7 @@ from apps.rsform.models import LibraryItem, LibraryItemType
from shared import messages as msg
from .Argument import Argument
from .Operation import Operation, OperationType
from .Operation import Operation
from .SynthesisSubstitution import SynthesisSubstitution
@ -33,18 +35,26 @@ class OperationSchema:
return Argument.objects.filter(operation__oss=self.item)
def substitutions(self) -> QuerySet[SynthesisSubstitution]:
''' Operation arguments. '''
''' Operation substitutions. '''
return SynthesisSubstitution.objects.filter(operation__oss=self.item)
def update_positions(self, data: list[dict]):
''' Update positions. '''
lookup = {x['id']: x for x in data}
operations = self.operations()
for item in operations:
if item.pk in lookup:
item.position_x = lookup[item.pk]['position_x']
item.position_y = lookup[item.pk]['position_y']
Operation.objects.bulk_update(operations, ['position_x', 'position_y'])
@transaction.atomic
def create_operation(self, operation_type: OperationType, alias: str, **kwargs) -> Operation:
def create_operation(self, **kwargs) -> Operation:
''' Insert new operation. '''
if self.operations().filter(alias=alias).exists():
raise ValidationError(msg.aliasTaken(alias))
if kwargs['alias'] != '' and self.operations().filter(alias=kwargs['alias']).exists():
raise ValidationError(msg.aliasTaken(kwargs['alias']))
result = Operation.objects.create(
oss=self.item,
alias=alias,
operation_type=operation_type,
**kwargs
)
self.item.save()
@ -55,4 +65,64 @@ class OperationSchema:
def delete_operation(self, operation: Operation):
''' Delete operation. '''
operation.delete()
# deal with attached schema
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_input(self, target: Operation, schema: Optional[LibraryItem]):
''' Set input schema for operation. '''
if schema == target.result:
return
if schema:
target.result = schema
target.alias = schema.alias
target.title = schema.title
target.comment = schema.comment
else:
target.result = None
target.save()
# trigger on_change effects
self.item.save()
@transaction.atomic
def add_argument(self, operation: Operation, argument: Operation) -> Optional[Argument]:
''' Add Argument to operation. '''
if Argument.objects.filter(operation=operation, argument=argument).exists():
return None
result = Argument.objects.create(operation=operation, argument=argument)
self.item.save()
return result
@transaction.atomic
def clear_arguments(self, target: Operation):
''' Clear all arguments for operation. '''
if not Argument.objects.filter(operation=target).exists():
return
Argument.objects.filter(operation=target).delete()
SynthesisSubstitution.objects.filter(operation=target).delete()
# trigger on_change effects
self.item.save()
@transaction.atomic
def set_substitutions(self, target: Operation, substitutes: list[dict]):
''' Clear all arguments for operation. '''
SynthesisSubstitution.objects.filter(operation=target).delete()
for sub in substitutes:
SynthesisSubstitution.objects.create(
operation=target,
original=sub['original'],
substitution=sub['substitution'],
transfer_term=sub['transfer_term']
)
# trigger on_change effects
self.item.save()

View File

@ -2,9 +2,11 @@
from apps.rsform.serializers import LibraryItemSerializer
from .basics import OperationPositionSerializer, PositionsSerializer
from .data_access import (
ArgumentSerializer,
OperationCreateSerializer,
OperationDeleteSerializer,
OperationSchemaSerializer,
OperationSerializer
)

View File

@ -0,0 +1,16 @@
''' Basic serializers that do not interact with database. '''
from rest_framework import serializers
class OperationPositionSerializer(serializers.Serializer):
''' Operation position. '''
id = serializers.IntegerField()
position_x = serializers.FloatField()
position_y = serializers.FloatField()
class PositionsSerializer(serializers.Serializer):
''' Operations position for OperationSchema. '''
positions = serializers.ListField(
child=OperationPositionSerializer()
)

View File

@ -1,13 +1,16 @@
''' Serializers for persistent data manipulation. '''
from typing import cast
from django.db.models import F
from rest_framework import serializers
from rest_framework.serializers import PrimaryKeyRelatedField as PKField
from apps.rsform.models import LibraryItem
from apps.rsform.serializers import LibraryItemDetailsSerializer
from shared import messages as msg
from ..models import Argument, Operation, OperationSchema
from ..models import Argument, Operation, OperationSchema, OperationType
from .basics import OperationPositionSerializer
class OperationSerializer(serializers.ModelSerializer):
@ -27,37 +30,44 @@ class ArgumentSerializer(serializers.ModelSerializer):
fields = ('operation', 'argument')
class OperationCreateSerializer(serializers.Serializer):
''' Serializer: Operation creation. '''
class OperationData(serializers.ModelSerializer):
''' Serializer: Operation creation data. '''
alias = serializers.CharField()
operation_type = serializers.ChoiceField(OperationType.choices)
class OperationPositionSerializer(serializers.ModelSerializer):
''' Serializer: Positions of a single operations in OSS. '''
class Meta:
''' serializer metadata. '''
model = Operation
fields = 'id', 'position_x', 'position_y'
class Meta:
''' serializer metadata. '''
model = Operation
fields = \
'alias', 'operation_type', 'title', \
'comment', 'position_x', 'position_y'
def validate(self, attrs):
oss = cast(LibraryItem, self.context['oss'])
operation = cast(Operation, self.instance)
if operation.oss != oss:
raise serializers.ValidationError({
'id': msg.operationNotOwned(oss.title)
})
return attrs
class OperationCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. '''
item_data = OperationData()
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
class Meta:
''' serializer metadata. '''
model = Operation
fields = \
'alias', 'operation_type', 'title', \
'comment', 'position_x', 'position_y'
class OperationDeleteSerializer(serializers.Serializer):
''' Serializer: Delete operation. '''
target = PKField(many=False, queryset=Operation.objects.all())
positions = serializers.ListField(
child=OperationPositionSerializer(),
default=[]
)
def validate(self, attrs):
oss = cast(LibraryItem, self.context['oss'])
operation = cast(Operation, attrs['target'])
if oss and operation.oss != oss:
raise serializers.ValidationError({
f'{operation.id}': msg.operationNotOwned(oss.title)
})
self.instance = operation
return attrs
class OperationSchemaSerializer(serializers.ModelSerializer):
@ -85,13 +95,14 @@ class OperationSchemaSerializer(serializers.ModelSerializer):
result['graph'].append(ArgumentSerializer(argument).data)
result['substitutions'] = []
for substitution in oss.substitutions().values(
'operation',
'original',
'original__alias',
'original__term_resolved',
'transfer_term',
'substitution',
'substitution__alias',
'substitution__term_resolved',
'transfer_term'
original_alias=F('original__alias'),
original_term=F('original__term_resolved'),
substitution_alias=F('substitution__alias'),
substitution_term=F('substitution__term_resolved'),
):
result['substitutions'].append(substitution)
return result

View File

@ -0,0 +1,3 @@
''' Tests. '''
from .s_models import *
from .s_views import *

View File

@ -0,0 +1 @@
''' Tests for Django Models. '''

View File

@ -0,0 +1,2 @@
''' Tests for REST API. '''
from .t_oss import *

View File

@ -0,0 +1,189 @@
''' Testing API: Operation Schema. '''
from rest_framework import status
from apps.oss.models import Operation, OperationSchema, OperationType
from apps.rsform.models import AccessPolicy, LibraryItem, LibraryItemType, LocationHead, RSForm
from shared.EndpointTester import EndpointTester, decl_endpoint
class TestOssViewset(EndpointTester):
''' Testing OSS view. '''
def setUp(self):
super().setUp()
self.owned = OperationSchema.create(title='Test', alias='T1', owner=self.user)
self.owned_id = self.owned.item.pk
self.unowned = OperationSchema.create(title='Test2', alias='T2')
self.unowned_id = self.unowned.item.pk
self.private = OperationSchema.create(title='Test2', alias='T2', access_policy=AccessPolicy.PRIVATE)
self.private_id = self.private.item.pk
self.invalid_id = self.private.item.pk + 1337
def populateData(self):
self.ks1 = RSForm.create(alias='KS1', title='Test1')
self.ks1x1 = self.ks1.insert_new('X1', term_resolved='X1_1')
self.ks2 = RSForm.create(alias='KS2', title='Test2')
self.ks2x1 = self.ks2.insert_new('X2', term_resolved='X1_2')
self.operation1 = self.owned.create_operation(
alias='1',
operation_type=OperationType.INPUT,
result=self.ks1.item
)
self.operation2 = self.owned.create_operation(
alias='2',
operation_type=OperationType.INPUT,
result=self.ks2.item
)
self.operation3 = self.owned.create_operation(
alias='3',
operation_type=OperationType.SYNTHESIS
)
self.owned.add_argument(self.operation3, self.operation1)
self.owned.add_argument(self.operation3, self.operation2)
self.owned.set_substitutions(self.operation3, [{
'original': self.ks1x1,
'substitution': self.ks2x1,
'transfer_term': False
}])
@decl_endpoint('/api/oss/{item}/details', method='get')
def test_details(self):
self.populateData()
response = self.executeOK(item=self.owned_id)
self.assertEqual(response.data['owner'], self.owned.item.owner.pk)
self.assertEqual(response.data['title'], self.owned.item.title)
self.assertEqual(response.data['alias'], self.owned.item.alias)
self.assertEqual(response.data['location'], self.owned.item.location)
self.assertEqual(response.data['access_policy'], self.owned.item.access_policy)
self.assertEqual(response.data['visible'], self.owned.item.visible)
self.assertEqual(response.data['item_type'], LibraryItemType.OPERATION_SCHEMA)
self.assertEqual(len(response.data['items']), 3)
self.assertEqual(response.data['items'][0]['id'], self.operation1.pk)
self.assertEqual(response.data['items'][0]['operation_type'], self.operation1.operation_type)
self.assertEqual(len(response.data['substitutions']), 1)
sub = response.data['substitutions'][0]
self.assertEqual(sub['operation'], self.operation3.pk)
self.assertEqual(sub['original'], self.ks1x1.pk)
self.assertEqual(sub['substitution'], self.ks2x1.pk)
self.assertEqual(sub['transfer_term'], False)
self.assertEqual(sub['original_alias'], self.ks1x1.alias)
self.assertEqual(sub['original_term'], self.ks1x1.term_resolved)
self.assertEqual(sub['substitution_alias'], self.ks2x1.alias)
self.assertEqual(sub['substitution_term'], self.ks2x1.term_resolved)
graph = response.data['graph']
self.assertEqual(len(graph), 2)
self.assertEqual(graph[0]['operation'], self.operation3.pk)
self.assertEqual(graph[0]['argument'], self.operation1.pk)
self.assertEqual(graph[1]['operation'], self.operation3.pk)
self.assertEqual(graph[1]['argument'], self.operation2.pk)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
self.logout()
self.executeOK(item=self.owned_id)
self.executeOK(item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/update-positions', method='patch')
def test_update_positions(self):
self.populateData()
self.executeBadData(item=self.owned_id)
data = {'positions': []}
self.executeOK(data=data)
data = {'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337},
{'id': self.operation2.pk, 'position_x': 36.1, 'position_y': 1437},
{'id': self.invalid_id, 'position_x': 31, 'position_y': 12},
]}
self.toggle_admin(True)
self.executeOK(data=data, item=self.unowned_id)
self.operation1.refresh_from_db()
self.assertNotEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertNotEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.toggle_admin(False)
self.executeOK(data=data, item=self.owned_id)
self.operation1.refresh_from_db()
self.operation2.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.assertEqual(self.operation2.position_x, data['positions'][1]['position_x'])
self.assertEqual(self.operation2.position_y, data['positions'][1]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.executeForbidden(item=self.private_id)
@decl_endpoint('/api/oss/{item}/create-operation', method='post')
def test_create_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'item_data': {
'alias': 'Test3',
'title': 'Test title',
'comment': 'Тест кириллицы',
'position_x': 1,
'position_y': 1,
},
'positions': [
{'id': self.operation1.pk, 'position_x': 42.1, 'position_y': 1337}
]
}
self.executeBadData(data=data)
data['item_data']['operation_type'] = 'invalid'
self.executeBadData(data=data)
data['item_data']['operation_type'] = OperationType.INPUT
response = self.executeCreated(data=data)
self.assertEqual(len(response.data['oss']['items']), 4)
new_operation = response.data['new_operation']
self.assertEqual(new_operation['alias'], data['item_data']['alias'])
self.assertEqual(new_operation['operation_type'], data['item_data']['operation_type'])
self.assertEqual(new_operation['title'], data['item_data']['title'])
self.assertEqual(new_operation['comment'], data['item_data']['comment'])
self.assertEqual(new_operation['position_x'], data['item_data']['position_x'])
self.assertEqual(new_operation['position_y'], data['item_data']['position_y'])
self.operation1.refresh_from_db()
self.assertEqual(self.operation1.position_x, data['positions'][0]['position_x'])
self.assertEqual(self.operation1.position_y, data['positions'][0]['position_y'])
self.executeForbidden(data=data, item=self.unowned_id)
self.toggle_admin(True)
self.executeCreated(data=data, item=self.unowned_id)
@decl_endpoint('/api/oss/{item}/delete-operation', method='patch')
def test_delete_operation(self):
self.executeNotFound(item=self.invalid_id)
self.populateData()
self.executeBadData(item=self.owned_id)
data = {
'positions': []
}
self.executeBadData(data=data)
data['target'] = self.operation1.pk
self.toggle_admin(True)
self.executeBadData(data=data, item=self.unowned_id)
self.logout()
self.executeForbidden(data=data, item=self.owned_id)
self.login()
response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 2)

View File

@ -5,7 +5,7 @@ from rest_framework import routers
from . import views
library_router = routers.SimpleRouter(trailing_slash=False)
library_router.register('rsforms', views.OssViewSet, 'RSForm')
library_router.register('oss', views.OssViewSet, 'OSS')
urlpatterns = [
path('', include(library_router.urls)),

View File

@ -1,6 +1,7 @@
''' Endpoints for OSS. '''
from typing import cast
from django.db import transaction
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import generics
from rest_framework import status as c
@ -28,8 +29,9 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
def get_permissions(self):
''' Determine permission class. '''
if self.action in [
'operation_create',
'operation_delete'
'create_operation',
'delete_operation',
'update_positions'
]:
permission_list = [permissions.ItemEditor]
elif self.action in ['details']:
@ -56,55 +58,84 @@ class OssViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retriev
data=serializer.data
)
@extend_schema(
summary='update positions',
tags=['OSS'],
request=s.PositionsSerializer,
responses={
c.HTTP_200_OK: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='update-positions')
def update_positions(self, request: Request, pk):
''' Endpoint: Update operations positions. '''
schema = self._get_schema()
serializer = s.PositionsSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
schema.update_positions(serializer.validated_data['positions'])
return Response(status=c.HTTP_200_OK)
@extend_schema(
summary='create operation',
tags=['OSS'],
request=s.OperationCreateSerializer(),
responses={
c.HTTP_201_CREATED: s.NewOperationResponse,
c.HTTP_403_FORBIDDEN: None
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['post'], url_path='operation-create')
def operation_create(self, request: Request, pk):
@action(detail=True, methods=['post'], url_path='create-operation')
def create_operation(self, request: Request, pk):
''' Create new operation. '''
schema = self._get_schema()
serializer = s.OperationCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
new_operation = schema.create_operation(*data)
schema.item.refresh_from_db()
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
new_operation = schema.create_operation(**serializer.validated_data['item_data'])
schema.item.refresh_from_db()
response = Response(
status=c.HTTP_201_CREATED,
data={
'new_operation': s.OperationSerializer(new_operation).data,
'schema': s.OperationSchemaSerializer(schema.item).data
'oss': s.OperationSchemaSerializer(schema.item).data
}
)
return response
# @extend_schema(
# summary='delete operation',
# tags=['RSForm'],
# request=s.CstListSerializer,
# responses={
# c.HTTP_200_OK: s.RSFormParseSerializer,
# c.HTTP_403_FORBIDDEN: None,
# c.HTTP_404_NOT_FOUND: None
# }
# )
# @action(detail=True, methods=['patch'], url_path='operation-delete')
# def operation_delete(self, request: Request, pk):
# ''' Endpoint: Delete operation. '''
# schema = self._get_schema()
# serializer = s.CstListSerializer(
# data=request.data,
# context={'schema': schema.item}
# )
# serializer.is_valid(raise_exception=True)
# schema.delete_cst(serializer.validated_data['items'])
# schema.item.refresh_from_db()
# return Response(
# status=c.HTTP_200_OK,
# data=s.RSFormParseSerializer(schema.item).data
# )
@extend_schema(
summary='delete operation',
tags=['OSS'],
request=s.OperationDeleteSerializer,
responses={
c.HTTP_200_OK: s.OperationSchemaSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@action(detail=True, methods=['patch'], url_path='delete-operation')
def delete_operation(self, request: Request, pk):
''' Endpoint: Delete operation. '''
schema = self._get_schema()
serializer = s.OperationDeleteSerializer(
data=request.data,
context={'oss': schema.item}
)
serializer.is_valid(raise_exception=True)
with transaction.atomic():
schema.update_positions(serializer.validated_data['positions'])
schema.delete_operation(serializer.validated_data['target'])
schema.item.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.OperationSchemaSerializer(schema.item).data
)

View File

@ -35,7 +35,7 @@ class LocationSerializer(serializers.Serializer):
class AccessPolicySerializer(serializers.Serializer):
''' Serializer: Constituenta renaming. '''
access_policy = serializers.CharField(max_length=500)
access_policy = serializers.CharField()
def validate(self, attrs):
attrs = super().validate(attrs)

View File

@ -141,6 +141,8 @@ class CstDetailsSerializer(serializers.ModelSerializer):
class CstCreateSerializer(serializers.ModelSerializer):
''' Serializer: Constituenta creation. '''
insert_after = serializers.IntegerField(required=False, allow_null=True)
alias = serializers.CharField(max_length=8)
cst_type = serializers.ChoiceField(CstType.choices)
class Meta:
''' serializer metadata. '''

View File

@ -1,5 +1,5 @@
''' Tests. '''
from .s_models.t_RSForm import *
from .s_models import *
from .s_views import *
from .t_graph import *
from .t_imports import *

View File

@ -1,4 +1,4 @@
''' Tests for REST API. '''
''' Tests for Django Models. '''
from .t_Constituenta import *
from .t_Editor import *
from .t_LibraryItem import *

View File

@ -14,20 +14,20 @@ class TestNaturalLanguageViews(EndpointTester):
@decl_endpoint(endpoint='/api/cctext/parse', method='post')
def test_parse_text(self):
data = {'text': 'синим слонам'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self._assert_tags(response.data['result'], 'datv,NOUN,plur,anim,masc')
@decl_endpoint(endpoint='/api/cctext/inflect', method='post')
def test_inflect(self):
data = {'text': 'синий слон', 'grams': 'plur,datv'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], 'синим слонам')
@decl_endpoint(endpoint='/api/cctext/generate-lexeme', method='post')
def test_generate_lexeme(self):
data = {'text': 'синий слон'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(len(response.data['items']), 12)
self.assertEqual(response.data['items'][0]['text'], 'синий слон')

View File

@ -51,18 +51,18 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch')
def test_partial_update(self):
data = {'convention': 'tt'}
self.executeForbidden(data, item=self.cst2.pk)
self.executeForbidden(data=data, item=self.cst2.pk)
self.logout()
self.executeForbidden(data, item=self.cst1.pk)
self.executeForbidden(data=data, item=self.cst1.pk)
self.login()
response = self.executeOK(data, item=self.cst1.pk)
response = self.executeOK(data=data, item=self.cst1.pk)
self.cst1.refresh_from_db()
self.assertEqual(response.data['convention'], 'tt')
self.assertEqual(self.cst1.convention, 'tt')
self.executeOK(data, item=self.cst1.pk)
self.executeOK(data=data, item=self.cst1.pk)
@decl_endpoint('/api/constituents/{item}', method='patch')
@ -71,7 +71,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': 'New term',
'definition_raw': 'New def'
}
response = self.executeOK(data, item=self.cst3.pk)
response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db()
self.assertEqual(response.data['term_resolved'], 'New term')
self.assertEqual(self.cst3.term_resolved, 'New term')
@ -85,7 +85,7 @@ class TestConstituentaAPI(EndpointTester):
'term_raw': '@{X1|nomn,sing}',
'definition_raw': '@{X1|nomn,sing} @{X1|sing,datv}'
}
response = self.executeOK(data, item=self.cst3.pk)
response = self.executeOK(data=data, item=self.cst3.pk)
self.cst3.refresh_from_db()
self.assertEqual(self.cst3.term_resolved, self.cst1.term_resolved)
self.assertEqual(response.data['term_resolved'], self.cst1.term_resolved)
@ -96,7 +96,7 @@ class TestConstituentaAPI(EndpointTester):
@decl_endpoint('/api/constituents/{item}', method='patch')
def test_readonly_cst_fields(self):
data = {'alias': 'X33', 'order': 10}
response = self.executeOK(data, item=self.cst1.pk)
response = self.executeOK(data=data, item=self.cst1.pk)
self.assertEqual(response.data['alias'], 'X1')
self.assertEqual(response.data['alias'], self.cst1.alias)
self.assertEqual(response.data['order'], self.cst1.order)

View File

@ -48,7 +48,7 @@ class TestLibraryViewset(EndpointTester):
'title': 'Title',
'alias': 'alias',
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'item_type': LibraryItemType.OPERATION_SCHEMA,
@ -58,7 +58,7 @@ class TestLibraryViewset(EndpointTester):
'visible': False,
'read_only': True
}
response = self.executeCreated(data)
response = self.executeCreated(data=data)
self.assertEqual(response.data['owner'], self.user.pk)
self.assertEqual(response.data['item_type'], data['item_type'])
self.assertEqual(response.data['title'], data['title'])
@ -69,25 +69,25 @@ class TestLibraryViewset(EndpointTester):
self.logout()
data = {'title': 'Title2'}
self.executeForbidden(data)
self.executeForbidden(data=data)
@decl_endpoint('/api/library/{item}', method='patch')
def test_update(self):
data = {'id': self.unowned.pk, 'title': 'New Title'}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_editor(self.unowned, True)
response = self.executeOK(data, item=self.unowned.pk)
response = self.executeOK(data=data, item=self.unowned.pk)
self.assertEqual(response.data['title'], data['title'])
self.unowned.access_policy = AccessPolicy.PRIVATE
self.unowned.save()
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
data = {'id': self.owned.pk, 'title': 'New Title'}
response = self.executeOK(data, item=self.owned.pk)
response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['alias'], self.owned.alias)
@ -98,7 +98,7 @@ class TestLibraryViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED,
'location': LocationHead.LIBRARY
}
response = self.executeOK(data, item=self.owned.pk)
response = self.executeOK(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(response.data['owner'], self.owned.owner.pk)
self.assertEqual(response.data['access_policy'], self.owned.access_policy)
@ -111,22 +111,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user)
data = {'user': self.user2.pk}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user2)
self.assertEqual(self.owned.time_update, time_update)
self.executeForbidden(data, item=self.owned.pk)
self.executeForbidden(data=data, item=self.owned.pk)
self.toggle_admin(True)
data = {'user': self.user.pk}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.owner, self.user)
@ -135,20 +135,20 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'access_policy': 'invalid'}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'access_policy': AccessPolicy.PRIVATE}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.access_policy, data['access_policy'])
self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True)
self.executeOK(data, item=self.unowned.pk)
self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db()
self.assertEqual(self.unowned.access_policy, data['access_policy'])
@ -157,29 +157,29 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'location': 'invalid'}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'location': '/U/temp'}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location'])
data = {'location': LocationHead.LIBRARY}
self.executeForbidden(data, item=self.owned.pk)
self.executeForbidden(data=data, item=self.owned.pk)
data = {'location': '/U/temp'}
self.toggle_editor(self.unowned, True)
self.executeForbidden(data, item=self.unowned.pk)
self.executeForbidden(data=data, item=self.unowned.pk)
self.toggle_admin(True)
data = {'location': LocationHead.LIBRARY}
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.location, data['location'])
self.executeOK(data, item=self.unowned.pk)
self.executeOK(data=data, item=self.unowned.pk)
self.unowned.refresh_from_db()
self.assertEqual(self.unowned.location, data['location'])
@ -188,22 +188,22 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
data = {'user': self.user2.pk}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user, self.user2]))
@ -212,25 +212,25 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'user': self.invalid_user}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'user': self.user.pk}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user)
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [])
Editor.add(item=self.owned, user=self.user)
Editor.add(item=self.owned, user=self.user2)
data = {'user': self.user2.pk}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
@ -239,30 +239,30 @@ class TestLibraryViewset(EndpointTester):
time_update = self.owned.time_update
data = {'users': [self.invalid_user]}
self.executeBadData(data, item=self.owned.pk)
self.executeBadData(data=data, item=self.owned.pk)
data = {'users': [self.user.pk]}
self.executeNotFound(data, item=self.invalid_item)
self.executeForbidden(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeForbidden(data=data, item=self.unowned.pk)
self.executeOK(data, item=self.owned.pk)
self.executeOK(data=data, item=self.owned.pk)
self.owned.refresh_from_db()
self.assertEqual(self.owned.time_update, time_update)
self.assertEqual(self.owned.editors(), [self.user])
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user])
data = {'users': [self.user2.pk]}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [self.user2])
data = {'users': []}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(self.owned.editors(), [])
data = {'users': [self.user2.pk, self.user.pk]}
self.executeOK(data)
self.executeOK(data=data)
self.assertEqual(set(self.owned.editors()), set([self.user2, self.user]))
@ -375,11 +375,11 @@ class TestLibraryViewset(EndpointTester):
)
data = {'title': 'Title1337'}
self.executeNotFound(data, item=self.invalid_item)
self.executeCreated(data, item=self.unowned.pk)
self.executeNotFound(data=data, item=self.invalid_item)
self.executeCreated(data=data, item=self.unowned.pk)
data = {'title': 'Title1338'}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 2)
self.assertEqual(response.data['items'][0]['alias'], x12.alias)
@ -389,12 +389,12 @@ class TestLibraryViewset(EndpointTester):
self.assertEqual(response.data['items'][1]['term_resolved'], d2.term_resolved)
data = {'title': 'Title1340', 'items': []}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 0)
data = {'title': 'Title1341', 'items': [x12.pk]}
response = self.executeCreated(data, item=self.owned.pk)
response = self.executeCreated(data=data, item=self.owned.pk)
self.assertEqual(response.data['title'], data['title'])
self.assertEqual(len(response.data['items']), 1)
self.assertEqual(response.data['items'][0]['alias'], x12.alias)

View File

@ -23,20 +23,20 @@ class TestInlineSynthesis(EndpointTester):
'items': [],
'substitutions': []
}
self.executeForbidden(data)
self.executeForbidden(data=data)
data['receiver'] = invalid_id
self.executeBadData(data)
self.executeBadData(data=data)
data['receiver'] = self.schema1.item.pk
data['source'] = invalid_id
self.executeBadData(data)
self.executeBadData(data=data)
data['source'] = self.schema1.item.pk
self.executeOK(data)
self.executeOK(data=data)
data['items'] = [invalid_id]
self.executeBadData(data)
self.executeBadData(data=data)
def test_inline_synthesis(self):
@ -67,7 +67,7 @@ class TestInlineSynthesis(EndpointTester):
}
]
}
response = self.executeOK(data)
response = self.executeOK(data=data)
result = {item['alias']: item for item in response.data['items']}
self.assertEqual(len(result), 6)
self.assertEqual(result['X2']['term_raw'], ks1_x2.term_raw)

View File

@ -43,7 +43,7 @@ class TestRSFormViewset(EndpointTester):
'access_policy': AccessPolicy.PROTECTED,
'visible': False
}
self.executeBadData(data)
self.executeBadData(data=data)
with open(f'{work_dir}/data/sample-rsform.trs', 'rb') as file:
data['file'] = file
@ -123,14 +123,14 @@ class TestRSFormViewset(EndpointTester):
def test_check(self):
self.owned.insert_new('X1')
data = {'expression': 'X1=X1'}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[X1][X1]]')
self.assertEqual(response.data['typification'], 'LOGIC')
self.assertEqual(response.data['valueClass'], 'value')
self.executeOK(data, item=self.unowned_id)
self.executeOK(data=data, item=self.unowned_id)
@decl_endpoint('/api/rsforms/{item}/resolve', method='post')
@ -141,7 +141,7 @@ class TestRSFormViewset(EndpointTester):
)
data = {'text': '@{1|редкий} @{X1|plur,datv}'}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['input'], '@{1|редкий} @{X1|plur,datv}')
self.assertEqual(response.data['output'], 'редким синим слонам')
self.assertEqual(len(response.data['refs']), 2)
@ -188,13 +188,19 @@ class TestRSFormViewset(EndpointTester):
@decl_endpoint('/api/rsforms/{item}/cst-create', method='post')
def test_create_constituenta(self):
data = {'alias': 'X3', 'cst_type': CstType.BASE}
self.executeForbidden(data, item=self.unowned_id)
data = {'alias': 'X3'}
self.executeForbidden(data=data, item=self.unowned_id)
self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
self.executeBadData(item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
response = self.executeCreated(data, item=self.owned_id)
data['cst_type'] = 'invalid'
self.executeBadData(data=data, item=self.owned_id)
data['cst_type'] = CstType.BASE
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3')
x3 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x3.order, 3)
@ -206,7 +212,7 @@ class TestRSFormViewset(EndpointTester):
'term_raw': 'test',
'term_forms': [{'text': 'form1', 'tags': 'sing,datv'}]
}
response = self.executeCreated(data, item=self.owned_id)
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], data['alias'])
x4 = Constituenta.objects.get(alias=response.data['new_cst']['alias'])
self.assertEqual(x4.order, 3)
@ -233,14 +239,14 @@ class TestRSFormViewset(EndpointTester):
)
data = {'target': x2_2.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
self.executeForbidden(data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id)
self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x1.alias, 'cst_type': CstType.TERM}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'target': x1.pk, 'alias': x3.alias}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new(
alias='D1',
@ -252,7 +258,7 @@ class TestRSFormViewset(EndpointTester):
self.assertEqual(x1.cst_type, CstType.BASE)
data = {'target': x1.pk, 'alias': 'D2', 'cst_type': CstType.TERM}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'D2')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.TERM)
d1.refresh_from_db()
@ -279,14 +285,14 @@ class TestRSFormViewset(EndpointTester):
unowned = self.unowned.insert_new('X2')
data = {'substitutions': [{'original': x1.pk, 'substitution': unowned.pk, 'transfer_term': True}]}
self.executeForbidden(data, item=self.unowned_id)
self.executeBadData(data, item=self.owned_id)
self.executeForbidden(data=data, item=self.unowned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': unowned.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
data = {'substitutions': [{'original': x1.pk, 'substitution': x1.pk, 'transfer_term': True}]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
d1 = self.owned.insert_new(
alias='D1',
@ -294,7 +300,7 @@ class TestRSFormViewset(EndpointTester):
definition_formal='X1'
)
data = {'substitutions': [{'original': x1.pk, 'substitution': x2.pk, 'transfer_term': True}]}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
d1.refresh_from_db()
x2.refresh_from_db()
self.assertEqual(x2.term_raw, 'Test1')
@ -314,7 +320,7 @@ class TestRSFormViewset(EndpointTester):
)
data = {'substitutions': []}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'substitutions': [
{
@ -328,7 +334,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True
}
]}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'substitutions': [
{
@ -342,7 +348,7 @@ class TestRSFormViewset(EndpointTester):
'transfer_term': True
}
]}
response = self.executeOK(data, item=self.owned_id)
response = self.executeOK(data=data, item=self.owned_id)
d3.refresh_from_db()
self.assertEqual(d3.definition_formal, r'D1 \ D2')
@ -357,7 +363,7 @@ class TestRSFormViewset(EndpointTester):
'definition_formal': '3',
'definition_raw': '4'
}
response = self.executeCreated(data, item=self.owned_id)
response = self.executeCreated(data=data, item=self.owned_id)
self.assertEqual(response.data['new_cst']['alias'], 'X3')
self.assertEqual(response.data['new_cst']['cst_type'], CstType.BASE)
self.assertEqual(response.data['new_cst']['convention'], '1')
@ -373,13 +379,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id)
data = {'items': [1337]}
self.executeBadData(data)
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
data = {'items': [x1.pk]}
response = self.executeOK(data)
response = self.executeOK(data=data)
x2.refresh_from_db()
self.owned.item.refresh_from_db()
self.assertEqual(len(response.data['items']), 1)
@ -389,7 +395,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk]}
self.executeBadData(data, item=self.owned_id)
self.executeBadData(data=data, item=self.owned_id)
@decl_endpoint('/api/rsforms/{item}/cst-moveto', method='patch')
@ -397,13 +403,13 @@ class TestRSFormViewset(EndpointTester):
self.set_params(item=self.owned_id)
data = {'items': [1337], 'move_to': 1}
self.executeBadData(data)
self.executeBadData(data=data)
x1 = self.owned.insert_new('X1')
x2 = self.owned.insert_new('X2')
data = {'items': [x2.pk], 'move_to': 1}
response = self.executeOK(data)
response = self.executeOK(data=data)
x1.refresh_from_db()
x2.refresh_from_db()
self.assertEqual(response.data['id'], self.owned_id)
@ -412,7 +418,7 @@ class TestRSFormViewset(EndpointTester):
x3 = self.unowned.insert_new('X1')
data = {'items': [x3.pk], 'move_to': 1}
self.executeBadData(data)
self.executeBadData(data=data)
@decl_endpoint('/api/rsforms/{item}/reset-aliases', method='patch')

View File

@ -8,30 +8,30 @@ class TestRSLanguageViews(EndpointTester):
@decl_endpoint('/api/rslang/to-ascii', method='post')
def test_convert_to_ascii(self):
data = {'data': '1=1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': '1=1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1 \eq 1')
@decl_endpoint('/api/rslang/to-math', method='post')
def test_convert_to_math(self):
data = {'data': r'1 \eq 1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': r'1 \eq 1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['result'], r'1=1')
@decl_endpoint('/api/rslang/parse-expression', method='post')
def test_parse_expression(self):
data = {'data': r'1=1'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'expression': r'1=1'}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.assertEqual(response.data['parseResult'], True)
self.assertEqual(response.data['syntax'], 'math')
self.assertEqual(response.data['astText'], '[=[1][1]]')

View File

@ -30,11 +30,11 @@ class TestVersionViews(EndpointTester):
invalid_id = 1338
data = {'version': '1.0.0', 'description': 'test'}
self.executeNotFound(data, schema=invalid_id)
self.executeForbidden(data, schema=self.unowned.pk)
self.executeBadData(invalid_data, schema=self.owned.pk)
self.executeNotFound(data=data, schema=invalid_id)
self.executeForbidden(data=data, schema=self.unowned.pk)
self.executeBadData(data=invalid_data, schema=self.owned.pk)
response = self.executeCreated(data, schema=self.owned.pk)
response = self.executeCreated(data=data, schema=self.owned.pk)
self.assertTrue('version' in response.data)
self.assertTrue('schema' in response.data)
self.assertTrue(response.data['version'] in [v['id'] for v in response.data['schema']['versions']])
@ -64,7 +64,7 @@ class TestVersionViews(EndpointTester):
@decl_endpoint('/api/versions/{version}', method='get')
def test_access_version(self):
data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data)
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
self.executeNotFound(version=invalid_id)
@ -78,14 +78,14 @@ class TestVersionViews(EndpointTester):
data = {'version': '1.2.0', 'description': 'test1'}
self.method = 'patch'
self.executeForbidden(data)
self.executeForbidden(data=data)
self.method = 'delete'
self.executeForbidden()
self.client.force_authenticate(user=self.user)
self.method = 'patch'
self.executeOK(data)
self.executeOK(data=data)
response = self.get()
self.assertEqual(response.data['version'], data['version'])
self.assertEqual(response.data['description'], data['description'])
@ -138,7 +138,7 @@ class TestVersionViews(EndpointTester):
x2 = self.schema.insert_new('X2')
d1 = self.schema.insert_new('D1', term_raw='TestTerm')
data = {'version': '1.0.0', 'description': 'test'}
version_id = self._create_version(data)
version_id = self._create_version(data=data)
invalid_id = version_id + 1337
d1.delete()

View File

@ -43,17 +43,27 @@ class LibraryViewSet(viewsets.ModelViewSet):
def get_permissions(self):
if self.action in ['update', 'partial_update']:
permission_list = [permissions.ItemEditor]
access_level = permissions.ItemEditor
elif self.action in [
'destroy', 'set_owner', 'set_access_policy', 'set_location',
'editors_add', 'editors_remove', 'editors_set'
'destroy',
'set_owner',
'set_access_policy',
'set_location',
'editors_add',
'editors_remove',
'editors_set'
]:
permission_list = [permissions.ItemOwner]
elif self.action in ['create', 'clone', 'subscribe', 'unsubscribe']:
permission_list = [permissions.GlobalUser]
access_level = permissions.ItemOwner
elif self.action in [
'create',
'clone',
'subscribe',
'unsubscribe'
]:
access_level = permissions.GlobalUser
else:
permission_list = [permissions.ItemAnyone]
return [permission() for permission in permission_list]
access_level = permissions.ItemAnyone
return [access_level()]
def _get_item(self) -> m.LibraryItem:
return cast(m.LibraryItem, self.get_object())
@ -69,7 +79,6 @@ class LibraryViewSet(viewsets.ModelViewSet):
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['post'], url_path='clone')
def clone(self, request: Request, pk):
''' Endpoint: Create deep copy of library item. '''
@ -86,19 +95,20 @@ class LibraryViewSet(viewsets.ModelViewSet):
clone.read_only = False
clone.access_policy = serializer.validated_data.get('access_policy', m.AccessPolicy.PUBLIC)
clone.location = serializer.validated_data.get('location', m.LocationHead.USER)
clone.save()
if clone.item_type == m.LibraryItemType.RSFORM:
need_filter = 'items' in request.data
for cst in m.RSForm(item).constituents():
if not need_filter or cst.pk in request.data['items']:
cst.pk = None
cst.schema = clone
cst.save()
return Response(
status=c.HTTP_201_CREATED,
data=s.RSFormParseSerializer(clone).data
)
with transaction.atomic():
clone.save()
if clone.item_type == m.LibraryItemType.RSFORM:
need_filter = 'items' in request.data
for cst in m.RSForm(item).constituents():
if not need_filter or cst.pk in request.data['items']:
cst.pk = None
cst.schema = clone
cst.save()
return Response(
status=c.HTTP_201_CREATED,
data=s.RSFormParseSerializer(clone).data
)
return Response(status=c.HTTP_400_BAD_REQUEST)
@extend_schema(
@ -267,24 +277,17 @@ class LibraryActiveView(generics.ListAPIView):
serializer_class = s.LibraryItemSerializer
def get_queryset(self):
common_location = Q(location__startswith=m.LocationHead.COMMON) | Q(location__startswith=m.LocationHead.LIBRARY)
is_public = Q(access_policy=m.AccessPolicy.PUBLIC)
if self.request.user.is_anonymous:
return m.LibraryItem.objects.filter(
Q(access_policy=m.AccessPolicy.PUBLIC),
).filter(
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
).order_by('-time_update')
return m.LibraryItem.objects \
.filter(is_public) \
.filter(common_location).order_by('-time_update')
else:
user = cast(m.User, self.request.user)
# pylint: disable=unsupported-binary-operation
return m.LibraryItem.objects.filter(
(
Q(access_policy=m.AccessPolicy.PUBLIC) &
(
Q(location__startswith=m.LocationHead.COMMON) |
Q(location__startswith=m.LocationHead.LIBRARY)
)
) |
(is_public & common_location) |
Q(owner=user) |
Q(editor__editor=user) |
Q(subscription__user=user)

View File

@ -18,7 +18,6 @@ from .. import serializers as s
request=s.InlineSynthesisSerializer,
responses={c.HTTP_200_OK: s.RSFormParseSerializer}
)
@transaction.atomic
@api_view(['PATCH'])
def inline_synthesis(request: Request):
''' Endpoint: Inline synthesis. '''
@ -30,20 +29,21 @@ def inline_synthesis(request: Request):
schema = m.RSForm(serializer.validated_data['receiver'])
items = cast(list[m.Constituenta], serializer.validated_data['items'])
new_items = schema.insert_copy(items)
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
if original in items:
index = next(i for (i, cst) in enumerate(items) if cst == original)
original = new_items[index]
else:
index = next(i for (i, cst) in enumerate(items) if cst == replacement)
replacement = new_items[index]
schema.substitute(original, replacement, substitution['transfer_term'])
with transaction.atomic():
new_items = schema.insert_copy(items)
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
if original in items:
index = next(i for (i, cst) in enumerate(items) if cst == original)
original = new_items[index]
else:
index = next(i for (i, cst) in enumerate(items) if cst == replacement)
replacement = new_items[index]
schema.substitute(original, replacement, substitution['transfer_term'])
schema.restore_order()
schema.restore_order()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data

View File

@ -34,11 +34,21 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
def get_permissions(self):
''' Determine permission class. '''
if self.action in [
'load_trs', 'cst_create', 'cst_delete_multiple',
'reset_aliases', 'cst_rename', 'cst_substitute'
'load_trs',
'reset_aliases',
'cst_create',
'cst_delete_multiple',
'cst_rename',
'cst_substitute'
]:
permission_list = [permissions.ItemEditor]
elif self.action in ['contents', 'details', 'export_trs', 'resolve', 'check']:
elif self.action in [
'contents',
'details',
'export_trs',
'resolve',
'check'
]:
permission_list = [permissions.ItemAnyone]
else:
permission_list = [permissions.Anyone]
@ -50,6 +60,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstCreateSerializer,
responses={
c.HTTP_201_CREATED: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -87,6 +98,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstTargetSerializer,
responses={
c.HTTP_200_OK: s.NewMultiCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -123,11 +135,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstRenameSerializer,
responses={
c.HTTP_200_OK: s.NewCstResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-rename')
def cst_rename(self, request: Request, pk):
''' Rename constituenta possibly changing type. '''
@ -140,12 +152,12 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
cst.alias = serializer.validated_data['alias']
cst.cst_type = serializer.validated_data['cst_type']
cst.save()
mapping = {old_alias: cst.alias}
schema.apply_mapping(mapping, change_aliases=False)
schema.item.refresh_from_db()
cst.refresh_from_db()
with transaction.atomic():
cst.save()
schema.apply_mapping(mapping={old_alias: cst.alias}, change_aliases=False)
schema.item.refresh_from_db()
cst.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
@ -161,11 +173,11 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstSubstituteSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
)
@transaction.atomic
@action(detail=True, methods=['patch'], url_path='cst-substitute')
def cst_substitute(self, request: Request, pk):
''' Substitute occurrences of constituenta with another one. '''
@ -175,11 +187,13 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
context={'schema': schema.item}
)
serializer.is_valid(raise_exception=True)
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
schema.substitute(original, replacement, substitution['transfer_term'])
schema.item.refresh_from_db()
with transaction.atomic():
for substitution in serializer.validated_data['substitutions']:
original = cast(m.Constituenta, substitution['original'])
replacement = cast(m.Constituenta, substitution['substitution'])
schema.substitute(original, replacement, substitution['transfer_term'])
schema.item.refresh_from_db()
return Response(
status=c.HTTP_200_OK,
data=s.RSFormParseSerializer(schema.item).data
@ -191,6 +205,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstListSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -217,6 +232,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.CstMoveSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}
@ -285,6 +301,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
request=s.RSFormUploadSerializer,
responses={
c.HTTP_200_OK: s.RSFormParseSerializer,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}

View File

@ -55,6 +55,7 @@ class VersionViewset(
request=s.VersionCreateSerializer,
responses={
c.HTTP_201_CREATED: s.NewVersionResponse,
c.HTTP_400_BAD_REQUEST: None,
c.HTTP_403_FORBIDDEN: None,
c.HTTP_404_NOT_FOUND: None
}

View File

@ -16,15 +16,15 @@ class TestUserAPIViews(EndpointTester):
def test_login(self):
self.logout()
data = {'username': self.user.username, 'password': 'invalid'}
self.executeBadData(data)
self.executeBadData(data=data)
data = {'username': self.user.username, 'password': 'password'}
self.executeAccepted(data)
self.executeAccepted(data)
self.executeAccepted(data=data)
self.executeAccepted(data=data)
self.logout()
data = {'username': self.user.email, 'password': 'password'}
self.executeAccepted(data)
self.executeAccepted(data=data)
@decl_endpoint('/users/api/logout', method='post')
@ -84,7 +84,7 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName',
}
response = self.executeOK(data)
response = self.executeOK(data=data)
self.user.refresh_from_db()
self.assertEqual(response.data['email'], '123@mail.ru')
self.assertEqual(self.user.email, '123@mail.ru')
@ -98,10 +98,10 @@ class TestUserUserProfileAPIView(EndpointTester):
'first_name': 'new',
'last_name': 'new2',
}
self.executeOK(data)
self.executeOK(data=data)
data = {'email': self.user2.email}
self.executeBadData(data)
self.executeBadData(data=data)
self.logout()
self.executeForbidden()
@ -113,14 +113,14 @@ class TestUserUserProfileAPIView(EndpointTester):
'old_password': 'invalid',
'new_password': 'password2'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'old_password': 'password',
'new_password': 'password2'
}
oldHash = self.user.password
response = self.executeNoContent(data)
response = self.executeNoContent(data=data)
self.user.refresh_from_db()
self.assertNotEqual(self.user.password, oldHash)
self.assertTrue(self.client.login(username=self.user.username, password='password2'))
@ -154,7 +154,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'username': 'NewUser',
@ -164,7 +164,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
response = self.executeCreated(data)
response = self.executeCreated(data=data)
self.assertTrue('id' in response.data)
self.assertEqual(response.data['username'], data['username'])
self.assertEqual(response.data['email'], data['email'])
@ -179,7 +179,7 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)
data = {
'username': 'NewUser2',
@ -189,4 +189,4 @@ class TestSignupAPIView(EndpointTester):
'first_name': 'firstName',
'last_name': 'lastName'
}
self.executeBadData(data)
self.executeBadData(data=data)

View File

@ -9,6 +9,7 @@ from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, Spec
urlpatterns = [
path('admin', admin.site.urls),
path('api/', include('apps.rsform.urls')),
path('api/', include('apps.oss.urls')),
path('users/', include('apps.users.urls')),
path('schema', SpectacularAPIView.as_view(), name='schema'),
path('redoc', SpectacularRedocView.as_view()),

View File

@ -11,6 +11,7 @@ from rest_framework.permissions import \
from rest_framework.request import Request
from rest_framework.views import APIView
from apps.oss.models import Operation
from apps.rsform.models import (
AccessPolicy,
Constituenta,
@ -27,6 +28,8 @@ def _extract_item(obj: Any) -> LibraryItem:
return obj
elif isinstance(obj, Constituenta):
return cast(LibraryItem, obj.schema)
elif isinstance(obj, Operation):
return cast(LibraryItem, obj.oss)
elif isinstance(obj, (Version, Subscription, Editor)):
return cast(LibraryItem, obj.item)
raise PermissionDenied({