39. String line = value.toString();
40. String[] webLogContents = line.split(\ 41.
42. //根据session来分组
43. word.set(webLogContents[2]); 44. try {
45. context.write(word,value); 46. } catch (IOException e) {
47. // TODO Auto-generated catch block 48. e.printStackTrace();
49. } catch (InterruptedException e) { 50. // TODO Auto-generated catch block 51. e.printStackTrace(); 52. } 53. } 54. } 55.
56. public static class visitReducer extends Reducer ritable>{ 57. 58. private Text content = new Text(); 59. private NullWritable v = NullWritable.get(); 60. VisitsInfoParser visitsParser = new VisitsInfoParser(); 61. SimpleDateFormat sdf = new SimpleDateFormat(\ 62. PageViewsParser pageViewsParser = new PageViewsParser(); 63. Map 64. 65. String entry_URL = \ 66. String leave_URL = \ 67. int total_visit_pages = 0; 68. 69. @Override 70. protected void reduce(Text key, Iterable xt) throws IOException, InterruptedException { 71. 72. //将session所对应的所有浏览记录按时间排序 73. ArrayList 75. browseInfoGroup.add(browseInfo.toString()); 76. } 77. Collections.sort(browseInfoGroup,new Comparator 79. SimpleDateFormat sdf = new SimpleDateFormat(\ m:ss\ 80. public int compare(String browseInfo1, String browseInfo2) { 81. String dateStr1 = browseInfo1.split(\ seInfo1.split(\ 82. String dateStr2 = browseInfo2.split(\ seInfo2.split(\ 83. Date date1; 84. Date date2; 85. try { 86. date1 = sdf.parse(dateStr1); 87. date2 = sdf.parse(dateStr2); 88. if(date1 == null && date2 == null) return 0; 89. return date1.compareTo(date2); 90. } catch (ParseException e) { 91. // TODO Auto-generated catch block 92. e.printStackTrace(); 93. return 0; 94. } 95. } 96. }); 97. 98. //统计该session访问的总页面数,第一次进入的页面,跳出的页 面 99. for(String browseInfo : browseInfoGroup) { 100. 101. String[] browseInfoStrArr = browseInfo.split(\ 102. String curVisitURL = browseInfoStrArr[3]; 103. Integer curVisitURLInteger = viewedPagesMap.get(curVisitURL ); 104. if(curVisitURLInteger == null) { 105. viewedPagesMap.put(curVisitURL, 1); 106. } 107. } 108. total_visit_pages = viewedPagesMap.size(); 109. String visitsInfo = visitsParser.parser(browseInfoGroup, total_ visit_pages+\ 110. content.set(visitsInfo); 111. try { 112. context.write(content,v); 113. } catch (IOException e) { 114. // TODO Auto-generated catch block 115. e.printStackTrace(); 116. } catch (InterruptedException e) { 117. // TODO Auto-generated catch block 118. e.printStackTrace(); 119. } 120. } 121. } 122. 123. public static void main(String[] args) throws Exception { 124. 125. Configuration conf = new Configuration(); 126. 127. conf.set(\ 128. 129. Job job = Job.getInstance(conf); 130. 131. job.setJarByClass(VisitsInfo.class); 132. 133. //指定本业务job要使用的mapper/Reducer业务类 134. job.setMapperClass(visitMapper.class); 135. job.setReducerClass(visitReducer.class); 136. 137. //指定mapper输出数据的kv类型 138. job.setMapOutputKeyClass(Text.class); 139. job.setMapOutputValueClass(Text.class); 140. 141. //指定最终输出的数据的kv类型 142. job.setOutputKeyClass(Text.class); 143. job.setOutputValueClass(NullWritable.class); 144. 145. Date curDate = new Date(); 146. SimpleDateFormat sdf = new SimpleDateFormat(\ 147. String dateStr = sdf.format(curDate); 148. 149. //指定job的输入原始文件所在目录 150. FileInputFormat.setInputPaths(job, new Path(\ ta/\ 151. //指定job的输出结果所在目录 152. FileOutputFormat.setOutputPath(job, new Path(\ fo\ 153. 154. //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn 去运行 155. 156. boolean res = job.waitForCompletion(true); 157. System.exit(res?0:1); 158. 159. } 160. } [plain] view plain copy 1. package com.guludada.dataparser; 2. 3. import java.util.ArrayList; 4. 5. import com.guludada.javabean.PageViewsBean; 6. import com.guludada.javabean.VisitsInfoBean; 7. import com.guludada.javabean.WebLogSessionBean; 8. 9. public class VisitsInfoParser { 10. 11. public String parser(ArrayList um) { 12. 13. VisitsInfoBean visitsBean = new VisitsInfoBean(); 14. String entryPage = pageViewsGroup.get(0).split(\ 15. String leavePage = pageViewsGroup.get(pageViewsGroup.size()-1).split (\ 16. String startTime = pageViewsGroup.get(0).split(\ iewsGroup.get(0).split(\ 17. String endTime = pageViewsGroup.get(pageViewsGroup.size()-1).split(\ \ 18. \ \ 19. String session = pageViewsGroup.get(0).split(\ 20. String IP = pageViewsGroup.get(0).split(\ 21. String referal = pageViewsGroup.get(0).split(\ 22. 23. visitsBean.setSession(session); 24. visitsBean.setStart_time(startTime); 25. visitsBean.setEnd_time(endTime); 26. visitsBean.setEntry_page(entryPage); 27. visitsBean.setLeave_page(leavePage); 28. visitsBean.setVisit_page_num(totalVisitNum); 29. visitsBean.setIP_addr(IP); 30. visitsBean.setReferal(referal); 31. 32. return visitsBean.toString(); 33. } 34. } [plain] view plain copy 1. package com.guludada.javabean; 2. 3. import java.text.ParseException; 4. import java.text.SimpleDateFormat; 5. import java.util.Date; 6. 7. public class VisitsInfoBean { 8. 9. String session; 10. String start_time; 11. String end_time; 12. String entry_page; 13. String leave_page; 14. String visit_page_num; 15. String IP_addr; 16. String referal; 17. 18. public String getSession() { 19. return session; 20. } 21. public void setSession(String session) { 22. this.session = session; 23. } 24. public String getStart_time() { 25. return start_time; 26. } 27. public void setStart_time(String start_time) { 28. this.start_time = start_time; 29. } 30. public String getEnd_time() { 31. return end_time; 32. } 33. public void setEnd_time(String end_time) { 34. this.end_time = end_time;