Maxwell: Sending the MySQL binlog to Hadoop (Kafka + Flume)
I came across a project that can send the MySQL binlog to Hadoop: maxwell.
Installing and deploying Maxwell
It is distributed as a binary; just follow the QuickStart in the official documentation.
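For reference, the MySQL side of the QuickStart amounts to enabling row-based binlog and creating a user for Maxwell; a minimal sketch (user name, password, and server_id are placeholders matching the example below, see the QuickStart for the authoritative steps):
# my.cnf: Maxwell requires row-based binlog
[mysqld]
server_id=1
log-bin=master
binlog_format=row
-- MySQL: Maxwell keeps its own state in the maxwell schema and needs replication privileges
CREATE USER 'aaaa'@'%' IDENTIFIED BY 'bbbb';
GRANT ALL ON maxwell.* TO 'aaaa'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'aaaa'@'%';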
Starting Maxwell
bin/maxwell --user='aaaa' --password='bbbb' --host='xxx.xxx.xxx.xxx' --producer=kafka --kafka.bootstrap.servers=192.168.11.211:9092,192.168.11.212:9092,192.168.11.62:9092
Here, aaaa and bbbb are the MySQL user and password set up in the QuickStart, and the kafka.bootstrap.servers parameter lists all the Kafka broker instances.
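Maxwell publishes to a Kafka topic named maxwell by default (the --kafka_topic option changes this). If your brokers have topic auto-creation disabled, create the topic before starting; a sketch using the same CDH Kafka path as the commands below, with partition and replication counts chosen as an assumption:
# /opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p0.5/lib/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 1 --replication-factor 1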
Checking with the Kafka command-line tools
First, use the Kafka command-line tools to check whether the binlog events have made it to the Kafka topic.
# /opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p0.5/lib/kafka/bin/kafka-topics.sh -zookeeper=127.0.0.1:2181 -describe -topic maxwell
Topic:maxwell PartitionCount:1 ReplicationFactor:1 Configs:
Topic: maxwell Partition: 0 Leader: 1002 Replicas: 1002 Isr: 1002
Using the console consumer script
# /opt/cloudera/parcels/KAFKA-2.0.2-1.2.0.2.p0.5/lib/kafka/bin/kafka-console-consumer.sh -zookeeper=127.0.0.1:2181 --from-beginning --topic maxwell
## output similar to the following
{"database":"test3","table":"person","type":"insert","ts":1482915438,"xid":2881,"commit":true,"data":{"number":13,"name":"test13","birthday":"1992-11-30"}}
That confirms the Kafka side is ready.
Flume configuration
Based on Cloudera's documentation (my environment is CDH-5.3.9 with Flume 1.5.0; note that newer versions use a configuration different from the one shown here).
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
tier1.sources.source1.topic = weblogs
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
The settings you need to change are the source's zookeeperConnect and topic; if necessary, you can also adjust sink settings such as hdfs.path.
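For the setup above, that means pointing the source at the local ZooKeeper and the maxwell topic; the lines I would change look roughly like this (values assume the environment described earlier):
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = maxwell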
Verifying that data is written to Hadoop
Insert a row into MySQL, then check Hadoop:
> use test3
> insert into person values(14, "test14", "2016-12-29");
# sudo -u hdfs hadoop fs -ls /tmp/kafka/maxwell/16-12-28/
Found 2 items
-rw-r--r-- 2 flume supergroup 156 2016-12-28 16:57 /tmp/kafka/maxwell/16-12-28/FlumeData.1482915441917
-rw-r--r-- 2 flume supergroup 156 2016-12-28 18:47 /tmp/kafka/maxwell/16-12-28/FlumeData.1482922069309
# sudo -u hdfs hadoop fs -cat /tmp/kafka/maxwell/16-12-28/FlumeData.1482922069309
{"database":"test3","table":"person","type":"insert","ts":1482922063,"xid":3121,"commit":true,"data":{"number":14,"name":"test14","birthday":"2016-12-29"}}
Have FUN
Postscript
I ran into one problem: after setting up maxwell again on another machine, every send from maxwell to Kafka ended in a TIMEOUT. To investigate, I started maxwell with --log_level=debug and found that maxwell was connecting to Kafka by hostname, and that hostname was an internal one which failed to resolve. Adding it to /etc/hosts solved the problem.
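For anyone hitting the same issue, the fix is just an /etc/hosts entry mapping the broker's advertised hostname to its IP, something like the following (the hostname here is hypothetical):
# /etc/hosts
192.168.11.211  kafka01.internal.example.com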