In this article, we will guide you through the process of creating a real-time data ingestion system for Nifty 50 stocks using Python, Elasticsearch, and Docker. The project involves retrieving stock data from the NSE India API, processing it, and storing it in Elasticsearch to enable efficient querying and analysis. For simplicity, we will retrieve data on an hourly basis, but you can adjust the frequency as needed. Additionally, we will set up and configure the application using Docker Compose.
In the future, we plan to create a dashboard using Flask to visualize the data we have collected. This dashboard will enable us to interact with the data and gain insights from it. Additionally, we intend to incorporate machine learning models to analyze the stock data and make predictions based on it.
Project Overview
Our project involves creating a Python application that will perform the following tasks:
- Fetches Hourly Nifty 50 Stock Data: Periodically retrieves data from the NSE India API.
- Processes and Transforms Data: Formats the data to match our Elasticsearch index schema.
- Ingests Data into Elasticsearch: The processed data is stored in an Elasticsearch index for the purpose of querying and visualization.
- Containerizes the Application: Utilizes Docker to bundle and execute the application within a containerized environment.
- Manages Configuration: Utilizes Docker Compose for managing configuration and environment variables.
Fetching Hourly Nifty 50 Stock Data
The data is retrieved from the NSE India API, which provides real-time stock data. We will use the Python requests library to fetch this data.
Python Code for Fetching Data
import requests
import logging
# API URL
NSE_API_URL = "https://www.nseindia.com/api/equity-stockIndices?index=NIFTY%2050"
def fetch_nifty50_data():
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36'
}
try:
response = requests.get(NSE_API_URL, headers=headers)
response.raise_for_status()
logging.debug("Fetched data from NSE API successfully.")
logging.debug(f"Response JSON: {response.json()}")
return response.json()
except requests.RequestException as e:
logging.error(f"Error fetching data from NSE API: {e}")
return {}
- We define the API URL to fetch Nifty 50 stock data.
- We use the
requestslibrary to send a GET request. - We handle exceptions to log any errors encountered during the request.
Processing and Transforming Data
The retrieved data must be modified to match the schema of the Elasticsearch index. We will align the fields from the API response to meet the structure needed by Elasticsearch. All fields will be ingested in this instance, but it can also be customized to send only specific fields if required.
Python Code for Processing Data
from datetime import datetime
def process_data(data):
processed_records = []
for item in data.get('data', []):
record = {
'priority': item.get('priority'),
'symbol': item.get('symbol'),
'identifier': item.get('identifier'),
'series': item.get('series'),
'open': item.get('open'),
'dayHigh': item.get('dayHigh'),
'dayLow': item.get('dayLow'),
'lastPrice': item.get('lastPrice'),
'previousClose': item.get('previousClose'),
'change': item.get('change'),
'pChange': item.get('pChange'),
'totalTradedVolume': item.get('totalTradedVolume'),
'totalTradedValue': item.get('totalTradedValue'),
'lastUpdateTime': item.get('lastUpdateTime'),
'yearHigh': item.get('yearHigh'),
'ffmc': item.get('ffmc'),
'yearLow': item.get('yearLow'),
'nearWKH': item.get('nearWKH'),
'nearWKL': item.get('nearWKL'),
'perChange365d': item.get('perChange365d'),
'date365dAgo': item.get('date365dAgo'),
'chart365dPath': item.get('chart365dPath'),
'date30dAgo': item.get('date30dAgo'),
'perChange30d': item.get('perChange30d'),
'chart30dPath': item.get('chart30dPath'),
'chartTodayPath': item.get('chartTodayPath'),
'meta': item.get('meta'),
'timestamp': datetime.now()
}
processed_records.append(record)
return processed_records
In this function:
- We extract and map the fields from the raw data.
- We add a
timestampto each record to capture when the data was ingested.
Ingesting Data into Elasticsearch
We use the elasticsearch Python library to index the transformed data into Elasticsearch.
Python Code for Ingesting Data
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, RequestError, ConnectionError
def ingest_to_elasticsearch(es, index, data):
for record in data:
try:
es.index(index=index, body=record)
logging.debug(f"Document indexed: {record}")
except NotFoundError as e:
logging.error(f"Index not found: {e}")
except RequestError as e:
logging.error(f"Request error: {e}")
except ConnectionError as e:
logging.error(f"Connection error: {e}")
except Exception as e:
logging.error(f"Error indexing document: {e}")
In this function:
- We index each record into Elasticsearch.
- We handle exceptions related to indexing and connection issues.
Containerizing the Application with Docker
Docker is used to package the application and its dependencies into a container, ensuring consistent behavior across different environments.
Dockerfile
# Use official Python image as a parent image
FROM python:3.9-slim
# Set the working directory
WORKDIR /app
# Copy the requirements.txt file into the container
COPY requirements.txt .
# Install any dependencies specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY app.py .
# Run the Python script
CMD ["python", "app.py"]
Managing Configuration with Docker Compose
Docker Compose simplifies the management of environment variables and service configuration.
docker-compose.yml
version: '3.7'
services:
nifty50-stock-ingestor:
build:
context: ./app
dockerfile: Dockerfile.app
container_name: nifty50-stock-ingestor
environment:
- ELASTICSEARCH_HOST={Replace Host}
- ELASTICSEARCH_USERNAME={Replace Username}
- ELASTICSEARCH_PASSWORD={Replace Password}
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
The docker-compose.yml:
- Builds the Docker image from the specified
Dockerfile. - Defines environment variables for Elasticsearch configuration.
- Configures logging options.
- Sets the container to restart unless stopped manually.
Summary
In this article, we developed a Python application to fetch, process, and load Nifty 50 stock data into Elasticsearch. Docker was used to containerize the application, while Docker Compose managed the configuration and environment variables. This setup allows for straightforward scaling and deployment of the application in various environments.
With this approach, you can establish a robust pipeline for ingesting real-time stock data, ensuring efficient querying and analysis capabilities with Elasticsearch.
The Project code is available on my GitHub.
#Python #Flask #Elasticsearch #ElasticAPM #Docker #WebDevelopment #RealTimeApplications #Monitoring #DevOps
Reach out at Linkedin for any questions






Leave a comment