1.数据库设置
(1)确保数据库已经开启了binlog,并且保存格式为ROW
(2)创建新账号,确保对所有的库表有读权限即可
2.下载对应的canal客户端程序
这次测试使用的是https://github.com/alibaba/canal/releases 链接下的 canal.deployer-1.1.6.tar.gz
3.设置canal的配置文件
配置文件在canal.deployer-1.1.6/conf/example/instance.properties,其中设置以下几处即可
canal.instance.master.address=xxxx:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxx
4.编写java程序
package net.generator.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalTest {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
// 获取连接,连接的是本地启动的canal
// example代表的是配置来源,对应的是下载的canal文件夹conf下面的example文件夹
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
"example", "", "");
while (true) {
// 连接
canalConnector.connect();
// 订阅数据库-筛选
//订阅全部
//canalConnector.subscribe(".*\\..*");
//订阅wf_course_v3下面的所有表
canalConnector.subscribe("wf_course_v3\\..*");
// 获取数据
//类似事务,手动确认,配合ack及rollback方法,canal会将上次读取确认的位置记录下来
//Message message = canalConnector.getWithoutAck(100);
//自动确认
Message message = canalConnector.get(100);
// 获取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
// 判断集合是否为空,如果为空,则线程等待2秒再拉取数据
if (entries.size() <= 0) {
// System.out.println("当次抓取没有数据,休息一会儿。。。");
Thread.sleep(1000);
} else {
// 遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
// 1,获取表名entries = {ArrayList@979} size = 2
String tableName = entry.getHeader().getTableName();
// 2,获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
// 3,获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
// 4.判断当前entryType类型是否为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
//5.反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//6.获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//7.获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
//8.遍历rowDataList并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforData = new JSONObject();
List<CanalEntry.Column> beforClountList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforClountList) {
beforData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterClountList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterClountList) {
afterData.put(column.getName(), column.getValue());
}
// 打印数据
System.out.println("" + tableName + ",EventType:" + eventType + ",Before:" + beforData + ",After:" + afterData);
}
} else {
System.out.println("当前操作类型为" + entryType);
}
}
}
}
}
}
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2023/05/30 06:04