📖
tbds
  • 前言
  • 部署
    • 安装步骤
  • 平台管理
    • 用户管理
    • 项目管理
    • 系统设置
  • 组件访问开发
    • kafka
    • hadoop
    • hbase
    • accessKey
    • hive
    • spark
    • ftp
    • portalApi
    • hermes
    • 代码访问组件demos
      • hbase
      • hdfs
      • kafka
      • hive
  • 数据接入
    • 数据接入
      • TDBank产品化介绍及使用指引
      • TDBank数据接入指引
      • TDBank采集接口详解
      • OGG Flume Adapter 部署文档
      • DB Demo
      • HDFS Demo
    • Hippo管理
    • 进度监控
    • 血缘分析
  • 实时计算
    • 任务管理
    • 库表模板管理
  • 数据分析
    • 数据交互
    • ideX
  • 任务调度
    • 操作指引
    • 工作流
      • 基本概念
      • 任务基本信息
      • 任务调度设置
      • 任务参数配置
        • shell 脚本
        • ftp导入hdfs
        • kafka导入hbase
        • kafka导入hdfs
        • hdfs导出hbase
        • hive sql 脚本
        • hive导入hdfs
        • hdfs导出DB(mysql,postgreSQL,sql server)
        • tstorm任务
        • spark任务
        • mapreduce任务
        • db导入hdfs
        • hdfs导出hive
        • hdfs导出hive-tdsort
        • hive导出mysql
        • mysql导入hive
      • Demo
        • FTP导入HDFS
        • HDFS导出HBASE
        • HDFS导出Mysql
        • HDFS导入HIVE
        • Hive SQL脚本
        • Hive导出MySQL
        • Hive导入HDFS
        • KAFKA导出HBASE
        • KAFKA导出HDFS
        • MYSQL导入HDFS
        • MySQL导入Hive
        • Shell脚本
        • SPARK任务
      • 升级
        • 集成代码更新
        • rpm升级
      • 补充
        • 手动迁移base组件
        • 手动安装runner组件
        • 自定义任务开发文档
        • 时间隐式变量说明
        • 下发并发数说明和调整操作
        • Issues版本说明
        • 设置分组
        • 跨工作流依赖
      • 常见问题定位和解决方式
        • 常用操作
        • 实时接入任务hdfs2hive (tdsort)
        • 实例日志获取失败
        • 实例日志中提示下载文件失败
        • taskSchedule指标为空定位方法
        • 实例依赖失效确认方式
        • 任务实例诊断按钮无调度信息
        • 诊断和定位操作参考
        • 实例一直等待终止定位
        • mongodb 常见问题和处理办法
    • 任务管理
      • 工作流列表
      • 任务管理
      • 任务运行管理
      • 其他
    • 服务器配置
      • 基本概念
      • 操作指南
  • Tstorm
    • Tstorm介绍
    • 开发实例
      • wordcount
  • 数据展现
    • 自助报表
  • 数据资产
    • 库表管理
      • 可管理库表
      • 可读写库表
      • 无归属库表
      • 维表管理
      • 新建表
    • 数据血缘
    • 数据提取
      • 数据地图
      • 任务列表
      • 架构关联
  • 运维中心
    • 系统运维
      • 组件部署
      • 链接归集
      • 诊断
      • 备份
    • 访问管理
    • 文件管理
    • 监控告警
      • 监控
      • 告警
  • 机器学习
    • 系统简介
    • TDInsight功能介绍
      • 工作流
        • 新建工程
        • 新建工作流
        • 创建和配置节点
        • 运行
        • 日志查看
      • 数据输入输出
      • 组件
        • Spark组件
        • Sparkstreaming组件
        • pySpark组件
        • xgboost组件
    • 多实例并发
      • 3种方式驱动实例
      • 实例查询
      • 历史实例
    • TDInsight模型与在线推理
      • 数据流
      • 模型的训练与使用
      • 模型在线服务
    • TDInsight算法/组件说明
      • 数据说明
      • 特征工程
        • 数据预处理
        • 特征提取
        • 特征转换
        • 特征选择
        • 特征评估
      • 机器学习
        • 分类算法
        • 聚类算法
        • 回归算法
        • 推荐算法
        • 关联规则
      • 模型评估
        • Binary Evaluator
        • Multi Evaluator
        • Regression Evaluator
      • 可视化
        • 关系
        • 分布
        • 对比
        • 组合
      • 深度学习算法简介
        • 计算机视觉
        • 自然语言处理
        • 表示学习
