
Commit

Addressed Comments
Anuj Modi committed Sep 19, 2023
1 parent d68ae44 commit 0966f1d
Showing 2 changed files with 113 additions and 19 deletions.
@@ -29,6 +29,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
@@ -764,7 +765,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
}

if (isChecksumValidationEnabled()) {
addCheckSumHeaderForWrite(requestHeaders, buffer);
addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
}

// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
@@ -1011,7 +1012,7 @@ public AbfsRestOperation read(final String path, final long position, final byte
op.execute(tracingContext);

if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
verifyCheckSumForRead(buffer, op.getResult());
verifyCheckSumForRead(buffer, op.getResult(), bufferOffset);
}

return op;
@@ -1427,28 +1428,45 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
}
}

/**
* Adds the MD5 hash of the data being appended as the Content-MD5 request header
* of the append request.
* @param requestHeaders list of request headers to which the checksum header is added
* @param reqParams parameters of the append request, including the offset and length
* of the data within the buffer
* @param buffer buffer holding the data to be appended
*/
private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
final byte[] buffer) {
final AppendRequestParameters reqParams, final byte[] buffer) {
try {
MessageDigest md5Digest = MessageDigest.getInstance("MD5");
byte[] md5Bytes = md5Digest.digest(buffer);
byte[] md5Bytes = md5Digest.digest(
// hash exactly the [offset, offset + length) slice that is being appended
Arrays.copyOfRange(buffer, reqParams.getoffset(),
reqParams.getoffset() + reqParams.getLength()));
String md5Hash = Base64.getEncoder().encodeToString(md5Bytes);
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
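
For readers unfamiliar with the write-path change above: the Content-MD5 header is computed over only the slice of the buffer that is actually being appended. A minimal standalone sketch of that computation follows; the class and method names and the sample offsets are illustrative, not part of this patch.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class ContentMd5Sketch {

  // Base64-encoded MD5 of buffer[offset, offset + length), i.e. the value that
  // would be sent as the Content-MD5 header for an append of that slice.
  static String md5OfRange(byte[] buffer, int offset, int length)
      throws NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    md5.update(buffer, offset, length);
    return Base64.getEncoder().encodeToString(md5.digest());
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    byte[] buffer = "0123456789".getBytes(StandardCharsets.UTF_8);
    // Hash only bytes 2..6 of the buffer (offset 2, length 5); note that the
    // exclusive end index of the slice is offset + length, not length alone.
    System.out.println(md5OfRange(buffer, 2, 5));
  }
}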

private void verifyCheckSumForRead(final byte[] buffer, final AbfsHttpOperation result)
/**
* Verifies the checksum received from the server against the data actually read.
* @param buffer buffer into which the data received from the server was stored
* @param result HTTP operation result carrying the server-returned checksum
* @param bufferOffset position in the buffer where the data returned by the server starts
* @throws AbfsRestOperationException if the checksum verification fails
*/
private void verifyCheckSumForRead(final byte[] buffer, final AbfsHttpOperation result, final int bufferOffset)
throws AbfsRestOperationException {
// The number of bytes returned by the server can be less than or equal to the
// number the caller requested; when it is less, the remaining bytes in the
// buffer stay initialized to 0. The MD5 hash returned by the server covers only
// the bytes it actually returned, so we extract exactly those bytes, compute
// their MD5 hash, and require it to equal the hash the server returned.
int numberOfBytesRead = (int)result.getBytesReceived();
if (numberOfBytesRead == 0) {
return;
}
byte[] dataRead = new byte[numberOfBytesRead];
System.arraycopy(buffer, 0, dataRead, 0, numberOfBytesRead);
System.arraycopy(buffer, bufferOffset, dataRead, 0, numberOfBytesRead);

try {
MessageDigest md5Digest = MessageDigest.getInstance(MD5);
@@ -1463,6 +1481,18 @@ private void verifyCheckSumForRead(final byte[] buffer, final AbfsHttpOperation
}
}
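
To make the read-side verification above concrete, here is a rough standalone sketch (illustrative names, not the actual AbfsClient code) that recomputes the MD5 of exactly the bytes the server returned, starting at the buffer offset, and compares it with the Base64 Content-MD5 value from the response. The real method throws an AbfsRestOperationException on a mismatch rather than returning a boolean.

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class ReadChecksumSketch {

  // True when the MD5 of buffer[bufferOffset, bufferOffset + bytesReceived)
  // matches the Base64-encoded Content-MD5 header returned by the server.
  static boolean checksumMatches(byte[] buffer, int bufferOffset, int bytesReceived,
      String contentMd5FromServer) throws NoSuchAlgorithmException {
    if (bytesReceived == 0) {
      return true; // nothing was returned, nothing to verify
    }
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    md5.update(buffer, bufferOffset, bytesReceived);
    String computed = Base64.getEncoder().encodeToString(md5.digest());
    return computed.equals(contentMd5FromServer);
  }
}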

/**
* Checks the conditions under which checksum validation is supported for a read request.
* As per the Azure documentation, the following conditions must be met before the
* MD5 hash of the range can be requested from the server:
* https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read
* 1. A Range header must be present among the request headers.
* 2. The buffer length must not exceed 4 MB.
* @param requestHeaders list of request headers for the read request
* @param rangeHeader the Range header of the read request
* @param bufferLength number of bytes to be read
* @return true if checksum validation is enabled and all conditions are met
*/
private boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
final AbfsHttpHeader rangeHeader, final int bufferLength) {
return getAbfsConfiguration().getIsChecksumValidationEnabled() &&
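
A compressed, self-contained sketch of that precondition check (the constant and method names here are assumptions for illustration, not the actual ABFS implementation):

public class ReadChecksumPreconditionSketch {

  // 4 MB limit from the Azure Data Lake Storage Gen2 Path - Read documentation.
  private static final int FOUR_MB = 4 * 1024 * 1024;

  // Checksum validation for a read is attempted only when it is enabled in the
  // configuration, a Range header is present, and the range is at most 4 MB.
  static boolean canValidateReadChecksum(boolean validationEnabled,
      String rangeHeaderValue, int bufferLength) {
    return validationEnabled
        && rangeHeaderValue != null
        && bufferLength <= FOUR_MB;
  }
}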
@@ -18,15 +18,21 @@

package org.apache.hadoop.fs.azurebfs;

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.HashSet;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.impl.OpenFileParameters;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;

/**
* Tests for verifying checksum-related operations.
@@ -39,28 +45,86 @@ public ITestAzureBlobFileSystemChecksum() throws Exception {

@Test
public void testWriteReadWithChecksum() throws Exception {
testWriteReadWithChecksumInternal(true);
testWriteReadWithChecksumInternal(false);
}

private void testWriteReadWithChecksumInternal(final boolean readAheadEnabled)
throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
// Enable checksum validations for Read and Write Requests
conf.setIsChecksumValidationEnabled(true);
conf.setWriteBufferSize(4 * ONE_MB);
conf.setReadBufferSize(4 * ONE_MB);
conf.setReadAheadEnabled(readAheadEnabled);
final int datasize = 16 * ONE_MB + 1000;

Path testpath = new Path("a/b.txt");
String dataUploaded = "This is Sample Data";
FSDataOutputStream out = fs.create(testpath);
out.write(dataUploaded.getBytes(StandardCharsets.UTF_8));
Path testPath = new Path("a/b.txt");
byte[] bytesUploaded = generateRandomBytes(datasize);
FSDataOutputStream out = fs.create(testPath);
out.write(bytesUploaded);
out.hflush();
out.close();

FSDataInputStream in = fs.open(testpath);
byte[] bytesRead = new byte[dataUploaded.length()];
in.read(bytesRead);
FSDataInputStream in = fs.open(testPath);
byte[] bytesRead = new byte[bytesUploaded.length];
in.read(bytesRead, 0, datasize);

// Verify that the data read is the same as the data written
Assertions.assertThat(bytesRead)
.describedAs("Bytes read with checksum enabled are not as expected")
.containsExactly(dataUploaded.getBytes(StandardCharsets.UTF_8));
Assertions.assertThat(new String(bytesRead, StandardCharsets.UTF_8))
.describedAs("Data read with checksum enabled is not as expected")
.isEqualTo(dataUploaded);
.containsExactly(bytesUploaded);

// Verify that reading from a random position works
in = fs.open(testPath);
bytesRead = new byte[datasize];
in.seek(ONE_MB);
in.read(bytesRead, ONE_MB, datasize - 2 * ONE_MB);
}

@Test
public void testWriteReadWithChecksumAndOptions() throws Exception {
testWriteReadWithChecksumAndOptionsInternal(true);
testWriteReadWithChecksumAndOptionsInternal(false);
}

private void testWriteReadWithChecksumAndOptionsInternal(
final boolean readAheadEnabled) throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
// Enable checksum validations for Read and Write Requests
conf.setIsChecksumValidationEnabled(true);
conf.setWriteBufferSize(8 * ONE_MB);
conf.setReadBufferSize(ONE_MB);
conf.setReadAheadEnabled(readAheadEnabled);
final int datasize = 16 * ONE_MB + 1000;

Path testPath = new Path("a/b.txt");
byte[] bytesUploaded = generateRandomBytes(datasize);
FSDataOutputStream out = fs.create(testPath);
out.write(bytesUploaded);
out.hflush();
out.close();

Configuration cpm1 = new Configuration();
cpm1.setBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, true);
FSDataInputStream in = fs.openFileWithOptions(testPath,
new OpenFileParameters().withOptions(cpm1)
.withMandatoryKeys(new HashSet<>())).get();
byte[] bytesRead = new byte[datasize];
in.read(1, bytesRead, 1, 4 * ONE_MB);

// Verify that the data read is the same as the data written
Assertions.assertThat(Arrays.copyOfRange(bytesRead, 1, 4 * ONE_MB))
.describedAs("Bytes read with checksum enabled are not as expected")
.containsExactly(Arrays.copyOfRange(bytesUploaded, 1, 4 * ONE_MB));
}
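
The positioned read in the test above, in.read(1, bytesRead, 1, 4 * ONE_MB), fills only part of the destination buffer, which is why the assertion compares just that sub-range. A minimal sketch of the same read(position, buffer, offset, length) pattern against the local file system, with an illustrative path and sizes:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/positioned-read-demo.txt"); // illustrative path
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write("abcdefghij".getBytes(StandardCharsets.UTF_8));
    }
    byte[] dest = new byte[10];
    try (FSDataInputStream in = fs.open(path)) {
      // Read 4 bytes starting at file position 1 and place them at offset 1 of
      // dest; dest[0] and dest[5..] remain zero, so only dest[1..4] is comparable.
      int bytesRead = in.read(1, dest, 1, 4);
      System.out.println(bytesRead + " bytes: " + new String(dest, 1, 4, StandardCharsets.UTF_8));
    }
  }
}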

public static byte[] generateRandomBytes(int numBytes) {
SecureRandom secureRandom = new SecureRandom();
byte[] randomBytes = new byte[numBytes];
secureRandom.nextBytes(randomBytes);
return randomBytes;
}
}
