Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x | const Streamz = require('streamz'); const Promise = require('bluebird'); const util = require('util'); function Update(_c,collection,keys,options) { if (!(this instanceof Streamz)) return new Update(_c,collection,keys,options); if (isNaN(_c)) { options = keys; keys = collection; collection = _c; _c = undefined; } if (keys === undefined) throw new Error('Missing Keys'); Streamz.call(this, _c, null, options); this.collection = Promise.resolve(collection); this.options = options || {}; this.options.pushResults = this.options.pushResults || this.options.pushResult; // legacy this.keys = [].concat(keys); } util.inherits(Update,Streamz); Update.prototype._fn = function(d,cb) { this.collection .then(collection => { const bulk = collection.initializeUnorderedBulkOp(); [].concat(d || []).forEach(d => { const criteria = this.keys.reduce((p,key) => { const keyPieces = key.split('.'); const value = keyPieces.reduce((a, b) => { if (a[b] === undefined) { throw new Error('Key "' + b + '" not found in data ' + JSON.stringify(d)); } return a[b]; }, d); //if query referencing array, use $elemMatch instead of equality match to prevent issues with upsert if(keyPieces[0] == '$push' || keyPieces[0] == '$addToSet') { const arrayProp = keyPieces[1]; if(p[arrayProp] === undefined) { p[arrayProp] = {$elemMatch:{}}; } keyPieces.splice(0,2); p[arrayProp]['$elemMatch'][keyPieces.join('.')] = value; } else { //check if key starts with '$' to remove operator from query key if(key.charAt(0) == '$') { key = key.substring(key.indexOf('.') + 1); } p[key] = value; } return p; },{}); let op = bulk.find(criteria); if (this.options.upsert) { op = op.upsert(); } op.updateOne(d); }); bulk.execute(this.options.writeConcern,(err,d) => { console.log('inserted', d.insertedCount ?? d.nInserted,'upserted', d.upsertedCount ?? d.nUpserted, 'matched', d.matchedCount ?? d.nMatched, err); cb(err,this.options.pushResults && d); }); }); }; module.exports = Update; |