使用Storm实现实时大数据分析(3)

2019-02-15 12:11

{

if(frequencyChkOp.equals(\ {

if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

splitAndEmit(inputTupleList,collector); } }

else if(frequencyChkOp.equals(\ {

if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > fr0quency)

splitAndEmit(inputTupleList,collector); } } } } else if(thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\thresholdDataType.equalsIgnoreCase(\ {

String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) {

long valueToCheck Long.parseLong(inputTupleList.get(thresholdColNum-1).toString()); long curTime = System.currentTimeMillis();

long diffInMinutes = (curTime-startTime)/(1000);

System.out.println(\ if(diffInMinutes>=timeWindow) {

if(frequencyChkOp.equals(\ {

if(valueToCheck Double.parseDouble(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

|| || || || = <

splitAndEmit(inputTupleList,collector); } }

else if(frequencyChkOp.equals(\ {

if(valueToCheck > Double.parseDouble(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

splitAndEmit(inputTupleList,collector); } }

else if(frequencyChkOp.equals(\ {

if(valueToCheck == Double.parseDouble(thresholdValue.toString())) {

count.incrementAndGet(); if(count.get() > frequency)

splitAndEmit(inputTupleList,collector); } }

else if(frequencyChkOp.equals(\ { . . .

} } } else

splitAndEmit(null,collector); } else {

System.err.println(\ splitAndEmit(null,collector); } }

经由Bolt发送的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。

DBWriterBolt

经过处理的tuple必须被持久化,以便于触发trigger或者更深层次的使用。DBWriterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology

调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。

public void prepare( Map StormConf, TopologyContext context ) { try {

Class.forName(dbClass); }

catch (ClassNotFoundException e) {

System.out.println(\ e.printStackTrace(); }

try {

connection driverManager.getConnection(

\ connection.prepareStatement(\

StringBuilder createQuery = new StringBuilder(

\ for(Field fields : tupleInfo.getFieldList()) {

if(fields.getColumnType().equalsIgnoreCase(\

createQuery.append(fields.getColumnName()+\ARCHAR(500),\ else

createQuery.append(fields.getColumnName()+\\ }

createQuery.append(\

connection.prepareStatement(createQuery.toString()).execute();

// Insert Query

StringBuilder insertQuery = new StringBuilder(\ String tempCreateQuery = new String(); for(Field fields : tupleInfo.getFieldList()) {

insertQuery.append(fields.getColumnName()+\ }

insertQuery.append(\ for(Field fiewww.tt951.comlds : tupleInfo.getFieldList())

{

insertQuery.append(\ }

insertQuery.append(\

prepStatement = connection.prepareStatement(insertQuery.toString()); }

catch (SQLException e) {

e.printStackTrace(); } }

数据分批次地插入数据库。插入的逻辑由Listing Seven中的execute()方法提供。大部分的编码都是用来实现对可能存在的不同类型输入的解析。

Listing Seven:数据插入的代码部分。

public void execute(Tuple tuple, BasicOutputCollector collector) {

batchExecuted=false; if(tuple!=null) {

List<Object> inputTupleList = (List<Object>) tuple.getValues(); int dbIndex=0;

for(int i=0;i<tupleInfo.getFieldList().size();i++) {

Field field = tupleInfo.getFieldList().get(i); try {

dbIndex = i+1;

if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setString(dbIndex, inputTupleList.get(i).toString()); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setInt(dbIndex,

Integer.parseInt(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setLong(dbIndex,

Long.parseLong(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setFloat(dbIndex,

Float.parseFloat(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setDouble(dbIndex,

Double.parseDouble(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\

prepStatement.setShort(dbIndex,

Short.parseShort(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setBoolean(dbIndex,

Boolean.parseBoolean(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ prepStatement.setByte(dbIndex,

Byte.parseByte(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase(\ {

Date dateToAdd=null;

if (!(inputTupleList.get(i) instanceof Date)) {

DateFormat df = new SimpleDateFormat(\ try {

dateToAdd = df.parse(inputTupleList.get(i).toString()); }

catch (ParseException e) {

System.err.println(\ } } else {

dateToAdd = (Date)inputTupleList.get(i);

java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime()); prepStatement.setDate(dbIndex, sqlDate); } }

catch (SQLException e) {

e.printStackTrace(); } }

Date now = new Date(); try {

prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime())); prepStatement.addBatch(); counter.incrementAndGet(); if (counter.get()== batchSize) executeBatch(); }


使用Storm实现实时大数据分析(3).doc 将本文的Word文档下载到电脑 下载失败或者文档不完整,请联系客服人员解决!

下一篇:新人音版第十册教案(完整版)

相关阅读
本类排行
× 注册会员免费下载(下载后可以自由复制和排版)

马上注册会员

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: