From 3812334bc6581e053ef9a3be62f4d89fc9ed5dde Mon Sep 17 00:00:00 2001 From: Michael Jiang Date: Thu, 28 Jan 2021 23:57:06 -0500 Subject: [PATCH 1/2] update --- .../cloud/pubsublite/spark/PslSparkUtils.java | 4 +-- .../pubsublite/spark/PslSparkUtilsTest.java | 34 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java index 4487ce9a..1d54fe19 100644 --- a/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java +++ b/src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java @@ -75,9 +75,9 @@ public static InternalRow toInternalRow( msg.offset().value(), ByteArray.concat(msg.message().key().toByteArray()), ByteArray.concat(msg.message().data().toByteArray()), - Timestamps.toMillis(msg.publishTime()), + Timestamps.toMicros(msg.publishTime()), msg.message().eventTime().isPresent() - ? Timestamps.toMillis(msg.message().eventTime().get()) + ? Timestamps.toMicros(msg.message().eventTime().get()) : null, convertAttributesToSparkMap(msg.message().attributes()))); return InternalRow.apply(asScalaBufferConverter(list).asScala()); diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java index 5dbdf65e..634353a5 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java @@ -27,17 +27,27 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import org.apache.orc.storage.ql.util.TimestampUtils; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.DataTypes; import org.junit.Test; +import java.nio.charset.StandardCharsets; + public class PslSparkUtilsTest { @Test public void testToInternalRow() { + Timestamp publishTimestamp = Timestamp.newBuilder().setSeconds(20000000L).setNanos(20).build(); + Timestamp eventTimestamp = Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(); Message message = Message.builder() .setKey(ByteString.copyFromUtf8("key")) .setData(ByteString.copyFromUtf8("data")) - .setEventTime(Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build()) + .setEventTime(eventTimestamp) .setAttributes( ImmutableListMultimap.of( "key1", ByteString.copyFromUtf8("val1"), @@ -47,13 +57,29 @@ public void testToInternalRow() { SequencedMessage sequencedMessage = SequencedMessage.of( message, - Timestamp.newBuilder().setSeconds(10000000L).setNanos(10).build(), - Offset.of(10L), + publishTimestamp, + UnitTestExamples.exampleOffset(), 10L); - PslSparkUtils.toInternalRow( + InternalRow row = PslSparkUtils.toInternalRow( sequencedMessage, UnitTestExamples.exampleSubscriptionPath(), UnitTestExamples.examplePartition()); + assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString()); + assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value()); + assertThat(row.getLong(2)).isEqualTo(UnitTestExamples.exampleOffset().value()); + assertThat(row.getBinary(3)).isEqualTo("key".getBytes(StandardCharsets.UTF_8)); + assertThat(row.getBinary(4)).isEqualTo("data".getBytes(StandardCharsets.UTF_8)); + assertThat(row.getLong(5)).isEqualTo(Timestamps.toMicros(publishTimestamp)); + assertThat(row.getLong(6)).isEqualTo(Timestamps.toMicros(eventTimestamp)); + ArrayData keys = row.getMap(7).keyArray(); + ArrayData values = row.getMap(7).valueArray(); + assertThat(keys.get(0, DataTypes.StringType).toString()).isEqualTo("key1"); + assertThat(keys.get(1, DataTypes.StringType).toString()).isEqualTo("key2"); + GenericArrayData valueOfKey1 = (GenericArrayData) values.get(0, DataTypes.createArrayType(DataTypes.BinaryType)); + GenericArrayData valueOfKey2 = (GenericArrayData) values.get(1, DataTypes.createArrayType(DataTypes.BinaryType)); + assertThat(valueOfKey1.getBinary(0)).isEqualTo("val1".getBytes(StandardCharsets.UTF_8)); + assertThat(valueOfKey1.getBinary(1)).isEqualTo("val2".getBytes(StandardCharsets.UTF_8)); + assertThat(valueOfKey2.getBinary(0)).isEqualTo("val3".getBytes(StandardCharsets.UTF_8)); } @Test From c1318b27049d970a7fec0d8af479f060fd1158d5 Mon Sep 17 00:00:00 2001 From: Michael Jiang Date: Thu, 28 Jan 2021 23:59:50 -0500 Subject: [PATCH 2/2] update --- .../pubsublite/spark/PslSparkUtilsTest.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java index 634353a5..b3b81246 100644 --- a/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java +++ b/src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java @@ -28,15 +28,13 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; -import org.apache.orc.storage.ql.util.TimestampUtils; +import java.nio.charset.StandardCharsets; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; import org.apache.spark.sql.types.DataTypes; import org.junit.Test; -import java.nio.charset.StandardCharsets; - public class PslSparkUtilsTest { @Test @@ -55,15 +53,12 @@ public void testToInternalRow() { "key2", ByteString.copyFromUtf8("val3"))) .build(); SequencedMessage sequencedMessage = - SequencedMessage.of( - message, - publishTimestamp, - UnitTestExamples.exampleOffset(), - 10L); - InternalRow row = PslSparkUtils.toInternalRow( - sequencedMessage, - UnitTestExamples.exampleSubscriptionPath(), - UnitTestExamples.examplePartition()); + SequencedMessage.of(message, publishTimestamp, UnitTestExamples.exampleOffset(), 10L); + InternalRow row = + PslSparkUtils.toInternalRow( + sequencedMessage, + UnitTestExamples.exampleSubscriptionPath(), + UnitTestExamples.examplePartition()); assertThat(row.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString()); assertThat(row.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value()); assertThat(row.getLong(2)).isEqualTo(UnitTestExamples.exampleOffset().value()); @@ -72,11 +67,13 @@ public void testToInternalRow() { assertThat(row.getLong(5)).isEqualTo(Timestamps.toMicros(publishTimestamp)); assertThat(row.getLong(6)).isEqualTo(Timestamps.toMicros(eventTimestamp)); ArrayData keys = row.getMap(7).keyArray(); - ArrayData values = row.getMap(7).valueArray(); + ArrayData values = row.getMap(7).valueArray(); assertThat(keys.get(0, DataTypes.StringType).toString()).isEqualTo("key1"); assertThat(keys.get(1, DataTypes.StringType).toString()).isEqualTo("key2"); - GenericArrayData valueOfKey1 = (GenericArrayData) values.get(0, DataTypes.createArrayType(DataTypes.BinaryType)); - GenericArrayData valueOfKey2 = (GenericArrayData) values.get(1, DataTypes.createArrayType(DataTypes.BinaryType)); + GenericArrayData valueOfKey1 = + (GenericArrayData) values.get(0, DataTypes.createArrayType(DataTypes.BinaryType)); + GenericArrayData valueOfKey2 = + (GenericArrayData) values.get(1, DataTypes.createArrayType(DataTypes.BinaryType)); assertThat(valueOfKey1.getBinary(0)).isEqualTo("val1".getBytes(StandardCharsets.UTF_8)); assertThat(valueOfKey1.getBinary(1)).isEqualTo("val2".getBytes(StandardCharsets.UTF_8)); assertThat(valueOfKey2.getBinary(0)).isEqualTo("val3".getBytes(StandardCharsets.UTF_8));