Unstructured data processing with NVIDIA NeMo Retriever, Unstructured, and Elasticsearch

Learn how to build a scalable data pipeline for unstructured documents using NeMo Retriever, Unstructured Platform, and Elasticsearch for RAG applications.

In this blog, we will discuss how to implement a scalable data processing pipeline using NVIDIA NeMo Retriever extraction models, the Unstructured Platform, and Elasticsearch. This pipeline transforms unstructured data from a data source into structured, searchable content ready for downstream AI applications such as Retrieval Augmented Generation (RAG). RAG is an AI technique in which Large Language Models (LLMs) are provided with external knowledge to generate responses to user queries. Grounding LLM responses in specific context makes answers more accurate and relevant.

Before we get started, let’s take a look at the key components enabling this pipeline and what each brings to the table.

Pipeline components

NeMo Retriever extraction is a set of microservices for transforming unstructured documents into structured content and metadata. It handles document parsing, visual structure identification, and OCR processing at scale. The NVIDIA AI Blueprint for RAG provides a starting point for using the NeMo Retriever microservices in a high-performance extraction pipeline.

Unstructured is an ETL+ platform for orchestrating the entirety of unstructured data processing: ingesting unstructured data from multiple data sources, converting raw files into structured data through a configurable workflow engine, enriching the data with additional transformations, and finally uploading the results into vector stores, databases, and search engines. It provides a visual UI, APIs, and scalable backend infrastructure to orchestrate document parsing, enrichment, and embedding in a single workflow.

Elasticsearch is an industry-leading search and analytics engine that now includes native vector search capabilities. It can function as both a traditional text database and a vector database, enabling semantic search at scale with features like k-NN similarity search.
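To give a sense of what that looks like in practice, here is a minimal k-NN query sketch using the Python Elasticsearch client; the host, index name, vector field, and query vector are placeholders, not values from this pipeline.

from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200", api_key="...")

# Retrieve the 3 documents whose "embeddings" vector is closest to the query vector.
results = es.search(
    index="demo-index",
    knn={
        "field": "embeddings",
        "query_vector": [0.1] * 3072,  # placeholder; use your query embedding here
        "k": 3,
        "num_candidates": 50,
    },
)
for hit in results["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["text"])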

Now that we’ve introduced the core components, let’s take a look at how they work together in a typical workflow before diving into the implementation.

RAG with NeMo Retriever - Unstructured - Elasticsearch

This blog only covers the key highlights; you can find the full notebook here.

The implementation can be divided into 3 parts:

  • Setting up the source and destination connectors
  • Setting up the workflow with Unstructured API
  • RAG over the processed data

An Unstructured workflow is represented as a DAG. Its entry and exit nodes, called connectors, control where the data is ingested from and where the processed results are uploaded to, and they are required in any workflow. A source connector configures ingestion of raw data from a data source, and a destination connector configures uploading of the processed data into a vector store, search engine, or database.

For this blog, we store research papers in Amazon S3, and we want the processed data to be delivered into Elasticsearch for downstream use. This means that before we can build a data processing workflow, we need to create a source connector for Amazon S3 and a destination connector for Elasticsearch using the Unstructured API.

Step 1: Setting up the S3 source connector

When creating a source connector, you need to give it a unique name, specify its type (e.g., S3 or Google Drive), and provide the configuration, which typically contains the location of the source you're connecting to (e.g., an S3 bucket URI or a Google Drive folder) and authentication details.
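The snippets in this post assume an initialized Unstructured API client together with the request and connector models imported from the SDK. Below is a minimal setup sketch; the UNSTRUCTURED_API_KEY environment variable name and the pretty_print_model helper are conventions taken from the accompanying notebook, and exact import paths may vary slightly between SDK versions.

import os

from unstructured_client import UnstructuredClient
from unstructured_client.models.operations import (
    CreateSourceRequest,
    CreateDestinationRequest,
)
from unstructured_client.models.shared import (
    CreateSourceConnector,
    SourceConnectorType,
    S3SourceConnectorConfigInput,
    CreateDestinationConnector,
    DestinationConnectorType,
    ElasticsearchConnectorConfigInput,
)

# Authenticate against the Unstructured API.
unstructured_client = UnstructuredClient(
    api_key_auth=os.environ["UNSTRUCTURED_API_KEY"]
)

# Small display helper used throughout this post (assumes the SDK returns
# Pydantic models).
def pretty_print_model(model):
    print(model.model_dump_json(indent=2))

With the client ready, we can create the S3 source connector: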

source_connector_response = unstructured_client.sources.create_source(
    request=CreateSourceRequest(
        create_source_connector=CreateSourceConnector(
            name="demo_source1",
            type=SourceConnectorType.S3,
            config=S3SourceConnectorConfigInput(
                key=os.environ['S3_AWS_KEY'],
                secret=os.environ['S3_AWS_SECRET'],
                remote_url=os.environ["S3_REMOTE_URL"],
                recursive=False  # set to True to also ingest objects from nested prefixes
            )
        )
    )
)

pretty_print_model(source_connector_response.source_connector_information)

Step 2: Setting up the Elasticsearch destination connector

Next, let’s set up the Elasticsearch destination connector. The Elasticsearch index that you use must have a schema that is compatible with the schema of the documents that Unstructured produces for you—you can find all the details in the documentation.
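If you manage the index yourself, the sketch below shows a simplified mapping; the field names ("text" for chunk content, "embeddings" for the vectors) mirror what we query later, and 3072 dimensions matches text-embedding-3-large. This is an illustrative subset, not the full schema from the documentation.

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=[os.environ['es_host']], api_key=os.environ['es_api_key'])

# Simplified mapping sketch: "text" holds the chunk content, "embeddings" holds the
# dense vector (3072 dims for text-embedding-3-large, cosine similarity).
es.indices.create(
    index="demo-index",
    mappings={
        "properties": {
            "element_id": {"type": "keyword"},
            "text": {"type": "text"},
            "embeddings": {"type": "dense_vector", "dims": 3072, "similarity": "cosine"},
            "metadata": {"type": "object"},
        }
    },
)

With the index in place, we can create the destination connector: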

destination_connector_response = unstructured_client.destinations.create_destination(
    request=CreateDestinationRequest(
        create_destination_connector=CreateDestinationConnector(
            name="demo_dest-3",
            type=DestinationConnectorType.ELASTICSEARCH,
            config=ElasticsearchConnectorConfigInput(
                hosts=[os.environ['es_host']],
                es_api_key=os.environ['es_api_key'],
                index_name="demo-index"
            )
        )
    )
)

Step 3: Creating a workflow with Unstructured

Once you have the source and destination connectors, you can create a new data processing workflow. We’ll build the workflow DAG with the following nodes:

  • NeMo Retriever for document partitioning
  • Unstructured’s Image Summarizer, Table Summarizer, and Named Entity Recognition nodes for content enrichment
  • Chunker and Embedder nodes for making the content ready for similarity search

from unstructured_client.models.shared import (
    WorkflowNode,
    WorkflowNodeType,
    WorkflowType,
    Schedule
)

# Partition the content by using NV-Ingest (NeMo Retriever extraction)
partition_node = WorkflowNode(
    name="Ingest",
    subtype="nvingest",
    type="partition",
    # `userdata` is Google Colab's secrets helper; substitute your own way of
    # supplying the NV-Ingest host address if you run this outside Colab.
    settings={"nvingest_host": userdata.get('NV-Ingest-host-address')},
)


# Summarize each detected image.
image_summarizer_node = WorkflowNode(
    name="Image summarizer",
    subtype="openai_image_description",
    type=WorkflowNodeType.PROMPTER,
    settings={}
)

# Summarize each detected table.
table_summarizer_node = WorkflowNode(
    name="Table summarizer",
    subtype="anthropic_table_description",
    type=WorkflowNodeType.PROMPTER,
    settings={}
)

# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(
    name="Named entity recognizer",
    subtype="openai_ner",
    type=WorkflowNodeType.PROMPTER,
    settings={
        "prompt_interface_overrides": None
    }
)

# Chunk the partitioned content.
chunk_node = WorkflowNode(
    name="Chunker",
    subtype="chunk_by_title",
    type=WorkflowNodeType.CHUNK,
    settings={
        "unstructured_api_url": None,
        "unstructured_api_key": None,
        "multipage_sections": False,
        "combine_text_under_n_chars": 0,
        "include_orig_elements": True,
        "max_characters": 1537,
        "overlap": 160,
        "overlap_all": False,
        "contextual_chunking_strategy": None
    }
)

# Generate vector embeddings.
embed_node = WorkflowNode(
    name="Embedder",
    subtype="azure_openai",
    type=WorkflowNodeType.EMBED,
    settings={
        "model_name": "text-embedding-3-large"
    }
)


response = unstructured_client.workflows.create_workflow(
    request={
        "create_workflow": {
            "name": f"s3-to-es-NV-Ingest-custom-workflow",
            "source_id": source_connector_response.source_connector_information.id,
            "destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e",
            "workflow_type": WorkflowType.CUSTOM,
            "workflow_nodes": [
                partition_node,
                image_summarizer_node,
                table_summarizer_node,
                named_entity_recognizer_node,
                chunk_node,
                embed_node
            ],
        }
    }
)

workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)

job = unstructured_client.workflows.run_workflow(
    request={
        "workflow_id": workflow_id,
    }
)

pretty_print_model(job.job_information)
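The job runs asynchronously, so you may want to poll its status before moving on. Here is a minimal polling sketch; the jobs endpoint and the status values are taken from the Unstructured Workflow API, but exact method and field names may differ between SDK versions, so check the API reference.

import time

from unstructured_client.models.operations import GetJobRequest

# Poll the job until it reaches a terminal state (status values are an assumption;
# adjust to the enum exposed by your SDK version).
while True:
    job_response = unstructured_client.jobs.get_job(
        request=GetJobRequest(job_id=job.job_information.id)
    )
    status = job_response.job_information.status
    print(f"Job status: {status}")
    if status in ("COMPLETED", "FAILED"):
        break
    time.sleep(30)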

Once your job for this workflow completes, the data is uploaded into Elasticsearch and we can proceed with building a basic RAG application.
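Before wiring up retrieval, it is worth a quick sanity check that documents actually landed in the index. This reuses the es client from the mapping sketch above:

# How many processed elements made it into the index?
print(es.count(index="demo-index")["count"])

# Peek at one document to confirm the "text" and "embeddings" fields are populated.
sample = es.search(index="demo-index", size=1)["hits"]["hits"]
print(sample[0]["_source"].keys() if sample else "index is empty")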

Step 4: RAG setup

Let's start with a simple retriever that connects to the Elasticsearch index, takes in the user query, embeds it with the same model that was used to embed the original data, and uses cosine similarity to retrieve the top 3 documents.

from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import os

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    openai_api_key=os.environ['OPENAI_API_KEY']
)

vector_store = ElasticsearchStore(
    es_url=os.environ['es_host'],
    index_name="demo-index",
    embedding=embeddings,
    es_api_key=os.environ['es_api_key'],
    query_field="text",
    vector_query_field="embeddings",
    distance_strategy="COSINE"
)

retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}  # Number of results to return
)

Then let's set up a simple RAG flow: receive a user query, fetch similar documents from Elasticsearch, and use those documents as context to answer the user's question.

from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def generate_answer(question: str, documents: str):

    prompt = """
    You are an assistant that can answer user questions given provided context.
    Your answer should be thorough and technical.
    If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'
    """

    augmented_prompt = (
        f"{prompt}"
        f"User question: {question}\n\n"
        f"{documents}"
    )
    response = client.chat.completions.create(
        messages=[
            {'role': 'system', 'content': "You answer users' questions."},
            {'role': 'user', 'content': augmented_prompt},
        ],
        model="gpt-4o-2024-11-20",
        temperature=0,
    )

    return response.choices[0].message.content


def format_docs(docs):
    useful_content = [doc.page_content for doc in docs]

    return "\nRetrieved documents:\n" + "".join(
        [
            f"\n\n===== Document {i} =====\n" + doc
            for i, doc in enumerate(useful_content)
        ]
    )


def rag(query):
    docs = retriever.invoke(query)
    documents = format_docs(docs)
    answer = generate_answer(query, documents)
    return documents, answer

Putting everything together, we get:

query = "How did the response lengths change with training?"

docs, answer = rag(query)

print(answer)

And the response:

Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.

### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.

2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.

3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.

### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.

In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.

Elasticsearch provides various strategies to enhance search, including hybrid search, which combines approximate semantic (vector) search with keyword-based search.

This approach can improve the relevance of the top documents used as context in the RAG architecture. To enable it, you need to modify the vector_store initialization as follows:

from langchain_elasticsearch import DenseVectorStrategy

vector_store = ElasticsearchStore(
    es_url=os.environ['es_host'],
    index_name="demo-index",
    embedding=embeddings,
    es_api_key=os.environ['es_api_key'],
    query_field="text",
    vector_query_field="embeddings",
    strategy=DenseVectorStrategy(hybrid=True)  # <-- the change
)

Conclusion

Good RAG starts with well-prepared data, and Unstructured simplifies this critical first step. By enabling partitioning with NeMo Retriever, metadata enrichment of unstructured data, and efficient ingestion into Elasticsearch, it ensures that your RAG pipeline is built on a solid foundation, unlocking its full potential for all your downstream tasks.

Ready to try this out on your own? Start a free trial.

Elasticsearch has integrations for tools from LangChain, Cohere and more. Join our advanced semantic search webinar to build your next GenAI app!
