Computing stuff tied to the physical world

Flow – the inside perspective

In Software on Sep 17, 2013 at 00:01

This is the last of a four-part series on designing big apps (“big” as in not embedded, not necessarily many lines of code – on the contrary, in fact).

The current 0.8 version of HouseMon is my first foray into the world of streams and promises, on the host server as well as in the browser client.

First a note about source code structure: HouseMon consists of a set of subfolders in the app folder, and differs from the previous SocketStream-based design in that dependency info, client-side code, client-side layout files + images, and host-side code are now all grouped per module, instead of strewn across client, static, and server folders.

The “main” module starts the ball rolling; the other modules mostly just register themselves in a global app “registry”, which is simply a tree of nested key/value mappings. Here’s a somewhat stripped-down version of app/main/host.coffee:

module.exports = (app, plugin) ->
  app.register 'nodemap.rf12-868,42,2', 'testnode'
  app.register 'nodemap.rf12-2', 'roomnode'
  app.register 'nodemap.rf12-3', 'radioblip'
  # etc...

  app.on 'running', ->
    Serial = @registry.interface.serial
    Logger = @registry.sink.logger
    Parser = @registry.pipe.parser
    Dispatcher = @registry.pipe.dispatcher
    ReadingLog = @registry.sink.readinglog

    app.db.on 'put', (key, val) ->
      console.log 'db:', key, '=', val

    jeelink = new Serial('usb-A900ad5m').on 'open', ->

      jeelink # log raw data to file, as timestamped lines of text
        .pipe(new Logger) # sink, can't chain this further

      jeelink # log decoded readings as entries in the database
        .pipe(new Parser)
        .pipe(new Dispatcher)
        .pipe(new ReadingLog app.db)

This is all server-side stuff and many details of the API are still in flux. The code reads data from a JeeLink via USB serial, which gives us a stream of messages of the form:

{ dev: 'usb-A900ad5m', msg: 'OK 2 116 254 1 0 121 15' }

(this can be seen by inserting “.on('data', console.log)” in the above pipeline)

These messages get sent to a “Logger” stream, which saves things to file in raw form:

L 17:18:49.618 usb-A900ad5m OK 2 116 254 1 0 121 15
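To make the raw log format concrete, here is a minimal sketch of such a sink in plain JavaScript (HouseMon itself is CoffeeScript). The names `formatLogLine` and `LineLogger` are made up for illustration, the leading `L` is simply copied from the sample line above, and the time of day is formatted in UTC, whereas the real Logger may well use local time:

```javascript
const { Writable } = require('stream');

// Turn { dev, msg } into one timestamped line of text (time of day in UTC).
function formatLogLine(message, when) {
  const time = when.toISOString().slice(11, 23); // "HH:MM:SS.mmm"
  return `L ${time} ${message.dev} ${message.msg}`;
}

// Object-mode sink: formats each message and hands the line to `writeLine`.
// A real logger would append to a file instead of calling a function.
class LineLogger extends Writable {
  constructor(writeLine) {
    super({ objectMode: true });
    this.writeLine = writeLine;
  }
  _write(message, _encoding, done) {
    this.writeLine(formatLogLine(message, new Date()));
    done();
  }
}

// The date is arbitrary here, only the time of day ends up in the line.
const sample = { dev: 'usb-A900ad5m', msg: 'OK 2 116 254 1 0 121 15' };
const line = formatLogLine(sample, new Date(Date.UTC(2013, 8, 16, 17, 18, 49, 618)));
console.log(line);
```

Being a plain Writable, a sink like this has no readable side, which is why nothing can be chained after it.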

One nice property of Node streams is that you can connect them to multiple outputs, and they’ll each receive these messages. So in the code above, the messages are also sent to a pipeline of “transform” streams.

The “Parser” stream understands RF12demo’s text output lines, and transforms each one into this:

{ dev: 'usb-A900ad5m',
  msg: <Buffer 02 74 fe 01 00 79 0f>,
  type: 'rf12-868,42,2' }
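As a rough illustration of this step (not the actual HouseMon parser), the conversion could look like the JavaScript sketch below. It assumes the band and group are supplied from outside as hypothetical `band` and `group` options, since an “OK” line only carries the node ID, in the low 5 bits of its first byte:

```javascript
const { Transform } = require('stream');

// Convert one RF12demo "OK ..." line into a typed message with a binary payload.
// `band` and `group` are assumptions of this sketch, passed in by the caller.
function parseLine(message, { band, group }) {
  const words = message.msg.trim().split(/\s+/);
  if (words[0] !== 'OK') return null; // not a packet report, ignore it
  const payload = Buffer.from(words.slice(1).map(Number));
  const nodeId = payload[0] & 0x1f;
  return { dev: message.dev, msg: payload, type: `rf12-${band},${group},${nodeId}` };
}

// Object-mode transform stream wrapped around parseLine.
class Parser extends Transform {
  constructor(radio) {
    super({ objectMode: true });
    this.radio = radio;
  }
  _transform(message, _encoding, done) {
    const parsed = parseLine(message, this.radio);
    if (parsed) this.push(parsed); // lines that are not packets are dropped
    done();
  }
}

const parsed = parseLine(
  { dev: 'usb-A900ad5m', msg: 'OK 2 116 254 1 0 121 15' },
  { band: 868, group: 42 }
);
console.log(parsed);
```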

Now each message has a type and a binary payload. The next step takes this through a “Dispatcher”, which looks up the type in the registry to tag it as a “testnode” and then processes this data using the definition stored in app/drivers/testnode.coffee:

module.exports = (app) ->

  app.register 'driver.testnode',
    in: 'Buffer'
    out:
      batt:
        title: 'Battery status', unit: 'V', scale: 3, min: 0, max: 5

    decode: (data) ->
      { batt: data.msg.readUInt16LE 5 }

A lot of this is meta information about the results decoded by this particular driver. The result of the dispatch process is a message like this:

{ dev: 'usb-A900ad5m',
  msg: { batt: 3961 },
  type: 'testnode',
  tag: 'rf12-868,42,2' }
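The lookup-then-decode logic can be sketched in a few lines of JavaScript. This is only an approximation of what the Dispatcher does: the `registry` object below is a hand-built stand-in for the app registry, and `dispatch` is a hypothetical helper name.

```javascript
// Stand-in for the app registry: a tree of nested key/value mappings.
const registry = {
  nodemap: { 'rf12-868,42,2': 'testnode' },
  driver: {
    testnode: {
      // Same decoding as the driver above: 16-bit little-endian value at offset 5.
      decode: (data) => ({ batt: data.msg.readUInt16LE(5) }),
    },
  },
};

// Look up the driver for a parsed message and replace the payload with decoded values.
function dispatch(reg, message) {
  const name = reg.nodemap[message.type];
  const driver = name && reg.driver[name];
  if (!driver) return null; // unknown node type: nothing to decode
  return { dev: message.dev, msg: driver.decode(message), type: name, tag: message.type };
}

const decoded = dispatch(registry, {
  dev: 'usb-A900ad5m',
  msg: Buffer.from([0x02, 0x74, 0xfe, 0x01, 0x00, 0x79, 0x0f]),
  type: 'rf12-868,42,2',
});
console.log(decoded); // bytes 0x79 0x0f at offset 5 read as 0x0f79 = 3961
```

Note how the original type moves into `tag`, while `type` now names the driver that produced the values.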

The data has now been decoded, yielding a single parameter in this case. At the end of the pipeline is the “ReadingLog” write stream, which timestamps the data and saves it into the database. To see what’s going on, I’ve added an “on 'put'” handler, which shows all changes to the database, such as:

db: reading~testnode~rf12-868,42,2~1379353286905 = { batt: 3961 }

The ~ separator is a convention with LevelDB to partition the key namespace. Since the timestamp is part of the key, every reading ends up as its own entry in the database, i.e. this acts as historical storage.
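Because keys in such a store are kept in sorted order, a shared prefix is enough to pull out the history of one node. The JavaScript sketch below imitates this with a sorted array rather than a real database; `readingKey` and `prefixRange` are hypothetical helpers, and the `gte` / `lte` names are only labels for the lower and upper bounds:

```javascript
// Build a key like the one in the log line above.
function readingKey(type, tag, time) {
  return ['reading', type, tag, time].join('~');
}

// Lower and upper bounds that enclose every key starting with the given parts.
function prefixRange(...parts) {
  const prefix = parts.join('~') + '~';
  return { gte: prefix, lte: prefix + '\xff' };
}

// A sorted array stands in for the ordered key space of the database.
const keys = [
  readingKey('roomnode', 'rf12-868,5,24', 1379344595028),
  readingKey('testnode', 'rf12-868,42,2', 1379353286905),
  readingKey('testnode', 'rf12-868,42,2', 1379353346905), // made-up later reading
].sort();

const { gte, lte } = prefixRange('reading', 'testnode', 'rf12-868,42,2');
const history = keys.filter((key) => key >= gte && key <= lte);
console.log(history);
```

Millisecond timestamps sort correctly as text only as long as they all have the same number of digits, which holds for the 13-digit values used here.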

This was just a first example. The “StatusTable” stream takes a reading such as this:

db: reading~roomnode~rf12-868,5,24~1379344595028 = {
  "light": 20,
  "humi": 68,
  "moved": 1,
  "temp": 143
}

… and stores each value separately:

db: status~roomnode/rf12-868,5,24/light = {
  "key":"roomnode/rf12-868,5,24/light",
  "name":"light",
  "value":20,
  "type":"roomnode",
  "tag":"rf12-868,5,24",
  "time":1379344595028
}
// and 3 more...

Here, the information is placed inside the message, including the time stamp. Keys must be unique, so in this case only the last value will be kept in the database. In other words: this fills a “status” table with the latest value of each measurement.
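The effect of leaving the timestamp out of the key is easy to demonstrate. In this JavaScript sketch a `Map` stands in for the database, `statusEntries` and `store` are hypothetical helpers, and the second reading is invented purely to show the overwrite:

```javascript
// Split one reading into per-parameter status entries, keyed so that
// a newer value for the same parameter replaces the older one.
function statusEntries(type, tag, time, values) {
  return Object.keys(values).map((name) => {
    const key = `${type}/${tag}/${name}`;
    return [`status~${key}`, { key, name, value: values[name], type, tag, time }];
  });
}

const db = new Map(); // stand-in for the database: put() becomes set()

function store(type, tag, time, values) {
  for (const [dbKey, entry] of statusEntries(type, tag, time, values)) {
    db.set(dbKey, entry);
  }
}

// First reading is the one shown above, the second one is made up.
store('roomnode', 'rf12-868,5,24', 1379344595028, { light: 20, humi: 68, moved: 1, temp: 143 });
store('roomnode', 'rf12-868,5,24', 1379344655028, { light: 22, humi: 68, moved: 0, temp: 144 });

console.log(db.size); // one entry per parameter, however many readings arrive
console.log(db.get('status~roomnode/rf12-868,5,24/light'));
```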

As a last example, here is a pipeline which replays a saved log file, decodes it, and treats it as new readings (great for testing, but useless in production, clearly):

createLogStream = @registry.source.logstream

createLogStream('app/replay/20121130.txt.gz')
  # .pipe(new Replayer)
  .pipe(new Parser)
  .pipe(new Dispatcher)
  .pipe(new StatusTable app.db)

Unlike the previous HouseMon design, this now properly deals with back-pressure.
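For readers new to the term: back-pressure is how a slow consumer tells a fast producer to pause. In Node streams the signal is the return value of write(), which pipe() watches on our behalf. The JavaScript sketch below is a generic illustration, not HouseMon code, using a deliberately stuck sink and a tiny buffer limit:

```javascript
const { Writable } = require('stream');

// A deliberately stuck sink: it accepts a message but does not report completion.
const held = [];
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2, // buffer limit, counted in messages for object-mode streams
  write(message, _encoding, done) {
    held.push(done); // keep the callback, call it later to release the message
  },
});

const first = slowSink.write({ n: 1 });  // true: still room
const second = slowSink.write({ n: 2 }); // false: please wait for 'drain'
console.log(first, second);

// Once every held write completes, the sink signals that it wants more.
slowSink.on('drain', () => console.log('drained, producer may resume'));
while (held.length) held.shift()();
```

A source connected with pipe() pauses when write() returns false and resumes on 'drain', so a fast file reader cannot flood a slower database writer.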

The “Replayer” stream is a fun one: it takes each message and delays passing it through (with an updated timestamp), so that this stream will simulate a real-time feed with new messages trickling in. Without it, the above pipeline processes the file as fast as it can.
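One way such a delaying stream could be built is sketched below in JavaScript. This is a guess at the idea rather than the actual Replayer: it assumes each message carries its original timestamp in a `time` field (milliseconds), and the `speedup` option and `replayGap` helper are inventions of this sketch.

```javascript
const { Transform } = require('stream');

// How long to wait before emitting a message, given the previous original timestamp.
function replayGap(previousTime, nextTime) {
  if (previousTime === null) return 0;         // first message goes out at once
  return Math.max(0, nextTime - previousTime); // never wait a negative amount
}

class Replayer extends Transform {
  constructor({ speedup = 1 } = {}) {
    super({ objectMode: true });
    this.previousTime = null;
    this.speedup = speedup;
  }
  _transform(message, _encoding, done) {
    const wait = replayGap(this.previousTime, message.time) / this.speedup;
    this.previousTime = message.time;
    // Holding back `done` is what slows the pipeline: no further message
    // is handed to this stream until the current one has been passed on.
    setTimeout(() => done(null, { ...message, time: Date.now() }), wait);
  }
}

console.log(replayGap(null, 1000), replayGap(1000, 1250));
```

Since the delay is applied by postponing the transform callback, the file reader upstream is throttled through the normal stream machinery instead of piling up messages in memory.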

The next step will be to connect a change stream through the WebSocket to the client, so that live status information can be displayed on-screen, using exactly the same techniques.

As you can see, it’s streams all the way down. Onwards!

  1. Interesting developments on HouseMon. I guess streams are more, well, streamlined; but also less easy to hook into (compared to listening to broadcasted events)?

    I’ll put my work on HouseMon in my house on hold while these considerable changes to the system are made, although once I understand the workings a bit I can move to the 0.8 version, since I’ve got no big setup yet, so no large disruptions if something goes wrong..

    • less easy to hook into (compared to listening to broadcasted events)?

      Right now, yes. But with some pipe-fitting utility code it can probably be made just as easy.

      You can already port things if you want to try out your own driver code. I adapted most of the existing ones in no time. See app/drivers/roomnode.coffee for an example.

      But there’s no interesting client-side presentation yet, so yeah, it’ll take a bit more time to make the setup practical again.

  2. Interesting! … some observations: Timestamping of incoming messages is now done by Serial class. Convenient.

    Now the glue seems to be in app/main/host.coffee. This file now has to know all of the relations to set up a given pipe. So it now becomes harder to replace, say, jeelink with centrallogger (another specialisation of a SerialPort). In the event-driven approach, this could be fully controlled through the admin UI.

    Maybe one other decoupling is required, and that is decoupling feeds from essential pipelines such as Parser-Dispatcher-ReadingLog.

  3. glue seems to be in app/main/host.coffee

    Yep. The plan is to make this setup data-driven, and to set it up and adjust it through the visual diagram editor I added recently. Then everything becomes a node with wires in between…
