Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Explicitly handle lack of append() support during LogWriting #511

Merged
merged 1 commit into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@

package com.uber.hoodie.io.storage;

import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -60,20 +59,6 @@
public class HoodieWrapperFileSystem extends FileSystem {

public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
private static final Set<String> SUPPORT_SCHEMES;

static {
SUPPORT_SCHEMES = new HashSet<>();
SUPPORT_SCHEMES.add("file");
SUPPORT_SCHEMES.add("hdfs");
SUPPORT_SCHEMES.add("s3");
SUPPORT_SCHEMES.add("s3a");

// Hoodie currently relies on underlying object store being fully
// consistent so only regional buckets should be used.
SUPPORT_SCHEMES.add("gs");
SUPPORT_SCHEMES.add("viewfs");
}

private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new
ConcurrentHashMap<>();
Expand Down Expand Up @@ -104,7 +89,7 @@ private static Path convertPathWithScheme(Path oldPath, String newScheme) {

public static String getHoodieScheme(String scheme) {
String newScheme;
if (SUPPORT_SCHEMES.contains(scheme)) {
if (StorageSchemes.isSchemeSupported(scheme)) {
newScheme = HOODIE_SCHEME_PREFIX + scheme;
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.storage;

import java.util.Arrays;

/**
* All the supported storage schemes in Hoodie.
*/
public enum StorageSchemes {
// Local filesystem
FILE("file", false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be true ?

// Hadoop File System
HDFS("hdfs", true),
// Apache Ignite FS
IGNITE("igfs", true),
// AWS S3
S3A("s3a", false),
S3("s3", false),
// Google Cloud Storage
GCS("gs", false),
// View FS for federated setups. If federating across cloud stores, then append support is false
VIEWFS("viewfs", true);

private String scheme;
private boolean supportsAppend;

StorageSchemes(String scheme, boolean supportsAppend) {
this.scheme = scheme;
this.supportsAppend = supportsAppend;
}

public String getScheme() {
return scheme;
}

public boolean supportsAppend() {
return supportsAppend;
}

public static boolean isSchemeSupported(String scheme) {
return Arrays.stream(values()).filter(s -> s.getScheme().equals(scheme)).count() > 0;
}

public static boolean isAppendSupported(String scheme) {
if (!isSchemeSupported(scheme)) {
throw new IllegalArgumentException("Unsupported scheme :" + scheme);
}
return Arrays.stream(StorageSchemes.values())
.filter(s -> s.supportsAppend() && s.scheme.equals(scheme)).count() > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.uber.hoodie.common.table.log;

import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
Expand Down Expand Up @@ -68,26 +69,32 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {

Path path = logFile.getPath();
if (fs.exists(path)) {
log.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
log.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
log.info("Append not supported. Opening a new log file..");
this.logFile = logFile.rollOver(fs);
createNewFile();
} else {
throw ioe;
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
log.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
log.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
throw ioe;
}
}
}
if (!isAppendSupported) {
this.logFile = logFile.rollOver(fs);
log.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
log.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
// TODO - append a file level meta block
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.storage;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.junit.Test;

public class TestStorageSchemes {

@Test
public void testStorageSchemes() {
assertTrue(StorageSchemes.isSchemeSupported("hdfs"));
assertFalse(StorageSchemes.isSchemeSupported("s2"));
assertFalse(StorageSchemes.isAppendSupported("s3a"));
assertFalse(StorageSchemes.isAppendSupported("gs"));
assertTrue(StorageSchemes.isAppendSupported("viewfs"));
try {
StorageSchemes.isAppendSupported("s2");
fail("Should throw exception for unsupported schemes");
} catch (IllegalArgumentException ignore) {
// expected.
}
}
}