554 lines
16 KiB
JavaScript
554 lines
16 KiB
JavaScript
/*!
|
|
* ws: a node.js websocket client
|
|
* Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
|
|
* MIT Licensed
|
|
*/
|
|
|
|
var util = require('util')
|
|
, events = require('events')
|
|
, http = require('http')
|
|
, crypto = require('crypto')
|
|
, Options = require('options')
|
|
, WebSocket = require('./WebSocket')
|
|
, Extensions = require('./Extensions')
|
|
, PerMessageDeflate = require('./PerMessageDeflate')
|
|
, tls = require('tls')
|
|
, url = require('url');
|
|
|
|
/**
|
|
* WebSocket Server implementation
|
|
*/
|
|
|
|
function WebSocketServer(options, callback) {
|
|
if (this instanceof WebSocketServer === false) {
|
|
return new WebSocketServer(options, callback);
|
|
}
|
|
|
|
events.EventEmitter.call(this);
|
|
|
|
options = new Options({
|
|
host: '0.0.0.0',
|
|
port: null,
|
|
server: null,
|
|
verifyClient: null,
|
|
handleProtocols: null,
|
|
path: null,
|
|
noServer: false,
|
|
disableHixie: false,
|
|
clientTracking: true,
|
|
perMessageDeflate: true,
|
|
maxPayload: 100 * 1024 * 1024
|
|
}).merge(options);
|
|
|
|
if (!options.isDefinedAndNonNull('port') && !options.isDefinedAndNonNull('server') && !options.value.noServer) {
|
|
throw new TypeError('`port` or a `server` must be provided');
|
|
}
|
|
|
|
var self = this;
|
|
|
|
if (options.isDefinedAndNonNull('port')) {
|
|
this._server = http.createServer(function (req, res) {
|
|
var body = http.STATUS_CODES[426];
|
|
res.writeHead(426, {
|
|
'Content-Length': body.length,
|
|
'Content-Type': 'text/plain'
|
|
});
|
|
res.end(body);
|
|
});
|
|
this._server.allowHalfOpen = false;
|
|
this._server.listen(options.value.port, options.value.host, callback);
|
|
this._closeServer = function() { if (self._server) self._server.close(); };
|
|
}
|
|
else if (options.value.server) {
|
|
this._server = options.value.server;
|
|
if (options.value.path) {
|
|
// take note of the path, to avoid collisions when multiple websocket servers are
|
|
// listening on the same http server
|
|
if (this._server._webSocketPaths && options.value.server._webSocketPaths[options.value.path]) {
|
|
throw new Error('two instances of WebSocketServer cannot listen on the same http server path');
|
|
}
|
|
if (typeof this._server._webSocketPaths !== 'object') {
|
|
this._server._webSocketPaths = {};
|
|
}
|
|
this._server._webSocketPaths[options.value.path] = 1;
|
|
}
|
|
}
|
|
if (this._server) {
|
|
this._onceServerListening = function() { self.emit('listening'); };
|
|
this._server.once('listening', this._onceServerListening);
|
|
}
|
|
|
|
if (typeof this._server != 'undefined') {
|
|
this._onServerError = function(error) { self.emit('error', error) };
|
|
this._server.on('error', this._onServerError);
|
|
this._onServerUpgrade = function(req, socket, upgradeHead) {
|
|
//copy upgradeHead to avoid retention of large slab buffers used in node core
|
|
var head = new Buffer(upgradeHead.length);
|
|
upgradeHead.copy(head);
|
|
|
|
self.handleUpgrade(req, socket, head, function(client) {
|
|
self.emit('connection'+req.url, client);
|
|
self.emit('connection', client);
|
|
});
|
|
};
|
|
this._server.on('upgrade', this._onServerUpgrade);
|
|
}
|
|
|
|
this.options = options.value;
|
|
this.path = options.value.path;
|
|
this.clients = [];
|
|
}
|
|
|
|
/**
|
|
* Inherits from EventEmitter.
|
|
*/
|
|
|
|
util.inherits(WebSocketServer, events.EventEmitter);
|
|
|
|
/**
|
|
* Immediately shuts down the connection.
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
WebSocketServer.prototype.close = function(callback) {
|
|
// terminate all associated clients
|
|
var error = null;
|
|
try {
|
|
for (var i = 0, l = this.clients.length; i < l; ++i) {
|
|
this.clients[i].terminate();
|
|
}
|
|
}
|
|
catch (e) {
|
|
error = e;
|
|
}
|
|
|
|
// remove path descriptor, if any
|
|
if (this.path && this._server._webSocketPaths) {
|
|
delete this._server._webSocketPaths[this.path];
|
|
if (Object.keys(this._server._webSocketPaths).length == 0) {
|
|
delete this._server._webSocketPaths;
|
|
}
|
|
}
|
|
|
|
// close the http server if it was internally created
|
|
try {
|
|
if (typeof this._closeServer !== 'undefined') {
|
|
this._closeServer();
|
|
}
|
|
}
|
|
finally {
|
|
if (this._server) {
|
|
this._server.removeListener('listening', this._onceServerListening);
|
|
this._server.removeListener('error', this._onServerError);
|
|
this._server.removeListener('upgrade', this._onServerUpgrade);
|
|
}
|
|
delete this._server;
|
|
}
|
|
if(callback)
|
|
callback(error);
|
|
else if(error)
|
|
throw error;
|
|
}
|
|
|
|
/**
|
|
* Handle a HTTP Upgrade request.
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
WebSocketServer.prototype.handleUpgrade = function(req, socket, upgradeHead, cb) {
|
|
// check for wrong path
|
|
if (this.options.path) {
|
|
var u = url.parse(req.url);
|
|
if (u && u.pathname !== this.options.path) return;
|
|
}
|
|
|
|
if (typeof req.headers.upgrade === 'undefined' || req.headers.upgrade.toLowerCase() !== 'websocket') {
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
|
|
if (req.headers['sec-websocket-key1']) handleHixieUpgrade.apply(this, arguments);
|
|
else handleHybiUpgrade.apply(this, arguments);
|
|
}
|
|
|
|
module.exports = WebSocketServer;
|
|
|
|
/**
|
|
* Entirely private apis,
|
|
* which may or may not be bound to a sepcific WebSocket instance.
|
|
*/
|
|
|
|
function handleHybiUpgrade(req, socket, upgradeHead, cb) {
|
|
// handle premature socket errors
|
|
var errorHandler = function() {
|
|
try { socket.destroy(); } catch (e) {}
|
|
}
|
|
socket.on('error', errorHandler);
|
|
|
|
// verify key presence
|
|
if (!req.headers['sec-websocket-key']) {
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
|
|
// verify version
|
|
var version = parseInt(req.headers['sec-websocket-version']);
|
|
if ([8, 13].indexOf(version) === -1) {
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
|
|
// verify protocol
|
|
var protocols = req.headers['sec-websocket-protocol'];
|
|
|
|
// verify client
|
|
var origin = version < 13 ?
|
|
req.headers['sec-websocket-origin'] :
|
|
req.headers['origin'];
|
|
|
|
// handle extensions offer
|
|
var extensionsOffer = Extensions.parse(req.headers['sec-websocket-extensions']);
|
|
|
|
// handler to call when the connection sequence completes
|
|
var self = this;
|
|
var completeHybiUpgrade2 = function(protocol) {
|
|
|
|
// calc key
|
|
var key = req.headers['sec-websocket-key'];
|
|
var shasum = crypto.createHash('sha1');
|
|
shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
|
|
key = shasum.digest('base64');
|
|
|
|
var headers = [
|
|
'HTTP/1.1 101 Switching Protocols'
|
|
, 'Upgrade: websocket'
|
|
, 'Connection: Upgrade'
|
|
, 'Sec-WebSocket-Accept: ' + key
|
|
];
|
|
|
|
if (typeof protocol != 'undefined') {
|
|
headers.push('Sec-WebSocket-Protocol: ' + protocol);
|
|
}
|
|
|
|
var extensions = {};
|
|
try {
|
|
extensions = acceptExtensions.call(self, extensionsOffer);
|
|
} catch (err) {
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
|
|
if (Object.keys(extensions).length) {
|
|
var serverExtensions = {};
|
|
Object.keys(extensions).forEach(function(token) {
|
|
serverExtensions[token] = [extensions[token].params]
|
|
});
|
|
headers.push('Sec-WebSocket-Extensions: ' + Extensions.format(serverExtensions));
|
|
}
|
|
|
|
// allows external modification/inspection of handshake headers
|
|
self.emit('headers', headers);
|
|
|
|
socket.setTimeout(0);
|
|
socket.setNoDelay(true);
|
|
try {
|
|
socket.write(headers.concat('', '').join('\r\n'));
|
|
}
|
|
catch (e) {
|
|
// if the upgrade write fails, shut the connection down hard
|
|
try { socket.destroy(); } catch (e) {}
|
|
return;
|
|
}
|
|
|
|
var client = new WebSocket([req, socket, upgradeHead], {
|
|
protocolVersion: version,
|
|
protocol: protocol,
|
|
extensions: extensions,
|
|
maxPayload: self.options.maxPayload
|
|
});
|
|
|
|
if (self.options.clientTracking) {
|
|
self.clients.push(client);
|
|
client.on('close', function() {
|
|
var index = self.clients.indexOf(client);
|
|
if (index != -1) {
|
|
self.clients.splice(index, 1);
|
|
}
|
|
});
|
|
}
|
|
|
|
// signal upgrade complete
|
|
socket.removeListener('error', errorHandler);
|
|
cb(client);
|
|
}
|
|
|
|
// optionally call external protocol selection handler before
|
|
// calling completeHybiUpgrade2
|
|
var completeHybiUpgrade1 = function() {
|
|
// choose from the sub-protocols
|
|
if (typeof self.options.handleProtocols == 'function') {
|
|
var protList = (protocols || "").split(/, */);
|
|
var callbackCalled = false;
|
|
var res = self.options.handleProtocols(protList, function(result, protocol) {
|
|
callbackCalled = true;
|
|
if (!result) abortConnection(socket, 401, 'Unauthorized');
|
|
else completeHybiUpgrade2(protocol);
|
|
});
|
|
if (!callbackCalled) {
|
|
// the handleProtocols handler never called our callback
|
|
abortConnection(socket, 501, 'Could not process protocols');
|
|
}
|
|
return;
|
|
} else {
|
|
if (typeof protocols !== 'undefined') {
|
|
completeHybiUpgrade2(protocols.split(/, */)[0]);
|
|
}
|
|
else {
|
|
completeHybiUpgrade2();
|
|
}
|
|
}
|
|
}
|
|
|
|
// optionally call external client verification handler
|
|
if (typeof this.options.verifyClient == 'function') {
|
|
var info = {
|
|
origin: origin,
|
|
secure: typeof req.connection.authorized !== 'undefined' || typeof req.connection.encrypted !== 'undefined',
|
|
req: req
|
|
};
|
|
if (this.options.verifyClient.length == 2) {
|
|
this.options.verifyClient(info, function(result, code, name) {
|
|
if (typeof code === 'undefined') code = 401;
|
|
if (typeof name === 'undefined') name = http.STATUS_CODES[code];
|
|
|
|
if (!result) abortConnection(socket, code, name);
|
|
else completeHybiUpgrade1();
|
|
});
|
|
return;
|
|
}
|
|
else if (!this.options.verifyClient(info)) {
|
|
abortConnection(socket, 401, 'Unauthorized');
|
|
return;
|
|
}
|
|
}
|
|
|
|
completeHybiUpgrade1();
|
|
}
|
|
|
|
function handleHixieUpgrade(req, socket, upgradeHead, cb) {
|
|
// handle premature socket errors
|
|
var errorHandler = function() {
|
|
try { socket.destroy(); } catch (e) {}
|
|
}
|
|
socket.on('error', errorHandler);
|
|
|
|
// bail if options prevent hixie
|
|
if (this.options.disableHixie) {
|
|
abortConnection(socket, 401, 'Hixie support disabled');
|
|
return;
|
|
}
|
|
|
|
// verify key presence
|
|
if (!req.headers['sec-websocket-key2']) {
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
|
|
var origin = req.headers['origin']
|
|
, self = this;
|
|
|
|
// setup handshake completion to run after client has been verified
|
|
var onClientVerified = function() {
|
|
var wshost;
|
|
if (!req.headers['x-forwarded-host'])
|
|
wshost = req.headers.host;
|
|
else
|
|
wshost = req.headers['x-forwarded-host'];
|
|
var location = ((req.headers['x-forwarded-proto'] === 'https' || socket.encrypted) ? 'wss' : 'ws') + '://' + wshost + req.url
|
|
, protocol = req.headers['sec-websocket-protocol'];
|
|
|
|
// build the response header and return a Buffer
|
|
var buildResponseHeader = function() {
|
|
var headers = [
|
|
'HTTP/1.1 101 Switching Protocols'
|
|
, 'Upgrade: WebSocket'
|
|
, 'Connection: Upgrade'
|
|
, 'Sec-WebSocket-Location: ' + location
|
|
];
|
|
if (typeof protocol != 'undefined') headers.push('Sec-WebSocket-Protocol: ' + protocol);
|
|
if (typeof origin != 'undefined') headers.push('Sec-WebSocket-Origin: ' + origin);
|
|
|
|
return new Buffer(headers.concat('', '').join('\r\n'));
|
|
};
|
|
|
|
// send handshake response before receiving the nonce
|
|
var handshakeResponse = function() {
|
|
|
|
socket.setTimeout(0);
|
|
socket.setNoDelay(true);
|
|
|
|
var headerBuffer = buildResponseHeader();
|
|
|
|
try {
|
|
socket.write(headerBuffer, 'binary', function(err) {
|
|
// remove listener if there was an error
|
|
if (err) socket.removeListener('data', handler);
|
|
return;
|
|
});
|
|
} catch (e) {
|
|
try { socket.destroy(); } catch (e) {}
|
|
return;
|
|
};
|
|
};
|
|
|
|
// handshake completion code to run once nonce has been successfully retrieved
|
|
var completeHandshake = function(nonce, rest, headerBuffer) {
|
|
// calculate key
|
|
var k1 = req.headers['sec-websocket-key1']
|
|
, k2 = req.headers['sec-websocket-key2']
|
|
, md5 = crypto.createHash('md5');
|
|
|
|
[k1, k2].forEach(function (k) {
|
|
var n = parseInt(k.replace(/[^\d]/g, ''))
|
|
, spaces = k.replace(/[^ ]/g, '').length;
|
|
if (spaces === 0 || n % spaces !== 0){
|
|
abortConnection(socket, 400, 'Bad Request');
|
|
return;
|
|
}
|
|
n /= spaces;
|
|
md5.update(String.fromCharCode(
|
|
n >> 24 & 0xFF,
|
|
n >> 16 & 0xFF,
|
|
n >> 8 & 0xFF,
|
|
n & 0xFF));
|
|
});
|
|
md5.update(nonce.toString('binary'));
|
|
|
|
socket.setTimeout(0);
|
|
socket.setNoDelay(true);
|
|
|
|
try {
|
|
var hashBuffer = new Buffer(md5.digest('binary'), 'binary');
|
|
var handshakeBuffer = new Buffer(headerBuffer.length + hashBuffer.length);
|
|
headerBuffer.copy(handshakeBuffer, 0);
|
|
hashBuffer.copy(handshakeBuffer, headerBuffer.length);
|
|
|
|
// do a single write, which - upon success - causes a new client websocket to be setup
|
|
socket.write(handshakeBuffer, 'binary', function(err) {
|
|
if (err) return; // do not create client if an error happens
|
|
var client = new WebSocket([req, socket, rest], {
|
|
protocolVersion: 'hixie-76',
|
|
protocol: protocol
|
|
});
|
|
if (self.options.clientTracking) {
|
|
self.clients.push(client);
|
|
client.on('close', function() {
|
|
var index = self.clients.indexOf(client);
|
|
if (index != -1) {
|
|
self.clients.splice(index, 1);
|
|
}
|
|
});
|
|
}
|
|
|
|
// signal upgrade complete
|
|
socket.removeListener('error', errorHandler);
|
|
cb(client);
|
|
});
|
|
}
|
|
catch (e) {
|
|
try { socket.destroy(); } catch (e) {}
|
|
return;
|
|
}
|
|
}
|
|
|
|
// retrieve nonce
|
|
var nonceLength = 8;
|
|
if (upgradeHead && upgradeHead.length >= nonceLength) {
|
|
var nonce = upgradeHead.slice(0, nonceLength);
|
|
var rest = upgradeHead.length > nonceLength ? upgradeHead.slice(nonceLength) : null;
|
|
completeHandshake.call(self, nonce, rest, buildResponseHeader());
|
|
}
|
|
else {
|
|
// nonce not present in upgradeHead
|
|
var nonce = new Buffer(nonceLength);
|
|
upgradeHead.copy(nonce, 0);
|
|
var received = upgradeHead.length;
|
|
var rest = null;
|
|
var handler = function (data) {
|
|
var toRead = Math.min(data.length, nonceLength - received);
|
|
if (toRead === 0) return;
|
|
data.copy(nonce, received, 0, toRead);
|
|
received += toRead;
|
|
if (received == nonceLength) {
|
|
socket.removeListener('data', handler);
|
|
if (toRead < data.length) rest = data.slice(toRead);
|
|
|
|
// complete the handshake but send empty buffer for headers since they have already been sent
|
|
completeHandshake.call(self, nonce, rest, new Buffer(0));
|
|
}
|
|
}
|
|
|
|
// handle additional data as we receive it
|
|
socket.on('data', handler);
|
|
|
|
// send header response before we have the nonce to fix haproxy buffering
|
|
handshakeResponse();
|
|
}
|
|
}
|
|
|
|
// verify client
|
|
if (typeof this.options.verifyClient == 'function') {
|
|
var info = {
|
|
origin: origin,
|
|
secure: typeof req.connection.authorized !== 'undefined' || typeof req.connection.encrypted !== 'undefined',
|
|
req: req
|
|
};
|
|
if (this.options.verifyClient.length == 2) {
|
|
var self = this;
|
|
this.options.verifyClient(info, function(result, code, name) {
|
|
if (typeof code === 'undefined') code = 401;
|
|
if (typeof name === 'undefined') name = http.STATUS_CODES[code];
|
|
|
|
if (!result) abortConnection(socket, code, name);
|
|
else onClientVerified.apply(self);
|
|
});
|
|
return;
|
|
}
|
|
else if (!this.options.verifyClient(info)) {
|
|
abortConnection(socket, 401, 'Unauthorized');
|
|
return;
|
|
}
|
|
}
|
|
|
|
// no client verification required
|
|
onClientVerified();
|
|
}
|
|
|
|
function acceptExtensions(offer) {
|
|
var extensions = {};
|
|
var options = this.options.perMessageDeflate;
|
|
var maxPayload = this.options.maxPayload;
|
|
if (options && offer[PerMessageDeflate.extensionName]) {
|
|
var perMessageDeflate = new PerMessageDeflate(options !== true ? options : {}, true, maxPayload);
|
|
perMessageDeflate.accept(offer[PerMessageDeflate.extensionName]);
|
|
extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
|
|
}
|
|
return extensions;
|
|
}
|
|
|
|
function abortConnection(socket, code, name) {
|
|
try {
|
|
var response = [
|
|
'HTTP/1.1 ' + code + ' ' + name,
|
|
'Content-type: text/html'
|
|
];
|
|
socket.write(response.concat('', '').join('\r\n'));
|
|
}
|
|
catch (e) { /* ignore errors - we've aborted this connection */ }
|
|
finally {
|
|
// ensure that an early aborted connection is shut down completely
|
|
try { socket.destroy(); } catch (e) {}
|
|
}
|
|
}
|