From 473edd1764b6739e2e4610ea5dede4c2bc6009d1 Mon Sep 17 00:00:00 2001 From: Evan Culver Date: Fri, 27 Aug 2021 00:25:19 +0200 Subject: [PATCH] [1.9.x] rpc: authorize raft requests (#10932) --- .changelog/10932.txt | 3 + agent/consul/raft_rpc.go | 3 +- agent/consul/rpc.go | 20 ++- agent/consul/rpc_test.go | 289 +++++++++++++++++++++++++++++++++++++-- tlsutil/config.go | 91 +++++++++--- tlsutil/config_test.go | 4 +- 6 files changed, 373 insertions(+), 37 deletions(-) create mode 100644 .changelog/10932.txt diff --git a/.changelog/10932.txt b/.changelog/10932.txt new file mode 100644 index 000000000000..d7cc925b00a9 --- /dev/null +++ b/.changelog/10932.txt @@ -0,0 +1,3 @@ +```release-note:security +rpc: authorize raft requests [CVE-2021-37219](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-37219) +``` diff --git a/agent/consul/raft_rpc.go b/agent/consul/raft_rpc.go index 9fb236a64b8c..08e23a1409cb 100644 --- a/agent/consul/raft_rpc.go +++ b/agent/consul/raft_rpc.go @@ -6,9 +6,10 @@ import ( "sync" "time" + "github.com/hashicorp/raft" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/raft" ) // RaftLayer implements the raft.StreamLayer interface, diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index b72360259b3d..4b4a50ee43f9 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -201,8 +201,7 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { s.handleConsulConn(conn) case pool.RPCRaft: - metrics.IncrCounter([]string{"rpc", "raft_handoff"}, 1) - s.raftLayer.Handoff(conn) + s.handleRaftRPC(conn) case pool.RPCTLS: // Don't allow malicious client to create TLS-in-TLS for ever. @@ -290,8 +289,7 @@ func (s *Server) handleNativeTLS(conn net.Conn) { s.handleConsulConn(tlsConn) case pool.ALPN_RPCRaft: - metrics.IncrCounter([]string{"rpc", "raft_handoff"}, 1) - s.raftLayer.Handoff(tlsConn) + s.handleRaftRPC(tlsConn) case pool.ALPN_RPCMultiplexV2: s.handleMultiplexV2(tlsConn) @@ -462,6 +460,20 @@ func (s *Server) handleSnapshotConn(conn net.Conn) { }() } +func (s *Server) handleRaftRPC(conn net.Conn) { + if tlsConn, ok := conn.(*tls.Conn); ok { + err := s.tlsConfigurator.AuthorizeServerConn(s.config.Datacenter, tlsConn) + if err != nil { + s.rpcLogger().Warn(err.Error(), "from", conn.RemoteAddr(), "operation", "raft RPC") + conn.Close() + return + } + } + + metrics.IncrCounter([]string{"rpc", "raft_handoff"}, 1) + s.raftLayer.Handoff(conn) +} + func (s *Server) handleALPN_WANGossipPacketStream(conn net.Conn) error { defer conn.Close() diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index e668334b5137..5357adda22c6 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1,29 +1,43 @@ package consul import ( + "bufio" "bytes" + "crypto/x509" "encoding/binary" "errors" + "fmt" + "io" + "io/ioutil" "math" "net" "os" + "path/filepath" "strings" "sync" "testing" "time" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-msgpack/codec" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/go-memdb" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/tlsutil" ) func TestRPC_NoLeader_Fail(t *testing.T) { @@ -648,10 +662,10 @@ func TestRPC_RPCMaxConnsPerClient(t *testing.T) { magicByte pool.RPCType tlsEnabled bool }{ - {"RPC", pool.RPCMultiplexV2, false}, - {"RPC TLS", pool.RPCMultiplexV2, true}, - {"Raft", pool.RPCRaft, false}, - {"Raft TLS", pool.RPCRaft, true}, + {"RPC v2", pool.RPCMultiplexV2, false}, + {"RPC v2 TLS", pool.RPCMultiplexV2, true}, + {"RPC", pool.RPCConsul, false}, + {"RPC TLS", pool.RPCConsul, true}, } for _, tc := range cases { @@ -913,3 +927,262 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { require.NoError(t, err) require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") } + +func TestRPC_AuthorizeRaftRPC(t *testing.T) { + caPEM, pk, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"}) + require.NoError(t, err) + + dir := testutil.TempDir(t, "certs") + err = ioutil.WriteFile(filepath.Join(dir, "ca.pem"), []byte(caPEM), 0600) + require.NoError(t, err) + + newCert := func(t *testing.T, caPEM, pk, node, name string) { + t.Helper() + + signer, err := tlsutil.ParseSigner(pk) + require.NoError(t, err) + + pem, key, err := tlsutil.GenerateCert(tlsutil.CertOpts{ + Signer: signer, + CA: caPEM, + Name: name, + Days: 5, + DNSNames: []string{node + "." + name, name, "localhost"}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, + }) + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".pem"), []byte(pem), 0600) + require.NoError(t, err) + err = ioutil.WriteFile(filepath.Join(dir, node+"-"+name+".key"), []byte(key), 0600) + require.NoError(t, err) + } + + newCert(t, caPEM, pk, "srv1", "server.dc1.consul") + + _, connectCApk, err := connect.GeneratePrivateKey() + require.NoError(t, err) + + _, srv := testServerWithConfig(t, func(c *Config) { + c.Domain = "consul." // consul. is the default value in agent/config + c.CAFile = filepath.Join(dir, "ca.pem") + c.CertFile = filepath.Join(dir, "srv1-server.dc1.consul.pem") + c.KeyFile = filepath.Join(dir, "srv1-server.dc1.consul.key") + c.VerifyIncoming = true + c.VerifyServerHostname = true + // Enable Auto-Encrypt so that Conenct CA roots are added to the + // tlsutil.Configurator. + c.AutoEncryptAllowTLS = true + c.CAConfig = &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: structs.ConsulCAProvider, + Config: map[string]interface{}{"PrivateKey": connectCApk}, + } + }) + defer srv.Shutdown() + + // Wait for ConnectCA initiation to complete. + retry.Run(t, func(r *retry.R) { + _, root := srv.caManager.getCAProvider() + if root == nil { + r.Fatal("ConnectCA root is still nil") + } + }) + + useTLSByte := func(t *testing.T, c *tlsutil.Configurator) net.Conn { + wrapper := tlsutil.SpecificDC("dc1", c.OutgoingRPCWrapper()) + tlsEnabled := func(_ raft.ServerAddress) bool { + return true + } + + rl := NewRaftLayer(nil, nil, wrapper, tlsEnabled) + conn, err := rl.Dial(raft.ServerAddress(srv.Listener.Addr().String()), 100*time.Millisecond) + require.NoError(t, err) + return conn + } + + useNativeTLS := func(t *testing.T, c *tlsutil.Configurator) net.Conn { + wrapper := c.OutgoingALPNRPCWrapper() + dialer := &net.Dialer{Timeout: 100 * time.Millisecond} + + rawConn, err := dialer.Dial("tcp", srv.Listener.Addr().String()) + require.NoError(t, err) + + tlsConn, err := wrapper("dc1", "srv1", pool.ALPN_RPCRaft, rawConn) + require.NoError(t, err) + return tlsConn + } + + setupAgentTLSCert := func(name string) func(t *testing.T) string { + return func(t *testing.T) string { + newCert(t, caPEM, pk, "node1", name) + return filepath.Join(dir, "node1-"+name) + } + } + + setupConnectCACert := func(name string) func(t *testing.T) string { + return func(t *testing.T) string { + _, caRoot := srv.caManager.getCAProvider() + newCert(t, caRoot.RootCert, connectCApk, "node1", name) + return filepath.Join(dir, "node1-"+name) + } + } + + type testCase struct { + name string + conn func(t *testing.T, c *tlsutil.Configurator) net.Conn + setupCert func(t *testing.T) string + expectError bool + } + + run := func(t *testing.T, tc testCase) { + certPath := tc.setupCert(t) + + cfg := tlsutil.Config{ + VerifyOutgoing: true, + VerifyServerHostname: true, + CAFile: filepath.Join(dir, "ca.pem"), + CertFile: certPath + ".pem", + KeyFile: certPath + ".key", + Domain: "consul", + } + c, err := tlsutil.NewConfigurator(cfg, hclog.New(nil)) + require.NoError(t, err) + + _, err = doRaftRPC(tc.conn(t, c), srv.config.NodeName) + if tc.expectError { + if !isConnectionClosedError(err) { + t.Fatalf("expected a connection closed error, got: %v", err) + } + return + } + require.NoError(t, err) + } + + var testCases = []testCase{ + { + name: "TLS byte with client cert", + setupCert: setupAgentTLSCert("client.dc1.consul"), + conn: useTLSByte, + expectError: true, + }, + { + name: "TLS byte with server cert in different DC", + setupCert: setupAgentTLSCert("server.dc2.consul"), + conn: useTLSByte, + expectError: true, + }, + { + name: "TLS byte with server cert in same DC", + setupCert: setupAgentTLSCert("server.dc1.consul"), + conn: useTLSByte, + }, + { + name: "TLS byte with ConnectCA leaf cert", + setupCert: setupConnectCACert("server.dc1.consul"), + conn: useTLSByte, + expectError: true, + }, + { + name: "native TLS with client cert", + setupCert: setupAgentTLSCert("client.dc1.consul"), + conn: useNativeTLS, + expectError: true, + }, + { + name: "native TLS with server cert in different DC", + setupCert: setupAgentTLSCert("server.dc2.consul"), + conn: useNativeTLS, + expectError: true, + }, + { + name: "native TLS with server cert in same DC", + setupCert: setupAgentTLSCert("server.dc1.consul"), + conn: useNativeTLS, + }, + { + name: "native TLS with ConnectCA leaf cert", + setupCert: setupConnectCACert("server.dc1.consul"), + conn: useNativeTLS, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func doRaftRPC(conn net.Conn, leader string) (raft.AppendEntriesResponse, error) { + var resp raft.AppendEntriesResponse + + var term uint64 = 0xc + a := raft.AppendEntriesRequest{ + RPCHeader: raft.RPCHeader{ProtocolVersion: 3}, + Term: 0, + Leader: []byte(leader), + PrevLogEntry: 0, + PrevLogTerm: term, + LeaderCommitIndex: 50, + } + + if err := appendEntries(conn, a, &resp); err != nil { + return resp, err + } + return resp, nil +} + +func appendEntries(conn net.Conn, req raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { + w := bufio.NewWriter(conn) + enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) + + const rpcAppendEntries = 0 + if err := w.WriteByte(rpcAppendEntries); err != nil { + return fmt.Errorf("failed to write raft-RPC byte: %w", err) + } + + if err := enc.Encode(req); err != nil { + return fmt.Errorf("failed to send append entries RPC: %w", err) + } + if err := w.Flush(); err != nil { + return fmt.Errorf("failed to flush RPC: %w", err) + } + + if err := decodeRaftRPCResponse(conn, resp); err != nil { + return fmt.Errorf("response error: %w", err) + } + return nil +} + +// copied and modified from raft/net_transport.go +func decodeRaftRPCResponse(conn net.Conn, resp *raft.AppendEntriesResponse) error { + r := bufio.NewReader(conn) + dec := codec.NewDecoder(r, &codec.MsgpackHandle{}) + + var rpcError string + if err := dec.Decode(&rpcError); err != nil { + return fmt.Errorf("failed to decode response error: %w", err) + } + if err := dec.Decode(resp); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + if rpcError != "" { + return fmt.Errorf("rpc error: %v", rpcError) + } + return nil +} + +func isConnectionClosedError(err error) bool { + switch { + case err == nil: + return false + case errors.Is(err, io.EOF): + return true + case strings.Contains(err.Error(), "connection reset by peer"): + return true + default: + return false + } +} diff --git a/tlsutil/config.go b/tlsutil/config.go index 4042518b3951..e1153aeb8e78 100644 --- a/tlsutil/config.go +++ b/tlsutil/config.go @@ -158,19 +158,20 @@ func SpecificDC(dc string, tlsWrap DCWrapper) Wrapper { } type autoTLS struct { - manualCAPems []string + extraCAPems []string connectCAPems []string cert *tls.Certificate verifyServerHostname bool } -func (a *autoTLS) caPems() []string { - return append(a.manualCAPems, a.connectCAPems...) -} - +// manual stores the TLS CA and cert received from Configurator.Update which +// generally comes from the agent configuration. type manual struct { caPems []string cert *tls.Certificate + // caPool containing only the caPems. This CertPool should be used instead of + // the Configurator.caPool when only the Agent TLS CA is allowed. + caPool *x509.CertPool } // Configurator holds a Config and is responsible for generating all the @@ -209,13 +210,6 @@ func NewConfigurator(config Config, logger hclog.Logger) (*Configurator, error) return c, nil } -// CAPems returns the currently loaded CAs in PEM format. -func (c *Configurator) CAPems() []string { - c.RLock() - defer c.RUnlock() - return append(c.manual.caPems, c.autoTLS.caPems()...) -} - // ManualCAPems returns the currently loaded CAs in PEM format. func (c *Configurator) ManualCAPems() []string { c.RLock() @@ -240,17 +234,23 @@ func (c *Configurator) Update(config Config) error { if err != nil { return err } - pool, err := pool(append(pems, c.autoTLS.caPems()...)) + caPool, err := newX509CertPool(pems, c.autoTLS.extraCAPems, c.autoTLS.connectCAPems) if err != nil { return err } - if err = c.check(config, pool, cert); err != nil { + if err = c.check(config, caPool, cert); err != nil { + return err + } + manualCAPool, err := newX509CertPool(pems) + if err != nil { return err } + c.base = &config c.manual.cert = cert + c.manual.caPool = manualCAPool c.manual.caPems = pems - c.caPool = pool + c.caPool = caPool c.version++ return nil } @@ -265,7 +265,7 @@ func (c *Configurator) UpdateAutoTLSCA(connectCAPems []string) error { defer c.log("UpdateAutoEncryptCA") defer c.Unlock() - pool, err := pool(append(c.manual.caPems, append(c.autoTLS.manualCAPems, connectCAPems...)...)) + pool, err := newX509CertPool(c.manual.caPems, c.autoTLS.extraCAPems, connectCAPems) if err != nil { c.RUnlock() return err @@ -310,11 +310,11 @@ func (c *Configurator) UpdateAutoTLS(manualCAPems, connectCAPems []string, pub, c.Lock() defer c.Unlock() - pool, err := pool(append(c.manual.caPems, append(manualCAPems, connectCAPems...)...)) + pool, err := newX509CertPool(c.manual.caPems, manualCAPems, connectCAPems) if err != nil { return err } - c.autoTLS.manualCAPems = manualCAPems + c.autoTLS.extraCAPems = manualCAPems c.autoTLS.connectCAPems = connectCAPems c.autoTLS.cert = &cert c.caPool = pool @@ -345,11 +345,21 @@ func (c *Configurator) Base() Config { return *c.base } -func pool(pems []string) (*x509.CertPool, error) { +// newX509CertPool loads all the groups of PEM encoded certificates into a +// single x509.CertPool. +// +// The groups argument is a varargs of slices so that callers do not need to +// append slices together. In some cases append can modify the backing array +// of the first slice passed to append, which will often result in hard to +// find bugs. By accepting a varargs of slices we remove the need for the +// caller to append the groups, which should prevent any such bugs. +func newX509CertPool(groups ...[]string) (*x509.CertPool, error) { pool := x509.NewCertPool() - for _, pem := range pems { - if !pool.AppendCertsFromPEM([]byte(pem)) { - return nil, fmt.Errorf("Couldn't parse PEM %s", pem) + for _, group := range groups { + for _, pem := range group { + if !pool.AppendCertsFromPEM([]byte(pem)) { + return nil, fmt.Errorf("failed to parse PEM %s", pem) + } } } if len(pool.Subjects()) == 0 { @@ -923,6 +933,43 @@ func (c *Configurator) wrapALPNTLSClient(dc, nodeName, alpnProto string, conn ne return tlsConn, nil } +// AuthorizeServerConn is used to validate that the connection is being established +// by a Consul server in the same datacenter. +// +// The identity of the connection is checked by verifying that the certificate +// presented is signed by the Agent TLS CA, and has a DNSName that matches the +// local ServerSNI name. +// +// Note this check is only performed if VerifyServerHostname is enabled, otherwise +// it does no authorization. +func (c *Configurator) AuthorizeServerConn(dc string, conn *tls.Conn) error { + if !c.VerifyServerHostname() { + return nil + } + + c.RLock() + caPool := c.manual.caPool + c.RUnlock() + + expected := c.ServerSNI(dc, "") + for _, chain := range conn.ConnectionState().VerifiedChains { + if len(chain) == 0 { + continue + } + clientCert := chain[0] + _, err := clientCert.Verify(x509.VerifyOptions{ + DNSName: expected, + Roots: caPool, + KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + }) + if err == nil { + return nil + } + c.logger.Debug("AuthorizeServerConn failed certificate validation", "error", err) + } + return fmt.Errorf("a TLS certificate with a CommonName of %v is required for this operation", expected) +} + // ParseCiphers parse ciphersuites from the comma-separated string into // recognized slice func ParseCiphers(cipherStr string) ([]uint16, error) { diff --git a/tlsutil/config_test.go b/tlsutil/config_test.go index 1cfd44d018bc..736dc4c03d35 100644 --- a/tlsutil/config_test.go +++ b/tlsutil/config_test.go @@ -520,7 +520,7 @@ func TestConfigurator_ErrorPropagation(t *testing.T) { require.NoError(t, err, info) pems, err := LoadCAs(v.config.CAFile, v.config.CAPath) require.NoError(t, err, info) - pool, err := pool(pems) + pool, err := newX509CertPool(pems) require.NoError(t, err, info) err3 = c.check(v.config, pool, cert) } @@ -579,7 +579,7 @@ func TestConfigurator_LoadCAs(t *testing.T) { } for i, v := range variants { pems, err1 := LoadCAs(v.cafile, v.capath) - pool, err2 := pool(pems) + pool, err2 := newX509CertPool(pems) info := fmt.Sprintf("case %d", i) if v.shouldErr { if err1 == nil && err2 == nil {