在Spring中使用Kafka:Producer篇

Similar documents
使用MapReduce读取XML文件

使用Cassandra和Spark 2.0实现Rest API服务

通过Hive将数据写入到ElasticSearch

Hadoop&Spark解决二次排序问题(Hadoop篇)

三种方法实现Hadoop(MapReduce)全局排序(1)

Guava学习之Resources

使用Hive读取ElasticSearch中的数据

Spark读取Hbase中的数据

Apache CarbonData集群模式使用指南

Flume-ng与Mysql整合开发

Microsoft Word - Hibernate与Struts2和Spring组合指导.doc

Hive:用Java代码通过JDBC连接Hiveserver

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

使用Spark SQL读取Hive上的数据

如何在 Apache Hive 中解析 Json 数组

Kafka客户端是如何找到 leader 分区的

使用Apache Beam读写HDFS上的文件

Guava学习之CharSequenceReader

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

韶关:神奇丹霞

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下

新・解きながら学ぶJava

EJB-Programming-3.PDF

伊春:醉人林都

使用 XFire 与 Spring 开发 Web Service 2 实现功能与特点 基于 J2EE 平台的 Web Service 服务 开发方便, 配置简单 设计接口 实现服务 配置暴露接口 XFire 将自动生成对应的 wsdl 支持高级详细配置 与 Spring 无缝集成 运行环境 JDK

IoC容器和Dependency Injection模式.doc

nb.PDF

Flink快速上手之Scala API使用

Flink快速上手(QuickStart)

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

<!-- import outer proper

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

在Fedora上部署Hadoop2.2.0伪分布式平台

关林:武圣陵寝

泰山:五岳独尊

Flink on YARN部署快速入门指南

没 有 多 余 的 Contruol 或 Action 了 原 来 Domain 层 被 服 务 层 Service layer 遮 挡, 在 右 边 图 中, 则 Domain 层 直 接 暴 露 给 前 台 了, 没 有 被 遮 挡, 裸 露 了 这 样 一 步 到 位 实 现 领 域 模 型

国内26省市新能源汽车推广规划已出台

res/layout 目录下的 main.xml 源码 : <?xml version="1.0" encoding="utf 8"?> <TabHost android:layout_height="fill_parent" xml

前言 C# C# C# C C# C# C# C# C# microservices C# More Effective C# More Effective C# C# C# C# Effective C# 50 C# C# 7 Effective vii

北戴河:海阔天空

untitled

OOP with Java 通知 Project 4: 4 月 18 日晚 9 点 关于抄袭 没有分数

六种使用Linux命令发送带附件的邮件

使用Apache Spark将数据写入ElasticSearch

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

SparkR(R on Spark)编程指南

Hadoop元数据合并异常及解决方法


Ubuntu和CentOS如何配置SSH使得无密码登陆

西岭雪山滑雪场

Microsoft Word - ch04三校.doc

本章学习目标 小风 Java 实战系列教程 SpringMVC 简介 SpringMVC 的入门案例 SpringMVC 流程分析 配置注解映射器和适配器 注解的使用 使用不同方式的跳转页面 1. SpringMVC 简介 Spring web mvc

