Streaming from MongodB

In my MeteorJS app I have external Mongodb in AWS

var url = 'mongodb://ec2-XX-XX-XXX-XX.us-west-2.compute.amazonaws.com:27017/storedata';

MongoClient = require('mongodb').MongoClient, assert = require('assert');

MongoClient.connect(url, function (err, db) {
    assert.equal(null, err);
    console.log("Connected successfully to energy server!");

    var col = db.collection('energydata');

    // Get the results using a find stream
    var cursor = col.find({});
    cursor.on('data', function (doc) {
        console.log('data is : ', doc);
    });

    cursor.once('end', function () {
        db.close();
    });    
});

as per doc:

http://mongodb.github.io/node-mongodb-native/2.0/tutorials/streams/

and this outputs which in my case is every 30 min data:

data is :  { _id: 57e14dce36b7c360df1e7a90,
  point_id: '130-E9DCE6-1327/10',
  time: '06/08/2015 10:00',
  energy: 16,
  created_date: Tue Sep 20 2016 01:00:00 GMT+0100 (BST) }
data is :  { _id: 57e14dce36b7c360df1e7a91,
  point_id: '130-E9DCE6-1327/10',
  time: '06/08/2015 10:30',
  energy: 16,
  created_date: Tue Sep 20 2016 01:00:00 GMT+0100 (BST) }

and so on and wanting to plot

time vs energy

But this is a one time call to get all available data. My question is what is the best practice so that when new data comes in, I will be able to retrieve only the new data and not having to retrieve all the data that’s already read in, as in typical time series visualization application say using Plotly etc?

I’ve done something similar to this (also for energy data). I used the timestamp (the time field in your example) as the MongoDB _id and ignored errors on attempts to re-insert duplicates. That way gave me a “free” index on the timestamp and made chart creation and live updating really easy by using a sort/limit in my find.

Hi Rob

This is going to be using Plotyjs / React so would be great if you can provide example.

I’ve never used Plotlyjs, so I’m going to have to decline :disappointed:

any plotting packages will be ok

Check the code in the repo I linked to. Apart from the charting package, the only real difference between that Blaze solution and a React one is that you render the chart in componentDidMount instead of Template.xxx.onCreated.

Yes I have componentDidMount, it was the data update methodology I am interested in retrieving, i.e only the data that is not already read in…

But the data is a URL link whereas I have data via Mongo_URL DNS from AWS

So do I. The url is used only to insert data into Mongo. Thereafter, whenever new data is inserted into the DB I use this to get it on the client via pub/sub:

And set reactiveVars which were used by the chart’s setData method to dynamically update.

$ meteor npm install
npm WARN enoent ENOENT: no such file or directory, open '/Users/seb/WebstormProjects/ukgrid/package.json’
npm WARN ukgrid No description
npm WARN ukgrid No repository field.
npm WARN ukgrid No README data
npm WARN ukgrid No license field.

$ meteor --version

$ meteor
/Users/seb/.meteor/packages/meteor-tool/.1.4.1_1.1h0re2h++os.osx.x86_64+web.browser+web.cordova/mt-os.osx.x86_64/dev_bundle/lib/node_modules/meteor-promise/promise_server.js:165
throw error;
^

Error: ENOENT: no such file or directory, open '/private/var/folders/7k/1gcvt4dd3rv3pw5l7llrxm54y85xlg/T/mt-2s1ydh/os.json’
at Error (native)

Keeps downloading…

Yeah - it’s pretty old and needs updating for npm and import/export. I was suggesting reading the relevant code (there’s not that much) rather than installing it.

So http://mongodb.github.io/node-mongodb-native/2.0/tutorials/streams/

is not really real time streaming is it?

I’ve never seen that before (MongoDB streams), but is it strictly necessary in Meteor?

Just MongodB driver for nodejs

Now updated for Meteor 1.4/ES2015/import/export. Also added a (brief) README.

Hi Rob

So whenever working with external MongodB (or whatever dB), the backend is always calling and building the entire database? For example say I have initial 1GB of data, and using MongodB driver

server.js

