博客
关于我
Flink CDC的使用
阅读量:479 次
发布时间:2019-03-06

本文共 4602 字,大约阅读时间需要 15 分钟。

MySQL数据准备与Flink CDC实时数据捕获

MySQL数据准备

在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:

  • 创建并使用数据库
  • create database if not exists test;
    use test;
    1. 创建学生表
    2. drop table if exists stu;
      create table stu (
      id int primary key auto_increment,
      name varchar(100),
      age int
      );
      1. 插入初始数据
      2. insert into stu(name, age) values("张三", 18);
        insert into stu(name, age) values("李四", 20);
        insert into stu(name, age) values("王五", 21);

        注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。

        开启MySQL binlog

        为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。

      3. 修改MySQL配置文件
      4. sudo vim /etc/my.cnf
        1. 在配置文件中添加以下内容:
        2. server-id = 1
          log-bin=mysql-bin
          binlog_format=row
          binlog-do-db=test

          注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。

          1. 重启MySQL服务
          2. sudo systemctl restart mysqld

            Flink代码开发

            本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。

            依赖管理

          3. 添加Flink CDC依赖
          4. com.ververica
            flink-connector-mysql-cdc
            2.4.0
            1. 其他Flink依赖(如 flink-table-api-java-bridge 等)
            2. org.apache.flink
              flink-table-api-java-bridge
              ${flink.version}
              import com.ververica.cdc.connectors.mysql.source.MySqlSource;
              import com.ververica.cdc.connectors.mysql.table.StartupOptions;
              import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
              import org.apache.flink.api.common.eventtime.WatermarkStrategy;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              public class FlinkCDCDemo {
              public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(4);
              MySqlSource
              mySqlSource = MySqlSource.builder()
              .hostname("node4")
              .port(3306)
              .username("root")
              .password("000000")
              .databaseList("test")
              .tableList("test.stu")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .startupOptions(StartupOptions.initial())
              .build();
              DataStreamSource
              dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source")
              .setParallelism(1);
              dataStreamSource.print();
              env.execute();
              }
              }

              注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。

              测试与验证

              添加新数据

              执行以下SQL语句:

              mysql> insert into stu(name, age) values("赵六", 23);

              输出示例

              {
              "before": null,
              "after": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831654000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2300,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "c",
              "ts_ms": 1719831654692,
              "transaction": null
              }

              修改数据

              执行以下SQL语句:

              mysql> update stu set name="zl", age=19 where name="赵六";

              输出示例

              {
              "before": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "after": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831987000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2604,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "u",
              "ts_ms": 1719831987238,
              "transaction": null
              }

              删除数据

              执行以下SQL语句:

              mysql> delete from stu where id=4;

              输出示例

              {
              "before": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "after": null,
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719832151000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2913,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "d",
              "ts_ms": 1719832151198,
              "transaction": null
              }

              注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。

    转载地址:http://iaqbz.baihongyu.com/

    你可能感兴趣的文章
    MySQL“被动”性能优化汇总
    查看>>
    MySQL、HBase 和 Elasticsearch:特点与区别详解
    查看>>
    MySQL、Redis高频面试题汇总
    查看>>
    MYSQL、SQL Server、Oracle数据库排序空值null问题及其解决办法
    查看>>
    mysql一个字段为空时使用另一个字段排序
    查看>>
    MySQL一个表A中多个字段关联了表B的ID,如何关联查询?
    查看>>
    MYSQL一直显示正在启动
    查看>>
    MySQL一站到底!华为首发MySQL进阶宝典,基础+优化+源码+架构+实战五飞
    查看>>
    MySQL万字总结!超详细!
    查看>>
    Mysql下载以及安装(新手入门,超详细)
    查看>>
    MySQL不会性能调优?看看这份清华架构师编写的MySQL性能优化手册吧
    查看>>