Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Issue #4]: implement projection pushdown. #7

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 88 additions & 29 deletions connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,36 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.metadata.domain.Column;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.*;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.security.TrinoPrincipal;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

/**
* @author tao
* @author hank
* @author tianxiao
**/
public class PixelsMetadata implements ConnectorMetadata
{
// private static final Logger logger = Logger.get(PixelsMetadata.class);
private final String connectorId;

private final PixelsMetadataProxy pixelsMetadataProxy;

@Inject
Expand Down Expand Up @@ -98,8 +99,17 @@ public PixelsTableHandle getTableHandle(ConnectorSession session, SchemaTableNam
{
if (this.pixelsMetadataProxy.existTable(tableName.getSchemaName(), tableName.getTableName()))
{
List<PixelsColumnHandle> columns;
try
{
columns = pixelsMetadataProxy.getTableColumn(
connectorId, tableName.getSchemaName(), tableName.getTableName());
} catch (MetadataException e)
{
throw new TrinoException(PixelsErrorCode.PIXELS_METASTORE_ERROR, e);
}
PixelsTableHandle tableHandle = new PixelsTableHandle(
connectorId, tableName.getSchemaName(), tableName.getTableName(), "");
connectorId, tableName.getSchemaName(), tableName.getTableName(), columns);
return tableHandle;
}
} catch (MetadataException e)
Expand All @@ -109,28 +119,6 @@ public PixelsTableHandle getTableHandle(ConnectorSession session, SchemaTableNam
return null;
}

// TODO: table layouts are deprecated since Presto 306
//
// @Override
// public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table,
// Constraint<ColumnHandle> constraint,
// Optional<Set<ColumnHandle>> desiredColumns)
// {
// PixelsTableHandle tableHandle = (PixelsTableHandle) table;
// PixelsTableLayoutHandle tableLayout = new PixelsTableLayoutHandle(tableHandle);
// tableLayout.setConstraint(constraint.getSummary());
// if(desiredColumns.isPresent())
// tableLayout.setDesiredColumns(desiredColumns.get());
// ConnectorTableLayout layout = getTableLayout(session, tableLayout);
// return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
// }
//
// @Override
// public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
// {
// return new ConnectorTableLayout(handle);
// }

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down Expand Up @@ -360,7 +348,78 @@ public void dropSchema(ConnectorSession session, String schemaName)
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
ConnectorSession session, ConnectorTableHandle handle, long limit)
{
return ConnectorMetadata.super.applyLimit(session, handle, limit);
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
return ConnectorMetadata.super.applyFilter(session, handle, constraint);
}

@Override
public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(
ConnectorSession session, ConnectorTableHandle handle, List<ConnectorExpression> projections,
Map<String, ColumnHandle> assignments)
{
PixelsTableHandle tableHandle = (PixelsTableHandle) handle;

List<PixelsColumnHandle> newColumns = assignments.values().stream()
.map(PixelsColumnHandle.class::cast).collect(toImmutableList());

Set<PixelsColumnHandle> newColumnSet = ImmutableSet.copyOf(newColumns);
Set<PixelsColumnHandle> tableColumnSet = ImmutableSet.copyOf(tableHandle.getColumns());
if (newColumnSet.equals(tableColumnSet))
{
return Optional.empty();
}

verify(tableColumnSet.containsAll(newColumnSet),
"applyProjection called with columns %s and some are not available in existing query: %s",
newColumnSet, tableColumnSet);

return Optional.of(new ProjectionApplicationResult<>(
new PixelsTableHandle(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), newColumns),
projections,
assignments.entrySet().stream().map(assignment -> new Assignment(
assignment.getKey(), assignment.getValue(),
((PixelsColumnHandle) assignment.getValue()).getColumnType()
)).collect(toImmutableList()),
// always pushdown by setting precalculateStatistics to false.
false));
}

@Override
public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggregation(
ConnectorSession session, ConnectorTableHandle handle, List<AggregateFunction> aggregates,
Map<String, ColumnHandle> assignments, List<List<ColumnHandle>> groupingSets)
{
return ConnectorMetadata.super.applyAggregation(session, handle, aggregates, assignments, groupingSets);
}

@Override
public Optional<JoinApplicationResult<ConnectorTableHandle>> applyJoin(
ConnectorSession session, JoinType joinType, ConnectorTableHandle left, ConnectorTableHandle right,
List<JoinCondition> joinConditions, Map<String, ColumnHandle> leftAssignments,
Map<String, ColumnHandle> rightAssignments, JoinStatistics statistics)
{
return ConnectorMetadata.super.applyJoin(session, joinType, left, right, joinConditions, leftAssignments,
rightAssignments, statistics);
}

