##本例以Kafka-sacle-2.1.3-2.8.0版本为参照 #1.kafka启动服务 ./bin/kafka-server-start.sh ./config/server.properties --注: 生产环境常使用"bin/kafka-server-start.sh config/server.properties > ${KAFKA_HOME}/kafka.log 2>&1 &" #2.kafka停止服务 ./bin/kafka-server-stop.sh #3.kafka查看所有主题 ./bin/kafka-topics.sh --list --zookeeper localhost:9092 --注: 若是使用2.8版本以上未启动zookeeper,则将 "--zookeeper" 替换为 "--bootstrap-server"等; 若配置文件中明确声明了IP地址,则localhost需替换成相应的IP地址; #4.查看所有主题信息 ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --注: 若是使用2.8版本以上未启动zookeeper,则将 "--zookeeper" 替换为 "--bootstrap-server"等;且"localhost:2181"需替换成"localhost:9092" #5.查看指定主题详细信息 ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic foo --注: 若是使用2.8版本以上未启动zookeeper,则将 "--zookeeper" 替换为 "--bootstrap-server"等;且"localhost:2181"需替换成"localhost:9092" #6.创建kafka主题 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --注: --partitions 1 代表1个分区, --replication-factor 1 代表一个每个分区1个副本 若是使用2.8版本以上未启动zookeeper,则将 "--zookeeper" 替换为 "--bootstrap-server"等;且"localhost:2181"需替换成"localhost:9092" #7.删除kafka主题 ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test --注: 若是使用2.8版本以上未启动zookeeper,则将 "--zookeeper" 替换为 "--bootstrap-server"等;且"localhost:2181"需替换成"localhost:9092" #8.查看某个topic对应的消息数量 #为topic增加partition ./bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name --partitions 40 --注: 添加分区需谨慎,会打乱hash(key)%number_of_partitions; Kafka 目前不支持减少主题的分区数量; 为topic增加副本(未验证) ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute #9.测试kafka消息生产者 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --注: 若有加密并以设置,需要在命令行末位添加"--producer.config config/producer.properties" #10.测试Kafka消息消费者 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --注: 若有加密并以设置,需要在命令行末位添加"--consumer.config config/consumer.properties" 若要从头开始读取/消费主题消息,则需要添加"--from-beginning" 若要从尾部开始消费消息(不按zookeeper记录),则需要添加"--offset latest --partition 0","--partition 0"指定消费分区,"--max-messages 2" #11.显示所有消费者(组) ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --注: 若有加密并以设置,需要在命令行末位添加"--command-config config/consumer.properties" #12.获取正在消费的topic的group的offset ./bin/kafka-consumer-groups.sh --describe --group group_alarm_test --bootstrap-server localhost:9092 --注: 若有加密并以设置,需要在命令行末位添加"--command-config config/consumer.properties" #13.查看对应主题topic的消息数量 --暂未更新-- #14.