Compare commits

..

No commits in common. "master" and "v1.5.0" have entirely different histories.

6 changed files with 553 additions and 719 deletions

View File

@ -1,4 +1,6 @@
# proxy-packer | a [Root](https://rootprojects.org) project
# proxy-packer
| Sponsored by [ppl](https://ppl.family) |
"The M-PROXY Protocol" for node.js
@ -15,7 +17,8 @@ Browser <--/ \--> Device
It's the kind of thing you'd use to build a poor man's VPN, or port-forward router.
# The M-PROXY Protocol
The M-PROXY Protocol
===================
This is similar to "The PROXY Protocol" (a la HAProxy), but desgined for multiplexed tls, http, tcp, and udp
tunneled over arbitrary streams (such as WebSockets).
@ -57,7 +60,8 @@ service port (string) The listening port, such as 443. Useful for no
host or server name (string) Useful for services that can be routed by name, such as http, https, smtp, and dns.
```
## Tunneled TCP SNI Packet
Tunneled TCP SNI Packet
-----------------------
You should see that the result is simply all of the original packet with a leading header.
@ -87,13 +91,15 @@ Note that `16 03 01 00` starts at the 29th byte (at index 28 or 0x1C) instead of
The v1 header uses strings for address and service descriptor information,
but future versions may be binary.
# API
API
===
```js
var Packer = require('proxy-packer');
```
## Unpacker / Parser State Machine
Unpacker / Parser State Machine
-----------------------
The unpacker creates a state machine.
@ -102,28 +108,28 @@ composing a full message with header and data (unless data length is 0).
The state machine progresses through these states:
- version
- headerLength
- header
- data
* version
* headerLength
* header
* data
At the end of the data event (which may or may not contain a buffer of data)
one of the appropriate handlers will be called.
- control
- connection
- message
- pause
- resume
- end
- error
* control
* connection
* message
* pause
* resume
* end
* error
```js
unpacker = Packer.create(handlers); // Create a state machine for unpacking
unpacker.fns.addData(chunk); // process a chunk of data
handlers.oncontrol = function(tun) {}; // for communicating with the proxy
handlers.oncontrol = function (tun) { } // for communicating with the proxy
// tun.data is an array
// '[ -1, "[Error] bad hello" ]'
// '[ 0, "[Error] out-of-band error message" ]'
@ -131,22 +137,22 @@ handlers.oncontrol = function(tun) {}; // for communicating with the proxy
// '[ 1, "add_token" ]'
// '[ 1, "delete_token" ]'
handlers.onconnection = function(tun) {}; // a client has established a connection
handlers.onconnection = function (tun) { } // a client has established a connection
handlers.onmessage = function(tun) {}; // a client has sent a message
handlers.onmessage = function (tun) { } // a client has sent a message
// tun = { family, address, port, data
// , service, serviceport, name };
handlers.onpause = function(tun) {}; // proxy requests to pause upload to a client
handlers.onpause = function (tun) { } // proxy requests to pause upload to a client
// tun = { family, address, port };
handlers.onresume = function(tun) {}; // proxy requests to resume upload to a client
handlers.onresume = function (tun) { } // proxy requests to resume upload to a client
// tun = { family, address, port };
handlers.onend = function(tun) {}; // proxy requests to close a client's socket
handlers.onend = function (tun) { } // proxy requests to close a client's socket
// tun = { family, address, port };
handlers.onerror = function(err) {}; // proxy is relaying a client's error
handlers.onerror = function (err) { } // proxy is relaying a client's error
// err = { message, family, address, port };
```
@ -157,7 +163,8 @@ handlers.onconnect = function (tun) { } // a new client has co
-->
## Packer & Extras
Packer & Extras
------
Packs header metadata about connection into a buffer (potentially with original data), ready to send.
@ -188,12 +195,12 @@ var socket = Packer.Stream.wrapSocket(socketOrStream); // workaround for https:/
```js
var myTransform = Packer.Transform.create({
address: {
family: '...',
address: '...',
port: '...'
},
family: '...'
, address: '...'
, port: '...'
}
// hint at the service to be used
service: 'https'
, service: 'https'
});
```
@ -210,7 +217,6 @@ hexdump output.bin
Where `input.json` looks something like this:
`input.json`:
```
{ "version": 1
, "address": {
@ -225,12 +231,12 @@ Where `input.json` looks something like this:
}
```
## Raw TCP SNI Packet
Raw TCP SNI Packet
------------------
and `sni.tcp.bin` is any captured tcp packet, such as this one with a tls hello:
`sni.tcp.bin`:
```
0 1 2 3 4 5 6 7 8 9 A B C D D F
0000000 16 03 01 00 c2 01 00 00 be 03 03 57 e3 76 50 66
@ -249,7 +255,8 @@ and `sni.tcp.bin` is any captured tcp packet, such as this one with a tls hello:
00000c7
```
## Tunneled TCP SNI Packet
Tunneled TCP SNI Packet
-----------------------
You should see that the result is simply all of the original packet with a leading header.

185
index.js
View File

@ -3,37 +3,36 @@
var Packer = module.exports;
var serviceEvents = {
default: 'tunnelData',
connection: 'tunnelConnection',
control: 'tunnelControl',
error: 'tunnelError',
end: 'tunnelEnd',
pause: 'tunnelPause',
resume: 'tunnelResume'
default: 'tunnelData'
, connection: 'tunnelConnection'
, control: 'tunnelControl'
, error: 'tunnelError'
, end: 'tunnelEnd'
, pause: 'tunnelPause'
, resume: 'tunnelResume'
};
var serviceFuncs = {
default: 'onmessage',
connection: 'onconnection',
control: 'oncontrol',
error: 'onerror',
end: 'onend',
pause: 'onpause',
resume: 'onresume'
default: 'onmessage'
, connection: 'onconnection'
, control: 'oncontrol'
, error: 'onerror'
, end: 'onend'
, pause: 'onpause'
, resume: 'onresume'
};
Packer.create = function (opts) {
var machine;
if (!opts.onMessage && !opts.onmessage) {
machine = new (require('events')).EventEmitter();
machine = new (require('events').EventEmitter)();
} else {
machine = {};
}
machine.onmessage = opts.onmessage || opts.onMessage;
machine.oncontrol = opts.oncontrol || opts.onControl;
machine.onconnection =
opts.onconnection || opts.onConnection || function() {};
machine.onconnection = opts.onconnection || opts.onConnection || function () {};
machine.onerror = opts.onerror || opts.onError;
machine.onend = opts.onend || opts.onEnd;
machine.onpause = opts.onpause || opts.onPause;
@ -47,7 +46,7 @@ Packer.create = function(opts) {
machine.bufIndex = 0;
machine.fns.collectData = function (chunk, size) {
var chunkLeft = chunk.length - machine.chunkIndex;
var hasLen = size > 0;
var hasLen = (size > 0);
if (!hasLen) {
return Buffer.alloc(0);
@ -68,10 +67,7 @@ Packer.create = function(opts) {
// Read and mark as read however much data we need from the chunk to complete our buffer.
var partLen = size - machine.bufIndex;
var part = chunk.slice(
machine.chunkIndex,
machine.chunkIndex + partLen
);
var part = chunk.slice(machine.chunkIndex, machine.chunkIndex+partLen);
machine.chunkIndex += partLen;
// If we had nothing buffered than the part of the chunk we just read is all we need.
@ -91,8 +87,8 @@ Packer.create = function(opts) {
machine.fns.version = function (chunk) {
//console.log('');
//console.log('[version]');
if (255 - machine._version !== chunk[machine.chunkIndex]) {
console.error('not v' + machine._version + ' (or data is corrupt)');
if ((255 - machine._version) !== chunk[machine.chunkIndex]) {
console.error("not v" + machine._version + " (or data is corrupt)");
// no idea how to fix this yet
}
machine.chunkIndex += 1;
@ -100,6 +96,7 @@ Packer.create = function(opts) {
return true;
};
machine.headerLen = 0;
machine.fns.headerLength = function (chunk) {
//console.log('');
@ -129,7 +126,6 @@ Packer.create = function(opts) {
machine.service = machine._headers[4];
machine.serviceport = machine._headers[5];
machine.name = machine._headers[6];
machine.servicename = machine._headers[7];
//console.log('machine.service', machine.service);
return true;
@ -149,6 +145,7 @@ Packer.create = function(opts) {
}
}
//
// data, end, error
//
@ -157,7 +154,7 @@ Packer.create = function(opts) {
try {
msg = JSON.parse(data.toString());
} catch(e) {
msg.message = 'e:' + JSON.stringify(data);
msg.message = data.toString();
msg.code = 'E_UNKNOWN_ERR';
}
}
@ -170,21 +167,10 @@ Packer.create = function(opts) {
msg.name = machine.name;
msg.data = data;
if ('connection' === machine.service) {
msg.service = machine.servicename;
}
//console.log('msn', machine.service);
if (machine.emit) {
machine.emit(
serviceEvents[machine.service] ||
serviceEvents[msg.service] ||
serviceEvents.default
);
machine.emit(serviceEvents[msg.service] || serviceEvents.default);
} else {
(machine[serviceFuncs[machine.service]] ||
machine[serviceFuncs[msg.service]] ||
machine[serviceFuncs.default])(msg);
(machine[serviceFuncs[msg.service]] || machine[serviceFuncs.default])(msg);
}
return true;
@ -204,74 +190,45 @@ Packer.create = function(opts) {
machine.state %= machine.states.length;
}
}
if ('data' === machine.states[machine.state] && 0 === machine.bodyLen) {
machine.fns[machine.states[machine.state]](chunk);
machine.state += 1;
machine.state %= machine.states.length;
}
};
return machine;
};
Packer.packHeader = function (meta, data, service, andBody, oldways) {
if (oldways && !data) {
data = Buffer.from(' ');
if (oldways) {
data = data || Buffer.from(' ');
}
if (data && !Buffer.isBuffer(data)) {
data = Buffer.from(JSON.stringify(data));
data = new Buffer(JSON.stringify(data));
}
if (oldways && !data.byteLength) {
data = Buffer.from(' ');
}
if (service && -1 === ['control', 'connection'].indexOf(service)) {
//console.log('end?', service);
if (service && service !== 'control') {
meta.service = service;
}
var size = (data && data.byteLength) || 0;
var size = data && data.byteLength || 0;
var sizeReserve = andBody ? size : 0;
var version = 1;
var header;
if (service === 'control') {
header = Buffer.from(['', '', '', size, service].join(','));
} else if (service === 'connection') {
header = Buffer.from(
[
meta.family,
meta.address,
meta.port,
size,
'connection',
meta.serviceport || '',
meta.name || '',
meta.service || ''
].join(',')
);
} else {
header = Buffer.from(
[
meta.family,
meta.address,
meta.port,
size,
meta.service || '',
meta.serviceport || '',
meta.name || ''
].join(',')
);
}
else {
header = Buffer.from([
meta.family, meta.address, meta.port, size,
(meta.service || ''), (meta.serviceport || ''), (meta.name || '')
].join(','));
}
var metaBuf = Buffer.from([ 255 - version, header.length ]);
var buf = Buffer.alloc(
metaBuf.byteLength + header.byteLength + sizeReserve
);
var buf = Buffer.alloc(metaBuf.byteLength + header.byteLength + sizeReserve);
metaBuf.copy(buf, 0);
header.copy(buf, 2);
if (sizeReserve) {
data.copy(buf, 2 + header.byteLength);
}
if (sizeReserve) { data.copy(buf, 2 + header.byteLength); }
return buf;
};
@ -290,36 +247,35 @@ function extractSocketProps(socket, propNames) {
propNames.forEach(function (propName) {
props[propName] = socket['_' + propName];
});
} else if (socket._handle) {
if (
socket._handle._parent &&
socket._handle._parent.owner &&
socket._handle._parent.owner.stream &&
socket._handle._parent.owner.stream.remotePort
} else if (
socket._handle
&& socket._handle._parent
&& socket._handle._parent.owner
&& socket._handle._parent.owner.stream
&& socket._handle._parent.owner.stream.remotePort
) {
propNames.forEach(function (propName) {
props[propName] = socket._handle._parent.owner.stream[propName];
});
} else if (
socket._handle._parentWrap &&
socket._handle._parentWrap.remotePort
socket._handle._parentWrap
&& socket._handle._parentWrap
&& socket._handle._parentWrap.remotePort
) {
propNames.forEach(function (propName) {
props[propName] = socket._handle._parentWrap[propName];
});
} else if (
socket._handle._parentWrap &&
socket._handle._parentWrap._handle &&
socket._handle._parentWrap._handle.owner &&
socket._handle._parentWrap._handle.owner.stream &&
socket._handle._parentWrap._handle.owner.stream.remotePort
socket._handle._parentWrap
&& socket._handle._parentWrap._handle
&& socket._handle._parentWrap._handle.owner
&& socket._handle._parentWrap._handle.owner.stream
&& socket._handle._parentWrap._handle.owner.stream.remotePort
) {
propNames.forEach(function (propName) {
props[propName] =
socket._handle._parentWrap._handle.owner.stream[propName];
props[propName] = socket._handle._parentWrap._handle.owner.stream[propName];
});
}
}
return props;
}
function extractSocketProp(socket, propName) {
@ -331,8 +287,7 @@ function extractSocketProp(socket, propName) {
try {
value = value || socket._handle._parentWrap[propName];
value =
value || socket._handle._parentWrap._handle.owner.stream[propName];
value = value || socket._handle._parentWrap._handle.owner.stream[propName];
} catch (e) {}
return value || '';
@ -343,17 +298,12 @@ Packer.socketToAddr = function(socket) {
// tlsSocket.remoteAddress = remoteAddress; // causes core dump
// console.log(tlsSocket.remoteAddress);
var props = extractSocketProps(socket, [
'remoteFamily',
'remoteAddress',
'remotePort',
'localPort'
]);
var props = extractSocketProps(socket, [ 'remoteFamily', 'remoteAddress', 'remotePort', 'localPort' ]);
return {
family: props.remoteFamily,
address: props.remoteAddress,
port: props.remotePort,
serviceport: props.localPort
family: props.remoteFamily
, address: props.remoteAddress
, port: props.remotePort
, serviceport: props.localPort
};
};
@ -365,14 +315,14 @@ Packer.socketToId = function(socket) {
return Packer.addrToId(Packer.socketToAddr(socket));
};
var addressNames = [
'remoteAddress',
'remotePort',
'remoteFamily',
'localAddress',
'localPort'
'remoteAddress'
, 'remotePort'
, 'remoteFamily'
, 'localAddress'
, 'localPort'
];
/*
var sockFuncs = [
'address'
, 'destroy'
@ -383,7 +333,6 @@ var sockFuncs = [
, 'setNoDelay'
, 'setTimeout'
];
*/
// Unlike Packer.Stream.create this should handle all of the events needed to make everything work.
Packer.wrapSocket = function (socket) {
// node v10.2+ doesn't need a workaround for https://github.com/nodejs/node/issues/8854
@ -474,8 +423,8 @@ var Dup = {
write: function (chunk, encoding, cb) {
//console.log('_write', chunk.byteLength);
this.__my_socket.write(chunk, encoding, cb);
},
read: function(size) {
}
, read: function (size) {
//console.log('_read');
var x = this.__my_socket.read(size);
if (x) {

View File

@ -1,6 +1,6 @@
{
"name": "proxy-packer",
"version": "2.0.4",
"version": "1.5.0",
"description": "A strategy for packing and unpacking a proxy stream (i.e. packets through a tunnel). Handles multiplexed and tls connections. Used by telebit and telebitd.",
"main": "index.js",
"scripts": {

View File

@ -1,11 +1,10 @@
{
"version": 1,
"address": {
"family": "IPv4",
"address": "127.0.1.1",
"port": 4321,
"service": "https",
"serviceport": 443
},
"filepath": "./sni.hello.bin"
{ "version": 1
, "address": {
"family": "IPv4"
, "address": "127.0.1.1"
, "port": 4321
, "service": "https"
, "serviceport": 443
}
, "filepath": "./sni.hello.bin"
}

View File

@ -1,4 +1,4 @@
(function() {
;(function () {
'use strict';
var fs = require('fs');
@ -7,23 +7,20 @@
var sni = require('sni');
if (!infile || !outfile) {
console.error('Usage:');
console.error('node test/pack.js test/input.json test/output.bin');
console.error("Usage:");
console.error("node test/pack.js test/input.json test/output.bin");
process.exit(1);
return;
}
var path = require('path');
var json = JSON.parse(fs.readFileSync(infile, 'utf8'));
var data = require('fs').readFileSync(
path.resolve(path.dirname(infile), json.filepath),
null
);
var data = require('fs').readFileSync(path.resolve(path.dirname(infile), json.filepath), null);
var Packer = require('../index.js');
var servername = sni(data);
var m = data.toString().match(/(?:^|[\r\n])Host: ([^\r\n]+)[\r\n]*/im);
var hostname = ((m && m[1].toLowerCase()) || '').split(':')[0];
var hostname = (m && m[1].toLowerCase() || '').split(':')[0];
/*
function pack() {
@ -43,13 +40,6 @@ function pack() {
json.address.name = servername || hostname;
var buf = Packer.pack(json.address, data);
fs.writeFileSync(outfile, buf, null);
console.log(
'wrote ' +
buf.byteLength +
" bytes to '" +
outfile +
"' ('hexdump " +
outfile +
"' to inspect)"
);
})();
console.log("wrote " + buf.byteLength + " bytes to '" + outfile + "' ('hexdump " + outfile + "' to inspect)");
}());

View File

@ -3,197 +3,87 @@
var sni = require('sni');
var hello = require('fs').readFileSync(__dirname + '/sni.hello.bin');
var version = 1;
function getAddress() {
return {
family: 'IPv4',
address: '127.0.1.1',
port: 4321,
service: 'foo-https',
serviceport: 443,
name: 'foo-pokemap.hellabit.com'
var address = {
family: 'IPv4'
, address: '127.0.1.1'
, port: 4321
, service: 'foo-https'
, serviceport: 443
, name: 'foo-pokemap.hellabit.com'
};
}
var addr = getAddress();
var connectionHeader =
addr.family +
',' +
addr.address +
',' +
addr.port +
',0,connection,' +
(addr.serviceport || '') +
',' +
(addr.name || '') +
',' +
(addr.service || '');
var header =
addr.family +
',' +
addr.address +
',' +
addr.port +
',' +
hello.byteLength +
',' +
(addr.service || '') +
',' +
(addr.serviceport || '') +
',' +
(addr.name || '');
var endHeader =
addr.family +
',' +
addr.address +
',' +
addr.port +
',0,end,' +
(addr.serviceport || '') +
',' +
(addr.name || '');
var header = address.family + ',' + address.address + ',' + address.port + ',' + hello.byteLength
+ ',' + (address.service || '') + ',' + (address.serviceport || '') + ',' + (address.name || '')
;
var buf = Buffer.concat([
Buffer.from([255 - version, connectionHeader.length]),
Buffer.from(connectionHeader),
Buffer.from([255 - version, header.length]),
Buffer.from(header),
hello,
Buffer.from([255 - version, endHeader.length]),
Buffer.from(endHeader)
Buffer.from([ 255 - version, header.length ])
, Buffer.from(header)
, hello
]);
var services = { ssh: 22, http: 4080, https: 8443 };
var services = { 'ssh': 22, 'http': 4080, 'https': 8443 };
var clients = {};
var count = 0;
var packer = require('../');
var machine = packer.create({
onconnection: function(tun) {
console.info('');
if (!tun.service || 'connection' === tun.service) {
throw new Error('missing service: ' + JSON.stringify(tun));
}
console.info('[onConnection]');
count += 1;
},
onmessage: function (tun) {
//console.log('onmessage', tun);
var id = tun.family + ',' + tun.address + ',' + tun.port;
var service = 'https';
var port = services[service];
var servername = sni(tun.data);
console.info(
'[onMessage]',
service,
port,
servername,
tun.data.byteLength
);
console.log('');
console.log('[onMessage]');
if (!tun.data.equals(hello)) {
throw new Error(
"'data' packet is not equal to original 'hello' packet"
);
throw new Error("'data' packet is not equal to original 'hello' packet");
}
//console.log('all', tun.data.byteLength, 'bytes are equal');
//console.log('src:', tun.family, tun.address + ':' + tun.port + ':' + tun.serviceport);
//console.log('dst:', 'IPv4 127.0.0.1:' + port);
console.log('all', tun.data.byteLength, 'bytes are equal');
console.log('src:', tun.family, tun.address + ':' + tun.port + ':' + tun.serviceport);
console.log('dst:', 'IPv4 127.0.0.1:' + port);
if (!clients[id]) {
clients[id] = true;
if (!servername) {
throw new Error("no servername found for '" + id + "'");
}
//console.log("servername: '" + servername + "'", tun.name);
console.log("servername: '" + servername + "'", tun.name);
}
count += 1;
},
onerror: function() {
throw new Error('Did not expect onerror');
},
onend: function() {
console.info('[onEnd]');
count += 1;
}
, onerror: function () {
throw new Error("Did not expect onerror");
}
, onend: function () {
throw new Error("Did not expect onend");
}
});
var packts, packed;
packts = [];
packts.push(packer.packHeader(getAddress(), null, 'connection'));
//packts.push(packer.pack(address, hello));
packts.push(packer.packHeader(getAddress(), hello));
packts.push(hello);
packts.push(packer.packHeader(getAddress(), null, 'end'));
packed = Buffer.concat(packts);
var packed = packer.pack(address, hello);
if (!packed.equals(buf)) {
console.error('');
console.error(buf.toString('hex') === packed.toString('hex'));
console.error('');
console.error('auto-packed:');
console.error(packed.toString('hex'), packed.byteLength);
console.error('');
console.error('hand-packed:');
console.error(buf.toString('hex'), buf.byteLength);
console.error('');
throw new Error('packer (new) did not pack as expected');
throw new Error("packer did not pack as expected");
}
packts = [];
packts.push(packer.pack(getAddress(), null, 'connection'));
packts.push(packer.pack(getAddress(), hello));
//packts.push(packer.packHeader(getAddress(), hello));
//packts.push(hello);
packts.push(packer.pack(getAddress(), null, 'end'));
packed = Buffer.concat(packts);
// XXX TODO REMOVE
//
// Nasty fix for short-term backwards-compat
//
// In the old way of doing things we always have at least one byte
// of data (due to a parser bug which has now been fixed) and so
// there are two strings padded with a space which gives the
// data a length of 1 rather than 0
//
// Here all four of those instances are replaced, but it requires
// maching a few things on either side.
//
// Only 6 bytes are changed - two 1 => 0, four ' ' => ''
var hex = packed
.toString('hex')
//.replace(/2c313939/, '2c30')
.replace(/32312c312c636f/, '32312c302c636f')
.replace(/3332312c312c656e64/, '3332312c302c656e64')
.replace(/7320/, '73')
.replace(/20$/, '');
if (hex !== buf.toString('hex')) {
console.error('');
console.error(buf.toString('hex') === hex);
console.error('');
console.error('auto-packed:');
console.error(hex, packed.byteLength);
console.error('');
console.error('hand-packed:');
console.error(buf.toString('hex'), buf.byteLength);
console.error('');
throw new Error('packer (old) did not pack as expected');
}
console.info('');
console.log('');
// full message in one go
// 223 = 2 + 22 + 199
console.info('[WHOLE BUFFER]', 2, header.length, hello.length, buf.byteLength);
console.log('[WHOLE BUFFER]', 2, header.length, hello.length, buf.byteLength);
clients = {};
machine.fns.addChunk(buf);
console.info('');
console.log('');
// messages one byte at a time
console.info('[BYTE-BY-BYTE BUFFER]', 1);
console.log('[BYTE-BY-BYTE BUFFER]', 1);
clients = {};
buf.forEach(function (byte) {
machine.fns.addChunk(Buffer.from([ byte ]));
});
console.info('');
console.log('');
// split messages in overlapping thirds
// 0-2 (2)
@ -203,26 +93,25 @@ console.info('');
// 225-247 (22)
// 247-446 (199)
buf = Buffer.concat([ buf, buf ]);
console.info('[OVERLAPPING BUFFERS]', buf.length);
console.log('[OVERLAPPING BUFFERS]', buf.length);
clients = {};
[
buf.slice(0, 7), // version + header
buf.slice(7, 14), // header
buf.slice(14, 21), // header
buf.slice(21, 28), // header + body
buf.slice(28, 217), // body
buf.slice(217, 224), // body + version
buf.slice(224, 238), // version + header
buf.slice(238, buf.byteLength) // header + body
[ buf.slice(0, 7) // version + header
, buf.slice(7, 14) // header
, buf.slice(14, 21) // header
, buf.slice(21, 28) // header + body
, buf.slice(28, 217) // body
, buf.slice(217, 224) // body + version
, buf.slice(224, 238) // version + header
, buf.slice(238, buf.byteLength) // header + body
].forEach(function (buf) {
machine.fns.addChunk(Buffer.from(buf));
});
console.info('');
console.log('');
process.on('exit', function () {
if (count !== 12) {
throw new Error('should have delivered 12 messages, not ' + count);
if (count !== 4) {
throw new Error("should have delivered 4 messages, not", count);
}
console.info('TESTS PASS');
console.info('');
console.log('TESTS PASS');
console.log('');
});