Skip to content

Commit

Permalink
Update joda-time to 2.10.0
Browse files Browse the repository at this point in the history
Get America/Nuuk alike timezone. This version is required by trino.

See also https://twitter.com/raphaelschaad/status/1488584437197586432.

Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Jul 21, 2022
1 parent 2605a5f commit 848e66f
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ flexible messaging model and an intuitive client API.</description>
<rabbitmq-client.version>5.5.3</rabbitmq-client.version>
<aws-sdk.version>1.11.774</aws-sdk.version>
<avro.version>1.10.2</avro.version>
<joda.version>2.10.5</joda.version>
<joda.version>2.10.10</joda.version>
<jclouds.version>2.5.0</jclouds.version>
<guice.version>5.1.0</guice.version>
<sqlite-jdbc.version>3.36.0.3</sqlite-jdbc.version>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ The Apache Software License, Version 2.0
- jetcd-core-0.5.11.jar

* Joda Time
- joda-time-2.10.5.jar
- joda-time-2.10.10.jar
- failsafe-2.4.4.jar
* Jetty
- http2-client-9.4.48.v20220622.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.Utils;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -386,15 +385,13 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
if (!range.isHighUnbounded()) {
Block block = Utils.nativeValueToBlock(range.getType(), range.getHighBoundedValue());
// Trino timestamp payload is in micros
long millis = block.getLong(0, 0) / 1000;
upperBoundTs = new Timestamp(millis).getTime();
upperBoundTs = block.getLong(0, 0) / 1000;
}

if (!range.isLowUnbounded()) {
Block block = Utils.nativeValueToBlock(range.getType(), range.getLowBoundedValue());
// Trino timestamp payload is in micros
long millis = block.getLong(0, 0) / 1000;
lowerBoundTs = new Timestamp(millis).getTime();
lowerBoundTs = block.getLong(0, 0) / 1000;
}

PositionImpl overallStartPos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@

public abstract class TestPulsarConnector {

protected static final long currentTimeMs = 1534806330000L;
protected static final long currentTimeMicros = 1534806330000000L;

protected PulsarConnectorConfig pulsarConnectorConfig;

Expand Down Expand Up @@ -405,7 +405,7 @@ private static List<Entry> getTopicEntries(String topicSchemaName) {

MessageMetadata messageMetadata = new MessageMetadata()
.setProducerName("test-producer").setSequenceId(i)
.setPublishTime(currentTimeMs + i);
.setPublishTime(currentTimeMicros / 1000 + i);

Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ public void testPublishTimePredicatePushdown(String delimiter) throws Exception


Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMicros + 1000L, true,
currentTimeMicros + 50000L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);

Expand Down Expand Up @@ -258,8 +258,8 @@ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) t


Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMicros + 1000L, true,
currentTimeMicros + 50000L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,10 @@ public void testPrimitiveType() {
Map<DecoderColumnHandle, FieldValueProvider> decodedRowTimestamp =
pulsarRowDecoderTimestamp.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaTimestamp.encode(timestampValue))).get();
long timestampValueInMicros = timestampValue.getTime() * 1000;
checkValue(decodedRowTimestamp, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, TIMESTAMP, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), timestampValue.getTime());
PulsarColumnHandle.HandleKeyValueType.NONE), timestampValueInMicros);

}

Expand Down

0 comments on commit 848e66f

Please # to comment.