diff --git a/motan-core/src/main/java/com/weibo/api/motan/cluster/loadbalance/LocalFirstLoadBalance.java b/motan-core/src/main/java/com/weibo/api/motan/cluster/loadbalance/LocalFirstLoadBalance.java index 5feb02104..479fd13c7 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/cluster/loadbalance/LocalFirstLoadBalance.java +++ b/motan-core/src/main/java/com/weibo/api/motan/cluster/loadbalance/LocalFirstLoadBalance.java @@ -69,7 +69,7 @@ protected Referer doSelect(Request request) { List> localReferers = searchLocalReferer(referers, NetUtils.getLocalAddress().getHostAddress()); - if (localReferers.isEmpty()) { + if (!localReferers.isEmpty()) { referers = localReferers; } diff --git a/motan-extension/pom.xml b/motan-extension/pom.xml index af7a5ec45..dafd23ca6 100644 --- a/motan-extension/pom.xml +++ b/motan-extension/pom.xml @@ -1,4 +1,4 @@ - + - - + 4.0.0 com.weibo @@ -19,12 +17,13 @@ motan-extension motan-extension - http://maven.apache.org + https://github.com/weibocom/motan UTF-8 pom serialization-extension + protocol-extension - + \ No newline at end of file diff --git a/motan-extension/protocol-extension/motan-protocol-yar/pom.xml b/motan-extension/protocol-extension/motan-protocol-yar/pom.xml new file mode 100644 index 000000000..6f2cddf93 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + com.weibo + protocol-extension + 0.1.3-SNAPSHOT + + motan-protocol-yar + motan-protocol-yar + https://github.com/weibocom/motan + + UTF-8 + + + + com.weibo + motan-core + ${project.parent.version} + + + com.weibo + yar-java + 0.0.1 + + + diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/AttachmentRequest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/AttachmentRequest.java new file mode 100644 index 000000000..6105ed9bc --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/AttachmentRequest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import java.util.Map; + +import com.weibo.yar.YarRequest; + +/** + * + * @Description YarRequest with attachments. rpc attachments such as auth,application can pass with + * this class. + * @author zhanglei + * @date 2016-7-26 + * + */ +public class AttachmentRequest extends YarRequest { + private Map attachments; + + public AttachmentRequest(String packagerName, String methodName, Object[] parameters) { + super(packagerName, methodName, parameters); + } + + public AttachmentRequest(long id, String packagerName, String methodName, Object[] parameters) { + super(id, packagerName, methodName, parameters); + } + + public AttachmentRequest() { + super(); + } + + public AttachmentRequest(YarRequest yarRequest, Map attachments) { + this(yarRequest.getId(), yarRequest.getPackagerName(), yarRequest.getMethodName(), yarRequest.getParameters()); + this.attachments = attachments; + } + + public Map getAttachments() { + return attachments; + } + + public void setAttachments(Map attachments) { + this.attachments = attachments; + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarExporter.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarExporter.java new file mode 100644 index 000000000..aaa06403f --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarExporter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import com.weibo.api.motan.common.URLParamType; +import com.weibo.api.motan.core.extension.ExtensionLoader; +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.rpc.AbstractExporter; +import com.weibo.api.motan.rpc.Provider; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.EndpointFactory; +import com.weibo.api.motan.transport.Server; + +/** + * + * @Description YarExporter + * @author zhanglei + * @date 2016-5-31 + * + */ +public class YarExporter extends AbstractExporter { + protected Server server; + private YarRpcProtocol yarProtocol; + + public YarExporter(URL url, Provider provider, YarRpcProtocol yarProtocol) { + super(provider, url); + EndpointFactory endpointFactory = + ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension( + url.getParameter(URLParamType.endpointFactory.getName(), "netty4yar")); + // set noheartbeat factory + String heartbeatFactory = url.getParameter(URLParamType.heartbeatFactory.getName()); + if (heartbeatFactory == null) { + url.addParameter(URLParamType.heartbeatFactory.getName(), "noHeartbeat"); + } + // FIXME to avoid parameters ambiguous in weak type language,parameters size of method with + // same name must be different. + validateInterface(provider.getInterface()); + server = endpointFactory.createServer(url, yarProtocol.initRequestRouter(url, provider)); + + } + + + @Override + public void destroy() { + server.close(); + } + + @Override + public boolean isAvailable() { + return server.isAvailable(); + } + + @Override + public void unexport() { + yarProtocol.unexport(url, provider); + } + + @Override + protected boolean doInit() { + return server.open(); + } + + protected void validateInterface(Class interfaceClazz) { + HashMap> tempMap = new HashMap>(); + for (Method m : interfaceClazz.getDeclaredMethods()) { + if (!tempMap.containsKey(m.getName())) { + List templist = new ArrayList(); + templist.add(m.getParameterTypes().length); + tempMap.put(m.getName(), templist); + } else { + List templist = tempMap.get(m.getName()); + if (templist.contains(m.getParameterTypes().length)) { + throw new MotanFrameworkException("in yar protocol, methods with same name must have different params size !"); + } else { + templist.add(m.getParameterTypes().length); + } + } + } + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarMessageRouter.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarMessageRouter.java new file mode 100644 index 000000000..b661203e8 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarMessageRouter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import java.util.concurrent.ConcurrentHashMap; + +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.exception.MotanServiceException; +import com.weibo.api.motan.rpc.Provider; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.transport.Channel; +import com.weibo.api.motan.transport.ProviderProtectedMessageRouter; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; +/** + * + * @Description yar message router + * @author zhanglei + * @date 2016-6-8 + * + */ +public class YarMessageRouter extends ProviderProtectedMessageRouter { + protected ConcurrentHashMap> providerMap = new ConcurrentHashMap>(); + + @Override + public Object handle(Channel channel, Object message) { + YarRequest yarRequest = (YarRequest) message; + + String packagerName = yarRequest.getPackagerName(); + Provider provider = providerMap.get(yarRequest.getRequestPath()); + if (provider == null) { + throw new MotanServiceException("can not find service provider. request path:" + yarRequest.getRequestPath()); + } + Class clazz = provider.getInterface(); + Request request = YarProtocolUtil.convert(yarRequest, clazz); + Response response = call(request, provider); + YarResponse yarResponse = YarProtocolUtil.convert(response, packagerName); + return yarResponse; + } + + @Override + public void addProvider(Provider provider) { + String path = YarProtocolUtil.getYarPath(provider.getInterface(), provider.getUrl()); + Provider old = providerMap.putIfAbsent(path, provider); + if (old != null) { + throw new MotanFrameworkException("duplicate yar provider"); + } + } + + @Override + public void removeProvider(Provider provider) { + String path = YarProtocolUtil.getYarPath(provider.getInterface(), provider.getUrl()); + providerMap.remove(path); + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarProtocolUtil.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarProtocolUtil.java new file mode 100644 index 000000000..c9f07e4d6 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarProtocolUtil.java @@ -0,0 +1,217 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import java.lang.reflect.Method; + +import org.apache.commons.lang3.StringUtils; + +import com.weibo.api.motan.exception.MotanBizException; +import com.weibo.api.motan.exception.MotanServiceException; +import com.weibo.api.motan.protocol.yar.annotation.YarConfig; +import com.weibo.api.motan.rpc.DefaultRequest; +import com.weibo.api.motan.rpc.DefaultResponse; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.util.ReflectUtil; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; + +/** + * + * @Description yar protocol util. + * @author zhanglei + * @date 2016-6-8 + * + */ +public class YarProtocolUtil { + + public static String getYarPath(Class interfaceClazz, URL url) { + if (interfaceClazz != null) { + YarConfig config = interfaceClazz.getAnnotation(YarConfig.class); + if (config != null && StringUtils.isNotBlank(config.path())) { + return config.path(); + } + } + // '/group/urlpath' as default + return "/" + url.getGroup() + "/" + url.getPath(); + } + + /** + * convert yar request to motan rpc request + * + * @param yarRequest + * @param interfaceClass + * @return + */ + public static Request convert(YarRequest yarRequest, Class interfaceClass) { + DefaultRequest request = new DefaultRequest(); + request.setInterfaceName(interfaceClass.getName()); + request.setMethodName(yarRequest.getMethodName()); + request.setRequestId(yarRequest.getId()); + addArguments(request, interfaceClass, yarRequest.getMethodName(), yarRequest.getParameters()); + if (yarRequest instanceof AttachmentRequest) { + request.setAttachments(((AttachmentRequest) yarRequest).getAttachments()); + } + return request; + } + + public static YarRequest convert(Request request, String packagerName) { + YarRequest yarRequest = new YarRequest(); + yarRequest.setId(request.getRequestId()); + yarRequest.setMethodName(request.getMethodName()); + yarRequest.setPackagerName(packagerName); + yarRequest.setParameters(request.getArguments()); + return yarRequest; + } + + public static Response convert(YarResponse yarResponse) { + DefaultResponse response = new DefaultResponse(); + response.setRequestId(yarResponse.getId()); + response.setValue(yarResponse.getRet()); + if (StringUtils.isNotBlank(yarResponse.getError())) { + response.setException(new MotanBizException(yarResponse.getError())); + } + + return response; + } + + public static YarResponse convert(Response response, String packagerName) { + YarResponse yarResponse = new YarResponse(); + yarResponse.setId(response.getRequestId()); + yarResponse.setPackagerName(packagerName); + if (response.getException() != null) { + if (response.getException() instanceof MotanBizException) { + yarResponse.setError(response.getException().getCause().getMessage()); + } else { + yarResponse.setError(response.getException().getMessage()); + } + } else { + yarResponse.setRet(response.getValue()); + } + + return yarResponse; + } + + /** + * add arguments + * + * @param interfaceClass + * @param methodName + * @param arguments + * @return + */ + private static void addArguments(DefaultRequest request, Class interfaceClass, String methodName, Object[] arguments) { + Method targetMethod = null; + Method[] methods = interfaceClass.getDeclaredMethods(); + for (Method m : methods) { + // FIXME parameters may be ambiguous in weak type language, temporarily by limiting the + // size of parameters with same method name to avoid. + if (m.getName().equalsIgnoreCase(methodName) && m.getParameterTypes().length == arguments.length) { + targetMethod = m; + break; + } + } + if (targetMethod == null) { + throw new MotanServiceException("cann't find request method. method name " + methodName); + } + + request.setParamtersDesc(ReflectUtil.getMethodParamDesc(targetMethod)); + + if (arguments != null && arguments.length > 0) { + Class[] argumentClazz = targetMethod.getParameterTypes(); + request.setArguments(adaptParams(targetMethod, arguments, argumentClazz)); + } + + + } + + public static YarResponse buildDefaultErrorResponse(String errMsg, String packagerName) { + YarResponse yarResponse = new YarResponse(); + yarResponse.setPackagerName(packagerName); + yarResponse.setError(errMsg); + yarResponse.setStatus("500"); + return yarResponse; + } + + + // adapt parameters to java class type + private static Object[] adaptParams(Method method, Object[] arguments, Class[] argumentClazz) { + // FIXME the real parameter type may not same with formal parameter, for instance, formal + // parameter type is int, the real parameter maybe use String in php + // any elegant way? + for (int i = 0; i < argumentClazz.length; i++) { + try { + if ("int".equals(argumentClazz[i].getName()) || "java.lang.Integer".equals(argumentClazz[i].getName())) { + if (arguments[i] == null) { + arguments[i] = 0;// default + } else if (arguments[i] instanceof String) { + arguments[i] = Integer.parseInt((String) arguments[i]); + } else if (arguments[i] instanceof Number) { + arguments[i] = ((Number) arguments[i]).intValue(); + } else { + throw new RuntimeException(); + } + } else if ("long".equals(argumentClazz[i].getName()) || "java.lang.Long".equals(argumentClazz[i].getName())) { + if (arguments[i] == null) { + arguments[i] = 0;// default + } else if (arguments[i] instanceof String) { + arguments[i] = Long.parseLong((String) arguments[i]); + } else if (arguments[i] instanceof Number) { + arguments[i] = ((Number) arguments[i]).longValue(); + } else { + throw new RuntimeException(); + } + } else if ("float".equals(argumentClazz[i].getName()) || "java.lang.Float".equals(argumentClazz[i].getName())) { + if (arguments[i] == null) { + arguments[i] = 0.0f;// default + } else if (arguments[i] instanceof String) { + arguments[i] = Float.parseFloat((String) arguments[i]); + } else if (arguments[i] instanceof Number) { + arguments[i] = ((Number) arguments[i]).floatValue(); + } else { + throw new RuntimeException(); + } + } else if ("double".equals(argumentClazz[i].getName()) || "java.lang.Double".equals(argumentClazz[i].getName())) { + if (arguments[i] == null) { + arguments[i] = 0.0f;// default + } else if (arguments[i] instanceof String) { + arguments[i] = Double.parseDouble((String) arguments[i]); + } else if (arguments[i] instanceof Number) { + arguments[i] = ((Number) arguments[i]).doubleValue(); + } else { + throw new RuntimeException(); + } + } else if ("boolean".equals(argumentClazz[i].getName()) || "java.lang.Boolean".equals(argumentClazz[i].getName())) { + if (arguments[i] instanceof Boolean) { + continue; + } + if (arguments[i] instanceof String) { + arguments[i] = Boolean.valueOf(((String) arguments[i])); + } else { + throw new RuntimeException(); + } + } + } catch (Exception e) { + throw new MotanServiceException("adapt param fail! method:" + method.toString() + ", require param:" + + argumentClazz[i].getName() + ", actual param:" + + (arguments[i] == null ? null : arguments[i].getClass().getName() + "-" + arguments[i])); + } + } + return arguments; + } + + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarReferer.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarReferer.java new file mode 100644 index 000000000..7bf8aaedc --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarReferer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import com.weibo.api.motan.rpc.AbstractReferer; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; + +public class YarReferer extends AbstractReferer { + + public YarReferer(Class clz, URL url) { + super(clz, url); + // TODO Auto-generated constructor stub + } + + @Override + public void destroy() { + // TODO Auto-generated method stub + + } + + @Override + protected Response doCall(Request request) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected boolean doInit() { + // TODO Auto-generated method stub + return false; + } + + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarRpcProtocol.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarRpcProtocol.java new file mode 100644 index 000000000..43a28e8e4 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/YarRpcProtocol.java @@ -0,0 +1,84 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import java.util.concurrent.ConcurrentHashMap; + +import com.weibo.api.motan.core.extension.SpiMeta; +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.protocol.AbstractProtocol; +import com.weibo.api.motan.rpc.Exporter; +import com.weibo.api.motan.rpc.Provider; +import com.weibo.api.motan.rpc.Referer; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.ProviderMessageRouter; +import com.weibo.api.motan.util.LoggerUtil; +import com.weibo.api.motan.util.MotanFrameworkUtil; + +/** + * + * @Description yar rpc protocol + * @author zhanglei + * @date 2016-5-25 + * + */ +@SpiMeta(name = "yar") +public class YarRpcProtocol extends AbstractProtocol { + private ConcurrentHashMap ipPort2RequestRouter = new ConcurrentHashMap(); + + @Override + protected Exporter createExporter(Provider provider, URL url) { + + return new YarExporter(url, provider, this); + } + + @Override + protected Referer createReferer(Class clz, URL url, URL serviceUrl) { + //TODO + throw new MotanFrameworkException("not yet implemented!"); + } + + public ProviderMessageRouter initRequestRouter(URL url, Provider provider) { + String ipPort = url.getServerPortStr(); + ProviderMessageRouter requestRouter = ipPort2RequestRouter.get(ipPort); + if (requestRouter == null) { + ipPort2RequestRouter.putIfAbsent(ipPort, new YarMessageRouter()); + requestRouter = ipPort2RequestRouter.get(ipPort); + } + requestRouter.addProvider(provider); + return requestRouter; + } + + public void unexport(URL url, Provider provider){ + String protocolKey = MotanFrameworkUtil.getProtocolKey(url); + String ipPort = url.getServerPortStr(); + + Exporter exporter = (Exporter) exporterMap.remove(protocolKey); + + if (exporter != null) { + exporter.destroy(); + } + + synchronized (ipPort2RequestRouter) { + ProviderMessageRouter requestRouter = ipPort2RequestRouter.get(ipPort); + + if (requestRouter != null) { + requestRouter.removeProvider(provider); + } + } + + LoggerUtil.info("yarRpcExporter unexport Success: url={}", url); + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/annotation/YarConfig.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/annotation/YarConfig.java new file mode 100644 index 000000000..b5730404b --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/protocol/yar/annotation/YarConfig.java @@ -0,0 +1,39 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.weibo.api.motan.protocol.yar.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * + * @Description yar rpc config + * @author zhanglei + * @date 2016-6-7 + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface YarConfig { + /** + * yar rpc request path + * @return path + */ + String path() default ""; + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/Netty4HttpServer.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/Netty4HttpServer.java new file mode 100644 index 000000000..2b7aa961a --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/Netty4HttpServer.java @@ -0,0 +1,198 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.http; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.stream.ChunkedWriteHandler; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.weibo.api.motan.common.ChannelState; +import com.weibo.api.motan.common.MotanConstants; +import com.weibo.api.motan.common.URLParamType; +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.AbstractServer; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.transport.TransportException; +import com.weibo.api.motan.util.LoggerUtil; +import com.weibo.api.motan.util.StatisticCallback; +import com.weibo.api.motan.util.StatsUtil; + +/** + * + * @Description netty 4 http server. + * @author zhanglei + * @date 2016-5-31 + * + */ +// TODO move to transport netty4 module +public class Netty4HttpServer extends AbstractServer implements StatisticCallback { + private MessageHandler messageHandler; + private URL url; + private Channel channel; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + public Netty4HttpServer(URL url, MessageHandler messageHandler) { + this.url = url; + this.messageHandler = messageHandler; + } + + + @Override + public boolean open() { + if (isAvailable()) { + return true; + } + if (channel != null) { + channel.close(); + } + if (bossGroup == null) { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + } + boolean shareChannel = url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue()); + // TODO max connection protect + int maxServerConnection = + url.getIntParameter(URLParamType.maxServerConnection.getName(), URLParamType.maxServerConnection.getIntValue()); + int workerQueueSize = url.getIntParameter(URLParamType.workerQueueSize.getName(), 500); + + int minWorkerThread = 0, maxWorkerThread = 0; + + if (shareChannel) { + minWorkerThread = url.getIntParameter(URLParamType.minWorkerThread.getName(), MotanConstants.NETTY_SHARECHANNEL_MIN_WORKDER); + maxWorkerThread = url.getIntParameter(URLParamType.maxWorkerThread.getName(), MotanConstants.NETTY_SHARECHANNEL_MAX_WORKDER); + } else { + minWorkerThread = + url.getIntParameter(URLParamType.minWorkerThread.getName(), MotanConstants.NETTY_NOT_SHARECHANNEL_MIN_WORKDER); + maxWorkerThread = + url.getIntParameter(URLParamType.maxWorkerThread.getName(), MotanConstants.NETTY_NOT_SHARECHANNEL_MAX_WORKDER); + } + final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(), URLParamType.maxContentLength.getIntValue()); + final NettyHttpRequestHandler handler = + new NettyHttpRequestHandler(this, messageHandler, new ThreadPoolExecutor(minWorkerThread, maxWorkerThread, 15, + TimeUnit.SECONDS, new ArrayBlockingQueue(workerQueueSize))); + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("http-decoder", new HttpRequestDecoder()); + ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(maxContentLength)); + ch.pipeline().addLast("http-encoder", new HttpResponseEncoder()); + ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); + ch.pipeline().addLast("serverHandler", handler); + } + }).option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, false); + + ChannelFuture f; + try { + f = b.bind(url.getPort()).sync(); + channel = f.channel(); + } catch (InterruptedException e) { + LoggerUtil.error("init http server fail.", e); + return false; + } + state = ChannelState.ALIVE; + StatsUtil.registryStatisticCallback(this); + LoggerUtil.info("Netty4HttpServer ServerChannel finish Open: url=" + url); + return true; + } + + @Override + public void close() { + close(0); + } + + @Override + public boolean isAvailable() { + return state.isAliveState(); + } + + + @Override + public boolean isBound() { + return channel != null && channel.isActive(); + } + + + @Override + public Response request(Request request) throws TransportException { + throw new MotanFrameworkException("Netty4HttpServer request(Request request) method unsupport: url: " + url); + } + + + @Override + public void close(int timeout) { + if (state.isCloseState()) { + LoggerUtil.info("NettyServer close fail: already close, url={}", url.getUri()); + return; + } + + if (state.isUnInitState()) { + LoggerUtil.info("NettyServer close Fail: don't need to close because node is unInit state: url={}", + url.getUri()); + return; + } + if (channel != null) { + channel.close(); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + workerGroup = null; + bossGroup = null; + } + state = ChannelState.CLOSE; + + StatsUtil.unRegistryStatisticCallback(this); + } + + + @Override + public boolean isClosed() { + return state.isCloseState(); + } + + + + @Override + public String statisticCallback() { + //TODO + return null; + } + + + @Override + public URL getUrl() { + return url; + } + + + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandler.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandler.java new file mode 100644 index 000000000..2b003a5c7 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandler.java @@ -0,0 +1,169 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.http; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpHeaders.Values; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +import java.util.concurrent.ThreadPoolExecutor; + +import com.weibo.api.motan.common.MotanConstants; +import com.weibo.api.motan.transport.Channel; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.util.LoggerUtil; +import com.weibo.api.motan.util.MotanSwitcherUtil; + +/** + * + * @Description http request handler for netty4 + * @author zhanglei + * @date 2016-5-31 + * + */ + +@Sharable +public class NettyHttpRequestHandler extends SimpleChannelInboundHandler { + public static final String BAD_REQUEST = "/bad-request"; + public static final String ROOT_PATH = "/"; + public static final String STATUS_PATH = "/rpcstatus"; + private Channel serverChannel; + private ThreadPoolExecutor threadPoolExecutor; + private MessageHandler messageHandler; + protected String swictherName = MotanConstants.REGISTRY_HEARTBEAT_SWITCHER; + + + + public NettyHttpRequestHandler(Channel serverChannel) { + this.serverChannel = serverChannel; + } + + public NettyHttpRequestHandler(Channel serverChannel, MessageHandler messageHandler) { + this.serverChannel = serverChannel; + this.messageHandler = messageHandler; + } + + public NettyHttpRequestHandler(Channel serverChannel, MessageHandler messageHandler, ThreadPoolExecutor threadPoolExecutor) { + this.serverChannel = serverChannel; + this.messageHandler = messageHandler; + this.threadPoolExecutor = threadPoolExecutor; + } + + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest httpRequest) throws Exception { + // check badRequest + if(BAD_REQUEST.equals(httpRequest.getUri())){ + sendResponse(ctx, buildDefaultResponse("bad request!", HttpResponseStatus.BAD_REQUEST)); + return; + } + + // service status + if(ROOT_PATH.equals(httpRequest.getUri()) || STATUS_PATH.equals(httpRequest.getUri())){ + if(isSwitchOpen()){// 200 + sendResponse(ctx, buildDefaultResponse("ok!", HttpResponseStatus.OK)); + }else{//503 + sendResponse(ctx, buildErrorResponse("service not available!")); + } + return; + } + + httpRequest.content().retain(); + + if (threadPoolExecutor == null) { + processHttpRequest(ctx, httpRequest); + } else { + try{ + threadPoolExecutor.execute(new Runnable() { + @Override + public void run() { + processHttpRequest(ctx, httpRequest); + } + }); + }catch(Exception e){ + LoggerUtil.error("request is rejected by threadpool!", e); + httpRequest.content().release(); + sendResponse(ctx, buildErrorResponse("request is rejected by threadpool!")); + } + } + } + + + protected void processHttpRequest(ChannelHandlerContext ctx, FullHttpRequest httpRequest) { + FullHttpResponse httpResponse = null; + try { + httpResponse = (FullHttpResponse) messageHandler.handle(serverChannel, httpRequest); + } catch (Exception e) { + LoggerUtil.error("NettyHttpHandler process http request fail.", e); + httpResponse = buildErrorResponse(e.getMessage()); + } finally { + httpRequest.content().release(); + } + sendResponse(ctx, httpResponse); + } + + private void sendResponse(ChannelHandlerContext ctx, FullHttpResponse httpResponse){ + boolean close = false; + try { + ctx.write(httpResponse); + ctx.flush(); + } catch (Exception e) { + LoggerUtil.error("NettyHttpHandler write response fail.", e); + close = true; + } finally { + // close connection + if (close || httpResponse == null || !Values.KEEP_ALIVE.equals(httpResponse.headers().get(HttpHeaders.Names.CONNECTION))) { + ctx.close(); + } + } + } + + protected FullHttpResponse buildErrorResponse(String errMsg) { + return buildDefaultResponse(errMsg, HttpResponseStatus.SERVICE_UNAVAILABLE); + } + + protected FullHttpResponse buildDefaultResponse(String msg, HttpResponseStatus status){ + FullHttpResponse errorResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.wrappedBuffer(msg + .getBytes())); + return errorResponse; + } + + /** + * is service switcher close. http status will be 503 when switcher is close + * @return + */ + protected boolean isSwitchOpen(){ + return MotanSwitcherUtil.isOpen(swictherName); + } + + + public MessageHandler getMessageHandler() { + return messageHandler; + } + + public void setMessageHandler(MessageHandler messageHandler) { + this.messageHandler = messageHandler; + } + + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NoHeartbeatFactory.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NoHeartbeatFactory.java new file mode 100644 index 000000000..94fcc2861 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/http/NoHeartbeatFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.http; + +import com.weibo.api.motan.core.extension.SpiMeta; +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.transport.HeartbeatFactory; +import com.weibo.api.motan.transport.MessageHandler; +/** + * + * @Description no heartbeatFactory + * @author zhanglei + * @date 2016-6-8 + * + */ +@SpiMeta(name = "noHeartbeat") +public class NoHeartbeatFactory implements HeartbeatFactory { + + @Override + public Request createRequest() { + throw new MotanFrameworkException("cann't create request in NoHeartbeatFactory"); + } + + @Override + public MessageHandler wrapMessageHandler(MessageHandler handler) { + return handler; + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/Netty4YarEndpointFactory.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/Netty4YarEndpointFactory.java new file mode 100644 index 000000000..4ab54aefd --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/Netty4YarEndpointFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.yar; + +import com.weibo.api.motan.core.extension.SpiMeta; +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.Client; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.transport.Server; +import com.weibo.api.motan.transport.netty4.http.Netty4HttpServer; +import com.weibo.api.motan.transport.support.AbstractEndpointFactory; + +/** + * + * @Description yar endpoint factory use netty4 + * @author zhanglei + * @date 2016-5-31 + * + */ +@SpiMeta(name = "netty4yar") +public class Netty4YarEndpointFactory extends AbstractEndpointFactory { + + @Override + protected Server innerCreateServer(URL url, MessageHandler messageHandler) { + return new Netty4HttpServer(url, new YarMessageHandlerWarpper(messageHandler)); + } + + @Override + protected Client innerCreateClient(URL url) { + // TODO + throw new MotanFrameworkException("not yet implemented!"); + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpper.java b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpper.java new file mode 100644 index 000000000..5efbd4413 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpper.java @@ -0,0 +1,126 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.yar; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpHeaders.Values; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import com.weibo.api.motan.exception.MotanFrameworkException; +import com.weibo.api.motan.protocol.yar.AttachmentRequest; +import com.weibo.api.motan.protocol.yar.YarMessageRouter; +import com.weibo.api.motan.protocol.yar.YarProtocolUtil; +import com.weibo.api.motan.transport.Channel; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.util.LoggerUtil; +import com.weibo.yar.YarProtocol; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; + +/** + * + * @Description wrapper to process yar message + * @author zhanglei + * @date 2016-5-31 + * + */ +public class YarMessageHandlerWarpper implements MessageHandler { + private YarMessageRouter orgHandler; + + public YarMessageHandlerWarpper(MessageHandler orgHandler) { + if (orgHandler == null) { + throw new MotanFrameworkException("messageHandler is null!"); + } + if (orgHandler instanceof YarMessageRouter) { + this.orgHandler = (YarMessageRouter) orgHandler; + } else { + throw new MotanFrameworkException("YarMessageHandlerWarper can not wrapper " + orgHandler.getClass().getSimpleName()); + } + + } + + + + @Override + public Object handle(Channel channel, Object message) { + FullHttpRequest httpRequest = (FullHttpRequest) message; + String uri = httpRequest.getUri(); + int index = uri.indexOf("?");// should not be null + String requestPath = uri; + Map attachments = null; + if (index > -1) { + requestPath = uri.substring(0, index); + if (index != uri.length() - 1) { + attachments = getAttachMents(uri.substring(index + 1, uri.length())); + } + } + YarResponse yarResponse = null; + String packagerName = "JSON"; + try { + ByteBuf buf = httpRequest.content(); + final byte[] contentBytes = new byte[buf.readableBytes()]; + buf.getBytes(0, contentBytes); + YarRequest yarRequest = new AttachmentRequest(YarProtocol.buildRequest(contentBytes), attachments); + yarRequest.setRequestPath(requestPath); + yarResponse = (YarResponse) orgHandler.handle(channel, yarRequest); + + } catch (Exception e) { + LoggerUtil.error("YarMessageHandlerWarpper handle yar request fail.", e); + yarResponse = YarProtocolUtil.buildDefaultErrorResponse(e.getMessage(), packagerName); + } + byte[] responseBytes; + try { + responseBytes = YarProtocol.toProtocolBytes(yarResponse); + } catch (IOException e) { + throw new MotanFrameworkException("convert yar response to bytes fail.", e); + } + FullHttpResponse httpResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseBytes)); + httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/x-www-form-urlencoded"); + httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes()); + + if (HttpHeaders.isKeepAlive(httpRequest)) { + httpResponse.headers().set(HttpHeaders.Names.CONNECTION, Values.KEEP_ALIVE); + } else { + httpResponse.headers().set(HttpHeaders.Names.CONNECTION, Values.CLOSE); + } + + return httpResponse; + } + + private Map getAttachMents(String params) { + Map map = new HashMap(); + String[] paramArray = params.split("&"); + for (String param : paramArray) { + String[] kv = param.split("="); + if (kv.length == 2) { + map.put(kv[0], kv[1]); + } else { + LoggerUtil.warn("yar attachment parse fail. uri param:" + param); + } + } + return map; + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.rpc.Protocol b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.rpc.Protocol new file mode 100644 index 000000000..1f3e41f52 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.rpc.Protocol @@ -0,0 +1,17 @@ +# +# Copyright 2009-2016 Weibo, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.weibo.api.motan.protocol.yar.YarRpcProtocol \ No newline at end of file diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.EndpointFactory b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.EndpointFactory new file mode 100644 index 000000000..08468e386 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.EndpointFactory @@ -0,0 +1,17 @@ +# +# Copyright 2009-2016 Weibo, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.weibo.api.motan.transport.netty4.yar.Netty4YarEndpointFactory \ No newline at end of file diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.HeartbeatFactory b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.HeartbeatFactory new file mode 100644 index 000000000..2b1c9f0d1 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/main/resources/META-INF/services/com.weibo.api.motan.transport.HeartbeatFactory @@ -0,0 +1,17 @@ +# +# Copyright 2009-2016 Weibo, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.weibo.api.motan.transport.netty4.http.NoHeartbeatFactory \ No newline at end of file diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarMessageRouterTest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarMessageRouterTest.java new file mode 100644 index 000000000..47cd141ae --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarMessageRouterTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.weibo.api.motan.protocol.yar.annotation.YarConfig; +import com.weibo.api.motan.rpc.DefaultProvider; +import com.weibo.api.motan.rpc.DefaultResponse; +import com.weibo.api.motan.rpc.Provider; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; + +/** + * + * @Description YarMessageRouterTest + * @author zhanglei + * @date 2016年7月27日 + * + */ +public class YarMessageRouterTest { + TestYarMessageRouter router = new TestYarMessageRouter(); + DefaultResponse response; + String requestPath = "/test/anno_path"; + + @Before + public void setUp() throws Exception {} + + @After + public void tearDown() throws Exception {} + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testHandle() { + response = new DefaultResponse(); + response.setValue("test"); + response.setProcessTime(1); + + Provider provider = new DefaultProvider(null, null, AnnoService.class); + router.addProvider(provider); + + YarRequest yarRequest = new YarRequest(1, "JSON", "hello", new Object[] {"params"}); + yarRequest.setRequestPath(requestPath); + + YarResponse yarResponse = (YarResponse) router.handle(null, yarRequest); + assertEquals(YarProtocolUtil.convert(response, "JSON"), yarResponse); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddProvider() { + Provider provider = new DefaultProvider(null, null, AnnoService.class); + router.addProvider(provider); + assertTrue(router.checkProvider(requestPath)); + + router.removeProvider(provider); + assertFalse(router.checkProvider(requestPath)); + + URL url = new URL("motan", "localhost", 8002, "urlpath"); + provider = new DefaultProvider(null, url, normalService.class); + router.addProvider(provider); + assertTrue(router.checkProvider(YarProtocolUtil.getYarPath(normalService.class, url))); + + router.removeProvider(provider); + assertFalse(router.checkProvider(YarProtocolUtil.getYarPath(normalService.class, url))); + } + + class TestYarMessageRouter extends YarMessageRouter { + public boolean checkProvider(String path) { + return providerMap.containsKey(path); + } + + @Override + protected Response call(Request request, Provider provider) { + return response; + } + + } + + @YarConfig(path = "/test/anno_path") + interface AnnoService { + String hello(String name); + } + + interface normalService { + String hello(String name); + } +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarProtocolUtilTest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarProtocolUtilTest.java new file mode 100644 index 000000000..5f6bdadaf --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarProtocolUtilTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.lang.reflect.Method; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.weibo.api.motan.exception.MotanBizException; +import com.weibo.api.motan.rpc.DefaultRequest; +import com.weibo.api.motan.rpc.DefaultResponse; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.util.ReflectUtil; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; + +/** + * + * @Description YarProtocolUtilTest + * @author zhanglei + * @date 2016年7月27日 + * + */ +public class YarProtocolUtilTest { + + @Before + public void setUp() throws Exception {} + + @After + public void tearDown() throws Exception {} + + @Test + public void testGetYarPath() { + String path = YarProtocolUtil.getYarPath(YarMessageRouterTest.AnnoService.class, null); + assertEquals("/test/anno_path", path); + URL url = new URL("motan", "localhost", 8002, "testpath"); + path = YarProtocolUtil.getYarPath(null, url); + assertEquals("/" + url.getGroup() + "/" + url.getPath(), path); + } + + @Test + public void testConvertYarRequest() throws NoSuchMethodException, SecurityException { + DefaultRequest request = new DefaultRequest(); + request.setRequestId(123); + request.setMethodName("hello"); + request.setArguments(new Object[] {"param1"}); + request.setInterfaceName(YarMessageRouterTest.AnnoService.class.getName()); + request.setParamtersDesc(ReflectUtil.getMethodParamDesc(YarMessageRouterTest.AnnoService.class.getMethod("hello", String.class))); + YarRequest yarRequest = YarProtocolUtil.convert(request, "JSON"); + assertNotNull(yarRequest); + + Request newRequest = YarProtocolUtil.convert(yarRequest, YarMessageRouterTest.AnnoService.class); + assertNotNull(newRequest); + assertEquals(request.toString(), newRequest.toString()); + } + + @Test + // test string cast primitive value + public void testConvertRequest() throws Exception { + String methodName = "testParam"; + Class[] paramClazz = new Class[] {int.class, long.class, boolean.class, float.class, double.class}; + Method method = MethodTestService.class.getDeclaredMethod(methodName, paramClazz); + final String result = "succ"; + MethodTestService service = new MethodTestService() { + @Override + public String testParam(int intParam, long longParam, boolean booleanParam, float floatParam, double doubleParam) { + return result; + } + }; + + // string + Object[] params = new Object[] {"234", "567", "true", "789.12", "678.12"}; + verifyMethodParam(MethodTestService.class, service, method, params, result); + + // number + params = new Object[] {234l, 567, false, 789.12d, 678.12f}; + verifyMethodParam(MethodTestService.class, service, method, params, result); + } + + + private void verifyMethodParam(Class interfaceClazz, T service, Method method, Object[] params, Object expectResult) + throws Exception { + YarRequest yarRequest = new YarRequest(); + yarRequest.setId(123); + yarRequest.setMethodName(method.getName()); + yarRequest.setPackagerName("JSON"); + yarRequest.setParameters(params); + + Request request = YarProtocolUtil.convert(yarRequest, interfaceClazz); + assertNotNull(request); + assertEquals(method.getName(), request.getMethodName()); + Object[] requestParams = request.getArguments(); + assertEquals(params.length, requestParams.length); + Object result = method.invoke(service, requestParams); + assertEquals(expectResult, result); + } + + @Test + public void testConvertYarResponse() { + DefaultResponse response = new DefaultResponse(); + response.setRequestId(456); + response.setValue("stringValue"); + + YarResponse yarResponse = YarProtocolUtil.convert(response, "JSON"); + assertNotNull(yarResponse); + Response newResponse = YarProtocolUtil.convert(yarResponse); + assertEquals(response.getRequestId(), newResponse.getRequestId()); + assertEquals(response.getValue(), newResponse.getValue()); + + + response.setException(new RuntimeException("test exception")); + + yarResponse = YarProtocolUtil.convert(response, "JSON"); + assertNotNull(yarResponse); + newResponse = YarProtocolUtil.convert(yarResponse); + assertEquals(response.getRequestId(), newResponse.getRequestId()); + // yarresponse的异常会转为motan业务异常 + assertEquals(new MotanBizException(response.getException().getMessage()).getMessage(), newResponse.getException().getMessage()); + + } + + @Test + public void testBuildDefaultErrorResponse() { + String errMsg = "test err"; + String packagerName = "MSGPACK"; + YarResponse response = YarProtocolUtil.buildDefaultErrorResponse(errMsg, packagerName); + assertNotNull(response); + assertEquals(errMsg, response.getError()); + assertEquals(packagerName, response.getPackagerName()); + } + + interface MethodTestService { + String testParam(int intParam, long longParam, boolean booleanParam, float floatParam, double doubleParam); + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarRpcProtocolTest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarRpcProtocolTest.java new file mode 100644 index 000000000..2113b4c63 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/protocol/yar/YarRpcProtocolTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.protocol.yar; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.weibo.api.motan.rpc.DefaultProvider; +import com.weibo.api.motan.rpc.Provider; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.transport.ProviderMessageRouter; + +/** + * + * @Description YarRpcProtocolTest + * @author zhanglei + * @date 2016年7月27日 + * + */ +public class YarRpcProtocolTest { + + @Before + public void setUp() throws Exception {} + + @After + public void tearDown() throws Exception {} + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testInitRequestRouter() { + YarRpcProtocol protocol = new YarRpcProtocol(); + URL url = new URL("motan", "localhost", 8002, "urlpath"); + Provider provider = new DefaultProvider(null, url, MessageHandler.class); + ProviderMessageRouter router = protocol.initRequestRouter(url, provider); + assertNotNull(router); + + URL url2 = new URL("motan", "localhost", 8003, "urlpath2"); + Provider provider2 = new DefaultProvider(null, url2, MessageHandler.class); + ProviderMessageRouter router2 = protocol.initRequestRouter(url2, provider2); + assertNotNull(router2); + assertFalse(router2.equals(router)); + + URL url3 = new URL("motan", "localhost", 8002, "urlpath3"); + Provider provider3 = new DefaultProvider(null, url3, MessageHandler.class); + ProviderMessageRouter router3 = protocol.initRequestRouter(url3, provider3); + assertNotNull(router3); + assertTrue(router3.equals(router)); + + try { + protocol.initRequestRouter(url, provider); + assertTrue(false); + } catch (Exception e) { + assertTrue(e.getMessage().contains("duplicate yar provider")); + } + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandlerTest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandlerTest.java new file mode 100644 index 000000000..ed8b2474d --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/http/NettyHttpRequestHandlerTest.java @@ -0,0 +1,152 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.http; + +import static org.junit.Assert.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +import org.jmock.Expectations; +import org.jmock.api.Invocation; +import org.jmock.integration.junit4.JUnit4Mockery; +import org.jmock.lib.action.CustomAction; +import org.jmock.lib.legacy.ClassImposteriser; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.weibo.api.motan.common.MotanConstants; +import com.weibo.api.motan.transport.Channel; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.util.MotanSwitcherUtil; + +/** + * + * @Description NettyHttpRequestHandlerTest + * @author zhanglei + * @date 2016年7月27日 + * + */ +public class NettyHttpRequestHandlerTest { + public static JUnit4Mockery mockery = null; + + @Before + public void setUp() throws Exception { + mockery = new JUnit4Mockery() { + { + setImposteriser(ClassImposteriser.INSTANCE); + } + }; + } + + @After + public void tearDown() throws Exception {} + + @Test + public void testChannelRead0() throws Exception { + final MessageHandler messageHandler = mockery.mock(MessageHandler.class); + final ChannelHandlerContext ctx = mockery.mock(ChannelHandlerContext.class); + final FullHttpResponse response = mockery.mock(FullHttpResponse.class); + mockery.checking(new Expectations() { + { + allowing(ctx).write(with(any(FullHttpResponse.class))); + will(new CustomAction("verify") { + @Override + public Object invoke(Invocation invocation) throws Throwable { + FullHttpResponse actualResponse = (FullHttpResponse) invocation.getParameter(0); + assertNotNull(actualResponse); + assertEquals(response, actualResponse); + return null; + } + + }); + allowing(ctx).flush(); + will(returnValue(null)); + allowing(ctx).close(); + will(returnValue(null)); + + atLeast(1).of(messageHandler).handle(with(any(Channel.class)), with(anything())); + will(returnValue(response)); + allowing(response).headers(); + will(returnValue(new DefaultHttpHeaders())); + } + }); + FullHttpRequest httpRequest = buildHttpRequest("anyPath"); + NettyHttpRequestHandler handler = new NettyHttpRequestHandler(null, messageHandler); + handler.channelRead0(ctx, httpRequest); + } + + @Test + public void testServerStatus() throws Exception { + final MessageHandler messageHandler = mockery.mock(MessageHandler.class); + final ChannelHandlerContext ctx = mockery.mock(ChannelHandlerContext.class); + mockery.checking(new Expectations() { + { + allowing(ctx).write(with(any(FullHttpResponse.class))); + will(new CustomAction("verify") { + @Override + public Object invoke(Invocation invocation) throws Throwable { + verifyStatus((FullHttpResponse) invocation.getParameter(0)); + return null; + } + + }); + allowing(ctx).flush(); + will(returnValue(null)); + allowing(ctx).close(); + will(returnValue(null)); + + allowing(messageHandler).handle(with(any(Channel.class)), with(anything())); + will(returnValue(null)); + } + }); + + FullHttpRequest httpRequest = buildHttpRequest(NettyHttpRequestHandler.ROOT_PATH); + NettyHttpRequestHandler handler = new NettyHttpRequestHandler(null, messageHandler); + + // 关闭心跳开关 + MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, false); + handler.channelRead0(ctx, httpRequest); + + // 打开心跳开关 + MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true); + handler.channelRead0(ctx, httpRequest); + + } + + + private void verifyStatus(FullHttpResponse response) { + if (MotanSwitcherUtil.isOpen(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER)) { + assertEquals(HttpResponseStatus.OK, response.getStatus()); + } else { + assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus()); + } + } + + private FullHttpRequest buildHttpRequest(String requestPath) throws Exception { + PooledByteBufAllocator allocator = new PooledByteBufAllocator(); + ByteBuf buf = allocator.buffer(0); + FullHttpRequest httpReqeust = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, requestPath, buf); + return httpReqeust; + } + +} diff --git a/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpperTest.java b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpperTest.java new file mode 100644 index 000000000..793878d94 --- /dev/null +++ b/motan-extension/protocol-extension/motan-protocol-yar/src/test/java/com/weibo/api/motan/transport/netty4/yar/YarMessageHandlerWarpperTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2009-2016 Weibo, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.weibo.api.motan.transport.netty4.yar; + +import static org.junit.Assert.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.weibo.api.motan.protocol.yar.AttachmentRequest; +import com.weibo.api.motan.protocol.yar.YarMessageRouter; +import com.weibo.api.motan.protocol.yar.YarProtocolUtil; +import com.weibo.api.motan.rpc.DefaultResponse; +import com.weibo.api.motan.rpc.Request; +import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.transport.Channel; +import com.weibo.api.motan.transport.MessageHandler; +import com.weibo.api.motan.transport.TransportException; +import com.weibo.api.motan.transport.netty4.http.Netty4HttpServer; +import com.weibo.yar.YarProtocol; +import com.weibo.yar.YarRequest; +import com.weibo.yar.YarResponse; + +/** + * + * @Description YarMessageHandlerWarpperTest + * @author zhanglei + * @date 2016年7月27日 + * + */ +public class YarMessageHandlerWarpperTest { + public String uri = "/testpath?param1=a¶m2=b¶m3=c"; + + @Before + public void setUp() throws Exception {} + + @After + public void tearDown() throws Exception {} + + @Test + public void testHandle() throws Exception { + YarRequest yarRequest = new YarRequest(123, "JSON", "testmethod", new Object[] {"params", 456}); + final YarResponse yarResponse = YarProtocolUtil.buildDefaultErrorResponse("test err", "JSON"); + YarMessageHandlerWarpper handler = new YarMessageHandlerWarpper(new YarMessageRouter() { + + @Override + public Object handle(Channel channel, Object message) { + AttachmentRequest request = (AttachmentRequest) message; + verifyAttachments(request.getAttachments()); + return yarResponse; + } + }); + FullHttpResponse httpResponse = (FullHttpResponse) handler.handle(new MockChannel(), buildHttpRequest(yarRequest, uri)); + + assertNotNull(httpResponse); + assertNotNull(httpResponse.content()); + YarResponse retYarResponse = getYarResponse(httpResponse); + assertNotNull(retYarResponse); + assertEquals(yarResponse, retYarResponse); + } + + @Test + public void testAbnormal() throws Exception { + final String errmsg = "rpc process error"; + YarMessageHandlerWarpper handler = new YarMessageHandlerWarpper(new YarMessageRouter() { + + @Override + public Object handle(Channel channel, Object message) { + throw new RuntimeException(errmsg); + } + }); + // yar协议无法解析 + FullHttpResponse httpResponse = (FullHttpResponse) handler.handle(new MockChannel(), buildHttpRequest(null, uri)); + assertNotNull(httpResponse); + assertEquals(HttpResponseStatus.OK, httpResponse.getStatus()); + YarResponse retYarResponse = getYarResponse(httpResponse); + assertNotNull(retYarResponse); + assertNotNull(retYarResponse.getError()); + + // yar协议可以正常解析,但后续处理异常 + YarRequest yarRequest = new YarRequest(123, "JSON", "testmethod", new Object[] {"params", 456}); + httpResponse = (FullHttpResponse) handler.handle(new MockChannel(), buildHttpRequest(yarRequest, uri)); + assertNotNull(httpResponse); + assertEquals(HttpResponseStatus.OK, httpResponse.getStatus()); + retYarResponse = getYarResponse(httpResponse); + assertNotNull(retYarResponse); + assertEquals(errmsg, retYarResponse.getError()); + } + + private void verifyAttachments(Map attachments) { + String[] params = uri.substring(uri.indexOf("?") + 1).split("&"); + for (String param : params) { + String k = param.split("=")[0]; + String v = param.split("=")[1]; + assertTrue(attachments.containsKey(k)); + assertEquals(v, attachments.get(k)); + } + } + + private YarResponse getYarResponse(FullHttpResponse httpResponse) throws Exception { + ByteBuf buf = httpResponse.content(); + final byte[] contentBytes = new byte[buf.readableBytes()]; + buf.getBytes(0, contentBytes); + YarResponse yarResponse = YarProtocol.buildResponse(contentBytes); + return yarResponse; + } + + private FullHttpRequest buildHttpRequest(YarRequest yarRequest, String requestPath) throws Exception { + PooledByteBufAllocator allocator = new PooledByteBufAllocator(); + ByteBuf buf = allocator.buffer(2048, 1024 * 1024); + if (yarRequest != null) { + buf.writeBytes(YarProtocol.toProtocolBytes(yarRequest)); + } + FullHttpRequest httpReqeust = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, requestPath, buf); + return httpReqeust; + } + + class MockChannel extends Netty4HttpServer { + public MockChannel() { + super(null, null); + } + + public MockChannel(URL url, MessageHandler messageHandler) { + super(url, messageHandler); + } + + @Override + public Response request(Request request) throws TransportException { + return new DefaultResponse(); + } + } + +} diff --git a/motan-extension/protocol-extension/pom.xml b/motan-extension/protocol-extension/pom.xml new file mode 100644 index 000000000..7449120c0 --- /dev/null +++ b/motan-extension/protocol-extension/pom.xml @@ -0,0 +1,28 @@ + + + + 4.0.0 + + com.weibo + motan-extension + 0.1.3-SNAPSHOT + + protocol-extension + protocol-extension + https://github.com/weibocom/motan + + UTF-8 + + pom + + motan-protocol-yar + + \ No newline at end of file diff --git a/motan-extension/serialization-extension/pom.xml b/motan-extension/serialization-extension/pom.xml index 27383ddbc..89c33339f 100644 --- a/motan-extension/serialization-extension/pom.xml +++ b/motan-extension/serialization-extension/pom.xml @@ -19,7 +19,7 @@ serialization-extension serialization-extension - http://maven.apache.org + https://github.com/weibocom/motan UTF-8