Flume-ng与Mysql整合开发

Similar documents
Spark读取Hbase中的数据

Apache CarbonData集群模式使用指南

Hive:用Java代码通过JDBC连接Hiveserver

使用Cassandra和Spark 2.0实现Rest API服务

"+handlermethod.getbean().getclass().getname()); public void aftercompletion(httpservletrequest req, HttpServletResponse resp, Object handler, Excepti

通过Hive将数据写入到ElasticSearch

Kafka客户端是如何找到 leader 分区的

使用MapReduce读取XML文件

Guava学习之Resources

在Spring中使用Kafka:Producer篇

三种方法实现Hadoop(MapReduce)全局排序(1)

Hadoop元数据合并异常及解决方法

SparkR(R on Spark)编程指南

韶关:神奇丹霞

使用Spark SQL读取Hive上的数据

Microsoft Word - 01.DOC

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

EJB-Programming-4-cn.doc

六种使用Linux命令发送带附件的邮件

自定义Spark Streaming接收器(Receivers)

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

untitled

JBuilder Weblogic

将 MySQL 的全量数据导入到 Apache Solr 中

ASP.NET实现下拉框二级联动组件

untitled

untitled

FileMaker 15 ODBC 和 JDBC 指南

FileMaker 16 ODBC 和 JDBC 指南

Java

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

一步一步教你搞网站同步镜像!|动易Cms

untitled

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下


案例分享产品文档

JavaIO.PDF

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

伊春:醉人林都

Spark作业代码(源码)IDE远程调试

1-1 database columnrow record field 不 DBMS Access Paradox SQL Server Linux MySQL Oracle IBM Informix IBM DB2 Sybase 1-2

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

江门:中国第一侨乡

