Skip to content

Commit

Permalink
[Managed Iceberg] Create tables if needed (apache#32686)
Browse files Browse the repository at this point in the history
* create missing iceberg tables

* trigger iceberg integration tests

* add to changes md

* add to java doc

* trigger iceberg integration tests
  • Loading branch information
ahmedabu98 authored and reeba212 committed Dec 4, 2024
1 parent 0ed47ea commit e91a3e1
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 2
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -106,6 +107,14 @@
* <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
* for Iceberg writes.</b>
*
* <h3>Creating Tables</h3>
*
* <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
* create one with the data's schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
* Some implementations may not support creating a table using the Iceberg API.
*
* <h3>Beam Rows</h3>
*
* <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
Expand All @@ -41,7 +43,13 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
Expand All @@ -66,6 +74,7 @@
* #getSerializableDataFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class);
/**
* Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per
* partition and manages them in a {@link Cache}.
Expand All @@ -79,8 +88,8 @@ class DestinationState {
private final PartitionKey partitionKey;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();

DestinationState(IcebergDestination icebergDestination, Table table) {
Expand Down Expand Up @@ -186,6 +195,8 @@ private RecordWriter createWriter(PartitionKey partitionKey) {

private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
private static final Cache<TableIdentifier, Table> TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;

Expand All @@ -196,6 +207,40 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
this.maxNumWriters = maxNumWriters;
}

/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Although it is expected, some implementations may not support creating a table
* using the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
}
}
TABLE_CACHE.put(identifier, table);
}
return table;
}

/**
* Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the
* record.
Expand All @@ -208,7 +253,16 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
destinations.computeIfAbsent(
icebergDestination,
destination -> {
Table table = catalog.loadTable(destination.getValue().getTableIdentifier());
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table;
try {
table =
TABLE_CACHE.get(
identifier, () -> getOrCreateTable(identifier, row.getSchema()));
} catch (ExecutionException e) {
throw new RuntimeException(
"Error while fetching or creating table: " + identifier, e);
}
return new DestinationState(destination.getValue(), table);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -290,14 +291,16 @@ public void testRead() throws Exception {
*/
@Test
public void testWrite() {
Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);

// Write with Beam
// Expect the sink to create the table
Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

Table table = catalog.loadTable(TableIdentifier.parse(tableId));
assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));

// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
Expand Down Expand Up @@ -434,22 +437,23 @@ private void writeToDynamicDestinations(

Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());

PartitionSpec partitionSpec = null;
TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a");
TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b");
TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c");
TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d");
TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e");
// the sink doesn't support creating partitioned tables yet,
// so we need to create it manually for this test case
if (partitioning) {
Preconditions.checkState(filterOp == null || !filterOp.equals("only"));
partitionSpec =
PartitionSpec partitionSpec =
PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build();
catalog.createTable(tableIdentifier0, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier1, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier2, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier3, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier4, tableSchema, partitionSpec);
}
Table table0 =
catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec);
Table table1 =
catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec);
Table table2 =
catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec);
Table table3 =
catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec);
Table table4 =
catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec);

// Write with Beam
PCollection<Row> input;
Expand All @@ -467,6 +471,16 @@ private void writeToDynamicDestinations(
input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
pipeline.run().waitUntilFinish();

Table table0 = catalog.loadTable(tableIdentifier0);
Table table1 = catalog.loadTable(tableIdentifier1);
Table table2 = catalog.loadTable(tableIdentifier2);
Table table3 = catalog.loadTable(tableIdentifier3);
Table table4 = catalog.loadTable(tableIdentifier4);

for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
assertTrue(t.schema().sameSchema(tableSchema));
}

// Read back and check records are correct
List<List<Record>> returnedRecords =
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public void testSimpleAppend() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand All @@ -104,6 +101,7 @@ public void testSimpleAppend() throws Exception {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table = warehouse.loadTable(tableId);
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
Expand All @@ -117,11 +115,6 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt);
final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt);

// Create a table and add records to it.
Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA);
Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand Down Expand Up @@ -177,6 +170,10 @@ public IcebergDestination instantiateDestination(String dest) {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table1 = warehouse.loadTable(table1Id);
Table table2 = warehouse.loadTable(table2Id);
Table table3 = warehouse.loadTable(table3Id);

List<Record> writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
List<Record> writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build());
Expand Down Expand Up @@ -320,9 +317,6 @@ public void testStreamingWrite() {
TableIdentifier.of(
"default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand Down Expand Up @@ -365,6 +359,8 @@ public void testStreamingWrite() {
PAssert.that(snapshots).containsInAnyOrder(2L);
testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public void testBuildTransformWithRow() {
public void testSimpleAppend() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);

TableIdentifier tableId = TableIdentifier.parse(identifier);

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> properties = new HashMap<>();
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
Expand Down Expand Up @@ -129,6 +124,9 @@ public void testSimpleAppend() {

testPipeline.run().waitUntilFinish();

TableIdentifier tableId = TableIdentifier.parse(identifier);
Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
Expand All @@ -137,7 +135,6 @@ public void testSimpleAppend() {
@Test
public void testWriteUsingManagedTransform() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
Table table = warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA);

String yamlConfig =
String.format(
Expand All @@ -161,6 +158,7 @@ public void testWriteUsingManagedTransform() {

testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
Expand Down Expand Up @@ -261,9 +259,6 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

org.apache.iceberg.Schema icebergSchema =
IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema);
Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema);
Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema);

TestStream<Row> stream =
TestStream.create(beamSchema)
Expand Down Expand Up @@ -301,6 +296,9 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

testPipeline.run().waitUntilFinish();

Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0));
Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1));
Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2));
List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ public Table createTable(
someTableHasBeenCreated = true;
return catalog.createTable(tableId, schema, partitionSpec);
}

public Table loadTable(TableIdentifier tableId) {
return catalog.loadTable(tableId);
}
}

0 comments on commit e91a3e1

Please # to comment.