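Flume-ng and MySQL Integration

As we know, Flume can be integrated with many systems, including Hadoop, Spark, Kafka, HBase and so on. Naturally, the powerful Flume can also be integrated with MySQL to store the processed logs there (you can of course also store them in PostgreSQL, Oracle or another relational database). I do want to say a bit more here, though: Flume is a system for collecting logs in a distributed way, and if it is distributed, the data volume is presumably large. So why would you store the data Flume produces in MySQL? Could you leave your use case in the comments below?

If you want to keep up with articles on Spark, Hadoop or HBase, feel free to follow the WeChat public account: iteblog_hadoop

Integrating Flume with MySQL is actually quite simple. The code is as follows:

package com.iteblog.flume;

/**
 * User: 过往记忆
 * Date: 14-9-4
 * Time: 13:16
 * blog: https://www.iteblog.com
 * Article URL: https://www.iteblog.com/archives/1109
 * 过往记忆 blog: focused on hadoop, hive, spark, shark and flume, with plenty of practical content
 * 过往记忆 WeChat public account: iteblog_hadoop
 */

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;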
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    @Override
    public void configure(Context context) {
        // The keys must match the property names used in the agent configuration file
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databasename");
        Preconditions.checkNotNull(databaseName, "databasename must be set!!");
        tableName = context.getString("tablename");
        Preconditions.checkNotNull(tableName, "tablename must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchsize", 100);
        // checkArgument, not checkNotNull: a boxed boolean is never null,
        // so checkNotNull would let a non-positive batch size through
        Preconditions.checkArgument(batchSize > 0, "batchsize must be a positive number!!");
    }
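
    @Override
    public void start() {
        super.start();
        try {
            // Load the MySQL JDBC driver with Class.forName()
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            LOG.error("MySQL JDBC driver not found on the classpath", e);
        }

        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        try {
            // Obtain a Connection object from DriverManager.getConnection()
            conn = DriverManager.getConnection(url, user, password);
            // Disable auto-commit so that each batch is committed explicitly in process()
            conn.setAutoCommit(false);
            // Create a PreparedStatement object for the insert
            preparedStatement = conn.prepareStatement("insert into " + tableName + " (content) values (?)");
        } catch (SQLException e) {
            LOG.error("Failed to connect to MySQL at " + url, e);
            System.exit(1);
        }
    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.error("Failed to close the PreparedStatement", e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.error("Failed to close the MySQL connection", e);
            }
        }
    }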

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<String> actions = Lists.newArrayList();
        transaction.begin();
        try {
            // Take up to batchSize events; an empty channel ends the batch early
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {
                    // Decode explicitly as UTF-8 (assuming UTF-8 log data) instead of the platform default
                    content = new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    actions.add(content);
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }

            if (actions.size() > 0) {
                preparedStatement.clearBatch();
                for (String temp : actions) {
                    preparedStatement.setString(1, temp);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();

                conn.commit();
            }
            transaction.commit();
        } catch (Throwable e) {
            try {
                // Also discard rows from a partially executed JDBC batch;
                // otherwise the next conn.commit() would persist them
                conn.rollback();
            } catch (SQLException e3) {
                LOG.error("Failed to roll back the MySQL transaction.", e3);
            }
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been " +
                        "successful.", e2);
            }
            LOG.error("Failed to commit transaction. " +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }

        return result;
    }
}

Dependencies in the pom file:

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Before running the program, first create a table in MySQL:

mysql> create table mysqltest(
    -> id int(11) NOT NULL AUTO_INCREMENT,
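    -> content varchar(50000) NOT NULL,
    -> PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected, 1 warning (0.05 sec)

(The warning most likely comes from varchar(50000): a utf8 character can take 3 bytes, which puts the column over MySQL's 65,535-byte limit, so MySQL converts it to a TEXT type. Declaring content as MEDIUMTEXT up front avoids the warning.)

Then create the following configuration in Flume. Only the sink section is shown; the channel c1 it references must be defined in the same agent, together with a source that feeds it:

# User: 过往记忆
# Date: 14-9-4
# Time: 13:16
# blog: https://www.iteblog.com
# Article URL: https://www.iteblog.com/archives/1109
# 过往记忆 blog: focused on hadoop, hive, spark, shark and flume, with plenty of practical content
# 过往记忆 WeChat public account: iteblog_hadoop
agent.sinks.mysqlsink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlsink.hostname=localhost
agent.sinks.mysqlsink.port=3306
agent.sinks.mysqlsink.databasename=ngmonitor
agent.sinks.mysqlsink.tablename=mysqltest
agent.sinks.mysqlsink.user=root
agent.sinks.mysqlsink.password=123456
agent.sinks.mysqlsink.channel = c1

Make sure the compiled MysqlSink jar and the MySQL JDBC driver jar are on Flume's classpath (for example in its lib/ directory), then start the agent with the following command:

bin/flume-ng agent -c conf/ -f conf/mysql_test.conf -n agent

Now check what arrived in MySQL:

mysql> select count(*) from mysqltest;
+----------+
| count(*) |
+----------+
|    98300 |
+----------+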
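To see how process() turns the channel's contents into rows like the ones counted above, here is a minimal Flume-free sketch of its batching loop. It assumes a plain java.util.Queue<byte[]> standing in for the channel (poll() returning null plays the role of channel.take() returning null), and BatchDrainSketch and Outcome are illustrative names, not Flume classes. Each call takes at most batchSize events, decodes them as UTF-8, and returns BACKOFF as soon as the queue runs dry, which is the signal that makes Flume's sink runner pause before polling again.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Flume-free model of MysqlSink.process(): a Queue<byte[]> stands in for the
// channel and the returned list stands in for the JDBC batch. The Status values
// mirror the READY/BACKOFF constants of Flume's Sink.Status.
class BatchDrainSketch {
    enum Status { READY, BACKOFF }

    static final class Outcome {
        final Status status;
        final List<String> rows;

        Outcome(Status status, List<String> rows) {
            this.status = status;
            this.rows = rows;
        }
    }

    // Take at most batchSize events; stop early and report BACKOFF as soon as
    // the queue is empty, like the for-loop in process().
    static Outcome drain(Queue<byte[]> channel, int batchSize) {
        Status status = Status.READY;
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            byte[] body = channel.poll(); // null when empty, like channel.take()
            if (body == null) {
                status = Status.BACKOFF;
                break;
            }
            // Decode explicitly as UTF-8 rather than with the platform default
            rows.add(new String(body, StandardCharsets.UTF_8));
        }
        return new Outcome(status, rows);
    }

    public static void main(String[] args) {
        Queue<byte[]> channel = new ArrayDeque<>();
        for (int i = 0; i < 250; i++) {
            channel.add(("log line " + i).getBytes(StandardCharsets.UTF_8));
        }
        // Call drain() repeatedly, as Flume's sink runner keeps calling process()
        // until it gets BACKOFF.
        Outcome outcome;
        do {
            outcome = drain(channel, 100);
            System.out.println(outcome.rows.size() + " rows, " + outcome.status);
        } while (outcome.status == Status.READY);
    }
}
```

With 250 queued events, main prints "100 rows, READY" twice and then "50 rows, BACKOFF". In the real sink each non-empty batch becomes one executeBatch() plus one commit, so a larger batchsize means fewer MySQL round trips per event.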
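That's it, the development is done! The program above can still be improved: it could be written with MyBatis, separating the Flume handling logic from the business logic. Next time you would then only need to deal with the business side and would not have to think about the Flume part at all, which greatly reduces the programming effort. I won't go into how to develop that here; if you need it, let's discuss it offline.

Unless otherwise stated, all articles on this blog are original! When reposting, please credit: reposted from 过往记忆 (https://www.iteblog.com/)
Article link: ()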
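The post does not show the MyBatis-based refactoring mentioned above, and the sketch below does not use MyBatis either. It is only a minimal plain-Java illustration, under that assumption, of the separation the author describes: the sink-side code talks to a small interface, and all parsing and storage lives behind it. RowWriter, InMemoryRowWriter and DelegatingSinkCore are hypothetical names; in a real sink, deliver() would sit where MysqlSink.process() currently builds the JDBC batch, and a MyBatis- or JDBC-backed RowWriter would replace the in-memory one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical business-side interface: the sink hands it one batch of event
// bodies. Not a Flume or MyBatis API. Throwing signals the caller to roll back.
interface RowWriter {
    void writeBatch(List<String> rows) throws Exception;
}

// Stand-in for a MyBatis- or JDBC-backed writer. It keeps rows in memory and
// applies a trivial piece of "business logic": trim each line, drop blank ones.
class InMemoryRowWriter implements RowWriter {
    final List<String> stored = new ArrayList<>();

    @Override
    public void writeBatch(List<String> rows) {
        for (String row : rows) {
            String cleaned = row.trim();
            if (!cleaned.isEmpty()) {
                stored.add(cleaned);
            }
        }
    }
}

// Sink-side code that only knows the interface. The counters mark where
// transaction.commit() and transaction.rollback() would be called.
class DelegatingSinkCore {
    private final RowWriter writer;
    int commits = 0;
    int rollbacks = 0;

    DelegatingSinkCore(RowWriter writer) {
        this.writer = writer;
    }

    // Returns true if the batch was written (commit), false if it failed (rollback).
    boolean deliver(List<String> batch) {
        try {
            writer.writeBatch(batch);
            commits++;
            return true;
        } catch (Exception e) {
            rollbacks++;
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryRowWriter store = new InMemoryRowWriter();
        DelegatingSinkCore core = new DelegatingSinkCore(store);
        core.deliver(Arrays.asList(" GET /index.html ", "", "POST /login"));
        System.out.println(store.stored); // [GET /index.html, POST /login]

        // A writer that always fails: the sink side just records a rollback.
        DelegatingSinkCore failing = new DelegatingSinkCore(rows -> {
            throw new Exception("database unavailable");
        });
        System.out.println(failing.deliver(Arrays.asList("x")) + " " + failing.rollbacks); // false 1
    }
}
```

With this split, changing how rows are parsed or where they are stored means writing a new RowWriter, while the Flume transaction handling stays untouched.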