diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsConnector.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsConnector.java index d308995..43d1dee 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsConnector.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsConnector.java @@ -25,7 +25,9 @@ import io.pixelsdb.pixels.common.transaction.QueryTransInfo; import io.pixelsdb.pixels.common.transaction.TransContext; import io.pixelsdb.pixels.common.transaction.TransService; +import io.pixelsdb.pixels.optimizer.queue.QueryQueues; import io.pixelsdb.pixels.trino.exception.PixelsErrorCode; +import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy; import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig; import io.pixelsdb.pixels.trino.properties.PixelsSessionProperties; import io.pixelsdb.pixels.trino.properties.PixelsTableProperties; @@ -36,13 +38,17 @@ import javax.inject.Inject; import java.util.List; +import java.util.concurrent.TimeUnit; + import static java.util.Objects.requireNonNull; -public class PixelsConnector implements Connector { +public class PixelsConnector implements Connector +{ private static final Logger logger = Logger.get(PixelsConnector.class); + private final PixelsConnectorId connectorId; private final LifeCycleManager lifeCycleManager; - private final PixelsMetadata metadata; + private final PixelsMetadataProxy metadataProxy; private final PixelsSplitManager splitManager; private final boolean recordCursorEnabled; private final PixelsPageSourceProvider pageSourceProvider; @@ -54,16 +60,19 @@ public class PixelsConnector implements Connector { @Inject public PixelsConnector( + PixelsConnectorId connectorId, LifeCycleManager lifeCycleManager, - PixelsMetadata metadata, + PixelsMetadataProxy metadataProxy, PixelsSplitManager splitManager, PixelsTrinoConfig config, PixelsPageSourceProvider pageSourceProvider, PixelsRecordSetProvider recordSetProvider, PixelsSessionProperties sessionProperties, - PixelsTableProperties tableProperties) { + PixelsTableProperties tableProperties) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); - this.metadata = requireNonNull(metadata, "metadata is null"); + this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null"); this.splitManager = requireNonNull(splitManager, "splitManager is null"); this.pageSourceProvider = requireNonNull(pageSourceProvider, "recordSetProvider is null"); this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); @@ -93,7 +102,29 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_SERVICE_ERROR, e); } TransContext.Instance().beginQuery(info); - return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp()); + QueryQueues.ExecutorType executorType; + if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.AUTO) + { + while ((executorType = QueryQueues.Instance().Enqueue(info.getQueryId())) == QueryQueues.ExecutorType.None) + { + try + { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) + { + logger.error("Interrupted while waiting for enqueue the query id."); + } + } + } + else if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.ON) + { + executorType = QueryQueues.ExecutorType.Lambda; + } + else + { + executorType = QueryQueues.ExecutorType.Cluster; + } + return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp(), executorType); } @Override @@ -102,6 +133,7 @@ public void commit(ConnectorTransactionHandle transactionHandle) if (transactionHandle instanceof PixelsTransactionHandle) { PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle; + QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType()); TransContext.Instance().commitQuery(handle.getTransId()); cleanIntermediatePathForQuery(handle.getTransId()); } else @@ -117,8 +149,12 @@ public void rollback(ConnectorTransactionHandle transactionHandle) if (transactionHandle instanceof PixelsTransactionHandle) { PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle; + QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType()); TransContext.Instance().rollbackQuery(handle.getTransId()); - cleanIntermediatePathForQuery(handle.getTransId()); + if (handle.getExecutorType() == QueryQueues.ExecutorType.Lambda) + { + cleanIntermediatePathForQuery(handle.getTransId()); + } } else { throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_HANDLE_TYPE_ERROR, @@ -128,26 +164,23 @@ public void rollback(ConnectorTransactionHandle transactionHandle) private void cleanIntermediatePathForQuery(long queryId) { - if (config.isLambdaEnabled()) + try { - try + if (config.isCleanLocalResult()) { - if (config.isCleanLocalResult()) - { - IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId)); - } - } catch (InterruptedException e) - { - throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR, - "Failed to clean intermediate path for the query"); + IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId)); } + } catch (InterruptedException e) + { + throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR, + "Failed to clean intermediate path for the query"); } } @Override - public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle) + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transHandle) { - return metadata; + return new PixelsMetadata(connectorId, metadataProxy, config, (PixelsTransactionHandle) transHandle); } @Override diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java index bd99af3..ebca670 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java @@ -33,6 +33,7 @@ import io.pixelsdb.pixels.core.stats.StatsRecorder; import io.pixelsdb.pixels.executor.aggregation.FunctionType; import io.pixelsdb.pixels.optimizer.plan.Table; +import io.pixelsdb.pixels.optimizer.queue.QueryQueues; import io.pixelsdb.pixels.trino.exception.PixelsErrorCode; import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy; import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig; @@ -47,7 +48,6 @@ import io.trino.spi.statistics.Estimate; import io.trino.spi.statistics.TableStatistics; -import javax.inject.Inject; import java.nio.ByteBuffer; import java.util.*; @@ -71,13 +71,15 @@ public class PixelsMetadata implements ConnectorMetadata private final String connectorId; private final PixelsMetadataProxy metadataProxy; private final PixelsTrinoConfig config; + private final PixelsTransactionHandle transHandle; - @Inject - public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy, PixelsTrinoConfig config) + public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy, + PixelsTrinoConfig config, PixelsTransactionHandle transHandle) { this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null"); this.config = requireNonNull(config, "config is null"); + this.transHandle = requireNonNull(transHandle, "transHandle is null"); } @Override @@ -541,7 +543,7 @@ public Optional> applyAggrega ConnectorSession session, ConnectorTableHandle handle, List aggregates, Map assignments, List> groupingSets) { - if (!config.isLambdaEnabled()) + if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda) { return Optional.empty(); } @@ -664,7 +666,7 @@ public Optional> applyJoin( Map rightAssignments, JoinStatistics statistics) { - if (!config.isLambdaEnabled()) + if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda) { return Optional.empty(); } diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsModule.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsModule.java index 8f4e580..99db648 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsModule.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsModule.java @@ -47,7 +47,6 @@ public void configure(Binder binder) { binder.bind(PixelsConnector.class).in(Scopes.SINGLETON); binder.bind(PixelsConnectorId.class).toInstance(new PixelsConnectorId(connectorId)); binder.bind(PixelsTypeParser.class).in(Scopes.SINGLETON); - binder.bind(PixelsMetadata.class).in(Scopes.SINGLETON); binder.bind(PixelsMetadataProxy.class).in(Scopes.SINGLETON); binder.bind(PixelsSplitManager.class).in(Scopes.SINGLETON); binder.bind(PixelsPageSourceProvider.class).in(Scopes.SINGLETON); diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java index 7b2f738..3bcc8ad 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsPageSourceProvider.java @@ -40,6 +40,7 @@ import io.pixelsdb.pixels.executor.lambda.output.ScanOutput; import io.pixelsdb.pixels.executor.predicate.TableScanFilter; import io.pixelsdb.pixels.optimizer.plan.Table; +import io.pixelsdb.pixels.optimizer.queue.QueryQueues; import io.pixelsdb.pixels.trino.exception.PixelsErrorCode; import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig; import io.trino.spi.TrinoException; @@ -142,7 +143,9 @@ else if (config.getOutputScheme() == Storage.Scheme.redis) { // perform scan push down. List withFilterColumns = getIncludeColumns(pixelsColumns, tableHandle); - if (config.isLambdaEnabled() && this.localSplitCounter.get() >= config.getLocalScanConcurrency()) + PixelsTransactionHandle transHandle = (PixelsTransactionHandle) transactionHandle; + if (transHandle.getExecutorType() == QueryQueues.ExecutorType.Lambda && + this.localSplitCounter.get() >= config.getLocalScanConcurrency()) { String[] columnsToRead = new String[withFilterColumns.size()]; boolean[] projection = new boolean[withFilterColumns.size()]; diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsRecordSetProvider.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsRecordSetProvider.java index 2d021eb..26ac005 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsRecordSetProvider.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsRecordSetProvider.java @@ -24,6 +24,7 @@ import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.core.PixelsFooterCache; +import io.pixelsdb.pixels.optimizer.queue.QueryQueues; import io.pixelsdb.pixels.trino.exception.PixelsErrorCode; import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig; import io.trino.spi.TrinoException; @@ -75,7 +76,7 @@ public PixelsRecordSetProvider(PixelsConnectorId connectorId, PixelsTrinoConfig public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) { - if (config.isLambdaEnabled()) + if (((PixelsTransactionHandle) transaction).getExecutorType() == QueryQueues.ExecutorType.Lambda) { throw new TrinoException(PixelsErrorCode.PIXELS_CONFIG_ERROR, "PixelsRecordSet does not support lambda coprocessor."); diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTransactionHandle.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTransactionHandle.java index 854f8f0..5c4c3b5 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTransactionHandle.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTransactionHandle.java @@ -21,32 +21,41 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.pixelsdb.pixels.optimizer.queue.QueryQueues; import io.trino.spi.connector.ConnectorTransactionHandle; public class PixelsTransactionHandle implements ConnectorTransactionHandle { - public static final PixelsTransactionHandle Default = new PixelsTransactionHandle(-1, -1); + public static final PixelsTransactionHandle Default = new PixelsTransactionHandle( + -1, -1, QueryQueues.ExecutorType.None); /** - * transId is also the query id, as each query is a single-statement read-only transaction. + * transId is also the query id in Pixels, as each query is a single-statement read-only transaction. */ private final long transId; /** * The timestamp that is used to get a read snapshot of the query. */ private final long timestamp; + /** + * The type of executor to execute this query. + */ + private final QueryQueues.ExecutorType executorType; /** * Create a transaction handle. - * @param transId is also the queryId as a query is a single-statement read-only transaction. + * @param transId is also the query id in Pixels, as a query is a single-statement read-only transaction. * @param timestamp the timestamp of a transaction. + * @param executorType the type of executor to execute this query. */ @JsonCreator public PixelsTransactionHandle(@JsonProperty("transId") long transId, - @JsonProperty("timestamp") long timestamp) + @JsonProperty("timestamp") long timestamp, + @JsonProperty("executorType")QueryQueues.ExecutorType executorType) { this.transId = transId; this.timestamp = timestamp; + this.executorType = executorType; } @JsonProperty @@ -60,4 +69,10 @@ public long getTimestamp() { return this.timestamp; } + + @JsonProperty + public QueryQueues.ExecutorType getExecutorType() + { + return this.executorType; + } } diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java b/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java index f420081..2243b36 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/impl/PixelsTrinoConfig.java @@ -47,8 +47,10 @@ public static int getBatchSize() return BatchSize; } + public enum LambdaSwitch { ON, OFF, AUTO } + private String pixelsConfig = null; - private boolean lambdaEnabled = false; + private LambdaSwitch lambdaSwitch = LambdaSwitch.AUTO; private boolean cleanLocalResult = true; private int localScanConcurrency = -1; private Storage.Scheme outputScheme = null; @@ -124,10 +126,10 @@ public PixelsTrinoConfig setPixelsConfig (String pixelsConfig) return this; } - @Config("lambda.enabled") - public PixelsTrinoConfig setLambdaEnabled(boolean enabled) + @Config("lambda.switch") + public PixelsTrinoConfig setLambdaSwitch(String lambdaSwitch) { - this.lambdaEnabled = enabled; + this.lambdaSwitch = LambdaSwitch.valueOf(lambdaSwitch.toUpperCase()); return this; } @@ -190,9 +192,10 @@ public String getPixelsConfig () return this.pixelsConfig; } - public boolean isLambdaEnabled() + @NotNull + public LambdaSwitch getLambdaSwitch() { - return lambdaEnabled; + return lambdaSwitch; } public int getLocalScanConcurrency() diff --git a/connector/src/main/resources/pixels.properties b/connector/src/main/resources/pixels.properties index c35da30..007b0af 100755 --- a/connector/src/main/resources/pixels.properties +++ b/connector/src/main/resources/pixels.properties @@ -8,7 +8,8 @@ connector.name=pixels pixels.config=/home/pixels/opt/pixels/pixels.properties # serverless config -lambda.enabled=false +# lambda.switch can be on, off, auto +lambda.switch=auto local.scan.concurrency=40 clean.local.result=true output.scheme=output-scheme-dummy