Skip to content


HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. (apache#…
Browse files Browse the repository at this point in the history
…7108). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <>
Signed-off-by: He Xiaoqiao <>
  • Loading branch information
hfutatzhanghb authored and KeeProMise committed Jan 20, 2025
1 parent 2784767 commit 720e000
Show file tree
Hide file tree
Showing 2 changed files with 467 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
import static;

Expand All @@ -49,6 +55,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -69,6 +76,9 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
Expand Down Expand Up @@ -793,6 +803,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
return invokeOnNs(method, clazz, io, nss);

* Invokes the method at default namespace, if default namespace is not
* available then at the other available namespaces.
* If the namespace is unavailable, retry with other namespaces.
* Asynchronous version of invokeAtAvailableNs method.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
* @throws IOException
<T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
// If no namespace is available, throw IOException.
IOException io = new IOException("No namespace available.");

if (!nsId.isEmpty()) {
asyncTry(() -> {
getRPCClient().invokeSingle(nsId, method, clazz);

asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
throw ioe;
nss.removeIf(n -> n.getNameserviceId().equals(nsId));
invokeOnNsAsync(method, clazz, io, nss);
}, IOException.class);
} else {
// If not have default NS.
invokeOnNsAsync(method, clazz, io, nss);
return asyncReturn(clazz);

* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
Expand Down Expand Up @@ -826,6 +876,61 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
throw ioe;

* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
* Asynchronous version of invokeOnNs method.
* @param method the remote method.
* @param clazz Class for the return type.
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
* @throws IOException
<T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
if (nss.isEmpty()) {
throw ioe;

Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
asyncForEach(nsIterator, (foreach, fnInfo) -> {
String nsId = fnInfo.getNameserviceId();
LOG.debug("Invoking {} on namespace {}", method, nsId);
asyncTry(() -> {
getRPCClient().invokeSingle(nsId, method, clazz);
asyncApply(result -> {
if (result != null) {
return result;
return null;

asyncCatch((CatchFunction<T, IOException>)(ret, ex) -> {
LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex);
// Ignore the exception and try on other namespace, if the tried
// namespace is unavailable, else throw the received exception.
if (!clientProto.isUnavailableSubclusterException(ex)) {
throw ex;
return null;
}, IOException.class);

asyncApply(obj -> {
if (obj == null) {
// Couldn't get a response from any of the namespace, throw ioe.
throw ioe;
return obj;

return asyncReturn(clazz);

@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
Expand Down Expand Up @@ -877,6 +982,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
RemoteLocation getCreateLocation(final String src) throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);
if (isAsync()) {
getCreateLocationAsync(src, locations);
return asyncReturn(RemoteLocation.class);
return getCreateLocation(src, locations);

Expand Down Expand Up @@ -913,6 +1022,44 @@ RemoteLocation getCreateLocation(
return createLocation;

* Get the location to create a file. It checks if the file already existed
* in one of the locations.
* Asynchronous version of getCreateLocation method.
* @param src Path of the file to check.
* @param locations Prefetched locations for the file.
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
RemoteLocation getCreateLocationAsync(
final String src, final List<RemoteLocation> locations)
throws IOException {

if (locations == null || locations.isEmpty()) {
throw new IOException("Cannot get locations to create " + src);

RemoteLocation createLocation = locations.get(0);
if (locations.size() > 1) {
asyncTry(() -> {
getExistingLocationAsync(src, locations);
asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> {
if (existingLocation != null) {
LOG.debug("{} already exists in {}.", src, existingLocation);
return existingLocation;
return createLocation;
asyncCatch((o, e) -> createLocation, FileNotFoundException.class);
} else {

return asyncReturn(RemoteLocation.class);

* Gets the remote location where the file exists.
* @param src the name of file.
Expand All @@ -934,6 +1081,31 @@ private RemoteLocation getExistingLocation(String src,
return null;

* Gets the remote location where the file exists.
* Asynchronous version of getExistingLocation method.
* @param src the name of file.
* @param locations all the remote locations.
* @return the remote location of the file if it exists, else null.
* @throws IOException in case of any exception.
private RemoteLocation getExistingLocationAsync(String src,
List<RemoteLocation> locations) throws IOException {
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
locations, method, true, false, HdfsFileStatus.class);
asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
for (RemoteLocation loc : locations) {
if (results.get(loc) != null) {
return loc;
return null;
return asyncReturn(RemoteLocation.class);

@Override // ClientProtocol
public LastBlockWithStatus append(String src, final String clientName,
final EnumSetWritable<CreateFlag> flag) throws IOException {
Expand Down Expand Up @@ -1188,6 +1360,38 @@ public DatanodeInfo[] getDatanodeReport(
return toArray(datanodes, DatanodeInfo.class);

* Get the datanode report with a timeout.
* Asynchronous version of the getDatanodeReport method.
* @param type Type of the datanode.
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
public DatanodeInfo[] getDatanodeReportAsync(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeReport",
new Class<?>[] {DatanodeReportType.class}, type);

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
return asyncReturn(DatanodeInfo[].class);

@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
Expand All @@ -1206,6 +1410,11 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
return getDatanodeStorageReportMap(type, true, -1);

public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
DatanodeReportType type) throws IOException {
return getDatanodeStorageReportMapAsync(type, true, -1);

* Get the list of datanodes per subcluster.
Expand Down Expand Up @@ -1238,6 +1447,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
return ret;

* Get the list of datanodes per subcluster.
* Asynchronous version of getDatanodeStorageReportMap method.
* @param type Type of the datanodes to get.
* @param requireResponse If true an exception will be thrown if all calls do
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param timeOutMs Time out for the reply in milliseconds.
* @return nsId to datanode list.
* @throws IOException If the method cannot be invoked remotely.
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {

Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
Map<String, DatanodeStorageReport[]>>) results -> {
for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
String nsId = ns.getNameserviceId();
DatanodeStorageReport[] result = entry.getValue();
ret.put(nsId, result);
return ret;
return asyncReturn(ret.getClass());

@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
Expand Down Expand Up @@ -2053,6 +2298,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu
return toArray(datanodes, DatanodeInfo.class);

* Get the slow running datanodes report with a timeout.
* Asynchronous version of the getSlowDatanodeReport method.
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs)
throws IOException {

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);

return asyncReturn(DatanodeInfo[].class);

private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
Map<String, DatanodeInfo> datanodesMap) {
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
Expand Down

0 comments on commit 720e000

Please # to comment.