Small file size handling for inserts into log files. #413
Conversation
n3nash commented on Jun 12, 2018 (edited)
- Added rolling stats to the metadata of a commit file.
- Determine the split between inserts and upserts from the rolling stats, and hence whether more inserts can be added to a log file.
- In the absence of rolling stats, the total size of the log file is compared with the parquet max file size; if there is room for more inserts, they are added (see the sketch below).
@bvaradar @vinothchandar please take a pass.
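For readers of this thread, a rough sketch of the sizing logic described above (not the PR's exact code; all names, the config knobs, and the ratio are illustrative): with rolling stats, the record count already routed to a file slice comes from the tracked inserts; without them, the slice's total log size is scaled down to an expected parquet footprint and compared against the max parquet file size.

import java.util.List;

// Hypothetical sketch of the small-file check for MOR log files.
class SmallFileInsertSizing {

  // With rolling stats: records already in the slice = inserts tracked so far.
  static long recordsFromRollingStats(long trackedInserts) {
    return trackedInserts;
  }

  // Without rolling stats: scale total log bytes down to an expected
  // parquet footprint and see how many more records would still fit.
  static long insertsThatFit(List<Long> logFileSizes, double logToParquetRatio,
      long maxParquetBytes, long avgRecordSizeBytes) {
    long totalLogBytes = logFileSizes.stream().mapToLong(Long::longValue).sum();
    long expectedParquetBytes = (long) (totalLogBytes * logToParquetRatio);
    long remaining = maxParquetBytes - expectedParquetBytes;
    return remaining > 0 ? remaining / avgRecordSizeBytes : 0;
  }
}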
Force-pushed from 32405eb to 8a2bdbf
Partially reviewed. Will review further tomorrow
// called by jackson json lib
}

public HoodieRollingStat(String fileId, long inserts, long upserts, long totalInputWriteBytesOnDisk) {
Can you use Avro for tracking rolling stats?
I'm not inclined towards having an Avro schema for rolling stats. The rolling stats are written to extraMetadata, which is a very flexible map data structure. Also, we would eventually need an associated Java class for each Avro-generated class. WDYT?
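For reference, a minimal sketch of the map-based approach being argued for here (the metadata key is an assumption, not verified against the PR; toJsonString() appears in the diff below):

import java.io.IOException;
import java.util.Map;

// Sketch: the stats ride along as one JSON entry in the commit's
// extraMetadata map (a plain Map<String, String>), no Avro schema needed.
static void attachRollingStats(Map<String, String> extraMetadata,
    HoodieRollingStatMetadata rollingStatMetadata) throws IOException {
  // "ROLLING_STAT" is an assumed key name for illustration
  extraMetadata.put("ROLLING_STAT", rollingStatMetadata.toJsonString());
}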
}
}
Assert.assertEquals(inserts, 200);
}
Can you also include upserts?
done
return mapper;
}

public HoodieRollingStatMetadata merge(HoodieRollingStatMetadata rollingStatMetadata) {
Can you add a simple unit test for merge?
This is done via the tests of inserts + updates. Added another test for updates (which automatically tests merge). Let me know if you still feel the need for it.
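If a direct unit test were still wanted, a minimal sketch might look like this (the HoodieRollingStat constructor matches the diff above; addRollingStat, getPartitionToRollingStats, and the getters are assumed names):

import org.junit.Assert;
import org.junit.Test;

public class TestHoodieRollingStatMetadata {
  @Test
  public void testMergeAccumulatesCounts() {
    // two batches touching the same fileId in the same partition
    HoodieRollingStatMetadata first = new HoodieRollingStatMetadata("commit");
    first.addRollingStat("2018/06/12", new HoodieRollingStat("file-1", 100, 0, 1024));
    HoodieRollingStatMetadata second = new HoodieRollingStatMetadata("commit");
    second.addRollingStat("2018/06/12", new HoodieRollingStat("file-1", 50, 25, 512));
    // merge should accumulate inserts and upserts per fileId
    HoodieRollingStat merged = first.merge(second)
        .getPartitionToRollingStats().get("2018/06/12").get("file-1");
    Assert.assertEquals(150, merged.getInserts());
    Assert.assertEquals(25, merged.getUpserts());
  }
}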
private String fileId;
private long inserts;
private long upserts;
Don't we need to track deletes to accurately track the number of entries in the file?
Good point. At the moment pure deletes are not used, but we should add them anyway.
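A sketch of what tracking deletes could look like on HoodieRollingStat (the field and method names are assumptions, mirroring the existing counters):

// Sketch: a deletes counter alongside inserts/upserts so the number of
// live entries in a file can be derived as inserts - deletes.
public class HoodieRollingStat {
  private long inserts;
  private long upserts;
  private long deletes;

  public long addDeletes(long deletes) {
    this.deletes += deletes;
    return this.deletes;
  }
}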
this.upserts = upserts;
}

public long updateInserts(long inserts) {
rename: addInserts()/addUpserts()
done
}
}

public static HoodieRollingStatMetadata fromBytes(byte[] bytes) throws IOException {
Can these methods be reused across HoodieRollingStatMetadata/HoodieCommitMetadata?
Did some refactoring.
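One shape such a refactoring could take, as a sketch (the PR's actual helper may differ; the class name is illustrative): a shared JSON utility used by both metadata classes.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch of a shared JSON (de)serialization helper.
final class JsonMetadataUtils {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // deserialize a commit/rolling-stat metadata payload from raw bytes
  static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws IOException {
    return MAPPER.readValue(new String(bytes, StandardCharsets.UTF_8), clazz);
  }

  // serialize any metadata object to pretty-printed JSON
  static String toJsonString(Object metadata) throws IOException {
    return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(metadata);
  }
}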
public String toJsonString() throws IOException {
  if (partitionToRollingStats.containsKey(null)) {
    log.info("partition path is null for " + partitionToRollingStats.get(null));
Do you intend to leave this logging in there forever?
Yes, this can be helpful for debugging.
UpsertPartitioner(WorkloadProfile profile) {
  updateLocationToBucket = new HashMap<>();
  partitionPathToInsertBuckets = new HashMap<>();
  bucketInfoMap = new HashMap<>();
  globalStat = profile.getGlobalStat();

  rollingStatMetadata = getRollingStats();
Now the hoodieTable initialization is reading in some data as well. How expensive is this going to make it? Can you look at the places where we do this and assess whether it's an issue.
Alternatively, we can selectively read it out during the commit & partitioning phases alone?
This is not done during hoodie table initialization. It is only done when getUpsertPartitioner() is called on the table, so it should be fine?
Okay, then it's fine.
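For context, a sketch of the lazy read just discussed (the timeline calls mirror the snippet further down in this thread; the metadata key and fromJsonString() are assumptions):

// Sketch: rolling stats are read only when the partitioner is built,
// from the last completed deltacommit's extraMetadata.
private HoodieRollingStatMetadata getRollingStats() throws IOException {
  Optional<HoodieInstant> lastInstant = getActiveTimeline().getDeltaCommitTimeline()
      .filterCompletedInstants().lastInstant();
  if (!lastInstant.isPresent()) {
    return null; // first commit on the table: no stats yet
  }
  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
      .fromBytes(getActiveTimeline().getInstantDetails(lastInstant.get()).get());
  String payload = commitMetadata.getExtraMetadata().get("ROLLING_STAT"); // assumed key
  return payload == null ? null : HoodieRollingStatMetadata.fromJsonString(payload);
}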
@@ -89,6 +89,8 @@
private int maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
private Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
// default is false
private boolean isUpsert;
What does this flag signify, given it's set solely based on which constructor is invoked?
Removed this; it was being used to distinguish upserts from inserts so the writeStatus is set correctly. It is done differently now (by checking currentLocation). Please take a pass.
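The replacement check, sketched (HoodieRecord.getCurrentLocation() is existing Hudi API; the helper name is illustrative):

// Sketch: a record that already has a location in the table is an update;
// a record with no current location is a fresh insert going to a log file.
private boolean isUpdateRecord(HoodieRecord record) {
  return record.getCurrentLocation() != null;
}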
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
    .fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant.get()).get());
rollingStatMetadata = rollingStatMetadata
If a compaction runs and the log thus becomes empty, don't we have to reset the stats? We always seem to be merging with previous stats?
Well, the stats are the same irrespective of whether they describe log files or parquet files. Resetting the stats could be a good size optimization, since the stats will grow over time. But it is tricky to erase stats based on compaction since we have parallel compactions running. For example, say we look at the timeline and get all the compaction commits present after the last deltacommit; then we have to look up all the fileIds for those compaction commits. If no new log file was written for a fileId while that compaction took place, we can reset its stats; otherwise we keep them, since there is a phantom parquet for which new log files were created. There are other corner cases too which I cannot think of right now (I considered this approach when I first opened the PR). If you strongly feel we should invest in the space-optimization aspect, I can spend time digging deeper to list out the issues. Let me know.
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
Deleting logs is a bit different from deleting parquet files. Queries can pick the log file up and may crash midway if we delete them?
In the case of parquet files, they are not published to the query, so deleting works.
This deletes log files only for failed first commits. Queries should not pick up failed commits.
if (!index.canIndexLogFiles()) {
  // TODO : choose the last N small files since there can be multiple small files written to a single partition
  // in a single batch
  Optional<FileSlice> smallFile = getRTFileSystemView()
rename: smallFileSlice
done
}

// TODO (NA) : Make this a static utility method
protected long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
Add some tests for this?
I've added some extra cases to testUpsertPartitioner already which should cover this, but I agree we should have a more granular unit test here. Let me think about this.
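A granular test could look roughly like this, assuming the conversion simply scales total log bytes by a configured log-to-parquet compression ratio (the class name, local helper, and ratio value are illustrative, not the PR's code):

import java.util.Arrays;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;

public class TestLogToParquetSizeConversion {
  // Stand-in for a configured log-to-parquet compression ratio.
  private static final double RATIO = 0.35;

  // Mirrors the assumed conversion: total log bytes scaled by the ratio.
  static long convert(List<Long> logFileSizes) {
    long total = logFileSizes.stream().mapToLong(Long::longValue).sum();
    return (long) (total * RATIO);
  }

  @Test
  public void testConvertLogFilesSizeToExpectedParquetSize() {
    List<Long> sizes = Arrays.asList(100L * 1024 * 1024, 50L * 1024 * 1024);
    Assert.assertEquals((long) (150L * 1024 * 1024 * RATIO), convert(sizes));
  }
}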
Force-pushed from 8a2bdbf to d058e5b
@vinothchandar @bvaradar addressed comments; left comments on two remaining concerns. Please take a pass.
@n3nash looks like the only thing remaining is the extra test case?
@n3nash @vinothchandar: As we have been testing Async Compaction in staging without this diff for some time now, can we wait for the async compaction diff to land first before we merge this one? It also helps me avoid numerous rebases to keep the Async Compaction stacked diffs consistent. Balaji.V
@bvaradar I'm fine waiting till your diff lands.
@n3nash can you rebase this one?
Force-pushed from d058e5b to 7eb9ace
…he total size of the log file is compared with the parquet max file size and if there is scope to add inserts the add it.
Force-pushed from 7eb9ace to 704bd86
@vinothchandar rebased, please take a pass on HoodieMergeOnReadTable and HoodieAppendHandle since I've made a few new changes there.
@n3nash can you briefly describe your delta? HoodieMergeOnReadTable is pretty big.
// read commit file and (either append delete blocks or delete file)
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();

// In case all data was inserts and the commit failed, delete the file belonging to that commit.
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
@vinothchandar Changes in comments here
private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
    HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
    Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
Pass the deletedFiles as an argument here @vinothchandar
@vinothchandar Tagged you in the places where changes have happened
.filter(wStat -> {
  // Filter out stats without prevCommit since they are all inserts
  if (wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
      && !deletedFiles.contains(wStat.getFileId())) {
Check deletedFiles here @vinothchandar
private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
    HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
    Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
  // The following needs to be done since GlobalIndex at the moment does not store the latest commit time.
@n3nash can you compile a list of these one-offs into a ticket? We need to fish them all out eventually.
Thanks. Assuming you have already thoroughly tested this, looks okay to me.
Co-authored-by: Lokesh Lingarajan <lokeshlingarajan@Lokeshs-MacBook-Pro.local>