Daehong Blog
โ ๊ฐ์ธ ๋ฉ๋ชจ์ฅ
Flink Table Maintenance
๐ข ์ฌ๋ด ์ด์ ํด๊ฒฐ
๐จ๐ปโ๐ป ์ด๋ ฅ์
๐ป ๊ฐ๋ฐ ์ผ์ง
๐ ๊ฐ์ธ ๊ณต๋ถ
๐ Apache Hadoop
1. ๋ถ์ฐ ์์คํ ์ ์ดํด์ ํ๋ก์ ๋ฑ์ฅ ๋ฐฐ๊ฒฝ
2. ํ๋ก์ ํต์ฌ ๊ตฌ์ฑ์์์ ์ด๋ก
3. YARN(Yet Another Resource Negotiator) ๊ธฐ๋ณธ ์ด๋ก
4. Map Reduce ๊ธฐ๋ณธ ์ด๋ก ๊ณผ ์ค์ต
5. ํ๋ก ์์ฝ์์คํ ๊ณผ ์ด์ ๋ฐ ๊ด๋ฆฌ
6. ํ๋ก ํด๋ฌ์คํฐ, YARN, ๊ธฐ๋ณธ ์์ฝ์์คํ ์ค์ต
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
๐ Apache Iceberg
1. Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ
2. Flink 1.19 ๋ฐ 1.20์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backport
๐ Spring Kafka
1. Kafka RetryTopic ๊ด๋ จ ๊ธฐ๋ณธ Template Bean ์ด๋ฆ ๋ณ๊ฒฝ
Home
Contact
Copyright ยฉ 2024 |
Daehong
Home
> ๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
Now Loading ...
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
๐ Flink 1.19 ๋ฐ 1.20์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backport
โ๏ธ 1. ์๋ก ์ด์ ํฌ์คํ ์์ ์๊ฐํ ๊ฒ์ฒ๋ผ, Iceberg Flink Catalog v2.0์์ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ์ ์ฑ๊ณตํ์ต๋๋ค. ์ด์ ๋ฐ๋ผ ํด๋น ์ด์์ ์ฐ์ฅ์ ์ด์ ๋ฉ์ธํ ์ด๋์ ์์ฒญ์ ๋ฐ๋ผ, ๋์ผํ ๋ณ๊ฒฝ ์ฌํญ์ Flink 1.19์ 1.20 ๋ฒ์ ์๋ ๋ฐฑํฌํ ํ๊ธฐ๋ก ํ์ต๋๋ค. (ํ์ฌ Iceberg๋ 1.9.1 ๋ฒ์ ์ ์ฌ์ฉ ์ค์ด๋ฉฐ, Flink Catalog๋ 1.19, 1.20, ๊ทธ๋ฆฌ๊ณ 2.0 ๋ฒ์ ์ ์ง์ํฉ๋๋ค.) ์ด๋ฒ ์์ ์ ์ด์ ํฌ์คํ ์ธ ๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ์ ์ฐ์ฅ์ ์ด์, ์ ์ฌํ ๋ด์ฉ์ด ๋ง์, ๋น๊ต์ ์งง๊ฒ ์ ๋ฆฌํด๋ณด๋ ค ํฉ๋๋ค. โ โ๏ธ 2. ๋ณธ๋ก โ๐ป 2.1. PR: MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ Backporting ์ด๋ฒ ์์ ์ญ์ ์ด์ PR์ ์ฐ์ฅ์ ์ ์๊ธฐ ๋๋ฌธ์, ์ด์ ์ ์ ๊ณผ์ ์ ๋ํ ์์ธํ ์ค๋ช ์ ์๋ตํฉ๋๋ค. ์๋์ ๊ฐ์ด, ๋ฐฑํฌํ ์ ๋ํ ์์ฒญ์ด ์์๋ค๋ ๋ด์ฉ์ ๋จ๊น๋๋ค. ๋ฐ๋ผ์ ์ด๋ฒ ํฌ์คํ ์์๋ ๊ณง๋ฐ๋ก PR ๊ณผ์ ๋ถํฐ ๋ค๋ฃจ๊ฒ ์ต๋๋ค. ์ด์ ์ ์ ๋ฐฐ๊ฒฝ์ด๋ PR ์ ์ฒด ํ๋ฆ์ด ๊ถ๊ธํ์ ๋ถ๋ค์ ์ด์ ํฌ์คํ ์ธ ๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ์ ์ฐธ๊ณ ํด์ฃผ์๊ธฐ ๋ฐ๋๋๋ค. 1๏ธโฃ COMMIT : v1.19 Backporting ๊ธฐ์กด 2.0์ TestIcebergSourceFailover ํด๋์ค์๋ ํฌ๊ฒ ๋ค๋ฅธ ๋ถ๋ถ์ด ์์์ต๋๋ค. ์ค๊ฐ์ ๊ตฌํ๋ ์ฝ๋๊ฐ ์ฝ๊ฐ์ ๋ค๋ฅด๊ธด ํ์์ง๋ง, ์ ๊ฐ ์์ ํด์ผํ๋ ๋ฒ์์๋ ๋ฌด๊ดํ ๋ถ๋ถ์ด์๊ธฐ ๋๋ฌธ์ ๋์ด๊ฐ์ต๋๋ค. ํด๋น ์ฝ๋์ ์ด์ ๋ฒ์ ์ ์๋์ ๊ฐ์์ต๋๋ค. ( v2.0 ๊ณผ ๋์ผ ) Before: import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.ThrowingConsumer; // ... @Test public void testBoundedWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testBoundedWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster)); } // ... @Test public void testContinuousWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testContinuousWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster)); } // ... private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception { MiniClusterWithClientResource miniCluster = null; try { miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG); miniCluster.before(); testMethod.accept(miniCluster.getMiniCluster()); } finally { if (miniCluster != null) { miniCluster.after(); } } } ๊ทธ๋์ ์๋์ ๊ฐ์ ์ฝ๋๋ก ์์ ์ ๋์ผํ๊ฒ ์งํํ์์ต๋๋ค. After: import org.apache.flink.test.junit5.InjectMiniCluster; import org.junit.jupiter.api.AfterEach; // ... @BeforeEach protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { if (!miniCluster.isRunning()) { miniCluster.start(); } } @AfterEach protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { miniCluster.close(); } // ... @Test public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.TM, miniCluster); } @Test public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.JM, miniCluster); } // ... @Test public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.TM, miniCluster); } @Test public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.JM, miniCluster); } ์ด๋ ๊ฒ ์ฝ๋๋ฅผ ์์ ํ๊ฒ ๋ ์ด์ ์, Minicluster ์ ์ญํ ๊ทธ๋ฆฌ๊ณ InjectMiniCluster ์์ ์ฐจ์ด์ ๋ํด์ ์ญ์ ์ด์ ํฌ์คํ ์ ์์ฑ ๋์ด์์ต๋๋ค. ๊ฑฐ๋ญ ๋ง์๋๋ฆฌ์ง๋ง, ์ด์ ํฌ์คํ ์ ๊ผญ ์ฐธ๊ณ ํ์๊ธฐ ๋ฐ๋๋๋ค. :D 2๏ธโฃ COMMIT : v1.20 Backporting v1.20 ๋ฐฑํฌํ ์์ ์, v1.19 ์ ์์ ๋์ผํ๊ฒ ์งํ๋์์ต๋๋ค. ์ฝ๋ ์ญ์ ์์ ๋์ผํ๊ฒ ์์ ํ์์ต๋๋ค. ๊ทธ๋ฆฌํ์ฌ, ์ต์ข ์ ์ผ๋ก๋ ์๋์ ๊ฐ์ PR ์ ์ ์กํ์๊ณ , ์ฑ๊ณต์ ์ผ๋ก Merge ๊ฐ ๋์์ต๋๋ค. ๐ My PR: Backporting Removal of MiniClusterWithClientResource from Iceberg Flink Catalog v1.19, v1.20 ๐น ์ค๊ฐ์ ๋ง์ฃผํ ์ฝ๊ฐ์ ์ด์ ์ด์ ํฌ์คํ ์์ Local ํ ์คํธ์, CI ํ ์คํธ๋ฅผ ์ฑ๊ณตํ๊ธฐ ์ํ์ฌ ํ์ํ ๋ช ๊ฐ์ง Gradle ๋ช ๋ น์ด๋ฅผ ๋ง์๋๋ ธ์ต๋๋ค. ์ด๋ฒ์ ์ญ์ ๋์ผํ ๊ณผ์ ์ ๊ฑฐ์ณค๊ณ , ์ฝ๋ ์คํ์ผ์ ๋ง์ถ๊ธฐ ์ํด ./gradlew :iceberg-flink:iceberg-flink-1.19:spotlessApply ์ ./gradlew :iceberg-flink:iceberg-flink-1.20:spotlessApply ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์์ต๋๋ค. ๊ทธ๋ฌ๋๋ ์๋์ ๊ฐ์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๊ฒ ๋ฉ๋๋ค. ์๋ฌ๋ฅผ ๋ณด๋ฉด, Gradle์ด :iceberg-flink:iceberg-flink-1.20:spotlessApply ๋ผ๋ ํ์คํฌ(ํน์ ๋ชจ๋)๋ฅผ ์คํํ๋ ค ํ์ง๋ง, iceberg-flink ํ๋ก์ ํธ ์๋์ iceberg-flink-1.20 ์ด๋ผ๋ ์๋ธํ๋ก์ ํธ๊ฐ ์กด์ฌํ์ง ์๋ค๋ ๋ด์ฉ์ ๋๋ค. ์กด์ฌํ๋ ์๋ธํ๋ก์ ํธ ํ๋ณด๋ iceberg-flink-2.0 ์ด ์๋ค๊ณ ์น์ ํ๊ฒ ์๋ ค์ฃผ๊ณ ์์ง๋ง, ์ฐ๋ฆฌ์๊ฒ ํ์ํ๊ฑด 1.19 ์ 1.20 ๋ฒ์ ์ ๋๋ค. ์ ๋ ์ฒ์์๋ 2.0 ์ผ๋ก ํด๋, ์๋์ผ๋ก 1.19 ์ 1.20 ์ฝ๋ ์คํ์ผ๋ ๋ง์ถฐ์ง ์ค ์์๋๋ฐ ๊ทธ๊ฒ ์๋์์ต๋๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ด ์ง๋ฌธ์ ํ์์ต๋๋ค. ๊ทธ๋์ ./gradlew properties ๋ช ๋ น์ด๋ฅผ ํตํด ์ฐพ์๋ณด์๋๋, ์ญ์ systemProp.defaultFlinkVersions: 2.0 ์ผ๋ก, ๋์ด์์์ต๋๋ค. ๊ทธ๋์ vi ๋ก gradle.properties ๋ฅผ ์ด์ด์ฃผ์๊ณ , enableFlink1.19=true ๊ณผ, enableFlink1.20=true ์ ์ถ๊ฐํด์ค ๋ค spotlessApply ๋ช ๋ น์ด๋ฅผ ์คํํด์ฃผ์์ต๋๋ค. ๊ทธ๋ฌ๋๋ ์ ์์ ์ผ๋ก ์ฑ๊ณตํ์๊ณ , PR ํ Merge ๊น์ง ์ฑ๊ณตํ์์ต๋๋ค. โ โ๏ธ 3. ๊ฒฐ๋ก ์ด๋ฒ ์์ ์ Iceberg Flink Catalog v2.0์์ ์งํํ๋ MiniClusterWithClientResource ์ ๊ฑฐ ์์ ์ Flink 1.19 ๋ฐ 1.20 ๋ฒ์ ์ ์ฑ๊ณต์ ์ผ๋ก ๋ฐฑํฌํ ํ ์ฌ๋ก์์ต๋๋ค. ์ฝ๋ ๋ณ๊ฒฝ ์์ฒด๋ ์ด์ ๊ณผ ๊ฑฐ์ ๋์ผํ์ง๋ง, Gradle ์ค์ ๋ฐ ๋ก์ปฌ ํ ์คํธ ํ๊ฒฝ ์ค์ ์์ ๋ฐ์ํ ์์ ์ด์๋ฅผ ํด๊ฒฐํ๋ฉด์, Iceberg ํ๋ก์ ํธ์ Gradle ๊ตฌ์ฑ๊ณผ ์๋ธ๋ชจ๋ ํ์ฑํ ๋ฐฉ์์ ๋ํด ๋ ๊น์ด ์ดํดํ ์ ์๋ ๊ณ๊ธฐ๊ฐ ๋์์ต๋๋ค. ์ด๋ฒ ๊ฒฝํ์ ๋ฐํ์ผ๋ก ์์ผ๋ก๋ ์คํ์์ค ํ๋ก์ ํธ์ ๋์ฑ ์ ๊ทน์ ์ผ๋ก ๊ธฐ์ฌํ๋ฉฐ, ๋ค์ํ ๋ฌธ์ ์ํฉ์ ์ฃผ๋์ ์ผ๋ก ํด๊ฒฐํด๋๊ฐ ์ ์๋๋ก ๊พธ์คํ ์ญ๋์ ํค์๊ฐ๊ฒ ์ต๋๋ค. ๐ช
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
ยท 2025-05-27
๐ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ
โ๏ธ 1. ์๋ก ์๋ 10์๊ฒฝ, ์ฒซ ์คํ ์์ค ๋ฉํ ๋ง ํ๋ก๊ทธ๋จ์ ์ฐธ์ฌํ๋ฉด์ Spring Kafka ํ๋ก์ ํธ์ Contributor ๊ฐ ๋๋ ์์คํ ๊ฒฝํ์ ํ์ต๋๋ค. ๋น์์๋ ์คํ ์์ค ์ํ๊ณ๊ฐ ์ฒ์์ด์๊ธฐ ๋๋ฌธ์, ๋ค์ ๋ฏ์ค๊ณ ์ด๋ ต๊ฒ ๋๊ปด์ก์ง๋ง, ์ด ์ข๊ฒ๋ ์ ๋ฌธ์๋ค์ ์ํ ๋น๊ต์ ๋จ์ํ ์ด์๋ฅผ ํตํด ๊ธฐ์ฌ๋ฅผ ์์ํ ์ ์์์ต๋๋ค. ๊ทธ ์ด์๋ ๋ฌธ์(Docs)์ ์ ์ ๊ณผ ์ผ๋ถ ๊ฐ๋จํ ์ฝ๋ ์์ ์ ๋์์ง๋ง, ์ ์๊ฒ๋ ์คํ ์์ค์ ์ฒซ ๋ฐ์ ๋ด๋๋ ๋ฐ ๋งค์ฐ ์๋ฏธ ์๋ ๊ธฐํ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ฌํด, ๋ค์ ํ ๋ฒ ๊ฐ์ ๋ฉํ ๋ง ์์คํ ์ ์ฐธ์ฌํ๊ฒ ๋์์ต๋๋ค. ํ์ง๋ง ์ด๋ฒ์๋ ๋ฉํ ๋ง์ ๋ฐ๋ ์ ์ฅ์ด ์๋๋ผ, ๋ฉํ ๋ง์ ์ง์ํ๊ณ ์ด์ํ๋ ์ ์ฅ์ผ๋ก ์ฐธ์ฌํ๊ฒ ๋์์ต๋๋ค. ๋ฉํฐ๋ค์ ์ด์ ์ ์ ๊ณผ PR ๊ณผ์ ์ ์กฐ์ธ์ ์ฃผ๋ฉด์, ์์ฐ์ค๋ฝ๊ฒ ์ ๋ ๋ค์ ํ ๋ฒ ์คํ ์์ค ๊ธฐ์ฌ๋ฅผ ๋์ ํด๋ณด๊ณ ์ถ์ ์์์ด ์๊ฒผ์ต๋๋ค. ( ์ด๋ฒ ํฌ์คํ ์ ICeberg ๊ธฐ์ฌ ํฌ์คํ ์ด๊ธฐ ๋๋ฌธ์, ๋ฉํ ๋ง์์ ์ด์์ง์ผ๋ก์ ํ๋ํ ๋ด์ฉ์ ์ต์ํ ํ์์ต๋๋ค. ๊ฒฐ๋ก ์๋ง ์ด์ง ํฌ์คํ ํ๊ฒ ์ต๋๋ค. :D ) ๊ทธ ๊ณผ์ ์์ ๋๊ธธ์ด ๊ฐ ํ๋ก์ ํธ๊ฐ ๋ฐ๋ก Apache Iceberg์์ต๋๋ค. ์ด ํ๋ก์ ํธ๋ ๋จ์ํ ๊ด์ฌ๋ง ์์๋ ๊ฒ์ด ์๋๋ผ, ํ์ฌ ๋ด๋ถ์์ ์ ๊ท ์๋ฃจ์ ๋์ ์ ์ง์ Iceberg๋ฅผ ์ ์ํ์ฌ ์ค์ ๋ก ์ ์ฉ๋๊ธฐ๋ ํ๊ณ , ํด๋น ๊ธฐ์ ์ ๋ฐํ์ผ๋ก ์ฌ๋ด ์ปจํผ๋ฐ์ค ๋ฐํ๊น์ง ์งํํ๋ ๋งํผ, ๊ฐ์ธ์ ์ผ๋ก๋ ์๋ฏธ๊ฐ ๊น๊ณ ์ ์ฐฉ์ด ์๋ ํ๋ก์ ํธ์ ๋๋ค. ๊ทธ๋์ ์ด๋ฒ์๋ ๋จ์ํ ๋ฌธ์ ์์ ์ ๋จธ๋ฌด๋ฅด์ง ์๊ณ , ์ค์ง์ ์ธ ์ฝ๋ ์์ค์ ๊ธฐ์ฌ๋ฅผ ๋ชฉํ๋ก ์ผ์์ต๋๋ค. Apache Iceberg์ ๊ตฌ์กฐ๋ฅผ ๋ ๊น์ด ์๊ฒ ์ดํดํ๊ณ , ์ค์ ๋์์ ๋ถ์ํ๋ฉด์ ํ๋ก์ ํธ์ ๊ธฐ์ฌํ ์ ์๋ ํฌ์ธํธ๋ฅผ ์ฐพ๊ณ ์ ํ์ต๋๋ค. ์ด๋ฒ ๊ธฐํ๋ฅผ ํตํด ๋ ํฐ ๋์ ๊ณผ ์ฑ์ฅ์ ๊ณ๊ธฐ๋ฅผ ๋ง๋ค๊ณ ์ถ์์ต๋๋ค. โ โ๏ธ 2. ๋ณธ๋ก ๐ค 2.1. ์ด์ ์ ํ ๊ณผ์ ์ด๋ฒ ์ด์ ์ ์ ๊ณผ์ ์ AI์ ๋์ 50%, ๊ทธ๋ฆฌ๊ณ ์ ์ ํ๋จ 50%๋ก ์ด๋ฃจ์ด์ก์ต๋๋ค. ๋จผ์ , ๊ธฐ์ฌํ๊ณ ์ถ์ ํ๋ก์ ํธ๋ก๋ ๋จ์ฐ Apache Iceberg๊ฐ 1์์์์ต๋๋ค. Iceberg ์ธ์๋ Spark, Airflow, Hadoop, Flink ๋ฑ์ Apache ์ฌ๋จ ํ๋ก์ ํธ๋ฅผ ํ๋ณด๊ตฐ์ ๋๊ณ , ๊ธฐ์ฌ ๊ฐ๋ฅํ ์ด์๋ค์ ํ๋์ฉ ํ์ํ๊ธฐ ์์ํ์ต๋๋ค. ์ด ๊ณผ์ ์์ Google์ Gemini๋ฅผ ํ์ฉํด Iceberg์ GitHub ์ ์ฅ์์์ ์ด์ ๋ฆฌ์คํธ๋ฅผ ์์งํ๊ณ , AI์๊ฒ โ๊ธฐ์ฌํ๊ธฐ ์ข์ ์ด์์ ๊ธฐ์คโ์ ํ๋กฌํํธ๋ก ์ ์ํ์ฌ ํํฐ๋ง ์์ ์ ์งํํ์ต๋๋ค. ๊ทธ ๊ฒฐ๊ณผ, ์ฌ๋ฌ ์ด์ ์ค์์ good first issue ๋ผ๋ฒจ์ด ๋ถ์ด ์๋ ํญ๋ชฉ๋ค์ ์ ๋ณํด๋ผ ์ ์์๊ณ , ๊ทธ์ค์์๋ ๋จ๋ฒ์ ๋์ ๋๋ ์ด์ ํ๋๋ฅผ ๋ฐ๊ฒฌํ๊ฒ ๋์์ต๋๋ค. ์ด์ฒ๋ผ ์ด๋ฒ์๋ ์์์ ์ผ๋ก๋ง ํ์ํ๋ ๊ณผ๊ฑฐ์ ๋ฌ๋ฆฌ, AI ๋๊ตฌ๋ฅผ ๋ณํํ์ฌ ์๊ฐ์ ์ ์ฝํ๋ฉด์๋ ํจ์จ์ ์ผ๋ก ๊ธฐ์ฌ ๋์ ์ด์๋ฅผ ์ ์ ํ ์ ์์์ต๋๋ค. ์ด๋ ๊ฒ ์ด์๋ฅผ ์ ํํ ์ ๋, ์ด์๋ฅผ ์ฌ๋ฆฐ Maintainer ์์ ์ํต์ ํตํ์ฌ ํด๋น ์ด์๋ฅผ ๋งก๊ฒ ๋ค๊ณ ์์ฒญํ์์ต๋๋ค. ์ด๋ฌํ ๊ณผ์ ์ ํตํ์ฌ Exclude JUnit4 dependency from classpath ์์ Remove JUnit4 dependency from Flink ๊น์ง ์ด์ด์ง๋ ํด๋น ์ด์์ ๊ธฐ์ฌ๋ฅผ ํ๊ธฐ๋ก ํ์ ํ์๊ณ , ํ์ฌ์์๋ ๊ด์ฌ์ ๊ฐ์ง๊ณ , ์ ๋ํ ์ต๊ทผ ๊ณต๋ถ๋ฅผ ํ๊ณ ์๋ Apache Iceberg ์ ๊ธฐ์ฌํ ์ ์๋ ๊ธฐํ๋ฅผ ์ก๊ฒ ๋์์ต๋๋ค. ๐ 2.2. ์ด์: Iceberg Flink Catalog ์ Junit4 ์์กด์ฑ ์ ๊ฑฐ Exclude JUnit4 dependency from classpath ์, Remove JUnit4 dependency from Flink ์ด์ ๋ฅผ ๋ณด๋ฉด, Apache Iceberg์ Maintainer๋ ์ ์ฒด ์ฝ๋๋ฒ ์ด์ค, ํนํ Flink ๊ด๋ จ ๋ชจ๋์์ JUnit4 ์์กด์ฑ์ ์์ ํ ์ ๊ฑฐํ๋ ค๋ ๋ช ํํ ์์ง๋ฅผ ๋๋ฌ๋ด๊ณ ์์์ต๋๋ค. ๋ ์ด์ ๋ชจ๋ ๊ณตํต์ ์ผ๋ก, ํ ์คํธ ์ฝ๋๊ฐ JUnit5 ๊ธฐ๋ฐ์ผ๋ก ํต์ผ๋๊ธธ ์ํ๋ฉฐ, ์ค๋๋ ํ ์คํธ ์ ํธ์ด๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋ฌถ์ฌ ์๋ ์์กด JUnit4 ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ๋ฐ ๋ชฉ์ ์ด ์์์ต๋๋ค. ๋จผ์ #12937 ์ด์์์๋ iceberg-flink ๋ชจ๋ ๋ด์์ JUnit4 ๊ด๋ จ ์ข ์์ฑ์ ์ ๊ฑฐํ๊ณ , JUnit5 ์คํ์ผ๋ก ๋ง์ด๊ทธ๋ ์ด์ ํ ์ผ๋ถ ์์ ๋ด์ญ์ด ๊ณต์ ๋์ด ์์์ต๋๋ค. ์๋ฅผ ๋ค์ด Assume.assumeFalse(โฆ)์ ๊ฐ์ ์ฝ๋๋ assumeThat(โฆ).isEqualTo(โฆ)์ ๊ฐ์ JUnit5 ์คํ์ผ๋ก ๋์ฒด๋๊ณ ์์์ผ๋ฉฐ, build.gradle ํ์ผ์์๋ junit ๊ทธ๋ฃน์ ๋ช ์์ ์ผ๋ก exclude ์ฒ๋ฆฌํ๊ณ ์์์ต๋๋ค. ํ์ง๋ง ์์ง๋ ์์ ํ ์ ๊ฑฐํ์ง ๋ชปํ ๋ถ๋ถ๋ค์ด ์กด์ฌํ์ต๋๋ค. ์๋ฅผ ๋ค์ด, TestIcebergSourceFailover ํ ์คํธ๋ ๋ด๋ถ์ ์ผ๋ก Flink์ MiniClusterWithClientResource๋ฅผ ์ฌ์ฉํ๊ณ ์์๊ณ , ์ด ์ ํธ๋ฆฌํฐ๋ JUnit4์ ์์กดํ๊ณ ์๊ธฐ ๋๋ฌธ์ ์์ ํ ์ ๊ฑฐ์๋ ์ ํ์ด ์์์ต๋๋ค. ๋ ๋ค๋ฅธ ์ด์์ธ #13049์์๋ ํ๋ก์ ํธ ์ ์ฒด build.gradle์ classpath ๋ ๋ฒจ์์ JUnit4๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ธํ์๋ ์ ์์ด ๋ด๊ฒจ ์์์ต๋๋ค. ์ด ์ญ์ Flink ๋ชจ๋๋ฟ๋ง ์๋๋ผ, Testcontainers ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ผ๋ถ ํด๋์ค(GenericContainer)๋ ์ฌ์ ํ JUnit4์ ์์กดํ๊ณ ์์ด, ์์ ํ ์ ๊ฑฐ๋ฅผ ์ํด์๋ ์ธ๋ถ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ ๋ฐ์ดํธ๋ฅผ ๊ธฐ๋ค๋ ค์ผ ํ๋ ์ํฉ์ด์์ต๋๋ค. ์ด๋ฌํ ๋ฐฐ๊ฒฝ์์, ์ ๋ ํด๋น ์ด์๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ตฌ์ฒด์ ์ธ ๊ธฐ์ฌ ๋ฐฉํฅ์ ์ ํ๊ณ ์์ ์ ์์ํ๊ฒ ๋์์ต๋๋ค. ๋จ์ํ ๋ฌธ์ ์์ ์ด๋ ์ค์ ๋ณ๊ฒฝ์ ๋์ด์, ์ง์ ํ ์คํธ ์ฝ๋์ ๊ตฌ์กฐ๋ฅผ ๊ฐ์ ํ๊ณ , Gradle ์์กด์ฑ์ ๋ค๋ฃจ๋ฉฐ, ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํธํ์ฑ ๋ฌธ์ ๊น์ง ๊ณ ๋ฏผํด์ผ ํ๋ ๋์ ์ ์ธ ๊ณผ์ ์๊ธฐ์ ๋์ฑ ์๋ฏธ๊ฐ ์ปธ์ต๋๋ค. ๐ ํด๋น ์ด์๋ ์์์ ์ค๋ช ํ, Apache Iceberg ์ #13049 ์, #12937 ์ด์ ์ ์๋ ๋ด์ฉ์ ๋๋ค. ๐ฌ 2.3. Maintainer ์์ ์ํต ๊ณผ์ ์ ํตํ, PR ๋ฐฉํฅ์ฑ ํ๋ฆฝ ์ ๋ Apache Iceberg ํ๋ก์ ํธ์ Maintainer์ธ @nastra ๋๊ณผ ์ฃผ์ ๊ธฐ์ฌ์ ์ค ํ ๋ถ์ธ @tomtongue ๋๊ณผ ๋ฐ๋ก ์ํต์ ์์ํ์ต๋๋ค. ๋จผ์ , ํด๋น ์์ ์ ์ฐธ์ฌํ๊ณ ์ถ๋ค๋ ์์ฌ๋ฅผ ๋๊ธ๋ก ๋จ๊ฒผ๊ณ , Maintainer๋๊ป์๋ ๋งค์ฐ ํ์ํด ์ฃผ์๋ฉฐ ์์ ๋กญ๊ฒ ์์ ์ ์งํํด๋ ๋๋ค๋ ๊ธ์ ์ ์ธ ๋ต๋ณ์ ์ฃผ์ จ์ต๋๋ค. ๋ํ, ์ ๋ณด๋ค ๋จผ์ ๊ฐ์ ์ด์์ ๊ด์ฌ์ ๋ณด์ธ tomtongue ๋๊ณผ ํ์ ๊ฐ๋ฅ์ฑ๋ ์ด์ด๋์๋๋ฐ, ๊ทธ๋ถ์ Flink ๋ง์ด๊ทธ๋ ์ด์ ์์ ์๋ ์๊ฐ์ด ์ข ๋ ํ์ํ ๊ฒ ๊ฐ๋ค๊ณ ๋ง์ํด ์ฃผ์ จ์ต๋๋ค. ์ด์ ์์ฐ์ค๋ฝ๊ฒ ์ ๊ฐ ๋จผ์ ์์ ์ ์งํํ๋ ๋ฐฉํฅ์ผ๋ก ์ญํ ์ด ์ ๋ฆฌ๋์์ต๋๋ค. ๊ตฌ์ฒด์ ์ธ ์์ ๊ณํ์ผ๋ก๋, ์ฐ์ JUnit4์ ์์กดํ๊ณ ์๋ TestIcebergSourceFailover ํด๋์ค๋ฅผ JUnit5๋ก ๋ง์ด๊ทธ๋ ์ด์ ํ๋ ๊ฒ๊ณผ, Gradle ์ค์ ์์ JUnit4 ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ์์ ์ ํฌํจํ์ต๋๋ค. ์ฒ์์๋ Flink 2.0 ๋ฒ์ ๋ถํฐ ์์ ์ ์์ํ ๋ค, ์ดํ 1.20 ๋ฐ 1.19 ๋ฒ์ ์ผ๋ก ์์ฐจ์ ์ผ๋ก ๋ฐฑํฌํ (backport)ํ๊ฒ ๋ค๋ ๊ณํ์ Maintainer๋๊ป ๊ณต์ ํ์ต๋๋ค. Maintainer๋๋ ์ด ๊ณํ์ด ์ ์ ํ๋ค๋ฉฐ, ์ฐ์ Flink 2.0 ๋ชจ๋์์ ์์ ์ ์งํํ ๋ค ๊ฒฐ๊ณผ๋ฅผ ๊ฒํ ๋ฐ๋ ๊ฒ์ด ์ข๊ฒ ๋ค๋ ํผ๋๋ฐฑ์ ์ฃผ์ จ์ต๋๋ค. ์ดํ ์ ๋ ์ฒซ ๋ฒ์งธ PR์ธ #13016์ ์ ์ถํ์๊ณ , ๊ธฐ์กด์ ์ฌ์ฉ๋๋ MiniClusterWithClientResource ์์กด์ฑ์ ์ ๊ฑฐํ๋ฉด์ ํ ์คํธ ์ฝ๋์ Gradle ์ค์ ์ ํจ๊ป ์์ ํ์ต๋๋ค. ํ์ง๋ง Gradle ์ค์ ์ ์์ ํ๋ ๊ณผ์ ์์ ์์์น ๋ชปํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ์ต๋๋ค. ์ฌ๋ฌ ๊ฐ์ง ๋ฐฉ๋ฒ์ ์๋ํ์์๋ ๋ถ๊ตฌํ๊ณ , JUnit4์ ๋ํ ์ข ์์ฑ์ ์์ ํ ์ ๊ฑฐํ๋ ๊ฒ์ด ์ฝ์ง ์์๊ณ , ๋น๋์ ํ ์คํธ ๊ณผ์ ์์ ์ง์์ ์ผ๋ก ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค. ์ด์ Maintainer๋๊ณผ ๋ค์ ์ํตํ ๊ฒฐ๊ณผ, Flink ๋ด๋ถ ์ ํธ๋ฆฌํฐ ์ค ํ๋์ธ InternalMiniClusterExtension์ด ์ฌ์ ํ JUnit4๋ฅผ ํ์๋ก ํ๊ธฐ ๋๋ฌธ์ ์์ ํ ์ ๊ฑฐ๋ ํ์ค์ ์ผ๋ก ์ด๋ ต๋ค๋ ์ ์ ํจ๊ป ๋์ํ๊ฒ ๋์์ต๋๋ค. ๊ทธ๋ผ์๋ ๋ถ๊ตฌํ๊ณ , Iceberg ์ฝ๋๋ฒ ์ด์ค ์ ์ฒด์์๋ JUnit4 ์์กด์ฑ์ ์ต์ํํ๊ณ , ๊ฐ๋ฅํ๋ฉด JUnit5 ์ค์ฌ์ผ๋ก ํต์ผํ๋ ๋ฐฉํฅ์ด ๋ฐ๋์งํ๋ค๋ ์๊ฒฌ์ ์ฃผ์ ์, ์ ์ญ์ ์ด ๊ฐ์ด๋๋ผ์ธ์ ๋ง์ถฐ TestIcebergSourceFailover ํด๋์ค๋ฅผ ์ค์ฌ์ผ๋ก PR์ ์ง์ํด์ ์ ์ ํด ๋๊ฐ ์ ์์์ต๋๋ค. ์ด์ ๊ฐ์ ์ํต ๊ณผ์ ์ ํตํด, ๋จ์ํ ์ฝ๋ ์์ ๋ฟ๋ง ์๋๋ผ ํ๋ก์ ํธ ๋ด๋ถ ์ํฉ๊ณผ ์ ์ฝ์ ์ดํดํ๋ฉฐ ํ๋ ฅํ๋ ๊ฒฝํ์ ์์ ์ ์์์ต๋๋ค. โ๐ป 2.4. PR: JUnit4 ์์กด์ฑ ์ต์ํ ๋ฐ MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ํด๋น ์์ค๋ฅผ ์์ ํ๊ธฐ ์ , ์ ๋ Iceberg์ ๋ํด์๋ ์ฌ๋ด ์ปจํผ๋ฐ์ค๋ฅผ ์งํํ ๋งํผ ์ถฉ๋ถํ ๊ณต๋ถํ ์ํ์์ง๋ง, ์นดํ๋ก๊ทธ ์์ง๊ณผ ๊ด๋ จํด์๋ Spark์ JDBC ๊ธฐ๋ฐ์ ๋ช ๊ฐ์ง์ ๋ํด์๋ง ์ฃผ๋ก ์๊ณ ์์์ต๋๋ค. ๋ฐ๋ฉด, Flink์ ๋ํด์๋ ์๋์ ์ผ๋ก ์ต์ํ์ง ์์์ต๋๋ค. Flink๋ ์ฃผ๋ก ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ๋ฅผ ์ํด ์ฌ์ฉ๋๋ ๋ถ์ฐ ์ฒ๋ฆฌ ์์ง์ผ๋ก, Iceberg์์๋ Flink ์ปค๋ฅํฐ๋ฅผ ํตํด ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๋ฐ ํ์ฉ๋๊ณ ์๋ค ์ ๋๋ก๋ง ์๊ณ ์์์ต๋๋ค. ( ์ ํฌ ํ์ฌ๋ Flink ๋ฅผ ์ฐ๊ณ ์์ง๋ ์์ง๋ง, Iceberg ๋ฅผ ๊ณต๋ถํ ๋ ์ฐธ๊ณ ํ์๋ Kakao ๊ธฐ์ ๋ธ๋ก๊ทธ์ louis ๋ ๊ธ ๋๋ถ์ Flink ์ ๋ํ์ฌ ๊ณต๋ถ๊ฐ ๋์์ต๋๋ค. Apache Iceberg์ Flink CDC ์ฌ์ธต ํ๊ตฌ, ์ํ์น ํ๋งํฌ์ CDC์ ๋ง๋จ. ํ๋งํฌ CDC ๋ง๋ณด๊ธฐ ) ํ์ง๋ง Flink ๋ด๋ถ์์ ํ ์คํธ๋ฅผ ์ํด ์ฌ์ฉ๋๋ โMiniClusterโ๋ผ๋ ๊ฐ๋ ์ ๋ํด์๋ ์ ํ ์์ง ๋ชปํ์ต๋๋ค. ์ด์ Flink ๊ณต์ ๋ฌธ์์ ์ปค๋ฎค๋ํฐ ์๋ฃ, ๊ทธ๋ฆฌ๊ณ GitHub ์ ์ฅ์ ๋ฑ์ ์ฐธ๊ณ ํ์ฌ โMiniClusterโ๊ฐ ๋ฌด์์ธ์ง ๊ณต๋ถํ๊ธฐ ์์ํ์ต๋๋ค. MiniCluster๋ Flink ํด๋ฌ์คํฐ๋ฅผ ๋ก์ปฌ ํ๊ฒฝ์์ ๊ฐ๋ณ๊ฒ ์คํํ ์ ์๋๋ก ํด์ฃผ๋ ์ผ์ข ์ ๋ฏธ๋ ๋ฒ์ ํด๋ฌ์คํฐ๋ก, ํ ์คํธ ํ๊ฒฝ์์ ์ค์ ๋ถ์ฐ ํ๊ฒฝ๊ณผ ์ ์ฌํ ์กฐ๊ฑด์ ๋ง๋ค์ด Flink ์์ ์ ๊ฒ์ฆํ ์ ์๋๋ก ๋๋ ์ญํ ์ ํฉ๋๋ค. ์ฆ, ์ค์ ํด๋ฌ์คํฐ๋ฅผ ๋์ฐ์ง ์๊ณ ๋ ๋ก์ปฌ์์ Flink ์ก์ ์คํํด๋ณผ ์ ์๊ฒ ํด์ฃผ๊ธฐ ๋๋ฌธ์ ํ ์คํธ ์๋ํ์ ๋๋ฒ๊น ์ ๋งค์ฐ ์ ์ฉํฉ๋๋ค. ์ด์ฒ๋ผ MiniCluster๊ฐ Flink ํ ์คํธ ํ๊ฒฝ์์ ์ด๋ค ํต์ฌ์ ์ธ ์ญํ ์ ํ๋์ง ์ดํดํ๋ ๊ฒ์ด, ์ด๋ฒ ์์ ์ ์งํํ๋ ๋ฐ ์ค์ํ ์ฒซ๊ฑธ์์ด ๋์์ต๋๋ค. ๊ทธ ๋ค์์, ๋๋์ด ์ฝ๋๋ฅผ ์์ ํ๊ธฐ ์์ํ์์ต๋๋ค. ์์๋ Junit4 ๋ฅผ ์ด์ฉ ์ ์์ด ์์กดํด์ผํ๋ ๋ถ๋ถ์ ์ด๋ฆฌ๊ณ , ์ด์ ์ปค๋ฏธํฐ๊ฐ ๋ฏธ์ฒ ์์ ํ์ง ๋ชปํ Junit5 ๋ก ๋ฐ๊ฟ ์ ์๋ ๋ถ๋ถ์ ์์ ํ๋ ์์ ์ ๋จผ์ ์งํํ์์ต๋๋ค. ์ดํ์๋, TestIcebergSourceFailover ํด๋์ค์์ Junit4 ๋ฅผ ์์กดํ๊ณ ์๋ ๋ถ๋ถ์ด ์ด๋์ธ์ง๋ฅผ ์ฐพ๊ณ , ๊ทธ ๋ถ๋ถ์ Junit5 ํํ๋ก ๋ณํํ๋ ์์ ์ ์งํํ์์ต๋๋ค. 1๏ธโฃ COMMIT : GenericAppenderHelper ์ ์์ฑ์ @Deprecated ์ ๊ฑฐ ์ฒซ ๋ฒ์งธ๋ก, GenericAppenderHelper.java ์ฝ๋์์, ์์ฑ์๊ฐ @Deprecated ๋์ด์์์ต๋๋ค. ๊ณผ๊ฑฐ์ JUnit์์ ํํ ์ฐ๋ TemporaryFolder ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฐ์ต๋๋ค. TemporaryFolder๋ ํ ์คํธ ์ค ์์ ํ์ผ/๋๋ ํ ๋ฆฌ๋ฅผ ์๋ ์์ฑํ๊ณ ์ ๋ฆฌํด์ฃผ๋ ๋๊ตฌ์ ๋๋ค. ์๋ง, Junit4 ์์กด์ฑ์ ์์ ํ ๊ฑท์ด๋ด๊ธฐ ์ํ ๊ณผ์ ์์ @Deprecated ๋ฅผ ํด๋ ๊ฑฐ ๊ฐ์ต๋๋ค. ํ์ง๋ง ์์์ ์ด์ผ๊ธฐํ์๋ฏ ํ์ฌ Junit4 ๋ฅผ ์์ ํ ๊ฑท์ด๋ด๋ ๊ฒ์ ์ด๋ ต๋ค๊ณ ํ๋จํ์๊ธฐ ๋๋ฌธ์, ์์ง๊น์ง GenericAppenderHelper ํด๋์ค๋ฅผ ํ์๋ก ํ๋ ์์ค๋ค์ ์ํด์, @Deprecated ๋ฅผ ์ญ์ ํด์ฃผ์์ต๋๋ค. ( ์ญ์ ํ์ง ์์ผ๋ฉด, CI ํ ์คํธ ์ ์๋ฌ๊ฐ ๋ฐ์ํฉ๋๋ค. ) Before: @Deprecated public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) { this.table = table; this.fileFormat = fileFormat; this.temp = tmp.getRoot().toPath(); this.conf = conf; } @Deprecated public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) { this(table, fileFormat, tmp, null); } After: public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) { this.table = table; this.fileFormat = fileFormat; this.temp = tmp.getRoot().toPath(); this.conf = conf; } public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) { this(table, fileFormat, tmp, null); } 2๏ธโฃ COMMIT : Junit4 ์ Assume ์ ๊ฑท์ด๋ด๊ณ , AssertJ ๋ฐฉ์ ํ์ฉ ๋ ๋ฒ์งธ๋ก ์ด์ ์ Junit4 ์์ Junit5 ๋ก ์์กด์ฑ ๋ณ๊ฒฝ์ ์๋ํ๋ ์ปค๋ฏธํฐ๋ถ์ด ๋ฏธ์ฒ ์์ ํ์ง ๋ชปํ ๋ถ๋ถ์ ์์ ํ๋ ๊ณผ์ ์ ๊ฑฐ์ณค์ต๋๋ค. Flink 2.0 ์ TestIcebergSink ํด๋์ค์์, ์๋ ์ฝ๋์ฒ๋ผ Junit4 ๋ฐฉ์์ Assume.assumeFalse(...) ์ ์ฌ์ฉํ๋ ๋ถ๋ถ์ด ์์ด์, ๋ค๋ฅธ ๋ฒ์ ์ TestIcebergSink ํด๋์ค๋ค๊ณผ ๋์ผํ๊ฒ AssertJ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ assumeThat ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝํด์ฃผ์์ต๋๋ค. Before: import org.junit.Assume; // ... @TestTemplate void testErrorOnNullForRequiredField() throws Exception { Assume.assumeFalse( "ORC file format supports null values even for required fields.", format == FileFormat.ORC); // Next Code ... } After: import static org.assertj.core.api.Assumptions.assumeThat; // ... @TestTemplate void testErrorOnNullForRequiredField() throws Exception { assumeThat(format) .as("ORC file format supports null values even for required fields.") .isNotEqualTo(FileFormat.ORC); // Next Code ... } 3๏ธโฃ COMMIT : MiniClusterWithClientResource ์ข ์์ฑ ์ ๊ฑฐ ๋ง์ง๋ง์ด ๋๋์ด, ์ด๋ฒ PR ์ ๊ฝ์ธ, TestIcebergSourceFailover ํด๋์ค์์ MiniClusterWithClientResource ์ข ์์ฑ์ ์ ๊ฑฐํ๋ ๊ฒ์ด์์ต๋๋ค. ์ด ๋ถ๋ถ์ ๋ํ ์ค๋ช ์ ์กฐ๊ธ ๊ธธ์ด์ง ์ ์์ผ๋, ๋จผ์ ASIS ์ฝ๋๋ฅผ ๋ณด๊ฒ ์ต๋๋ค. Before: import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.ThrowingConsumer; // ... @Test public void testBoundedWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testBoundedWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster)); } // ... @Test public void testContinuousWithTaskManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster)); } @Test public void testContinuousWithJobManagerFailover() throws Exception { runTestWithNewMiniCluster( miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster)); } // ... private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception { MiniClusterWithClientResource miniCluster = null; try { miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG); miniCluster.before(); testMethod.accept(miniCluster.getMiniCluster()); } finally { if (miniCluster != null) { miniCluster.after(); } } } ๋จผ์ , JUnit 5์์๋ @BeforeAll, @AfterAll, @BeforeEach, @AfterEach๋ฅผ ์ฌ์ฉํด ๋ฆฌ์์ค ์๋ช ์ฃผ๊ธฐ๋ฅผ ๋ค๋ฃจ๋ ๊ฒ ์ผ๋ฐ์ ์ ๋๋ค. ํ์ง๋ง ํด๋น ์ฝ๋๋ ํ ์คํธ๋ง๋ค miniCluster.before() / after()๋ฅผ ์๋์ผ๋ก ํธ์ถํฉ๋๋ค. ์ด๋ถ๋ถ์ด JUnit 4 ์คํ์ผ์ด๋ผ๊ณ ํ ์ ์์ต๋๋ค. ๋ํ, MiniClusterWithClientResource ๋ ๋ณดํต JUnit4์ ExternalResource ๋ฅผ ์์ํด์ ๊ตฌํํฉ๋๋ค. JUnit4์์๋ ๋ฆฌ์์ค ์ด๊ธฐํ์ ์ ๋ฆฌ๋ฅผ ์ํด @Rule ์ด๋ ธํ ์ด์ ์ ํตํด ExternalResource ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด๋, Flink ๊ฐ๋ฐ ์ด๊ธฐ ์์ ๊ณผ JUnit5 ๋์ ์์ ์ ์๊ธฐ์ ์ฐจ์ด ๋๋ฌธ์, ๊ธฐ์กด์ ์ ๋ง๋ค์ด์ง ํ ์คํธ ์ ํธ๋ฆฌํฐ๋ค์ด JUnit4 ์คํ์ผ๋ก ๋ง์ด ๋จ์ ์์ต๋๋ค. ์ด๋ฐ ์ด์ ๋๋ฌธ์, ์ฌ์ค ์์ ํ Junit4 ๋ฅผ ๋น์ฅ ๊ฑท์ด๋ด๋ ๊ฒ์ด ์ด๋ ต๋ค๊ณ ํ๋จํ ๊ฑฐ ๊ฐ์ต๋๋ค. ์ฆ, Flink ์ Minicluster ๋ฅผ ํ์ฉํ ํ ์คํธ์ฝ๋ ์์ฒด๊ฐ ์์ ํ JUnit5๋ก ์์ ํ ๋ง์ด๊ทธ๋ ์ด์ ๋์ง ์์ ์ํ์ด๊ธฐ ๋๋ฌธ์, Iceberg ์๋ ๋น์ฅ ์ ์ฉํ๊ธฐ ์ด๋ ต๋ค๋ ๊ฒ์ผ๋ก ์๊ฐ๋ฉ๋๋ค. ๊ณ์ ์ด์ด์ ์ค๋ช ํ์๋ฉด, ์ฌ๊ธฐ์๋ ํ ์คํธ๊ฐ ์คํ๋ ๋๋ง๋ค MiniClusterWithClientResource ์ธ์คํด์ค๋ฅผ ์ง์ ์์ฑํ๊ณ , ์๋์ผ๋ก before()์ after()๋ฅผ ํธ์ถํด์ ํด๋ฌ์คํฐ ์์๊ณผ ์ข ๋ฃ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ดํ๊ณ ์์ต๋๋ค. ํ ์คํธ ๋ฉ์๋๋ค์ MiniCluster๋ฅผ ์ง์ ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ง ์๊ณ , ๋๋ค ThrowingConsumer์ ๋๊ฒจ์ ๊ฐ์ ์ ์ผ๋ก ์ฒ๋ฆฌํฉ๋๋ค. ์ฆ, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ํ ์คํธ ์ฝ๋ ๋ด๋ถ์์ ์ง์ ์ ์ดํ๋ ํํ์ ๋๋ค. ๋ณดํต ์ด๋ฐ ๋ฐฉ์์ JUnit4์์ @Rule ๊ฐ์ ์๋ ๋ฆฌ์์ค ๊ด๋ฆฌ๊ฐ ์๊ฑฐ๋ ์ปค์คํ ์ํฉ์์ ์ฐ์ ๋๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ ์ฝ๋๋ก ์์ ์ ์งํํ์์ต๋๋ค. After: import org.apache.flink.test.junit5.InjectMiniCluster; // ... @Test public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.TM, miniCluster); } @Test public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testBoundedIcebergSource(FailoverType.JM, miniCluster); } // ... @Test public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.TM, miniCluster); } @Test public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception { testContinuousIcebergSource(FailoverType.JM, miniCluster); } ์ค์ ์, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฉด์์, MiniClusterWithClientResource ์๋ ์์ฑ ๋ฐ before()/after() ํธ์ถ ํ๋ ๋ถ๋ถ์ @InjectMiniCluster๋ฅผ ํตํด ์๋์ผ๋ก ์ฃผ์ ๋ฐ ๊ด๋ฆฌ๋ฅผ ํ ์ ์๊ฒ๋ ๋ณ๊ฒฝํ๋ค๋ ๋ถ๋ถ์ ๋๋ค. ๊ทธ๋ฆฌ๊ณ ํ ์คํธ ์ฝ๋ ์์ฑ ์, ๋๋ค ๋ฐฉ์์ผ๋ก ํ ์คํธ ์คํ, ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ์ง์ ์ ์ดํ๋ ๋ฐฉ์์์ ํ ์คํธ ๋ฉ์๋ ํ๋ผ๋ฏธํฐ๋ก ๋ฆฌ์์ค ์ฃผ์ ๋ฐ์ ์ฌ์ฉํ๋๋ก ๋ณ๊ฒฝํ์๊ณ , ExternalResource ๊ธฐ๋ฐ (JUnit4 Rule) ๋ฐฉ์์์ ParameterResolver ๋๋ Extension ๊ธฐ๋ฐ (JUnit5) ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝํ์๋ค๋ ์ ์ด ์๊ฒ ์ต๋๋ค. ์ด๋ฌํ ๋ฐฉ์์ MainTainer ๋ค์ ํ ๋ก ์ ํ์๊ณ , ๊ด์ฐฎ์ ๋ฐฉ์์ด๋ผ๋ ๊ฒฐ๋ก ์ ๋ค๋ค๋ฅด๊ฒ ๋ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ , ๋ค๋ฅธ ์ปจํธ๋ฆฌ๋ทฐํฐ๋ค ์ญ์, ๊ด์ฐฎ๋ค๋ ๋ฆฌ๋ทฐ๋ฅผ ๋ฌ์์ฃผ์์ต๋๋ค. ๐ง MiniCluster ์ฃผ์ ํ ์คํ ์ค๋ฅ ๋ฐ์ ๋ฐ ์๋ ์๋ช ์ฃผ๊ธฐ ๊ด๋ฆฌ ์ฝ๋ ์ถ๊ฐ ๊ทธ๋ฌ๋, ์ด๋ถ๋ถ์ ๋ํ์ฌ ์ฒ์์๋ CI ํ ์คํธ์์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ์์ต๋๋ค. ํ ์คํธ๊ฐ 120์ด ์์ ๋๋์ง ์๊ณ Timeout์ผ๋ก ์คํจํ๋ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ๊ฒ์ ๋๋ค. @InjectMiniCluster๋ฅผ ํตํด MiniCluster๋ฅผ ํ ์คํธ ๋ฉ์๋์ ์ฃผ์ ๋ฐ์์ง๋ง ์ฃผ์ ๋ง ๋ ์ํ์ด์ง, ๋ช ์์ ์ผ๋ก start()๋ฅผ ํธ์ถํ์ง ์์ผ๋ฉด MiniCluster๊ฐ ์คํ๋์ง ์์๋ ๊ฒ์ผ๋ก ๋ณด์ ๋๋ค. ๊ฒฐ๊ตญ ํ ์คํธ ๋ก์ง์์ MiniCluster๋ฅผ ์ฌ์ฉํ๋ ค ํ์ ๋, ํด๋ฌ์คํฐ๊ฐ โ๋นํ์ฑ ์ํโ์ด๋ฏ๋ก, ์์ ์ด ์์๋์ง ์๊ฑฐ๋, ์คํ ์ค hang ์ด ๋ฐ์ํ ๊ฒ์ผ๋ก ๋ณด์ ๋๋ค. ์ข ๋ ์์ธ์ ์ฐพ์๋ณด๋. JUnit5์ ํ์ฅ ๋ชจ๋ธ์์๋ @Injectโฆ ๊ฐ์ ์ด๋ ธํ ์ด์ ์ผ๋ก ๊ฐ์ฒด ์ฃผ์ ์ ๊ฐ๋ฅํ์ง๋ง, ํด๋น ๊ฐ์ฒด์ ์๋ช ์ฃผ๊ธฐ(start/stop)๋ ์๋์ผ๋ก ๋ณด์ฅ๋์ง ์์ ์ ์๋ค๊ณ ํฉ๋๋ค. ๋ํ MiniCluster๋ ๋ช ์์ ์ผ๋ก .start() ํธ์ถ์ด ํ์ํ๊ณ , ์ฃผ์ ์ ๋จ์ํ ์์ฑ๋ง ํด์ฃผ๋ ๊ฒ์ด๋ฉฐ, ์คํ์ ๋ณ๋ ๋จ๊ณ๋ผ๋ ์์ธ์ ์ฐพ์ ์ ์์์ต๋๋ค. ๊ฒฐ๊ณผ์ ์ผ๋ก ํ ์คํธ๊ฐ MiniCluster์ ์์ ์ ์ ์ถํ๋ ค ํ์ง๋ง, MiniCluster๊ฐ ์์๋์ง ์์๊ณ , ์ด๋ ํ ์คํธ timeout (120์ด) ์ด๊ณผ๋ก ์คํจ๊น์ง ์ด์ด์ง๊ฒ ๋ ๊ฒ์ ๋๋ค. ๊ทธ๋์ TestIcebergSourceFailover ํด๋์ค์ ์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ์ถ๊ฐ ์ปค๋ฐํ์์ต๋๋ค. Add Code: import org.junit.jupiter.api.AfterEach; // AfterEach ์ถ๊ฐ @BeforeEach protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { if (!miniCluster.isRunning()) { miniCluster.start(); } } @AfterEach protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception { miniCluster.close(); } @BeforeEach์ @AfterEach๋ฅผ ํตํด MiniCluster์ ์์๊ณผ ์ข ๋ฃ๋ฅผ ๋ช ์์ ์ผ๋ก ์ ์ดํจ์ผ๋ก์จ, ํ ์คํธ ์คํ ์ ํ์ํ Flink ํด๋ฌ์คํฐ ํ๊ฒฝ์ด ์ ์์ ์ผ๋ก ์ด๊ธฐํ๋๊ณ ์ ๋ฆฌ๋ ์ ์๊ฒ ํ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ๊ทธ ๊ฒฐ๊ณผ, ์ต์ข ์ ์ผ๋ก๋ Iceberg ์ PR Merge ๋ฅผ ์ฑ๊ณตํ์ฌ, Apache ์ฌ๋จ์ Iceberg ํ๋ก์ ํธ์ ๊ธฐ์ฌํ ์ ์์๊ณ , Contributor ๊ฐ ๋ ์ ์์์ต๋๋ค. ๐ My PR: Iceberg Flink Catalog v2.0 Remove the MiniClusterWithClientResource dependency ๐น Local Test ๋ฐ CI ํต๊ณผ๋ฅผ ์ํ ์ ์ฐจ ์๋ด ์ฒ์ Iceberg ํ๋ก์ ํธ์ ๊ธฐ์ฌํ๋ฉด์ ๊ฐ์ฅ ํท๊ฐ๋ ธ๋ ๋ถ๋ถ ์ค ํ๋๊ฐ, ๋ก์ปฌ ํ ์คํธ์ CI ํต๊ณผ ๋ฐฉ์์ด์์ต๋๋ค. ํน์ ์ ์ฒ๋ผ Iceberg์ ๊ธฐ์ฌํ๊ณ ์ ํ๋ ๋ถ๋ค์ด ๊ณ์๋ค๋ฉด, ์๋ ์ ์ฐจ๋ฅผ ์ฐธ๊ณ ํ์๋ฉด ๋์์ด ๋ ๊ฒ์ ๋๋ค. โ ์ฝ๋ ์คํ์ผ ์ ๋ฆฌ: spotlessApply Iceberg๋ Spotless๋ฅผ ์ฌ์ฉํ์ฌ ์ฝ๋ ํฌ๋งทํ ์ ๊ฒ์ฌํฉ๋๋ค. ๋ฐ๋ผ์ PR์ ์์ฑํ๊ธฐ ์ ๋ฐ๋์ ์๋ ๋ช ๋ น์ด๋ฅผ ์คํํ์ฌ ์ฝ๋ ์คํ์ผ์ ์๋์ผ๋ก ์ ๋ฆฌํด์ผ ํฉ๋๋ค: ./gradlew :iceberg-flink:iceberg-flink-2.0:spotlessApply ์ด ๋ช ๋ น์ด๋ Java ์ฝ๋์ ์ ํด์ง ์คํ์ผ ๊ฐ์ด๋์ ๋ง๊ฒ ๋ค์ฌ์ฐ๊ธฐ, ์ ๋ ฌ, ๊ณต๋ฐฑ ๋ฑ์ ์๋์ผ๋ก ์์ ํด์ค๋๋ค. ์ด๋ฅผ ์ ์ฉํ์ง ์์ผ๋ฉด CI์์ spotlessCheck ๋จ๊ณ์์ ์คํจํ๊ฒ ๋ฉ๋๋ค. โ ๋ก์ปฌ ํ ์คํธ ์ํ: test โstacktrace CI์ ์ฌ๋ฆฌ๊ธฐ ์ ์, ๋ก์ปฌ ํ๊ฒฝ์์ ๋จผ์ ์ ๋ ํ ์คํธ๋ฅผ ์คํํ์ฌ ์ด์์ด ์๋์ง ํ์ธํด์ผ ํฉ๋๋ค: ./gradlew :iceberg-flink:iceberg-flink-2.0:test --stacktrace โstacktrace ์ต์ ์ ๋ง์ฝ ํ ์คํธ ์คํจ ์ ์์ธํ ์ค๋ฅ ๋ฉ์์ง๋ฅผ ํ์ธํ๋ ๋ฐ ๋์์ด ๋ฉ๋๋ค. ํ ์คํธ๋ฅผ ํต๊ณผํ์ง ๋ชปํ๋ฉด CI ๋จ๊ณ์์๋ ๋์ผํ๊ฒ ์คํจํ๋ฏ๋ก, ๋ก์ปฌ์์ ๋ฐ๋์ ๋จผ์ ํ์ธํ๋ ๊ฒ์ด ์ข์ต๋๋ค. ์ฐธ๊ณ ๋ก, ์ ๋ flink 2.0 ์ ๋ํ์ฌ ํ ์คํธ๋ฅผ ์งํํ๋ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ์์ ๊ฐ์ด ํ์์ต๋๋ค. ./gradlew projects ๋ฅผ ํตํ์ฌ ํ ์คํธ๊ฐ ๊ฐ๋ฅํ ํ๋ก์ ํธ ๋ชฉ๋ก์ ํ์ธํ ํ์, ์ํ๋ ํ๋ก์ ํธ๋ฅผ ๊ฐ์ง๊ณ ์งํํด์ฃผ๋ฉด ๋๊ฒ ์ต๋๋ค. โ โ๏ธ 3. ๊ฒฐ๋ก ์ด๋ฒ ๊ธฐ์ฌ๋ฅผ ํตํด ๊ธฐ์ ์ ์ธ ์ญ๋๋ฟ๋ง ์๋๋ผ, ์คํ์์ค ํ๋ก์ ํธ์์์ ์ปค๋ฎค๋์ผ์ด์ ์ ์ค์์ฑ์ ๊น์ด ์ฒด๊ฐํ ์ ์์์ต๋๋ค. ๋จ์ํ ์ฝ๋๋ฅผ ์์ฑํ๊ณ ์ ์ถํ๋ ๊ฒ์ ๋์ด, Maintainer ๋ฐ ๋ค๋ฅธ ๊ธฐ์ฌ์๋ค๊ณผ์ ์ง์์ ์ธ ์ํต, ํผ๋๋ฐฑ ์์ฉ, ๊ทธ๋ฆฌ๊ณ ํ์ ๋ฐฉํฅ ์กฐ์จ์ด ์ผ๋ง๋ ์ค์ํ์ง ์ง์ ๊ฒฝํํ๊ฒ ๋์์ต๋๋ค. ํนํ ์ด์๋ฅผ ๊ณต์ ํ๊ณ , ์์ ๊ณํ์ ๋ช ํํ ์ ๋ฌํ๋ฉฐ, ๋ฆฌ๋ทฐ์ด์ ํผ๋๋ฐฑ์ ๋น ๋ฅด๊ฒ ๋ฐ์ํ๋ ๊ณผ์ ์ ํตํด ์คํ์์ค ์ํ๊ณ์ ํ์ ๋ฌธํ๊ฐ ์ ๋ขฐ์ ํจ์จ์ ๋ง๋ค์ด๋ด๋ ๊ตฌ์กฐ์์ ๋ฐฐ์ ์ต๋๋ค. ์ฒซ ๋ฒ์งธ PR์ ํตํด ๊ธ์ ์ ์ธ ํผ๋๋ฐฑ์ ๋ฐ์ ๋ค, Maintainer๋ก๋ถํฐ ์์ฒญ๋ฐ์๋ Flink 1.20 ๋ฐ 1.19 ๋ฒ์ ์ ๋ํ backport ์์ ์ญ์ ๋น ๋ฅด๊ฒ ์งํํ๊ณ ์ถ๋ค๋ ๋๊ธฐ๋ถ์ฌ๊ฐ ์๊ฒผ๊ณ , ๊ธฐ์ ์ ๊ธฐ์ฌ๋ฟ ์๋๋ผ ํ๋ก์ ํธ ์๊ตฌ์ ์ ์ํ ๋์ํ๋ ํ๋์ ์ค์์ฑ๋ ๋๊ผ์ต๋๋ค. ๋ํ, ์คํ์์ค ๋ฉํ ๋ง์์ ์ด์์ง ์ญํ ์ ๋งก์ ์ ์ ๊ธฐ์ฌ๋ฅผ ํ๋ฉด์ ๋์์ ๋ง์ ๋ฉํฐ๋ค์ด ์คํ์์ค์ ๋์ ํ๋ ๊ณผ์ ์ ์ง์ผ๋ณด๊ณ , ๊ทธ๋ถ๋ค๊ป ๋ต๊ธ๋ก ๋ฐฉํฅ์ฑ์ ์ ์ํ๋ ๊ฒฝํ์ ์ ์์ ์ ์ฑ์ฅ์๋ ํฐ ์๊ทน์ด ๋์์ต๋๋ค. ๋น๋ก ์ค๋ ํฌ์คํ ์์๋ ๋ฉํ ๋ง ๋ด์ฉ์ด ์์ธํ ๋ค๋ค์ง์ง ์์์ง๋ง, ์ ๋ ์์ผ๋ก๋ ์ด ๋ฉํ ๋ง ํ๋๊ณผ ์คํ์์ค ๊ธฐ์ฌ๋ฅผ ๊พธ์คํ ์ด์ด๊ฐ ๊ฒ์ด๋ฉฐ, ์ฐ๋ฆฌ๋๋ผ์ ์ฌ๋ฐ๋ฅด๊ณ ๊ฑด๊ฐํ ์คํ์์ค ๊ธฐ์ฌ ๋ฌธํ๊ฐ ์๋ฆฌ์ก๋๋ก ๋ ธ๋ ฅํ๋ 1์ธ๋๊ฐ ๋๊ณ ์ ํฉ๋๋ค. ์์ผ๋ก๋ ๋จ์ํ ์ฝ๋ ์ ๊ณต์ ๋์ด, ์ ๋ขฐ๋ฅผ ๋ฐํ์ผ๋ก ์ปค๋ฎค๋์ผ์ด์ ํ๊ณ , ํ๋ก์ ํธ์ ์ค์ง์ ์ธ ๊ฐ์น๋ฅผ ๋ํ๋ ๊ธฐ์ฌ์๊ฐ ๋๊ธฐ ์ํด ๊พธ์คํ ๋ ธ๋ ฅํ๊ฒ ์ต๋๋ค.
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
ยท 2025-05-23
๐ Kafka RetryTopic ๊ด๋ จ ๊ธฐ๋ณธ Template Bean ์ด๋ฆ ๋ณ๊ฒฝ
โ๏ธ 1. ์๋ก ๊ฐ๋ฐ์๋ก์ ์คํ์์ค์ ๊ธฐ์ฌํ๋ ์ผ์ ์ธ์ ๊ฐ ๊ผญ ํด๋ณด๊ณ ์ถ์ ๋ชฉํ ์ค ํ๋์์ต๋๋ค. ํ์ง๋ง ๋ง์ ์๋ํ๋ ค๊ณ ํ๋ฉด, ์ด๋์๋ถํฐ ์์ํด์ผ ํ ์ง ๋ง๋งํ ๊ฒ์ด ํ์ค์ด์์ต๋๋ค. ๊ทธ๋ฌ๋ ์ค, ์คํ์์ค ๊ธฐ์ฌ ๋ฐฉ๋ฒ์ ์๋ดํด์ฃผ๋ ์ปค๋ฎค๋ํฐ๋ฅผ ์๊ฒ ๋์๊ณ , ๊ทธ๊ณณ์์ ๋ฉํ ๋ง ํ๋ก๊ทธ๋จ์ ์ฐธ์ฌํ๊ฒ ๋์์ต๋๋ค. ์ด์๋ฅผ ์ฐพ๊ณ , ๊ธฐ์ฌํ ๋ถ๋ถ์ ๊ณ ๋ฏผํ๋ฉฐ, ๋ณธ๊ฒฉ์ ์ผ๋ก ์คํ์์ค์ ์ธ๊ณ์ ๋ฐ์ ๋ค์ด๊ฒ ๋์์ฃ . Kafka๋ฅผ ๊ณต๋ถํ๋ ์ด๋ ๋ , ๋ฌธ๋ Spring Kafka์ ๋ด๋ถ ๊ตฌํ์ด ๊ถ๊ธํด์ก๊ณ , ์์ฐ์ค๋ฝ๊ฒ GitHub ์ ์ฅ์๋ฅผ ๋ค์ฌ๋ค๋ณด๊ฒ ๋์์ต๋๋ค. ๊ฑฐ๊ธฐ์ ๋ฐ๊ฒฌํ ํ๋์ ์ด์ โ ์์ ๋ณด์ด์ง๋ง ์ฌ์ฉ์ ํผ๋์ ์ผ๊ธฐํ ์ ์๋ ๋ฌธ์ โ ๋ฅผ ๊ณ๊ธฐ๋ก ์์ ์ฒซ Pull Request๋ฅผ ๋ง๋ค๊ฒ ๋์๊ณ , ์ด ์ข๊ฒ๋ ๋จธ์ง๋๋ฉฐ Spring Kafka์ ๊ณต์ Contributor๊ฐ ๋ ์ ์์์ต๋๋ค. ์ด ๊ธ์์๋ ๊ทธ ์ฌ์ ์ ๊ณต์ ํ๋ฉฐ, ์ด๋ค ์ด์๋ฅผ ๋ฐ๊ฒฌํ๊ณ ์ด๋ป๊ฒ ๊ธฐ์ฌํ๋์ง๋ฅผ ์ ๋ฆฌํด๋ณด๋ ค ํฉ๋๋ค. ์คํ์์ค ๊ธฐ์ฌ๋ฅผ ๊ฟ๊พธ์ง๋ง ๋ง์ฐํ ๋ถ๋ค๊ป ์๊ฒ๋๋ง ๋์์ด ๋์์ผ๋ฉด ํฉ๋๋ค. โ โ๏ธ 2. ๋ณธ๋ก ๐ค 2.1. ์ด์ ์ ํ ๊ณผ์ ์ด์๋ฅผ ์ ์ ํ ๋ ๊ฐ์ฅ ์ค์ํ๊ฒ ์๊ฐํ ๊ธฐ์ค์ ๊ธฐ์ฌ๊ฐ ๋น๊ต์ ์ฌ์ด๊ฐ?, ๊ทธ๋ฆฌ๊ณ ํด๋น ํ๋ก์ ํธ์ ์์ค ์ฝ๋๋ฅผ ๊น์ด ์ดํดํ์ง ์๋๋ผ๋ ์์ ์ด ๊ฐ๋ฅํ๊ฐ? ์์ต๋๋ค. ์ด๋ฌํ ๊ธฐ์ค์ ๋ฐ๋ผ Apache ํ๋ก์ ํธ, Spring ํ๋ก์ ํธ ๋ฑ ์ฌ๋ฌ ์คํ์์ค ์ ์ฅ์๋ฅผ ๋๋ฌ๋ณด๋ฉฐ ์ ์ ํ ์ด์๋ฅผ ์ฐพ๊ธฐ ์์ํ์ต๋๋ค. ์ฒ์์๋ Apache Jackrabbit Oak ํ๋ก์ ํธ์์ ๊ธฐ์ฌํ ๋งํ ์ด์๋ฅผ ์ฐพ์ ์ ์์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ๋ค์์ ์ค๋ช ํ๊ฒ ์ง๋ง, ๊ฒฐ๊ณผ์ ์ผ๋ก ํด๋น PR์ ๋จธ์ง๋์ง ๋ชปํ๊ณ , ๋ค์ ์๋ก์ด ์ด์๋ฅผ ์ฐพ์ ๋์ฐ์ต๋๋ค. ๊ทธ๋ฌ๋ ์ค, Spring-Kafka ์์ KafkaTemplate Bean ์ด๋ฆ ๋ถ์ผ์น ์ด์ ๋ฅผ ๋ฐ๊ฒฌํ๊ฒ ๋์๊ณ , ๊ทธ ๋ด์ฉ์ ๋ฐํ์ผ๋ก ๊ธฐ์ฌํ๊ฒ ๋์์ต๋๋ค. ๐ 2.2. ์ด์: KafkaTemplate Bean ์ด๋ฆ ๋ถ์ผ์น Spring Kafka์ ๊ณต์ ๋ฌธ์์์๋ @RetryableTopic ์ด ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉํ KafkaTemplate ๋น์ ์ด๋ฆ์ defaultRetryTopicKafkaTemplate๋ผ๊ณ ๋ช ์ํ๊ณ ์์ต๋๋ค. ๊ทธ๋ฌ๋ ์ค์ @RetryableTopic ์ JavaDoc์์๋ ๋ค์๊ณผ ๊ฐ์ด ์ค๋ช ํ๊ณ ์์์ต๋๋ค. * If not specified, a bean with name {@code retryTopicDefaultKafkaTemplate} or {@code kafkaTemplate} will be looked up. ์ฆ, JavaDoc์๋ retryTopicDefaultKafkaTemplate, ๊ณต์ ๋ฌธ์์๋ defaultRetryTopicKafkaTemplate๋ผ๋ ์ด๋ฆ์ด ๋์ ์์๊ณ , ํ ์คํธ ์ฝ๋ ์ผ๋ถ๋ JavaDoc ์ชฝ ์ด๋ฆ์ ๋ฐ๋ผ๊ฐ๊ณ ์์์ต๋๋ค. ์ด๋ก ์ธํด ์ด๋ค ๋น ์ด๋ฆ์ ์ฌ์ฉํด์ผ ํ๋์ง ๋ช ํํ์ง ์์์ต๋๋ค. ๐ ํด๋น ์ด์๋ Spring Kafka GitHub Issue #3514 ์์ ๋ ผ์๋์์ต๋๋ค. โ๐ป 2.3. PR: JavaDoc ์ ์ ๋จผ์ , @RetryableTopic ์ด๋ ธํ ์ด์ ํด๋์ค์ JavaDoc์ ๊ณต์ ๋ฌธ์์ ์ผ์นํ๋๋ก ์์ ํ์ต๋๋ค. Before: * The bean name of the {@link org.springframework.kafka.core.KafkaTemplate} bean that * will be used to forward the message to the retry and Dlt topics. If not specified, * a bean with name {@code retryTopicDefaultKafkaTemplate} or {@code kafkaTemplate} * will be looked up. * * @return the kafkaTemplate bean name. After: * The bean name of the {@link org.springframework.kafka.core.KafkaTemplate} bean that * will be used to forward the message to the retry and Dlt topics. If not specified, * a bean with name {@code defaultRetryTopicKafkaTemplate} or {@code kafkaTemplate} * will be looked up. * * @return the kafkaTemplate bean name. bean ์ผ๋ก retryTopicDefaultKafkaTemplate ๋์ defaultRetryTopicKafkaTemplate ์ฌ์ฉํ๋ค๊ณ ์์ ์ ํ์์ต๋๋ค. ์ด ์์ ์ผ๋ก ๊ณต์ ๋ฌธ์์ JavaDoc์ด ์ผ์นํ๊ฒ ๋์๊ณ , ์ค์ ์ฌ์ฉ์๋ค์ด ํผ๋ ์์ด ์ฌ๋ฐ๋ฅธ ๋น ์ด๋ฆ์ ์ฌ์ฉํ ์ ์๊ฒ ๋์์ต๋๋ค. ๐ Commit Message โ Fix: Replace retryTopicDefaultKafkaTemplate with defaultRetryTopicKafkaTemplate in docs ๐ป 2.4. PR: ํ ์คํธ ์ฝ๋ ์ ์ ๋ํ ํ ์คํธ ์ฝ๋ ๋ด์์๋ ์๋ชป๋ ๋น ์ด๋ฆ ๋ฌธ์์ด์ ์ง์ ๋ช ์ํ๊ณ ์์์ต๋๋ค. RetryTopicConfigurationProviderTests ๋ผ๋ ํ ์คํธ ํด๋์ค์์ ์ฌ์ฉ ์ค์ด์๋๋ฐ, ์ด ๋ถ๋ถ์ ์์ RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME ์ ์ฌ์ฉํ๋๋ก ๋ฆฌํฉํฐ๋งํ์ต๋๋ค. Before: @Test void shouldProvideFromAnnotation() { // setup willReturn(kafkaOperations).given(beanFactory).getBean("retryTopicDefaultKafkaTemplate", KafkaOperations.class); // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean); RetryTopicConfiguration configurationFromClass = provider .findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isNotNull(); assertThat(configurationFromClass).isNotNull(); } @Test void shouldProvideFromMetaAnnotation() { // setup willReturn(kafkaOperations).given(beanFactory).getBean("retryTopicDefaultKafkaTemplate", KafkaOperations.class); // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean); RetryTopicConfiguration configurationFromClass = provider .findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isNotNull(); assertThat(configuration.getConcurrency()).isEqualTo(3); assertThat(configurationFromClass).isNotNull(); assertThat(configurationFromClass.getConcurrency()).isEqualTo(3); } After: @Test void shouldProvideFromAnnotation() { // setup willReturn(kafkaOperations).given(beanFactory).getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class); // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean); RetryTopicConfiguration configurationFromClass = provider .findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isNotNull(); assertThat(configurationFromClass).isNotNull(); } @Test void shouldProvideFromMetaAnnotation() { // setup willReturn(kafkaOperations).given(beanFactory).getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class); // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean); RetryTopicConfiguration configurationFromClass = provider .findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isNotNull(); assertThat(configuration.getConcurrency()).isEqualTo(3); assertThat(configurationFromClass).isNotNull(); assertThat(configurationFromClass.getConcurrency()).isEqualTo(3); } "retryTopicDefaultKafkaTemplate" ์ผ๋ก ๋์ด์๋ Bean ์ด๋ฆ ๋ถ๋ถ์ RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME ๋ก ๋ณ๊ฒฝํ์์ต๋๋ค. ์ด๋ฅผ ํตํด ๋น ์ด๋ฆ๋ ๋ณ๊ฒฝํ์๊ณ , ํ๋์ฝ๋ฉ๋ ๋ฌธ์์ด์ ์ ๊ฑฐํ๋ฉด์, ์์๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ์์ผ๋ก ์ฝ๋์ ์ผ๊ด์ฑ๊ณผ ์์ ์ฑ์ ํ๋ณดํ์ต๋๋ค. ์ถํ KafkaTemplate ๋น ์ด๋ฆ์ด ๋ณ๊ฒฝ๋๋๋ผ๋ ์์๋ง ์์ ํ๋ฉด ๋๋ฏ๋ก ์ ์ง๋ณด์๊ฐ ์ฉ์ดํด์ก์ต๋๋ค. ๐ Commit Message โ Fix: Replace retryTopicDefaultKafkaTemplate with RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME in Test Code ๊ทธ ๊ฒฐ๊ณผ, Spring Kafka ์ Contributor ๊ฐ ๋์๊ณ , ์ ์์ ์ฒซ ์คํ ์์ค ๊ธฐ์ฌ์ ์ฑ๊ณตํ๊ฒ ๋์์ต๋๋ค. ํด๋น PR ๋ด์ฉ์ ํ์ธํ๊ณ ์ถ๋ค๋ฉด, Spring Kafka PR ๊ธฐ์ฌ ์ฑ๊ณต ํด๋น ๋งํฌ๋ฅผ ํด๋ฆญํ์๊ธฐ ๋ฐ๋๋๋ค. ๐คฃ 2.5. ๋ฐฐ์: Apache Jackrabbit Oak ๊ธฐ์ฌ ์คํจ ๊ฒฝํ ์์์ ์ธ๊ธํ์์ง๋ง, Spring Kafka ๋ฅผ ๊ธฐ์ฌํ๊ธฐ ์ ์, Apache Jackrabbit Oak ์ ์ด์๋ฅผ ํ๋ ๋จผ์ ์ฐพ์ ์ ์์์ต๋๋ค. ๐ก Apache Jackrabbit Oak ? ํ์ผ์ด๋ ๋ฌธ์ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ํธ๋ฆฌ ๊ตฌ์กฐ๋ก ์ ์ฅํ๊ณ ๊ด๋ฆฌํด์ฃผ๋ ์์คํ ์ ๋๋ค. Java๋ก ๋ง๋ค์ด์ก๊ณ , Adobe Experience Manager(AEM) ๊ฐ์ ์ ๋ช ํ ์ฝํ ์ธ ๊ด๋ฆฌ ์์คํ ์์ ๋ด๋ถ ์ ์ฅ์๋ก ์ฌ์ฉ๋๊ณ ์์ต๋๋ค. ์๋ฅผ ๋ค๋ฉด, ์น์ฌ์ดํธ์์ ํ์ด์ง๋ฅผ ๋ง๋ค๊ฑฐ๋ ์ด๋ฏธ์ง๋ฅผ ์ ๋ก๋ํ ๋, ๊ทธ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๋ฐ Oak๊ฐ ์ฐ์ผ ์ ์์ต๋๋ค. ํ์ผ์ด๋ ํด๋๋ฅผ ๋๋ ํ ๋ฆฌ์ฒ๋ผ ๊ด๋ฆฌํ๊ณ , ๋๊ฐ ์ธ์ ๋ฐ๊ฟจ๋์ง ๊ธฐ๋ก๋ ๋จ๊ธธ ์ ์์ด์, ๋ฌธ์ ๊ด๋ฆฌ ์์คํ ์ด๋ CMS์ ๋ฑ ๋ง๋ ๊ตฌ์กฐ์ ๋๋ค. ๋น์ ์ด์ ๋ด์ฉ์ ๋ค์๊ณผ ๊ฐ์๋๋ฐ MAX_SEGMENT_SIZE ์์๊ฐ Segment์ SegmentDataUtil s์ ์ค๋ณต ์ ์๋์ด ์์ผ๋ ํ๋๋ก ๊ณต์ ํ ์ ์๊ฒ ํด๋ฌ๋ผ๋ ๋ด์ฉ์ด์์ต๋๋ค. Segment ๋ผ๋ ํด๋์ค์์ static final int MAX_SEGMENT_SIZE = 1 << 18; ๋ก ๋์ด์๋ ๋ถ๋ถ์ public ์ ๋ฌ์์ฃผ๊ณ , ๊ทธ๊ฑธ SegmentDataUtil ์์ ์ฌ์ฉ ํ ์ ์๊ฒ ํด์ฃผ๋๊ฒ ์ ๋ถ์์ต๋๋ค. ๊ทธ๋์ ์๋์ ๊ฐ์ด, ์ฝ๋๋ฅผ ์์ ํ๋ ค PR ์ ๋ณด๋์ต๋๋ค. ํ์ง๋ง ๊ฒฐ๊ณผ๋ Merge ์คํจ์์ต๋๋ค. ์ด์ ๋ ๊ฐ๋จํ์ต๋๋ค. ์ด๋ฏธ ์ ๋ณด๋ค ๋จผ์ ํด๋น ์ด์๋ฅผ ์บ์นํ๊ณ PR ์ ์ฌ๋ฆฐ ์ฌ๋์ด ์์๊ธฐ ๋๋ฌธ์ด์์ต๋๋ค. ์คํ ์์ค ์ํ๊ณ๋ฅผ ๋ณด๋ฉด, ์ฒ์์๋ ์ ๋ชฐ๋ผ์ ์ค์ ํ ์ ์๋ 2๊ฐ์ง๊ฐ ์์ต๋๋ค. ํ๋๋ ์๋ฃ ๋ ์ด์์ธ๋ฐ๋ ์ด์๊ฐ Close ๋์ด์์ง ์์๊ฒ ์์ ์ ์๋ค๋ ๊ฒ์ ๋๋ค. ๋ ๋ค๋ฅธ ํ๋๋ ๋๊ตฐ๊ฐ ํด๋น ์ด์๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด Assign ์ ๋ฐ์๋๋ฐ, ๊ทธ๊ฒ์ ๊ฐ๋ก์ฑ๋ ํ๋์ ํด์๋ ์๋๋ค๋ ๊ฒ์ ๋๋ค. ๋ณดํต์ ์ด์์ ๋๊ธ๋ก ๋ณธ์ธ์ด ํด๊ฒฐํ๊ฒ ๋ค๊ณ ๋ฆฌํ์ ๋ฌ์๋๊ณ ์งํ์ ํฉ๋๋ค. ๊ทธ๋ ๊ธฐ ๋๋ฌธ์, ์ด๋ ํ ์ด์๋ฅผ ํด๊ฒฐํ๊ณ ์ ํ๋ค๋ฉด, ๊ผญ ๋๊ธ์ ๋ณธ์ธ์ด ํด๊ฒฐํ๊ฒ ๋ค๋ ์์ฌ๋ฅผ ๋ฐํ๊ณ , ์งํํ๋ ๊ฒ์ด ์ข์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ๋ค๋ฅธ ์ฌ๋์ด ์งํ์ ํ๊ฒ ๋ค๊ณ ๋๊ธ์ ์ด๋ฏธ ๋จ๊ธด ์ํ๋ผ๋ฉด, ๊ทธ๊ฑธ ๊ฐ๋ก์ฑ์๋ ์๋ฉ๋๋ค. ๋ค๋ง, ๋๊ธ์ด ๋ฌ๋ฆฌ๊ณ ํ์ฐธ์ด ์ง๋ฌ๋๋ฐ๋ ํด๊ฒฐ์ด ๋์ง ์์ ์ด์๋ผ๋ฉด, ํด๋น ์ด์๊ฐ ํ์ฌ ํด๊ฒฐ ์ค์ธ์ง, ๊ทธ๊ฒ ์๋๋ผ๋ฉด ๋ด๊ฐ ํด๊ฒฐ์ ํด๋ ๋๋์ง ๋๊ธ๋ก ๋ฌผ์ด๋ณด๊ณ ์งํ์ ํ๋ ๋ฐฉํฅ๋ ์์ต๋๋ค. โ โ๏ธ 3. ๊ฒฐ๋ก ์ด๋ฒ ๊ธฐ์ฌ๋ฅผ ํตํด Spring Kafka ๋ฌธ์์ ์ฝ๋์ ๋ถ์ผ์น ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ฉฐ, ๋ณด๋ค ๋ช ํํ๊ณ ์ ์ง๋ณด์ํ๊ธฐ ์ฌ์ด ๊ธฐ๋ฐ์ ๋ง๋ จํ๋ ๋ฐ ์์ ๋ณดํฌ์ด ๋ ์ ์์์ต๋๋ค. ๋น๋ก ์๊ณ ์ฌ์ํ ๋ณ๊ฒฝ์ฒ๋ผ ๋ณด์ผ ์ ์์ง๋ง, ์ค์ ์ฌ์ฉ์ ๊ฒฝํ์ ์ง์ ์ ์ธ ์ํฅ์ ์ค ์ ์๋ค๋ ์ ์์ ๊ทธ ์๋ฏธ๋ ๊ฒฐ์ฝ ์์ง ์์์ต๋๋ค. ๋ฌด์๋ณด๋ค ์คํ์์ค ์ํ๊ณ์ ๋ฌธํ, ํ๋ก์ธ์ค, ํ์ ๋ฐฉ์์ ์ค์ ๋ก ์ฒดํํ๊ณ ๋ฐฐ์ธ ์ ์์๋ ๊ฐ์ง ๊ฒฝํ์ด์์ต๋๋ค. ์คํ์์ค ๊ธฐ์ฌ๋ฅผ ์ด๋ ต๊ฒ๋ง ๋๋ผ๋ ๋ถ๋ค๋ ๋ง๊ฒ ์ง๋ง, ์๋ฒฝํ๊ฒ ์๋ ์ํ์์ ์์ํ๋ ์ฌ๋์ ์์ต๋๋ค. ์ ์ญ์ ๋ฌธ์ ํ๋ ๊ณ ์น๊ณ , ํ ์คํธ ์ฝ๋ ํ ์ค ๋ฐ๊พธ๋ ๋ฐ์ ์์ํ์ต๋๋ค. ์์ ๊ธฐ์ฌ๋ ์ถฉ๋ถํ ์๋ฏธ ์๊ณ , ์คํ์์ค ์ปค๋ฎค๋ํฐ๋ ๊ทธ๋ฐ ๊ธฐ์ฌ๋ฅผ ์์คํ ์ฌ๊น๋๋ค. ๐จ โ์ค์ํ ๊ฑด ์๋ฒฝํจ์ด ์๋๋ผ ์์์ ๋๋ค. ์ง๊ธ ์ฌ๋ฌ๋ถ๋ ์ฒซ ๋ฐ์ ๋ด๋๋๋ณด์ธ์.โ
๐๏ธ ์คํ์์ค ๊ธฐ์ฌ
ยท 2024-10-09
<
>
Touch background to close