How to ingest data to Elasticsearch through Apache Airflow

Learn how to ingest data to Elasticsearch through Apache Airflow.

Apache Airflow

Apache Airflow is a platform designed to create, schedule, and monitor workflows. It is used to orchestrate ETL processes, data pipelines, and other complex workflows, offering flexibility and scalability. Its visual interface and real-time monitoring capabilities make pipeline management more accessible and efficient, allowing you to track the progress and results of your executions. Below are its four main pillars:

  • Dynamic: Pipelines are defined in Python, allowing for dynamic and flexible workflow generation.
  • Extensible: Airflow can be integrated with a variety of environments, custom operators can be created, and specific code can be executed as needed.
  • Elegant: Pipelines are written in a clean and explicit manner.
  • Scalable: Its modular architecture uses a message queue to orchestrate an arbitrary number of workers.

In practice, Airflow can be used in scenarios such as:

  • Data import: Orchestrate the daily ingestion of data into a database such as Elasticsearch.
  • Log monitoring: Manage the collection and processing of log files, which are then analyzed in Elasticsearch to identify errors or anomalies.
  • Integration of multiple data sources: Combine information from different systems (APIs, databases, files) into a single layer in Elasticsearch, simplifying search and reporting.

DAG: Directed Acyclic Graphs

In Airflow, workflows are represented by DAGs (Directed Acyclic Graphs). A DAG is a structure that defines the sequence in which tasks will be executed. The main characteristics of DAGs are:

  • Composition by independent tasks: Each task represents a unit of work and is designed to be executed independently.
  • Sequencing: The sequence in which tasks are executed is explicitly defined in the DAG.
  • Reusability: DAGs are designed to be executed repeatedly, facilitating process automation.

Main components of Airflow

The Airflow ecosystem is composed of several components that work together to orchestrate tasks:

  • Scheduler: Responsible for scheduling DAGs and sending tasks for execution by workers.
  • Executor: Manages the execution of tasks, delegating them to workers.
  • Web Server: Provides a graphical interface for interacting with DAGs and tasks.
  • Dags Folder: Folder where we store DAGs written in Python.
  • Metadata: Database that serves as a repository for the tool, used by the scheduler and executor to store execution status.

Apache Airflow and Elasticsearch

We will demonstrate the use of Apache Airflow and Elasticsearch to orchestrate tasks and index results in Elasticsearch. The goal of this demonstration is to create a pipeline of tasks to update records in an Elasticsearch index. This index contains a database of movies, where users can rate and assign ratings. Imagining a scenario with hundreds of daily ratings, it is necessary to keep the ratings record updated. To do this, a DAG will be developed that will be executed daily, responsible for retrieving the new consolidated ratings and updating the records in the index.

In the DAG flow, we will have a task to fetch the ratings, followed by a task to validate the results. If the data does not exist, the DAG will be directed to a failure task. Otherwise, the data will be indexed in Elasticsearch. The goal is to update the rating field of movies in an index by retrieving the ratings through a method with the mechanism responsible for calculating the scores.

Using Apache Airflow and Elasticsearch with Docker

To create a containerized environment, we will use Apache Airflow with Docker. Follow the instructions in the "Running Airflow in Docker" guide to set up Airflow practically.

As for Elasticsearch, I will use a cluster on Elastic Cloud, but if you prefer, you can also configure Elasticsearch with Docker. An index has already been created containing a movie catalog, with the movie data indexed. The 'rating' field of these movies will be updated.

Creating the DAG

After installing via Docker, a folder structure will be created, including the dags folder, where we must place our DAG files for Airflow to recognize them.

Before that, we need to ensure the necessary dependencies are installed. Here are the dependencies for this project:

pip install apache-airflow apache-airflow-providers-elasticsearch

We will create the file update_ratings_movies.py and start coding the tasks.

Now, let's import the necessary libraries:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

We will use the ElasticsearchPythonHook, a component that simplifies the integration between Airflow and an Elasticsearch cluster by abstracting the connection and the use of external APIs.

Next, we define the DAG, specifying its main arguments:

  • dag_id: the name of the DAG.
  • start_date: when the DAG will start.
  • schedule: defines the periodicity (daily in our case).
  • doc_md: documentation that will be imported and displayed in the Airflow interface.

Defining the Tasks

Now, let's define the DAG's tasks. The first task will be responsible for retrieving the movie rating data. We will use the PythonOperator with the task_id set to 'get_movie_ratings'. The python_callable parameter will call the function responsible for fetching the ratings.

get_ratings_operator = PythonOperator(
   task_id='get_movie_ratings',
   python_callable=get_movie_ratings_task
)

Next, we need to validate whether the results are valid. For this, we will use a conditional with a BranchPythonOperator. The task_id will be 'validate_result', and the python_callable will call the validation function. The op_args parameter will be used to pass the result of the previous task, 'get_movie_ratings', to the validation function.

