From ab6a124c3bdbe9b9b9fb715b7b734fbbd1fa9e9e Mon Sep 17 00:00:00 2001 From: jiangmichaellll <40044148+jiangmichaellll@users.noreply.github.com> Date: Fri, 29 Jan 2021 13:16:48 -0500 Subject: [PATCH] feat: Fixes timestamps and more thorough testing (#38) --- .../cloud/pubsublite/spark/PslSparkUtils.java | 4 +- .../pubsublite/spark/PslSparkUtilsTest.java | 43 ++++++++++++++----- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index 4487ce9a..1d54fe19 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -75,9 +75,9 @@ public static InternalRow toInternalRow( msg.offset().value(), ByteArray.concat(msg.message().key().toByteArray()), ByteArray.concat(msg.message().data().toByteArray()), - Timestamps.toMillis(msg.publishTime()), + Timestamps.toMicros(msg.publishTime()), msg.message().eventTime().isPresent() - ? Timestamps.toMillis(msg.message().eventTime().get()) + ? Timestamps.toMicros(msg.message().eventTime().get()) : null, convertAttributesToSparkMap(msg.message().attributes()))); return InternalRow.apply(asScalaBufferConverter(list).asScala()); diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java index 5dbdf65e..b3b81246 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java @@ -27,17 +27,25 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import java.nio.charset.StandardCharsets; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.DataTypes; import org.junit.Test; public class PslSparkUtilsTest { @Test public void testToInternalRow() { + Timestamp publishTimestamp = Timestamp.newBuilder().setSeconds(20000000L).setNanos(20).build(); + Timestamp eventTimestamp = Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(); Message message = Message.builder() .setKey(ByteString.copyFromUtf8("key")) .setData(ByteString.copyFromUtf8("data")) - .setEventTime(Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build()) + .setEventTime(eventTimestamp) .setAttributes( ImmutableListMultimap.of( "key1", ByteString.copyFromUtf8("val1"), @@ -45,15 +53,30 @@ public void testToInternalRow() { "key2", ByteString.copyFromUtf8("val3"))) .build(); SequencedMessage sequencedMessage = - SequencedMessage.of( - message, - Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(), - Offset.of(10L), - 10L); - PslSparkUtils.toInternalRow( - sequencedMessage, - UnitTestExamples.exampleSubscriptionPath(), - UnitTestExamples.examplePartition()); + SequencedMessage.of(message, publishTimestamp, UnitTestExamples.exampleOffset(), 10L); + InternalRow row = + PslSparkUtils.toInternalRow( + sequencedMessage, + UnitTestExamples.exampleSubscriptionPath(), + UnitTestExamples.examplePartition()); + assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString()); + assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value()); + assertThat(row.getLong(2)).isEqualTo(UnitTestExamples.exampleOffset().value()); + assertThat(row.getBinary(3)).isEqualTo("key".getBytes(StandardCharsets.UTF_8)); + assertThat(row.getBinary(4)).isEqualTo("data".getBytes(StandardCharsets.UTF_8)); + assertThat(row.getLong(5)).isEqualTo(Timestamps.toMicros(publishTimestamp)); + assertThat(row.getLong(6)).isEqualTo(Timestamps.toMicros(eventTimestamp)); + ArrayData keys = row.getMap(7).keyArray(); + ArrayData values = row.getMap(7).valueArray(); + assertThat(keys.get(0, DataTypes.StringType).toString()).isEqualTo("key1"); + assertThat(keys.get(1, DataTypes.StringType).toString()).isEqualTo("key2"); + GenericArrayData valueOfKey1 = + (GenericArrayData) values.get(0, DataTypes.createArrayType(DataTypes.BinaryType)); + GenericArrayData valueOfKey2 = + (GenericArrayData) values.get(1, DataTypes.createArrayType(DataTypes.BinaryType)); + assertThat(valueOfKey1.getBinary(0)).isEqualTo("val1".getBytes(StandardCharsets.UTF_8)); + assertThat(valueOfKey1.getBinary(1)).isEqualTo("val2".getBytes(StandardCharsets.UTF_8)); + assertThat(valueOfKey2.getBinary(0)).isEqualTo("val3".getBytes(StandardCharsets.UTF_8)); } @Test