MongoClient = require('mongodb').MongoClient, assert = require('assert');
var url = 'mongodb://ec2-XX-XX-XXX-XX.us-west-2.compute.amazonaws.com:27017/storedata';

 MongoClient.connect(url, function (err, db) {
    assert.equal(null, err);
    console.log("Connected successfully to energy server!");

    var col = db.collection('energydata');

    // Get the results using a find stream
    var cursor = col.find({});
    cursor.on('data', function (doc) {
        console.log('data is : ', doc); 
    });

    cursor.once('end', function () {
        db.close();
    });
});

and then 30 mins later inside cron job I have new data so now dataset is 1.000001 GB, so I again need to run the above code to update the database, even though it is just 0.000001 GB of new data? What I am trying to say is that is there no way of just retrieving data that is not already loaded into app instead of loading the entire database each cron job?

Just did update since you mentioned it was updated but still getting:

$ git pull
$ meteor npm install
$ meteor
/Users/seb/.meteor/packages/meteor-tool/.1.4.1_1.1h0re2h++os.osx.x86_64+web.browser+web.cordova/mt-os.osx.x86_64/isopackets/ddp/npm/node_modules/meteor/promise/node_modules/meteor-promise/promise_server.js:165
throw error;
^

Error: ENOENT: no such file or directory, open '/private/var/folders/7k/1gcvt4dd3rv3pw5l7llrxm54y85xlg/T/mt-19kjk67/os.json’
at Error (native)

As I said earlier, I’ve never seen MongoDB streams before, so cannot comment on what you need to do.

I just tested my repo:

 /home/rfallows: git clone https://github.com/robfallows/ukgrid.git ukg
Cloning into 'ukg'...
remote: Counting objects: 95, done.
remote: Compressing objects: 100% (20/20), done.
remote: Total 95 (delta 8), reused 0 (delta 0), pack-reused 73
Unpacking objects: 100% (95/95), done.
Checking connectivity... done.
 /home/rfallows: cd ukg
  /home/rfallows/ukg: meteor npm i
