From c8628f73ed5051d396b19ee73e5d33d7f7090f14 Mon Sep 17 00:00:00 2001 From: Haoqiong Bian Date: Mon, 25 Apr 2022 05:24:50 +0200 Subject: [PATCH] implement projection pushdown. --- .../pixelsdb/pixels/trino/PixelsMetadata.java | 117 +++++++++++++----- .../pixels/trino/PixelsSplitManager.java | 21 ++-- .../pixels/trino/PixelsTableHandle.java | 49 +++++--- pom.xml | 4 +- 4 files changed, 134 insertions(+), 57 deletions(-) diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java index 58278d9..816b4fa 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsMetadata.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.common.physical.Storage; @@ -28,15 +29,15 @@ import io.pixelsdb.pixels.trino.impl.PixelsMetadataProxy; import io.trino.spi.TrinoException; import io.trino.spi.connector.*; +import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.security.TrinoPrincipal; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -44,12 +45,12 @@ /** * @author tao * @author hank + * @author tianxiao **/ public class PixelsMetadata implements ConnectorMetadata { // private static final Logger logger = Logger.get(PixelsMetadata.class); private final String connectorId; - private final PixelsMetadataProxy pixelsMetadataProxy; @Inject @@ -98,8 +99,17 @@ public PixelsTableHandle getTableHandle(ConnectorSession session, SchemaTableNam { if (this.pixelsMetadataProxy.existTable(tableName.getSchemaName(), tableName.getTableName())) { + List columns; + try + { + columns = pixelsMetadataProxy.getTableColumn( + connectorId, tableName.getSchemaName(), tableName.getTableName()); + } catch (MetadataException e) + { + throw new TrinoException(PixelsErrorCode.PIXELS_METASTORE_ERROR, e); + } PixelsTableHandle tableHandle = new PixelsTableHandle( - connectorId, tableName.getSchemaName(), tableName.getTableName(), ""); + connectorId, tableName.getSchemaName(), tableName.getTableName(), columns); return tableHandle; } } catch (MetadataException e) @@ -109,28 +119,6 @@ public PixelsTableHandle getTableHandle(ConnectorSession session, SchemaTableNam return null; } -// TODO: table layouts are deprecated since Presto 306 -// -// @Override -// public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, -// Constraint constraint, -// Optional> desiredColumns) -// { -// PixelsTableHandle tableHandle = (PixelsTableHandle) table; -// PixelsTableLayoutHandle tableLayout = new PixelsTableLayoutHandle(tableHandle); -// tableLayout.setConstraint(constraint.getSummary()); -// if(desiredColumns.isPresent()) -// tableLayout.setDesiredColumns(desiredColumns.get()); -// ConnectorTableLayout layout = getTableLayout(session, tableLayout); -// return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); -// } -// -// @Override -// public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) -// { -// return new ConnectorTableLayout(handle); -// } - @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { @@ -360,7 +348,78 @@ public void dropSchema(ConnectorSession session, String schemaName) } @Override - public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace) + public Optional> applyLimit( + ConnectorSession session, ConnectorTableHandle handle, long limit) + { + return ConnectorMetadata.super.applyLimit(session, handle, limit); + } + + @Override + public Optional> applyFilter( + ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) + { + return ConnectorMetadata.super.applyFilter(session, handle, constraint); + } + + @Override + public Optional> applyProjection( + ConnectorSession session, ConnectorTableHandle handle, List projections, + Map assignments) + { + PixelsTableHandle tableHandle = (PixelsTableHandle) handle; + + List newColumns = assignments.values().stream() + .map(PixelsColumnHandle.class::cast).collect(toImmutableList()); + + Set newColumnSet = ImmutableSet.copyOf(newColumns); + Set tableColumnSet = ImmutableSet.copyOf(tableHandle.getColumns()); + if (newColumnSet.equals(tableColumnSet)) + { + return Optional.empty(); + } + + verify(tableColumnSet.containsAll(newColumnSet), + "applyProjection called with columns %s and some are not available in existing query: %s", + newColumnSet, tableColumnSet); + + return Optional.of(new ProjectionApplicationResult<>( + new PixelsTableHandle(connectorId, tableHandle.getSchemaName(), tableHandle.getTableName(), newColumns), + projections, + assignments.entrySet().stream().map(assignment -> new Assignment( + assignment.getKey(), assignment.getValue(), + ((PixelsColumnHandle) assignment.getValue()).getColumnType() + )).collect(toImmutableList()), + // always pushdown by setting precalculateStatistics to false. + false)); + } + + @Override + public Optional> applyAggregation( + ConnectorSession session, ConnectorTableHandle handle, List aggregates, + Map assignments, List> groupingSets) + { + return ConnectorMetadata.super.applyAggregation(session, handle, aggregates, assignments, groupingSets); + } + + @Override + public Optional> applyJoin( + ConnectorSession session, JoinType joinType, ConnectorTableHandle left, ConnectorTableHandle right, + List joinConditions, Map leftAssignments, + Map rightAssignments, JoinStatistics statistics) + { + return ConnectorMetadata.super.applyJoin(session, joinType, left, right, joinConditions, leftAssignments, + rightAssignments, statistics); + } + + @Override + public void validateScan(ConnectorSession session, ConnectorTableHandle handle) + { + ConnectorMetadata.super.validateScan(session, handle); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName viewName, + ConnectorViewDefinition definition, boolean replace) { // TODO: API change in Trino 315 https://trino.io/docs/current/release/release-315.html: // Allow connectors to provide view definitions. ConnectorViewDefinition now contains the real view definition diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java index db700cf..91af9b1 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsSplitManager.java @@ -54,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; /** @@ -87,7 +88,6 @@ public PixelsSplitManager(PixelsConnectorId connectorId, PixelsMetadataProxy met this.cacheSchema = config.getConfigFactory().getProperty("cache.schema"); this.cacheTable = config.getConfigFactory().getProperty("cache.table"); } - @Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_, @@ -101,9 +101,13 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_, PixelsTableHandle tableHandle = (PixelsTableHandle) tableHandle_; TupleDomain constraint = constraint_.getSummary() .transformKeys(PixelsColumnHandle.class::cast); -// TODO: get desiredColumns using projection push-down. -// Set desiredColumns = layoutHandle.getDesiredColumns().stream().map(PixelsColumnHandle.class::cast) -// .collect(toSet()); + if (dynamicFilter.isAwaitable()) + { + logger.info("Using predicate from dynamicFilter."); + constraint = dynamicFilter.getCurrentPredicate().transformKeys(PixelsColumnHandle.class::cast); + } + Set desiredColumns = tableHandle.getColumns().stream().map(PixelsColumnHandle.class::cast) + .collect(toImmutableSet()); String schemaName = tableHandle.getSchemaName(); String tableName = tableHandle.getTableName(); @@ -156,11 +160,10 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transHandle_, IndexName indexName = new IndexName(schemaName, tableName); Order order = JSON.parseObject(layout.getOrder(), Order.class); ColumnSet columnSet = new ColumnSet(); -// TODO: get desiredColumns using projection push-down. -// for (PixelsColumnHandle column : desiredColumns) -// { -// columnSet.addColumn(column.getColumnName()); -// } + for (PixelsColumnHandle column : desiredColumns) + { + columnSet.addColumn(column.getColumnName()); + } // get split size int splitSize; diff --git a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTableHandle.java b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTableHandle.java index fb91167..cf0569f 100644 --- a/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTableHandle.java +++ b/connector/src/main/java/io/pixelsdb/pixels/trino/PixelsTableHandle.java @@ -25,66 +25,79 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; +import java.util.List; import java.util.Objects; import static java.util.Objects.requireNonNull; + /** * @author tao + * @author hank */ public final class PixelsTableHandle implements ConnectorTableHandle { private final String connectorId; private final String schemaName; private final String tableName; - private final String path; + private final List columns; @JsonCreator public PixelsTableHandle( @JsonProperty("connectorId") String connectorId, @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, - @JsonProperty("path") String path) { + @JsonProperty("columns") List columns) + { this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); - this.path = requireNonNull(path, "path is null"); + this.columns = requireNonNull(columns, "columns is null"); } @JsonProperty - public String getConnectorId() { + public String getConnectorId() + { return connectorId; } @JsonProperty - public String getSchemaName() { + public String getSchemaName() + { return schemaName; } @JsonProperty - public String getTableName() { + public String getTableName() + { return tableName; } @JsonProperty - public String getPath() { - return path; + public List getColumns() + { + return columns; } - public SchemaTableName toSchemaTableName() { + public SchemaTableName toSchemaTableName() + { return new SchemaTableName(schemaName, tableName); } @Override - public int hashCode() { - return Objects.hash(connectorId, schemaName, tableName, path); + public int hashCode() + { + return Objects.hash(connectorId, schemaName, tableName, columns); } @Override - public boolean equals(Object obj) { - if (this == obj) { + public boolean equals(Object obj) + { + if (this == obj) + { return true; } - if ((obj == null) || (getClass() != obj.getClass())) { + if ((obj == null) || (getClass() != obj.getClass())) + { return false; } @@ -92,11 +105,13 @@ public boolean equals(Object obj) { return Objects.equals(this.connectorId, other.connectorId) && Objects.equals(this.schemaName, other.schemaName) && Objects.equals(this.tableName, other.tableName) && - Objects.equals(this.path, other.path); + Objects.equals(this.columns, other.columns); } @Override - public String toString() { - return Joiner.on(":").join(connectorId, schemaName, tableName, path); + public String toString() + { + return Joiner.on(":").join(connectorId, schemaName, tableName, + Joiner.on(",").join(columns)); } } diff --git a/pom.xml b/pom.xml index 06755ca..226c755 100644 --- a/pom.xml +++ b/pom.xml @@ -33,13 +33,13 @@ UTF-8 - + 11 11 true true - 374 + 375 214 0.41