Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x | const Streamz = require('streamz');
const Promise = require('bluebird');
const util = require('util');
function Update(_c,collection,keys,options) {
if (!(this instanceof Streamz))
return new Update(_c,collection,keys,options);
if (isNaN(_c)) {
options = keys;
keys = collection;
collection = _c;
_c = undefined;
}
if (keys === undefined)
throw new Error('Missing Keys');
Streamz.call(this, _c, null, options);
this.collection = Promise.resolve(collection);
this.options = options || {};
this.options.pushResults = this.options.pushResults || this.options.pushResult; // legacy
this.keys = [].concat(keys);
}
util.inherits(Update,Streamz);
Update.prototype._fn = function(d,cb) {
this.collection
.then(collection => {
const bulk = collection.initializeUnorderedBulkOp();
[].concat(d || []).forEach(d => {
const criteria = this.keys.reduce((p,key) => {
const keyPieces = key.split('.');
const value = keyPieces.reduce((a, b) => {
if (a[b] === undefined) {
throw new Error('Key "' + b + '" not found in data ' + JSON.stringify(d));
}
return a[b];
}, d);
//if query referencing array, use $elemMatch instead of equality match to prevent issues with upsert
if(keyPieces[0] == '$push' || keyPieces[0] == '$addToSet') {
const arrayProp = keyPieces[1];
if(p[arrayProp] === undefined) {
p[arrayProp] = {$elemMatch:{}};
}
keyPieces.splice(0,2);
p[arrayProp]['$elemMatch'][keyPieces.join('.')] = value;
} else {
//check if key starts with '$' to remove operator from query key
if(key.charAt(0) == '$') {
key = key.substring(key.indexOf('.') + 1);
}
p[key] = value;
}
return p;
},{});
let op = bulk.find(criteria);
if (this.options.upsert) {
op = op.upsert();
}
op.updateOne(d);
});
bulk.execute(this.options.writeConcern,(err,d) => {
console.log('inserted', d.insertedCount ?? d.nInserted,'upserted', d.upsertedCount ?? d.nUpserted, 'matched', d.matchedCount ?? d.nMatched, err);
cb(err,this.options.pushResults && d);
});
});
};
module.exports = Update;
|