Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ad7b03117 | ||
|
|
7afd97737c | ||
|
|
81e82b2760 | ||
|
|
7819398294 | ||
|
|
b85545a8cc | ||
|
|
1fbd2a4b71 | ||
|
|
a43a8347a9 |
14
README.md
14
README.md
@ -19,8 +19,8 @@ you're trying to use (for example express-session/session/memory, sqlite3, level
|
||||
and then you will supply the names of the methods you wish to export to **worker**
|
||||
processes.
|
||||
|
||||
You must pass each worker via `addWorker()` so that it signals the worker to creates
|
||||
its own rpc-wrapped instance.
|
||||
By default each worker will be added when `cluster` emits a `fork` event.
|
||||
If needed you can set `addOnFork` to `false` and call `addWorker(worker)` manually.
|
||||
|
||||
### master
|
||||
|
||||
@ -32,14 +32,15 @@ var db = require('level')('./mydb')
|
||||
|
||||
// Wrap the instance
|
||||
var crpc = require('cluster-rpc/master').create({
|
||||
instance: db
|
||||
addOnFork: true // default
|
||||
, instance: db
|
||||
, methods: [ 'get', 'put' ]
|
||||
, name: 'foo-level'
|
||||
});
|
||||
|
||||
|
||||
// You must add each worker
|
||||
crpc.addWorker(cluster.fork());
|
||||
// If you set addOnFork to false, You must manually add each worker
|
||||
// crpc.addWorker(cluster.fork());
|
||||
|
||||
|
||||
crpc.then(function (db) {
|
||||
@ -77,7 +78,8 @@ if (cluster.isMaster) {
|
||||
|
||||
|
||||
crpc = require('cluster-rpc/master').create({
|
||||
instance: require('level')('./mydb')
|
||||
addOnFork: false
|
||||
, instance: require('level')('./mydb')
|
||||
, methods: [ 'get', 'put' ]
|
||||
, name: 'foo-level'
|
||||
});
|
||||
|
||||
28
master.js
28
master.js
@ -30,24 +30,21 @@ function setup(opts) {
|
||||
var crypto = require('crypto');
|
||||
var methods = getMethods(opts.instance, opts.methods);
|
||||
var token = crypto.randomBytes(16).toString('hex');
|
||||
var msgPrefix = 'cluster-rpc.' + opts.name;
|
||||
var rpcPrefix = msgPrefix + '.rpc';
|
||||
var resultPrefix = msgPrefix + '.result';
|
||||
var initPrefix = msgPrefix + '.init';
|
||||
var inst = opts.instance;
|
||||
var prefixes = require('./prefixes.js').create(opts); // uses opts.name
|
||||
|
||||
opts.master = opts.master || require('./process/master').create();
|
||||
opts.master = opts.master || require('./process/master').create(prefixes);
|
||||
|
||||
opts.master.on('connection', function (w) {
|
||||
//console.log('debug w: worker connection');
|
||||
if (opts.debug) { console.log('[cluster-rpc] [master] worker connected'); }
|
||||
w.send({
|
||||
methods: methods
|
||||
, _token: token
|
||||
, type: initPrefix
|
||||
, type: prefixes.init
|
||||
});
|
||||
|
||||
w.on('message', function (cmd) {
|
||||
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
|
||||
if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
|
||||
//console.log('debug w: got unknown message type');
|
||||
return;
|
||||
}
|
||||
@ -62,7 +59,7 @@ function setup(opts) {
|
||||
}
|
||||
|
||||
switch (cmd.type) {
|
||||
case rpcPrefix:
|
||||
case prefixes.rpc:
|
||||
cmd.args.push(function callback() {
|
||||
// args is probably err, data in most cases
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
@ -72,7 +69,7 @@ function setup(opts) {
|
||||
, id: cmd.id
|
||||
//, this: this
|
||||
, _token: token
|
||||
, type: resultPrefix
|
||||
, type: prefixes.result
|
||||
});
|
||||
});
|
||||
|
||||
@ -89,18 +86,25 @@ function setup(opts) {
|
||||
}
|
||||
|
||||
module.exports.create = function (opts) {
|
||||
if (opts.debug) { console.log('[cluster-rpc] [master] create'); }
|
||||
var cluster = require('cluster');
|
||||
var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
|
||||
var init = false;
|
||||
|
||||
opts._promise = PromiseA.resolve(opts.instance);
|
||||
opts._promise.addWorker = function (w) {
|
||||
if (opts.debug) { console.log('[cluster-rpc] [master] addWorker wrapper'); }
|
||||
if (!init) {
|
||||
init = true;
|
||||
setup();
|
||||
setup(opts);
|
||||
}
|
||||
return opts.master.addWorker(w);
|
||||
};
|
||||
|
||||
opts._promise = PromiseA.resolve(opts.instance);
|
||||
if (false !== opts.addOnFork) {
|
||||
if (opts.debug) { console.log('[cluster-rpc] [master] -- will call addWorker on each fork'); }
|
||||
cluster.on('fork', opts._promise.addWorker);
|
||||
}
|
||||
|
||||
return opts._promise;
|
||||
};
|
||||
|
||||
10
package.json
10
package.json
@ -1,14 +1,15 @@
|
||||
{
|
||||
"name": "cluster-rpc",
|
||||
"version": "1.0.0",
|
||||
"version": "1.0.7",
|
||||
"description": "A simple way to wrap a single-instance module to enable it to work with node cluster.",
|
||||
"homepage": "https://git.coolaj86.com/coolaj86/cluster-rpc.js",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "node test.js"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git"
|
||||
"url": "git+https://git.coolaj86.com/coolaj86/cluster-rpc.js.git"
|
||||
},
|
||||
"keywords": [
|
||||
"cluster",
|
||||
@ -17,7 +18,6 @@
|
||||
"author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)",
|
||||
"license": "(MIT OR Apache-2.0)",
|
||||
"bugs": {
|
||||
"url": "https://github.com/coolaj86/cluster-rpc/issues"
|
||||
},
|
||||
"homepage": "https://github.com/coolaj86/cluster-rpc#readme"
|
||||
"url": "https://git.coolaj86.com/coolaj86/cluster-rpc.js/issues"
|
||||
}
|
||||
}
|
||||
|
||||
18
prefixes.js
Normal file
18
prefixes.js
Normal file
@ -0,0 +1,18 @@
|
||||
'use strict';
|
||||
|
||||
module.exports.create = function (opts) {
|
||||
//var msgPrefix = 'cluster-rpc.' + opts.name;
|
||||
//var rpcPrefix = msgPrefix + '.rpc';
|
||||
//var resultPrefix = msgPrefix + '.result';
|
||||
//var initPrefix = msgPrefix + '.init';
|
||||
var root = 'com.daplie.cluster-rpc.' + (opts.name ? opts.name + '.' : '');
|
||||
return {
|
||||
root: root
|
||||
, rpc: root + 'rpc'
|
||||
, result: root + 'result'
|
||||
, init: root + 'init'
|
||||
, connect: root + 'connect'
|
||||
// TODO the things that are using this should probably accept opts
|
||||
, debug: opts.debug
|
||||
};
|
||||
};
|
||||
@ -1,27 +1,45 @@
|
||||
'use strict';
|
||||
|
||||
module.exports.create = function () {
|
||||
module.exports.create = function (prefixes) {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] master created'); }
|
||||
var m = new (require('events').EventEmitter)();
|
||||
|
||||
m.addWorker = function (worker) {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] [master] adding worker'); }
|
||||
m._workers = [];
|
||||
|
||||
var w = new (require('events').EventEmitter)();
|
||||
|
||||
worker.on('online', function () {
|
||||
//console.log('debug mw: worker is up')
|
||||
function emitConnection() {
|
||||
if (w.__online) {
|
||||
return;
|
||||
}
|
||||
|
||||
w.__online = true;
|
||||
m.emit('connection', w);
|
||||
}
|
||||
|
||||
worker.on('online', function () {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker came online, at fork'); }
|
||||
emitConnection();
|
||||
});
|
||||
|
||||
worker.on('message', function (data) {
|
||||
//console.log('debug mw: worker sends message', data)
|
||||
if (prefixes.connect === data.type) {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker connected, manually'); }
|
||||
emitConnection();
|
||||
return;
|
||||
}
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker sent message', data); }
|
||||
w.emit('message', data);
|
||||
});
|
||||
|
||||
w.send = function (data) {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] [master] sending', data); }
|
||||
worker.send(data);
|
||||
};
|
||||
|
||||
// TODO remove workers that exit
|
||||
m._workers.push(w);
|
||||
};
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
'use strict';
|
||||
|
||||
module.exports.create = function (process) {
|
||||
module.exports.create = function (process, prefixes) {
|
||||
if (prefixes.debug) { console.log('[cluster-rpc] worker created'); }
|
||||
var w = new (require('events').EventEmitter)();
|
||||
|
||||
process.on('message', function (data) {
|
||||
@ -11,5 +12,10 @@ module.exports.create = function (process) {
|
||||
process.send(data);
|
||||
};
|
||||
|
||||
// if this were a web / unix socket there would be a 'connection' event
|
||||
// emulating this is useful since the worker may create its cluster rpc
|
||||
// at any time, (which means it may miss the 'fork' event)
|
||||
w.send({ type: prefixes.connect });
|
||||
|
||||
return w;
|
||||
};
|
||||
|
||||
31
test.js
31
test.js
@ -2,34 +2,32 @@
|
||||
|
||||
var cluster = require('cluster');
|
||||
var crpc;
|
||||
var db = {
|
||||
|
||||
function runMaster() {
|
||||
|
||||
var db = {
|
||||
get: function (key, cb) {
|
||||
cb(null, db[key]);
|
||||
}
|
||||
, put: function (key, val, cb) {
|
||||
, put: function (key, val, cb) {
|
||||
db[key] = val;
|
||||
cb(null);
|
||||
if (cb) { cb(null); }
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
if (cluster.isMaster) {
|
||||
|
||||
};
|
||||
|
||||
crpc = require('./master').create({
|
||||
instance: db
|
||||
, methods: [ 'get', 'put' ]
|
||||
, name: 'foo-level'
|
||||
});
|
||||
crpc.addWorker(cluster.fork());
|
||||
crpc.then(function () {
|
||||
db.put('foo', 'bar');
|
||||
});
|
||||
|
||||
cluster.fork();
|
||||
|
||||
}
|
||||
else {
|
||||
|
||||
function runWorker() {
|
||||
|
||||
crpc = require('./worker').create({
|
||||
name: 'foo-level'
|
||||
@ -38,6 +36,17 @@ else {
|
||||
|
||||
}
|
||||
|
||||
if (cluster.isMaster) {
|
||||
|
||||
runMaster();
|
||||
|
||||
}
|
||||
else {
|
||||
|
||||
runWorker();
|
||||
|
||||
}
|
||||
|
||||
|
||||
crpc.then(function (db) {
|
||||
setTimeout(function () {
|
||||
|
||||
22
worker.js
22
worker.js
@ -9,11 +9,8 @@ module.exports.create = function (opts) {
|
||||
var crypto = require('crypto');
|
||||
var token = null;
|
||||
var inst = {};
|
||||
var ws = opts.worker = opts.worker || require('./process/worker').create(process);
|
||||
var msgPrefix = 'cluster-rpc.' + opts.name;
|
||||
var rpcPrefix = msgPrefix + '.rpc';
|
||||
var resultPrefix = msgPrefix + '.result';
|
||||
var initPrefix = msgPrefix + '.init';
|
||||
var prefixes = require('./prefixes.js').create(opts);
|
||||
var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes);
|
||||
|
||||
ws.___listeners = [];
|
||||
|
||||
@ -30,7 +27,7 @@ module.exports.create = function (opts) {
|
||||
args: args
|
||||
, func: fname
|
||||
, id: id
|
||||
, type: rpcPrefix
|
||||
, type: prefixes.rpc
|
||||
, _token: token
|
||||
});
|
||||
|
||||
@ -96,24 +93,21 @@ module.exports.create = function (opts) {
|
||||
|
||||
return new PromiseA(function (resolve) {
|
||||
ws.on('message', function (cmd) {
|
||||
//console.log('debug m: mesage', cmd);
|
||||
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
|
||||
//console.log('debug m: ignore msg', cmd);
|
||||
//console.log(cmd.type, msgPrefix);
|
||||
//console.log(cmd.type.indexOf(msgPrefix));
|
||||
if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); }
|
||||
|
||||
if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
|
||||
return;
|
||||
}
|
||||
if (token && token !== cmd._token) {
|
||||
//console.log('debug m: ignore msg', cmd);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (cmd.type) {
|
||||
case initPrefix:
|
||||
case prefixes.init:
|
||||
onInit(cmd);
|
||||
resolve(inst);
|
||||
break;
|
||||
case resultPrefix:
|
||||
case prefixes.result:
|
||||
onResult(cmd);
|
||||
break;
|
||||
default:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user