Skip to content

Commit

Permalink
Parallel write of target file.
Browse files Browse the repository at this point in the history
  • Loading branch information
bmarwell committed Jun 1, 2019
1 parent c7294aa commit df2f0dc
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 12 deletions.
54 changes: 42 additions & 12 deletions app/src/main/java/io/github/zchunk/app/commands/Unzck.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@

import io.github.zchunk.app.ZChunkFilename;
import io.github.zchunk.app.err.UncompressException;
import io.github.zchunk.compressedint.CompressedInt;
import io.github.zchunk.fileformat.ZChunk;
import io.github.zchunk.fileformat.ZChunkFile;
import io.github.zchunk.fileformat.ZChunkHeader;
import io.github.zchunk.fileformat.ZChunkHeaderChunkInfo;
import io.github.zchunk.fileformat.ZChunkHeaderIndex;
import io.github.zchunk.fileformat.util.IOUtil;
import io.github.zchunk.fileformat.util.OffsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -74,7 +78,7 @@ public Integer call() {
private int decompressFile(final ZChunkFile zChunkFile) {
final File target = getTargetFile();

try (final FileOutputStream fileOutputStream = new FileOutputStream(target)) {
try {
final File targetDir = target.getAbsoluteFile().getParentFile();
if (null == targetDir) {
throw new IllegalStateException("TargetDir Parent is null: [" + target.getAbsolutePath() + "].");
Expand All @@ -90,7 +94,7 @@ private int decompressFile(final ZChunkFile zChunkFile) {

final byte[] decompressedDict = ZChunk.getDecompressedDict(zChunkFileHeader, this.inputFile);

uncompressChunks(fileOutputStream, zChunkFileHeader, decompressedDict);
uncompressChunks(target, zChunkFileHeader, decompressedDict);

} catch (final FileNotFoundException fnfe) {
cleanPartialFile(target);
Expand All @@ -103,21 +107,47 @@ private int decompressFile(final ZChunkFile zChunkFile) {
return 0;
}

private void uncompressChunks(final FileOutputStream fileOutputStream,
private void uncompressChunks(final File targetFile,
final ZChunkHeader zChunkFileHeader,
final byte[] decompressedDict) throws IOException {
final SortedSet<ZChunkHeaderChunkInfo> chunks = zChunkFileHeader.getIndex().getChunkInfoSortedByIndex();

// TODO: This can be optimized using random access file and parallel writing.
for (final ZChunkHeaderChunkInfo chunk : chunks) {
LOG.finest("Working on chunk [" + chunk + "].");
final InputStream decompressedChunk = ZChunk.getDecompressedChunk(
zChunkFileHeader,
this.inputFile,
decompressedDict,
chunk.getCurrentIndex());
IOUtil.copy(decompressedChunk, fileOutputStream);
final RandomAccessFile accessFile = new RandomAccessFile(targetFile, "rwd");
final long totalLength = zChunkFileHeader.getIndex().getChunkInfoSortedByIndex().stream()
.map(ZChunkHeaderChunkInfo::getChunkUncompressedLength)
.mapToLong(CompressedInt::getLongValue)
.sum();
accessFile.setLength(totalLength);

CompletableFuture.allOf(
chunks.parallelStream()
.map(chunk -> CompletableFuture.runAsync(() -> writeChunk(targetFile, chunk, zChunkFileHeader, decompressedDict)))
.toArray(CompletableFuture[]::new)
).join();
}

/**
 * Decompresses a single chunk and writes it at its offset inside {@code targetFile}.
 *
 * <p>Safe to call concurrently for distinct chunks: every invocation opens its own
 * {@link RandomAccessFile} (private file pointer) and seeks to a disjoint region.</p>
 *
 * @param targetFile        the file receiving the decompressed data.
 * @param chunk             the chunk to decompress and write.
 * @param zChunkFileHeader  the parsed header of the input zchunk file.
 * @param decompressedDict  the decompressed dictionary shared by all chunks.
 * @throws UncompressException if reading or writing the chunk fails.
 */
private void writeChunk(final File targetFile,
    final ZChunkHeaderChunkInfo chunk,
    final ZChunkHeader zChunkFileHeader,
    final byte[] decompressedDict) {
  // Absolute position of this chunk's decompressed bytes within the target file.
  final long targetOffset = OffsetUtil.getDecompressedChunkOffset(zChunkFileHeader.getIndex(), chunk);

  try (final InputStream chunkStream = ZChunk.getDecompressedChunk(
          zChunkFileHeader,
          this.inputFile,
          decompressedDict,
          chunk.getCurrentIndex());
      final RandomAccessFile target = new RandomAccessFile(targetFile, "rw")) {
    target.seek(targetOffset);
    IOUtil.copy(chunkStream, target);
  } catch (final IOException ioEx) {
    throw new UncompressException(
        String.format("Unable to decompress chunk[%s] to File [%s] at position [%d].",
            chunk,
            targetFile.getAbsolutePath(),
            targetOffset),
        ioEx);
  }
}

private int decompressDict(final ZChunkFile zChunkFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.github.zchunk.fileformat.util;

import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -47,4 +48,22 @@ public static int copy(final InputStream in, final OutputStream out) throws IOEx
return totalWritten;
}

/**
 * Copies all bytes from {@code in} to {@code out} and returns the number of bytes copied.
 *
 * <p>Reads until {@link InputStream#read(byte[])} signals end-of-stream. Note that a
 * short read (fewer bytes than the buffer size) does NOT mean the stream is exhausted —
 * buffered and network streams may return partial reads — so we must loop until EOF.
 * The previous implementation broke out on the first short read, silently truncating
 * the copy.</p>
 *
 * @param in  the stream to read from; not closed by this method.
 * @param out the sink to write to; not closed by this method.
 * @return the total number of bytes copied.
 * @throws IOException if reading or writing fails.
 */
public static int copy(final InputStream in, final DataOutput out) throws IOException {
  final byte[] buffer = new byte[BUFFER_SIZE];
  int readCount;
  int totalWritten = 0;

  while ((readCount = in.read(buffer)) != EOF) {
    out.write(buffer, 0, readCount);
    totalWritten += readCount;
  }

  return totalWritten;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.github.zchunk.fileformat.OptionalElement;
import io.github.zchunk.fileformat.ZChunkHeader;
import io.github.zchunk.fileformat.ZChunkHeaderChunkInfo;
import io.github.zchunk.fileformat.ZChunkHeaderIndex;
import io.github.zchunk.fileformat.ZChunkHeaderLead;
import io.github.zchunk.fileformat.ZChunkHeaderPreface;
import java.math.BigInteger;
import java.util.SortedSet;

public final class OffsetUtil {

Expand Down Expand Up @@ -97,4 +99,11 @@ public static long getDictOffset(final ZChunkHeader zChunkHeader) {
return getTotalHeaderSize(zChunkHeader.getLead());
}

/**
 * Computes the byte offset of {@code chunk}'s decompressed data inside the target file.
 *
 * <p>The offset is the sum of the uncompressed lengths of all chunks that precede
 * {@code chunk} in the index (i.e. the first {@code chunk.getCurrentIndex()} entries
 * of the sorted chunk set).</p>
 *
 * @param index the chunk index from the zchunk header.
 * @param chunk the chunk whose decompressed offset is requested.
 * @return the offset, in bytes, at which the chunk's decompressed data starts.
 */
public static long getDecompressedChunkOffset(final ZChunkHeaderIndex index, final ZChunkHeaderChunkInfo chunk) {
  long offset = 0L;
  long remaining = chunk.getCurrentIndex();

  for (final ZChunkHeaderChunkInfo preceding : index.getChunkInfoSortedByIndex()) {
    if (remaining == 0L) {
      break;
    }
    offset += preceding.getChunkUncompressedLength().getLongValue();
    remaining--;
  }

  return offset;
}
}

0 comments on commit df2f0dc

Please sign in to comment.