使用Storm实现实时大数据分析(4)

2019-02-15 12:11

catch (SQLException e1) {

e1.printStackTrace(); } } else {

long curTime = System.currentTimeMillis();

long diffInSeconds = (curTime-startTime)/(60*1000);

if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds) {

try {

executeBatch();

startTime = System.currentTimeMillis(); }

catch (SQLException e) { e.printStackTrace(); } } } }

public void executeBatch() throws SQLException {

batchExecuted=true;

prepStatement.executeBatch(); counter = new AtomicInteger(0); }

一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

通过TopologyBuilder建立topology。

使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。 提交topology。

Listing Eight:建立和执行topology。

public class StormMain {

public static void main(String[] args) throws AlreadyAliveException,

InvalidTopologyException, InterruptedException

{

ParallelFileSpout parallelFileSpout = new ParallelFileSpout(); ThresholdBolt thresholdBolt = new ThresholdBolt(); DBWriterBolt dbWriterBolt = new DBWriterBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(\

builder.setBolt(\

builder.setBolt(\ if(this.argsMain!=nulnc630.coml && this.argsMain.length > 0) {

conf.setNumWorkers(1);

StormSubmitter.submitTopology(

this.argsMain[0], conf, builder.createTopology()); } else {

Config conf = new Config(); conf.setDebug(true);

conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster(); cluster.submitTopology(

\ } } }

topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。


使用Storm实现实时大数据分析(4).doc 将本文的Word文档下载到电脑 下载失败或者文档不完整,请联系客服人员解决!

下一篇:新人音版第十册教案(完整版)

相关阅读
本类排行
× 注册会员免费下载(下载后可以自由复制和排版)

马上注册会员

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: