All files / app/lib/postgres postgres.js

94.54% Statements 52/55
73.33% Branches 11/15
100% Functions 4/4
94.54% Lines 52/55

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 5621x 21x 21x 21x 21x 5x 5x 5x 5x 5x 5x 5x 5x 21x 21x 13x 13x 13x 21x 21x 12x 12x 12x 12x 12x 12x 12x 21x 21x 1x 1x 1x 1x       1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x  
const Streamz = require('streamz');
const Promise = require('bluebird');
 
class Postgres extends Streamz {
  constructor(pool, options = {}) {
    super(options);
 
    if (!pool)
      throw 'POOL_MISSING';
 
    this.pool = pool;
    this.options = options;
  }
 
  getConnection() {
    return Promise.fromNode(this.pool.connect.bind(this.pool))
      .disposer(connection => connection.release());
  }
 
  query(query, cb) {
    return Promise.using(this.getConnection(), connection => {
      // Trigger callback when we get a connection, not when we (later) get results
      // allowing overall concurrency to be controlled by the Postgres pool
      if (typeof cb === 'function') cb();
      return Promise.fromNode(callback => connection.query(query, callback));
    });
  }
 
  stream(query, cb) {
    const passThrough = Streamz();
    const state = query && query.cursor ? query.cursor.state : query.state;
 
    if (!state || state !== 'initialized') {
      passThrough.emit('error', new Error('Query should be QueryStream'));
      return passThrough;
    }
 
    Promise.using(this.getConnection(), connection => {
      // Trigger callback when we get a connection, not when we (later) get results
      // allowing overall concurrency to be controlled by the Postgres pool
      if (typeof cb === 'function') cb();
      return new Promise((resolve, reject) => {
        connection.query(query)
          .on('end', resolve)
          .on('error', reject)
          .pipe(passThrough);
      });
    })
    .catch(e => passThrough.emit('error', e));
 
    return passThrough;
  }
}
 
module.exports = Postgres;