ukgrid@ /home/rfallows/ukg
├── highcharts@5.0.0
├─┬ meteor-node-stubs@0.2.3
│ ├── assert@1.3.0
│ ├─┬ browserify-zlib@0.1.4
│ │ └── pako@0.2.8
│ ├─┬ buffer@4.5.1
│ │ ├── base64-js@1.1.2
│ │ ├── ieee754@1.1.6
│ │ └── isarray@1.0.0
│ ├─┬ console-browserify@1.1.0
│ │ └── date-now@0.1.4
│ ├── constants-browserify@1.0.0
│ ├─┬ crypto-browserify@3.11.0
│ │ ├─┬ browserify-cipher@1.0.0
│ │ │ ├─┬ browserify-aes@1.0.6
│ │ │ │ ├── buffer-xor@1.0.3
│ │ │ │ └── cipher-base@1.0.2
│ │ │ ├─┬ browserify-des@1.0.0
│ │ │ │ ├── cipher-base@1.0.2
│ │ │ │ └─┬ des.js@1.0.0
│ │ │ │   └── minimalistic-assert@1.0.0
│ │ │ └── evp_bytestokey@1.0.0
│ │ ├─┬ browserify-sign@4.0.0
│ │ │ ├── bn.js@4.11.1
│ │ │ ├── browserify-rsa@4.0.1
│ │ │ ├─┬ elliptic@6.2.3
│ │ │ │ ├── brorand@1.0.5
│ │ │ │ └── hash.js@1.0.3
│ │ │ └─┬ parse-asn1@5.0.0
│ │ │   ├─┬ asn1.js@4.5.2
│ │ │   │ └── minimalistic-assert@1.0.0
│ │ │   ├─┬ browserify-aes@1.0.6
│ │ │   │ ├── buffer-xor@1.0.3
│ │ │   │ └── cipher-base@1.0.2
│ │ │   └── evp_bytestokey@1.0.0
│ │ ├─┬ create-ecdh@4.0.0
│ │ │ ├── bn.js@4.11.1
│ │ │ └─┬ elliptic@6.2.3
│ │ │   ├── brorand@1.0.5
│ │ │   └── hash.js@1.0.3
│ │ ├─┬ create-hash@1.1.2
│ │ │ ├── cipher-base@1.0.2
│ │ │ ├── ripemd160@1.0.1
│ │ │ └── sha.js@2.4.5
│ │ ├── create-hmac@1.1.4
│ │ ├─┬ diffie-hellman@5.0.2
│ │ │ ├── bn.js@4.11.1
│ │ │ └─┬ miller-rabin@4.0.0
│ │ │   └── brorand@1.0.5
│ │ ├── inherits@2.0.1
│ │ ├── pbkdf2@3.0.4
│ │ ├─┬ public-encrypt@4.0.0
│ │ │ ├── bn.js@4.11.1
│ │ │ ├── browserify-rsa@4.0.1
│ │ │ └─┬ parse-asn1@5.0.0
│ │ │   ├─┬ asn1.js@4.5.2
│ │ │   │ └── minimalistic-assert@1.0.0
│ │ │   ├─┬ browserify-aes@1.0.6
│ │ │   │ ├── buffer-xor@1.0.3
│ │ │   │ └── cipher-base@1.0.2
│ │ │   └── evp_bytestokey@1.0.0
│ │ └── randombytes@2.0.3
│ ├── domain-browser@1.1.7
│ ├── events@1.1.0
│ ├─┬ http-browserify@1.7.0
│ │ ├── Base64@0.2.1
│ │ └── inherits@2.0.1
│ ├── https-browserify@0.0.1
│ ├── os-browserify@0.2.1
│ ├── path-browserify@0.0.0
│ ├── process@0.11.2
│ ├── punycode@1.4.1
│ ├── querystring-es3@0.2.1
│ ├─┬ readable-stream@2.0.6
│ │ ├── core-util-is@1.0.2
│ │ ├── inherits@2.0.1
│ │ ├── isarray@1.0.0
│ │ ├── process-nextick-args@1.0.6
│ │ └── util-deprecate@1.0.2
│ ├─┬ stream-browserify@2.0.1
│ │ └── inherits@2.0.1
│ ├── string_decoder@0.10.31
│ ├── timers-browserify@1.4.2
│ ├── tty-browserify@0.0.0
│ ├─┬ url@0.11.0
│ │ ├── punycode@1.3.2
│ │ └── querystring@0.2.0
│ ├─┬ util@0.10.3
│ │ └── inherits@2.0.1
│ └─┬ vm-browserify@0.0.4
│   └── indexof@0.0.1
├── moment@2.15.1
└─┬ xml2js@0.4.17
  ├── sax@1.2.1
  └─┬ xmlbuilder@4.2.1
    └── lodash@4.16.4

  /home/rfallows/ukg: meteor --settings settings.json
[[[[[ ~/ukg ]]]]]

=> Started proxy.
=> Started MongoDB.
=> Started your app.

=> App running at: http://localhost:3000/

Hi Rob,

I think the streaming is not streaming per se in Mongodb, but cursor.on and cursor.once as convenient methods…

A MongoDB stream is just an efficient mechanism for permuting a cursor without having to dump the entire result set into memory. It’s just a Node stream under the hood.

It sounds like you’re trying to use it “tail” your collection – a stream can only be used in this way for a capped collection
https://docs.mongodb.com/manual/core/tailable-cursors/

Since your dataset only updates every 30 minutes, it’d be much more practical if you just query every N minutes and use a well-covered selector to restrict to documents inserted since time X. If you need to synchronize existing docs as well, you’d either need to track lastModified times as well, or tail the oplog. Meteor’s mongo driver implements oplog tailing for you, or you could roll your own solution.

I recently implemented a Node server which replicates and syncs large collections to elasticsearch in realtime – streams were useful to scalably permute each collection, and to tail the oplog to catch the diffs.

I think the part you want to focus on in your code is this:

    // Get the results using a find stream
    var cursor = col.find({});

Your find selector tells it to get everything. So, that’s what it does each time it is run.

You want to change your selector so it only gets the data added since the last time you received data.

I noticed you said “and then 30 mins later inside cron job”. So, if you’re running this script once every 30 minutes then you only need to change this code’s find selector to get data added after 30 minutes ago.

var thirtyMinutesAgo = Date.now() - (30 * 60 * 1000);
  , selector = { created_date: { $gt: thirtyMinutesAgo } };

var cursor = col.find(selector);

This relies on the created_date field being comparable to time in milliseconds.