@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
ConnectorMetadata.super.validateScan(session, handle);
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName,
ConnectorViewDefinition definition, boolean replace)
{
// TODO: API change in Trino 315 https://trino.io/docs/current/release/release-315.html:
// Allow connectors to provide view definitions. ConnectorViewDefinition now contains the real view definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.nio.charset.StandardCharsets;
import java.util.*;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -87,7 +88,6 @@ public PixelsSplitManager(PixelsConnectorId connectorId, PixelsMetadataProxy met
this.cacheSchema = config.getConfigFactory().getProperty("cache.schema");
this.cacheTable = config.getConfigFactory().getProperty("cache.table");
}


@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_,
Expand All @@ -101,9 +101,13 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_,
PixelsTableHandle tableHandle = (PixelsTableHandle) tableHandle_;
TupleDomain<PixelsColumnHandle> constraint = constraint_.getSummary()
.transformKeys(PixelsColumnHandle.class::cast);
// TODO: get desiredColumns using projection push-down.
// Set<PixelsColumnHandle> desiredColumns = layoutHandle.getDesiredColumns().stream().map(PixelsColumnHandle.class::cast)
// .collect(toSet());
if (dynamicFilter.isAwaitable())
{
logger.info("Using predicate from dynamicFilter.");
constraint = dynamicFilter.getCurrentPredicate().transformKeys(PixelsColumnHandle.class::cast);
}
Set<PixelsColumnHandle> desiredColumns = tableHandle.getColumns().stream().map(PixelsColumnHandle.class::cast)
.collect(toImmutableSet());

String schemaName = tableHandle.getSchemaName();
String tableName = tableHandle.getTableName();
Expand Down Expand Up @@ -156,11 +160,10 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_,
IndexName indexName = new IndexName(schemaName, tableName);
Order order = JSON.parseObject(layout.getOrder(), Order.class);
ColumnSet columnSet = new ColumnSet();
// TODO: get desiredColumns using projection push-down.
// for (PixelsColumnHandle column : desiredColumns)
// {
// columnSet.addColumn(column.getColumnName());
// }
for (PixelsColumnHandle column : desiredColumns)
{
columnSet.addColumn(column.getColumnName());
}

// get split size
int splitSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,78 +25,93 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
* @author tao
* @author hank
*/
public final class PixelsTableHandle implements ConnectorTableHandle
{
private final String connectorId;
private final String schemaName;
private final String tableName;
private final String path;
private final List<PixelsColumnHandle> columns;

@JsonCreator
public PixelsTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("path") String path) {
@JsonProperty("columns") List<PixelsColumnHandle> columns)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.path = requireNonNull(path, "path is null");
this.columns = requireNonNull(columns, "columns is null");
}

@JsonProperty
public String getConnectorId() {
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public String getSchemaName() {
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName() {
public String getTableName()
{
return tableName;
}

@JsonProperty
public String getPath() {
return path;
public List<PixelsColumnHandle> getColumns()
{
return columns;
}

public SchemaTableName toSchemaTableName() {
public SchemaTableName toSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
}

@Override
public int hashCode() {
return Objects.hash(connectorId, schemaName, tableName, path);
public int hashCode()
{
return Objects.hash(connectorId, schemaName, tableName, columns);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
if ((obj == null) || (getClass() != obj.getClass()))
{
return false;
}

PixelsTableHandle other = (PixelsTableHandle) obj;
return Objects.equals(this.connectorId, other.connectorId) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName) &&
Objects.equals(this.path, other.path);
Objects.equals(this.columns, other.columns);
}

@Override
public String toString() {
return Joiner.on(":").join(connectorId, schemaName, tableName, path);
public String toString()
{
return Joiner.on(":").join(connectorId, schemaName, tableName,
Joiner.on(",").join(columns));
}
}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Trino-374 requires Java 11 -->
<!-- Trino-375 requires Java 11 -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.deploy.skip>true</maven.deploy.skip>
<maven.install.skip>true</maven.install.skip>

<dep.trino.version>374</dep.trino.version>
<dep.trino.version>375</dep.trino.version>
<dep.airlift.version>214</dep.airlift.version>
<!-- override the airlift.slice version in pixels -->
<dep.airlift.slice.version>0.41</dep.airlift.slice.version>
Expand Down