Home > πŸ–ŠοΈ μ˜€ν”ˆμ†ŒμŠ€ κΈ°μ—¬ > πŸ‘‰ Apache Iceberg > πŸ“˜ Flink 1.19 및 1.20에 MiniClusterWithClientResource 쒅속성 제거 Backport

πŸ“˜ Flink 1.19 및 1.20에 MiniClusterWithClientResource 쒅속성 제거 Backport
OpenSource PR Iceberg Flink

✏️ 1. μ„œλ‘ 


이전 ν¬μŠ€νŒ…μ—μ„œ μ†Œκ°œν•œ κ²ƒμ²˜λŸΌ, Iceberg Flink Catalog v2.0μ—μ„œ MiniClusterWithClientResource 쒅속성 μ œκ±°μ— μ„±κ³΅ν–ˆμŠ΅λ‹ˆλ‹€. 이에 따라 ν•΄λ‹Ή 이슈의 μ—°μž₯μ„ μ΄μž λ©”μΈν…Œμ΄λ„ˆμ˜ μš”μ²­μ— 따라, λ™μΌν•œ λ³€κ²½ 사항을 Flink 1.19와 1.20 버전에도 λ°±ν¬νŒ…ν•˜κΈ°λ‘œ ν–ˆμŠ΅λ‹ˆλ‹€.
(ν˜„μž¬ IcebergλŠ” 1.9.1 버전을 μ‚¬μš© 쀑이며, Flink CatalogλŠ” 1.19, 1.20, 그리고 2.0 버전을 μ§€μ›ν•©λ‹ˆλ‹€.)

이번 μž‘μ—…μ€ 이전 ν¬μŠ€νŒ…μΈ

πŸ“˜ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource 쒅속성 제거 의 μ—°μž₯μ„ μ΄μž, μœ μ‚¬ν•œ λ‚΄μš©μ΄ λ§Žμ•„, 비ꡐ적 짧게 정리해보렀 ν•©λ‹ˆλ‹€.




β—ˆ


✏️ 2. 본둠

β€πŸ’» 2.1. PR: MiniClusterWithClientResource 쒅속성 제거 Backporting


이번 μž‘μ—… μ—­μ‹œ 이전 PR의 μ—°μž₯선에 있기 λ•Œλ¬Έμ—, 이슈 μ„ μ • 과정에 λŒ€ν•œ μžμ„Έν•œ μ„€λͺ…은 μƒλž΅ν•©λ‹ˆλ‹€.

μ•„λž˜μ™€ 같이, λ°±ν¬νŒ…μ— λŒ€ν•œ μš”μ²­μ΄ μžˆμ—ˆλ‹€λŠ” λ‚΄μš©μ„ λ‚¨κΉλ‹ˆλ‹€.

1

λ”°λΌμ„œ 이번 ν¬μŠ€νŒ…μ—μ„œλŠ” κ³§λ°”λ‘œ PR κ³Όμ •λΆ€ν„° λ‹€λ£¨κ² μŠ΅λ‹ˆλ‹€.

이슈 μ„ μ • λ°°κ²½μ΄λ‚˜ PR 전체 흐름이 κΆκΈˆν•˜μ‹  뢄듀은 이전 ν¬μŠ€νŒ…μΈ

πŸ“˜ Iceberg Flink Catalog v2.0 MiniClusterWithClientResource 쒅속성 제거 을 μ°Έκ³ ν•΄μ£Όμ‹œκΈ° λ°”λžλ‹ˆλ‹€.

1️⃣ COMMIT : v1.19 Backporting

κΈ°μ‘΄ 2.0의 TestIcebergSourceFailover ν΄λž˜μŠ€μ™€λŠ” 크게 λ‹€λ₯Έ 뢀뢄이 μ—†μ—ˆμŠ΅λ‹ˆλ‹€. 쀑간에 κ΅¬ν˜„λœ μ½”λ“œκ°€ 약간은 λ‹€λ₯΄κΈ΄ ν•˜μ˜€μ§€λ§Œ, μ œκ°€ μˆ˜μ •ν•΄μ•Όν•˜λŠ” λ²”μœ„μ™€λŠ” λ¬΄κ΄€ν•œ λΆ€λΆ„μ΄μ—ˆκΈ° λ•Œλ¬Έμ— λ„˜μ–΄κ°”μŠ΅λ‹ˆλ‹€.

ν•΄λ‹Ή μ½”λ“œμ˜ 이전 버전은 μ•„λž˜μ™€ κ°™μ•˜μŠ΅λ‹ˆλ‹€. ( v2.0 κ³Ό 동일 )

Before:

import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.function.ThrowingConsumer;

// ...

@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
    runTestWithNewMiniCluster(
        miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster));
}

