From 57f2e77381aa10a07d2a1a72a760755a432ad34c Mon Sep 17 00:00:00 2001 From: Mustafa Altun Date: Wed, 13 Nov 2024 09:30:42 +0100 Subject: [PATCH] Add option to set max concurrency for table scan operations (#198) --- table/arrow_scanner.go | 12 ++++++------ table/scanner.go | 5 +++-- table/table.go | 14 ++++++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go index 814ae77..97147ce 100644 --- a/table/arrow_scanner.go +++ b/table/arrow_scanner.go @@ -21,7 +21,6 @@ import ( "context" "io" "iter" - "runtime" "strconv" "sync" @@ -45,7 +44,7 @@ const ( type positionDeletes = []*arrow.Chunked type perFilePosDeletes = map[string]positionDeletes -func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFilePosDeletes, error) { +func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask, concurrency int) (perFilePosDeletes, error) { var ( deletesPerFile = make(perFilePosDeletes) uniqueDeletes = make(map[string]iceberg.DataFile) @@ -69,9 +68,9 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) } g, ctx := errgroup.WithContext(ctx) - g.SetLimit(runtime.NumCPU()) + g.SetLimit(concurrency) - perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU()) + perFileChan := make(chan map[string]*arrow.Chunked, concurrency) go func() { defer close(perFileChan) for _, v := range uniqueDeletes { @@ -213,6 +212,7 @@ type arrowScan struct { options iceberg.Properties useLargeTypes bool + concurrency int } func (as *arrowScan) projectedFieldIDs() (set[int], error) { @@ -534,7 +534,7 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks)) // numWorkers := 1 - numWorkers := min(runtime.NumCPU(), len(tasks)) + numWorkers := min(as.concurrency, len(tasks)) records := make(chan enumeratedRecord, numWorkers) var wg sync.WaitGroup @@ -592,7 +592,7 @@ func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) (*arr return resultSchema, func(yield func(arrow.Record, error) bool) {}, nil } - deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks) + deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks, as.concurrency) if err != nil { return nil, nil, err } diff --git a/table/scanner.go b/table/scanner.go index f7aab11..bfb183e 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "iter" - "runtime" "slices" "sync" @@ -132,6 +131,7 @@ type Scan struct { limit int64 partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression] + concurrency int } func (scan *Scan) UseRowLimit(n int64) *Scan { @@ -294,7 +294,7 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) { dataEntries := make([]iceberg.ManifestEntry, 0) positionalDeleteEntries := make([]iceberg.ManifestEntry, 0) - nworkers := min(runtime.NumCPU(), len(manifestList)) + nworkers := min(scan.concurrency, len(manifestList)) var wg sync.WaitGroup manifestChan := make(chan iceberg.ManifestFile, len(manifestList)) @@ -434,6 +434,7 @@ func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[ caseSensitive: scan.caseSensitive, rowLimit: scan.limit, options: scan.options, + concurrency: scan.concurrency, }).GetRecords(ctx, tasks) } diff --git a/table/table.go b/table/table.go index 350782a..ac9cd2b 100644 --- a/table/table.go +++ b/table/table.go @@ -18,6 +18,7 @@ package table import ( + "runtime" "slices" "github.com/apache/iceberg-go" @@ -110,6 +111,18 @@ func WithLimit(n int64) ScanOption { } } +// WitMaxConcurrency sets the maximum concurrency for table scan and plan +// operations. When unset it defaults to runtime.GOMAXPROCS. +func WitMaxConcurrency(n int) ScanOption { + if n <= 0 { + return noopOption + } + + return func(scan *Scan) { + scan.concurrency = n + } +} + func WithOptions(opts iceberg.Properties) ScanOption { if opts == nil { return noopOption @@ -128,6 +141,7 @@ func (t Table) Scan(opts ...ScanOption) *Scan { selectedFields: []string{"*"}, caseSensitive: true, limit: ScanNoLimit, + concurrency: runtime.GOMAXPROCS(0), } for _, opt := range opts {