使用Storm实现实时大数据分析(2)

2019-02-15 12:11

COLUMNTYPE > <

FIELD > <

COLUMNNAME > speed

COLUMNNAME > <

COLUMNTYPE > int

COLUMNTYPE > <

FIELD > <

COLUMNNAME >

location

COLUMNNAME >

<

COLUMNTYPE > string

COLUMNTYPE >

FIELDLIST > <

DELIMITER > ,

DELIMITER >

TUPLEINFO >

通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过XSTream序列化XML时建立。

Spout的实现步骤:

对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。 在数据得到了字段的说明后,将其转换成tuple。

声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。 Spout的具体编码在Listing Three中显示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) {

_collector = collector; try {

fileReader = new BufferedReader(new FileReader(new File(file))); }

catch (FileNotFoundException e) {

System.exit(1); }

}

public void nextTuple() {

protected void ListenFile(File file) {

Utils.sleep(2000);

RandomAccessFile access = null; String line = null; try {

while ((line = access.readLine()) != null) {

if (line !=null) {

String[] fields=null; if (tupleInfo.getDelimiter().equals(\ line.split(\ else

fields = line.split (tupleInfo.getDelimiter()); if (tupleInfo.getFieldList().size() == _collector.emit(new Values(fields)); } } }

catch (IOException ex){ } } }

public void declareOutputFields(OutputFieldsDeclarer declarer) {

String[] fieldsArr = new String [tupleInfo.getFieldList().size()]; for(int i=0; i

fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName(); fields = fields.length)

}

declarer.declare(new Fields(fieldsArr)); }

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

临界值栏数检查(拆分成字段的数目) 临界值数据类型(拆分后字段的类型) 临界值出现的频数 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类

public class ThresholdInfo implementsSerializable {

private String action; private String rule;

private Object thresholdValue; private int thresholdColNumber; private Integer timeWindow;

private int frequencyOfOccurence; }

基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

Listing Five:临界值检测代码段

public void execute(Tuple tuple, BasicOutputCollector collector) {

if(tuple!=null) {

List inputTupleList = (List) tuple.getValues(); int thresholdColNum = thresholdInfo.getThresholdColNumber(); Object thresholdValue = thresholdInfo.getThresholdValue(); String thresholdDataType = tupleInfo.getFieldList().get(twww.sm136.comhresholdColNum-1).getColumnType(); Integer timeWindow = thresholdInfo.getTimeWindow(); int frequency = thresholdInfo.getFrequencyOfOccurence(); if(thresholdDataType.equalsIgnoreCase(\ {

String valueToCheck = inputTupleList.get(thresholdColNum-1).toString(); String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) {

long curTime = System.currentTimeMillis();

long diffInMinutes = (curTime-startTime)/(1000); if(diffInMinutes>=timeWindow) {

if(frequencyChkOp.equals(\ {

if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

splitAndEmit(inputTupleList,collector); } }

else if(frequencyChkOp.equals(\ {

if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

splitAndEmit(inputTupleList,collector); } }

else System.out.println(\not supported\

} } else


使用Storm实现实时大数据分析(2).doc 将本文的Word文档下载到电脑 下载失败或者文档不完整,请联系客服人员解决!

Copyright © 2019-2022 免费范文网 版权所有
声明 :本网站尊重并保护知识产权,根据《信息网络传播权保护条例》,如果我们转载的作品侵犯了您的权利,请在一个月内通知我们,我们会及时删除。
客服QQ: 邮箱:tiandhx2@hotmail.com
苏ICP备16052595号-18

× 注册会员免费下载(下载后可以自由复制和排版)

马上注册会员

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: