Ruby scripting in Logstash

Learn about the Logstash Ruby filter plugin for advanced data transformation in your Logstash pipeline.

Logstash is a data processing pipeline that ingests data from multiple sources, transforms it, and sends it to your chosen destinations. Filter plugins are key for this process; they perform specific operations on your data as it goes through the pipeline.

Logstash includes several built-in filters for common tasks like parsing, enriching, and modifying data. But sometimes, you'll encounter scenarios that require custom logic that goes beyond what these standard filters can provide. This is where the Ruby filter plugin comes in.

The Ruby filter plugin allows you to execute custom Ruby code directly within your Logstash pipeline. When standard filters aren't enough, the Ruby filter enables you to handle complex data transformations, implement custom business logic, or integrate with external systems.

In this post, we'll explore how to use Ruby filters, from basic to advanced usage.

When should you use the Ruby filter?

As a consulting architect at Elastic, I often see customers using Logstash for their data processing pipelines, even though it is no longer a state-of-the-art data processing engine. They often struggle with the limitations of the standard filters when they need complex data manipulation or custom logic. In these cases, the Ruby filter can help overcome those challenges.

The Ruby filter is useful when standard Logstash filters can't meet your specific requirements. Here are some common use cases:

  • Deep nested data manipulation: Modify complex JSON structures, arrays within arrays, or dynamically restructure data based on content
  • Advanced string processing: Parse and extract structured data from unstructured text
  • Implementing complex business logic: Create custom transformations that require conditional logic, loops, or complex calculations

Basic usage

Let's start with a simple example to understand how the Ruby filter works.

Configuring the Ruby filter

When you create a Logstash pipeline, place the configuration file in the /etc/logstash/conf.d directory (the default location for package installations). Alternatively, you can pass the -f option with the path to the configuration file when you start Logstash manually, which makes it easy to experiment with your pipelines.

$ ./bin/logstash -f /path/to/your_pipeline.conf

The configuration file should have a .conf extension.

To use the Ruby filter, define a ruby filter in the filter section of your Logstash pipeline configuration (*.conf) file. Here's a basic example:

filter {
  ruby {
    code => "
      event.set('new_field', 'Hello from Ruby!')
    "
  }
}

This configuration defines a Ruby filter instance with inline code. The code parameter holds the Ruby script that Logstash executes for each event processed by this filter. Inside the script, the event variable represents the event itself. The event object contains the original data sent to Logstash and any additional fields created during Logstash's filter stages. You can access those fields through the Logstash Event API, such as event.get() and event.set(). In this example, event.set('new_field', 'Hello from Ruby!') sets a new field named new_field to the string value Hello from Ruby!. You can add any other code to this block as needed.

Note that the event object is not an ordinary Ruby Hash, although it acts as a key-value data container. Check out the official documentation to learn more about the Event API.

Externalize Ruby script

For simple transformations, inline Ruby code is convenient. For complex logic or reusable functions, however, it is better to move the code into an external Ruby script. This improves maintainability and keeps your Logstash pipeline configuration clean.

First, create a Ruby script and save it as my_ruby_script.rb. The script must define a filter method that processes the event. It takes an event object as an argument, which represents the current event being processed. The filter method needs to return an array of events to emit. To drop the event, return an empty array.

For example, the following script reads the message field, calculates its length, and stores the result in a new field called message_length.

def register(params)
  # This method is called when the plugin is loaded.
  # You can use it to initialize any instance variables or perform setup tasks.
end

def filter(event)
  message = event.get('message')

  if message
    event.set('message_length', message.length)
  end

  return [event]
end

Next, set the Ruby filter configuration to reference the script using the path option. This tells Logstash to load and execute the external script. When using external scripts, ensure the file exists and has the correct permissions.

filter {
  ruby {
    path => "/path/to/my_ruby_script.rb"
  }
}

Now, each event is passed to the filter method in my_ruby_script.rb and is processed by it.

This approach helps you manage complex logic more effectively, making it easier to test, debug, and reuse your Ruby code.
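One way to try out a script's logic before wiring it into a pipeline is to run it in plain Ruby against a stand-in event. The sketch below is only an illustration under assumptions: StubEvent is a hypothetical class that mimics get and set for flat field names (the real Event API does much more), and source_field is an invented parameter name. In Logstash, parameters reach register through the filter's script_params option.

```ruby
# Hypothetical stand-in for the Logstash event, for local experiments only.
# It handles flat field names and nothing else.
class StubEvent
  def initialize(fields = {})
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Same shape as my_ruby_script.rb. 'source_field' is an assumed parameter name.
def register(params)
  @source_field = params.fetch('source_field', 'message')
end

def filter(event)
  value = event.get(@source_field)
  event.set("#{@source_field}_length", value.length) if value
  [event]
end

# Exercise the script without Logstash.
register({ 'source_field' => 'message' })
events = filter(StubEvent.new({ 'message' => 'hello' }))
puts events.first.get('message_length')
```

Because register stores its setup in an instance variable, the same script can be reused with different fields by changing only the parameters.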

Advanced usage

In this section, we will explore some advanced examples of using the Ruby filter in Logstash. These examples will demonstrate how to perform data transformations, enrich events, and implement custom logic using Ruby.

Manipulating nested data structures

A Logstash event is the core data structure that Logstash processes. It can contain various fields, including nested structures such as hashes and arrays. The Ruby filter lets you modify or add fields within these structures, which is useful when dealing with complex data formats like JSON.

input {
  generator {
    lines => [
      '{"nested": {"key1": "value1", "key2": "value2"}}'
    ]
    count => 1
    codec => "json"
    ecs_compatibility => "disabled"
  }
}

filter {
  ruby {
    code => "
      nested_data = event.get('nested')

      if nested_data.is_a?(Hash)
        nested_data['key3'] = 'value3'
        event.set('nested', nested_data)
      end
    "
  }
}

output {
  stdout { codec => rubydebug }
}

This example includes a nested JSON object in the input data. The Ruby filter reads the nested hash, adds a new key-value pair, and writes it back with event.set. A single static key like this could also be added with the mutate filter, but the same get-modify-set pattern extends to loops, conditionals, and dynamic keys, where standard filters become awkward or insufficient.
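For structures nested more than one level deep, a small recursive helper can keep the filter code short. The following is a minimal sketch in plain Ruby, not part of Logstash: deep_map_values is a hypothetical helper name, and the sample data is invented for illustration. Inside a Ruby filter, it could be applied to the value returned by event.get('nested') and the result written back with event.set.

```ruby
# Recursively walk hashes and arrays, applying the block to every leaf value.
# Returns a new structure and leaves the input untouched.
def deep_map_values(data, &block)
  case data
  when Hash
    data.each_with_object({}) { |(key, value), out| out[key] = deep_map_values(value, &block) }
  when Array
    data.map { |value| deep_map_values(value, &block) }
  else
    block.call(data)
  end
end

# Example: strip surrounding whitespace from every string, at any depth.
nested = { 'key1' => ' value1 ', 'list' => [{ 'name' => ' a ' }, 42] }
cleaned = deep_map_values(nested) { |v| v.is_a?(String) ? v.strip : v }
p cleaned
```

Returning a new structure rather than mutating in place matches the pattern used above, where the modified hash is explicitly set back on the event.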

Split a single event into multiple events

Ruby filters can also be used to split a single event into multiple events. This is useful when you have a single event containing an array of items and you want to create separate events for each item.

Note that neither Elasticsearch's ingest pipeline nor Beats/Elastic Agent's processors support splitting events. This is one of the strongest use cases for Logstash.

With split filter

You can use the split filter to split an event into multiple events based on a specified field. However, if you need to perform additional transformations or logic during the split, you can use the Ruby filter in combination with the split filter.

In the following example, we have an RSS feed as a single line of XML text that contains multiple <item> elements. The xml filter parses the text into the rss field, and the first Ruby filter copies the parsed <item> elements into a new field called items. The split filter then splits the event into multiple events based on the items field, and a second Ruby filter promotes each item's title, link, and description to top-level fields.

input {
  generator {
    lines => [
      '<rss version="2.0"><channel><title>Sample RSS</title><item><title>Article 1</title><link>http://example.com/1</link><description>Desc 1</description></item><item><title>Article 2</title><link>http://example.com/2</link><description>Desc 2</description></item></channel></rss>'
    ]
    count => 1
    codec => "plain"
    ecs_compatibility => "disabled"
  }
}

filter {
  xml {
    source => "message"
    target => "rss"
    store_xml => true
    force_array => false
  }
  ruby {
    code => "event.set('items', event.get('[rss][channel][item]')) if event.get('[rss][channel][item]')"
  }
  split {
    field => "items"
  }
  ruby {
    code => "
      item = event.get('items')
      event.set('title', item['title']) if item['title']
      event.set('link', item['link']) if item['link']
      event.set('description', item['description']) if item['description']
    "
  }
  mutate {
    remove_field => ["@timestamp", "@version", "sequence", "host", "event", "message", "rss", "items"]
  }
}

output {
  stdout { codec => rubydebug }
}

This will output as:

{
          "title" => "Article 1",
           "link" => "http://example.com/1",
    "description" => "Desc 1"
}
{
          "title" => "Article 2",
           "link" => "http://example.com/2",
    "description" => "Desc 2"
}

As you may have noticed, the ruby filter is not essential in this case. The split filter can be used to split the event into multiple events based on the items field, and the mutate filter can be used to remove unnecessary fields. However, if you need to perform additional transformations or logic during the split, you can use the Ruby filter.

Use inline Ruby script

You can also use an inline Ruby script to split a single event into multiple events by using the event.clone method and the new_event_block variable, such as new_event_block.call(new_event). This allows you to create new events based on the original event while preserving its data.

Here's an example of how to use the Ruby filter to split a single event into multiple events. The input and output are the same as in the previous example.

filter {
  xml {
    source => "message"
    target => "rss"
    store_xml => true
    force_array => false
  }
  ruby {
    code => "
      items = event.get('[rss][channel][item]')
      if items.is_a?(Array)
        items.each do |item|
          new_event = event.clone
          new_event.set('title', item['title'])
          new_event.set('link', item['link'])
          new_event.set('description', item['description'])
          new_event_block.call new_event
        end
        event.cancel
      elsif items.is_a?(Hash)
        event.set('title', items['title'])
        event.set('link', items['link'])
        event.set('description', items['description'])
      end
    "
  }
  mutate {
    remove_field => ["@timestamp", "@version", "sequence", "host", "event", "message", "rss", "items"]
  }
}

Use external Ruby script

You can also use an external Ruby script to split a single event into multiple events.

Configuration file:

filter {
  xml {
    source => "message"
    target => "rss"
    store_xml => true
    force_array => false
  }
  ruby {
    path => "path/to/ruby/split_event.rb"
  }
  mutate {
    remove_field => ["@timestamp", "@version", "sequence", "host", "event", "message", "rss", "items"]
  }
}

The Ruby script needs to be externalized as split_event.rb:

def filter(event)
  items = event.get('[rss][channel][item]')
  events = []
  if items.is_a?(Array)
    items.each do |item|
      new_event = event.clone
      new_event.set('title', item['title'])
      new_event.set('link', item['link'])
      new_event.set('description', item['description'])
      events << new_event
    end
    return events
  elsif items.is_a?(Hash)
    event.set('title', items['title'])
    event.set('link', items['link'])
    event.set('description', items['description'])
    return [event]
  else
    return []
  end
end

Remember, the filter method must return an array of events. You can return multiple events by cloning an incoming event object and adding them to the array, or you can return a single event as an array with one element.

return events
# or
# return [event]

This allows you to split a single event into multiple events.
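Both the inline and external versions branch on whether the parsed value is an Array or a Hash, because with force_array => false a feed containing a single <item> is parsed as a Hash rather than a one-element array. The branching can be factored into small pure functions, sketched here in plain Ruby. The names normalize_items and item_records are hypothetical helpers, not part of Logstash.

```ruby
# Wrap a lone Hash in an array, pass arrays through, treat anything else as empty.
# Note that Kernel#Array would turn a Hash into key-value pairs, so it is not used here.
def normalize_items(items)
  case items
  when Array then items
  when Hash then [items]
  else []
  end
end

# Build one flat record per item, keeping only the fields of interest.
def item_records(items, keys = %w[title link description])
  normalize_items(items).select { |item| item.is_a?(Hash) }.map do |item|
    keys.each_with_object({}) { |key, record| record[key] = item[key] if item.key?(key) }
  end
end

single = { 'title' => 'Article 1', 'link' => 'http://example.com/1' }
puts item_records(single).length
puts item_records(nil).length
```

In a filter method, each record could then be copied onto a clone of the incoming event (event.clone followed by set for each key), so the Array and Hash cases share one code path.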

Execute external commands and parse their output

The Logstash exec input plugin runs an external command and emits its output as an event. The output of the command is stored in the message field of the event.

Usually, the output of system commands is human-readable but not structured as JSON or another format that Logstash can easily parse. To handle this, you can use the Ruby filter to parse the output and extract information from it.

Here is an example of using the exec input plugin to execute the ps -ef command, which lists all running processes on a Unix-like system. The output will be parsed by the Ruby filter to extract relevant information about each process.

input {
  exec {
    command => "ps -ef"
    interval => 60
  }
}

filter {
  ruby {
    code => '
      processes = []
      lines = event.get("message").split("\n")  
      lines.each_with_index do |line, index|
        # Skip header line and empty lines
        next if index == 0 || line.strip.empty?
        entry = nil
        
        # Use regex to match the ps -ef output format more flexibly
        # This pattern accounts for variable spacing and different time formats
        if line =~ /^\s*(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+([\d:]+\.?\d*)\s+(.+)$/
          uid, pid, ppid, c, stime, tty, time, cmd = $1, $2, $3, $4, $5, $6, $7, $8
          
          entry = {
            "UID" => uid,
            "PID" => pid,
            "PPID" => ppid,
            "C" => c,
            "STIME" => stime,
            "TTY" => tty,
            "TIME" => time,
            "CMD" => cmd.strip
          }
        elsif line =~ /^\s*(\S+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.+)$/
          # Fallback pattern for lines that might not match the exact format
          # Split the remaining part more carefully
          uid, pid, ppid, c, remainder = $1, $2, $3, $4, $5
          
          # Split remainder into STIME, TTY, TIME, CMD
          parts = remainder.strip.split(/\s+/, 4)
          if parts.length >= 4
            stime, tty, time, cmd = parts[0], parts[1], parts[2], parts[3]
            
            entry = {
              "UID" => uid,
              "PID" => pid,
              "PPID" => ppid,
              "C" => c,
              "STIME" => stime,
              "TTY" => tty,
              "TIME" => time,
              "CMD" => cmd
            }
          end
        end
        if entry && entry["UID"] == "0"
          original_line = line.strip
          entry["original_line"] = original_line if original_line.length > 0
          processes.push(entry)
        end
      end
      event.set("processes", processes)
      event.remove("message")
      event.remove("event")
    '
  }
}

output {
  stdout { codec => rubydebug }
}

This example uses the exec input plugin to run the ps -ef command every 60 seconds. The Ruby filter processes the output, extracting fields such as UID, PID, PPID, CPU usage (C), start time (STIME), TTY, total CPU time (TIME), and the command (CMD), and keeps only the entries whose UID is 0 (root). It works fine in my macOS environment, but you may need to adjust the regex patterns and the UID check for your system. On Linux, for example, ps -ef typically prints the user name (such as root) rather than a numeric UID.
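If the regex patterns prove brittle on your system, a simpler alternative is to split each line on whitespace with a field limit, so that the command keeps its internal spaces. This is a sketch of a different technique from the one above, in plain Ruby. It assumes the eight-column ps -ef layout and that none of the first seven columns contains spaces. The helper names and the sample output are invented for illustration.

```ruby
PS_FIELDS = %w[UID PID PPID C STIME TTY TIME CMD].freeze

# Split one ps -ef line into at most eight fields; the last field keeps its spaces.
# Returns nil for the header, blank lines, and lines that do not fit the layout.
def parse_ps_line(line)
  parts = line.strip.split(/\s+/, PS_FIELDS.length)
  return nil if parts.length < PS_FIELDS.length
  return nil unless parts[1] =~ /\A\d+\z/ # the header has "PID" here, not a number

  PS_FIELDS.zip(parts).to_h
end

def parse_ps_output(text)
  text.each_line.map { |line| parse_ps_line(line) }.compact
end

# Invented sample resembling macOS output.
sample = <<~PS
  UID   PID  PPID   C STIME   TTY           TIME CMD
    0     1     0   0 10:00AM ??         1:23.45 /sbin/launchd
  501   312     1   0 10:01AM ??         0:00.10 /usr/bin/some daemon --flag
PS

parse_ps_output(sample).each { |entry| puts "#{entry['PID']} #{entry['CMD']}" }
```

Inside the Ruby filter, parse_ps_output could be applied to event.get("message"), with any UID filtering done on the resulting array.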

Use built-in libraries

The Ruby filter plugin allows you to use built-in Ruby libraries, which can be very useful for various tasks. For example, you can use the json library to parse JSON strings or the date library to manipulate dates.

Here is an example of using the json library to parse a JSON string stored in a field:

require 'json'

def filter(event)
  json_string = event.get('message')
  parsed_json = JSON.parse(json_string)
  event.set('parsed_json', parsed_json)
  return [event]
end

Inline code runs for every event, so to avoid calling require each time, it is better to externalize your Ruby code and put the require statement at the top of the script. The library is then loaded once, when the script is loaded, and is available throughout the script.

To check which libraries are available in your environment, you can list the currently loaded gems by running the following code in the Ruby filter:
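JSON.parse raises an exception on malformed input, so a script that handles untrusted messages may want to catch it explicitly. The sketch below is plain Ruby using only the standard json library. parse_json_safely is a hypothetical helper name, and returning a pair of values is just one possible convention.

```ruby
require 'json'

# Returns [parsed, nil] on success or [nil, error_message] on failure.
def parse_json_safely(text)
  return [nil, 'empty input'] if text.nil? || text.strip.empty?

  [JSON.parse(text), nil]
rescue JSON::ParserError => e
  [nil, e.message]
end

parsed, error = parse_json_safely('{"user": {"id": 7}}')
puts parsed['user']['id'] if error.nil?

_, error = parse_json_safely('{not json')
puts 'parse failed' if error
```

In a filter method, the error branch could record the message in a field or tag the event instead of setting parsed_json, so that bad input stays visible downstream.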

Gem.loaded_specs.sort_by { |name, _| name }.each do |name, spec|
  puts "#{name}: #{spec.version}"
end

Note: The built-in libraries are not officially supported by Logstash, and their behavior may change or they may not be available in future versions. Use them at your own risk.

Conclusion

The Logstash Ruby filter allows you to customize and extend the capabilities of your Logstash pipelines. In this post, we've covered the basics of using the Ruby filter and provided advanced usage examples.

By leveraging the Ruby filter, you can handle complex data processing tasks that require custom logic or advanced manipulation. Whether you're working with nested data structures, splitting events, or parsing and converting complex/unstructured text into structured JSON, the Ruby filter provides flexibility to meet your specific requirements.

We hope this guide has provided you with the knowledge and inspiration to explore the full potential of the Logstash Ruby filter. Happy scripting!

