BRE/webapi/portal/uploader.py
2024-06-07 19:50:21 +03:00

947 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Uploading data to rk.greatbook.ru"""
import time
import logging
import random
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
NoSuchElementException,
InvalidElementStateException,
ElementNotInteractableException
)
from selenium.webdriver.remote.webelement import WebElement
from .selenium_wrapper import WebBrowser
from .info_models import InputMethod, FieldType, FilterType
from .config import Config
from .crypto import decrypt_user
def _replace_comma_separators(value: str) -> str:
    """Replace every ", " that is followed by an uppercase character with ";".

    A ", " followed by anything else (lowercase letter, digit, punctuation,
    or the end of the string) is left untouched.
    """
    head, *tail = value.split(", ")
    pieces = [head]
    for chunk in tail:
        # Only a separator directly followed by an uppercase character is
        # rewritten; an empty chunk means ", " was last or doubled.
        followed_by_upper = bool(chunk) and chunk[0].isupper()
        pieces.append(";" if followed_by_upper else ", ")
        pieces.append(chunk)
    return "".join(pieces)
class GreatbookUploader:
    """Entry point into portal data input automation.

    Facade over ``_UploaderImpl``: most public methods first make sure the
    session is fresh (see ``_ensure_login``) and then delegate to the
    implementation object.
    """

    def __init__(self, browser: WebBrowser, config: Config):
        self._impl = _UploaderImpl(browser, config)
        # Seconds after which the session is considered stale.
        self._login_timeout = config['Options'].getint('LoginTimeout')
        # Timestamp (time.time()) of the last successful login; 0 = never.
        self._time_login = 0

    def __del__(self):
        pass

    def login(self) -> bool:
        """Login into Portal. Returns False when the login failed."""
        if not self._impl.login():
            return False
        self._time_login = time.time()
        return True

    def logout(self):
        """Logout from current user."""
        self._impl.logout()

    def content_exists(self, content: str) -> bool:
        """Check if content name is taken."""
        self._ensure_login()
        return self._impl.content_exists(content)

    def content_has_text(self, content: str) -> bool:
        """Check if found content has text.

        Warning! Requires to run content_exists first (no login check and
        no page load is performed here).
        """
        return self._impl.content_has_text(content)

    def create_task(self, data: dict) -> str:
        """Create task in system. Returns task ID or empty string on error."""
        self._ensure_login()
        return self._impl.create_task(data)

    def find_task_id(self, task_name: str) -> str:
        """Find task ID or empty string if not found."""
        self._ensure_login()
        return self._impl.find_task_id(task_name)

    def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool:
        """Change task status. Can fail (return False) if status is unavailable."""
        self._ensure_login()
        return self._impl.set_task_status(task_id, new_status, status_label)

    def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
        """Create metadata. Also changes status for new content."""
        self._ensure_login()
        return self._impl.fill_metadata(task_id, data, attributes)

    def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
        """Update metadata."""
        self._ensure_login()
        return self._impl.update_metadata(task_id, data, attributes)

    def get_tasks_data(self, filters: list[list]) -> list:
        """Export data for tasks matching task filter and department filter."""
        self._ensure_login()
        return self._impl.get_tasks_data(filters)

    def get_tasks_content(self, task_ids: list[str]) -> list[list[str]]:
        """Export content for specific task ids."""
        self._ensure_login()
        return self._impl.get_tasks_content(task_ids)

    def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool:
        """Load content (authors / text) into an existing content entry."""
        self._ensure_login()
        return self._impl.load_content(content, authors, text, bibliography)

    def _ensure_login(self):
        """Re-login when more than LoginTimeout seconds passed since last login."""
        if time.time() - self._time_login < self._login_timeout:
            return
        logging.info('Relogging at %s', time.strftime('%H:%M:%S'))
        self.logout()
        if not self.login():
            # Best effort: the following call proceeds anyway, and since
            # _time_login is unchanged the next call retries the login.
            logging.warning('Relogin failed')
class _UploaderImpl:
    """Implementation of portal data input automation.

    Drives the portal web UI through the ``WebBrowser`` wrapper. Input
    helpers (``_output_*``) locate controls by label inside the current
    "output context" (``self._output_context``): the content editor, the
    task search dialog, or a dialog identified by its title.
    When the ``TestRun`` option is set, finalizing actions are replaced by
    their cancel counterparts where the code provides one.
    """

    # Output contexts. Any other context value is used as a dialog title.
    _CONTEXT_CONTENT = 'CRUD EDITOR'
    _CONTEXT_SEARCH = 'SEARCH'

    # Placeholder text shown by the portal for an unset mandatory attribute.
    _CONTENT_OMITTED = 'Заполните обязательный признак'

    _XP_PREFIX_DIALOG = '//span[contains(text(),"{0}")]/parent::*/parent::*/descendant::'
    _XP_PREFIX_CONTENT = '//section[@class="article-CRUD__widget"]/descendant::'
    _XP_NOTIFICATION_UPDATE_OK = '//*[contains(text(),"{0} успешно отредактирован")]'

    # Portal pages
    _PAGE_PORTAL = 'https://rk.greatbook.ru'
    _PAGE_LOGIN = _PAGE_PORTAL
    _PAGE_TASK = _PAGE_PORTAL + '/tasks/kanban/all'
    _PAGE_CONTENT = _PAGE_PORTAL + '/glossary/all'
    _PAGE_TASK_ID = _PAGE_PORTAL + '/tasks/{0}'
    _PAGE_CONTENT_BY_TASK = _PAGE_PORTAL + '/widgets?link=task&id={0}'

    # NOTE: despite the _XP_ prefix this is a CSS class name (used with By.CLASS_NAME).
    _XP_LOGIN_TEST = 'app-login__right-container'

    # Generic elements
    _XP_MAIN_ADD_BUTTON = '//button[@class="el-tooltip button-input-add primary"]'
    _XP_LIST_ITEM_CONTAINS = '//li[contains(text(),"{0}")]'
    _XP_SPAN_CONTAINS = '//span[contains(text(),"{0}")]'
    _XP_SPAN_EXACT_MATCH = '//span[text()=" {0} " or text()="{0}"]'
    _XP_BUTTON_CONTAINS = '//button[contains(text(),"{0}")]'
    _XP_TABLE_ROW = '//div[@class="table__row"]'
    _XP_TABLE_CELL = '(descendant::div[@class="table__row-cell sortable"])[{0}]/child::*/child::*'
    _XP_TABLE_ROW_EXPAND = 'ancestor::div[@class="table__row-content"]/child::div[@class="table__row-expand"]'
    _XP_NOTIFICATION_CLOSE = '//button[@class="notifications-layout__item-close"]'
    _XP_NEW_TASK_BTN = '//div[@class="sidebar-tasks"]/button'
    _XP_SPAN_STATUS = '//span[@class="task-form__row-status"]'

    # Login / logout
    _XP_LOGIN_USER = '//input[@placeholder="Имя пользователя"]'
    _XP_LOGIN_PW = '//input[@placeholder="Пароль"]'
    _XP_LOGIN_BTN = '//button[@type="button"]'
    _XP_USER_PROFILE = '//div[@class="el-tooltip top-bar__user-profile"]'
    _XP_USER_EXIT = '//div[contains(text(),"Выйти")]'

    # Task pages
    _XP_TASKS_NEXT_BTN = '//i[@class="el-icon el-icon-arrow-right"]'
    _XP_TASKS_PAGER = '//ul[@class="el-pager"]'
    _XP_TASK_SELECT_STEP = '//span[text()="Выбрать шаг"]/parent::button'
    _XP_TASK_CREATE_SUB = '//button[contains(text(),"Создать подзадачу")]'
    _XP_TASK_DATA = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::span'
    _XP_TASK_INPUT = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::input'
    _XP_TASK_CONTENT_URL = '//a[text()="{0}"]'
    _XP_TASK_RADIO_BTNS = '//span[@class="el-radio__inner"]'
    _XP_TASK_PAGINATION_CONTROL = '//span[@class="el-pagination__sizes"]/descendant::input'
    _XP_INPUT_ACCEPT = 'ancestor::div[@class="deferred-form-input__edit edit-mode"]/descendant::button[@class="button-with-icon-24x24 button-toggle"]'

    # Dialog controls: {0} is the dialog title, {1} is the field label / id.
    _XP_DIALOG_LABEL = _XP_PREFIX_DIALOG + 'label[text()=" {1} " or text()="{1}"]'
    _XP_DIALOG_INPUT = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::input'
    _XP_DIALOG_COMBO = _XP_DIALOG_LABEL + '/following-sibling::div/child::div/child::div'
    _XP_DIALOG_TEXT_AREA = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::textarea'
    _XP_DIALOG_SWITCH = _XP_PREFIX_DIALOG + 'div[@id="{1}"]'
    _XP_DIALOG_BTN_SELECT = '//button[contains(text(),"Выбрать")]'
    _XP_DIALOG_BTN_CLEAN = '(//span[@class="tasks-filters-dialog__footer"]/button)[2]'
    _XP_DIALOG_BTN_OK = '(//span[@class="tasks-filters-dialog__footer"]/button)[3]'
    _XP_DIALOG_BTN_PRIMARY = _XP_PREFIX_DIALOG + 'button[@class="button-dialog button-type-primary"]'
    _XP_DIALOG_BTN_CANCEL = _XP_PREFIX_DIALOG + 'button[@aria-label="Close"]'

    # Content editor controls: {0} is the field label / id.
    _XP_CONTENT_LABEL = _XP_PREFIX_CONTENT + 'label[text()=" {0} " or text()="{0}"]'
    _XP_CONTENT_INPUT = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::input'
    _XP_CONTENT_COMBO = _XP_CONTENT_LABEL + '/following-sibling::div/child::div/child::div'
    _XP_CONTENT_TEXT_AREA = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::textarea'
    _XP_CONTENT_SWITCH = _XP_PREFIX_CONTENT + 'div[@id="{0}"]'
    _XP_CONTENT_BTN_PRIMARY = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]'
    # NOTE(review): identical to _XP_CONTENT_BTN_PRIMARY, so in test-run mode
    # _xpath_finalize_btn resolves to the primary button for the content
    # editor. Looks like a copy-paste slip - confirm the real cancel locator.
    _XP_CONTENT_BTN_CANCEL = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]'
    _XP_CONTENT_BTN_EDIT = '//button[text()="Редактировать" or text()=" Редактировать "]'
    _XP_CONTENT_IS_EDITING = '//*[contains(text(),"атрибуты и признаки")]'
    _XP_CONTENT_CHAPTER = '//p[contains(text(), "Раздел")]'
    _XP_CONTENT_VIEW_LABEL = '//span[@class="view-section__label" and text()="{0}:"]'
    _XP_CONTENT_VIEW_VALUE = _XP_CONTENT_VIEW_LABEL + '/following-sibling::span/span'
    _XP_CONTENT_TITLE = '//div[@class="top-bar__title-col flex"]/h2[text()="Редактор"]'

    # Search, lists and pickers
    _XP_SEARCH_INPUT = '//label[contains(text(),"{0}")]/following-sibling::div/descendant::input'
    # NOTE(review): the first and third alternatives look identical; one of
    # them possibly used a non-breaking space originally - verify.
    _XP_SCROLLER_ITEM = '//div[text()=" {0} " or text()="{0}" or text()=" {0} "]'
    _XP_SCROLLER_CHECKBOX = 'preceding-sibling::div[@class="tree__node-checkbox"]/label[@class="el-checkbox"]'
    _XP_LIST_ITEM = '//li[.="{0}" or .=" {0} "]'
    _XP_RELATIVE_BTN_CLOSE = 'descendant::button[@class="catalogs-block__close icon-close-tags"]'
    _XP_RELATIVE_BTN_PLUS = 'following-sibling::button[@class="button-input-add"]'
    _XP_RELATIVE_OUTPUT_END = 'following-sibling::input[@placeholder="Конец"]'
    _XP_SEARCH_QUERY = '//input[@placeholder="Поиск"]'
    _XP_SEARCH_BAR = '//div[@class="el-input el-input-group el-input-group--append el-input--suffix"]/input'
    _XP_SEARCH_BTN = '//button[@slot="append"]'
    _XP_SEARCH_OPTIONS = '//button[@class="tasks-filters__search-btn button-graphic"]'
    _XP_SEARCH_NO_RESULTS = '//div[@class="search-list__no-results"]'
    _XP_TASKS_VIEW_LIST = '(//button[@class="el-tooltip button-graphic"])[3]'
    _XP_SEARCH_MAIN_HREF = '(//span[@class="parent-placeholder el-tooltip"]/parent::*/parent::*/preceding-sibling::*)[5]/descendant::a'
    _XP_DIV_TAB = '//p[contains(text(),"{0}")]/parent::*/parent::*'
    _XP_CONTENT_BTN_EDIT_TEXT = '//button[contains(text(),"Редактировать раздел")]'
    _XP_EDITOR_TEXT_AREA = '//*[@class="ProseMirror"]'

    def __init__(self, browser: WebBrowser, config: Config):
        self.browser = browser
        self._config = config
        # Current output context: _CONTEXT_CONTENT, _CONTEXT_SEARCH or a dialog title.
        self._output_context = ''
        self._test_run = self._config['Options'].getboolean('TestRun')

    def __del__(self):
        pass

    # ---------------------------------------------------------------- public

    def login(self) -> bool:
        """Login into Portal using credentials from config.

        Returns False when the login form or the main page does not appear.
        """
        self.browser.driver.get(self._PAGE_LOGIN)
        if not self.browser.wait_presence(self._XP_LOGIN_TEST, By.CLASS_NAME):
            return False
        user = self._config['UserData']['UserLogin']
        password = decrypt_user(user, self._config['UserData']['Password'], self._config['AppData']['Password'])
        self.browser.by_xpath(self._XP_LOGIN_USER).send_keys(user)
        self.browser.by_xpath(self._XP_LOGIN_PW).send_keys(password)
        self.browser.by_xpath(self._XP_LOGIN_BTN).click()
        return self._wait_loading()

    def logout(self):
        """Logout from current user. Missing profile controls are ignored."""
        try:
            self.browser.by_xpath(self._XP_USER_PROFILE).click()
            self.browser.by_xpath(self._XP_USER_EXIT).click()
            self.browser.wait_ajax()
        except NoSuchElementException:
            # Best effort: the profile menu is absent, nothing to click.
            pass

    def content_exists(self, content: str) -> bool:
        """Check if content name is taken (exact match in content search)."""
        self._load_page(self._PAGE_CONTENT)
        search = self.browser.by_xpath(self._XP_SEARCH_BAR)
        search.clear()
        search.send_keys(content)
        search.send_keys(Keys.RETURN)
        self.browser.wait_ajax()
        results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content))
        return len(results) > 0

    def content_has_text(self, content: str) -> bool:
        """Check if found content has attached text.

        Works on the currently displayed search results and does not load
        any page itself.
        """
        content_cell = self.browser.by_xpath(self._XP_SPAN_EXACT_MATCH.format(content))
        return len(content_cell.find_elements(By.XPATH, self._XP_TABLE_ROW_EXPAND)) > 0

    def create_task(self, data: dict) -> str:
        """Create task in system. Returns task ID or empty string on error.

        ``data`` maps FieldType members to values; it must contain
        ``FieldType.task_name``, which is used to look up the created task.
        """
        if not self._open_new_task_form():
            return ''
        self._output_context = 'Создание задачи'
        for field in FieldType:
            if field in data:
                self._output_field(field, data[field])
        self._finalize_output()
        self._wait_dialog_done()
        return self.find_task_id(data[FieldType.task_name])

    def find_task_id(self, task_name: str) -> str:
        """Find task ID or empty string if not found.

        Only a row with an exactly matching name, no parent task and a
        status other than cancelled is accepted.
        """
        self._load_page(self._PAGE_TASK)
        self._list_view()
        self._search_task_name(task_name)
        for row in self.browser.by_xpath_m(self._XP_TABLE_ROW):
            name_cell = self._read_cell(row, 3)
            if name_cell.text != task_name:
                continue
            if self._read_cell(row, 4).text != 'Нет':
                continue
            if self._read_cell(row, 10).text == 'Отменена':
                continue
            # Task ID is the last path segment of the task link.
            return name_cell.get_attribute('href').split('/')[-1]
        return ''

    def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool:
        """Change task status. Can fail (return False) if status is unavailable.

        An empty ``task_id`` means "operate on the task page already open".
        ``new_status`` is the text of the step to pick; ``status_label`` is
        the status text expected on the page afterwards.
        In test-run mode the step is not clicked and the result is True as
        soon as the step is offered.
        """
        if task_id != '':
            self._load_page(self._PAGE_TASK_ID.format(task_id))
        self.browser.by_xpath(self._XP_TASK_SELECT_STEP).click()
        self.browser.wait_ajax()
        status_element = self.browser.by_xpath_m(self._XP_LIST_ITEM_CONTAINS.format(new_status))
        if len(status_element) == 0:
            return False
        if not self._test_run:
            status_element[0].click()
            time.sleep(self._config['Options'].getint('WaitStatus'))
            self.browser.driver.refresh()
            self._wait_loading()
            self._ensure_no_notifications()
            if self._get_task_status() != status_label:
                return False
        return True

    def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
        """Create metadata. Also changes status for new content.

        Fails when the task page header does not match the task name, when
        the task already has content, or when any step fails.
        """
        if not self._load_task_page(task_id, data[FieldType.task_name]):
            return False
        if self._has_content():
            return False
        is_article = data[FieldType.task_type] in ['Статья', 'Актуализация статьи из ЭВ БРЭ']
        status = self._get_task_status()
        if status == 'Новая':
            status = 'Заполнение метаданных статьи' if is_article else 'Написание контента'
            status_label = status
            if not self.set_task_status('', status, status_label):
                return False
        if not self._create_content(data, attributes) or not self._wait_loading():
            return False
        if not self._has_bibliography():
            self._create_bibliography_task()
        return self._progress_metedata(is_article)

    def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
        """Update metadata of the content linked from the task page."""
        if not self._load_task_page(task_id, data[FieldType.task_name]):
            return False
        content = self.browser.by_xpath_m(self._XP_TASK_CONTENT_URL.format(data[FieldType.content_name_db]))
        if len(content) != 1:
            return False
        self._load_page(content[0].get_attribute('href'))
        return self._update_content(data, attributes)

    def get_tasks_data(self, filters: list[list]) -> list:
        """Export data for tasks matching task filter and department filter.

        Returns a list of rows (see ``_scan_tasks_table``) or an empty list
        when the search reports no results.
        """
        self._load_page(self._PAGE_TASK)
        self._list_view()
        self.browser.by_xpath(self._XP_TASK_PAGINATION_CONTROL).click()
        self.browser.wait_ajax()
        self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('50 на странице')).click()
        self.browser.wait_ajax()
        if not self._search_all_tasks(filters):
            return []
        return self._read_all_tasks()

    def get_tasks_content(self, task_ids: list[str]) -> list[list[str]]:
        """Export content for specific task ids.

        Tasks without a content editor page are skipped. Exceptions derived
        from ``Exception`` are logged per task and do not stop the export.
        """
        result = []
        for task_id in task_ids:
            try:
                self._load_page(self._PAGE_CONTENT_BY_TASK.format(task_id))
                self.browser.wait_ajax()
                title = self.browser.by_xpath_m(self._XP_CONTENT_TITLE)
                if len(title) == 0:
                    continue
                result.append(self._read_content(task_id))
                # The task manager is read from the task page and appended
                # as the last column of the row just added.
                self._load_page(self._PAGE_TASK_ID.format(task_id))
                manager_xpath = self._XP_TASK_DATA.format(FieldType.task_manager.to_label())
                self.browser.wait_presence(manager_xpath, By.XPATH)
                editor = self.browser.by_xpath(manager_xpath).text
                result[-1].append(editor)
            except Exception as e:
                logging.info('Exception while reading content %s\n%s', task_id, e)
        return result

    def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool:
        """Load authors and text into the content entry named ``content``.

        Returns False unless the content search yields exactly one exact
        match and its page loads.
        NOTE: work in progress in the original code - ``bibliography`` is not
        used, and the typed text is not saved (the save click is disabled).
        """
        self._load_page(self._PAGE_CONTENT)
        search = self.browser.by_xpath(self._XP_SEARCH_BAR)
        search.clear()
        search.send_keys(content)
        search.send_keys(Keys.RETURN)
        self._wait_loading()
        results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content))
        if len(results) != 1:
            return False
        results[0].click()
        if not self._wait_loading():
            return False
        self.browser.wait_ajax()
        main_tab = self.browser.by_xpath(self._XP_DIV_TAB.format('Раздел'))
        main_tab.click()
        self.browser.wait_ajax()
        if len(authors) != 0:
            self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click()
            self._wait_loading()
            self._add_attrbute("Автор (на портале)")
            self._output_field(FieldType.author, authors)
            self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click()  # test method
        else:
            self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click()
            self._wait_loading()
            self._add_attrbute("Аффилированные организации")
            self._output_item("Аффилированные организации", InputMethod.combo_dialog, ["Редакция технологий и техники"])  # remove feature
            self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click()  # test method
            self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Отменить")).click()  # test method
        # TODO: parse data_reader module
        # TODO: split into separate methods: adding authors, bibliography, text
        if text != '':
            self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT_TEXT).click()
            self.browser.wait_ajax()
            textarea = self.browser.by_xpath(self._XP_EDITOR_TEXT_AREA)
            textarea.click()
            textarea.send_keys(text)
        return True

    # ------------------------------------------------------- page / state

    def _load_page(self, page_url: str):
        """Open ``page_url``, logging in again when the login form is shown.

        Raises BaseException when the page cannot be loaded or login fails.
        NOTE(review): BaseException is not caught by ``except Exception``
        handlers (e.g. in get_tasks_content); kept as in the original.
        """
        self.browser.driver.get(page_url)
        if not self._wait_loading():
            raise BaseException(f"Cannot load page {page_url}")
        if not self._login_required():
            return
        if not self.login():
            raise BaseException("Cannot login")
        self.browser.driver.get(page_url)
        if not self._wait_loading():
            raise BaseException(f"Cannot load page {page_url}")

    def _login_required(self) -> bool:
        """True when the login form marker is present on the current page."""
        return len(self.browser.driver.find_elements(By.CLASS_NAME, self._XP_LOGIN_TEST)) != 0

    def _has_content(self) -> bool:
        """True when the task page offers no '+ Создать' (create) button."""
        return len(self.browser.by_xpath_m(self._XP_BUTTON_CONTAINS.format('+ Создать'))) == 0

    def _is_content_editing(self) -> bool:
        """True when the content page shows the editing-mode marker text."""
        elems = self.browser.by_xpath_m(self._XP_CONTENT_IS_EDITING)
        return len(elems) > 0

    def _has_bibliography(self) -> bool:
        """True when the page mentions a bibliography section."""
        return len(self.browser.by_xpath_m(self._XP_SPAN_CONTAINS.format('Раздел библиография'))) != 0

    def _get_task_status(self) -> str:
        """Read status text from the open task page."""
        return self.browser.by_xpath(self._XP_SPAN_STATUS).text

    def _wait_dialog_done(self):
        """Sleep for the configured WaitData seconds."""
        time.sleep(self._config['Options'].getint('WaitData'))

    def _wait_loading(self) -> bool:
        """Wait for the main add button and pending AJAX; False on timeout."""
        if not self.browser.wait_presence(self._XP_MAIN_ADD_BUTTON, By.XPATH):
            return False
        self.browser.wait_ajax()
        return True

    def _ensure_no_notifications(self):
        """Close notifications one by one until none is left."""
        while True:
            close_btns = self.browser.by_xpath_m(self._XP_NOTIFICATION_CLOSE)
            if len(close_btns) == 0:
                return
            try:
                close_btns[0].click()
            except ElementNotInteractableException:
                # The button is not clickable yet/anymore; re-query and retry.
                pass
            self.browser.wait_ajax()

    def _list_view(self):
        """Switch the tasks page to list (table) view."""
        self.browser.by_xpath(self._XP_TASKS_VIEW_LIST).click()
        self.browser.wait_ajax()

    def _load_task_page(self, task_id: str, task_name: str) -> bool:
        """Open task page; True when its first h3 header equals ``task_name``."""
        self._load_page(self._PAGE_TASK_ID.format(task_id))
        task_header = self.browser.by_xpath('//h3').text
        return task_header == task_name

    # ------------------------------------------------------------- search

    def _search_task_name(self, task_name: str):
        """Search tasks by exact name with all other filters cleared."""
        search = self.browser.by_xpath(self._XP_SEARCH_BAR)
        search.clear()
        search.click()
        search.send_keys(task_name)
        options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS)
        options.click()
        self.browser.wait_ajax()
        self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH)
        self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click()
        self.browser.wait_ajax()
        self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('Точное совпадение')).click()
        self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click()
        self.browser.wait_ajax()

    def _search_all_tasks(self, filters: list[list]) -> bool:
        """Apply ``filters`` (indexed by FilterType) in the search dialog.

        Switches the output context to _CONTEXT_SEARCH.
        Returns True when the "no results" element is not displayed.
        """
        search = self.browser.by_xpath(self._XP_SEARCH_BAR)
        search.clear()
        options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS)
        options.click()
        self.browser.wait_ajax()
        self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH)
        self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click()
        self.browser.wait_ajax()

        self._output_context = self._CONTEXT_SEARCH
        # NOTE(review): range end is exclusive, so FilterType.observer itself
        # is never applied - confirm this is intended.
        for filter_id in range(FilterType.task_type, FilterType.observer):
            label = FilterType(filter_id).to_label()
            for value in filters[filter_id]:
                if value != '':
                    self._output_combobox(label, value)

        if filters[FilterType.created_begin] != '':
            self._output_date_range(
                FilterType.created_begin.to_label(),
                filters[FilterType.created_begin],
                filters[FilterType.created_end]
            )
        if filters[FilterType.target_begin] != '':
            self._output_date_range(
                FilterType.target_begin.to_label(),
                filters[FilterType.target_begin],
                filters[FilterType.target_end]
            )

        self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click()
        self.browser.wait_ajax()
        no_result = self.browser.by_xpath(self._XP_SEARCH_NO_RESULTS)
        return not no_result.is_displayed()

    # ------------------------------------------------------------ reading

    def _read_all_tasks(self) -> list:
        """Collect rows from every page of the tasks table."""
        result = []
        next_btn = self.browser.by_xpath(self._XP_TASKS_NEXT_BTN)
        # Page count is the text of the last item in the pager.
        pager_items = self.browser.by_xpath(self._XP_TASKS_PAGER).find_elements(By.XPATH, 'descendant::li')
        count_pages = int(pager_items[-1].text)
        for _ in range(count_pages):
            result += self._scan_tasks_table()
            next_btn.click()
            self.browser.wait_ajax()
            # Random pause of up to one second between pages.
            time.sleep(random.randint(0, 100)/100)
            self._ensure_no_notifications()
        return result

    def _scan_tasks_table(self) -> list:
        """Read the visible tasks table.

        Each row is [task_type, status, content_name, supervisor, worker,
        target_date, task_id, task_name, parent_id].
        """
        result = []
        for row in self.browser.by_xpath_m(self._XP_TABLE_ROW):
            task_type = self._read_cell(row, 2).text
            name_cell = self._read_cell(row, 3)
            task_name = name_cell.text
            task_id = name_cell.get_attribute('href').split('/')[-1]
            parent = self._read_cell(row, 4)
            if parent.text != 'Нет':
                parent_id = parent.find_element(By.XPATH, 'descendant::a').get_attribute('href').split('/')[-1]
            else:
                parent_id = 'Нет'
            content_name = self._read_cell(row, 6).text
            target_date = self._read_cell(row, 7).text
            supervisor = self._read_cell(row, 8).text
            worker = self._read_cell(row, 9).text
            status = self._read_cell(row, 10).text
            result.append([task_type, status, content_name, supervisor, worker, target_date, task_id, task_name, parent_id])
        return result

    def _read_content(self, task_id: str) -> list:
        """Read content attributes from the open content page.

        Returns [task_id, <attribute values...>, author]; author is '' when
        the page has no section entry to open.
        """
        editorial = self._read_field(FieldType.editorial)
        if editorial.startswith('Редакция '):
            editorial = editorial[len('Редакция '):]
        result = [
            task_id,
            self._read_field(FieldType.biblio_name),
            self._read_field(FieldType.change_score),
            editorial,
            self._read_field(FieldType.responsible),
            self._read_field(FieldType.definition),
            self._read_field(FieldType.object_type),
            self._read_field(FieldType.markers),
            self._read_field(FieldType.tags),
            self._read_field(FieldType.source),
            self._read_field(FieldType.electron_bre),
            self._read_field(FieldType.main_page),
            self._read_field(FieldType.is_general),
            self._read_field(FieldType.actualize_period),
            self._read_field(FieldType.age_restriction)
        ]
        chapters = self.browser.by_xpath_m(self._XP_CONTENT_CHAPTER)
        if len(chapters) == 0:
            author = ''
        else:
            # Author is read after opening the first section entry.
            chapters[0].click()
            self.browser.wait_ajax()
            author = self._read_field(FieldType.author)
        result.append(author)
        return result

    def _read_cell(self, row: WebElement, column: int) -> WebElement:
        """Return the inner element of the ``column``-th cell (1-based XPath index) of ``row``."""
        return row.find_element(By.XPATH, self._XP_TABLE_CELL.format(column))

    def _read_field(self, field: FieldType) -> str:
        """Read displayed value of ``field`` from the content view."""
        label = field.to_label()
        input_method = field.input_method()
        return self._read_item(label, input_method)

    def _read_item(self, field_label: str, input_method: InputMethod) -> str:
        """Read displayed value by label; '' when absent or not filled.

        Retries up to 3 times (0.5 s apart) while a loading placeholder is
        shown. Soft hyphen and multiplication sign are replaced with '#'.
        For combo-like inputs, ", " separators before uppercase characters
        become ";".
        """
        xpath = self._XP_CONTENT_VIEW_VALUE.format(field_label)
        retries = 3
        value = ''
        while True:
            elems = self.browser.by_xpath_m(xpath)
            if len(elems) == 0:
                return ''
            value = elems[0].text
            if self._validate_input(value) or retries == 0:
                break
            retries = retries - 1
            time.sleep(0.5)
        value = value.replace('\xad', '#')
        value = value.replace('\xd7', '#')
        if value == self._CONTENT_OMITTED:
            return ''
        if input_method in [InputMethod.combo_box, InputMethod.combo_dialog, InputMethod.combo_dialog_simple_list]:
            value = _replace_comma_separators(value)
        return value

    def _validate_input(self, value: str) -> bool:
        """False when ``value`` is a loading placeholder, True otherwise."""
        if value == '...':
            return False
        if value == 'Загрузка...':
            return False
        return True

    # ---------------------------------------------------- content / tasks

    def _open_new_task_form(self) -> bool:
        """Open the task creation dialog; False when it does not appear."""
        self._load_page(self._PAGE_TASK)
        self.browser.by_xpath(self._XP_NEW_TASK_BTN).click()
        return self.browser.wait_presence(self._XP_DIALOG_BTN_PRIMARY.format('Создание задачи'), By.XPATH)

    def _create_content(self, data: dict, attributes: list) -> bool:
        """Create content from the open task page.

        Returns True in test-run mode once the dialog was filled; otherwise
        True when the task shows content afterwards.
        """
        self._ensure_no_notifications()
        if not self.browser.wait_clickable(self._XP_BUTTON_CONTAINS.format('+ Создать'), By.XPATH):
            return False
        self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('+ Создать')).click()
        self.browser.wait_ajax()

        self._output_context = 'Создать контент'
        self._output_content_data(data, attributes)
        self._wait_dialog_done()
        if self._test_run:
            return True
        return self._has_content()

    def _update_content(self, data: dict, attributes: list) -> bool:
        """Update metadata on the open content page.

        Returns True when the "successfully edited" notification is found.
        """
        btn_edit = self.browser.by_xpath_m(self._XP_CONTENT_BTN_EDIT)
        if len(btn_edit) != 1:
            logging.warning('Не доступна кнопка Редактировать на странице контента')
            return False
        btn_edit[0].click()
        if not self._wait_loading() or not self._is_content_editing():
            return False

        self._output_context = self._CONTEXT_CONTENT
        self._output_content_data(data, attributes)
        notifications = self.browser.by_xpath_m(self._XP_NOTIFICATION_UPDATE_OK.format(data[FieldType.content_name_db]))
        return len(notifications) > 0

    def _create_bibliography_task(self):
        """Create a bibliography subtask copying key fields of the open task."""
        self._ensure_no_notifications()
        # Values are read before the subtask dialog is opened.
        executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.executor.to_label()))[0].text
        supervisor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text
        department = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.department.to_label()))[0].text
        date_target = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.date_target.to_label()))[0].text

        self._output_context = 'Создание подзадачи'
        self.browser.by_xpath(self._XP_TASK_CREATE_SUB).click()
        self.browser.by_xpath_m(self._XP_TASK_RADIO_BTNS)[0].click()
        self.browser.by_xpath(self._XP_DIALOG_BTN_PRIMARY.format(self._output_context)).click()
        self.browser.wait_ajax()

        self._output_field(FieldType.task_type, 'Раздел библиография')
        self._output_field(FieldType.executor, executor)
        self._output_field(FieldType.supervisor, supervisor)
        self._output_field(FieldType.department, department)
        self._output_field(FieldType.date_target, date_target)
        self._finalize_output()
        self._wait_dialog_done()

    def _progress_metedata(self, is_article: bool) -> bool:
        """Move the task to the next step and reassign its executor.

        Articles go to the configured SlovnikExecutor; other content goes
        to the supervisor shown on the task page.
        """
        if is_article:
            if not self.set_task_status('', 'В службу', 'Ожидание проверки службой "Словник"'):
                return False
            new_executor = self._config['AppData']['SlovnikExecutor']
        else:
            if not self.set_task_status('', 'Заведующему редакцией', 'Проверка Заведующим редакцией'):
                return False
            new_executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text
        self._ensure_no_notifications()
        self._output_task_combo(FieldType.executor.to_label(), new_executor)
        return True

    # ------------------------------------------------------------- output

    def _output_content_data(self, data: dict, attributes: list):
        """Fill fields from ``data`` and ``attributes``, then finalize.

        ``attributes`` items are indexable as (name, input_method, value).
        """
        for field in FieldType:
            if field in data:
                self._output_field(field, data[field])
                self.browser.wait_ajax()
        for attr in attributes:
            self._output_attribute(attr[0], attr[1], attr[2])
            self.browser.wait_ajax()
        self._finalize_output()
        self.browser.wait_ajax()

    def _finalize_output(self):
        """Click the finalizing button of the current output context."""
        self._ensure_no_notifications()
        btn_path = self._xpath_finalize_btn(self._output_context)
        self.browser.by_xpath(btn_path).click()
        self.browser.wait_ajax()

    def _output_field(self, field: FieldType, value):
        """Output ``value`` into the control of ``field``."""
        label = field.to_label()
        input_method = field.input_method()
        self._output_item(label, input_method, value)

    def _output_attribute(self, name: str, input_method: InputMethod, value):
        """Output an attribute, adding it to the form first when missing."""
        if not self._test_output_exists(name):
            self._add_attrbute(name)
        self._output_item(name, input_method, value)

    def _add_attrbute(self, name: str):
        """Add attribute ``name`` to the form through the picker dialog."""
        self._ensure_no_notifications()
        self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('Добавить признак')).click()
        self.browser.wait_ajax()
        self._select_dialog_item(name)

    def _output_item(self, field_label: str, input_method: InputMethod, value):
        """Dispatch ``value`` to the handler matching ``input_method``.

        Nothing is done for an empty string value, a read-only input, or a
        label absent from the current context.
        Raises TypeError when a combo_dialog value is not a list.
        """
        if value == '':
            return
        if input_method == InputMethod.read_only:
            return
        if not self._test_output_exists(field_label):
            return

        if input_method == InputMethod.combo_box:
            self._output_combobox(field_label, value)
        elif input_method == InputMethod.text_input:
            self._output_text_input(field_label, value)
        elif input_method == InputMethod.text_area:
            self._output_text_area(field_label, value)
        elif input_method == InputMethod.date_picker:
            self._output_date(field_label, value)
        elif input_method == InputMethod.bool_toggle:
            self._output_bool(field_label, value)
        elif input_method == InputMethod.combo_dialog_simple_list:
            self._output_dialog(field_label, value, False)
        elif input_method == InputMethod.combo_dialog:
            if not isinstance(value, list):
                # Raising a plain string is itself a TypeError in Python 3;
                # raise it explicitly so the message is the intended one.
                raise TypeError(f"List argument expected. Given: {type(value)}")
            self._output_dialog(field_label, value, True)

    def _test_output_exists(self, field_label: str) -> bool:
        """True when a label ``field_label`` exists in the current context."""
        xpath = self._xpath_label(field_label, self._output_context)
        elems = self.browser.by_xpath_m(xpath)
        return len(elems) > 0

    def _output_bool(self, field_label: str, value: bool):
        """Toggle the switch when its checked state differs from ``value``."""
        bool_switch = self.browser.by_xpath(self._xpath_switch(field_label, self._output_context))
        current_value = 'is-checked' in bool_switch.get_attribute('class')
        if current_value != value:
            bool_switch.click()

    def _output_text_input(self, field_label: str, value: str):
        """Type ``value`` into the input next to ``field_label``."""
        self._output(self._xpath_input(field_label, self._output_context), value)

    def _output_text_area(self, field_label: str, value: str):
        """Type ``value`` into the textarea next to ``field_label``."""
        self._output(self._xpath_textarea(field_label, self._output_context), value)

    def _output_combobox(self, field_label: str, value: str):
        """Type ``value`` into a combobox and pick the matching list item."""
        if not self._output(self._xpath_input(field_label, self._output_context), value):
            return
        self.browser.wait_ajax()
        list_item = self._find_active_list(value)
        list_item.click()
        self.browser.wait_ajax()
        self.browser.send_esc()

    def _output_date_range(self, field_label: str, value1: str, value2: str):
        """Type range start and end into a date-range control."""
        element = self.browser.by_xpath(self._xpath_input(field_label, self._output_context))
        element.click()
        element.send_keys(value1)
        # The end input is a following sibling of the start input.
        element = element.find_element(By.XPATH, self._XP_RELATIVE_OUTPUT_END)
        element.click()
        element.send_keys(value2)
        self.browser.wait_ajax()
        self.browser.send_enter()

    def _output_task_combo(self, label: str, value: str):
        """Edit a deferred combo field on the task page.

        The accept button is not clicked in test-run mode.
        """
        element = self.browser.by_xpath_m(self._XP_TASK_DATA.format(label))[0]
        element.click()
        self.browser.wait_ajax()
        element = self.browser.by_xpath_m(self._XP_TASK_INPUT.format(label))[0]
        element.send_keys(value)
        self.browser.wait_ajax()
        list_item = self._find_active_list(value)
        list_item.click()
        self.browser.wait_ajax()
        accept_btn = element.find_element(By.XPATH, self._XP_INPUT_ACCEPT)
        if not self._test_run:
            accept_btn.click()
        self.browser.wait_ajax()

    def _output_date(self, field_label: str, value: str):
        """Type ``value`` into the date input next to ``field_label``."""
        self._output(self._xpath_input(field_label, self._output_context), value)

    def _output_dialog(self, field_label, values: list[str], search_flag=True):
        """Replace items of a dialog-backed list control with ``values``.

        Existing items are removed first; every value is then added through
        the picker dialog (using its search box when ``search_flag``).
        """
        self._ensure_no_notifications()
        item = self.browser.by_xpath(self._xpath_combo(field_label, self._output_context))
        remove_btns = item.find_elements(By.XPATH, self._XP_RELATIVE_BTN_CLOSE)
        for existing_item in remove_btns:
            existing_item.click()
        btn_plus = item.find_element(By.XPATH, self._XP_RELATIVE_BTN_PLUS)
        for value in values:
            btn_plus.click()
            self.browser.wait_ajax()
            self._select_dialog_item(value, search_flag)

    def _output(self, xpath: str, value: str) -> bool:
        """Click, clear (when possible) and type into an element. Returns True."""
        element = self.browser.by_xpath(xpath)
        element.click()
        try:
            element.clear()
        except InvalidElementStateException:
            # Element cannot be cleared; type over the current state.
            pass
        element.send_keys(value)
        return True

    def _find_active_list(self, value: str):
        """Return the first displayed list item matching ``value`` or None."""
        list_items = self.browser.by_xpath_m(self._XP_LIST_ITEM.format(value))
        count_active = sum(1 for x in list_items if x.is_displayed())
        if count_active > 1:
            logging.info('%s multiple list results', value)
        for list_item in list_items:
            if list_item.is_displayed():
                return list_item
        logging.info('Cannot find list item: %s', value)
        return None

    def _find_active_scroller(self, value: str):
        """Return the first displayed picker item with a checkbox, or None.

        Raises NoSuchElementException when no matching item is displayed.
        """
        list_items = self.browser.by_xpath_m(self._XP_SCROLLER_ITEM.format(value))
        count_active = sum(1 for x in list_items if x.is_displayed())
        if count_active == 0:
            logging.warning('Cannot find checkbox list item: %s', value)
            raise NoSuchElementException
        if count_active > 1:
            logging.info('%s multiple list results', value)
        for list_item in list_items:
            checkbox = list_item.find_elements(By.XPATH, self._XP_SCROLLER_CHECKBOX)
            if len(checkbox) > 0 and list_item.is_displayed():
                return list_item
        return None

    def _select_dialog_item(self, value: str, search_flag=True):
        """Pick ``value`` in the open picker dialog and confirm selection."""
        if search_flag:
            search = self.browser.by_xpath(self._XP_SEARCH_QUERY)
            search.send_keys(value)
            search.send_keys(Keys.ENTER)
        self.browser.wait_ajax()
        time.sleep(1)  # additional wait for list to load
        list_item = self._find_active_scroller(value)
        list_item.click()
        self.browser.by_xpath(self._XP_DIALOG_BTN_SELECT).click()
        self.browser.wait_ajax()

    # ------------------------------------------------------ xpath helpers

    def _xpath_label(self, field_label: str, context: str) -> str:
        """XPath of the label ``field_label`` within ``context``."""
        if context == self._CONTEXT_CONTENT:
            return self._XP_CONTENT_LABEL.format(field_label)
        return self._XP_DIALOG_LABEL.format(context, field_label)

    def _xpath_input(self, field_label: str, context: str) -> str:
        """XPath of the input belonging to ``field_label`` within ``context``."""
        if context == self._CONTEXT_CONTENT:
            return self._XP_CONTENT_INPUT.format(field_label)
        if context == self._CONTEXT_SEARCH:
            return self._XP_SEARCH_INPUT.format(field_label)
        return self._XP_DIALOG_INPUT.format(context, field_label)

    def _xpath_textarea(self, field_label: str, context: str) -> str:
        """XPath of the textarea belonging to ``field_label`` within ``context``."""
        if context == self._CONTEXT_CONTENT:
            return self._XP_CONTENT_TEXT_AREA.format(field_label)
        return self._XP_DIALOG_TEXT_AREA.format(context, field_label)

    def _xpath_combo(self, field_label: str, context: str) -> str:
        """XPath of the combo container belonging to ``field_label`` within ``context``."""
        if context == self._CONTEXT_CONTENT:
            return self._XP_CONTENT_COMBO.format(field_label)
        return self._XP_DIALOG_COMBO.format(context, field_label)

    def _xpath_switch(self, field_label: str, context: str) -> str:
        """XPath of the switch whose element id is ``field_label`` within ``context``."""
        if context == self._CONTEXT_CONTENT:
            return self._XP_CONTENT_SWITCH.format(field_label)
        return self._XP_DIALOG_SWITCH.format(context, field_label)

    def _xpath_finalize_btn(self, context: str) -> str:
        """XPath of the button finishing input in ``context``.

        Normal mode selects the primary button; test-run mode selects the
        cancel locator instead.
        """
        if context == self._CONTEXT_CONTENT:
            if self._test_run:
                return self._XP_CONTENT_BTN_CANCEL
            return self._XP_CONTENT_BTN_PRIMARY
        if self._test_run:
            return self._XP_DIALOG_BTN_CANCEL.format(context)
        return self._XP_DIALOG_BTN_PRIMARY.format(context)