Merge pull request #206 from koo-taejin/#191
Zookeeper does not reconnect when session expired. #191
koo-taejin committed Mar 5, 2015
2 parents b428c21 + 8acf839 commit 5af8137
Showing 3 changed files with 250 additions and 120 deletions.
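The first file in the diff below is the web module's ZookeeperClusterManager. The change does three things: on a KeeperState.Expired event the watcher now asks the client to rebuild its session (client.reconnectWhenSessionExpired()), construction is wrapped in an initilizeLock block so the watcher callback cannot run against half-assigned fields, and the path/sync helpers move into ZookeeperClusterManagerHelper. As a rough standalone sketch of the reconnect-on-expiry pattern — the class name, reconnect() helper, and field layout here are illustrative assumptions, not Pinpoint's actual ZookeeperClient API — a watcher might look like this:

import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

// Minimal sketch, assuming plain ZooKeeper API calls; the real commit routes
// this through Pinpoint's ZookeeperClient and a retry timer.
public class ReconnectingWatcher implements Watcher {

    private final String connectString;     // e.g. "localhost:2181" (hypothetical address)
    private final int sessionTimeoutMs;

    // Guards field assignment: ZooKeeper can deliver events to process()
    // before the constructor has finished, so the callback waits on this lock.
    private final Object initializeLock = new Object();
    private volatile ZooKeeper zooKeeper;

    public ReconnectingWatcher(String connectString, int sessionTimeoutMs) throws IOException {
        this.connectString = connectString;
        this.sessionTimeoutMs = sessionTimeoutMs;
        synchronized (initializeLock) {
            this.zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, this);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        synchronized (initializeLock) {
            // wait until the constructor has assigned zooKeeper
        }

        if (event.getState() == KeeperState.Expired) {
            // An expired session never recovers on its own; the handle must be replaced.
            reconnect();
        }
        // ... handle SyncConnected / NodeChildrenChanged / NodeDeleted / NodeDataChanged ...
    }

    private void reconnect() {
        try {
            zooKeeper.close();
        } catch (InterruptedException ignore) {
            Thread.currentThread().interrupt();
        }
        try {
            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, this);
        } catch (IOException e) {
            // The real manager would log and let a timer retry later; the sketch just gives up.
        }
    }
}

The key point the sketch illustrates is that an expired ZooKeeper handle cannot be reused: it has to be closed and replaced with a new one, which is what the commit delegates to reconnectWhenSessionExpired().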
@@ -17,14 +17,13 @@
package com.navercorp.pinpoint.web.cluster.zookeeper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -55,13 +54,14 @@ public class ZookeeperClusterManager implements ClusterManager, Watcher {

private static final long SYNC_INTERVAL_TIME_MILLIS = 15 * 1000;

private static final String PATH_SEPERATOR = "/";

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final AtomicBoolean connected = new AtomicBoolean(false);
private final ZookeeperClient client;
private final ZookeeperClusterManagerHelper zookeeperClusterManagerHelper;

private final Object initilizeLock = new Object();

private final int retryInterval;

private final Timer timer;
@@ -71,17 +71,20 @@ public class ZookeeperClusterManager implements ClusterManager, Watcher {
private final CollectorClusterInfoRepository collectorClusterInfo = new CollectorClusterInfoRepository();

public ZookeeperClusterManager(String zookeeperAddress, int sessionTimeout, int retryInterval) throws KeeperException, IOException, InterruptedException {
this.client = new ZookeeperClient(zookeeperAddress, sessionTimeout, this, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED);
this.retryInterval = retryInterval;
// it could be better to create upon failure
this.timer = createTimer();
synchronized(initilizeLock) {
this.client = new ZookeeperClient(zookeeperAddress, sessionTimeout, this, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED);
this.retryInterval = retryInterval;
// it could be better to create upon failure
this.timer = createTimer();
this.zookeeperClusterManagerHelper = new ZookeeperClusterManagerHelper();
}
}

// Retry upon failure (1 min retry period)
// not too much overhead, just logging
@Override
public boolean registerWebCluster(String zNodeName, byte[] contents) {
String zNodePath = bindingPathAndZnode(PINPOINT_WEB_CLUSTER_PATh, zNodeName);
String zNodePath = zookeeperClusterManagerHelper.bindingPathAndZnode(PINPOINT_WEB_CLUSTER_PATh, zNodeName);

logger.info("Create Web Cluster Zookeeper UniqPath = {}", zNodePath);

@@ -97,7 +100,7 @@ public boolean registerWebCluster(String zNodeName, byte[] contents) {
return true;
}

if (!syncWebCluster(job)) {
if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, job)) {
timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS);
}

@@ -106,29 +109,34 @@

@Override
public void process(WatchedEvent event) {
logger.info("Zookeepr Event({}) ocurred.", event);
synchronized (initilizeLock) {
// wait for client variable to be assigned.
}

logger.info("Zookeepr Event({}) ocurred.", event);

KeeperState state = event.getState();
EventType eventType = event.getType();
String path = event.getPath();

boolean result = false;

// when this happens, ephemeral node disappears
// reconnects automatically, and process gets notified for all events
if (state == KeeperState.Expired) {
result = handleDisconnected();
client.reconnectWhenSessionExpired();
} else if (state == KeeperState.Expired) {
boolean result = false;
if (ZookeeperUtils.isDisconnectedEvent(event)) {
result = handleDisconnected();
} else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.None) {
result = handleConnected();
} else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeChildrenChanged) {
result = handleNodeChildrenChanged(path);
} else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeDeleted) {
result = handleNodeDeleted(path);
} else if ((state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) && eventType == EventType.NodeDataChanged) {
result = handleNodeDataChanged(path);
if (state == KeeperState.Expired) {
client.reconnectWhenSessionExpired();
}
} else if (state == KeeperState.SyncConnected || state == KeeperState.NoSyncConnected) {
if (eventType == EventType.None) {
result = handleConnected();
} else if (eventType == EventType.NodeChildrenChanged) {
result = handleNodeChildrenChanged(path);
} else if (eventType == EventType.NodeDeleted) {
result = handleNodeDeleted(path);
} else if (eventType == EventType.NodeDataChanged) {
result = handleNodeDataChanged(path);
}
}

if (result) {
@@ -152,14 +160,14 @@ private boolean handleConnected() {
if (changed) {
PushWebClusterJob job = this.job.get();
if (job != null) {
if (!syncWebCluster(job)) {
if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, job)) {
timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS);
result = false;
}
}

if (!syncCollectorCluster()) {
timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (!syncPullCollectorCluster()) {
timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
result = false;
}
} else {
@@ -171,18 +179,18 @@

private boolean handleNodeChildrenChanged(String path) {
if (PINPOINT_COLLECTOR_CLUSTER_PATH.equals(path)) {
if (syncCollectorCluster()) {
if (syncPullCollectorCluster()) {
return true;
}
timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
}

return false;
}

private boolean handleNodeDeleted(String path) {
if (path != null) {
String id = extractCollectorClusterId(path);
String id = zookeeperClusterManagerHelper.extractCollectorClusterId(path, PINPOINT_COLLECTOR_CLUSTER_PATH);
if (id != null) {
collectorClusterInfo.remove(id);
return true;
@@ -193,12 +201,12 @@ private boolean handleNodeDeleted(String path) {

private boolean handleNodeDataChanged(String path) {
if (path != null) {
String id = extractCollectorClusterId(path);
String id = zookeeperClusterManagerHelper.extractCollectorClusterId(path, PINPOINT_COLLECTOR_CLUSTER_PATH);
if (id != null) {
if (syncCollectorCluster(id)) {
if (pushCollectorClusterData(id)) {
return true;
}
timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
}
}

@@ -227,80 +235,26 @@ private Timer createTimer() {
return timer;
}

private boolean syncWebCluster(PushWebClusterJob job) {
String zNodePath = job.getZnodePath();
byte[] contents = job.getContents();

try {
if (!client.exists(zNodePath)) {
client.createPath(zNodePath);
}

// ip:port zNode naming scheme
String nodeName = client.createNode(zNodePath, contents, CreateMode.EPHEMERAL);
logger.info("Register Web Cluster Zookeeper UniqPath = {}.", zNodePath);
return true;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}

public boolean isConnected() {
return connected.get();
}

private String bindingPathAndZnode(String path, String znodeName) {
StringBuilder fullPath = new StringBuilder();

fullPath.append(path);
if (!path.endsWith(PATH_SEPERATOR)) {
fullPath.append(PATH_SEPERATOR);
}
fullPath.append(znodeName);

return fullPath.toString();
}

private String extractCollectorClusterId(String path) {
int index = path.indexOf(PINPOINT_COLLECTOR_CLUSTER_PATH);

int startPosition = index + PINPOINT_COLLECTOR_CLUSTER_PATH.length() + 1;

if (path.length() > startPosition) {
String id = path.substring(startPosition);
return id;
}

return null;
}

private boolean syncCollectorCluster() {
private boolean syncPullCollectorCluster() {
synchronized (this) {
Map<String, byte[]> map = getCollectorData();

if (map == null) {
Map<String, byte[]> map = zookeeperClusterManagerHelper.syncPullCollectorCluster(client, PINPOINT_COLLECTOR_CLUSTER_PATH);
if (Collections.EMPTY_MAP == map) {
return false;
}

for (Map.Entry<String, byte[]> entry : map.entrySet()) {
String key = entry.getKey();
byte[] value = entry.getValue();

String id = extractCollectorClusterId(key);
if (id == null) {
logger.error("Illegal Collector Path({}) finded.", key);
continue;
}
collectorClusterInfo.put(id, value);
collectorClusterInfo.put(entry.getKey(), entry.getValue());
}

return true;
}
}

private boolean syncCollectorCluster(String id) {
String path = bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, id);
private boolean pushCollectorClusterData(String id) {
String path = zookeeperClusterManagerHelper.bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, id);
synchronized (this) {
try {
byte[] data = client.getData(path, true);
@@ -318,27 +272,6 @@ private boolean syncCollectorCluster(String id) {
}
}

private Map<String, byte[]> getCollectorData() {
try {
List<String> collectorList = client.getChildren(PINPOINT_COLLECTOR_CLUSTER_PATH, true);

Map<String, byte[]> map = new HashMap<String, byte[]>();

for (String collector : collectorList) {
String node = bindingPathAndZnode(PINPOINT_COLLECTOR_CLUSTER_PATH, collector);

byte[] data = client.getData(node, true);
map.put(node, data);
}

return map;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}

return null;
}


class PushWebClusterJob implements TimerTask {
private final String znodeName;
@@ -380,13 +313,13 @@ public void run(Timeout timeout) throws Exception {
return;
}

if (!syncWebCluster(this)) {
if (!zookeeperClusterManagerHelper.pushWebClusterResource(client, this)) {
timer.newTimeout(this, getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
}

class FetchCollectorClusterJob implements TimerTask {
class PullCollectorClusterJob implements TimerTask {

@Override
public void run(Timeout timeout) throws Exception {
@@ -396,8 +329,8 @@ public void run(Timeout timeout) throws Exception {
return;
}

if (!syncCollectorCluster()) {
timer.newTimeout(new FetchCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (!syncPullCollectorCluster()) {
timer.newTimeout(new PullCollectorClusterJob(), SYNC_INTERVAL_TIME_MILLIS, TimeUnit.MILLISECONDS);
}
}
}
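Both TimerTask jobs in the diff share the same retry idea called out in the "Retry upon failure (1 min retry period)" comment: when a push or pull against ZooKeeper fails, the job re-arms itself on the wheel timer instead of blocking a thread in a retry loop. A minimal sketch of that pattern, assuming Netty's HashedWheelTimer (the diff only shows the Timer/Timeout/TimerTask types, not which implementation createTimer() builds):

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

// Sketch only: Netty 4 timer types assumed, and trySync() stands in for the
// real ZooKeeper reads/writes done by the push/pull jobs.
public class RetrySketch {

    private static final long RETRY_INTERVAL_MILLIS = 60 * 1000L;   // 1 min retry period

    private final Timer timer = new HashedWheelTimer();

    public void start() {
        timer.newTimeout(new SyncJob(), 0, TimeUnit.MILLISECONDS);
    }

    class SyncJob implements TimerTask {
        @Override
        public void run(Timeout timeout) throws Exception {
            if (!trySync()) {
                // Failure is cheap: log it, then schedule the same job again on the timer.
                timer.newTimeout(this, RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            }
        }
    }

    private boolean trySync() {
        // Placeholder; returning false here forces the job to re-arm itself.
        return false;
    }
}

On shutdown, timer.stop() releases the worker thread and cancels any timeouts that are still pending.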