Introduction
The rapid increase in connected devices in our data-driven environment has made it crucial to handle vast amounts of information effectively. MQTT (Message Queuing Telemetry Transport) is a streamlined messaging protocol that’s particularly suited for devices with limited resources and low-bandwidth networks, making it perfect for Internet of Things (IoT) applications. An MQTT broker, like Coreflux, paired with a scalable cloud platform such as DigitalOcean, can address the challenges associated with processing and analyzing IoT data.
This guide will show you how to link an MQTT broker with a managed OpenSearch service on DigitalOcean. This integration facilitates the real-time collection and storage of data, which simplifies the monitoring, analysis, and visualization of your IoT data.
Coreflux & DigitalOcean Partnership
Coreflux provides a lightweight MQTT Broker designed to enhance IoT communication on DigitalOcean.
What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe networking protocol that is widely embraced in IoT ecosystems. It is designed to function effectively in situations involving devices with constraints, as well as in low-bandwidth, high-latency, or unreliable network conditions, allowing for efficient, real-time messaging.
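To make the publish/subscribe model concrete, here is a minimal sketch using the Python paho-mqtt helper functions. The script name, broker address, and topic are placeholders, not values from this guide; run the subscriber first (python mqtt_demo.py), then the publisher (python mqtt_demo.py pub) in a second terminal:
# Conceptual publish/subscribe sketch with the paho-mqtt helper functions.
# Broker address and topic are placeholders.
import sys

import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe

BROKER = "broker.example.com"          # placeholder broker address
TOPIC = "sensors/plant1/temperature"   # placeholder topic

if sys.argv[1:] == ["pub"]:
    # A device publishes a small payload to a topic on the broker.
    publish.single(TOPIC, payload="23.7", qos=1, hostname=BROKER)
else:
    # Any client subscribed to the same topic receives the message.
    msg = subscribe.simple(TOPIC, qos=1, hostname=BROKER)
    print(msg.topic, msg.payload.decode())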
About Coreflux
Coreflux provides a lightweight MQTT broker that enables efficient, real-time communication between IoT devices and applications. Built for optimal scalability and reliability, Coreflux is crafted for environments that require low latency and high throughput.
With Coreflux, you can ensure an efficient messaging system that supports seamless data flow between devices, whether you’re creating a small IoT project or implementing a comprehensive industrial monitoring system.
By using Coreflux on DigitalOcean, you can benefit from:
- Scalability: Effortlessly manage increasing amounts of data and devices without sacrificing performance.
- Reliability: Guarantee consistent messaging across all connected devices.
- Efficiency: Make the most of bandwidth in environments with limited network resources.
[Image: a real-world example of solar power park monitoring using OpenSearch Dashboards]
Prerequisites
Before you begin the integration process, ensure you have the following:
- A DigitalOcean account. If you haven’t created one yet, you can sign up at DigitalOcean.
- Coreflux Broker: it should be set up and reachable. If it’s not already configured, refer to the Coreflux documentation or the introduction steps in this guide.
- MQTT Explorer: this application is needed to interface with the MQTT broker. You can obtain it from its official website.
- Python Environment: ensure Python is installed along with the necessary libraries, such as paho-mqtt and opensearch-py.
- The Python script that connects the Coreflux MQTT broker with your DigitalOcean OpenSearch instance. This script is responsible for monitoring published MQTT messages, processing them, and saving them in OpenSearch.
If you are unfamiliar with Python, here’s a brief overview of what the script accomplishes:
It connects to Coreflux using the paho-mqtt library, subscribes to designated topics, processes and indexes the data it receives, and can also publish status feedback back to the MQTT broker. A sketch of such a script appears in Step 4.
Step 1 – Set Up Your Coreflux MQTT Broker
You can refer to the Coreflux tutorial for a quick guide on starting a Free Trial of the Online MQTT Broker, or follow the step-by-step instructions below:
1. Create a Coreflux Account
   - Visit the Coreflux website and sign up for a free account if you don’t already have one.
   - Once registered, confirm your email address to activate your account.
2. Start a Free Trial Broker
   - Once you log in, navigate to the MQTT Broker section.
   - Select the option to start a free trial for a new MQTT broker.
   - Choose the data center region closest to your IoT devices or to the DigitalOcean data center where you plan to deploy OpenSearch, to minimize latency.
   - Confirm your selections to initiate the trial.
3. Receive Broker Credentials
   - Coreflux will email you the credentials required to access your broker (broker URL, port, username, and password) right after the broker is created.
   - Safeguard these credentials; you will need them to configure your IoT devices and the forthcoming Python script.
4. Set Up MQTT Explorer
   - Install the MQTT Explorer tool if you haven’t already.
   - Configure MQTT Explorer to connect to your Coreflux broker using the credentials you received:
     - Broker Address: enter the broker URL.
     - Port: use the provided port (typically 8883 for SSL connections).
     - Username/Password: provide the credentials sent to your email.
   - Establish a connection to the broker and try subscribing to a topic to verify functionality.
5. Test the Broker Connection
   - Publish a test message from MQTT Explorer to one of the topics on your Coreflux broker.
   - Ensure that the message is received and displayed in the Explorer, confirming your broker is operational. If you also want to test from Python, see the sketch after this list.
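If you prefer to script this check, the sketch below publishes one message with the paho-mqtt library over TLS; the broker URL, credentials, and topic are placeholders for the values Coreflux emailed you:
# Quick broker test from Python - replace the placeholders with your
# Coreflux credentials.
import paho.mqtt.publish as publish

BROKER = "your-broker-url"   # placeholder: the broker URL from your Coreflux email
PORT = 8883                  # TLS port, as configured in MQTT Explorer above

publish.single(
    "test/connection",       # any topic you are allowed to publish to
    payload="hello from Python",
    qos=1,
    hostname=BROKER,
    port=PORT,
    auth={"username": "your-username", "password": "your-password"},
    tls={},                  # empty dict enables TLS using the system CA bundle
)
print("Message published - watch for it under test/connection in MQTT Explorer")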
Step 2 – Set Up Your OpenSearch Instance on DigitalOcean
With your Coreflux MQTT broker in place and tested, it’s time to connect it to a managed OpenSearch instance on DigitalOcean:
1. Log in to DigitalOcean:
   - Visit the DigitalOcean dashboard and log in using your credentials.
2. Create a New Database:
   - From the dashboard, select the Databases option from the menu.
   - Click on Create Database Cluster.
   - Select OpenSearch from the available database types.
3. Configure Your OpenSearch Instance:
   - Select a data center region that is geographically near your IoT devices or the Coreflux broker to reduce latency.
   - Choose your plan, starting with a basic one that fits your current requirements; you can scale up later as necessary.
4. Create the Cluster:
   - After configuration, click on Create Cluster.
   - Wait for DigitalOcean to provision your OpenSearch instance, which can take several minutes.
5. Get Your Connection Details:
   - Once the cluster has been created, open the Connection Details tab of your database cluster.
   - Make note of the following details:
     - Host
     - Port
     - Username
     - Password
   - These details will be needed for the connection established in your Python script. A quick way to verify them from Python is sketched after this list.
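Once you have these details, you can run a quick sanity check from Python with the opensearch-py client; the host, port, and credentials below are placeholders for the values shown in your Connection Details tab:
# Verify the OpenSearch connection details with opensearch-py.
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "your-opensearch-host", "port": 25060}],  # use the port from Connection Details
    http_auth=("your-username", "your-password"),
    use_ssl=True,
)

# info() returns the cluster name and version when the credentials are valid.
print(client.info())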
Step 3 – Map the Index in Your OpenSearch Instance
Before indexing data from your Coreflux MQTT broker into OpenSearch, it’s necessary to define the mapping for your index, which outlines the schema and specifies data types for each field.
Here’s how to create and map an index in your OpenSearch instance:
Access the OpenSearch Dashboard
Log into the OpenSearch dashboard using the connection details collected during the setup and navigate to the “Index Management” section.
Create a New Index
Begin by clicking on Create Index and provide a name for your index, such as machine_production.
Define the Mapping
During the index creation process, click on the Mappings tab and establish the fields for your data. Here is an example mapping:
{
"mappings": {
"properties": {
"timestamp": {
"type": "date"
},
"machine_id": {
"type": "keyword"
},
"temperature": {
"type": "float"
},
"status": {
"type": "keyword"
},
"error_code": {
"type": "integer"
}
}
}
}
In this example:
- timestamp is recorded as a date type, suitable for time-related searches.
- machine_id and status use the keyword type, which allows for exact matching without analysis.
- temperature is classified as a float type to accommodate decimal numbers.
- error_code is maintained as an integer type for whole numbers.
Finalize the Index Creation
After you’ve defined your mappings, review the settings and click Create Index. OpenSearch will generate the index according to the mappings specified, getting it ready to store and manage data published from your Coreflux MQTT broker.
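If you prefer the API over the dashboard, the same index and mapping can be created programmatically. Here is a sketch using opensearch-py; the connection details are placeholders:
# Create the machine_production index with its mapping via the API.
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "your-opensearch-host", "port": 25060}],  # placeholder connection details
    http_auth=("your-username", "your-password"),
    use_ssl=True,
)

mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "machine_id": {"type": "keyword"},
            "temperature": {"type": "float"},
            "status": {"type": "keyword"},
            "error_code": {"type": "integer"},
        }
    }
}

# indices.create applies the mapping when the index is created.
client.indices.create(index="machine_production", body=mapping)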
Test the Mapping
Utilize your Python script or the OpenSearch API to index a test document to ensure it complies with the defined mapping.
An example of a test document:
{
"timestamp": "2024-08-23T10:30:00Z",
"machine_id": "MACHINE123",
"temperature": 75.5,
"status": "operational",
"error_code": 0
}
Push this document into the machine_production index (or your selected index) and validate that all fields are stored and searchable as intended.
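As one way to do this from Python, the sketch below indexes the test document with opensearch-py and reads it back with a term query; the connection details are the same placeholders as before:
# Index the test document and confirm it is searchable.
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "your-opensearch-host", "port": 25060}],  # placeholder connection details
    http_auth=("your-username", "your-password"),
    use_ssl=True,
)

doc = {
    "timestamp": "2024-08-23T10:30:00Z",
    "machine_id": "MACHINE123",
    "temperature": 75.5,
    "status": "operational",
    "error_code": 0,
}

# refresh=True makes the document searchable immediately for this test.
client.index(index="machine_production", body=doc, refresh=True)

result = client.search(
    index="machine_production",
    body={"query": {"term": {"machine_id": "MACHINE123"}}},
)
print(result["hits"]["total"])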
Step 4 – Integrate Coreflux with OpenSearch Using Python
Now that Coreflux and OpenSearch are established, it’s time to connect the two using a Python script that communicates with the Coreflux broker, processes messages, and saves data into OpenSearch.
Set Up Your Environment Variables
In the directory of your Python script, create a .env file. Add the following environment variables, substituting your actual credentials for the placeholder values:
MQTT_BROKER=
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
OPENSEARCH_HOST=
OPENSEARCH_USERNAME=
OPENSEARCH_PASSWORD=
Install Required Python Libraries
Make sure you have the essential Python libraries installed using pip:
pip install paho-mqtt opensearch-py python-dotenv
Write or Configure the Python Script
Use the Python script mentioned in the prerequisites, which connects to the Coreflux MQTT broker, listens for published messages, and indexes them into OpenSearch. Make sure the script reads the environment variables you set up. If you do not already have such a script, the sketch below shows one possible structure.
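In this sketch (the guide refers to the script as mqttToOS.py), the topic pattern, index name, and OpenSearch port are assumptions; adapt them to your devices and cluster:
# Sketch of a bridge between the Coreflux MQTT broker and OpenSearch.
import json
import os

import paho.mqtt.client as mqtt
from dotenv import load_dotenv
from opensearchpy import OpenSearch

load_dotenv()  # read the MQTT_* and OPENSEARCH_* values from the .env file

opensearch = OpenSearch(
    hosts=[{"host": os.environ["OPENSEARCH_HOST"], "port": 25060}],  # replace with your cluster's port
    http_auth=(os.environ["OPENSEARCH_USERNAME"], os.environ["OPENSEARCH_PASSWORD"]),
    use_ssl=True,
)

TOPIC = "machines/+/telemetry"   # assumed topic pattern - match what your devices publish
INDEX = "machine_production"


def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribe once the broker acknowledges the connection.
    client.subscribe(TOPIC, qos=1)


def on_message(client, userdata, msg):
    try:
        doc = json.loads(msg.payload)                  # expect JSON payloads
        opensearch.index(index=INDEX, body=doc)        # store the message in OpenSearch
        client.publish(f"{msg.topic}/ack", "indexed")  # optional processing feedback
    except Exception as exc:
        client.publish(f"{msg.topic}/ack", f"error: {exc}")


# paho-mqtt 2.x callback API; with 1.x, use mqtt.Client() and drop the extra
# on_connect arguments.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(os.environ["MQTT_USERNAME"], os.environ["MQTT_PASSWORD"])
client.on_connect = on_connect
client.on_message = on_message
client.connect(os.environ["MQTT_BROKER"], int(os.environ["MQTT_PORT"]))
client.loop_forever()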
Run the Script
Execute the Python script. It should connect to the Coreflux broker, subscribe to targeted topics, and store the messages in your OpenSearch instance.
python mqttToOS.py
Keep an eye on the output to confirm that messages are processed and saved as expected.
Step 5 – Have Fun with Your Integration
Test Data Flow
- Publish Sample Data: Use MQTT Explorer to send sample datasets to your Coreflux broker. Experiment with various payload structures to observe how they are handled and saved in OpenSearch.
- Data Validation: Check that the data in OpenSearch corresponds accurately with the payloads you sent, confirming the consistency and correctness of your integration.
- Real-Time Monitoring: Set up a real-time feed with MQTT Explorer (or with the publisher sketch after this list) to continuously publish messages and observe how OpenSearch processes the incoming data streams.
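For the real-time feed, a small publisher script can stand in for, or complement, MQTT Explorer. This sketch streams sample payloads shaped like the mapping from Step 3; the broker details, topic, and values are placeholders:
# Stream sample telemetry payloads to the broker every few seconds.
import json
import random
import time
from datetime import datetime, timezone

import paho.mqtt.publish as publish

BROKER = "your-broker-url"   # placeholder broker details, matching your .env values
PORT = 1883
AUTH = {"username": "your-username", "password": "your-password"}
TOPIC = "machines/MACHINE123/telemetry"   # assumed topic - match what your bridge subscribes to

while True:
    payload = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "machine_id": "MACHINE123",
        "temperature": round(random.uniform(60.0, 90.0), 1),
        "status": "operational",
        "error_code": 0,
    }
    publish.single(TOPIC, json.dumps(payload), qos=1,
                   hostname=BROKER, port=PORT, auth=AUTH)
    time.sleep(5)  # publish one reading every five seconds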
Build Visualizations
- Create Dashboards: Leverage OpenSearch’s dashboarding tools to create visual representations of your IoT data. Monitor metrics such as device uptime and sensor outputs.
- Trend Analysis: Explore trends over time by aggregating data in OpenSearch to identify patterns or anomalies; a sample aggregation query follows this list.
- Geo-Visualizations: If your data includes location information, create geographical maps depicting data points based on location.
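As a starting point for trend analysis, the sketch below computes the average temperature per hour from the machine_production index using a date_histogram aggregation; the connection details are placeholders:
# Average temperature per hour, aggregated in OpenSearch.
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "your-opensearch-host", "port": 25060}],  # placeholder connection details
    http_auth=("your-username", "your-password"),
    use_ssl=True,
)

query = {
    "size": 0,
    "aggs": {
        "per_hour": {
            "date_histogram": {"field": "timestamp", "fixed_interval": "1h"},
            "aggs": {"avg_temp": {"avg": {"field": "temperature"}}},
        }
    },
}

response = client.search(index="machine_production", body=query)
for bucket in response["aggregations"]["per_hour"]["buckets"]:
    print(bucket["key_as_string"], bucket["avg_temp"]["value"])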
Optimize and Scale
- Performance Tuning: Experiment with various configurations for both the broker and OpenSearch to optimize performance.
- Load Testing: Simulate high-traffic conditions by publishing many messages simultaneously to identify potential bottlenecks.
- Scaling: DigitalOcean lets you scale resources as your data needs grow.
Conclusion
Connecting Coreflux MQTT Broker with DigitalOcean’s Managed OpenSearch service provides an effective solution for real-time IoT data analytics. This guide has equipped you with the tools needed to establish a data pipeline for collecting, processing, and visualizing IoT data proficiently.
With the reliability of Coreflux and the powerful search capabilities of OpenSearch, you can efficiently manage large volumes of data and uncover meaningful insights in real time. Whether you are overseeing industrial processes, monitoring environmental data, or managing smart home technologies, this integration enables swift, data-driven decision-making.
Learn how to get started with OpenSearch on DigitalOcean.
Try out a Coreflux Online MQTT Broker trial or explore additional information through the Coreflux Documentation and Tutorials.