Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #4]: adapt to metadata cache and cost-based splits index. #37

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private CompletableFuture<?> getLambdaAggrOutput(PixelsSplit inputSplit)
try
{
inputSplit.permute(Storage.Scheme.minio, (AggregationOutput) aggrOutput);
logger.info("final aggr output: " + JSON.toJSONString(aggrOutput));
}
catch (Exception e)
{
Expand Down Expand Up @@ -261,6 +262,7 @@ else if (inputSplit.getJoinAlgo() == JoinAlgorithm.PARTITIONED)
try
{
inputSplit.permute(Storage.Scheme.minio, (JoinOutput) joinOutput);
logger.info("final join output: " + JSON.toJSONString(joinOutput));
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.etcd.jetcd.KeyValue;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.layout.*;
import io.pixelsdb.pixels.common.metadata.SchemaTableName;
import io.pixelsdb.pixels.common.metadata.domain.Table;
import io.pixelsdb.pixels.common.metadata.domain.*;
import io.pixelsdb.pixels.common.physical.Location;
Expand Down Expand Up @@ -146,7 +147,16 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle trans,
PixelsTableHandle tableHandle = (PixelsTableHandle) handle;
if (tableHandle.getTableType() == TableType.BASE)
{
List<PixelsSplit> pixelsSplits = getScanSplits(transHandle, session, tableHandle);
List<PixelsSplit> pixelsSplits;
try
{
pixelsSplits = getScanSplits(transHandle, session, tableHandle);
} catch (MetadataException e)
{
logger.error(e, "failed to get scan splits");
throw new TrinoException(PixelsErrorCode.PIXELS_SQL_EXECUTE_ERROR,
"failed to get scan splits", e);
}
Collections.shuffle(pixelsSplits);
return new PixelsSplitSource(pixelsSplits);
}
Expand Down Expand Up @@ -646,7 +656,7 @@ public static Pair<Integer, InputSplit> getInputSplit(PixelsSplit split)
}

private List<PixelsSplit> getScanSplits(PixelsTransactionHandle transHandle, ConnectorSession session,
PixelsTableHandle tableHandle)
PixelsTableHandle tableHandle) throws MetadataException
{
// Do not use constraint_ in the parameters, it is always TupleDomain.all().
TupleDomain<PixelsColumnHandle> constraint = tableHandle.getConstraint();
Expand Down Expand Up @@ -701,7 +711,7 @@ private List<PixelsSplit> getScanSplits(PixelsTransactionHandle transHandle, Con
{
// get index
int version = layout.getVersion();
IndexName indexName = new IndexName(schemaName, tableName);
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
Order order = JSON.parseObject(layout.getOrder(), Order.class);
ColumnSet columnSet = new ColumnSet();
for (PixelsColumnHandle column : desiredColumns)
Expand All @@ -719,19 +729,19 @@ private List<PixelsSplit> getScanSplits(PixelsTransactionHandle transHandle, Con
else
{
// log.info("columns to be accessed: " + columnSet.toString());
SplitsIndex splitsIndex = IndexFactory.Instance().getSplitsIndex(indexName);
SplitsIndex splitsIndex = IndexFactory.Instance().getSplitsIndex(schemaTableName);
if (splitsIndex == null)
{
logger.debug("splits index not exist in factory, building index...");
splitsIndex = buildSplitsIndex(order, splits, indexName);
splitsIndex = buildSplitsIndex(order, splits, schemaTableName);
}
else
{
int indexVersion = splitsIndex.getVersion();
if (indexVersion < version)
{
logger.debug("splits index version is not up-to-date, updating index...");
splitsIndex = buildSplitsIndex(order, splits, indexName);
splitsIndex = buildSplitsIndex(order, splits, schemaTableName);
}
}
SplitPattern bestSplitPattern = splitsIndex.search(columnSet);
Expand All @@ -745,20 +755,20 @@ private List<PixelsSplit> getScanSplits(PixelsTransactionHandle transHandle, Con
String compactPath;
if (projectionReadEnabled)
{
ProjectionsIndex projectionsIndex = IndexFactory.Instance().getProjectionsIndex(indexName);
ProjectionsIndex projectionsIndex = IndexFactory.Instance().getProjectionsIndex(schemaTableName);
Projections projections = JSON.parseObject(layout.getProjections(), Projections.class);
if (projectionsIndex == null)
{
logger.debug("projections index not exist in factory, building index...");
projectionsIndex = buildProjectionsIndex(order, projections, indexName);
projectionsIndex = buildProjectionsIndex(order, projections, schemaTableName);
}
else
{
int indexVersion = projectionsIndex.getVersion();
if (indexVersion < version)
{
logger.debug("projections index is not up-to-date, updating index...");
projectionsIndex = buildProjectionsIndex(order, projections, indexName);
projectionsIndex = buildProjectionsIndex(order, projections, schemaTableName);
}
}
ProjectionPattern projectionPattern = projectionsIndex.search(columnSet);
Expand Down Expand Up @@ -1156,20 +1166,36 @@ private List<HostAddress> toHostAddresses(List<Location> locations)
return addressBuilder.build();
}

/**
 * Builds a splits index for a table and registers it with the {@code IndexFactory} cache.
 *
 * NOTE(review): this span appears to be a rendered diff in which the previous and the new
 * revision are shown together without +/- markers. The {@code IndexName}-based signature and
 * the unconditional {@code InvertedSplitsIndex} construction directly below it look like the
 * pre-change lines; the {@code SchemaTableName}-based signature and the {@code switch} look
 * like the post-change lines. Confirm against the actual file before relying on either.
 *
 * In the {@code SchemaTableName} variant, the index implementation is chosen from the
 * {@code splits.index.type} configuration property.
 *
 * @param order the column order of the layout; its column order list is used to build
 *              the inverted index patterns
 * @param splits the splits definition of the layout
 * @param schemaTableName the key under which the built index is cached
 * @return the newly built (and cached) splits index
 * @throws MetadataException declared by the new signature; presumably raised while
 *         constructing the cost-based index - TODO confirm
 * @throws UnsupportedOperationException if the configured index type has no case in the switch
 */
private SplitsIndex buildSplitsIndex(Order order, Splits splits, IndexName indexName) {
private SplitsIndex buildSplitsIndex(Order order, Splits splits, SchemaTableName schemaTableName)
throws MetadataException
{
List<String> columnOrder = order.getColumnOrder();
SplitsIndex index;
index = new InvertedSplitsIndex(columnOrder, SplitPattern.buildPatterns(columnOrder, splits),
splits.getNumRowGroupInBlock());
IndexFactory.Instance().cacheSplitsIndex(indexName, index);
// The index type is configurable; the property value is upper-cased to match the enum constant names.
// NOTE(review): toUpperCase() without a Locale is sensitive to the JVM default locale
// (e.g. "inverted" does not become "INVERTED" under a Turkish locale); consider Locale.ROOT.
// NOTE(review): Enum.valueOf throws IllegalArgumentException for an unknown name, and a missing
// property would presumably surface as a NullPointerException here - verify getProperty's contract.
String indexTypeName = config.getConfigFactory().getProperty("splits.index.type");
SplitsIndex.IndexType indexType = SplitsIndex.IndexType.valueOf(indexTypeName.toUpperCase());
switch (indexType)
{
case INVERTED:
index = new InvertedSplitsIndex(columnOrder, SplitPattern.buildPatterns(columnOrder, splits),
splits.getNumRowGroupInBlock());
break;
case COST_BASED:
// NOTE(review): splits.getNumRowGroupInBlock() is passed for both trailing arguments;
// confirm that the two constructor parameters are really meant to receive the same value.
index = new CostBasedSplitsIndex(this.metadataProxy.getMetadataService(), schemaTableName,
splits.getNumRowGroupInBlock(), splits.getNumRowGroupInBlock());
break;
default:
// Only reachable for enum constants that have no dedicated case above.
throw new UnsupportedOperationException("splits index type '" + indexType + "' is not supported");
}
// Cache the index so later lookups through IndexFactory can reuse it instead of rebuilding.
IndexFactory.Instance().cacheSplitsIndex(schemaTableName, index);
return index;
}

/**
 * Builds an inverted projections index for a table and registers it with the
 * {@code IndexFactory} cache.
 *
 * NOTE(review): this span appears to be a rendered diff showing both revisions without
 * +/- markers: the {@code IndexName}-based signature and cache call look like the pre-change
 * lines, the {@code SchemaTableName}-based ones like the post-change lines. Confirm against
 * the actual file.
 *
 * @param order the column order of the layout; its column order list drives pattern building
 * @param projections the projections definition of the layout
 * @param schemaTableName the key under which the built index is cached
 * @return the newly built (and cached) projections index
 */
private ProjectionsIndex buildProjectionsIndex(Order order, Projections projections, IndexName indexName) {
private ProjectionsIndex buildProjectionsIndex(Order order, Projections projections, SchemaTableName schemaTableName)
{
List<String> columnOrder = order.getColumnOrder();
ProjectionsIndex index;
index = new InvertedProjectionsIndex(columnOrder, ProjectionPattern.buildPatterns(columnOrder, projections));
// Cache the index so later lookups through IndexFactory can reuse it instead of rebuilding.
IndexFactory.Instance().cacheProjectionsIndex(indexName, index);
IndexFactory.Instance().cacheProjectionsIndex(schemaTableName, index);
return index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.metadata.MetadataCache;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.SchemaTableName;
import io.pixelsdb.pixels.common.metadata.domain.*;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.core.TypeDescription;
import io.pixelsdb.pixels.trino.PixelsColumnHandle;
import io.pixelsdb.pixels.trino.PixelsTypeParser;
import io.pixelsdb.pixels.trino.exception.PixelsErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.Type;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

Expand All @@ -49,7 +48,7 @@ public class PixelsMetadataProxy
private static final Logger logger = Logger.get(PixelsMetadataProxy.class);
private final MetadataService metadataService;
private final PixelsTypeParser typeParser;
private final Map<SchemaTableName, List<Column>> tableColumnsMap = new HashMap<>();
private final MetadataCache metadataCache = MetadataCache.Instance();

@Inject
public PixelsMetadataProxy(PixelsTrinoConfig config, PixelsTypeParser typeParser)
Expand All @@ -73,6 +72,11 @@ public PixelsMetadataProxy(PixelsTrinoConfig config, PixelsTypeParser typeParser
}));
}

/**
 * Exposes the metadata service instance held by this proxy.
 *
 * @return the {@code MetadataService} stored in this proxy's {@code metadataService} field
 */
public MetadataService getMetadataService()
{
return this.metadataService;
}

public List<String> getSchemaNames() throws MetadataException
{
List<String> schemaList = new ArrayList<String>();
Expand Down Expand Up @@ -118,7 +122,7 @@ public List<PixelsColumnHandle> getTableColumn(String connectorId, String schema
ImmutableList.Builder<PixelsColumnHandle> columnsBuilder = ImmutableList.builder();
List<Column> columnsList = metadataService.getColumns(schemaName, tableName, true);
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
this.tableColumnsMap.put(schemaTableName, columnsList);
this.metadataCache.cacheTableColumns(schemaTableName, columnsList);
for (int i = 0; i < columnsList.size(); i++) {
Column c = columnsList.get(i);
Type trinoType = typeParser.parseTrinoType(c.getType());
Expand All @@ -139,7 +143,7 @@ public List<PixelsColumnHandle> getTableColumn(String connectorId, String schema

/**
 * Looks up the column list recorded for the given table.
 *
 * The lookup is served from in-memory state that {@code getTableColumn} populates; this
 * method itself does not call the metadata service.
 *
 * NOTE(review): this span appears to be a rendered diff showing both revisions without
 * +/- markers: the {@code tableColumnsMap} lookup looks like the pre-change line and the
 * {@code metadataCache} lookup like the post-change line. Confirm against the actual file.
 *
 * @param schemaName the schema that contains the table
 * @param tableName the name of the table
 * @return the columns stored for the table; presumably {@code null} when
 *         {@code getTableColumn} has not been called for it yet - TODO confirm
 */
public List<Column> getColumnStatistics(String schemaName, String tableName)
{
return this.tableColumnsMap.get(new SchemaTableName(schemaName, tableName));
return this.metadataCache.getTableColumns(new SchemaTableName(schemaName, tableName));
}

public List<Layout> getDataLayouts (String schemaName, String tableName) throws MetadataException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,9 @@ public void test1()
timestamp.setTime(time/1000);
timestamp.setNanos((int) (time%1000000*1000));
System.out.println(timestamp);

String a = "a|b|c";
String[] split = a.split("\\|");
System.out.println(split.length);
}
}