将 MD5 的工具类拷贝到项目中 二 微服务模块的搭建 我们将权限的查询放到一个单独的模块中, 这个模块提供接口供给消费者远程调用 (RPC), 这次范例是微服开发的雏形, 在以后你使用 springcloud 的时候会使用到今天的概念 1 使用 maven 创建新的模块 (microboot-sh

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

chp6.ppt

untitled

HBase 中加盐(Salting)之后的表如何读取:协处理器篇

PowerPoint Presentation

《大话设计模式》第一章

关林:武圣陵寝

泰山:五岳独尊

Chapter 9: Objects and Classes

基于ECO的UML模型驱动的数据库应用开发1.doc

Apache Spark 2.4 新增内置函数和高阶函数使用介绍

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

国内26省市新能源汽车推广规划已出台

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

北戴河:海阔天空

SQL Server SQL Server SQL Mail Windows NT

三种方法实现Hadoop(MapReduce)全局排序(2)

SA-DK2-U3Rユーザーズマニュアル

untitled

Oracle高级复制冲突解决机制的研究

威 福 髮 藝 店 桃 園 市 蘆 竹 區 中 山 里 福 祿 一 街 48 號 地 下 一 樓 50,000 獨 資 李 依 純 105/04/06 府 經 登 字 第 號 宏 品 餐 飲 桃 園 市 桃 園 區 信 光 里 民

回滚段探究

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

ebook 96-16


西岭雪山滑雪场

Ubuntu和CentOS如何配置SSH使得无密码登陆

在Fedora上部署Hadoop2.2.0伪分布式平台

untitled

D getinitparameternames() 9 下 列 选 项 中, 属 于 Servlet API 中 提 供 的 request 对 象 的 包 装 类 的 是 ( ) A HttpServletRequestWrapper B HttpServletRequest C HttpServ

Flink快速上手(QuickStart)

Hive几种数据导入方式

學 科 100% ( 為 單 複 選 題, 每 題 2.5 分, 共 100 分 ) 1. 請 參 閱 附 圖 作 答 : (A) 選 項 A (B) 選 項 B (C) 選 項 C (D) 選 項 D Ans:D 2. 下 列 對 於 資 料 庫 正 規 化 (Normalization) 的 敘

Oracle 4

6-1 Table Column Data Type Row Record 1. DBMS 2. DBMS MySQL Microsoft Access SQL Server Oracle 3. ODBC SQL 1. Structured Query Language 2. IBM

用手機直接傳值不透過網頁連接, 來當作搖控器控制家電 ( 電視遙控器 ) 按下按鍵發送同時會回傳值來確定是否有送出 問題 :1. 應該是使用了太多 thread 導致在傳值上有問題 2. 一次按很多次按鈕沒辦法即時反應

EJB-Programming-3.PDF

RunPC2_.doc

Microsoft Word - Learn Objective-C.doc

Spark1.4中DataFrame功能加强,新增科学和数学函数

untitled

21 个你应该知道的 wget 命令

RUN_PC連載_12_.doc

Microsoft Word - MIS.doc

Get Started产品文档

4.1 AMI MQSeries API AMI MQI AMI / / AMI JavaC C++ AMI / AMI AMI - / /

Microsoft PowerPoint - ch6 [相容模式]

Microsoft PowerPoint 網站.ppt

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

untitled

建模与图形思考

Fun Time (1) What happens in memory? 1 i n t i ; 2 s h o r t j ; 3 double k ; 4 char c = a ; 5 i = 3; j = 2; 6 k = i j ; H.-T. Lin (NTU CSIE) Referenc

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

PowerPoint 演示文稿

PowerPoint 演示文稿

Transcription:

Flume-ng 与 Mysql 整合开发 我们知道,Flume 可以和许多的系统进行整合, 包括了 Hadoop Spark Kafka Hbase 等等 ; 当然, 强悍的 Flume 也是可以和 Mysql 进行整合, 将分析好的日志存储到 Mysql( 当然, 你也可以存放到 pg oracle 等等关系型数据库 ) 不过我这里想多说一些 :Flume 是分布式收集日志的系统 ; 既然都分布式了, 数据量应该很大, 为什么你要将 Flume 分析出来的数据用 Mysql 进行储存? 能否在下面评论处留下你的使用场景呢? 如果想及时了解 Spark Hadoop 或者 Hbase 相关的文章, 欢迎关注微信公共帐号 :iteblog_hadoop 其实,Flume 和 Mysql 进行整合开发的过程也是相当的简单的 代码如下 : package com.iteblog.flume; /** * User: 过往记忆 * Date: 14-9-4 * Time: 下午 13:16 * bolg: https://www.iteblog.com * 本文地址 :https://www.iteblog.com/archives/1109 * 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 * 过往记忆博客微信公共帐号 :iteblog_hadoop */ import com.google.common.base.preconditions; import com.google.common.base.throwables; import com.google.common.collect.lists; import org.apache.flume.*; import org.apache.flume.conf.configurable; 1 / 7

import org.apache.flume.sink.abstractsink; import org.slf4j.logger; import org.slf4j.loggerfactory; import java.sql.connection; import java.sql.drivermanager; import java.sql.preparedstatement; import java.sql.sqlexception; import java.util.list; public class MysqlSink extends AbstractSink implements Configurable { private Logger LOG = LoggerFactory.getLogger(MysqlSink.class); private String hostname; private String port; private String databasename; private String tablename; private String user; private String password; private PreparedStatement preparedstatement; private Connection conn; private int batchsize; public MysqlSink() { LOG.info("MysqlSink start..."); public void configure(context context) { hostname = context.getstring("hostname"); Preconditions.checkNotNull(hostname, "hostname must be set!!"); port = context.getstring("port"); Preconditions.checkNotNull(port, "port must be set!!"); databasename = context.getstring("databasename"); Preconditions.checkNotNull(databaseName, "databasename must be set!!"); tablename = context.getstring("tablename"); Preconditions.checkNotNull(tableName, "tablename must be set!!"); user = context.getstring("user"); Preconditions.checkNotNull(user, "user must be set!!"); password = context.getstring("password"); Preconditions.checkNotNull(password, "password must be set!!"); batchsize = context.getinteger("batchsize", 100); Preconditions.checkNotNull(batchSize > 0, "batchsize must be a positive number!!"); 2 / 7

public void start() { super.start(); // 调用 Class.forName() 方法加载驱动程序 Class.forName("com.mysql.jdbc.Driver"); catch (ClassNotFoundException e) { String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databasename; // 调用 DriverManager 对象的 getconnection() 方法, 获得一个 Connection 对象 conn = DriverManager.getConnection(url, user, password); conn.setautocommit(false); // 创建一个 Statement 对象 preparedstatement = conn.preparestatement("insert into " + tablename + " (content) values (?)"); catch (SQLException e) { System.exit(1); public void stop() { super.stop(); if (preparedstatement!= null) { preparedstatement.close(); catch (SQLException e) { if (conn!= null) { conn.close(); catch (SQLException e) { 3 / 7

public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getchannel(); Transaction transaction = channel.gettransaction(); Event event; String content; List<String> actions = Lists.newArrayList(); transaction.begin(); for (int i = 0; i < batchsize; i++) { event = channel.take(); if (event!= null) { content = new String(event.getBody()); actions.add(content); else { result = Status.BACKOFF; break; if (actions.size() > 0) { preparedstatement.clearbatch(); for (String temp : actions) { preparedstatement.setstring(1, temp); preparedstatement.addbatch(); preparedstatement.executebatch(); conn.commit(); transaction.commit(); catch (Throwable e) { transaction.rollback(); catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); finally { transaction.close(); 4 / 7

return result; pom 文件中的依赖 : <dependencies> <groupid>org.apache.flume</groupid> <artifactid>flume-ng-core</artifactid> <groupid>org.apache.flume</groupid> <artifactid>flume-ng-configuration</artifactid> <groupid>mysql</groupid> <artifactid>mysql-connector-java</artifactid> <version>5.1.25</version> <groupid>org.slf4j</groupid> <artifactid>slf4j-api</artifactid> <groupid>org.slf4j</groupid> <artifactid>slf4j-log4j12</artifactid> <scope>test</scope> </dependencies> 运行程序时, 先在 Mysql 中创建一个表 mysql> create table mysqltest( -> id int(11) NOT NULL AUTO_INCREMENT, 5 / 7

-> content varchar(50000) NOT NULL, -> PRIMARY KEY (`id`) -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8; Query OK, 0 rows affected, 1 warning (0.05 sec) 然后在 flume 中创建以下配置 # User: 过往记忆 # Date: 14-9-4 # Time: 下午 13:16 # bolg: https://www.iteblog.com # 本文地址 :https://www.iteblog.com/archives/1109 # 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 # 过往记忆博客微信公共帐号 :iteblog_hadoop agent.sinks.mysqlsink.type = com.iteblog.flume.mysqlsink agent.sinks.mysqlsink.hostname=localhost agent.sinks.mysqlsink.port=3306 agent.sinks.mysqlsink.databasename=ngmonitor agent.sinks.mysqlsink.tablename=mysqltest agent.sinks.mysqlsink.user=root agent.sinks.mysqlsink.password=123456 agent.sinks.mysqlsink.channel = c1 用下面的命令就可以启动 : bin/flume-ng agent -c conf/ -f conf/mysql_test.conf -n agent 再看下 Mysql 中的情况 : mysql> select count(*) from mysqltest; +----------+ count(*) +----------+ 98300 +----------+ 6 / 7

Powered by TCPDF (www.tcpdf.org) Flume-ng 与 Mysql 整合开发 好了, 开发完成! 上面的程序还可以改进, 可以用 Mybatis 进行编写, 将 Flume 处理逻辑和业务的处理逻辑分离开, 这样下次只需要处理业务,Flume 那块都不需要我们去考虑了, 大大降低了编程的难度 具体怎么开发我就不说了, 有需要请线下讨论 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 7 / 7