All files / app/lib/mongo bulk.js

14.28% Statements 11/77
100% Branches 1/1
0% Functions 0/2
14.28% Lines 11/77

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 7821x 21x 21x 21x                                         21x 21x 21x 21x                                                                                             21x 21x 21x  
const Streamz = require('streamz');
const Promise = require('bluebird');
const util = require('util');
 
function Update(_c,collection,keys,options) {
  if (!(this instanceof Streamz))
    return new Update(_c,collection,keys,options);

  if (isNaN(_c)) {
    options = keys;
    keys = collection;
    collection = _c;
    _c = undefined;
  }

  if (keys === undefined)
    throw new Error('Missing Keys');

  Streamz.call(this, _c, null, options);
  this.collection = Promise.resolve(collection);
  this.options = options || {};
  this.options.pushResults = this.options.pushResults || this.options.pushResult; // legacy
  this.keys = [].concat(keys);
}
 
util.inherits(Update,Streamz);
 
Update.prototype._fn = function(d,cb) {
  this.collection
    .then(collection => {
      const bulk = collection.initializeUnorderedBulkOp();

      [].concat(d || []).forEach(d => {
        const criteria = this.keys.reduce((p,key) => {
          const keyPieces = key.split('.');
          const value = keyPieces.reduce((a, b) => {
            if (a[b] === undefined) {
              throw new Error('Key "' + b + '" not found in data ' + JSON.stringify(d));
            }
            return a[b];
          }, d);

          //if query referencing array, use $elemMatch instead of equality match to prevent issues with upsert
          if(keyPieces[0] == '$push' || keyPieces[0] == '$addToSet')  {
            const arrayProp = keyPieces[1];
            if(p[arrayProp] === undefined) {
              p[arrayProp] = {$elemMatch:{}};
            }
            keyPieces.splice(0,2);
            p[arrayProp]['$elemMatch'][keyPieces.join('.')] = value;
          } else {
            //check if key starts with '$' to remove operator from query key
            if(key.charAt(0) == '$') {
              key = key.substring(key.indexOf('.') + 1);
            }
            p[key] = value;
          }
          return p;
        },{});

        let op = bulk.find(criteria);

        if (this.options.upsert) {
          op = op.upsert();
        }

        op.updateOne(d);
      });

      bulk.execute(this.options.writeConcern,(err,d) => {
        console.log('inserted', d.insertedCount ?? d.nInserted,'upserted', d.upsertedCount ?? d.nUpserted, 'matched', d.matchedCount ?? d.nMatched, err);
        cb(err,this.options.pushResults && d);
      });
    });
};
 
module.exports = Update;