-- Setup script for the Flume MySQL source demo.
-- NOTE: MySQL only treats "--" as a comment when it is followed by whitespace,
-- and a line comment runs to end of line, so every statement lives on its own line(s).

-- Create the database that holds both the tracked table and the offset metadata.
CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8;

-- Metadata table: one row per pulled table, recording the current pull position.
-- NOTE(review): the column names (source_tab, currentIndex) are presumably referenced
-- by the Java helper that reads/writes the offset -- do not rename without checking it.
CREATE TABLE mysqlsource.flume_meta (
    source_tab   varchar(255) NOT NULL,
    currentIndex varchar(255) NOT NULL,
    PRIMARY KEY (source_tab)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Seed the offset row for the student table.
INSERT INTO mysqlsource.flume_meta (source_tab, currentIndex)
VALUES ('student', '4');

-- Table whose rows are pulled by the source.
CREATE TABLE mysqlsource.student (
    id   int(11)      NOT NULL AUTO_INCREMENT,
    name varchar(255) NOT NULL,
    PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

-- Test data for the student table.
INSERT INTO mysqlsource.student (id, name)
VALUES
    (1, 'zhangsan'),
    (2, 'lisi'),
    (3, 'wangwu'),
    (4, 'zhaoliu');
package com.shockang.study.bigdata.flume;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.PollableSource;import org.apache.flume.conf.Configurable;import org.apache.flume.event.SimpleEvent;import org.apache.flume.source.AbstractSource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.ParseException;import java.util.ArrayList;import java.util.HashMap;import java.util.List;public class MysqlSource extends AbstractSource implements Configurable, PollableSource { //打印日志 private static final Logger LOG = LoggerFactory.getLogger(MysqlSource.class); //定义sqlHelper private QueryMysql sqlSourceHelper; @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } @Override public void configure(Context context) { //初始化 try { sqlSourceHelper = new QueryMysql(context); } catch (ParseException e) { e.printStackTrace(); } } /** * 接受mysql表中的数据 * * @return * @throws EventDeliveryException */ @Override public PollableSource.Status process() throws EventDeliveryException { try { //查询数据表 List > result = sqlSourceHelper.executeQuery(); //存放event的集合 List events = new ArrayList<>(); //存放event头集合 HashMap header = new HashMap<>(); //如果有返回数据,则将数据封装为event if (!result.isEmpty()) { List allRows = sqlSourceHelper.getAllRows(result); Event event = null; for (String row : allRows) { event = new SimpleEvent(); event.setBody(row.getBytes()); event.setHeaders(header); events.add(event); } //将event写入channel this.getChannelProcessor().processEventBatch(events); //更新数据表中的offset信息 sqlSourceHelper.updateOffset2DB(result.size()); } //等待时长 Thread.sleep(sqlSourceHelper.getRunQueryDelay()); return Status.READY; } catch (InterruptedException e) { LOG.error("Error procesing row", e); return Status.BACKOFF; } } @Override public synchronized void stop() { LOG.info("Stopping sql source {} ...", getName()); try { //关闭资源 sqlSourceHelper.close(); 
} finally { super.stop(); } }}
测试
① 程序打成jar包,上传jar包到flume的lib目录下
② 配置文件准备
vim mysqlsource.conf
# Flume agent definition: custom MySQL source -> memory channel -> logger sink.
# Each property must be on its own line; a "#" comment runs to the end of the line.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.shockang.study.bigdata.flume.MysqlSource
a1.sources.r1.connection.url = jdbc:mysql://node1:3306/mysqlsource
a1.sources.r1.connection.user = root
# NOTE(review): plain-text demo credential; replace before using outside a test environment.
a1.sources.r1.connection.password = 123456
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
a1.sources.r1.start.from=0
# NOTE(review): presumably the pause between queries in milliseconds -- confirm against the source helper.
a1.sources.r1.run.query.delay=3000

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1