How to ingest data to Elasticsearch through Apache Kafka

Detailing the process of integrating Apache Kafka with Elasticsearch for data ingestion and indexing.

In this article, we show how to integrate Apache Kafka with Elasticsearch for data ingestion and indexing. We will provide an overview of Kafka, its concept of producers and consumers, and we will create a logs index where messages will be received and indexed through Apache Kafka. The project is implemented in Python, and the code is available on GitHub.

Prerequisites

  • Docker and Docker Compose: Ensure you have Docker and Docker Compose installed on your machine.
  • Python 3.x: To run the Producer and Consumer scripts.

A brief introduction to Apache Kafka

Apache Kafka is a distributed streaming platform that enables high scalability and availability, as well as fault tolerance. Kafka manages data through the following main components:

  • Broker: responsible for storing and distributing messages between producers and consumers.
  • ZooKeeper: manages and coordinates the Kafka brokers, controlling the state of the cluster, partition leaders, and consumer information.
  • Topics: channels where data is published and stored for consumption.
  • Consumers and Producers: while producers send data to the topics, consumers retrieve that data.

These components work together to form the Kafka ecosystem, providing a robust framework for data streaming.
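To make the relationship between topics, producers, and consumer groups concrete, here is a toy in-memory model. It is only an illustration and not how Kafka is implemented: the ToyBroker class and its methods are hypothetical, and it ignores partitions, replication, and persistence.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: each topic is an append-only list."""

    def __init__(self):
        self.topics = defaultdict(list)   # topic name -> ordered messages
        self.offsets = defaultdict(int)   # (group, topic) -> next offset to read

    def produce(self, topic, message):
        """Append a message to the topic and return its offset."""
        self.topics[topic].append(message)
        return len(self.topics[topic]) - 1

    def consume(self, group, topic, max_records=10):
        """Return up to max_records unread messages for this consumer group."""
        start = self.offsets[(group, topic)]
        batch = self.topics[topic][start:start + max_records]
        self.offsets[(group, topic)] = start + len(batch)
        return batch


broker = ToyBroker()
for i in range(3):
    broker.produce("logs", {"level": "INFO", "message": f"event {i}"})

first = broker.consume("group_a", "logs", max_records=2)   # first two messages
second = broker.consume("group_a", "logs", max_records=2)  # the remaining one
other = broker.consume("group_b", "logs")                  # separate group reads all three
print(len(first), len(second), len(other))                 # prints: 2 1 3
```

Each consumer group keeps its own offset, which is why group_b still sees every message after group_a has read them.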

Project Structure

To understand the data ingestion process, we divided it into stages:

  • Infrastructure Provisioning: setting up the Docker environment to support Kafka, Elasticsearch, and Kibana.
  • Producer Creation: implementing the Kafka Producer, which sends data to the logs topic.
  • Consumer Creation: developing the Kafka Consumer to read and index messages in Elasticsearch.
  • Ingestion Validation: verifying and validating the sent and consumed data.

Infrastructure Configuration with Docker Compose

We used Docker Compose to configure and manage the necessary services. The Compose file below sets up each service required to integrate Apache Kafka, Elasticsearch, and Kibana for the data ingestion process. The Kafka service reads the HOST_IP environment variable for its host-facing advertised listener, so set it before starting the stack.

version: "3"

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://${HOST_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
    container_name: elasticsearch-8.15.1
    environment:
      - node.name=elasticsearch
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.1
    container_name: kibana-8.15.1
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'

You can access the file directly from GitHub.
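Once the stack is started, it can help to confirm that the published ports are reachable before running the Python scripts. The sketch below is one possible check using only the standard library. The function names are illustrative, and "localhost" assumes the containers run on the same machine as the script.

```python
import socket

# Host ports published by the Compose file above; "localhost" is an assumption
# that the containers run on the same machine as this script.
SERVICES = {"kafka": 9092, "elasticsearch": 9200, "kibana": 5601}


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host="localhost", services=SERVICES):
    """Map each service name to whether its port accepts connections."""
    return {name: port_is_open(host, port) for name, port in services.items()}


if __name__ == "__main__":
    for name, is_up in check_services().items():
        print(f"{name}: {'reachable' if is_up else 'not reachable'}")
```

An open port only shows that the container is listening. Elasticsearch and Kibana may still need a few more seconds before they answer requests.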

Data Sending with the Kafka Producer

The producer is responsible for sending messages to the logs topic. Sending messages in batches uses the network more efficiently, and the batch_size and linger_ms settings tune this behavior: batch_size caps the size of a batch in bytes, and linger_ms sets how long the producer waits for more messages before sending a batch. The configuration acks='all' makes the producer wait until all in-sync replicas have acknowledged each message, which favors durability and is important for log data.

import json
import logging
import random
import time

from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("log_producer")

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],  # Specifies the Kafka server to connect
   value_serializer=lambda x: json.dumps(x).encode('utf-8'),  # Serializes data as JSON and encodes it to UTF-8 before sending
   batch_size=16384,     # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending
   linger_ms=10,         # Sets the maximum delay (in milliseconds) before sending the batch
   acks='all'            # Specifies acknowledgment level; 'all' waits for all in-sync replicas to acknowledge
)


def generate_log_message():
   levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
   messages = [
       "User login successful",
       "User login failed",
       "Database connection established",
       "Database connection failed",
       "Service started",
       "Service stopped",
       "Payment processed",
       "Payment failed"
   ]
   log_entry = {
       "level": random.choice(levels),
       "message": random.choice(messages),
       "timestamp": time.time()
   }
   return log_entry

def send_log_batches(topic, num_batches=5, batch_size=10):
   for i in range(num_batches):
       logger.info(f"Sending batch {i + 1}/{num_batches}")
       for _ in range(batch_size):
           log_message = generate_log_message()
           producer.send(topic, value=log_message)
       producer.flush()


if __name__ == "__main__":
   topic = "logs"
   send_log_batches(topic)
   producer.close()
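To check the acknowledgment of each message individually, one option is to keep the futures returned by send() and wait on them. The sketch below assumes a producer that behaves like kafka-python's KafkaProducer. send_and_confirm is a hypothetical helper and is not part of the original project.

```python
def send_and_confirm(producer, topic, messages, timeout=10):
    """Send every message, then wait for each broker acknowledgment.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future whose get() blocks until the broker responds
    and raises an exception if delivery failed.
    """
    futures = [producer.send(topic, value=message) for message in messages]
    confirmed = []
    for future in futures:
        metadata = future.get(timeout=timeout)  # raises on delivery failure
        confirmed.append((metadata.partition, metadata.offset))
    return confirmed
```

With the producer defined above, a call could look like send_and_confirm(producer, "logs", [generate_log_message() for _ in range(10)]). All messages are sent before waiting, so batching still applies.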

When starting the producer, messages are sent in batches to the topic, as shown below:

INFO:kafka.conn:Set configuration …
INFO:log_producer:Sending batch 1/5 
INFO:log_producer:Sending batch 2/5
INFO:log_producer:Sending batch 3/5
INFO:log_producer:Sending batch 4/5

Consumption and Indexing of Data with the Kafka Consumer

The consumer reads batches from the logs topic and indexes them into Elasticsearch. With auto_offset_reset='latest', a consumer group that has no committed offset starts from the most recent messages and skips the older ones. max_poll_records=10 limits each poll to 10 messages, and fetch_max_wait_ms=2000 lets the broker wait up to 2 seconds for data to accumulate before it answers a fetch request.

In its main loop, the consumer polls for log messages, then processes and indexes each batch into Elasticsearch, so ingestion continues for as long as the consumer runs.

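import json

from elasticsearch import Elasticsearch, helpers
from kafka import KafkaConsumer

# Elasticsearch client; assumes the instance from the Compose file is reachable on localhost:9200
es = Elasticsearch("http://localhost:9200")

consumer = KafkaConsumer(
   'logs',                                # Topic name
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='latest',            # Ensures reading from the latest offset if the group has no offset stored
   enable_auto_commit=True,               # Commits offsets automatically at a regular interval
   group_id='log_consumer_group',         # Specifies the consumer group to manage offset tracking
   max_poll_records=10,                   # Maximum number of messages per batch
   fetch_max_wait_ms=2000                 # Maximum time the broker waits for data before answering a fetch (in ms)
)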

def create_bulk_actions(logs):
   for log in logs:
       yield {
           "_index": "logs",
           "_source": {
               'level': log['level'],
               'message': log['message'],
               'timestamp': log['timestamp']
           }
       }

if __name__ == "__main__":
   try:
       print("Starting message processing…")
       while True:

           messages = consumer.poll(timeout_ms=1000)  # Poll for new messages

           # Process the batch of records returned for each partition
           for _, records in messages.items():
               logs = [json.loads(record.value) for record in records]
               bulk_actions = create_bulk_actions(logs)
               response = helpers.bulk(es, bulk_actions)
               print(f"Indexed {response[0]} logs.")
   except Exception as e:
       print(f"Error: {e}")
   finally:
       consumer.close()
       print("Finished")
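With enable_auto_commit=True, offsets are committed on a timer, independently of whether the bulk request succeeded. If losing messages on a failed indexing attempt is a concern, one alternative is to disable auto commit and commit only after indexing. The sketch below assumes a consumer that behaves like kafka-python's KafkaConsumer created with enable_auto_commit=False. index_then_commit and bulk_index are hypothetical names, not part of the original project.

```python
import json


def index_then_commit(consumer, bulk_index, timeout_ms=1000):
    """Poll once, index every returned batch, then commit the offsets.

    `consumer` is expected to behave like a kafka-python KafkaConsumer created
    with enable_auto_commit=False; `bulk_index` is a hypothetical callable that
    takes a list of log dicts and returns how many were indexed.
    """
    batches = consumer.poll(timeout_ms=timeout_ms)
    indexed = 0
    for records in batches.values():
        logs = [json.loads(record.value) for record in records]
        indexed += bulk_index(logs)
    if batches:
        # Commit only after indexing succeeded; if bulk_index raises, the
        # offsets stay uncommitted and the messages are read again after a
        # restart or rebalance.
        consumer.commit()
    return indexed
```

In the consumer above, bulk_index could be lambda logs: helpers.bulk(es, create_bulk_actions(logs))[0]. This gives at-least-once delivery, so the same log may be indexed twice after a failure.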

Visualizing Data in Kibana

With Kibana, we can explore and validate the data ingested from Kafka and indexed in Elasticsearch. By accessing Dev Tools in Kibana, you can view the indexed messages and confirm that the data is as expected. For example, if our Kafka producer sent 5 batches of 10 messages each, we should see a total of 50 records in the index.
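The expected total can also be checked from a script. The sketch below is one way to do it with the standard library and the Elasticsearch _count API. The function names, the default base URL, and the opener parameter (added only to make the function easy to test) are assumptions, not part of the original project.

```python
import json
from urllib.request import urlopen


def count_documents(index, base_url="http://localhost:9200", opener=urlopen):
    """Return the number of documents in `index` using the _count API."""
    with opener(f"{base_url}/{index}/_count", timeout=5) as response:
        return json.load(response)["count"]


def ingestion_complete(actual, num_batches=5, batch_size=10):
    """Compare the indexed count with what the producer was asked to send."""
    return actual == num_batches * batch_size
```

For the run described here, ingestion_complete(count_documents("logs")) should be true once the consumer has indexed all 50 messages. The count can lag briefly until the index is refreshed.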

To verify the data, you can use the following query in the Dev Tools section:

GET /logs/_search
{
  "query": {
    "match_all": {}
  }
}

Response:

Additionally, Kibana provides the ability to create visualizations and dashboards that can help make the analysis more intuitive and interactive. Below, you can see some examples of the dashboards and visualizations we created, which illustrate the data in various formats, enhancing our understanding of the information processed.

Data Ingestion with Kafka Connect

Kafka Connect is a service designed to facilitate integration between data sources and destinations (sinks), such as databases or file systems. It operates with predefined connectors that handle data movement automatically. In our case, Elasticsearch functions as the data sink.

Using Kafka Connect, we can simplify the data ingestion process, eliminating the need to manually implement the data ingestion workflow into Elasticsearch. With the appropriate connector, Kafka Connect allows data sent to a Kafka topic to be directly indexed in Elasticsearch with minimal setup and no additional coding required.

Working with Kafka Connect

To implement Kafka Connect, we’ll add the kafka-connect service to our Docker Compose setup. A key part of this configuration is installing the Elasticsearch connector, which will handle data indexing.

After configuring the service and creating the Kafka Connect container, a configuration file for the Elasticsearch connector will be needed. This file defines essential parameters such as:

  • connection.url: Connection URL for Elasticsearch.
  • topics: The Kafka topic the connector will monitor (in this case, "logs").
  • type.name: Document type in Elasticsearch (typically _doc).
  • value.converter: The converter class used to read message values (here, the JSON converter).
  • value.converter.schemas.enable: Specifies whether the JSON messages carry an embedded schema.
  • schema.ignore and key.ignore: Settings to ignore Kafka schemas and keys during indexing.

Below is the curl command to create the Elasticsearch connector in Kafka Connect, where {{url}} stands for the base URL of the Kafka Connect REST API:

curl --location '{{url}}/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "logs",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "key.ignore": "true"
    }
}'
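Since the rest of the project is in Python, the same request can be built with the standard library. This is a minimal sketch that mirrors the curl command above. build_connector_request and create_connector are hypothetical helper names, and the Kafka Connect URL must be supplied by the caller.

```python
import json
from urllib.request import Request, urlopen

# Same connector definition as in the curl command above
CONNECTOR = {
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "logs",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "key.ignore": "true",
    },
}


def build_connector_request(connect_url, connector=CONNECTOR):
    """Build the POST request that registers the connector."""
    return Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connect_url):
    """Send the request and return the decoded JSON response."""
    with urlopen(build_connector_request(connect_url), timeout=10) as response:
        return json.load(response)
```

A call such as create_connector("http://localhost:8083") assumes the kafka-connect service publishes its REST API on port 8083, the Kafka Connect default. Adjust the URL to match your Compose setup.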

With this configuration, Kafka Connect will automatically begin ingesting data sent to the "logs" topic and indexing it in Elasticsearch. This approach allows for fully automated data ingestion and indexing without requiring additional coding, thereby simplifying the entire integration process.

Conclusion

Integrating Kafka and Elasticsearch creates a powerful pipeline for real-time data ingestion and analysis. This guide provides a foundational approach for building a robust data ingestion architecture, with seamless visualization and analysis in Kibana, ready to adapt to more complex requirements in the future.

Furthermore, using Kafka Connect makes the integration between Kafka and Elasticsearch even more streamlined, eliminating the need for additional code to process and index data. Kafka Connect enables data sent to a specific topic to be automatically indexed in Elasticsearch with minimal configuration.




