Flume+Hadoop+Hive的离线分析系统基本架构 - 图文(2)

2019-08-03 13:18

本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。

需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选

择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。

FTP服务器上的Flume配置文件如下:

[plain] view plain copy

1. agent.channels = memorychannel 2. agent.sinks = target 3.

4. agent.sources.origin.type = spooldir

5. agent.sources.origin.spoolDir = /export/data/trivial/weblogs 6. agent.sources.origin.channels = memorychannel

7. agent.sources.origin.deserializer.maxLineLength = 2048 8.

9. agent.sources.origin.interceptors = i2

10. agent.sources.origin.interceptors.i2.type = host

11. agent.sources.origin.interceptors.i2.hostHeader = hostname 12.

13. agent.sinks.loggerSink.type = logger

14. agent.sinks.loggerSink.channel = memorychannel 15.

16. agent.channels.memorychannel.type = memory 17. agent.channels.memorychannel.capacity = 10000 18.

19. agent.sinks.target.type = avro

20. agent.sinks.target.channel = memorychannel 21. agent.sinks.target.hostname = 172.16.124.130 22. agent.sinks.target.port = 4545

这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。

需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

在Hadoop服务器上的配置文件如下:

[plain] view plain copy

1. agent.sources = origin

2. agent.channels = memorychannel 3. agent.sinks = target 4.

5. agent.sources.origin.type = avro

6. agent.sources.origin.channels = memorychannel 7. agent.sources.origin.bind = 0.0.0.0 8. agent.sources.origin.port = 4545 9.

10. #agent.sources.origin.interceptors = i1 i2

11. #agent.sources.origin.interceptors.i1.type = timestamp 12. #agent.sources.origin.interceptors.i2.type = host

13. #agent.sources.origin.interceptors.i2.hostHeader = hostname 14.

15. agent.sinks.loggerSink.type = logger

16. agent.sinks.loggerSink.channel = memorychannel 17.

18. agent.channels.memorychannel.type = memory 19. agent.channels.memorychannel.capacity = 5000000

20. agent.channels.memorychannel.transactionCapacity = 1000000 21.

22. agent.sinks.target.type = hdfs

23. agent.sinks.target.channel = memorychannel

24. agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S 25. agent.sinks.target.hdfs.filePrefix = data-%{hostname} 26. agent.sinks.target.hdfs.rollInterval = 60 27. agent.sinks.target.hdfs.rollSize = 1073741824 28. agent.sinks.target.hdfs.rollCount = 1000000 29. agent.sinks.target.hdfs.round = true 30. agent.sinks.target.hdfs.roundValue = 10 31. agent.sinks.target.hdfs.roundUnit = minute 32. agent.sinks.target.hdfs.useLocalTimeStamp = true 33. agent.sinks.target.hdfs.minBlockReplicas=1 34. agent.sinks.target.hdfs.writeFormat=Text 35. agent.sinks.target.hdfs.fileType=DataStream

round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。

Troubleshooting

使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题

需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。

使用Flume拉取到HDFS中的文件格式错乱

这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致

使用Mapreduce清洗日志文件

当把日志文件中的数据拉取到HDFS文件系统后,使用Mapreduce程序去进行日志清洗 第一步,先用Mapreduce过滤掉无效的数据

[plain] view plain copy

1. package com.guludada.clickstream; 2.

3. import java.io.IOException; 4. import java.text.SimpleDateFormat; 5. import java.util.Date;

6. import java.util.StringTokenizer; 7. import java.util.regex.Matcher; 8. import java.util.regex.Pattern; 9.

10. import org.apache.hadoop.conf.Configuration; 11. import org.apache.hadoop.fs.Path; 12. import org.apache.hadoop.io.IntWritable; 13. import org.apache.hadoop.io.NullWritable; 14. import org.apache.hadoop.io.Text; 15. import org.apache.hadoop.mapreduce.Job; 16. import org.apache.hadoop.mapreduce.Mapper;

17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19.

20. import com.guludada.dataparser.WebLogParser; 21. 22.

23. public class logClean { 24.

25. public static class cleanMap extends Mapper

e> { 26.

27. private NullWritable v = NullWritable.get(); 28. private Text word = new Text();

29. WebLogParser webLogParser = new WebLogParser(); 30.

31. public void map(Object key,Text value,Context context) { 32.

33. //将一行内容转成string

34. String line = value.toString(); 35.

36. String cleanContent = webLogParser.parser(line); 37.

38. if(cleanContent != \ 39. word.set(cleanContent); 40. try {

41. context.write(word,v); 42. } catch (IOException e) {

43. // TODO Auto-generated catch block 44. e.printStackTrace();

45. } catch (InterruptedException e) { 46. // TODO Auto-generated catch block 47. e.printStackTrace(); 48. } 49. } 50. } 51. } 52.

53. public static void main(String[] args) throws Exception { 54.

55. Configuration conf = new Configuration(); 56.

57. conf.set(\ 58.

59. Job job = Job.getInstance(conf); 60.

61. job.setJarByClass(logClean.class); 62.

63. //指定本业务job要使用的mapper/Reducer业务类 64. job.setMapperClass(cleanMap.class); 65.

66. //指定mapper输出数据的kv类型

67. job.setMapOutputKeyClass(Text.class);

68. job.setMapOutputValueClass(NullWritable.class); 69.

70. //指定job的输入原始文件所在目录 71. Date curDate = new Date();

72. SimpleDateFormat sdf = new SimpleDateFormat(\ 73. String dateStr = sdf.format(curDate);

74. FileInputFormat.setInputPaths(job, new Path(\

tr + \ 75.

76. //指定job的输出结果所在目录

77. FileOutputFormat.setOutputPath(job, new Path(\

/\ 78.

79. //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去

运行

80. boolean res = job.waitForCompletion(true); 81. System.exit(res?0:1); 82. 83. } 84. 85. }

[plain] view plain copy

1. package com.guludada.dataparser; 2.

3. import java.io.IOException;


Flume+Hadoop+Hive的离线分析系统基本架构 - 图文(2).doc 将本文的Word文档下载到电脑 下载失败或者文档不完整,请联系客服人员解决!

下一篇:砂浆合同

相关阅读
本类排行
× 注册会员免费下载(下载后可以自由复制和排版)

马上注册会员

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: