Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Issue #4]: wait for the completion of the previous stage in a join. #38

Merged
merged 2 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
// perform aggregation push down.
MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey());
Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio);
IntermediateFileCleaner.Instance().registerStorage(storage);
if (config.isCleanLocalResult())
{
IntermediateFileCleaner.Instance().registerStorage(storage);
}
return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile,
pixelsFooterCache, getLambdaAggrOutput(pixelsSplit), null);
}
Expand All @@ -126,7 +129,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
// perform join push down.
MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey());
Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio);
IntermediateFileCleaner.Instance().registerStorage(storage);
if (config.isCleanLocalResult())
{
IntermediateFileCleaner.Instance().registerStorage(storage);
}
return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile,
pixelsFooterCache, getLambdaJoinOutput(pixelsSplit), null);
}
Expand All @@ -146,7 +152,10 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
}
MinIO.ConfigMinIO(config.getMinioEndpoint(), config.getMinioAccessKey(), config.getMinioSecretKey());
Storage storage = StorageFactory.Instance().getStorage(Storage.Scheme.minio);
IntermediateFileCleaner.Instance().registerStorage(storage);
if (config.isCleanLocalResult())
{
IntermediateFileCleaner.Instance().registerStorage(storage);
}
return new PixelsPageSource(pixelsSplit, pixelsColumns, storage, cacheFile, indexFile,
pixelsFooterCache, getLambdaScanOutput(pixelsSplit, columnsToRead, projection), null);
} else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.etcd.jetcd.KeyValue;
Expand Down Expand Up @@ -72,9 +73,11 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static io.pixelsdb.pixels.executor.lambda.Operator.waitForCompletion;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -197,7 +200,8 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle trans,
// Ensure multi-pipeline join is supported.
JoinOperator joinOperator = (JoinOperator) executor.getRootOperator();
logger.info("join operator: " + JSON.toJSONString(joinOperator));
joinOperator.executePrev();
CompletableFuture<?>[] prevOutputs = joinOperator.executePrev();
waitForCompletion(prevOutputs);

Thread outputCollector = new Thread(() -> {
try
Expand All @@ -206,6 +210,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle trans,
SerializerFeature[] features = new SerializerFeature[]{SerializerFeature.WriteClassName};
String json = JSON.toJSONString(outputCollection, features);
logger.info("join outputs: " + json);
logger.info("cumulated duration " + outputCollection.getCumulativeDurationMs());
} catch (Exception e)
{
logger.error(e, "failed to execute the join plan using pixels-lambda");
Expand Down Expand Up @@ -248,7 +253,8 @@ else if (tableHandle.getTableType() == TableType.AGGREGATED)
transHandle.getTransId(), root, orderedPathEnabled, compactPathEnabled);
AggregationOperator aggrOperator = (AggregationOperator) executor.getRootOperator();
logger.info("aggregation operator: " + JSON.toJSONString(aggrOperator));
aggrOperator.executePrev();
CompletableFuture<?>[] prevOutputs = aggrOperator.executePrev();
waitForCompletion(prevOutputs);

Thread outputCollector = new Thread(() -> {
try
Expand All @@ -257,6 +263,7 @@ else if (tableHandle.getTableType() == TableType.AGGREGATED)
SerializerFeature[] features = new SerializerFeature[]{SerializerFeature.WriteClassName};
String json = JSON.toJSONString(outputCollection, features);
logger.info("aggregation outputs: " + json);
logger.info("cumulated duration " + outputCollection.getCumulativeDurationMs());
} catch (Exception e)
{
logger.error(e, "failed to execute the aggregation plan using pixels-lambda");
Expand Down Expand Up @@ -459,7 +466,7 @@ private JoinedTable parseJoinPlan(PixelsTableHandle tableHandle)
{
joinEndian = JoinAdvisor.Instance().getJoinEndian(leftTable, rightTable);
joinAlgo = JoinAdvisor.Instance().getJoinAlgorithm(leftTable, rightTable, joinEndian);
} catch (MetadataException e)
} catch (MetadataException | InvalidProtocolBufferException e)
{
logger.error("failed to get join algorithm", e);
throw new TrinoException(PixelsErrorCode.PIXELS_METASTORE_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static int getBatchSize()

private String pixelsConfig = null;
private boolean lambdaEnabled = false;
private boolean cleanLocalResult = true;
private int localScanConcurrency = -1;
private String minioOutputFolder = null;
private String minioEndpointIP = null;
Expand Down Expand Up @@ -149,6 +150,13 @@ public PixelsTrinoConfig setLambdaEnabled(boolean enabled)
return this;
}

@Config("clean.local.result")
public PixelsTrinoConfig setCleanLocalResult(boolean cleanLocalResult)
{
this.cleanLocalResult = cleanLocalResult;
return this;
}

@Config("local.scan.concurrency")
public PixelsTrinoConfig setLocalScanConcurrency(int concurrency)
{
Expand Down Expand Up @@ -204,6 +212,11 @@ public int getLocalScanConcurrency()
return localScanConcurrency;
}

public boolean isCleanLocalResult()
{
return cleanLocalResult;
}

@NotNull
public String getMinioOutputFolder()
{
Expand Down
1 change: 1 addition & 0 deletions connector/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pixels.config=/home/pixels/opt/pixels/pixels.properties
# serverless config
lambda.enabled=false
local.scan.concurrency=40
clean.local.result=true
minio.output.folder=pixels-lambda/output/
minio.endpoint.port=9000
minio.access.key=lambda
Expand Down