Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Issue #4]: update docs. #48

Merged
merged 2 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
# pixels-trino
The Pixels connector for Trino.

**This project is under development, and not ready to be used.**

## Compatibility
Pixels integration (connector & event listener) is currently compatible with Trino 374.
Pixels integration (connector & event listener) is currently compatible with Trino 375.
Other Trino versions that are compatible
with the Connector SPI in Trino 374 should also work well with Pixels.
with the Connector SPI in Trino 375 should also work well with Pixels.

## Build
This project can be opened as a maven project in Intellij and built using maven.

**Note** that Trino 374 requires Java 11, thus this project should be build by Jdk 11 (11.0.11 is tested).
**Note** that Trino 375 requires Java 11, thus this project should be build by Jdk 11 (11.0.11 is tested).

[Pixels](https://github.com/pixelsdb/pixels) is the parent of this project,
therefore use `mvn install` to install Pixels modules into your local maven repository,
before building this project.

## Use Pixels in Trino

Ensure that Pixels and other prerequisites are installed following the instructions
[here](https://github.com/pixelsdb/pixels#installation-in-aws).
Follow the instructions
[here](https://github.com/pixelsdb/pixels#installation-in-aws) to install Pixels and use it in Trino.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.pixelsdb.pixels.common.transaction.QueryTransInfo;
import io.pixelsdb.pixels.common.transaction.TransContext;
import io.pixelsdb.pixels.common.transaction.TransService;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.pixelsdb.pixels.trino.properties.PixelsSessionProperties;
import io.pixelsdb.pixels.trino.properties.PixelsTableProperties;
Expand All @@ -36,13 +38,17 @@

import javax.inject.Inject;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class PixelsConnector implements Connector {
public class PixelsConnector implements Connector
{
private static final Logger logger = Logger.get(PixelsConnector.class);

private final PixelsConnectorId connectorId;
private final LifeCycleManager lifeCycleManager;
private final PixelsMetadata metadata;
private final PixelsMetadataProxy metadataProxy;
private final PixelsSplitManager splitManager;
private final boolean recordCursorEnabled;
private final PixelsPageSourceProvider pageSourceProvider;
Expand All @@ -54,16 +60,19 @@ public class PixelsConnector implements Connector {

@Inject
public PixelsConnector(
PixelsConnectorId connectorId,
LifeCycleManager lifeCycleManager,
PixelsMetadata metadata,
PixelsMetadataProxy metadataProxy,
PixelsSplitManager splitManager,
PixelsTrinoConfig config,
PixelsPageSourceProvider pageSourceProvider,
PixelsRecordSetProvider recordSetProvider,
PixelsSessionProperties sessionProperties,
PixelsTableProperties tableProperties) {
PixelsTableProperties tableProperties)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "recordSetProvider is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
Expand Down Expand Up @@ -93,7 +102,29 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel
throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_SERVICE_ERROR, e);
}
TransContext.Instance().beginQuery(info);
return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp());
QueryQueues.ExecutorType executorType;
if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.AUTO)
{
while ((executorType = QueryQueues.Instance().Enqueue(info.getQueryId())) == QueryQueues.ExecutorType.None)
{
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e)
{
logger.error("Interrupted while waiting for enqueue the query id.");
}
}
}
else if (config.getLambdaSwitch() == PixelsTrinoConfig.LambdaSwitch.ON)
{
executorType = QueryQueues.ExecutorType.Lambda;
}
else
{
executorType = QueryQueues.ExecutorType.Cluster;
}
return new PixelsTransactionHandle(info.getQueryId(), info.getQueryTimestamp(), executorType);
}

@Override
Expand All @@ -102,6 +133,7 @@ public void commit(ConnectorTransactionHandle transactionHandle)
if (transactionHandle instanceof PixelsTransactionHandle)
{
PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle;
QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType());
TransContext.Instance().commitQuery(handle.getTransId());
cleanIntermediatePathForQuery(handle.getTransId());
} else
Expand All @@ -117,8 +149,12 @@ public void rollback(ConnectorTransactionHandle transactionHandle)
if (transactionHandle instanceof PixelsTransactionHandle)
{
PixelsTransactionHandle handle = (PixelsTransactionHandle) transactionHandle;
QueryQueues.Instance().Dequeue(handle.getTransId(), handle.getExecutorType());
TransContext.Instance().rollbackQuery(handle.getTransId());
cleanIntermediatePathForQuery(handle.getTransId());
if (handle.getExecutorType() == QueryQueues.ExecutorType.Lambda)
{
cleanIntermediatePathForQuery(handle.getTransId());
}
} else
{
throw new TrinoException(PixelsErrorCode.PIXELS_TRANS_HANDLE_TYPE_ERROR,
Expand All @@ -128,26 +164,23 @@ public void rollback(ConnectorTransactionHandle transactionHandle)

private void cleanIntermediatePathForQuery(long queryId)
{
if (config.isLambdaEnabled())
try
{
try
if (config.isCleanLocalResult())
{
if (config.isCleanLocalResult())
{
IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId));
}
} catch (InterruptedException e)
{
throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR,
"Failed to clean intermediate path for the query");
IntermediateFileCleaner.Instance().asyncDelete(config.getOutputFolderForQuery(queryId));
}
} catch (InterruptedException e)
{
throw new TrinoException(PixelsErrorCode.PIXELS_STORAGE_ERROR,
"Failed to clean intermediate path for the query");
}
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transHandle)
{
return metadata;
return new PixelsMetadata(connectorId, metadataProxy, config, (PixelsTransactionHandle) transHandle);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.pixelsdb.pixels.core.stats.StatsRecorder;
import io.pixelsdb.pixels.executor.aggregation.FunctionType;
import io.pixelsdb.pixels.optimizer.plan.Table;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
Expand All @@ -47,7 +48,6 @@
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.*;

Expand All @@ -71,13 +71,15 @@ public class PixelsMetadata implements ConnectorMetadata
private final String connectorId;
private final PixelsMetadataProxy metadataProxy;
private final PixelsTrinoConfig config;
private final PixelsTransactionHandle transHandle;

@Inject
public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy, PixelsTrinoConfig config)
public PixelsMetadata(PixelsConnectorId connectorId, PixelsMetadataProxy metadataProxy,
PixelsTrinoConfig config, PixelsTransactionHandle transHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.metadataProxy = requireNonNull(metadataProxy, "metadataProxy is null");
this.config = requireNonNull(config, "config is null");
this.transHandle = requireNonNull(transHandle, "transHandle is null");
}

