Nima Rezainia

Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector

Confluent Cloud users can now use the updated Elasticsearch Sink Connector with Elastic Agent and Elastic Integrations for a fully managed and highly scalable data ingest architecture.

Elastic and Confluent are key technology partners, and we're pleased to announce new investments in that partnership. Confluent's Kafka is a core component of many enterprise ingest architectures, helping customers guarantee delivery of critical observability and security data to their Elasticsearch clusters. Together, we've been working on key improvements to how our products fit together. With Elastic Agent's new Kafka output and Confluent's newly improved Elasticsearch Sink Connector, it has never been easier to collect data at the edge, stream it through Kafka, and ingest it into an Elasticsearch cluster.

In this blog, we examine a simple way to integrate Elastic Agent with Confluent Cloud's Kafka offering to reduce the operational burden of ingesting business-critical data.

Benefits of Elastic Agent and Confluent Cloud

When combined, Elastic Agent and Confluent Cloud's updated Elasticsearch Sink Connector provide a wide range of advantages for organizations of all sizes, offering the flexibility to handle any type of data ingest workload efficiently and resiliently.

Fully Managed

When combined, Elastic Cloud Serverless and Confluent Cloud provide users with a fully managed service. This makes it effortless to deploy and ingest nearly unlimited data volumes without having to worry about nodes, clusters, or scaling.

Full Elastic Integrations Support

Sending data through Kafka is fully supported with any of the 300+ Elastic Integrations. In this blog post, we outline how to set up the connection between the two platforms. This ensures you can benefit from our investments in built-in alerts, SLOs, AI Assistants, and more.

Decoupled Architecture

Kafka acts as a resilient buffer between data sources (such as Elastic Agent and Logstash) and Elasticsearch, decoupling data producers from consumers. This can significantly reduce total cost of ownership by enabling you to size your Elasticsearch cluster based on typical data ingest volume, not maximum ingest volume. It also ensures system resilience during spikes in data volume.

Ultimate control over your data

With our new Output per Integration capability, customers can now send different data to different destinations using the same agent. Customers can easily send security logs directly to Confluent Cloud/Kafka, which can provide delivery guarantees, while sending less critical application logs and system metrics directly to Elasticsearch.

Deploying the reference architecture

In the following sections, we will walk you through one of the ways Confluent Kafka can be integrated with Elastic Agent and Elasticsearch using Confluent Cloud's Elasticsearch Sink Connector. As with any streaming and data collection technology, there are many ways a pipeline can be configured depending on the particular use case. This blog post will focus on a simple architecture that can be used as a starting point for more complex deployments.

Some of the highlights of this architecture are:

  • Dynamic Kafka topic selection at Elastic Agents
  • Elasticsearch Sink Connectors for fully managed transfer from Confluent Kafka to Elasticsearch
  • Processing data leveraging Elastic's 300+ Integrations

Prerequisites

Before getting started, ensure you have a Kafka cluster deployed in Confluent Cloud, an Elasticsearch cluster or project deployed in Elastic Cloud, and an installed and enrolled Elastic Agent.

Configure Confluent Cloud Kafka Cluster for Elastic Agent

Navigate to the Kafka cluster in Confluent Cloud and select Cluster Settings. Locate and note the Bootstrap Server address; we will need this value later when we create the Kafka output in Fleet.

Navigate to Topics in the left-hand navigation menu and create two topics:

  1. A topic named logs
  2. A topic named metrics

Next, navigate to API Keys in the left-hand navigation menu:

  1. Click + Add API Key
  2. Select the Service Account API key type
  3. Provide a meaningful name for this API Key
  4. Grant the key write permission to the metrics and logs topics
  5. Create the key

Note the provided Key and Secret; we will need them later when we configure the Kafka output in Fleet.

Configure Elasticsearch and Elastic Agent

In this section, we will configure the Elastic Agent to send data to Confluent Cloud's Kafka cluster and we will configure Elasticsearch so it can receive data from the Confluent Cloud Elasticsearch Sink Connector.

Configure Elastic Agent to send data to Confluent Cloud

Elastic Fleet simplifies sending data to Kafka and Confluent Cloud. With Elastic Agent, a Kafka "output" can be easily attached to all data coming from an agent or it can be applied only to data coming from a specific data source.

Find Fleet in the left-hand navigation and click the Settings tab. On the Settings tab, find the Outputs section and click Add Output.

Perform the following steps to configure the new Kafka output:

  1. Provide a Name for the output
  2. Set the Type to Kafka
  3. Populate the Hosts field with the Bootstrap Server address we noted earlier
  4. Under Authentication, populate the Username with the API Key and the Password with the Secret we noted earlier
  5. Under Topics, select Dynamic Topic and set Topic from field to data_stream.type
  6. Click Save and apply settings

Next, we will navigate to the Agent Policies tab in Fleet and click to edit the Agent Policy that we want to attach the Kafka output to. With the Agent Policy open, click the Settings tab and change Output for integrations and Output for agent monitoring to the Kafka output we just created.

Selecting an Output per Elastic Integration: To set the Kafka output to be used for specific data sources, see the integration-level outputs documentation.

A note about Topic Selection: The data_stream.type field is a reserved field that Elastic Agent automatically sets to logs if the data being sent is a log and to metrics if it is a metric. Enabling Dynamic Topic selection using data_stream.type therefore causes Elastic Agent to automatically route metrics to the metrics topic and logs to the logs topic. For more information on topic selection, see the Kafka output's Topics settings documentation.
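
For illustration, here is a simplified, hypothetical excerpt of an event as Elastic Agent publishes it to Kafka (real events carry many more fields). The data_stream.type value of metrics is what routes this document to the metrics topic under Dynamic Topic selection:

{
  "@timestamp": "2024-11-04T12:00:00.000Z",
  "data_stream": {
    "type": "metrics",
    "dataset": "system.network",
    "namespace": "default"
  }
}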

Configuring a publishing endpoint in Elasticsearch

Next, we will set up two publishing endpoints (data streams) for the Confluent Cloud Sink Connector to use when publishing documents to Elasticsearch:

  1. We will create a data stream logs-kafka.reroute-default for handling logs
  2. We will create a data stream metrics-kafka.reroute-default for handling metrics

If we were to leave the data in those data streams as-is, it would be searchable but unparsed and missing vital enrichment. So we will also create two index templates and two ingest pipelines to ensure the data is processed by our Elastic Integrations.

Creating the Elasticsearch Index Templates and Ingest Pipelines

The following steps use Dev Tools in Kibana, but all of these steps can be completed via the REST API or using the relevant user interfaces in Stack Management.

First, we will create the Index Template and Ingest Pipeline for handling logs:

PUT _index_template/logs-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-kafka.reroute"
    }
  },
  "index_patterns": [
    "logs-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/logs-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{{data_stream.dataset}}"
        ],
        "namespace": [
          "{{data_stream.namespace}}"
        ]
      }
    }
  ]
}
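
As an optional sanity check before any data arrives, the simulate index API can preview how Elasticsearch would configure the backing index for this data stream; the response should show index.default_pipeline resolved to logs-kafka.reroute (the same check applies to the metrics template below):

POST _index_template/_simulate_index/logs-kafka.reroute-default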

Next, we will create the Index Template and Ingest Pipeline for handling metrics:

PUT _index_template/metrics-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "metrics-kafka.reroute"
    }
  },
  "index_patterns": [
    "metrics-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/metrics-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{{data_stream.dataset}}"
        ],
        "namespace": [
          "{{data_stream.namespace}}"
        ]
      }
    }
  ]
}

A note about rerouting: As a practical example of how this works, a document containing a Linux network metric would first land in metrics-kafka.reroute-default. The ingest pipeline would inspect the document and find data_stream.dataset set to system.network and data_stream.namespace set to default. It would use these values to reroute the document from metrics-kafka.reroute-default to metrics-system.network-default, where it would be processed by the System integration.
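
Once data is flowing, a quick way to confirm that rerouting is working (assuming your agent is collecting system network metrics, as in this example) is to search the target data stream for its most recent document:

GET metrics-system.network-default/_search
{
  "size": 1,
  "sort": [
    { "@timestamp": "desc" }
  ]
}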

Configure the Confluent Cloud Elasticsearch Sink Connector

Now it's time to configure the Confluent Cloud Elasticsearch Sink Connector. We will perform the following steps twice and create two separate connectors, one connector for logs and one connector for metrics. Where the required settings differ, we will highlight the correct values.

Navigate to your Kafka cluster in Confluent Cloud and select Connectors from the left-hand navigation menu. On the Connectors page, select Elasticsearch Service Sink from the catalog of available connectors.

Confluent Cloud presents a simplified workflow for the user to configure a connector. Here we will walk through each step of the process:

Step 1: Topic Selection

First, we will select the topic that the connector will consume data from based on which connector we are deploying:

  • When deploying the Elasticsearch Sink Connector for logs, select the logs topic.
  • When deploying the Elasticsearch Sink Connector for metrics, select the metrics topic.

Step 2: Kafka Credentials

Choose KAFKA_API_KEY as the cluster authentication mode. Provide the API Key and Secret we noted earlier when configuring the Confluent Cloud cluster.

Step 3: Authentication

Provide the endpoint address of your Elasticsearch cluster as the Connection URI. The Connection user and Connection password are the credentials for the Elasticsearch account that the Elasticsearch Sink Connector will use to write data to Elasticsearch.
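
If you would rather not reuse an existing account, one option is to create a dedicated user for the connector. The sketch below uses the Elasticsearch security APIs; the role and user names are illustrative and the exact privileges may vary by environment, but at a minimum the role should be able to create documents in the reroute data streams and in the integration data streams that the reroute processor targets:

PUT _security/role/confluent_sink_writer
{
  "cluster": ["monitor"],
  "indices": [
    {
      "names": ["logs-*", "metrics-*"],
      "privileges": ["auto_configure", "create_doc"]
    }
  ]
}
PUT _security/user/confluent_sink
{
  "password": "<a-strong-password>",
  "roles": ["confluent_sink_writer"]
}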

Step 4: Configuration

In this step we will keep the Input Kafka record value format set to JSON. Next, expand Advanced Configuration.

  1. We will set Data Stream Dataset to kafka.reroute
  2. We will set Data Stream Type based on the connector we are deploying:
    • When deploying the Elasticsearch Sink Connector for logs, we will set Data Stream Type to logs
    • When deploying the Elasticsearch Sink Connector for metrics, we will set Data Stream Type to metrics
  3. The correct values for other settings will depend on the specific environment.

Step 5: Sizing

In this step, notice that Confluent Cloud provides a recommended minimum number of tasks for the deployment. Following that recommendation is a good starting point for most deployments.

Step 6: Review and Launch

Review the Connector configuration and Connector pricing sections and, if everything looks good, click Continue and launch the connector! The connector may briefly report as provisioning, but it will soon start consuming data from the Kafka topic and writing it to the Elasticsearch cluster.

You can now navigate to Discover in Kibana and find your logs flowing into Elasticsearch! Also check out the real-time metrics that Confluent Cloud provides for your new Elasticsearch Sink Connector deployments.
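
If you prefer to verify from Dev Tools instead, one simple check is to list the log data streams and confirm that the integration data streams you expect have been created by the reroute pipeline:

GET _data_stream/logs-*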

If you have only deployed the first logs sink connector, you can now repeat the steps above to deploy the second metrics sink connector.

Enjoy your fully managed data ingest architecture

If you followed the steps above, congratulations. You have successfully:

  1. Configured Elastic Agent to send logs and metrics to dedicated topics in Kafka
  2. Created publishing endpoints (data streams) in Elasticsearch dedicated to handling data from the Elasticsearch Sink Connector
  3. Configured managed Elasticsearch Sink connectors to consume data from multiple topics and publish that data to Elasticsearch

Next you should enable additional integrations, deploy more Elastic Agents, explore your data in Kibana, and enjoy the benefits of a fully managed data ingest architecture with Elastic Serverless and Confluent Cloud!
