Elasticsearch’s ingest pipelines are a powerful mechanism to pre-process documents before indexing them. Whether you’re shipping application logs, metrics, or structured events, ingest pipelines give you a flexible, scalable way to enrich, transform, or drop data at the ingestion phase.

In this article, we’ll explore:

  • What is an Elasticsearch Ingest Pipeline?
  • How to configure ingest pipelines
  • Core processors: grok, set, rename, remove, geoip, etc.
  • Advanced processors: dissect, csv, and json
  • When to use what: Choosing the right processor for your data
  • Complete real-world examples

What Is an Ingest Pipeline?

An ingest pipeline in Elasticsearch is a sequence of processors that execute on incoming documents. Think of it as a transformation pipeline — raw logs in, enriched documents out — before they’re indexed.

Each processor is a modular step in that transformation: parsing a field, extracting structured data, adding geo-information, or even dropping irrelevant fields.

Creating a Simple Ingest Pipeline

PUT _ingest/pipeline/add_env
{
  "description": "Add environment tag",
  "processors": [
    {
      "set": {
        "field": "env",
        "value": "production"
      }
    }
  ]
}

Let’s define a basic ingest pipeline that adds a static field to incoming documents:

This pipeline adds "env": "production" to every document processed by it.

 Core Processors in Action

1. grok – Regex-based parsing

The grok processor is used to parse unstructured log data into structured fields.

Log Example:

2025-04-16T14:01:09Z ERROR PaymentService - Transaction failed for user 12345
Grok pattern:
{
  "grok": {
    "field": "message",
    "patterns": [
      "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{WORD:service} - %{GREEDYDATA:message}"
    ]
  }
}




This will extract:

{
"timestamp": "2025-04-16T14:01:09Z",
"level": "ERROR",
"service": "PaymentService",
"message": "Transaction failed for user 12345"
}

2. date Processor

Used to convert parsed date fields into actual datetime objects.

{
  "date": {
    "field": "timestamp",
    "formats": ["ISO8601"]
  }
}




3. set, rename, and remove

  • set – Add/overwrite a value
  • rename – Change field name
  • remove – Delete a field
{
  "rename": {
    "field": "log_message",
    "target_field": "message"
  }
},
{
  "set": {
    "field": "env",
    "value": "production"
  }
},
{
  "remove": {
    "field": "raw_input"
  }
}




4. geoip Processor

Automatically enriches IP addresses with geographical data.

Log Example:

{
  "client_ip": "8.8.8.8"
}




Processor:

{
  "geoip": {
    "field": "client_ip"
  }
}




Will enrich with:

{
  "geoip": {
    "continent_name": "North America",
    "country_name": "United States",
    "region_name": "California",
    "city_name": "Mountain View"
  }
}




5. user_agent Processor

Extracts structured fields from a user-agent string.

Log:

{
  "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)..."
}




Processor:

{
  "user_agent": {
    "field": "user_agent"
  }
}




Result:

{
  "user_agent": {
    "name": "Chrome",
    "version": "110.0.5481.177",
    "os": "Mac OS X 10.15.7",
    "device": "Mac"
  }
}




6. script Processor

Custom logic using painless script.

{
  "script": {
    "lang": "painless",
    "source": "ctx['upper_level'] = ctx.level.toUpperCase();"
  }
}




Full Example: Access Log Pipeline

Apache Access Log Example:

127.0.0.1 - frank [10/Oct/2024:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326




Pipeline:

PUT _ingest/pipeline/apache_access
{
  "description": "Parse Apache access logs",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{COMMONAPACHELOG}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "geoip": {
        "field": "clientip"
      }
    }
  ]
}




Testing Pipelines with Simulate API

You can simulate how documents are processed using the _simulate endpoint. 

POST _ingest/pipeline/apache_access/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "127.0.0.1 - frank [10/Oct/2024:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"
      }
    }
  ]
}




Best Practices

  • Use simulate When testing new pipelines.
  • Add a tag processor for tracking the source.
  • Handle parsing failures gracefully with on_failure.
  • Use ingest pipelines at index time, not for analytics or aggregations.

Advanced Processors

Now let’s go deeper into custom processors that give you fine-grained control over unstructured or delimited logs.

dissect – Token-based splitting (faster than grok)

Use when log fields are consistently delimited, e.g. tabs or spaces.

Log:

2024-04-12T12:00:00 GET /api/v1/resource 200




Pattern:

PUT _ingest/pipeline/dissect_log
{
  "description": "Dissect simple HTTP log",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{timestamp} %{method} %{endpoint} %{status}"
      }
    }
  ]
}




When to use: Use dissect When logs follow a strict structure, it’s lighter and faster than regex-based grok.

csv – Parsing comma-separated values

Log:

2024-04-12,GET,/login,401




Pipeline:

PUT _ingest/pipeline/csv_log
{
  "description": "Parse CSV log",
  "processors": [
    {
      "csv": {
        "field": "message",
        "target_fields": ["timestamp", "method", "path", "status"]
      }
    }
  ]
}




When to use: Use csv When logs are comma-delimited with a consistent structure. Ideal for exported reports or tabular logs.

json – Extract JSON from a string field

Log:

{"timestamp":"2024-04-12","level":"error","msg":"failed login"}




Pipeline:

PUT _ingest/pipeline/json_log
{
  "description": "Parse embedded JSON string",
  "processors": [
    {
      "json": {
        "field": "message",
        "add_to_root": true
      }
    }
  ]
}




When to use: If your logs come as JSON strings, use this processor to flatten them into the document structure.

Choosing the Right Processor

Use Case Processor Semi-structured logs (e.g., Apache, Syslog) grok Strictly structured, delimited logs dissect CSV exports, tabular logs csv Embedded JSON json IP geo-location geoip Field management (rename/set/remove) set, rename, remove

Chaining Processors for Enrichment

Here’s a more advanced pipeline combining multiple processors:

PUT _ingest/pipeline/full_log_pipeline
{
  "description": "End-to-end log enrichment",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{timestamp} %{ip} %{method} %{uri} %{status}"
      }
    },
    {
      "geoip": {
        "field": "ip"
      }
    },
    {
      "set": {
        "field": "env",
        "value": "prod"
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}




Final Thoughts

Elasticsearch ingest pipelines allow you to impose structure on unorganized data at the edge of your data stream. Whether you’re handling logs, metrics, or events, understanding when and how to use processors effectively is crucial for developing efficient and scalable observability solutions.

Reach out on LinkedIn for any questions or


Discover more from Tech Insights & Blogs by Rahul Ranjan

Subscribe to get the latest posts sent to your email.

Leave a comment

Trending