Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x | // Worker provides a simple framework around the cluster library to orchestrate a multicore ETL pipeline
// A tasklist is scheduled with a number of workers and then a process function is defined to process
// each element
const cluster = require('cluster');
const os = require('os');
const Promise = require('bluebird');
const isMaster = module.exports.isMaster = cluster.isMaster;
const isWorker = module.exports.isWorker = cluster.isWorker;
// Schedule a list of jobs to be distributed to workers
module.exports.schedule = function(list,threads,reporting) {
let i = 0, last = 0, workers=[], reportInterval;
if (!isMaster)
throw 'No scheduling from a worker';
threads = threads || os.cpus().length;
list = [].concat(list);
function next(worker) {
const item = list.pop();
if (!item) {
if (reporting) console.log('Worker done',worker.num);
worker.disconnect();
worker.done.resolve(true);
} else
worker.send(item);
}
function createWorker() {
const worker = cluster.fork();
worker.num = threads;
worker.done = Promise.defer();
workers.push(worker.done.promise);
worker.on('message',function(msg) {
if (msg.id === 'done')
next(worker);
if (msg.id === 'progress')
i+= msg.items;
});
}
while (threads--)
createWorker();
cluster.on('online',next);
if (reporting)
reportInterval = setInterval(() => {
console.log(i-last,last);
last = i;
},!isNaN(reporting) ? reporting : 1000);
return Promise.all(workers)
.then(() => {
clearInterval(reportInterval);
return i;
});
};
// This function should be overwritten in the worker
module.exports.process = function(d,callback) {
callback();
};
module.exports.progress = function(d) {
if (isWorker)
process.send({id:'progress',items:d});
};
if (isWorker)
process.on('message',d => {
const done = () => process.send({id: 'done'});
if (module.exports.process.length > 1)
module.exports.process(d,done);
else
Promise.resolve(module.exports.process(d)).then(done);
}); |