Daehong Blog
โ ๊ฐ์ธ ๋ฉ๋ชจ์ฅ
Flink Table Maintenance
๐ข ์ฌ๋ด ์ด์ ํด๊ฒฐ
๐จ๐ปโ๐ป ์ด๋ ฅ์
๐ป ๊ฐ๋ฐ ์ผ์ง
๐ ๊ฐ์ธ ๊ณต๋ถ
๐ Apache Hadoop
1. ๋ถ์ฐ ์์คํ ์ ์ดํด์ ํ๋ก์ ๋ฑ์ฅ ๋ฐฐ๊ฒฝ
2. ํ๋ก์ ํต์ฌ ๊ตฌ์ฑ์์์ ์ด๋ก
3. YARN(Yet Another Resource Negotiator) ๊ธฐ๋ณธ ์ด๋ก
4. Map Reduce ๊ธฐ๋ณธ ์ด๋ก ๊ณผ ์ค์ต
5. ํ๋ก ์์ฝ์์คํ ๊ณผ ์ด์ ๋ฐ ๊ด๋ฆฌ
6. ํ๋ก ํด๋ฌ์คํฐ, YARN, ๊ธฐ๋ณธ ์์ฝ์์คํ ์ค์ต
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
๐ Apache Iceberg
1. Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ
2. Flink 1.19 ๋ฐ 1.20์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backport
๐ Spring Kafka
1. Kafka RetryTopic ๊ด๋ จ ๊ธฐ๋ณธ Template Bean ์ด๋ฆ ๋ณ๊ฒฝ
Home
Contact
Copyright ยฉ 2024 |
Daehong
Home
>
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
> ๐ Apache Iceberg
Now Loading ...
๐ Apache Iceberg
๐ Flink 1.19 ๋ฐ 1.20์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backport
โ๏ธ 1. ์๋ก ์ด์ ํฌ์คํ ์์ ์๊ฐํ ๊ฒ์ฒ๋ผ, Iceberg Flink Catalog v2.0์์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ์ ์ฑ๊ณตํ์ต๋๋ค. ์ด์ ๋ฐ๋ผ ํด๋น ์ด์์ ์ฐ์ฅ์ ์ด์ ๋ฉ์ธํ ์ด๋์ ์์ฒญ์ ๋ฐ๋ผ, ๋์ผํ ๋ณ๊ฒฝ ์ฌํญ์ Flink 1.19์ 1.20 ๋ฒ์ ์๋ ๋ฐฑํฌํ ํ๊ธฐ๋ก ํ์ต๋๋ค. (ํ์ฌ Iceberg๋ 1.9.1 ๋ฒ์ ์ ์ฌ์ฉ ์ค์ด๋ฉฐ, Flink Catalog๋ 1.19, 1.20, ๊ทธ๋ฆฌ๊ณ 2.0 ๋ฒ์ ์ ์ง์ํฉ๋๋ค.) ์ด๋ฒ ์์ ์ ์ด์ ํฌ์คํ ์ธ ๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ์ ์ฐ์ฅ์ ์ด์, ์ ์ฌํ ๋ด์ฉ์ด ๋ง์, ๋น๊ต์ ์งง๊ฒ ์ ๋ฆฌํด๋ณด๋ ค ํฉ๋๋ค. โ โ๏ธ 2. ๋ณธ๋ก โ๐ป 2.1. PR: MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backporting ์ด๋ฒ ์์ ์ญ์ ์ด์ PR์ ์ฐ์ฅ์ ์ ์๊ธฐ ๋๋ฌธ์, ์ด์ ์ ์ ๊ณผ์ ์ ๋ํ ์์ธํ ์ค๋ช ์ ์๋ตํฉ๋๋ค. ์๋์ ๊ฐ์ด, ๋ฐฑํฌํ ์ ๋ํ ์์ฒญ์ด ์์๋ค๋ ๋ด์ฉ์ ๋จ๊น๋๋ค. ๋ฐ๋ผ์ ์ด๋ฒ ํฌ์คํ ์์๋ ๊ณง๋ฐ๋ก PR ๊ณผ์ ๋ถํฐ ๋ค๋ฃจ๊ฒ ์ต๋๋ค. ์ด์ ์ ์ ๋ฐฐ๊ฒฝ์ด๋ PR ์ ์ฒด ํ๋ฆ์ด ๊ถ๊ธํ์ ๋ถ๋ค์ ์ด์ ํฌ์คํ ์ธ ๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ์ ์ฐธ๊ณ ํด์ฃผ์๊ธฐ ๋ฐ๋๋๋ค. 1๏ธโฃ COMMIT : v1.19 Backporting ๊ธฐ์กด 2.0์ TestIcebergSourceFailover ํด๋์ค์๋ ํฌ๊ฒ ๋ค๋ฅธ ๋ถ๋ถ์ด ์์์ต๋๋ค. ์ค๊ฐ์ ๊ตฌํ๋ ์ฝ๋๊ฐ ์ฝ๊ฐ์ ๋ค๋ฅด๊ธด ํ์์ง๋ง, ์ ๊ฐ ์์ ํด์ผํ๋ ๋ฒ์์๋ ๋ฌด๊ดํ ๋ถ๋ถ์ด์๊ธฐ ๋๋ฌธ์ ๋์ด๊ฐ์ต๋๋ค. ํด๋น ์ฝ๋์ ์ด์ ๋ฒ์ ์ ์๋์ ๊ฐ์์ต๋๋ค. ( v2.0 ๊ณผ ๋์ผ ) Before: import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.ThrowingConsumer; // ... @Test public void testBoundedWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testBoundedWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster)); } // ... @Test public void testContinuousWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testContinuousWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster)); } // ... private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception { MiniClusterWithClientResource miniCluster = null; try { miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG); miniCluster.before(); testMethod.accept(miniCluster.getMiniCluster()); } finally { if (miniCluster != null) { miniCluster.after(); } } } ๊ทธ๋์ ์๋์ ๊ฐ์ ์ฝ๋๋ก ์์ ์ ๋์ผํ๊ฒ ์งํํ์์ต๋๋ค. After: import org.apache.flink.test.junit5.InjectMiniCluster; import org.junit.jupiter.api.AfterEach; // ... @BeforeEach protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { if (!miniCluster.isRunning()) { miniCluster.start(); } } @AfterEach protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { miniCluster.close(); } // ... @Test public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.TM, miniCluster); } @Test public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.JM, miniCluster); } // ... @Test public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.TM, miniCluster); } @Test public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.JM, miniCluster); } ์ด๋ ๊ฒ ์ฝ๋๋ฅผ ์์ ํ๊ฒ ๋ ์ด์ ์, Minicluster ์ ์ญํ ๊ทธ๋ฆฌ๊ณ InjectMiniCluster ์์ ์ฐจ์ด์ ๋ํด์ ์ญ์ ์ด์ ํฌ์คํ ์ ์์ฑ ๋์ด์์ต๋๋ค. ๊ฑฐ๋ญ ๋ง์๋๋ฆฌ์ง๋ง, ์ด์ ํฌ์คํ ์ ๊ผญ ์ฐธ๊ณ ํ์๊ธฐ ๋ฐ๋๋๋ค. :D 2๏ธโฃ COMMIT : v1.20 Backporting v1.20 ๋ฐฑํฌํ ์์ ์, v1.19 ์ ์์ ๋์ผํ๊ฒ ์งํ๋์์ต๋๋ค. ์ฝ๋ ์ญ์ ์์ ๋์ผํ๊ฒ ์์ ํ์์ต๋๋ค. ๊ทธ๋ฆฌํ์ฌ, ์ต์ข ์ ์ผ๋ก๋ ์๋์ ๊ฐ์ PR ์ ์ ์กํ์๊ณ , ์ฑ๊ณต์ ์ผ๋ก Merge ๊ฐ ๋์์ต๋๋ค. ๐ My PR: Backporting Removal of MiniClusterWithClientResource from Iceberg Flink Catalog v1.19, v1.20 ๐น ์ค๊ฐ์ ๋ง์ฃผํ ์ฝ๊ฐ์ ์ด์ ์ด์ ํฌ์คํ ์์ Local ํ ์คํธ์, CI ํ ์คํธ๋ฅผ ์ฑ๊ณตํ๊ธฐ ์ํ์ฌ ํ์ํ ๋ช ๊ฐ์ง Gradle ๋ช ๋ น์ด๋ฅผ ๋ง์๋๋ ธ์ต๋๋ค. ์ด๋ฒ์ ์ญ์ ๋์ผํ ๊ณผ์ ์ ๊ฑฐ์ณค๊ณ , ์ฝ๋ ์คํ์ผ์ ๋ง์ถ๊ธฐ ์ํด ./gradlew :iceberg-flink:iceberg-flink-1.19:spotlessApply ์ ./gradlew :iceberg-flink:iceberg-flink-1.20:spotlessApply ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์์ต๋๋ค. ๊ทธ๋ฌ๋๋ ์๋์ ๊ฐ์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๊ฒ ๋ฉ๋๋ค. ์๋ฌ๋ฅผ ๋ณด๋ฉด, Gradle์ด :iceberg-flink:iceberg-flink-1.20:spotlessApply ๋ผ๋ ํ์คํฌ(ํน์ ๋ชจ๋)๋ฅผ ์คํํ๋ ค ํ์ง๋ง, iceberg-flink ํ๋ก์ ํธ ์๋์ iceberg-flink-1.20 ์ด๋ผ๋ ์๋ธํ๋ก์ ํธ๊ฐ ์กด์ฌํ์ง ์๋ค๋ ๋ด์ฉ์ ๋๋ค. ์กด์ฌํ๋ ์๋ธํ๋ก์ ํธ ํ๋ณด๋ iceberg-flink-2.0 ์ด ์๋ค๊ณ ์น์ ํ๊ฒ ์๋ ค์ฃผ๊ณ ์์ง๋ง, ์ฐ๋ฆฌ์๊ฒ ํ์ํ๊ฑด 1.19 ์ 1.20 ๋ฒ์ ์ ๋๋ค. ์ ๋ ์ฒ์์๋ 2.0 ์ผ๋ก ํด๋, ์๋์ผ๋ก 1.19 ์ 1.20 ์ฝ๋ ์คํ์ผ๋ ๋ง์ถฐ์ง ์ค ์์๋๋ฐ ๊ทธ๊ฒ ์๋์์ต๋๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ด ์ง๋ฌธ์ ํ์์ต๋๋ค. ๊ทธ๋์ ./gradlew properties ๋ช ๋ น์ด๋ฅผ ํตํด ์ฐพ์๋ณด์๋๋, ์ญ์ systemProp.defaultFlinkVersions: 2.0 ์ผ๋ก, ๋์ด์์์ต๋๋ค. ๊ทธ๋์ vi ๋ก gradle.properties ๋ฅผ ์ด์ด์ฃผ์๊ณ , enableFlink1.19=true ๊ณผ, enableFlink1.20=true ์ ์ถ๊ฐํด์ค ๋ค spotlessApply ๋ช ๋ น์ด๋ฅผ ์คํํด์ฃผ์์ต๋๋ค. ๊ทธ๋ฌ๋๋ ์ ์์ ์ผ๋ก ์ฑ๊ณตํ์๊ณ , PR ํ Merge ๊น์ง ์ฑ๊ณตํ์์ต๋๋ค. โ โ๏ธ 3. ๊ฒฐ๋ก ์ด๋ฒ ์์ ์ Iceberg Flink Catalog v2.0์์ ์งํํ๋ MiniClusterWithClientResource ์ ๊ฑฐ ์์ ์ Flink 1.19 ๋ฐ 1.20 ๋ฒ์ ์ ์ฑ๊ณต์ ์ผ๋ก ๋ฐฑํฌํ ํ ์ฌ๋ก์์ต๋๋ค. ์ฝ๋ ๋ณ๊ฒฝ ์์ฒด๋ ์ด์ ๊ณผ ๊ฑฐ์ ๋์ผํ์ง๋ง, Gradle ์ค์ ๋ฐ ๋ก์ปฌ ํ ์คํธ ํ๊ฒฝ ์ค์ ์์ ๋ฐ์ํ ์์ ์ด์๋ฅผ ํด๊ฒฐํ๋ฉด์, Iceberg ํ๋ก์ ํธ์ Gradle ๊ตฌ์ฑ๊ณผ ์๋ธ๋ชจ๋ ํ์ฑํ ๋ฐฉ์์ ๋ํด ๋ ๊น์ด ์ดํดํ ์ ์๋ ๊ณ๊ธฐ๊ฐ ๋์์ต๋๋ค. ์ด๋ฒ ๊ฒฝํ์ ๋ฐํ์ผ๋ก ์์ผ๋ก๋ ์คํ์์ค ํ๋ก์ ํธ์ ๋์ฑ ์ ๊ทน์ ์ผ๋ก ๊ธฐ์ฌํ๋ฉฐ, ๋ค์ํ ๋ฌธ์ ์ํฉ์ ์ฃผ๋์ ์ผ๋ก ํด๊ฒฐํด๋๊ฐ ์ ์๋๋ก ๊พธ์คํ ์ญ๋์ ํค์๊ฐ๊ฒ ์ต๋๋ค. ๐ช
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
ยท 2025-05-27
๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ
โ๏ธ 1. ์๋ก ์๋ 10์๊ฒฝ, ์ฒซ ์คํ ์์ค ๋ฉํ ๋ง ํ๋ก๊ทธ๋จ์ ์ฐธ์ฌํ๋ฉด์ Spring Kafka ํ๋ก์ ํธ์ Contributor ๊ฐ ๋๋ ์์คํ ๊ฒฝํ์ ํ์ต๋๋ค. ๋น์์๋ ์คํ ์์ค ์ํ๊ณ๊ฐ ์ฒ์์ด์๊ธฐ ๋๋ฌธ์, ๋ค์ ๋ฏ์ค๊ณ ์ด๋ ต๊ฒ ๋๊ปด์ก์ง๋ง, ์ด ์ข๊ฒ๋ ์ ๋ฌธ์๋ค์ ์ํ ๋น๊ต์ ๋จ์ํ ์ด์๋ฅผ ํตํด ๊ธฐ์ฌ๋ฅผ ์์ํ ์ ์์์ต๋๋ค. ๊ทธ ์ด์๋ ๋ฌธ์(Docs)์ ์ ์ ๊ณผ ์ผ๋ถ ๊ฐ๋จํ ์ฝ๋ ์์ ์ ๋์์ง๋ง, ์ ์๊ฒ๋ ์คํ ์์ค์ ์ฒซ ๋ฐ์ ๋ด๋๋ ๋ฐ ๋งค์ฐ ์๋ฏธ ์๋ ๊ธฐํ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ฌํด, ๋ค์ ํ ๋ฒ ๊ฐ์ ๋ฉํ ๋ง ์์คํ ์ ์ฐธ์ฌํ๊ฒ ๋์์ต๋๋ค. ํ์ง๋ง ์ด๋ฒ์๋ ๋ฉํ ๋ง์ ๋ฐ๋ ์ ์ฅ์ด ์๋๋ผ, ๋ฉํ ๋ง์ ์ง์ํ๊ณ ์ด์ํ๋ ์ ์ฅ์ผ๋ก ์ฐธ์ฌํ๊ฒ ๋์์ต๋๋ค. ๋ฉํฐ๋ค์ ์ด์ ์ ์ ๊ณผ PR ๊ณผ์ ์ ์กฐ์ธ์ ์ฃผ๋ฉด์, ์์ฐ์ค๋ฝ๊ฒ ์ ๋ ๋ค์ ํ ๋ฒ ์คํ ์์ค ๊ธฐ์ฌ๋ฅผ ๋์ ํด๋ณด๊ณ ์ถ์ ์์์ด ์๊ฒผ์ต๋๋ค. ( ์ด๋ฒ ํฌ์คํ ์ ICeberg ๊ธฐ์ฌ ํฌ์คํ ์ด๊ธฐ ๋๋ฌธ์, ๋ฉํ ๋ง์์ ์ด์์ง์ผ๋ก์ ํ๋ํ ๋ด์ฉ์ ์ต์ํ ํ์์ต๋๋ค. ๊ฒฐ๋ก ์๋ง ์ด์ง ํฌ์คํ ํ๊ฒ ์ต๋๋ค. :D ) ๊ทธ ๊ณผ์ ์์ ๋๊ธธ์ด ๊ฐ ํ๋ก์ ํธ๊ฐ ๋ฐ๋ก Apache Iceberg์์ต๋๋ค. ์ด ํ๋ก์ ํธ๋ ๋จ์ํ ๊ด์ฌ๋ง ์์๋ ๊ฒ์ด ์๋๋ผ, ํ์ฌ ๋ด๋ถ์์ ์ ๊ท ์๋ฃจ์ ๋์ ์ ์ง์ Iceberg๋ฅผ ์ ์ํ์ฌ ์ค์ ๋ก ์ ์ฉ๋๊ธฐ๋ ํ๊ณ , ํด๋น ๊ธฐ์ ์ ๋ฐํ์ผ๋ก ์ฌ๋ด ์ปจํผ๋ฐ์ค ๋ฐํ๊น์ง ์งํํ๋ ๋งํผ, ๊ฐ์ธ์ ์ผ๋ก๋ ์๋ฏธ๊ฐ ๊น๊ณ ์ ์ฐฉ์ด ์๋ ํ๋ก์ ํธ์ ๋๋ค. ๊ทธ๋์ ์ด๋ฒ์๋ ๋จ์ํ ๋ฌธ์ ์์ ์ ๋จธ๋ฌด๋ฅด์ง ์๊ณ , ์ค์ง์ ์ธ ์ฝ๋ ์์ค์ ๊ธฐ์ฌ๋ฅผ ๋ชฉํ๋ก ์ผ์์ต๋๋ค. Apache Iceberg์ ๊ตฌ์กฐ๋ฅผ ๋ ๊น์ด ์๊ฒ ์ดํดํ๊ณ , ์ค์ ๋์์ ๋ถ์ํ๋ฉด์ ํ๋ก์ ํธ์ ๊ธฐ์ฌํ ์ ์๋ ํฌ์ธํธ๋ฅผ ์ฐพ๊ณ ์ ํ์ต๋๋ค. ์ด๋ฒ ๊ธฐํ๋ฅผ ํตํด ๋ ํฐ ๋์ ๊ณผ ์ฑ์ฅ์ ๊ณ๊ธฐ๋ฅผ ๋ง๋ค๊ณ ์ถ์์ต๋๋ค. โ โ๏ธ 2. ๋ณธ๋ก ๐ค 2.1. ์ด์ ์ ํ ๊ณผ์ ์ด๋ฒ ์ด์ ์ ์ ๊ณผ์ ์ AI์ ๋์ 50%, ๊ทธ๋ฆฌ๊ณ ์ ์ ํ๋จ 50%๋ก ์ด๋ฃจ์ด์ก์ต๋๋ค. ๋จผ์ , ๊ธฐ์ฌํ๊ณ ์ถ์ ํ๋ก์ ํธ๋ก๋ ๋จ์ฐ Apache Iceberg๊ฐ 1์์์์ต๋๋ค. Iceberg ์ธ์๋ Spark, Airflow, Hadoop, Flink ๋ฑ์ Apache ์ฌ๋จ ํ๋ก์ ํธ๋ฅผ ํ๋ณด๊ตฐ์ ๋๊ณ , ๊ธฐ์ฌ ๊ฐ๋ฅํ ์ด์๋ค์ ํ๋์ฉ ํ์ํ๊ธฐ ์์ํ์ต๋๋ค. ์ด ๊ณผ์ ์์ Google์ Gemini๋ฅผ ํ์ฉํด Iceberg์ GitHub ์ ์ฅ์์์ ์ด์ ๋ฆฌ์คํธ๋ฅผ ์์งํ๊ณ , AI์๊ฒ โ๊ธฐ์ฌํ๊ธฐ ์ข์ ์ด์์ ๊ธฐ์คโ์ ํ๋กฌํํธ๋ก ์ ์ํ์ฌ ํํฐ๋ง ์์ ์ ์งํํ์ต๋๋ค. ๊ทธ ๊ฒฐ๊ณผ, ์ฌ๋ฌ ์ด์ ์ค์์ good first issue ๋ผ๋ฒจ์ด ๋ถ์ด ์๋ ํญ๋ชฉ๋ค์ ์ ๋ณํด๋ผ ์ ์์๊ณ , ๊ทธ์ค์์๋ ๋จ๋ฒ์ ๋์ ๋๋ ์ด์ ํ๋๋ฅผ ๋ฐ๊ฒฌํ๊ฒ ๋์์ต๋๋ค. ์ด์ฒ๋ผ ์ด๋ฒ์๋ ์์์ ์ผ๋ก๋ง ํ์ํ๋ ๊ณผ๊ฑฐ์ ๋ฌ๋ฆฌ, AI ๋๊ตฌ๋ฅผ ๋ณํํ์ฌ ์๊ฐ์ ์ ์ฝํ๋ฉด์๋ ํจ์จ์ ์ผ๋ก ๊ธฐ์ฌ ๋์ ์ด์๋ฅผ ์ ์ ํ ์ ์์์ต๋๋ค. ์ด๋ ๊ฒ ์ด์๋ฅผ ์ ํํ ์ ๋, ์ด์๋ฅผ ์ฌ๋ฆฐ Maintainer ์์ ์ํต์ ํตํ์ฌ ํด๋น ์ด์๋ฅผ ๋งก๊ฒ ๋ค๊ณ ์์ฒญํ์์ต๋๋ค. ์ด๋ฌํ ๊ณผ์ ์ ํตํ์ฌ Exclude JUnit4 dependency from classpath ์์ Remove JUnit4 dependency from Flink ๊น์ง ์ด์ด์ง๋ ํด๋น ์ด์์ ๊ธฐ์ฌ๋ฅผ ํ๊ธฐ๋ก ํ์ ํ์๊ณ , ํ์ฌ์์๋ ๊ด์ฌ์ ๊ฐ์ง๊ณ , ์ ๋ํ ์ต๊ทผ ๊ณต๋ถ๋ฅผ ํ๊ณ ์๋ Apache Iceberg ์ ๊ธฐ์ฌํ ์ ์๋ ๊ธฐํ๋ฅผ ์ก๊ฒ ๋์์ต๋๋ค. ๐ 2.2. ์ด์: Iceberg Flink Catalog ์ Junit4 ์์กด์ฑ ์ ๊ฑฐ Exclude JUnit4 dependency from classpath ์, Remove JUnit4 dependency from Flink ์ด์ ๋ฅผ ๋ณด๋ฉด, Apache Iceberg์ Maintainer๋ ์ ์ฒด ์ฝ๋๋ฒ ์ด์ค, ํนํ Flink ๊ด๋ จ ๋ชจ๋์์ JUnit4 ์์กด์ฑ์ ์์ ํ ์ ๊ฑฐํ๋ ค๋ ๋ช ํํ ์์ง๋ฅผ ๋๋ฌ๋ด๊ณ ์์์ต๋๋ค. ๋ ์ด์ ๋ชจ๋ ๊ณตํต์ ์ผ๋ก, ํ ์คํธ ์ฝ๋๊ฐ JUnit5 ๊ธฐ๋ฐ์ผ๋ก ํต์ผ๋๊ธธ ์ํ๋ฉฐ, ์ค๋๋ ํ ์คํธ ์ ํธ์ด๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋ฌถ์ฌ ์๋ ์์กด JUnit4 ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ๋ฐ ๋ชฉ์ ์ด ์์์ต๋๋ค. ๋จผ์ #12937 ์ด์์์๋ iceberg-flink ๋ชจ๋ ๋ด์์ JUnit4 ๊ด๋ จ ์ข ์์ฑ์ ์ ๊ฑฐํ๊ณ , JUnit5 ์คํ์ผ๋ก ๋ง์ด๊ทธ๋ ์ด์ ํ ์ผ๋ถ ์์ ๋ด์ญ์ด ๊ณต์ ๋์ด ์์์ต๋๋ค. ์๋ฅผ ๋ค์ด Assume.assumeFalse(โฆ)์ ๊ฐ์ ์ฝ๋๋ assumeThat(โฆ).isEqualTo(โฆ)์ ๊ฐ์ JUnit5 ์คํ์ผ๋ก ๋์ฒด๋๊ณ ์์์ผ๋ฉฐ, build.gradle ํ์ผ์์๋ junit ๊ทธ๋ฃน์ ๋ช ์์ ์ผ๋ก exclude ์ฒ๋ฆฌํ๊ณ ์์์ต๋๋ค. ํ์ง๋ง ์์ง๋ ์์ ํ ์ ๊ฑฐํ์ง ๋ชปํ ๋ถ๋ถ๋ค์ด ์กด์ฌํ์ต๋๋ค. ์๋ฅผ ๋ค์ด, TestIcebergSourceFailover ํ ์คํธ๋ ๋ด๋ถ์ ์ผ๋ก Flink์ MiniClusterWithClientResource๋ฅผ ์ฌ์ฉํ๊ณ ์์๊ณ , ์ด ์ ํธ๋ฆฌํฐ๋ JUnit4์ ์์กดํ๊ณ ์๊ธฐ ๋๋ฌธ์ ์์ ํ ์ ๊ฑฐ์๋ ์ ํ์ด ์์์ต๋๋ค. ๋ ๋ค๋ฅธ ์ด์์ธ #13049์์๋ ํ๋ก์ ํธ ์ ์ฒด build.gradle์ classpath ๋ ๋ฒจ์์ JUnit4๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ธํ์๋ ์ ์์ด ๋ด๊ฒจ ์์์ต๋๋ค. ์ด ์ญ์ Flink ๋ชจ๋๋ฟ๋ง ์๋๋ผ, Testcontainers ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ผ๋ถ ํด๋์ค(GenericContainer)๋ ์ฌ์ ํ JUnit4์ ์์กดํ๊ณ ์์ด, ์์ ํ ์ ๊ฑฐ๋ฅผ ์ํด์๋ ์ธ๋ถ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ ๋ฐ์ดํธ๋ฅผ ๊ธฐ๋ค๋ ค์ผ ํ๋ ์ํฉ์ด์์ต๋๋ค. ์ด๋ฌํ ๋ฐฐ๊ฒฝ์์, ์ ๋ ํด๋น ์ด์๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ตฌ์ฒด์ ์ธ ๊ธฐ์ฌ ๋ฐฉํฅ์ ์ ํ๊ณ ์์ ์ ์์ํ๊ฒ ๋์์ต๋๋ค. ๋จ์ํ ๋ฌธ์ ์์ ์ด๋ ์ค์ ๋ณ๊ฒฝ์ ๋์ด์, ์ง์ ํ ์คํธ ์ฝ๋์ ๊ตฌ์กฐ๋ฅผ ๊ฐ์ ํ๊ณ , Gradle ์์กด์ฑ์ ๋ค๋ฃจ๋ฉฐ, ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํธํ์ฑ ๋ฌธ์ ๊น์ง ๊ณ ๋ฏผํด์ผ ํ๋ ๋์ ์ ์ธ ๊ณผ์ ์๊ธฐ์ ๋์ฑ ์๋ฏธ๊ฐ ์ปธ์ต๋๋ค. ๐ ํด๋น ์ด์๋ ์์์ ์ค๋ช ํ, Apache Iceberg ์ #13049 ์, #12937 ์ด์ ์ ์๋ ๋ด์ฉ์ ๋๋ค. ๐ฌ 2.3. Maintainer ์์ ์ํต ๊ณผ์ ์ ํตํ, PR ๋ฐฉํฅ์ฑ ํ๋ฆฝ ์ ๋ Apache Iceberg ํ๋ก์ ํธ์ Maintainer์ธ @nastra ๋๊ณผ ์ฃผ์ ๊ธฐ์ฌ์ ์ค ํ ๋ถ์ธ @tomtongue ๋๊ณผ ๋ฐ๋ก ์ํต์ ์์ํ์ต๋๋ค. ๋จผ์ , ํด๋น ์์ ์ ์ฐธ์ฌํ๊ณ ์ถ๋ค๋ ์์ฌ๋ฅผ ๋๊ธ๋ก ๋จ๊ฒผ๊ณ , Maintainer๋๊ป์๋ ๋งค์ฐ ํ์ํด ์ฃผ์๋ฉฐ ์์ ๋กญ๊ฒ ์์ ์ ์งํํด๋ ๋๋ค๋ ๊ธ์ ์ ์ธ ๋ต๋ณ์ ์ฃผ์ จ์ต๋๋ค. ๋ํ, ์ ๋ณด๋ค ๋จผ์ ๊ฐ์ ์ด์์ ๊ด์ฌ์ ๋ณด์ธ tomtongue ๋๊ณผ ํ์ ๊ฐ๋ฅ์ฑ๋ ์ด์ด๋์๋๋ฐ, ๊ทธ๋ถ์ Flink ๋ง์ด๊ทธ๋ ์ด์ ์์ ์๋ ์๊ฐ์ด ์ข ๋ ํ์ํ ๊ฒ ๊ฐ๋ค๊ณ ๋ง์ํด ์ฃผ์ จ์ต๋๋ค. ์ด์ ์์ฐ์ค๋ฝ๊ฒ ์ ๊ฐ ๋จผ์ ์์ ์ ์งํํ๋ ๋ฐฉํฅ์ผ๋ก ์ญํ ์ด ์ ๋ฆฌ๋์์ต๋๋ค. ๊ตฌ์ฒด์ ์ธ ์์ ๊ณํ์ผ๋ก๋, ์ฐ์ JUnit4์ ์์กดํ๊ณ ์๋ TestIcebergSourceFailover ํด๋์ค๋ฅผ JUnit5๋ก ๋ง์ด๊ทธ๋ ์ด์ ํ๋ ๊ฒ๊ณผ, Gradle ์ค์ ์์ JUnit4 ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ์์ ์ ํฌํจํ์ต๋๋ค. ์ฒ์์๋ Flink 2.0 ๋ฒ์ ๋ถํฐ ์์ ์ ์์ํ ๋ค, ์ดํ 1.20 ๋ฐ 1.19 ๋ฒ์ ์ผ๋ก ์์ฐจ์ ์ผ๋ก ๋ฐฑํฌํ (backport)ํ๊ฒ ๋ค๋ ๊ณํ์ Maintainer๋๊ป ๊ณต์ ํ์ต๋๋ค. Maintainer๋๋ ์ด ๊ณํ์ด ์ ์ ํ๋ค๋ฉฐ, ์ฐ์ Flink 2.0 ๋ชจ๋์์ ์์ ์ ์งํํ ๋ค ๊ฒฐ๊ณผ๋ฅผ ๊ฒํ ๋ฐ๋ ๊ฒ์ด ์ข๊ฒ ๋ค๋ ํผ๋๋ฐฑ์ ์ฃผ์ จ์ต๋๋ค. ์ดํ ์ ๋ ์ฒซ ๋ฒ์งธ PR์ธ #13016์ ์ ์ถํ์๊ณ , ๊ธฐ์กด์ ์ฌ์ฉ๋๋ MiniClusterWithClientResource ์์กด์ฑ์ ์ ๊ฑฐํ๋ฉด์ ํ ์คํธ ์ฝ๋์ Gradle ์ค์ ์ ํจ๊ป ์์ ํ์ต๋๋ค. ํ์ง๋ง Gradle ์ค์ ์ ์์ ํ๋ ๊ณผ์ ์์ ์์์น ๋ชปํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ์ต๋๋ค. ์ฌ๋ฌ ๊ฐ์ง ๋ฐฉ๋ฒ์ ์๋ํ์์๋ ๋ถ๊ตฌํ๊ณ , JUnit4์ ๋ํ ์ข ์์ฑ์ ์์ ํ ์ ๊ฑฐํ๋ ๊ฒ์ด ์ฝ์ง ์์๊ณ , ๋น๋์ ํ ์คํธ ๊ณผ์ ์์ ์ง์์ ์ผ๋ก ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค. ์ด์ Maintainer๋๊ณผ ๋ค์ ์ํตํ ๊ฒฐ๊ณผ, Flink ๋ด๋ถ ์ ํธ๋ฆฌํฐ ์ค ํ๋์ธ InternalMiniClusterExtension์ด ์ฌ์ ํ JUnit4๋ฅผ ํ์๋ก ํ๊ธฐ ๋๋ฌธ์ ์์ ํ ์ ๊ฑฐ๋ ํ์ค์ ์ผ๋ก ์ด๋ ต๋ค๋ ์ ์ ํจ๊ป ๋์ํ๊ฒ ๋์์ต๋๋ค. ๊ทธ๋ผ์๋ ๋ถ๊ตฌํ๊ณ , Iceberg ์ฝ๋๋ฒ ์ด์ค ์ ์ฒด์์๋ JUnit4 ์์กด์ฑ์ ์ต์ํํ๊ณ , ๊ฐ๋ฅํ๋ฉด JUnit5 ์ค์ฌ์ผ๋ก ํต์ผํ๋ ๋ฐฉํฅ์ด ๋ฐ๋์งํ๋ค๋ ์๊ฒฌ์ ์ฃผ์ ์, ์ ์ญ์ ์ด ๊ฐ์ด๋๋ผ์ธ์ ๋ง์ถฐ TestIcebergSourceFailover ํด๋์ค๋ฅผ ์ค์ฌ์ผ๋ก PR์ ์ง์ํด์ ์ ์ ํด ๋๊ฐ ์ ์์์ต๋๋ค. ์ด์ ๊ฐ์ ์ํต ๊ณผ์ ์ ํตํด, ๋จ์ํ ์ฝ๋ ์์ ๋ฟ๋ง ์๋๋ผ ํ๋ก์ ํธ ๋ด๋ถ ์ํฉ๊ณผ ์ ์ฝ์ ์ดํดํ๋ฉฐ ํ๋ ฅํ๋ ๊ฒฝํ์ ์์ ์ ์์์ต๋๋ค. โ๐ป 2.4. PR: JUnit4 ์์กด์ฑ ์ต์ํ ๋ฐ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ํด๋น ์์ค๋ฅผ ์์ ํ๊ธฐ ์ , ์ ๋ Iceberg์ ๋ํด์๋ ์ฌ๋ด ์ปจํผ๋ฐ์ค๋ฅผ ์งํํ ๋งํผ ์ถฉ๋ถํ ๊ณต๋ถํ ์ํ์์ง๋ง, ์นดํ๋ก๊ทธ ์์ง๊ณผ ๊ด๋ จํด์๋ Spark์ JDBC ๊ธฐ๋ฐ์ ๋ช ๊ฐ์ง์ ๋ํด์๋ง ์ฃผ๋ก ์๊ณ ์์์ต๋๋ค. ๋ฐ๋ฉด, Flink์ ๋ํด์๋ ์๋์ ์ผ๋ก ์ต์ํ์ง ์์์ต๋๋ค. Flink๋ ์ฃผ๋ก ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ๋ฅผ ์ํด ์ฌ์ฉ๋๋ ๋ถ์ฐ ์ฒ๋ฆฌ ์์ง์ผ๋ก, Iceberg์์๋ Flink ์ปค๋ฅํฐ๋ฅผ ํตํด ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๋ฐ ํ์ฉ๋๊ณ ์๋ค ์ ๋๋ก๋ง ์๊ณ ์์์ต๋๋ค. ( ์ ํฌ ํ์ฌ๋ Flink ๋ฅผ ์ฐ๊ณ ์์ง๋ ์์ง๋ง, Iceberg ๋ฅผ ๊ณต๋ถํ ๋ ์ฐธ๊ณ ํ์๋ Kakao ๊ธฐ์ ๋ธ๋ก๊ทธ์ louis ๋ ๊ธ ๋๋ถ์ Flink ์ ๋ํ์ฌ ๊ณต๋ถ๊ฐ ๋์์ต๋๋ค. Apache Iceberg์ Flink CDC ์ฌ์ธต ํ๊ตฌ, ์ํ์น ํ๋งํฌ์ CDC์ ๋ง๋จ. ํ๋งํฌ CDC ๋ง๋ณด๊ธฐ ) ํ์ง๋ง Flink ๋ด๋ถ์์ ํ ์คํธ๋ฅผ ์ํด ์ฌ์ฉ๋๋ โMiniClusterโ๋ผ๋ ๊ฐ๋ ์ ๋ํด์๋ ์ ํ ์์ง ๋ชปํ์ต๋๋ค. ์ด์ Flink ๊ณต์ ๋ฌธ์์ ์ปค๋ฎค๋ํฐ ์๋ฃ, ๊ทธ๋ฆฌ๊ณ GitHub ์ ์ฅ์ ๋ฑ์ ์ฐธ๊ณ ํ์ฌ โMiniClusterโ๊ฐ ๋ฌด์์ธ์ง ๊ณต๋ถํ๊ธฐ ์์ํ์ต๋๋ค. MiniCluster๋ Flink ํด๋ฌ์คํฐ๋ฅผ ๋ก์ปฌ ํ๊ฒฝ์์ ๊ฐ๋ณ๊ฒ ์คํํ ์ ์๋๋ก ํด์ฃผ๋ ์ผ์ข ์ ๋ฏธ๋ ๋ฒ์ ํด๋ฌ์คํฐ๋ก, ํ ์คํธ ํ๊ฒฝ์์ ์ค์ ๋ถ์ฐ ํ๊ฒฝ๊ณผ ์ ์ฌํ ์กฐ๊ฑด์ ๋ง๋ค์ด Flink ์์ ์ ๊ฒ์ฆํ ์ ์๋๋ก ๋๋ ์ญํ ์ ํฉ๋๋ค. ์ฆ, ์ค์ ํด๋ฌ์คํฐ๋ฅผ ๋์ฐ์ง ์๊ณ ๋ ๋ก์ปฌ์์ Flink ์ก์ ์คํํด๋ณผ ์ ์๊ฒ ํด์ฃผ๊ธฐ ๋๋ฌธ์ ํ ์คํธ ์๋ํ์ ๋๋ฒ๊น ์ ๋งค์ฐ ์ ์ฉํฉ๋๋ค. ์ด์ฒ๋ผ MiniCluster๊ฐ Flink ํ ์คํธ ํ๊ฒฝ์์ ์ด๋ค ํต์ฌ์ ์ธ ์ญํ ์ ํ๋์ง ์ดํดํ๋ ๊ฒ์ด, ์ด๋ฒ ์์ ์ ์งํํ๋ ๋ฐ ์ค์ํ ์ฒซ๊ฑธ์์ด ๋์์ต๋๋ค. ๊ทธ ๋ค์์, ๋๋์ด ์ฝ๋๋ฅผ ์์ ํ๊ธฐ ์์ํ์์ต๋๋ค. ์์๋ Junit4 ๋ฅผ ์ด์ฉ ์ ์์ด ์์กดํด์ผํ๋ ๋ถ๋ถ์ ์ด๋ฆฌ๊ณ , ์ด์ ์ปค๋ฏธํฐ๊ฐ ๋ฏธ์ฒ ์์ ํ์ง ๋ชปํ Junit5 ๋ก ๋ฐ๊ฟ ์ ์๋ ๋ถ๋ถ์ ์์ ํ๋ ์์ ์ ๋จผ์ ์งํํ์์ต๋๋ค. ์ดํ์๋, TestIcebergSourceFailover ํด๋์ค์์ Junit4 ๋ฅผ ์์กดํ๊ณ ์๋ ๋ถ๋ถ์ด ์ด๋์ธ์ง๋ฅผ ์ฐพ๊ณ , ๊ทธ ๋ถ๋ถ์ Junit5 ํํ๋ก ๋ณํํ๋ ์์ ์ ์งํํ์์ต๋๋ค. 1๏ธโฃ COMMIT : GenericAppenderHelper ์ ์์ฑ์ @Deprecated ์ ๊ฑฐ ์ฒซ ๋ฒ์งธ๋ก, GenericAppenderHelper.java ์ฝ๋์์, ์์ฑ์๊ฐ @Deprecated ๋์ด์์์ต๋๋ค. ๊ณผ๊ฑฐ์ JUnit์์ ํํ ์ฐ๋ TemporaryFolder ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฐ์ต๋๋ค. TemporaryFolder๋ ํ ์คํธ ์ค ์์ ํ์ผ/๋๋ ํ ๋ฆฌ๋ฅผ ์๋ ์์ฑํ๊ณ ์ ๋ฆฌํด์ฃผ๋ ๋๊ตฌ์ ๋๋ค. ์๋ง, Junit4 ์์กด์ฑ์ ์์ ํ ๊ฑท์ด๋ด๊ธฐ ์ํ ๊ณผ์ ์์ @Deprecated ๋ฅผ ํด๋ ๊ฑฐ ๊ฐ์ต๋๋ค. ํ์ง๋ง ์์์ ์ด์ผ๊ธฐํ์๋ฏ ํ์ฌ Junit4 ๋ฅผ ์์ ํ ๊ฑท์ด๋ด๋ ๊ฒ์ ์ด๋ ต๋ค๊ณ ํ๋จํ์๊ธฐ ๋๋ฌธ์, ์์ง๊น์ง GenericAppenderHelper ํด๋์ค๋ฅผ ํ์๋ก ํ๋ ์์ค๋ค์ ์ํด์, @Deprecated ๋ฅผ ์ญ์ ํด์ฃผ์์ต๋๋ค. ( ์ญ์ ํ์ง ์์ผ๋ฉด, CI ํ ์คํธ ์ ์๋ฌ๊ฐ ๋ฐ์ํฉ๋๋ค. ) Before: @Deprecated public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) { this.table = table; this.fileFormat = fileFormat; this.temp = tmp.getRoot().toPath(); this.conf = conf; } @Deprecated public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) { this(table, fileFormat, tmp, null); } After: public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) { this.table = table; this.fileFormat = fileFormat; this.temp = tmp.getRoot().toPath(); this.conf = conf; } public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) { this(table, fileFormat, tmp, null); } 2๏ธโฃ COMMIT : Junit4 ์ Assume ์ ๊ฑท์ด๋ด๊ณ , AssertJ ๋ฐฉ์ ํ์ฉ ๋ ๋ฒ์งธ๋ก ์ด์ ์ Junit4 ์์ Junit5 ๋ก ์์กด์ฑ ๋ณ๊ฒฝ์ ์๋ํ๋ ์ปค๋ฏธํฐ๋ถ์ด ๋ฏธ์ฒ ์์ ํ์ง ๋ชปํ ๋ถ๋ถ์ ์์ ํ๋ ๊ณผ์ ์ ๊ฑฐ์ณค์ต๋๋ค. Flink 2.0 ์ TestIcebergSink ํด๋์ค์์, ์๋ ์ฝ๋์ฒ๋ผ Junit4 ๋ฐฉ์์ Assume.assumeFalse(...) ์ ์ฌ์ฉํ๋ ๋ถ๋ถ์ด ์์ด์, ๋ค๋ฅธ ๋ฒ์ ์ TestIcebergSink ํด๋์ค๋ค๊ณผ ๋์ผํ๊ฒ AssertJ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ assumeThat ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝํด์ฃผ์์ต๋๋ค. Before: import org.junit.Assume; // ... @TestTemplate void testErrorOnNullForRequiredField() throws Exception { Assume.assumeFalse( "ORC file format supports null values even for required fields.", format == FileFormat.ORC); // Next Code ... } After: import static org.assertj.core.api.Assumptions.assumeThat; // ... @TestTemplate void testErrorOnNullForRequiredField() throws Exception { assumeThat(format) .as("ORC file format supports null values even for required fields.") .isNotEqualTo(FileFormat.ORC); // Next Code ... } 3๏ธโฃ COMMIT : MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ๋ง์ง๋ง์ด ๋๋์ด, ์ด๋ฒ PR ์ ๊ฝ์ธ, TestIcebergSourceFailover ํด๋์ค์์ MiniClusterWithClientResource ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ๊ฒ์ด์์ต๋๋ค. ์ด ๋ถ๋ถ์ ๋ํ ์ค๋ช ์ ์กฐ๊ธ ๊ธธ์ด์ง ์ ์์ผ๋, ๋จผ์ ASIS ์ฝ๋๋ฅผ ๋ณด๊ฒ ์ต๋๋ค. Before: import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.ThrowingConsumer; // ... @Test public void testBoundedWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testBoundedWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster)); } // ... @Test public void testContinuousWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testContinuousWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster)); } // ... private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception { MiniClusterWithClientResource miniCluster = null; try { miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG); miniCluster.before(); testMethod.accept(miniCluster.getMiniCluster()); } finally { if (miniCluster != null) { miniCluster.after(); } } } ๋จผ์ , JUnit 5์์๋ @BeforeAll, @AfterAll, @BeforeEach, @AfterEach๋ฅผ ์ฌ์ฉํด ๋ฆฌ์์ค ์๋ช ์ฃผ๊ธฐ๋ฅผ ๋ค๋ฃจ๋ ๊ฒ ์ผ๋ฐ์ ์ ๋๋ค. ํ์ง๋ง ํด๋น ์ฝ๋๋ ํ ์คํธ๋ง๋ค miniCluster.before() / after()๋ฅผ ์๋์ผ๋ก ํธ์ถํฉ๋๋ค. ์ด๋ถ๋ถ์ด JUnit 4 ์คํ์ผ์ด๋ผ๊ณ ํ ์ ์์ต๋๋ค. ๋ํ, MiniClusterWithClientResource ๋ ๋ณดํต JUnit4์ ExternalResource ๋ฅผ ์์ํด์ ๊ตฌํํฉ๋๋ค. JUnit4์์๋ ๋ฆฌ์์ค ์ด๊ธฐํ์ ์ ๋ฆฌ๋ฅผ ์ํด @Rule ์ด๋ ธํ ์ด์ ์ ํตํด ExternalResource ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด๋, Flink ๊ฐ๋ฐ ์ด๊ธฐ ์์ ๊ณผ JUnit5 ๋์ ์์ ์ ์๊ธฐ์ ์ฐจ์ด ๋๋ฌธ์, ๊ธฐ์กด์ ์ ๋ง๋ค์ด์ง ํ ์คํธ ์ ํธ๋ฆฌํฐ๋ค์ด JUnit4 ์คํ์ผ๋ก ๋ง์ด ๋จ์ ์์ต๋๋ค. ์ด๋ฐ ์ด์ ๋๋ฌธ์, ์ฌ์ค ์์ ํ Junit4 ๋ฅผ ๋น์ฅ ๊ฑท์ด๋ด๋ ๊ฒ์ด ์ด๋ ต๋ค๊ณ ํ๋จํ ๊ฑฐ ๊ฐ์ต๋๋ค. ์ฆ, Flink ์ Minicluster ๋ฅผ ํ์ฉํ ํ ์คํธ์ฝ๋ ์์ฒด๊ฐ ์์ ํ JUnit5๋ก ์์ ํ ๋ง์ด๊ทธ๋ ์ด์ ๋์ง ์์ ์ํ์ด๊ธฐ ๋๋ฌธ์, Iceberg ์๋ ๋น์ฅ ์ ์ฉํ๊ธฐ ์ด๋ ต๋ค๋ ๊ฒ์ผ๋ก ์๊ฐ๋ฉ๋๋ค. ๊ณ์ ์ด์ด์ ์ค๋ช ํ์๋ฉด, ์ฌ๊ธฐ์๋ ํ ์คํธ๊ฐ ์คํ๋ ๋๋ง๋ค MiniClusterWithClientResource ์ธ์คํด์ค๋ฅผ ์ง์ ์์ฑํ๊ณ , ์๋์ผ๋ก before()์ after()๋ฅผ ํธ์ถํด์ ํด๋ฌ์คํฐ ์์๊ณผ ์ข ๋ฃ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ดํ๊ณ ์์ต๋๋ค. ํ ์คํธ ๋ฉ์๋๋ค์ MiniCluster๋ฅผ ์ง์ ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ง ์๊ณ , ๋๋ค ThrowingConsumer์ ๋๊ฒจ์ ๊ฐ์ ์ ์ผ๋ก ์ฒ๋ฆฌํฉ๋๋ค. ์ฆ, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ํ ์คํธ ์ฝ๋ ๋ด๋ถ์์ ์ง์ ์ ์ดํ๋ ํํ์ ๋๋ค. ๋ณดํต ์ด๋ฐ ๋ฐฉ์์ JUnit4์์ @Rule ๊ฐ์ ์๋ ๋ฆฌ์์ค ๊ด๋ฆฌ๊ฐ ์๊ฑฐ๋ ์ปค์คํ ์ํฉ์์ ์ฐ์ ๋๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ ์ฝ๋๋ก ์์ ์ ์งํํ์์ต๋๋ค. After: import org.apache.flink.test.junit5.InjectMiniCluster; // ... @Test public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.TM, miniCluster); } @Test public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.JM, miniCluster); } // ... @Test public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.TM, miniCluster); } @Test public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.JM, miniCluster); } ์ค์ ์, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฉด์์, MiniClusterWithClientResource ์๋ ์์ฑ ๋ฐ before()/after() ํธ์ถ ํ๋ ๋ถ๋ถ์ @InjectMiniCluster๋ฅผ ํตํด ์๋์ผ๋ก ์ฃผ์ ๋ฐ ๊ด๋ฆฌ๋ฅผ ํ ์ ์๊ฒ๋ ๋ณ๊ฒฝํ๋ค๋ ๋ถ๋ถ์ ๋๋ค. ๊ทธ๋ฆฌ๊ณ ํ ์คํธ ์ฝ๋ ์์ฑ ์, ๋๋ค ๋ฐฉ์์ผ๋ก ํ ์คํธ ์คํ, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ์ง์ ์ ์ดํ๋ ๋ฐฉ์์์ ํ ์คํธ ๋ฉ์๋ ํ๋ผ๋ฏธํฐ๋ก ๋ฆฌ์์ค ์ฃผ์ ๋ฐ์ ์ฌ์ฉํ๋๋ก ๋ณ๊ฒฝํ์๊ณ , ExternalResource ๊ธฐ๋ฐ (JUnit4 Rule) ๋ฐฉ์์์ ParameterResolver ๋๋ Extension ๊ธฐ๋ฐ (JUnit5) ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝํ์๋ค๋ ์ ์ด ์๊ฒ ์ต๋๋ค. ์ด๋ฌํ ๋ฐฉ์์ MainTainer ๋ค์ ํ ๋ก ์ ํ์๊ณ , ๊ด์ฐฎ์ ๋ฐฉ์์ด๋ผ๋ ๊ฒฐ๋ก ์ ๋ค๋ค๋ฅด๊ฒ ๋ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ , ๋ค๋ฅธ ์ปจํธ๋ฆฌ๋ทฐํฐ๋ค ์ญ์, ๊ด์ฐฎ๋ค๋ ๋ฆฌ๋ทฐ๋ฅผ ๋ฌ์์ฃผ์์ต๋๋ค. ๐ง MiniCluster ์ฃผ์ ํ ์คํ ์ค๋ฅ ๋ฐ์ ๋ฐ ์๋ ์๋ช ์ฃผ๊ธฐ ๊ด๋ฆฌ ์ฝ๋ ์ถ๊ฐ ๊ทธ๋ฌ๋, ์ด๋ถ๋ถ์ ๋ํ์ฌ ์ฒ์์๋ CI ํ ์คํธ์์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ์์ต๋๋ค. ํ ์คํธ๊ฐ 120์ด ์์ ๋๋์ง ์๊ณ Timeout์ผ๋ก ์คํจํ๋ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ๊ฒ์ ๋๋ค. @InjectMiniCluster๋ฅผ ํตํด MiniCluster๋ฅผ ํ ์คํธ ๋ฉ์๋์ ์ฃผ์ ๋ฐ์์ง๋ง ์ฃผ์ ๋ง ๋ ์ํ์ด์ง, ๋ช ์์ ์ผ๋ก start()๋ฅผ ํธ์ถํ์ง ์์ผ๋ฉด MiniCluster๊ฐ ์คํ๋์ง ์์๋ ๊ฒ์ผ๋ก ๋ณด์ ๋๋ค. ๊ฒฐ๊ตญ ํ ์คํธ ๋ก์ง์์ MiniCluster๋ฅผ ์ฌ์ฉํ๋ ค ํ์ ๋, ํด๋ฌ์คํฐ๊ฐ โ๋นํ์ฑ ์ํโ์ด๋ฏ๋ก, ์์ ์ด ์์๋์ง ์๊ฑฐ๋, ์คํ ์ค hang ์ด ๋ฐ์ํ ๊ฒ์ผ๋ก ๋ณด์ ๋๋ค. ์ข ๋ ์์ธ์ ์ฐพ์๋ณด๋. JUnit5์ ํ์ฅ ๋ชจ๋ธ์์๋ @Injectโฆ ๊ฐ์ ์ด๋ ธํ ์ด์ ์ผ๋ก ๊ฐ์ฒด ์ฃผ์ ์ ๊ฐ๋ฅํ์ง๋ง, ํด๋น ๊ฐ์ฒด์ ์๋ช ์ฃผ๊ธฐ(start/stop)๋ ์๋์ผ๋ก ๋ณด์ฅ๋์ง ์์ ์ ์๋ค๊ณ ํฉ๋๋ค. ๋ํ MiniCluster๋ ๋ช ์์ ์ผ๋ก .start() ํธ์ถ์ด ํ์ํ๊ณ , ์ฃผ์ ์ ๋จ์ํ ์์ฑ๋ง ํด์ฃผ๋ ๊ฒ์ด๋ฉฐ, ์คํ์ ๋ณ๋ ๋จ๊ณ๋ผ๋ ์์ธ์ ์ฐพ์ ์ ์์์ต๋๋ค. ๊ฒฐ๊ณผ์ ์ผ๋ก ํ ์คํธ๊ฐ MiniCluster์ ์์ ์ ์ ์ถํ๋ ค ํ์ง๋ง, MiniCluster๊ฐ ์์๋์ง ์์๊ณ , ์ด๋ ํ ์คํธ timeout (120์ด) ์ด๊ณผ๋ก ์คํจ๊น์ง ์ด์ด์ง๊ฒ ๋ ๊ฒ์ ๋๋ค. ๊ทธ๋์ TestIcebergSourceFailover ํด๋์ค์ ์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ์ถ๊ฐ ์ปค๋ฐํ์์ต๋๋ค. Add Code: import org.junit.jupiter.api.AfterEach; // AfterEach ์ถ๊ฐ @BeforeEach protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { if (!miniCluster.isRunning()) { miniCluster.start(); } } @AfterEach protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { miniCluster.close(); } @BeforeEach์ @AfterEach๋ฅผ ํตํด MiniCluster์ ์์๊ณผ ์ข ๋ฃ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ดํจ์ผ๋ก์จ, ํ ์คํธ ์คํ ์ ํ์ํ Flink ํด๋ฌ์คํฐ ํ๊ฒฝ์ด ์ ์์ ์ผ๋ก ์ด๊ธฐํ๋๊ณ ์ ๋ฆฌ๋ ์ ์๊ฒ ํ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ๊ทธ ๊ฒฐ๊ณผ, ์ต์ข ์ ์ผ๋ก๋ Iceberg ์ PR Merge ๋ฅผ ์ฑ๊ณตํ์ฌ, Apache ์ฌ๋จ์ Iceberg ํ๋ก์ ํธ์ ๊ธฐ์ฌํ ์ ์์๊ณ , Contributor ๊ฐ ๋ ์ ์์์ต๋๋ค. ๐ My PR: Iceberg Flink Catalog v2.0 Remove the MiniClusterWithClientResource dependency ๐น Local Test ๋ฐ CI ํต๊ณผ๋ฅผ ์ํ ์ ์ฐจ ์๋ด ์ฒ์ Iceberg ํ๋ก์ ํธ์ ๊ธฐ์ฌํ๋ฉด์ ๊ฐ์ฅ ํท๊ฐ๋ ธ๋ ๋ถ๋ถ ์ค ํ๋๊ฐ, ๋ก์ปฌ ํ ์คํธ์ CI ํต๊ณผ ๋ฐฉ์์ด์์ต๋๋ค. ํน์ ์ ์ฒ๋ผ Iceberg์ ๊ธฐ์ฌํ๊ณ ์ ํ๋ ๋ถ๋ค์ด ๊ณ์๋ค๋ฉด, ์๋ ์ ์ฐจ๋ฅผ ์ฐธ๊ณ ํ์๋ฉด ๋์์ด ๋ ๊ฒ์ ๋๋ค. โ ์ฝ๋ ์คํ์ผ ์ ๋ฆฌ: spotlessApply Iceberg๋ Spotless๋ฅผ ์ฌ์ฉํ์ฌ ์ฝ๋ ํฌ๋งทํ ์ ๊ฒ์ฌํฉ๋๋ค. ๋ฐ๋ผ์ PR์ ์์ฑํ๊ธฐ ์ ๋ฐ๋์ ์๋ ๋ช ๋ น์ด๋ฅผ ์คํํ์ฌ ์ฝ๋ ์คํ์ผ์ ์๋์ผ๋ก ์ ๋ฆฌํด์ผ ํฉ๋๋ค: ./gradlew :iceberg-flink:iceberg-flink-2.0:spotlessApply ์ด ๋ช ๋ น์ด๋ Java ์ฝ๋์ ์ ํด์ง ์คํ์ผ ๊ฐ์ด๋์ ๋ง๊ฒ ๋ค์ฌ์ฐ๊ธฐ, ์ ๋ ฌ, ๊ณต๋ฐฑ ๋ฑ์ ์๋์ผ๋ก ์์ ํด์ค๋๋ค. ์ด๋ฅผ ์ ์ฉํ์ง ์์ผ๋ฉด CI์์ spotlessCheck ๋จ๊ณ์์ ์คํจํ๊ฒ ๋ฉ๋๋ค. โ ๋ก์ปฌ ํ ์คํธ ์ํ: test โstacktrace CI์ ์ฌ๋ฆฌ๊ธฐ ์ ์, ๋ก์ปฌ ํ๊ฒฝ์์ ๋จผ์ ์ ๋ ํ ์คํธ๋ฅผ ์คํํ์ฌ ์ด์์ด ์๋์ง ํ์ธํด์ผ ํฉ๋๋ค: ./gradlew :iceberg-flink:iceberg-flink-2.0:test --stacktrace โstacktrace ์ต์ ์ ๋ง์ฝ ํ ์คํธ ์คํจ ์ ์์ธํ ์ค๋ฅ ๋ฉ์์ง๋ฅผ ํ์ธํ๋ ๋ฐ ๋์์ด ๋ฉ๋๋ค. ํ ์คํธ๋ฅผ ํต๊ณผํ์ง ๋ชปํ๋ฉด CI ๋จ๊ณ์์๋ ๋์ผํ๊ฒ ์คํจํ๋ฏ๋ก, ๋ก์ปฌ์์ ๋ฐ๋์ ๋จผ์ ํ์ธํ๋ ๊ฒ์ด ์ข์ต๋๋ค. ์ฐธ๊ณ ๋ก, ์ ๋ flink 2.0 ์ ๋ํ์ฌ ํ ์คํธ๋ฅผ ์งํํ๋ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ์์ ๊ฐ์ด ํ์์ต๋๋ค. ./gradlew projects ๋ฅผ ํตํ์ฌ ํ ์คํธ๊ฐ ๊ฐ๋ฅํ ํ๋ก์ ํธ ๋ชฉ๋ก์ ํ์ธํ ํ์, ์ํ๋ ํ๋ก์ ํธ๋ฅผ ๊ฐ์ง๊ณ ์งํํด์ฃผ๋ฉด ๋๊ฒ ์ต๋๋ค. โ โ๏ธ 3. ๊ฒฐ๋ก ์ด๋ฒ ๊ธฐ์ฌ๋ฅผ ํตํด ๊ธฐ์ ์ ์ธ ์ญ๋๋ฟ๋ง ์๋๋ผ, ์คํ์์ค ํ๋ก์ ํธ์์์ ์ปค๋ฎค๋์ผ์ด์ ์ ์ค์์ฑ์ ๊น์ด ์ฒด๊ฐํ ์ ์์์ต๋๋ค. ๋จ์ํ ์ฝ๋๋ฅผ ์์ฑํ๊ณ ์ ์ถํ๋ ๊ฒ์ ๋์ด, Maintainer ๋ฐ ๋ค๋ฅธ ๊ธฐ์ฌ์๋ค๊ณผ์ ์ง์์ ์ธ ์ํต, ํผ๋๋ฐฑ ์์ฉ, ๊ทธ๋ฆฌ๊ณ ํ์ ๋ฐฉํฅ ์กฐ์จ์ด ์ผ๋ง๋ ์ค์ํ์ง ์ง์ ๊ฒฝํํ๊ฒ ๋์์ต๋๋ค. ํนํ ์ด์๋ฅผ ๊ณต์ ํ๊ณ , ์์ ๊ณํ์ ๋ช ํํ ์ ๋ฌํ๋ฉฐ, ๋ฆฌ๋ทฐ์ด์ ํผ๋๋ฐฑ์ ๋น ๋ฅด๊ฒ ๋ฐ์ํ๋ ๊ณผ์ ์ ํตํด ์คํ์์ค ์ํ๊ณ์ ํ์ ๋ฌธํ๊ฐ ์ ๋ขฐ์ ํจ์จ์ ๋ง๋ค์ด๋ด๋ ๊ตฌ์กฐ์์ ๋ฐฐ์ ์ต๋๋ค. ์ฒซ ๋ฒ์งธ PR์ ํตํด ๊ธ์ ์ ์ธ ํผ๋๋ฐฑ์ ๋ฐ์ ๋ค, Maintainer๋ก๋ถํฐ ์์ฒญ๋ฐ์๋ Flink 1.20 ๋ฐ 1.19 ๋ฒ์ ์ ๋ํ backport ์์ ์ญ์ ๋น ๋ฅด๊ฒ ์งํํ๊ณ ์ถ๋ค๋ ๋๊ธฐ๋ถ์ฌ๊ฐ ์๊ฒผ๊ณ , ๊ธฐ์ ์ ๊ธฐ์ฌ๋ฟ ์๋๋ผ ํ๋ก์ ํธ ์๊ตฌ์ ์ ์ํ ๋์ํ๋ ํ๋์ ์ค์์ฑ๋ ๋๊ผ์ต๋๋ค. ๋ํ, ์คํ์์ค ๋ฉํ ๋ง์์ ์ด์์ง ์ญํ ์ ๋งก์ ์ ์ ๊ธฐ์ฌ๋ฅผ ํ๋ฉด์ ๋์์ ๋ง์ ๋ฉํฐ๋ค์ด ์คํ์์ค์ ๋์ ํ๋ ๊ณผ์ ์ ์ง์ผ๋ณด๊ณ , ๊ทธ๋ถ๋ค๊ป ๋ต๊ธ๋ก ๋ฐฉํฅ์ฑ์ ์ ์ํ๋ ๊ฒฝํ์ ์ ์์ ์ ์ฑ์ฅ์๋ ํฐ ์๊ทน์ด ๋์์ต๋๋ค. ๋น๋ก ์ค๋ ํฌ์คํ ์์๋ ๋ฉํ ๋ง ๋ด์ฉ์ด ์์ธํ ๋ค๋ค์ง์ง ์์์ง๋ง, ์ ๋ ์์ผ๋ก๋ ์ด ๋ฉํ ๋ง ํ๋๊ณผ ์คํ์์ค ๊ธฐ์ฌ๋ฅผ ๊พธ์คํ ์ด์ด๊ฐ ๊ฒ์ด๋ฉฐ, ์ฐ๋ฆฌ๋๋ผ์ ์ฌ๋ฐ๋ฅด๊ณ ๊ฑด๊ฐํ ์คํ์์ค ๊ธฐ์ฌ ๋ฌธํ๊ฐ ์๋ฆฌ์ก๋๋ก ๋ ธ๋ ฅํ๋ 1์ธ๋๊ฐ ๋๊ณ ์ ํฉ๋๋ค. ์์ผ๋ก๋ ๋จ์ํ ์ฝ๋ ์ ๊ณต์ ๋์ด, ์ ๋ขฐ๋ฅผ ๋ฐํ์ผ๋ก ์ปค๋ฎค๋์ผ์ด์ ํ๊ณ , ํ๋ก์ ํธ์ ์ค์ง์ ์ธ ๊ฐ์น๋ฅผ ๋ํ๋ ๊ธฐ์ฌ์๊ฐ ๋๊ธฐ ์ํด ๊พธ์คํ ๋ ธ๋ ฅํ๊ฒ ์ต๋๋ค.
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
ยท 2025-05-23
<
>
Touch background to close