DataHub配置说明
common.properties配置说明
TBDS集群中DataHubServer的IP。agent会定期心跳给DataHubServer。
TBDS集群中DataHubServer的heartbeat.port。
新建接入时,配置的接入名称。标识该agent是属于哪个接入。一个agent对应一个接入。
agent所属接入的源数据库类型,可配置为SQLServer或者MySQL或者Oracle等。
agent所属接入的流向目标,可配置为HDFS或者DB。
注:红色配置项为必填项
Flume配置说明
log4j.properties配置说明
增加以下配置,用于生成指标日志文件,供TMetric agent采集:
log4j.logger.metrics=INFO, METRICS
log4j.appender.METRICS=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.METRICS.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.METRICS.rollingPolicy.ActiveFileName=${flume.log.dir}/metrics.log
log4j.appender.METRICS.rollingPolicy.FileNamePattern=${flume.log.dir}/metrics.log.%d{yyyy-MM-dd}
log4j.appender.METRICS.layout=org.apache.log4j.PatternLayout
log4j.appender.METRICS.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
flume.conf配置说明
全量数据采集
需配置HippoChannel与SQLSource。
HippoChannel配置说明:
Flume agent的channel类型,必须配置为com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel
Hippo topic。接入配置时选择的topic。
权限相关。生产者用户名。可在topic详情页面获得。
权限相关。生产者secretId。可在topic详情页面获得。
权限相关。生产者secretKey。可在topic详情页面获得。
hippo.producer.sendAsync.fqueue.path
hippo.producer.sendAsync.enabled
Hippo消息使用异步发送还是同步发送。机器配置较差时建议使用同步发送。
hippo.producer.sendAsync.timeout
hippo.producer.sendAsync.rateLimit
消息发送限流。默认值为1000,表示每秒最多发送1000条消息。
hippo.producer.sendAsync.fqueue.poll.interval
是否替换消息中的换行符。注意,如果数据中存在换行符而又不进行替换的话,则流向HDFS的文件里也会换行,导入Hive就会有问题。
hippo.lfReplace.replacement
SQLSource配置说明:
Flume agent的source类型,必须配置为com.tencent.tbds.tdbank.flume.source.sql.SQLSource
hibernate.connection.user
hibernate.connection.password
hibernate.connection.isolation
JDBC连接事务隔离级别。设置为2,即READ COMMITTED级别。
hibernate.connection.driver_class
JDBC连接需要的驱动类名。需要把驱动jar包放到flume的lib目录下。
如果通过JDBC URL无法解析出数据库类型,则需手动配置。
如果通过JDBC URL无法解析出数据库主机,则需手动配置。
WHERE子句。如果设置了column.name则WHERE子句中必须包含:index
,该字符串表示当前采集到的该字段的最大值。
另外可以使用:bizdate
,表示当前日期(不包括时间,例如2018-01-16)。
如果设置了column.name则为该字段最小值(不包括)。如果是时间戳类型字段,格式必须为yyyy-MM-dd HH:mm:ss.SSS
,否则必须为整数。
如果未设置column.name则为采集的起始偏移,必须为大于等于0的整数。
注意,如果配置了startFrom,则每次拉起cron任务时,都会将采集偏移重置回startFrom。
tables.$表名.column.isTimestamp
true或者false。该字段是否为时间戳类型。默认为false。
可配置为true。若为true,则表示数据采集会立即执行,采集完成后agent进程自动退出。
字段分隔符,需要与接入配置一致,默认为竖线。
注意,若使用特殊字符,如\u0001或者\t,则此处应配置为\\u0001或者\\t,即应加上转义
hibernate.connection.provider_class
设置为org.hibernate.connection.C3P0ConnectionProvider
可使用C3P0连接池。
配置示例:
agent1.sources = sqlSource
agent1.channels = ch1
##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.HippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue
##################### 全量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2
## 增加以下配置可使用索引字段(假设为id)提升查询性能
## 另外也可以使用自定义SQL
## agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
## agent1.sources.sqlSource.tables.tbl_poc1.column.name = id
agent1.sources.sqlSource.recoveryMode = true
agent1.sources.sqlSource.status.filePath = /data/sqlStatus
agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
增量数据采集
增量数据采集同样需要配置source与channel。channel与全量一致,使用HippoChannel。根据使用OGG或者使用SQLSource需要配置不同的source。
1) 若使用OGG,则source需要AvroSource,配置如下:
agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 41414
agent1.sources.src1.channels = hippoChannel
2) 若使用SQLSource,可实现准实时增量数据采集,也可实现T+1增量数据采集:
2.1) 准实时增量数据采集,配置示例如下,
agent1.sources = sqlSource
agent1.channels = ch1
##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue
##################### 准实时增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc1, tbl_poc2
## 每5分钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0/5 * * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus
## 注意,如果没有设置startFrom,首次采集实际上是会执行全量采集的
## 配置startFrom有副作用,见配置说明
agent1.sources.sqlSource.tables.tbl_poc1.whereClause = WHERE id > :index ORDER BY id
agent1.sources.sqlSource.tables.tbl_poc1.column.name = id
agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true
agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
2.2) T+1增量数据采集,配置示例如下,
agent1.sources = sqlSource
agent1.channels = ch1
##################### HippoChannel #####################
agent1.channels.ch1.type = com.tencent.tbds.datahub.agent.flume.channel.hippo.H
ippoChannel
agent1.channels.ch1.hippo.topic = topic_poc
agent1.channels.ch1.hippo.controllerIpList = 10.0.0.10:8066
agent1.channels.ch1.hippo.producerGroup = group_poc
agent1.channels.ch1.hippo.producer.secretId = WJbcIYI6Kkj6OH6Uy2QMufDVvewFrKCzD3hi
agent1.channels.ch1.hippo.producer.secretKey = AHf3V1fqZt4cN6wPO6cdk0M0kMQnuHer
agent1.channels.ch1.hippo.producer.sendAsync.fqueue.path = /data/fqueue
##################### T+1增量数据采集 #####################
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = com.tencent.tbds.tdbank.flume.source.sql.SQLSource
agent1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://10.0.0.1:3306/test
agent1.sources.sqlSource.hibernate.connection.user = tbds
agent1.sources.sqlSource.hibernate.connection.password = tbds
agent1.sources.sqlSource.hibernate.connection.isolation = 2
agent1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.sqlSource.database = db_poc
agent1.sources.sqlSource.tables = tbl_poc2
## 每天凌晨1点钟采集一次数据
agent1.sources.sqlSource.cronExpression = 0 0 1 * * ?
agent1.sources.sqlSource.status.filePath = /data/sqlStatus
## 注意,如果没有设置startFrom,首次采集实际上是会执行全量采集的
## 配置startFrom有副作用,见配置说明
agent1.sources.sqlSource.tables.tbl_poc2.whereClause = WHERE modify_time > :index AND modify_time < :bizdate ORDER BY modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.name = modify_time
agent1.sources.sqlSource.tables.tbl_poc2.column.isTimestamp = true
agent1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent1.sources.sqlSource.hibernate.c3p0.min_size=1
agent1.sources.sqlSource.hibernate.c3p0.max_size=10
TMetric配置说明
agent.ini配置说明
只需修改以下配置:
; agent主机地址,使用本机ip
hostName=10.0.0.2
; agent监听的rpc端口,必填项
port=8003
; TMetric master的IP与配置的端口
masterAddressList=tbds-10-0-0-7:9000
; 修改为本地地址
bdbFilePath=/data/tmetric/bdb
metricSpliter=#
; 修改为本地地址,配置为flume agent指标日志文件。需保证TMetric agent对metrics.log有可读可执行权限
metricFiles=/data/flume/logs/metrics.log
; 修改为本地地址
failoverQueuePath=/data/tmetric/failover
OGG安装说明
OGG Flume Adapter安装说明
参考OGG-Flume-Adapter-部署文档。
OGG for MySQL/SQLServer/Oracle等安装说明
根据源库不同的类型不同的版本需要下载不同的OGG安装包,并且需要不同的配置。可根据不同环境联系我们提供安装文档,在此不做赘述。