Blitzschnelle Suchergebnisse mit Elasticsearch: Eine Refactoring-Story

Ein kurzer Überblick über die Umsetzung wichtiger Features im Projekt mit Elasticsearch

Harsh Bhimjyani
Harsh Bhimjyani
, 23. Jun 2020

In diesem Artikel wirst du erfahren, wie wir die Probleme und Komplexitäten angegangen sind, die bei der Erstellung eines Backends mit Elasticsearch in einem großen und komplexen Projekt auftreten.

Super-schnelle Suchergebnisse

Unser Projekt bei Ambient Innovation ist ein großangelegtes Customer Relationship Management Tool (CRM) und die Art von Projekt, das jenseits der 20.000 Commits liegt. Ich erinnere mich an eine Rede auf der DjangoCon EU 2019 über die Wartung eines so großen Projekts und scheinbar leisten wir dabei eine recht solide Arbeit.

Die Liste der Suchergebnisse ist einer der wichtigsten und am häufigsten verwendeten Teile davon. Sie ist dahin gehend anpassbar, welche Daten man darin sehen möchte, es gibt verschiedene Filter und natürlich die Freitextsuche.

In unserem Projekt, das mit Django gebaut wurde, gab es für die Liste die ein oder andere komplexe Suchanfrage. Einige konnten über das ORM umgesetzt werden und für den Rest hatten wir Raw Queries. Da die Anzahl der Benutzer und der Bedarf an neuen Funktionen zunahmen, gab es Schwierigkeiten mit den Reaktionszeiten der Liste, sodass Refactoring die beste Lösung war. Wie der Titel dieses Artikels vermuten lässt, fiel die Wahl bei uns auf Elasticsearch.

Beginnen wir mit dem Definieren der Probleme und Aufgaben:

  • Effiziente Aktualisierung des Index und Bulk-Updates
  • Aktualisierung von Index-Zuordnungen ohne Downtime
  • Bauen von Freitextsuche und Filtern

Da es sich um ein Django-Projekt handelt, verwenden wir elasticsearch-dsl, da es ein gutes ORM für den Zugriff auf Elasticsearch mit einem auf unsere Bedürfnisse zugeschnittenen Managementsystem bietet. Das System verwaltet unsere Index-Mappings und die Indizierung. Beachten sollte man hier, das Dokument so schlank wie möglich zu halten und nur die Daten zu indexieren, die man für das Funktionieren der Features benötigt, da jegliche zusätzlichen Daten die Indizierungsdauer erhöhen können.

Effiziente Aktualisierung des Index und Umgang mit Bulk Updates

In Ordnung, unser Index ist also eingerichtet, die Daten sind indiziert, aber wir müssen sie mit der Datenbank synchron halten. Es gibt grundsätzlich zwei Möglichkeiten: entweder man aktualisiert das Dokument bei jedem Speichern des Django-Modells oder man richtet einen Cron-Job von x Stunden ein, um die Instanzen zu aktualisieren, die in den letzten x Stunden aktualisiert wurden.

Wir haben uns dafür entschieden, dies bei jedem Speichern zu tun.

Die Lösung

Man könnte annehmen, dass man am einfachsten post_save-Signale an den Django-Modellen einrichtet und das gesamte Dokument aktualisiert. Das würde bedeuten, das gesamte JSON-Dokument zu generieren und eine Aktualisierungsanfrage an Elasticsearch zu stellen. Das würde aber auch unnötige Datenbankabfragen zur Generierung dieses JSON-Dokuments bedeuten.

Was wir gemacht haben ist Folgendes: wir haben eine Teilmenge eines JSON generiert, die dem Schema auf dem Elasticsearch-Dokument entspricht und nur die Daten enthält, die geändert wurden. Dann führen wir die Aktualisierungsabfrage auf Elasticsearch aus, um dieses Teil-Update durchzuführen.

Hier ein Code-Beispiel zur Veranschaulichung:

