TDBank采集接口详解

DataHub配置说明

common.properties配置说明

配置项

配置说明

默认值

data.hub.manager.ip

TBDS集群中DataHubServer的IP。agent会定期心跳给DataHubServer。

data.hub.manager.port

TBDS集群中DataHubServer的heartbeat.port。

data.hub.agent.name

agent名称。可根据业务随意配置。

data.hub.name

新建接入时,配置的接入名称。标识该agent是属于哪个接入。一个agent对应一个接入。

data.hub.source

agent所属接入的源数据库类型,可配置为SQLServer或者MySQL或者Oracle等。

data.hub.target

agent所属接入的流向目标,可配置为HDFS或者DB。

注:红色配置项为必填项

Flume配置说明

log4j.properties配置说明

增加以下配置,用于生成指标日志文件,供TMetric agent采集:

log4j.logger.metrics=INFO, METRICS
log4j.appender.METRICS=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.METRICS.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.METRICS.rollingPolicy.ActiveFileName=${flume.log.dir}/metrics.log
log4j.appender.METRICS.rollingPolicy.FileNamePattern=${flume.log.dir}/metrics.log.%d{yyyy-MM-dd}
log4j.appender.METRICS.layout=org.apache.log4j.PatternLayout
log4j.appender.METRICS.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n

flume.conf配置说明

全量数据采集

需配置HippoChannel与SQLSource。

HippoChannel配置说明:

配置项

配置说明

默认值

type

Flume agent的channel类型,必须配置为com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel

hippo.topic

Hippo topic。接入配置时选择的topic。

hippo.controllerIpList

Hippo controller的IP列表。

hippo.producerGroup

Hippo生产组名,可根据业务配置。

hippo.producer.userName

权限相关。生产者用户名。可在topic详情页面获得。

admin

hippo.producer.secretId

权限相关。生产者secretId。可在topic详情页面获得。

hippo.producer.secretKey

权限相关。生产者secretKey。可在topic详情页面获得。

hippo.producer.sendAsync.fqueue.path

消息发送失败时的暂存目录。

hippo.producer.sendAsync.enabled

Hippo消息使用异步发送还是同步发送。机器配置较差时建议使用同步发送。

true

hippo.producer.sendAsync.timeout

消息发送超时时间。毫秒为单位。默认20秒超时。

20000

hippo.producer.sendAsync.rateLimit

消息发送限流。默认值为1000,表示每秒最多发送1000条消息。

1000

hippo.producer.sendAsync.fqueue.poll.interval

超时重传间隔的毫秒数。默认一分钟重传一次。

60000

hippo.lfReplace.enabled

是否替换消息中的换行符。注意,如果数据中存在换行符而又不进行替换的话,则流向HDFS的文件里也会换行,导入Hive就会有问题。

true

hippo.lfReplace.replacement

使用什么字符串来替换换行符。

\\n

SQLSource配置说明:

配置项

配置说明

默认值

type

Flume agent的source类型,必须配置为com.tencent.tbds.tdbank.flume.source.sql.SQLSource

hibernate.connection.url

JDBC连接URL。

hibernate.connection.user

JDBC连接用户名。

hibernate.connection.password

JDBC连接用户密码。

hibernate.connection.isolation

JDBC连接事务隔离级别。设置为2,即READ COMMITTED级别。

hibernate.dialect

根据不同数据库配置dialect。参考https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects。 注意,采集SQLServer数据时,根据对应版本配置为: com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServerCustomDialect com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2005CustomDialect com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2008CustomDialect com.tencent.tbds.tdbank.flume.source.sql.utils.SQLServer2012CustomDialect

hibernate.connection.driver_class

JDBC连接需要的驱动类名。需要把驱动jar包放到flume的lib目录下。

dbType

如果通过JDBC URL无法解析出数据库类型,则需手动配置。

dbHost

如果通过JDBC URL无法解析出数据库主机,则需手动配置。

database

要采集数据的源库。

tables

要采集数据的源表。可配置多个表,逗号分隔。

tables.$表名.customQuery

完整的自定义SQL。可支持多表关联查询。

tables.$表名.whereClause

