Skip to content

Commit

Permalink
[Issue #4]: refine query queues. (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
bianhq authored Oct 31, 2022
1 parent 882d652 commit 9c25c11
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.pixelsdb.pixels.common.transaction.QueryTransInfo;
import io.pixelsdb.pixels.common.transaction.TransContext;
import io.pixelsdb.pixels.common.transaction.TransService;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.pixelsdb.pixels.trino.properties.PixelsSessionProperties;
import io.pixelsdb.pixels.trino.properties.PixelsTableProperties;
Expand All @@ -36,13 +38,17 @@

import javax.inject.Inject;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class PixelsConnector implements Connector {
public class PixelsConnector implements Connector
{
private static final Logger logger = Logger.get(PixelsConnector.class);

private final PixelsConnectorId connectorId;
private final LifeCycleManager lifeCycleManager;
private final PixelsMetadata metadata;
private final PixelsMetadataProxy metadataProxy;
private final PixelsSplitManager splitManager;
private final boolean recordCursorEnabled;
private final PixelsPageSourceProvider pageSourceProvider;
Expand All @@ -54,16 +60,19 @@ public class PixelsConnector implements Connector {

@Inject
public PixelsConnector(
PixelsConnectorId connectorId,
LifeCycleManager lifeCycleManager,
PixelsMetadata metadata,
PixelsMetadataProxy metadataProxy,
PixelsSplitManager splitManager,
PixelsTrinoConfig config,
PixelsPageSourceProvider pageSourceProvider,
PixelsRecordSetProvider recordSetProvider,
PixelsSessionProperties sessionProperties,
PixelsTableProperties tableProperties) {
PixelsTableProperties tableProperties)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "recordSetProvider is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
Expand Down Expand Up @@ -93,7 +102,29 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel
throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_SERVICE_ERROR, e);
}
TransContext.Instance().beginQuery(info);
return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp());
QueryQueues.ExecutorType executorType;
if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.AUTO)
{
while ((executorType = QueryQueues.Instance().Enqueue(info.getQueryId())) == QueryQueues.ExecutorType.None)
{
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e)
{
logger.error("Interrupted while waiting for enqueue the query id.");
}
}
}
else if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.ON)
{
executorType = QueryQueues.ExecutorType.Lambda;
}
else
{
executorType = QueryQueues.ExecutorType.Cluster;
}
return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp(), executorType);
}

@Override
Expand All @@ -102,6 +133,7 @@ public void commit(ConnectorTransactionHandle transactionHandle)
if (transactionHandle instanceof PixelsTransactionHandle)
{
PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle;
QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType());
TransContext.Instance().commitQuery(handle.getTransId());
cleanIntermediatePathForQuery(handle.getTransId());
} else
Expand All @@ -117,8 +149,12 @@ public void rollback(ConnectorTransactionHandle transactionHandle)
if (transactionHandle instanceof PixelsTransactionHandle)
{
PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle;
QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType());
TransContext.Instance().rollbackQuery(handle.getTransId());
cleanIntermediatePathForQuery(handle.getTransId());
if (handle.getExecutorType() == QueryQueues.ExecutorType.Lambda)
{
cleanIntermediatePathForQuery(handle.getTransId());
}
} else
{
throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_HANDLE_TYPE_ERROR,
Expand All @@ -128,26 +164,23 @@ public void rollback(ConnectorTransactionHandle transactionHandle)

private void cleanIntermediatePathForQuery(long queryId)
{
if (config.isLambdaEnabled())
try
{
try
if (config.isCleanLocalResult())
{
if (config.isCleanLocalResult())
{
IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId));
}
} catch (InterruptedException e)
{
throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR,
"Failed to clean intermediate path for the query");
IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId));
}
} catch (InterruptedException e)
{
throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR,
"Failed to clean intermediate path for the query");
}
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transHandle)
{
return metadata;
return new PixelsMetadata(connectorId, metadataProxy, config, (PixelsTransactionHandle) transHandle);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.pixelsdb.pixels.core.stats.StatsRecorder;
import io.pixelsdb.pixels.executor.aggregation.FunctionType;
import io.pixelsdb.pixels.optimizer.plan.Table;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
Expand All @@ -47,7 +48,6 @@
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.*;

Expand All @@ -71,13 +71,15 @@ public class PixelsMetadata implements ConnectorMetadata
private final String connectorId;
private final PixelsMetadataProxy metadataProxy;
private final PixelsTrinoConfig config;
private final PixelsTransactionHandle transHandle;

@Inject
public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy, PixelsTrinoConfig config)
public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy,
PixelsTrinoConfig config, PixelsTransactionHandle transHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null");
this.config = requireNonNull(config, "config is null");
this.transHandle = requireNonNull(transHandle, "transHandle is null");
}

