Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | 21x 21x 21x 21x 22x 22x 22x 11x 11x 11x 11x 11x 11x 11x 11x 11x 22x 10x 10x 10x 22x 22x 22x 22x 21x 21x 21x 21x 12x 12x 10x 10x 10x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 20x 10x 10x 10x 12x 12x 10x 10x 12x 12x 21x 21x 21x | const Streamz = require('streamz'); const Promise = require('bluebird'); const util = require('util'); function Update(_c,collection,keys,options) { if (!(this instanceof Streamz)) return new Update(_c,collection,keys,options); if (isNaN(_c)) { options = keys; keys = collection; collection = _c; _c = undefined; } if (keys === undefined) throw new Error('Missing Keys'); Streamz.call(this, _c, null, options); this.collection = Promise.resolve(collection); this.options = options || {}; this.options.pushResults = this.options.pushResults || this.options.pushResult; // legacy this.keys = [].concat(keys); } util.inherits(Update,Streamz); Update.prototype._fn = function(d) { return this.collection .then(collection => { const bulk = collection.initializeUnorderedBulkOp(); [].concat(d || []).forEach(d => { const criteria = this.keys.reduce((p,key) => { if (d[key] === undefined) throw new Error('Key not found in data'); p[key] = d[key]; return p; },{}); let op = bulk.find(criteria); if (this.options.upsert) op = op.upsert(); let payload = (d.$set || d.$addToSet) ? d : {$set: d}; if (d.$update) payload = d.$update; op.updateOne(payload); }); return bulk.execute(this.options.writeConcern); }) .then(d => { if (this.options.pushResults) return d; }); }; module.exports = Update; |