Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Feature/local method invoke #286

Merged
merged 4 commits into from
Nov 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/logs/
*/*/logs/
*/target/
bin/

# maven ignore
target/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
Expand All @@ -43,14 +44,13 @@
*/
@SpiMeta(name = "activeWeight")
public class ActiveWeightLoadBalance<T> extends AbstractLoadBalance<T> {
private static Random random = new Random();

@Override
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down Expand Up @@ -83,7 +83,7 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MathUtil;

import org.apache.commons.lang3.StringUtils;

import java.util.*;
Expand Down Expand Up @@ -200,7 +202,7 @@ Referer<T> next() {
String group = randomKeyList.get(ThreadLocalRandom.current().nextInt(randomKeySize));
AtomicInteger ai = cursors.get(group);
List<Referer<T>> referers = groupReferers.get(group);
return referers.get(ai.getAndIncrement() % referers.size());
return referers.get(MathUtil.getPositive(ai.getAndIncrement()) % referers.size());
}

// 求最大公约数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

/**
*
Expand Down Expand Up @@ -82,11 +83,13 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
}

private int getHash(Request request) {
int hashcode;
if (request.getArguments() == null || request.getArguments().length == 0) {
return 0x7fffffff & request.hashCode();
hashcode = request.hashCode();
} else {
return 0x7fffffff & Arrays.hashCode(request.getArguments());
hashcode = Arrays.hashCode(request.getArguments());
}
return MathUtil.getPositive(hashcode);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.weibo.api.motan.util.NetUtils;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
* "本地服务优先" 负载均衡
Expand All @@ -42,7 +43,6 @@
@SpiMeta(name = "localFirst")
public class LocalFirstLoadBalance<T> extends AbstractLoadBalance<T> {
public static final int MAX_REFERER_COUNT = 10;
private static Random random = new Random();

public static long ipToLong(final String addr) {
final String[] addressBytes = addr.split("\\.");
Expand Down Expand Up @@ -107,7 +107,7 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
}

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.weibo.api.motan.cluster.loadbalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
Expand All @@ -36,7 +37,7 @@ public class RandomLoadBalance<T> extends AbstractLoadBalance<T> {
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int idx = (int) (Math.random() * referers.size());
int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = referers.get((i + idx) % referers.size());
if (ref.isAvailable()) {
Expand All @@ -50,7 +51,7 @@ protected Referer<T> doSelect(Request request) {
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int idx = (int) (Math.random() * referers.size());
int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> referer = referers.get((i + idx) % referers.size());
if (referer.isAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

/**
*
Expand Down Expand Up @@ -65,6 +66,6 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)

// get positive int
private int getNextPositive() {
return 0x7fffffff & idx.incrementAndGet();
return MathUtil.getPositive(idx.incrementAndGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
@SpiMeta(name = "statistic")
public class AccessStatisticFilter implements Filter {
protected static Application RPC_SERVICES = new Application(ApplicationInfo.STATISTIC, "rpc_service");

@Override
public Response filter(Caller<?> caller, Request request) {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -68,7 +70,7 @@ public Response filter(Caller<?> caller, Request request) {
String statName =
caller.getUrl().getProtocol() + MotanConstants.PROTOCOL_SEPARATOR + MotanFrameworkUtil.getGroupMethodString(request);
if (caller instanceof Provider) {
application = new Application(ApplicationInfo.STATISTIC, "rpc_service");
application = RPC_SERVICES;
StatsUtil.accessStatistic(statName, application, end, end - start, bizProcessTime, accessStatus);
}
application = ApplicationInfo.getApplication(caller.getUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.util.ReflectUtil;

import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -135,7 +137,8 @@ public Response filter(Caller<?> caller, Request request) {
return response;
}

private Object invoke(Object clz, Method method, Object[] args, MockInfo info) throws InterruptedException, InvocationTargetException, IllegalAccessException {
private Object invoke(Object clz, Method method, Object[] args, MockInfo info) throws InterruptedException, InvocationTargetException,
IllegalAccessException {

info.callNum.addAndGet(1);

Expand All @@ -151,7 +154,7 @@ private long caclSleepTime(MockInfo info) {

long sleepTime;

int n = new Random().nextInt(1000);
int n = ThreadLocalRandom.current().nextInt(1000);

long delta = (long) (rMean - info.mean + 1);
if (n < 900) {
Expand All @@ -175,7 +178,7 @@ private long caclSleepTime(MockInfo info) {
while (info.errorRate * rate < 1) {
rate *= 10;
}
if (new Random().nextInt(rate) == 1) {
if (ThreadLocalRandom.current().nextInt(rate) == 1) {
throw new RuntimeException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.ApplicationInfo;
import com.weibo.api.motan.rpc.DefaultRequest;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.switcher.Switcher;
import com.weibo.api.motan.switcher.SwitcherService;
Expand Down Expand Up @@ -75,6 +76,12 @@ private void init() {
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(isLocalMethod(method)){
if("toString".equals(method.getName())){
return clustersToString();
}
throw new MotanServiceException("can not invoke local method:" + method.getName());
}
DefaultRequest request = new DefaultRequest();

request.setRequestId(RequestIdGenerator.getRequestId());
Expand Down Expand Up @@ -136,6 +143,35 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
+ MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);

}

/**
* tostring,equals,hashCode,finalize等接口未声明的方法不进行远程调用
* @param method
* @return
*/
public boolean isLocalMethod(Method method){
if(method.getDeclaringClass().equals(Object.class)){
try{
Method interfaceMethod = clz.getDeclaredMethod(method.getName(), method.getParameterTypes());
return false;
}catch(Exception e){
return true;
}
}
return false;
}

private String clustersToString(){
StringBuilder sb = new StringBuilder();
for(Cluster<T> cluster: clusters){
sb.append("{protocol:").append(cluster.getUrl().getProtocol());
for(Referer<T> refer : (List<Referer<T>>)cluster.getReferers()){
sb.append("[").append(refer.getUrl().toSimpleString()).append(", available:").append(refer.isAvailable()).append("]");
}
sb.append("}");
}
return sb.toString();
}

private Object getDefaultReturnValue(Class<?> returnType) {
if (returnType != null && returnType.isPrimitive()) {
Expand Down
7 changes: 6 additions & 1 deletion motan-core/src/main/java/com/weibo/api/motan/rpc/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,12 @@ public boolean canServe(URL refUrl) {
if (!version.equals(refVersion)) {
return false;
}

// check serialize
String serialize = getParameter(URLParamType.serialize.getName(), URLParamType.serialize.getValue());
String refSerialize = refUrl.getParameter(URLParamType.serialize.getName(), URLParamType.serialize.getValue());
if (!serialize.equals(refSerialize)) {
return false;
}
// 由于需要提供跨group访问rpc的能力,所以不再验证group是否一致。
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ProviderProtectedMessageRouter extends ProviderMessageRouter {
protected ConcurrentMap<String, AtomicInteger> requestCounters = new ConcurrentHashMap<String, AtomicInteger>();
protected AtomicInteger totalCounter = new AtomicInteger(0);

protected Object lock = new Object();


public ProviderProtectedMessageRouter() {
super();
Expand All @@ -73,11 +73,8 @@ protected Response call(Request request, Provider<?> provider) {

try {
int requestCounter = 0, totalCounter = 0;
synchronized (lock) {
requestCounter = incrRequestCounter(requestKey);
totalCounter = incrTotalCounter();
}

requestCounter = incrRequestCounter(requestKey);
totalCounter = incrTotalCounter();
if (isAllowRequest(requestCounter, totalCounter, maxThread, request)) {
return super.call(request, provider);
} else {
Expand All @@ -86,10 +83,8 @@ protected Response call(Request request, Provider<?> provider) {
}

} finally {
synchronized (lock) {
decrTotalCounter();
decrRequestCounter(requestKey);
}
decrTotalCounter();
decrRequestCounter(requestKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E>, java.io.Serializable {

private static final long serialVersionUID = -8672117787651310382L;

private static final Object PRESENT = new Object();

private final ConcurrentHashMap<E, Object> map;
private final ConcurrentMap<E, Object> map;

public ConcurrentHashSet() {
map = new ConcurrentHashMap<E, Object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public static int parseInt(String intStr, int defaultValue) {
return defaultValue;
}
}

/**
* return positive int value of originValue
* @param originValue
* @return positive int
*/
public static int getPositive(int originValue){
return 0x7fffffff & originValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,17 @@ public static boolean isValidAddress(InetAddress address) {
String name = address.getHostAddress();
return (name != null && !ANYHOST.equals(name) && !LOCALHOST.equals(name) && IP_PATTERN.matcher(name).matches());
}

//return ip to avoid lookup dns
public static String getHostName(SocketAddress socketAddress) {
if (socketAddress == null) {
return null;
}

if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getHostName();
InetAddress addr = ((InetSocketAddress) socketAddress).getAddress();
if(addr != null){
return addr.getHostAddress();
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private void allAvailableCluster(int refererSize) {
if (refererSize <= ActiveWeightLoadBalance.MAX_REFERER_COUNT) {
Assert.assertEquals(referer.activeRefererCount(), lowActive);
} else {
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT >= referer.activeRefererCount());
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT - 1 >= referer.activeRefererCount());
}

List<Referer> referersHolder = new ArrayList<Referer>();
Expand Down Expand Up @@ -102,7 +102,7 @@ private void partOfUnAvailableCluster(int refererSize, int unAvailableSize) {
if (availableSize <= ActiveWeightLoadBalance.MAX_REFERER_COUNT) {
Assert.assertTrue(referer.activeRefererCount() - lowActive - unAvailableSize <= 0);
} else {
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT + unAvailableSize >= referer.activeRefererCount());
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT + unAvailableSize -1 >= referer.activeRefererCount());
}

List<Referer> referersHolder = new ArrayList<Referer>();
Expand Down
Loading