@Test
public void testBoundedWithJobManagerFailover() throws Exception {
    runTestWithNewMiniCluster(
        miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster));
}
  
// ...

@Test
public void testContinuousWithTaskManagerFailover() throws Exception {
    runTestWithNewMiniCluster(
        miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster));
}

@Test
public void testContinuousWithJobManagerFailover() throws Exception {
    runTestWithNewMiniCluster(
        miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster));
}

// ...

private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception {
    MiniClusterWithClientResource miniCluster = null;
    try {
        miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG);
	    miniCluster.before();
        testMethod.accept(miniCluster.getMiniCluster());
    } finally {
        if (miniCluster != null) {
          miniCluster.after();
        }
    }
}

κ·Έλž˜μ„œ μ•„λž˜μ™€ 같은 μ½”λ“œλ‘œ μˆ˜μ •μ„ λ™μΌν•˜κ²Œ μ§„ν–‰ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

After:

import org.apache.flink.test.junit5.InjectMiniCluster;
import org.junit.jupiter.api.AfterEach;

// ...

@BeforeEach
protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    if (!miniCluster.isRunning()) {
      miniCluster.start();
    }
}

@AfterEach
protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    miniCluster.close();
}

// ...

@Test
public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    testBoundedIcebergSource(FailoverType.TM, miniCluster);
}
  
@Test
public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    testBoundedIcebergSource(FailoverType.JM, miniCluster);
}
  
// ...
  
@Test
public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    testContinuousIcebergSource(FailoverType.TM, miniCluster);
}

@Test
public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
    testContinuousIcebergSource(FailoverType.JM, miniCluster);
}

μ΄λ ‡κ²Œ μ½”λ“œλ₯Ό μˆ˜μ •ν•˜κ²Œ 된 μ΄μœ μ™€, Minicluster 의 μ—­ν•  그리고 InjectMiniCluster μ™€μ˜ 차이에 λŒ€ν•΄μ„œ μ—­μ‹œ 이전 ν¬μŠ€νŒ…μ— μž‘μ„± λ˜μ–΄μžˆμŠ΅λ‹ˆλ‹€.

κ±°λ“­ λ§μ”€λ“œλ¦¬μ§€λ§Œ, 이전 ν¬μŠ€νŒ…μ„ κΌ­ μ°Έκ³ ν•˜μ‹œκΈ° λ°”λžλ‹ˆλ‹€. :D


2️⃣ COMMIT : v1.20 Backporting

v1.20 λ°±ν¬νŒ… μž‘μ—…μ€, v1.19 와 μ™„μ „ λ™μΌν•˜κ²Œ μ§„ν–‰λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

μ½”λ“œ μ—­μ‹œ μ™„μ „ λ™μΌν•˜κ²Œ μˆ˜μ •ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

κ·Έλ¦¬ν•˜μ—¬, μ΅œμ’…μ μœΌλ‘œλŠ” μ•„λž˜μ™€ 같은 PR 을 μ „μ†‘ν•˜μ˜€κ³ , μ„±κ³΅μ μœΌλ‘œ Merge κ°€ λ˜μ—ˆμŠ΅λ‹ˆλ‹€.


πŸ‘‰ My PR: Backporting Removal of MiniClusterWithClientResource from Iceberg Flink Catalog v1.19, v1.20



1


🍹 쀑간에 λ§ˆμ£Όν•œ μ•½κ°„μ˜ 이슈

이전 ν¬μŠ€νŒ…μ—μ„œ Local ν…ŒμŠ€νŠΈμ™€, CI ν…ŒμŠ€νŠΈλ₯Ό μ„±κ³΅ν•˜κΈ° μœ„ν•˜μ—¬ ν•„μš”ν•œ λͺ‡ κ°€μ§€ Gradle λͺ…λ Ήμ–΄λ₯Ό λ§μ”€λ“œλ ΈμŠ΅λ‹ˆλ‹€.

μ΄λ²ˆμ— μ—­μ‹œ λ™μΌν•œ 과정을 κ±°μ³€κ³ , μ½”λ“œ μŠ€νƒ€μΌμ„ λ§žμΆ”κΈ° μœ„ν•΄ ./gradlew :iceberg-flink:iceberg-flink-1.19:spotlessApply 와 ./gradlew :iceberg-flink:iceberg-flink-1.20:spotlessApply λͺ…λ Ήμ–΄λ₯Ό μ‚¬μš©ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

κ·Έλž¬λ”λ‹ˆ μ•„λž˜μ™€ 같은 였λ₯˜κ°€ λ°œμƒν•˜κ²Œ λ©λ‹ˆλ‹€.

1

