catch (SQLException e1) {
e1.printStackTrace(); } } else {
long curTime = System.currentTimeMillis();
long diffInSeconds = (curTime-startTime)/(60*1000);
if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds) {
try {
executeBatch();
startTime = System.currentTimeMillis(); }
catch (SQLException e) { e.printStackTrace(); } } } }
public void executeBatch() throws SQLException {
batchExecuted=true;
prepStatement.executeBatch(); counter = new AtomicInteger(0); }
一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。
在本地集群上运行和测试topology
通过TopologyBuilder建立topology。
使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。 提交topology。
Listing Eight:建立和执行topology。
public class StormMain {
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, InterruptedException
{
ParallelFileSpout parallelFileSpout = new ParallelFileSpout(); ThresholdBolt thresholdBolt = new ThresholdBolt(); DBWriterBolt dbWriterBolt = new DBWriterBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(\
builder.setBolt(\
builder.setBolt(\ if(this.argsMain!=nulnc630.coml && this.argsMain.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(
this.argsMain[0], conf, builder.createTopology()); } else {
Config conf = new Config(); conf.setDebug(true);
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster(); cluster.submitTopology(
\ } } }
topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。
这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。