在某些情况下, 我们可能会在 Spring 中将一些 WEB 上的信息发送到 Kafka 中, 这时候我们就需要在 Spring 中编写 Producer 相关的代码了 ; 不过高兴的是,Spring 本身提供了操作 Kafka 的相关类库, 我们可以直接通过 xml 文件配置然后直接在后端的代码中使用 Kafka, 非常地方便 本文将介绍如果在 Spring 中将消息发送到 Kafka 在这之前, 请将下面的依赖加入到你的 pom.xml 文件中 : <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-core</artifactid> <version>4.1.0.release</version> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-kafka</artifactid> <version>1.0.0.m2</version> </dependency> 在 Spring 中将消息发送到 Kafka 需要我们定义一个配置文件,Spring 为我们提供了 Outbound Channel Adapter, 其主要将消息从 Spring 框架发送到 Kafka, 我们需要在 xml 文件中配置 intkafka:outbound-channel-adapter 标签, 这样可以使得 Spring 可以抽取到消息的 Key 目标主题 分区等信息 本文将介绍如何把用户注册的信息发送到 Kafka 这里用到的配置文件如下 : <int:publish-subscribe-channel id="inputtokafka"/> <int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaproducercontext" auto-startup="true" channel="inputtokafka" order="1"> </int-kafka:outbound-channel-adapter> 1 / 6
<int-kafka:producer-context id="kafkaproducercontext" producer-properties="producerproperties"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="www.iteblog.com:9092" key-class-type="java.lang.string" value-class-type="com.iteblog.dao.user" topic="user" compression-codec="none"/> </int-kafka:producer-configurations> </int-kafka:producer-context> <bean id="producerproperties" class="org.springframework.beans.factory.config.propertiesfactorybean"> <property name="properties"> <props> <prop key="topic.metadata.refresh.interval.ms">3600000</prop> <prop key="message.send.max.retries">5</prop> <prop key="send.buffer.bytes">5242880</prop> </props> </property> </bean> int:publish-subscribe-channel 标签定义了消息发送的通道 ;int-kafka:producer-context 标签里面可以定义 producer 的 context, 在里面我们可以设置 broker 的地址,key 和 value 的类型, 需要发送消息的目标 Topic 等相关属性 ; 我们可以在自定义的 bean 中定义 Kafka Producer 的相关属性, 本文对应的是 producerproperties, 这里我们定义了 topic.metadata.refresh.interval.ms 等相关属性, 更多的属性可以参见 Kafka 的官方文档 然后可以在 int-int-kafka:producer-context 标签通过 producer-properties 来引用 配置文件设置好之后, 我们就可以编写 Java 代码了 : package com.iteblog.controller; import com.iteblog.dao.user; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.messaging.messagechannel; import org.springframework.messaging.support.messagebuilder; import org.springframework.stereotype.controller; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.requestmethod; import org.springframework.web.bind.annotation.responsebody; 2 / 6
/** * Created by https://www.iteblog.com on 2016/10/21. */ @Controller public class IteblogController { @Autowired @Qualifier("inputToKafka") MessageChannel channel; private static final Log logger = LogFactory.getLog(IteblogController.class); @RequestMapping(method = RequestMethod.POST, value = "/register") @ResponseBody public User registerjson(user user) { logger.warn("user: " + user); channel.send(messagebuilder.withpayload(user).build()); return user; withpayload 接受的就是消息的内容 ;MessageChannel 就是发送消息的通道, 所有的消息都是通过这个通道发送到 Kafka;User 类的代码如下 : package com.iteblog.dao; /** * Created by https://www.iteblog.com on 2016/10/21. */ public class User { private String username; private String email; public User() { public User(String username, String email) { this.username = username; this.email = email; public String getusername() { 3 / 6
return username; public void setusername(string username) { this.username = username; public String getemail() { return email; public void setemail(string email) { this.email = email; @Override public String tostring() { return "User{" + "username='" + username + '\'' + ", email='" + email + '\'' + ''; 在发送消息的时候, 我们还可以设置消息的 Key, 以及需要发送的 Topic, 如下 : channel.send(messagebuilder.withpayload(user).setheader("topic", "user").setheader("messagekey", user.getusername()).build()); 这里通过设置 topic messagekey 等 Header 信息, 来分别制定目标主题的名称 ( 当然, 如果只有一个主题我们不需要手动指定,Spring 会自定选择配置文件里面指定的 Topic; 如果有多个需要手动指定 ) 和 key 的值 我们还可以设置消息的 Key 和 Value 编码格式, 如下 : <bean id="userencoder" class="org.springframework.integration.kafka.serializer.avro.avroreflectdatumbackedkafk aencoder"> <constructor-arg value="com.iteblog.dao.user"></constructor> </bean> 4 / 6
<bean id="keyencoder" class="org.springframework.integration.kafka.serializer.avro.avroreflectdatumbackedkafk aencoder"> <constructor-arg value="java.lang.string"></constructor> </bean> 然后在配置文件里面配置 : <int-kafka:producer-context id="kafkaproducercontext" producer-properties="producerproperties"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="www.iteblog.com:9092" key-class-type="java.lang.string" value-class-type="com.iteblog.dao.user" value-encoder="userencoder" key-encoder="keyencoder" topic="user" compression-codec="none"/> </int-kafka:producer-configurations> </int-kafka:producer-context> 如果我们得 Topic 有多个分区, 我们还可以指定每条消息的分区 ID 计算规则, 如下 : <bean id="iteblogpartitioner" class="org.springframework.integration.kafka.support.defaultpa rtitioner"/> 我们使用了 Spring 默认分区类, 也就是计算 Key 的 hashcode 再对分区数求模 (Utils.abs(key.hashC ode()) % numpartitions), 然后我们可以在 int-kafka:producer-configuration 里面加上以下配置 partitioner="iteblogpartitioner" 到这里, 我们就可以在 Tomcat 中启动上面的 Web 工程, 然后访问 https://www.iteblog.com/regist er 并传入 username 和 email 参数即可将消息发送到 Kafka, 完整的工程目录结构如下 :. 5 / 6
Powered by TCPDF (www.tcpdf.org) pom.xml src main java com iteblog controller IteblogController.java dao User.java resources applicationcontext.xml kafka-outbound-context.xml log4j.properties webapp index.jsp register.html WEB-INF web.xml 10 directories, 9 files 我将在后面的文章介绍如何在 Spring 中接收 Kafka 的消息, 敬请关注 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 6 / 6