Skip to content

Commit

Permalink
feat: set crrst flag on response header to ensure kitex client won't …
Browse files Browse the repository at this point in the history
…reuse bad connections
  • Loading branch information
jayantxie committed Jan 24, 2025
1 parent 98e1162 commit 64f616c
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions pkg/remote/trans/default_server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
if err != nil {
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
}
Expand All @@ -203,7 +203,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
var methodInfo serviceinfo.MethodInfo
if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
// it won't be error, because the method has been checked in decode, err check here just do defensive inspection
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded,
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
Expand All @@ -219,7 +219,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
// error cannot be wrapped to print here, so it must exec before NewTransError
t.OnError(ctx, err, conn)
err = remote.NewTransError(remote.InternalError, err)
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
return err
}
// connection don't need to be closed when the error is return by the server handler
Expand Down Expand Up @@ -288,7 +288,7 @@ func (t *svrTransHandler) SetPipeline(p *remote.TransPipeline) {
}

func (t *svrTransHandler) writeErrorReplyIfNeeded(
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool,
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage, connReset bool,
) (shouldCloseConn bool) {
if cn, ok := conn.(remote.IsActive); ok && !cn.IsActive() {
// conn is closed, no need reply
Expand All @@ -313,6 +313,13 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(
// if error happen before normal OnMessage, exec it to transfer header trans info into rpcinfo
t.transPipe.OnMessage(ctx, recvMsg, errMsg)
}
if connReset {
// if connection needs to be closed, set ConnResetTag to response header
// to ensure the client won't reuse the connection.
if ei := rpcinfo.AsTaggable(ri.To()); ei != nil {
ei.SetTag(rpcinfo.ConnResetTag, "1")
}
}
ctx, err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
Expand Down

0 comments on commit 64f616c

Please # to comment.