@Override
Expand Down Expand Up @@ -541,7 +543,7 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
ConnectorSession session, ConnectorTableHandle handle, List<AggregateFunction> aggregates,
Map<String, ColumnHandle> assignments, List<List<ColumnHandle>> groupingSets)
{
if (!config.isLambdaEnabled())
if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda)
{
return Optional.empty();
}
Expand Down Expand Up @@ -664,7 +666,7 @@ public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
Map<String, ColumnHandle> rightAssignments,
JoinStatistics statistics)
{
if (!config.isLambdaEnabled())
if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void configure(Binder binder) {
binder.bind(PixelsConnector.class).in(Scopes.SINGLETON);
binder.bind(PixelsConnectorId.class).toInstance(new PixelsConnectorId(connectorId));
binder.bind(PixelsTypeParser.class).in(Scopes.SINGLETON);
binder.bind(PixelsMetadata.class).in(Scopes.SINGLETON);
binder.bind(PixelsMetadataProxy.class).in(Scopes.SINGLETON);
binder.bind(PixelsSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PixelsPageSourceProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.pixelsdb.pixels.executor.lambda.output.ScanOutput;
import io.pixelsdb.pixels.executor.predicate.TableScanFilter;
import io.pixelsdb.pixels.optimizer.plan.Table;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -142,7 +143,9 @@ else if (config.getOutputScheme() == Storage.Scheme.redis)
{
// perform scan push down.
List<PixelsColumnHandle> withFilterColumns = getIncludeColumns(pixelsColumns, tableHandle);
if (config.isLambdaEnabled() && this.localSplitCounter.get() >= config.getLocalScanConcurrency())
PixelsTransactionHandle transHandle = (PixelsTransactionHandle) transactionHandle;
if (transHandle.getExecutorType() == QueryQueues.ExecutorType.Lambda &&
this.localSplitCounter.get() >= config.getLocalScanConcurrency())
{
String[] columnsToRead = new String[withFilterColumns.size()];
boolean[] projection = new boolean[withFilterColumns.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.core.PixelsFooterCache;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -75,7 +76,7 @@ public PixelsRecordSetProvider(PixelsConnectorId connectorId, PixelsTrinoConfig
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session,
ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
if (config.isLambdaEnabled())
if (((PixelsTransactionHandle) transaction).getExecutorType() == QueryQueues.ExecutorType.Lambda)
{
throw new TrinoException(PixelsErrorCode.PIXELS_CONFIG_ERROR,
"PixelsRecordSet does not support lambda coprocessor.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,41 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.trino.spi.connector.ConnectorTransactionHandle;

public class PixelsTransactionHandle implements ConnectorTransactionHandle
{
public static final PixelsTransactionHandle Default = new PixelsTransactionHandle(-1, -1);
public static final PixelsTransactionHandle Default = new PixelsTransactionHandle(
-1, -1, QueryQueues.ExecutorType.None);

/**
* transId is also the query id, as each query is a single-statement read-only transaction.
* transId is also the query id in Pixels, as each query is a single-statement read-only transaction.
*/
private final long transId;
/**
* The timestamp that is used to get a read snapshot of the query.
*/
private final long timestamp;
/**
* The type of executor to execute this query.
*/
private final QueryQueues.ExecutorType executorType;

/**
* Create a transaction handle.
* @param transId is also the queryId as a query is a single-statement read-only transaction.
* @param transId is also the query id in Pixels, as a query is a single-statement read-only transaction.
* @param timestamp the timestamp of a transaction.
* @param executorType the type of executor to execute this query.
*/
@JsonCreator
public PixelsTransactionHandle(@JsonProperty("transId") long transId,
@JsonProperty("timestamp") long timestamp)
@JsonProperty("timestamp") long timestamp,
@JsonProperty("executorType")QueryQueues.ExecutorType executorType)
{
this.transId = transId;
this.timestamp = timestamp;
this.executorType = executorType;
}

@JsonProperty
Expand All @@ -60,4 +69,10 @@ public long getTimestamp()
{
return this.timestamp;
}

@JsonProperty
public QueryQueues.ExecutorType getExecutorType()
{
return this.executorType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ public static int getBatchSize()
return BatchSize;
}

public enum LambdaSwitch { ON, OFF, AUTO }

private String pixelsConfig = null;
private boolean lambdaEnabled = false;
private LambdaSwitch lambdaSwitch = LambdaSwitch.AUTO;
private boolean cleanLocalResult = true;
private int localScanConcurrency = -1;
private Storage.Scheme outputScheme = null;
Expand Down Expand Up @@ -124,10 +126,10 @@ public PixelsTrinoConfig setPixelsConfig (String pixelsConfig)
return this;
}

@Config("lambda.enabled")
public PixelsTrinoConfig setLambdaEnabled(boolean enabled)
@Config("lambda.switch")
public PixelsTrinoConfig setLambdaSwitch(String lambdaSwitch)
{
this.lambdaEnabled = enabled;
this.lambdaSwitch = LambdaSwitch.valueOf(lambdaSwitch.toUpperCase());
return this;
}

Expand Down Expand Up @@ -190,9 +192,10 @@ public String getPixelsConfig ()
return this.pixelsConfig;
}

public boolean isLambdaEnabled()
@NotNull
public LambdaSwitch getLambdaSwitch()
{
return lambdaEnabled;
return lambdaSwitch;
}

public int getLocalScanConcurrency()
Expand Down
3 changes: 2 additions & 1 deletion connector/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ connector.name=pixels
pixels.config=/home/pixels/opt/pixels/pixels.properties

# serverless config
lambda.enabled=false
# lambda.switch can be on, off, auto
lambda.switch=auto
local.scan.concurrency=40
clean.local.result=true
output.scheme=output-scheme-dummy
Expand Down

0 comments on commit 9c25c11

Please # to comment.