Skip to content

Commit

Permalink
Clean up and fix compile
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Nov 2, 2020
1 parent 324424c commit d39fcae
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 140 deletions.
17 changes: 17 additions & 0 deletions pubsublite/internal/wire/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Wire

This directory contains internal implementation details for Cloud Pub/Sub Lite.
Its exported interface can change at any time.

## Conventions

The following are general conventions used in this package:

* Capitalized methods and fields of a struct denotes its public interface. They
are safe to call from outside the struct (e.g. accesses immutable fields or
guarded by a mutex). All other methods are considered internal implementation
details that should not be called from outside the struct.
* unsafeFoo() methods indicate that the caller is expected to have already
acquired the struct's mutex. Since Go does not support re-entrant locks, they
do not acquire the mutex. These are typically common util methods that need
to be atomic with other operations.
52 changes: 0 additions & 52 deletions pubsublite/internal/wire/message_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,10 @@ package wire

import (
"crypto/sha256"
"fmt"
"math/big"
"math/rand"
"time"

"github.com/golang/protobuf/ptypes"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AttributeValues is a slice of strings.
type AttributeValues [][]byte

// Message represents a Pub/Sub message.
type Message struct {
// Data is the actual data in the message.
Data []byte

// Attributes can be used to label the message. A key may have multiple
// values.
Attributes map[string]AttributeValues

// EventTime is an optional, user-specified event time for this message.
EventTime time.Time

// OrderingKey identifies related messages for which publish order should
// be respected. Messages with the same ordering key are published to the
// same topic partition and subscribers will receive the messages in order.
// If the ordering key is empty, the message will be sent to an arbitrary
// partition.
OrderingKey []byte
}

func (m *Message) toProto() (*pb.PubSubMessage, error) {
msgpb := &pb.PubSubMessage{
Data: m.Data,
Key: m.OrderingKey,
}

if len(m.Attributes) > 0 {
msgpb.Attributes = make(map[string]*pb.AttributeValues)
for key, values := range m.Attributes {
msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
}
}

if !m.EventTime.IsZero() {
ts, err := ptypes.TimestampProto(m.EventTime)
if err != nil {
return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
}
msgpb.EventTime = ts
}
return msgpb, nil
}

// messageRouter outputs a partition number, given an ordering key. Results are
// undefined when:
// - setPartitionCount() is called with count <= 0.
Expand Down
63 changes: 0 additions & 63 deletions pubsublite/internal/wire/message_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/golang/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

type fakeSource struct {
Expand All @@ -45,63 +39,6 @@ func (f *fakeMsgRouter) Route(orderingKey []byte) int {
return f.partitionCount * f.multiplier
}

func TestMessageToProto(t *testing.T) {
for _, tc := range []struct {
desc string
msg *Message
want *pb.PubSubMessage
}{
{
desc: "valid: minimal",
msg: &Message{
Data: []byte("Hello world"),
},
want: &pb.PubSubMessage{
Data: []byte("Hello world"),
},
},
{
desc: "valid: filled",
msg: &Message{
Data: []byte("foo"),
Attributes: map[string]AttributeValues{
"attr1": [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
EventTime: time.Unix(1555593697, 154358*1000),
OrderingKey: []byte("order"),
},
want: &pb.PubSubMessage{
Data: []byte("foo"),
Attributes: map[string]*pb.AttributeValues{
"attr1": {
Values: [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
},
EventTime: &tspb.Timestamp{
Seconds: 1555593697,
Nanos: 154358 * 1000,
},
Key: []byte("order"),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, err := tc.msg.toProto()
if err != nil {
t.Errorf("toProto() err = %v", err)
} else if !proto.Equal(got, tc.want) {
t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
}
})
}
}

func TestRoundRobinMsgRouter(t *testing.T) {
// Using the same msgRouter for each test run ensures that it reinitializes
// when the partition count changes.
Expand Down
24 changes: 24 additions & 0 deletions pubsublite/internal/wire/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

type topicPartition struct {
Path string
Partition int
}

type subscriptionPartition struct {
Path string
Partition int
}
25 changes: 5 additions & 20 deletions pubsublite/internal/wire/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,59 +117,44 @@ func defaultClientOptions(region string) []option.ClientOption {
}
}

// NewAdminClient creates a new gapic AdminClient.
// NewAdminClient creates a new gapic AdminClient for a region.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewAdminClient(ctx, options...)
}

func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPublisherClient(ctx, options...)
}

func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewSubscriberClient(ctx, options...)
}

func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewCursorClient(ctx, options...)
}

func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPartitionAssignmentClient(ctx, options...)
}

func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context {
func addTopicRoutingMetadata(ctx context.Context, topic topicPartition) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String()))
val := fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context {
func addSubscriptionRoutingMetadata(ctx context.Context, subscription subscriptionPartition) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String()))
val := fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}
10 changes: 5 additions & 5 deletions pubsublite/internal/wire/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (rs *retryableStream) currentStream() grpc.ClientStream {
return rs.stream
}

// clearStream must be called with the retryableStream.mu locked.
func (rs *retryableStream) clearStream() {
// unsafeClearStream must be called with the retryableStream.mu locked.
func (rs *retryableStream) unsafeClearStream() {
if rs.cancelStream != nil {
// If the stream did not already abort due to error, this will abort it.
rs.cancelStream()
Expand Down Expand Up @@ -210,7 +210,7 @@ func (rs *retryableStream) reconnect() {
return false
}
rs.status = streamReconnecting
rs.clearStream()
rs.unsafeClearStream()
return true
}
if !canReconnect() {
Expand All @@ -229,7 +229,7 @@ func (rs *retryableStream) reconnect() {
defer rs.mu.Unlock()

if rs.status == streamTerminated {
rs.clearStream()
rs.unsafeClearStream()
return false
}
rs.status = streamConnected
Expand Down Expand Up @@ -330,7 +330,7 @@ func (rs *retryableStream) terminate(err error) {
}
rs.status = streamTerminated
rs.finalErr = err
rs.clearStream()
rs.unsafeClearStream()

// terminate can be called from within a streamHandler method with a lock
// held. So notify from a goroutine to prevent deadlock.
Expand Down

0 comments on commit d39fcae

Please sign in to comment.