Overview
When using Apache Iceberg tables in a Flink streaming environment, it’s important to automate table maintenance operations such as snapshot expiration, small file compaction, and orphan file cleanup.
Previously, these maintenance tasks were only available through Iceberg Spark Actions, requiring a separate Spark cluster. However, maintaining Spark infrastructure just for table optimization adds complexity and operational overhead.
The TableMaintenance API in Apache Iceberg lets Flink run these maintenance tasks natively, either inside the same streaming job or as an independent Flink job, without relying on external systems. This approach simplifies the architecture, reduces costs, and improves automation.
Quick Start
Here’s a simple example that sets up automated maintenance for an Iceberg table.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TableLoader tableLoader = TableLoader.fromHadoopTable("path/to/table");
TriggerLockFactory lockFactory = TriggerLockFactory.defaultLockFactory();

TableMaintenance.forTable(env, tableLoader, lockFactory)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024) // 128 MiB
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();

env.execute("Table Maintenance Job");
```
Configuration Options
TableMaintenance Builder
Method | Description | Default |
---|---|---|
uidSuffix(String) | Unique identifier suffix for the job | Random UUID |
rateLimit(Duration) | Minimum interval between task executions | 60 seconds |
lockCheckDelay(Duration) | Delay for checking lock availability | 30 seconds |
parallelism(int) | Default parallelism for maintenance tasks | System default |
maxReadBack(int) | Max snapshots to check during initialization | 100 |
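The rateLimit option sets a minimum interval between task executions. The sketch below models only that rule: if an execution is requested before the interval has elapsed since the previous run, it is refused, and the caller is expected to retry later (in the real system, lock retries are paced by lockCheckDelay). The class RateLimitSketch is hypothetical and is not how Iceberg implements rate limiting internally.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the rateLimit rule: at most one execution per rateLimit window.
// This is an illustration only, not Iceberg's internal scheduler.
final class RateLimitSketch {
  private final Duration rateLimit;
  private Instant lastExecution; // null until the first execution

  RateLimitSketch(Duration rateLimit) {
    this.rateLimit = rateLimit;
  }

  /** Returns true and records the execution if a task may run at {@code now}. */
  boolean tryExecute(Instant now) {
    if (lastExecution != null && now.isBefore(lastExecution.plus(rateLimit))) {
      return false; // too soon after the previous run; caller should retry later
    }
    lastExecution = now;
    return true;
  }
}
```

With a 10-minute rate limit, a trigger arriving 5 minutes after a run is refused, and one arriving 10 minutes after it is accepted.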
ExpireSnapshots Configuration
Method | Description | Default Value | Type |
---|---|---|---|
maxSnapshotAge(Duration) | Maximum age of snapshots to retain | No limit | Duration |
retainLast(int) | Minimum number of snapshots to retain | 1 | int |
deleteBatchSize(int) | Number of files to delete in each batch | 1000 | int |
scheduleOnCommitCount(int) | Trigger after N commits | No automatic scheduling | int |
scheduleOnDataFileCount(int) | Trigger after N data files | No automatic scheduling | int |
scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | No automatic scheduling | long |
scheduleOnIntervalSecond(long) | Trigger after time interval (seconds) | No automatic scheduling | long |
parallelism(int) | Parallelism for this specific task | Inherits from TableMaintenance | int |
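The maxSnapshotAge and retainLast options work together. The sketch below assumes the usual Iceberg expiration semantics: a snapshot is expired only if it is older than now minus maxSnapshotAge and is not among the retainLast newest snapshots. It is simplified to a flat list of commit timestamps. The class ExpireSelectionSketch is hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of how maxSnapshotAge and retainLast combine when choosing
// snapshots to expire. Real Iceberg walks snapshot ancestry; this uses timestamps only.
final class ExpireSelectionSketch {
  /** Returns the commit timestamps (epoch millis) of snapshots that would be expired. */
  static List<Long> toExpire(List<Long> snapshotTimestamps, long nowMillis,
                             Duration maxSnapshotAge, int retainLast) {
    List<Long> newestFirst = new ArrayList<>(snapshotTimestamps);
    newestFirst.sort(Comparator.reverseOrder());
    long cutoff = nowMillis - maxSnapshotAge.toMillis();
    List<Long> expired = new ArrayList<>();
    for (int i = 0; i < newestFirst.size(); i++) {
      boolean keptByRetainLast = i < retainLast; // the N newest snapshots always survive
      if (!keptByRetainLast && newestFirst.get(i) < cutoff) {
        expired.add(newestFirst.get(i));
      }
    }
    return expired;
  }
}
```

For example, with snapshots at 1000 to 5000 ms, a cutoff at 3000 ms, and retainLast set to 2, only the snapshots at 2000 and 1000 ms are expired. If retainLast is raised above the snapshot count, nothing is expired.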
RewriteDataFiles Configuration
Method | Description | Default Value | Type |
---|---|---|---|
targetFileSizeBytes(long) | Target size for rewritten files | Table property or 512MB | long |
partialProgressEnabled(boolean) | Enable partial progress commits | false | boolean |
partialProgressMaxCommits(int) | Maximum commits for partial progress | 10 | int |
scheduleOnCommitCount(int) | Trigger after N commits | 10 | int |
scheduleOnDataFileCount(int) | Trigger after N data files | 1000 | int |
scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | 100GB | long |
scheduleOnIntervalSecond(long) | Trigger after time interval (seconds) | 600 (10 minutes) | long |
maxRewriteBytes(long) | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
parallelism(int) | Parallelism for this specific task | Inherits from TableMaintenance | int |
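To show how targetFileSizeBytes, maxRewriteBytes, and partialProgressMaxCommits interact, the sketch below uses a deliberately simplified planner. It selects files smaller than the target and skips any file that would exceed the remaining maxRewriteBytes budget. It then splits the selection into at most partialProgressMaxCommits commits. Iceberg's real planner bins files into file groups, so treat this, and the hypothetical class RewritePlanSketch, as an illustration of the limits rather than of the algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified rewrite planner. Each selected file stands in for a
// "file group"; Iceberg's actual planner is more sophisticated.
final class RewritePlanSketch {
  static List<List<Long>> plan(List<Long> fileSizes, long targetFileSizeBytes,
                               long maxRewriteBytes, int maxCommits) {
    if (maxCommits <= 0) throw new IllegalArgumentException("maxCommits must be positive");
    List<Long> selected = new ArrayList<>();
    long budget = maxRewriteBytes;
    for (long size : fileSizes) {
      // Only undersized files are candidates; skip any that no longer fit the budget.
      if (size < targetFileSizeBytes && size <= budget) {
        selected.add(size);
        budget -= size;
      }
    }
    List<List<Long>> commits = new ArrayList<>();
    if (selected.isEmpty()) return commits;
    // Ceiling division guarantees no more than maxCommits commits.
    int perCommit = (selected.size() + maxCommits - 1) / maxCommits;
    for (int i = 0; i < selected.size(); i += perCommit) {
      commits.add(new ArrayList<>(selected.subList(i, Math.min(i + perCommit, selected.size()))));
    }
    return commits;
  }
}
```

Lowering maxRewriteBytes shrinks the work done per execution. Raising partialProgressMaxCommits makes each intermediate commit smaller.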
Flink Configuration Options
You can also configure maintenance behavior through Flink configuration:
Configuration Key | Description | Default Value | Type |
---|---|---|---|
iceberg.maintenance.rate-limit-seconds | Rate limit in seconds | 60 | long |
iceberg.maintenance.lock-check-delay-seconds | Lock check delay in seconds | 30 | long |
iceberg.maintenance.rewrite.max-bytes | Maximum rewrite bytes | Long.MAX_VALUE | long |
iceberg.maintenance.rewrite.schedule.commit-count | Schedule on commit count | 10 | int |
iceberg.maintenance.rewrite.schedule.data-file-count | Schedule on data file count | 1000 | int |
iceberg.maintenance.rewrite.schedule.data-file-size | Schedule on data file size | 100GB | long |
iceberg.maintenance.rewrite.schedule.interval-second | Schedule interval in seconds | 600 | long |
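Several of these keys are expressed in seconds, while the builder methods take a Duration. The sketch below shows that mapping with the defaults from the table. A plain Map stands in for Flink's configuration object, and MaintenanceConfigSketch is a hypothetical helper, not part of the Iceberg API.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper: resolves a few maintenance keys from a plain Map, falling
// back to the defaults listed in the table. Not an Iceberg or Flink API.
final class MaintenanceConfigSketch {
  static Duration rateLimit(Map<String, String> conf) {
    return Duration.ofSeconds(Long.parseLong(
        conf.getOrDefault("iceberg.maintenance.rate-limit-seconds", "60")));
  }

  static Duration lockCheckDelay(Map<String, String> conf) {
    return Duration.ofSeconds(Long.parseLong(
        conf.getOrDefault("iceberg.maintenance.lock-check-delay-seconds", "30")));
  }

  static int rewriteCommitCount(Map<String, String> conf) {
    return Integer.parseInt(
        conf.getOrDefault("iceberg.maintenance.rewrite.schedule.commit-count", "10"));
  }
}
```

For example, setting the rate-limit key to 900 corresponds to rateLimit(Duration.ofMinutes(15)) in the builder.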
Complete Example
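```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;

public class TableMaintenanceJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // Checkpoint every 60 seconds for fault tolerance

    // Configure the table loader. CatalogLoader.hive takes a catalog name, a Hadoop
    // Configuration, and catalog properties (for example the metastore "uri").
    Configuration configuration = new Configuration();
    Map<String, String> catalogProperties = new HashMap<>();
    TableLoader tableLoader = TableLoader.fromCatalog(
        CatalogLoader.hive("my_catalog", configuration, catalogProperties),
        TableIdentifier.of("database", "table"));

    // Set up maintenance with comprehensive configuration
    TableMaintenance.forTable(env, tableLoader, TriggerLockFactory.defaultLockFactory())
        .uidSuffix("production-maintenance")
        .rateLimit(Duration.ofMinutes(15))
        .lockCheckDelay(Duration.ofSeconds(30))
        .parallelism(4)
        // Expire snapshots older than 7 days, always keeping the last 10
        .add(ExpireSnapshots.builder()
            .maxSnapshotAge(Duration.ofDays(7))
            .retainLast(10)
            .deleteBatchSize(1000)
            .scheduleOnCommitCount(50)
            .parallelism(2))
        // Compact small files toward 256 MiB after every 20 new data files
        .add(RewriteDataFiles.builder()
            .targetFileSizeBytes(256 * 1024 * 1024)
            .minFileSizeBytes(32 * 1024 * 1024)
            .scheduleOnDataFileCount(20)
            .partialProgressEnabled(true)
            .partialProgressMaxCommits(5)
            .maxRewriteBytes(2L * 1024 * 1024 * 1024) // at most 2 GiB per run
            .parallelism(6))
        .append();

    env.execute("Iceberg Table Maintenance");
  }
}
```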
Scheduling Options
Maintenance tasks can be triggered based on various conditions:
Time-based Scheduling
```java
ExpireSnapshots.builder()
    .scheduleOnIntervalSecond(3600) // every hour
```
Commit-based Scheduling
```java
RewriteDataFiles.builder()
    .scheduleOnCommitCount(50) // after 50 commits
```
Data Volume-based Scheduling
```java
RewriteDataFiles.builder()
    .scheduleOnDataFileCount(500)                     // after 500 new data files
    .scheduleOnDataFileSize(50L * 1024 * 1024 * 1024) // after 50 GiB of new data files
```
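The sketch below shows how these scheduling conditions could be evaluated together. It assumes that a task fires as soon as any configured threshold is reached, and that the counters reset after each firing. For simplicity, it checks conditions only when a commit arrives; a real scheduler could also fire interval triggers from a timer. TriggerSketch and its constructor are hypothetical and are not Iceberg's trigger implementation.

```java
import java.time.Duration;

// Hypothetical trigger evaluator. A null threshold means "not configured".
final class TriggerSketch {
  private final Integer commitCount;    // like scheduleOnCommitCount
  private final Integer dataFileCount;  // like scheduleOnDataFileCount
  private final Long dataFileSizeBytes; // like scheduleOnDataFileSize
  private final Duration interval;      // like scheduleOnIntervalSecond

  private int commits;
  private int files;
  private long bytes;
  private long lastFiredMillis;

  TriggerSketch(Integer commitCount, Integer dataFileCount, Long dataFileSizeBytes,
                Duration interval, long startMillis) {
    this.commitCount = commitCount;
    this.dataFileCount = dataFileCount;
    this.dataFileSizeBytes = dataFileSizeBytes;
    this.interval = interval;
    this.lastFiredMillis = startMillis;
  }

  /** Records one commit and reports whether the maintenance task should fire now. */
  boolean onCommit(int addedDataFiles, long addedBytes, long nowMillis) {
    commits++;
    files += addedDataFiles;
    bytes += addedBytes;
    boolean fire = (commitCount != null && commits >= commitCount)
        || (dataFileCount != null && files >= dataFileCount)
        || (dataFileSizeBytes != null && bytes >= dataFileSizeBytes)
        || (interval != null && nowMillis - lastFiredMillis >= interval.toMillis());
    if (fire) {
      // Start counting again from zero after each firing.
      commits = 0;
      files = 0;
      bytes = 0;
      lastFiredMillis = nowMillis;
    }
    return fire;
  }
}
```

With a commit-count threshold of 3, for example, the third commit fires the task and the fourth starts a new count.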
IcebergSink with Post-Commit Integration
Apache Iceberg Sink V2 for Flink allows automatic execution of maintenance tasks after data is committed to the table, using the addPostCommitTopology(…) method.
```java
IcebergSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
    .setAll(properties)
    .addPostCommitTopology(
        TableMaintenance.forTable(env, tableLoader, TriggerLockFactory.defaultLockFactory())
            .rateLimit(Duration.ofMinutes(10))
            .add(ExpireSnapshots.builder().scheduleOnCommitCount(10))
            .add(RewriteDataFiles.builder().scheduleOnDataFileCount(50))
    )
    .append();
```
This approach runs the maintenance tasks in the same job as the sink, so the table is optimized shortly after each commit without deploying a separate job.
Lock Configuration Example
Iceberg uses a locking mechanism to prevent multiple Flink jobs from performing maintenance on the same table simultaneously. Locks are provided by a TriggerLockFactory, which can be backed by either JDBC or ZooKeeper.
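The guarantee a trigger lock provides is mutual exclusion per lock ID. While one job holds the lock for a table, other jobs cannot acquire it until it is released. A job that fails to acquire the lock tries again later, and lockCheckDelay controls that delay. The sketch below models this with an in-memory map. It works only inside a single JVM and is not a substitute for the JDBC or ZooKeeper backends. InMemoryLockSketch and the owner names "job-a" and "job-b" are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical single-JVM model of per-table mutual exclusion. Not an Iceberg class.
final class InMemoryLockSketch {
  private final ConcurrentMap<String, String> holders = new ConcurrentHashMap<>();

  /** Acquires the lock for lockId on behalf of owner; false if it is already held. */
  boolean tryLock(String lockId, String owner) {
    return holders.putIfAbsent(lockId, owner) == null;
  }

  /** Releases the lock only if owner currently holds it. */
  boolean unlock(String lockId, String owner) {
    return holders.remove(lockId, owner);
  }
}
```

Using an atomic putIfAbsent means two concurrent callers can never both observe the lock as free.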
JDBC Lock Example
```properties
flink-maintenance.lock.type=jdbc
flink-maintenance.lock.lock-id=catalog.db.table
flink-maintenance.lock.jdbc.uri=jdbc:postgresql://localhost:5432/iceberg
flink-maintenance.lock.jdbc.user=flink
flink-maintenance.lock.jdbc.password=flinkpw
flink-maintenance.lock.jdbc.init-lock-tables=true
```
JDBC-based locking is recommended for most production environments.
ZooKeeper Lock Example
```properties
flink-maintenance.lock.type=zookeeper
flink-maintenance.lock.zookeeper.uri=zk://zk1:2181,zk2:2181
flink-maintenance.lock.zookeeper.session-timeout-ms=60000
flink-maintenance.lock.zookeeper.connection-timeout-ms=15000
flink-maintenance.lock.zookeeper.max-retries=3
flink-maintenance.lock.zookeeper.base-sleep-ms=3000
```
Use ZooKeeper-based locks only in high-availability or multi-process coordination environments.
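When lock settings come from a properties file, a quick validation step can catch a missing key before the job starts. The sketch below parses the flink-maintenance.lock.* format shown above with java.util.Properties and reports which keys are missing for the configured lock type. The "required" key sets are an assumption drawn from the two examples, not a documented contract, and LockPropertiesSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical validator for flink-maintenance.lock.* properties.
// Required keys per type are assumed from the examples, not from Iceberg docs.
final class LockPropertiesSketch {
  private static final String PREFIX = "flink-maintenance.lock.";
  private static final Map<String, List<String>> REQUIRED = Map.of(
      "jdbc", List.of("lock-id", "jdbc.uri"),
      "zookeeper", List.of("zookeeper.uri"));

  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e); // StringReader does not fail in practice
    }
    return props;
  }

  /** Returns required keys (without the prefix) that are missing for the lock type. */
  static List<String> missingKeys(Properties props) {
    String type = props.getProperty(PREFIX + "type");
    // Map.of rejects null lookups, so check for a missing type first.
    List<String> required = type == null ? null : REQUIRED.get(type);
    if (required == null) {
      throw new IllegalArgumentException("Unsupported or missing lock type: " + type);
    }
    List<String> missing = new ArrayList<>();
    for (String key : required) {
      if (props.getProperty(PREFIX + key) == null) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```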
Best Practices
Resource Management
- Use dedicated slot sharing groups for maintenance tasks
- Set appropriate parallelism based on cluster resources
- Enable checkpointing for fault tolerance
Scheduling Strategy
- Use rateLimit to avoid running maintenance too frequently
- Use scheduleOnCommitCount for write-heavy tables
- Use scheduleOnDataFileCount for fine-grained control
Performance Tuning
- Adjust deleteBatchSize based on storage performance
- Enable partialProgressEnabled for large rewrite operations
- Set reasonable maxRewriteBytes limits
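The deleteBatchSize option controls how many files are removed in each batch. The sketch below splits a list of paths into batches of at most that size and hands each batch to a delete callback. A smaller value means fewer paths per request and more requests overall. DeleteBatchSketch and its callback are hypothetical; real deletion goes through Iceberg's FileIO.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of deleteBatchSize: paths are passed to the deleter
// in consecutive batches of at most batchSize entries.
final class DeleteBatchSketch {
  /** Invokes deleter once per batch and returns the number of batches. */
  static int deleteInBatches(List<String> paths, int batchSize, Consumer<List<String>> deleter) {
    if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
    int batches = 0;
    for (int i = 0; i < paths.size(); i += batchSize) {
      deleter.accept(paths.subList(i, Math.min(i + batchSize, paths.size())));
      batches++;
    }
    return batches;
  }
}
```

With the default of 1000, deleting 2,500 files takes three batches of 1000, 1000, and 500 paths.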
Troubleshooting
Common Issues
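OutOfMemoryError during file deletion:
```java
// On ExpireSnapshots.builder(): delete fewer files per batch
.deleteBatchSize(500)
```
Lock conflicts:
```java
// On TableMaintenance.forTable(...): wait longer between lock checks and runs
.lockCheckDelay(Duration.ofMinutes(1))
.rateLimit(Duration.ofMinutes(10))
```
Slow rewrite operations:
```java
// On RewriteDataFiles.builder(): commit in smaller steps and cap work per run
.partialProgressEnabled(true)
.partialProgressMaxCommits(3)
.maxRewriteBytes(1024 * 1024 * 1024) // 1 GiB
```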
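Byte-size arguments such as maxRewriteBytes and targetFileSizeBytes are longs, but Java evaluates 1024 * 1024 * 1024 in int arithmetic. That is safe for 1 GiB. Scaling the same expression to 2 GiB overflows before it is widened to long, which is why the complete example writes 2L * 1024 * 1024 * 1024. The constants below are hypothetical and only demonstrate the overflow.

```java
// Demonstrates int overflow in byte-size expressions passed to long parameters.
final class ByteSizeSketch {
  static final long ONE_GIB_INT_MATH = 1024 * 1024 * 1024;       // 1073741824, fits in an int
  static final long TWO_GIB_INT_MATH = 2 * 1024 * 1024 * 1024;   // overflows int: -2147483648
  static final long TWO_GIB_LONG_MATH = 2L * 1024 * 1024 * 1024; // 2147483648, computed as long
}
```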