IoT: Connectivity & Time Series Storage

This is the third post in a series that continues from my post IoT: Thermography based operation monitoring, where I talked about creating an IoT system to monitor an operation using a camera and an array of IR sensors.

In this post, I am going to set up a communication and storage system for IoT data. I am planning to use MQTT for communication as it is one of the most widely used protocols for sensor data communication. More details regarding MQTT can be found at http://mqtt.org.

From mqtt.org:

MQTT is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks.

An MQTT publisher connects to a broker and can send arbitrary data to a “topic”. An MQTT subscriber chooses which “topic(s)” to listen to and processes the incoming data streams. In this post, we will listen to a topic and write incoming data to our storage system.
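To make the publish/subscribe idea concrete, here is a toy in-memory sketch in Python. It is not MQTT and involves no network: the `ToyBroker` class and its methods are hypothetical names used only for illustration, and it matches topics by exact string only, whereas real MQTT brokers also support wildcards.

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (not real MQTT)."""

    def __init__(self):
        # Maps a topic name to the list of callbacks subscribed to it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Deliver the payload to every subscriber of this exact topic
        # and report how many subscribers received it.
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


received = []
broker = ToyBroker()
broker.subscribe('svt/ir', lambda topic, payload: received.append(payload))

broker.publish('svt/ir', 'ir avg=25')    # delivered to the subscriber
broker.publish('svt/other', 'ignored')   # nobody listens on this topic
print(received)
```

The publisher never needs to know who is listening; it only names a topic, and the broker handles delivery.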

We will use Mosquitto as the MQTT broker; it is available for download at https://mosquitto.org/.

For storage, I am going to use InfluxDB, a well-maintained and scalable open-source time series database from InfluxData. More details regarding InfluxDB can be found at https://www.influxdata.com.

You can write a small program to subscribe to an MQTT topic and write incoming data to InfluxDB, or you can use Telegraf to achieve the same. Telegraf is another tool from InfluxData that can subscribe to an MQTT topic and write data to InfluxDB.
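If you prefer the hand-written route over Telegraf, a minimal sketch might look like the following. It assumes Python 3, an InfluxDB 1.x server at `localhost:8086` with an existing database named `telegraf`, a broker on `localhost:1883`, and payloads that are already in InfluxDB line protocol. The helper names are my own; the `/write` endpoint is the InfluxDB 1.x HTTP write API.

```python
import urllib.parse
import urllib.request

INFLUX_URL = 'http://localhost:8086'   # assumption: local InfluxDB 1.x
DATABASE = 'telegraf'                  # assumption: database already exists
TOPIC = 'svt/ir'


def build_write_request(payload, base_url=INFLUX_URL, database=DATABASE):
    """Build a POST request for the InfluxDB 1.x /write endpoint."""
    query = urllib.parse.urlencode({'db': database})
    url = '%s/write?%s' % (base_url, query)
    return urllib.request.Request(url, data=payload, method='POST')


def on_connect(client, userdata, flags, *rest):
    # Subscribing here renews the subscription after a reconnect.
    # *rest absorbs the arguments that differ between paho-mqtt versions.
    client.subscribe(TOPIC)


def on_message(client, userdata, msg):
    # msg.payload is a bytes object holding one line of line protocol.
    urllib.request.urlopen(build_write_request(msg.payload), timeout=5)


def main():
    # Imported here so the helpers above can be used without paho installed.
    import paho.mqtt.client as mqtt

    try:
        # paho-mqtt 2.x takes a callback API version as its first argument.
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    except AttributeError:
        client = mqtt.Client()   # paho-mqtt 1.x
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect('localhost', 1883, 60)
    client.loop_forever()
```

Calling `main()` starts the subscriber and blocks until interrupted. The rest of this post uses Telegraf instead, which needs no custom code.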

To wrap this post up, I’ll show a basic Grafana dashboard that displays the incoming time series, to verify that the complete setup is working. Of course, I’ll have a much better visualization scheme in the next post. For details on Grafana, go to Grafana.net.

Note:

  1. This is a very insecure setup, so do not deploy it in a production environment.
  2. I am going to set it up on an Ubuntu VM running on an AWS t2.micro instance. You can run it on your local Linux machine with minor modifications. Make sure that the following ports are open and accessible: 1883 (MQTT), 3000 (Grafana), and 22 (SSH).
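A quick way to confirm that the ports listed above are reachable is a small TCP check run from another machine. This is only a sketch: `port_open` and `check_ports` are names I made up, and a successful TCP connection shows that something is listening, not that the service behind it is configured correctly.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_ports(host, ports=(1883, 3000, 22)):
    """Map each port to whether it accepted a connection."""
    return {port: port_open(host, port) for port in ports}
```

For example, `print(check_ports('<server-ip>'))` with your server's address reports each port as `True` or `False`. On AWS, a `False` usually means the security group does not allow that port yet.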

Dependencies

Please install the following on your Linux server (the tools introduced above):

  1. Mosquitto
  2. InfluxDB
  3. Telegraf
  4. Grafana

Configure Telegraf with an MQTT input listening on topic ‘svt/ir’ and InfluxDB as the output. In its out-of-the-box configuration, Telegraf connects to InfluxDB automatically.

On your Raspberry Pi, install the Paho MQTT client:

pip install paho-mqtt

Transmitting sensor data from Raspberry Pi

We are going to upload a picture taken by the camera at the start. In a real system, we might want to upload a continuous stream of image snapshots, but that requires very high network bandwidth. We will upload the image using SCP (copy over SSH).

We will also transmit 64 IR pixel values, as well as the average of all the IR pixels as a background temperature. We are going to read the 64 IR pixel values from the socket /var/run/mlx9062x.sock, calculate the average, and publish to the MQTT broker in the following format:

 ir p0=<value0>,p1=<value1>,...,p63=<value63>,avg=<average> <timestamp>
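The format above can be sketched as a pair of small standalone helpers, which makes it easy to check a record without the sensor attached. The function names are hypothetical, the sample frame is fake data, and the byte order is assumed to be the machine's native order, matching what `np.frombuffer(..., np.uint16)` does in the script below.

```python
import struct
import time


def parse_ir_frame(raw):
    """Unpack the first 128 bytes as 64 unsigned 16-bit pixel values."""
    # '=64H' means 64 unsigned shorts in native byte order, 2 bytes each.
    return list(struct.unpack('=64H', raw[:128]))


def format_ir_line(pixels, timestamp_s):
    """Build one record: measurement, fields, nanosecond timestamp."""
    fields = ['p%d=%d' % (i, value) for i, value in enumerate(pixels)]
    # Truncate the mean to an integer, as the script below does.
    fields.append('avg=%d' % int(sum(pixels) / len(pixels)))
    return 'ir %s %d' % (','.join(fields), timestamp_s * 1000000000)


# Fake frame for illustration: every pixel reads 2500.
sample = struct.pack('=64H', *([2500] * 64))
print(format_ir_line(parse_ir_frame(sample), int(time.time())))
```

The timestamp is multiplied out to nanoseconds because that is the default precision InfluxDB expects in line protocol.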

Python code and explanation:

1. Header setup and Python modules

#!/usr/bin/env python

import os, sys, time
import numpy as np
from time import sleep
import subprocess
import paho.mqtt.client as mqtt

# Placeholders: replace with your server address, login name, and SSH key file.
IP = r'<server-ip>'
USER = r'<user>'
KEY = r'<user-credentials>'

2. Capture a picture using Raspberry Pi camera and upload it to the server:

def getImage():
    fn = r'/home/pi/pics.jpg'
    # Capture a 640x480 still without showing a preview window (-n).
    proc = subprocess.Popen('raspistill -o %s -w 640 -h 480 -n -t 3' % (fn),
                        shell=True, stderr=subprocess.STDOUT)
    proc.wait()

    # Copy the picture to the user's home directory on the server over SSH.
    proc = subprocess.Popen('scp -i %s %s %s@%s:.' % (KEY, fn, USER, IP),
                        shell=True, stderr=subprocess.STDOUT)
    if proc.wait() == 0:
        print("Image uploaded successfully")
    else:
        print("Image upload failed")

getImage()

3. Set up a connection to the socket for the IR data and to the MQTT broker:

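# Open in binary mode: the sensor data is raw bytes, not text.
fifo = open('/var/run/mlx9062x.sock', 'rb')

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

# Written against the paho-mqtt 1.x API.
client = mqtt.Client()
client.on_connect = on_connect

client.connect(IP, 1883, 60)
# Run the network loop in a background thread so that on_connect fires
# and keepalive traffic is handled while the main loop publishes.
client.loop_start()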

4. Publish data to the server:

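try:
    while True:
        sleep(1)
        ir_raw = fifo.read()
        # Keep the first 128 bytes: 64 pixels of 2 bytes each.
        ir_trimmed = ir_raw[0:128]
        ir = np.frombuffer(ir_trimmed, np.uint16)
        str1 = 'ir p0='+str(ir[0])
        for i in range(1, 64):
            str1 = str1 + ',p'+str(i)+'='+str(ir[i])
        avg = str(int(np.mean(ir)))
        now = int(time.time())
        # The timestamp is sent in nanoseconds, so pad the seconds value.
        str1 = str1 + ",avg=" + avg + " " + str(now) + "000000000"
        print(str1)
        client.publish('svt/ir', str1, qos=0, retain=False)
finally:
    # Reached on Ctrl-C or an error; release the socket handle.
    fifo.close()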

Go to http://<server-ip>:3000 and set up a Grafana dashboard to display the average temperature data.

There are excellent tutorials available online on how to set up a basic dashboard. The InfluxDB database name is telegraf, the measurement name is ir, and the average temperature is stored in the avg column.

If everything works, you should see the following in your Grafana dashboard:
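If the dashboard stays empty, it can help to check whether data is reaching InfluxDB at all. The sketch below assumes Python 3 and an InfluxDB 1.x server on `localhost:8086` (adjust both to your setup) and uses the 1.x HTTP `/query` endpoint; the helper names are my own.

```python
import json
import urllib.parse
import urllib.request


def build_query_url(base_url='http://localhost:8086', database='telegraf',
                    limit=5):
    """Build a URL asking for the newest avg values of measurement ir."""
    influxql = 'SELECT "avg" FROM "ir" ORDER BY time DESC LIMIT %d' % limit
    params = urllib.parse.urlencode({'db': database, 'q': influxql})
    return '%s/query?%s' % (base_url, params)


def latest_averages(**kwargs):
    """Return [time, avg] rows, or an empty list if nothing is stored yet."""
    with urllib.request.urlopen(build_query_url(**kwargs), timeout=5) as resp:
        body = json.loads(resp.read().decode('utf-8'))
    # A result without a 'series' key means the query matched no points.
    series = body['results'][0].get('series', [])
    return series[0]['values'] if series else []
```

Running `print(latest_averages())` on the server should list a few recent rows once the Raspberry Pi is publishing. An empty list points to a problem upstream of Grafana, in Telegraf or the broker.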

Next week: How to display IR sensor data in a more meaningful manner?
