Skip to content

Commit

Permalink
NET-5073 - ProxyConfiguration: implement various connection options (#…
Browse files Browse the repository at this point in the history
…19187)

* NET-5073 - ProxyConfiguration: implement various connection options

* PR feedback - LocalConnection and InboundConnection do not affect exposed routes. configure L7 route destinations. fix connection proto sequence numbers.

* add timeout to L7 Route Destinations
  • Loading branch information
jmurret authored Oct 14, 2023
1 parent 105ebfd commit a7fbd00
Show file tree
Hide file tree
Showing 6 changed files with 444 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (b *Builder) buildExposePaths(workload *pbcatalog.Workload) {
buildListener()

b.addExposePathsRoute(exposePath, clusterName).
addLocalAppCluster(clusterName).
addLocalAppCluster(clusterName, nil).
addLocalAppStaticEndpoints(clusterName, exposePath.LocalPathPort)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package builder
import (
"fmt"

pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
Expand All @@ -31,13 +34,13 @@ func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload, ctp *pbauth.Comput

if port.Protocol != pbcatalog.Protocol_PROTOCOL_MESH {
foundInboundNonMeshPorts = true
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName]).
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName], b.proxyCfg.GetDynamicConfig().GetInboundConnections()).
addInboundTLS()

if isL7(port.Protocol) {
b.addLocalAppRoute(routeName, clusterName)
b.addLocalAppRoute(routeName, clusterName, portName)
}
b.addLocalAppCluster(clusterName).
b.addLocalAppCluster(clusterName, &portName).
addLocalAppStaticEndpoints(clusterName, port.GetPort())
}
}
Expand Down Expand Up @@ -264,10 +267,16 @@ func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload)
// Add TLS inspection capability to be able to parse ALPN and/or SNI information from inbound connections.
listener.Capabilities = append(listener.Capabilities, pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION)

if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().InboundConnections != nil {
listener.BalanceConnections = pbproxystate.BalanceConnections(b.proxyCfg.DynamicConfig.InboundConnections.BalanceInboundConnections)
}
return b.NewListenerBuilder(listener)
}

func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions) *ListenerBuilder {
func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions,
ic *pbmesh.InboundConnectionsConfig) *ListenerBuilder {

if l.listener == nil {
return l
}
Expand All @@ -289,6 +298,15 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}

if ic != nil {
// MaxInboundConnections is uint32 that is used on:
// - router destinations MaxInboundConnection (uint64).
// - cluster circuit breakers UpstreamLimits.MaxConnections (uint32).
// It is cast to a uint64 here similarly as it is to the proxystateconverter code.
r.GetL4().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}

l.listener.Routers = append(l.listener.Routers, r)
} else if isL7(port.Protocol) {
r := &pbproxystate.Router{
Expand All @@ -308,6 +326,13 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}

if ic != nil {
// MaxInboundConnections is cast to a uint64 here similarly as it is to the
// as the L4 case statement above and in proxystateconverter code.
r.GetL7().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}

l.listener.Routers = append(l.listener.Routers, r)
}
return l
Expand Down Expand Up @@ -339,7 +364,7 @@ func getAlpnProtocolFromPortName(portName string) string {
return fmt.Sprintf("consul~%s", portName)
}

