From 7d3fdf3d46f29651947bf5ea4f441b5e1c3d3cda Mon Sep 17 00:00:00 2001
From: Tobias Puetz
Date: Tue, 18 Feb 2025 20:18:03 +0100
Subject: [PATCH] Add Lakekeeper integration test

---
 .../plugin/iceberg/IcebergQueryRunner.java    |  36 ++
 ...rgLakekeeperCatalogConnectorSmokeTest.java | 416 ++++++++++++++++++
 .../rest/TestingLakekeeperCatalog.java        | 236 ++++++++++
 3 files changed, 688 insertions(+)
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergLakekeeperCatalogConnectorSmokeTest.java
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingLakekeeperCatalog.java

diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
index 8e4be916cc5f..92ebad9ed072 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
@@ -24,6 +24,7 @@
 import io.trino.plugin.hive.containers.Hive3MinioDataLake;
 import io.trino.plugin.hive.containers.HiveHadoop;
 import io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer;
+import io.trino.plugin.iceberg.catalog.rest.TestingLakekeeperCatalog;
 import io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog;
 import io.trino.plugin.iceberg.containers.NessieContainer;
 import io.trino.plugin.iceberg.containers.UnityCatalogContainer;
@@ -226,6 +227,41 @@ public static void main(String[] args)
         }
     }
 
+    public static final class IcebergLakekeeperQueryRunnerMain
+    {
+        private IcebergLakekeeperQueryRunnerMain() {}
+
+        public static void main(String[] args)
+                throws Exception
+        {
+            TestingLakekeeperCatalog lakekeeperCatalog = new TestingLakekeeperCatalog();
+            Logger log = Logger.get(IcebergLakekeeperQueryRunnerMain.class);
+            try {
+                @SuppressWarnings("resource")
+                QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
+                        .addIcebergProperty("iceberg.catalog.type", "rest")
+                        // Lakekeeper serves the Iceberg REST API under the /catalog path
+                        .addIcebergProperty("iceberg.rest-catalog.uri", lakekeeperCatalog.restUri() + "/catalog")
+                        .addIcebergProperty("iceberg.rest-catalog.warehouse", TestingLakekeeperCatalog.DEFAULT_WAREHOUSE)
+                        .addIcebergProperty("iceberg.rest-catalog.vended-credentials-enabled", "true")
+                        .addIcebergProperty("s3.endpoint", lakekeeperCatalog.externalMinioAddress())
+                        .addIcebergProperty("fs.native-s3.enabled", "true")
+                        // MinIO does not validate the region, but the S3 client requires one to be set
+                        .addIcebergProperty("s3.region", "dummy")
+                        .addIcebergProperty("s3.path-style-access", "true")
+                        .setInitialTables(TpchTable.getTables())
+                        .build();
+                log.info("======== SERVER STARTED ========");
+                log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+            }
+            catch (Exception e) {
+                log.error("Error starting server: \n%s", lakekeeperCatalog.getLogs());
+                // tear the containers down only on failure; on success they must outlive main()
+                // so the development server keeps a working catalog
+                lakekeeperCatalog.close();
+                throw e;
+            }
+        }
+    }
+
     public static final class IcebergPolarisQueryRunnerMain
     {
         private IcebergPolarisQueryRunnerMain() {}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergLakekeeperCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergLakekeeperCatalogConnectorSmokeTest.java
new file mode 100644
index 000000000000..265ae912cb4c
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergLakekeeperCatalogConnectorSmokeTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergConnector; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.FileIO; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Isolated; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; +import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static java.lang.String.format; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@Isolated // TODO remove +@TestInstance(PER_CLASS) +final class TestIcebergLakekeeperCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private TestingLakekeeperCatalog lakekeeperCatalog; + + public TestIcebergLakekeeperCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_RENAME_SCHEMA -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + lakekeeperCatalog = closeAfterClass(new TestingLakekeeperCatalog("125ms")); + + return IcebergQueryRunner.builder() + .addIcebergProperty("iceberg.file-format", format.name()) + .addIcebergProperty("iceberg.register-table-procedure.enabled", "true") + .addIcebergProperty("iceberg.rest-catalog.vended-credentials-enabled", "false") + .addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB") + .addIcebergProperty("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max") + .addIcebergProperty("iceberg.catalog.type", "rest") + 
.addIcebergProperty("iceberg.rest-catalog.nested-namespace-enabled", "true") + .addIcebergProperty("iceberg.rest-catalog.uri", lakekeeperCatalog.restUri() + "/catalog") + .addIcebergProperty("iceberg.rest-catalog.warehouse", TestingLakekeeperCatalog.DEFAULT_WAREHOUSE) + .addIcebergProperty("s3.endpoint", lakekeeperCatalog.externalMinioAddress()) + .addIcebergProperty("fs.native-s3.enabled", "true") + .addIcebergProperty("s3.region", "dummy") + .addIcebergProperty("s3.path-style-access", "true") + // we need to hard-code this here since vended-credentials-enabled cannot be true while register-table-procedure is true + // and we need register-table-procedure to be true since the tests. + .addIcebergProperty("s3.aws-access-key", MINIO_ACCESS_KEY) + .addIcebergProperty("s3.aws-secret-key", MINIO_SECRET_KEY) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + lakekeeperCatalog.dropWithoutPurge(getSession().getSchema().orElseThrow(), tableName); + } + + @Override + protected String getMetadataLocation(String tableName) + { + TrinoCatalogFactory catalogFactory = ((IcebergConnector) getQueryRunner().getCoordinator().getConnector("iceberg")).getInjector().getInstance(TrinoCatalogFactory.class); + TrinoCatalog trinoCatalog = catalogFactory.create(getSession().getIdentity().toConnectorIdentity()); + BaseTable table = trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName)); + return table.operations().current().metadataFileLocation(); + } + + @Override + @Test + @Disabled("Disabled since unregister -> register is currently unsupported by lakekeeper due to defaulting to purgeRequested = true.") + public void testUnregisterTable() + { + } + + @Override + @Test + @Disabled("Disabled since unregister -> register is currently unsupported by lakekeeper due to defaulting to purgeRequested = true.") + public void testRepeatUnregisterTable() + { + } + + @Override + protected String schemaPath() + { + return format("s3://%s/%s", TestingLakekeeperCatalog.DEFAULT_BUCKET, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + try { + // we're using s3 which doesn't have the notation of a directory, + // so we just check if there are any files + return fileSystem.listFiles(Location.of(location)).hasNext(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + if (format == PARQUET) { + return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); + } + return checkOrcFileSorting(fileSystem, path, sortColumnName); + } + + @Override + protected void deleteDirectory(String location) + { + try { + fileSystem.deleteDirectory(Location.of(location)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Test + void testNestedNamespace() + { + String parentNamespace = "level1_" + randomNameSuffix(); + String nestedNamespace = parentNamespace + ".level2_" + randomNameSuffix(); + + assertUpdate("CREATE SCHEMA " + parentNamespace); + assertUpdate("CREATE SCHEMA \"" + nestedNamespace + "\""); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()) + .contains(parentNamespace, nestedNamespace); + + assertUpdate("DROP SCHEMA \"" + nestedNamespace + "\""); + assertUpdate("DROP SCHEMA " + parentNamespace); + } + + @Test + @Override + public void 
+    @Test
+    @Override
+    public void testMaterializedView()
+    {
+        assertThatThrownBy(super::testMaterializedView)
+                .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog");
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertThatThrownBy(super::testRenameSchema)
+                .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog");
+    }
+
+    @Test
+    @Override
+    public void testCreateTableWithTrailingSpaceInLocation()
+    {
+        assertThatThrownBy(super::testCreateTableWithTrailingSpaceInLocation).hasStackTraceContaining("Trailing whitespaces are forbidden");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithTrailingSpaceInLocation()
+    {
+        assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation).hasStackTraceContaining("Trailing whitespaces are forbidden");
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        String schemaName = getSession().getSchema().orElseThrow();
+        assertThat((String) computeScalar("SHOW CREATE TABLE region"))
+                .matches("CREATE TABLE iceberg." + schemaName + ".region \\(\n" +
+                        "   regionkey bigint,\n" +
+                        "   name varchar,\n" +
+                        "   comment varchar\n" +
+                        "\\)\n" +
+                        "WITH \\(\n" +
+                        "   format = '" + format.name() + "',\n" +
+                        "   format_version = 2,\n" +
+                        // Lakekeeper uses the namespace ID instead of the schema name in the location
+                        "   location = 's3://" + TestingLakekeeperCatalog.DEFAULT_BUCKET + "/.*/region.*',\n" +
+                        "   max_commit_retry = 4\n" +
+                        "\\)");
+    }
+
+    // Lakekeeper drops tables asynchronously, so we need to wait for the drop to complete
+    @Override
+    @Test
+    public void testDropTableWithMissingMetadataFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_metadata_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        Location metadataLocation = Location.of(getMetadataLocation(tableName));
+        Location tableLocation = Location.of(getTableLocation(tableName));
+
+        // Delete current metadata file
+        fileSystem.deleteFile(metadataLocation);
+        assertThat(fileSystem.newInputFile(metadataLocation).exists())
+                .describedAs("Current metadata file should not exist")
+                .isFalse();
+
+        // try to drop table
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+
+        awaitTaskQueueCleanup();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs(format("Table location: %s should not exist", tableLocation))
+                .isFalse();
+    }
+
+    // Lakekeeper drops tables asynchronously, so we need to wait for the drop to complete
+    @Override
+    @Test
+    public void testDropTableWithMissingManifestListFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_manifest_list_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        String metadataLocation = getMetadataLocation(tableName);
+        FileIO fileIo = new ForwardingFileIo(fileSystem);
+        TableMetadata tableMetadata = TableMetadataParser.read(fileIo, metadataLocation);
+        Location tableLocation = Location.of(tableMetadata.location());
+        Location manifestListFile = Location.of(tableMetadata.currentSnapshot().allManifests(fileIo).getFirst().path());
+
+        // Delete Manifest List file
+        fileSystem.deleteFile(manifestListFile);
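+        // reading the table's manifests would now fail, but DROP TABLE must still succeed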
+        assertThat(fileSystem.newInputFile(manifestListFile).exists())
+                .describedAs("Manifest list file should not exist")
+                .isFalse();
+
+        // try to drop table
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+
+        awaitTaskQueueCleanup();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs(format("Table location: %s should not exist", tableLocation))
+                .isFalse();
+    }
+
+    // Lakekeeper drops tables asynchronously, so we need to wait for the drop to complete
+    @Override
+    @Test
+    public void testDropTableWithMissingDataFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_data_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'POLAND')", 1);
+
+        Location tableLocation = Location.of(getTableLocation(tableName));
+        Location tableDataPath = tableLocation.appendPath("data");
+        FileIterator fileIterator = fileSystem.listFiles(tableDataPath);
+        assertThat(fileIterator.hasNext()).isTrue();
+        Location dataFile = fileIterator.next().location();
+
+        // Delete data file
+        fileSystem.deleteFile(dataFile);
+        assertThat(fileSystem.newInputFile(dataFile).exists())
+                .describedAs("Data file should not exist")
+                .isFalse();
+
+        // try to drop table
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+
+        awaitTaskQueueCleanup();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs(format("Table location: %s should not exist", tableLocation))
+                .isFalse();
+    }
+
+    // Lakekeeper drops tables asynchronously, so we need to wait for the drop to complete
+    @Test
+    @Override
+    public void testDropTableWithMissingSnapshotFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_snapshot_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        String metadataLocation = getMetadataLocation(tableName);
+        TableMetadata tableMetadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), metadataLocation);
+        Location tableLocation = Location.of(tableMetadata.location());
+        Location currentSnapshotFile = Location.of(tableMetadata.currentSnapshot().manifestListLocation());
+
+        // Delete current snapshot file
+        fileSystem.deleteFile(currentSnapshotFile);
+        assertThat(fileSystem.newInputFile(currentSnapshotFile).exists())
+                .describedAs("Current snapshot file should not exist")
+                .isFalse();
+
+        // try to drop table
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+
+        awaitTaskQueueCleanup();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs(format("Table location: %s should not exist", tableLocation))
+                .isFalse();
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithDroppedTable()
+    {
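+        // Lakekeeper purges table data asynchronously after DROP TABLE, so register_table
+        // must fail once the cleanup task has removed the metadata files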
+        String tableName = "test_register_table_with_dropped_table_" + randomNameSuffix();
+
+        assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName));
+        assertUpdate(format("INSERT INTO %s VALUES (1, 'INDIA', true)", tableName), 1);
+
+        String tableLocation = getTableLocation(tableName);
+        String tableNameNew = tableName + "_new";
+        // Drop table to verify register_table call fails when no metadata can be found (table doesn't exist)
+        assertUpdate(format("DROP TABLE %s", tableName));
+
+        awaitTaskQueueCleanup();
+
+        assertQueryFails(format("CALL system.register_table (CURRENT_SCHEMA, '%s', '%s')", tableNameNew, tableLocation),
+                ".*No versioned metadata file exists at location.*");
+    }
+
+    @Test
+    @Override
+    @Disabled("https://github.com/trinodb/trino/issues/23941")
+    public void testDeleteRowsConcurrently()
+    {
+    }
+
+    @Test
+    @Override
+    public void testUnregisterTableNotExistingTable()
+    {
+        assertThatThrownBy(super::testUnregisterTableNotExistingTable).hasStackTraceContaining("Forbidden: Table action can_drop forbidden for Anonymous");
+    }
+
+    private static void awaitTaskQueueCleanup()
+    {
+        // wait for Lakekeeper's task queue (polling every 125ms in this test) to run the async cleanup job
+        try {
+            Thread.sleep(1100);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingLakekeeperCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingLakekeeperCatalog.java
new file mode 100644
index 000000000000..ef2190d66f60
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingLakekeeperCatalog.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.Request; +import io.airlift.http.client.StatusResponseHandler; +import io.airlift.http.client.jetty.JettyHttpClient; +import io.airlift.json.JsonCodec; +import org.intellij.lang.annotations.Language; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.Network.NetworkImpl; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.Closeable; +import java.net.URI; +import java.util.Map; + +import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; +import static io.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; +import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; +import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; +import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +public final class TestingLakekeeperCatalog + implements Closeable +{ + public static final String DEFAULT_BUCKET = "examples"; + public static final String DEFAULT_WAREHOUSE = "lakekeeper"; + public static final String DEFAULT_PROJECT_ID = "00000000-0000-0000-0000-000000000000"; + + public static final String LAKEKEEPER_DB_URL_WRITE_KEY = "LAKEKEEPER__PG_DATABASE_URL_WRITE"; + private static final String LAKEKEEPER_DB_URL_READ_KEY = "LAKEKEEPER__PG_DATABASE_URL_READ"; + public static final String LAKEKEEPER_PG_ENCRYPTION_KEY = "LAKEKEEPER__PG_ENCRYPTION_KEY"; + public static final String PG_ENCRYPTION_KEY = "This-is-NOT-Secure!"; + public static final String POSTGRES_IMAGE = "postgres:16"; + + public static final String LAKEKEEPER_CATALOG_IMAGE = "quay.io/lakekeeper/catalog:v0.7.0"; + + private static final int MINIO_PORT = 9000; + private static final String MINIO_ALIAS = "minio-1"; + private static final int LAKEKEEPER_PORT = 8181; + private static final HttpClient HTTP_CLIENT = new JettyHttpClient(); + + private final String prefix; + private final GenericContainer lakekeeperCatalog; + private final GenericContainer migrator; + private final PostgreSQLContainer postgresqlContainer; + private final GenericContainer minio; + private final Network network; + + public TestingLakekeeperCatalog() + { + this("1s"); + } + + public TestingLakekeeperCatalog(String taskQueuePollInterval) + { + network = NetworkImpl.SHARED; + + postgresqlContainer = new PostgreSQLContainer<>(POSTGRES_IMAGE); + postgresqlContainer.withNetwork(network).start(); + + String pgConnectionString = String.format("postgresql://%s:%s@%s:%d/%s", + postgresqlContainer.getUsername(), + postgresqlContainer.getPassword(), + postgresqlContainer.getContainerName().substring(1), + PostgreSQLContainer.POSTGRESQL_PORT, + postgresqlContainer.getDatabaseName()); + + migrator = new GenericContainer<>(LAKEKEEPER_CATALOG_IMAGE); + migrator.withEnv(LAKEKEEPER_PG_ENCRYPTION_KEY, PG_ENCRYPTION_KEY) + .withEnv(LAKEKEEPER_DB_URL_READ_KEY, pgConnectionString) + .withEnv(LAKEKEEPER_DB_URL_WRITE_KEY, pgConnectionString).withNetwork(postgresqlContainer.getNetwork()) + .withCommand("migrate").start(); + + minio = new GenericContainer<>("bitnami/minio:2025.2.18"); + minio.withEnv(Map.of("MINIO_ROOT_USER", MINIO_ACCESS_KEY, + "MINIO_ROOT_PASSWORD", MINIO_SECRET_KEY, + 
"MINIO_API_PORT_NUMBER", "%d".formatted(MINIO_PORT), + "MINIO_CONSOLE_PORT_NUMBER", "9001", + "MINIO_SCHEME", "http", + "MINIO_DEFAULT_BUCKETS", DEFAULT_BUCKET)) + .withNetwork(postgresqlContainer.getNetwork()) + .withNetworkAliases(MINIO_ALIAS) + .waitingFor(Wait.forSuccessfulCommand("mc ls local | grep examples")) + .withExposedPorts(MINIO_PORT, 9001).start(); + + lakekeeperCatalog = new GenericContainer<>(LAKEKEEPER_CATALOG_IMAGE); + lakekeeperCatalog.withEnv(Map.of(LAKEKEEPER_PG_ENCRYPTION_KEY, PG_ENCRYPTION_KEY, + LAKEKEEPER_DB_URL_READ_KEY, pgConnectionString, + LAKEKEEPER_DB_URL_WRITE_KEY, pgConnectionString, + "LAKEKEEPER__QUEUE_CONFIG__POLL_INTERVAL", String.format("\"%s\"", taskQueuePollInterval))) + .withNetwork(postgresqlContainer.getNetwork()) + .withCommand("serve").withExposedPorts(LAKEKEEPER_PORT) + .withExposedPorts(8181); + try { + lakekeeperCatalog.start(); + bootstrapServer(); + createCatalog(); + prefix = fetchPrefix(); + } + catch (Exception e) { + System.err.println(lakekeeperCatalog.getLogs()); + close(); + throw e; + } + } + + public void dropWithoutPurge(String schema, String table) + { + Request request = Request.Builder.prepareDelete() + .setUri(URI.create(restUri() + "/catalog/v1/" + prefix + "/namespaces/" + schema + "/tables/" + table + "?purgeRequested=false")) + .setHeader("Content-Type", "application/json") + .build(); + StatusResponseHandler.StatusResponse sr = HTTP_CLIENT.execute(request, createStatusResponseHandler()); + if (sr.getStatusCode() != 204) { + throw new RuntimeException("Failed to drop table. Status: " + sr.getStatusCode()); + } + } + + public String getLogs() + { + return lakekeeperCatalog.getLogs(); + } + + public String restUri() + { + return "http://%s:%s".formatted(lakekeeperCatalog.getHost(), lakekeeperCatalog.getMappedPort(LAKEKEEPER_PORT)); + } + + @Override + public void close() + { + migrator.close(); + lakekeeperCatalog.close(); + postgresqlContainer.close(); + minio.close(); + network.close(); + } + + public String externalMinioAddress() + { + return "http://localhost:%d".formatted(minioPort()); + } + + private int minioPort() + { + return this.minio.getMappedPort(MINIO_PORT); + } + + private String fetchPrefix() + { + Request request = Request.Builder.prepareGet().setUri(URI.create(restUri() + "/catalog/v1/config?warehouse=" + DEFAULT_WAREHOUSE)).build(); + Map resp = HTTP_CLIENT.execute(request, createJsonResponseHandler(JsonCodec.mapJsonCodec(String.class, Object.class))); + @SuppressWarnings("unchecked") + Map overrides = ((Map) resp.get("overrides")); + return requireNonNull(overrides.get("prefix")); + } + + private void bootstrapServer() + { + @Language("JSON") + String body = "{\"accept-terms-of-use\": true}"; + Request request = Request.Builder.preparePost() + .setUri(URI.create(restUri() + "/management/v1/bootstrap")) + .setHeader("Content-Type", "application/json") + .setBodyGenerator(createStaticBodyGenerator(body, UTF_8)) + .build(); + HTTP_CLIENT.execute(request, createStatusResponseHandler()); + } + + private void createCatalog() + { + String body = JsonCodec.jsonCodec(Map.class).toJson(createWarehousePayload()); + + Request request = Request.Builder.preparePost() + .setUri(URI.create(restUri() + "/management/v1/warehouse")) + .setHeader("Content-Type", "application/json") + .setBodyGenerator(createStaticBodyGenerator(body, UTF_8)) + .build(); + + StatusResponseHandler.StatusResponse response = HTTP_CLIENT.execute( + request, + createStatusResponseHandler()); + + if (response.getStatusCode() != 201) { + throw new 
RuntimeException("Failed to create catalog. Status: " + response.getStatusCode()); + } + } + + private static Map createWarehousePayload() + { + return Map.of( + "warehouse-name", DEFAULT_WAREHOUSE, + "project-id", DEFAULT_PROJECT_ID, + "storage-profile", storageProfile(), + "storage-credential", storageCredential()); + } + + private static Map storageProfile() + { + return Map.of( + "type", "s3", + "bucket", DEFAULT_BUCKET, + "endpoint", "http://%s:%d".formatted(MINIO_ALIAS, MINIO_PORT), + "region", "local-01", + "path-style-access", true, + "flavor", "s3-compat", + "sts-enabled", true); + } + + private static Map storageCredential() + { + return Map.of( + "type", "s3", + "credential-type", "access-key", + "aws-access-key-id", MINIO_ACCESS_KEY, + "aws-secret-access-key", MINIO_SECRET_KEY); + } +}