Lazy loaded image
🎋Beam+Kafka同步Oracle日志到ClickHouse
00 分钟
2019-12-2
2024-12-5
type
status
date
slug
summary
tags
category
icon
password
 
用kafka的oracle connect插件存储oracle日志,用beam读取kafka数据插入到clickhouse中 ## oracle配置 ### archivelog模式 数据库必须处于archivelog模式,并且必须启用补充日志记录 在数据库服务器上执行

启用补充日志记录

kafka配置

插件配置文件

在$KAFKA_HOME/config 目录新建OracleSourceConnector.properties配置文件, 文件内示例配置
Copy of 配置说明
Name
Type
Description
String
连接器名称
String
此连接器的Java类的名称
String
数据库的标识符名称(例如Test,Dev,Prod)或用于标识数据库的特定名称该名称将用作主题和架构名称的标头
Integer
创建的最大任务数此连接器使用单个任务.
String
消息将写入的主题的名称如果设置了值,则所有消息都将写入此声明的topic,如果未设置,则将为每个数据库表动态创建一个主题
String
要连接的数据库的服务名称或sid通常使用数据库服务名称
String
Oracle数据库服务器的IP地址或主机名
Integer
Oracle数据库服务器的端口号
String
数据库的用户名
String
数据库用户密码
Integer
此配置属性设置Oracle行提取大小值
String
白名单格式为用户名.表名用逗号分隔,如果要收集用户下的所有的表,用用户名.*
Boolean
如果为true,则将捕获的sql DML语句解析为字段和值;如果为false,则仅发布sql DML语句
Boolean
如果为true,则在连接器启动时将偏移值设置为数据库的当前SCN如果为false,则连接器将从上一个偏移值开始
Long
如果设置此属性,则将偏移值设置为该指定值,并且logminer将从此SCN启动如果连接器希望从所需的SCN启动,则可以使用此属性
Boolean
如果为true,则启用多租户支持如果为false,将使用单实例配置
String
黑名单格式为用户名.表名用逗号分隔,如果要收集用户下的所有的表,用用户名.*.
String
以逗号分隔的DML操作列表(INSERT,UPDATE,DELETE)如果未指定,则将执行复制所有DML操作的默认行为,如果指定,则仅捕获指定的操作

添加kafka插件包

==kafka-connect-oracle-1.0.68.jar==和相应版本的oracle的驱动包放入到 $KAFKA_HOME/lib文件夹中

启动插件

Copy of kafka消息说明
字段
说明
数据库日志时间
数据库用户名
数据库表名
时间戳
执行的sql语句
操作
更新后的数据
更新前的数据

ClickHouse 建表

Beam 程序

maven添加 依赖包

创建配置参数类

kafka消息映射类

主程序

把beam包放在flink上运行

notion image

clickhouse中的数据

notion image
上一篇
开发备忘录
下一篇
代理-kubernetes安装本地开发环境vpn

评论
Loading...