首页 > 网站 > WEB开发 > 正文

engine.io分析2--socket.io的基石

2024-04-27 14:15:25
字体:
来源:转载
供稿:网友

engine.io分析2--socket.io的基石

转载请注明: TheViperhttp://www.cnblogs.com/TheViper

源码分析

var engine = require('./node_modules/engine.io/lib/engine.io.js');var server = engine.listen(8000,{    transports:['polling']});server.on('connection', function(socket){  socket.send('utf 8 string');});

之前提到过engine.io只有websocket,polling两种传输方式。对于websocket,engine.io用了第三方库ws,这个从源码目录看的出来,调用很简单。所以本文只说polling这种方式.

本人精力有限,所以只分析服务端,当是抛砖引玉。

首先是加载engine.io.js.

exports = module.exports = function() {  // backwards compatible use as `.attach`  // if first argument is an http server  if (arguments.length && arguments[0] instanceof http.Server) {    return attach.apply(this, arguments);  }  // if first argument is not an http server, then just make a regular eio server  return exports.Server.apply(null, arguments);};

注释上说的很清楚,if是为了兼容以前的版本,判断传入的是不是http.然后初始化Server.

Server初始化只是将参数写入,没有指定参数的,使用默认参数,还有就是定义后面用到的变量。最后,如果参数方式中有websocket,就初始化WebSocketServer。

  if (~this.transports.indexOf('websocket')) {    this.ws = new WebSocketServer({ noServer: true, clientTracking: false });  }

接着是engine.listen().

function listen(port, options, fn) {  if ('function' == typeof options) {    fn = options;    options = {};  }  var server = http.createServer(function (req, res) {    res.writeHead(501);    res.end('Not Implemented');  });  server.listen(port, fn);  // create engine server  var engine = exports.attach(server, options);  engine.httpServer = server;  return engine;};

先判断是不是重载的方法,如果是,则没有指定端口,使用默认端口,并且参数移位。后面是将我这种定义方式,变成

var engine = require('engine.io');var http = require('http').createServer().listen(3000);var server = engine.attach(http);

这是文档上另一种定义方式。

接着是attach()

function attach(server, options) {  var engine = new exports.Server(options);  engine.attach(server, options);  return engine;};

转到server里面的attach(),里面做的事就比较多了。简单说就是

  // cache and clean up listeners  var listeners = server.listeners('request').slice(0);  server.removeAllListeners('request');  server.on('close', self.close.bind(self));  // add request handler  server.on('request', function(req, res){    if (check(req)) {      debug('intercepting request for path "%s"', path);      self.handleRequest(req, res);    } else {      for (var i = 0, l = listeners.length; i < l; i++) {        listeners[i].call(server, req, res);      }    }  });

清空监听器,有请求传入时用server.on('request')监听。

  function check (req) {    return path == req.url.substr(0, path.length);  }

一般情况下,check()返回的是true,搞不懂什么时候返回false.看后面好像是负载均衡,需要配置多个监听的时候用。不明白!

接着是self.handleRequest(req, res);

Server.PRototype.handleRequest = function(req, res){  debug('handling "%s" http request "%s"', req.method, req.url);  this.prepare(req);  req.res = res;  var self = this;  this.verify(req, false, function(err, success) {    if (!success) {      sendErrorMessage(req, res, err);      return;    }    if (req._query.sid) {      debug('setting new request for existing client');      self.clients[req._query.sid].transport.onRequest(req);    } else {      self.handshake(req._query.transport, req);    }  });};

this.prepare(req)是将请求里面的查询参数封装到req._query上。然后是verify()

Server.prototype.verify = function(req, upgrade, fn){  // transport check  var transport = req._query.transport;  if (!~this.transports.indexOf(transport)) {    debug('unknown transport "%s"', transport);    return fn(Server.errors.UNKNOWN_TRANSPORT, false);  }  // sid check  var sid = req._query.sid;  if (sid) {    if (!this.clients.hasOwnProperty(sid))      return fn(Server.errors.UNKNOWN_SID, false);    if (!upgrade && this.clients[sid].transport.name !== transport) {      debug('bad request: unexpected transport without upgrade');      return fn(Server.errors.BAD_REQUEST, false);    }  } else {    // handshake is GET only    if ('GET' != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);    if (!this.allowRequest) return fn(null, true);    return this.allowRequest(req, fn);  }  fn(null, true);};

记下请求中要求的传输方式,例如http://localhost:3000/socket.io/?EIO=3&transport=polling&t=1418545776090-1&sid=M5e5hB6NAOmh7YW1AAAB。可以看到由transport参数指定。然后记下sid,这个请求中也有。注意,第一次请求的时候,由于没有实例socket,所以没有socket的id。这时判断有没有sid,确定是不是第一次请求。

下面我们假定是第一次请求,然后走else.

if ('GET' != req.method),这里要判断下,因为engine.io会通过请求的方法的不同确定后面的执行流程,默认get方法维持长连接,当然发出握手请求的也是get方法,这个后面会说到。

this.allowRequest像是拦截器,拦截握手请求,和传输升级的请求(websocket).

例子没有设置allowRequest方法,就直接fn(null,true),进入回调函数.

  this.verify(req, false, function(err, success) {    if (!success) {      sendErrorMessage(req, res, err);      return;    }    if (req._query.sid) {      debug('setting new request for existing client');      self.clients[req._query.sid].transport.onRequest(req);    } else {      self.handshake(req._query.transport, req);    }  });

如果是第一次请求,后面执行握手;如果不是,则选定已经存在的socket,调用socket上面的transport(传输方式,已经绑定到socket)的onRequest方法,这个后面会说到。现在我们还是第一次请求,所以开始握手。

Server.prototype.handshake = function(transport, req){  var id = base64id.generateId();  debug('handshaking client "%s"', id);  var transportName = transport;  try {    var transport = new transports[transport](req);    if ('polling' == transportName) {      transport.maxHttpBufferSize = this.maxHttpBufferSize;    }    if (req._query && req._query.b64) {      transport.supportsBinary = false;    } else {      transport.supportsBinary = true;    }  }  catch (e) {    sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);    return;  }  var socket = new Socket(id, this, transport, req);  var self = this;  if (false !== this.cookie) {    transport.on('headers', function(headers){      headers['Set-Cookie'] = self.cookie + '=' + id;    });  }  transport.onRequest(req);  this.clients[id] = socket;  this.clientsCount++;  socket.once('close', function(){    delete self.clients[id];    self.clientsCount--;  });  this.emit('connection', socket);};

var transport = new transports[transport](req);这里的transports是transports = require('./transports')。加载的是transprots文件夹下面的index.js,不是transprots.js.

var XHR = require('./polling-xhr');var JSONP = require('./polling-jsonp');module.exports = exports = {  polling: polling,  websocket: require('./websocket')};exports.polling.upgradesTo = ['websocket'];function polling (req) {  if ('string' == typeof req._query.j) {    return new JSONP(req);  } else {    return new XHR(req);  }}

由于transport由请求中的transprot参数决定,这里是polling。所以会执行polling(req),再看请求参数里有没有j,最终确定具体的传输方式。这里是xhr.

function XHR(req){  Polling.call(this, req);}XHR.prototype.__proto__ = Polling.prototype;

这里XHR继承了Polling.然后实例化Polling.

function Polling (req) {  Transport.call(this, req);}Polling.prototype.__proto__ = Transport.prototype;

又去实例化Transport.Transport继承EventEmitter。

回到主线,var socket = new Socket(id, this, transport, req);这里开始实例化socket了。socket里面做了什么,后面会说到。然后在header上设置cookie,值是上面产生的随机id.方便开发者做权限控制。

接着是transport.onRequest(req); 进入polling-xhr.js

XHR.prototype.onRequest = function (req) {  if ('OPTIONS' == req.method) {    var res = req.res;    var headers = this.headers(req);    headers['access-Control-Allow-Headers'] = 'Content-Type';    res.writeHead(200, headers);    res.end();  } else {    Polling.prototype.onRequest.call(this, req);  }};

这里会先判断请求方法是不是options,这个好像只有高级浏览器中才会出现。

另外如果在客户端中设置了强制使用jsonp的话,并且服务端传输方式是polling的话,在高级浏览器中,也不会出现options请求。

    var socket = eio('http://localhost:8000',{        forceJSONP:true    });

jsonp就是用来解决Ajax跨域的,由此看出options请求在这里和ajax跨域有关。

第一次请求是get方法,走后面。

Polling.prototype.onRequest = function (req) {  var res = req.res;    if ('GET' == req.method) {    this.onPollRequest(req, res);  } else if ('POST' == req.method) {    this.onDataRequest(req, res);  } else {    res.writeHead(500);    res.end();  }};

又对请求方法进行判断。从方法名字就看的出,get方法用来维持长连接的,当然广播返回的数据也是通过这个get方法。post方法用来传输客户端的数据,比如客户端聊天的文字,如果没有数据,就会发

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表