{
if(frequencyChkOp.equals(\ {
if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
count.incrementAndGet(); if(count.get() > frequency)
splitAndEmit(inputTupleList,collector); } }
else if(frequencyChkOp.equals(\ {
if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {
count.incrementAndGet(); if(count.get() > fr0quency)
splitAndEmit(inputTupleList,collector); } } } } else if(thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\ {
String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) {
long valueToCheck Long.parseLong(inputTupleList.get(thresholdColNum-1).toString()); long curTime = System.currentTimeMillis();
long diffInMinutes = (curTime-startTime)/(1000);
System.out.println(\ if(diffInMinutes>=timeWindow) {
if(frequencyChkOp.equals(\ {
if(valueToCheck Double.parseDouble(thresholdValue.toString())) {
count.incrementAndGet(); if(count.get() > frequency)
|| || || || = <
splitAndEmit(inputTupleList,collector); } }
else if(frequencyChkOp.equals(\ {
if(valueToCheck > Double.parseDouble(thresholdValue.toString())) {
count.incrementAndGet(); if(count.get() > frequency)
splitAndEmit(inputTupleList,collector); } }
else if(frequencyChkOp.equals(\ {
if(valueToCheck == Double.parseDouble(thresholdValue.toString())) {
count.incrementAndGet(); if(count.get() > frequency)
splitAndEmit(inputTupleList,collector); } }
else if(frequencyChkOp.equals(\ { . . .
} } } else
splitAndEmit(null,collector); } else {
System.err.println(\ splitAndEmit(null,collector); } }
经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。
DBWriterBolt
经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology
调用的第一个方法。方法的编码如Listing Six所示。
Listing Six:建表编码。
public void prepare( Map StormConf, TopologyContext context ) { try {
Class.forName(dbClass); }
catch (ClassNotFoundException e) {
System.out.println(\ e.printStackTrace(); }
try {
connection driverManager.getConnection(
\ connection.prepareStatement(\
StringBuilder createQuery = new StringBuilder(
\ for(Field fields : tupleInfo.getFieldList()) {
if(fields.getColumnType().equalsIgnoreCase(\
createQuery.append(fields.getColumnName()+\ARCHAR(500),\ else
createQuery.append(fields.getColumnName()+\\ }
createQuery.append(\
connection.prepareStatement(createQuery.toString()).execute();
// Insert Query
StringBuilder insertQuery = new StringBuilder(\ String tempCreateQuery = new String(); for(Field fields : tupleInfo.getFieldList()) {
insertQuery.append(fields.getColumnName()+\ }
insertQuery.append(\ for(Field fiewww.tt951.comlds : tupleInfo.getFieldList())
{
insertQuery.append(\ }
insertQuery.append(\
prepStatement = connection.prepareStatement(insertQuery.toString()); }
catch (SQLException e) {
e.printStackTrace(); } }
数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。
Listing Seven:数据插入的代码部分。
public void execute(Tuple tuple, BasicOutputCollector collector) {
batchExecuted=false; if(tuple!=null) {
List<Object> inputTupleList = (List<Object>) tuple.getValues(); int dbIndex=0;
for(int i=0;i<tupleInfo.getFieldList().size();i++) {
Field field = tupleInfo.getFieldList().get(i); try {
dbIndex = i+1;
if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setString(dbIndex, inputTupleList.get(i).toString()); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setInt(dbIndex,
Integer.parseInt(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setLong(dbIndex,
Long.parseLong(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setFloat(dbIndex,
Float.parseFloat(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setDouble(dbIndex,
Double.parseDouble(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\
prepStatement.setShort(dbIndex,
Short.parseShort(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setBoolean(dbIndex,
Boolean.parseBoolean(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setByte(dbIndex,
Byte.parseByte(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ {
Date dateToAdd=null;
if (!(inputTupleList.get(i) instanceof Date)) {
DateFormat df = new SimpleDateFormat(\ try {
dateToAdd = df.parse(inputTupleList.get(i).toString()); }
catch (ParseException e) {
System.err.println(\ } } else {
dateToAdd = (Date)inputTupleList.get(i);
java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime()); prepStatement.setDate(dbIndex, sqlDate); } }
catch (SQLException e) {
e.printStackTrace(); } }
Date now = new Date(); try {
prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime())); prepStatement.addBatch(); counter.incrementAndGet(); if (counter.get()== batchSize) executeBatch(); }