@@ -416,12 +427,6 @@
org.apache.maven.plugins
maven-surefire-plugin
-
-
- listener
- org.apache.amoro.listener.AmoroRunListener
-
-
-verbose:class
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
index dbf8238e77..79d03b98d2 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java
@@ -23,12 +23,19 @@
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.MockAmoroManagementServer;
+import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestAms;
+import org.apache.amoro.UnifiedCatalog;
+import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.catalog.CatalogTestHelper;
-import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
import org.apache.amoro.flink.write.MixedFormatRowDataTaskWriterFactory;
import org.apache.amoro.io.reader.GenericKeyedDataReader;
+import org.apache.amoro.mixed.CatalogLoader;
+import org.apache.amoro.mixed.MixedFormatCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.scan.CombinedScanTask;
import org.apache.amoro.scan.KeyedTableScanTask;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
@@ -37,6 +44,14 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableBuilder;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.CatalogUtil;
+import org.apache.amoro.utils.MixedTableUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
@@ -67,13 +82,15 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
@@ -84,14 +101,24 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
-public class FlinkTestBase extends TableTestBase {
+/**
+ * JUnit 5 base class for Flink integration tests in this module.
+ *
+ * This class intentionally no longer extends {@code TableTestBase}/{@code CatalogTestBase}
+ * (which remain on JUnit 4 until the closing PR of the umbrella migration). The catalog/table
+ * lifecycle is re-implemented here against the same {@link CatalogTestHelper}/{@link
+ * TableTestHelper} contracts so that children can stay clean Jupiter classes; for parameterized
+ * children that cannot pass helpers through a constructor, call {@link
+ * #initFlinkTestBase(CatalogTestHelper, TableTestHelper)} from the {@code @ParameterizedTest}
+ * method body.
+ */
+public class FlinkTestBase {
private static final Logger LOG = LoggerFactory.getLogger(FlinkTestBase.class);
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+ protected static final TestAms TEST_AMS = new TestAms();
- @Rule public TestName name = new TestName();
+ protected static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
public static String metastoreUri;
@@ -113,14 +140,228 @@ public class FlinkTestBase extends TableTestBase {
public static InternalCatalogBuilder catalogBuilder;
+ private CatalogTestHelper catalogTestHelper;
+ private TableTestHelper tableTestHelper;
+ private CatalogMeta catalogMeta;
+ private MixedFormatCatalog mixedFormatCatalog;
+ private UnifiedCatalog unifiedCatalog;
+ private org.apache.iceberg.catalog.Catalog icebergCatalog;
+ private MixedTable mixedTable;
+ private TableMetaStore tableMetaStore;
+ private File tempRoot;
+
+ /**
+ * No-arg constructor for parameterized children that pass helpers via {@link #initFlinkTestBase}.
+ */
+ public FlinkTestBase() {}
+
public FlinkTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
- super(catalogTestHelper, tableTestHelper);
+ this.catalogTestHelper = catalogTestHelper;
+ this.tableTestHelper = tableTestHelper;
+ }
+
+ @BeforeAll
+ public static void startFlinkBaseClassResources() throws Exception {
+ TEST_AMS.before();
+ MINI_CLUSTER_RESOURCE.before();
}
- @Before
- public void before() throws Exception {
+ @AfterAll
+ public static void stopFlinkBaseClassResources() {
+ try {
+ MINI_CLUSTER_RESOURCE.after();
+ } finally {
+ TEST_AMS.after();
+ }
+ }
+
+ @BeforeEach
+ public void setUpFlinkTestBaseLifecycle() throws Exception {
+ if (catalogTestHelper == null) {
+ // Parameterized child: the @ParameterizedTest body must call initFlinkTestBase(...).
+ return;
+ }
+ initLifecycle();
+ }
+
+ @AfterEach
+ public void tearDownFlinkTestBaseLifecycle() {
+ teardownLifecycle();
+ }
+
+ /**
+ * Initializer used by {@code @ParameterizedTest} children. Sets the helpers (since the parameters
+ * are received by the test method, not the constructor) and runs the catalog/table setup.
+ */
+ protected void initFlinkTestBase(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) throws Exception {
+ this.catalogTestHelper = catalogTestHelper;
+ this.tableTestHelper = tableTestHelper;
+ initLifecycle();
+ }
+
+ private void initLifecycle() throws Exception {
+ tempRoot = Files.createTempDirectory("flink-test-base").toFile();
+ String baseDir = tempRoot.getPath();
+ if (!SystemUtils.IS_OS_UNIX) {
+ baseDir = "file:/" + baseDir.replace("\\", "/");
+ }
+ catalogMeta = catalogTestHelper.buildCatalogMeta(baseDir);
+ catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, TEST_AMS.getServerUrl());
+ getAmsHandler().createCatalog(catalogMeta);
metastoreUri = getCatalogUri();
catalogBuilder = InternalCatalogBuilder.builder().amsUri(metastoreUri);
+ if (tableTestHelper != null) {
+ createTestTable();
+ }
+ }
+
+ private void teardownLifecycle() {
+ if (tableTestHelper != null && unifiedCatalog != null) {
+ try {
+ unifiedCatalog.dropTable(
+ tableTestHelper.id().getDatabase(), tableTestHelper.id().getTableName(), true);
+ } catch (Exception e) {
+ LOG.warn("dropTable failed", e);
+ }
+ try {
+ unifiedCatalog.dropDatabase(TableTestHelper.TEST_DB_NAME);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (catalogMeta != null) {
+ try {
+ getAmsHandler().dropCatalog(catalogMeta.getCatalogName());
+ } catch (Exception e) {
+ LOG.warn("dropCatalog failed", e);
+ }
+ }
+ if (tempRoot != null) {
+ try {
+ FileUtils.deleteDirectory(tempRoot);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean temp directory {}", tempRoot, e);
+ }
+ }
+ catalogMeta = null;
+ mixedFormatCatalog = null;
+ unifiedCatalog = null;
+ icebergCatalog = null;
+ mixedTable = null;
+ tableMetaStore = null;
+ tempRoot = null;
+ }
+
+ private void createTestTable() {
+ this.tableMetaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
+ getUnifiedCatalog().createDatabase(TableTestHelper.TEST_DB_NAME);
+ TableFormat format = getTestFormat();
+ if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
+ createMixedFormatTable();
+ } else if (TableFormat.ICEBERG.equals(format)) {
+ createIcebergFormatTable();
+ }
+ }
+
+ private void createMixedFormatTable() {
+ TableBuilder tableBuilder =
+ getMixedFormatCatalog()
+ .newTableBuilder(TableTestHelper.TEST_TABLE_ID, tableTestHelper.tableSchema());
+ tableBuilder.withProperties(tableTestHelper.tableProperties());
+ if (isKeyedTable()) {
+ tableBuilder.withPrimaryKeySpec(tableTestHelper.primaryKeySpec());
+ }
+ if (isPartitionedTable()) {
+ tableBuilder.withPartitionSpec(tableTestHelper.partitionSpec());
+ }
+ mixedTable = tableBuilder.create();
+ }
+
+ private void createIcebergFormatTable() {
+ getIcebergCatalog()
+ .createTable(
+ org.apache.iceberg.catalog.TableIdentifier.of(
+ TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME),
+ tableTestHelper.tableSchema(),
+ tableTestHelper.partitionSpec(),
+ tableTestHelper.tableProperties());
+ mixedTable =
+ (MixedTable)
+ getUnifiedCatalog()
+ .loadTable(TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME)
+ .originalTable();
+ }
+
+ public static MockAmoroManagementServer.AmsHandler getAmsHandler() {
+ return TEST_AMS.getAmsHandler();
+ }
+
+ protected MixedFormatCatalog getMixedFormatCatalog() {
+ if (mixedFormatCatalog == null) {
+ mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
+ }
+ return mixedFormatCatalog;
+ }
+
+ protected void refreshMixedFormatCatalog() {
+ this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
+ }
+
+ protected String getCatalogUri() {
+ return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
+ }
+
+ protected CatalogMeta getCatalogMeta() {
+ return catalogMeta;
+ }
+
+ protected TableFormat getTestFormat() {
+ return catalogTestHelper.tableFormat();
+ }
+
+ protected org.apache.iceberg.catalog.Catalog getIcebergCatalog() {
+ if (icebergCatalog == null) {
+ icebergCatalog = catalogTestHelper.buildIcebergCatalog(catalogMeta);
+ }
+ return icebergCatalog;
+ }
+
+ protected UnifiedCatalog getUnifiedCatalog() {
+ if (unifiedCatalog == null) {
+ unifiedCatalog = catalogTestHelper.buildUnifiedCatalog(catalogMeta);
+ }
+ return unifiedCatalog;
+ }
+
+ protected MixedTable getMixedTable() {
+ return mixedTable;
+ }
+
+ protected UnkeyedTable getBaseStore() {
+ return MixedTableUtil.baseStore(mixedTable);
+ }
+
+ protected TableMetaStore getTableMetaStore() {
+ return this.tableMetaStore;
+ }
+
+ protected boolean isKeyedTable() {
+ return tableTestHelper.primaryKeySpec() != null
+ && tableTestHelper.primaryKeySpec().primaryKeyExisted();
+ }
+
+ protected boolean isPartitionedTable() {
+ return tableTestHelper.partitionSpec() != null
+ && tableTestHelper.partitionSpec().isPartitioned();
+ }
+
+ protected TableTestHelper tableTestHelper() {
+ return tableTestHelper;
+ }
+
+ protected CatalogTestHelper catalogTestHelper() {
+ return catalogTestHelper;
}
public void config() {
@@ -129,6 +370,10 @@ public void config() {
props.put(CatalogFactoryOptions.AMS_URI.key(), metastoreUri);
}
+ protected int defaultParallelism() {
+ return 1;
+ }
+
public static void prepare() throws Exception {
KAFKA_CONTAINER.start();
}
@@ -163,7 +408,7 @@ protected StreamExecutionEnvironment getEnv() {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
- env.setParallelism(1);
+ env.setParallelism(defaultParallelism());
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(300);
env.getCheckpointConfig()
@@ -307,12 +552,18 @@ protected static void commit(KeyedTable keyedTable, WriteResult result, boolean
}
}
+ /**
+ * 3-arg variant of the keyed task writer factory with a default mask of 3. The 4-arg variant
+ * lives under a different name ({@link #createKeyedTaskWriterWithMask}) to avoid the inherited
+ * static signature clashing with the {@code FlinkTaskWriterBaseTest} interface's default method
+ * in classes that {@code implements} that interface.
+ */
protected static TaskWriter createKeyedTaskWriter(
KeyedTable keyedTable, RowType rowType, boolean base) {
- return createKeyedTaskWriter(keyedTable, rowType, base, 3);
+ return createKeyedTaskWriterWithMask(keyedTable, rowType, base, 3);
}
- protected static TaskWriter createKeyedTaskWriter(
+ protected static TaskWriter createKeyedTaskWriterWithMask(
KeyedTable keyedTable, RowType rowType, boolean base, long mask) {
MixedFormatRowDataTaskWriterFactory taskWriterFactory =
new MixedFormatRowDataTaskWriterFactory(keyedTable, rowType, base);
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/TestFlinkSchemaUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/TestFlinkSchemaUtil.java
index 0eeee1bc9d..62c438285e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/TestFlinkSchemaUtil.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/TestFlinkSchemaUtil.java
@@ -21,8 +21,8 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Map;
@@ -55,6 +55,6 @@ public void testFlinkSchemaToIcebergSchema() {
TableSchema fromIcebergSchema =
FlinkSchemaUtil.toSchema(icebergSchema, new ArrayList<>(), extraOptions);
- Assert.assertEquals(flinkSchema, fromIcebergSchema);
+ Assertions.assertEquals(flinkSchema, fromIcebergSchema);
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkAmoroCatalogITCase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkAmoroCatalogITCase.java
index 92b2a286a1..69cdb99014 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkAmoroCatalogITCase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkAmoroCatalogITCase.java
@@ -27,7 +27,6 @@
import org.apache.amoro.formats.paimon.PaimonHadoopCatalogTestHelper;
import org.apache.amoro.formats.paimon.PaimonHiveCatalogTestHelper;
import org.apache.amoro.formats.paimon.PaimonTable;
-import org.apache.amoro.hive.TestHMS;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.Catalog;
@@ -35,42 +34,48 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.paimon.table.FileStoreTable;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
-/** ITCase for Flink UnifiedCatalog based on AmoroCatalogTestBase */
-@RunWith(value = Parameterized.class)
+/** ITCase for Flink UnifiedCatalog based on AmoroCatalogTestBase. */
public class FlinkAmoroCatalogITCase extends AmoroCatalogITCaseBase {
- static final TestHMS TEST_HMS = new TestHMS();
AbstractCatalog flinkCatalog;
- public FlinkAmoroCatalogITCase(AmoroCatalogTestHelper> catalogTestHelper) {
- super(catalogTestHelper);
+ static Stream parameters() {
+ return Stream.of(
+ Arguments.of(PaimonHiveCatalogTestHelper.defaultHelper()),
+ Arguments.of(PaimonHadoopCatalogTestHelper.defaultHelper()));
}
- @Parameterized.Parameters(name = "{0}")
- public static Object[] parameters() {
- return new Object[] {
- PaimonHiveCatalogTestHelper.defaultHelper(), PaimonHadoopCatalogTestHelper.defaultHelper()
- };
+ @BeforeAll
+ public static void startTestHms() throws Exception {
+ TEST_HMS.before();
}
- @BeforeClass
- public static void beforeAll() throws Exception {
- TEST_HMS.before();
+ @AfterAll
+ public static void stopTestHms() {
+ TEST_HMS.after();
}
- @Before
- public void setup() throws Exception {
- createDatabase();
- createTable();
+ @AfterEach
+ public void teardown() {
+ if (flinkCatalog != null) {
+ flinkCatalog.close();
+ }
+ }
+
+ private void setUpForParam(AmoroCatalogTestHelper> catalogTestHelper) throws Exception {
+ initAmoroCatalog(catalogTestHelper);
+ catalogTestHelper.createDatabase(TEST_DB_NAME);
+ catalogTestHelper.createTable(TEST_DB_NAME, TEST_TABLE_NAME);
String catalog = "unified_catalog";
exec(
"CREATE CATALOG %s WITH ('type'='unified', 'metastore.url'='%s')",
@@ -83,32 +88,10 @@ public void setup() throws Exception {
assertEquals(catalog, flinkCatalog.getName());
}
- @After
- public void teardown() {
- TEST_HMS.after();
- if (flinkCatalog != null) {
- flinkCatalog.close();
- }
- }
-
- public void createDatabase() {
- try {
- catalogTestHelper.createDatabase(TEST_DB_NAME);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void createTable() {
- try {
- catalogTestHelper.createTable(TEST_DB_NAME, TEST_TABLE_NAME);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testTableExists() throws Exception {
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("parameters")
+ public void testTableExists(AmoroCatalogTestHelper> catalogTestHelper) throws Exception {
+ setUpForParam(catalogTestHelper);
CatalogBaseTable catalogBaseTable =
flinkCatalog.getTable(new ObjectPath(TEST_DB_NAME, TEST_TABLE_NAME));
assertNotNull(catalogBaseTable);
@@ -120,8 +103,10 @@ public void testTableExists() throws Exception {
catalogBaseTable.getUnresolvedSchema().getColumns().size());
}
- @Test
- public void testInsertAndQuery() throws Exception {
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("parameters")
+ public void testInsertAndQuery(AmoroCatalogTestHelper> catalogTestHelper) throws Exception {
+ setUpForParam(catalogTestHelper);
exec("INSERT INTO %s SELECT 1, 'Lily', 1234567890", TEST_TABLE_NAME);
TableResult tableResult =
exec("select * from %s /*+OPTIONS('monitor-interval'='1s')*/ ", TEST_TABLE_NAME);
@@ -132,8 +117,11 @@ public void testInsertAndQuery() throws Exception {
assertEquals(Row.of(1, "Lily", 1234567890).toString(), actualRow.toString());
}
- @Test
- public void testSwitchCurrentCatalog() {
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("parameters")
+ public void testSwitchCurrentCatalog(AmoroCatalogTestHelper> catalogTestHelper)
+ throws Exception {
+ setUpForParam(catalogTestHelper);
String memCatalog = "mem_catalog";
exec("create catalog %s with('type'='generic_in_memory')", memCatalog);
exec(
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
index 6e9a654bf5..756e3776d1 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalogITCase.java
@@ -34,44 +34,49 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
-@RunWith(value = Parameterized.class)
public class FlinkUnifiedCatalogITCase extends CatalogITCaseBase {
static final TestHMS TEST_HMS = new TestHMS();
AbstractCatalog flinkCatalog;
TableIdentifier identifier;
- public FlinkUnifiedCatalogITCase(CatalogTestHelper catalogTestHelper) {
- super(catalogTestHelper, new BasicTableTestHelper(true, false));
+ static Stream parameters() {
+ return Stream.of(
+ Arguments.of(new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf())),
+ Arguments.of(new HiveCatalogTestHelper(TableFormat.MIXED_ICEBERG, TEST_HMS.getHiveConf())),
+ Arguments.of(new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf())));
}
- @Parameterized.Parameters(name = "catalogTestHelper = {0}")
- public static Object[][] parameters() {
- return new Object[][] {
- {new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf())},
- {new HiveCatalogTestHelper(TableFormat.MIXED_ICEBERG, TEST_HMS.getHiveConf())},
- {new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf())}
- };
+ @BeforeAll
+ public static void startTestHms() throws Exception {
+ TEST_HMS.before();
}
- @BeforeClass
- public static void beforeAll() throws Exception {
- TEST_HMS.before();
+ @AfterAll
+ public static void stopTestHms() {
+ TEST_HMS.after();
+ }
+
+ @AfterEach
+ public void teardown() {
+ if (flinkCatalog != null) {
+ flinkCatalog.close();
+ }
}
- @Before
- public void setup() throws Exception {
+ private void setUpForParam(CatalogTestHelper catalogTestHelper) throws Exception {
+ initCatalogITCase(catalogTestHelper, new BasicTableTestHelper(true, false));
String catalog = "unified_catalog";
exec("CREATE CATALOG %s WITH ('type'='unified', 'ams.uri'='%s')", catalog, getCatalogUri());
exec("USE CATALOG %s", catalog);
@@ -83,16 +88,10 @@ public void setup() throws Exception {
identifier = tableTestHelper().id();
}
- @After
- public void teardown() {
- TEST_HMS.after();
- if (flinkCatalog != null) {
- flinkCatalog.close();
- }
- }
-
- @Test
- public void testTableExists() throws TableNotExistException {
+ @ParameterizedTest(name = "catalogTestHelper = {0}")
+ @MethodSource("parameters")
+ public void testTableExists(CatalogTestHelper catalogTestHelper) throws Exception {
+ setUpForParam(catalogTestHelper);
CatalogBaseTable catalogBaseTable =
flinkCatalog.getTable(new ObjectPath(identifier.getDatabase(), identifier.getTableName()));
assertNotNull(catalogBaseTable);
@@ -101,8 +100,10 @@ public void testTableExists() throws TableNotExistException {
catalogBaseTable.getUnresolvedSchema().getColumns().size());
}
- @Test
- public void testInsertAndQuery() throws Exception {
+ @ParameterizedTest(name = "catalogTestHelper = {0}")
+ @MethodSource("parameters")
+ public void testInsertAndQuery(CatalogTestHelper catalogTestHelper) throws Exception {
+ setUpForParam(catalogTestHelper);
exec(
"INSERT INTO %s SELECT 1, 'Lily', 1234567890, TO_TIMESTAMP('2020-01-01 01:02:03')",
identifier.getTableName());
@@ -116,8 +117,10 @@ public void testInsertAndQuery() throws Exception {
Row.of(1, "Lily", 1234567890L, "2020-01-01T01:02:03").toString(), actualRow.toString());
}
- @Test
- public void testSwitchCurrentCatalog() {
+ @ParameterizedTest(name = "catalogTestHelper = {0}")
+ @MethodSource("parameters")
+ public void testSwitchCurrentCatalog(CatalogTestHelper catalogTestHelper) throws Exception {
+ setUpForParam(catalogTestHelper);
String memCatalog = "mem_catalog";
exec("create catalog %s with('type'='generic_in_memory')", memCatalog);
exec(
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
index 3536511d8f..b19fcb4c9c 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java
@@ -29,16 +29,22 @@
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import org.apache.amoro.MockAmoroManagementServer;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestAms;
+import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
-import org.apache.amoro.catalog.CatalogTestBase;
import org.apache.amoro.flink.MiniClusterResource;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
+import org.apache.amoro.mixed.CatalogLoader;
+import org.apache.amoro.mixed.MixedFormatCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableIdentifier;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
@@ -52,18 +58,17 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.io.File;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -74,28 +79,26 @@
/**
* Test cases for mixed catalog factories, including:
* CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER, CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER,
- * CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER
+ * CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER.
+ *
+ * This test no longer extends the still-JUnit-4 {@code CatalogTestBase}; the bits of catalog
+ * lifecycle that were pulled in from there are inlined so the class can be a clean Jupiter test.
*/
-@RunWith(value = Parameterized.class)
-public class TestMixedCatalog extends CatalogTestBase {
- private String catalogName;
- private String catalogFactoryType;
+public class TestMixedCatalog {
private static final Logger LOG = LoggerFactory.getLogger(TestMixedCatalog.class);
- public TestMixedCatalog(String catalogFactoryType) {
- super(new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG));
- this.catalogFactoryType = catalogFactoryType;
- this.catalogName = catalogFactoryType + "_catalog";
- }
+ protected static final TestAms TEST_AMS = new TestAms();
- @Parameterized.Parameters(name = "catalogFactoryType = {0}")
- public static Object[] parameters() {
- return new Object[] {
- CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER, CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
- };
- }
+ private final BasicCatalogTestHelper testHelper =
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG);
+
+ private CatalogMeta catalogMeta;
+ private MixedFormatCatalog mixedFormatCatalog;
+ private File tempRoot;
+
+ private String catalogName;
+ private String catalogFactoryType;
- @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
protected Map props;
private static final String DB = TableTestHelper.TEST_DB_NAME;
@@ -103,8 +106,56 @@ public static Object[] parameters() {
private volatile StreamExecutionEnvironment env = null;
private volatile StreamTableEnvironment tEnv = null;
- @Before
- public void before() throws Exception {
+ @BeforeAll
+ public static void startTestAms() throws Exception {
+ TEST_AMS.before();
+ }
+
+ @AfterAll
+ public static void stopTestAms() {
+ TEST_AMS.after();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ try {
+ if (catalogName != null) {
+ sql("DROP TABLE IF EXISTS " + catalogName + "." + DB + "." + TABLE);
+ sql("USE CATALOG default_catalog");
+ sql("DROP DATABASE IF EXISTS " + catalogName + "." + DB);
+ Assertions.assertTrue(
+ CollectionUtil.isNullOrEmpty(getMixedFormatCatalog().listDatabases()));
+ sql("DROP CATALOG " + catalogName);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Test teardown failed", t);
+ }
+ if (catalogMeta != null) {
+ try {
+ getAmsHandler().dropCatalog(catalogMeta.getCatalogName());
+ } catch (Exception e) {
+ LOG.warn("dropCatalog failed", e);
+ }
+ catalogMeta = null;
+ }
+ if (tempRoot != null) {
+ try {
+ FileUtils.deleteDirectory(tempRoot);
+ } catch (Exception e) {
+ // ignore
+ }
+ tempRoot = null;
+ }
+ mixedFormatCatalog = null;
+ }
+
+ private void setUpForParam(String catalogFactoryType) throws Exception {
+ this.catalogFactoryType = catalogFactoryType;
+ this.catalogName = catalogFactoryType + "_catalog";
+ tempRoot = Files.createTempDirectory("test-mixed-catalog").toFile();
+ catalogMeta = testHelper.buildCatalogMeta(tempRoot.getPath());
+ catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, TEST_AMS.getServerUrl());
+ getAmsHandler().createCatalog(catalogMeta);
props = Maps.newHashMap();
props.put("type", catalogFactoryType);
props.put(CatalogFactoryOptions.AMS_URI.key(), getCatalogUri());
@@ -113,27 +164,49 @@ public void before() throws Exception {
sql("CREATE DATABASE " + catalogName + "." + DB);
}
- @After
- public void after() {
- sql("DROP TABLE IF EXISTS " + catalogName + "." + DB + "." + TABLE);
- // Switch away from the catalog before dropping the database.
- // Flink 1.19+ rejects dropping the currently active database.
- sql("USE CATALOG default_catalog");
- sql("DROP DATABASE IF EXISTS " + catalogName + "." + DB);
- Assert.assertTrue(CollectionUtil.isNullOrEmpty(getMixedFormatCatalog().listDatabases()));
- sql("DROP CATALOG " + catalogName);
+ static MockAmoroManagementServer.AmsHandler getAmsHandler() {
+ return TEST_AMS.getAmsHandler();
+ }
+
+ protected MixedFormatCatalog getMixedFormatCatalog() {
+ if (mixedFormatCatalog == null) {
+ mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
+ }
+ return mixedFormatCatalog;
+ }
+
+ protected String getCatalogUri() {
+ return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
+ }
+
+ static Stream parameters() {
+ return Stream.of(
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER);
}
- @Test
- public void testMixedCatalog() {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testMixedCatalog(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
String[] catalogs = getTableEnv().listCatalogs();
- Assert.assertArrayEquals(
+ Assertions.assertArrayEquals(
Arrays.stream(catalogs).sorted().toArray(),
Stream.of("default_catalog", catalogName).sorted().toArray());
}
- @Test
- public void testDDL() {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testDDL(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
sql(
"CREATE TABLE "
+ catalogName
@@ -150,16 +223,22 @@ public void testDDL() {
sql("USE " + catalogName + "." + DB);
sql("SHOW tables");
- Assert.assertTrue(
+ Assertions.assertTrue(
getMixedFormatCatalog()
.loadTable(TableIdentifier.of(catalogName, DB, TABLE))
.isKeyedTable());
}
- @Test
- public void testComputeIndex() {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testComputeIndex(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
// if compute column before any physical column, will throw exception.
- Assert.assertThrows(
+ Assertions.assertThrows(
org.apache.flink.table.api.TableException.class,
() ->
sql(
@@ -190,8 +269,14 @@ public void testComputeIndex() {
+ ") ");
}
- @Test
- public void testDDLWithVirtualColumn() throws IOException {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testDDLWithVirtualColumn(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
// create mixed-format table with compute columns and watermark under mixed-format catalog
// org.apache.iceberg.flink.TypeToFlinkType will convert Timestamp to Timestamp(6), so we cast
// datatype manually
@@ -221,28 +306,34 @@ public void testDDLWithVirtualColumn() throws IOException {
Arrays.stream(computedIndex)
.forEach(
x -> {
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, x, NAME)));
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, x, EXPR)));
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, x, DATA_TYPE)));
});
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME)));
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)));
- Assert.assertTrue(
+ Assertions.assertTrue(
properties.containsKey(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE)));
List result = sql("DESC " + catalogName + "." + DB + "." + TABLE + "");
- Assert.assertEquals(6, result.size());
+ Assertions.assertEquals(6, result.size());
}
- @Test
- public void testDMLWithVirtualColumn() throws IOException {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testDMLWithVirtualColumn(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
// create mixed-format table with compute columns under mixed-format catalog
sql(
"CREATE TABLE "
@@ -277,8 +368,14 @@ public void testDMLWithVirtualColumn() throws IOException {
checkRows(rows);
}
- @Test
- public void testReadNotMatchColumn() throws IOException {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testReadNotMatchColumn(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
// create mixed-format table with compute columns under mixed-format catalog
sql(
"CREATE TABLE "
@@ -306,12 +403,12 @@ public void testReadNotMatchColumn() throws IOException {
.set(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, 2, EXPR), afterExpr)
.commit();
- Assert.assertNotEquals(
+ Assertions.assertNotEquals(
beforeExpr,
amoroTable.properties().get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, 2, EXPR)));
// property for expr do not match any columns in amoro, will throw exception.
- Assert.assertThrows(
+ Assertions.assertThrows(
IllegalStateException.class,
() -> sql("DESC " + catalogName + "." + DB + "." + TABLE + ""));
amoroTable
@@ -323,8 +420,14 @@ public void testReadNotMatchColumn() throws IOException {
sql("DESC " + catalogName + "." + DB + "." + TABLE + "");
}
- @Test
- public void testDML() throws IOException {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testDML(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
sql(
"CREATE TABLE default_catalog.default_database."
+ TABLE
@@ -374,19 +477,19 @@ public void testDML() throws IOException {
+ " /*+ OPTIONS("
+ "'streaming'='false'"
+ ") */");
- Assert.assertEquals(1, rows.size());
+ Assertions.assertEquals(1, rows.size());
sql("DROP TABLE default_catalog.default_database." + TABLE);
}
private void checkRows(List rows) {
- Assert.assertEquals(1, rows.size());
+ Assertions.assertEquals(1, rows.size());
int id = (int) rows.get(0).getField("id");
int computeId = (int) rows.get(0).getField("compute_id");
- Assert.assertEquals(1, id);
+ Assertions.assertEquals(1, id);
// computeId should be id+5
- Assert.assertEquals(id + 5, computeId);
- Assert.assertEquals(4, rows.get(0).getFieldNames(true).size());
+ Assertions.assertEquals(id + 5, computeId);
+ Assertions.assertEquals(4, rows.get(0).getFieldNames(true).size());
}
protected List sql(String query, Object... args) {
@@ -505,8 +608,14 @@ private void insertValue() {
sql("DROP TABLE default_catalog.default_database." + TABLE);
}
- @Test
- public void testAlterUnKeyTable() throws Exception {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testAlterUnKeyTable(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
sql(
"CREATE TABLE "
+ catalogName
@@ -534,12 +643,18 @@ public void testAlterUnKeyTable() throws Exception {
+ "SET ( 'write.metadata.delete-after-commit.enabled' = 'false')");
Map unKeyTableProperties =
getMixedFormatCatalog().loadTable(TableIdentifier.of(catalogName, DB, TABLE)).properties();
- Assert.assertEquals(
+ Assertions.assertEquals(
unKeyTableProperties.get("write.metadata.delete-after-commit.enabled"), "false");
}
- @Test
- public void testAlterKeyTable() throws Exception {
+ @ParameterizedTest(name = "catalogFactoryType = {0}")
+ @ValueSource(
+ strings = {
+ CatalogFactoryOptions.MIXED_ICEBERG_IDENTIFIER,
+ CatalogFactoryOptions.MIXED_HIVE_IDENTIFIER
+ })
+ public void testAlterKeyTable(String catalogFactoryType) throws Exception {
+ setUpForParam(catalogFactoryType);
sql(
"CREATE TABLE "
+ catalogName
@@ -584,8 +699,8 @@ public void testAlterKeyTable() throws Exception {
Map keyTableProperties =
getMixedFormatCatalog().loadTable(TableIdentifier.of(catalogName, DB, TABLE)).properties();
- Assert.assertEquals(keyTableProperties.get("self-optimizing.enabled"), "true");
- Assert.assertEquals(keyTableProperties.get("self-optimizing.group"), "flink");
- Assert.assertEquals(keyTableProperties.get("write.upsert.enabled"), "true");
+ Assertions.assertEquals(keyTableProperties.get("self-optimizing.enabled"), "true");
+ Assertions.assertEquals(keyTableProperties.get("self-optimizing.group"), "flink");
+ Assertions.assertEquals(keyTableProperties.get("write.upsert.enabled"), "true");
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalogTablePartitions.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalogTablePartitions.java
index e951acce03..754bdfcc38 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalogTablePartitions.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalogTablePartitions.java
@@ -44,8 +44,9 @@
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.LinkedList;
@@ -61,8 +62,8 @@ public TestMixedCatalogTablePartitions() {
new BasicTableTestHelper(true, true));
}
- public void before() throws Exception {
- super.before();
+ @BeforeEach
+ public void configProps() {
super.config();
}
@@ -105,7 +106,7 @@ public void testListPartitionsUnKeyedTable() throws TableNotPartitionedException
new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-02"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
- Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected);
+ Assertions.assertEquals(list, expected, "Should produce the expected catalog partition specs.");
}
@Test
@@ -149,8 +150,8 @@ public void testListPartitionsKeyedTable() throws TableNotPartitionedException {
new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-02"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
- Assert.assertEquals(
- "Should produce the expected catalog partition specs.", partitionList, expected);
+ Assertions.assertEquals(
+ partitionList, expected, "Should produce the expected catalog partition specs.");
}
@Test
@@ -203,21 +204,21 @@ public void testListPartitionsByFilter()
new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-01", "name", "mark"));
expected.add(partitionSpec1);
expected.add(partitionSpec2);
- Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected);
+ Assertions.assertEquals(list, expected, "Should produce the expected catalog partition specs.");
List listCatalogPartitionSpec =
mixedCatalog.listPartitions(
objectPath,
new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-01", "name", "Gerry")));
- Assert.assertEquals(
- "Should produce the expected catalog partition specs.", listCatalogPartitionSpec.size(), 1);
+ Assertions.assertEquals(
+ listCatalogPartitionSpec.size(), 1, "Should produce the expected catalog partition specs.");
try {
mixedCatalog.listPartitions(
objectPath,
new CatalogPartitionSpec(ImmutableMap.of("dt", "2023-10-01", "name1", "Gerry")));
} catch (Exception e) {
- Assert.assertTrue(e instanceof PartitionSpecInvalidException);
+ Assertions.assertTrue(e instanceof PartitionSpecInvalidException);
}
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/ByteArraySetSerializerTest.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/ByteArraySetSerializerTest.java
index 8f7aae0606..e2a8151b68 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/ByteArraySetSerializerTest.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/ByteArraySetSerializerTest.java
@@ -18,8 +18,8 @@
package org.apache.amoro.flink.lookup;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +38,8 @@ public void testByteArraySetSerializer() {
byteSet.add(new ByteArrayWrapper(data, data.length));
byte[] serialized = ByteArraySetSerializer.serialize(byteSet);
Set actualSet = ByteArraySetSerializer.deserialize(serialized);
- Assert.assertEquals(byteSet.size(), actualSet.size());
- Assert.assertEquals(byteSet, actualSet);
+ Assertions.assertEquals(byteSet.size(), actualSet.size());
+ Assertions.assertEquals(byteSet, actualSet);
}
@Test
@@ -56,13 +56,13 @@ public void testPerformance() {
totalSize += 4 + tmp.length;
}
LOG.info("added {} items process time: {}", num, System.currentTimeMillis() - start);
- Assert.assertEquals(num, byteArraySet.size());
+ Assertions.assertEquals(num, byteArraySet.size());
start = System.currentTimeMillis();
byte[] serialized = ByteArraySetSerializer.serialize(byteArraySet);
long cost = System.currentTimeMillis() - start;
assert serialized != null;
- Assert.assertEquals(totalSize, serialized.length);
+ Assertions.assertEquals(totalSize, serialized.length);
LOG.info(
"serialized cost: {}, num= {}, result byte array size={}.", cost, num, serialized.length);
@@ -70,14 +70,14 @@ public void testPerformance() {
Set actualSet = ByteArraySetSerializer.deserialize(serialized);
cost = System.currentTimeMillis() - start;
LOG.info("deserialized cost: {}, num= {}, set size={}.", cost, num, actualSet.size());
- Assert.assertEquals(byteArraySet, actualSet);
+ Assertions.assertEquals(byteArraySet, actualSet);
// exists
sb = new StringBuilder();
start = System.currentTimeMillis();
for (int i = 0; i < num; i++) {
sb.append(i);
- Assert.assertTrue(
+ Assertions.assertTrue(
actualSet.contains(
new ByteArrayWrapper(sb.toString().getBytes(), sb.toString().getBytes().length)));
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/TestKVTable.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/TestKVTable.java
index 3dc0551cc6..d0d6f9d78c 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/TestKVTable.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/TestKVTable.java
@@ -20,7 +20,7 @@
import static org.apache.amoro.flink.table.descriptors.MixedFormatValidator.LOOKUP_CACHE_TTL_AFTER_WRITE;
import static org.apache.amoro.flink.table.descriptors.MixedFormatValidator.ROCKSDB_WRITING_THREADS;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.amoro.flink.lookup.filter.RowDataPredicate;
import org.apache.amoro.flink.lookup.filter.RowDataPredicateExpressionVisitor;
@@ -50,14 +50,10 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,16 +71,15 @@
import java.util.stream.Collectors;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-@RunWith(value = Parameterized.class)
public class TestKVTable extends TestRowDataPredicateBase {
private static final Logger LOG = LoggerFactory.getLogger(TestKVTable.class);
- @Rule public TemporaryFolder temp = new TemporaryFolder();
- @Rule public TestName name = new TestName();
+ @TempDir public java.nio.file.Path temp;
+
private final Configuration config = new Configuration();
private final List primaryKeys = Lists.newArrayList("id", "grade");
private final List primaryKeysDisorder = Lists.newArrayList("grade", "num", "id");
- private final boolean guavaCacheEnabled;
+ private boolean guavaCacheEnabled;
private final Schema mixedTableSchema =
new Schema(
@@ -94,25 +89,19 @@ public class TestKVTable extends TestRowDataPredicateBase {
private String dbPath;
- @Parameterized.Parameters(name = "guavaCacheEnabled = {0}")
- public static Object[][] parameters() {
- return new Object[][] {{true}, {false}};
- }
-
- public TestKVTable(boolean guavaCacheEnabled) {
+ private void setUpForParam(boolean guavaCacheEnabled) throws IOException {
this.guavaCacheEnabled = guavaCacheEnabled;
- }
-
- @Before
- public void before() throws IOException {
- dbPath = temp.newFolder().getPath();
+ dbPath = java.nio.file.Files.createTempDirectory(temp, "kv-table").toFile().getPath();
if (!guavaCacheEnabled) {
config.set(MixedFormatValidator.LOOKUP_CACHE_MAX_ROWS, 0L);
}
}
- @Test
- public void testRowDataSerializer() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testRowDataSerializer(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
BinaryRowDataSerializer binaryRowDataSerializer = new BinaryRowDataSerializer(3);
GenericRowData genericRowData = (GenericRowData) row(1, "2", 3);
@@ -126,10 +115,10 @@ public void testRowDataSerializer() throws IOException {
BinaryRowData desRowData =
binaryRowDataSerializer.deserialize(new DataInputDeserializer(view.getCopyOfBuffer()));
- Assert.assertNotNull(desRowData);
- Assert.assertEquals(record.getInt(0), desRowData.getInt(0));
- Assert.assertEquals(record.getInt(1), desRowData.getInt(1));
- Assert.assertEquals(record.getInt(2), desRowData.getInt(2));
+ Assertions.assertNotNull(desRowData);
+ Assertions.assertEquals(record.getInt(0), desRowData.getInt(0));
+ Assertions.assertEquals(record.getInt(1), desRowData.getInt(1));
+ Assertions.assertEquals(record.getInt(2), desRowData.getInt(2));
// test join key rowData
binaryRowDataSerializer = new BinaryRowDataSerializer(2);
@@ -149,11 +138,14 @@ public void testRowDataSerializer() throws IOException {
view.clear();
binaryRowDataSerializer.serialize(binaryRowData1, view);
byte[] rowBytes1 = view.getCopyOfBuffer();
- Assert.assertArrayEquals(rowBytes1, rowBytes);
+ Assertions.assertArrayEquals(rowBytes1, rowBytes);
}
- @Test
- public void testInitialUniqueKeyTable() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testInitialUniqueKeyTable(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
config.setInteger(ROCKSDB_WRITING_THREADS, 5);
List joinKeys = Lists.newArrayList("id", "grade");
try (UniqueIndexTable uniqueIndexTable = (UniqueIndexTable) createTable(joinKeys)) {
@@ -212,8 +204,11 @@ public void testInitialUniqueKeyTable() throws IOException {
}
}
- @Test
- public void testSecondaryKeysMapping() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testSecondaryKeysMapping(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
// primary keys are id and grade.
List joinKeys = Lists.newArrayList("grade", "id");
try (SecondaryIndexTable secondaryIndexTable =
@@ -252,8 +247,11 @@ public void testSecondaryKeysMapping() throws IOException {
}
}
- @Test
- public void testInitialSecondaryKeyTable() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testInitialSecondaryKeyTable(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
config.setInteger(ROCKSDB_WRITING_THREADS, 10);
config.set(LOOKUP_CACHE_TTL_AFTER_WRITE, Duration.ofMinutes(1000));
// primary keys are id and grade.
@@ -303,8 +301,11 @@ private void writeAndAssert(SecondaryIndexTable secondaryIndexTable) throws IOEx
assertTableSet(secondaryIndexTable, row(3), row(3, "3", 5), row(3, "4", 4));
}
- @Test
- public void testCacheExpired() throws InterruptedException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testCacheExpired(boolean guavaCacheEnabled) throws Exception {
+ setUpForParam(guavaCacheEnabled);
+
Cache cache =
CacheBuilder.newBuilder().expireAfterWrite(Duration.ofSeconds(1)).build();
cache.put(1, 1);
@@ -318,21 +319,24 @@ public void testCacheExpired() throws InterruptedException {
}
return v;
});
- Assert.assertEquals(Integer.valueOf(1), cache.getIfPresent(1));
- Assert.assertEquals(Integer.valueOf(2), cache.getIfPresent(2));
+ Assertions.assertEquals(Integer.valueOf(1), cache.getIfPresent(1));
+ Assertions.assertEquals(Integer.valueOf(2), cache.getIfPresent(2));
Thread.sleep(1001);
- Assert.assertEquals(2, cache.size());
- Assert.assertNull(cache.getIfPresent(1));
- Assert.assertNull(cache.getIfPresent(2));
+ Assertions.assertEquals(2, cache.size());
+ Assertions.assertNull(cache.getIfPresent(1));
+ Assertions.assertNull(cache.getIfPresent(2));
cache.cleanUp();
cache.put(3, 3);
- Assert.assertEquals(1, cache.size());
- Assert.assertNull(cache.getIfPresent(1));
- Assert.assertEquals(Integer.valueOf(3), cache.getIfPresent(3));
+ Assertions.assertEquals(1, cache.size());
+ Assertions.assertNull(cache.getIfPresent(1));
+ Assertions.assertEquals(Integer.valueOf(3), cache.getIfPresent(3));
}
- @Test
- public void testPredicate() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testPredicate(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
String filter = "id >= 2 and num < 5 and num > 2";
Optional rowDataPredicate = generatePredicate(filter);
@@ -388,8 +392,11 @@ public void testPredicate() throws IOException {
row(4, "4", 4));
}
- @Test
- public void testSecondaryIndexPredicate() throws IOException {
+ @ParameterizedTest(name = "guavaCacheEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testSecondaryIndexPredicate(boolean guavaCacheEnabled) throws IOException {
+ setUpForParam(guavaCacheEnabled);
+
String filter = "id >= 2 and num < 5 and num > 2";
Optional rowDataPredicate = generatePredicate(filter);
@@ -500,12 +507,12 @@ private void assertTable(KVTable table, RowData... rows) throws IOExcep
RowData key = rows[i], expected = rows[i + 1];
List values = table.get(key);
- Assert.assertNotNull(values);
+ Assertions.assertNotNull(values);
if (expected == null) {
- Assert.assertEquals(0, values.size());
+ Assertions.assertEquals(0, values.size());
continue;
}
- Assert.assertEquals(expected.toString(), 1, values.size());
+ Assertions.assertEquals(1, values.size(), expected.toString());
RowData actual = values.get(0);
assertRecord(expected, actual);
}
@@ -515,10 +522,10 @@ private void assertTableSet(KVTable table, RowData key, RowData... expe
throws IOException {
List values = table.get(key);
if (expects == null) {
- Assert.assertEquals(0, values.size());
+ Assertions.assertEquals(0, values.size());
return;
}
- Assert.assertEquals(expects.length, values.size());
+ Assertions.assertEquals(expects.length, values.size());
values = values.stream().sorted(compare()).collect(Collectors.toList());
List expectsAfterSort =
Arrays.stream(expects).sorted(compare()).collect(Collectors.toList());
@@ -546,16 +553,16 @@ private void assertRecord(RowData expected, RowData actual) {
switch (j) {
case 0:
case 2:
- Assert.assertEquals(
- String.format("expected:%s, actual:%s.", expected.toString(), actual),
+ Assertions.assertEquals(
expected.getInt(j),
- binaryRowData.getInt(j));
+ binaryRowData.getInt(j),
+ String.format("expected:%s, actual:%s.", expected.toString(), actual));
break;
case 1:
- Assert.assertEquals(
- String.format("expected:%s, actual:%s.", expected, actual),
+ Assertions.assertEquals(
expected.getString(j),
- binaryRowData.getString(j));
+ binaryRowData.getString(j),
+ String.format("expected:%s, actual:%s.", expected, actual));
break;
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateAllFieldTypes.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateAllFieldTypes.java
index b48aa9bfeb..c97ed3470c 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateAllFieldTypes.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateAllFieldTypes.java
@@ -18,9 +18,9 @@
package org.apache.amoro.flink.lookup.filter;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.amoro.flink.util.DateTimeUtils;
import org.apache.flink.table.api.DataTypes;
@@ -33,8 +33,8 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.sql.Timestamp;
@@ -53,7 +53,7 @@ public class TestRowDataPredicateAllFieldTypes extends TestRowDataPredicateBase
List columns = new ArrayList<>();
protected ResolvedSchema schema;
- @Before
+ @BeforeEach
public void setUp() {
columns.add(0, Column.physical("f0", DataTypes.INT()));
columns.add(1, Column.physical("f1", DataTypes.STRING()));
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateBase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateBase.java
index c50d2d38a1..d4e18a714c 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateBase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateBase.java
@@ -36,7 +36,7 @@
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.types.logical.RowType;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import java.util.Collections;
import java.util.List;
@@ -47,7 +47,7 @@ public abstract class TestRowDataPredicateBase {
public static StreamExecutionEnvironment env;
public static TableEnvironment tEnv;
- @Before
+ @BeforeEach
public void init() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateExpressionVisitor.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateExpressionVisitor.java
index e69689dbbd..66d9716bc6 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateExpressionVisitor.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/lookup/filter/TestRowDataPredicateExpressionVisitor.java
@@ -18,9 +18,9 @@
package org.apache.amoro.flink.lookup.filter;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
@@ -29,8 +29,8 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,7 +47,7 @@ public class TestRowDataPredicateExpressionVisitor extends TestRowDataPredicateB
List columns = new ArrayList<>();
ResolvedSchema schema;
- @Before
+ @BeforeEach
public void setUp() {
columns.add(0, Column.physical("id", DataTypes.INT()));
columns.add(1, Column.physical("name", DataTypes.STRING()));
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java
index ef5d08a68b..2f6334ec2d 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSource.java
@@ -47,8 +47,8 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.LocalDateTime;
@@ -145,7 +145,7 @@ public void testUnkeyedTableDataStream() throws Exception {
GenericRowData.of(
o.getInt(0), o.getString(1), o.getLong(2), o.getTimestamp(3, 6))));
- Assert.assertEquals(new HashSet<>(expectedRecords), rowData);
+ Assertions.assertEquals(new HashSet<>(expectedRecords), rowData);
}
@Test
@@ -198,7 +198,7 @@ public void testUnkeyedStreamingRead() throws Exception {
}
jobClient.cancel();
- Assert.assertEquals(new HashSet<>(expectedRecords), rowData);
+ Assertions.assertEquals(new HashSet<>(expectedRecords), rowData);
}
@Test
@@ -272,7 +272,7 @@ public void testUnkeyedSnapshotRead() throws Exception {
}
jobClient.cancel();
- Assert.assertEquals(new HashSet<>(expectedRecords), rowData);
+ Assertions.assertEquals(new HashSet<>(expectedRecords), rowData);
CloseableIterator resultIterator =
FlinkSource.forRowData()
@@ -299,6 +299,6 @@ public void testUnkeyedSnapshotRead() throws Exception {
o.getInt(0), o.getString(1), o.getLong(2), o.getTimestamp(3, 6))));
expectedRecords = DataUtil.toRowData(s1);
- Assert.assertEquals(new HashSet<>(expectedRecords), rowData);
+ Assertions.assertEquals(new HashSet<>(expectedRecords), rowData);
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSplitPlanner.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSplitPlanner.java
index 859776b94c..3d881000e3 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSplitPlanner.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestFlinkSplitPlanner.java
@@ -22,8 +22,8 @@
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.scan.ChangeTableIncrementalScan;
import org.apache.iceberg.Snapshot;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
@@ -37,7 +37,7 @@ public void testPlanSplitFromKeyedTable() {
testKeyedTable.changeTable().refresh();
List splitList =
FlinkSplitPlanner.planFullTable(testKeyedTable, new AtomicInteger());
- Assert.assertEquals(7, splitList.size());
+ Assertions.assertEquals(7, splitList.size());
}
@Test
@@ -47,7 +47,7 @@ public void testIncrementalChangelog() throws IOException {
List splitList =
FlinkSplitPlanner.planFullTable(testKeyedTable, new AtomicInteger());
- Assert.assertEquals(7, splitList.size());
+ Assertions.assertEquals(7, splitList.size());
long startSnapshotId = testKeyedTable.changeTable().currentSnapshot().snapshotId();
writeUpdate();
@@ -67,6 +67,6 @@ public void testIncrementalChangelog() throws IOException {
List changeSplits =
FlinkSplitPlanner.planChangeTable(changeTableScan, new AtomicInteger());
- Assert.assertEquals(1, changeSplits.size());
+ Assertions.assertEquals(1, changeSplits.size());
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java
index ecdf4c47e9..b21b0d0d19 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/TestMixedFormatSource.java
@@ -81,11 +81,10 @@
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +108,6 @@ public class TestMixedFormatSource extends TestRowDataReaderFunction implements
private static final long serialVersionUID = 7418812854449034756L;
private static final int PARALLELISM = 1;
- @Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
@@ -119,13 +117,23 @@ public class TestMixedFormatSource extends TestRowDataReaderFunction implements
.withHaLeadershipControl()
.build());
+ @BeforeEach
+ public void startMiniClusterResource() throws Exception {
+ miniClusterResource.before();
+ }
+
+ @AfterEach
+ public void stopMiniClusterResource() {
+ miniClusterResource.after();
+ }
+
protected KeyedTable testFailoverTable;
protected static final String SINK_TABLE_NAME = "test_sink_exactly_once";
protected static final TableIdentifier FAIL_TABLE_ID =
TableIdentifier.of(
TableTestHelper.TEST_CATALOG_NAME, TableTestHelper.TEST_DB_NAME, SINK_TABLE_NAME);
- @Before
+ @BeforeEach
public void testSetup() throws IOException {
MixedFormatCatalog testCatalog = getMixedFormatCatalog();
@@ -145,7 +153,7 @@ public void testSetup() throws IOException {
}
}
- @After
+ @AfterEach
public void dropTable() {
miniClusterResource.cancelAllJobs();
getMixedFormatCatalog().dropTable(FAIL_TABLE_ID, true);
@@ -266,7 +274,7 @@ public void testDimTaskManagerFailover() throws Exception {
Thread.sleep(1000);
LOG.info("wait for watermark after failover");
}
- Assert.assertEquals(Long.MAX_VALUE, WatermarkAwareFailWrapper.getWatermarkAfterFailover());
+ Assertions.assertEquals(Long.MAX_VALUE, WatermarkAwareFailWrapper.getWatermarkAfterFailover());
}
@Test
@@ -359,7 +367,7 @@ public void testMixedFormatContinuousSourceWithEmptyChangeInInit() throws Except
List actualResult =
collectRecordsFromUnboundedStream(clientAndIterator, baseData.size());
- Assert.assertEquals(new HashSet<>(baseData), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(baseData), new HashSet<>(actualResult));
LOG.info(
"begin write update_before update_after data and commit new snapshot to change table.");
@@ -369,7 +377,7 @@ public void testMixedFormatContinuousSourceWithEmptyChangeInInit() throws Except
actualResult = collectRecordsFromUnboundedStream(clientAndIterator, excepts2().length * 2);
jobClient.cancel();
- Assert.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
getMixedFormatCatalog().dropTable(tableId, true);
}
@@ -429,7 +437,7 @@ public void testMixedFormatSourceEnumeratorWithChangeExpired() throws Exception
commit(table, result, false);
for (DataFile dataFile : changeDataFiles) {
- Assert.assertTrue(table.io().exists(dataFile.path().toString()));
+ Assertions.assertTrue(table.io().exists(dataFile.path().toString()));
}
final Duration monitorInterval = Duration.ofSeconds(1);
@@ -446,12 +454,12 @@ public void testMixedFormatSourceEnumeratorWithChangeExpired() throws Exception
List actualResult =
collectRecordsFromUnboundedStream(clientAndIterator, changeData.size());
- Assert.assertEquals(new HashSet<>(changeData), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(changeData), new HashSet<>(actualResult));
// expire changeTable snapshots
DeleteFiles deleteFiles = table.changeTable().newDelete();
for (DataFile dataFile : changeDataFiles) {
- Assert.assertTrue(table.io().exists(dataFile.path().toString()));
+ Assertions.assertTrue(table.io().exists(dataFile.path().toString()));
deleteFiles.deleteFile(dataFile);
}
deleteFiles.commit();
@@ -472,7 +480,7 @@ public void testMixedFormatSourceEnumeratorWithChangeExpired() throws Exception
actualResult = collectRecordsFromUnboundedStream(clientAndIterator, excepts2().length * 2);
jobClient.cancel();
- Assert.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
getMixedFormatCatalog().dropTable(tableId, true);
}
@@ -532,7 +540,7 @@ public void testMixedFormatSourceEnumeratorWithBaseExpired() throws Exception {
commit(table, result, true);
for (DataFile dataFile : baseDataFiles) {
- Assert.assertTrue(table.io().exists(dataFile.path().toString()));
+ Assertions.assertTrue(table.io().exists(dataFile.path().toString()));
}
final Duration monitorInterval = Duration.ofSeconds(1);
@@ -549,12 +557,12 @@ public void testMixedFormatSourceEnumeratorWithBaseExpired() throws Exception {
List actualResult =
collectRecordsFromUnboundedStream(clientAndIterator, baseData.size());
- Assert.assertEquals(new HashSet<>(baseData), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(baseData), new HashSet<>(actualResult));
// expire baseTable snapshots
DeleteFiles deleteFiles = table.baseTable().newDelete();
for (DataFile dataFile : baseDataFiles) {
- Assert.assertTrue(table.io().exists(dataFile.path().toString()));
+ Assertions.assertTrue(table.io().exists(dataFile.path().toString()));
deleteFiles.deleteFile(dataFile);
}
deleteFiles.commit();
@@ -575,7 +583,7 @@ public void testMixedFormatSourceEnumeratorWithBaseExpired() throws Exception {
actualResult = collectRecordsFromUnboundedStream(clientAndIterator, excepts2().length * 2);
jobClient.cancel();
- Assert.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
+ Assertions.assertEquals(new HashSet<>(updateRecords()), new HashSet<>(actualResult));
getMixedFormatCatalog().dropTable(tableId, true);
}
@@ -682,7 +690,7 @@ private boolean equalsRecords(List expected, List tableRecords
try {
RowData[] expectedArray = sortRowDataCollection(expected);
RowData[] actualArray = sortRowDataCollection(tableRecords);
- Assert.assertArrayEquals(expectedArray, actualArray);
+ Assertions.assertArrayEquals(expectedArray, actualArray);
return true;
} catch (Throwable e) {
return false;
@@ -813,12 +821,12 @@ private List collectRecordsFromUnboundedStream(
}
}
- Assert.assertEquals(
+ Assertions.assertEquals(
+ numElements,
+ result.size(),
String.format(
"The stream ended before reaching the requested %d records. Only %d records were received, received list:%s.",
- numElements, result.size(), Arrays.toString(result.toArray())),
- numElements,
- result.size());
+ numElements, result.size(), Arrays.toString(result.toArray())));
return result;
}
@@ -855,7 +863,8 @@ private ClientAndIterator executeAndCollectWithClient(
env.fromSource(
mixedFormatSource, WatermarkStrategy.noWatermarks(), "MixedFormatParallelSource")
.setParallelism(PARALLELISM);
- return DataStreamUtils.collectWithClient(source, "job_" + name.getMethodName());
+ return DataStreamUtils.collectWithClient(
+ source, "job_" + getClass().getSimpleName() + "_" + System.nanoTime());
}
private static GenericRowData convert(RowData row) {
@@ -1113,7 +1122,7 @@ public void processWatermark(Watermark mark) throws Exception {
}
if (failoverHappened) {
LOG.info("failover happened, watermark: {}", mark);
- Assert.assertEquals(Long.MAX_VALUE, mark.getTimestamp());
+ Assertions.assertEquals(Long.MAX_VALUE, mark.getTimestamp());
if (watermarkAfterFailover == -1) {
watermarkAfterFailover = mark.getTimestamp();
} else {
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaConsumer.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaConsumer.java
index 029ebf3e75..4a7b83ddfa 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaConsumer.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaConsumer.java
@@ -35,10 +35,10 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,12 +54,12 @@
public class TestKafkaConsumer extends TestBaseLog {
private static final Logger LOG = LoggerFactory.getLogger(TestKafkaConsumer.class);
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
KAFKA_CONTAINER.start();
}
- @AfterClass
+ @AfterAll
public static void shutdown() throws Exception {
KAFKA_CONTAINER.close();
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaSourceReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaSourceReader.java
index 2747d9ec59..2dfc42b391 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaSourceReader.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestKafkaSourceReader.java
@@ -23,7 +23,7 @@
import static org.apache.amoro.flink.kafka.testutils.KafkaContainerTest.readRecordsBytes;
import static org.apache.amoro.flink.shuffle.RowKindUtil.transformFromFlinkRowKind;
import static org.apache.amoro.flink.table.descriptors.MixedFormatValidator.MIXED_FORMAT_LOG_CONSISTENCY_GUARANTEE_ENABLE;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.amoro.flink.kafka.testutils.KafkaContainerTest;
import org.apache.amoro.flink.read.source.log.kafka.LogKafkaPartitionSplit;
@@ -51,12 +51,11 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,23 +76,21 @@ public class TestKafkaSourceReader {
private static final int NUM_RECORDS_PER_SPLIT = 10;
private static final int TOTAL_NUM_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
- @Rule public TestName testName = new TestName();
-
private static final byte[] JOB_ID = IdGenerator.generateUpstreamId();
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
KAFKA_CONTAINER.start();
}
- @AfterClass
+ @AfterAll
public static void shutdown() throws Exception {
KAFKA_CONTAINER.close();
}
- @Before
- public void initData() throws Exception {
- topic = TestUtil.getUtMethodName(testName);
+ @BeforeEach
+ public void initData(TestInfo testInfo) throws Exception {
+ topic = TestUtil.getUtMethodName(testInfo);
KafkaContainerTest.createTopics(KAFKA_PARTITION_NUMS, 1, topic);
write(topic, TOTAL_NUM_RECORDS);
}
@@ -233,16 +230,16 @@ public void emitWatermark(Watermark watermark) {}
public void validate() {
assertEquals(
- String.format("Should be %d distinct elements in total", TOTAL_NUM_RECORDS),
TOTAL_NUM_RECORDS,
- consumedValues.size());
+ consumedValues.size(),
+ String.format("Should be %d distinct elements in total", TOTAL_NUM_RECORDS));
assertEquals(
- String.format("Should be %d elements in total", TOTAL_NUM_RECORDS),
TOTAL_NUM_RECORDS,
- count);
- assertEquals("The min value should be 0", 0, min);
+ count,
+ String.format("Should be %d elements in total", TOTAL_NUM_RECORDS));
+ assertEquals(0, min, "The min value should be 0");
assertEquals(
- "The max value should be " + (TOTAL_NUM_RECORDS - 1), TOTAL_NUM_RECORDS - 1, max);
+ TOTAL_NUM_RECORDS - 1, max, "The max value should be " + (TOTAL_NUM_RECORDS - 1));
}
public int count() {
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestLogKafkaPartitionSplitReader.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestLogKafkaPartitionSplitReader.java
index 1c45d4abb9..53efe9f865 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestLogKafkaPartitionSplitReader.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hidden/kafka/TestLogKafkaPartitionSplitReader.java
@@ -21,7 +21,7 @@
import static org.apache.amoro.flink.kafka.testutils.KafkaContainerTest.KAFKA_CONTAINER;
import static org.apache.amoro.flink.kafka.testutils.KafkaContainerTest.readRecordsBytes;
import static org.apache.amoro.flink.shuffle.RowKindUtil.transformFromFlinkRowKind;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.amoro.flink.kafka.testutils.KafkaConfigGenerate;
import org.apache.amoro.flink.kafka.testutils.KafkaContainerTest;
@@ -51,10 +51,10 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class TestLogKafkaPartitionSplitReader {
private static Map> splitsByOwners;
private static final byte[] JOB_ID = IdGenerator.generateUpstreamId();
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
KAFKA_CONTAINER.start();
@@ -89,12 +89,12 @@ public static void prepare() throws Exception {
splitsByOwners = getSplitsByOwners(earliestOffsets);
}
- @AfterClass
+ @AfterAll
public static void shutdown() throws Exception {
KAFKA_CONTAINER.close();
}
- @Before
+ @BeforeEach
public void initData() throws Exception {
// |0 1 2 3 4 5 6 7 8 9 Flip 10 11 12 13 14| 15 16 17 18 19
write(TOPIC1, 0);
@@ -222,9 +222,10 @@ private void assignSplitsAndFetchUntilFinish(
numConsumedRecords.forEach(
(splitId, recordCount) -> {
assertEquals(
- String.format("%s should have %d records.", splits.get(splitId), expectedRecordCount),
expectedRecordCount,
- (long) recordCount);
+ (long) recordCount,
+ String.format(
+ "%s should have %d records.", splits.get(splitId), expectedRecordCount));
});
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestShuffleSplitAssigner.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestShuffleSplitAssigner.java
index eef62b8dbe..092c00b79e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestShuffleSplitAssigner.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestShuffleSplitAssigner.java
@@ -31,8 +31,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.table.data.RowData;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ public void testSingleParallelism() {
}
}
- Assert.assertEquals(splitList.size(), actual.size());
+ Assertions.assertEquals(splitList.size(), actual.size());
}
@Test
@@ -90,7 +90,7 @@ public void testMultiParallelism() {
}
}
- Assert.assertEquals(splitList.size(), actual.size());
+ Assertions.assertEquals(splitList.size(), actual.size());
}
@Test
@@ -156,7 +156,7 @@ public String toString() {
.map(treeNode -> new long[] {treeNode.mask(), treeNode.index()})
.toArray(value -> new long[actualNodes.size()][]);
- Assert.assertArrayEquals(expectNodes, result);
+ Assertions.assertArrayEquals(expectNodes, result);
}
@Test
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestSplitAssignerAwaiting.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestSplitAssignerAwaiting.java
index 71c85609ae..2f552c52dd 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestSplitAssignerAwaiting.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestSplitAssignerAwaiting.java
@@ -21,8 +21,8 @@
import org.apache.amoro.flink.read.FlinkSplitPlanner;
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplitState;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.List;
@@ -36,8 +36,8 @@ public class TestSplitAssignerAwaiting extends TestShuffleSplitAssigner {
public void testEmpty() {
ShuffleSplitAssigner splitAssigner = instanceSplitAssigner(1);
Split split = splitAssigner.getNext(0);
- Assert.assertNotNull(split);
- Assert.assertEquals(Split.Status.UNAVAILABLE, split.status());
+ Assertions.assertNotNull(split);
+ Assertions.assertEquals(Split.Status.UNAVAILABLE, split.status());
}
@Test
@@ -91,11 +91,11 @@ private void assertAvailableFuture(ShuffleSplitAssigner assigner, Runnable addSp
// calling isAvailable again should return the same object reference
// note that thenAccept will return a new future.
// we want to assert the same instance on the assigner returned future
- Assert.assertSame(future, assigner.isAvailable());
+ Assertions.assertSame(future, assigner.isAvailable());
// now add some splits
addSplitsRunnable.run();
- Assert.assertTrue(futureCompleted.get());
+ Assertions.assertTrue(futureCompleted.get());
for (int i = 0; i < 1; ++i) {
assertGetNext(assigner, Split.Status.AVAILABLE);
@@ -106,21 +106,21 @@ private void assertAvailableFuture(ShuffleSplitAssigner assigner, Runnable addSp
private void assertGetNext(ShuffleSplitAssigner assigner, Split.Status expectedStatus) {
Split result = assigner.getNext(0);
- Assert.assertEquals(expectedStatus, result.status());
+ Assertions.assertEquals(expectedStatus, result.status());
switch (expectedStatus) {
case AVAILABLE:
- Assert.assertNotNull(result.split());
+ Assertions.assertNotNull(result.split());
break;
case UNAVAILABLE:
- Assert.assertNull(result.split());
+ Assertions.assertNull(result.split());
break;
default:
- Assert.fail("Unknown status: " + expectedStatus);
+ Assertions.fail("Unknown status: " + expectedStatus);
}
}
private void assertSnapshot(ShuffleSplitAssigner assigner, int splitCount) {
Collection stateBeforeGet = assigner.state();
- Assert.assertEquals(splitCount, stateBeforeGet.size());
+ Assertions.assertEquals(splitCount, stateBeforeGet.size());
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestStaticSplitAssigner.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestStaticSplitAssigner.java
index 3c614e921d..638abdfa90 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestStaticSplitAssigner.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/assigner/TestStaticSplitAssigner.java
@@ -21,8 +21,8 @@
import org.apache.amoro.flink.read.FlinkSplitPlanner;
import org.apache.amoro.flink.read.hybrid.reader.TestRowDataReaderFunction;
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public void testSingleParallelism() throws IOException {
}
}
- Assert.assertEquals(splitList.size(), actual.size());
+ Assertions.assertEquals(splitList.size(), actual.size());
}
}
@@ -77,7 +77,7 @@ public void testMultiParallelism() throws IOException {
}
}
- Assert.assertEquals(splitList.size(), actual.size());
+ Assertions.assertEquals(splitList.size(), actual.size());
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestContinuousSplitPlannerImpl.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestContinuousSplitPlannerImpl.java
index ff41fb5319..e6e6b27d64 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestContinuousSplitPlannerImpl.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestContinuousSplitPlannerImpl.java
@@ -33,7 +33,7 @@
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.io.TaskWriter;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public TestContinuousSplitPlannerImpl(
new BasicTableTestHelper(true, true));
}
- @Before
+ @BeforeEach
public void init() throws IOException {
testKeyedTable = getMixedTable().asKeyedTable();
// write base
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumStateSerializer.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumStateSerializer.java
index 1c1f52616f..04d51245a6 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumStateSerializer.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumStateSerializer.java
@@ -25,8 +25,8 @@
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.flink.read.hybrid.split.TemporalJoinSplits;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,12 +60,12 @@ public void testMixedFormatEnumState() throws IOException {
new MixedFormatSourceEnumStateSerializer();
byte[] ser = mixedFormatSourceEnumStateSerializer.serialize(expect);
- Assert.assertNotNull(ser);
+ Assertions.assertNotNull(ser);
MixedFormatSourceEnumState actual = mixedFormatSourceEnumStateSerializer.deserialize(1, ser);
- Assert.assertEquals(expect.pendingSplits().size(), actual.pendingSplits().size());
- Assert.assertEquals(
+ Assertions.assertEquals(expect.pendingSplits().size(), actual.pendingSplits().size());
+ Assertions.assertEquals(
Objects.requireNonNull(expect.shuffleSplitRelation()).length,
Objects.requireNonNull(actual.shuffleSplitRelation()).length);
@@ -86,10 +86,10 @@ public void testMixedFormatEnumState() throws IOException {
}
}
- Assert.assertEquals(splitList.size(), actualSplits.size());
+ Assertions.assertEquals(splitList.size(), actualSplits.size());
TemporalJoinSplits temporalJoinSplits = actual.temporalJoinSplits();
- Assert.assertEquals(expect.temporalJoinSplits(), temporalJoinSplits);
+ Assertions.assertEquals(expect.temporalJoinSplits(), temporalJoinSplits);
}
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumerator.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumerator.java
index 2849bf3ca3..396300cc9e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumerator.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestMixedFormatSourceEnumerator.java
@@ -44,9 +44,9 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.io.TaskWriter;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.LocalDate;
@@ -79,7 +79,7 @@ public TestMixedFormatSourceEnumerator() {
protected static final LocalDateTime LDT =
LocalDateTime.of(LocalDate.of(2022, 1, 1), LocalTime.of(0, 0, 0, 0));
- @Before
+ @BeforeEach
public void init() throws IOException {
testKeyedTable = getMixedTable().asKeyedTable();
// write change insert
@@ -152,7 +152,7 @@ public void testReadersNumGreaterThanSplits() throws Exception {
Collection pendingSplitsEmpty =
enumerator.snapshotState(1).pendingSplits();
- Assert.assertEquals(splitCount, pendingSplitsEmpty.size());
+ Assertions.assertEquals(splitCount, pendingSplitsEmpty.size());
// register readers, and let them request a split
// 4 split, 5 subtask, one or more subtask will fetch empty split
@@ -177,13 +177,13 @@ public void testReadersNumGreaterThanSplits() throws Exception {
enumerator.addReader(4);
enumerator.handleSourceEvent(4, new SplitRequestEvent());
- Assert.assertEquals(parallelism - splitCount, enumerator.getReadersAwaitingSplit().size());
- Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+ Assertions.assertEquals(parallelism - splitCount, enumerator.getReadersAwaitingSplit().size());
+ Assertions.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
}
private void assertSnapshot(ShuffleSplitAssigner assigner, int splitCount) {
Collection stateBeforeGet = assigner.state();
- Assert.assertEquals(splitCount, stateBeforeGet.size());
+ Assertions.assertEquals(splitCount, stateBeforeGet.size());
}
private ShuffleSplitAssigner instanceSplitAssigner(
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestTemporalJoinSplitsThreadSafe.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestTemporalJoinSplitsThreadSafe.java
index ba378491d1..c65aa1aae4 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestTemporalJoinSplitsThreadSafe.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/enumerator/TestTemporalJoinSplitsThreadSafe.java
@@ -20,8 +20,8 @@
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.flink.read.hybrid.split.TemporalJoinSplits;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
@@ -72,7 +72,7 @@ public void round(List allSplit, Collection mixedForma
CompletableFuture f4 =
CompletableFuture.runAsync(() -> temporalJoinSplits.addSplitsBack(as2));
CompletableFuture.allOf(f1, f2, f3, f4).join();
- Assert.assertTrue(temporalJoinSplits.removeAndReturnIfAllFinished(allSplit));
+ Assertions.assertTrue(temporalJoinSplits.removeAndReturnIfAllFinished(allSplit));
}
static class TestMixedFormatSplit extends MixedFormatSplit {
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
index 61da6d3e71..77b371f8fd 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/MixedIncrementalLoaderTest.java
@@ -21,7 +21,7 @@
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
-import org.apache.amoro.catalog.TableTestBase;
+import org.apache.amoro.flink.FlinkTestBase;
import org.apache.amoro.flink.read.MixedIncrementalLoader;
import org.apache.amoro.flink.read.hybrid.enumerator.ContinuousSplitPlanner;
import org.apache.amoro.flink.read.hybrid.enumerator.MergeOnReadIncrementalPlanner;
@@ -42,34 +42,25 @@
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.TaskWriter;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
-@RunWith(value = Parameterized.class)
-public class MixedIncrementalLoaderTest extends TableTestBase implements FlinkTaskWriterBaseTest {
+public class MixedIncrementalLoaderTest extends FlinkTestBase implements FlinkTaskWriterBaseTest {
- public MixedIncrementalLoaderTest(boolean partitionedTable) {
- super(
+ private void setUpForParam(boolean partitionedTable) throws Exception {
+ initFlinkTestBase(
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
new BasicTableTestHelper(true, partitionedTable));
+ populateTable();
}
- @Parameterized.Parameters(name = "partitionedTable = {0}")
- public static Object[][] parameters() {
- // todo mix hive test
- return new Object[][] {{true}, {false}};
- }
-
- @Before
- public void before() throws IOException {
+ private void populateTable() throws IOException {
MixedTable mixedTable = getMixedTable();
TableSchema flinkPartialSchema =
TableSchema.builder()
@@ -109,8 +100,10 @@ public void before() throws IOException {
}
}
- @Test
- public void testMOR() {
+ @ParameterizedTest(name = "partitionedTable = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMOR(boolean partitionedTable) throws Exception {
+ setUpForParam(partitionedTable);
KeyedTable keyedTable = getMixedTable().asKeyedTable();
List expressions =
Lists.newArrayList(Expressions.greaterThan("op_time", "2022-06-20T10:10:11.0"));
@@ -154,9 +147,9 @@ public void testMOR() {
}
}
if (isPartitionedTable()) {
- Assert.assertEquals(6, actuals.size());
+ Assertions.assertEquals(6, actuals.size());
} else {
- Assert.assertEquals(9, actuals.size());
+ Assertions.assertEquals(9, actuals.size());
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/TestRowDataReaderFunction.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/TestRowDataReaderFunction.java
index a86e2afda8..13169f245e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/TestRowDataReaderFunction.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/reader/TestRowDataReaderFunction.java
@@ -41,8 +41,8 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,7 +179,7 @@ public void testReadNodesUpMoved() throws IOException {
}
protected void assertArrayEquals(RowData[] excepts, List actual) {
- Assert.assertArrayEquals(excepts, sortRowDataCollection(actual));
+ Assertions.assertArrayEquals(excepts, sortRowDataCollection(actual));
}
protected RowData[] sortRowDataCollection(Collection records) {
@@ -207,7 +207,7 @@ protected void writeUpdateWithSpecifiedMaskOne() throws IOException {
protected void writeUpdateWithSpecifiedMask(List input, KeyedTable table, long mask)
throws IOException {
// write change update
- TaskWriter taskWriter = createKeyedTaskWriter(table, ROW_TYPE, false, mask);
+ TaskWriter taskWriter = createKeyedTaskWriterWithMask(table, ROW_TYPE, false, mask);
for (RowData record : input) {
taskWriter.write(record);
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/split/TestMixedFormatSplitSerializer.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/split/TestMixedFormatSplitSerializer.java
index f1a6d115c3..d372753566 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/split/TestMixedFormatSplitSerializer.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/read/hybrid/split/TestMixedFormatSplitSerializer.java
@@ -21,8 +21,8 @@
import org.apache.amoro.flink.read.FlinkSplitPlanner;
import org.apache.amoro.flink.read.hybrid.reader.TestRowDataReaderFunction;
import org.apache.flink.util.FlinkRuntimeException;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
@@ -60,7 +60,7 @@ private void assertSerializedSplitEquals(List expected) {
})
.collect(Collectors.toList());
- Assert.assertArrayEquals(
+ Assertions.assertArrayEquals(
expected.toArray(new MixedFormatSplit[0]),
contents.stream()
.map(
@@ -84,6 +84,6 @@ public void testNullableSplit() throws IOException {
MixedFormatSplit actual = serializer.deserialize(1, ser);
- Assert.assertNull(actual);
+ Assertions.assertNull(actual);
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestLogRecordV1.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestLogRecordV1.java
index bafb9c28cc..9e8bac740a 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestLogRecordV1.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestLogRecordV1.java
@@ -18,8 +18,8 @@
package org.apache.amoro.flink.shuffle;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.amoro.data.ChangeAction;
import org.apache.amoro.log.Bytes;
@@ -35,8 +35,8 @@
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
@@ -83,7 +83,7 @@ public void testLogDataSerialize() throws IOException {
byte[] bytes = logDataJsonSerialization.serialize(logData);
- Assert.assertNotNull(bytes);
+ Assertions.assertNotNull(bytes);
String actualJson = new String(Bytes.subByte(bytes, 18, bytes.length - 18));
String expected =
"{\"f_boolean\":true,\"f_int\":1,\"f_long\":123456789,\"f_list_string\":[null,\"b\",null,\"c\",null]}";
@@ -93,7 +93,7 @@ public void testLogDataSerialize() throws IOException {
new LogDataJsonDeserialization<>(
userSchema, LogRecordV1.factory, LogRecordV1.arrayFactory, LogRecordV1.mapFactory);
LogData result = logDataJsonDeserialization.deserialize(bytes);
- Assert.assertNotNull(result);
+ Assertions.assertNotNull(result);
check(logData, result);
}
@@ -118,7 +118,7 @@ public void testLogDataSerializeNullList() throws IOException {
byte[] bytes = logDataJsonSerialization.serialize(logData);
- Assert.assertNotNull(bytes);
+ Assertions.assertNotNull(bytes);
String actualJson = new String(Bytes.subByte(bytes, 18, bytes.length - 18));
String expected =
"{\"f_boolean\":true,\"f_int\":1,\"f_long\":123456789,\"f_list_string\":[null,null,null]}";
@@ -128,7 +128,7 @@ public void testLogDataSerializeNullList() throws IOException {
new LogDataJsonDeserialization<>(
userSchema, LogRecordV1.factory, LogRecordV1.arrayFactory, LogRecordV1.mapFactory);
LogData result = logDataJsonDeserialization.deserialize(bytes);
- Assert.assertNotNull(result);
+ Assertions.assertNotNull(result);
check(logData, result);
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestRoundRobinShuffleRulePolicy.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestRoundRobinShuffleRulePolicy.java
index c870eb2287..125deb1e5e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestRoundRobinShuffleRulePolicy.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/shuffle/TestRoundRobinShuffleRulePolicy.java
@@ -27,144 +27,154 @@
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
-@RunWith(Parameterized.class)
public class TestRoundRobinShuffleRulePolicy extends FlinkTestBase {
- public TestRoundRobinShuffleRulePolicy(boolean keyedTable, boolean partitionedTable) {
- super(
+ static Stream parameters() {
+ return Stream.of(
+ Arguments.of(true, true),
+ Arguments.of(true, false),
+ Arguments.of(false, true),
+ Arguments.of(false, false));
+ }
+
+ private void setUpForParam(boolean keyedTable, boolean partitionedTable) throws Exception {
+ initFlinkTestBase(
new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
new BasicTableTestHelper(keyedTable, partitionedTable));
}
- @Parameterized.Parameters(name = "keyedTable = {0}, partitionedTable = {1}")
- public static Object[][] parameters() {
- return new Object[][] {
- {true, true},
- {true, false},
- {false, true},
- {false, false}
- };
- }
+ @ParameterizedTest(name = "keyedTable = {0}, partitionedTable = {1}")
+ @MethodSource("parameters")
+ public void testPrimaryKeyPartitionedTable(boolean keyedTable, boolean partitionedTable)
+ throws Exception {
+ setUpForParam(keyedTable, partitionedTable);
- @Test
- public void testPrimaryKeyPartitionedTable() throws Exception {
- Assume.assumeTrue(isKeyedTable());
- Assume.assumeTrue(isPartitionedTable());
+ Assumptions.assumeTrue(isKeyedTable());
+ Assumptions.assumeTrue(isPartitionedTable());
ShuffleHelper helper =
ShuffleHelper.build(getMixedTable(), getMixedTable().schema(), FLINK_ROW_TYPE);
RoundRobinShuffleRulePolicy policy = new RoundRobinShuffleRulePolicy(helper, 5, 2);
Map> subTaskTreeNodes = policy.getSubtaskTreeNodes();
- Assert.assertEquals(subTaskTreeNodes.size(), 5);
+ Assertions.assertEquals(subTaskTreeNodes.size(), 5);
subTaskTreeNodes
.values()
.forEach(
nodes -> {
- Assert.assertEquals(nodes.size(), 2);
- Assert.assertTrue(nodes.contains(DataTreeNode.of(1, 0)));
- Assert.assertTrue(nodes.contains(DataTreeNode.of(1, 1)));
+ Assertions.assertEquals(nodes.size(), 2);
+ Assertions.assertTrue(nodes.contains(DataTreeNode.of(1, 0)));
+ Assertions.assertTrue(nodes.contains(DataTreeNode.of(1, 1)));
});
KeySelector keySelector = policy.generateKeySelector();
Partitioner partitioner = policy.generatePartitioner();
- Assert.assertEquals(
+ Assertions.assertEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(1, "hello2", "2022-10-11T10:10:11.0")), 5));
- Assert.assertNotEquals(
+ Assertions.assertNotEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(1, "hello2", "2022-10-12T10:10:11.0")), 5));
- Assert.assertNotEquals(
+ Assertions.assertNotEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(2, "hello2", "2022-10-11T10:10:11.0")), 5));
}
- @Test
- public void testPrimaryKeyTableWithoutPartition() throws Exception {
- Assume.assumeTrue(isKeyedTable());
- Assume.assumeFalse(isPartitionedTable());
+ @ParameterizedTest(name = "keyedTable = {0}, partitionedTable = {1}")
+ @MethodSource("parameters")
+ public void testPrimaryKeyTableWithoutPartition(boolean keyedTable, boolean partitionedTable)
+ throws Exception {
+ setUpForParam(keyedTable, partitionedTable);
+
+ Assumptions.assumeTrue(isKeyedTable());
+ Assumptions.assumeFalse(isPartitionedTable());
ShuffleHelper helper =
ShuffleHelper.build(getMixedTable(), getMixedTable().schema(), FLINK_ROW_TYPE);
RoundRobinShuffleRulePolicy policy = new RoundRobinShuffleRulePolicy(helper, 5, 2);
Map> subTaskTreeNodes = policy.getSubtaskTreeNodes();
- Assert.assertEquals(subTaskTreeNodes.size(), 5);
- Assert.assertEquals(
+ Assertions.assertEquals(subTaskTreeNodes.size(), 5);
+ Assertions.assertEquals(
subTaskTreeNodes.get(0), Sets.newHashSet(DataTreeNode.of(7, 0), DataTreeNode.of(7, 5)));
- Assert.assertEquals(
+ Assertions.assertEquals(
subTaskTreeNodes.get(1), Sets.newHashSet(DataTreeNode.of(7, 1), DataTreeNode.of(7, 6)));
- Assert.assertEquals(
+ Assertions.assertEquals(
subTaskTreeNodes.get(2), Sets.newHashSet(DataTreeNode.of(7, 2), DataTreeNode.of(7, 7)));
- Assert.assertEquals(subTaskTreeNodes.get(3), Sets.newHashSet(DataTreeNode.of(7, 3)));
- Assert.assertEquals(subTaskTreeNodes.get(4), Sets.newHashSet(DataTreeNode.of(7, 4)));
+ Assertions.assertEquals(subTaskTreeNodes.get(3), Sets.newHashSet(DataTreeNode.of(7, 3)));
+ Assertions.assertEquals(subTaskTreeNodes.get(4), Sets.newHashSet(DataTreeNode.of(7, 4)));
KeySelector keySelector = policy.generateKeySelector();
Partitioner partitioner = policy.generatePartitioner();
- Assert.assertEquals(
+ Assertions.assertEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(1, "hello2", "2022-10-11T10:10:11.0")), 5));
- Assert.assertEquals(
+ Assertions.assertEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(1, "hello2", "2022-10-12T10:10:11.0")), 5));
- Assert.assertNotEquals(
+ Assertions.assertNotEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(2, "hello2", "2022-10-11T10:10:11.0")), 5));
}
- @Test
- public void testPartitionedTableWithoutPrimaryKey() throws Exception {
- Assume.assumeFalse(isKeyedTable());
- Assume.assumeTrue(isPartitionedTable());
+ @ParameterizedTest(name = "keyedTable = {0}, partitionedTable = {1}")
+ @MethodSource("parameters")
+ public void testPartitionedTableWithoutPrimaryKey(boolean keyedTable, boolean partitionedTable)
+ throws Exception {
+ setUpForParam(keyedTable, partitionedTable);
+
+ Assumptions.assumeFalse(isKeyedTable());
+ Assumptions.assumeTrue(isPartitionedTable());
ShuffleHelper helper =
ShuffleHelper.build(getMixedTable(), getMixedTable().schema(), FLINK_ROW_TYPE);
RoundRobinShuffleRulePolicy policy = new RoundRobinShuffleRulePolicy(helper, 5, 2);
Map> subTaskTreeNodes = policy.getSubtaskTreeNodes();
- Assert.assertEquals(subTaskTreeNodes.size(), 5);
+ Assertions.assertEquals(subTaskTreeNodes.size(), 5);
subTaskTreeNodes
.values()
.forEach(
nodes -> {
- Assert.assertEquals(nodes.size(), 1);
- Assert.assertTrue(nodes.contains(DataTreeNode.of(0, 0)));
+ Assertions.assertEquals(nodes.size(), 1);
+ Assertions.assertTrue(nodes.contains(DataTreeNode.of(0, 0)));
});
KeySelector keySelector = policy.generateKeySelector();
Partitioner partitioner = policy.generatePartitioner();
- Assert.assertEquals(
+ Assertions.assertEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(1, "hello2", "2022-10-11T10:10:11.0")), 5));
- Assert.assertEquals(
+ Assertions.assertEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
keySelector.getKey(createRowData(2, "hello2", "2022-10-11T10:10:11.0")), 5));
- Assert.assertNotEquals(
+ Assertions.assertNotEquals(
partitioner.partition(
keySelector.getKey(createRowData(1, "hello", "2022-10-11T10:10:11.0")), 5),
partitioner.partition(
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/AmoroCatalogITCaseBase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/AmoroCatalogITCaseBase.java
index 6ab283e097..6617de5dbb 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/AmoroCatalogITCaseBase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/AmoroCatalogITCaseBase.java
@@ -20,11 +20,12 @@
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
+import org.apache.amoro.AmoroCatalog;
import org.apache.amoro.TestAms;
import org.apache.amoro.flink.MiniClusterResource;
-import org.apache.amoro.formats.AmoroCatalogTestBase;
import org.apache.amoro.formats.AmoroCatalogTestHelper;
import org.apache.amoro.hive.TestHMS;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
@@ -37,31 +38,105 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.junit.ClassRule;
-
-import java.io.IOException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+
+/**
+ * JUnit 5 base class for Flink unified-catalog integration tests.
+ *
+ * This class no longer extends the still-JUnit-4 {@code AmoroCatalogTestBase}; the catalog
+ * lifecycle is re-implemented here against the {@link AmoroCatalogTestHelper} contract so children
+ * can be clean Jupiter classes. Parameterized children construct without arguments and call {@link
+ * #initAmoroCatalog(AmoroCatalogTestHelper)} from the {@code @ParameterizedTest} method body.
+ */
+public class AmoroCatalogITCaseBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AmoroCatalogITCaseBase.class);
-public class AmoroCatalogITCaseBase extends AmoroCatalogTestBase {
- static final TestHMS TEST_HMS = new TestHMS();
+ protected static final TestHMS TEST_HMS = new TestHMS();
public static final String TEST_DB_NAME = "test_db";
public static final String TEST_TABLE_NAME = "test_table";
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ protected static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
- @ClassRule public static TestAms TEST_AMS = new TestAms();
+ protected static final TestAms TEST_AMS = new TestAms();
+
+ protected AmoroCatalogTestHelper> catalogTestHelper;
+ protected AmoroCatalog amoroCatalog;
+ protected Object originalCatalog;
+
+ private File tempRoot;
private volatile StreamTableEnvironment tEnv = null;
private volatile StreamExecutionEnvironment env = null;
+ /** No-arg constructor for parameterized children. */
+ public AmoroCatalogITCaseBase() {}
+
public AmoroCatalogITCaseBase(AmoroCatalogTestHelper> catalogTestHelper) {
- super(catalogTestHelper);
+ this.catalogTestHelper = catalogTestHelper;
+ }
+
+ @BeforeAll
+ public static void startBaseClassResources() throws Exception {
+ TEST_AMS.before();
+ MINI_CLUSTER_RESOURCE.before();
+ }
+
+ @AfterAll
+ public static void stopBaseClassResources() {
+ try {
+ MINI_CLUSTER_RESOURCE.after();
+ } finally {
+ TEST_AMS.after();
+ }
+ }
+
+ @BeforeEach
+ public void setUpAmoroCatalog() throws Exception {
+ if (catalogTestHelper == null) {
+ // Parameterized child: must call initAmoroCatalog(...) inside the test body.
+ return;
+ }
+ initLifecycle();
+ }
+
+ @AfterEach
+ public void tearDownAmoroCatalog() {
+ if (catalogTestHelper != null) {
+ try {
+ catalogTestHelper.clean();
+ } catch (Throwable t) {
+ LOG.warn("Failed to clean catalog helper", t);
+ }
+ }
+ if (tempRoot != null) {
+ try {
+ FileUtils.deleteDirectory(tempRoot);
+ } catch (Throwable t) {
+ LOG.warn("Failed to delete temp dir {}", tempRoot, t);
+ }
+ tempRoot = null;
+ }
+ }
+
+ protected void initAmoroCatalog(AmoroCatalogTestHelper> catalogTestHelper) throws Exception {
+ this.catalogTestHelper = catalogTestHelper;
+ initLifecycle();
}
- @Override
- public void setupCatalog() throws IOException {
- super.setupCatalog();
+ private void initLifecycle() throws Exception {
+ tempRoot = Files.createTempDirectory("amoro-catalog-it").toFile();
+ catalogTestHelper.initWarehouse(tempRoot.getPath());
+ this.amoroCatalog = catalogTestHelper.amoroCatalog();
+ this.originalCatalog = catalogTestHelper.originalCatalog();
catalogTestHelper.initHiveConf(TEST_HMS.getHiveConf());
TEST_AMS.getAmsHandler().createCatalog(catalogTestHelper.getCatalogMeta());
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/CatalogITCaseBase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/CatalogITCaseBase.java
index 84500df822..8d1e67a2aa 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/CatalogITCaseBase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/CatalogITCaseBase.java
@@ -20,10 +20,25 @@
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
+import org.apache.amoro.MockAmoroManagementServer;
+import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestAms;
+import org.apache.amoro.UnifiedCatalog;
+import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.catalog.CatalogTestHelper;
-import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.flink.MiniClusterResource;
+import org.apache.amoro.mixed.CatalogLoader;
+import org.apache.amoro.mixed.MixedFormatCatalog;
+import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.TableBuilder;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.CatalogUtil;
+import org.apache.amoro.utils.MixedTableUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
@@ -36,19 +51,246 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.junit.ClassRule;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public abstract class CatalogITCaseBase extends TableTestBase {
+import java.io.File;
+import java.nio.file.Files;
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+/**
+ * JUnit 5 base class for Flink catalog integration tests that exercise the mixed-format catalog
+ * lifecycle but do not need the streaming-write helpers from FlinkTestBase. Implementations of
+ * FlinkTaskWriterBaseTest extend this directly so that no static {@code createKeyedTaskWriter}
+ * inherited from FlinkTestBase clashes with the interface's default methods.
+ */
+public abstract class CatalogITCaseBase {
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogITCaseBase.class);
+
+ protected static final TestAms TEST_AMS = new TestAms();
+
+ protected static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
+ private CatalogTestHelper catalogTestHelper;
+ private TableTestHelper tableTestHelper;
+ private CatalogMeta catalogMeta;
+ private MixedFormatCatalog mixedFormatCatalog;
+ private UnifiedCatalog unifiedCatalog;
+ private org.apache.iceberg.catalog.Catalog icebergCatalog;
+ private MixedTable mixedTable;
+ private TableMetaStore tableMetaStore;
+ private File tempRoot;
+
private volatile StreamTableEnvironment tEnv = null;
private volatile StreamExecutionEnvironment env = null;
+ /** No-arg constructor for parameterized children that supply helpers in the test method body. */
+ public CatalogITCaseBase() {}
+
public CatalogITCaseBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
- super(catalogTestHelper, tableTestHelper);
+ this.catalogTestHelper = catalogTestHelper;
+ this.tableTestHelper = tableTestHelper;
+ }
+
+ @BeforeAll
+ public static void startBaseClassResources() throws Exception {
+ TEST_AMS.before();
+ MINI_CLUSTER_RESOURCE.before();
+ }
+
+ @AfterAll
+ public static void stopBaseClassResources() {
+ try {
+ MINI_CLUSTER_RESOURCE.after();
+ } finally {
+ TEST_AMS.after();
+ }
+ }
+
+ @BeforeEach
+ public void setUpCatalogITCase() throws Exception {
+ if (catalogTestHelper == null) {
+ return;
+ }
+ initLifecycle();
+ }
+
+ @AfterEach
+ public void tearDownCatalogITCase() {
+ teardownLifecycle();
+ }
+
+ protected void initCatalogITCase(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) throws Exception {
+ this.catalogTestHelper = catalogTestHelper;
+ this.tableTestHelper = tableTestHelper;
+ initLifecycle();
+ }
+
+ private void initLifecycle() throws Exception {
+ tempRoot = Files.createTempDirectory("catalog-it").toFile();
+ String baseDir = tempRoot.getPath();
+ if (!SystemUtils.IS_OS_UNIX) {
+ baseDir = "file:/" + baseDir.replace("\\", "/");
+ }
+ catalogMeta = catalogTestHelper.buildCatalogMeta(baseDir);
+ catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, TEST_AMS.getServerUrl());
+ getAmsHandler().createCatalog(catalogMeta);
+ if (tableTestHelper != null) {
+ createTestTable();
+ }
+ }
+
+ private void teardownLifecycle() {
+ if (tableTestHelper != null && unifiedCatalog != null) {
+ try {
+ unifiedCatalog.dropTable(
+ tableTestHelper.id().getDatabase(), tableTestHelper.id().getTableName(), true);
+ } catch (Exception e) {
+ LOG.warn("dropTable failed", e);
+ }
+ try {
+ unifiedCatalog.dropDatabase(TableTestHelper.TEST_DB_NAME);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (catalogMeta != null) {
+ try {
+ getAmsHandler().dropCatalog(catalogMeta.getCatalogName());
+ } catch (Exception e) {
+ LOG.warn("dropCatalog failed", e);
+ }
+ }
+ if (tempRoot != null) {
+ try {
+ FileUtils.deleteDirectory(tempRoot);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean temp directory {}", tempRoot, e);
+ }
+ }
+ catalogMeta = null;
+ mixedFormatCatalog = null;
+ unifiedCatalog = null;
+ icebergCatalog = null;
+ mixedTable = null;
+ tableMetaStore = null;
+ tempRoot = null;
+ }
+
+ private void createTestTable() {
+ this.tableMetaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
+ getUnifiedCatalog().createDatabase(TableTestHelper.TEST_DB_NAME);
+ TableFormat format = getTestFormat();
+ if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
+ createMixedFormatTable();
+ } else if (TableFormat.ICEBERG.equals(format)) {
+ createIcebergFormatTable();
+ }
+ }
+
+ private void createMixedFormatTable() {
+ TableBuilder tableBuilder =
+ getMixedFormatCatalog()
+ .newTableBuilder(TableTestHelper.TEST_TABLE_ID, tableTestHelper.tableSchema());
+ tableBuilder.withProperties(tableTestHelper.tableProperties());
+ if (isKeyedTable()) {
+ tableBuilder.withPrimaryKeySpec(tableTestHelper.primaryKeySpec());
+ }
+ if (isPartitionedTable()) {
+ tableBuilder.withPartitionSpec(tableTestHelper.partitionSpec());
+ }
+ mixedTable = tableBuilder.create();
+ }
+
+ private void createIcebergFormatTable() {
+ getIcebergCatalog()
+ .createTable(
+ org.apache.iceberg.catalog.TableIdentifier.of(
+ TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME),
+ tableTestHelper.tableSchema(),
+ tableTestHelper.partitionSpec(),
+ tableTestHelper.tableProperties());
+ mixedTable =
+ (MixedTable)
+ getUnifiedCatalog()
+ .loadTable(TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME)
+ .originalTable();
+ }
+
+ public static MockAmoroManagementServer.AmsHandler getAmsHandler() {
+ return TEST_AMS.getAmsHandler();
+ }
+
+ protected MixedFormatCatalog getMixedFormatCatalog() {
+ if (mixedFormatCatalog == null) {
+ mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
+ }
+ return mixedFormatCatalog;
+ }
+
+ protected void refreshMixedFormatCatalog() {
+ this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
+ }
+
+ protected String getCatalogUri() {
+ return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
+ }
+
+ protected CatalogMeta getCatalogMeta() {
+ return catalogMeta;
+ }
+
+ protected TableFormat getTestFormat() {
+ return catalogTestHelper.tableFormat();
+ }
+
+ protected org.apache.iceberg.catalog.Catalog getIcebergCatalog() {
+ if (icebergCatalog == null) {
+ icebergCatalog = catalogTestHelper.buildIcebergCatalog(catalogMeta);
+ }
+ return icebergCatalog;
+ }
+
+ protected UnifiedCatalog getUnifiedCatalog() {
+ if (unifiedCatalog == null) {
+ unifiedCatalog = catalogTestHelper.buildUnifiedCatalog(catalogMeta);
+ }
+ return unifiedCatalog;
+ }
+
+ protected MixedTable getMixedTable() {
+ return mixedTable;
+ }
+
+ protected UnkeyedTable getBaseStore() {
+ return MixedTableUtil.baseStore(mixedTable);
+ }
+
+ protected TableMetaStore getTableMetaStore() {
+ return this.tableMetaStore;
+ }
+
+ protected boolean isKeyedTable() {
+ return tableTestHelper.primaryKeySpec() != null
+ && tableTestHelper.primaryKeySpec().primaryKeyExisted();
+ }
+
+ protected boolean isPartitionedTable() {
+ return tableTestHelper.partitionSpec() != null
+ && tableTestHelper.partitionSpec().isPartitioned();
+ }
+
+ protected TableTestHelper tableTestHelper() {
+ return tableTestHelper;
+ }
+
+ protected CatalogTestHelper catalogTestHelper() {
+ return catalogTestHelper;
}
protected TableResult exec(String query, Object... args) {
@@ -67,7 +309,6 @@ protected StreamTableEnvironment getTableEnv() {
StreamTableEnvironment.create(
getEnv(), EnvironmentSettings.newInstance().inStreamingMode().build());
Configuration configuration = tEnv.getConfig().getConfiguration();
- // set low-level key-value options
configuration.setString(TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key(), "true");
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
index 9a1a21e09d..9b096eca81 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/LookupITCase.java
@@ -33,10 +33,10 @@
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.io.TaskWriter;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashSet;
@@ -54,7 +54,7 @@ public LookupITCase() {
new BasicTableTestHelper(true, false));
}
- @Before
+ @BeforeEach
public void setup() throws IOException {
List dbs = getMixedFormatCatalog().listDatabases();
if (dbs.isEmpty()) {
@@ -83,7 +83,7 @@ public void setup() throws IOException {
TableIdentifier.of(getCatalogName(), db, "L"), Lists.newArrayList(DataUtil.toRowData(1)));
}
- @After
+ @AfterEach
public void drop() {
exec("drop table mixed_catalog.%s.L", db);
exec("drop table mixed_catalog.%s.DIM", db);
@@ -129,7 +129,7 @@ public void testLookup() throws Exception {
}
}
- Assert.assertEquals(expected, actual.size());
+ Assertions.assertEquals(expected, actual.size());
List