Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Do not attempt to join Windows agents to memberlist cluster #5434

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions pkg/agent/memberlist/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var mapNodeEventType = map[memberlist.NodeEventType]nodeEventType{
memberlist.NodeUpdate: nodeEventTypeUpdate,
}

var linuxNodeSelector = labels.SelectorFromSet(labels.Set{corev1.LabelOSStable: "linux"})

type ClusterNodeEventHandler func(objName string)

type Interface interface {
Expand Down Expand Up @@ -200,18 +202,27 @@ func NewCluster(
return c, nil
}

func shouldJoinCluster(node *corev1.Node) bool {
// non-Linux Nodes should not join the memberlist cluster as all features relying on it is only supported on Linux.
return linuxNodeSelector.Matches(labels.Set(node.Labels))
}

func (c *Cluster) handleCreateNode(obj interface{}) {
node := obj.(*corev1.Node)
if !shouldJoinCluster(node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not worth it to have a filtered node informer based on the label, and avoid the calls to shouldJoinCluster? (or maybe not possible?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible but may be not worth as other modules need all Nodes so an unfiltered node informer is required anyway. If we create a filtered node informer, it will create another connection to kube-apiserver and store a copy of linux nodes:

// NewFilteredNodeInformer constructs a new informer for Node type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

return
}
// Ignore the Node itself.
if node.Name != c.nodeName {
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
if node.Name == c.nodeName {
return
}
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
}

affectedEIPs := c.filterEIPsFromNodeLabels(node)
Expand All @@ -233,13 +244,19 @@ func (c *Cluster) handleDeleteNode(obj interface{}) {
return
}
}
if !shouldJoinCluster(node) {
return
}
affectedEIPs := c.filterEIPsFromNodeLabels(node)
c.enqueueExternalIPPools(affectedEIPs)
klog.V(2).InfoS("Processed Node DELETE event", "nodeName", node.Name, "affectedExternalIPPoolNum", affectedEIPs.Len())
}

func (c *Cluster) handleUpdateNode(oldObj, newObj interface{}) {
node := newObj.(*corev1.Node)
if !shouldJoinCluster(node) {
return
}
oldNode := oldObj.(*corev1.Node)
if reflect.DeepEqual(node.GetLabels(), oldNode.GetLabels()) {
klog.V(2).InfoS("Processed Node UPDATE event, labels not changed", "nodeName", node.Name)
Expand Down Expand Up @@ -356,7 +373,7 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
// than 15 seconds, the agent wouldn't try to reach any other Node and would think it's the only alive Node until it's
// restarted.
func (c *Cluster) RejoinNodes() {
nodes, _ := c.nodeLister.List(labels.Everything())
nodes, _ := c.nodeLister.List(linuxNodeSelector)
aliveNodes := c.AliveNodes()
var membersToJoin []string
for _, node := range nodes {
Expand Down
50 changes: 35 additions & 15 deletions pkg/agent/memberlist/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
"antrea.io/antrea/pkg/util/ip"
)

var (
labelsWindowsOS = map[string]string{v1.LabelOSStable: "windows"}
labelsLinuxOS = map[string]string{v1.LabelOSStable: "linux"}
)

type fakeCluster struct {
cluster *Cluster
clientSet *fake.Clientset
Expand Down Expand Up @@ -190,7 +195,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {
NodeIPv4Addr: &net.IPNet{IP: net.IPv4(127, 0, 0, 1)},
}
localNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
ObjectMeta: metav1.ObjectMeta{Name: nodeName, Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "127.0.0.1"}}}}
fakeEIP1 := &crdv1b1.ExternalIPPool{
TypeMeta: metav1.TypeMeta{Kind: "CustomResourceDefinition"},
Expand Down Expand Up @@ -239,17 +244,17 @@ func TestCluster_RunClusterEvents(t *testing.T) {
{
name: "Update Node with matched labels then local Node should be selected",
expectEgressSelectResult: true,
newNodeLabels: map[string]string{"env": "pro"},
newNodeLabels: map[string]string{"env": "pro", v1.LabelOSStable: "linux"},
},
{
name: "Update Node with different but matched labels then local Node should be selected",
expectEgressSelectResult: true,
newNodeLabels: map[string]string{"env": "pro", "env1": "test"},
newNodeLabels: map[string]string{"env": "pro", "env1": "test", v1.LabelOSStable: "linux"},
},
{
name: "Update Node with not matched labels then local Node should not be selected",
expectEgressSelectResult: false,
newNodeLabels: map[string]string{"env": "test"},
newNodeLabels: map[string]string{"env": "test", v1.LabelOSStable: "linux"},
},
}
updateNode := func(node *v1.Node) {
Expand All @@ -260,8 +265,9 @@ func TestCluster_RunClusterEvents(t *testing.T) {
}
for _, tCase := range testCasesUpdateNode {
t.Run(tCase.name, func(t *testing.T) {
localNode.Labels = tCase.newNodeLabels
updateNode(localNode)
newPod := localNode.DeepCopy()
newPod.Labels = tCase.newNodeLabels
updateNode(newPod)
assert.NoError(t, wait.Poll(100*time.Millisecond, time.Second, func() (done bool, err error) {
res, err := fakeCluster.cluster.ShouldSelectIP(fakeEgress1.Spec.EgressIP, fakeEgress1.Spec.ExternalIPPool)
return err == nil && res == tCase.expectEgressSelectResult, nil
Expand All @@ -270,8 +276,9 @@ func TestCluster_RunClusterEvents(t *testing.T) {
}

// Test updating ExternalIPPool.
localNode.Labels = map[string]string{"env": "test"}
updateNode(localNode)
newPod := localNode.DeepCopy()
newPod.Labels = map[string]string{"env": "test", v1.LabelOSStable: "linux"}
updateNode(newPod)
testCasesUpdateEIP := []struct {
name string
expectEgressSelectResult bool
Expand Down Expand Up @@ -374,7 +381,7 @@ func TestCluster_RunClusterEvents(t *testing.T) {

// Test creating Node with invalid IP.
fakeNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "fakeNode0"},
ObjectMeta: metav1.ObjectMeta{Name: "fakeNode0", Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "x"}}},
}
assert.NoError(t, createNode(fakeCluster.clientSet, fakeNode))
Expand All @@ -388,19 +395,28 @@ func TestCluster_RunClusterEvents(t *testing.T) {
t.Fatalf("Delete Node error: %v", err)
}
}
deleteNode(localNode)
deleteNode(newPod)
assertEgressSelectResult(fakeEgress2, false, true)
assertEgressSelectResult(fakeEgress1, false, false)

mockMemberlist.EXPECT().Join([]string{"1.1.1.1"})
// Test creating Node with valid IP.
fakeNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "fakeNode1"},
ObjectMeta: metav1.ObjectMeta{Name: "fakeNode1", Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "1.1.1.1"}}},
}
assert.NoError(t, createNode(fakeCluster.clientSet, fakeNode1))
assertEgressSelectResult(fakeEgress2, false, true)
assertEgressSelectResult(fakeEgress1, false, false)

// Test creating Windows Node, which should be ignored.
fakeWinNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "fakeWinNode1", Labels: labelsWindowsOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "1.1.1.11"}}},
}
assert.NoError(t, createNode(fakeCluster.clientSet, fakeWinNode1))
assertEgressSelectResult(fakeEgress2, false, true)
assertEgressSelectResult(fakeEgress1, false, false)
}

func genLocalNodeCluster(localNodeNme, eipName string, nodes []string) *Cluster {
Expand Down Expand Up @@ -644,24 +660,28 @@ func TestCluster_RejoinNodes(t *testing.T) {
NodeIPv4Addr: ip.MustParseCIDR("10.0.0.1/24"),
}
node1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.1"}}},
}
node2 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node2"},
ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.2"}}},
}
node3 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "node3"},
ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: labelsLinuxOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.3"}}},
}
winNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "winnode1", Labels: labelsWindowsOS},
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.11"}}},
}
stopCh := make(chan struct{})
defer close(stopCh)
controller := gomock.NewController(t)
mockMemberlist := NewMockMemberlist(controller)
mockMemberlist.EXPECT().Join([]string{"10.0.0.2"})
mockMemberlist.EXPECT().Join([]string{"10.0.0.3"})
fakeCluster, _ := newFakeCluster(localNodeConfig, stopCh, mockMemberlist, node1, node2, node3)
fakeCluster, _ := newFakeCluster(localNodeConfig, stopCh, mockMemberlist, node1, node2, node3, winNode1)

mockMemberlist.EXPECT().Members().Return([]*memberlist.Node{
{Name: "node1"},
Expand Down
Loading