@Override
public void nextTuple() {
Utils.sleep(10000);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
LOG.info("emit " + word);
_collector.emit(new Values(word));
}
3.3 实现blot
blot 通常是继承 BaseRichBolt,BaseBasicBolt 或实现 IRichBolt 接口
并重写execute 方法
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
LOG.info("get word=" + word);
_collector.emit(tuple, new Values(word + "!!!"));
_collector.ack(tuple);
}
4. 打包jar
在pom.xml 文件中添加打包插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<!--这里要替换成jar包main方法所在类 -->
<mainClass>com.tencent.dc.lz.ExclamationTopology</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
通过maven 打包命令,会将项目打包成一个可执行jar 。
5. 本地调试
建议用户使用IntelliJ IDEA 进行本地调试,直接运行main 函数,会在本地嵌入运行一个storm 环境,方便用户调试核心逻辑。
6. 提交topology到集群
通过后台提交topology,需要切到nimbus 节点。 上传打包好的jar,切到系统用户jstorm。
使用提交命令格式如下:
/usr/local/jstorm/bin/storm jar topology.jar main.class topologyName
例如:
/usr/local/jstorm/bin/storm jar wordCountTstormDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.tencent.dc.lz.ExclamationTopology "nihao"