在正常情况下,Kafka 中的每个 Topic 都会有很多个分区, 每个分区又会存在多个副本 在这些副本中, 存在一个 leader 分区, 而剩下的分区叫做 follower, 所有对分区的读写操作都是对 leader 分区进行的 所以当我们向 Kafka 写消息或者从 Kafka 读取消息的时候, 必须先找到对应分区的 Lea der 及其所在的 Broker 地址, 这样才可以进行后续的操作 本文将要介绍的就是 Kafka 是如何找到 leader 分区的 我们知道, Kafka 是使用 Scala 语言编写的, 但是其支持很多语言的客户端, 包括 :C/C++ PHP G o 以及 Ruby 等等 ( 参见 https://cwiki.apache.org/confluence/display/kafka/clients ) 这是为什么呢? 这是因为 Kafka 内部实现了一套基于 TCP 层的协议, 只要使用这种协议与 Kaf ka 进行通信, 就可以使用很多语言来操作 Kafka 目前 Kafka 内部支持多达 30 多种协议, 本文介绍的 Kafka 客户端是如何找到 leader 分区就涉及到 Kafka 内部的 Metadata 协议 Metadata 协议主要解决以下四种问题 : Kafka 中存在哪些主题? 每个主题有几个分区? Leader 分区所在的 broker 地址及端口? 每个 broker 的地址及端口是多少? 客户端只需要构造相应的请求, 并发送到 Broker 端, 即可获取到上面四个问题的答案 整个过程如下 : 客户端构造相应的请求客户端将请求发送到 Broker 端 Broker 端接收到请求处理, 并将结果发送到客户端 Metadata 请求协议 (v0-v3 版本 ) 如下 : TopicMetadataRequest => [TopicNames] TopicNames => string 客户端只需要构造一个 TopicMetadataRequest, 里面包括我们需要查询主题的名字 (TopicNam es); 当然, 我们可以一次查询多个主题, 只需要将这些主题放进 List 里面即可 同时, 我们还可以不传入任何主题的名字, 这时候 Kafka 将会把内部所有的主题相关的信息发送给客户端 1 / 7
目前 Metadata 请求协议存在五个版本,v0-v3 版本格式一致 但是这些协议存在一个问题 : 当 Kafka 服务器端将 auto.create.topics.enable 参数设置为 ture 时, 如果我们查询的主题不存在,Kafka 将会自动创建这个主题, 这很可能不是我们想要的结果 所以, 基于这个问题, 到了 Metadata 请求协议第五版, 格式已经变化了, 如下 : Metadata Request (Version: 4) => [TopicNames] allow_auto_topic_creation TopicNames => STRING allow_auto_topic_creation => BOOLEAN 我们可以指定 allow_auto_topic_creation 参数来告诉 Kafka 是否需要在主题不存在的时候创建, 这时候控制权就在我们了 Kafka 的 Broker 收到客户端的请求处理完之后, 会构造一个 TopicMetadataResponse, 并发送给客户端 TopicMetadataResponse 协议的格式如下 : MetadataResponse => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 Replicas => [int32] Isr => [int32] 可以看到, 相应协议里面包含了每个分区的 Leader Replicas 以及 Isr 信息, 同时还包括了 Kafka 集群所有 Broker 的信息 如果处理出现了问题, 会出现相应的错误信息码, 主要包括下面几个 : UnknownTopic (3) LeaderNotAvailable (5) InvalidTopic (17) TopicAuthorizationFailed (29) 2 / 7
而且,Metadata 协议是目前唯一一个可以向任何 Broker 发送的协议 因为任何一个 Broker 在启动之后会存储这些 Metadata 信息的 而且,Kafka 提供的客户端在获取到 Metadata 信息之后也会将它存储到内存中的 并且在以下几种情况会更新已经缓存下来的 Metadata 信息 : 在 meta data.max.age.ms 参数配置的时间过期之后 ; 在往 Kafka 发送请求是收到 Not a Leader 异常 以上两种情况 Kafka 提供的客户端会自动再发送一次 Metadata 请求, 这样就可以获取到更新的信息 整个过程如下 : 如果想及时了解 Spark Hadoop 或者 Hbase 相关的文章, 欢迎关注微信公共帐号 :iteblog_hadoop 好了, 说了半天的, 我们来看看程序里面如何构造 TopicMetadataRequest 以及处理 TopicMetadataResponse package com.iteblog.kafka import kafka.api.topicmetadatarequest._ import kafka.api.{topicmetadatarequest, TopicMetadataResponse import kafka.consumer.simpleconsumer 3 / 7
///////////////////////////////////////////////////////////////////// User: 过往记忆 Date: 2017 年 07 月 28 日 Time: 22:12:43 bolg: https://www.iteblog.com 本文地址 :https://www.iteblog.com/archives/2215 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货过往记忆博客微信公共帐号 :iteblog_hadoop ///////////////////////////////////////////////////////////////////// object MetaDataDemo { def main(args: Array[String]): Unit = { val consumer = new SimpleConsumer("1.iteblog.com", 9092, 50, 1024 * 4, DefaultClientId) val req: TopicMetadataRequest = new TopicMetadataRequest(CurrentVersion, 0, DefaultClie ntid, List("iteblog_hadoop")) val resp: TopicMetadataResponse = consumer.send(req) println("broker Infos:") println(resp.brokers.mkstring("\n\t")) val metadata = resp.topicsmetadata metadata.foreach { topicmetadata => val partitionsmetadata = topicmetadata.partitionsmetadata partitionsmetadata.foreach { partitionmetadata => println(s"partitionid=${partitionmetadata.partitionid\n\tleader=${partitionmetadata.l eader" + s"\n\tisr=${partitionmetadata.isr\n\treplicas=${partitionmetadata.replicas") TopicMetadataRequest 是通过 SimpleConsumer 的 send 方法发送的, 其返回的是 TopicMetadataResponse, 其中就包含了我们需要的信息 运行上面的程序输出如下 : Broker Infos: id:5,host:5.iteblog.com,port:9092 id:1,host:1.iteblog.com,port:9092 id:6,host:6.iteblog.com,port:9092 id:2,host:2.iteblog.com,port:9092 id:7,host:7.iteblog.com,port:9092 id:3,host:3.iteblog.com,port:9092 id:8,host:8.iteblog.com,port:9092 4 / 7
id:4,host:4.iteblog.com,port:9092 partitionid=0 isr=vector(id:1,host:1.iteblog.com,port:9092) replicas=vector(id:1,host:1.iteblog.com,port:9092, id:8,host:8.iteblog.com,port:9092) partitionid=1 leader=some(id:2,host:2.iteblog.com,port:9092) isr=vector(id:2,host:2.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) replicas=vector(id:2,host:2.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) partitionid=2 leader=some(id:3,host:3.iteblog.com,port:9092) isr=vector(id:3,host:3.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) replicas=vector(id:3,host:3.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) partitionid=3 leader=some(id:4,host:4.iteblog.com,port:9092) isr=vector(id:4,host:4.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) replicas=vector(id:4,host:4.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) partitionid=4 leader=some(id:5,host:5.iteblog.com,port:9092) isr=vector(id:5,host:5.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) replicas=vector(id:5,host:5.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) partitionid=5 leader=some(id:6,host:6.iteblog.com,port:9092) isr=vector(id:6,host:6.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) replicas=vector(id:6,host:6.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) partitionid=6 leader=some(id:7,host:7.iteblog.com,port:9092) isr=vector(id:6,host:6.iteblog.com,port:9092, id:7,host:7.iteblog.com,port:9092) replicas=vector(id:7,host:7.iteblog.com,port:9092, id:6,host:6.iteblog.com,port:9092) partitionid=7 leader=some(id:8,host:8.iteblog.com,port:9092) isr=vector(id:8,host:8.iteblog.com,port:9092) replicas=vector(id:8,host:8.iteblog.com,port:9092, id:7,host:7.iteblog.com,port:9092) partitionid=8 isr=vector(id:2,host:2.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) replicas=vector(id:1,host:1.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) partitionid=9 leader=some(id:2,host:2.iteblog.com,port:9092) isr=vector(id:3,host:3.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) replicas=vector(id:2,host:2.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) partitionid=10 leader=some(id:3,host:3.iteblog.com,port:9092) isr=vector(id:4,host:4.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) replicas=vector(id:3,host:3.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) partitionid=11 5 / 7
leader=some(id:6,host:6.iteblog.com,port:9092) isr=vector(id:6,host:6.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) replicas=vector(id:6,host:6.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) partitionid=12 leader=some(id:7,host:7.iteblog.com,port:9092) isr=vector(id:7,host:7.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) replicas=vector(id:7,host:7.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) partitionid=13 leader=some(id:8,host:8.iteblog.com,port:9092) isr=vector(id:8,host:8.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) replicas=vector(id:8,host:8.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) partitionid=14 isr=vector(id:1,host:1.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) replicas=vector(id:1,host:1.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) partitionid=15 leader=some(id:2,host:2.iteblog.com,port:9092) isr=vector(id:2,host:2.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) replicas=vector(id:2,host:2.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) partitionid=16 leader=some(id:3,host:3.iteblog.com,port:9092) isr=vector(id:3,host:3.iteblog.com,port:9092, id:7,host:7.iteblog.com,port:9092) replicas=vector(id:3,host:3.iteblog.com,port:9092, id:7,host:7.iteblog.com,port:9092) partitionid=17 leader=some(id:4,host:4.iteblog.com,port:9092) isr=vector(id:4,host:4.iteblog.com,port:9092, id:8,host:8.iteblog.com,port:9092) replicas=vector(id:4,host:4.iteblog.com,port:9092, id:8,host:8.iteblog.com,port:9092) partitionid=18 leader=some(id:5,host:5.iteblog.com,port:9092) isr=vector(id:5,host:5.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) replicas=vector(id:5,host:5.iteblog.com,port:9092, id:1,host:1.iteblog.com,port:9092) partitionid=19 leader=some(id:6,host:6.iteblog.com,port:9092) isr=vector(id:6,host:6.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) replicas=vector(id:6,host:6.iteblog.com,port:9092, id:2,host:2.iteblog.com,port:9092) partitionid=20 leader=some(id:7,host:7.iteblog.com,port:9092) isr=vector(id:7,host:7.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) replicas=vector(id:7,host:7.iteblog.com,port:9092, id:3,host:3.iteblog.com,port:9092) partitionid=21 leader=some(id:8,host:8.iteblog.com,port:9092) isr=vector(id:8,host:8.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) replicas=vector(id:8,host:8.iteblog.com,port:9092, id:4,host:4.iteblog.com,port:9092) partitionid=22 isr=vector(id:1,host:1.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) 6 / 7
Powered by TCPDF (www.tcpdf.org) replicas=vector(id:1,host:1.iteblog.com,port:9092, id:5,host:5.iteblog.com,port:9092) 上面的输出就可以看到各个分区的 leader 所在机器 isr 以及所有 replicas 等信息 有一点我们需要注意, 因为目前存在多个版本的 Metadata 请求协议, 我们可以使用低版本的协议与高版本的 Kafka 集群进行通信, 因为高版本的 Kafka 能够支持低版本的 Metadata 请求协议 ; 但是我们不能使用高版本的 Metadata 请求协议与低版本的 Kafka 通信 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 7 / 7