第三步,清洗第二步生成的Session信息,生成PageViews信息表 (Step 3: clean the Session records produced in step 2 and generate the PageViews table)
[plain] view plain copy
1. package com.guludada.clickstream; 2.
3. import java.io.IOException; 4. import java.text.ParseException; 5. import java.text.SimpleDateFormat; 6. import java.util.ArrayList; 7. import java.util.Collections; 8. import java.util.Comparator; 9. import java.util.Date; 10. import java.util.HashMap; 11. import java.util.Locale; 12. import java.util.Map; 13.
14. import org.apache.hadoop.conf.Configuration; 15. import org.apache.hadoop.fs.Path;
16. import org.apache.hadoop.io.NullWritable; 17. import org.apache.hadoop.io.Text; 18. import org.apache.hadoop.mapreduce.Job; 19. import org.apache.hadoop.mapreduce.Mapper; 20. import org.apache.hadoop.mapreduce.Reducer; 21. import org.apache.hadoop.mapreduce.Mapper.Context;
22. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24.
25. import com.guludada.clickstream.logClean.cleanMap; 26. import com.guludada.clickstream.logSession.sessionMapper; 27. import com.guludada.clickstream.logSession.sessionReducer; 28. import com.guludada.dataparser.PageViewsParser; 29. import com.guludada.dataparser.SessionParser; 30. import com.guludada.dataparser.WebLogParser; 31. import com.guludada.javabean.PageViewsBean; 32. import com.guludada.javabean.WebLogSessionBean; 33.
34. public class PageViews { 35.
36. public static class pageMapper extends Mapper
38. private Text word = new Text();
39.
40. public void map(Object key,Text value,Context context) { 41.
42. String line = value.toString();
43. String[] webLogContents = line.split(\ 44.
45. //根据session来分组
46. word.set(webLogContents[2]); 47. try {
48. context.write(word,value); 49. } catch (IOException e) {
50. // TODO Auto-generated catch block 51. e.printStackTrace();
52. } catch (InterruptedException e) { 53. // TODO Auto-generated catch block 54. e.printStackTrace(); 55. } 56. } 57. } 58.
59. public static class pageReducer extends Reducer itable>{ 60. 61. private Text session = new Text(); 62. private Text content = new Text(); 63. private NullWritable v = NullWritable.get(); 64. PageViewsParser pageViewsParser = new PageViewsParser(); 65. SimpleDateFormat sdf = new SimpleDateFormat(\ 66. //上一条记录的访问信息 67. PageViewsBean lastStayPageBean = null; 68. Date lastVisitTime = null; 69. 70. @Override 71. protected void reduce(Text key, Iterable xt) throws IOException, InterruptedException { 72. 73. //将session所对应的所有浏览记录按时间排序 74. ArrayList eViewsBean>(); 75. for(Text pageView : values) { 76. PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageV iew.toString()); 77. pageViewsBeanGroup.add(pageViewsBean); 78. } 79. Collections.sort(pageViewsBeanGroup,new Comparator >() { 80. 81. public int compare(PageViewsBean pageViewsBean1, PageViewsBe an pageViewsBean2) { 82. Date date1 = pageViewsBean1.getTimeWithDateFormat(); 83. Date date2 = pageViewsBean2.getTimeWithDateFormat(); 84. if(date1 == null && date2 == null) return 0; 85. return date1.compareTo(date2); 86. } 87. }); 88. 89. //计算每个页面的停留时间 90. int step = 0; 91. for(PageViewsBean pageViewsBean : pageViewsBeanGroup) { 92. 93. Date curVisitTime = pageViewsBean.getTimeWithDateFormat(); 94. 95. if(lastStayPageBean != null) { 96. //计算前后两次访问记录相差的时间,单位是秒 97. Integer timeDiff = (int) ((curVisitTime.getTime() - last VisitTime.getTime())/1000); 98. //根据当前记录的访问信息更新上一条访问记录中访问的页面的停留时 间 99. lastStayPageBean.setStayTime(timeDiff.toString()); 100. } 101. 102. //更新访问记录的步数 103. step++; 104. pageViewsBean.setStep(step+\ 105. //更新上一条访问记录的停留时间后,将当前访问记录设定为上一条访问信 息记录 106. lastStayPageBean = pageViewsBean; 107. lastVisitTime = curVisitTime; 108. 109. //输出pageViews信息 110. content.set(pageViewsParser.parser(pageViewsBean)); 111. try { 112. context.write(content,v); 113. } catch (IOException e) { 114. // TODO Auto-generated catch block 115. e.printStackTrace(); 116. } catch (InterruptedException e) { 117. 
// TODO Auto-generated catch block 118. e.printStackTrace(); 119. } 120. } 121. } 122. } 123. 124. public static void main(String[] args) throws Exception { 125. 126. Configuration conf = new Configuration(); 127. 128. conf.set(\ 129. 130. Job job = Job.getInstance(conf); 131. 132. job.setJarByClass(PageViews.class); 133. 134. //指定本业务job要使用的mapper/Reducer业务类 135. job.setMapperClass(pageMapper.class); 136. job.setReducerClass(pageReducer.class); 137. 138. //指定mapper输出数据的kv类型 139. job.setMapOutputKeyClass(Text.class); 140. job.setMapOutputValueClass(Text.class); 141. 142. //指定最终输出的数据的kv类型 143. job.setOutputKeyClass(Text.class); 144. job.setOutputValueClass(NullWritable.class); 145. 146. Date curDate = new Date(); 147. SimpleDateFormat sdf = new SimpleDateFormat(\ 148. String dateStr = sdf.format(curDate); 149. 150. //指定job的输入原始文件所在目录 151. FileInputFormat.setInputPaths(job, new Path(\ ta/\ 152. //指定job的输出结果所在目录 153. FileOutputFormat.setOutputPath(job, new Path(\ s/\ 154. 155. //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn 去运行 156.