CDH5: Flume + Kafka
I recently spent some time learning how to set up Flume and Kafka on CDH5, and this post records the steps.
Adding the services
The CDH version I am currently running is 5.3.9. Flume can be added as a service directly in CDH, which is very straightforward; see the official documentation for details. Kafka takes a bit more work: you first need to download the Kafka service descriptor (CSD) jar:
# Run on the cloudera-scm-server host
mkdir -pv /opt/cloudera/csd
wget -c http://archive.cloudera.com/csds/kafka/KAFKA-1.2.0.jar -O /opt/cloudera/csd/KAFKA-1.2.0.jar
chown -R cloudera-scm:cloudera-scm /opt/cloudera/csd
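Cloudera Manager only picks up a new CSD after the server process itself restarts, so restart it on the same host first; a minimal sketch, assuming a sysvinit-style CDH 5 install:

# Still on the cloudera-scm-server host
service cloudera-scm-server restart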
Then restart the Cloudera Management Service, which can be done from the CDH web UI. After that, Kafka shows up on the Parcels page; distribute and activate the Kafka parcel, then add the service. See this article for the detailed steps.
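Once activated, the parcel is also visible on disk; a quick sanity check, assuming the default parcel root of /opt/cloudera/parcels:

# KAFKA should be a symlink pointing at the active parcel version
ls -l /opt/cloudera/parcels/ | grep -i kafka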
My test cluster has four machines in total: cdh1.mycluster.com is the namenode, and ZooKeeper is installed only on the namenode; cdh[2-4].mycluster.com are the datanodes.
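Once the Kafka service is up, each broker registers itself in ZooKeeper, which makes for a quick health check. A sketch using CDH's zookeeper-client wrapper (the broker ids in the comment are illustrative):

# Lists one ephemeral node per live broker, e.g. [181, 182, 183]
zookeeper-client -server cdh1.mycluster.com:2181 ls /brokers/ids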
Configuring Flume
Edit the Flume configuration file on the configuration page, as follows:
# Agent "tier1": HTTP source -> Kafka channel -> HDFS sink
tier1.sources=src_http_41800
tier1.channels=ch_kafka_hive_table
tier1.sinks=sink_hdfs_hive_table
# HTTP source: accepts JSON-encoded Flume events on port 41800
tier1.sources.src_http_41800.type=http
tier1.sources.src_http_41800.port=41800
tier1.sources.src_http_41800.channels=ch_kafka_hive_table
tier1.sources.src_http_41800.handler=org.apache.flume.source.http.JSONHandler
# Kafka channel: buffers events in the Kafka topic flume_hive_table
tier1.channels.ch_kafka_hive_table.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka_hive_table.capacity=10000
tier1.channels.ch_kafka_hive_table.transactionCapacity=1000
tier1.channels.ch_kafka_hive_table.brokerList=cdh2.mycluster.com:9092,cdh3.mycluster.com:9092,cdh4.mycluster.com:9092
tier1.channels.ch_kafka_hive_table.topic=flume_hive_table
tier1.channels.ch_kafka_hive_table.zookeeperConnect=cdh1.mycluster.com:2181
tier1.channels.ch_kafka_hive_table.kafka.producer.type=async
tier1.channels.ch_kafka_hive_table.kafka.acks=all
tier1.channels.ch_kafka_hive_table.kafka.compression.type=snappy
tier1.channels.ch_kafka_hive_table.kafka.batch.size=16384
tier1.channels.ch_kafka_hive_table.kafka.linger.ms=100
tier1.channels.ch_kafka_hive_table.kafka.max.request.size=1048576
# HDFS sink: writes into the Hive warehouse; the path is built from event headers
tier1.sinks.sink_hdfs_hive_table.type=hdfs
tier1.sinks.sink_hdfs_hive_table.channel=ch_kafka_hive_table
tier1.sinks.sink_hdfs_hive_table.hdfs.path=hdfs://cdh1.mycluster.com/user/hive/warehouse/%{database}.db/%{table}/%{partition}
tier1.sinks.sink_hdfs_hive_table.hdfs.writeFormat=Text
tier1.sinks.sink_hdfs_hive_table.hdfs.fileType=DataStream
tier1.sinks.sink_hdfs_hive_table.hdfs.inUsePrefix=.
tier1.sinks.sink_hdfs_hive_table.hdfs.rollInterval=3600
tier1.sinks.sink_hdfs_hive_table.hdfs.rollSize=0
tier1.sinks.sink_hdfs_hive_table.hdfs.rollCount=0
tier1.sinks.sink_hdfs_hive_table.hdfs.batchSize=1000
tier1.sinks.sink_hdfs_hive_table.hdfs.txnEventMax=1000
tier1.sinks.sink_hdfs_hive_table.hdfs.callTimeout=60000
tier1.sinks.sink_hdfs_hive_table.hdfs.appendTimeout=60000
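If the brokers allow topic auto-creation, the channel will create flume_hive_table on first write, but pre-creating the topic lets you pick partition and replication counts yourself. A sketch using the kafka-topics wrapper from the CDH Kafka parcel (the counts below are illustrative, not from the original setup):

# Run on any host with the Kafka parcel active
kafka-topics --create --zookeeper cdh1.mycluster.com:2181 --topic flume_hive_table --partitions 3 --replication-factor 2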
After restarting Flume, you can see that it is now listening on port 41800. Submit data to that port via POST:
curl -X POST -H "Content-Type: application/json; charset=UTF-8" -d '[{"headers":{"database":"db_test","table":"t_log_test","partition":"dt=2016-08-08"},"body":"1\t2\t3"},{"headers":{"database":"db_test","table":"t_log_test2","partition":"dt=2016-08-08"},"body":"4\t5\t6"}]' http://cdh2.mycluster.com:41800/
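Independent of the HTTP response code, you can verify that events actually reach the channel's Kafka topic. A sketch using the console consumer from the CDH Kafka parcel; the channel stores events Avro-wrapped by default, so expect some binary framing around the bodies:

# Prints everything currently in the topic; Ctrl-C to stop
kafka-console-consumer --zookeeper cdh1.mycluster.com:2181 --topic flume_hive_table --from-beginning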
If the POST returns 200, the submission succeeded. Now take a look at HDFS:
# sudo -u hdfs hadoop fs -ls /user/hive/warehouse/db_test.db
Found 2 items
drwxrwxrwt - flume hive 0 2016-09-11 20:35 /user/hive/warehouse/db_test.db/t_log_test
drwxrwxrwt - flume hive 0 2016-09-11 20:35 /user/hive/warehouse/db_test.db/t_log_test2
# Data first lands in a temporary file and is only renamed to a final FlumeData file after one hour (rollInterval=3600)
# sudo -u hdfs hadoop fs -ls /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/
Found 1 items
-rw-r--r-- 2 flume hive 36 2016-09-11 21:35 /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/FlumeData.1473597323541
# sudo -u hdfs hadoop fs -cat /user/hive/warehouse/db_test.db/t_log_test/dt=2016-08-08/FlumeData.1473597323541
1 2 3
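One caveat: Flume only writes the files; Hive will not see a new partition until it is registered in the metastore. A hedged sketch, assuming db_test.t_log_test already exists as a table partitioned by dt:

# Register the partition explicitly...
sudo -u hdfs hive -e "USE db_test; ALTER TABLE t_log_test ADD IF NOT EXISTS PARTITION (dt='2016-08-08');"
# ...or let Hive discover every partition already present on HDFS
sudo -u hdfs hive -e "USE db_test; MSCK REPAIR TABLE t_log_test;"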