From 8acf83923f318ade56e3aec967687c48db23ff4f Mon Sep 17 00:00:00 2001 From: kr14910 Date: Thu, 5 Mar 2015 09:47:38 +0900 Subject: [PATCH] Zookeeper does not reconnect when session expired. #191 - Fixed : Local variable referenced before assignment. --- .../zookeeper/ZookeeperClusterManager.java | 173 ++++++------------ .../ZookeeperClusterManagerHelper.java | 134 ++++++++++++++ .../web/cluster/zookeeper/ZookeeperUtils.java | 63 +++++++ 3 files changed, 250 insertions(+), 120 deletions(-) create mode 100644 web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManagerHelper.java create mode 100644 web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperUtils.java diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManager.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManager.java index 92f3766a3a26..8a283539b64c 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManager.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManager.java @@ -17,14 +17,13 @@ package com.navercorp.pinpoint.web.cluster.zookeeper; import java.io.IOException; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -55,13 +54,14 @@ public class ZookeeperClusterManager implements ClusterManager, Watcher { private static final long SYNC_INTERVAL_TIME_MILLIS = 15 * 1000; - private static final String PATH_SEPERATOR = "/"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final AtomicBoolean connected = new AtomicBoolean(false); private final ZookeeperClient client; + private final ZookeeperClusterManagerHelper zookeeperClusterManagerHelper; + private final Object initilizeLock = new Object(); + private final int retryInterval; private final Timer timer; @@ -71,17 +71,20 @@ public class ZookeeperClusterManager implements ClusterManager, Watcher { private final CollectorClusterInfoRepository collectorClusterInfo = new CollectorClusterInfoRepository(); public ZookeeperClusterManager(String zookeeperAddress, int sessionTimeout, int retryInterval) throws KeeperException, IOException, InterruptedException { - this.client = new ZookeeperClient(zookeeperAddress, sessionTimeout, this, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); - this.retryInterval = retryInterval; - // it could be better to create upon failure - this.timer = createTimer(); + synchronized(initilizeLock) { + this.client = new ZookeeperClient(zookeeperAddress, sessionTimeout, this, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); + this.retryInterval = retryInterval; + // it could be better to create upon failure + this.timer = createTimer(); + this.zookeeperClusterManagerHelper = new ZookeeperClusterManagerHelper(); + } } // Retry upon failure (1 min retry period) // not too much overhead, just logging @Override public boolean registerWebCluster(String zNodeName, byte[] contents) { - String zNodePath = bindingPathAndZnode(PINPOINT_WEB_CLUSTER_PATh, zNodeName); + String zNodePath = zookeeperClusterManagerHelper.bindingPathAndZnode(PINPOINT_WEB_CLUSTER_PATh, zNodeName); logger.info("Create Web Cluster Zookeeper UniqPath = {}", zNodePath); @@ -97,7 +100,7 @@ public boolean registerWebCluster(String zNodeName, byte[] contents) { return true; } - if (!syncWebCluster(job)) { + if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, job)) { timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS); } @@ -106,29 +109,34 @@ public boolean registerWebCluster(String zNodeName, byte[] contents) { @Override public void process(WatchedEvent event) { - logger.info("Zookeepr Event({}) ocurred.", event); + synchronized (initilizeLock) { + // wait for client variable to be assigned. + } + logger.info("Zookeepr Event({}) ocurred.", event); + KeeperState state = event.getState(); EventType eventType = event.getType(); String path = event.getPath(); - boolean result = false; - // when this happens, ephemeral node disappears // reconnects automatically, and process gets notified for all events - if (state == KeeperState.Expired) { - result = handleDisconnected(); - client.reconnectWhenSessionExpired(); - } else if (state == KeeperState.Expired) { + boolean result = false; + if (ZookeeperUtils.isDisconnectedEvent(event)) { result = handleDisconnected(); - } else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.None) { - result = handleConnected(); - } else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeChildrenChanged) { - result = handleNodeChildrenChanged(path); - } else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeDeleted) { - result = handleNodeDeleted(path); - } else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeDataChanged) { - result = handleNodeDataChanged(path); + if (state == KeeperState.Expired) { + client.reconnectWhenSessionExpired(); + } + } else if (state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) { + if (eventType == EventType.None) { + result = handleConnected(); + } else if (eventType == EventType.NodeChildrenChanged) { + result = handleNodeChildrenChanged(path); + } else if (eventType == EventType.NodeDeleted) { + result = handleNodeDeleted(path); + } else if (eventType == EventType.NodeDataChanged) { + result = handleNodeDataChanged(path); + } } if (result) { @@ -152,14 +160,14 @@ private boolean handleConnected() { if (changed) { PushWebClusterJob job = this.job.get(); if (job != null) { - if (!syncWebCluster(job)) { + if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, job)) { timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS); result = false; } } - if (!syncCollectorCluster()) { - timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (!syncPullCollectorCluster()) { + timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); result = false; } } else { @@ -171,10 +179,10 @@ private boolean handleConnected() { private boolean handleNodeChildrenChanged(String path) { if (PINPOINT_COLLECTOR_CLUSTER_PATH.equals(path)) { - if (syncCollectorCluster()) { + if (syncPullCollectorCluster()) { return true; } - timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); + timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); } return false; @@ -182,7 +190,7 @@ private boolean handleNodeChildrenChanged(String path) { private boolean handleNodeDeleted(String path) { if (path != null) { - String id = extractCollectorClusterId(path); + String id = zookeeperClusterManagerHelper.extractCollectorClusterId(path, PINPOINT_COLLECTOR_CLUSTER_PATH); if (id != null) { collectorClusterInfo.remove(id); return true; @@ -193,12 +201,12 @@ private boolean handleNodeDeleted(String path) { private boolean handleNodeDataChanged(String path) { if (path != null) { - String id = extractCollectorClusterId(path); + String id = zookeeperClusterManagerHelper.extractCollectorClusterId(path, PINPOINT_COLLECTOR_CLUSTER_PATH); if (id != null) { - if (syncCollectorCluster(id)) { + if (pushCollectorClusterData(id)) { return true; } - timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); + timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); } } @@ -227,80 +235,26 @@ private Timer createTimer() { return timer; } - private boolean syncWebCluster(PushWebClusterJob job) { - String zNodePath = job.getZnodePath(); - byte[] contents = job.getContents(); - - try { - if (!client.exists(zNodePath)) { - client.createPath(zNodePath); - } - - // ip:port zNode naming scheme - String nodeName = client.createNode(zNodePath, contents, CreateMode.EPHEMERAL); - logger.info("Register Web Cluster Zookeeper UniqPath = {}.", zNodePath); - return true; - } catch (Exception e) { - logger.warn(e.getMessage(), e); - } - return false; - } - public boolean isConnected() { return connected.get(); } - private String bindingPathAndZnode(String path, String znodeName) { - StringBuilder fullPath = new StringBuilder(); - - fullPath.append(path); - if (!path.endsWith(PATH_SEPERATOR)) { - fullPath.append(PATH_SEPERATOR); - } - fullPath.append(znodeName); - - return fullPath.toString(); - } - - private String extractCollectorClusterId(String path) { - int index = path.indexOf(PINPOINT_COLLECTOR_CLUSTER_PATH); - - int startPosition = index + PINPOINT_COLLECTOR_CLUSTER_PATH.length() + 1; - - if (path.length() > startPosition) { - String id = path.substring(startPosition); - return id; - } - - return null; - } - - private boolean syncCollectorCluster() { + private boolean syncPullCollectorCluster() { synchronized (this) { - Map map = getCollectorData(); - - if (map == null) { + Map map = zookeeperClusterManagerHelper.syncPullCollectorCluster(client, PINPOINT_COLLECTOR_CLUSTER_PATH); + if (Collections.EMPTY_MAP == map) { return false; } - + for (Map.Entry entry : map.entrySet()) { - String key = entry.getKey(); - byte[] value = entry.getValue(); - - String id = extractCollectorClusterId(key); - if (id == null) { - logger.error("Illegal Collector Path({}) finded.", key); - continue; - } - collectorClusterInfo.put(id, value); + collectorClusterInfo.put(entry.getKey(), entry.getValue()); } - return true; } } - private boolean syncCollectorCluster(String id) { - String path = bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, id); + private boolean pushCollectorClusterData(String id) { + String path = zookeeperClusterManagerHelper.bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, id); synchronized (this) { try { byte[] data = client.getData(path, true); @@ -318,27 +272,6 @@ private boolean syncCollectorCluster(String id) { } } - private Map getCollectorData() { - try { - List collectorList = client.getChildren(PINPOINT_COLLECTOR_CLUSTER_PATH, true); - - Map map = new HashMap(); - - for (String collector : collectorList) { - String node = bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, collector); - - byte[] data = client.getData(node, true); - map.put(node, data); - } - - return map; - } catch (Exception e) { - logger.warn(e.getMessage(), e); - } - - return null; - } - class PushWebClusterJob implements TimerTask { private final String znodeName; @@ -380,13 +313,13 @@ public void run(Timeout timeout) throws Exception { return; } - if (!syncWebCluster(this)) { + if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, this)) { timer.newTimeout(this, getRetryInterval(), TimeUnit.MILLISECONDS); } } } - class FetchCollectorClusterJob implements TimerTask { + class PullCollectorClusterJob implements TimerTask { @Override public void run(Timeout timeout) throws Exception { @@ -396,8 +329,8 @@ public void run(Timeout timeout) throws Exception { return; } - if (!syncCollectorCluster()) { - timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (!syncPullCollectorCluster()) { + timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS); } } } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManagerHelper.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManagerHelper.java new file mode 100644 index 000000000000..8d54d42bbe55 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterManagerHelper.java @@ -0,0 +1,134 @@ +/* + * Copyright 2014 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.web.cluster.zookeeper; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.navercorp.pinpoint.web.cluster.zookeeper.ZookeeperClusterManager.PushWebClusterJob; + +/** + * @author Taejin Koo + */ +public class ZookeeperClusterManagerHelper { + + private static final String PATH_SEPERATOR = "/"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + public ZookeeperClusterManagerHelper() { + } + + Map getCollectorData(ZookeeperClient client, String path) { + try { + List collectorList = client.getChildren(path, true); + + Map map = new HashMap(); + + for (String collector : collectorList) { + String node = bindingPathAndZnode(path, collector); + + byte[] data = client.getData(node, true); + map.put(node, data); + } + + return map; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + + return null; + } + + String bindingPathAndZnode(String path, String znodeName) { + StringBuilder fullPath = new StringBuilder(); + + fullPath.append(path); + if (!path.endsWith(PATH_SEPERATOR)) { + fullPath.append(PATH_SEPERATOR); + } + fullPath.append(znodeName); + + return fullPath.toString(); + } + + String extractCollectorClusterId(String path, String collectorClusterPath) { + int index = path.indexOf(collectorClusterPath); + + int startPosition = index + collectorClusterPath.length() + 1; + + if (path.length() > startPosition) { + String id = path.substring(startPosition); + return id; + } + + return null; + } + + boolean pushWebClusterResource(ZookeeperClient client, PushWebClusterJob job) { + if (job == null) { + return false; + } + + String zNodePath = job.getZnodePath(); + byte[] contents = job.getContents(); + + try { + if (!client.exists(zNodePath)) { + client.createPath(zNodePath); + } + + // ip:port zNode naming scheme + String nodeName = client.createNode(zNodePath, contents, CreateMode.EPHEMERAL); + logger.info("Register Web Cluster Zookeeper UniqPath = {}.", zNodePath); + return true; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + return false; + } + + Map syncPullCollectorCluster(ZookeeperClient client, String path) { + Map map = getCollectorData(client, path); + + if (map == null) { + return Collections.emptyMap(); + } + + Map result = new HashMap(); + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + byte[] value = entry.getValue(); + + String id = extractCollectorClusterId(key, path); + if (id == null) { + logger.error("Illegal Collector Path({}) finded.", key); + continue; + } + result.put(id, value); + } + + return result; + } + +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperUtils.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperUtils.java new file mode 100644 index 000000000000..e3e7a3f18e43 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright 2014 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.web.cluster.zookeeper; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; + +/** + * @author Taejin Koo + */ +public final class ZookeeperUtils { + + // would be a good idea to move to commons-hbase (if implemented) in the future + private ZookeeperUtils() { + } + + public static boolean isConnectedEvent(WatchedEvent event) { + KeeperState state = event.getState(); + EventType eventType = event.getType(); + + return isConnectedEvent(state, eventType); + } + + public static boolean isConnectedEvent(KeeperState state, EventType eventType) { + if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.None) { + return true; + } else { + return false; + } + } + + + public static boolean isDisconnectedEvent(WatchedEvent event) { + KeeperState state = event.getState(); + EventType eventType = event.getType(); + + return isDisconnectedEvent(state, eventType); + } + + public static boolean isDisconnectedEvent(KeeperState state, EventType eventType) { + if ((state == KeeperState.Disconnected || state == KeeperState.Expired) && eventType == eventType.None) { + return true; + } else { + return false; + } + } + +}