flink-cdc-connectors: [Bug] Demo Cannot Run

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.17.2

Flink CDC version

2.4.2

Database and its version

mysql: 8.0.35

Minimal reproduce step

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.ververica</groupId>
	<artifactId>FlinkCDCTest</artifactId>
	<version>1.0-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<scala.binary.version>2.12</scala.binary.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
		<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
		<flink.forkCount>1</flink.forkCount>
		<flink.reuseForks>true</flink.reuseForks>

		<!-- dependencies versions -->
		<flink.version>1.17.2</flink.version>
		<slf4j.version>1.7.15</slf4j.version>
		<log4j.version>2.17.1</log4j.version>
		<debezium.version>1.9.7.Final</debezium.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- Checked the Flink project's dependencies; the versions below are a feasible reference. -->
		<!--  Use flink shaded guava  18.0-13.0 for flink 1.13   -->
		<!--  Use flink shaded guava  30.1.1-jre-14.0 for flink-1.14  -->
		<!--  Use flink shaded guava  30.1.1-jre-15.0 for flink-1.15  -->
		<!--  Use flink shaded guava  30.1.1-jre-15.0 for flink-1.16  -->
		<!--  Use flink shaded guava  30.1.1-jre-16.1 for flink-1.17  -->
		<!--  Use flink shaded guava  31.1-jre-17.0   for flink-1.18  -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-shaded-guava</artifactId>
			<version>30.1.1-jre-16.1</version>
		</dependency>
		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<version>2.4.2</version>
		</dependency>
		<dependency>
			<groupId>io.debezium</groupId>
			<artifactId>debezium-connector-mysql</artifactId>
			<version>${debezium.version}</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<executions>
					<execution>
						<id>shade-flink</id>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<!-- Shading the test jar was buggy in some earlier plugin versions, so it is disabled here;
                            see https://issues.apache.org/jira/browse/MSHADE-284 -->
							<shadeTestJar>false</shadeTestJar>
							<shadedArtifactAttached>false</shadedArtifactAttached>
							<createDependencyReducedPom>true</createDependencyReducedPom>
							<dependencyReducedPomLocation>
								${project.basedir}/target/dependency-reduced-pom.xml
							</dependencyReducedPomLocation>
							<filters combine.children="append">
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>module-info.class</exclude>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<artifactSet>
								<includes>
									<!-- bundle the connector and its transitive dependencies into the fat jar -->
									<include>io.debezium:debezium-api</include>
									<include>io.debezium:debezium-embedded</include>
									<include>io.debezium:debezium-core</include>
									<include>io.debezium:debezium-ddl-parser</include>
									<include>io.debezium:debezium-connector-mysql</include>
									<include>com.ververica:flink-connector-debezium</include>
									<include>com.ververica:flink-connector-mysql-cdc</include>
									<include>org.antlr:antlr4-runtime</include>
									<include>org.apache.kafka:*</include>
									<include>mysql:mysql-connector-java</include>
									<include>com.zendesk:mysql-binlog-connector-java</include>
									<include>com.fasterxml.*:*</include>
									<include>com.google.guava:*</include>
									<include>com.esri.geometry:esri-geometry-api</include>
									<include>com.zaxxer:HikariCP</include>
									<!--  Include fixed version 30.1.1-jre-16.1 of flink shaded guava  -->
									<include>org.apache.flink:flink-shaded-guava</include>
								</includes>
							</artifactSet>
							<relocations>
								<relocation>
									<pattern>org.apache.kafka</pattern>
									<shadedPattern>
										com.ververica.cdc.connectors.shaded.org.apache.kafka
									</shadedPattern>
								</relocation>
								<relocation>
									<pattern>org.antlr</pattern>
									<shadedPattern>
										com.ververica.cdc.connectors.shaded.org.antlr
									</shadedPattern>
								</relocation>
								<relocation>
									<pattern>com.fasterxml</pattern>
									<shadedPattern>
										com.ververica.cdc.connectors.shaded.com.fasterxml
									</shadedPattern>
								</relocation>
								<relocation>
									<pattern>com.google</pattern>
									<shadedPattern>
										com.ververica.cdc.connectors.shaded.com.google
									</shadedPattern>
								</relocation>
								<relocation>
									<pattern>com.esri.geometry</pattern>
									<shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
								</relocation>
								<relocation>
									<pattern>com.zaxxer</pattern>
									<shadedPattern>
										com.ververica.cdc.connectors.shaded.com.zaxxer
									</shadedPattern>
								</relocation>
							</relocations>
						</configuration>
					</execution>
				</executions>
			</plugin>

		</plugins>
	</build>

</project>
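
A side note on the pom above: every core Flink artifact is declared with provided scope, so launching the job straight from an IDE leaves those jars (including flink-connector-base, which the CDC connector needs at runtime) off the classpath. One hedged workaround is a Maven profile that re-adds them with compile scope for local runs. The profile below is only a sketch: the id local-run is invented, and it would sit inside the <project> element of the pom above.

	<profiles>
		<profile>
			<!-- hypothetical profile: activate with -Plocal-run for IDE runs -->
			<id>local-run</id>
			<dependencies>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-clients</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
				<dependency>
					<!-- contains org.apache.flink.connector.base.source.reader.RecordEmitter -->
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-connector-base</artifactId>
					<version>${flink.version}</version>
					<scope>compile</scope>
				</dependency>
			</dependencies>
		</profile>
	</profiles>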

CdcTest.java:

import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class CdcTest {
	public static void main(String[] args) throws Exception {
		MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
				.hostname("")
				.port(3306) // placeholder: MySQL default port; the real value is redacted in this report
				.databaseList("") // set the database(s) to capture; to sync the whole database, set tableList to ".*"
				.tableList("") // set the table(s) to capture
				.username("")
				.password("")
				.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON string
				.build();

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// enable checkpointing with a 3 s interval
		env.enableCheckpointing(3000);

		env
				.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
				// set the parallelism of the source node to 1
				.setParallelism(1)
				.print().setParallelism(1); // set the parallelism of the sink node to 1

		env.execute("Print MySQL Snapshot + Binlog");
	}
}
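
For reference, a filled-in version of the builder might look like the sketch below. The hostname, port, database, table, and credentials are invented placeholders, not values from the original report.

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
		.hostname("localhost")        // placeholder host
		.port(3306)                   // MySQL default port
		.databaseList("app_db")       // hypothetical database
		.tableList("app_db.orders")   // hypothetical table; "app_db.*" captures every table in app_db
		.username("flink_user")       // placeholder credentials
		.password("flink_pw")
		.deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord -> JSON string
		.build();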

What did you expect to see?

I followed the demo in the datastream-api-package-guidance document and expected it to run successfully.

What did you see instead?

It does not work: it throws a NoClassDefFoundError.

the error is:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
	at com.zigaai.CdcTest.main(CdcTest.java:11)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 1 more
Disconnected from the target VM, address: '127.0.0.1:4693', transport: 'socket'

Build Java version: 1.8.0_342 21.0.1

Anything else?

No response

Are you willing to submit a PR?

  • I’m willing to submit a PR!

About this issue

  • State: closed
  • Created 7 months ago
  • Comments: 22 (9 by maintainers)

Most upvoted comments

The exception looks like you are missing the connector-base dependency; Flink bundles connector-base in the Flink dist jar.
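
Concretely, the missing class org.apache.flink.connector.base.source.reader.RecordEmitter ships in flink-connector-base. When the job runs on a cluster, flink-dist provides it; for local or IDE runs, a likely fix is to declare the dependency explicitly in the pom, e.g.:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-base</artifactId>
	<version>${flink.version}</version>
</dependency>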

I probably know what the problem is now, thank you for your help!