Robs Blog

Blog containing all sorts of info and help from my day to day life

Categories

Using node.js WebSockets to get live MQTT updates

To keep some of my pages up to date with the live sensor data I am using websockets to get MQTT publications.

On the server there are two parts. A Python script which is monitoring a serial port for data and then publishing it accordingly. Information about this can be found at  XRF Temperature Sensor -> XRF Slice of Pi

The second part on the server is a node.js application which runs a websocket server. For further information about installing Node.js, take a look at Install Node.js v0.8.2 on Raspbian. The server application creates an MQTT subscription for every connection; it subscribes to the topic according to the URL that the client has come in on, i.e. xxxx.xxx.xxx.xx/MyTopic will cause a subscription on MyTopic.

Node.js server code

#!/usr/bin/env node
var util   = require('util');
sys = require("sys");
url = require("url");
var WebSocketServer = require('websocket').server;
var http = require('http');
spawn = require('child_process').spawn;

var args = { /* defaults */
    port: '8000',
    debug: 'true'
};
/* Matches "--name" or "--name=value": group 1 is the option name,
   group 2 the optional value. */
var pattern = /^--(.*?)(?:=(.*))?$/;

/**
 * Copy every "--name[=value]" word of argv into options.
 * An option given without a value (or with an empty value) is stored as
 * boolean true; words that do not start with "--" are ignored.
 *
 * @param {string[]} argv - raw command line words
 * @param {Object} options - object receiving the parsed options (mutated)
 * @returns {Object} the same options object
 */
function parseCommandLine(argv, options) {
    argv.forEach(function(value) {
        var match = pattern.exec(value);
        if (match) {
            options[match[1]] = match[2] ? match[2] : true;
        }
    });
    return options;
}

/**
 * Convert a raw port option to a number.
 *
 * @param {*} value - option value (a string, or true for a bare "--port")
 * @param {number} fallback - port used when value is not numeric
 * @returns {number} the parsed port, or fallback
 */
function toPortNumber(value, fallback) {
    var parsed = parseInt(value, 10);
    return isNaN(parsed) ? fallback : parsed;
}

/**
 * Convert a raw debug option to a boolean.
 * A bare "--debug" is stored as boolean true, so both true and the string
 * 'true' enable debugging; comparing against 'true' alone would make the
 * bare flag switch debugging off.
 *
 * @param {*} value - option value
 * @returns {boolean} whether debugging is enabled
 */
function toDebugFlag(value) {
    return value === true || value === 'true';
}

/* Parse command line options */
parseCommandLine(process.argv, args);
var port = toPortNumber(args.port, 8000);
var debug = toDebugFlag(args.debug);

/* Plain HTTP server: each ordinary request is logged and answered with an
   empty 404 response. */
var server = http.createServer(function rejectPlainHttp(req, res) {
    var stamp = new Date();
    console.log(stamp + ' Received request for ' + req.url);
    res.writeHead(404);
    res.end();
});
/* Start listening and report the port once the socket is bound. */
server.listen(port, function announceListening() {
    var stamp = new Date();
    console.log(stamp + ' Server is listening on port ' + port);
});

/* WebSocket server layered on the HTTP server. Declared with `var` so it is
   not created as an implicit global (which is a ReferenceError in strict
   mode). */
var wsServer = new WebSocketServer({
    httpServer: server,
    // You should not use autoAcceptConnections for production
    // applications, as it defeats all standard cross-origin protection
    // facilities built into the protocol and the browser.  You should
    // *always* verify the connection's origin and decide whether or not
    // to accept it.
    autoAcceptConnections: false
});

/**
 * Decide whether a WebSocket handshake from the given origin may proceed.
 * Placeholder policy: the origin is not inspected and every caller is allowed.
 *
 * @param {string} origin - origin reported by the connecting client
 * @returns {boolean} always true
 */
function originIsAllowed(origin) {
  // Replace this constant with a real origin check when one is needed.
  var allowed = true;
  return allowed;
}

/*
 * Handle one incoming WebSocket handshake.
 *
 * For each accepted connection a dedicated `mosquitto_sub -t <topic>` child
 * process is spawned, where <topic> is the request path without its leading
 * "/". Everything the child prints is forwarded to the client, text messages
 * from the client are echoed back, and the child is terminated when the
 * connection closes.
 */
