diff --git a/CHANGES.rst b/CHANGES.rst index f074283..3267a9e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,7 @@ Next release ============ +- Support custom analyzers in Elasticsearch - Fix bug '_csv_split not found' 0.13.2 diff --git a/annotator/annotation.py b/annotator/annotation.py index ff26913..eec5e05 100644 --- a/annotator/annotation.py +++ b/annotator/annotation.py @@ -35,7 +35,6 @@ } } - class Annotation(es.Model): __type__ = TYPE diff --git a/annotator/elasticsearch.py b/annotator/elasticsearch.py index 077b7d4..d3a65b9 100644 --- a/annotator/elasticsearch.py +++ b/annotator/elasticsearch.py @@ -66,6 +66,67 @@ def conn(self): self._connection = self._connect() return self._connection + def drop_all(self): + """Delete the index and its contents""" + if self.conn.indices.exists(self.index): + self.conn.indices.close(self.index) + self.conn.indices.delete(self.index) + + def create_all(self, models, analysis_settings): + mappings = _compile_mappings(models) + + # Test for index existence while also checking if connection works + try: + index_exists = self.conn.indices.exists(self.index) + except elasticsearch.exceptions.ConnectionError as e: + msg = ('Can not access ElasticSearch at {0}! ' + 'Check to ensure it is running.').format(self.host) + raise elasticsearch.exceptions.ConnectionError('N/A', msg, e) + + if not index_exists: + # If index does not yet exist, simply create the index + self.conn.indices.create(self.index, body={ + 'mappings': mappings, + 'settings': {'analysis': analysis_settings}, + }) + else: + # Otherwise, update its settings and mappings + self._update_analysis(analysis_settings) + self._update_mappings(mappings) + + def _update_analysis(self, analysis): + """Update analyzers and filters""" + settings = self.conn.indices.get_settings(index=self.index).values()[0] + existing = settings['settings']['index'].get('analysis', {}) + # Only bother if new settings would differ from existing settings + if not self._analysis_up_to_date(existing, analysis): + try: + self.conn.indices.close(index=self.index) + self.conn.indices.put_settings(index=self.index, + body={'analysis': analysis}) + finally: + self.conn.indices.open(index=self.index) + + def _update_mappings(self, mappings): + """Update mappings. + + Warning: can explode because of a MergeMappingError when mappings are + incompatible""" + for doc_type, body in mappings.items(): + self.conn.indices.put_mapping( + index=self.index, + doc_type=doc_type, + body=body + ) + + @staticmethod + def _analysis_up_to_date(existing, analysis): + """Tell whether existing analysis settings are up to date""" + new_analysis = existing.copy() + for section, items in analysis.items(): + new_analysis.setdefault(section,{}).update(items) + return new_analysis == existing + class _Model(dict): """Base class that represents a document type in an ElasticSearch index. @@ -74,7 +135,7 @@ class _Model(dict): __type__ -- The name of the document type __mapping__ -- A mapping of the document's fields - Mapping: Calling create_all() will create the mapping in the index. + Mapping: One field, 'id', is treated specially. Its value will not be stored, but be used as the _id identifier of the document in Elasticsearch. If an item is indexed without providing an id, the _id is automatically @@ -87,25 +148,6 @@ class _Model(dict): with 'analyzer':'standard'. """ - @classmethod - def create_all(cls): - log.info("Creating index '%s'." % cls.es.index) - conn = cls.es.conn - try: - conn.indices.create(cls.es.index) - except elasticsearch.exceptions.RequestError as e: - # Reraise anything that isn't just a notification that the index - # already exists (either as index or as an alias). - if not (e.error.startswith('IndexAlreadyExistsException') - or e.error.startswith('InvalidIndexNameException')): - log.fatal("Failed to create an Elasticsearch index") - raise - log.warn("Index creation failed as index appears to already exist.") - mapping = cls.get_mapping() - conn.indices.put_mapping(index=cls.es.index, - doc_type=cls.__type__, - body=mapping) - @classmethod def get_mapping(cls): return { @@ -122,10 +164,8 @@ def get_mapping(cls): } @classmethod - def drop_all(cls): - if cls.es.conn.indices.exists(cls.es.index): - cls.es.conn.indices.close(cls.es.index) - cls.es.conn.indices.delete(cls.es.index) + def get_analysis(cls): + return getattr(cls, '__analysis__', {}) # It would be lovely if this were called 'get', but the dict semantics # already define that method name. @@ -215,6 +255,18 @@ def make_model(es): return type('Model', (_Model,), {'es': es}) +def _compile_mappings(models): + """Collect the mappings from the models""" + mappings = {} + for model in models: + mappings.update(model.get_mapping()) + return mappings + + +def _csv_split(s, delimiter=','): + return [r for r in csv.reader([s], delimiter=delimiter)][0] + + def _build_query(query, offset, limit): # Create a match query for each keyword match_clauses = [{'match': {k: v}} for k, v in iteritems(query)] diff --git a/annotator/elasticsearch_analyzers.py b/annotator/elasticsearch_analyzers.py new file mode 100644 index 0000000..5318a09 --- /dev/null +++ b/annotator/elasticsearch_analyzers.py @@ -0,0 +1,9 @@ +"""Custom Elasticsearch analyzers that can be used for indexing fields in + models (Annotation, Document). +""" + +ANALYSIS = { + 'analyzer': {}, + 'filter': {}, + 'tokenizer': {}, +} diff --git a/annotator/reindexer.py b/annotator/reindexer.py index b4283e1..162cedd 100644 --- a/annotator/reindexer.py +++ b/annotator/reindexer.py @@ -4,11 +4,13 @@ from .annotation import Annotation from .document import Document +from .elasticsearch_analyzers import ANALYSIS class Reindexer(object): es_models = Annotation, Document + analysis_settings = ANALYSIS def __init__(self, conn, interactive=False): self.conn = conn @@ -60,7 +62,9 @@ def alias(self, index, alias): def get_index_config(self): # Configure index mappings - index_config = {'mappings': {}} + index_config = {'mappings': {}, + 'settings': {'analysis': self.analysis_settings}, + } for model in self.es_models: index_config['mappings'].update(model.get_mapping()) return index_config diff --git a/run.py b/run.py index 3af8aad..7d92f88 100755 --- a/run.py +++ b/run.py @@ -20,7 +20,8 @@ from flask import Flask, g, current_app import elasticsearch -from annotator import es, annotation, auth, authz, document, store +from annotator import es, annotation, auth, authz, document, \ + elasticsearch_analyzers, store from tests.helpers import MockUser, MockConsumer, MockAuthenticator from tests.helpers import mock_authorizer @@ -60,20 +61,19 @@ def main(): if app.config.get('AUTHZ_ON') is not None: es.authorization_enabled = app.config['AUTHZ_ON'] - with app.test_request_context(): - try: - annotation.Annotation.create_all() - document.Document.create_all() - except elasticsearch.exceptions.RequestError as e: - if e.error.startswith('MergeMappingException'): - date = time.strftime('%Y-%m-%d') - log.fatal("Elasticsearch index mapping is incorrect! Please " - "reindex it. You can use reindex.py for this, e.g. " - "python reindex.py --host {0} {1} {1}-{2}".format( - es.host, - es.index, - date)) - raise + try: + es.create_all(models=[annotation.Annotation, document.Document], + analysis_settings=elasticsearch_analyzers.ANALYSIS) + except elasticsearch.exceptions.RequestError as e: + if e.error.startswith('MergeMappingException'): + date = time.strftime('%Y-%m-%d') + log.fatal("Elasticsearch index mapping is incorrect! Please " + "reindex it. You can use reindex.py for this, e.g. " + "python reindex.py --host {0} {1} {1}-{2}".format( + es.host, + es.index, + date)) + raise @app.before_request def before_request(): diff --git a/tests/__init__.py b/tests/__init__.py index 9b37303..6522f42 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1,8 @@ import os from flask import Flask, g, request -from annotator import es, auth, authz, annotation, store, document +from annotator import es, auth, authz, annotation, document, \ + elasticsearch_analyzers, store from .helpers import MockUser, MockConsumer @@ -30,15 +31,13 @@ class TestCase(object): @classmethod def setup_class(cls): cls.app = create_app() - annotation.Annotation.drop_all() - document.Document.drop_all() + es.drop_all() def setup(self): - annotation.Annotation.create_all() - document.Document.create_all() + es.create_all(models=[annotation.Annotation, document.Document], + analysis_settings=elasticsearch_analyzers.ANALYSIS) es.conn.cluster.health(wait_for_status='yellow') self.cli = self.app.test_client() def teardown(self): - annotation.Annotation.drop_all() - document.Document.drop_all() + es.drop_all()