Introduction

Managing Elasticsearch indexes can be a challenge for a small team, especially when index templates change from time to time. How does new data end up using the new index templates, who creates the new indexes, and how is all of this done via code?

At the company I work at, at the time of writing, we send data via our API into Elasticsearch. When our data structure needs a change, we update this as code and magic happens. Allow me to show you how.

Setup

Prerequisites

Before you start you need the following:

  • GitHub or any other git version control tool
  • Kubernetes
  • Argo Workflows installed on Kubernetes (docs)
  • Argo Events installed on Kubernetes (docs)
  • Flux installed on Kubernetes (docs)
  • Elasticsearch installed on Kubernetes (via the ECK operator)

Setting up Elasticsearch templates

We’ve created a GitHub repository, elastic-mappings, that contains both the scripts and the templates to use within Elasticsearch.

Our structure looks similar to:

.
├── README.md
├── components
│   ├── defaults.json
│   └── ip.json
├── index_templates
│   ├── development
│   │   └── our-index.json
│   └── production
│       └── our-index.json
├── lifecycles
│   └── lc-14-days.json
├── pipelines
│   └── enrichment.json
└── scripts
    ├── components.sh
    ├── deploy.sh
    ├── index_templates.sh
    ├── lifecycles.sh
    └── pipelines.sh

Folder            Description
components        Index template components
index_templates   Index template definitions; they consume the template components
lifecycles        Lifecycle policy definitions
pipelines         Elasticsearch pipeline definitions
scripts           Scripts that call the Elasticsearch API to PUT the templates

The JSON templates are written exactly as you would write the request bodies in the Kibana Dev Tools console.
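To make this concrete, here is a minimal sketch of what an index template file could look like. The workflow shown later reads a top-level version field from each index template with jq, so the sketch includes one. Everything else (the index pattern, priority, version number and the settings that reference lc-14-days and enrichment) is an assumption for illustration, not our actual template, and write_example_template is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch only: writes a hypothetical index template to the given path.
# The field values are illustrative assumptions, not the real template.
write_example_template() {
  cat > "$1" <<'EOF'
{
  "index_patterns": ["our-index-*"],
  "version": 3,
  "priority": 100,
  "composed_of": ["defaults", "ip"],
  "template": {
    "settings": {
      "index.lifecycle.name": "lc-14-days",
      "index.default_pipeline": "enrichment"
    }
  }
}
EOF
}
```

Running write_example_template index_templates/development/our-index.json would produce a file with that body. Bumping version is then what leads to a new versioned index such as our-index-3.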

The scripts are just simple curl commands, with a deploy.sh that gets called within Argo Workflows to orchestrate all the other scripts.
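The scripts themselves are not reproduced in this post, so the following is only a rough sketch of how such a deploy script could be structured. The function names put_dir and deploy_all and the DRY_RUN switch are hypothetical. The API paths (_ilm/policy, _ingest/pipeline, _component_template, _index_template) are the standard Elasticsearch endpoints for these resources. The sketch reads ES_HOST from the environment, whereas the workflow below passes it as an argument.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of a deploy.sh-style helper; not the actual script.
set -eu

# PUT every JSON file in a directory to an Elasticsearch API path,
# using the file name (without .json) as the resource name.
put_dir() {
  local dir="$1" api_path="$2" file name
  for file in "${dir}"/*.json; do
    [ -e "${file}" ] || continue   # no matching files: nothing to do
    name="$(basename "${file}" .json)"
    if [ "${DRY_RUN:-0}" = "1" ]; then
      # Print the request instead of sending it.
      echo "PUT ${ES_HOST}/${api_path}/${name} < ${file}"
      continue
    fi
    # --insecure mirrors the curl call used in the workflow.
    curl --silent --show-error --fail --insecure \
      --user "${ES_USER}:${ES_PASSWORD}" \
      -X PUT "${ES_HOST}/${api_path}/${name}" \
      -H 'Content-Type: application/json' \
      --data-binary "@${file}"
  done
}

# Component templates go before index templates, because index
# templates refer to their components by name.
deploy_all() {
  put_dir lifecycles "_ilm/policy"
  put_dir pipelines "_ingest/pipeline"
  put_dir components "_component_template"
  put_dir "index_templates/${ES_ENV}" "_index_template"
}
```

With ES_HOST, ES_USER, ES_PASSWORD and ES_ENV set, calling deploy_all from the repository root would push everything. Setting DRY_RUN=1 only prints the requests, which is handy for checking the order.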

Setting up Argo Events

Argo Events will receive a webhook call from Flux (configured later), and this call will trigger the creation of an Argo Workflow.

The files below create the Sensor, the EventSource and the required WorkflowTemplates. I’ve left out the ServiceAccount, RoleBindings and ClusterRoleBindings, as they can differ per environment.


apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: elastic-mappings
spec:
  dependencies:
  - name: event
    eventSourceName: event
    eventName: repository
  template:
    serviceAccountName: argo
  triggers:
  - template:
      name: ci-elastic-mappings
      k8s:
        group: argoproj.io
        version: v1alpha1
        resource: workflows
        operation: create
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: updater-
              labels:
                app.kubernetes.io/name: tests
                app.kubernetes.io/component: ci
                app.kubernetes.io/instance: tests
                app.kubernetes.io/managed-by: flux
            spec:
              workflowTemplateRef:
                name: pipeline-template

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: event
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  webhook:
    repository:
      port: "12000"
      endpoint: /event
      method: POST

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: pipeline-template
spec:
  serviceAccountName: argo
  onExit: exit-handler
  entrypoint: pipeline
  templates:
  - name: pipeline
    steps:
    - - name: updater
        templateRef:
          name: updater
          template: updater
        arguments:
          parameters:
          - name: environment
            value: dummy

  - name: exit-handler
    steps:
    - - name: notify
        templateRef:
          name: exit-handler-template
          template: exit-handler
          clusterScope: true
        arguments:
          parameters:
          - name: channel
            value: ci-notifications
          - name: message
            value: "ELK templates updated"
          - name: status
            value: "{{workflow.status}}"
          - name: duration
            value: "{{workflow.duration}}"
          - name: failures
            value: "{{workflow.failures}}"

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: updater
spec:
  templates:
  - name: updater
    inputs:
      parameters:
      - name: environment
      artifacts:
      - name: git-repo
        path: /src
        git:
          repo: "git@github.com:Username/elastic-mappings"
          revision: "master"
          sshPrivateKeySecret:
            name: ci-key-rsa
            key: identity
    container:
      image: bitnami/kubectl:1.23.5
      workingDir: /src
      command:
      - /bin/bash
      - -c
      env:
      - name: ES_ENV
        value: "{{inputs.parameters.environment}}"
      - name: ES_HOST
        value: https://elasticsearch-es-http.elk-stack.svc:9200
      - name: ES_USER
        value: elastic
      - name: ES_PASSWORD
        valueFrom:
          secretKeyRef:
            name: elasticsearch-es-elastic-user
            key: elastic
      args:
      - |
        set -eu

        ./scripts/deploy.sh ${ES_HOST}

        for index in our-index; do
          # Create a new index with the version as a suffix, using the new index templates
          curl -X PUT "${ES_HOST}/${index}-$(jq -rc '.version' < index_templates/{{inputs.parameters.environment}}/${index}.json)?pretty" \
            --silent \
            --user "${ES_USER}:${ES_PASSWORD}" \
            --insecure \
            -H 'Content-Type: application/json' || echo "Index already exists"
        done

        set -x
        cat <<EOF | kubectl apply -f -
        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: elastic-indexes
          namespace: api
        data:
          our-index: "$(jq -rc '.version' < index_templates/{{inputs.parameters.environment}}/our-index.json)"
        EOF

In turn, when a POST request is made to http://event-eventsource-svc.elastic-mappings.svc.cluster.local:12000/event, the Sensor creates a Workflow. As its final step, the Workflow applies a ConfigMap that holds the version number(s) of the index(es). This is how our API (via a Kubernetes informer) knows which index to pick.
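One detail worth knowing about the index-creation step: curl only reports HTTP error responses through its exit code when --fail is set, and a rejected PUT can mean more than one thing. The sketch below is an alternative that inspects the status code and response body explicitly. The function names are hypothetical, and it assumes Elasticsearch answers a duplicate create request with HTTP 400 and a resource_already_exists_exception error type.

```shell
#!/usr/bin/env bash
# Sketch only: classify the result of "PUT /<index>" explicitly.

# Pure helper: map an HTTP status and response body to one of
# "created", "exists" or "failed".
index_create_outcome() {
  local status="$1" body="$2"
  case "${status}" in
    200) echo "created" ;;
    400)
      case "${body}" in
        *resource_already_exists_exception*) echo "exists" ;;
        *) echo "failed" ;;
      esac
      ;;
    *) echo "failed" ;;
  esac
}

# Create an index and report the outcome; returns non-zero on failure.
create_index() {
  local name="$1" body_file status outcome
  body_file="$(mktemp)"
  # --write-out prints only the status code; the body goes to a temp file.
  status="$(curl --silent --insecure \
    --user "${ES_USER}:${ES_PASSWORD}" \
    --output "${body_file}" --write-out '%{http_code}' \
    -X PUT "${ES_HOST}/${name}")" || status="000"
  outcome="$(index_create_outcome "${status}" "$(cat "${body_file}")")"
  rm -f "${body_file}"
  echo "${name}: ${outcome} (HTTP ${status})"
  [ "${outcome}" != "failed" ]
}
```

With this in place, an authentication error or an invalid index name stops the workflow instead of being mistaken for an index that already exists.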

Setting up Flux

To let Flux call a generic webhook, you need to create an Alert and a Provider, both custom resources from notification.toolkit.fluxcd.io.

The Alert will be triggered upon changes of the specified GitRepository and uses the Provider to send out the notification.


apiVersion: source.toolkit.fluxcd.io/v1beta1
kind: GitRepository
metadata:
  name: elastic-mappings
  namespace: flux-system
spec:
  interval: 1m
  url: ssh://git@github.com/Username/elastic-mappings.git
  ref:
    branch: master
  secretRef:
    name: ci-key-rsa

apiVersion: notification.toolkit.fluxcd.io/v1beta1
kind: Alert
metadata:
  name: elastic-mappings
  namespace: flux-system
spec:
  providerRef:
    name: elastic-mappings
  eventSeverity: info
  eventSources:
  - kind: GitRepository
    name: 'elastic-mappings'
  exclusionList:
  - ".*unable to clone.*waiting.*socket"
  - ".*Dependencies do not meet ready condition.*"

apiVersion: notification.toolkit.fluxcd.io/v1beta1
kind: Provider
metadata:
  name: elastic-mappings
  namespace: flux-system
spec:
  type: generic
  address: http://event-eventsource-svc.index-updater.svc.cluster.local:12000/event
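To check the Argo side without waiting for Flux, you can send a hand-made event to the webhook yourself. This is a minimal sketch: event_payload and trigger_event are hypothetical helpers, and the JSON body is only loosely modelled on a Flux event. The Sensor in this article does not read the payload, so its exact contents should not matter here.

```shell
#!/usr/bin/env bash
# Sketch only: trigger the Argo Events webhook manually.

# Build a small JSON body with placeholder values.
event_payload() {
  printf '{"involvedObject":{"kind":"GitRepository","name":"%s"},"severity":"info","message":"manual test"}' "$1"
}

# POST the payload to the EventSource endpoint; the URL is passed in
# so it can match whichever namespace the EventSource runs in.
trigger_event() {
  local url="$1" repo="$2"
  curl --silent --show-error --fail \
    -X POST "${url}" \
    -H 'Content-Type: application/json' \
    --data "$(event_payload "${repo}")"
}
```

From a pod inside the cluster, calling trigger_event with the EventSource URL and elastic-mappings as the repository name should result in a new updater- Workflow showing up.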

Flow diagram

[Diagram: a commit to the GitHub repository is picked up by the Flux GitRepository; the Flux Alert and Flux Provider call the Argo Events webhook; the Sensor creates an Argo Workflow, which updates Elasticsearch and the Kubernetes ConfigMap; the API app reads the ConfigMap through an informer.]