BigQuery is a Google Cloud platform that lets you centralize data from different sources and services into one repository. It also enables you to run data analysis and use GenAI and ML tools. Below are the ways to bring data into BigQuery:

Indexing data from all of these sources into Elasticsearch allows you to centralize your data sources for a better observability experience.
In this article, you'll learn how to index data from BigQuery into Elasticsearch using Python, enabling you to unify data from different systems for search and analysis.
You can use the example from this article in this Google Colab notebook.
Steps
Prepare BigQuery
To use BigQuery, you need to access Google Cloud Console and create a project. Once done, you'll be redirected to this view:

BigQuery allows you to transfer data from Google Drive and Google Cloud Storage, and to upload local files. To upload data to BigQuery, you must first create a dataset. Create one and name it "server_logs" so we can upload some files.
For this article, we'll upload a local file with server log data. Check BigQuery’s official documentation to learn how to upload local files.
Dataset
The file we will upload to BigQuery contains server log data, HTTP responses and their descriptions, in NDJSON format. The NDJSON file includes these fields: ip_address, _timestamp, http_method, endpoint, status_code, response_time, and status_code_description.
BigQuery will extract data from this file. Then, we'll consolidate it with Python and index it to Elasticsearch.
Create a file named logs.ndjson and populate it with the following:
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:03Z", "http_method": "GET", "endpoint": "/about", "status_code": "404", "response_time": 89, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:07Z", "http_method": "GET", "endpoint": "/contact", "status_code": "404", "response_time": 76, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 123, "status_code_description": "OK"}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:04Z", "http_method": "GET", "endpoint": "/products", "status_code": "200", "response_time": 156, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:05Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 101, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:08Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 98, "status_code_description": "OK"}
{"ip_address": "192.168.1.6", "_timestamp": "2024-12-03T12:00:10Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 105, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:02Z", "http_method": "POST", "endpoint": "/login", "status_code": "500", "response_time": 340, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.5", "_timestamp": "2024-12-03T12:00:09Z", "http_method": "POST", "endpoint": "/payment", "status_code": "500", "response_time": 512, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.4", "_timestamp": "2024-12-03T12:00:06Z", "http_method": "POST", "endpoint": "/checkout", "status_code": "503", "response_time": 450, "status_code_description": "Service temporarily unavailable during the checkout process."}
We upload this file to the dataset we've just created (shown as "server_logs") and use "logs" as the table name (shown as "Table ID").
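If you'd rather do this step programmatically than through the console, the same Python client used later in this article can create the dataset and load the file. The snippet below is a minimal sketch, assuming your environment is already authenticated against Google Cloud (e.g., via the auth step shown later), that logs.ndjson is in the working directory, and that the project, dataset, and table names match the ones used throughout this article.

from google.cloud import bigquery

client = bigquery.Client(project="elasticsearch-bigquery")

# Create the dataset if it doesn't exist yet (ID format: <project>.<dataset>)
client.create_dataset("elasticsearch-bigquery.server_logs", exists_ok=True)

# Load the local NDJSON file into a "logs" table, letting BigQuery infer the schema
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
)

with open("logs.ndjson", "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file,
        "elasticsearch-bigquery.server_logs.logs",
        job_config=job_config,
    )
    load_job.result()  # Wait for the load job to finish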
Once you're done, your files should look like this:

Configure the BigQuery Python client
Below, we'll learn how to use the BigQuery Python client and Google Colab to build an app.
1. Dependencies
First, we must install the following dependencies:
!pip install google-cloud-bigquery elasticsearch==8.16
The google-cloud-bigquery dependency provides the tools needed to consume the BigQuery data, elasticsearch lets us connect to Elastic and index the data, and getpass (part of the Python standard library, so it doesn't need to be installed) lets us enter sensitive variables without exposing them in the code. Let's import all the necessary dependencies:
from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass
from datetime import datetime
import json
We also need to declare other variables and initialize the Elasticsearch client for Python:
ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

# Google Cloud project name and BigQuery dataset name
PROJECT_ID = "elasticsearch-bigquery"
# dataset_id in format <your-project-name>.<your-dataset-name>
DATASET_ID = f'{PROJECT_ID}.server_logs'

# Elasticsearch client
es_client = Elasticsearch(
    ELASTICSEARCH_ENDPOINT,
    api_key=ELASTIC_API_KEY,
)
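Before going further, it's worth checking that the client can actually reach your deployment. This optional sanity check just prints the cluster info and will raise an error if the endpoint or API key is wrong:

# Optional: verify the connection to Elasticsearch
print(es_client.info())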
2. Authentication
To get the credentials needed to use BigQuery, we'll use the auth module we imported from google.colab. Run the cell below and choose the same account you used to create the Google Cloud project:
auth.authenticate_user()
Now, let's see the data in BigQuery:
client = bigquery.Client(project=PROJECT_ID)

# Getting tables from dataset
tables = client.list_tables(DATASET_ID)

data = {}

for table in tables:
    # Fully qualified table id: <project>.<dataset>.<table>
    table_id = f"{DATASET_ID}.{table.table_id}"
    print(f"Processing table: {table.table_id}")

    # Query to retrieve BigQuery tables data
    query = f"""
    SELECT *
    FROM `{table_id}`
    """
    query_job = client.query(query)
    results = query_job.result()

    print(f"Results for table: {table.table_id}:")
    data[table.table_id] = []

    for row in results:
        # Saving data with key=table_id
        data[table.table_id].append(dict(row))
        print(row)

# variable with data
logs_data = data['logs']
This should be the result you see:
Processing table: logs
Results for table: logs:
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/about', datetime.datetime(2024, 12, 3, 12, 0, 3, tzinfo=datetime.timezone.utc), 89, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/contact', datetime.datetime(2024, 12, 3, 12, 0, 7, tzinfo=datetime.timezone.utc), 76, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 1, tzinfo=datetime.timezone.utc), 123, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/products', datetime.datetime(2024, 12, 3, 12, 0, 4, tzinfo=datetime.timezone.utc), 156, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 5, tzinfo=datetime.timezone.utc), 101, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 8, tzinfo=datetime.timezone.utc), 98, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 10, tzinfo=datetime.timezone.utc), 105, '192.168.1.6'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/login', datetime.datetime(2024, 12, 3, 12, 0, 2, tzinfo=datetime.timezone.utc), 340, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/payment', datetime.datetime(2024, 12, 3, 12, 0, 9, tzinfo=datetime.timezone.utc), 512, '192.168.1.5'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Service temporarily unavailable during the checkout process.', 503, 'POST', '/checkout', datetime.datetime(2024, 12, 3, 12, 0, 6, tzinfo=datetime.timezone.utc), 450, '192.168.1.4'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
With this simple code, we've extracted the data from BigQuery. We've stored it in the logs_data variable and can now use it with Elasticsearch.
Index data to Elasticsearch
We'll begin by creating the index and defining its mappings. You can do this from the Kibana DevTools console or, as shown below, with the Python client:
es_client.indices.create(
    index="bigquery-logs",
    body={
        "mappings": {
            "properties": {
                "status_code_description": {"type": "match_only_text"},
                "status_code": {"type": "keyword"},
                "@timestamp": {"type": "date"},
                "ip_address": {"type": "ip"},
                "http_method": {"type": "keyword"},
                "endpoint": {"type": "keyword"},
                "response_time": {"type": "integer"},
            }
        }
    },
)
The match_only_text field is a variant of the text field type that saves disk space by not storing the metadata needed to calculate relevance scores. We use it because logs are usually time-centric, i.e. the date matters more than the quality of the text match. Queries that use a text field are compatible with the ones that use a match_only_text field.
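Note that if you re-run the notebook, the create call above will fail because the index already exists. While experimenting with this sample data, one simple (and destructive) option is to drop the index before recreating it:

# Only for test data: remove the index so it can be recreated on re-runs
if es_client.indices.exists(index="bigquery-logs"):
    es_client.indices.delete(index="bigquery-logs")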
We'll index the documents using the Elasticsearch _bulk API:
bulk_data = []

for log_entry in logs_data:
    # Convert timestamp to ISO 8601 string
    timestamp_iso8601 = log_entry["_timestamp"].isoformat()

    # Prepare action metadata
    action_metadata = {
        "index": {
            "_index": "bigquery-logs",
            "_id": f"{log_entry['ip_address']}-{timestamp_iso8601}",
        }
    }

    # Prepare document
    document = {
        "ip_address": log_entry["ip_address"],
        "status_code": log_entry["status_code"],
        "@timestamp": timestamp_iso8601,
        "http_method": log_entry["http_method"],
        "endpoint": log_entry["endpoint"],
        "response_time": log_entry["response_time"],
        "status_code_description": log_entry["status_code_description"],
    }

    # Append to bulk data
    bulk_data.append(action_metadata)
    bulk_data.append(document)

print(bulk_data)

# Indexing data
response = es_client.bulk(body=bulk_data)
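The bulk response contains an errors flag and one entry per document, so it's a good idea to confirm that everything was indexed. A short check along these lines, using the response variable from the previous step, will surface any failures:

# Report any documents that failed to index
if response["errors"]:
    for item in response["items"]:
        result = item["index"]
        if "error" in result:
            print(f"Failed to index {result['_id']}: {result['error']}")
else:
    print(f"Indexed {len(response['items'])} documents successfully")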
Search data
We can now run queries using the data from the bigquery-logs index.
For this example, we'll run a search for the error descriptions from the server (the status_code_description field). In addition, we'll sort the results by date and aggregate the IP addresses that produced the errors:
es_client.search(
    index="bigquery-logs",
    body={
        "query": {"match": {"status_code_description": "error"}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},
    },
)
This is the result:
{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.5-2024-12-03T12:00:09+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.5",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:09+00:00",
          "http_method": "POST",
          "endpoint": "/payment",
          "response_time": 512,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227209000
        ]
      },
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.2-2024-12-03T12:00:02+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.2",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:02+00:00",
          "http_method": "POST",
          "endpoint": "/login",
          "response_time": 340,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227202000
        ]
      }
    ]
  },
  "aggregations": {
    "by_ip": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "192.168.1.2",
          "doc_count": 1
        },
        {
          "key": "192.168.1.5",
          "doc_count": 1
        }
      ]
    }
  }
}
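If you assign the search call to a variable (for example, response = es_client.search(...)), you can also pull out just the parts you care about programmatically, such as the IPs behind the errors from the by_ip aggregation. A minimal sketch under that assumption:

# Assumes: response = es_client.search(...) from the query above
for bucket in response["aggregations"]["by_ip"]["buckets"]:
    print(f"{bucket['key']}: {bucket['doc_count']} error(s)")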
Conclusion
Tools like BigQuery, which help centralize information, are very useful for data management. Beyond search, using BigQuery together with Elasticsearch lets you leverage ML and data analysis to detect and analyze issues in a simpler and faster way.
Ready to try this out on your own? Start a free trial.
Elasticsearch has integrations for tools from LangChain, Cohere and more. Join our advanced semantic search webinar to build your next GenAI app!