首页 > 网站 > WEB开发 > 正文

engine.io客户端分析2--socket.io的基石

2024-04-27 14:15:12
字体:
来源:转载
供稿:网友

engine.io客户端分析2--socket.io的基石

转载请注明:TheViperhttp://www.cnblogs.com/TheViper

上一篇说到收到握手响应后的packet.type=open。接着是onHandshake()

Socket.PRototype.onHandshake = function (data) {  this.emit('handshake', data);  this.id = data.sid;  this.transport.query.sid = data.sid;  this.upgrades = this.filterUpgrades(data.upgrades);  this.pingInterval = data.pingInterval;  this.pingTimeout = data.pingTimeout;  this.onOpen();  // In case open handler closes socket  if  ('closed' == this.readyState) return;  this.setPing();  // Prolong liveness of socket on heartbeat  this.removeListener('heartbeat', this.onHeartbeat);  this.on('heartbeat', this.onHeartbeat);};
Socket.prototype.onOpen = function () {  debug('socket open');  this.readyState = 'open';  this.emit('open');  this.flush();};
Socket.prototype.flush = function () {  if ('closed' != this.readyState && this.transport.writable &&    !this.upgrading && this.writeBuffer.length) {    .......  this.transport.send(this.writeBuffer);  }};

writeBuffer里面没有内容,不会走里面。

Polling.prototype.onData = function(data){  .....  // decode payload  parser.decodePayload(data, this.socket.binaryType, callback);  // if an event did not trigger closing  if ('closed' != this.readyState) {    // if we got data we're not polling    this.polling = false;    this.emit('pollComplete');    if ('open' == this.readyState) {      this.poll();    } else {      debug('ignoring poll - transport state "%s"', this.readyState);    }  }};

新的get请求已经在之前onData里面的poll()建立了。

如果有数据,立刻用send,以post请求的方式,将数据传给服务端。比如,聊天的文字。

post请求除了用来维持心跳,还负责将客户端的数据传给服务端,长连接的get请求已经单独发出了,不能用其传递数据了,就只有用post请求了。

然后是setPing()

Socket.prototype.setPing = function () {  var self = this;  clearTimeout(self.pingIntervalTimer);  self.pingIntervalTimer = setTimeout(function () {    debug('writing ping packet - expecting pong within %sms', self.pingTimeout);    self.ping();    self.onHeartbeat(self.pingTimeout);  }, self.pingInterval);};

ping()用来向服务端发送心跳。

Socket.prototype.ping = function () {  this.sendPacket('ping');};

至此,握手结束。

话说握手的时候,服务端会setPingTimeout();

Socket.prototype.setPingTimeout = function () {  var self = this;  clearTimeout(self.pingTimeoutTimer);  self.pingTimeoutTimer = setTimeout(function () {    self.onClose('ping timeout');  }, self.server.pingTimeout);};

服务端会看在pingTimeout时间内,客户端有没有传送post请求,证明自己还在。

而客户端,在握手成功后,会setPing(),这个上面有。

然后等啊等,重要到pingInterval时间了,客户端发送post请求,this.sendPacket('ping');

另外,如果是客户端主动发送数据的话。

Socket.prototype.write =Socket.prototype.send = function (msg, fn) {  this.sendPacket('message', msg, fn);  return this;};

可以看到也是调用了sendPacket.

Socket.prototype.sendPacket = function (type, data, fn) {  if ('closing' == this.readyState || 'closed' == this.readyState) {    return;  }  var packet = { type: type, data: data };  this.emit('packetCreate', packet);  this.writeBuffer.push(packet);  this.callbackBuffer.push(fn);  this.flush();};
Socket.prototype.flush = function () {  if ('closed' != this.readyState && this.transport.writable &&    !this.upgrading && this.writeBuffer.length) {    debug('flushing %d packets in socket', this.writeBuffer.length);    this.transport.send(this.writeBuffer);    // keep track of current length of writeBuffer    // splice writeBuffer and callbackBuffer on `drain`    this.prevBufferLen = this.writeBuffer.length;    this.emit('flush');  }};

然后flush()

Transport.prototype.send = function(packets){  if ('open' == this.readyState) {    this.write(packets);  } else {    throw new Error('Transport not open');  }};
Polling.prototype.write = function(packets){  var self = this;  this.writable = false;  var callbackfn = function() {    self.writable = true;    self.emit('drain');  };  var self = this;  parser.encodePayload(packets, this.supportsBinary, function(data) {    self.doWrite(data, callbackfn);  });};
XHR.prototype.doWrite = function(data, fn){  var isBinary = typeof data !== 'string' && data !== undefined;  var req = this.request({ method: 'POST', data: data, isBinary: isBinary });  var self = this;  req.on('success', fn);  req.on('error', function(err){    self.onError('xhr post error', err);  });  this.sendXhr = req;};

注意,这里request绑定了success事件,这个后面收到响应后会用到。

服务端收到ping后,结束长连接的get请求,并通过它发回pong响应。

      xhr.onreadystatechange = function(){        if (4 != xhr.readyState) return;        if (200 == xhr.status || 1223 == xhr.status) {          self.onLoad();        } else {          // make sure the `error` event handler that's user-set          // does not throw in the same tick and gets caught here          setTimeout(function(){            self.onError(xhr.status);          }, 0);        }      };
Request.prototype.onLoad = function(){  var data;  try {    var contentType;    try {      contentType = this.xhr.getResponseHeader('Content-Type').split(';')[0];    } catch (e) {}    if (contentType === 'application/octet-stream') {      data = this.xhr.response;    } else {      if (!this.supportsBinary) {        data = this.xhr.responseText;      } else {        data = 'ok';      }    }  } catch (e) {    this.onError(e);  }  if (null != data) {    this.onData(data);  }};

onLoad()取回发回的数据。

Request.prototype.onSuccess = function(){  this.emit('success');  this.cleanup();};Request.prototype.onData = function(data){  this.emit('data', data);  this.onSuccess();};

注意,get请求上绑定data事件,用来接收数据;post请求上绑定success事件,用来确定接收服务端心跳成功。

对get请求,this.emit('data', data);这个在前面说收到握手响应的时候说过。只是最后解析出来的type是pong。

Socket.prototype.onPacket = function (packet) {  if ('opening' == this.readyState || 'open' == this.readyState) {    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);    this.emit('packet', packet);    // Socket is live - any packet counts    this.emit('heartbeat');    switch (packet.type) {      case 'open':        this.onHandshake(parsejson(packet.data));        break;      case 'pong':        this.setPing();        break;      case 'error':        var err = new Error('server error');        err.code = packet.data;        this.emit('error', err);        break;      case 'message':        this.emit('data', packet.data);        this.emit('message', packet.data);        break;    }  } else {    debug('packet received with socket readyState "%s"', this.readyState);  }};

然后setPing()设置发出post请求的定时器。

对post请求的响应。注意onSuccess()里面this.emit('success');。这个在前面说发出post请求时,说到在request上绑定了success事件,这里就触发。回调函数是

  var callbackfn = function() {    self.writable = true;    self.emit('drain');  };
  transport  .on('drain', function(){    self.onDrain();  })
Socket.prototype.onDrain = function() {...  this.writeBuffer.splice(0, this.prevBufferLen);  this.callbackBuffer.splice(0, this.prevBufferLen);  // setting prevBufferLen = 0 is very important  // for example, when upgrading, upgrade packet is sent over,  // and a nonzero prevBufferLen could cause problems on `drain`  this.prevBufferLen = 0;  if (this.writeBuffer.length == 0) {    this.emit('drain');  } else {    this.flush();  }};

onDrain()里面会判断writeBuffer里有没有数据。道理和服务端onPollRequest()里面的this.emit("drain")一样,为了实时性。

最后说下,客户端新的get长连接请求是在什么时候发出的。

在解析get请求的响应时,self.onPacket()后并没有完,会调用poll()->doPoll().

Polling.prototype.onData = function(data){  var self = this;  debug('polling got data %s', data);  var callback = function(packet, index, total) {    // if its the first message we consider the transport open    if ('opening' == self.readyState) {      self.onOpen();    }    // if its a close packet, we close the ongoing requests    if ('close' == packet.type) {      self.onClose();      return false;    }    // otherwise bypass onData and handle the message    self.onPacket(packet);  };  // decode payload  parser.decodePayload(data, this.socket.binaryType, callback);  // if an event did not trigger closing  if ('closed' != this.readyState) {    //
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表