使用canal订阅阿里云数据库PolarDB

/ 后端 / 没有评论 / 573浏览

1.数据库设置

(1)确保数据库已经开启了binlog,并且保存格式为ROW
(2)创建新账号,确保对所有的库表有读权限即可

2.下载对应的canal客户端程序

这次测试使用的是https://github.com/alibaba/canal/releases 链接下的 canal.deployer-1.1.6.tar.gz

3.设置canal的配置文件

配置文件在canal.deployer-1.1.6/conf/example/instance.properties,其中设置以下几处即可

canal.instance.master.address=xxxx:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxx

4.编写java程序

package net.generator.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalTest {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

        // 获取连接,连接的是本地启动的canal
        // example代表的是配置来源,对应的是下载的canal文件夹conf下面的example文件夹
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),
                "example", "", "");

        while (true) {
            // 连接
            canalConnector.connect();
            // 订阅数据库-筛选
            //订阅全部
            //canalConnector.subscribe(".*\\..*");
            //订阅wf_course_v3下面的所有表
            canalConnector.subscribe("wf_course_v3\\..*");

            // 获取数据
            //类似事务,手动确认,配合ack及rollback方法,canal会将上次读取确认的位置记录下来
            //Message message = canalConnector.getWithoutAck(100);
            //自动确认
            Message message = canalConnector.get(100);

            // 获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();
            // 判断集合是否为空,如果为空,则线程等待2秒再拉取数据
            if (entries.size() <= 0) {
                // System.out.println("当次抓取没有数据,休息一会儿。。。");
                Thread.sleep(1000);
            } else {
                // 遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {
                    // 1,获取表名entries = {ArrayList@979}  size = 2
                    String tableName = entry.getHeader().getTableName();
                    // 2,获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3,获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
                    // 4.判断当前entryType类型是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        //5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //6.获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        //8.遍历rowDataList并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            JSONObject beforData = new JSONObject();
                            List<CanalEntry.Column> beforClountList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforClountList) {
                                beforData.put(column.getName(), column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterClountList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterClountList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            // 打印数据
                            System.out.println("" + tableName + ",EventType:" + eventType + ",Before:" + beforData + ",After:" + afterData);
                        }

                    } else {
                        System.out.println("当前操作类型为" + entryType);
                    }
                }
            }
        }
    }
}