@@ -43,7 +43,6 @@ import (
43
43
"google.golang.org/grpc/internal"
44
44
"google.golang.org/grpc/internal/binarylog"
45
45
"google.golang.org/grpc/internal/channelz"
46
- "google.golang.org/grpc/internal/grpcrand"
47
46
"google.golang.org/grpc/internal/grpcsync"
48
47
"google.golang.org/grpc/internal/grpcutil"
49
48
"google.golang.org/grpc/internal/transport"
@@ -146,7 +145,7 @@ type Server struct {
146
145
channelzID * channelz.Identifier
147
146
czData * channelzData
148
147
149
- serverWorkerChannels [] chan * serverWorkerData
148
+ serverWorkerChannel chan * serverWorkerData
150
149
}
151
150
152
151
type serverOptions struct {
@@ -561,40 +560,38 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
561
560
const serverWorkerResetThreshold = 1 << 16
562
561
563
562
// serverWorkers blocks on a *transport.Stream channel forever and waits for
564
- // data to be fed by serveStreams. This allows different requests to be
563
+ // data to be fed by serveStreams. This allows multiple requests to be
565
564
// processed by the same goroutine, removing the need for expensive stack
566
565
// re-allocations (see the runtime.morestack problem [1]).
567
566
//
568
567
// [1] https://github.com/golang/go/issues/18138
569
- func (s * Server ) serverWorker (ch chan * serverWorkerData ) {
570
- // To make sure all server workers don't reset at the same time, choose a
571
- // random number of iterations before resetting.
572
- threshold := serverWorkerResetThreshold + grpcrand .Intn (serverWorkerResetThreshold )
573
- for completed := 0 ; completed < threshold ; completed ++ {
574
- data , ok := <- ch
568
+ func (s * Server ) serverWorker () {
569
+ for completed := 0 ; completed < serverWorkerResetThreshold ; completed ++ {
570
+ data , ok := <- s .serverWorkerChannel
575
571
if ! ok {
576
572
return
577
573
}
578
- s .handleStream (data .st , data .stream , s .traceInfo (data .st , data .stream ))
579
- data .wg .Done ()
574
+ s .handleSingleStream (data )
580
575
}
581
- go s .serverWorker (ch )
576
+ go s .serverWorker ()
582
577
}
583
578
584
- // initServerWorkers creates worker goroutines and channels to process incoming
579
+ func (s * Server ) handleSingleStream (data * serverWorkerData ) {
580
+ defer data .wg .Done ()
581
+ s .handleStream (data .st , data .stream , s .traceInfo (data .st , data .stream ))
582
+ }
583
+
584
+ // initServerWorkers creates worker goroutines and a channel to process incoming
585
585
// connections to reduce the time spent overall on runtime.morestack.
586
586
func (s * Server ) initServerWorkers () {
587
- s .serverWorkerChannels = make ([] chan * serverWorkerData , s . opts . numServerWorkers )
587
+ s .serverWorkerChannel = make (chan * serverWorkerData )
588
588
for i := uint32 (0 ); i < s .opts .numServerWorkers ; i ++ {
589
- s .serverWorkerChannels [i ] = make (chan * serverWorkerData )
590
- go s .serverWorker (s .serverWorkerChannels [i ])
589
+ go s .serverWorker ()
591
590
}
592
591
}
593
592
594
593
func (s * Server ) stopServerWorkers () {
595
- for i := uint32 (0 ); i < s .opts .numServerWorkers ; i ++ {
596
- close (s .serverWorkerChannels [i ])
597
- }
594
+ close (s .serverWorkerChannel )
598
595
}
599
596
600
597
// NewServer creates a gRPC server which has no service registered and has not
@@ -946,26 +943,21 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
946
943
defer st .Close (errors .New ("finished serving streams for the server transport" ))
947
944
var wg sync.WaitGroup
948
945
949
- var roundRobinCounter uint32
950
946
st .HandleStreams (func (stream * transport.Stream ) {
951
947
wg .Add (1 )
952
948
if s .opts .numServerWorkers > 0 {
953
949
data := & serverWorkerData {st : st , wg : & wg , stream : stream }
954
950
select {
955
- case s .serverWorkerChannels [atomic .AddUint32 (& roundRobinCounter , 1 )% s .opts .numServerWorkers ] <- data :
951
+ case s .serverWorkerChannel <- data :
952
+ return
956
953
default :
957
954
// If all stream workers are busy, fallback to the default code path.
958
- go func () {
959
- s .handleStream (st , stream , s .traceInfo (st , stream ))
960
- wg .Done ()
961
- }()
962
955
}
963
- } else {
964
- go func () {
965
- defer wg .Done ()
966
- s .handleStream (st , stream , s .traceInfo (st , stream ))
967
- }()
968
956
}
957
+ go func () {
958
+ defer wg .Done ()
959
+ s .handleStream (st , stream , s .traceInfo (st , stream ))
960
+ }()
969
961
}, func (ctx context.Context , method string ) context.Context {
970
962
if ! EnableTracing {
971
963
return ctx
0 commit comments