mirror of
https://github.com/IRBorisov/ConceptPortal.git
synced 2025-06-26 04:50:36 +03:00
Fix backend for versioning
This commit is contained in:
parent
da7af6f8dc
commit
abf11fecc0
|
@ -1,6 +1,6 @@
|
|||
''' Serializers for conceptual schema API. '''
|
||||
import json
|
||||
from typing import Optional, cast
|
||||
from typing import Optional, cast, Union
|
||||
from rest_framework import serializers
|
||||
from django.db import transaction
|
||||
|
||||
|
@ -365,9 +365,14 @@ class ResolverSerializer(serializers.Serializer):
|
|||
|
||||
class PyConceptAdapter:
|
||||
''' RSForm adapter for interacting with pyconcept module. '''
|
||||
def __init__(self, instance: RSForm):
|
||||
self.schema = instance
|
||||
self.data = self._prepare_request()
|
||||
def __init__(self, data: Union[RSForm, dict]):
|
||||
try:
|
||||
if 'items' in cast(dict, data):
|
||||
self.data = self._prepare_request_raw(cast(dict, data))
|
||||
else:
|
||||
self.data = self._prepare_request(cast(RSForm, data))
|
||||
except TypeError:
|
||||
self.data = self._prepare_request(cast(RSForm, data))
|
||||
self._checked_data: Optional[dict] = None
|
||||
|
||||
def parse(self) -> dict:
|
||||
|
@ -378,11 +383,11 @@ class PyConceptAdapter:
|
|||
raise ValueError(msg.pyconceptFailure())
|
||||
return self._checked_data
|
||||
|
||||
def _prepare_request(self) -> dict:
|
||||
def _prepare_request(self, schema: RSForm) -> dict:
|
||||
result: dict = {
|
||||
'items': []
|
||||
}
|
||||
items = self.schema.constituents().order_by('order')
|
||||
items = schema.constituents().order_by('order')
|
||||
for cst in items:
|
||||
result['items'].append({
|
||||
'entityUID': cst.pk,
|
||||
|
@ -394,6 +399,21 @@ class PyConceptAdapter:
|
|||
})
|
||||
return result
|
||||
|
||||
def _prepare_request_raw(self, data: dict) -> dict:
|
||||
result: dict = {
|
||||
'items': []
|
||||
}
|
||||
for cst in data['items']:
|
||||
result['items'].append({
|
||||
'entityUID': cst['id'],
|
||||
'cstType': cst['cst_type'],
|
||||
'alias': cst['alias'],
|
||||
'definition': {
|
||||
'formal': cst['definition_formal']
|
||||
}
|
||||
})
|
||||
return result
|
||||
|
||||
def _produce_response(self):
|
||||
if self._checked_data is not None:
|
||||
return
|
||||
|
@ -482,14 +502,22 @@ class RSFormParseSerializer(serializers.ModelSerializer):
|
|||
|
||||
def to_representation(self, instance: LibraryItem):
|
||||
result = RSFormSerializer(instance).data
|
||||
parse = PyConceptAdapter(RSForm(instance)).parse()
|
||||
for cst_data in result['items']:
|
||||
return self._parse_data(result)
|
||||
|
||||
def from_versioned_data(self, version: int, data: dict) -> dict:
|
||||
''' Load data from version and parse. '''
|
||||
item = cast(LibraryItem, self.instance)
|
||||
result = RSFormSerializer(item).from_versioned_data(version, data)
|
||||
return self._parse_data(result)
|
||||
|
||||
def _parse_data(self, data: dict) -> dict:
|
||||
parse = PyConceptAdapter(data).parse()
|
||||
for cst_data in data['items']:
|
||||
cst_data['parse'] = next(
|
||||
cst['parse'] for cst in parse['items']
|
||||
if cst['id'] == cst_data['id']
|
||||
)
|
||||
return result
|
||||
|
||||
return data
|
||||
|
||||
class RSFormUploadSerializer(serializers.Serializer):
|
||||
''' Upload data for RSForm serializer. '''
|
||||
|
@ -542,6 +570,41 @@ class RSFormTRSSerializer(serializers.Serializer):
|
|||
},
|
||||
}
|
||||
|
||||
def from_versioned_data(self, data: dict) -> dict:
|
||||
''' Load data from version. '''
|
||||
result = {
|
||||
'type': _TRS_TYPE,
|
||||
'title': data['title'],
|
||||
'alias': data['alias'],
|
||||
'comment': data['comment'],
|
||||
'items': [],
|
||||
'claimed': False,
|
||||
'selection': [],
|
||||
'version': _TRS_VERSION,
|
||||
'versionInfo': _TRS_HEADER
|
||||
}
|
||||
for cst in data['items']:
|
||||
result['items'].append({
|
||||
'entityUID': cst['id'],
|
||||
'type': _CST_TYPE,
|
||||
'cstType': cst['cst_type'],
|
||||
'alias': cst['alias'],
|
||||
'convention': cst['convention'],
|
||||
'term': {
|
||||
'raw': cst['term_raw'],
|
||||
'resolved': cst['term_resolved'],
|
||||
'forms': cst['term_forms']
|
||||
},
|
||||
'definition': {
|
||||
'formal': cst['definition_formal'],
|
||||
'text': {
|
||||
'raw': cst['definition_raw'],
|
||||
'resolved': cst['definition_resolved']
|
||||
},
|
||||
},
|
||||
})
|
||||
return result
|
||||
|
||||
def to_internal_value(self, data):
|
||||
result = super().to_internal_value(data)
|
||||
if 'owner' in data:
|
||||
|
|
|
@ -907,6 +907,54 @@ class TestVersionViews(APITestCase):
|
|||
response = self.client.get(f'/api/versions/{version_id}')
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
def test_retrieve_version_details(self):
|
||||
a1 = Constituenta.objects.create(
|
||||
schema=self.owned.item,
|
||||
alias='A1',
|
||||
cst_type='axiom',
|
||||
definition_formal='X1=X1',
|
||||
order=2
|
||||
)
|
||||
|
||||
data = {'version': '1.0.0', 'description': 'test'}
|
||||
response = self.client.post(
|
||||
f'/api/rsforms/{self.owned.item.id}/versions/create',
|
||||
data=data, format='json'
|
||||
)
|
||||
version_id = response.data['version']
|
||||
|
||||
a1.definition_formal = 'X1=X2'
|
||||
a1.save()
|
||||
|
||||
response = self.client.get(f'/api/rsforms/{self.owned.item.id}/versions/{version_id}')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
loaded_a1 = response.data['items'][1]
|
||||
self.assertEqual(loaded_a1['definition_formal'], 'X1=X1')
|
||||
self.assertEqual(loaded_a1['parse']['status'], 'verified')
|
||||
|
||||
def test_export_version(self):
|
||||
invalid_id = 1338
|
||||
response = self.client.get(f'/api/versions/{invalid_id}/export-file')
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
data = {'version': '1.0.0', 'description': 'test'}
|
||||
response = self.client.post(
|
||||
f'/api/rsforms/{self.owned.item.id}/versions/create',
|
||||
data=data, format='json'
|
||||
)
|
||||
version_id = response.data['version']
|
||||
|
||||
response = self.client.get(f'/api/versions/{version_id}/export-file')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(
|
||||
response.headers['Content-Disposition'],
|
||||
f'attachment; filename={self.owned.item.alias}.trs'
|
||||
)
|
||||
with io.BytesIO(response.content) as stream:
|
||||
with ZipFile(stream, 'r') as zipped_file:
|
||||
self.assertIsNone(zipped_file.testzip())
|
||||
self.assertIn('document.json', zipped_file.namelist())
|
||||
|
||||
|
||||
class TestRSLanguageViews(APITestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -15,6 +15,7 @@ urlpatterns = [
|
|||
path('rsforms/create-detailed', views.create_rsform),
|
||||
|
||||
path('versions/<int:pk>', views.VersionAPIView.as_view()),
|
||||
path('versions/<int:pk>/export-file', views.export_file),
|
||||
path('rsforms/<int:pk_item>/versions/create', views.create_version),
|
||||
path('rsforms/<int:pk_item>/versions/<int:pk_version>', views.retrieve_version),
|
||||
|
||||
|
|
|
@ -6,6 +6,10 @@ from zipfile import ZipFile
|
|||
from rest_framework.permissions import BasePermission, IsAuthenticated
|
||||
|
||||
|
||||
# Name for JSON inside Exteor files archive
|
||||
EXTEOR_INNER_FILENAME = 'document.json'
|
||||
|
||||
|
||||
class ObjectOwnerOrAdmin(BasePermission):
|
||||
''' Permission for object ownership restriction '''
|
||||
def has_object_permission(self, request, view, obj):
|
||||
|
@ -43,20 +47,20 @@ class ItemOwnerOrAdmin(BasePermission):
|
|||
return False
|
||||
return request.user.is_staff # type: ignore
|
||||
|
||||
def read_trs(file) -> dict:
|
||||
''' Read JSON from TRS file '''
|
||||
with ZipFile(file, 'r') as archive:
|
||||
json_data = archive.read('document.json')
|
||||
def read_zipped_json(data, json_filename: str) -> dict:
|
||||
''' Read JSON from zipped data '''
|
||||
with ZipFile(data, 'r') as archive:
|
||||
json_data = archive.read(json_filename)
|
||||
result: dict = json.loads(json_data)
|
||||
return result
|
||||
|
||||
|
||||
def write_trs(json_data: dict) -> bytes:
|
||||
''' Write json data to TRS file including version info '''
|
||||
def write_zipped_json(json_data: dict, json_filename: str) -> bytes:
|
||||
''' Write json JSON to bytes buffer '''
|
||||
content = BytesIO()
|
||||
data = json.dumps(json_data, indent=4, ensure_ascii=False)
|
||||
with ZipFile(content, 'w') as archive:
|
||||
archive.writestr('document.json', data=data)
|
||||
archive.writestr(json_filename, data=data)
|
||||
return content.getvalue()
|
||||
|
||||
def apply_pattern(text: str, mapping: dict[str, str], pattern: re.Pattern[str]) -> str:
|
||||
|
@ -89,3 +93,11 @@ def fix_old_references(text: str) -> str:
|
|||
pos_input = segment.end(0)
|
||||
output += text[pos_input : len(text)]
|
||||
return output
|
||||
|
||||
def filename_for_schema(alias: str) -> str:
|
||||
''' Generate filename for schema from alias. '''
|
||||
if alias == '' or not alias.isascii():
|
||||
# Note: non-ascii symbols in Content-Disposition
|
||||
# are not supported by some browsers
|
||||
return 'Schema.trs'
|
||||
return alias + '.trs'
|
||||
|
|
|
@ -372,7 +372,7 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
input_serializer.is_valid(raise_exception=True)
|
||||
schema = self._get_schema()
|
||||
load_metadata = input_serializer.validated_data['load_metadata']
|
||||
data = utils.read_trs(request.FILES['file'].file)
|
||||
data = utils.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
|
||||
data['id'] = schema.item.pk
|
||||
|
||||
serializer = s.RSFormTRSSerializer(
|
||||
|
@ -463,16 +463,10 @@ class RSFormViewSet(viewsets.GenericViewSet, generics.ListAPIView, generics.Retr
|
|||
@action(detail=True, methods=['get'], url_path='export-trs')
|
||||
def export_trs(self, request: Request, pk):
|
||||
''' Endpoint: Download Exteor compatible file. '''
|
||||
schema = s.RSFormTRSSerializer(self._get_schema()).data
|
||||
trs = utils.write_trs(schema)
|
||||
filename = self._get_schema().item.alias
|
||||
if filename == '' or not filename.isascii():
|
||||
# Note: non-ascii symbols in Content-Disposition
|
||||
# are not supported by some browsers
|
||||
filename = 'Schema'
|
||||
filename += '.trs'
|
||||
|
||||
response = HttpResponse(trs, content_type='application/zip')
|
||||
data = s.RSFormTRSSerializer(self._get_schema()).data
|
||||
file = utils.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
|
||||
filename = utils.filename_for_schema(self._get_schema().item.alias)
|
||||
response = HttpResponse(file, content_type='application/zip')
|
||||
response['Content-Disposition'] = f'attachment; filename={filename}'
|
||||
return response
|
||||
|
||||
|
@ -489,7 +483,7 @@ class TrsImportView(views.APIView):
|
|||
responses={c.HTTP_201_CREATED: s.LibraryItemSerializer}
|
||||
)
|
||||
def post(self, request: Request):
|
||||
data = utils.read_trs(request.FILES['file'].file)
|
||||
data = utils.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
|
||||
owner = cast(m.User, self.request.user)
|
||||
_prepare_rsform_data(data, request, owner)
|
||||
serializer = s.RSFormTRSSerializer(
|
||||
|
@ -527,7 +521,7 @@ def create_rsform(request: Request):
|
|||
is_canonical=serializer.validated_data.get('is_canonical', False),
|
||||
)
|
||||
else:
|
||||
data = utils.read_trs(request.FILES['file'].file)
|
||||
data = utils.read_zipped_json(request.FILES['file'].file, utils.EXTEOR_INNER_FILENAME)
|
||||
_prepare_rsform_data(data, request, owner)
|
||||
serializer_rsform = s.RSFormTRSSerializer(data=data, context={'load_meta': True})
|
||||
serializer_rsform.is_valid(raise_exception=True)
|
||||
|
@ -623,13 +617,37 @@ def retrieve_version(request: Request, pk_item: int, pk_version: int):
|
|||
if version.item != item:
|
||||
return Response(status=c.HTTP_404_NOT_FOUND)
|
||||
|
||||
data = s.RSFormSerializer(item).from_versioned_data(version.pk, version.data)
|
||||
data = s.RSFormParseSerializer(item).from_versioned_data(version.pk, version.data)
|
||||
return Response(
|
||||
status=c.HTTP_200_OK,
|
||||
data=data
|
||||
)
|
||||
|
||||
|
||||
@extend_schema(
|
||||
summary='export versioned data as file',
|
||||
tags=['Versions'],
|
||||
request=None,
|
||||
responses={
|
||||
(c.HTTP_200_OK, 'application/zip'): bytes,
|
||||
c.HTTP_404_NOT_FOUND: None
|
||||
}
|
||||
)
|
||||
@api_view(['GET'])
|
||||
def export_file(request: Request, pk: int):
|
||||
''' Endpoint: Download Exteor compatible file for versioned data. '''
|
||||
try:
|
||||
version = m.Version.objects.get(pk=pk)
|
||||
except m.Version.DoesNotExist:
|
||||
return Response(status=c.HTTP_404_NOT_FOUND)
|
||||
data = s.RSFormTRSSerializer(m.RSForm(version.item)).from_versioned_data(version.data)
|
||||
file = utils.write_zipped_json(data, utils.EXTEOR_INNER_FILENAME)
|
||||
filename = utils.filename_for_schema(data['alias'])
|
||||
response = HttpResponse(file, content_type='application/zip')
|
||||
response['Content-Disposition'] = f'attachment; filename={filename}'
|
||||
return response
|
||||
|
||||
|
||||
@extend_schema(
|
||||
summary='RS expression into Syntax Tree',
|
||||
tags=['FormalLanguage'],
|
||||
|
|
|
@ -11,7 +11,7 @@ def load_initial_schemas(apps, schema_editor):
|
|||
rootdir = os.path.join(os.getcwd(), 'data')
|
||||
for subdir, dirs, files in os.walk(rootdir):
|
||||
for file in files:
|
||||
data = utils.read_trs(os.path.join(subdir, file))
|
||||
data = utils.read_zipped_json(os.path.join(subdir, file))
|
||||
data['is_common'] = True
|
||||
data['is_canonical'] = True
|
||||
serializer = RSFormTRSSerializer(data=data, context={'load_meta': True})
|
||||
|
|
Loading…
Reference in New Issue
Block a user