首页 > 网站 > WEB开发 > 正文

engine.io分析3--socket.io的基石

2024-04-27 14:15:25
字体:
来源:转载
供稿:网友

engine.io分析3--socket.io的基石

转载请注明: TheViperhttp://www.cnblogs.com/TheViper

上一篇讲了第一次请求(握手)的执行流程,这篇说说握手后,engine.io是怎么传输数据的。

engine.io对http的request事件进行了绑定。在回调函数中,根据有没有socket id来判断是不是握手请求。

    if (req._query.sid) {      debug('setting new request for existing client');      self.clients[req._query.sid].transport.onRequest(req);    } else {      self.handshake(req._query.transport, req);    }

上一篇走的是self.handshake(req._query.transport, req);,这篇走self.clients[req._query.sid].transport.onRequest(req);。

上一篇的握手请求是get方法。维持长连接的请求也是get方法。广播时,engine.io收到数据,会向长连接中写入数据,返回长连接,然后关闭。如果长连接超时了或者返回错误,客户端也会再次发出。如果在规定的时间(pingTimeout,默认60秒)内,服务器端没有收到客户端发出的长连接请求,则认定这个socket无效,从clients中清除该socket.

function Socket (id, server, transport, req) {。。。。  this.setTransport(transport);  this.onOpen();}
Socket.PRototype.setPingTimeout = function () {  var self = this;  clearTimeout(self.pingTimeoutTimer);  self.pingTimeoutTimer = setTimeout(function () {    self.onClose('ping timeout');  }, self.server.pingInterval + self.server.pingTimeout);};
Socket.prototype.onOpen = function () {  this.readyState = 'open';  // sends an `open` packet  this.transport.sid = this.id;  this.sendPacket('open', JSON.stringify({      sid: this.id    , upgrades: this.getAvailableUpgrades()    , pingInterval: this.server.pingInterval    , pingTimeout: this.server.pingTimeout  }));  this.emit('open');  this.setPingTimeout();};

可以看到,每次握手成功,初始化一个socket的时候,就会设置一个超时定时器。超时的话执行onClose(),里面就是各种清空,初始化,然后触发close事件,取消其他事件绑定等。

又回到onRequest(),针对请求方法的不同,执行不同的策略,

Polling.prototype.onRequest = function (req) {  var res = req.res;    if ('GET' == req.method) {    this.onPollRequest(req, res);  } else if ('POST' == req.method) {    this.onDataRequest(req, res);  } else {    res.writeHead(500);    res.end();  }};

如果是get方法

Polling.prototype.onPollRequest = function (req, res) {  debug('setting request');  this.req = req;  this.res = res;  var self = this;  function onClose () {    self.onError('poll connection closed prematurely');  }  function cleanup () {    req.removeListener('close', onClose);    self.req = self.res = null;  }  req.cleanup = cleanup;  req.on('close', onClose);  this.writable = true;  this.emit('drain');};

onPollRequest()里面其实就做了close事件绑定,触发drain事件。drain事件是在socket.js里面this.setTransport(transport);中绑定的。

this.transport.on('drain', this.flush.bind(this));可以看到,最终还是用了flush().在这里出发emit事件,就是为了刷出writeBuffer.比如,

server.on('connection', function(socket){  socket.send('utf 8 string');});

可以看到,如果有客户端加入成功,就会发出字符串响应。这时,客户端刚发出维持长连接的get请求,服务端出发emit事件,返回200响应,使writerbuffer(utf 8 string)输出到客户端。如果这里不出发emit事件,强制刷出,那数据会直到客户端发出心跳post请求时,服务端结束上一个get请求,返回get 200响应时,才会包含在里面输出到客户端,这样就没有了实时性了。

如果是post方法

Polling.prototype.onDataRequest = function (req, res) {  if (this.dataReq) {    // assert: this.dataRes, '.dataReq and .dataRes should be (un)set together'    this.onError('data request overlap from client');    res.writeHead(500);    return;  }  var isBinary = 'application/octet-stream' == req.headers['content-type'];  this.dataReq = req;  this.dataRes = res;  var chunks = isBinary ? new Buffer(0) : '';  var self = this;  function cleanup () {    chunks = isBinary ? new Buffer(0) : '';    req.removeListener('data', onData);    req.removeListener('end', onEnd);    req.removeListener('close', onClose);    self.dataReq = self.dataRes = null;  }  function onClose () {    cleanup();    self.onError('data request connection closed prematurely');  }  function onData (data) {    var contentLength;    if (typeof data == 'string') {      chunks += data;      contentLength = Buffer.byteLength(chunks);    } else {      chunks = Buffer.concat([chunks, data]);      contentLength = chunks.length;    }    if (contentLength > self.maxHttpBufferSize) {      chunks = '';      req.connection.destroy();    }  }  function onEnd () {    self.onData(chunks);    var headers = {      // text/html is required instead of text/plain to avoid an      // unwanted download dialog on certain user-agents (GH-43)      'Content-Type': 'text/html',      'Content-Length': 2    };    // prevent XSS warnings on IE    // https://github.com/LearnBoost/socket.io/pull/1333    var ua = req.headers['user-agent'];    if (ua && (~ua.indexOf(';MSIE') || ~ua.indexOf('Trident/'))) {      headers['X-XSS-Protection'] = '0';    }    res.writeHead(200, self.headers(req, headers));    res.end('ok');    cleanup();  }  req.abort = cleanup;  req.on('close', onClose);  req.on('data', onData);  req.on('end', onEnd);  if (!isBinary) req.setEncoding('utf8');};

里面其实就是对http request里面事件的绑定。当有数据传入时,onData(),判断是不是超出设定的buffer最大长度,如果没有超出,则将数据写入chunks。

当收到数据后,onEnd().然后是self.onData(chunks);

Polling.prototype.onData = function (data) {  debug('received "%s"', data);  var self = this;  var callback = function(packet) {    if ('close' == packet.type) {      debug('got xhr close packet');      self.onClose();      return false;    }    self.onPacket(packet);  };  parser.decodePayload(data, callback);};

parser.decodePayload(data, callback);用来处理字符编码的,可以简单的看成是对数据处理后执行回调函数。

self.onPacket(packet);在父类transport.js里面。

Transport.prototype.onPacket = function (packet) {  this.emit('packet', packet);};

触发transport上的packet事件,这个又是在socket.js的setTransport()里绑定的。前面的onPollRequest()里的drain事件也是。

Socket.prototype.setTransport = function (transport) {  this.transport = transport;  this.transport.once('error', this.onError.bind(this));  this.transport.on('packet', this.onPacket.bind(this));  this.transport.on('drain', this.flush.bind(this));  this.transport.once('close', this.onClose.bind(this, 'transport close'));  //this function will manage packet events (also message callbacks)  this.setupSendCallback();};

然后是onPacket()回调

Socket.prototype.onPacket = function (packet) {  if ('open' == this.readyState) {    // export packet event    debug('packet');    this.emit('packet', packet);    // Reset ping timeout on any packet, incoming data is a good sign of    // other side's liveness    this.setPingTimeout();    switch (packet.type) {      case 'ping':        debug('got ping');        this.sendPacket('pong');        this.emit('heartbeat');        break;      case 'error':        this.onClose('parse error');        break;      case 'message':        this.emit('data', packet.data);        this.emit('message', packet.data);        break;    }  } else {    debug('packet received with closed socket');  }};

重新设置超时定时器.由于每次长连接关闭的时候总会再发出一个post请求,engine.io由此确定客户端还在,这时,packet.type是ping,然后返回响应的心跳。客户端收到才会重新发出新的get长连接请求。

如果post请求中带有数据,packet.type会是message.触发data,message事件。绑定,回调在服务端自定义。这样服务器端就收到了客户端的数据。

接着self.onData(chunks);,后面就是返回post方法的200响应了。表示收到客户端发过来的心跳了。

注意,这里post方法只会返回ok,服务端传的数据是通过get方法返回的。

具体的,socket.send,socket.write.这是向客户端返回数据的方法。很简单。

Socket.prototype.send =Socket.prototype.write = function(data, callback){  this.sendPacket('message', data, callback);  return this;};

这个方法最后用了flush().flush()是用来结束当前长连接的get请求,如果有数据传给客户端,则向其中添加数据。如果没有,则里面的响应是pong。这个后面会在客户端分析中具体说到。

Socket.prototype.flush = function () {  if ('closed' != this.readyState && this.transport.writable    && this.writeBuffer.length) {    debug('flushing buffer to transport');    this.emit('flush', this.writeBuffer);    this.server.emit('flush', this, this.writeBuffer);    var wbuf = this.writeBuffer;    this.writeBuffer = [];    this.packetsFn = [];    this.transport.send(wbuf);    this.emit('drain');    this.server.emit('drain', this);  }};
Polling.prototype.send = function (packets) {.........  var self = this;  parser.encodePayload(packets, this.supportsBinary, function(data) {    self.write(data);  });};
XHR.prototype.doWrite = function(data){  // explicit UTF-8 is required for pages not served under utf  var isString = typeof data == 'string';  var contentType = isString    ? 'text/plain; charset=UTF-8'    : 'application/octet-stream';  var contentLength = '' + (isString ? Buffer.byteLength(data) : da
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表