Skip to content

Commit

Permalink
examples: add an example of flow control behavior (#6648)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Sep 27, 2023
1 parent ee4b62c commit c6264a9
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 0 deletions.
3 changes: 3 additions & 0 deletions examples/examples_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ EXAMPLES=(
"features/encryption/TLS"
"features/error_details"
"features/error_handling"
"features/flow_control"
"features/interceptor"
"features/load_balancing"
"features/metadata"
Expand Down Expand Up @@ -112,6 +113,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
["features/encryption/TLS"]=""
["features/error_details"]=""
["features/error_handling"]=""
["features/flow_control"]="Stream ended successfully."
["features/interceptor"]="unary echoing message \"hello world\""
["features/load_balancing"]="serving on :50051"
["features/metadata"]="message:\"this is examples/metadata\", sending echo"
Expand All @@ -134,6 +136,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
["features/encryption/TLS"]="UnaryEcho: hello world"
["features/error_details"]="Greeting: Hello world"
["features/error_handling"]="Received error"
["features/flow_control"]="Stream ended successfully."
["features/interceptor"]="UnaryEcho: hello world"
["features/load_balancing"]="calling helloworld.Greeter/SayHello with pick_first"
["features/metadata"]="this is examples/metadata"
Expand Down
55 changes: 55 additions & 0 deletions examples/features/flow_control/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Flow Control

Flow control is a feature in gRPC that prevents senders from writing more data
on a stream than a receiver is capable of handling. This feature behaves the
same for both clients and servers. Because gRPC-Go uses a blocking-style API
for stream operations, flow control pushback is implemented by simply blocking
send operations on a stream when that stream's flow control limits have been
reached. When the receiver has read enough data from the stream, the send
operation will unblock automatically. Flow control is configured automatically
based on a connection's Bandwidth Delay Product (BDP) to ensure the buffer is
the minimum size necessary to allow for maximum throughput on the stream if the
receiver is reading at its maximum speed.

## Try it

```
go run ./server
```

```
go run ./client
```

## Example explanation

The example client and server are written to demonstrate the blocking by
intentionally sending messages while the other side is not receiving. The
bidirectional echo stream in the example begins by having the client send
messages until it detects it has blocked (utilizing another goroutine). The
server sleeps for 2 seconds to allow this to occur. Then the server will read
all of these messages, and the roles of the client and server are swapped so the
server attempts to send continuously while the client sleeps. After the client
sleeps for 2 seconds, it will read again to unblock the server. The server will
detect that it has blocked, and end the stream once it has unblocked.

### Expected Output

The client output should look like:
```
2023/09/19 15:49:49 New stream began.
2023/09/19 15:49:50 Sending is blocked.
2023/09/19 15:49:51 Sent 25 messages.
2023/09/19 15:49:53 Read 25 messages.
2023/09/19 15:49:53 Stream ended successfully.
```

while the server should output the following logs:

```
2023/09/19 15:49:49 New stream began.
2023/09/19 15:49:51 Read 25 messages.
2023/09/19 15:49:52 Sending is blocked.
2023/09/19 15:49:53 Sent 25 messages.
2023/09/19 15:49:53 Stream ended successfully.
```
102 changes: 102 additions & 0 deletions examples/features/flow_control/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"io"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/internal/grpcsync"
)

var addr = flag.String("addr", "localhost:50052", "the address to connect to")

var payload string = string(make([]byte, 8*1024)) // 8KB

func main() {
flag.Parse()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("Error creating stream: %v", err)
}
log.Printf("New stream began.")

// First we will send data on the stream until we cannot send any more. We
// detect this by not seeing a message sent 1s after the last sent message.
stopSending := grpcsync.NewEvent()
sentOne := make(chan struct{})
go func() {
i := 0
for !stopSending.HasFired() {
i++
if err := stream.Send(&pb.EchoRequest{Message: payload}); err != nil {
log.Fatalf("Error sending data: %v", err)
}
sentOne <- struct{}{}
}
log.Printf("Sent %v messages.", i)
stream.CloseSend()
}()

for !stopSending.HasFired() {
after := time.NewTimer(time.Second)
select {
case <-sentOne:
after.Stop()
case <-after.C:
log.Printf("Sending is blocked.")
stopSending.Fire()
<-sentOne
}
}

// Next, we wait 2 seconds before reading from the stream, to give the
// server an opportunity to block while sending its responses.
time.Sleep(2 * time.Second)

// Finally, read all the data sent by the server to allow it to unblock.
for i := 0; true; i++ {
if _, err := stream.Recv(); err != nil {
log.Printf("Read %v messages.", i)
if err == io.EOF {
log.Printf("Stream ended successfully.")
return
}
log.Fatalf("Error receiving data: %v", err)
}
}
}
110 changes: 110 additions & 0 deletions examples/features/flow_control/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"flag"
"fmt"
"io"
"log"
"net"
"time"

"google.golang.org/grpc"

pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/internal/grpcsync"
)

var port = flag.Int("port", 50052, "port number")

var payload string = string(make([]byte, 8*1024)) // 8KB

// server is used to implement EchoServer.
type server struct {
pb.UnimplementedEchoServer
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
log.Printf("New stream began.")
// First, we wait 2 seconds before reading from the stream, to give the
// client an opportunity to block while sending its requests.
time.Sleep(2 * time.Second)

// Next, read all the data sent by the client to allow it to unblock.
for i := 0; true; i++ {
if _, err := stream.Recv(); err != nil {
log.Printf("Read %v messages.", i)
if err == io.EOF {
break
}
log.Printf("Error receiving data: %v", err)
return err
}
}

// Finally, send data until we block, then end the stream after we unblock.
stopSending := grpcsync.NewEvent()
sentOne := make(chan struct{})
go func() {
for !stopSending.HasFired() {
after := time.NewTimer(time.Second)
select {
case <-sentOne:
after.Stop()
case <-after.C:
log.Printf("Sending is blocked.")
stopSending.Fire()
<-sentOne
}
}
}()

i := 0
for !stopSending.HasFired() {
i++
if err := stream.Send(&pb.EchoResponse{Message: payload}); err != nil {
log.Printf("Error sending data: %v", err)
return err
}
sentOne <- struct{}{}
}
log.Printf("Sent %v messages.", i)

log.Printf("Stream ended successfully.")
return nil
}

func main() {
flag.Parse()

address := fmt.Sprintf(":%v", *port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()
pb.RegisterEchoServer(grpcServer, &server{})

if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

0 comments on commit c6264a9

Please # to comment.