validate_result = BranchPythonOperator(
   task_id='validate_result',
   python_callable=validate_result,
   op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

If the validation is successful, we will take the data from the 'get_movie_ratings' task and index it into Elasticsearch. To achieve this, we will create a new task, 'index_movie_ratings', which will use the PythonOperator. The op_args parameter will pass the results of the 'get_movie_ratings' task to the indexing function.

index_ratings_operator = PythonOperator(
   task_id='index_movie_ratings',
   python_callable=index_movie_ratings_task,
   op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

If the validation indicates a failure, the DAG will proceed to a failure notification task. In this example, we simply print a message, but in a real-world scenario, we could configure alerts to notify about the failures.

failed_get_rating_operator = PythonOperator(
   task_id='failed_get_rating_operator',
   python_callable=lambda: print('Ratings were False, skipping indexing.')
)

Finally, we define the task dependencies, ensuring they execute in the correct order:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

Now follows the complete code of our DAG:

"""
DAG update Rating Movies
"""
import ast
import random

from airflow import DAG
from datetime import datetime

from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook


def index_movie_ratings_task(movies):
   es_hook = ElasticsearchPythonHook(hosts=None,
                                     es_conn_args={
                                         "cloud_id": "cloud_id"
                                         "api_key": "api-key"
                                     })
   es_client = es_hook.get_conn
   actions = []
   for movie in ast.literal_eval(movies):
       actions.append(
           {
               "update": {
                   "_id": movie["id"],
                   "_index": "movies"
               }
           }
       )
       actions.append(
           {
               "doc": {
                   "rating": movie["rating"]
               },
               "doc_as_upsert": True
           }
       )
   result = es_client.bulk(operations=actions)
   print(f"Ingestion completed.")
   print(result)
   return True


def get_movie_ratings_task():
   movies = [
       {"id": i, "rating": round(random.uniform(1, 10), 1)}
       for i in range(1, 100)
   ]
   return movies

def validate_result(result):
   if not result:
       return 'failed_get_rating_operator'
   else:
       return 'index_movie_ratings'


with DAG(
       dag_id="update_ratings_movies_2024",
       start_date=datetime(2024, 12, 29),
       schedule="@daily",
       doc_md=__doc__,
):
   get_ratings_operator = PythonOperator(
       task_id='get_movie_ratings',
       python_callable=get_movie_ratings_task
   )

   validate_result = BranchPythonOperator(
       task_id='validate_result',
       python_callable=validate_result,
       op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],
       provide_context=True
   )

   index_ratings_operator = PythonOperator(
       task_id='index_movie_ratings',
       python_callable=index_movie_ratings_task,
       op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
   )

   failed_get_rating_operator = PythonOperator(
       task_id='failed_get_rating_operator',
       python_callable=lambda: print('Ratings were False, skipping indexing.')
   )

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

Visualizing the DAG Execution

In the Apache Airflow interface, we can visualize the execution of the DAGs. Simply go to the "DAGs" tab and locate the DAG you created.

Below, we can visualize the executions of the tasks and their respective statuses. By selecting an execution for a specific date, we can access the logs of each task. Note that in the index_movie_ratings task, we can see the indexing results in the index, and that it was successfully completed.

In the other tabs, it is possible to access additional information about the tasks and the DAG, assisting in the analysis and resolution of potential issues.

Conclusion

In this article, we demonstrated how to integrate Apache Airflow with Elasticsearch to create a data ingestion solution. We showed how to configure the DAG, define the tasks responsible for retrieving, validating, and indexing movie data, as well as monitor and visualize the execution of these tasks in the Airflow interface.

This approach can be easily adapted to different types of data and workflows, making Airflow a useful tool for orchestrating data pipelines in various scenarios.

References:

Apache AirFlow

https://airflow.apache.org/

Install Apache Airflow with Docker

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python Operator

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

Ready to try this out on your own? Start a free trial.

Elasticsearch has integrations for tools from LangChain, Cohere and more. Join our advanced semantic search webinar to build your next GenAI app!

Related content

Navigating graphs for Retrieval-Augmented Generation using Elasticsearch

January 20, 2025

Navigating graphs for Retrieval-Augmented Generation using Elasticsearch

Discover how to use Knowledge Graphs to enhance RAG results while storing the graph efficiently in Elasticsearch. This guide explores a detailed strategy for dynamically generating knowledge subgraphs tailored to a user’s query.

Jira connector tutorial part II: 6 optimization tips

January 16, 2025

Jira connector tutorial part II: 6 optimization tips

After connecting Jira to Elasticsearch, we'll now review best practices to escalate this deployment.

Jira connector tutorial part I

January 15, 2025

Jira connector tutorial part I

Indexing our Jira content into Elasticsearch to create a unified data source and do search with Document Level Security.

High Quality RAG with Aryn DocPrep, DocParse and Elasticsearch vector database

January 21, 2025

High Quality RAG with Aryn DocPrep, DocParse and Elasticsearch vector database

Learn how to achieve high-quality RAG with effective data preparation using Aryn.ai DocParse, DocPrep, and Elasticsearch vector database.

How to create your own Spotify Wrapped in Kibana

January 14, 2025

How to create your own Spotify Wrapped in Kibana

Based on the downloadable Spotify personal history, we'll generate a custom version of "Wrapped" with the top artists, songs, and trends over the year

Ready to build state of the art search experiences?

Sufficiently advanced search isn’t achieved with the efforts of one. Elasticsearch is powered by data scientists, ML ops, engineers, and many more who are just as passionate about search as your are. Let’s connect and work together to build the magical search experience that will get you the results you want.

Try it yourself