> For the complete documentation index, see [llms.txt](https://tbds-book.gitbook.io/tbds/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://tbds-book.gitbook.io/tbds/shu-ju-jie-ru/tdbank_intro/tdbank_conf.md).

# TDBank采集接口详解

## DataHub配置说明

### common.properties配置说明

| **配置项**               | **配置说明**                                          | **默认值** |   |
| --------------------- | ------------------------------------------------- | ------- | - |
| data.hub.manager.ip   | TBDS集群中DataHubServer的IP。agent会定期心跳给DataHubServer。 |         |   |
| data.hub.manager.port | TBDS集群中DataHubServer的heartbeat.port。              |         |   |
| data.hub.agent.name   | agent名称。可根据业务随意配置。                                |         |   |
| data.hub.name         | 新建接入时，配置的接入名称。标识该agent是属于哪个接入。一个agent对应一个接入。      |         |   |
| data.hub.source       | agent所属接入的源数据库类型，可配置为SQLServer或者MySQL或者Oracle等。   |         |   |
| data.hub.target       | agent所属接入的流向目标，可配置为HDFS或者DB。                      |         |   |

注：红色配置项为必填项

## Flume配置说明

### log4j.properties配置说明

增加以下配置，用于生成指标日志文件，供TMetric agent采集：

```
log4j.logger.metrics=INFO, METRICS
log4j.appender.METRICS=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.METRICS.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.METRICS.rollingPolicy.ActiveFileName=${flume.log.dir}/metrics.log
log4j.appender.METRICS.rollingPolicy.FileNamePattern=${flume.log.dir}/metrics.log.%d{yyyy-MM-dd}
log4j.appender.METRICS.layout=org.apache.log4j.PatternLayout
log4j.appender.METRICS.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
```

### flume.conf配置说明

#### 全量数据采集

需配置HippoChannel与SQLSource。

HippoChannel配置说明：

| **配置项**                                       | **配置说明**                                                                                     | **默认值** |
| --------------------------------------------- | -------------------------------------------------------------------------------------------- | ------- |
| type                                          | Flume agent的channel类型，必须配置为`com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel` |         |
| hippo.topic                                   | Hippo topic。接入配置时选择的topic。                                                                   |         |
| hippo.controllerIpList                        | Hippo controller的IP列表。                                                                       |         |
| hippo.producerGroup                           | Hippo生产组名，可根据业务配置。                                                                           |         |
| hippo.producer.userName                       | 权限相关。生产者用户名。可在topic详情页面获得。                                                                   | admin   |
| hippo.producer.secretId                       | 权限相关。生产者secretId。可在topic详情页面获得。                                                              |         |
| hippo.producer.secretKey                      | 权限相关。生产者secretKey。可在topic详情页面获得。                                                             |         |
| hippo.producer.sendAsync.fqueue.path          | 消息发送失败时的暂存目录。                                                                                |         |
| hippo.producer.sendAsync.enabled              | Hippo消息使用异步发送还是同步发送。机器配置较差时建议使用同步发送。                                                         | true    |
| hippo.producer.sendAsync.timeout              | 消息发送超时时间。毫秒为单位。默认20秒超时。                                                                      | 20000   |
| hippo.producer.sendAsync.rateLimit            | 消息发送限流。默认值为1000，表示每秒最多发送1000条消息。                                                             | 1000    |
| hippo.producer.sendAsync.fqueue.poll.interval | 超时重传间隔的毫秒数。默认一分钟重传一次。                                                                        | 60000   |
| hippo.lfReplace.enabled                       | 是否替换消息中的换行符。注意，如果数据中存在换行符而又不进行替换的话，则流向HDFS的文件里也会换行，导入Hive就会有问题。                              | true    |
| hippo.lfReplace.replacement                   | 使用什么字符串来替换换行符。                                                                               | \\\n    |

SQLSource配置说明：

| **配置项**                              | **配置说明**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    | **默认值** |   |
| ------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | - |
| type                                 | Flume agent的source类型，必须配置为`com.tencent.tbds.tdbank.flume.source.sql.SQLSource`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |         |   |
| hibernate.connection.url             | JDBC连接URL。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |         |   |
| hibernate.connection.user            | JDBC连接用户名。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |         |   |
| hibernate.connection.password        | JDBC连接用户密码。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |         |   |
| hibernate.connection.isolation       | JDBC连接事务隔离级别。设置为2，即READ COMMITTED级别。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |         |   |
| hibernate.dialect                    | <p>根据不同数据库配置dialect。参考<a href="https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects。"><https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects。></a><br>注意，采集SQLServer数据时，根据对应版本配置为：<br>com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServerCustomDialect <br> com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2005CustomDialect <br> com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2008CustomDialect <br> com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2012CustomDialect</p> |         |   |
| hibernate.connection.driver\_class   | JDBC连接需要的驱动类名。需要把驱动jar包放到flume的lib目录下。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |         |   |
| dbType                               | 如果通过JDBC URL无法解析出数据库类型，则需手动配置。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |         |   |
| dbHost                               | 如果通过JDBC URL无法解析出数据库主机，则需手动配置。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |         |   |
| database                             | 要采集数据的源库。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |         |   |
| tables                               | 要采集数据的源表。可配置多个表，逗号分隔。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |         |   |
| tables.$表名.customQuery               | 完整的自定义SQL。可支持多表关联查询。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |         |   |
| tables.$表名.whereClause               | <p>WHERE子句。如果设置了column.name则WHERE子句中必须包含<code>:index</code>，该字符串表示当前采集到的该字段的最大值。<br>另外可以使用<code>:bizdate</code>，表示当前日期（不包括时间，例如2018-01-16）。</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                             |         |   |
| tables.$表名.startFrom                 | <p>如果设置了column.name则为该字段最小值（不包括）。如果是时间戳类型字段，格式必须为<code>yyyy-MM-dd HH:mm:ss.SSS</code>，否则必须为整数。<br>如果未设置column.name则为采集的起始偏移，必须为大于等于0的整数。<br>注意，如果配置了startFrom，则每次拉起cron任务时，都会将采集偏移重置回startFrom。</p>                                                                                                                                                                                                                                                                                                                                                                                                         |         |   |
| tables.$表名.column.name               | 通过某个字段来采集数据。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |         |   |
| tables.$表名.column.isTimestamp        | true或者false。该字段是否为时间戳类型。默认为false。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |         |   |
| cronExpression                       | <p>cron表达式，参考<a href="http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html。"><http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html。></a><br>全量数据采集将根据cron表达式指定的时间点来执行。如果recoveryMode为false则cronExpression必须配置。</p>                                                                                                                                                                                                                                                                                                                            |         |   |
| recoveryMode                         | 可配置为true。若为true，则表示数据采集会立即执行，采集完成后agent进程自动退出。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              | false   |   |
| output.delimiter                     | <p>字段分隔符，需要与接入配置一致，默认为竖线。<br>注意，若使用特殊字符，如\u0001或者\t，则此处应配置为\u0001或者\t，即应加上转义</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |         |   |
| query.pageSize                       | 分页查询大小。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     | 10000   |   |
| query.numThreads                     | 全量数据采集线程池大小。一张表占用一个线程。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      | 10      |   |
| query.timeout                        | SQL查询超时设置，以秒为单位。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            | 60      |   |
| status.filePath                      | 用于存放当前数据采集进度的目录。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |         |   |
| hibernate.connection.provider\_class | 设置为`org.hibernate.connection.C3P0ConnectionProvider`可使用C3P0连接池。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |         |   |
| hibernate.c3p0.min\_size             | C3P0连接池最小值。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |         |   |
| hibernate.c3p0.max\_size             | C3P0连接池最大值。                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |         |   |

配置示例：

```
agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### 全量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2

## 增加以下配置可使用索引字段（假设为id）提升查询性能
## 另外也可以使用自定义SQL
## agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
## agent1.sources.sqlSource.tables.tbl_poc1.column.name = id

agent1.sources.sqlSource.recoveryMode = true
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
```

#### 增量数据采集

增量数据采集同样需要配置source与channel。channel与全量一致，使用HippoChannel。根据使用OGG或者使用SQLSource需要配置不同的source。

1\) 若使用OGG，则source需要AvroSource，配置如下：