func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
func (b *Builder) addLocalAppRoute(routeName, clusterName, portName string) {
proxyRouteRule := &pbproxystate.RouteRule{
Match: &pbproxystate.RouteMatch{
PathMatch: &pbproxystate.PathMatch{
Expand All @@ -356,6 +381,18 @@ func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
},
},
}
if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().LocalConnection != nil {
lc, lcOK := b.proxyCfg.GetDynamicConfig().LocalConnection[portName]
if lcOK {
proxyRouteRule.Destination.DestinationConfiguration =
&pbproxystate.DestinationConfiguration{
TimeoutConfig: &pbproxystate.TimeoutConfig{
Timeout: lc.RequestTimeout,
},
}
}
}

// Each route name for the local app is listenerName:port since there is a route per port on the local app listener.
b.addRoute(routeName, &pbproxystate.Route{
VirtualHosts: []*pbproxystate.VirtualHost{{
Expand All @@ -373,9 +410,9 @@ func isL7(protocol pbcatalog.Protocol) bool {
return false
}

func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
func (b *Builder) addLocalAppCluster(clusterName string, portName *string) *Builder {
// Make cluster for this router destination.
b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{
cluster := &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Expand All @@ -384,20 +421,34 @@ func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
},
},
}

// configure inbound connections or connection timeout if either is defined
if b.proxyCfg.GetDynamicConfig() != nil && portName != nil {
lc, lcOK := b.proxyCfg.DynamicConfig.LocalConnection[*portName]

if lcOK || b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().Config = &pbproxystate.StaticEndpointGroupConfig{}

if lcOK {
cluster.GetEndpointGroup().GetStatic().GetConfig().ConnectTimeout = lc.ConnectTimeout
}

if b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().GetConfig().CircuitBreakers = &pbproxystate.CircuitBreakers{
UpstreamLimits: &pbproxystate.UpstreamLimits{
MaxConnections: &wrapperspb.UInt32Value{Value: b.proxyCfg.DynamicConfig.InboundConnections.MaxInboundConnections},
},
}
}
}
}

b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster
return b
}

func (b *Builder) addBlackHoleCluster() *Builder {
b.proxyStateTemplate.ProxyState.Clusters[xdscommon.BlackHoleClusterName] = &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Static: &pbproxystate.StaticEndpointGroup{},
},
},
},
}
return b
return b.addLocalAppCluster(xdscommon.BlackHoleClusterName, nil)
}

func (b *Builder) addLocalAppStaticEndpoints(clusterName string, port uint32) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package builder

import (
"google.golang.org/protobuf/types/known/durationpb"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -139,17 +142,80 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) {
},
},
},
// source/local-and-inbound-connections shows that configuring LocalCOnnection
// and InboundConnections in DynamicConfig will set fields on standard clusters and routes,
// but will not set fields on exposed path clusters and routes.
"source/local-and-inbound-connections": {
workload: &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: "10.0.0.1",
},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
"port3": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
},
proxyCfg: &pbmesh.ComputedProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
LocalConnection: map[string]*pbmesh.ConnectionConfig{
"port1": {
ConnectTimeout: durationpb.New(6 * time.Second),
RequestTimeout: durationpb.New(7 * time.Second)},
"port3": {
ConnectTimeout: durationpb.New(8 * time.Second),
RequestTimeout: durationpb.New(9 * time.Second)},
},
InboundConnections: &pbmesh.InboundConnectionsConfig{
MaxInboundConnections: 123,
BalanceInboundConnections: pbmesh.BalanceConnections(pbproxystate.BalanceConnections_BALANCE_CONNECTIONS_EXACT),
},
ExposeConfig: &pbmesh.ExposeConfig{
ExposePaths: []*pbmesh.ExposePath{
{
ListenerPort: 1234,
Path: "/health",
LocalPathPort: 9090,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP,
},
{
ListenerPort: 1235,
Path: "GetHealth",
LocalPathPort: 9091,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP2,
},
},
},
},
},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", true, c.proxyCfg).
BuildLocalApp(c.workload, nil).
Build()

// sort routers because of test flakes where order was flip flopping.
actualRouters := proxyTmpl.ProxyState.Listeners[0].Routers
sort.Slice(actualRouters, func(i, j int) bool {
return actualRouters[i].String() < actualRouters[j].String()
})

actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name+".golden")
expected := JSONToProxyTemplate(t, golden.GetBytes(t, actual, name+".golden"))

require.JSONEq(t, expected, actual)
// sort routers on listener from golden file
expectedRouters := expected.ProxyState.Listeners[0].Routers
sort.Slice(expectedRouters, func(i, j int) bool {
return expectedRouters[i].String() < expectedRouters[j].String()
})

// convert back to json after sorting so that test output does not contain extraneous fields.
require.Equal(t, protoToJSON(t, expected), protoToJSON(t, proxyTmpl))
})
}
}
Expand Down
Loading

0 comments on commit a7fbd00

Please # to comment.