diff --git a/README.md b/README.md index 1bfeca7..1487351 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ var store = require('memstore-cluster'); var numCores = require('os').cpus().length; var opts = { - sock: '/tmp/mydb.memstore.sock' + sock: '/tmp/memstore.sock' // If left 'null' or 'undefined' this defaults to a similar memstore // with no special logic for 'cookie' or 'expires' diff --git a/client.js b/client.js new file mode 100644 index 0000000..afbd018 --- /dev/null +++ b/client.js @@ -0,0 +1,189 @@ +'use strict'; + +/*global Promise*/ + +function startServer(opts) { + return require('./server').create(opts).then(function (server) { + // this process doesn't need to connect to itself + // through a socket + return server.masterClient; + }); +} + +function getConnection(opts) { + return new Promise(function (resolve) { + //setTimeout(function () { + var WebSocket = require('ws'); + var ws = new WebSocket('ws+unix:' + opts.sock); + + ws.on('error', function (err) { + console.error('[ERROR] ws connection failed, retrying'); + console.error(err); + + function retry() { + setTimeout(function () { + getConnection(opts).then(resolve, retry); + }, 100 + (Math.random() * 250)); + } + + if (!opts.connect && ('ENOENT' === err.code || 'ECONNREFUSED' === err.code)) { + console.log('[NO SERVER] attempting to create a server #######################'); + return startServer(opts).then(function (client) { + // ws.masterClient = client; + resolve({ masterClient: client }); + }, function () { + retry(); + }); + } + + retry(); + }); + + /* + ws.on('open', function () { + resolve(ws); + }); + */ + ws.___listeners = []; + ws.on('message', function (data) { + ws.___listeners.forEach(function (fn) { + try { + fn(data); + } catch(e) { + console.error("[ERROR] ws.on('message', fn) (multi-callback)"); + console.error(e); + // ignore + } + }); + }); + + function onInitMessage(str) { + // TODO there's no way to remove a listener... what to do? + var data; + + try { + data = JSON.parse(str); + } catch(e) { + console.error('[ERROR]'); + console.error(e); + } + + if ('methods' !== data.type) { + return; + } + + var index = ws.___listeners.indexOf(onInitMessage); + ws.___listeners.splice(index, 1); + ws._methods = data.methods; + + resolve(ws); + } + + ws.___listeners.push(onInitMessage); + //}, 100 + (Math.random() * 250)); + }); +} + +function create(opts) { + if (!opts.sock) { + opts.sock = '/tmp/memstore' + '.sock'; + } + + var promise; + var numcpus = require('os').cpus().length; + if (opts.standalone || (1 === numcpus && !opts.serve && !opts.connect)) { + return require('./memstore').create(opts); + } + + function retryServe() { + return startServer(opts).then(function (client) { + // ws.masterClient = client; + return { masterClient: client }; + }, function (err) { + console.error('[ERROR] retryServe()'); + console.error(err); + retryServe(); + }); + } + + if (opts.serve) { + promise = retryServe(); + } else { + promise = getConnection(opts); + } + + /* + if (opts.connect) { + } + */ + + // TODO maybe use HTTP POST instead? + return promise.then(function (ws) { + if (ws.masterClient) { + return ws.masterClient; + } + + var db = {}; + + function rpc(fname, args) { + var id; + var cb; + + if ('function' === typeof args[args.length - 1]) { + id = Math.random(); + cb = args.pop(); + } + + ws.send(JSON.stringify({ + type: 'rpc' + , func: fname + , args: args + , filename: opts.filename + , id: id + })); + + if (!cb) { + return; + } + + function onMessage(data) { + var cmd; + + try { + cmd = JSON.parse(data.toString('utf8')); + } catch(e) { + console.error('[ERROR] in client, from sql server parse json'); + console.error(e); + console.error(data); + console.error(); + + //ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); + return; + } + + if (cmd.id !== id) { + return; + } + + if ('on' !== fname) { + var index = ws.___listeners.indexOf(onMessage); + ws.___listeners.splice(index, 1); + } + + cb.apply(cmd.this, cmd.args); + } + + ws.___listeners.push(onMessage); + } + + ws._methods.forEach(function (key) { + db[key] = function () { + rpc(key, Array.prototype.slice.call(arguments)); + }; + }); + + return db; + }); +} + +module.exports.create = create; diff --git a/cluster.js b/cluster.js new file mode 100644 index 0000000..37b7bbd --- /dev/null +++ b/cluster.js @@ -0,0 +1,20 @@ +'use strict'; + +var memstore = require('./index'); + +function create(opts) { + var cluster = require('cluster'); + var numCores = require('os').cpus().length; + + if (!opts.serve && ('boolean' !== typeof opts.serve)) { + opts.serve = (numCores > 1) && cluster.isMaster; + } + + if (!opts.connect && ('boolean' !== typeof opts.connect)) { + opts.connect = (numCores > 1) && cluster.isWorker; + } + + return memstore.create(opts); +} + +module.exports.create = create; diff --git a/index.js b/index.js new file mode 100644 index 0000000..a961216 --- /dev/null +++ b/index.js @@ -0,0 +1,3 @@ +'use strict'; + +module.exports = require('./client'); diff --git a/memstore.js b/memstore.js new file mode 100644 index 0000000..8d7d26f --- /dev/null +++ b/memstore.js @@ -0,0 +1,60 @@ +'use strict'; + +/*global Promise*/ +var defer; + +if ('function' === typeof setImmediate) { + defer = setImmediate; +} else { + defer = function (fn) { process.nextTick(fn.bind.apply(fn, arguments)); }; +} + +function create(/*opts*/) { + // don't leak prototypes as implicitly non-null + var db = Object.create(null); + + return Promise.resolve({ + // required / recommended + set: function (id, data, fn) { + db[id] = data; + + if (!fn) { return; } + defer(fn, null); + } + , get: function (id, fn) { + if (!fn) { return; } + defer(fn, null, 'undefined' === typeof db[id] ? null : db[id]); + } + , touch: function (id, data, fn) { + db[id] = data; + + if (!fn) { return; } + defer(fn, null); + } + , destroy: function (id, fn) { + delete db[id]; + + if (!fn) { return; } + defer(fn, null); + } + // optional + , all: function (fn) { + if (!fn) { return; } + defer(fn, null, Object.keys(db).map(function (key) { + return db[key]; + })); + } + , length: function (fn) { + if (!fn) { return; } + defer(fn, null, Object.keys(db).length); + } + , clear: function (fn) { + db = Object.create(null); + + if (!fn) { return; } + defer(fn, null); + } + }); +} + +module.exports.create = create; diff --git a/package.json b/package.json new file mode 100644 index 0000000..9eaec96 --- /dev/null +++ b/package.json @@ -0,0 +1,32 @@ +{ + "name": "memstore-cluster", + "version": "1.0.0", + "description": "A wrapper to enable the use of a in-process store with node cluster via a socket server (i.e. for Raspberry Pi 2).", + "main": "index.js", + "scripts": { + "test": "node test-cluster.js", + "start": "node server.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/coolaj86/memstore-cluster.git" + }, + "keywords": [ + "store", + "session", + "connect", + "express", + "memstore", + "cluster", + "rpi2" + ], + "author": "AJ ONeal (http://coolaj86.com/)", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/coolaj86/memstore-cluster/issues" + }, + "homepage": "https://github.com/coolaj86/memstore-cluster#readme", + "dependencies": { + "ws": "^0.7.2" + } +} diff --git a/server.js b/server.js new file mode 100644 index 0000000..7d60f65 --- /dev/null +++ b/server.js @@ -0,0 +1,143 @@ +'use strict'; +/*global Promise*/ + +var wsses = {}; + +function createApp(server, options) { + var promise; + + if (wsses[options.filename]) { + return Promise.resolve(wsses[options.filename]); + } + + if (options.store) { + promise = Promise.resolve(options.store); + } else { + promise = require('./memstore').create(options); + } + + return promise.then(function (db) { + var url = require('url'); + //var express = require('express'); + //var app = express(); + var wss = server.wss; + + function app(req, res) { + res.end('NOT IMPLEMENTED'); + } + + function getMethods(db) { + /* + var instanceMethods = Object.keys(db) + .map(function (key) { return 'function' === typeof db[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + var protoMethods = Object.keys(Object.getPrototypeOf(db)) + .map(function (key) { return 'function' === typeof Object.getPrototypeOf(db)[key] ? key : null; }) + .filter(function (key) { return key; }) + ; + + return instanceMethods.concat(protoMethods); + */ + + return [ + 'set', 'get', 'touch', 'destroy' + , 'all', 'length', 'clear' + , 'on', 'off', 'removeEventListener', 'addEventListener' + ].filter(function (key) { + if ('function' === typeof db[key]) { + return true; + } + }); + } + + wss.on('connection', function (ws) { + ws.send(JSON.stringify({ + type: 'methods' + , methods: getMethods(db) + })); + + var location = url.parse(ws.upgradeReq.url, true); + // you might use location.query.access_token to authenticate or share sessions + // or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312 + + ws.__session_id = location.query.session_id || Math.random(); + + ws.on('message', function (buffer) { + var cmd; + + try { + cmd = JSON.parse(buffer.toString('utf8')); + } catch(e) { + console.error('[ERROR] parse json'); + console.error(e); + console.error(buffer); + console.error(); + ws.send(JSON.stringify({ type: 'error', value: { message: e.message, code: "E_PARSE_JSON" } })); + return; + } + + switch(cmd.type) { + case 'init': + break; + + case 'rpc': + cmd.args.push(function () { + var args = Array.prototype.slice.call(arguments); + + ws.send(JSON.stringify({ + this: this + , args: args + , id: cmd.id + })); + }); + + db[cmd.func].apply(db, cmd.args); + break; + + default: + throw new Error('UNKNOWN TYPE'); + //break; + } + + }); + + ws.send(JSON.stringify({ type: 'session', value: ws.__session_id })); + }); + + app.masterClient = db; + //wsses[options.filename] = app; + + return app; + }); +} + +function create(options) { + var server = require('http').createServer(); + var WebSocketServer = require('ws').Server; + var wss = new WebSocketServer({ server: server }); + //var port = process.env.PORT || process.argv[0] || 4080; + + var fs = require('fs'); + var ps = []; + + ps.push(new Promise(function (resolve) { + fs.unlink(options.sock, function () { + // ignore error when socket doesn't exist + + server.listen(options.sock, resolve); + }); + })); + + ps.push(createApp({ server: server, wss: wss }, options).then(function (app) { + server.on('request', app); + return { masterClient: app.masterClient }; + })); + + return Promise.all(ps).then(function (results) { + return results[1]; + }); +} + +module.exports.create = create; diff --git a/standalone.js b/standalone.js new file mode 100644 index 0000000..29c090f --- /dev/null +++ b/standalone.js @@ -0,0 +1,15 @@ +'use strict'; + +var memstore = require('./index'); + +function create(opts) { + opts.standalone = true; + + // TODO if cluster *is* used issue a warning? + // I suppose the user could be issuing a different filename for each + // ... but then they have no need to use this module, right? + + return memstore.create(opts); +} + +module.exports.create = create; diff --git a/test-cluster.js b/test-cluster.js new file mode 100644 index 0000000..d6b8ff7 --- /dev/null +++ b/test-cluster.js @@ -0,0 +1,52 @@ +'use strict'; + +var cluster = require('cluster'); +//var numCores = 2; +var numCores = require('os').cpus().length; +var id = (cluster.isMaster && '0' || cluster.worker.id).toString(); + +function run() { + var mstore = require('./cluster'); + + return mstore.create({ + standalone: null + , serve: null + , connect: null + }).then(function (store) { + store.set('foo', 'bar', function (err) { + if (err) { console.error(err); return; } + + store.get('baz', function (err, data) { + if (err) { console.error(err); return; } + console.log(id, 'should be null:', data); + }); + + store.get('foo', function (err, data) { + if (err) { console.error(err); return; } + console.log(id, 'should be bar:', data); + }); + }); + }); +} + +if (cluster.isMaster) { + // not a bad idea to setup the master before forking the workers + run().then(function () { + var i; + + for (i = 1; i <= numCores; i += 1) { + cluster.fork(); + } + }); +} else { + run(); +} + +// The native Promise implementation ignores errors because... dumbness??? +process.on('unhandledPromiseRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +}); diff --git a/test-standalone.js b/test-standalone.js new file mode 100644 index 0000000..fe9ae75 --- /dev/null +++ b/test-standalone.js @@ -0,0 +1,37 @@ +'use strict'; + +function run() { + var mstore = require('./standalone'); + + mstore.create({ + sock: '/tmp/memstore.sock' + , standalone: null + , serve: null + , connect: null + }).then(function (store) { + store.set('foo', 'bar', function (err) { + if (err) { console.error(err); return; } + + store.get('baz', function (err, data) { + if (err) { console.error(err); return; } + console.log('should be null:', data); + }); + + store.get('foo', function (err, data) { + if (err) { console.error(err); return; } + console.log('should be bar:', data); + }); + }); + }); +} + +run(); + +// The native Promise implementation ignores errors because... dumbness??? +process.on('unhandledPromiseRejection', function (err) { + console.error('Unhandled Promise Rejection'); + console.error(err); + console.error(err.stack); + + process.exit(1); +});