Skip to content

Commit

Permalink
Caching for Views (#1171)
Browse files Browse the repository at this point in the history
  • Loading branch information
itsiggs authored Aug 30, 2024
1 parent 4747017 commit a833298
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -62,7 +62,7 @@ public class ParquetUtil {
public static String PARQUET_EXTENSION = ".parquet";
private final AvroConversionUtil conversionUtil;

private final Map<String, ParquetWriter<GenericRecord>> viewWriterMap;
private final Map<String, WriterWithCache> viewWriterMap;
private final Map<String, WriterWithCache> writerMap;

private final int rowGroupSize;
Expand Down Expand Up @@ -222,7 +222,7 @@ private synchronized void createWriter(String resourceType, @Nullable ViewDefini
writerMap.put(resourceType, new WriterWithCache(writer, this.cacheBundle));
} else {
writer = builder.withSchema(ViewSchema.getAvroSchema(vDef)).build();
viewWriterMap.put(vDef.getName(), writer);
viewWriterMap.put(vDef.getName(), new WriterWithCache(writer, this.cacheBundle));
}
}

Expand Down Expand Up @@ -258,6 +258,9 @@ public synchronized void emptyCache() throws IOException {
for (WriterWithCache writer : writerMap.values()) {
writer.flushCache();
}
for (WriterWithCache writer : viewWriterMap.values()) {
writer.flushCache();
}
}

/**
Expand All @@ -271,7 +274,7 @@ private synchronized void write(Resource resource, ViewDefinition vDef)
if (!viewWriterMap.containsKey(vDef.getName())) {
createWriter("", vDef);
}
final ParquetWriter<GenericRecord> parquetWriter = viewWriterMap.get(vDef.getName());
final WriterWithCache parquetWriter = viewWriterMap.get(vDef.getName());
ViewApplicator applicator = new ViewApplicator(vDef);
RowList rows = applicator.apply(resource);
List<GenericRecord> result = ViewSchema.setValueInRecord(rows, vDef);
Expand All @@ -289,7 +292,7 @@ private synchronized void flush(String resourceType) throws IOException, Profile
}

private synchronized void flushViewWriter(String viewName) throws IOException, ProfileException {
ParquetWriter<GenericRecord> writer = viewWriterMap.get(viewName);
WriterWithCache writer = viewWriterMap.get(viewName);
if (writer != null && writer.getDataSize() > 0) {
writer.close();
// TODO: We need to investigate why we need to create the writer here. If we change this logic
Expand All @@ -313,7 +316,7 @@ public synchronized void closeAllWriters() throws IOException {
if (timer != null) {
timer.cancel();
}
for (Map.Entry<String, ParquetWriter<GenericRecord>> entry : viewWriterMap.entrySet()) {
for (Map.Entry<String, WriterWithCache> entry : viewWriterMap.entrySet()) {
entry.getValue().close();
viewWriterMap.put(entry.getKey(), null);
}
Expand Down

0 comments on commit a833298

Please sign in to comment.