# 堆代码 duidaima.com version: '2.1' services: postgres: image: debezium/example-postgres:1.1 ports: - "5432:5432" environment: - POSTGRES_DB=postgres - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw
version: '3' services: databend: image: datafuselabs/databend volumes: - /Users/hanshanjie/databend/local-test/databend/databend-query.toml:/etc/databend/query.toml environment: QUERY_DEFAULT_USER: databend QUERY_DEFAULT_PASSWORD: databend MINIO_ENABLED: 'true' ports: - '8000:8000' - '9000:9000' - '3307:3307' - '8124:8124'在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:
docker-compose up -d
该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动。
git clone https://github.com/databendcloud/flink-connector-databend cd flink-connector-databend mvn clean install -DskipTests将 target/flink-connector-databend-1.16.0-SNAPSHOT.jar 拷贝到目录 flink-1.16.0/lib/ 下。
docker-compose exec mysql mysql -uroot -p123456
2. 创建数据库 mydb 和表 products,并插入数据:
CREATE DATABASE mydb; USE mydb; CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)); ALTER TABLE products AUTO_INCREMENT = 10; INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"cloud","test for databend"), (default,"spare tire","24 inch spare tire");3.Databend 中建表
CREATE TABLE bend_products (id INT NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512) );启动 Flink 集群和 Flink SQL CLI
cd flink-1.16.0
2. 使用下面的命令启动 Flink 集群
./bin/start-cluster.sh
3. 启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
./bin/sql-client.sh在 Flink SQL CLI 中使用 Flink DDL 创建表
-- Flink SQL Flink SQL> SET execution.checkpointing.interval = 3s;然后,对于数据库中的表 products 使用 Flink SQL CLI 创建对应的表,用于同步底层数据库表的数据
-- Flink SQL Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'products', 'server-time-zone' = 'UTC' );
最后,创建 d_products 表,用来将 products 表的数据写入 Databend 中
-- Flink SQL create table d_products (id INT,name String,description String, PRIMARY KEY (`id`) NOT ENFORCED) with ('connector' = 'databend', 'url'='databend://localhost:8000', 'username'='databend', 'password'='databend', 'database-name'='default', 'table-name'='bend_products', 'sink.batch-size' = '5', 'sink.flush-interval' = '1000', 'sink.max-retries' = '3');使用 Flink SQL 将 products 表中的数据同步到 Databend 的 d_products 表中:
insert into d_products select * from products;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"cloud","test for databend"), (default,"spare tire","24 inch spare tire");
这些数据会立即同步到 Databend 当中。
docker-compose down在 Flink 所在目录 flink-1.16.0 下执行如下命令停止 Flink 集群:
./bin/stop-cluster.sh