http://m.blog.csdn.net/article/details?id=51168573
发表于2016/4/16 16:06:45 1310人阅读
分类: Kafka Lua Nginx
最近在做点监控系统,先后采用了两套方案:方案一:Nginx记录日志 --> tail语句扫描日志到Kafka --> java站点消费Kafka消息后存储到MySQL数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现方案一遇到的问题:1.1 面对海量数据,日志文件增长很快,磁盘占用大1.2 Java站点消费Kafka消息后存储到Mysql数据库 太慢为了解决上述两个问题,采用方案二:方案二:Nginx+Lua直接向Kafka发送数据 --> Java站点消费Kafka消息后存储到Mysql数据库 --> Mysql数据库 --> 数据库定时汇聚数据 --> 界面呈现2.1 Nginx+Lua直接向Kafka发送数据: Nginx不记录日志,直接发送到Kafka,不占用磁盘空间2.2 Java站点消费Kafka消息后存储到Mysql数据库: 合并更新操作,如对同一个sessionid的更新操作有100条,合并后只有一条更新语句方案二遇到的问题:2.3 Nginx可以直接用完整的OpenResty,也可以在现有的Nginx基础上添加插件, 我采用的是在现有的Nginx基础上添加插件方案一具体如下:Nginx1.9.9kafka_2.11-0.9.0.1 1.1、Nginx配置文件里的配置:log_format ht-video '$request|$msec|$http_x_forwarded_for|$http_user_agent|$request_body';server{ server_name lc.zhihuishu.com; location / { root /usr/local/nginxnew/html/view; index index.html; } location /ht{ #root html; #index index.html index.htm; if ( $request ~ "GET" ) { #access_log logs/ht-video.log ht-video; access_log /htlog/ht-video.log ht-video; } proxy_set_header Host $http_host; proxy_pass http://localhost:6099; client_max_body_size 8000M; #error_page 405 =200 $1; } } server { listen 6099; server_name localhost; location /ht { root /usr/local/nginxnew/html/hightower; index index.html index.htm; client_max_body_size 8000M; error_page 405 =200 $1; }1.2、修改本机Hosts文件vi /etc/hosts10.252.126.242 kafka1.livecourse.com10.252.126.242 zookeeper1.livecourse.com1.3、修改Kafka文件vi /usr/local/kafka_2.11-0.9.0.1/config/server.properties 这个文件里要改下面5点:broker.id=0port=9092host.name=10.252.126.242log.dirs=/able4/kafka-logszookeeper.connect=zookeeper1.livecourse.com:2181vi /usr/local/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh 添加两行:export JAVA_HOME=/usr/local/java/jdk1.7.0_79export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre1.4、启动zookeeper和kafka/usr/local/kafka_2.11-0.9.0.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/zookeeper.properties/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties1.5、扫描Nginx日志发送到kafkatail -n 0 -f 
/usr/local/nginxnew/logs/ht-video.log | /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1.livecourse.com:9092 --topic HT-VIDEO方案二具体如下:Nginx1.9.9LuaJIT-2.0.4lua-nginx-module-0.10.2ngx_devel_kit-0.3.0rc1lua-resty-kafkakafka_2.11-0.9.0.1参考的这几篇文章完成此方案Nginx与Lua
基于Lua+Kafka+Heka的Nginx Log实时监控系统
Lua如何读取nginx内置对象和方法
Kafka官方文档
nginx+lua+kafka实现日志统一收集汇总
1 安装LuaJIT下载http://luajit.org/download.htmlhttp://luajit.org/install.html 命令如下:tar zxf LuaJIT-2.0.4.tar.gzcd LuaJIT-2.0.4make PREFIX=/usr/local/LuaJITmake install PREFIX=/usr/local/LuaJITecho "/usr/local/LuaJIT/lib" > /etc/ld.so.conf.d/usr_local_luajit_lib.confldconfig#注意环境变量!export LUAJIT_LIB=/usr/local/LuaJIT/libexport LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.02 安装lua-nginx-modulehttps://github.com/openresty/lua-nginx-module/tagscd /usr/localtar zxvf lua-nginx-module-0.10.2.tar.gz3 安装ngx_devel_kit
https://github.com/simpl/ngx_devel_kit/tags
http://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtmlcd /usr/localtar zxvf ngx_devel_kit-0.3.0rc1.tar.gz 4 安装编译Nginxhttp://17173ops.com/2013/11/01/17173-ngx-lua-manual.shtml给Nginx添加下面的参数,如果已经安装了Nginx则用 nginx -V --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1如:cd /usr/local/nginx-1.9.9./configure --prefix=/usr/local/nginxnew/ --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module --add-module=/root/install/ngx_log_if-master --add-module=/usr/local/lua-nginx-module-0.10.2 --add-module=/usr/local/ngx_devel_kit-0.3.0rc1 5 lua插件lua-resty-kafkahttps://github.com/doujiang24/lua-resty-kafkamkdir /usr/local/lua上传lua-cjson-2.1.0.3.tar.gz到/usr/local/lua上传lua-resty-kafka到/usr/local/lua这里遇到些问题,我改写了其中的client.lua文件的两个方法:方法1:function _M.new(self, broker_list, client_config) local opts = client_config or {} local socket_config = { socket_timeout = opts.socket_timeout or 3000, keepalive_timeout = opts.keepalive_timeout or 600 * 1000, -- 10 min keepalive_size = opts.keepalive_size or 2 } --start 0 == wangsihong 2016-4-16 local broker_list_host_ip = opts.broker_list_host_ip or {} local cli = setmetatable({ broker_list = broker_list,broker_list_host_ip = broker_list_host_ip, topic_partitions = {}, brokers = {}, client_id = "worker" .. 
pid(), socket_config = socket_config, }, mt) --end 0 == wangsihong 2016-4-16 if opts.refresh_interval then meta_refresh(nil, cli, opts.refresh_interval / 1000) -- in ms end return cliend方法2:function _M.choose_broker(self, topic, partition_id) local brokers, partitions = self:fetch_metadata(topic) if not brokers then return nil, partitions end local partition = partitions[partition_id] if not partition then return nil, "not found partition" end local config = brokers[partition.leader] if not config then return nil, "not found broker" end --start 1 == wangsihong 2016-4-16 local broker_list_host_ip = self.broker_list_host_ip for k = 1, #broker_list_host_ip do local hostip = broker_list_host_ip[k] if config ~= nil and hostip ~= nil and config.host == hostip.host then config.host = broker_list_host_ip[k].ip end end --end 1 == wangsihong 2016-4-16 return configend6 lua插件cjsonhttp://www.kyne.com.au/~mark/software/lua-cjson-manual.htmlcd /usr/local/luatar zxvf lua-cjson-2.1.0.3.tar.gz 7 安装Nginx1.9.9好后修改nginx.confhttp{# lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;/usr/local/lua/lua-cjson-2.1.0.3/lua/?.lua;;";# lua_package_cpath '/usr/local/LuaJIT/lib/lua/5.1/?.so;'; lua_package_path "/usr/local/lua/lua-resty-kafka/lib/?.lua;;"; server{#为了方便调试,关闭了lua_code_cache,如果是生产环境,应该开启它。lua_code_cache off;listen 80;server_name localhost;location = /lua-v { content_by_lua ' ngx.header.content_type = "text/plain"; if jit then ngx.say(jit.version) else ngx.say(_VERSION) end '; }location /ht3/video/p { content_by_lua ' ngx.header.content_type = "text/plain"; --local cjson = require "cjson" --local client = require "resty.kafka.client" local producer = require "resty.kafka.producer" local broker_list = { { host = "10.252.126.242", port = 9092 }, { host = "10.252.126.242", port = 9093 }, { host = "10.252.126.242", port = 9094 }, { host = "10.252.126.242", port = 9095 }, { host = "10.252.126.242", port = 9096 }, }local broker_list_host_ip = { { host = "kafka1.livecourse.com", ip 
= "10.252.126.242" },} local key = "key" --local message = $request|$msec|$remote_addrx|$http_user_agent|$request_body local myIP = ngx.req.get_headers()["X-Real-IP"] if myIP == nil then myIP = ngx.req.get_headers()["x_forwarded_for"] end if myIP == nil then myIP = ngx.var.remote_addr end local h = ngx.req.get_headers()local message = ngx.req.get_method() .. " " .. ngx.var.uriif ngx.var.args ~= nil then message = message .. "?" .. ngx.var.args .. "|"endmessage = message .. ngx.now() .. "|"message = message .. myIP .. "|"message = message .. h["user-agent"] .. "|"local bodyData = ngx.req.get_body_data()if bodyData == nil thenbodyData = "-"endmessage = message .. bodyData -- 定义kafka异步生产者 -- 发送日志消息,send第二个参数key,用于kafka路由控制: -- key为nill(空)时,一段时间向同一partition写入数据 -- 指定key,按照key的hash写入到对应的partition-- 指定key,按照key的hash写入到对应的partition
local key = nil
-- this is async producer_type and bp will be reused in the whole nginx worker local bp = producer:new(broker_list, { producer_type = "async" , broker_list_host_ip = broker_list_host_ip,refresh_interval = 3000}) local ok, err = bp:send("HT-VIDEO", key, message) if not ok then --ngx.say("send err:", err) ngx.say("void(1);"); return endngx.say("void(0);"); --ngx.say("send success, ok:", ok) '; } }}8 完成请求Url:http://localhost/ht3/video/p?a=bbbb 就能将日志内容发送到kafka
新闻热点
疑难解答