WHERE子句。如果设置了column.name则WHERE子句中必须包含:index,该字符串表示当前采集到的该字段的最大值。 另外可以使用:bizdate,表示当前日期(不包括时间,例如2018-01-16)。

tables.$表名.startFrom

如果设置了column.name则为该字段最小值(不包括)。如果是时间戳类型字段,格式必须为yyyy-MM-dd HH:mm:ss.SSS,否则必须为整数。 如果未设置column.name则为采集的起始偏移,必须为大于等于0的整数。 注意,如果配置了startFrom,则每次拉起cron任务时,都会将采集偏移重置回startFrom。

tables.$表名.column.name

通过某个字段来采集数据。

tables.$表名.column.isTimestamp

true或者false。该字段是否为时间戳类型。默认为false。

cronExpression

cron表达式,参考http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html。 全量数据采集将根据cron表达式指定的时间点来执行。如果recoveryMode为false则cronExpression必须配置。

recoveryMode

可配置为true。若为true,则表示数据采集会立即执行,采集完成后agent进程自动退出。

false

output.delimiter

字段分隔符,需要与接入配置一致,默认为竖线。 注意,若使用特殊字符,如\u0001或者\t,则此处应配置为\\u0001或者\\t,即应加上转义

query.pageSize

分页查询大小。

10000

query.numThreads

全量数据采集线程池大小。一张表占用一个线程。

10

query.timeout

SQL查询超时设置,以秒为单位。

60

status.filePath

用于存放当前数据采集进度的目录。

hibernate.connection.provider_class

设置为org.hibernate.connection.C3P0ConnectionProvider可使用C3P0连接池。

hibernate.c3p0.min_size

C3P0连接池最小值。

hibernate.c3p0.max_size

C3P0连接池最大值。

配置示例:

agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### 全量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2

## 增加以下配置可使用索引字段(假设为id)提升查询性能
## 另外也可以使用自定义SQL
## agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
## agent1.sources.sqlSource.tables.tbl_poc1.column.name = id

agent1.sources.sqlSource.recoveryMode = true
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10

增量数据采集

增量数据采集同样需要配置source与channel。channel与全量一致,使用HippoChannel。根据使用OGG或者使用SQLSource需要配置不同的source。

1) 若使用OGG,则source需要AvroSource,配置如下:

agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 41414
agent1.sources.src1.channels = hippoChannel

2) 若使用SQLSource,可实现准实时增量数据采集,也可实现T+1增量数据采集:

2.1) 准实时增量数据采集,配置示例如下,

agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### 准实时增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2

## 每5分钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0/5 * * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

## 注意,如果没有设置startFrom,首次采集实际上是会执行全量采集的
## 配置startFrom有副作用,见配置说明
agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
agent1.sources.sqlSource.tables.tbl_poc1.column.name = id

agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10

2.2) T+1增量数据采集,配置示例如下,

agent1.sources = sqlSource
agent1.channels = ch1

##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue

##################### T+1增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc2

## 每天凌晨1点钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0 1 * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus

## 注意,如果没有设置startFrom,首次采集实际上是会执行全量采集的
## 配置startFrom有副作用,见配置说明
agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index AND modify_time < :bizdate ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true

agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10

TMetric配置说明

agent.ini配置说明

只需修改以下配置:

; agent主机地址,使用本机ip
hostName=10.0.0.2
; agent监听的rpc端口,必填项
port=8003
; TMetric master的IP与配置的端口
masterAddressList=tbds-10-0-0-7:9000
; 修改为本地地址
bdbFilePath=/data/tmetric/bdb
metricSpliter=#
; 修改为本地地址,配置为flume agent指标日志文件。需保证TMetric agent对metrics.log有可读可执行权限
metricFiles=/data/flume/logs/metrics.log
; 修改为本地地址
failoverQueuePath=/data/tmetric/failover

OGG安装说明

OGG Flume Adapter安装说明

参考OGG-Flume-Adapter-部署文档

OGG for MySQL/SQLServer/Oracle等安装说明

根据源库不同的类型不同的版本需要下载不同的OGG安装包,并且需要不同的配置。可根据不同环境联系我们提供安装文档,在此不做赘述。

Last updated