refactor: improve manifest scanning organization and concurrency #252
This refactor breaks down the manifest scanning logic into more focused components in preparation for adding incremental scanners to allow reading changelog and diffs between two snapshots.

Key changes include:
- Add manifestEntries type to safely collect data and delete entries concurrently
- Split manifest handling into separate fetchPartitionSpecFilteredManifests and collectManifestEntries functions for better separation of concerns
- Replace manual goroutine management with errgroup for more robust concurrency
- Add documentation comments explaining the manifest scanning process

This is a step toward adding a ManifestGroup abstraction similar to the Java implementation that can be shared among different scanner types.
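To make the first bullet concrete, here is a minimal sketch of a mutex-guarded collector that many goroutines can append to safely. The method names addDataEntry and addPositionalDeleteEntry appear in the diff; the `entry` type, the struct fields, and the `collect` helper are assumptions made for illustration and are not taken from the merged code. It uses only the standard library (sync.WaitGroup rather than errgroup) so that it runs without extra dependencies.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for an iceberg-go manifest entry.
type entry struct {
	path string
}

// manifestEntries collects data and positional-delete entries from
// concurrent goroutines. The field layout is an assumption; only the
// method names come from the diff in this PR.
type manifestEntries struct {
	mu                      sync.Mutex
	dataEntries             []entry
	positionalDeleteEntries []entry
}

// addDataEntry appends a data entry while holding the lock.
func (m *manifestEntries) addDataEntry(e entry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.dataEntries = append(m.dataEntries, e)
}

// addPositionalDeleteEntry appends a positional-delete entry while holding the lock.
func (m *manifestEntries) addPositionalDeleteEntry(e entry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.positionalDeleteEntries = append(m.positionalDeleteEntries, e)
}

// collect (hypothetical helper) adds one data entry and one delete entry
// from each of n goroutines and waits for all of them to finish.
func collect(n int) *manifestEntries {
	entries := &manifestEntries{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			entries.addDataEntry(entry{path: fmt.Sprintf("data-%d.parquet", i)})
			entries.addPositionalDeleteEntry(entry{path: fmt.Sprintf("delete-%d.parquet", i)})
		}(i)
	}
	wg.Wait()
	return entries
}

func main() {
	entries := collect(8)
	fmt.Println(len(entries.dataEntries), len(entries.positionalDeleteEntries))
}
```

Keeping the lock inside the type means callers never touch the slices directly, which is what lets each per-manifest goroutine append without coordinating with the others.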
      g.Go(func() error {
          partEval := partitionEvaluators.Get(int(mf.PartitionSpecID()))
          manifestEntries, err := openManifest(scan.io, mf, partEval, metricsEval)
          if err != nil {
              return err
          }

-         for _, e := range entries {
+         for _, e := range manifestEntries {
              df := e.DataFile()
              switch df.ContentType() {
              case iceberg.EntryContentData:
-                 dataEntries = append(dataEntries, e)
+                 entries.addDataEntry(e)
              case iceberg.EntryContentPosDeletes:
-                 positionalDeleteEntries = append(positionalDeleteEntries, e)
+                 entries.addPositionalDeleteEntry(e)
              case iceberg.EntryContentEqDeletes:
-                 return nil, fmt.Errorf("iceberg-go does not yet support equality deletes")
+                 return fmt.Errorf("iceberg-go does not yet support equality deletes")
              default:
-                 return nil, fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
+                 return fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
                      ErrInvalidMetadata, df.ContentType(), e)
              }
          }
      }
          return nil
      })
  }
So we're switching from utilizing a channel and fanning out with goroutines reading from that channel to splitting out a goroutine for each manifest.
Is there any particular benefit/reason for that change beyond the simplified code?
No, mostly simplified code. For context, I am working on adding a couple more scanners and trying to build an abstraction that makes that easy.
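For readers comparing the two concurrency shapes discussed in this thread, the sketch below places them side by side: a fixed pool of workers reading from a channel, and one goroutine per item. It is an illustration only. `process`, `fanOut`, and `perItem` are hypothetical names, and the first-error handling uses sync.WaitGroup plus sync.Once from the standard library as a stand-in for errgroup, so it does not cancel remaining work the way an errgroup context would.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// process is a hypothetical per-manifest step; it fails on negative ids.
func process(id int) error {
	if id < 0 {
		return errors.New("bad manifest")
	}
	return nil
}

// fanOut feeds ids through a channel to a fixed pool of workers and
// returns the first error any worker recorded.
func fanOut(ids []int, workers int) error {
	if workers < 1 {
		workers = 1 // with no workers the sends below would block forever
	}
	ch := make(chan int)
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ch {
				if err := process(id); err != nil {
					once.Do(func() { firstErr = err })
				}
			}
		}()
	}
	for _, id := range ids {
		ch <- id
	}
	close(ch)
	wg.Wait()
	return firstErr
}

// perItem starts one goroutine per id and returns the first error recorded.
func perItem(ids []int) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, id := range ids {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := process(id); err != nil {
				once.Do(func() { firstErr = err })
			}
		}(id)
	}
	wg.Wait()
	return firstErr
}

func main() {
	fmt.Println(fanOut([]int{1, 2, 3}, 2))
	fmt.Println(perItem([]int{1, -1, 3}))
}
```

The per-item version has less plumbing (no channel, no worker count), which matches the "simplified code" motivation. The trade-off is that the number of goroutines grows with the number of items, whereas the pool caps it.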
Fair enough. This seems reasonable to me and is unlikely to cause any issues, so I think we can move forward with this refactor. It might be worthwhile to look into adding some benchmarks that track planning performance across various numbers of manifests and manifest entries, so that we can keep an eye on it in the future.
Not something that we need for this particular change, but definitely something to look into.
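As a rough idea of what such a benchmark could look like, the sketch below times a placeholder planning function across several manifest counts using testing.Benchmark from the standard library. `planManifests` is a hypothetical stand-in that only counts entries; a real benchmark would call the actual scan planning code against generated manifests, and the sizes chosen here are arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// planManifests is a hypothetical stand-in for scan planning: it starts
// one goroutine per manifest, "reads" entriesPerManifest entries in each,
// and returns the total number of entries seen.
func planManifests(numManifests, entriesPerManifest int) int {
	var (
		mu    sync.Mutex
		total int
		wg    sync.WaitGroup
	)
	for m := 0; m < numManifests; m++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			count := 0
			for e := 0; e < entriesPerManifest; e++ {
				count++
			}
			mu.Lock()
			total += count
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

func main() {
	// Run the same workload at several manifest counts and print the timing for each.
	for _, n := range []int{10, 100, 1000} {
		res := testing.Benchmark(func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				planManifests(n, 100)
			}
		})
		fmt.Printf("manifests=%d: %s\n", n, res)
	}
}
```

In a repository, the same loop would more commonly live in a `_test.go` file as a `Benchmark...` function with `b.Run` sub-benchmarks per size, run through `go test -bench`.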
LGTM Thanks!