Contents

Install Java
Install ZooKeeper
  1. Deployment and configuration
  2. Configuration file notes
    2.1 The myid file (server.myid)
    2.2 zoo.cfg
    2.3 log4j.properties
    2.4 zkEnv.sh and zkServer.sh
  3. Parameter notes
  4. Start and test
Install Kafka
  1. Deployment and configuration
  2. Start the brokers
  3. Kafka test
  4. Log file notes
Managing with supervisor
  1. Managing ZooKeeper
  2. Managing Kafka
Development
Install Java

mkdir /usr/local/java
cp jdk-8u20-linux-x64.tar.gz /usr/local/java
cd /usr/local/java
tar zxvf jdk-8u20-linux-x64.tar.gz
vim /etc/profile
Append the following to /etc/profile:

JAVA_HOME=/usr/local/java/jdk1.8.0_20
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$JAVA_HOME/lib/dt.jar
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
export JAVA_HOME JRE_HOME PATH CLASS_PATH

Then reload the profile and verify:

source /etc/profile
java -version
Install ZooKeeper

1. Deployment and configuration

mkdir /usr/local/zookeeper-cluster
cp zookeeper-3.5.2-alpha.tar.gz /usr/local/zookeeper-cluster/
cd /usr/local/zookeeper-cluster
tar zxvf zookeeper-3.5.2-alpha.tar.gz
cd /usr/local/zookeeper-cluster/zookeeper-3.5.2-alpha
mv conf/zoo_sample.cfg conf/zoo.cfg
mkdir data
mkdir datalog
vim conf/zoo.cfg
node1 conf/zoo.cfg:

clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

Rename the directory and make a copy for each of the other two nodes:

cd /usr/local/zookeeper-cluster
mv zookeeper-3.5.2-alpha/ zookeeper-3.5.2-node1
cp -R zookeeper-3.5.2-node1 zookeeper-3.5.2-node2
cp -R zookeeper-3.5.2-node1 zookeeper-3.5.2-node3

node2 conf/zoo.cfg:
clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node2/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node2/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

node3 conf/zoo.cfg:
clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node3/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node3/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

Write the myid files:
# node1
echo "1" > zookeeper-3.5.2-node1/data/myid
# node2
echo "2" > zookeeper-3.5.2-node2/data/myid
# node3
echo "3" > zookeeper-3.5.2-node3/data/myid

2. Configuration file notes

2.1 The myid file (server.myid)

The myid file lives in the snapshot directory (dataDir) and identifies this particular server; it is how the members of the ZooKeeper ensemble recognize one another. Its value must match the N of the corresponding server.N entry in zoo.cfg.
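As a quick sanity check, here is a minimal Python sketch (illustrative only, assuming the three-node directory layout created above) that verifies each node's myid matches its server.N number:

# check_myid.py - verify each node's myid matches its server.N number.
# Illustrative sketch; assumes the directory layout created above.
import os

BASE = "/usr/local/zookeeper-cluster"

for n in (1, 2, 3):
    path = os.path.join(BASE, f"zookeeper-3.5.2-node{n}", "data", "myid")
    with open(path) as f:
        myid = f.read().strip()
    print(f"node{n}: myid={myid}", "OK" if myid == str(n) else "MISMATCH")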
2.2 zoo.cfg

zoo.cfg is the main ZooKeeper configuration file, located in the conf directory.
2.3 log4j.properties

log4j.properties, also in the conf directory, controls ZooKeeper's log output. Like most programs written in Java, ZooKeeper uses log4j to manage its logging.
2.4 zkEnv.sh and zkServer.sh

zkServer.sh is the main management script. zkEnv.sh is the main configuration script; it sets up the environment variables used when the ZooKeeper servers start.
4. Start and test

Start all three nodes:
zookeeper-3.5.2-node1/bin/zkServer.sh start
zookeeper-3.5.2-node2/bin/zkServer.sh start
zookeeper-3.5.2-node3/bin/zkServer.sh start
Test:

[root@paasagento zookeeper-cluster]# zookeeper-3.5.2-node1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

One node reports Mode: leader; the other two report Mode: follower.
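This check can also be scripted. Below is a minimal Python sketch, not part of the original setup, that sends ZooKeeper's built-in ruok four-letter command to each node; a healthy server answers imok. (Note that from ZooKeeper 3.5.3 onward, four-letter commands must be whitelisted via 4lw.commands.whitelist.)

# zk_ruok.py - minimal health-check sketch: send the 'ruok' four-letter
# command to each node; a healthy server replies 'imok'.
import socket

for port in (2181, 2182, 2183):
    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
        sock.sendall(b"ruok")
        reply = sock.recv(16).decode()
    print(f"port {port}: {reply}")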
Connect a client:

zookeeper-3.5.2-node1/bin/zkCli.sh -server 127.0.0.1:2181
ls /

Install Kafka

1. Deployment and configuration

mkdir /usr/local/kafka
cp kafka_2.11-0.10.1.1.tgz /usr/local/kafka/
cd /usr/local/kafka
tar zxvf kafka_2.11-0.10.1.1.tgz
cd kafka_2.11-0.10.1.1
vim config/server.properties
kafka1 config (config/server.properties):

# Unique ID of this broker within the cluster, analogous to ZooKeeper's myid; must be a positive integer.
# If the server's IP address changes but broker.id stays the same, consumers are unaffected.
broker.id=1
# Port on which this broker serves clients; the default is 9092.
port=9092
# Broker host address. If set, the broker binds to this address; if unset, it binds to all
# interfaces and publishes one of them to ZooKeeper. Usually left unset.
host.name=192.168.1.172
# Maximum number of threads for handling network requests; usually the number of CPU cores.
num.network.threads=3
# Number of threads for disk I/O; usually twice the number of CPU cores.
num.io.threads=8
# Send buffer size: data accumulates in the buffer and is sent once it reaches a certain size, which improves performance.
socket.send.buffer.bytes=102400
# Receive buffer size: received data is written out to disk once a certain amount has accumulated.
socket.receive.buffer.bytes=102400
# Maximum size of a single request sent to or received from Kafka; must not exceed the JVM heap size.
socket.request.max.bytes=104857600
# Where Kafka stores its data. Multiple comma-separated paths are allowed; placing them on
# different disks improves read/write performance, e.g. /data/kafka-logs-1,/data/kafka-logs-2
log.dirs=/tmp/kafka-logs_1
# Default number of partitions per topic; overridden by the value given when a topic is created.
num.partitions=3
# Number of threads per data directory used for log recovery at startup; the default is 1.
num.recovery.threads.per.data.dir=1
# Default maximum retention time for messages: 168 hours = 7 days.
log.retention.hours=168
# Each topic partition is stored as a series of segment files; this caps the size of one segment.
# Overridden by the value given when a topic is created.
log.segment.bytes=1073741824
# Every 300000 ms, check the retention limit above (log.retention.hours=168) and delete expired messages.
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# ZooKeeper connection timeout.
zookeeper.connection.timeout.ms=6000

mv config/server.properties config/server1.properties
cp -R config/server1.properties config/server2.properties
cp -R config/server1.properties config/server3.properties

kafka2 config (config/server2.properties):
broker.id=2
port=9093
host.name=192.168.1.172
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs_2
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000

kafka3 config (config/server3.properties):
broker.id=3
port=9094
host.name=192.168.1.172
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs_3
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000

2. Start the brokers

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &
[root@paasagento kafka_1]# jobs
[1]  Running   bin/kafka-server-start.sh config/server1.properties &
[2]- Running   bin/kafka-server-start.sh config/server2.properties &
[3]+ Running   bin/kafka-server-start.sh config/server3.properties &

Check broker registration in ZooKeeper:

bin/zkCli.sh -server 192.168.1.172:2182

[zk: 192.168.1.172:2182(CONNECTED) 8] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, zookeeper]
[zk: 192.168.1.172:2182(CONNECTED) 5] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1484654956028","endpoints":["PLAINTEXT://192.168.1.172:9092"],"host":"192.168.1.172","version":3,"port":9092}
[zk: 192.168.1.172:2182(CONNECTED) 6] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1484655055260","endpoints":["PLAINTEXT://192.168.1.172:9093"],"host":"192.168.1.172","version":3,"port":9093}
[zk: 192.168.1.172:2182(CONNECTED) 7] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1484655071043","endpoints":["PLAINTEXT://192.168.1.172:9094"],"host":"192.168.1.172","version":3,"port":9094}

3. Kafka test

Create a topic with 3 partitions and a replication factor of 3:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test_topic
[root@paasagento kafka_1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test_topic
[root@paasagento kafka_1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
Topic:test_topic  PartitionCount:3  ReplicationFactor:3  Configs:
  Topic: test_topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
  Topic: test_topic  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
  Topic: test_topic  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2

Produce messages:

bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094 --topic test_topic

Consume messages:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic
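The console producer and consumer have straightforward programmatic equivalents. Here is a minimal sketch using the third-party kafka-python package (pip install kafka-python); it is an illustration only, with the broker list matching the three brokers configured above:

# kafka_smoke_test.py - produce a few messages to test_topic and read them back.
# Illustrative sketch using the third-party kafka-python package.
from kafka import KafkaConsumer, KafkaProducer

BROKERS = ["192.168.1.172:9092", "192.168.1.172:9093", "192.168.1.172:9094"]

producer = KafkaProducer(bootstrap_servers=BROKERS)
for i in range(3):
    # send() is asynchronous; get() blocks until the broker acknowledges the write.
    producer.send("test_topic", f"message {i}".encode()).get(timeout=10)
producer.close()

consumer = KafkaConsumer(
    "test_topic",
    bootstrap_servers=BROKERS,
    auto_offset_reset="earliest",  # start from the beginning of the log
    consumer_timeout_ms=5000,      # stop iterating after 5 s with no new messages
)
for msg in consumer:
    print(msg.partition, msg.offset, msg.value.decode())
consumer.close()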
View the topic in ZooKeeper:

bin/zkCli.sh -server 192.168.1.172:2182
[zk: 192.168.1.172:2182(CONNECTED) 2] get /brokers/topics/test_topic
{"version":1,"partitions":{"2":[3,1,2],"1":[2,3,1],"0":[1,2,3]}}

4. Log file notes

server.log: Kafka's runtime log.
state-change.log: Kafka keeps its state in ZooKeeper, so state transitions can occur; those transitions are logged here.
controller.log: Kafka elects one broker as the "controller". When a broker goes down, the controller picks a new leader from among a partition's replica nodes, which lets Kafka manage the leader/follower relationships of all partitions efficiently and in bulk. If the controller itself goes down, one of the surviving brokers is switched over to be the new controller.
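The broker and topic metadata shown in zkCli above can also be read programmatically. A minimal sketch using the third-party kazoo ZooKeeper client (pip install kazoo), illustrative only:

# zk_metadata.py - read the broker and topic metadata that zkCli.sh displayed above.
# Illustrative sketch using the third-party kazoo package.
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="192.168.1.172:2181,192.168.1.172:2182,192.168.1.172:2183")
zk.start()

# Each live broker registers an ephemeral znode under /brokers/ids.
for broker_id in zk.get_children("/brokers/ids"):
    data, _stat = zk.get(f"/brokers/ids/{broker_id}")
    print(f"broker {broker_id}:", json.loads(data))

# Topic metadata: partition -> replica list.
data, _stat = zk.get("/brokers/topics/test_topic")
print("test_topic:", json.loads(data))

zk.stop()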
Managing with supervisor

1. Managing ZooKeeper

vim bin/zkEnv.sh and add the environment path at the top:
JAVA_HOME=/usr/local/java/jdk1.8.0_20
export JAVA_HOME

vim /etc/supervisor/zookeeper.conf
[program:zookeeper]
command=/usr/local/zookeeper-3.5.2-alpha/bin/zkServer.sh start-foreground
autostart=true
autorestart=true
startsecs=10
stdout_logfile=/var/log/zookeeper.log
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_capture_maxbytes=1MB
stderr_logfile=/var/log/zookeeper.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB

Reload supervisor and verify:

supervisorctl reload
bin/zkServer.sh status
2. Managing Kafka

vim bin/kafka-run-class.sh and add the environment path at the top:
JAVA_HOME=/usr/local/java/jdk1.8.0_20
export JAVA_HOME

vim /etc/supervisor/kafka.conf
[program:kafka]
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
user=root
autostart=true
autorestart=true
startsecs=10
stdout_logfile=/var/log/kafka.log
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_capture_maxbytes=1MB
stderr_logfile=/var/log/kafka.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB

supervisorctl reload
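Beyond supervisorctl, supervisord exposes an XML-RPC interface that can be queried from code. A minimal sketch, illustrative only: it assumes an [inet_http_server] section enabled on port 9001 in supervisord.conf, which the configuration above does not set up by itself.

# supervisor_status.py - query supervisord over XML-RPC.
# Assumes [inet_http_server] is enabled on port 9001 in supervisord.conf.
from xmlrpc.client import ServerProxy

proxy = ServerProxy("http://127.0.0.1:9001/RPC2")
for proc in proxy.supervisor.getAllProcessInfo():
    print(proc["name"], proc["statename"], proc["description"])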
Development

Stay tuned for Chapter 2 (Python development) and Chapter 3 (C# development).