diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/BlockedHealthBalncingPolicy b/core/src/main/java/me/prettyprint/cassandra/connection/BlockedHealthBalncingPolicy new file mode 100644 index 000000000..71476e34d --- /dev/null +++ b/core/src/main/java/me/prettyprint/cassandra/connection/BlockedHealthBalncingPolicy @@ -0,0 +1,72 @@ +package me.prettyprint.cassandra.connection; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import me.prettyprint.cassandra.connection.factory.HClientFactory; +import me.prettyprint.cassandra.service.CassandraHost; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class BlockedHealthBalncingPolicy implements LoadBalancingPolicy { + + private static final long serialVersionUID = 4216966823555634515L; + private static final Logger log = LoggerFactory.getLogger(BlockedHealthBalncingPolicy.class); + private static final int ZERO_VALUE = 0; + + private final class FilteringCompare implements Comparator { + + public int compare(HClientPool o1, HClientPool o2) { + + if ( log.isDebugEnabled() ) { + log.debug("comparing 1: {} / count {} / Blocked {} / availableClientQueue {} with 2: {} / count {} / Blocked {} / availableClientQueue {} ", + new Object[]{o1.getCassandraHost(), o1.getNumActive(), o1.getNumBlockedThreads(), o1.getNumIdle(), + o2.getCassandraHost(), o2.getNumActive(), o2.getNumBlockedThreads(), o2.getNumIdle()}); + } + + if(o2.getNumIdle() == ZERO_VALUE || o2.getNumIdle() == ZERO_VALUE){ + log.warn("finding 0 hector pool {}:{} /{}:{}", + new Object[]{o1.getCassandraHost(), o1.getNumIdle(), o2.getCassandraHost(), o2.getNumIdle()}); + } + + if(o1.getNumBlockedThreads() != o2.getNumBlockedThreads()){ + return o1.getNumBlockedThreads() - o2.getNumBlockedThreads(); + } else { + return o2.getNumIdle() - o1.getNumIdle(); + } + } + } + + public HClientPool getPool(Collection pools, Set excludeHosts) { + + List hClientPoolList = Lists.newArrayList(pools); + Collections.shuffle(hClientPoolList); + Collections.sort(hClientPoolList, new FilteringCompare()); + + Iterator iterator = hClientPoolList.iterator(); + HClientPool concurrentHClientPool = iterator.next(); + + if ( excludeHosts != null && excludeHosts.size() > 0 ) { + while (iterator.hasNext()) { + if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) { + break; + } + concurrentHClientPool = (ConcurrentHClientPool) iterator.next(); + } + } + + return concurrentHClientPool; + } + + public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host) { + return new ConcurrentHClientPool(clientFactory, host); + } + +}