      </COLUMNTYPE>
    </FIELD>
    <FIELD>
      <COLUMNNAME>speed</COLUMNNAME>
      <COLUMNTYPE>int</COLUMNTYPE>
    </FIELD>
    <FIELD>
      <COLUMNNAME>location</COLUMNNAME>
      <COLUMNTYPE>string</COLUMNTYPE>
    </FIELD>
  </FIELDLIST>
  <DELIMITER>,</DELIMITER>
</TUPLEINFO>
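A Spout object is created by calling its constructor with the parameters Directory, PathSpout, and a TupleInfo object. TupleInfo stores the essential information about the log file: its fields, the delimiter, and the type of each field. The TupleInfo object is built when the XML file is deserialized with XStream.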
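As an illustration of what TupleInfo holds, the sketch below loads the field list and the delimiter from a descriptor like the XML above. It is not the article's implementation: XStream is swapped for the JDK's DOM parser so the example has no external dependencies, and the classes Field, TupleInfo, and TupleInfoLoader are hypothetical stand-ins whose getters mirror the ones used in the listings.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical stand-in: one <FIELD> entry of the descriptor.
final class Field {
    private final String columnName;
    private final String columnType;

    Field(String columnName, String columnType) {
        this.columnName = columnName;
        this.columnType = columnType;
    }

    String getColumnName() { return columnName; }
    String getColumnType() { return columnType; }
}

// Hypothetical stand-in: the field list plus the delimiter of the log file.
final class TupleInfo {
    private final List<Field> fieldList;
    private final String delimiter;

    TupleInfo(List<Field> fieldList, String delimiter) {
        this.fieldList = fieldList;
        this.delimiter = delimiter;
    }

    List<Field> getFieldList() { return fieldList; }
    String getDelimiter() { return delimiter; }
}

final class TupleInfoLoader {
    // Parses the descriptor with the JDK DOM parser (an assumption; the article uses XStream).
    static TupleInfo fromXml(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList fieldNodes = doc.getElementsByTagName("FIELD");
            List<Field> fields = new ArrayList<>();
            for (int i = 0; i < fieldNodes.getLength(); i++) {
                Element field = (Element) fieldNodes.item(i);
                fields.add(new Field(text(field, "COLUMNNAME").trim(),
                                     text(field, "COLUMNTYPE").trim()));
            }
            // The delimiter is not trimmed, so a whitespace delimiter would survive.
            return new TupleInfo(fields, text(doc.getDocumentElement(), "DELIMITER"));
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid tuple descriptor", e);
        }
    }

    // Text content of the first descendant element with the given tag name.
    private static String text(Element parent, String tag) {
        return parent.getElementsByTagName(tag).item(0).getTextContent();
    }

    public static void main(String[] args) {
        String xml = "<TUPLEINFO><FIELDLIST>"
                + "<FIELD><COLUMNNAME>speed</COLUMNNAME><COLUMNTYPE>int</COLUMNTYPE></FIELD>"
                + "<FIELD><COLUMNNAME>location</COLUMNNAME><COLUMNTYPE>string</COLUMNTYPE></FIELD>"
                + "</FIELDLIST><DELIMITER>,</DELIMITER></TUPLEINFO>";
        TupleInfo info = fromXml(xml);
        for (Field f : info.getFieldList()) {
            System.out.println(f.getColumnName() + " : " + f.getColumnType());
        }
        System.out.println("delimiter = " + info.getDelimiter());
    }
}
```

Keeping the descriptor outside the code means a log file with a different layout only needs a different XML file, not a different Spout.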
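Spout implementation steps:
Listen for changes to each individual log file, and monitor the directory for newly added log files.
Once fields have been declared for the data, convert each line that is read into a tuple.
Declare the grouping between the Spout and the Bolt, which decides how tuples are routed to the Bolt.

The Spout code is shown in Listing Three.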
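The first two steps above can be sketched without Storm: remember how far the log file has been read, pick up only the lines appended since then, and split each one into fields. This is a minimal sketch under stated assumptions, not the article's Spout. The class LogTailSketch, its poll() method, and the sample rows are hypothetical, and Pattern.quote is used here in place of manual escaping of the delimiter.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class LogTailSketch {
    private final Path logFile;
    private final String delimiter;
    private final int expectedFields;
    private long offset = 0; // byte position just past the last line already consumed

    LogTailSketch(Path logFile, String delimiter, int expectedFields) {
        this.logFile = logFile;
        this.delimiter = delimiter;
        this.expectedFields = expectedFields;
    }

    // Returns the rows appended since the previous call.
    // Rows whose field count differs from expectedFields are skipped.
    List<String[]> poll() {
        List<String[]> rows = new ArrayList<>();
        try (RandomAccessFile access = new RandomAccessFile(logFile.toFile(), "r")) {
            access.seek(offset);
            String line;
            // readLine() maps each byte to one char, so this assumes ASCII logs
            // and that the writer always finishes a line before the next poll.
            while ((line = access.readLine()) != null) {
                // Pattern.quote treats delimiters such as "|" literally;
                // the limit -1 keeps trailing empty fields.
                String[] fields = line.split(Pattern.quote(delimiter), -1);
                if (fields.length == expectedFields) {
                    rows.add(fields);
                }
            }
            offset = access.getFilePointer();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("vehicle", ".log");
        LogTailSketch tail = new LogTailSketch(log, ",", 3);

        // Made-up sample rows: vehicle number, speed, location.
        Files.write(log, "AB 123,60,North city\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(tail.poll().size() + " new row(s)");

        Files.write(log, "BC 123,70,South city\nbroken line\n".getBytes(StandardCharsets.US_ASCII),
                StandardOpenOption.APPEND);
        System.out.println(tail.poll().size() + " new row(s)");
    }
}
```

In a Spout, each String[] returned by poll() would be the payload of one emitted tuple. Tracking a byte offset is one simple way to avoid emitting the same line twice; it does not handle log rotation or truncation.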
Listing Three: Logic of the Spout's open, nextTuple, and declareOutputFields methods.
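public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    try {
        fileReader = new BufferedReader(new FileReader(new File(file)));
    } catch (FileNotFoundException e) {
        // Without the log file there is nothing to read, so stop the process.
        System.exit(1);
    }
}

public void nextTuple() {
    // Assumption: each call delegates to ListenFile for the configured log file.
    ListenFile(new File(file));
}

protected void ListenFile(File file) {
    Utils.sleep(2000); // poll every two seconds
    String line = null;
    // Assumption: filePointer is a long field (initially 0) that remembers how far
    // the file has been read, so lines already emitted are not read again.
    try (RandomAccessFile access = new RandomAccessFile(file, "r")) {
        access.seek(filePointer);
        while ((line = access.readLine()) != null) {
            String[] fields = null;
            // "|" is a regex metacharacter, so it is escaped before split().
            if (tupleInfo.getDelimiter().equals("|"))
                fields = line.split("\\" + tupleInfo.getDelimiter());
            else
                fields = line.split(tupleInfo.getDelimiter());
            // Emit only lines whose field count matches the declared field list.
            if (tupleInfo.getFieldList().size() == fields.length)
                _collector.emit(new Values(fields));
        }
        filePointer = access.getFilePointer();
    } catch (IOException ex) {
        // Read errors are ignored; the next call tries again.
    }
}
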
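public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Use the column names from TupleInfo as the names of the tuple fields.
    String[] fieldsArr = new String[tupleInfo.getFieldList().size()];
    for (int i = 0; i < tupleInfo.getFieldList().size(); i++) {
        fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
    }
    declarer.declare(new Fields(fieldsArr));
}

declareOutputFields() determines the format in which tuples are emitted, so that the Bolt can decode them in the same way. The Spout keeps listening for data appended to the log file; as soon as new data arrives, it reads it and emits it to the Bolt for processing.

Bolt implementation

The output of the Spout is handed to the Bolts for further processing. For this use case, the topology needs the two Bolts shown in Figure 3.

Figure 3: Data flow from the Spout to the Bolts.

ThresholdCalculatorBolt

The Spout emits tuples, and ThresholdCalculatorBolt receives them and performs the threshold processing. It accepts several inputs for the check:

Threshold value to check
Threshold column number (the position, after splitting, of the field to check)
Threshold column data type (the type of that field after splitting)
Threshold frequency of occurrence
Threshold time window

The class in Listing Four is defined to hold these values.

Listing Four: The ThresholdInfo class

import java.io.Serializable;

public class ThresholdInfo implements Serializable {
    private String action;            // comparison operator used by the check
    private String rule;
    private Object thresholdValue;    // value to compare against
    private int thresholdColNumber;   // 1-based position of the column to check
    private Integer timeWindow;       // null when no time window applies
    private int frequencyOfOccurence;
    // getters and setters omitted
}

Based on the values supplied in these fields, the threshold check is performed by the execute() method in Listing Five. Most of the code parses the incoming values and checks them.

Listing Five: Code for the threshold check

public void execute(Tuple tuple, BasicOutputCollector collector) {
    if (tuple != null) {
        // Assumption: the statements down to the data-type check are reconstructed
        // from the names used below and from the fields of ThresholdInfo and TupleInfo.
        List<Object> inputTupleList = tuple.getValues();
        int thresholdColNum = thresholdInfo.getThresholdColNumber();
        Object thresholdValue = thresholdInfo.getThresholdValue();
        String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum - 1).getColumnType();
        Integer timeWindow = thresholdInfo.getTimeWindow();
        int frequency = thresholdInfo.getFrequencyOfOccurence();
        if (thresholdDataType.equalsIgnoreCase("string")) {
            String valueToCheck = inputTupleList.get(thresholdColNum - 1).toString();
            String frequencyChkOp = thresholdInfo.getAction();
            if (timeWindow != null) {
                long curTime = System.currentTimeMillis();
                // Note: dividing milliseconds by 1000 yields seconds, despite the variable name.
                long diffInMinutes = (curTime - startTime) / (1000);
                if (diffInMinutes >= timeWindow) {
                    if (frequencyChkOp.equals("==")) {
                        if (valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                            count.incrementAndGet();
                            if (count.get() > frequency)
                                splitAndEmit(inputTupleList, collector);
                        }
                    } else if (frequencyChkOp.equals("!=")) {
                        if (!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
                            count.incrementAndGet();
                            if (count.get() > frequency)
                                splitAndEmit(inputTupleList, collector);
                        }
                    } else
                        System.out.println("Operator not supported");
                }
            }
            else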