"""
Assume the structure of the document looks like this

{
    "user": {
        "first_name": "Foo",
        "last_name": "Bar",
        "age": "30",
        "city": {
            "id": 5,
            "name": "Cologne",
            "slug": "cologne",
            "country": {
                "id": 1,
                "name": "Germany",
                "slug": "germany"
            }
        }
    },
    "tweet": "Some awesome tweet",
    "created_at": "2019-10-07 12:00"
    ... and some more data
}

What if the user changed his age and the city he lives in.
So we will update only the user node in the structure with the User model instance
and we don't need the rest of the model instances.

The resulting sub-json structure with new data should look like
{
    "user": {
        "age": "31",
        "city": {
            "id": 6,
            "name": "Hamburg",
            "slug": "hamburg",
        }
    }
}

"""
from elasticsearch_dsl.connections import connections


# How a simple function would look like
def update_current_document(id, instance, fields_changed=[]):
    """
    Arguments:
        id: {integer} -- id of the document. Ideally should be the id of the root object in the document
        instance {Model instance} -- The updated user model object
        fields_changed {list} -- list of the model fields changed
    """

    sub_data = generate_sub_dictionary(instance, fields_changed)
    client = connections.create_connection()
    client.update(index='tweets', doc_type='doc', id=id, body={'doc': sub_data})


def generate_sub_dictionary(instance, fields_changed=[]):
    """
    This method assumes you have the structure of the Elasticsearch document saved somewhere
    as a dictionary and you are generating a sub dictionary with values using a recursive backtracking algorithm.
    """

Was tun, wenn Konflikte auftreten?

Eines der Dinge, auf die wir beim Ausführen dieser Updates gestoßen sind, waren Versionskonflikte. Jedes Dokument in Elasticsearch hat eine Versionsnummer, die bei jeder Änderung eines Dokuments inkrementiert wird. Bei einer Query für ein Dokument in Elasticsearch wird in der Antwort auch die Versionsnummer angegeben. Wenn man es nun mit einer Versionsnummer aktualisieren möchte, wird erwartet, dass ein Dokument mit der gleichen Version im Index vorhanden ist. Wenn die Versionsnummer nicht übereinstimmt oder wenn sie höher ist als die aktuelle, wird ein Versionskonfliktfehler auftreten.

version conflict, current version [2] is different than the one provided [1]

Was also tun?

  1. Ein Weg wäre, eine Aktualisierungsabfrage für den Index zu starten, die den Index aktualisiert und die neuesten Daten für die Suche verfügbar macht. Standardmäßig beträgt das Aktualisierungsintervall von Elasticsearch eine Sekunde, man kann dies aber auch direkt abrufen, wenn man nicht warten möchte.
  2. Der andere Weg, den wir einschlugen, bestand darin, diese eine Sekunde zu warten und die Aktualisierung zu wiederholen.

Bulk Updates

Nehmen wir das Dokument aus dem vorherigen Beispiel. Wenn wir den Namen der Stadt aktualisieren, dann müssen wir alle Dokumente im Index aktualisieren, in denen diese Stadt vorkommt. Hmm, es ist wohl keine gute Idee, jede einzelne von ihnen durchzugehen und eine Aktualisierungsabfrage zu machen. Wir verwenden update by query von Elasticsearch.

So sieht das bei uns aus:

from elasticsearch_dsl import connections, UpdateByQuery


def bulk_update_docs(ids, instance, fields_changed=[]):
    """
    Arguments:
        ids: {list} -- list of ids of documents we want to update
        instance {Model instance} -- The updated user model object
        fields_changed {list} -- list of the model fields changed
    """

    body = generate_sub_dictionary(instance, fields_changed)
    client = connections.create_connection()
    qry = UpdateByQuery(using=client, index='tweets').params(conflicts='proceed').query('terms', _id=ids)
    
    # update_by_query uses scripts, so we need to generate the formatted source string that matches our body structure
    source = generate_source_string(body)
    qry = qry.script(source=source, params=body)


def generate_source_string(body):
    """
    Recursive algorithm to generate source strings for creating the bulk update call to ES.
    Input:
    {
        'user': {
            'city': {
                'name': 'New Cologne',
                'slug': 'new-cologne',
            }
        }
    }

    Result:
    => ['ctx._source.user.city.name=params.user.city.name', 'ctx._source.user.city.slug=params.user.city.slug']
    """

Aktualisierung von Index-Mappings ohne Downtime

Unser Index ist jetzt also mit der Datenbank synchronisiert. Was passiert aber, wenn wir nun Zuordnungen im Index ändern?

Idealerweise sollten wir das Mapping in den Elasticsearch-Indizes nicht einfach während der Laufzeit aktualisieren können, also müssen wir die gesamten Daten erneut indizieren. Da dies keine wirklich gute Lösung ist, hat Elasticsearch eine eigene Lösung dafür, nämlich die Reindex-API. Wir können jedoch nicht einfach das Neuindexieren auf dem Index anwenden.

Wir müssen einen neuen temporären Index erstellen und die Daten darin neu indizieren sowie den Alias aktualisieren. Index-Aliasse ermöglichen es uns, den Index mit diesem Namen anstelle des tatsächlichen Indexnamens abzufragen.

Nehmen wir an, man möchte seinen Index mit dem Namen „Tweets“ abfragen. Tweets ist also unser Alias, und der Indexname wäre tweets_v1. Wenn wir ein Mapping in unserem Index ändern wollen, dann erstellen wir einen neuen Index mit dem Namen tweets_v2 und kopieren alle Daten mit Hilfe der Reindex-API in diesen Index. Dann verküpfen wir den Tweets-Alias damit und löschen den alten Index. So einfach ist das.

Freitextsuche und Filter

Unser Ziel war es, dass die Freitextsuche auch dann funktioniert, wenn wir nur einen Teil des Wortes eingeben, wie zum Beispiel __icontains beim Django ORM. Dafür haben wir eine Reihe von Analyzer für die Felder verwendet, die wir durchsuchbar machen wollten. Ein Analyzer teilt/formatiert die Wörter in mehrere Token, die dann mit unserem Suchbegriff abgeglichen werden. Um die Wörter für diese partielle Wortübereinstimmung aufzubrechen, haben wir den ngram tokenizer von Elasticsearch verwendet. Dies kann leicht mit elasticsearch-dsl umgesetzt werden:

# an example from elasticsearch-dsl docs
from elasticsearch_dsl import analyzer


html_strip = analyzer('html_strip',
    tokenizer="standard",
    filter=["standard", "lowercase", "stop", "snowball"],
    char_filter=["html_strip"]
)

class Post(Document):
    title = Text()
    category = Text(
        analyzer=html_strip
    )

Für die Filter verwenden wir die Faceted Search von Elasticsearch, die Aggregationen zur Erzeugung von Filtern verwendet. Elasticsearch-dsl bietet eine nette Möglichkeit, unsere Faceted Search mit Aggregationen zu generieren:

from elasticsearch_dsl import FacetedSearch, TermsFacet


class TweetSearch(FacetedSearch):

    facets = {
        # use bucket aggregations to define facets
        'city': TermsFacet(field='user.city.name'),
    }

    def search(self):
        s = super().search()
        # override the search queryset here.

Allerdings möchten wir den Namen der Stadt auch suchbar machen, also analysierten wir ihn und indexierten ihn als Textfeld. Begriffsaggregationen benötigen jedoch ein Keyword-Feld. Wir verwendeten die Multi-Fields-API von Elasticsearch, um dieses Problem zu lösen.

# we also index a keyword version of the field
name = Text(
    analyzer=my_custom_analyzer,
    fields={'keyword': es.Keyword()}
)


# and use it with facets like this
'city': TermsFacet(field='user.city.name.keyword'),

# so now we can use the same field for free text search as well as filters

Dies war eine kurzer Überblick darüber, wie wir ein wichtiges Feature unseres Projekts mit Elasticsearch im Refactoring-Verfahren umgesetzt haben.

Vielen Dank an Ronny Vedrilla und Denis Anuschewski.