EN

龙芯业务

LOONGSON BUSINESS

栏目导航

Storm

LOONGSON SOLUTION

Storm

开源生态 2019-04-15 阅读次数:

请在下面红框内插入图片,不要放其它内容

  • 软件名称
    Storm
  • 软件版本
    2.0.0
  • 源码链接
    https://github.com/apache/storm/releases
  • 软件介绍
    ApacheStorm是一个自由开源的分布式实时计算系统。Storm使得可靠地处理无边界的数据流变得容易,可以像Hadoop对批处理所做的那样进行实时处理。Storm很简单,可以与任何编程语言一起使用,使用起来很有趣! Storm有许多用例:实时分析、在线机器学习、连续计算、分布式RPC、ETL等等。Storm很快:在一个benchmark测试中它的处理速度可以达到为单节点每秒超过一百万个元组。它具有可扩展性、容错性,可以保证您的数据得到处理,并且易于设置和操作。 Storm集成了已经使用过的队列和数据库技术。storm拓扑消费数据流,并以任意复杂的方式处理这些数据流,在计算的每个阶段之间重新分配数据流。
  • 使用帮助
    Storm集群中运行的示例图如下:

    Storm的使用拓扑图

     
    我们先来明确要用Storm做什么。
    那么第一个程序,就简单的输出下信息。
    具体步骤如下:
    1. 启动topology,设置好Spout和Bolt。
    2. 将Spout获取的数据传递给Bolt。
    3. Bolt接受Spout的数据进行打印。

    Spout

    那么首先开始编写Spout类。一般是实现 IRichSpout 或继承BaseRichSpout该类,然后实现该方法。这里我们继承BaseRichSpout这个类,该类需要实现这几个主要的方法:
    一、open
    open()方法中是在ISpout接口中定义,在Spout组件初始化时被调用。
    有三个参数,它们的作用分别是:
    1. Storm配置的Map;
    2. topology中组件的信息;
    3. 发射tuple的方法;
    代码示例:
      @Override
        public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
            System.out.println("open:"+map.get("test"));
            this.collector = collector;
        }
    二、nextTuple
    nextTuple()方法是Spout实现的核心。
    也就是主要执行方法,用于输出信息,通过collector.emit方法发射。
    这里我们的数据信息已经写死了,所以这里我们就直接将数据进行发送。
    这里设置只发送两次。
    代码示例:
         @Override
        public void nextTuple() {
            if(count<=2){
                System.out.println("第"+count+"次开始发送数据...");
                this.collector.emit(new Values(message));
            }
            count++;
        }
    三、declareOutputFields
    declareOutputFields是在IComponent接口中定义,用于声明数据格式。
    即输出的一个Tuple中,包含几个字段。
    因为这里我们只发射一个,所以就指定一个。如果是多个,则用逗号隔开。
    代码示例:
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            System.out.println("定义格式...");
            declarer.declare(new Fields(field));
        }
    四、ack
    ack是在ISpout接口中定义,用于表示Tuple处理成功。
    代码示例:
        @Override
        public void ack(Object obj) {
            System.out.println("ack:"+obj);
        }
    五、fail
    fail是在ISpout接口中定义,用于表示Tuple处理失败。
    代码示例:
        @Override
        public void fail(Object obj) {
            System.out.println("失败:"+obj);
        }
    六、close
    close是在ISpout接口中定义,用于表示Topology停止。
    代码示例:
      @Override
        public void close() {
            System.out.println("关闭...");
        }
    至于还有其他的,这里就不在一一列举了。

    Bolt

    Bolt是用于处理数据的组件,主要是由execute方法来进行实现。一般来说需要实现 IRichBolt 或继承BaseRichBolt该类,然后实现其方法。
    需要实现方法如下:
    一、prepare
    在Bolt启动前执行,提供Bolt启动环境配置的入口。
    参数基本和Sqout一样。
    一般对于不可序列化的对象进行实例化。
    这里的我们就简单的打印下
        @Override
        public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
            System.out.println("prepare:"+map.get("test"));
            this.collector=collector;
        }
    :如果是可以序列化的对象,那么最好是使用构造函数。
    二、execute
    execute()方法是Bolt实现的核心。
    也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
    从tuple中获取消息可以使用 tuple.getString()tuple.getStringByField();这两个方法。个人推荐第二种,可以通过field来指定接收的消息。
    :如果继承的是IRichBolt,则需要手动ack。这里就不用了,BaseRichBolt会自动帮我们应答。
    代码示例:
        @Override
        public void execute(Tuple tuple) {//      String msg=tuple.getString(0);
            String msg=tuple.getStringByField("test");
            //这里我们就不做消息的处理,只打印
            System.out.println("Bolt第"+count+"接受的消息:"+msg); 
            count++;
            /**
             * 
             * 没次调用处理一个输入的tuple,所有的tuple都必须在一定时间内应答。
             * 可以是ack或者fail。否则,spout就会重发tuple。
             *///      collector.ack(tuple);
        }
    三、declareOutputFields
    和Spout的一样。
    因为到了这里就不再输出了,所以就什么都没写。
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {        
        }
    cleanup
    cleanup是IBolt接口中定义,用于释放bolt占用的资源。
    Storm在终止一个bolt之前会调用这个方法。
    因为这里没有什么资源需要释放,所以就简单的打印一句就行了。
    @Override
        public void cleanup() {
            System.out.println("资源释放");
        }

    Topology

    这里我们就是用main方法进行提交topology。
    不过在提交topology之前,需要进行相应的设置。
    这里我就不一一细说了,代码的注释已经很详细了。
    代码示例:
        import org.apache.storm.Config;
        import org.apache.storm.LocalCluster;
        import org.apache.storm.StormSubmitter;
        import org.apache.storm.topology.TopologyBuilder;
        
        public class App {
            
            private static final String str1="test1"; 
            private static final String str2="test2"; 
        
            public static void main(String[] args)  {
                // TODO Auto-generated method stub
                //定义一个拓扑
                TopologyBuilder builder=new TopologyBuilder();
                //设置一个Executeor(线程),默认一个
                builder.setSpout(str1, new TestSpout());
                //设置一个Executeor(线程),和一个task
                builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1);
                Config conf = new Config();
                conf.put("test", "test");
                try{
                  //运行拓扑
               if(args !=null&&args.length>0){ //有参数时,表示向集群提交作业,并把第一个参数当做topology名称
                 System.out.println("远程模式");
                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
              } else{//没有参数时,本地提交
            //启动本地模式
             System.out.println("本地模式");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("111" ,conf,  builder.createTopology() );
            Thread.sleep(10000);
        //  关闭本地集群
            cluster.shutdown();
              }
                }catch (Exception e){
                    e.printStackTrace();
                }   
            }
        }
    运行该方法,输出结果如下:
    本地模式
    定义格式...
    open:test
    1次开始发送数据...
    2次开始发送数据...
    prepare:test
    Bolt1接受的消息:这是个测试消息!
    Bolt2接受的消息:这是个测试消息!
    资源释放
    关闭...
  •  
  •