Skip to content

Commit

Permalink
Fix clickhouse client race during batch commit
Browse files Browse the repository at this point in the history
Signed-off-by: Shawn Wang <wshaoquan@vmware.com>
  • Loading branch information
wsquan171 committed Aug 2, 2022
1 parent b66ffda commit 602b8cc
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/containernetworking/plugins v0.8.7
github.com/coreos/go-iptables v0.6.0
github.com/fsnotify/fsnotify v1.5.4
github.com/gammazero/deque v0.1.0
github.com/gammazero/deque v0.1.2
github.com/go-logr/logr v1.2.0
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik=
github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/gammazero/deque v0.1.2 h1:WvbDJ3YaT4ELf9+Cq9lv4Ef0aPRyZeEpIoVkjOw9kes=
github.com/gammazero/deque v0.1.2/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down
30 changes: 24 additions & 6 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ func (ch *ClickHouseExportProcess) flowRecordPeriodicCommit() {
// Returns the number of records successfully committed, and error if encountered.
// Cached records will be removed only after successful commit.
func (ch *ClickHouseExportProcess) batchCommitAll() (int, error) {
ch.mutex.RLock()
currSize := ch.deque.Len()
ch.mutex.RUnlock()
if currSize == 0 {
return 0, nil
}
Expand All @@ -431,11 +433,18 @@ func (ch *ClickHouseExportProcess) batchCommitAll() (int, error) {
}

// populate items from deque
recordsToExport := make([]*ClickHouseFlowRow, 0, currSize)
ch.mutex.Lock()
for i := 0; i < currSize; i++ {
record, ok := ch.deque.At(i).(*ClickHouseFlowRow)
record, ok := ch.deque.PopFront().(*ClickHouseFlowRow)
if !ok {
continue
}
recordsToExport = append(recordsToExport, record)
}
ch.mutex.Unlock()

for _, record := range recordsToExport {
_, err := stmt.Exec(
record.flowStartSeconds,
record.flowEndSeconds,
Expand Down Expand Up @@ -487,24 +496,33 @@ func (ch *ClickHouseExportProcess) batchCommitAll() (int, error) {

if err != nil {
klog.ErrorS(err, "Error when adding record")
ch.pushFrontRecords(recordsToExport)
_ = tx.Rollback()
return 0, err
}
}

if err := tx.Commit(); err != nil {
klog.ErrorS(err, "Error when committing record")
ch.pushFrontRecords(recordsToExport)
return 0, err
}

// remove committed record from deque
return len(recordsToExport), nil
}

// pushFrontRecords pushes records back to the front of deque without exceeding its capacity.
// Items with lower index (older records) will be dropped first if deque is to be filled.
func (ch *ClickHouseExportProcess) pushFrontRecords(records []*ClickHouseFlowRow) {
ch.mutex.Lock()
defer ch.mutex.Unlock()
for i := 0; i < currSize; i++ {
ch.deque.PopFront()
}

return currSize, nil
for i := len(records) - 1; i >= 0; i-- {
if ch.deque.Len() >= ch.queueSize {
break
}
ch.deque.PushFront(records[i])
}
}

func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
Expand Down
59 changes: 50 additions & 9 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,10 @@ func TestBatchCommitAll(t *testing.T) {
defer db.Close()

chExportProc := ClickHouseExportProcess{
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
queueSize: maxQueueSize,
}

recordRow := ClickHouseFlowRow{
Expand Down Expand Up @@ -526,9 +527,10 @@ func TestBatchCommitAllMultiRecord(t *testing.T) {
defer db.Close()

chExportProc := ClickHouseExportProcess{
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
queueSize: maxQueueSize,
}
recordRow := ClickHouseFlowRow{}
fieldCount := reflect.TypeOf(recordRow).NumField()
Expand Down Expand Up @@ -564,9 +566,10 @@ func TestBatchCommitAllError(t *testing.T) {
defer db.Close()

chExportProc := ClickHouseExportProcess{
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
db: db,
deque: deque.New(),
mutex: sync.RWMutex{},
queueSize: maxQueueSize,
}
recordRow := ClickHouseFlowRow{}
chExportProc.deque.PushBack(&recordRow)
Expand All @@ -591,3 +594,41 @@ func TestBatchCommitAllError(t *testing.T) {
t.Errorf("Exists unfulfilled expectations for db sql operation: %s", err)
}
}

func TestPushFrontRecords(t *testing.T) {
chExportProc := ClickHouseExportProcess{
deque: deque.New(),
mutex: sync.RWMutex{},
queueSize: 4,
}

// init deque [0]
records := make([]*ClickHouseFlowRow, 5)
for i := 0; i < 5; i++ {
records[i] = &ClickHouseFlowRow{sourceTransportPort: uint16(i)}
}
chExportProc.deque.PushBack(records[0])

// all records should be pushed to front of deque if cap allows.
// deque before [0], cap: 4
// pushfront([1,2])
// expected deque: [1,2,0]
pushbackRecords := records[1:3]
chExportProc.pushFrontRecords(pushbackRecords)
assert.Equal(t, 3, chExportProc.deque.Len(), "deque size mismatch")
assert.Equal(t, records[1], chExportProc.deque.At(0), "deque has wrong item at index 0")
assert.Equal(t, records[2], chExportProc.deque.At(1), "deque has wrong item at index 1")
assert.Equal(t, records[0], chExportProc.deque.At(2), "deque has wrong item at index 2")

// only newest items should be pushed to front of deque if hitting capacity.
// deque before [1,2,0], cap: 4
// pushfront([3,4])
// expected deque: [4,1,2,0]
pushbackRecords = records[3:]
chExportProc.pushFrontRecords(pushbackRecords)
assert.Equal(t, 4, chExportProc.deque.Len(), "deque size mismatch")
assert.Equal(t, records[4], chExportProc.deque.At(0), "deque has wrong item at index 0")
assert.Equal(t, records[1], chExportProc.deque.At(1), "deque has wrong item at index 1")
assert.Equal(t, records[2], chExportProc.deque.At(2), "deque has wrong item at index 2")
assert.Equal(t, records[0], chExportProc.deque.At(3), "deque has wrong item at index 3")
}
2 changes: 1 addition & 1 deletion plugins/octant/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwV
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/gammazero/deque v0.1.2/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
Expand Down

0 comments on commit 602b8cc

Please # to comment.