Skip to content

Commit

Permalink
Fix FlowExporter memory bloat when export process is dead (#3994)
Browse files Browse the repository at this point in the history
When the flow exporter is enabled but fails to connect to the downstream
IPFIX collector, connections added to the priority queue inside the flow
exporter are never expired and removed from the queue, causing memory to
bloat. Furthermore, connection store polling removes a conn from its
connections map when the flow is no longer in conntrack, but does not
remove the related item from the priority queue. When a new flow with the
same flow key is re-established, a duplicate item with the same key is
added to the queue while the reference to the old one is lost, effectively
causing a memory leak.

This change addresses the above issue in the following ways:

* Connection store polling removes stale conn from both connections map
  and the priority queue. Since CS polling is independent of exporting
  process liveness, this allows clean up to be done without connection
  to collector.
* Fixes initialization of Connection.LastExportTime to be the connection
  start time, to make sure the CS polling logic works properly when the
  exporting process is dead. Previously, LastExportTime was only filled in
  by the exporting process at the time of export, causing the zero value to
  be compared in certain cases.
* Adds guards in priority queue to prevent having two Connections with
  same connection key in the heap.

The benchmark test BenchmarkExportConntrackConns did not show an observable
difference before and after the change.

Fixes items 1 and 2 in #3972. The severity of item 3 is lower, so it will
be addressed in a later change.

Signed-off-by: Shawn Wang <wshaoquan@vmware.com>
  • Loading branch information
wsquan171 authored Jul 13, 2022
1 parent ce3059e commit 23ef6f7
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 6 deletions.
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if removedItem := cs.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from cs pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -239,7 +244,7 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
// If the connKey:pqItem pair does not exist in the map, it shows the
// conn was inactive, and was removed from PQ and map. Since it becomes
// active again now, we create a new pqItem and add it to PQ and map.
cs.expirePriorityQueue.AddItemToQueue(connKey, existingConn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, existingConn)
} else {
cs.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(cs.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -264,11 +269,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
conn.StartTime = time.Now()
conn.StopTime = time.Now()
}
conn.LastExportTime = conn.StartTime
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
conn.IsActive = true
// Add new antrea connection to connection store and PQ.
cs.connections[connKey] = conn
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New Antrea flow added", "connection", conn)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime,
StopTime: refTime,
LastExportTime: refTime,
FlowKey: tuple1,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
Mark: openflow.ServiceCTMark.GetValue(),
Expand All @@ -136,6 +137,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -156,6 +158,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
Expand All @@ -173,6 +176,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -195,6 +199,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand Down Expand Up @@ -249,7 +254,7 @@ func testAddNewConn(mockIfaceStore *interfacestoretest.MockInterfaceStore, mockP
// addConnToStore registers conn in cs under the connection key derived from
// conn: it stores the connection in the store's map, enqueues it in the expire
// priority queue, and increments the conntrack-connections metric.
func addConnToStore(cs *ConntrackConnectionStore, conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)
cs.AddConnToMap(&connKey, conn)
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) {
case <-pollTicker.C:
deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= ds.staleConnectionTimeout {
if removedItem := ds.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from ds pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := ds.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -78,7 +83,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.IsActive = true
existingItem, exists := ds.expirePriorityQueue.KeyToItem[connKey]
if !exists {
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
} else {
ds.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(ds.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -87,6 +92,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
} else {
conn.StartTime = timeSeen
conn.StopTime = timeSeen
conn.LastExportTime = timeSeen
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
Expand All @@ -96,7 +102,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New deny connection added", "connection", conn)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
Expand All @@ -89,6 +90,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/flowexporter/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (pq *ExpirePriorityQueue) Update(item *flowexporter.ItemToExpire, activeExp
heap.Fix(pq, item.Index)
}

// Remove takes the item registered under connKey out of the heap and drops its
// KeyToItem entry. It returns the removed item, or nil when no item is
// registered under connKey.
func (pq *ExpirePriorityQueue) Remove(connKey flowexporter.ConnectionKey) *flowexporter.ItemToExpire {
	if tracked, found := pq.KeyToItem[connKey]; found {
		// The item's Index locates it inside the heap.
		popped := heap.Remove(pq, tracked.Index)
		delete(pq.KeyToItem, connKey)
		return popped.(*flowexporter.ItemToExpire)
	}
	return nil
}

// GetExpiryFromExpirePriorityQueue returns the shortest expire time duration
// from expire priority queue.
func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration {
Expand All @@ -114,13 +126,18 @@ func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration
return pq.IdleFlowTimeout
}

func (pq *ExpirePriorityQueue) AddItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
// WriteItemToQueue inserts conn into the queue under connKey, with active and
// idle expiry times computed from the current time and the queue's configured
// timeouts. Any item already registered under connKey is replaced.
func (pq *ExpirePriorityQueue) WriteItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
	now := time.Now()
	entry := &flowexporter.ItemToExpire{Conn: conn}
	entry.ActiveExpireTime = now.Add(pq.ActiveFlowTimeout)
	entry.IdleExpireTime = now.Add(pq.IdleFlowTimeout)

	// Evict any previous item for this key first. Two heap items sharing one
	// key would leak memory, because the older one could no longer be updated
	// or removed through KeyToItem.
	pq.Remove(connKey)

	heap.Push(pq, entry)
	pq.KeyToItem[connKey] = entry
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package priorityqueue

import (
"container/heap"
"fmt"
"testing"
"time"

Expand All @@ -41,9 +42,40 @@ func TestExpirePriorityQueue(t *testing.T) {
Index: key,
}
testPriorityQueue.items = append(testPriorityQueue.items, item)
testPriorityQueue.KeyToItem[flowexporter.ConnectionKey{fmt.Sprintf("%d", key)}] = item
}
heap.Init(testPriorityQueue)

// Test WriteItemToQueue
connKey := flowexporter.ConnectionKey{"3"}
conn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &conn)
assert.Equal(t, &conn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't add new conn to map")
newConn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &newConn)
assert.Equal(t, &newConn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't overwrite existing conn to map")
hasOld, hasNew := false, false
for _, item := range testPriorityQueue.items {
if item.Conn == &conn {
hasOld = true
}
if item.Conn == &newConn {
hasNew = true
}
}
assert.False(t, hasOld && hasNew, "WriteItemToQueue shouldn't add two items with same key to heap")

// Test Remove
removedItem := testPriorityQueue.Remove(connKey)
assert.Equal(t, &newConn, removedItem.Conn, "Remove didn't return correct item")
_, exist := testPriorityQueue.KeyToItem[connKey]
assert.False(t, exist, "Remove didn't delete KeyToItem entry")
for _, item := range testPriorityQueue.items {
if item.Conn == &newConn {
assert.Fail(t, "Remove didn't delete item from queue")
}
}

// Add new flow to the priority queue
testFlowsWithExpire[3] = []time.Time{startTime.Add(3 * time.Second), startTime.Add(500 * time.Millisecond)}
newFlowItem := &flowexporter.ItemToExpire{
Expand Down

0 comments on commit 23ef6f7

Please # to comment.