Skip to content

Commit

Permalink
Merge pull request #145 from Redick01/#143
Browse files Browse the repository at this point in the history
#143 support zookeeper config center
  • Loading branch information
magestacks authored Mar 15, 2022
2 parents cb9eece + 41383a4 commit 7cf7baa
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cn.hippo4j.example.core.zookeeper;

import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* @author Redick01
* @date 2022/3/14 20:40
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class Hippo4jCoreZookeeperExampleApplication {

public static void main(String[] args) {
SpringApplication.run(Hippo4jCoreZookeeperExampleApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
server:
port: 8888

spring:
application:
name: dynamic-threadpool-zookeeper-example

dynamic:
thread-pool:
config-file-type: properties
zookeeper:
zk-connect-str: 127.0.0.1:2181
config-version: 1.0.0
root-node: /configserver/userproject
node: zookeeper-demo

#logging:
# level:
# root: DEBUG
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Export from zookeeper configuration group: [/configserver/userproject] - [1.0.0] - [zookeeper-demo].

spring.application.name=dynamic-threadpool-zookeeper-example
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=5
spring.dynamic.thread-pool.collect=true
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].core-pool-size=2
spring.dynamic.thread-pool.executors[0].execute-time-out=1000
spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
spring.dynamic.thread-pool.executors[0].maximum-pool-size=4
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true
spring.dynamic.thread-pool.executors[0].notify.receives.DING=177****6993
spring.dynamic.thread-pool.executors[0].queue-capacity=100
spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[1].blocking-queue=LinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].core-pool-size=1
spring.dynamic.thread-pool.executors[1].execute-time-out=1000
spring.dynamic.thread-pool.executors[1].keep-alive-time=6691
spring.dynamic.thread-pool.executors[1].maximum-pool-size=1
spring.dynamic.thread-pool.executors[1].notify.active-alarm=80
spring.dynamic.thread-pool.executors[1].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[1].notify.interval=8
spring.dynamic.thread-pool.executors[1].notify.is-alarm=true
spring.dynamic.thread-pool.executors[1].notify.receives.DING=177****6993
spring.dynamic.thread-pool.executors[1].queue-capacity=1
spring.dynamic.thread-pool.executors[1].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[1].thread-name-prefix=message-produce
spring.dynamic.thread-pool.executors[1].thread-pool-id=message-produce
spring.dynamic.thread-pool.notify-platforms[0].platform=DING
spring.dynamic.thread-pool.notify-platforms[0].secret-key=aab197577f6d8cc3aa8b52ee38adb6e16a46642a9c4986f5e45ca6946fdcea6f
1 change: 1 addition & 0 deletions hippo4j-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<module>hippo4j-core-nacos-spring-boot-starter-example</module>
<!-- 测试 Hippo4J-Core Apollo 配置中心 -->
<module>hippo4j-core-apollo-spring-boot-starter-example</module>
<module>hippo4j-core-zookeeper-spring-boot-starter-example</module>
</modules>

</project>
21 changes: 21 additions & 0 deletions hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.guicedee.services</groupId>
<artifactId>commons-lang3</artifactId>
<version>1.2.1.1-jre17</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
*/
private Map<String, String> apollo;

/**
* Zookeeper config.
*/
private Map<String, String> zookeeper;

/**
* Tomcat thread pool config.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import cn.hippo4j.core.starter.refresher.ApolloRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosRefresherHandler;
import cn.hippo4j.core.starter.refresher.ZookeeperRefresherHandler;
import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
Expand Down Expand Up @@ -52,6 +53,8 @@ public class DynamicThreadPoolCoreAutoConfiguration {

private static final String APOLLO_CONFIG_KEY = "com.ctrip.framework.apollo.ConfigService";

private static final String ZK_CONFIG_KEY = "org.apache.curator.framework.CuratorFramework";

@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() {
Expand Down Expand Up @@ -121,4 +124,10 @@ public ApolloRefresherHandler apolloRefresher(ThreadPoolNotifyAlarmHandler threa
return new ApolloRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}

@Bean
@ConditionalOnClass(name = ZK_CONFIG_KEY)
public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
return new ZookeeperRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();

List<ExecutorProperties> executors = bootstrapCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
resultMap.putAll(buildSingleNotifyConfig(executor));
if (null !=executors) {
for (ExecutorProperties executor : executors) {
resultMap.putAll(buildSingleNotifyConfig(executor));
}
}

return resultMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.web.executor.WebThreadPoolService;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
Expand Down Expand Up @@ -75,6 +77,24 @@ public void dynamicRefresh(String content) {
refreshExecutors(bindableCoreProperties);
}

/**
* register notify alarm manage
*/
public void registerNotifyAlarmManage() {
bootstrapCoreProperties.getExecutors().forEach(executorProperties -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
executorProperties.getNotify().getIsAlarm(),
executorProperties.getNotify().getCapacityAlarm(),
executorProperties.getNotify().getActiveAlarm()
);

threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval());
threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives());
GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
});

}

/**
* Refresh web executor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package cn.hippo4j.core.starter.refresher;

import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import com.google.common.base.Charsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;

import java.util.List;
import java.util.Map;

/**
* @author Redick01
* @date 2022/3/14 16:03
*/
@Slf4j
public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh {

private CuratorFramework curatorFramework;

public ZookeeperRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, BootstrapCoreProperties bootstrapCoreProperties) {
super(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}

@Override
public void afterPropertiesSet() {
Map<String, String> zkConfigs = bootstrapCoreProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"),
new ExponentialBackoffRetry(1000, 3));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"),
zkConfigs.get("config-version")), zkConfigs.get("node"));
final ConnectionStateListener connectionStateListener = (client, newState) -> {
if (newState == ConnectionState.CONNECTED) {
loadNode(nodePath);
} else if (newState == ConnectionState.RECONNECTED) {
loadNode(nodePath);
}};

final CuratorListener curatorListener = (client, curatorEvent) -> {
final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
if (null != watchedEvent) {
switch (watchedEvent.getType()) {
case NodeChildrenChanged:
case NodeDataChanged:
loadNode(nodePath);
break;
default:
break;
}
}};
curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
curatorFramework.getCuratorListenable().addListener(curatorListener);
curatorFramework.start();
}

/**
* load config info and refresh.
* @param nodePath zk config node path.
*/
public void loadNode(String nodePath) {
try {
final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
final List<String> children = childrenBuilder.watched().forPath(nodePath);
StringBuilder content = new StringBuilder();
children.forEach(c -> {
String n = ZKPaths.makePath(nodePath, c);
final String nodeName = ZKPaths.getNodeFromPath(n);
final GetDataBuilder data = curatorFramework.getData();
String value = "";
try {
value = new String(data.watched().forPath(n), Charsets.UTF_8);
} catch (Exception e) {
e.printStackTrace();
}
final Pair<String, String> keyValue = new ImmutablePair<>(nodeName, value);
content.append(keyValue.getKey()).append("=").append(keyValue.getValue()).append("\n");
});
dynamicRefresh(content.toString());
registerNotifyAlarmManage();
} catch (Exception e) {
log.error("load zk node error, nodePath is {}", nodePath, e);
}
}
}
Loading

0 comments on commit 7cf7baa

Please # to comment.