From ed00367d3d73c867d45d4d3cd1aa8e5fa9b4ef2d Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 4 Apr 2024 14:14:52 -0700 Subject: [PATCH 1/2] feat(pubsub): support publisher compression --- pubsub/integration_test.go | 26 ++++++++++++++++++++++++++ pubsub/topic.go | 32 +++++++++++++++++++++++++++----- pubsub/topic_test.go | 23 +++++++++++++++++++++++ 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 0dad93f14a46..6fd5a12cf6c6 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -2180,6 +2180,32 @@ func TestIntegration_DetectProjectID(t *testing.T) { } } +func TestIntegration_PublishCompression(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t) + defer client.Close() + + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + + topic.PublishSettings.EnableCompression = true + topic.PublishSettings.CompressionBytesThreshold = 50 + + const messageSizeBytes = 1000 + + msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))} + res := topic.Publish(ctx, msg) + + _, err = res.Get(ctx) + if err != nil { + t.Errorf("publish result got err: %v", err) + } +} + // createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error. func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) { var topic *Topic diff --git a/pubsub/topic.go b/pubsub/topic.go index b85b4b1c342a..5e5819154b70 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -36,6 +36,7 @@ import ( "google.golang.org/api/support/bundler" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -117,6 +118,13 @@ type PublishSettings struct { // FlowControlSettings defines publisher flow control settings. FlowControlSettings FlowControlSettings + + // EnableCompression enables transport compression for Publish operations + EnableCompression bool + + // CompressionBytesThreshold defines the threshold (in bytes) above which messages + // are compressed for transport. Only takes effect if EnableCompression is true. + CompressionBytesThreshold int } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -134,6 +142,8 @@ var DefaultPublishSettings = PublishSettings{ MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, + EnableCompression: false, + CompressionBytesThreshold: 240, } // CreateTopic creates a new topic. @@ -875,6 +885,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) } pbMsgs := make([]*pb.PubsubMessage, len(bms)) var orderingKey string + batchSize := 0 for i, bm := range bms { orderingKey = bm.msg.OrderingKey pbMsgs[i] = &pb.PubsubMessage{ @@ -882,6 +893,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } + batchSize = batchSize + proto.Size(pbMsgs[i]) bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -897,11 +909,21 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) opt.Resolve(&settings) } r := &publishRetryer{defaultRetryer: settings.Retry()} - res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ - Topic: t.name, - Messages: pbMsgs, - }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), - gax.WithRetry(func() gax.Retryer { return r })) + if t.PublishSettings.EnableCompression && batchSize > t.PublishSettings.CompressionBytesThreshold { + res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ + Topic: t.name, + Messages: pbMsgs, + }, gax.WithGRPCOptions( + grpc.UseCompressor(gzip.Name), + grpc.MaxCallSendMsgSize(maxSendRecvBytes)), + gax.WithRetry(func() gax.Retryer { return r })) + } else { + res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ + Topic: t.name, + Messages: pbMsgs, + }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), + gax.WithRetry(func() gax.Retryer { return r })) + } } end := time.Now() if err != nil { diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index eb53bef5f370..974e0c487d65 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -747,3 +747,26 @@ func TestPublishOrderingNotEnabled(t *testing.T) { t.Errorf("got %v, want errTopicOrderingNotEnabled", err) } } + +func TestPublishCompression(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "topic-compression") + defer topic.Stop() + + topic.PublishSettings.EnableCompression = true + topic.PublishSettings.CompressionBytesThreshold = 50 + + const messageSizeBytes = 1000 + + msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))} + res := topic.Publish(ctx, msg) + + _, err := res.Get(ctx) + if err != nil { + t.Errorf("publish result got err: %v", err) + } +} From c107b6e6052aad197d81f8eb0a39feca5634b0bc Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 26 Apr 2024 15:54:32 -0700 Subject: [PATCH 2/2] address review comments --- pubsub/topic.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index 5e5819154b70..b953cb4d1bcb 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -127,6 +127,10 @@ type PublishSettings struct { CompressionBytesThreshold int } +func (ps *PublishSettings) shouldCompress(batchSize int) bool { + return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold +} + // DefaultPublishSettings holds the default values for topics' PublishSettings. var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, @@ -142,6 +146,8 @@ var DefaultPublishSettings = PublishSettings{ MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, + // Publisher compression defaults matches Java's defaults + // https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718 EnableCompression: false, CompressionBytesThreshold: 240, } @@ -909,21 +915,17 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) opt.Resolve(&settings) } r := &publishRetryer{defaultRetryer: settings.Retry()} - if t.PublishSettings.EnableCompression && batchSize > t.PublishSettings.CompressionBytesThreshold { - res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ - Topic: t.name, - Messages: pbMsgs, - }, gax.WithGRPCOptions( - grpc.UseCompressor(gzip.Name), - grpc.MaxCallSendMsgSize(maxSendRecvBytes)), - gax.WithRetry(func() gax.Retryer { return r })) - } else { - res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ - Topic: t.name, - Messages: pbMsgs, - }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), - gax.WithRetry(func() gax.Retryer { return r })) + gaxOpts := []gax.CallOption{ + gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), + gax.WithRetry(func() gax.Retryer { return r }), + } + if t.PublishSettings.shouldCompress(batchSize) { + gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) } + res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ + Topic: t.name, + Messages: pbMsgs, + }, gaxOpts...) } end := time.Now() if err != nil {