Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | 21x 21x 21x 21x 21x 5x 5x 5x 5x 5x 5x 5x 5x 21x 21x 13x 13x 13x 21x 21x 12x 12x 12x 12x 12x 12x 12x 21x 21x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x | const Streamz = require('streamz'); const Promise = require('bluebird'); class Postgres extends Streamz { constructor(pool, options = {}) { super(options); if (!pool) throw 'POOL_MISSING'; this.pool = pool; this.options = options; } getConnection() { return Promise.fromNode(this.pool.connect.bind(this.pool)) .disposer(connection => connection.release()); } query(query, cb) { return Promise.using(this.getConnection(), connection => { // Trigger callback when we get a connection, not when we (later) get results // allowing overall concurrency to be controlled by the Postgres pool if (typeof cb === 'function') cb(); return Promise.fromNode(callback => connection.query(query, callback)); }); } stream(query, cb) { const passThrough = Streamz(); const state = query && query.cursor ? query.cursor.state : query.state; if (!state || state !== 'initialized') { passThrough.emit('error', new Error('Query should be QueryStream')); return passThrough; } Promise.using(this.getConnection(), connection => { // Trigger callback when we get a connection, not when we (later) get results // allowing overall concurrency to be controlled by the Postgres pool if (typeof cb === 'function') cb(); return new Promise((resolve, reject) => { connection.query(query) .on('end', resolve) .on('error', reject) .pipe(passThrough); }); }) .catch(e => passThrough.emit('error', e)); return passThrough; } } module.exports = Postgres; |