All files / app/lib/mysql mysql.js

98.24% Statements 56/57
73.33% Branches 11/15
100% Functions 4/4
98.24% Lines 56/57

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 5721x 21x 21x 21x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 21x 21x 21x 21x 8x 8x 21x 21x 21x 7x 7x 7x 7x 7x 7x 21x 21x 21x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x   1x 1x 1x 1x 1x 1x 21x 21x 21x
const Streamz = require('streamz');
const util = require('util');
const Promise = require('bluebird');
 
/**
 * Stream wrapper around a mysql connection pool.
 *
 * May be invoked with or without `new`; both forms yield a Mysql instance
 * configured with the same pool and options.
 *
 * @param {object} pool - Connection pool; stored on `this.pool`.
 * @param {object} [options] - Forwarded to the Streamz constructor and stored
 *   on `this.options` (an empty object when omitted).
 * @throws {string} The string 'POOL_MISSING' when `pool` is falsy.
 */
function Mysql(pool,options) {
  // Factory-style call: forward BOTH arguments so options are not lost.
  if (!(this instanceof Mysql))
    return new Mysql(pool,options);

  // A bare string is thrown (not an Error) to stay compatible with callers
  // that compare the thrown value against 'POOL_MISSING'.
  if (!pool)
    throw 'POOL_MISSING';

  Streamz.call(this,options);
  this.pool = pool;
  this.options = options || {};
}
 
// Link Mysql.prototype to Streamz.prototype so Mysql instances inherit Streamz's methods.
util.inherits(Mysql,Streamz);
 
/**
 * Acquire a connection from the pool as a bluebird disposer.
 *
 * The returned disposer is meant to be consumed by `Promise.using`; when it
 * is disposed, `release()` is called on the connection.
 *
 * @returns {Disposer} Disposer wrapping the pooled connection.
 */
Mysql.prototype.getConnection = function() {
  // pool.getConnection takes a node-style callback; bind it to its pool.
  const acquire = this.pool.getConnection.bind(this.pool);
  const release = connection => connection.release();

  return Promise.fromNode(acquire).disposer(release);
};
 
/**
 * Run a query on a pooled connection and resolve with its result.
 *
 * @param {*} query - Passed unchanged to `connection.query`.
 * @param {Function} [cb] - Invoked with no arguments as soon as a connection
 *   has been obtained (i.e. before results arrive). Ignored unless it is a
 *   function.
 * @returns {Promise} Settles with the outcome of `connection.query`'s
 *   node-style callback.
 */
Mysql.prototype.query = function(query,cb) {
  const runOnConnection = connection => {
    // Signal the caller at acquisition time rather than at result time, so
    // that overall concurrency is governed by the mysql pool.
    if (typeof cb === 'function') {
      cb();
    }
    return Promise.fromNode(done => connection.query(query,done));
  };

  return Promise.using(this.getConnection(),runOnConnection);
};
 
/**
 * Run a query on a pooled connection and stream its rows.
 *
 * @param {*} query - Passed unchanged to `connection.query`.
 * @param {Function} [cb] - Invoked with no arguments as soon as a connection
 *   has been obtained (before any rows arrive). Ignored unless it is a
 *   function.
 * @returns {Streamz} A Streamz stream that the query's row stream is piped
 *   into. Failures from acquiring the connection or from the row stream are
 *   re-emitted on it as 'error' events.
 */
Mysql.prototype.stream = function(query,cb) {
  const output = Streamz();

  const pipeRows = connection => {
    // Signal the caller at acquisition time rather than at result time, so
    // that overall concurrency is governed by the mysql pool.
    if (typeof cb === 'function') {
      cb();
    }

    return new Promise((resolve,reject) => {
      const rows = connection.query(query).stream();
      rows.on('end',resolve);
      rows.on('error',reject);
      rows.pipe(output);

      // An error on the output stream tears down the connection.
      output.on('error',() => {
        connection.destroy();
      });
    });
  };

  // Fire-and-forget: the caller observes failures through the returned stream.
  Promise.using(this.getConnection(),pipeRows)
    .catch(e => output.emit('error',e));

  return output;
};
 
// Expose the Mysql constructor as this module's export.
module.exports = Mysql;