Compare commits

...

11 Commits

Author SHA1 Message Date
AJ ONeal
4ad7b03117 v1.0.7 2018-04-20 19:14:15 -06:00
AJ ONeal
7afd97737c v1.0.6 2018-04-20 19:13:48 -06:00
AJ ONeal
81e82b2760 v1.0.4 2016-09-08 18:25:15 -06:00
AJ ONeal
7819398294 add forks on 'fork' event 2016-09-08 18:25:08 -06:00
AJ ONeal
b85545a8cc v1.0.3 2016-09-08 17:50:42 -06:00
AJ ONeal
1fbd2a4b71 v1.0.2 2016-09-08 17:49:37 -06:00
AJ ONeal
a43a8347a9 v1.0.1 2016-09-08 17:48:49 -06:00
AJ ONeal
6f386cb159 do less work when standalone 2016-09-08 17:48:14 -06:00
AJ ONeal
44518f9d61 Merge branch 'master' of github.com:coolaj86/cluster-rpc 2016-09-08 17:14:29 -06:00
AJ ONeal
1dfbc03155 fallback to bluebird 2016-09-08 17:14:07 -06:00
AJ ONeal
88e5eef8c1 Initial commit 2016-09-08 16:09:31 -06:00
10 changed files with 180 additions and 60 deletions

37
.gitignore vendored Normal file
View File

