"""Uploading data to rk.greatbook.ru""" import time import logging import random from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.by import By from selenium.common.exceptions import ( NoSuchElementException, InvalidElementStateException, ElementNotInteractableException ) from selenium.webdriver.remote.webelement import WebElement from .selenium_wrapper import WebBrowser from .info_models import InputMethod, FieldType, FilterType from .config import Config from .crypto import decrypt_user def _replace_comma_separators(value: str) -> str: start = 0 while True: pos = value.find(", ", start) if pos == -1 or pos + 2 >= len(value): return value if value[pos + 2].isupper(): value = ';'.join([value[:pos], value[pos + 2:]]) start = pos + 1 class GreatbookUploader: """Entry point into portal data input automation""" def __init__(self, browser: WebBrowser, config: Config): self._impl = _UploaderImpl(browser, config) self._login_timeout = config['Options'].getint('LoginTimeout') self._time_login = 0 def __del__(self): pass def login(self) -> bool: '''Login into Portal''' if not self._impl.login(): return False self._time_login = time.time() return True def logout(self): '''Logout from current user''' self._impl.logout() def content_exists(self, content: str) -> bool: '''Check if content name is taken''' self._ensure_login() return self._impl.content_exists(content) def content_has_text(self, content: str) -> bool: '''Warning! Requires to run content_exists first''' return self._impl.content_has_text(content) def create_task(self, data: dict) -> bool: '''Create task in system. Returns empty string on error''' self._ensure_login() return self._impl.create_task(data) def find_task_id(self, task_name: str) -> str: '''Find task ID or empty string if not found''' self._ensure_login() return self._impl.find_task_id(task_name) def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool: '''Change task status. Can fail (return false) if status is unavailable''' self._ensure_login() return self._impl.set_task_status(task_id, new_status, status_label) def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool: '''Create metadata. Also changes status for new content''' self._ensure_login() return self._impl.fill_metadata(task_id, data, attributes) def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool: '''Update metadata''' self._ensure_login() return self._impl.update_metadata(task_id, data, attributes) def get_tasks_data(self, filters: list[list]) -> list: '''Export data for tasks matching task filter and department filter''' self._ensure_login() return self._impl.get_tasks_data(filters) def get_tasks_content(self, task_ids: list[str]) -> list: '''Export content for specific task ids''' self._ensure_login() return self._impl.get_tasks_content(task_ids) def _ensure_login(self): if time.time() - self._time_login < self._login_timeout: return logging.info('Relogging at %s', time.strftime('%H:%M:%S')) self.logout() self.login() def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool: '''Loads content''' self._ensure_login() return self._impl.load_content(content, authors, text, bibliography) class _UploaderImpl: """Implementation of portal data input automation""" _CONTEXT_CONTENT = 'CRUD EDITOR' _CONTEXT_SEARCH = 'SEARCH' _CONTENT_OMITTED = 'Заполните обязательный признак' _XP_PREFIX_DIALOG = '//span[contains(text(),"{0}")]/parent::*/parent::*/descendant::' _XP_PREFIX_CONTENT = '//section[@class="article-CRUD__widget"]/descendant::' _XP_NOTIFICATION_UPDATE_OK = '//*[contains(text(),"{0} успешно отредактирован")]' _PAGE_PORTAL = 'https://rk.greatbook.ru' _PAGE_LOGIN = _PAGE_PORTAL _PAGE_TASK = _PAGE_PORTAL + '/tasks/kanban/all' _PAGE_CONTENT = _PAGE_PORTAL + '/glossary/all' _PAGE_TASK_ID = _PAGE_PORTAL + '/tasks/{0}' _PAGE_CONTENT_BY_TASK = _PAGE_PORTAL + '/widgets?link=task&id={0}' _XP_LOGIN_TEST = 'app-login__right-container' _XP_MAIN_ADD_BUTTON = '//button[@class="el-tooltip button-input-add primary"]' _XP_LIST_ITEM_CONTAINS = '//li[contains(text(),"{0}")]' _XP_SPAN_CONTAINS = '//span[contains(text(),"{0}")]' _XP_SPAN_EXACT_MATCH = '//span[text()=" {0} " or text()="{0}"]' _XP_BUTTON_CONTAINS = '//button[contains(text(),"{0}")]' _XP_TABLE_ROW = '//div[@class="table__row"]' _XP_TABLE_CELL = '(descendant::div[@class="table__row-cell sortable"])[{0}]/child::*/child::*' _XP_TABLE_ROW_EXPAND = 'ancestor::div[@class="table__row-content"]/child::div[@class="table__row-expand"]' _XP_NOTIFICATION_CLOSE = '//button[@class="notifications-layout__item-close"]' _XP_NEW_TASK_BTN = '//div[@class="sidebar-tasks"]/button' _XP_SPAN_STATUS = '//span[@class="task-form__row-status"]' _XP_LOGIN_USER = '//input[@placeholder="Имя пользователя"]' _XP_LOGIN_PW = '//input[@placeholder="Пароль"]' _XP_LOGIN_BTN = '//button[@type="button"]' _XP_USER_PROFILE = '//div[@class="el-tooltip top-bar__user-profile"]' _XP_USER_EXIT = '//div[contains(text(),"Выйти")]' _XP_TASKS_NEXT_BTN = '//i[@class="el-icon el-icon-arrow-right"]' _XP_TASKS_PAGER = '//ul[@class="el-pager"]' _XP_TASK_SELECT_STEP = '//span[text()="Выбрать шаг"]/parent::button' _XP_TASK_CREATE_SUB = '//button[contains(text(),"Создать подзадачу")]' _XP_TASK_DATA = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::span' _XP_TASK_INPUT = '//label[text()=" {0} " or text()="{0}"]/following-sibling::div/descendant::input' _XP_TASK_CONTENT_URL = '//a[text()="{0}"]' _XP_TASK_RADIO_BTNS = '//span[@class="el-radio__inner"]' _XP_TASK_PAGINATION_CONTROL = '//span[@class="el-pagination__sizes"]/descendant::input' _XP_INPUT_ACCEPT = 'ancestor::div[@class="deferred-form-input__edit edit-mode"]/descendant::button[@class="button-with-icon-24x24 button-toggle"]' _XP_DIALOG_LABEL = _XP_PREFIX_DIALOG + 'label[text()=" {1} " or text()="{1}"]' _XP_DIALOG_INPUT = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::input' _XP_DIALOG_COMBO = _XP_DIALOG_LABEL + '/following-sibling::div/child::div/child::div' _XP_DIALOG_TEXT_AREA = _XP_DIALOG_LABEL + '/following-sibling::div/descendant::textarea' _XP_DIALOG_SWITCH = _XP_PREFIX_DIALOG + 'div[@id="{1}"]' _XP_DIALOG_BTN_SELECT = '//button[contains(text(),"Выбрать")]' _XP_DIALOG_BTN_CLEAN = '(//span[@class="tasks-filters-dialog__footer"]/button)[2]' _XP_DIALOG_BTN_OK = '(//span[@class="tasks-filters-dialog__footer"]/button)[3]' _XP_DIALOG_BTN_PRIMARY = _XP_PREFIX_DIALOG + 'button[@class="button-dialog button-type-primary"]' _XP_DIALOG_BTN_CANCEL = _XP_PREFIX_DIALOG + 'button[@aria-label="Close"]' _XP_CONTENT_LABEL = _XP_PREFIX_CONTENT + 'label[text()=" {0} " or text()="{0}"]' _XP_CONTENT_INPUT = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::input' _XP_CONTENT_COMBO = _XP_CONTENT_LABEL + '/following-sibling::div/child::div/child::div' _XP_CONTENT_TEXT_AREA = _XP_CONTENT_LABEL + '/following-sibling::div/descendant::textarea' _XP_CONTENT_SWITCH = _XP_PREFIX_CONTENT + 'div[@id="{0}"]' _XP_CONTENT_BTN_PRIMARY = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]' _XP_CONTENT_BTN_CANCEL = _XP_PREFIX_CONTENT + 'button[@class="button-dialog button-type-primary"]' _XP_CONTENT_BTN_EDIT = '//button[text()="Редактировать" or text()=" Редактировать "]' _XP_CONTENT_IS_EDITING = '//*[contains(text(),"атрибуты и признаки")]' _XP_CONTENT_CHAPTER = '//p[contains(text(), "Раздел")]' _XP_CONTENT_VIEW_LABEL = '//span[@class="view-section__label" and text()="{0}:"]' _XP_CONTENT_VIEW_VALUE = _XP_CONTENT_VIEW_LABEL + '/following-sibling::span/span' _XP_CONTENT_TITLE = '//div[@class="top-bar__title-col flex"]/h2[text()="Редактор"]' _XP_SEARCH_INPUT = '//label[contains(text(),"{0}")]/following-sibling::div/descendant::input' _XP_SCROLLER_ITEM = '//div[text()=" {0} " or text()="{0}" or text()=" {0} "]' _XP_SCROLLER_CHECKBOX = 'preceding-sibling::div[@class="tree__node-checkbox"]/label[@class="el-checkbox"]' _XP_LIST_ITEM = '//li[.="{0}" or .=" {0} "]' _XP_RELATIVE_BTN_CLOSE = 'descendant::button[@class="catalogs-block__close icon-close-tags"]' _XP_RELATIVE_BTN_PLUS = 'following-sibling::button[@class="button-input-add"]' _XP_RELATIVE_OUTPUT_END = 'following-sibling::input[@placeholder="Конец"]' _XP_SEARCH_QUERY = '//input[@placeholder="Поиск"]' _XP_SEARCH_BAR = '//div[@class="el-input el-input-group el-input-group--append el-input--suffix"]/input' _XP_SEARCH_BTN = '//button[@slot="append"]' _XP_SEARCH_OPTIONS = '//button[@class="tasks-filters__search-btn button-graphic"]' _XP_SEARCH_NO_RESULTS = '//div[@class="search-list__no-results"]' _XP_TASKS_VIEW_LIST = '(//button[@class="el-tooltip button-graphic"])[3]' _XP_SEARCH_MAIN_HREF = '(//span[@class="parent-placeholder el-tooltip"]/parent::*/parent::*/preceding-sibling::*)[5]/descendant::a' _XP_DIV_TAB = '//p[contains(text(),"{0}")]/parent::*/parent::*' _XP_CONTENT_BTN_EDIT_TEXT = '//button[contains(text(),"Редактировать раздел")]' _XP_EDITOR_TEXT_AREA = '//*[@class="ProseMirror"]' def __init__(self, browser: WebBrowser, config: Config): self.browser = browser self._config = config self._output_context = '' self._test_run = self._config['Options'].getboolean('TestRun') def __del__(self): pass def login(self) -> bool: '''Login into Portal''' self.browser.driver.get(self._PAGE_LOGIN) if not self.browser.wait_presence(self._XP_LOGIN_TEST, By.CLASS_NAME): return False user = self._config['UserData']['UserLogin'] password = decrypt_user(user, self._config['UserData']['Password'], self._config['AppData']['Password']) self.browser.by_xpath(self._XP_LOGIN_USER).send_keys(user) self.browser.by_xpath(self._XP_LOGIN_PW).send_keys(password) self.browser.by_xpath(self._XP_LOGIN_BTN).click() return self._wait_loading() def logout(self): '''Logout from current user''' try: self.browser.by_xpath(self._XP_USER_PROFILE).click() self.browser.by_xpath(self._XP_USER_EXIT).click() self.browser.wait_ajax() except NoSuchElementException: pass def content_exists(self, content: str) -> bool: '''Check if content name is taken''' self._load_page(self._PAGE_CONTENT) search = self.browser.by_xpath(self._XP_SEARCH_BAR) search.clear() search.send_keys(content) search.send_keys(Keys.RETURN) self.browser.wait_ajax() results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content)) return len(results) > 0 def content_has_text(self, content: str) -> bool: '''Check if found content has attached text''' content_cell = self.browser.by_xpath(self._XP_SPAN_EXACT_MATCH.format(content)) return len(content_cell.find_elements(By.XPATH, self._XP_TABLE_ROW_EXPAND)) > 0 def create_task(self, data: dict) -> bool: '''Create task in system. Returns empty string on error''' if not self._open_new_task_form(): return '' self._output_context = 'Создание задачи' for field in FieldType: if field in data: if field in data: self._output_field(field, data[field]) self._finalize_output() self._wait_dialog_done() return self.find_task_id(data[FieldType.task_name]) def find_task_id(self, task_name: str) -> str: '''Find task ID or empty string if not found''' self._load_page(self._PAGE_TASK) self._list_view() self._search_task_name(task_name) for row in self.browser.by_xpath_m(self._XP_TABLE_ROW): found_name = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(3)).text if found_name != task_name: continue parent = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(4)).text if parent != 'Нет': continue status = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(10)).text if status == 'Отменена': continue reference = row.find_element(By.XPATH, self._XP_TABLE_CELL.format(3)) return reference.get_attribute('href').split('/')[-1] return '' def set_task_status(self, task_id: str, new_status: str, status_label: str) -> bool: '''Change task status. Can fail (return false) if status is unavailable''' if task_id != '': self._load_page(self._PAGE_TASK_ID.format(task_id)) self.browser.by_xpath(self._XP_TASK_SELECT_STEP).click() self.browser.wait_ajax() status_element = self.browser.by_xpath_m(self._XP_LIST_ITEM_CONTAINS.format(new_status)) if len(status_element) == 0: return False if not self._test_run: status_element[0].click() time.sleep(self._config['Options'].getint('WaitStatus')) self.browser.driver.refresh() self._wait_loading() self._ensure_no_notifications() status = self._get_task_status() if status != status_label: return False return True def fill_metadata(self, task_id: str, data: dict, attributes: list) -> bool: '''Create metadata. Also changes status for new content''' if not self._load_task_page(task_id, data[FieldType.task_name]): return False if self._has_content(): return False is_article = data[FieldType.task_type] in ['Статья', 'Актуализация статьи из ЭВ БРЭ'] status = self._get_task_status() if status == 'Новая': status = 'Заполнение метаданных статьи' if is_article else 'Написание контента' status_label = status if not self.set_task_status('', status, status_label): return False if not self._create_content(data, attributes) or not self._wait_loading(): return False if not self._has_bibliography(): self._create_bibliography_task() return self._progress_metedata(is_article) def update_metadata(self, task_id: str, data: dict, attributes: list) -> bool: '''Update metadata''' if not self._load_task_page(task_id, data[FieldType.task_name]): return False content = self.browser.by_xpath_m(self._XP_TASK_CONTENT_URL.format(data[FieldType.content_name_db])) if len(content) != 1: return False self._load_page(content[0].get_attribute('href')) return self._update_content(data, attributes) def get_tasks_data(self, filters: list[list]) -> list: '''Export data for tasks matching task filter and department filter''' self._load_page(self._PAGE_TASK) self._list_view() self.browser.by_xpath(self._XP_TASK_PAGINATION_CONTROL).click() self.browser.wait_ajax() self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('50 на странице')).click() self.browser.wait_ajax() if not self._search_all_tasks(filters): return [] else: return self._read_all_tasks() def get_tasks_content(self, task_ids: list[str]) -> list[list[str]]: '''Export content for specific task ids''' result = [] for task_id in task_ids: try: self._load_page(self._PAGE_CONTENT_BY_TASK.format(task_id)) self.browser.wait_ajax() title = self.browser.by_xpath_m(self._XP_CONTENT_TITLE) if len(title) == 0: continue result.append(self._read_content(task_id)) self._load_page(self._PAGE_TASK_ID.format(task_id)) self.browser.wait_presence(self._XP_TASK_DATA.format(FieldType.task_manager.to_label()), By.XPATH) editor = self.browser.by_xpath(self._XP_TASK_DATA.format(FieldType.task_manager.to_label())).text result[-1].append(editor) except Exception as e: logging.info('Exception while reading content %s\n%s', task_id, e) return result def load_content(self, content: str, authors: str, text: str, bibliography: str) -> bool: '''Update metadata if not self._load_task_page(task_id, data[FieldType.task_name]): return False content = self.browser.by_xpath_m(self._XP_TASK_CONTENT_URL.format(data[FieldType.content_name_db])) if len(content) != 1: return False self._load_page(content[0].get_attribute('href')) return self._update_content(data, attributes)''' self._load_page(self._PAGE_CONTENT) search = self.browser.by_xpath(self._XP_SEARCH_BAR) search.clear() search.send_keys(content) search.send_keys(Keys.RETURN) self._wait_loading() results = self.browser.by_xpath_m(self._XP_SPAN_EXACT_MATCH.format(content)) if len(results) != 1: return False results[0].click() if not self._wait_loading(): return False self.browser.wait_ajax() main_tab = self.browser.by_xpath(self._XP_DIV_TAB.format('Раздел')) main_tab.click() self.browser.wait_ajax() if len(authors) != 0: self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click() self._wait_loading() self._add_attrbute("Автор (на портале)") self._output_field(FieldType.author, authors) self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click() # test method else: self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT).click() self._wait_loading() # field = self.browser.by_xpath('//label[text()=" Аффилированные организации " or text()="Аффилированные организации"]/parent::*//span[contains(text(),"Добавьте")]') # if len(field) != 0: # self._output_field(FieldType.affiliated_organizations, "Редакция технологий и техники") self._add_attrbute("Аффилированные организации") self._output_item("Аффилированные организации", InputMethod.combo_dialog, ["Редакция технологий и техники"]) # remove feature self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click() # test method self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Отменить")).click() # test method # parse data_reader module # разобрать на методы заведение авторов, библиографии, текст if text != '': self.browser.by_xpath(self._XP_CONTENT_BTN_EDIT_TEXT).click() self.browser.wait_ajax() textarea = self.browser.by_xpath(self._XP_EDITOR_TEXT_AREA) textarea.click() textarea.send_keys(text) # self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format("Сохранить")).click() return True def _load_page(self, page_url: str): self.browser.driver.get(page_url) if not self._wait_loading(): raise BaseException(f"Cannot load page {page_url}") if not self._login_required(): return if not self.login(): raise BaseException("Cannot login") self.browser.driver.get(page_url) if not self._wait_loading(): raise BaseException(f"Cannot load page {page_url}") def _login_required(self) -> bool: return len(self.browser.driver.find_elements(By.CLASS_NAME, self._XP_LOGIN_TEST)) != 0 def _has_content(self) -> bool: return len(self.browser.by_xpath_m(self._XP_BUTTON_CONTAINS.format('+ Создать'))) == 0 def _is_content_editing(self) -> bool: elems = self.browser.by_xpath_m(self._XP_CONTENT_IS_EDITING) return len(elems) > 0 def _has_bibliography(self): return len(self.browser.by_xpath_m(self._XP_SPAN_CONTAINS.format('Раздел библиография'))) != 0 def _get_task_status(self) -> str: return self.browser.by_xpath(self._XP_SPAN_STATUS).text def _wait_dialog_done(self): time.sleep(self._config['Options'].getint('WaitData')) def _wait_loading(self) -> bool: if not self.browser.wait_presence(self._XP_MAIN_ADD_BUTTON, By.XPATH): return False self.browser.wait_ajax() return True def _ensure_no_notifications(self): while True: close_btns = self.browser.by_xpath_m(self._XP_NOTIFICATION_CLOSE) if len(close_btns) == 0: return try: close_btns[0].click() except ElementNotInteractableException: pass self.browser.wait_ajax() def _list_view(self): self.browser.by_xpath(self._XP_TASKS_VIEW_LIST).click() self.browser.wait_ajax() def _load_task_page(self, task_id: str, task_name: str) -> bool: self._load_page(self._PAGE_TASK_ID.format(task_id)) task_header = self.browser.by_xpath('//h3').text return task_header == task_name def _search_task_name(self, task_name: str): search = self.browser.by_xpath(self._XP_SEARCH_BAR) search.clear() search.click() search.send_keys(task_name) options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS) options.click() self.browser.wait_ajax() self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH) self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click() self.browser.wait_ajax() self.browser.by_xpath(self._XP_SPAN_CONTAINS.format('Точное совпадение')).click() self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click() self.browser.wait_ajax() def _search_all_tasks(self, filters: list[list]): search = self.browser.by_xpath(self._XP_SEARCH_BAR) search.clear() options = self.browser.by_xpath(self._XP_SEARCH_OPTIONS) options.click() self.browser.wait_ajax() self.browser.wait_presence(self._XP_DIALOG_BTN_CLEAN, By.XPATH) self.browser.by_xpath(self._XP_DIALOG_BTN_CLEAN).click() self.browser.wait_ajax() self._output_context = self._CONTEXT_SEARCH for filter_id in range(FilterType.task_type, FilterType.observer): label = FilterType(filter_id).to_label() for value in filters[filter_id]: if value != '': self._output_combobox(label, value) if filters[FilterType.created_begin] != '': self._output_date_range(FilterType.created_begin.to_label(), filters[FilterType.created_begin], filters[FilterType.created_end]) if filters[FilterType.target_begin] != '': self._output_date_range(FilterType.target_begin.to_label(), filters[FilterType.target_begin], filters[FilterType.target_end]) self.browser.by_xpath(self._XP_DIALOG_BTN_OK).click() self.browser.wait_ajax() no_result = self.browser.by_xpath(self._XP_SEARCH_NO_RESULTS) return not no_result.is_displayed() def _read_all_tasks(self) -> list: result = [] next_btn = self.browser.by_xpath(self._XP_TASKS_NEXT_BTN) count_pages = int(self.browser.by_xpath(self._XP_TASKS_PAGER).find_elements(By.XPATH, 'descendant::li')[-1].text) for _ in range(count_pages): result += self._scan_tasks_table() next_btn.click() self.browser.wait_ajax() time.sleep(random.randint(0, 100)/100) self._ensure_no_notifications() return result def _scan_tasks_table(self) -> list: result = [] for row in self.browser.by_xpath_m(self._XP_TABLE_ROW): task_type = self._read_cell(row, 2).text task_name = self._read_cell(row, 3).text task_id = self._read_cell(row, 3).get_attribute('href').split('/')[-1] parent = self._read_cell(row, 4) if parent.text != 'Нет': parent_id = parent.find_element(By.XPATH, 'descendant::a').get_attribute('href').split('/')[-1] else: parent_id = 'Нет' content_name = self._read_cell(row, 6).text target_date = self._read_cell(row, 7).text supervisor = self._read_cell(row, 8).text worker = self._read_cell(row, 9).text status = self._read_cell(row, 10).text result.append([task_type, status, content_name, supervisor, worker, target_date, task_id, task_name, parent_id]) return result def _read_content(self, task_id: str) -> list: editorial = self._read_field(FieldType.editorial) if editorial.startswith('Редакция '): editorial = editorial[len('Редакция '):] result = [ task_id, self._read_field(FieldType.biblio_name), self._read_field(FieldType.change_score), editorial, self._read_field(FieldType.responsible), self._read_field(FieldType.definition), self._read_field(FieldType.object_type), self._read_field(FieldType.markers), self._read_field(FieldType.tags), self._read_field(FieldType.source), self._read_field(FieldType.electron_bre), self._read_field(FieldType.main_page), self._read_field(FieldType.is_general), self._read_field(FieldType.actualize_period), self._read_field(FieldType.age_restriction) ] chapters = self.browser.by_xpath_m(self._XP_CONTENT_CHAPTER) if len(chapters) == 0: author = '' else: chapters[0].click() self.browser.wait_ajax() author = self._read_field(FieldType.author) result.append(author) return result def _create_content(self, data: dict, attributes: list) -> bool: self._ensure_no_notifications() if not self.browser.wait_clickable(self._XP_BUTTON_CONTAINS.format('+ Создать'), By.XPATH): return False self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('+ Создать')).click() self.browser.wait_ajax() self._output_context = 'Создать контент' self._output_content_data(data, attributes) self._wait_dialog_done() if self._test_run: return True return self._has_content() def _update_content(self, data: dict, attributes: list) -> bool: ''' updates meta-data ''' btn_edit = self.browser.by_xpath_m(self._XP_CONTENT_BTN_EDIT) if len(btn_edit) != 1: logging.warning('Не доступна кнопка Редактировать на странице контента') return False btn_edit[0].click() if not self._wait_loading() or not self._is_content_editing(): return False self._output_context = self._CONTEXT_CONTENT self._output_content_data(data, attributes) notifications = self.browser.by_xpath_m(self._XP_NOTIFICATION_UPDATE_OK.format(data[FieldType.content_name_db])) return len(notifications) > 0 def _create_bibliography_task(self): self._ensure_no_notifications() executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.executor.to_label()))[0].text supervisor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text department = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.department.to_label()))[0].text date_target = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.date_target.to_label()))[0].text self._output_context = 'Создание подзадачи' self.browser.by_xpath(self._XP_TASK_CREATE_SUB).click() self.browser.by_xpath_m(self._XP_TASK_RADIO_BTNS)[0].click() self.browser.by_xpath(self._XP_DIALOG_BTN_PRIMARY.format(self._output_context)).click() self.browser.wait_ajax() self._output_field(FieldType.task_type, 'Раздел библиография') self._output_field(FieldType.executor, executor) self._output_field(FieldType.supervisor, supervisor) self._output_field(FieldType.department, department) self._output_field(FieldType.date_target, date_target) self._finalize_output() self._wait_dialog_done() def _progress_metedata(self, is_article: bool) -> bool: if is_article: if not self.set_task_status('', 'В службу', 'Ожидание проверки службой "Словник"'): return False new_executor = self._config['AppData']['SlovnikExecutor'] else: if not self.set_task_status('', 'Заведующему редакцией', 'Проверка Заведующим редакцией'): return False new_executor = self.browser.by_xpath_m(self._XP_TASK_DATA.format(FieldType.supervisor.to_label()))[0].text self._ensure_no_notifications() self._output_task_combo(FieldType.executor.to_label(), new_executor) return True def _read_cell(self, row: WebElement, column: int) -> WebElement: return row.find_element(By.XPATH, self._XP_TABLE_CELL.format(column)) def _read_field(self, field: FieldType) -> str: label = field.to_label() input_method = field.input_method() return self._read_item(label, input_method) def _read_item(self, field_label: str, input_method: InputMethod) -> str: xpath = self._XP_CONTENT_VIEW_VALUE.format(field_label) retries = 3 value = '' while True: elems = self.browser.by_xpath_m(xpath) if len(elems) == 0: return '' value = elems[0].text if (self._validate_input(value) or retries == 0): break retries = retries - 1 time.sleep(0.5) value = value.replace('\xad', '#') value = value.replace('\xd7', '#') if value == self._CONTENT_OMITTED: return '' if input_method in [InputMethod.combo_box, InputMethod.combo_dialog, InputMethod.combo_dialog_simple_list]: value = _replace_comma_separators(value) return value def _validate_input(self, value: str) -> bool: if value == '...': return False if value == 'Загрузка...': return False return True def _open_new_task_form(self) -> bool: self._load_page(self._PAGE_TASK) self.browser.by_xpath(self._XP_NEW_TASK_BTN).click() return self.browser.wait_presence(self._XP_DIALOG_BTN_PRIMARY.format('Создание задачи'), By.XPATH) def _output_content_data(self, data: dict, attributes: list): for field in FieldType: if field in data: self._output_field(field, data[field]) self.browser.wait_ajax() for attr in attributes: self._output_attribute(attr[0], attr[1], attr[2]) self.browser.wait_ajax() self._finalize_output() self.browser.wait_ajax() def _finalize_output(self): self._ensure_no_notifications() btn_path = self._xpath_finalize_btn(self._output_context) self.browser.by_xpath(btn_path).click() self.browser.wait_ajax() def _output_field(self, field: FieldType, value): label = field.to_label() input_method = field.input_method() self._output_item(label, input_method, value) def _output_attribute(self, name: str, input_method: InputMethod, value): if not self._test_output_exists(name): self._add_attrbute(name) self._output_item(name, input_method, value) def _add_attrbute(self, name: str): self._ensure_no_notifications() self.browser.by_xpath(self._XP_BUTTON_CONTAINS.format('Добавить признак')).click() self.browser.wait_ajax() self._select_dialog_item(name) def _output_item(self, field_label: str, input_method: InputMethod, value): if value == '': return if input_method == InputMethod.read_only: return if not self._test_output_exists(field_label): return if input_method == InputMethod.combo_box: self._output_combobox(field_label, value) elif input_method == InputMethod.text_input: self._output_text_input(field_label, value) elif input_method == InputMethod.text_area: self._output_text_area(field_label, value) elif input_method == InputMethod.date_picker: self._output_date(field_label, value) elif input_method == InputMethod.bool_toggle: self._output_bool(field_label, value) elif input_method == InputMethod.combo_dialog_simple_list: self._output_dialog(field_label, value, False) elif input_method == InputMethod.combo_dialog: if not isinstance(value, list): raise "List argument expected. Given: %s" % type(value) self._output_dialog(field_label, value, True) def _test_output_exists(self, field_label: str) -> bool: xpath = self._xpath_label(field_label, self._output_context) elems = self.browser.by_xpath_m(xpath) return len(elems) > 0 def _output_bool(self, field_label: str, value: bool): bool_switch = self.browser.by_xpath(self._xpath_switch(field_label, self._output_context)) current_value = 'is-checked' in bool_switch.get_attribute('class') if current_value != value: bool_switch.click() def _output_text_input(self, field_label: str, value: str): self._output(self._xpath_input(field_label, self._output_context), value) def _output_text_area(self, field_label: str, value: str): self._output(self._xpath_textarea(field_label, self._output_context), value) def _output_combobox(self, field_label: str, value: str): if not self._output(self._xpath_input(field_label, self._output_context), value): return self.browser.wait_ajax() list_item = self._find_active_list(value) list_item.click() self.browser.wait_ajax() self.browser.send_esc() def _output_date_range(self, field_label: str, value1: str, value2: str): element = self.browser.by_xpath(self._xpath_input(field_label, self._output_context)) element.click() element.send_keys(value1) element = element.find_element(By.XPATH, self._XP_RELATIVE_OUTPUT_END) element.click() element.send_keys(value2) self.browser.wait_ajax() self.browser.send_enter() def _output_task_combo(self, label: str, value: str): element = self.browser.by_xpath_m(self._XP_TASK_DATA.format(label))[0] element.click() self.browser.wait_ajax() element = self.browser.by_xpath_m(self._XP_TASK_INPUT.format(label))[0] element.send_keys(value) self.browser.wait_ajax() list_item = self._find_active_list(value) list_item.click() self.browser.wait_ajax() accept_btn = element.find_element(By.XPATH, self._XP_INPUT_ACCEPT) if not self._test_run: accept_btn.click() self.browser.wait_ajax() def _output_date(self, field_label: str, value: str): self._output(self._xpath_input(field_label, self._output_context), value) def _output_dialog(self, field_label, values: list[str], search_flag=True): self._ensure_no_notifications() item = self.browser.by_xpath(self._xpath_combo(field_label, self._output_context)) remove_btns = item.find_elements(By.XPATH, self._XP_RELATIVE_BTN_CLOSE) for existing_item in remove_btns: existing_item.click() btn_plus = item.find_element(By.XPATH, self._XP_RELATIVE_BTN_PLUS) for value in values: btn_plus.click() self.browser.wait_ajax() self._select_dialog_item(value, search_flag) def _output(self, xpath: str, value: str) -> bool: element = self.browser.by_xpath(xpath) element.click() try: element.clear() except InvalidElementStateException: pass element.send_keys(value) return True def _find_active_list(self, value: str): list_items = self.browser.by_xpath_m(self._XP_LIST_ITEM.format(value)) count_active = sum(1 for x in list_items if x.is_displayed()) if count_active > 1: logging.info('%s multiple list results', value) for list_item in list_items: if list_item.is_displayed(): return list_item logging.info('Cannot find list item: %s', value) return None def _find_active_scroller(self, value: str): list_items = self.browser.by_xpath_m(self._XP_SCROLLER_ITEM.format(value)) count_active = sum(1 for x in list_items if x.is_displayed()) if count_active == 0: logging.warning('Cannot find checkbox list item: %s', value) raise NoSuchElementException if count_active > 1: logging.info('%s multiple list results', value) for list_item in list_items: checkbox = list_item.find_elements(By.XPATH, self._XP_SCROLLER_CHECKBOX) if len(checkbox) > 0 and list_item.is_displayed(): return list_item return None def _select_dialog_item(self, value: str, search_flag=True): if search_flag: search = self.browser.by_xpath(self._XP_SEARCH_QUERY) search.send_keys(value) search.send_keys(Keys.ENTER) self.browser.wait_ajax() time.sleep(1) # additional wait for list to load list_item = self._find_active_scroller(value) list_item.click() self.browser.by_xpath(self._XP_DIALOG_BTN_SELECT).click() self.browser.wait_ajax() def _xpath_label(self, field_label: str, context: str) -> str: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_LABEL.format(field_label) else: return self._XP_DIALOG_LABEL.format(context, field_label) def _xpath_input(self, field_label: str, context: str) -> str: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_INPUT.format(field_label) elif context == self._CONTEXT_SEARCH: return self._XP_SEARCH_INPUT.format(field_label) else: return self._XP_DIALOG_INPUT.format(context, field_label) def _xpath_textarea(self, field_label: str, context: str) -> str: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_TEXT_AREA.format(field_label) else: return self._XP_DIALOG_TEXT_AREA.format(context, field_label) def _xpath_combo(self, field_label: str, context: str) -> str: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_COMBO.format(field_label) else: return self._XP_DIALOG_COMBO.format(context, field_label) def _xpath_switch(self, field_label: str, context: str) -> str: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_SWITCH.format(field_label) else: return self._XP_DIALOG_SWITCH.format(context, field_label) def _xpath_finalize_btn(self, context: str) -> str: if not self._test_run: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_BTN_PRIMARY else: return self._XP_DIALOG_BTN_PRIMARY.format(context) else: if context == self._CONTEXT_CONTENT: return self._XP_CONTENT_BTN_CANCEL else: return self._XP_DIALOG_BTN_CANCEL.format(context)