diff --git a/storage/dataflux/integration_test.go b/storage/dataflux/integration_test.go index 2bb04c4566cb..2fa492c4b26e 100644 --- a/storage/dataflux/integration_test.go +++ b/storage/dataflux/integration_test.go @@ -70,7 +70,6 @@ func TestMain(m *testing.M) { // Lists the all the objects in the bucket. func TestIntegration_NextBatch_All(t *testing.T) { - t.Skip("#11198") if testing.Short() { t.Skip("Integration tests skipped in short mode") } @@ -97,7 +96,6 @@ func TestIntegration_NextBatch_All(t *testing.T) { } func TestIntegration_NextBatch(t *testing.T) { - t.Skip("#11196") // Accessing public bucket to list large number of files in batches. // See https://cloud.google.com/storage/docs/public-datasets/landsat if testing.Short() { diff --git a/storage/dataflux/range_splitter.go b/storage/dataflux/range_splitter.go index 9d0896081ad0..2be2d7b30d1e 100644 --- a/storage/dataflux/range_splitter.go +++ b/storage/dataflux/range_splitter.go @@ -328,6 +328,8 @@ func (rs *rangeSplitter) convertStringRangeToMinimalIntRange( // charPosition returns the index of the character in the alphabet set. func (rs *rangeSplitter) charPosition(ch rune) (int, error) { + rs.mu.Lock() // Acquire the lock + defer rs.mu.Unlock() // Release the lock when the function exits if idx, ok := rs.alphabetMap[ch]; ok { return idx, nil } @@ -337,6 +339,8 @@ func (rs *rangeSplitter) charPosition(ch rune) (int, error) { // convertRangeStringToArray transforms the range string into a rune slice while // verifying the presence of each character in the alphabets. func (rs *rangeSplitter) convertRangeStringToArray(rangeString string) ([]rune, error) { + rs.mu.Lock() // Acquire the lock + defer rs.mu.Unlock() // Release the lock when the function exits for _, char := range rangeString { if _, exists := rs.alphabetMap[char]; !exists { return nil, fmt.Errorf("character %c in range string %q is not found in the alphabet array", char, rangeString) diff --git a/storage/dataflux/worksteal.go b/storage/dataflux/worksteal.go index 256100976606..99d1742f242e 100644 --- a/storage/dataflux/worksteal.go +++ b/storage/dataflux/worksteal.go @@ -77,7 +77,6 @@ func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs, if err != nil { return nil, fmt.Errorf("creating new range splitter: %w", err) } - g, ctx := errgroup.WithContext(ctx) // Initialize all workers as idle. for i := 0; i < c.parallelism; i++ { @@ -126,13 +125,12 @@ func (w *worker) doWorkstealListing(ctx context.Context) error { // If a worker is idle, sleep for a while before checking the next update. // Worker status is changed to active when it finds work in range channel. if w.status == idle { - if len(w.lister.ranges) == 0 { - time.Sleep(sleepDurationWhenIdle) - continue - } else { - newRange := <-w.lister.ranges + select { + case newRange := <-w.lister.ranges: <-w.idleChannel w.updateWorker(newRange.startRange, newRange.endRange, active) + case <-time.After(sleepDurationWhenIdle): + continue } } // Active worker to list next page of objects within the range @@ -153,7 +151,7 @@ func (w *worker) doWorkstealListing(ctx context.Context) error { // If listing not complete and idle workers are available, split the range // and give half of work to idle worker. - for len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil { + if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil { // Split range and upload half of work for idle worker. splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1) if err != nil { @@ -191,7 +189,6 @@ func (w *worker) shutDownSignal() bool { w.result.mu.Unlock() alreadyListedBatchSizeObjects := w.lister.batchSize > 0 && lenResult >= w.lister.batchSize - return noMoreObjects || alreadyListedBatchSizeObjects } diff --git a/storage/dataflux/worksteal_test.go b/storage/dataflux/worksteal_test.go index ce49df36302d..d034cbbe0f66 100644 --- a/storage/dataflux/worksteal_test.go +++ b/storage/dataflux/worksteal_test.go @@ -22,7 +22,6 @@ import ( ) func TestWorkstealListingEmulated(t *testing.T) { - t.Skip("https://github.com/googleapis/google-cloud-go/issues/11205") transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client *storage.Client) { attrs := &storage.BucketAttrs{