wsServer.on('request', function(request) {
    if (!originIsAllowed(request.origin)) {
      // Make sure we only accept requests from an allowed origin
      request.reject();
      console.log((new Date()) + ' Connection from origin ' + request.origin + ' rejected.');
      return;
    }
    // accept() throws when asked to confirm a subprotocol the client never
    // offered, which would take the whole server down; reject such clients.
    if (request.requestedProtocols.indexOf('currentcostdata') === -1) {
        request.reject();
        console.log((new Date()) + ' Connection from origin ' + request.origin + ' rejected: subprotocol currentcostdata not requested.');
        return;
    }

    var connection = request.accept('currentcostdata', request.origin);
    console.log((new Date()) + " connection accepted from " + connection.remoteAddress + " via " + request.origin);
    // The topic is the raw request path minus the leading "/".
    var topicString = request.resource.substring(1);
    console.log((new Date()) + ' Connection Topic: ' + topicString);
    // The handshake key doubles as a per-connection identifier in the logs.
    var mosquittopid = request.key;
    console.log((new Date()) + ' Connection ID: ' + mosquittopid);

    // Local to this connection: a shared global would be overwritten by the
    // next client that connects.
    var mosq = spawn('mosquitto_sub', ['-t', topicString]);
    // Without a listener, a failed spawn (e.g. mosquitto_sub not installed)
    // surfaces as an unhandled 'error' event and crashes the server.
    mosq.on('error', function(err) {
        console.log((new Date()) + ' mosquitto_sub for ' + topicString + ' failed: ' + err.message);
        connection.close();
    });
    mosq.stdout.setEncoding('utf8');
    console.log((new Date()) + 'PID of mosquitto_sub for ' + topicString + ' ConID: ' + mosquittopid + ' is ' + mosq.pid);

    // Forward every chunk from mosquitto_sub to the client unchanged.
    mosq.stdout.on('data', function(data) {
        connection.sendUTF(data);
        // Strip ALL line breaks for the log line; replace() with a string
        // pattern would only remove the first occurrence.
        console.log((new Date()) + 'Topic: ' + topicString + ' - ' + data.replace(/[\r\n]+/g, ''));
    });

    // Echo text messages straight back to the sender.
    connection.on('message', function(message) {
        if (message.type === 'utf8') {
            console.log('Received Message: ' + message.utf8Data);
            connection.sendUTF(message.utf8Data);
        }
    });

    connection.on('close', function(reasonCode, description) {
        console.log((new Date()) + ' ' + mosquittopid + ' disconnected');
        console.log((new Date()) + ' ' + topicString + ' Closed: pid: ' + mosq.pid + '.');
        // Signal the child through its own handle rather than process.kill(pid):
        // the latter throws if the process is already gone or never started.
        try {
            mosq.kill();
        } catch (err) {
            console.log((new Date()) + ' Could not stop mosquitto_sub for ' + topicString + ': ' + err.message);
        }
    });
});

On the client side there is some JavaScript which opens the websockets connection to the server and then listens for updates and acts accordingly.

<script type="text/javascript">// <![CDATA[
		/* Once the DOM is ready, open a WebSocket to the server and show each
		   received reading in #TempValue, with status text in #debug_txt. */
		$(document).ready(function() {
			/* Define handler for websocket debug information.
			   The message is inserted as text (not HTML) because it can contain
			   data received from the socket.
			   NOTE(review): wrapping the message in a <p> is an assumption about
			   the intended markup - confirm. */
			function debug(str) {
				$("#debug_txt").empty().append($("<p>").text(str));
			}
			/* Remove trailing whitespace (the payload usually ends in a newline). */
			function rtrim(str) {
				return str.replace(/\s+$/,"");
			}
			/* Create a websocket connection; the path selects the MQTT topic and
			   the second argument is the subprotocol the server accepts. */
			var ws = new WebSocket("ws://myip:8000/CurrentCostTemp","currentcostdata");
			/* Define websocket handlers */
			ws.onmessage = function(evt) {
				var data = rtrim(evt.data);
				var temptitle = data +'\u00B0C';
				$("#TempValue").text(temptitle);
				debug('Temp - ' + temptitle);
			};
			ws.onclose = function() {
				debug("socket closed");
			};
			ws.onopen = function() {
				debug("connected...");
			};
		});

// ]]></script>

August 8th, 2012 by robconvery

3 Responses to “Using node.js WebSockets to get live MQTT updates”

  1. Roger Says:

    You could probably use http://mosquitto.org/js/ to do a similar thing, assuming your web client can access the port the mqtt server is on.

  2. Roger Says:

    That’s not quite right, I mean to say that you can use it to do mqtt over websockets directly. All you need is then something on the server that can convert the incoming websockets requests into connecting to an mqtt server then passing data between the two. mod_websockets for various web servers should do it and I’m sure it would be easy to implement in node. You wouldn’t even need to know anything about mqtt.

  3. robconvery Says:

    Yeah that is something I am going to look into. It was not available when I wrote my code.

Leave a Reply