Skip to content

Commit

Permalink
(feat) Improve getMulti0
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Sep 26, 2016
1 parent d14fc81 commit 5b36a8c
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 8 deletions.
9 changes: 4 additions & 5 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1315,13 +1315,12 @@ private final Collection<List<String>> catalogKeys(

for (String key : keyCollections) {
Session index = this.sessionLocator.getSessionByKey(key);
if (!catalogMap.containsKey(index)) {
List<String> tmpKeys = new ArrayList<String>(100);
tmpKeys.add(key);
List<String> tmpKeys = catalogMap.get(index);
if (tmpKeys == null) {
tmpKeys = new ArrayList<String>(10);
catalogMap.put(index, tmpKeys);
} else {
catalogMap.get(index).add(key);
}
tmpKeys.add(key);
}

Collection<List<String>> catalogKeys = catalogMap.values();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/rubyeye/xmemcached/impl/Optimizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public final Command optimiezeMergeBuffer(Command optimiezeCommand,
log.debug("Optimieze merge buffer:" + optimiezeCommand.toString());
}
if (this.optimiezeMergeBuffer
&& optimiezeCommand.getIoBuffer().remaining() < sendBufferSize) {
&& optimiezeCommand.getIoBuffer().remaining() < sendBufferSize - 24) {
optimiezeCommand = this.mergeBuffer(optimiezeCommand, writeQueue,
executingCmds, sendBufferSize);
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/net/rubyeye/xmemcached/utils/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public final class ByteUtils {
public static final Charset DEFAULT_CHARSET = Charset
.forName(DEFAULT_CHARSET_NAME);
public static final ByteBuffer SPLIT = ByteBuffer.wrap(Constants.CRLF);

public static final boolean ENABLE_CACHED_STRING_BYTES = Boolean
.valueOf(System.getProperty(
"xmemcached.string.bytes.cached.enable", "false"));
/**
* if it is testing,check key argument even if use binary protocol. The user
* must never change this value at all.
Expand Down Expand Up @@ -68,6 +72,9 @@ public static final byte[] getBytes(String k) {
if (k == null || k.length() == 0) {
throw new IllegalArgumentException("Key must not be blank");
}
if (ENABLE_CACHED_STRING_BYTES) {
return CachedString.getBytes(k);
}
try {
return k.getBytes(DEFAULT_CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
Expand Down
98 changes: 98 additions & 0 deletions src/main/java/net/rubyeye/xmemcached/utils/CachedString.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package net.rubyeye.xmemcached.utils;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.management.RuntimeErrorException;

public class CachedString {
private static ConcurrentHashMap<String, Reference<byte[]>> table = new ConcurrentHashMap();
static final ReferenceQueue rq = new ReferenceQueue();

public static byte[] getBytes(String s) {
if (s == null || s.length() == 0)
return null;
byte[] bs = null;
Reference<byte[]> existingRef = table.get(s);
if (existingRef == null) {
clearCache(rq, table);
bs = s.getBytes(ByteUtils.DEFAULT_CHARSET);
existingRef = table.putIfAbsent(s,
new WeakReference<byte[]>(bs, rq));
}
if (existingRef == null) {
return bs;
}
byte[] existingbs = existingRef.get();
if (existingbs != null) {
return existingbs;
}
// entry died in the interim, do over
table.remove(s, existingRef);
return getBytes(s);
}

static public <K, V> void clearCache(ReferenceQueue rq,
ConcurrentHashMap<K, Reference<V>> cache) {
// cleanup any dead entries
if (rq.poll() != null) {
while (rq.poll() != null)
;
for (Map.Entry<K, Reference<V>> e : cache.entrySet()) {
Reference<V> val = e.getValue();
if (val != null && val.get() == null) {
cache.remove(e.getKey(), val);
}
}
}
}

private static long testString(int keyLen) {
String k = getKey(keyLen);
long len = 0;
for (int i = 0; i < 1000; i++) {
// byte[] bs = k.getBytes(ByteUtils.DEFAULT_CHARSET);
// String nk = new String(bs, ByteUtils.DEFAULT_CHARSET);
byte[] bs = getBytes(k);
String nk = ByteUtils.getString(bs);
if (!k.equals(nk)) {
throw new RuntimeException();
}
len += nk.length();
}
return len;
}

private static String getKey(int len) {
StringBuilder sb = new StringBuilder();
String[] chars = { "a", "b", "c", "d", "e", "f", "g", "h" };
int index = (int) Math.floor(Math.random() * 8);
for (int i = 0; i < len; i++) {
sb.append(chars[index]);
}
return sb.toString();
}

public static void main(String[] args) {
long sum = 0;
for (int i = 0; i < 10000; i++) {
sum += testString(8);
}
int[] keys = { 8, 64, 128 };

for (int k : keys) {
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
sum += testString(k);
}
System.out.println("Key length=" + k + ", cost "
+ (System.currentTimeMillis() - start) + " ms.");
}
System.out.println(sum);
System.out.println(table.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.google.code.yanf4j.util.SimpleQueue;

@SuppressWarnings("unchecked")
public class OptimezerTest extends TestCase {
public class OptimizerTest extends TestCase {
Optimizer optimiezer;

Queue writeQueue;
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testMergeLimitBuffer() {
// set send buffer size to 30,merge four commands at most
this.optimiezer.setOptimizeMergeBuffer(true);
Command optimiezeCommand = this.optimiezer.optimiezeMergeBuffer(
this.currentCmd, this.writeQueue, this.executingCmds, 30);
this.currentCmd, this.writeQueue, this.executingCmds, 54);
assertNotSame(this.currentCmd, optimiezeCommand);
ByteBuffer mergeBuffer = optimiezeCommand.getIoBuffer().buf();
assertEquals(0, this.writeQueue.size());
Expand Down

0 comments on commit 5b36a8c

Please # to comment.