将 MD5 的工具类拷贝到项目中 二 微服务模块的搭建 我们将权限的查询放到一个单独的模块中, 这个模块提供接口供给消费者远程调用 (RPC), 这次范例是微服开发的雏形, 在以后你使用 springcloud 的时候会使用到今天的概念 1 使用 maven 创建新的模块 (microboot-sh

1 Framework.NET Framework Microsoft Windows.NET Framework.NET Framework NOTE.NET NET Framework.NET Framework 2.0 ( 3 ).NET Framework 2.0.NET F


EJB-Programming-4-cn.doc

「西醫基層總額支付委員會《第28次委員會議紀錄

案例分享产品文档

Spark 2.0介绍:在Spark SQL中定义查询优化规则

untitled

主程式 : public class Main3Activity extends AppCompatActivity { ListView listview; // 先整理資料來源,listitem.xml 需要傳入三種資料 : 圖片 狗狗名字 狗狗生日 // 狗狗圖片 int[] pic =new

( 总 第 1073 期 ) 浙 江 省 人 民 政 府 主 办 2015 年 3 月 17 日 出 版 省 政 府 令 省 政 府 文 件 目 录 浙 江 省 大 型 群 众 性 活 动 安 全 管 理 办 法 ( 浙 江 省 人 民 政 府 令 第 333 号 ) (3) 浙 江 省 人 民 政

使 用 Java 语 言 模 拟 保 险 箱 容 量 门 板 厚 度 箱 体 厚 度 属 性 锁 具 类 型 开 保 险 箱 关 保 险 箱 动 作 存 取 款

《大话设计模式》第一章

電機工程系認可證照清單 /7/1

大连软~1

江门:中国第一侨乡

Hadoop 2.2.0安装和配置lzo

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

1.5招募说明书(草案)

目 录 第 一 部 分 档 案 局 概 况 一 主 要 职 责 二 部 门 决 算 单 位 构 成 第 二 部 分 档 案 局 2016 年 度 部 门 预 算 表 一 2016 年 度 市 级 部 门 收 支 预 算 总 表 二 2016 年 度 市 级 部 门 支 出 预 算 表 三 2016

2015 年 度 收 入 支 出 决 算 总 表 单 位 名 称 : 北 京 市 朝 阳 区 卫 生 局 单 位 : 万 元 收 入 支 出 项 目 决 算 数 项 目 ( 按 功 能 分 类 ) 决 算 数 一 财 政 拨 款 一 一 般 公 共 服 务 支 出 二

エスポラージュ株式会社 住所 : 東京都江東区大島 東急ドエルアルス大島 HP: ******************* * 关于 Java 测试试题 ******

三种方法实现Hadoop(MapReduce)全局排序(2)

Hive几种数据导入方式

untitled

将 MySQL 的全量数据导入到 Apache Solr 中

基于ECO的UML模型驱动的数据库应用开发1.doc

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

<4D F736F F D A67EAF64BEC7BCFABEC7AAF7C2B2B3B95FA5FEB3A1AAA95F2D31312E31362E646F63>

得 依 法 召 集 股 東 臨 時 會 第 十 一 條 : 股 東 常 會 之 召 集 應 於 開 會 三 十 日 前, 股 東 臨 時 會 之 召 集 應 於 開 會 十 五 日 前, 將 開 會 日 期 地 點 及 召 集 事 由 通 知 各 股 東 並 公 告 之 第 十 二 條 : 本 公

同 時, 那 些 百 萬 富 翁 們 正 乘 坐 着 私 家 噴 射 機 駛 往 歐 洲, 甘 願 花 大 把 的 鈔 票 接 受 替 代 療 法 並 且 重 獲 了 健 康 替 代 療 法 總 是 很 靈 嗎? 不, 當 然 不 是 在 這 世 界 上 没 有 盡 善 盡 美 的 事 物 但 是

高校发展动态

杭师大党字〔2011〕15号中共杭州师范大学委员会关于进一步加强和改进发展党员工作的意见

<4D F736F F D B2C431A6B8A4A4A4DFA8C6B0C8B77CC4B3ACF6BFFD E646F63>

untitled

OOP with Java 通知 Project 3: 3 月 29 日晚 9 点 4 月 1 日上课

D getinitparameternames() 9 下 列 选 项 中, 属 于 Servlet API 中 提 供 的 request 对 象 的 包 装 类 的 是 ( ) A HttpServletRequestWrapper B HttpServletRequest C HttpServ

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

Microsoft Word - 扉页.doc

untitled

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

计算机网络与经济(二).doc

教 务 信 息 教 学 日 常 运 行 工 作 4 月 6 日, 教 务 部 高 丽 琴 老 师 参 加 南 昌 市 教 育 局 2016 年 中 小 学 教 师 资 格 认 定 工 作 培 训 会 2016 年 教 师 资 格 认 定 工 作 是 从 省 考 向 国 考 过 渡 后 的 第 一 次

Microsoft PowerPoint ?????????3 [Compatibility Mode]

Transcription:

在某些情况下, 我们可能会在 Spring 中将一些 WEB 上的信息发送到 Kafka 中, 这时候我们就需要在 Spring 中编写 Producer 相关的代码了 ; 不过高兴的是,Spring 本身提供了操作 Kafka 的相关类库, 我们可以直接通过 xml 文件配置然后直接在后端的代码中使用 Kafka, 非常地方便 本文将介绍如果在 Spring 中将消息发送到 Kafka 在这之前, 请将下面的依赖加入到你的 pom.xml 文件中 : <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-core</artifactid> <version>4.1.0.release</version> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-kafka</artifactid> <version>1.0.0.m2</version> </dependency> 在 Spring 中将消息发送到 Kafka 需要我们定义一个配置文件,Spring 为我们提供了 Outbound Channel Adapter, 其主要将消息从 Spring 框架发送到 Kafka, 我们需要在 xml 文件中配置 intkafka:outbound-channel-adapter 标签, 这样可以使得 Spring 可以抽取到消息的 Key 目标主题 分区等信息 本文将介绍如何把用户注册的信息发送到 Kafka 这里用到的配置文件如下 : <int:publish-subscribe-channel id="inputtokafka"/> <int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaproducercontext" auto-startup="true" channel="inputtokafka" order="1"> </int-kafka:outbound-channel-adapter> 1 / 6

<int-kafka:producer-context id="kafkaproducercontext" producer-properties="producerproperties"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="www.iteblog.com:9092" key-class-type="java.lang.string" value-class-type="com.iteblog.dao.user" topic="user" compression-codec="none"/> </int-kafka:producer-configurations> </int-kafka:producer-context> <bean id="producerproperties" class="org.springframework.beans.factory.config.propertiesfactorybean"> <property name="properties"> <props> <prop key="topic.metadata.refresh.interval.ms">3600000</prop> <prop key="message.send.max.retries">5</prop> <prop key="send.buffer.bytes">5242880</prop> </props> </property> </bean> int:publish-subscribe-channel 标签定义了消息发送的通道 ;int-kafka:producer-context 标签里面可以定义 producer 的 context, 在里面我们可以设置 broker 的地址,key 和 value 的类型, 需要发送消息的目标 Topic 等相关属性 ; 我们可以在自定义的 bean 中定义 Kafka Producer 的相关属性, 本文对应的是 producerproperties, 这里我们定义了 topic.metadata.refresh.interval.ms 等相关属性, 更多的属性可以参见 Kafka 的官方文档 然后可以在 int-int-kafka:producer-context 标签通过 producer-properties 来引用 配置文件设置好之后, 我们就可以编写 Java 代码了 : package com.iteblog.controller; import com.iteblog.dao.user; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.messaging.messagechannel; import org.springframework.messaging.support.messagebuilder; import org.springframework.stereotype.controller; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.requestmethod; import org.springframework.web.bind.annotation.responsebody; 2 / 6

/** * Created by https://www.iteblog.com on 2016/10/21. */ @Controller public class IteblogController { @Autowired @Qualifier("inputToKafka") MessageChannel channel; private static final Log logger = LogFactory.getLog(IteblogController.class); @RequestMapping(method = RequestMethod.POST, value = "/register") @ResponseBody public User registerjson(user user) { logger.warn("user: " + user); channel.send(messagebuilder.withpayload(user).build()); return user; withpayload 接受的就是消息的内容 ;MessageChannel 就是发送消息的通道, 所有的消息都是通过这个通道发送到 Kafka;User 类的代码如下 : package com.iteblog.dao; /** * Created by https://www.iteblog.com on 2016/10/21. */ public class User { private String username; private String email; public User() { public User(String username, String email) { this.username = username; this.email = email; public String getusername() { 3 / 6

return username; public void setusername(string username) { this.username = username; public String getemail() { return email; public void setemail(string email) { this.email = email; @Override public String tostring() { return "User{" + "username='" + username + '\'' + ", email='" + email + '\'' + ''; 在发送消息的时候, 我们还可以设置消息的 Key, 以及需要发送的 Topic, 如下 : channel.send(messagebuilder.withpayload(user).setheader("topic", "user").setheader("messagekey", user.getusername()).build()); 这里通过设置 topic messagekey 等 Header 信息, 来分别制定目标主题的名称 ( 当然, 如果只有一个主题我们不需要手动指定,Spring 会自定选择配置文件里面指定的 Topic; 如果有多个需要手动指定 ) 和 key 的值 我们还可以设置消息的 Key 和 Value 编码格式, 如下 : <bean id="userencoder" class="org.springframework.integration.kafka.serializer.avro.avroreflectdatumbackedkafk aencoder"> <constructor-arg value="com.iteblog.dao.user"></constructor> </bean> 4 / 6

<bean id="keyencoder" class="org.springframework.integration.kafka.serializer.avro.avroreflectdatumbackedkafk aencoder"> <constructor-arg value="java.lang.string"></constructor> </bean> 然后在配置文件里面配置 : <int-kafka:producer-context id="kafkaproducercontext" producer-properties="producerproperties"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="www.iteblog.com:9092" key-class-type="java.lang.string" value-class-type="com.iteblog.dao.user" value-encoder="userencoder" key-encoder="keyencoder" topic="user" compression-codec="none"/> </int-kafka:producer-configurations> </int-kafka:producer-context> 如果我们得 Topic 有多个分区, 我们还可以指定每条消息的分区 ID 计算规则, 如下 : <bean id="iteblogpartitioner" class="org.springframework.integration.kafka.support.defaultpa rtitioner"/> 我们使用了 Spring 默认分区类, 也就是计算 Key 的 hashcode 再对分区数求模 (Utils.abs(key.hashC ode()) % numpartitions), 然后我们可以在 int-kafka:producer-configuration 里面加上以下配置 partitioner="iteblogpartitioner" 到这里, 我们就可以在 Tomcat 中启动上面的 Web 工程, 然后访问 https://www.iteblog.com/regist er 并传入 username 和 email 参数即可将消息发送到 Kafka, 完整的工程目录结构如下 :. 5 / 6

Powered by TCPDF (www.tcpdf.org) pom.xml src main java com iteblog controller IteblogController.java dao User.java resources applicationcontext.xml kafka-outbound-context.xml log4j.properties webapp index.jsp register.html WEB-INF web.xml 10 directories, 9 files 我将在后面的文章介绍如何在 Spring 中接收 Kafka 的消息, 敬请关注 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 6 / 6