All files / app/lib cluster.js

30.48% Statements 25/82
50% Branches 1/2
0% Functions 0/3
30.48% Lines 25/82

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 8221x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x                                                                                                 21x 21x 21x 21x   21x 21x 21x     21x 21x 21x 21x            
// Worker provides a simple framework around the cluster library to orchestrate a multicore ETL pipeline
// A tasklist is scheduled with a number of workers and then a process function is defined to process
// each element
 
const cluster = require('cluster');
const os = require('os');
const Promise = require('bluebird');
 
const isMaster = module.exports.isMaster = cluster.isMaster;
const isWorker = module.exports.isWorker = cluster.isWorker;
 
 
// Schedule a list of jobs to be distributed to workers
module.exports.schedule = function(list,threads,reporting) {
  let i = 0, last = 0, workers=[], reportInterval;
  if (!isMaster) 
    throw 'No scheduling from a worker';

  threads = threads || os.cpus().length;
  list = [].concat(list);

  function next(worker) {
    const item = list.pop();
    if (!item) {
      if (reporting) console.log('Worker done',worker.num);
      worker.disconnect();
      worker.done.resolve(true);
    } else 
      worker.send(item);
  }

  function createWorker() {
    const worker = cluster.fork();
    
    worker.num = threads;
    worker.done = Promise.defer();
    workers.push(worker.done.promise);
    worker.on('message',function(msg) {
      if (msg.id === 'done')
        next(worker);
      
      if (msg.id === 'progress')
        i+= msg.items;
    });
  }

  while (threads--)
    createWorker();

  cluster.on('online',next);

  if (reporting)
    reportInterval = setInterval(() => {
      console.log(i-last,last);
      last = i;
    },!isNaN(reporting) ? reporting : 1000);

  return Promise.all(workers)
    .then(() => {
      clearInterval(reportInterval);
      return i;
    });
};
 
// This function should be overwritten in the worker
module.exports.process = function(d,callback) {
  callback();
};
 
module.exports.progress = function(d) {
  if (isWorker)
    process.send({id:'progress',items:d});
};
 
if (isWorker)
  process.on('message',d => {
    const done = ()  => process.send({id: 'done'});
    if (module.exports.process.length > 1)
      module.exports.process(d,done);
    else
      Promise.resolve(module.exports.process(d)).then(done);
  });