From c25172a4c68861b062904fd67872c7a07808ce49 Mon Sep 17 00:00:00 2001 From: Haoqiong Bian Date: Sun, 17 Jul 2022 18:23:07 +0200 Subject: [PATCH 1/2] wait the prev stages to complete. --- .../java/io/pixelsdb/pixels/trino/PixelsSplitManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java index dbdf01a..6bfc2bf 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java @@ -72,9 +72,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static io.pixelsdb.pixels.executor.lambda.Operator.waitForCompletion; import static java.util.Objects.requireNonNull; /** @@ -197,7 +199,8 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle trans, // Ensure multi-pipeline join is supported. JoinOperator joinOperator = (JoinOperator) executor.getRootOperator(); logger.info("join operator: " + JSON.toJSONString(joinOperator)); - joinOperator.executePrev(); + CompletableFuture[] prevOutputs = joinOperator.executePrev(); + waitForCompletion(prevOutputs); Thread outputCollector = new Thread(() -> { try @@ -248,7 +251,8 @@ else if (tableHandle.getTableType() == TableType.AGGREGATED) transHandle.getTransId(), root, orderedPathEnabled, compactPathEnabled); AggregationOperator aggrOperator = (AggregationOperator) executor.getRootOperator(); logger.info("aggregation operator: " + JSON.toJSONString(aggrOperator)); - aggrOperator.executePrev(); + CompletableFuture[] prevOutputs = aggrOperator.executePrev(); + waitForCompletion(prevOutputs); Thread outputCollector = new Thread(() -> { try From a035275ff35b23b786e1c782f91f6f2cdb33e6b2 Mon Sep 17 00:00:00 2001 From: Haoqiong Bian Date: Thu, 21 Jul 2022 18:54:53 +0200 Subject: [PATCH 2/2] make clean local result files configurable. --- .../pixels/trino/PixelsPageSourceProvider.java | 15 ++++++++++++--- .../pixelsdb/pixels/trino/PixelsSplitManager.java | 5 ++++- .../pixels/trino/impl/PixelsTrinoConfig.java | 13 +++++++++++++ connector/src/main/resources/pixels.properties | 1 + 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java index b260803..9f5837e 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java @@ -117,7 +117,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti // perform aggregation push down. MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey()); Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio); - IntermediateFileCleaner.Instance().registerStorage(storage); + if (config.isCleanLocalResult()) + { + IntermediateFileCleaner.Instance().registerStorage(storage); + } return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile, pixelsFooterCache, getLambdaAggrOutput(pixelsSplit), null); } @@ -126,7 +129,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti // perform join push down. MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey()); Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio); - IntermediateFileCleaner.Instance().registerStorage(storage); + if (config.isCleanLocalResult()) + { + IntermediateFileCleaner.Instance().registerStorage(storage); + } return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile, pixelsFooterCache, getLambdaJoinOutput(pixelsSplit), null); } @@ -146,7 +152,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti } MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey()); Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio); - IntermediateFileCleaner.Instance().registerStorage(storage); + if (config.isCleanLocalResult()) + { + IntermediateFileCleaner.Instance().registerStorage(storage); + } return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile, pixelsFooterCache, getLambdaScanOutput(pixelsSplit, columnsToRead, projection), null); } else diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java index 6bfc2bf..3163fe3 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java @@ -23,6 +23,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.InvalidProtocolBufferException; import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.etcd.jetcd.KeyValue; @@ -209,6 +210,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle trans, SerializerFeature[] features = new SerializerFeature[]{SerializerFeature.WriteClassName}; String json = JSON.toJSONString(outputCollection, features); logger.info("join outputs: " + json); + logger.info("cumulated duration " + outputCollection.getCumulativeDurationMs()); } catch (Exception e) { logger.error(e, "failed to execute the join plan using pixels-lambda"); @@ -261,6 +263,7 @@ else if (tableHandle.getTableType() == TableType.AGGREGATED) SerializerFeature[] features = new SerializerFeature[]{SerializerFeature.WriteClassName}; String json = JSON.toJSONString(outputCollection, features); logger.info("aggregation outputs: " + json); + logger.info("cumulated duration " + outputCollection.getCumulativeDurationMs()); } catch (Exception e) { logger.error(e, "failed to execute the aggregation plan using pixels-lambda"); @@ -463,7 +466,7 @@ private JoinedTable parseJoinPlan(PixelsTableHandle tableHandle) { joinEndian = JoinAdvisor.Instance().getJoinEndian(leftTable, rightTable); joinAlgo = JoinAdvisor.Instance().getJoinAlgorithm(leftTable, rightTable, joinEndian); - } catch (MetadataException e) + } catch (MetadataException | InvalidProtocolBufferException e) { logger.error("failed to get join algorithm", e); throw new TrinoException(PixelsErrorCode.PIXELS_METASTORE_ERROR, e); diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java b/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java index 8c0753a..091a6f1 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java @@ -51,6 +51,7 @@ public static int getBatchSize() private String pixelsConfig = null; private boolean lambdaEnabled = false; + private boolean cleanLocalResult = true; private int localScanConcurrency = -1; private String minioOutputFolder = null; private String minioEndpointIP = null; @@ -149,6 +150,13 @@ public PixelsTrinoConfig setLambdaEnabled(boolean enabled) return this; } + @Config("clean.local.result") + public PixelsTrinoConfig setCleanLocalResult(boolean cleanLocalResult) + { + this.cleanLocalResult = cleanLocalResult; + return this; + } + @Config("local.scan.concurrency") public PixelsTrinoConfig setLocalScanConcurrency(int concurrency) { @@ -204,6 +212,11 @@ public int getLocalScanConcurrency() return localScanConcurrency; } + public boolean isCleanLocalResult() + { + return cleanLocalResult; + } + @NotNull public String getMinioOutputFolder() { diff --git a/connector/src/main/resources/pixels.properties b/connector/src/main/resources/pixels.properties index c8d90b0..0435178 100755 --- a/connector/src/main/resources/pixels.properties +++ b/connector/src/main/resources/pixels.properties @@ -10,6 +10,7 @@ pixels.config=/home/pixels/opt/pixels/pixels.properties # serverless config lambda.enabled=false local.scan.concurrency=40 +clean.local.result=true minio.output.folder=pixels-lambda/output/ minio.endpoint.port=9000 minio.access.key=lambda