webtail 能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端,webtail文件读取基于linux的inotify,所以没有可移植性.
文件读取
先介绍下背景,之前遇到过服务器上需要长时间tail一个日志,之前经常是通过一个终端连到服务器上,但是对于长时间观察,就不太方便:老是要开着终端,没法用手机等移动设备查看;多人共享查看比较麻烦,都需要登录到服务器上。
webtail实现类似于tail,能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端。webtail文件读取基于linux的inotify,所以没有可移植性,websocket使用基于asio的websocketpp,代码维护在这里。
webtail代码维护在https://gitorious.org/webtail/webtail,目前还是一个简单可用的小应用,还有很多可以提升的地方。
本文介绍下文件读取的部分.
文件读取分为两个部分,文件读取和文件监控.
文件读取:
由于unix的文件多次打开独立维护file table,共享v-node table(参照APUE 3.10 file sharing)。因此只维护一个文件描述符和当前文件偏移,每次读取通过fstat获取文档当前大小,和维护的文件偏移进行对比,如果小于文件大小,则读取文件直到末尾,并输出。
这里有几点是模仿tailf的,首先是不修改文件访问时间,这是通过在open系统调用中,增加O_NOATIME,按照man open的解释:Do not update the file last access time (st_atime in the inode) when the file is read(2). This flag is intended for use by indexing or backup programs, where its use can significantly reduce the amount of disk activity. This flag may not be effective on all file systems. One example is NFS, where the server main?tains the access time. 启动了这个参数,通过read调用读取文件的时候,不会修改atime了。
其次是首次读取的时候,读取文件最后10行,由于websocket发送没有做缓存,所以从web端没法看见,这里代码也是参照了tailf,代码如下:
- char *buffer = new char[initLen * BUFSIZ];
- char *p = buffer;
- char lineBuffer[BUFSIZ];
- int readSize;
- int lineCount = 0;
- bool head = true;
- int i = 0, j = 0;
- while((readSize = ::read(fd, lineBuffer, BUFSIZ - 1)) > 0) {
- for(i = 0, j = 0; i < readSize; ++i) {
- // read line, save to buffer
- if(lineBuffer[i] == 'n') {
- std::memcpy(p, lineBuffer + j, i - j + 1);
- std::memset(p + i - j + 1, '', 1);
- j = i + 1;
- if(++lineCount >= initLen) {
- lineCount = 0;
- head = false;
- }
- p = buffer + (lineCount * BUFSIZ);
- }
- }
- // read break in the middle of line
- if(j < i) {
- // finished read all files
- if(readSize < BUFSIZ) {
- std::memcpy(p, lineBuffer + j, i - j + 1);
- std::memset(p + i - j + 1, '', 1);
- ++lineCount;
- } else if (j == 0){ // long line drop?
- continue;
- } else {
- // not finished, seek to line begin
- curPos = lseek(fd, j - i -1, SEEK_CUR);
- }
- }
- }
- std::string initReadResult;
- if(head) {
- for(i = 0; i < lineCount; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- }
- } else {
- for(i = lineCount; i < initLen; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- }
- for(i = 0; i < lineCount; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- } //Vevb.com
- }
- curPos = lseek(fd, 0, SEEK_CUR);
- delete buffer;
首先声明一段用来保存最终n行的缓存,缓存最长行长度是BUFSIZ,这个在linux中的定义是8192字节,然后每次读取BUFSIZ-1个字节,最后一个用来放,类似fgets的实现,解析其中的换行符,由于只打算在linux中使用,所以只解析n,最后根据状态,从缓存中读取最后n行数据到string中.
如果不是初始读取,之前已经说过逻辑了,将fstat获取到的文件大小和保存的当前偏移做比较,如果有新的内容,则读取,没有就直接返回。
文件监控:
文件监控直接通过了linux的inotify接口实现。这里没有考虑移植性,也就没像tailf那样,通过宏来判断是否支持inotify,如果不支持,降级使用循环轮寻的方式读取。
inotify的使用还是比较方便的基本上就是:inotify_init,inotify_add_watch,然后配合read系统调用,获取文件修改信息。因此实现也非常方便。
首先是在构造函数里面初始化inotify:inotifyFd = inotify_init();
然后提供一个watch接口,通过传入前文描述的TFile对象和内容读取的回调函数,添加对应文件的监控和回调,代码如下:
- void FileWatcher::watch ( boost::shared_ptr< TFile > tFile, std::list< FileWatcher::ReadCallBack > callBackList )
- {
- if(!tFile->hasError() && !callBackList.empty()) {
- int wd = inotify_add_watch(inotifyFd, tFile->name().c_str(), IN_MODIFY);
- if(wd > 0) {
- tFileMap.insert(std::make_pair<int, boost::shared_ptr<TFile> >(wd, tFile));
- callBackMap.insert(std::make_pair<int, std::list<ReadCallBack> >(wd, callBackList));
- //init read
- std::string initContent = tFile->read();
- BOOST_FOREACH(ReadCallBack &callback, callBackList) {
- callback(initContent);
- }
- }
- }
- }
这里通过TFile的文件名,向内核注册添加该文件的modify事件,并且在注册成功之后,进行初始读取(这里有个小问题,由于后面websocket端没有做缓存,所以由于初始读取的时候还没有任何websocket客户端连接,所以通过web无法读取初始内容,也就是文件最后10行)。同时,这个类维护两个hashmap,分别是监听描述符wd->tFile和wd->callbacklist。
监听完成后,就是启动监听,也就是通过读取fd,感知被监听文件的变更,由于这里只监听了文件修改,那么读取到这个事件之后,就可以对该文件进行增量读取(前文已经描述了读取方法),代码如下:
- char * buffer = new char[inotifyBufferSize];
- while(!_interrupted) {
- if(read(inotifyFd, buffer, inotifyBufferSize) < 0) {
- if(errno == EINTR) {
- // interrupt
- delete buffer;
- return;
- }
- }
- struct inotify_event *event = ( struct inotify_event * ) buffer;
- int wd = event->wd;
- BOOST_AUTO(tFileIter, tFileMap.find(wd));
- if(tFileIter != tFileMap.end()) {
- boost::shared_ptr<TFile> tFile = tFileIter->second;
- std::string content = tFile->read();
- BOOST_AUTO(iter, callBackMap.find(wd));
- if(iter != callBackMap.end()) {
- std::list<ReadCallBack> callbacks = iter->second;
- BOOST_FOREACH(ReadCallBack &callback, callbacks) {
- callback(content);
- }
- }
- }
- }
- delete buffer;
这里参照inotify的文档,首先读取缓冲区大小设置为:static const int inotifyBufferSize = sizeof(struct inotify_event) + NAME_MAX + 1;
也就是inotify_event结构的长度,和名字最大值。由于inotify_event是变长字段(包含一个可选的文件名字段),所以这里采用了系统限制的文件名最大值NAME_MAX,这个宏在climits中定义,在linux中大小为255字节。
然后通过系统调用read,读取文件描述符inotifyFd,这里如果没有新的事件产生,read会进入阻塞状态,节省系统资源。如果有返回,则处理返回的inotify_event对象(注意在监听modify事件的时候,是没有文件名的)。通过结构中的wd,从之前保存的hashmap中获取对应的tFile对象进行增量读取,然后再读取wd对应的回调函数,将读取内容返回。
这里有个小问题需要处理,就是如何中断读取。之前为了在gtest中能够通过单元测试的方式进行测试,通过查看手册可以知道,如果read调用被系统信号中断,会标记错误码为EINTR。所以,当读取失败的时候,可以通过对ERRNO检查,判断是否是信号中断。
由于程序会一直运行,知道通过信号终止,所以析构变的不是很重要了。这里析构函数里面通过调用inotify_rm_watch将之前保存的wd全部去掉,然后通过close调用交inotify的文件描述符关闭即可:
- FileWatcher::~FileWatcher()
- {
- if(inotifyFd > 0) {
- boost::unordered::unordered_map<int, boost::shared_ptr<TFile> >::iterator iter;
- for(iter = tFileMap.begin(); iter != tFileMap.end(); ++iter) {
- inotify_rm_watch(inotifyFd, iter->first);
- }
- close(inotifyFd);
- }
- }
- websocket
前面介绍了服务器端如何监听和增量读取文件,这里通过基于boost asio的websocketpp,实现了一个简单的websocket服务端,能够和浏览器进行通信,将读取到的文件通过websocket协议进行实时传送.
关于websocket的简单介绍,可以参考维基百科,websocket协议,在rfc6455中定义.
websocketpp对websocket和简单的http都进行了比较好的封装,只要实现几个handler,就可以完成对连接、消息等的操作和控制,主要需要处理的,可能有以下的handler,代码如下:
- typedef lib::function<void(connection_hdl)> open_handler;
- typedef lib::function<void(connection_hdl)> close_handler;
- typedef lib::function<void(connection_hdl)> http_handler;
- typedef lib::function<void(connection_hdl,message_ptr)> message_handler
分别处理连接创建,连接关闭,http请求和消息请求,其中connection_hdl是连接的weak_ptr.
这里对websocket使用很简单,唯一的需求,就是维护已经建立的连接(既创建连接的时候记录,关闭连接的时候移出),然后通过将自己的回调注册到文件监控类中,实时的将消息推送到websocket客户端。
首先,维护一个set,用来保存当前已经建立的所有连接:
- typedef std::set<websocketpp::connection_hdl,boost::owner_less<websocketpp::connection_hdl> > ConnectionSet;
然后在建立连接的时候插入到这个set中,代码如下:
- void WebSocketServer::onOpen ( websocketpp::connection_hdl hdl )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- _conns.insert(hdl);
- }
在连接关闭的时候,移除连接,代码如下:
- void WebSocketServer::onClose ( websocketpp::connection_hdl hdl )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- _conns.erase(hdl);
- }
另外,定义一个让filewatcher调用的回调,代码如下:
- void WebSocketServer::write ( const std::string& content )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- BOOST_AUTO(it, _conns.begin());
- for(; it != _conns.end(); ++it) {
- _s.send(*it, content, websocketpp::frame::opcode::text);
- }
- }
这个回调很简单,就是当有读取到内容的时候,遍历连接集合,向每个连接发送具体的内容,最后,为了方便用户访问,当发现是标准http请求的时候,我们返回一个简单的html,用于显示和建立websocket连接,如果不指定具体的http_handler,websocketpp在发现请求是http的时候,会返回http的426错误,Upgrade Required,代码如下:
- void WebSocketServer::httpHandler ( websocketpp::connection_hdl hdl )
- {
- server::connection_ptr connPtr = _s.get_con_from_hdl(hdl);;
- connPtr->set_status(websocketpp::http::status_code::ok);
- connPtr->set_body(htmlContent);
- }
另外,还有一个小坑,按照websocketpp的example,在启动的时候都是直接调用server的listen函数,而且使用的都是只有端口号的那个实现,实际使用过程中,发现这个只有端口号的实现,直接使用了ipv6协议,虽说如果本机同时支持ipv6和ipv4的情况下,两个协议对应的端口都会监听,但是遇到了服务器上关闭了ipv6,会导致boost asio抛出address_family_not_supported异常,导致应用被迫退出,为了兼容这种方式,对这个异常进行了抓取,重新降级尝试ipv4协议,这样能够很好的在只有ipv4的服务器上进行使用,代码如下:
- try{
- _s.listen(port);
- } catch(boost::system::system_error const& e) {
- if(e.code() == boost::asio::error::address_family_not_supported) {
- _s.listen(boost::asio::ip::tcp::v4(), port);
- }
- } catch (...) {
- throw;
- }
新闻热点
疑难解答