Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Managed Iceberg] Create tables if needed #32686

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 2
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -106,6 +107,14 @@
* <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
* for Iceberg writes.</b>
*
* <h3>Creating Tables</h3>
*
* <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
* create one with the data's schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
* Some implementations may not support creating a table using the Iceberg API.
*
* <h3>Beam Rows</h3>
*
* <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
Expand All @@ -41,7 +43,13 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
Expand All @@ -66,6 +74,7 @@
* #getSerializableDataFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class);
/**
* Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per
* partition and manages them in a {@link Cache}.
Expand All @@ -79,8 +88,8 @@ class DestinationState {
private final PartitionKey partitionKey;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();

DestinationState(IcebergDestination icebergDestination, Table table) {
Expand Down Expand Up @@ -186,6 +195,8 @@ private RecordWriter createWriter(PartitionKey partitionKey) {

private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
private static final Cache<TableIdentifier, Table> TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;

Expand All @@ -196,6 +207,40 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
this.maxNumWriters = maxNumWriters;
}

/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Although it is expected, some implementations may not support creating a table
* using the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
}
}
TABLE_CACHE.put(identifier, table);
}
return table;
}

/**
* Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the
* record.
Expand All @@ -208,7 +253,16 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
destinations.computeIfAbsent(
icebergDestination,
destination -> {
Table table = catalog.loadTable(destination.getValue().getTableIdentifier());
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table;
try {
table =
TABLE_CACHE.get(
identifier, () -> getOrCreateTable(identifier, row.getSchema()));
} catch (ExecutionException e) {
throw new RuntimeException(
"Error while fetching or creating table: " + identifier, e);
}
return new DestinationState(destination.getValue(), table);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -290,14 +291,16 @@ public void testRead() throws Exception {
*/
@Test
public void testWrite() {
Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);

// Write with Beam
// Expect the sink to create the table
Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

Table table = catalog.loadTable(TableIdentifier.parse(tableId));
assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));

// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
Expand Down Expand Up @@ -434,22 +437,23 @@ private void writeToDynamicDestinations(

Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());

PartitionSpec partitionSpec = null;
TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a");
TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b");
TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c");
TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d");
TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e");
// the sink doesn't support creating partitioned tables yet,
// so we need to create it manually for this test case
if (partitioning) {
Preconditions.checkState(filterOp == null || !filterOp.equals("only"));
partitionSpec =
PartitionSpec partitionSpec =
PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build();
catalog.createTable(tableIdentifier0, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier1, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier2, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier3, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier4, tableSchema, partitionSpec);
}
Table table0 =
catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec);
Table table1 =
catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec);
Table table2 =
catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec);
Table table3 =
catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec);
Table table4 =
catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec);

// Write with Beam
PCollection<Row> input;
Expand All @@ -467,6 +471,16 @@ private void writeToDynamicDestinations(
input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
pipeline.run().waitUntilFinish();

Table table0 = catalog.loadTable(tableIdentifier0);
Table table1 = catalog.loadTable(tableIdentifier1);
Table table2 = catalog.loadTable(tableIdentifier2);
Table table3 = catalog.loadTable(tableIdentifier3);
Table table4 = catalog.loadTable(tableIdentifier4);

for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
assertTrue(t.schema().sameSchema(tableSchema));
}

// Read back and check records are correct
List<List<Record>> returnedRecords =
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public void testSimpleAppend() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand All @@ -104,6 +101,7 @@ public void testSimpleAppend() throws Exception {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table = warehouse.loadTable(tableId);
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
Expand All @@ -117,11 +115,6 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt);
final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt);

// Create a table and add records to it.
Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA);
Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand Down Expand Up @@ -177,6 +170,10 @@ public IcebergDestination instantiateDestination(String dest) {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table1 = warehouse.loadTable(table1Id);
Table table2 = warehouse.loadTable(table2Id);
Table table3 = warehouse.loadTable(table3Id);

List<Record> writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
List<Record> writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build());
Expand Down Expand Up @@ -320,9 +317,6 @@ public void testStreamingWrite() {
TableIdentifier.of(
"default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
Expand Down Expand Up @@ -365,6 +359,8 @@ public void testStreamingWrite() {
PAssert.that(snapshots).containsInAnyOrder(2L);
testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public void testBuildTransformWithRow() {
public void testSimpleAppend() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);

TableIdentifier tableId = TableIdentifier.parse(identifier);

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> properties = new HashMap<>();
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
Expand Down Expand Up @@ -129,6 +124,9 @@ public void testSimpleAppend() {

testPipeline.run().waitUntilFinish();

TableIdentifier tableId = TableIdentifier.parse(identifier);
Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
Expand All @@ -137,7 +135,6 @@ public void testSimpleAppend() {
@Test
public void testWriteUsingManagedTransform() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
Table table = warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA);

String yamlConfig =
String.format(
Expand All @@ -161,6 +158,7 @@ public void testWriteUsingManagedTransform() {

testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
Expand Down Expand Up @@ -261,9 +259,6 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

org.apache.iceberg.Schema icebergSchema =
IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema);
Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema);
Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema);

TestStream<Row> stream =
TestStream.create(beamSchema)
Expand Down Expand Up @@ -301,6 +296,9 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

testPipeline.run().waitUntilFinish();

Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0));
Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1));
Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2));
List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ public Table createTable(
someTableHasBeenCreated = true;
return catalog.createTable(tableId, schema, partitionSpec);
}

public Table loadTable(TableIdentifier tableId) {
return catalog.loadTable(tableId);
}
}
Loading