Compare commits

..

No commits in common. "master" and "v1.0.1" have entirely different histories.

8 changed files with 56 additions and 107 deletions

View File

@ -19,8 +19,8 @@ you're trying to use (for example express-session/session/memory, sqlite3, level
and then you will supply the names of the methods you wish to export to **worker** and then you will supply the names of the methods you wish to export to **worker**
processes. processes.
By default each worker will be added when `cluster` emits a `fork` event. You must pass each worker via `addWorker()` so that it signals the worker to creates
If needed you can set `addOnFork` to `false` and call `addWorker(worker)` manually. its own rpc-wrapped instance.
### master ### master
@ -32,15 +32,14 @@ var db = require('level')('./mydb')
// Wrap the instance // Wrap the instance
var crpc = require('cluster-rpc/master').create({ var crpc = require('cluster-rpc/master').create({
addOnFork: true // default instance: db
, instance: db
, methods: [ 'get', 'put' ] , methods: [ 'get', 'put' ]
, name: 'foo-level' , name: 'foo-level'
}); });
// If you set addOnFork to false, You must manually add each worker // You must add each worker
// crpc.addWorker(cluster.fork()); crpc.addWorker(cluster.fork());
crpc.then(function (db) { crpc.then(function (db) {
@ -78,8 +77,7 @@ if (cluster.isMaster) {
crpc = require('cluster-rpc/master').create({ crpc = require('cluster-rpc/master').create({
addOnFork: false instance: require('level')('./mydb')
, instance: require('level')('./mydb')
, methods: [ 'get', 'put' ] , methods: [ 'get', 'put' ]
, name: 'foo-level' , name: 'foo-level'
}); });

View File

@ -30,21 +30,24 @@ function setup(opts) {
var crypto = require('crypto'); var crypto = require('crypto');
var methods = getMethods(opts.instance, opts.methods); var methods = getMethods(opts.instance, opts.methods);
var token = crypto.randomBytes(16).toString('hex'); var token = crypto.randomBytes(16).toString('hex');
var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
var inst = opts.instance; var inst = opts.instance;
var prefixes = require('./prefixes.js').create(opts); // uses opts.name
opts.master = opts.master || require('./process/master').create(prefixes); opts.master = opts.master || require('./process/master').create();
opts.master.on('connection', function (w) { opts.master.on('connection', function (w) {
if (opts.debug) { console.log('[cluster-rpc] [master] worker connected'); } //console.log('debug w: worker connection');
w.send({ w.send({
methods: methods methods: methods
, _token: token , _token: token
, type: prefixes.init , type: initPrefix
}); });
w.on('message', function (cmd) { w.on('message', function (cmd) {
if (0 !== (cmd.type||'').indexOf(prefixes.root)) { if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
//console.log('debug w: got unknown message type'); //console.log('debug w: got unknown message type');
return; return;
} }
@ -59,7 +62,7 @@ function setup(opts) {
} }
switch (cmd.type) { switch (cmd.type) {
case prefixes.rpc: case rpcPrefix:
cmd.args.push(function callback() { cmd.args.push(function callback() {
// args is probably err, data in most cases // args is probably err, data in most cases
var args = Array.prototype.slice.call(arguments); var args = Array.prototype.slice.call(arguments);
@ -69,7 +72,7 @@ function setup(opts) {
, id: cmd.id , id: cmd.id
//, this: this //, this: this
, _token: token , _token: token
, type: prefixes.result , type: resultPrefix
}); });
}); });
@ -86,25 +89,18 @@ function setup(opts) {
} }
module.exports.create = function (opts) { module.exports.create = function (opts) {
if (opts.debug) { console.log('[cluster-rpc] [master] create'); }
var cluster = require('cluster');
var PromiseA = opts.PromiseA || global.Promise || require('bluebird'); var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
var init = false; var init = false;
opts._promise = PromiseA.resolve(opts.instance);
opts._promise.addWorker = function (w) { opts._promise.addWorker = function (w) {
if (opts.debug) { console.log('[cluster-rpc] [master] addWorker wrapper'); }
if (!init) { if (!init) {
init = true; init = true;
setup(opts); setup();
} }
return opts.master.addWorker(w); return opts.master.addWorker(w);
}; };
if (false !== opts.addOnFork) { opts._promise = PromiseA.resolve(opts.instance);
if (opts.debug) { console.log('[cluster-rpc] [master] -- will call addWorker on each fork'); }
cluster.on('fork', opts._promise.addWorker);
}
return opts._promise; return opts._promise;
}; };

View File

@ -1,15 +1,14 @@
{ {
"name": "cluster-rpc", "name": "cluster-rpc",
"version": "1.0.7", "version": "1.0.0",
"description": "A simple way to wrap a single-instance module to enable it to work with node cluster.", "description": "A simple way to wrap a single-instance module to enable it to work with node cluster.",
"homepage": "https://git.coolaj86.com/coolaj86/cluster-rpc.js",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
"test": "node test.js" "test": "node test.js"
}, },
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git+https://git.coolaj86.com/coolaj86/cluster-rpc.js.git" "url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git"
}, },
"keywords": [ "keywords": [
"cluster", "cluster",
@ -18,6 +17,7 @@
"author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)", "author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)",
"license": "(MIT OR Apache-2.0)", "license": "(MIT OR Apache-2.0)",
"bugs": { "bugs": {
"url": "https://git.coolaj86.com/coolaj86/cluster-rpc.js/issues" "url": "https://github.com/coolaj86/cluster-rpc/issues"
} },
"homepage": "https://github.com/coolaj86/cluster-rpc#readme"
} }

View File

@ -1,18 +0,0 @@
'use strict';
module.exports.create = function (opts) {
//var msgPrefix = 'cluster-rpc.' + opts.name;
//var rpcPrefix = msgPrefix + '.rpc';
//var resultPrefix = msgPrefix + '.result';
//var initPrefix = msgPrefix + '.init';
var root = 'com.daplie.cluster-rpc.' + (opts.name ? opts.name + '.' : '');
return {
root: root
, rpc: root + 'rpc'
, result: root + 'result'
, init: root + 'init'
, connect: root + 'connect'
// TODO the things that are using this should probably accept opts
, debug: opts.debug
};
};

View File

@ -1,45 +1,27 @@
'use strict'; 'use strict';
module.exports.create = function (prefixes) { module.exports.create = function () {
if (prefixes.debug) { console.log('[cluster-rpc] master created'); }
var m = new (require('events').EventEmitter)(); var m = new (require('events').EventEmitter)();
m.addWorker = function (worker) { m.addWorker = function (worker) {
if (prefixes.debug) { console.log('[cluster-rpc] [master] adding worker'); }
m._workers = []; m._workers = [];
var w = new (require('events').EventEmitter)(); var w = new (require('events').EventEmitter)();
function emitConnection() {
if (w.__online) {
return;
}
w.__online = true;
m.emit('connection', w);
}
worker.on('online', function () { worker.on('online', function () {
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker came online, at fork'); } //console.log('debug mw: worker is up')
emitConnection(); m.emit('connection', w);
}); });
worker.on('message', function (data) { worker.on('message', function (data) {
if (prefixes.connect === data.type) { //console.log('debug mw: worker sends message', data)
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker connected, manually'); }
emitConnection();
return;
}
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker sent message', data); }
w.emit('message', data); w.emit('message', data);
}); });
w.send = function (data) { w.send = function (data) {
if (prefixes.debug) { console.log('[cluster-rpc] [master] sending', data); }
worker.send(data); worker.send(data);
}; };
// TODO remove workers that exit
m._workers.push(w); m._workers.push(w);
}; };

View File

@ -1,7 +1,6 @@
'use strict'; 'use strict';
module.exports.create = function (process, prefixes) { module.exports.create = function (process) {
if (prefixes.debug) { console.log('[cluster-rpc] worker created'); }
var w = new (require('events').EventEmitter)(); var w = new (require('events').EventEmitter)();
process.on('message', function (data) { process.on('message', function (data) {
@ -12,10 +11,5 @@ module.exports.create = function (process, prefixes) {
process.send(data); process.send(data);
}; };
// if this were a web / unix socket there would be a 'connection' event
// emulating this is useful since the worker may create its cluster rpc
// at any time, (which means it may miss the 'fork' event)
w.send({ type: prefixes.connect });
return w; return w;
}; };

25
test.js
View File

@ -2,32 +2,34 @@
var cluster = require('cluster'); var cluster = require('cluster');
var crpc; var crpc;
function runMaster() {
var db = { var db = {
get: function (key, cb) { get: function (key, cb) {
cb(null, db[key]); cb(null, db[key]);
} }
, put: function (key, val, cb) { , put: function (key, val, cb) {
db[key] = val; db[key] = val;
if (cb) { cb(null); } cb(null);
} }
}; };
if (cluster.isMaster) {
crpc = require('./master').create({ crpc = require('./master').create({
instance: db instance: db
, methods: [ 'get', 'put' ] , methods: [ 'get', 'put' ]
, name: 'foo-level' , name: 'foo-level'
}); });
crpc.addWorker(cluster.fork());
crpc.then(function () { crpc.then(function () {
db.put('foo', 'bar'); db.put('foo', 'bar');
}); });
cluster.fork();
} }
else {
function runWorker() {
crpc = require('./worker').create({ crpc = require('./worker').create({
name: 'foo-level' name: 'foo-level'
@ -36,17 +38,6 @@ function runWorker() {
} }
if (cluster.isMaster) {
runMaster();
}
else {
runWorker();
}
crpc.then(function (db) { crpc.then(function (db) {
setTimeout(function () { setTimeout(function () {

View File

@ -9,8 +9,11 @@ module.exports.create = function (opts) {
var crypto = require('crypto'); var crypto = require('crypto');
var token = null; var token = null;
var inst = {}; var inst = {};
var prefixes = require('./prefixes.js').create(opts); var ws = opts.worker = opts.worker || require('./process/worker').create(process);
var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes); var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
ws.___listeners = []; ws.___listeners = [];
@ -27,7 +30,7 @@ module.exports.create = function (opts) {
args: args args: args
, func: fname , func: fname
, id: id , id: id
, type: prefixes.rpc , type: rpcPrefix
, _token: token , _token: token
}); });
@ -93,21 +96,24 @@ module.exports.create = function (opts) {
return new PromiseA(function (resolve) { return new PromiseA(function (resolve) {
ws.on('message', function (cmd) { ws.on('message', function (cmd) {
if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); } //console.log('debug m: mesage', cmd);
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
if (0 !== (cmd.type||'').indexOf(prefixes.root)) { //console.log('debug m: ignore msg', cmd);
//console.log(cmd.type, msgPrefix);
//console.log(cmd.type.indexOf(msgPrefix));
return; return;
} }
if (token && token !== cmd._token) { if (token && token !== cmd._token) {
//console.log('debug m: ignore msg', cmd);
return; return;
} }
switch (cmd.type) { switch (cmd.type) {
case prefixes.init: case initPrefix:
onInit(cmd); onInit(cmd);
resolve(inst); resolve(inst);
break; break;
case prefixes.result: case resultPrefix:
onResult(cmd); onResult(cmd);
break; break;
default: default: