Commit 8b790407 authored by Leo Iannacone's avatar Leo Iannacone

upgrade socket.io 0.9.17

parent b3f6d49d
0.9.17 / 2014-05-22
===================
* use static channels for remote syncing instead of subscribing/unsubscribing 5 channels for every connection
* Use destroy buffer size on websocket transport method as well
* http-polling : adding 'X-XSS-Protection : 0;' to headers necessary not only to jsonp-polling but http-polling
0.9.16 / 2013-06-06 0.9.16 / 2013-06-06
=================== ===================
......
...@@ -320,8 +320,46 @@ Manager.prototype.initStore = function () { ...@@ -320,8 +320,46 @@ Manager.prototype.initStore = function () {
this.store.subscribe('disconnect', function (id) { this.store.subscribe('disconnect', function (id) {
self.onDisconnect(id); self.onDisconnect(id);
}); });
};
// we need to do this in a pub/sub way since the client can POST the message
// over a different socket (ie: different Transport instance)
//use persistent channel for these, don't add and remove 5 channels for every connection
//eg. for 10,000 concurrent users this creates 50,000 channels in redis, which kind of slows things down
//we only need 5 (extra) total channels at all times
this.store.subscribe('message-remote',function (id, packet) {
self.onClientMessage(id, packet);
});
this.store.subscribe('disconnect-remote', function (id, reason) {
self.onClientDisconnect(id, reason);
});
this.store.subscribe('dispatch-remote', function (id, packet, volatile) {
var transport = self.transports[id];
if (transport) {
transport.onDispatch(packet, volatile);
}
if (!volatile) {
self.onClientDispatch(id, packet);
}
});
this.store.subscribe('heartbeat-clear', function (id) {
var transport = self.transports[id];
if (transport) {
transport.onHeartbeatClear();
}
});
this.store.subscribe('disconnect-force', function (id) {
var transport = self.transports[id];
if (transport) {
transport.onForcedDisconnect();
}
});
};
/** /**
* Called when a client handshakes. * Called when a client handshakes.
* *
...@@ -354,19 +392,17 @@ Manager.prototype.onOpen = function (id) { ...@@ -354,19 +392,17 @@ Manager.prototype.onOpen = function (id) {
if (this.closed[id]) { if (this.closed[id]) {
var self = this; var self = this;
this.store.unsubscribe('dispatch:' + id, function () { var transport = self.transports[id];
var transport = self.transports[id]; if (self.closed[id] && self.closed[id].length && transport) {
if (self.closed[id] && self.closed[id].length && transport) {
// if we have buffered messages that accumulate between calling // if we have buffered messages that accumulate between calling
// onOpen an this async callback, send them if the transport is // onOpen an this async callback, send them if the transport is
// still open, otherwise leave them buffered // still open, otherwise leave them buffered
if (transport.open) { if (transport.open) {
transport.payload(self.closed[id]); transport.payload(self.closed[id]);
self.closed[id] = []; self.closed[id] = [];
}
} }
}); }
} }
// clear the current transport // clear the current transport
...@@ -457,12 +493,6 @@ Manager.prototype.onClose = function (id) { ...@@ -457,12 +493,6 @@ Manager.prototype.onClose = function (id) {
this.closed[id] = []; this.closed[id] = [];
var self = this; var self = this;
this.store.subscribe('dispatch:' + id, function (packet, volatile) {
if (!volatile) {
self.onClientDispatch(id, packet);
}
});
}; };
/** /**
...@@ -512,7 +542,7 @@ Manager.prototype.onClientDisconnect = function (id, reason) { ...@@ -512,7 +542,7 @@ Manager.prototype.onClientDisconnect = function (id, reason) {
* @param text * @param text
*/ */
Manager.prototype.onDisconnect = function (id, local) { Manager.prototype.onDisconnect = function (id) {
delete this.handshaken[id]; delete this.handshaken[id];
if (this.open[id]) { if (this.open[id]) {
...@@ -542,13 +572,6 @@ Manager.prototype.onDisconnect = function (id, local) { ...@@ -542,13 +572,6 @@ Manager.prototype.onDisconnect = function (id, local) {
} }
this.store.destroyClient(id, this.get('client store expiration')); this.store.destroyClient(id, this.get('client store expiration'));
this.store.unsubscribe('dispatch:' + id);
if (local) {
this.store.unsubscribe('message:' + id);
this.store.unsubscribe('disconnect:' + id);
}
}; };
/** /**
...@@ -646,7 +669,7 @@ Manager.prototype.handleClient = function (data, req) { ...@@ -646,7 +669,7 @@ Manager.prototype.handleClient = function (data, req) {
if (this.transports[data.id] && this.transports[data.id].open) { if (this.transports[data.id] && this.transports[data.id].open) {
this.transports[data.id].onForcedDisconnect(); this.transports[data.id].onForcedDisconnect();
} else { } else {
this.store.publish('disconnect-force:' + data.id); this.store.publish('disconnect-force', data.id);
} }
req.res.writeHead(200); req.res.writeHead(200);
req.res.end(); req.res.end();
...@@ -699,14 +722,6 @@ Manager.prototype.handleClient = function (data, req) { ...@@ -699,14 +722,6 @@ Manager.prototype.handleClient = function (data, req) {
} }
} }
} }
this.store.subscribe('message:' + data.id, function (packet) {
self.onClientMessage(data.id, packet);
});
this.store.subscribe('disconnect:' + data.id, function (reason) {
self.onClientDisconnect(data.id, reason);
});
} }
} else { } else {
if (transport.open) { if (transport.open) {
...@@ -802,11 +817,11 @@ Manager.prototype.handleHandshake = function (data, req, res) { ...@@ -802,11 +817,11 @@ Manager.prototype.handleHandshake = function (data, req, res) {
res.writeHead(200, headers); res.writeHead(200, headers);
} }
res.end(hs);
self.onHandshake(id, newData || handshakeData); self.onHandshake(id, newData || handshakeData);
self.store.publish('handshake', id, newData || handshakeData); self.store.publish('handshake', id, newData || handshakeData);
res.end(hs);
self.log.info('handshake authorized', id); self.log.info('handshake authorized', id);
} else { } else {
writeErr(403, 'handshake unauthorized'); writeErr(403, 'handshake unauthorized');
......
...@@ -233,7 +233,7 @@ Socket.prototype.dispatch = function (packet, volatile) { ...@@ -233,7 +233,7 @@ Socket.prototype.dispatch = function (packet, volatile) {
this.manager.onClientDispatch(this.id, packet, volatile); this.manager.onClientDispatch(this.id, packet, volatile);
} }
this.manager.store.publish('dispatch:' + this.id, packet, volatile); this.manager.store.publish('dispatch-remote', this.id, packet, volatile);
} }
}; };
...@@ -296,7 +296,7 @@ Socket.prototype.disconnect = function () { ...@@ -296,7 +296,7 @@ Socket.prototype.disconnect = function () {
this.manager.transports[this.id].onForcedDisconnect(); this.manager.transports[this.id].onForcedDisconnect();
} else { } else {
this.manager.onClientDisconnect(this.id); this.manager.onClientDisconnect(this.id);
this.manager.store.publish('disconnect:' + this.id); this.manager.store.publish('disconnect-remote', this.id);
} }
} else { } else {
this.packet({type: 'disconnect'}); this.packet({type: 'disconnect'});
......
...@@ -89,20 +89,6 @@ Transport.prototype.onSocketConnect = function () { }; ...@@ -89,20 +89,6 @@ Transport.prototype.onSocketConnect = function () { };
Transport.prototype.setHandlers = function () { Transport.prototype.setHandlers = function () {
var self = this; var self = this;
// we need to do this in a pub/sub way since the client can POST the message
// over a different socket (ie: different Transport instance)
this.store.subscribe('heartbeat-clear:' + this.id, function () {
self.onHeartbeatClear();
});
this.store.subscribe('disconnect-force:' + this.id, function () {
self.onForcedDisconnect();
});
this.store.subscribe('dispatch:' + this.id, function (packet, volatile) {
self.onDispatch(packet, volatile);
});
this.bound = { this.bound = {
end: this.onSocketEnd.bind(this) end: this.onSocketEnd.bind(this)
, close: this.onSocketClose.bind(this) , close: this.onSocketClose.bind(this)
...@@ -126,10 +112,6 @@ Transport.prototype.setHandlers = function () { ...@@ -126,10 +112,6 @@ Transport.prototype.setHandlers = function () {
Transport.prototype.clearHandlers = function () { Transport.prototype.clearHandlers = function () {
if (this.handlersSet) { if (this.handlersSet) {
this.store.unsubscribe('disconnect-force:' + this.id);
this.store.unsubscribe('heartbeat-clear:' + this.id);
this.store.unsubscribe('dispatch:' + this.id);
this.socket.removeListener('end', this.bound.end); this.socket.removeListener('end', this.bound.end);
this.socket.removeListener('close', this.bound.close); this.socket.removeListener('close', this.bound.close);
this.socket.removeListener('error', this.bound.error); this.socket.removeListener('error', this.bound.error);
...@@ -350,7 +332,7 @@ Transport.prototype.onMessage = function (packet) { ...@@ -350,7 +332,7 @@ Transport.prototype.onMessage = function (packet) {
if (current && current.open) { if (current && current.open) {
current.onHeartbeatClear(); current.onHeartbeatClear();
} else { } else {
this.store.publish('heartbeat-clear:' + this.id); this.store.publish('heartbeat-clear', this.id);
} }
} else { } else {
if ('disconnect' == packet.type && packet.endpoint == '') { if ('disconnect' == packet.type && packet.endpoint == '') {
...@@ -359,7 +341,7 @@ Transport.prototype.onMessage = function (packet) { ...@@ -359,7 +341,7 @@ Transport.prototype.onMessage = function (packet) {
if (current) { if (current) {
current.onForcedDisconnect(); current.onForcedDisconnect();
} else { } else {
this.store.publish('disconnect-force:' + this.id); this.store.publish('disconnect-force', this.id);
} }
return; return;
...@@ -378,7 +360,7 @@ Transport.prototype.onMessage = function (packet) { ...@@ -378,7 +360,7 @@ Transport.prototype.onMessage = function (packet) {
current.onDispatch(ack); current.onDispatch(ack);
} else { } else {
this.manager.onClientDispatch(this.id, ack); this.manager.onClientDispatch(this.id, ack);
this.store.publish('dispatch:' + this.id, ack); this.store.publish('dispatch-remote', this.id, ack);
} }
} }
...@@ -386,7 +368,7 @@ Transport.prototype.onMessage = function (packet) { ...@@ -386,7 +368,7 @@ Transport.prototype.onMessage = function (packet) {
if (current) { if (current) {
this.manager.onClientMessage(this.id, packet); this.manager.onClientMessage(this.id, packet);
} else { } else {
this.store.publish('message:' + this.id, packet); this.store.publish('message-remote', this.id, packet);
} }
} }
}; };
...@@ -464,10 +446,10 @@ Transport.prototype.end = function (reason) { ...@@ -464,10 +446,10 @@ Transport.prototype.end = function (reason) {
this.disconnected = true; this.disconnected = true;
if (local) { if (local) {
this.manager.onClientDisconnect(this.id, reason, true); this.manager.onClientDisconnect(this.id, reason);
} else {
this.store.publish('disconnect:' + this.id, reason);
} }
this.store.publish('disconnect-remote', this.id, reason);
} }
}; };
......
...@@ -79,6 +79,7 @@ HTTPTransport.prototype.handleRequest = function (req) { ...@@ -79,6 +79,7 @@ HTTPTransport.prototype.handleRequest = function (req) {
// https://developer.mozilla.org/En/HTTP_Access_Control // https://developer.mozilla.org/En/HTTP_Access_Control
headers['Access-Control-Allow-Origin'] = origin; headers['Access-Control-Allow-Origin'] = origin;
headers['Access-Control-Allow-Credentials'] = 'true'; headers['Access-Control-Allow-Credentials'] = 'true';
headers['X-XSS-Protection'] = '0';
} }
} else { } else {
Transport.prototype.handleRequest.call(this, req); Transport.prototype.handleRequest.call(this, req);
......
...@@ -30,7 +30,7 @@ function WebSocket (mng, data, req) { ...@@ -30,7 +30,7 @@ function WebSocket (mng, data, req) {
// parser // parser
var self = this; var self = this;
this.parser = new Parser(); this.parser = new Parser({maxBuffer: mng.get('destroy buffer size')});
this.parser.on('data', function (packet) { this.parser.on('data', function (packet) {
self.log.debug(self.name + ' received data packet', packet); self.log.debug(self.name + ' received data packet', packet);
self.onMessage(parser.decodePacket(packet)); self.onMessage(parser.decodePacket(packet));
...@@ -41,6 +41,11 @@ function WebSocket (mng, data, req) { ...@@ -41,6 +41,11 @@ function WebSocket (mng, data, req) {
this.parser.on('error', function () { this.parser.on('error', function () {
self.end(); self.end();
}); });
this.parser.on('kick', function (reason) {
self.log.warn(self.name + ' parser forced user kick: ' + reason);
self.onMessage({type: 'disconnect', endpoint: ''});
self.end();
});
Transport.call(this, mng, data, req); Transport.call(this, mng, data, req);
}; };
...@@ -293,7 +298,9 @@ WebSocket.prototype.doClose = function () { ...@@ -293,7 +298,9 @@ WebSocket.prototype.doClose = function () {
* @api public * @api public
*/ */
function Parser () { function Parser (opts) {
this._maxBuffer = (opts && opts.maxBuffer) || 10E7;
this._dataLength = 0;
this.buffer = ''; this.buffer = '';
this.i = 0; this.i = 0;
}; };
...@@ -311,6 +318,13 @@ Parser.prototype.__proto__ = EventEmitter.prototype; ...@@ -311,6 +318,13 @@ Parser.prototype.__proto__ = EventEmitter.prototype;
*/ */
Parser.prototype.add = function (data) { Parser.prototype.add = function (data) {
this._dataLength += data.length;
if(this._dataLength > this._maxBuffer) {
this.buffer = ''; //Clear buffer
this.emit('kick', 'max buffer size reached');
return;
}
this.buffer += data; this.buffer += data;
this.parse(); this.parse();
}; };
......
...@@ -35,7 +35,7 @@ function WebSocket (mng, data, req) { ...@@ -35,7 +35,7 @@ function WebSocket (mng, data, req) {
var self = this; var self = this;
this.manager = mng; this.manager = mng;
this.parser = new Parser(); this.parser = new Parser({maxBuffer: mng.get('destroy buffer size')});
this.parser.on('data', function (packet) { this.parser.on('data', function (packet) {
self.onMessage(parser.decodePacket(packet)); self.onMessage(parser.decodePacket(packet));
}); });
...@@ -56,6 +56,11 @@ function WebSocket (mng, data, req) { ...@@ -56,6 +56,11 @@ function WebSocket (mng, data, req) {
self.log.warn(self.name + ' parser error: ' + reason); self.log.warn(self.name + ' parser error: ' + reason);
self.end(); self.end();
}); });
this.parser.on('kick', function (reason) {
self.log.warn(self.name + ' parser forced user kick: ' + reason);
self.onMessage({type: 'disconnect', endpoint: ''});
self.end();
});
Transport.call(this, mng, data, req); Transport.call(this, mng, data, req);
}; };
...@@ -266,7 +271,7 @@ WebSocket.prototype.doClose = function () { ...@@ -266,7 +271,7 @@ WebSocket.prototype.doClose = function () {
* @api public * @api public
*/ */
function Parser () { function Parser (opts) {
this.state = { this.state = {
activeFragmentedOperation: null, activeFragmentedOperation: null,
lastFragment: false, lastFragment: false,
...@@ -278,6 +283,8 @@ function Parser () { ...@@ -278,6 +283,8 @@ function Parser () {
this.expectBuffer = null; this.expectBuffer = null;
this.expectHandler = null; this.expectHandler = null;
this.currentMessage = ''; this.currentMessage = '';
this._maxBuffer = (opts && opts.maxBuffer) || 10E7;
this._dataLength = 0;
var self = this; var self = this;
this.opcodeHandlers = { this.opcodeHandlers = {
...@@ -448,6 +455,15 @@ Parser.prototype.__proto__ = EventEmitter.prototype; ...@@ -448,6 +455,15 @@ Parser.prototype.__proto__ = EventEmitter.prototype;
*/ */
Parser.prototype.add = function(data) { Parser.prototype.add = function(data) {
this._dataLength += data.length;
if (this._dataLength > this._maxBuffer) {
// Clear data
this.overflow = null;
this.expectBuffer = null;
// Kick client
this.emit('kick', 'max buffer size reached');
return;
}
if (this.expectBuffer == null) { if (this.expectBuffer == null) {
this.addToOverflow(data); this.addToOverflow(data);
return; return;
...@@ -491,6 +507,10 @@ Parser.prototype.addToOverflow = function(data) { ...@@ -491,6 +507,10 @@ Parser.prototype.addToOverflow = function(data) {
*/ */
Parser.prototype.expect = function(what, length, handler) { Parser.prototype.expect = function(what, length, handler) {
if (length > this._maxBuffer) {
this.emit('kick', 'expected input larger than max buffer');
return;
}
this.expectBuffer = new Buffer(length); this.expectBuffer = new Buffer(length);
this.expectOffset = 0; this.expectOffset = 0;
this.expectHandler = handler; this.expectHandler = handler;
......
...@@ -34,7 +34,7 @@ function WebSocket (mng, data, req) { ...@@ -34,7 +34,7 @@ function WebSocket (mng, data, req) {
var self = this; var self = this;
this.manager = mng; this.manager = mng;
this.parser = new Parser(); this.parser = new Parser({maxBuffer: mng.get('destroy buffer size')});
this.parser.on('data', function (packet) { this.parser.on('data', function (packet) {
self.onMessage(parser.decodePacket(packet)); self.onMessage(parser.decodePacket(packet));
}); });
...@@ -55,6 +55,11 @@ function WebSocket (mng, data, req) { ...@@ -55,6 +55,11 @@ function WebSocket (mng, data, req) {
self.log.warn(self.name + ' parser error: ' + reason); self.log.warn(self.name + ' parser error: ' + reason);
self.end(); self.end();
}); });
this.parser.on('kick', function (reason) {
self.log.warn(self.name + ' parser forced user kick: ' + reason);
self.onMessage({type: 'disconnect', endpoint: ''});
self.end();
});
Transport.call(this, mng, data, req); Transport.call(this, mng, data, req);
}; };
...@@ -265,7 +270,7 @@ WebSocket.prototype.doClose = function () { ...@@ -265,7 +270,7 @@ WebSocket.prototype.doClose = function () {
* @api public * @api public
*/ */
function Parser () { function Parser (opts) {
this.state = { this.state = {
activeFragmentedOperation: null, activeFragmentedOperation: null,
lastFragment: false, lastFragment: false,
...@@ -277,6 +282,8 @@ function Parser () { ...@@ -277,6 +282,8 @@ function Parser () {
this.expectBuffer = null; this.expectBuffer = null;
this.expectHandler = null; this.expectHandler = null;
this.currentMessage = ''; this.currentMessage = '';
this._maxBuffer = (opts && opts.maxBuffer) || 10E7;
this._dataLength = 0;
var self = this; var self = this;
this.opcodeHandlers = { this.opcodeHandlers = {
...@@ -447,6 +454,15 @@ Parser.prototype.__proto__ = EventEmitter.prototype; ...@@ -447,6 +454,15 @@ Parser.prototype.__proto__ = EventEmitter.prototype;
*/ */
Parser.prototype.add = function(data) { Parser.prototype.add = function(data) {
this._dataLength += data.length;
if (this._dataLength > this._maxBuffer) {
// Clear data
this.overflow = null;
this.expectBuffer = null;
// Kick client
this.emit('kick', 'max buffer size reached');
return;
}
if (this.expectBuffer == null) { if (this.expectBuffer == null) {
this.addToOverflow(data); this.addToOverflow(data);
return; return;
...@@ -490,6 +506,10 @@ Parser.prototype.addToOverflow = function(data) { ...@@ -490,6 +506,10 @@ Parser.prototype.addToOverflow = function(data) {
*/ */
Parser.prototype.expect = function(what, length, handler) { Parser.prototype.expect = function(what, length, handler) {
if (length > this._maxBuffer) {
this.emit('kick', 'expected input larger than max buffer');
return;
}
this.expectBuffer = new Buffer(length); this.expectBuffer = new Buffer(length);
this.expectOffset = 0; this.expectOffset = 0;
this.expectHandler = handler; this.expectHandler = handler;
......
gyp: Call to 'node -p -e "require('path').dirname(require.resolve('nan'))"' returned exit status 1. while trying to load binding.gyp
gyp ERR! configure error
gyp ERR! stack Error: `gyp` failed with exit code: 1
gyp ERR! stack at ChildProcess.onCpExit (/usr/share/node-gyp/lib/configure.js:431:16)
gyp ERR! stack at ChildProcess.EventEmitter.emit (events.js:98:17)
gyp ERR! stack at Process.ChildProcess._handle.onexit (child_process.js:797:12)
gyp ERR! System Linux 3.13.0-20-generic
gyp ERR! command "nodejs" "/usr/bin/node-gyp" "rebuild"
gyp ERR! cwd /home/l3on/Sources/devel/nodejs/debomatic-webui/debomatic-webui/node_modules/socket.io/node_modules/socket.io-client/node_modules/ws
gyp ERR! node -v v0.10.25
gyp ERR! node-gyp -v v0.10.10
gyp ERR! not ok
{ {
"name": "socket.io", "name": "socket.io",
"version": "0.9.16", "version": "0.9.17",
"description": "Real-time apps made cross-browser & easy with a WebSocket-like API", "description": "Real-time apps made cross-browser & easy with a WebSocket-like API",
"homepage": "http://socket.io", "homepage": "http://socket.io",
"keywords": [ "keywords": [
...@@ -65,6 +65,6 @@ ...@@ -65,6 +65,6 @@
"bugs": { "bugs": {
"url": "https://github.com/LearnBoost/socket.io/issues" "url": "https://github.com/LearnBoost/socket.io/issues"
}, },
"_id": "socket.io@0.9.16", "_id": "socket.io@0.9.17",
"_from": "socket.io@*" "_from": "socket.io@0.*"
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment