βοΈ 1. μλ‘
μ΄μ ν¬μ€ν
μμ μκ°ν κ²μ²λΌ, Iceberg Flink Catalog v2.0μμ MiniClusterWithClientResource
μ’
μμ± μ κ±°μ μ±κ³΅νμ΅λλ€. μ΄μ λ°λΌ ν΄λΉ μ΄μμ μ°μ₯μ μ΄μ λ©μΈν
μ΄λμ μμ²μ λ°λΌ, λμΌν λ³κ²½ μ¬νμ Flink 1.19μ 1.20 λ²μ μλ λ°±ν¬ν
νκΈ°λ‘ νμ΅λλ€.
(νμ¬ Icebergλ 1.9.1 λ²μ μ μ¬μ© μ€μ΄λ©°, Flink Catalogλ 1.19, 1.20, κ·Έλ¦¬κ³ 2.0 λ²μ μ μ§μν©λλ€.)
μ΄λ² μμ μ μ΄μ ν¬μ€ν μΈ
π Iceberg Flink Catalog v2.0 MiniClusterWithClientResource μ’ μμ± μ κ±° μ μ°μ₯μ μ΄μ, μ μ¬ν λ΄μ©μ΄ λ§μ, λΉκ΅μ μ§§κ² μ 리ν΄λ³΄λ € ν©λλ€.
βοΈ 2. λ³Έλ‘
βπ» 2.1. PR: MiniClusterWithClientResource μ’ μμ± μ κ±° Backporting
μ΄λ² μμ μμ μ΄μ PRμ μ°μ₯μ μ μκΈ° λλ¬Έμ, μ΄μ μ μ κ³Όμ μ λν μμΈν μ€λͺ μ μλ΅ν©λλ€.
μλμ κ°μ΄, λ°±ν¬ν μ λν μμ²μ΄ μμλ€λ λ΄μ©μ λ¨κΉλλ€.
λ°λΌμ μ΄λ² ν¬μ€ν μμλ κ³§λ°λ‘ PR κ³Όμ λΆν° λ€λ£¨κ² μ΅λλ€.
μ΄μ μ μ λ°°κ²½μ΄λ PR μ 체 νλ¦μ΄ κΆκΈνμ λΆλ€μ μ΄μ ν¬μ€ν μΈ
π Iceberg Flink Catalog v2.0 MiniClusterWithClientResource μ’ μμ± μ κ±° μ μ°Έκ³ ν΄μ£ΌμκΈ° λ°λλλ€.
1οΈβ£ COMMIT : v1.19 Backporting
κΈ°μ‘΄ 2.0μ TestIcebergSourceFailover ν΄λμ€μλ ν¬κ² λ€λ₯Έ λΆλΆμ΄ μμμ΅λλ€. μ€κ°μ ꡬνλ μ½λκ° μ½κ°μ λ€λ₯΄κΈ΄ νμμ§λ§, μ κ° μμ ν΄μΌνλ λ²μμλ 무κ΄ν λΆλΆμ΄μκΈ° λλ¬Έμ λμ΄κ°μ΅λλ€.
ν΄λΉ μ½λμ μ΄μ λ²μ μ μλμ κ°μμ΅λλ€. ( v2.0 κ³Ό λμΌ )
Before:
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.function.ThrowingConsumer;
// ...
@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
runTestWithNewMiniCluster(
miniCluster -> testBoundedIcebergSource(FailoverType.TM, miniCluster));
}
@Test
public void testBoundedWithJobManagerFailover() throws Exception {
runTestWithNewMiniCluster(
miniCluster -> testBoundedIcebergSource(FailoverType.JM, miniCluster));
}
// ...
@Test
public void testContinuousWithTaskManagerFailover() throws Exception {
runTestWithNewMiniCluster(
miniCluster -> testContinuousIcebergSource(FailoverType.TM, miniCluster));
}
@Test
public void testContinuousWithJobManagerFailover() throws Exception {
runTestWithNewMiniCluster(
miniCluster -> testContinuousIcebergSource(FailoverType.JM, miniCluster));
}
// ...
private static void runTestWithNewMiniCluster(ThrowingConsumer<MiniCluster, Exception> testMethod) throws Exception {
MiniClusterWithClientResource miniCluster = null;
try {
miniCluster = new MiniClusterWithClientResource(MINI_CLUSTER_RESOURCE_CONFIG);
miniCluster.before();
testMethod.accept(miniCluster.getMiniCluster());
} finally {
if (miniCluster != null) {
miniCluster.after();
}
}
}
κ·Έλμ μλμ κ°μ μ½λλ‘ μμ μ λμΌνκ² μ§ννμμ΅λλ€.
After:
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.junit.jupiter.api.AfterEach;
// ...
@BeforeEach
protected void startMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
if (!miniCluster.isRunning()) {
miniCluster.start();
}
}
@AfterEach
protected void stopMiniCluster(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
miniCluster.close();
}
// ...
@Test
public void testBoundedWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
testBoundedIcebergSource(FailoverType.TM, miniCluster);
}
@Test
public void testBoundedWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
testBoundedIcebergSource(FailoverType.JM, miniCluster);
}
// ...
@Test
public void testContinuousWithTaskManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
testContinuousIcebergSource(FailoverType.TM, miniCluster);
}
@Test
public void testContinuousWithJobManagerFailover(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
testContinuousIcebergSource(FailoverType.JM, miniCluster);
}
μ΄λ κ² μ½λλ₯Ό μμ νκ² λ μ΄μ μ, Minicluster μ μν κ·Έλ¦¬κ³ InjectMiniCluster μμ μ°¨μ΄μ λν΄μ μμ μ΄μ ν¬μ€ν μ μμ± λμ΄μμ΅λλ€.
κ±°λ λ§μλ리μ§λ§, μ΄μ ν¬μ€ν μ κΌ μ°Έκ³ νμκΈ° λ°λλλ€. :D
2οΈβ£ COMMIT : v1.20 Backporting
v1.20 λ°±ν¬ν μμ μ, v1.19 μ μμ λμΌνκ² μ§νλμμ΅λλ€.
μ½λ μμ μμ λμΌνκ² μμ νμμ΅λλ€.
그리νμ¬, μ΅μ’ μ μΌλ‘λ μλμ κ°μ PR μ μ μ‘νμκ³ , μ±κ³΅μ μΌλ‘ Merge κ° λμμ΅λλ€.
π My PR: Backporting Removal of MiniClusterWithClientResource from Iceberg Flink Catalog v1.19, v1.20
πΉ μ€κ°μ λ§μ£Όν μ½κ°μ μ΄μ
μ΄μ ν¬μ€ν μμ Local ν μ€νΈμ, CI ν μ€νΈλ₯Ό μ±κ³΅νκΈ° μνμ¬ νμν λͺ κ°μ§ Gradle λͺ λ Ήμ΄λ₯Ό λ§μλλ Έμ΅λλ€.
μ΄λ²μ μμ λμΌν κ³Όμ μ κ±°μ³€κ³ , μ½λ μ€νμΌμ λ§μΆκΈ° μν΄ ./gradlew :iceberg-flink:iceberg-flink-1.19:spotlessApply
μ ./gradlew :iceberg-flink:iceberg-flink-1.20:spotlessApply
λͺ
λ Ήμ΄λ₯Ό μ¬μ©νμμ΅λλ€.
κ·Έλ¬λλ μλμ κ°μ μ€λ₯κ° λ°μνκ² λ©λλ€.
μλ¬λ₯Ό 보면, Gradleμ΄ :iceberg-flink:iceberg-flink-1.20:spotlessApply λΌλ νμ€ν¬(νΉμ λͺ¨λ)λ₯Ό μ€ννλ € νμ§λ§, iceberg-flink νλ‘μ νΈ μλμ iceberg-flink-1.20 μ΄λΌλ μλΈνλ‘μ νΈκ° μ‘΄μ¬νμ§ μλ€λ λ΄μ©μ λλ€. μ‘΄μ¬νλ μλΈνλ‘μ νΈ ν보λ iceberg-flink-2.0 μ΄ μλ€κ³ μΉμ νκ² μλ €μ£Όκ³ μμ§λ§, μ°λ¦¬μκ² νμν건 1.19 μ 1.20 λ²μ μ λλ€.
μ λ μ²μμλ 2.0 μΌλ‘ ν΄λ, μλμΌλ‘ 1.19 μ 1.20 μ½λ μ€νμΌλ λ§μΆ°μ§ μ€ μμλλ° κ·Έκ² μλμμ΅λλ€.
κ·Έλμ μλμ κ°μ΄ μ§λ¬Έμ νμμ΅λλ€.
κ·Έλμ ./gradlew properties
λͺ
λ Ήμ΄λ₯Ό ν΅ν΄ μ°Ύμ보μλλ, μμ systemProp.defaultFlinkVersions: 2.0 μΌλ‘, λμ΄μμμ΅λλ€.
κ·Έλμ vi λ‘ gradle.properties λ₯Ό μ΄μ΄μ£Όμκ³ , enableFlink1.19=true
κ³Ό, enableFlink1.20=true
μ μΆκ°ν΄μ€ λ€ spotlessApply λͺ
λ Ήμ΄λ₯Ό μ€νν΄μ£Όμμ΅λλ€.
κ·Έλ¬λλ μ μμ μΌλ‘ μ±κ³΅νμκ³ , PR ν Merge κΉμ§ μ±κ³΅νμμ΅λλ€.
βοΈ 3. κ²°λ‘
μ΄λ² μμ μ Iceberg Flink Catalog v2.0μμ μ§ννλ MiniClusterWithClientResource μ κ±° μμ μ Flink 1.19 λ° 1.20 λ²μ μ μ±κ³΅μ μΌλ‘ λ°±ν¬ν ν μ¬λ‘μμ΅λλ€.
μ½λ λ³κ²½ μ체λ μ΄μ κ³Ό κ±°μ λμΌνμ§λ§, Gradle μ€μ λ° λ‘컬 ν μ€νΈ νκ²½ μ€μ μμ λ°μν μμ μ΄μλ₯Ό ν΄κ²°νλ©΄μ, Iceberg νλ‘μ νΈμ Gradle ꡬμ±κ³Ό μλΈλͺ¨λ νμ±ν λ°©μμ λν΄ λ κΉμ΄ μ΄ν΄ν μ μλ κ³κΈ°κ° λμμ΅λλ€.
μ΄λ² κ²½νμ λ°νμΌλ‘ μμΌλ‘λ μ€νμμ€ νλ‘μ νΈμ λμ± μ κ·Ήμ μΌλ‘ κΈ°μ¬νλ©°, λ€μν λ¬Έμ μν©μ μ£Όλμ μΌλ‘ ν΄κ²°ν΄λκ° μ μλλ‘ κΎΈμ€ν μλμ ν€μκ°κ² μ΅λλ€. πͺ