```
agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 41414
agent1.sources.src1.channels = hippoChannel
```

2\) 若使用SQLSource，可实现准实时增量数据采集，也可实现T+1增量数据采集：

2.1) 准实时增量数据采集，配置示例如下，

```
agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### 准实时增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2

## 每5分钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0/5 * * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

## 注意，如果没有设置startFrom，首次采集实际上是会执行全量采集的
## 配置startFrom有副作用，见配置说明
agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
agent1.sources.sqlSource.tables.tbl_poc1.column.name = id

agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
```

2.2) T+1增量数据采集，配置示例如下，

```
agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### T+1增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc2

## 每天凌晨1点钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0 1 * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

## 注意，如果没有设置startFrom，首次采集实际上是会执行全量采集的
## 配置startFrom有副作用，见配置说明
agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index AND modify_time < :bizdate ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
```

## TMetric配置说明

### agent.ini配置说明

只需修改以下配置：

```
; agent主机地址，使用本机ip
hostName=10.0.0.2
; agent监听的rpc端口，必填项
port=8003
; TMetric master的IP与配置的端口
masterAddressList=tbds-10-0-0-7:9000
; 修改为本地地址
bdbFilePath=/data/tmetric/bdb
metricSpliter=#
; 修改为本地地址，配置为flume agent指标日志文件。需保证TMetric agent对metrics.log有可读可执行权限
metricFiles=/data/flume/logs/metrics.log
; 修改为本地地址
failoverQueuePath=/data/tmetric/failover
```

## OGG安装说明

### OGG Flume Adapter安装说明

参考[OGG-Flume-Adapter-部署文档](/tbds/shu-ju-jie-ru/tdbank_intro/ogg_flume_adapter.md)。

### OGG for MySQL/SQLServer/Oracle等安装说明

根据源库不同的类型不同的版本需要下载不同的OGG安装包，并且需要不同的配置。可根据不同环境联系我们提供安装文档，在此不做赘述。


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://tbds-book.gitbook.io/tbds/shu-ju-jie-ru/tdbank_intro/tdbank_conf.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
