Skip to content

Commit

Permalink
feat: set crrst flag on response header to ensure kitex client won't …
Browse files Browse the repository at this point in the history
…reuse bad connections
  • Loading branch information
jayantxie committed Jan 24, 2025
1 parent 98e1162 commit 0b18be0
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions pkg/remote/trans/default_server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
if err != nil {
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
}
Expand All @@ -203,7 +203,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
var methodInfo serviceinfo.MethodInfo
if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
// it won't be error, because the method has been checked in decode, err check here just do defensive inspection
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded,
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
Expand All @@ -219,7 +219,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
// error cannot be wrapped to print here, so it must exec before NewTransError
t.OnError(ctx, err, conn)
err = remote.NewTransError(remote.InternalError, err)
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
return err
}
// connection don't need to be closed when the error is return by the server handler
Expand Down Expand Up @@ -288,7 +288,7 @@ func (t *svrTransHandler) SetPipeline(p *remote.TransPipeline) {
}

func (t *svrTransHandler) writeErrorReplyIfNeeded(
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool,
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool, connReset bool,

Check failure on line 291 in pkg/remote/trans/default_server_handler.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (gofumpt)
) (shouldCloseConn bool) {
if cn, ok := conn.(remote.IsActive); ok && !cn.IsActive() {
// conn is closed, no need reply
Expand All @@ -313,6 +313,13 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(
// if error happen before normal OnMessage, exec it to transfer header trans info into rpcinfo
t.transPipe.OnMessage(ctx, recvMsg, errMsg)
}
if connReset {
// if connection needs to be closed, set ConnResetTag to response header
// to ensure the client won't reuse the connection.
if ei := rpcinfo.AsTaggable(ri.To()); ei != nil {
ei.SetTag(rpcinfo.ConnResetTag, "1")
}
}
ctx, err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
Expand Down

0 comments on commit 0b18be0

Please # to comment.