Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

add BlockedHealthBalncingPolicy #648

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package me.prettyprint.cassandra.connection;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import me.prettyprint.cassandra.connection.factory.HClientFactory;
import me.prettyprint.cassandra.service.CassandraHost;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class BlockedHealthBalncingPolicy implements LoadBalancingPolicy {

private static final long serialVersionUID = 4216966823555634515L;
private static final Logger log = LoggerFactory.getLogger(BlockedHealthBalncingPolicy.class);
private static final int ZERO_VALUE = 0;

private final class FilteringCompare implements Comparator<HClientPool> {

public int compare(HClientPool o1, HClientPool o2) {

if ( log.isDebugEnabled() ) {
log.debug("comparing 1: {} / count {} / Blocked {} / availableClientQueue {} with 2: {} / count {} / Blocked {} / availableClientQueue {} ",
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o1.getNumBlockedThreads(), o1.getNumIdle(),
o2.getCassandraHost(), o2.getNumActive(), o2.getNumBlockedThreads(), o2.getNumIdle()});
}

if(o2.getNumIdle() == ZERO_VALUE || o2.getNumIdle() == ZERO_VALUE){
log.warn("finding 0 hector pool {}:{} /{}:{}",
new Object[]{o1.getCassandraHost(), o1.getNumIdle(), o2.getCassandraHost(), o2.getNumIdle()});
}

if(o1.getNumBlockedThreads() != o2.getNumBlockedThreads()){
return o1.getNumBlockedThreads() - o2.getNumBlockedThreads();
} else {
return o2.getNumIdle() - o1.getNumIdle();
}
}
}

public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {

List<HClientPool> hClientPoolList = Lists.newArrayList(pools);
Collections.shuffle(hClientPoolList);
Collections.sort(hClientPoolList, new FilteringCompare());

Iterator<HClientPool> iterator = hClientPoolList.iterator();
HClientPool concurrentHClientPool = iterator.next();

if ( excludeHosts != null && excludeHosts.size() > 0 ) {
while (iterator.hasNext()) {
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
break;
}
concurrentHClientPool = (ConcurrentHClientPool) iterator.next();
}
}

return concurrentHClientPool;
}

public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) {
return new ConcurrentHClientPool(clientFactory, host);
}

}