Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x 21x | // Worker provides a simple framework around the cluster library to orchestrate a multicore ETL pipeline // A tasklist is scheduled with a number of workers and then a process function is defined to process // each element const cluster = require('cluster'); const os = require('os'); const Promise = require('bluebird'); const isMaster = module.exports.isMaster = cluster.isMaster; const isWorker = module.exports.isWorker = cluster.isWorker; // Schedule a list of jobs to be distributed to workers module.exports.schedule = function(list,threads,reporting) { let i = 0, last = 0, workers=[], reportInterval; if (!isMaster) throw 'No scheduling from a worker'; threads = threads || os.cpus().length; list = [].concat(list); function next(worker) { const item = list.pop(); if (!item) { if (reporting) console.log('Worker done',worker.num); worker.disconnect(); worker.done.resolve(true); } else worker.send(item); } function createWorker() { const worker = cluster.fork(); worker.num = threads; worker.done = Promise.defer(); workers.push(worker.done.promise); worker.on('message',function(msg) { if (msg.id === 'done') next(worker); if (msg.id === 'progress') i+= msg.items; }); } while (threads--) createWorker(); cluster.on('online',next); if (reporting) reportInterval = setInterval(() => { console.log(i-last,last); last = i; },!isNaN(reporting) ? reporting : 1000); return Promise.all(workers) .then(() => { clearInterval(reportInterval); return i; }); }; // This function should be overwritten in the worker module.exports.process = function(d,callback) { callback(); }; module.exports.progress = function(d) { if (isWorker) process.send({id:'progress',items:d}); }; if (isWorker) process.on('message',d => { const done = () => process.send({id: 'done'}); if (module.exports.process.length > 1) module.exports.process(d,done); else Promise.resolve(module.exports.process(d)).then(done); }); |