@Override
Expand Down Expand Up @@ -541,7 +543,7 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
ConnectorSession session, ConnectorTableHandle handle, List<AggregateFunction> aggregates,
Map<String, ColumnHandle> assignments, List<List<ColumnHandle>> groupingSets)
{
if (!config.isLambdaEnabled())
if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda)
{
return Optional.empty();
}
Expand Down Expand Up @@ -664,7 +666,7 @@ public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
Map<String, ColumnHandle> rightAssignments,
JoinStatistics statistics)
{
if (!config.isLambdaEnabled())
if (transHandle.getExecutorType() != QueryQueues.ExecutorType.Lambda)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void configure(Binder binder) {
binder.bind(PixelsConnector.class).in(Scopes.SINGLETON);
binder.bind(PixelsConnectorId.class).toInstance(new PixelsConnectorId(connectorId));
binder.bind(PixelsTypeParser.class).in(Scopes.SINGLETON);
binder.bind(PixelsMetadata.class).in(Scopes.SINGLETON);
binder.bind(PixelsMetadataProxy.class).in(Scopes.SINGLETON);
binder.bind(PixelsSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PixelsPageSourceProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.pixelsdb.pixels.executor.lambda.output.ScanOutput;
import io.pixelsdb.pixels.executor.predicate.TableScanFilter;
import io.pixelsdb.pixels.optimizer.plan.Table;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -142,7 +143,9 @@ else if (config.getOutputScheme() == Storage.Scheme.redis)
{
// perform scan push down.
List<PixelsColumnHandle> withFilterColumns = getIncludeColumns(pixelsColumns, tableHandle);
if (config.isLambdaEnabled() && this.localSplitCounter.get() >= config.getLocalScanConcurrency())
PixelsTransactionHandle transHandle = (PixelsTransactionHandle) transactionHandle;
if (transHandle.getExecutorType() == QueryQueues.ExecutorType.Lambda &&
this.localSplitCounter.get() >= config.getLocalScanConcurrency())
{
String[] columnsToRead = new String[withFilterColumns.size()];
boolean[] projection = new boolean[withFilterColumns.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.core.PixelsFooterCache;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsTrinoConfig;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -75,7 +76,7 @@ public PixelsRecordSetProvider(PixelsConnectorId connectorId, PixelsTrinoConfig
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session,
ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
if (config.isLambdaEnabled())
if (((PixelsTransactionHandle) transaction).getExecutorType() == QueryQueues.ExecutorType.Lambda)
{
throw new TrinoException(PixelsErrorCode.PIXELS_CONFIG_ERROR,
"PixelsRecordSet does not support lambda coprocessor.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,41 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.pixelsdb.pixels.optimizer.queue.QueryQueues;
import io.trino.spi.connector.ConnectorTransactionHandle;

public class PixelsTransactionHandle implements ConnectorTransactionHandle
{
public static final PixelsTransactionHandle Default = new PixelsTransactionHandle(-1, -1);
public static final PixelsTransactionHandle Default = new PixelsTransactionHandle(
-1, -1, QueryQueues.ExecutorType.None);

/**
* transId is also the query id, as each query is a single-statement read-only transaction.
* transId is also the query id in Pixels, as each query is a single-statement read-only transaction.
*/
private final long transId;
/**
* The timestamp that is used to get a read snapshot of the query.
*/
private final long timestamp;
/**
* The type of executor to execute this query.
*/
private final QueryQueues.ExecutorType executorType;

/**
* Create a transaction handle.
* @param transId is also the queryId as a query is a single-statement read-only transaction.
* @param transId is also the query id in Pixels, as a query is a single-statement read-only transaction.
* @param timestamp the timestamp of a transaction.
* @param executorType the type of executor to execute this query.
*/
@JsonCreator
public PixelsTransactionHandle(@JsonProperty("transId") long transId,
@JsonProperty("timestamp") long timestamp)
@JsonProperty("timestamp") long timestamp,
@JsonProperty("executorType")QueryQueues.ExecutorType executorType)
{
this.transId = transId;
this.timestamp = timestamp;
this.executorType = executorType;
}

@JsonProperty
Expand All @@ -60,4 +69,10 @@ public long getTimestamp()
{
return this.timestamp;
}

@JsonProperty
public QueryQueues.ExecutorType getExecutorType()
{
return this.executorType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ public static int getBatchSize()
return BatchSize;
}

public enum LambdaSwitch { ON, OFF, AUTO }

private String pixelsConfig = null;
private boolean lambdaEnabled = false;
private LambdaSwitch lambdaSwitch = LambdaSwitch.AUTO;
private boolean cleanLocalResult = true;
private int localScanConcurrency = -1;
private Storage.Scheme outputScheme = null;
Expand Down Expand Up @@ -124,10 +126,10 @@ public PixelsTrinoConfig setPixelsConfig (String pixelsConfig)
return this;
}

@Config("lambda.enabled")
public PixelsTrinoConfig setLambdaEnabled(boolean enabled)
@Config("lambda.switch")
public PixelsTrinoConfig setLambdaSwitch(String lambdaSwitch)
{
this.lambdaEnabled = enabled;
this.lambdaSwitch = LambdaSwitch.valueOf(lambdaSwitch.toUpperCase());
return this;
}

Expand Down Expand Up @@ -190,9 +192,10 @@ public String getPixelsConfig ()
return this.pixelsConfig;
}

public boolean isLambdaEnabled()
@NotNull
public LambdaSwitch getLambdaSwitch()
{
return lambdaEnabled;
return lambdaSwitch;
}

public int getLocalScanConcurrency()
Expand Down
3 changes: 2 additions & 1 deletion connector/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ connector.name=pixels
pixels.config=/home/pixels/opt/pixels/pixels.properties

# serverless config
lambda.enabled=false
# lambda.switch can be on, off, auto
lambda.switch=auto
local.scan.concurrency=40
clean.local.result=true
output.scheme=output-scheme-dummy
Expand Down