From 379a405ea10210523b158de805c883c863a519f5 Mon Sep 17 00:00:00 2001
From: henrikek
Date: Thu, 26 Feb 2026 00:27:12 +0100
Subject: [PATCH] Add ES MAX_INDEX_SIZE setting and retry without content for
 large documents

---
 ESSArch_Core/config/settings.py |  2 ++
 ESSArch_Core/ip/models.py       |  1 +
 ESSArch_Core/ip/tasks.py        |  1 -
 ESSArch_Core/search/ingest.py   | 44 ++++++++++++++++++++----
 ESSArch_Core/tags/documents.py  | 59 +++++++++++++++++++++++++--------
 5 files changed, 86 insertions(+), 21 deletions(-)

diff --git a/ESSArch_Core/config/settings.py b/ESSArch_Core/config/settings.py
index 3ea760624..d7214e851 100644
--- a/ESSArch_Core/config/settings.py
+++ b/ESSArch_Core/config/settings.py
@@ -328,6 +328,8 @@
 }
 
 ELASTICSEARCH_BATCH_SIZE = 1000
+ELASTICSEARCH_MAX_INDEX_SIZE = None  # e.g. 100 * 1024 * 1024 for 100 MB
+ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE = False
 
 # Storage
 
diff --git a/ESSArch_Core/ip/models.py b/ESSArch_Core/ip/models.py
index 480d660f6..0d2c89de1 100644
--- a/ESSArch_Core/ip/models.py
+++ b/ESSArch_Core/ip/models.py
@@ -2777,6 +2777,7 @@ def create_migration_workflow(self, temp_path, storage_methods, export_path='',
 
         return ip_migrate_workflow_step
 
+    @transaction.atomic
     def write_to_search_index(self, task):
         logger = logging.getLogger('essarch.ip')
         srcdir = self.object_path
diff --git a/ESSArch_Core/ip/tasks.py b/ESSArch_Core/ip/tasks.py
index 5dfbd2c30..1d73b9c1f 100644
--- a/ESSArch_Core/ip/tasks.py
+++ b/ESSArch_Core/ip/tasks.py
@@ -583,7 +583,6 @@ def PreserveInformationPackage(self, storage_method_pk, temp_path=None):
 
 
 @app.task(bind=True)
-@transaction.atomic
 def WriteInformationPackageToSearchIndex(self):
     ip = self.get_information_package()
     ip.write_to_search_index(self.get_processtask())
diff --git a/ESSArch_Core/search/ingest.py b/ESSArch_Core/search/ingest.py
index 3358b14fa..27757dec5 100644
--- a/ESSArch_Core/search/ingest.py
+++ b/ESSArch_Core/search/ingest.py
@@ -3,7 +3,7 @@
 import uuid
 
 from django.conf import settings
-from elasticsearch.exceptions import ElasticsearchException
+from elasticsearch import ConnectionError, RequestError, TransportError
 
 from ESSArch_Core.fixity.format import FormatIdentifier
 from ESSArch_Core.tags.documents import Directory, File
@@ -51,17 +51,49 @@ def index_document(tag_version, filepath, index_file_content=True):
 
     doc = File.from_obj(tag_version)
 
+    MAX_INDEX_SIZE = getattr(settings, 'ELASTICSEARCH_MAX_INDEX_SIZE', None)
+
+    if index_file_content and MAX_INDEX_SIZE and size > MAX_INDEX_SIZE:
+        logger.info(f'Skipping content indexing for {filepath}: size {size} exceeds ELASTICSEARCH_MAX_INDEX_SIZE')
+        index_file_content = False
+
     try:
         if index_file_content:
             with open(filepath, 'rb') as f:
                 doc = File.enrich_with_content(doc, file_obj=f)
-            doc.save()
         else:
-            logger.debug('Skip to index file content for {}'.format(filepath))
-            doc.save()
-    except ElasticsearchException:
-        logger.exception('Failed to index {}'.format(filepath))
+            logger.debug(f'Skipping file content indexing for {filepath}')
+        doc.save()
+
+    # ConnectionError and RequestError are subclasses of TransportError, so
+    # they must be caught before the generic TransportError handler below.
+    except (ConnectionError, RequestError):
+        logger.exception(f'Elasticsearch connection/request error indexing {filepath}')
+        raise
+
+    except TransportError as e:
+        if e.status_code == 413:
+            logger.error(f'Document too large to index: {filepath}')
+            RETRY_WITHOUT_CONTENT_IF_TOO_LARGE = getattr(settings,
+                                                         'ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE',
+                                                         False)
+            if index_file_content and RETRY_WITHOUT_CONTENT_IF_TOO_LARGE:
+                logger.info(f'Retrying without file content for {filepath}')
+                try:
+                    doc = File.from_obj(tag_version)
+                    doc.save()
+                except Exception:
+                    logger.exception(f'Retry without content also failed for {filepath}')
+                    raise
+            else:
+                logger.error(f'Not retrying without content for {filepath}: '
+                             f'ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE is disabled')
+                raise
+        else:
+            logger.exception(f'Elasticsearch transport error indexing {filepath}')
+            raise
+
+    except Exception:
+        logger.exception(f'Unexpected error indexing {filepath}')
         raise
+
     return doc, tag_version
diff --git a/ESSArch_Core/tags/documents.py b/ESSArch_Core/tags/documents.py
index d9bea439b..8bf5f5c20 100644
--- a/ESSArch_Core/tags/documents.py
+++ b/ESSArch_Core/tags/documents.py
@@ -1,4 +1,5 @@
 import logging
+import time
 from urllib.parse import urljoin
 
 import requests
@@ -24,6 +25,13 @@
 from ESSArch_Core.search.documents import DocumentBase
 from ESSArch_Core.tags.models import StructureUnit, TagVersion
+from ESSArch_Core.util import (
+    pretty_mb_per_sec,
+    pretty_size,
+    pretty_time_to_sec,
+)
+
+MB = 1024 * 1024
 
 ngram_tokenizer = tokenizer('custom_ngram_tokenizer', type='ngram', min_gram=3, max_gram=3)
@@ -415,24 +423,47 @@ def from_obj(cls, obj, archive=None):
 
     @classmethod
     def enrich_with_content(cls, doc, file_obj):
+        logger = logging.getLogger('essarch.search')
         base_url = getattr(settings, 'TIKA_URL', 'http://localhost:9998')
         TIKA_URL = urljoin(base_url.rstrip('/') + '/', 'tika')
 
         file_obj.seek(0)
-        response = requests.put(
-            TIKA_URL,
-            data=file_obj,
-            headers={"Accept": "text/plain; charset=utf-8"},
-            timeout=600
-        )
-
-        if response.status_code == 200:
-            doc.content = response.content.decode("utf-8")
-        else:
-            logger = logging.getLogger('essarch.search')
-            logger.warning('Failed to extract content for file with id %s using Tika, status code: %s, response: %s',
-                           doc.id, response.status_code, response.text)
-            raise Exception('Failed to extract content for file with id {}, status code: {}, response: {}'.format(
-                doc.id, response.status_code, response.text))
+
+        try:
+            time_start = time.time()
+            response = requests.put(
+                TIKA_URL,
+                data=file_obj,
+                headers={"Accept": "text/plain; charset=utf-8"},
+                timeout=600
+            )
+            time_end = time.time()
+            time_elapsed = time_end - time_start
+            fsize_mb = doc.size / MB
+            try:
+                mb_per_sec = fsize_mb / time_elapsed
+            except ZeroDivisionError:
+                mb_per_sec = fsize_mb
+
+            if response.status_code == 200:
+                doc.content = response.content.decode("utf-8")
+                response_size_bytes = len(response.content)
+                response_size_chars = len(doc.content)
+                logger.info(
+                    f'Extracted content for file {doc.filename} ({doc.id}), size: '
+                    f'{pretty_size(response_size_bytes)}, decoded: {response_size_chars} characters, filesize: '
+                    f'{pretty_size(doc.size)}, speed: {pretty_mb_per_sec(mb_per_sec)} MB/s '
+                    f'({pretty_time_to_sec(time_elapsed)} sec)')
+            else:
+                logger.warning(
+                    f'Failed to extract content for file {doc.filename} ({doc.id}) using Tika, '
+                    f'status code: {response.status_code}, response: {response.text}')
+                raise Exception(
+                    f'Failed to extract content for file {doc.filename} ({doc.id}), '
+                    f'status code: {response.status_code}, response: {response.text}')
+
+        except requests.RequestException as e:
+            logger.error(f'Request to Tika failed for file {doc.filename} ({doc.id}), error: {e}')
+            raise
 
         return doc
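
Usage note: both new settings default to off (None / False), so existing
deployments keep today's behavior until they opt in. A minimal configuration
sketch, assuming the common pattern of a local settings file layered on top of
ESSArch_Core/config/settings.py (the file name below is hypothetical):

    # local_essarch_settings.py -- hypothetical local override file

    # Skip Tika content extraction for files larger than 100 MB; the file's
    # metadata is still indexed, only the extracted text is omitted.
    ELASTICSEARCH_MAX_INDEX_SIZE = 100 * 1024 * 1024  # bytes

    # If Elasticsearch still rejects a document with HTTP 413 (Request Entity
    # Too Large), e.g. when the cluster's http.max_content_length is lower
    # than expected, save the document again without its extracted content
    # instead of failing the indexing task.
    ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE = True

The size check avoids sending oversized files to Tika at all, while the retry
setting is the safety net for documents that only turn out to be too large
once Elasticsearch rejects the indexing request with a 413.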