Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | 21x 21x 21x 21x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 21x 21x 21x 21x 8x 8x 21x 21x 21x 7x 7x 7x 7x 7x 7x 21x 21x 21x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x | const Streamz = require('streamz'); const util = require('util'); const Promise = require('bluebird'); function Mysql(pool,options) { if (!(this instanceof Mysql)) return new Mysql(pool); if (!pool) throw 'POOL_MISSING'; Streamz.call(this,options); this.pool = pool; this.options = options || {}; } util.inherits(Mysql,Streamz); Mysql.prototype.getConnection = function() { return Promise.fromNode(this.pool.getConnection.bind(this.pool)) .disposer(connection => connection.release()); }; Mysql.prototype.query = function(query,cb) { return Promise.using(this.getConnection(),connection => { // Trigger callback when we get a connection, not when we (later) get results // allowing overall concurrency to be controlled by the mysql pool if (typeof cb === 'function') cb(); return Promise.fromNode(callback => connection.query(query,callback)); }); }; Mysql.prototype.stream = function(query,cb) { const passThrough = Streamz(); Promise.using(this.getConnection(),connection => { // Trigger callback when we get a connection, not when we (later) get results // allowing overall concurrency to be controlled by the mysql pool if (typeof cb === 'function') cb(); return new Promise((resolve,reject) => { connection.query(query) .stream() .on('end',resolve) .on('error',reject) .pipe(passThrough); passThrough.on('error', err => { connection.destroy(); }); }); }) .catch(e => passThrough.emit('error',e)); return passThrough; }; module.exports = Mysql; |