@ -0,0 +1,37 @@
# Logs
logs
*.log
npm-debug.log*
# Runtime data
pids
*.pid
*.seed
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# nyc test coverage
.nyc_output
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# node-waf configuration
.lock-wscript
# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules
jspm_packages
# Optional npm cache directory
.npm
# Optional REPL history
.node_repl_history

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 AJ ONeal
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -19,8 +19,8 @@ you're trying to use (for example express-session/session/memory, sqlite3, level
and then you will supply the names of the methods you wish to export to **worker**
processes.
You must pass each worker via `addWorker()` so that it signals the worker to creates
its own rpc-wrapped instance.
By default each worker will be added when `cluster` emits a `fork` event.
If needed you can set `addOnFork` to `false` and call `addWorker(worker)` manually.
### master
@ -32,14 +32,15 @@ var db = require('level')('./mydb')
// Wrap the instance
var crpc = require('cluster-rpc/master').create({
instance: db
addOnFork: true // default
, instance: db
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});
// You must add each worker
crpc.addWorker(cluster.fork());
// If you set addOnFork to false, You must manually add each worker
// crpc.addWorker(cluster.fork());
crpc.then(function (db) {
@ -77,7 +78,8 @@ if (cluster.isMaster) {
crpc = require('cluster-rpc/master').create({
instance: require('level')('./mydb')
addOnFork: false
, instance: require('level')('./mydb')
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});

View File

@ -26,29 +26,25 @@ function getMethods(inst, keys) {
});
}
module.exports.create = function (opts) {
var PromiseA = opts.PromiseA || global.Promise;
function setup(opts) {
var crypto = require('crypto');
var inst = opts.instance;
var methods = getMethods(opts.instance, opts.methods);
var token = crypto.randomBytes(16).toString('hex');
var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
var inst = opts.instance;
var prefixes = require('./prefixes.js').create(opts); // uses opts.name
opts.master = opts.master || require('./process/master').create();
opts.master = opts.master || require('./process/master').create(prefixes);
opts.master.on('connection', function (w) {
//console.log('debug w: worker connection');
if (opts.debug) { console.log('[cluster-rpc] [master] worker connected'); }
w.send({
methods: methods
, _token: token
, type: initPrefix
, type: prefixes.init
});
w.on('message', function (cmd) {
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
//console.log('debug w: got unknown message type');
return;
}
@ -63,7 +59,7 @@ module.exports.create = function (opts) {
}
switch (cmd.type) {
case rpcPrefix:
case prefixes.rpc:
cmd.args.push(function callback() {
// args is probably err, data in most cases
var args = Array.prototype.slice.call(arguments);
@ -73,7 +69,7 @@ module.exports.create = function (opts) {
, id: cmd.id
//, this: this
, _token: token
, type: resultPrefix
, type: prefixes.result
});
});
@ -87,9 +83,28 @@ module.exports.create = function (opts) {
});
});
}
opts._promise = PromiseA.resolve(inst);
opts._promise.addWorker = opts.master.addWorker;
module.exports.create = function (opts) {
if (opts.debug) { console.log('[cluster-rpc] [master] create'); }
var cluster = require('cluster');
var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
var init = false;
opts._promise = PromiseA.resolve(opts.instance);
opts._promise.addWorker = function (w) {
if (opts.debug) { console.log('[cluster-rpc] [master] addWorker wrapper'); }
if (!init) {
init = true;
setup(opts);
}
return opts.master.addWorker(w);
};
if (false !== opts.addOnFork) {
if (opts.debug) { console.log('[cluster-rpc] [master] -- will call addWorker on each fork'); }
cluster.on('fork', opts._promise.addWorker);
}
return opts._promise;
};

View File

@ -1,14 +1,15 @@
{
"name": "cluster-rpc",
"version": "1.0.0",
"version": "1.0.7",
"description": "A simple way to wrap a single-instance module to enable it to work with node cluster.",
"homepage": "https://git.coolaj86.com/coolaj86/cluster-rpc.js",
"main": "index.js",
"scripts": {
"test": "node test.js"
},
"repository": {
"type": "git",
"url": "git+ssh://git@github.com/coolaj86/cluster-rpc.git"
"url": "git+https://git.coolaj86.com/coolaj86/cluster-rpc.js.git"
},
"keywords": [
"cluster",
@ -17,7 +18,6 @@
"author": "AJ ONeal <coolaj86@gmail.com> (https://coolaj86.com/)",
"license": "(MIT OR Apache-2.0)",
"bugs": {
"url": "https://github.com/coolaj86/cluster-rpc/issues"
},
"homepage": "https://github.com/coolaj86/cluster-rpc#readme"
"url": "https://git.coolaj86.com/coolaj86/cluster-rpc.js/issues"
}
}

18
prefixes.js Normal file
View File

@ -0,0 +1,18 @@
'use strict';
module.exports.create = function (opts) {
//var msgPrefix = 'cluster-rpc.' + opts.name;
//var rpcPrefix = msgPrefix + '.rpc';
//var resultPrefix = msgPrefix + '.result';
//var initPrefix = msgPrefix + '.init';
var root = 'com.daplie.cluster-rpc.' + (opts.name ? opts.name + '.' : '');
return {
root: root
, rpc: root + 'rpc'
, result: root + 'result'
, init: root + 'init'
, connect: root + 'connect'
// TODO the things that are using this should probably accept opts
, debug: opts.debug
};
};

View File

@ -1,27 +1,45 @@
'use strict';
module.exports.create = function () {
module.exports.create = function (prefixes) {
if (prefixes.debug) { console.log('[cluster-rpc] master created'); }
var m = new (require('events').EventEmitter)();
m.addWorker = function (worker) {
if (prefixes.debug) { console.log('[cluster-rpc] [master] adding worker'); }
m._workers = [];
var w = new (require('events').EventEmitter)();
worker.on('online', function () {
//console.log('debug mw: worker is up')
function emitConnection() {
if (w.__online) {
return;
}
w.__online = true;
m.emit('connection', w);
}
worker.on('online', function () {
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker came online, at fork'); }
emitConnection();
});
worker.on('message', function (data) {
//console.log('debug mw: worker sends message', data)
if (prefixes.connect === data.type) {
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker connected, manually'); }
emitConnection();
return;
}
if (prefixes.debug) { console.log('[cluster-rpc] [master] worker sent message', data); }
w.emit('message', data);
});
w.send = function (data) {
if (prefixes.debug) { console.log('[cluster-rpc] [master] sending', data); }
worker.send(data);
};
// TODO remove workers that exit
m._workers.push(w);
};

View File

@ -1,6 +1,7 @@
'use strict';
module.exports.create = function (process) {
module.exports.create = function (process, prefixes) {
if (prefixes.debug) { console.log('[cluster-rpc] worker created'); }
var w = new (require('events').EventEmitter)();
process.on('message', function (data) {
@ -11,5 +12,10 @@ module.exports.create = function (process) {
process.send(data);
};
// if this were a web / unix socket there would be a 'connection' event
// emulating this is useful since the worker may create its cluster rpc
// at any time, (which means it may miss the 'fork' event)
w.send({ type: prefixes.connect });
return w;
};

37
test.js
View File

@ -2,34 +2,32 @@
var cluster = require('cluster');
var crpc;
var db = {
get: function (key, cb) {
cb(null, db[key]);
}
, put: function (key, val, cb) {
db[key] = val;
cb(null);
}
};
function runMaster() {
if (cluster.isMaster) {
var db = {
get: function (key, cb) {
cb(null, db[key]);
}
, put: function (key, val, cb) {
db[key] = val;
if (cb) { cb(null); }
}
};
crpc = require('./master').create({
instance: db
, methods: [ 'get', 'put' ]
, name: 'foo-level'
});
crpc.addWorker(cluster.fork());
crpc.then(function () {
db.put('foo', 'bar');
});
cluster.fork();
}
else {
function runWorker() {
crpc = require('./worker').create({
name: 'foo-level'
@ -38,6 +36,17 @@ else {
}
if (cluster.isMaster) {
runMaster();
}
else {
runWorker();
}
crpc.then(function (db) {
setTimeout(function () {

View File

@ -5,15 +5,12 @@
// com.daplie.ipc.result - receive results and callback id
module.exports.create = function (opts) {
var PromiseA = opts.PromiseA || global.Promise;
var PromiseA = opts.PromiseA || global.Promise || require('bluebird');
var crypto = require('crypto');
var token = null;
var inst = {};
var ws = opts.worker = opts.worker || require('./process/worker').create(process);
var msgPrefix = 'cluster-rpc.' + opts.name;
var rpcPrefix = msgPrefix + '.rpc';
var resultPrefix = msgPrefix + '.result';
var initPrefix = msgPrefix + '.init';
var prefixes = require('./prefixes.js').create(opts);
var ws = opts.worker = opts.worker || require('./process/worker').create(process, prefixes);
ws.___listeners = [];
@ -30,7 +27,7 @@ module.exports.create = function (opts) {
args: args
, func: fname
, id: id
, type: rpcPrefix
, type: prefixes.rpc
, _token: token
});
@ -96,24 +93,21 @@ module.exports.create = function (opts) {
return new PromiseA(function (resolve) {
ws.on('message', function (cmd) {
//console.log('debug m: mesage', cmd);
if (0 !== (cmd.type||'').indexOf(msgPrefix)) {
//console.log('debug m: ignore msg', cmd);
//console.log(cmd.type, msgPrefix);
//console.log(cmd.type.indexOf(msgPrefix));
if (opts.debug) { console.log('[cluster-rpc] [worker] message received', cmd); }
if (0 !== (cmd.type||'').indexOf(prefixes.root)) {
return;
}
if (token && token !== cmd._token) {
//console.log('debug m: ignore msg', cmd);
return;
}
switch (cmd.type) {
case initPrefix:
case prefixes.init:
onInit(cmd);
resolve(inst);
break;
case resultPrefix:
case prefixes.result:
onResult(cmd);
break;
default: