From 199fb03c223a4f852bd5f8d1e07b0dbff300e239 Mon Sep 17 00:00:00 2001 From: Julian Hyde Date: Thu, 24 Sep 2015 15:21:14 -0700 Subject: [PATCH 1/3] add module --- contrib/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/contrib/pom.xml b/contrib/pom.xml index 2f3ac9fa6e4..76d8369c71e 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -35,6 +35,7 @@ storage-hbase storage-hive storage-mongo + storage-phoenix storage-jdbc sqlline data From 65485b76ebd6dc7f1e4a8f907072ba9463ccb466 Mon Sep 17 00:00:00 2001 From: Julian Hyde Date: Thu, 24 Sep 2015 18:08:14 -0700 Subject: [PATCH 2/3] phoenix record reader --- .../store/phoenix/PhoenixRecordReader.java | 298 ++++++++---------- 1 file changed, 133 insertions(+), 165 deletions(-) diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java index 020d35aafba..7da795cd3dd 100755 --- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java +++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java @@ -17,19 +17,6 @@ */ package org.apache.drill.exec.store.phoenix; -import java.math.BigDecimal; -import java.sql.Connection; -import java.sql.Date; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Time; -import java.sql.Timestamp; - -import javax.sql.DataSource; - -import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -54,7 +41,17 @@ import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; -import com.google.common.base.Charsets; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.ValueSchema; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SQLCloseable; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -64,23 +61,22 @@ class PhoenixRecordReader extends AbstractRecordReader { .getLogger(PhoenixRecordReader.class); private static final ImmutableMap JDBC_TYPE_MAPPINGS; - private final DataSource source; - private ResultSet resultSet; + private final String storagePluginName; - private FragmentContext fragmentContext; - private Connection connection; - private Statement statement; + private final KeyValueSchema kvSchema; + private final ResultIterator result; private final String name; private ImmutableList vectors; private ImmutableList> copiers; - private OperatorContext operatorContext; + // workspace for each row + private final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - public PhoenixRecordReader(FragmentContext fragmentContext, DataSource source, String name, String storagePluginName) { - this.fragmentContext = fragmentContext; - this.source = source; + public PhoenixRecordReader(FragmentContext fragmentContext, ResultIterator result, KeyValueSchema kvSchema, String name, String storagePluginName) { + this.result = result; this.name = name; this.storagePluginName = storagePluginName; + this.kvSchema = kvSchema; } static { @@ -116,28 +112,27 @@ public PhoenixRecordReader(FragmentContext fragmentContext, DataSource source, S .build(); } - private Copier getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) { - + private Copier getCopier(int offset, ValueVector v) { if (v instanceof NullableBigIntVector) { - return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator()); + return new BigIntCopier(offset, (NullableBigIntVector.Mutator) v.getMutator()); } else if (v instanceof NullableFloat4Vector) { - return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator()); + return new Float4Copier(offset, (NullableFloat4Vector.Mutator) v.getMutator()); } else if (v instanceof NullableFloat8Vector) { - return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator()); + return new Float8Copier(offset, (NullableFloat8Vector.Mutator) v.getMutator()); } else if (v instanceof NullableIntVector) { - return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator()); + return new IntCopier(offset, (NullableIntVector.Mutator) v.getMutator()); } else if (v instanceof NullableVarCharVector) { - return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator()); + return new VarCharCopier(offset, (NullableVarCharVector.Mutator) v.getMutator()); } else if (v instanceof NullableVarBinaryVector) { - return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator()); + return new VarBinaryCopier(offset, (NullableVarBinaryVector.Mutator) v.getMutator()); } else if (v instanceof NullableDateVector) { - return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator()); + return new DateCopier(offset, (NullableDateVector.Mutator) v.getMutator()); } else if (v instanceof NullableTimeVector) { - return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator()); + return new TimeCopier(offset, (NullableTimeVector.Mutator) v.getMutator()); } else if (v instanceof NullableTimeStampVector) { - return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator()); + return new TimeStampCopier(offset, (NullableTimeStampVector.Mutator) v.getMutator()); } else if (v instanceof NullableBitVector) { - return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator()); + return new BitCopier(offset, (NullableBitVector.Mutator) v.getMutator()); } throw new IllegalArgumentException("Unknown how to handle vector."); @@ -146,29 +141,19 @@ private Copier getCopier(int jdbcType, int offset, ResultSet result, ValueVec @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { try { - - this.operatorContext = operatorContext; - connection = source.getConnection(); - statement = connection.createStatement(); - resultSet = statement.executeQuery("select * from " + name); - - final ResultSetMetaData meta = resultSet.getMetaData(); - final int columns = meta.getColumnCount(); ImmutableList.Builder vectorBuilder = ImmutableList.builder(); ImmutableList.Builder> copierBuilder = ImmutableList.builder(); - for (int i = 1; i <= columns; i++) { - final String name = meta.getColumnLabel(i); - final int jdbcType = meta.getColumnType(i); - final int width = meta.getPrecision(i); - final int scale = meta.getScale(i); - MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType); + int i = 0; + for (ValueSchema.Field phoenixField : kvSchema.getFields()) { + final PDataType phoenixType = phoenixField.getDataType(); + MinorType minorType = JDBC_TYPE_MAPPINGS.get(phoenixType.getSqlType()); if (minorType == null) { throw UserException.dataReadError() - .message("The JDBC storage plugin failed while trying to execute a query. " - + "The JDBC data type %d is not currently supported.", jdbcType) - - .addContext("sql", name) + .message( + "The JDBC storage plugin failed while trying to execute a query. " + + "The JDBC data type %d is not currently supported.", + phoenixType) .addContext("plugin", storagePluginName) .build(logger); } @@ -179,17 +164,16 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws minorType, type.getMode()); ValueVector vector = output.addField(field, clazz); vectorBuilder.add(vector); - copierBuilder.add(getCopier(jdbcType, i, resultSet, vector)); - + copierBuilder.add(getCopier(i++, vector)); } vectors = vectorBuilder.build(); copiers = copierBuilder.build(); - } catch (SQLException | SchemaChangeException e) { + } catch (SchemaChangeException e) { throw UserException.dataReadError(e) .message("The JDBC storage plugin failed while trying setup the SQL query. ") - .addContext("sql", name) + .addContext("name", name) .addContext("plugin", storagePluginName) .build(logger); } @@ -199,20 +183,40 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws @Override public int next() { int counter = 0; - Boolean b = true; try { - while (counter < 4095 && b == true) { // loop at 4095 since nullables use one more than record count and we - // allocate on powers of two. - b = resultSet.next(); - if(b == false) { + final ValueBitSet valueSet = ValueBitSet.newInstance(kvSchema); + for (;;) { + Tuple tuple = result.next(); + if(tuple == null) { + break; + } + + final Cell value = tuple.getValue(0); + ptr.set(value.getValueArray(), + value.getValueOffset(), + value.getValueLength()); + valueSet.clear(); + valueSet.or(ptr); + + final int maxOffset = ptr.getOffset() + ptr.getLength(); + kvSchema.iterator(ptr); + for (int i = 0;; i++) { + final Boolean hasValue = kvSchema.next(ptr, i, maxOffset, valueSet); + if (hasValue == null) { break; + } + final Copier copier = copiers.get(i); + if (hasValue) { + copier.copy(counter); + } } - for (Copier c : copiers) { - c.copy(counter); + if (++counter == 4095) { + // loop at 4095 since nullables use one more than record count and we + // allocate on powers of two. + break; } - counter++; } - } catch (SQLException e) { + } catch (Exception e) { throw UserException .dataReadError(e) .message("Failure while attempting to read from database.") @@ -225,72 +229,77 @@ public int next() { vv.getMutator().setValueCount(counter > 0 ? counter : 0); } - return counter>0 ? counter : 0; + return counter > 0 ? counter : 0; } @Override public void cleanup() { - AutoCloseables.close(resultSet, logger); - AutoCloseables.close(statement, logger); - AutoCloseables.close(connection, logger); + close(result, logger); + } + + /** Similar to {@link org.apache.drill.common.AutoCloseables#close}. */ + public static void close(final SQLCloseable ac, final org.slf4j.Logger logger) { + if (ac == null) { + return; + } + + try { + ac.close(); + } catch(Exception e) { + logger.warn("Failure on close(): {}", e); + } } private abstract class Copier { protected final int columnIndex; - protected final ResultSet result; + protected final PDataType.PDataCodec codec; + protected final SortOrder sortOrder; protected final T mutator; - public Copier(int columnIndex, ResultSet result, T mutator) { + public Copier(int columnIndex, T mutator) { super(); this.columnIndex = columnIndex; - this.result = result; + final ValueSchema.Field field = kvSchema.getField(columnIndex); + this.codec = field.getDataType().getCodec(); this.mutator = mutator; + this.sortOrder = field.getSortOrder(); } - abstract void copy(int index) throws SQLException; + abstract void copy(int index); } private class IntCopier extends Copier { - public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) { - super(offset, set, mutator); + public IntCopier(int offset, NullableIntVector.Mutator mutator) { + super(offset, mutator); } @Override - void copy(int index) throws SQLException { - mutator.setSafe(index, result.getInt(columnIndex)); - if (result.wasNull()) { - mutator.setNull(index); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeInt(ptr, sortOrder)); } } private class BigIntCopier extends Copier { - public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) { - super(offset, set, mutator); + public BigIntCopier(int offset, NullableBigIntVector.Mutator mutator) { + super(offset, mutator); } @Override - void copy(int index) throws SQLException { - mutator.setSafe(index, result.getLong(columnIndex)); - if (result.wasNull()) { - mutator.setNull(index); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeLong(ptr, sortOrder)); } } private class Float4Copier extends Copier { - public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) { - super(columnIndex, result, mutator); + public Float4Copier(int columnIndex, NullableFloat4Vector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - mutator.setSafe(index, result.getFloat(columnIndex)); - if (result.wasNull()) { - mutator.setNull(index); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeFloat(ptr, sortOrder)); } } @@ -298,132 +307,91 @@ void copy(int index) throws SQLException { private class Float8Copier extends Copier { - public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) { - super(columnIndex, result, mutator); + public Float8Copier(int columnIndex, NullableFloat8Vector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - mutator.setSafe(index, result.getDouble(columnIndex)); - if (result.wasNull()) { - mutator.setNull(index); - } - - } - - } - - private class DecimalCopier extends Copier { - - public DecimalCopier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) { - super(columnIndex, result, mutator); - } - - @Override - void copy(int index) throws SQLException { - BigDecimal decimal = result.getBigDecimal(columnIndex); - if (decimal != null) { - mutator.setSafe(index, decimal.doubleValue()); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeDouble(ptr, sortOrder)); } } - + private class VarCharCopier extends Copier { - public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) { - super(columnIndex, result, mutator); + public VarCharCopier(int columnIndex, NullableVarCharVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - String val = resultSet.getString(columnIndex); - if (val != null) { - byte[] record = val.getBytes(Charsets.UTF_8); - mutator.setSafe(index, record, 0, record.length); - } + void copy(int index) { + mutator.setSafe(index, ptr.get(), ptr.getOffset(), ptr.getLength()); } } private class VarBinaryCopier extends Copier { - public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) { - super(columnIndex, result, mutator); + public VarBinaryCopier(int columnIndex, NullableVarBinaryVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - byte[] record = result.getBytes(columnIndex); - if (record != null) { - mutator.setSafe(index, record, 0, record.length); - } + void copy(int index) { + mutator.setSafe(index, ptr.get(), ptr.getOffset(), ptr.getLength()); } } private class DateCopier extends Copier { - public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) { - super(columnIndex, result, mutator); + public DateCopier(int columnIndex, NullableDateVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - Date date = result.getDate(columnIndex); - if (date != null) { - mutator.setSafe(index, date.getTime()); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeLong(ptr, sortOrder)); } } private class TimeCopier extends Copier { - public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) { - super(columnIndex, result, mutator); + public TimeCopier(int columnIndex, NullableTimeVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - Time time = result.getTime(columnIndex); - if (time != null) { - mutator.setSafe(index, (int) time.getTime()); - } - + void copy(int index) { + mutator.setSafe(index, codec.decodeInt(ptr, sortOrder)); } } private class TimeStampCopier extends Copier { - public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) { - super(columnIndex, result, mutator); + public TimeStampCopier(int columnIndex, NullableTimeStampVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - Timestamp stamp = result.getTimestamp(columnIndex); - if (stamp != null) { - mutator.setSafe(index, stamp.getTime()); - } - + void copy(int index) { + mutator.setSafe(index, codec.decodeLong(ptr, sortOrder)); } } private class BitCopier extends Copier { - public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) { - super(columnIndex, result, mutator); + public BitCopier(int columnIndex, NullableBitVector.Mutator mutator) { + super(columnIndex, mutator); } @Override - void copy(int index) throws SQLException { - mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0); - if (result.wasNull()) { - mutator.setNull(index); - } + void copy(int index) { + mutator.setSafe(index, codec.decodeByte(ptr, sortOrder)); } } From 3f994ae5cfd10213ac212f0d5b1aec55bdb4a2a5 Mon Sep 17 00:00:00 2001 From: Julian Hyde Date: Fri, 25 Sep 2015 01:04:23 -0700 Subject: [PATCH 3/3] remove the day part of phoenix time value --- .../apache/drill/exec/store/phoenix/PhoenixRecordReader.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java index 7da795cd3dd..9dde6e49289 100755 --- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java +++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixRecordReader.java @@ -62,6 +62,8 @@ class PhoenixRecordReader extends AbstractRecordReader { private static final ImmutableMap JDBC_TYPE_MAPPINGS; + private static final long MILLIS_IN_DAY = 1000L * 60 * 60 * 24; + private final String storagePluginName; private final KeyValueSchema kvSchema; private final ResultIterator result; @@ -358,14 +360,13 @@ void copy(int index) { } private class TimeCopier extends Copier { - public TimeCopier(int columnIndex, NullableTimeVector.Mutator mutator) { super(columnIndex, mutator); } @Override void copy(int index) { - mutator.setSafe(index, codec.decodeInt(ptr, sortOrder)); + mutator.setSafe(index, (int) (codec.decodeLong(ptr, sortOrder) % MILLIS_IN_DAY)); } }