Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ESSArch_Core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@
}

ELASTICSEARCH_BATCH_SIZE = 1000
ELASTICSEARCH_MAX_INDEX_SIZE = None # Example 100 * 1024 * 1024 # 100MB
ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE = False

# Storage

Expand Down
1 change: 1 addition & 0 deletions ESSArch_Core/ip/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,7 @@ def create_migration_workflow(self, temp_path, storage_methods, export_path='',

return ip_migrate_workflow_step

@transaction.atomic
def write_to_search_index(self, task):
logger = logging.getLogger('essarch.ip')
srcdir = self.object_path
Expand Down
1 change: 0 additions & 1 deletion ESSArch_Core/ip/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ def PreserveInformationPackage(self, storage_method_pk, temp_path=None):


@app.task(bind=True)
@transaction.atomic
def WriteInformationPackageToSearchIndex(self):
ip = self.get_information_package()
ip.write_to_search_index(self.get_processtask())
Expand Down
44 changes: 38 additions & 6 deletions ESSArch_Core/search/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid

from django.conf import settings
from elasticsearch.exceptions import ElasticsearchException
from elasticsearch import ConnectionError, RequestError, TransportError

from ESSArch_Core.fixity.format import FormatIdentifier
from ESSArch_Core.tags.documents import Directory, File
Expand Down Expand Up @@ -51,17 +51,49 @@ def index_document(tag_version, filepath, index_file_content=True):

doc = File.from_obj(tag_version)

MAX_INDEX_SIZE = getattr(settings, 'ELASTICSEARCH_MAX_INDEX_SIZE', None)

if index_file_content and MAX_INDEX_SIZE and size > MAX_INDEX_SIZE:
logger.info(f'Skipping content indexing due to size for {filepath}')
index_file_content = False

try:
if index_file_content:
with open(filepath, 'rb') as f:
doc = File.enrich_with_content(doc, file_obj=f)
doc.save()
else:
logger.debug('Skip to index file content for {}'.format(filepath))
doc.save()
except ElasticsearchException:
logger.exception('Failed to index {}'.format(filepath))
logger.debug(f'Skip to index file content for {filepath}')
doc.save()

except TransportError as e:
if e.status_code == 413:
logger.error(f'Document too large to index: {filepath}')
RETRY_WITHOUT_CONTENT_IF_TOO_LARGE = getattr(settings,
'ELASTICSEARCH_RETRY_WITHOUT_CONTENT_IF_TOO_LARGE',
False)
if index_file_content and RETRY_WITHOUT_CONTENT_IF_TOO_LARGE:
logger.info(f'Retrying without file content for {filepath}')
try:
doc = File.from_obj(tag_version)
doc.save()
except Exception:
logger.exception(f'Retry without content also failed for {filepath}')
raise
else:
logger.error(f'Not retrying without content for {filepath} due to settings')
raise
else:
logger.exception(f'Elasticsearch transport error indexing {filepath}')
raise

except (ConnectionError, RequestError):
logger.exception(f'Elasticsearch connection/request error indexing {filepath}')
raise

except Exception:
logger.exception(f'Unexpected error indexing {filepath}')
raise

return doc, tag_version


Expand Down
59 changes: 45 additions & 14 deletions ESSArch_Core/tags/documents.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time
from urllib.parse import urljoin

import requests
Expand All @@ -24,6 +25,13 @@

from ESSArch_Core.search.documents import DocumentBase
from ESSArch_Core.tags.models import StructureUnit, TagVersion
from ESSArch_Core.util import (
pretty_mb_per_sec,
pretty_size,
pretty_time_to_sec,
)

MB = 1024 * 1024

ngram_tokenizer = tokenizer('custom_ngram_tokenizer', type='ngram', min_gram=3,
max_gram=3)
Expand Down Expand Up @@ -415,24 +423,47 @@ def from_obj(cls, obj, archive=None):

@classmethod
def enrich_with_content(cls, doc, file_obj):
logger = logging.getLogger('essarch.search')
base_url = getattr(settings, 'TIKA_URL', 'http://localhost:9998')
TIKA_URL = urljoin(base_url.rstrip('/') + '/', 'tika')
file_obj.seek(0)
response = requests.put(
TIKA_URL,
data=file_obj,
headers={"Accept": "text/plain; charset=utf-8"},
timeout=600
)

if response.status_code == 200:
doc.content = response.content.decode("utf-8")
else:
logger = logging.getLogger('essarch.search')
logger.warning('Failed to extract content for file with id %s using Tika, status code: %s, response: %s',
doc.id, response.status_code, response.text)
raise Exception('Failed to extract content for file with id {}, status code: {}, response: {}'.format(
doc.id, response.status_code, response.text))
try:
time_start = time.time()
response = requests.put(
TIKA_URL,
data=file_obj,
headers={"Accept": "text/plain; charset=utf-8"},
timeout=600
)
time_end = time.time()
time_elapsed = time_end - time_start
fsize_mb = doc.size / MB
try:
mb_per_sec = fsize_mb / time_elapsed
except ZeroDivisionError:
mb_per_sec = fsize_mb

if response.status_code == 200:
doc.content = response.content.decode("utf-8")
response_size_bytes = len(response.content)
response_size_chars = len(doc.content)
logger.info(
f'Extracted content for file name {doc.filename} ({doc.id}), size: '
f'{pretty_size(response_size_bytes)}, decoded: {response_size_chars} characters, filesize: '
f'{pretty_size(doc.size)}, speed: {pretty_mb_per_sec(mb_per_sec)} MB/Sec '
f'({pretty_time_to_sec(time_elapsed)} sec)')
else:
logger.warning(
f'Failed to extract content for file name {doc.filename} ({doc.id}) using Tika, '
f'status code: {response.status_code}, response: {response.text}')
raise Exception(
f'Failed to extract content for file with name {doc.filename} ({doc.id}), '
f'status code: {response.status_code}, response: {response.text}')

except requests.RequestException as e:
logger.error(f'Request to TIKA failed for file name {doc.filename} ({doc.id}), error: {str(e)}')
raise

return doc

Expand Down
Loading