单节点:
1、解压kafka压缩包到安装目录(自己指定);
2、进入kafka目录并执行命令
> bin/zookeeper-server-start.sh config/zookeeper.PRoperties
#如果报错,修改kafka-run-class.sh,将 -XX:+UseCompressedOops 删除;
3、执行命令> bin/kafka-start.sh config/server.properties 1 >/dev/null 2>&1 &
4、创建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
5、发送消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
6、接收消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
集群搭建:
hdp0 hdp1 hdp2 hdp3
zookeeper配置在0 1 2上,并已启动。
1、在hdp0上修改配置文件
> vi config/server.properties:
broker.id=1
zookeeper.connect=hdp0:2181,hdp1:2181,hdp2:2181
(注意:log.dir=/tmp/kafka-logs #生产环境中不用tmp目录)
2、将kafka目录传到1 2 3上
3、修改各自的broker.id
4、在4台主机,kafka目录下分别执行命令> bin/kafka-start.sh config/server.properties &
5、创建topic
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic1
> bin/kafka-topics.sh --list --zookeeper hdp0:2181
6、发送消息
hdp1主机:
> bin/kafka-console-producer.sh --broker-list hdp0:9092 --topic mytopic
7、接受消息
hdp0主机:
> bin/kafka-console-consumer.sh --zookeeper hdp1:2181 --from-beginning --topic mytopic
8、查看topic的状态信息
> bin/kafka-topics.sh --describe --zookeeper hdp0:2181 --topic mytopic
新闻热点
疑难解答