947 lines
41 KiB
Python
947 lines
41 KiB
Python
![]() |
"""Uploading data to rk.greatbook.ru"""
|
|||
|
import time
|
|||
|
import logging
|
|||
|
import random
|
|||
|
|
|||
|
from selenium.webdriver.common.keys import Keys
|
|||
|
from selenium.webdriver.common.by import By
|
|||
|
from selenium.common.exceptions import (
|
|||
|
NoSuchElementException,
|
|||
|
InvalidElementStateException,
|
|||
|
ElementNotInteractableException
|
|||
|
)
|
|||
|
from selenium.webdriver.remote.webelement import WebElement
|
|||
|
|
|||
|
from .selenium_wrapper import WebBrowser
|
|||
|
from .info_models import InputMethod, FieldType, FilterType
|
|||
|
from .config import Config
|
|||
|
from .crypto import decrypt_user
|
|||
|
|
|||
|
|
|||
|
def _replace_comma_separators(value: str) -> str:
|
|||
|
start = 0
|
|||
|
while True:
|
|||
|
pos = value.find(", ", start)
|
|||
|
if pos == -1 or pos + 2 >= len(value):
|
|||
|
return value
|
|||
|
if value[pos + 2].isupper():
|
|||
|
value = ';'.join([value[:pos], value[pos + 2:]])
|
|||
|
start = pos + 1
|
|||
|
|
|||
|
|
|||
|
class GreatbookUploader:
|
|||
|
"""Entry point into portal data input automation"""
|
|||
|
def __init__(self, browser: WebBrowser, config: Config):
|
|||
|
self._impl = _UploaderImpl(browser, config)
|
|||
|
self._login_timeout = config['Options'].getint('LoginTimeout')
|
|||
|
self._time_login = 0
|
|||
|
|
|||
|
def __del__(self):
|
|||
|
pass
|
|||
|
|
|||
|
def login(self) -> bool:
|
|||
|
'''Login into Portal'''
|
|||
|
if not self._impl.login():
|
|||
|
return False
|
|||
|
self._time_login = time.time()
|
|||
|
return True
|
|||
|
|
|||
|
def logout(self):
|
|||
|
'''Logout from current user'''
|
|||
|
self._impl.logout()
|
|||
|
|
|||
|
def content_exists(self, content: str) -> bool:
|
|||
|
'''Check if content name is taken'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.content_exists(content)
|
|||
|
|
|||
|
def content_has_text(self, content: str) -> bool:
|
|||
|
'''Warning! Requires to run content_exists first'''
|
|||
|
return self._impl.content_has_text(content)
|
|||
|
|
|||
|
def create_task(self, data: dict) -> bool:
|
|||
|
'''Create task in system. Returns empty string on error'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.create_task(data)
|
|||
|
|
|||
|
def find_task_id(self, task_name: str) -> str:
|
|||
|
'''Find task ID or empty string if not found'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.find_task_id(task_name)
|
|||
|
|
|||
|
def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool:
|
|||
|
'''Change task status. Can fail (return false) if status is unavailable'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.set_task_status(task_id, new_status, status_label)
|
|||
|
|
|||
|
def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
|
|||
|
'''Create metadata. Also changes status for new content'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.fill_metadata(task_id, data, attributes)
|
|||
|
|
|||
|
def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
|
|||
|
'''Update metadata'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.update_metadata(task_id, data, attributes)
|
|||
|
|
|||
|
def get_tasks_data(self, filters: list[list]) -> list:
|
|||
|
'''Export data for tasks matching task filter and department filter'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.get_tasks_data(filters)
|
|||
|
|
|||
|
def get_tasks_content(self, task_ids: list[str]) -> list:
|
|||
|
'''Export content for specific task ids'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.get_tasks_content(task_ids)
|
|||
|
|
|||
|
def _ensure_login(self):
|
|||
|
if time.time() - self._time_login < self._login_timeout:
|
|||
|
return
|
|||
|
logging.info('Relogging at %s', time.strftime('%H:%M:%S'))
|
|||
|
self.logout()
|
|||
|
self.login()
|
|||
|
|
|||
|
def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool:
|
|||
|
'''Loads content'''
|
|||
|
self._ensure_login()
|
|||
|
return self._impl.load_content(content, authors, text, bibliography)
|
|||
|
|
|||
|
|
|||
|
class _UploaderImpl:
|
|||
|
"""Implementation of portal data input automation"""
|
|||
|
_CONTEXT_CONTENT = 'CRUD EDITOR'
|
|||
|
_CONTEXT_SEARCH = 'SEARCH'
|
|||
|
_CONTENT_OMITTED = 'Заполните обязательный признак'
|
|||
|
|
|||
|
_XP_PREFIX_DIALOG = '//span[contains(text(),"{0}")]/parent::*/parent::*/descendant::'
|
|||
|
_XP_PREFIX_CONTENT = '//section[@class="article-CRUD__widget"]/descendant::'
|
|||
|
|
|||
|
_XP_NOTIFICATION_UPDATE_OK = '//*[contains(text(),"{0} успешно отредактирован")]'
|
|||
|
|
|||
|
_PAGE_PORTAL = 'https://rk.greatbook.ru'
|
|||
|
_PAGE_LOGIN = _PAGE_PORTAL
|
|||
|
_PAGE_TASK = _PAGE_PORTAL + '/tasks/kanban/all'
|
|||
|
_PAGE_CONTENT = _PAGE_PORTAL + '/glossary/all'
|
|||
|
_PAGE_TASK_ID = _PAGE_PORTAL + '/tasks/{0}'
|
|||
|
_PAGE_CONTENT_BY_TASK = _PAGE_PORTAL + '/widgets?link=task&id={0}'
|
|||
|
|
|||
|
_XP_LOGIN_TEST = 'app-login__right-container'
|
|||
|
_XP_MAIN_ADD_BUTTON = '//button[@class="el-tooltip button-input-add primary"]'
|
|||
|
|
|||
|
_XP_LIST_ITEM_CONTAINS = '//li[contains(text(),"{0}")]'
|
|||
|
_XP_SPAN_CONTAINS = '//span[contains(text(),"{0}")]'
|
|||
|
_XP_SPAN_EXACT_MATCH = '//span[text()=" {0} " or text()="{0}"]'
|
|||
|
_XP_BUTTON_CONTAINS = '//button[contains(text(),"{0}")]'
|
|||
|
|
|||
|
_XP_TABLE_ROW = '//div[@class="table__row"]'
|
|||
|
_XP_TABLE_CELL = '(descendant::div[@class="table__row-cell sortable"])[{0}]/child::*/child::*'
|
|||
|
_XP_TABLE_ROW_EXPAND = 'ancestor::div[@class="table__row-content"]/child::div[@class="table__row-expand"]'
|
|||
|
|
|||
|
_XP_NOTIFICATION_CLOSE = '//button[@class="notifications-layout__item-close"]'
|
|||
|
|
|||
|
_XP_NEW_TASK_BTN = '//div[@class="sidebar-tasks"]/button'
|
|||
|
_XP_SPAN_STATUS = '//span[@class="task-form__row-status"]'
|
|||
|
|
|||
|
_XP_LOGIN_USER = '//input[@placeholder="Имя пользователя"]'
|
|||
|
_XP_LOGIN_PW = '//input[@placeholder="Пароль"]'
|
|||
|
_XP_LOGIN_BTN = '//button[@type="button"]'
|
|||
|
|
|||
|
_XP_USER_PROFILE = '//div[@class="el-tooltip top-bar__user-profile"]'
|
|||
|
_XP_USER_EXIT = '//div[contains(text(),"Выйти")]'
|
|||
|
|
|||
|
_XP_TASKS_NEXT_BTN = '//i[@class="el-icon el-icon-arrow-right"]'
|
|||
|
_XP_TASKS_PAGER = '//ul[@class="el-pager"]'
|
|||
|
|
|||
|
_XP_TASK_SELECT_STEP = '//span[text()="Выбрать шаг"]/parent::button'
|
|||
|
_XP_TASK_CREATE_SUB = '//button[contains(text(),"Создать подзадачу")]'
|
|||
|
_XP_TASK_DATA = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::span'
|
|||
|
_XP_TASK_INPUT = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::input'
|
|||
|
_XP_TASK_CONTENT_URL = '//a[text()="{0}"]'
|
|||
|
_XP_TASK_RADIO_BTNS = '//span[@class="el-radio__inner"]'
|
|||
|
_XP_TASK_PAGINATION_CONTROL = '//span[@class="el-pagination__sizes"]/descendant::input'
|
|||
|
_XP_INPUT_ACCEPT = 'ancestor::div[@class="deferred-form-input__edit edit-mode"]/descendant::button[@class="button-with-icon-24x24 button-toggle"]'
|
|||
|
|
|||
|
_XP_DIALOG_LABEL = _XP_PREFIX_DIALOG + 'label[text()=" {1} " or text()="{1}"]'
|
|||
|
_XP_DIALOG_INPUT = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::input'
|
|||
|
_XP_DIALOG_COMBO = _XP_DIALOG_LABEL + '/following-sibling::div/child::div/child::div'
|
|||
|
_XP_DIALOG_TEXT_AREA = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::textarea'
|
|||
|
_XP_DIALOG_SWITCH = _XP_PREFIX_DIALOG + 'div[@id="{1}"]'
|
|||
|
|
|||
|
_XP_DIALOG_BTN_SELECT = '//button[contains(text(),"Выбрать")]'
|
|||
|
_XP_DIALOG_BTN_CLEAN = '(//span[@class="tasks-filters-dialog__footer"]/button)[2]'
|
|||
|
_XP_DIALOG_BTN_OK = '(//span[@class="tasks-filters-dialog__footer"]/button)[3]'
|
|||
|
_XP_DIALOG_BTN_PRIMARY = _XP_PREFIX_DIALOG + 'button[@class="button-dialog button-type-primary"]'
|
|||
|
_XP_DIALOG_BTN_CANCEL = _XP_PREFIX_DIALOG + 'button[@aria-label="Close"]'
|
|||
|
|
|||
|
_XP_CONTENT_LABEL = _XP_PREFIX_CONTENT + 'label[text()=" {0} " or text()="{0}"]'
|
|||
|
_XP_CONTENT_INPUT = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::input'
|
|||
|
_XP_CONTENT_COMBO = _XP_CONTENT_LABEL + '/following-sibling::div/child::div/child::div'
|
|||
|
_XP_CONTENT_TEXT_AREA = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::textarea'
|
|||
|
_XP_CONTENT_SWITCH = _XP_PREFIX_CONTENT + 'div[@id="{0}"]'
|
|||
|
|
|||
|
_XP_CONTENT_BTN_PRIMARY = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]'
|
|||
|
_XP_CONTENT_BTN_CANCEL = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]'
|
|||
|
_XP_CONTENT_BTN_EDIT = '//button[text()="Редактировать" or text()=" Редактировать "]'
|
|||
|
_XP_CONTENT_IS_EDITING = '//*[contains(text(),"атрибуты и признаки")]'
|
|||
|
|
|||
|
_XP_CONTENT_CHAPTER = '//p[contains(text(), "Раздел")]'
|
|||
|
_XP_CONTENT_VIEW_LABEL = '//span[@class="view-section__label" and text()="{0}:"]'
|
|||
|
_XP_CONTENT_VIEW_VALUE = _XP_CONTENT_VIEW_LABEL + '/following-sibling::span/span'
|
|||
|
_XP_CONTENT_TITLE = '//div[@class="top-bar__title-col flex"]/h2[text()="Редактор"]'
|
|||
|
|
|||
|
_XP_SEARCH_INPUT = '//label[contains(text(),"{0}")]/following-sibling::div/descendant::input'
|
|||
|
_XP_SCROLLER_ITEM = '//div[text()=" {0} " or text()="{0}" or text()=" {0} "]'
|
|||
|
_XP_SCROLLER_CHECKBOX = 'preceding-sibling::div[@class="tree__node-checkbox"]/label[@class="el-checkbox"]'
|
|||
|
_XP_LIST_ITEM = '//li[.="{0}" or .=" {0} "]'
|
|||
|
|
|||
|
_XP_RELATIVE_BTN_CLOSE = 'descendant::button[@class="catalogs-block__close icon-close-tags"]'
|
|||
|
_XP_RELATIVE_BTN_PLUS = 'following-sibling::button[@class="button-input-add"]'
|
|||
|
_XP_RELATIVE_OUTPUT_END = 'following-sibling::input[@placeholder="Конец"]'
|
|||
|
|
|||
|
_XP_SEARCH_QUERY = '//input[@placeholder="Поиск"]'
|
|||
|
_XP_SEARCH_BAR = '//div[@class="el-input el-input-group el-input-group--append el-input--suffix"]/input'
|
|||
|
_XP_SEARCH_BTN = '//button[@slot="append"]'
|
|||
|
_XP_SEARCH_OPTIONS = '//button[@class="tasks-filters__search-btn button-graphic"]'
|
|||
|
_XP_SEARCH_NO_RESULTS = '//div[@class="search-list__no-results"]'
|
|||
|
_XP_TASKS_VIEW_LIST = '(//button[@class="el-tooltip button-graphic"])[3]'
|
|||
|
_XP_SEARCH_MAIN_HREF = '(//span[@class="parent-placeholder el-tooltip"]/parent::*/parent::*/preceding-sibling::*)[5]/descendant::a'
|
|||
|
|
|||
|
_XP_DIV_TAB = '//p[contains(text(),"{0}")]/parent::*/parent::*'
|
|||
|
_XP_CONTENT_BTN_EDIT_TEXT = '//button[contains(text(),"Редактировать раздел")]'
|
|||
|
|
|||
|
_XP_EDITOR_TEXT_AREA = '//*[@class="ProseMirror"]'
|
|||
|
|
|||
|
def __init__(self, browser: WebBrowser, config: Config):
|
|||
|
self.browser = browser
|
|||
|
self._config = config
|
|||
|
self._output_context = ''
|
|||
|
self._test_run = self._config['Options'].getboolean('TestRun')
|
|||
|
|
|||
|
def __del__(self):
|
|||
|
pass
|
|||
|
|
|||
|
def login(self) -> bool:
|
|||
|
'''Login into Portal'''
|
|||
|
self.browser.driver.get(self._PAGE_LOGIN)
|
|||
|
if not self.browser.wait_presence(self._XP_LOGIN_TEST, By.CLASS_NAME):
|
|||
|
return False
|
|||
|
|
|||
|
user = self._config['UserData']['UserLogin']
|
|||
|
password = decrypt_user(user, self._config['UserData']['Password'], self._config['AppData']['Password'])
|
|||
|
self.browser.by_xpath(self._XP_LOGIN_USER).send_keys(user)
|
|||
|
self.browser.by_xpath(self._XP_LOGIN_PW).send_keys(password)
|
|||
|
self.browser.by_xpath(self._XP_LOGIN_BTN).click()
|
|||
|
return self._wait_loading()
|
|||
|
|
|||
|
def logout(self):
|
|||
|
'''Logout from current user'''
|
|||
|
try:
|
|||
|
self.browser.by_xpath(self._XP_USER_PROFILE).click()
|
|||
|
self.browser.by_xpath(self._XP_USER_EXIT).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
except NoSuchElementException:
|
|||
|
pass
|
|||
|
|
|||
|
def content_exists(self, content: str) -> bool:
|
|||
|
'''Check if content name is taken'''
|
|||
|
self._load_page(self._PAGE_CONTENT)
|
|||
|
|
|||
|
search = self.browser.by_xpath(self._XP_SEARCH_BAR)
|
|||
|
search.clear()
|
|||
|
search.send_keys(content)
|
|||
|
search.send_keys(Keys.RETURN)
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content))
|
|||
|
return len(results) > 0
|
|||
|
|
|||
|
def content_has_text(self, content: str) -> bool:
|
|||
|
'''Check if found content has attached text'''
|
|||
|
content_cell = self.browser.by_xpath(self._XP_SPAN_EXACT_MATCH.format(content))
|
|||
|
return len(content_cell.find_elements(By.XPATH, self._XP_TABLE_ROW_EXPAND)) > 0
|
|||
|
|
|||
|
def create_task(self, data: dict) -> bool:
|
|||
|
'''Create task in system. Returns empty string on error'''
|
|||
|
if not self._open_new_task_form():
|
|||
|
return ''
|
|||
|
|
|||
|
self._output_context = 'Создание задачи'
|
|||
|
for field in FieldType:
|
|||
|
if field in data:
|
|||
|
if field in data:
|
|||
|
self._output_field(field, data[field])
|
|||
|
self._finalize_output()
|
|||
|
self._wait_dialog_done()
|
|||
|
return self.find_task_id(data[FieldType.task_name])
|
|||
|
|
|||
|
def find_task_id(self, task_name: str) -> str:
|
|||
|
'''Find task ID or empty string if not found'''
|
|||
|
self._load_page(self._PAGE_TASK)
|
|||
|
self._list_view()
|
|||
|
self._search_task_name(task_name)
|
|||
|
|
|||
|
for row in self.browser.by_xpath_m(self._XP_TABLE_ROW):
|
|||
|
found_name = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(3)).text
|
|||
|
if found_name != task_name:
|
|||
|
continue
|
|||
|
parent = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(4)).text
|
|||
|
if parent != 'Нет':
|
|||
|
continue
|
|||
|
status = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(10)).text
|
|||
|
if status == 'Отменена':
|
|||
|
continue
|
|||
|
reference = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(3))
|
|||
|
return reference.get_attribute('href').split('/')[-1]
|
|||
|
return ''
|
|||
|
|
|||
|
def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool:
|
|||
|
'''Change task status. Can fail (return false) if status is unavailable'''
|
|||
|
if task_id != '':
|
|||
|
self._load_page(self._PAGE_TASK_ID.format(task_id))
|
|||
|
self.browser.by_xpath(self._XP_TASK_SELECT_STEP).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
status_element = self.browser.by_xpath_m(self._XP_LIST_ITEM_CONTAINS.format(new_status))
|
|||
|
if len(status_element) == 0:
|
|||
|
return False
|
|||
|
if not self._test_run:
|
|||
|
status_element[0].click()
|
|||
|
time.sleep(self._config['Options'].getint('WaitStatus'))
|
|||
|
self.browser.driver.refresh()
|
|||
|
self._wait_loading()
|
|||
|
self._ensure_no_notifications()
|
|||
|
status = self._get_task_status()
|
|||
|
if status != status_label:
|
|||
|
return False
|
|||
|
return True
|
|||
|
|
|||
|
def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
|
|||
|
'''Create metadata. Also changes status for new content'''
|
|||
|
if not self._load_task_page(task_id, data[FieldType.task_name]):
|
|||
|
return False
|
|||
|
if self._has_content():
|
|||
|
return False
|
|||
|
|
|||
|
is_article = data[FieldType.task_type] in ['Статья', 'Актуализация статьи из ЭВ БРЭ']
|
|||
|
status = self._get_task_status()
|
|||
|
if status == 'Новая':
|
|||
|
status = 'Заполнение метаданных статьи' if is_article else 'Написание контента'
|
|||
|
status_label = status
|
|||
|
if not self.set_task_status('', status, status_label):
|
|||
|
return False
|
|||
|
|
|||
|
if not self._create_content(data, attributes) or not self._wait_loading():
|
|||
|
return False
|
|||
|
if not self._has_bibliography():
|
|||
|
self._create_bibliography_task()
|
|||
|
return self._progress_metedata(is_article)
|
|||
|
|
|||
|
def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool:
|
|||
|
'''Update metadata'''
|
|||
|
if not self._load_task_page(task_id, data[FieldType.task_name]):
|
|||
|
return False
|
|||
|
content = self.browser.by_xpath_m(self._XP_TASK_CONTENT_URL.format(data[FieldType.content_name_db]))
|
|||
|
if len(content) != 1:
|
|||
|
return False
|
|||
|
self._load_page(content[0].get_attribute('href'))
|
|||
|
return self._update_content(data, attributes)
|
|||
|
|
|||
|
def get_tasks_data(self, filters: list[list]) -> list:
|
|||
|
'''Export data for tasks matching task filter and department filter'''
|
|||
|
self._load_page(self._PAGE_TASK)
|
|||
|
self._list_view()
|
|||
|
self.browser.by_xpath(self._XP_TASK_PAGINATION_CONTROL).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('50 на странице')).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
if not self._search_all_tasks(filters):
|
|||
|
return []
|
|||
|
else:
|
|||
|
return self._read_all_tasks()
|
|||
|
|
|||
|
def get_tasks_content(self, task_ids: list[str]) -> list[list[str]]:
|
|||
|
'''Export content for specific task ids'''
|
|||
|
result = []
|
|||
|
for task_id in task_ids:
|
|||
|
try:
|
|||
|
self._load_page(self._PAGE_CONTENT_BY_TASK.format(task_id))
|
|||
|
self.browser.wait_ajax()
|
|||
|
title = self.browser.by_xpath_m(self._XP_CONTENT_TITLE)
|
|||
|
if len(title) == 0:
|
|||
|
continue
|
|||
|
result.append(self._read_content(task_id))
|
|||
|
|
|||
|
self._load_page(self._PAGE_TASK_ID.format(task_id))
|
|||
|
self.browser.wait_presence(self._XP_TASK_DATA.format(FieldType.task_manager.to_label()), By.XPATH)
|
|||
|
editor = self.browser.by_xpath(self._XP_TASK_DATA.format(FieldType.task_manager.to_label())).text
|
|||
|
result[-1].append(editor)
|
|||
|
except Exception as e:
|
|||
|
logging.info('Exception while reading content %s\n%s', task_id, e)
|
|||
|
return result
|
|||
|
|
|||
|
def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool:
|
|||
|
'''Update metadata
|
|||
|
if not self._load_task_page(task_id, data[FieldType.task_name]):
|
|||
|
return False
|
|||
|
content = self.browser.by_xpath_m(self._XP_TASK_CONTENT_URL.format(data[FieldType.content_name_db]))
|
|||
|
if len(content) != 1:
|
|||
|
return False
|
|||
|
self._load_page(content[0].get_attribute('href'))
|
|||
|
return self._update_content(data, attributes)'''
|
|||
|
|
|||
|
self._load_page(self._PAGE_CONTENT)
|
|||
|
|
|||
|
search = self.browser.by_xpath(self._XP_SEARCH_BAR)
|
|||
|
search.clear()
|
|||
|
search.send_keys(content)
|
|||
|
search.send_keys(Keys.RETURN)
|
|||
|
self._wait_loading()
|
|||
|
|
|||
|
results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content))
|
|||
|
if len(results) != 1:
|
|||
|
return False
|
|||
|
|
|||
|
results[0].click()
|
|||
|
if not self._wait_loading():
|
|||
|
return False
|
|||
|
|
|||
|
self.browser.wait_ajax()
|
|||
|
main_tab = self.browser.by_xpath(self._XP_DIV_TAB.format('Раздел'))
|
|||
|
|
|||
|
main_tab.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
if len(authors) != 0:
|
|||
|
self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click()
|
|||
|
self._wait_loading()
|
|||
|
self._add_attrbute("Автор (на портале)")
|
|||
|
self._output_field(FieldType.author, authors)
|
|||
|
self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click() # test method
|
|||
|
else:
|
|||
|
self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click()
|
|||
|
self._wait_loading()
|
|||
|
# field = self.browser.by_xpath('//label[text()=" Аффилированные организации " or text()="Аффилированные организации"]/parent::*//span[contains(text(),"Добавьте")]')
|
|||
|
# if len(field) != 0:
|
|||
|
# self._output_field(FieldType.affiliated_organizations, "Редакция технологий и техники")
|
|||
|
self._add_attrbute("Аффилированные организации")
|
|||
|
self._output_item("Аффилированные организации", InputMethod.combo_dialog, ["Редакция технологий и техники"]) # remove feature
|
|||
|
|
|||
|
self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click() # test method
|
|||
|
|
|||
|
self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Отменить")).click() # test method
|
|||
|
# parse data_reader module
|
|||
|
# разобрать на методы заведение авторов, библиографии, текст
|
|||
|
|
|||
|
if text != '':
|
|||
|
self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT_TEXT).click()
|
|||
|
|
|||
|
self.browser.wait_ajax()
|
|||
|
textarea = self.browser.by_xpath(self._XP_EDITOR_TEXT_AREA)
|
|||
|
|
|||
|
textarea.click()
|
|||
|
|
|||
|
textarea.send_keys(text)
|
|||
|
|
|||
|
# self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click()
|
|||
|
return True
|
|||
|
|
|||
|
def _load_page(self, page_url: str):
|
|||
|
self.browser.driver.get(page_url)
|
|||
|
if not self._wait_loading():
|
|||
|
raise BaseException(f"Cannot load page {page_url}")
|
|||
|
|
|||
|
if not self._login_required():
|
|||
|
return
|
|||
|
if not self.login():
|
|||
|
raise BaseException("Cannot login")
|
|||
|
|
|||
|
self.browser.driver.get(page_url)
|
|||
|
if not self._wait_loading():
|
|||
|
raise BaseException(f"Cannot load page {page_url}")
|
|||
|
|
|||
|
def _login_required(self) -> bool:
|
|||
|
return len(self.browser.driver.find_elements(By.CLASS_NAME, self._XP_LOGIN_TEST)) != 0
|
|||
|
|
|||
|
def _has_content(self) -> bool:
|
|||
|
return len(self.browser.by_xpath_m(self._XP_BUTTON_CONTAINS.format('+ Создать'))) == 0
|
|||
|
|
|||
|
def _is_content_editing(self) -> bool:
|
|||
|
elems = self.browser.by_xpath_m(self._XP_CONTENT_IS_EDITING)
|
|||
|
return len(elems) > 0
|
|||
|
|
|||
|
def _has_bibliography(self):
|
|||
|
return len(self.browser.by_xpath_m(self._XP_SPAN_CONTAINS.format('Раздел библиография'))) != 0
|
|||
|
|
|||
|
def _get_task_status(self) -> str:
|
|||
|
return self.browser.by_xpath(self._XP_SPAN_STATUS).text
|
|||
|
|
|||
|
def _wait_dialog_done(self):
|
|||
|
time.sleep(self._config['Options'].getint('WaitData'))
|
|||
|
|
|||
|
def _wait_loading(self) -> bool:
|
|||
|
if not self.browser.wait_presence(self._XP_MAIN_ADD_BUTTON, By.XPATH):
|
|||
|
return False
|
|||
|
self.browser.wait_ajax()
|
|||
|
return True
|
|||
|
|
|||
|
def _ensure_no_notifications(self):
|
|||
|
while True:
|
|||
|
close_btns = self.browser.by_xpath_m(self._XP_NOTIFICATION_CLOSE)
|
|||
|
if len(close_btns) == 0:
|
|||
|
return
|
|||
|
try:
|
|||
|
close_btns[0].click()
|
|||
|
except ElementNotInteractableException:
|
|||
|
pass
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _list_view(self):
|
|||
|
self.browser.by_xpath(self._XP_TASKS_VIEW_LIST).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _load_task_page(self, task_id: str, task_name: str) -> bool:
|
|||
|
self._load_page(self._PAGE_TASK_ID.format(task_id))
|
|||
|
task_header = self.browser.by_xpath('//h3').text
|
|||
|
return task_header == task_name
|
|||
|
|
|||
|
def _search_task_name(self, task_name: str):
|
|||
|
search = self.browser.by_xpath(self._XP_SEARCH_BAR)
|
|||
|
search.clear()
|
|||
|
search.click()
|
|||
|
search.send_keys(task_name)
|
|||
|
|
|||
|
options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS)
|
|||
|
options.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH)
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('Точное совпадение')).click()
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _search_all_tasks(self, filters: list[list]):
|
|||
|
search = self.browser.by_xpath(self._XP_SEARCH_BAR)
|
|||
|
search.clear()
|
|||
|
|
|||
|
options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS)
|
|||
|
options.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH)
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self._output_context = self._CONTEXT_SEARCH
|
|||
|
for filter_id in range(FilterType.task_type, FilterType.observer):
|
|||
|
label = FilterType(filter_id).to_label()
|
|||
|
for value in filters[filter_id]:
|
|||
|
if value != '':
|
|||
|
self._output_combobox(label, value)
|
|||
|
if filters[FilterType.created_begin] != '':
|
|||
|
self._output_date_range(FilterType.created_begin.to_label(), filters[FilterType.created_begin], filters[FilterType.created_end])
|
|||
|
if filters[FilterType.target_begin] != '':
|
|||
|
self._output_date_range(FilterType.target_begin.to_label(), filters[FilterType.target_begin], filters[FilterType.target_end])
|
|||
|
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
no_result = self.browser.by_xpath(self._XP_SEARCH_NO_RESULTS)
|
|||
|
return not no_result.is_displayed()
|
|||
|
|
|||
|
def _read_all_tasks(self) -> list:
|
|||
|
result = []
|
|||
|
next_btn = self.browser.by_xpath(self._XP_TASKS_NEXT_BTN)
|
|||
|
count_pages = int(self.browser.by_xpath(self._XP_TASKS_PAGER).find_elements(By.XPATH, 'descendant::li')[-1].text)
|
|||
|
for _ in range(count_pages):
|
|||
|
result += self._scan_tasks_table()
|
|||
|
next_btn.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
time.sleep(random.randint(0, 100)/100)
|
|||
|
self._ensure_no_notifications()
|
|||
|
return result
|
|||
|
|
|||
|
def _scan_tasks_table(self) -> list:
|
|||
|
result = []
|
|||
|
for row in self.browser.by_xpath_m(self._XP_TABLE_ROW):
|
|||
|
task_type = self._read_cell(row, 2).text
|
|||
|
task_name = self._read_cell(row, 3).text
|
|||
|
task_id = self._read_cell(row, 3).get_attribute('href').split('/')[-1]
|
|||
|
parent = self._read_cell(row, 4)
|
|||
|
if parent.text != 'Нет':
|
|||
|
parent_id = parent.find_element(By.XPATH, 'descendant::a').get_attribute('href').split('/')[-1]
|
|||
|
else:
|
|||
|
parent_id = 'Нет'
|
|||
|
content_name = self._read_cell(row, 6).text
|
|||
|
target_date = self._read_cell(row, 7).text
|
|||
|
supervisor = self._read_cell(row, 8).text
|
|||
|
worker = self._read_cell(row, 9).text
|
|||
|
status = self._read_cell(row, 10).text
|
|||
|
result.append([task_type, status, content_name, supervisor, worker, target_date, task_id, task_name, parent_id])
|
|||
|
return result
|
|||
|
|
|||
|
def _read_content(self, task_id: str) -> list:
|
|||
|
editorial = self._read_field(FieldType.editorial)
|
|||
|
if editorial.startswith('Редакция '):
|
|||
|
editorial = editorial[len('Редакция '):]
|
|||
|
result = [
|
|||
|
task_id,
|
|||
|
self._read_field(FieldType.biblio_name),
|
|||
|
self._read_field(FieldType.change_score),
|
|||
|
editorial,
|
|||
|
self._read_field(FieldType.responsible),
|
|||
|
self._read_field(FieldType.definition),
|
|||
|
self._read_field(FieldType.object_type),
|
|||
|
self._read_field(FieldType.markers),
|
|||
|
self._read_field(FieldType.tags),
|
|||
|
self._read_field(FieldType.source),
|
|||
|
self._read_field(FieldType.electron_bre),
|
|||
|
self._read_field(FieldType.main_page),
|
|||
|
self._read_field(FieldType.is_general),
|
|||
|
self._read_field(FieldType.actualize_period),
|
|||
|
self._read_field(FieldType.age_restriction)
|
|||
|
]
|
|||
|
|
|||
|
chapters = self.browser.by_xpath_m(self._XP_CONTENT_CHAPTER)
|
|||
|
if len(chapters) == 0:
|
|||
|
author = ''
|
|||
|
else:
|
|||
|
chapters[0].click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
author = self._read_field(FieldType.author)
|
|||
|
result.append(author)
|
|||
|
return result
|
|||
|
|
|||
|
def _create_content(self, data: dict, attributes: list) -> bool:
|
|||
|
self._ensure_no_notifications()
|
|||
|
|
|||
|
if not self.browser.wait_clickable(self._XP_BUTTON_CONTAINS.format('+ Создать'), By.XPATH):
|
|||
|
return False
|
|||
|
self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('+ Создать')).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self._output_context = 'Создать контент'
|
|||
|
self._output_content_data(data, attributes)
|
|||
|
self._wait_dialog_done()
|
|||
|
|
|||
|
if self._test_run:
|
|||
|
return True
|
|||
|
return self._has_content()
|
|||
|
|
|||
|
def _update_content(self, data: dict, attributes: list) -> bool:
|
|||
|
''' updates meta-data '''
|
|||
|
btn_edit = self.browser.by_xpath_m(self._XP_CONTENT_BTN_EDIT)
|
|||
|
if len(btn_edit) != 1:
|
|||
|
logging.warning('Не доступна кнопка Редактировать на странице контента')
|
|||
|
return False
|
|||
|
btn_edit[0].click()
|
|||
|
if not self._wait_loading() or not self._is_content_editing():
|
|||
|
return False
|
|||
|
|
|||
|
self._output_context = self._CONTEXT_CONTENT
|
|||
|
self._output_content_data(data, attributes)
|
|||
|
notifications = self.browser.by_xpath_m(self._XP_NOTIFICATION_UPDATE_OK.format(data[FieldType.content_name_db]))
|
|||
|
return len(notifications) > 0
|
|||
|
|
|||
|
def _create_bibliography_task(self):
|
|||
|
self._ensure_no_notifications()
|
|||
|
|
|||
|
executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.executor.to_label()))[0].text
|
|||
|
supervisor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text
|
|||
|
department = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.department.to_label()))[0].text
|
|||
|
date_target = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.date_target.to_label()))[0].text
|
|||
|
|
|||
|
self._output_context = 'Создание подзадачи'
|
|||
|
self.browser.by_xpath(self._XP_TASK_CREATE_SUB).click()
|
|||
|
self.browser.by_xpath_m(self._XP_TASK_RADIO_BTNS)[0].click()
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_PRIMARY.format(self._output_context)).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self._output_field(FieldType.task_type, 'Раздел библиография')
|
|||
|
self._output_field(FieldType.executor, executor)
|
|||
|
self._output_field(FieldType.supervisor, supervisor)
|
|||
|
self._output_field(FieldType.department, department)
|
|||
|
self._output_field(FieldType.date_target, date_target)
|
|||
|
|
|||
|
self._finalize_output()
|
|||
|
self._wait_dialog_done()
|
|||
|
|
|||
|
def _progress_metedata(self, is_article: bool) -> bool:
|
|||
|
if is_article:
|
|||
|
if not self.set_task_status('', 'В службу', 'Ожидание проверки службой "Словник"'):
|
|||
|
return False
|
|||
|
new_executor = self._config['AppData']['SlovnikExecutor']
|
|||
|
else:
|
|||
|
if not self.set_task_status('', 'Заведующему редакцией', 'Проверка Заведующим редакцией'):
|
|||
|
return False
|
|||
|
new_executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text
|
|||
|
self._ensure_no_notifications()
|
|||
|
self._output_task_combo(FieldType.executor.to_label(), new_executor)
|
|||
|
return True
|
|||
|
|
|||
|
def _read_cell(self, row: WebElement, column: int) -> WebElement:
|
|||
|
return row.find_element(By.XPATH, self._XP_TABLE_CELL.format(column))
|
|||
|
|
|||
|
def _read_field(self, field: FieldType) -> str:
|
|||
|
label = field.to_label()
|
|||
|
input_method = field.input_method()
|
|||
|
return self._read_item(label, input_method)
|
|||
|
|
|||
|
def _read_item(self, field_label: str, input_method: InputMethod) -> str:
|
|||
|
xpath = self._XP_CONTENT_VIEW_VALUE.format(field_label)
|
|||
|
retries = 3
|
|||
|
value = ''
|
|||
|
while True:
|
|||
|
elems = self.browser.by_xpath_m(xpath)
|
|||
|
if len(elems) == 0:
|
|||
|
return ''
|
|||
|
value = elems[0].text
|
|||
|
if (self._validate_input(value) or retries == 0):
|
|||
|
break
|
|||
|
retries = retries - 1
|
|||
|
time.sleep(0.5)
|
|||
|
value = value.replace('\xad', '#')
|
|||
|
value = value.replace('\xd7', '#')
|
|||
|
if value == self._CONTENT_OMITTED:
|
|||
|
return ''
|
|||
|
if input_method in [InputMethod.combo_box, InputMethod.combo_dialog, InputMethod.combo_dialog_simple_list]:
|
|||
|
value = _replace_comma_separators(value)
|
|||
|
return value
|
|||
|
|
|||
|
def _validate_input(self, value: str) -> bool:
|
|||
|
if value == '...':
|
|||
|
return False
|
|||
|
if value == 'Загрузка...':
|
|||
|
return False
|
|||
|
return True
|
|||
|
|
|||
|
def _open_new_task_form(self) -> bool:
|
|||
|
self._load_page(self._PAGE_TASK)
|
|||
|
self.browser.by_xpath(self._XP_NEW_TASK_BTN).click()
|
|||
|
return self.browser.wait_presence(self._XP_DIALOG_BTN_PRIMARY.format('Создание задачи'), By.XPATH)
|
|||
|
|
|||
|
def _output_content_data(self, data: dict, attributes: list):
|
|||
|
for field in FieldType:
|
|||
|
if field in data:
|
|||
|
self._output_field(field, data[field])
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
for attr in attributes:
|
|||
|
self._output_attribute(attr[0], attr[1], attr[2])
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
self._finalize_output()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _finalize_output(self):
|
|||
|
self._ensure_no_notifications()
|
|||
|
btn_path = self._xpath_finalize_btn(self._output_context)
|
|||
|
self.browser.by_xpath(btn_path).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _output_field(self, field: FieldType, value):
|
|||
|
label = field.to_label()
|
|||
|
input_method = field.input_method()
|
|||
|
self._output_item(label, input_method, value)
|
|||
|
|
|||
|
def _output_attribute(self, name: str, input_method: InputMethod, value):
|
|||
|
if not self._test_output_exists(name):
|
|||
|
self._add_attrbute(name)
|
|||
|
self._output_item(name, input_method, value)
|
|||
|
|
|||
|
def _add_attrbute(self, name: str):
|
|||
|
self._ensure_no_notifications()
|
|||
|
self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('Добавить признак')).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
self._select_dialog_item(name)
|
|||
|
|
|||
|
def _output_item(self, field_label: str, input_method: InputMethod, value):
|
|||
|
if value == '':
|
|||
|
return
|
|||
|
if input_method == InputMethod.read_only:
|
|||
|
return
|
|||
|
if not self._test_output_exists(field_label):
|
|||
|
return
|
|||
|
|
|||
|
if input_method == InputMethod.combo_box:
|
|||
|
self._output_combobox(field_label, value)
|
|||
|
elif input_method == InputMethod.text_input:
|
|||
|
self._output_text_input(field_label, value)
|
|||
|
elif input_method == InputMethod.text_area:
|
|||
|
self._output_text_area(field_label, value)
|
|||
|
elif input_method == InputMethod.date_picker:
|
|||
|
self._output_date(field_label, value)
|
|||
|
elif input_method == InputMethod.bool_toggle:
|
|||
|
self._output_bool(field_label, value)
|
|||
|
elif input_method == InputMethod.combo_dialog_simple_list:
|
|||
|
self._output_dialog(field_label, value, False)
|
|||
|
elif input_method == InputMethod.combo_dialog:
|
|||
|
if not isinstance(value, list):
|
|||
|
raise "List argument expected. Given: %s" % type(value)
|
|||
|
self._output_dialog(field_label, value, True)
|
|||
|
|
|||
|
def _test_output_exists(self, field_label: str) -> bool:
|
|||
|
xpath = self._xpath_label(field_label, self._output_context)
|
|||
|
elems = self.browser.by_xpath_m(xpath)
|
|||
|
return len(elems) > 0
|
|||
|
|
|||
|
def _output_bool(self, field_label: str, value: bool):
|
|||
|
bool_switch = self.browser.by_xpath(self._xpath_switch(field_label, self._output_context))
|
|||
|
current_value = 'is-checked' in bool_switch.get_attribute('class')
|
|||
|
if current_value != value:
|
|||
|
bool_switch.click()
|
|||
|
|
|||
|
def _output_text_input(self, field_label: str, value: str):
|
|||
|
self._output(self._xpath_input(field_label, self._output_context), value)
|
|||
|
|
|||
|
def _output_text_area(self, field_label: str, value: str):
|
|||
|
self._output(self._xpath_textarea(field_label, self._output_context), value)
|
|||
|
|
|||
|
def _output_combobox(self, field_label: str, value: str):
|
|||
|
if not self._output(self._xpath_input(field_label, self._output_context), value):
|
|||
|
return
|
|||
|
self.browser.wait_ajax()
|
|||
|
list_item = self._find_active_list(value)
|
|||
|
list_item.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
self.browser.send_esc()
|
|||
|
|
|||
|
def _output_date_range(self, field_label: str, value1: str, value2: str):
|
|||
|
element = self.browser.by_xpath(self._xpath_input(field_label, self._output_context))
|
|||
|
element.click()
|
|||
|
element.send_keys(value1)
|
|||
|
element = element.find_element(By.XPATH, self._XP_RELATIVE_OUTPUT_END)
|
|||
|
element.click()
|
|||
|
element.send_keys(value2)
|
|||
|
self.browser.wait_ajax()
|
|||
|
self.browser.send_enter()
|
|||
|
|
|||
|
def _output_task_combo(self, label: str, value: str):
|
|||
|
element = self.browser.by_xpath_m(self._XP_TASK_DATA.format(label))[0]
|
|||
|
element.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
element = self.browser.by_xpath_m(self._XP_TASK_INPUT.format(label))[0]
|
|||
|
element.send_keys(value)
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
list_item = self._find_active_list(value)
|
|||
|
list_item.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
accept_btn = element.find_element(By.XPATH, self._XP_INPUT_ACCEPT)
|
|||
|
if not self._test_run:
|
|||
|
accept_btn.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _output_date(self, field_label: str, value: str):
|
|||
|
self._output(self._xpath_input(field_label, self._output_context), value)
|
|||
|
|
|||
|
def _output_dialog(self, field_label, values: list[str], search_flag=True):
|
|||
|
self._ensure_no_notifications()
|
|||
|
item = self.browser.by_xpath(self._xpath_combo(field_label, self._output_context))
|
|||
|
remove_btns = item.find_elements(By.XPATH, self._XP_RELATIVE_BTN_CLOSE)
|
|||
|
for existing_item in remove_btns:
|
|||
|
existing_item.click()
|
|||
|
|
|||
|
btn_plus = item.find_element(By.XPATH, self._XP_RELATIVE_BTN_PLUS)
|
|||
|
for value in values:
|
|||
|
btn_plus.click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
self._select_dialog_item(value, search_flag)
|
|||
|
|
|||
|
def _output(self, xpath: str, value: str) -> bool:
|
|||
|
element = self.browser.by_xpath(xpath)
|
|||
|
element.click()
|
|||
|
try:
|
|||
|
element.clear()
|
|||
|
except InvalidElementStateException:
|
|||
|
pass
|
|||
|
element.send_keys(value)
|
|||
|
return True
|
|||
|
|
|||
|
def _find_active_list(self, value: str):
|
|||
|
list_items = self.browser.by_xpath_m(self._XP_LIST_ITEM.format(value))
|
|||
|
count_active = sum(1 for x in list_items if x.is_displayed())
|
|||
|
if count_active > 1:
|
|||
|
logging.info('%s multiple list results', value)
|
|||
|
for list_item in list_items:
|
|||
|
if list_item.is_displayed():
|
|||
|
return list_item
|
|||
|
logging.info('Cannot find list item: %s', value)
|
|||
|
return None
|
|||
|
|
|||
|
def _find_active_scroller(self, value: str):
|
|||
|
list_items = self.browser.by_xpath_m(self._XP_SCROLLER_ITEM.format(value))
|
|||
|
count_active = sum(1 for x in list_items if x.is_displayed())
|
|||
|
if count_active == 0:
|
|||
|
logging.warning('Cannot find checkbox list item: %s', value)
|
|||
|
raise NoSuchElementException
|
|||
|
if count_active > 1:
|
|||
|
logging.info('%s multiple list results', value)
|
|||
|
for list_item in list_items:
|
|||
|
checkbox = list_item.find_elements(By.XPATH, self._XP_SCROLLER_CHECKBOX)
|
|||
|
if len(checkbox) > 0 and list_item.is_displayed():
|
|||
|
return list_item
|
|||
|
return None
|
|||
|
|
|||
|
def _select_dialog_item(self, value: str, search_flag=True):
|
|||
|
if search_flag:
|
|||
|
search = self.browser.by_xpath(self._XP_SEARCH_QUERY)
|
|||
|
search.send_keys(value)
|
|||
|
search.send_keys(Keys.ENTER)
|
|||
|
self.browser.wait_ajax()
|
|||
|
time.sleep(1) # additional wait for list to load
|
|||
|
|
|||
|
list_item = self._find_active_scroller(value)
|
|||
|
list_item.click()
|
|||
|
|
|||
|
self.browser.by_xpath(self._XP_DIALOG_BTN_SELECT).click()
|
|||
|
self.browser.wait_ajax()
|
|||
|
|
|||
|
def _xpath_label(self, field_label: str, context: str) -> str:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_LABEL.format(field_label)
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_LABEL.format(context, field_label)
|
|||
|
|
|||
|
def _xpath_input(self, field_label: str, context: str) -> str:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_INPUT.format(field_label)
|
|||
|
elif context == self._CONTEXT_SEARCH:
|
|||
|
return self._XP_SEARCH_INPUT.format(field_label)
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_INPUT.format(context, field_label)
|
|||
|
|
|||
|
def _xpath_textarea(self, field_label: str, context: str) -> str:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_TEXT_AREA.format(field_label)
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_TEXT_AREA.format(context, field_label)
|
|||
|
|
|||
|
def _xpath_combo(self, field_label: str, context: str) -> str:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_COMBO.format(field_label)
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_COMBO.format(context, field_label)
|
|||
|
|
|||
|
def _xpath_switch(self, field_label: str, context: str) -> str:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_SWITCH.format(field_label)
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_SWITCH.format(context, field_label)
|
|||
|
|
|||
|
def _xpath_finalize_btn(self, context: str) -> str:
|
|||
|
if not self._test_run:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_BTN_PRIMARY
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_BTN_PRIMARY.format(context)
|
|||
|
else:
|
|||
|
if context == self._CONTEXT_CONTENT:
|
|||
|
return self._XP_CONTENT_BTN_CANCEL
|
|||
|
else:
|
|||
|
return self._XP_DIALOG_BTN_CANCEL.format(context)
|