Powered by GitBook
On this page
  • 1. 创建maven 项目
  • 2. 引入依赖
  • 3. 编写topology
  • 3.1 编写入口类
  • 3.2 实现spout
  • 3.3 实现blot
  • 4. 打包jar
  • 5. 本地调试
  • 6. 提交topology到集群
  • 7. 查看任务和查看日志
  • 8. 停止top
  • 9. 更多

Was this helpful?

  1. Tstorm
  2. 开发实例

wordcount

介绍开发一个wordcount 计算任务运行在tstorm上

1. 创建maven 项目

生成一个maven 项目。 编译的java 版本 使用1.8

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF8</encoding>
    </configuration>
</plugin>

2. 引入依赖

Tstorm基于storm 0.9.6 版本,所以依赖的storm-core 建议使用0.9.6 如果本地调试模块,依赖scope 必须选用 compile。如果提交到生产环境,依赖scope 使用provided。

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.6</version>
    <scope>provided</scope>
    <!-- 如果本地调试模块 ,必须选用 compile,如果提交到生产环境使用provided-->
</dependency>

3. 编写topology

3.1 编写入口类

topology 入口就是一个普通的java 程序。

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new WordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);// work 个数
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // 本地提交模式
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            TimeUnit.SECONDS.sleep(10);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

3.2 实现spout

spout 通常继承 BaseRichSpout或者实现IRichSpout接口 并重写 nextTuple 方法

    @Override
    public void nextTuple() {
        Utils.sleep(10000);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        LOG.info("emit " + word);
        _collector.emit(new Values(word));
    }

3.3 实现blot

blot 通常是继承 BaseRichBolt,BaseBasicBolt 或实现 IRichBolt 接口 并重写execute 方法

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        LOG.info("get word=" + word);
        _collector.emit(tuple, new Values(word + "!!!"));
        _collector.ack(tuple);
    }

4. 打包jar

在pom.xml 文件中添加打包插件

  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.5.5</version>
      <configuration>
          <archive>
              <manifest>
                  <!--这里要替换成jar包main方法所在类 -->
                  <mainClass>com.tencent.dc.lz.ExclamationTopology</mainClass>
              </manifest>
          </archive>
          <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
      </configuration>
      <executions>
          <execution>
              <id>make-assembly</id> <!-- this is used for inheritance merges -->
              <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
              <goals>
                  <goal>single</goal>
              </goals>
          </execution>
      </executions>
  </plugin>

通过maven 打包命令,会将项目打包成一个可执行jar 。

5. 本地调试

建议用户使用IntelliJ IDEA 进行本地调试,直接运行main 函数,会在本地嵌入运行一个storm 环境,方便用户调试核心逻辑。

6. 提交topology到集群

通过后台提交topology,需要切到nimbus 节点。 上传打包好的jar,切到系统用户jstorm。 使用提交命令格式如下: /usr/local/jstorm/bin/storm jar topology.jar main.class topologyName 例如:

/usr/local/jstorm/bin/storm jar wordCountTstormDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.tencent.dc.lz.ExclamationTopology "nihao"

7. 查看任务和查看日志

8. 停止top

通过后台命令杀死对应的topology /usr/local/jstorm/bin/storm kill topologyName

9. 更多

更多后台操作命令使用: /usr/local/jstorm/bin/storm help 提交topology也可以通过工作流中的storm任务。

Previous开发实例Next数据展现

Last updated 4 years ago

Was this helpful?

更多开发实例参考: 更多storm 接口参考:

通过后台命令(/usr/local/jstorm/bin/storm list) 查看提交topology 状态(使用jstorm 用户)。 也可以使用storm ui 查看topology 运行状态和日志。访问地址为 ()

https://github.com/apache/storm/tree/v0.9.6/examples/storm-starter
http://storm.apache.org/releases/0.9.6/index.html
http://portalIP:8080/tstorm