From 51161154dfb9058cbda13cc51a9c519efac2b8df Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 20 Nov 2024 13:19:50 +0800 Subject: [PATCH] executor: fix data race in hash join v2 spill (#57519) ref pingcap/tidb#57450 --- pkg/executor/join/hash_join_v2.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/executor/join/hash_join_v2.go b/pkg/executor/join/hash_join_v2.go index 22abbff7cd4ea..6387f717c4b23 100644 --- a/pkg/executor/join/hash_join_v2.go +++ b/pkg/executor/join/hash_join_v2.go @@ -504,11 +504,6 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestore(inDisk *chun } }() - partitionNumber := b.HashJoinCtx.partitionNumber - hashJoinCtx := b.HashJoinCtx - - b.builder = createRowTableBuilder(b.BuildKeyColIdx, hashJoinCtx.BuildKeyTypes, partitionNumber, b.HasNullableKey, hashJoinCtx.BuildFilter != nil, hashJoinCtx.needScanRowTableAfterProbeDone) - hasErr := false chunkNum := inDisk.NumChunks() for i := 0; i < chunkNum; i++ { @@ -544,10 +539,6 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTable(typeCtx types.Context, setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost) } }() - partitionNumber := b.HashJoinCtx.partitionNumber - hashJoinCtx := b.HashJoinCtx - - b.builder = createRowTableBuilder(b.BuildKeyColIdx, hashJoinCtx.BuildKeyTypes, partitionNumber, b.HasNullableKey, hashJoinCtx.BuildFilter != nil, hashJoinCtx.needScanRowTableAfterProbeDone) hasErr := false for chk := range srcChkCh { @@ -1263,6 +1254,11 @@ func (e *HashJoinV2Exec) fetchAndBuildHashTableImpl(ctx context.Context) { // doneCh is used by the consumer(splitAndAppendToRowTable) to info the producer(fetchBuildSideRows) that the consumer meet error and stop consume data doneCh := make(chan struct{}, e.Concurrency) + // init builder, todo maybe the builder can be reused during the whole life cycle of the executor + hashJoinCtx := e.HashJoinCtxV2 + for _, worker := range e.BuildWorkers { + worker.builder = createRowTableBuilder(worker.BuildKeyColIdx, hashJoinCtx.BuildKeyTypes, hashJoinCtx.partitionNumber, worker.HasNullableKey, hashJoinCtx.BuildFilter != nil, hashJoinCtx.needScanRowTableAfterProbeDone) + } srcChkCh, waitForController := e.fetchBuildSideRows(ctx, fetcherAndWorkerSyncer, wg, errCh, doneCh) e.splitAndAppendToRowTable(srcChkCh, waitForController, fetcherAndWorkerSyncer, wg, errCh, doneCh) success := waitJobDone(wg, errCh)