μ—λŸ¬λ₯Ό 보면, Gradle이 :iceberg-flink:iceberg-flink-1.20:spotlessApply λΌλŠ” νƒœμŠ€ν¬(ν˜Ήμ€ λͺ¨λ“ˆ)λ₯Ό μ‹€ν–‰ν•˜λ € ν–ˆμ§€λ§Œ, iceberg-flink ν”„λ‘œμ νŠΈ μ•„λž˜μ— iceberg-flink-1.20 μ΄λΌλŠ” μ„œλΈŒν”„λ‘œμ νŠΈκ°€ μ‘΄μž¬ν•˜μ§€ μ•Šλ‹€λŠ” λ‚΄μš©μž…λ‹ˆλ‹€. μ‘΄μž¬ν•˜λŠ” μ„œλΈŒν”„λ‘œμ νŠΈ ν›„λ³΄λŠ” iceberg-flink-2.0 이 μžˆλ‹€κ³  μΉœμ ˆν•˜κ²Œ μ•Œλ €μ£Όκ³  μžˆμ§€λ§Œ, μš°λ¦¬μ—κ²Œ ν•„μš”ν•œκ±΄ 1.19 와 1.20 λ²„μ „μž…λ‹ˆλ‹€.

μ €λŠ” μ²˜μŒμ—λŠ” 2.0 으둜 해도, μžλ™μœΌλ‘œ 1.19 와 1.20 μ½”λ“œ μŠ€νƒ€μΌλ„ 맞좰질 쀄 μ•Œμ•˜λŠ”λ° 그게 μ•„λ‹ˆμ—ˆμŠ΅λ‹ˆλ‹€.

κ·Έλž˜μ„œ μ•„λž˜μ™€ 같이 μ§ˆλ¬Έμ„ ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

1

κ·Έλž˜μ„œ ./gradlew properties λͺ…λ Ήμ–΄λ₯Ό 톡해 μ°Ύμ•„λ³΄μ•˜λ”λ‹ˆ, μ—­μ‹œ systemProp.defaultFlinkVersions: 2.0 으둜, λ˜μ–΄μžˆμ—ˆμŠ΅λ‹ˆλ‹€.

κ·Έλž˜μ„œ vi 둜 gradle.properties λ₯Ό μ—΄μ–΄μ£Όμ—ˆκ³ , enableFlink1.19=true κ³Ό, enableFlink1.20=true 을 μΆ”κ°€ν•΄μ€€ λ’€ spotlessApply λͺ…λ Ήμ–΄λ₯Ό μ‹€ν–‰ν•΄μ£Όμ—ˆμŠ΅λ‹ˆλ‹€.

κ·Έλž¬λ”λ‹ˆ μ •μƒμ μœΌλ‘œ μ„±κ³΅ν•˜μ˜€κ³ , PR ν›„ Merge κΉŒμ§€ μ„±κ³΅ν•˜μ˜€μŠ΅λ‹ˆλ‹€.




β—ˆ


✏️ 3. 결둠


이번 μž‘μ—…μ€ Iceberg Flink Catalog v2.0μ—μ„œ μ§„ν–‰ν–ˆλ˜ MiniClusterWithClientResource 제거 μž‘μ—…μ„ Flink 1.19 및 1.20 버전에 μ„±κ³΅μ μœΌλ‘œ λ°±ν¬νŒ…ν•œ μ‚¬λ‘€μ˜€μŠ΅λ‹ˆλ‹€.

μ½”λ“œ λ³€κ²½ μžμ²΄λŠ” 이전과 거의 λ™μΌν–ˆμ§€λ§Œ, Gradle μ„€μ • 및 둜컬 ν…ŒμŠ€νŠΈ ν™˜κ²½ μ„€μ •μ—μ„œ λ°œμƒν•œ μž‘μ€ 이슈λ₯Ό ν•΄κ²°ν•˜λ©΄μ„œ, Iceberg ν”„λ‘œμ νŠΈμ˜ Gradle ꡬ성과 μ„œλΈŒλͺ¨λ“ˆ ν™œμ„±ν™” 방식에 λŒ€ν•΄ 더 깊이 이해할 수 μžˆλŠ” 계기가 λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

이번 κ²½ν—˜μ„ λ°”νƒ•μœΌλ‘œ μ•žμœΌλ‘œλ„ μ˜€ν”ˆμ†ŒμŠ€ ν”„λ‘œμ νŠΈμ— λ”μš± 적극적으둜 κΈ°μ—¬ν•˜λ©°, λ‹€μ–‘ν•œ 문제 상황을 μ£Όλ„μ μœΌλ‘œ ν•΄κ²°ν•΄λ‚˜κ°ˆ 수 μžˆλ„λ‘ κΎΈμ€€νžˆ μ—­λŸ‰μ„ ν‚€μ›Œκ°€κ² μŠ΅λ‹ˆλ‹€. πŸ’ͺ