Flume+Hadoop+Hive Offline Analysis System: Basic Architecture (Part 6)

2019-08-03 13:18

Step 3: clean the Session records generated in Step 2 and produce the PageViews table. The mapper groups the records by session ID; for each session, the reducer sorts the page views by visit time, numbers them as steps, and computes each page's stay time as the gap between consecutive visits.


package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.clickstream.logSession.sessionMapper;
import com.guludada.clickstream.logSession.sessionReducer;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;

public class PageViews {

    public static class pageMapper extends Mapper<Object, Text, Text, Text> {

        private Text word = new Text();

        public void map(Object key, Text value, Context context) {

            String line = value.toString();
            // Session records are delimited fields; the delimiter is truncated
            // in the source, a single space is assumed here
            String[] webLogContents = line.split(" ");

            // Group the records by session: the session ID is field 2
            word.set(webLogContents[2]);
            try {
                context.write(word, value);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class pageReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        PageViewsParser pageViewsParser = new PageViewsParser();

        // Visit info of the previous record within the current session
        PageViewsBean lastStayPageBean = null;
        Date lastVisitTime = null;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Reset the per-session state: reduce() is called once per session key
            lastStayPageBean = null;
            lastVisitTime = null;

            // Sort all page views belonging to this session by visit time
            ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>();
            for (Text pageView : values) {
                PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
                pageViewsBeanGroup.add(pageViewsBean);
            }
            Collections.sort(pageViewsBeanGroup, new Comparator<PageViewsBean>() {
                public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {
                    Date date1 = pageViewsBean1.getTimeWithDateFormat();
                    Date date2 = pageViewsBean2.getTimeWithDateFormat();
                    if (date1 == null && date2 == null) return 0;
                    return date1.compareTo(date2);
                }
            });

            // Compute the stay time of each page
            int step = 0;
            for (PageViewsBean pageViewsBean : pageViewsBeanGroup) {

                Date curVisitTime = pageViewsBean.getTimeWithDateFormat();

                if (lastStayPageBean != null) {
                    // Difference between two consecutive visits, in seconds
                    Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime()) / 1000);
                    // The current record tells us how long the previous page was
                    // viewed, so set its stay time and only then write it out
                    lastStayPageBean.setStayTime(timeDiff.toString());
                    content.set(pageViewsParser.parser(lastStayPageBean));
                    context.write(content, v);
                }

                // Update the step count of the current record
                step++;
                pageViewsBean.setStep(step + "");

                // Make the current record the "previous" one for the next iteration
                lastStayPageBean = pageViewsBean;
                lastVisitTime = curVisitTime;
            }

            // Write out the session's last record; its stay time cannot be
            // computed, so it keeps the bean's default value
            if (lastStayPageBean != null) {
                content.set(pageViewsParser.parser(lastStayPageBean));
                context.write(content, v);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        // HDFS address; the value is truncated in the source, this is a placeholder
        conf.set("fs.defaultFS", "hdfs://namenode:9000");

        Job job = Job.getInstance(conf);

        job.setJarByClass(PageViews.class);

        // Mapper/Reducer classes used by this job
        job.setMapperClass(pageMapper.class);
        job.setReducerClass(pageReducer.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");   // pattern truncated in the source; assumed
        String dateStr = sdf.format(curDate);

        // Input directory: the session data generated in Step 2
        // (both paths are truncated in the source; the directory names are assumed)
        FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/" + dateStr + "/*"));
        // Output directory for the PageViews records
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/" + dateStr + "/"));

        // Submit the job configuration, plus the jar containing the job's
        // classes, to YARN to run (the listing breaks off at this point in the
        // source; the standard closing is assumed)
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
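The code above depends on PageViewsBean and PageViewsParser, which are defined in an earlier part of this series. A minimal sketch consistent with how they are used here might look like the following; the field layout of a session record (date, time, session ID, IP, URL) and the output field order are assumptions for illustration, not the original definitions:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

class PageViewsBean {
    String date;             // e.g. "2016-05-30" (format assumed)
    String time;             // e.g. "19:38:00"
    String session;          // session ID: field 2 of the record, which is
                             // what webLogContents[2] picks up in the mapper
    String ip;
    String url;
    String step = "0";
    String stayTime = "0";   // seconds; stays 0 for the last page of a session

    Date getTimeWithDateFormat() {
        try {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(date + " " + time);
        } catch (ParseException e) {
            return null;     // the comparator in the reducer tolerates nulls
        }
    }

    void setStep(String step) { this.step = step; }
    void setStayTime(String stayTime) { this.stayTime = stayTime; }
}

class PageViewsParser {
    // Split one session record into a bean (field order assumed)
    PageViewsBean loadBean(String record) {
        String[] f = record.split(" ");
        PageViewsBean b = new PageViewsBean();
        b.date = f[0]; b.time = f[1]; b.session = f[2]; b.ip = f[3]; b.url = f[4];
        return b;
    }

    // Serialize a bean back into one space-delimited PageViews line
    String parser(PageViewsBean b) {
        return b.session + " " + b.date + " " + b.time + " " + b.ip + " "
                + b.url + " " + b.step + " " + b.stayTime;
    }
}

With this layout, the stay time of a session's final page can never be derived from a following visit, so it keeps the bean's default value.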

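For intuition, here is how one session might flow through the job; the records below are invented for illustration and assume the field layout sketched above:

Input (session records from Step 2, grouped on the session ID in field 2):

2016-05-30 19:38:00 session1 192.168.12.130 /home
2016-05-30 19:38:40 session1 192.168.12.130 /list
2016-05-30 19:40:00 session1 192.168.12.130 /detail

Output (PageViews records: session, date, time, IP, URL, step, stay time in seconds):

session1 2016-05-30 19:38:00 192.168.12.130 /home 1 40
session1 2016-05-30 19:38:40 192.168.12.130 /list 2 80
session1 2016-05-30 19:40:00 192.168.12.130 /detail 3 0

Assuming the project is packaged as clickstream.jar (the jar name is an assumption), the job would be submitted with something like: hadoop jar clickstream.jar com.guludada.clickstream.PageViews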
