Skip to content

Commit

Permalink
Attempt to fix OpenTSDB#823 by using a proper concurrent byte map.
Browse files Browse the repository at this point in the history
  • Loading branch information
manolama committed Jul 13, 2016
1 parent ef4905a commit 8b1368a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 8 deletions.
16 changes: 8 additions & 8 deletions src/core/SaltScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -28,6 +29,7 @@
import net.opentsdb.stats.QueryStats;
import net.opentsdb.stats.QueryStats.QueryStat;
import net.opentsdb.uid.UniqueId;
import net.opentsdb.utils.ConcurrentByteMap;
import net.opentsdb.utils.DateTime;

import org.hbase.async.Bytes.ByteMap;
Expand Down Expand Up @@ -72,9 +74,8 @@ public class SaltScanner {
new ConcurrentHashMap<Integer, List<KeyValue>>();

/** Stores annotations from each scanner as it completes */
private final Map<byte[], List<Annotation>> annotation_map =
Collections.synchronizedMap(
new TreeMap<byte[], List<Annotation>>(new RowKey.SaltCmp()));
private final ConcurrentByteMap<List<Annotation>> annotation_map =
new ConcurrentByteMap<List<Annotation>>();

/** A deferred to call with the spans on completion */
private final Deferred<TreeMap<byte[], Span>> results =
Expand Down Expand Up @@ -636,18 +637,17 @@ void close(final boolean ok) {
* @param annotations The annotations fetched by the scanners
*/
private void validateAndTriggerCallback(final List<KeyValue> kvs,
final Map<byte[], List<Annotation>> annotations) {
final ByteMap<List<Annotation>> annotations) {

final int tasks = completed_tasks.incrementAndGet();
if (kvs.size() > 0) {
kv_map.put(tasks, kvs);
}

for (final byte[] key : annotations.keySet()) {
final List<Annotation> notes = annotations.get(key);
if (notes.size() > 0) {
for (final Entry<byte[], List<Annotation>> entry : annotations.entrySet()) {
if (entry.getValue().size() > 0) {
// Optimistic write, expecting unique row keys
annotation_map.put(key, notes);
annotation_map.put(entry.getKey(), entry.getValue());
}
}

Expand Down
69 changes: 69 additions & 0 deletions src/utils/ConcurrentByteMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// This file is part of OpenTSDB.
// Copyright (C) 2016 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.utils;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

import org.hbase.async.Bytes;

/**
* A concurrent version of the Bytes.ByteMap class. This map is sorted according
* to the lexical order of the byte arrays. Since using byte arrays as the key
* for default Java maps uses the hashcode which is based on the address of the
* array, this class solves the issue by using a proper array value comparator.
*
* @param <V> The type of object stored as values in the map.
*/
public class ConcurrentByteMap<V> extends ConcurrentSkipListMap<byte[], V>
implements Iterable<Map.Entry<byte[], V>>{

/**
* Default Ctor setting up the SkipListMap with the Bytes.MEMCMP comparator.
*/
public ConcurrentByteMap() {
super(Bytes.MEMCMP);
}

/** Returns an iterator that goes through all the entries in this map. */
public Iterator<Map.Entry<byte[], V>> iterator() {
return super.entrySet().iterator();
}

/** {@code byte[]} friendly implementation. */
public String toString() {
final int size = size();
if (size == 0) {
return "{}";
}
final StringBuilder buf = new StringBuilder(size << 4);
buf.append('{');
for (final Map.Entry<byte[], V> e : this) {
Bytes.pretty(buf, e.getKey());
buf.append('=');
final V value = e.getValue();
if (value instanceof byte[]) {
Bytes.pretty(buf, (byte[]) value);
} else {
buf.append(value == this ? "(this map)" : value);
}
buf.append(", ");
}
buf.setLength(buf.length() - 2); // Remove the extra ", ".
buf.append('}');
return buf.toString();
}

private static final long serialVersionUID = -7607287447846950300L;
}

0 comments on commit 8b1368a

Please # to comment.