Skip to content

Commit

Permalink
Attempt a fix for #823 wherein writes to the annotations byte map
Browse files Browse the repository at this point in the history
were not synchronized, potentially leading to a race condition and
stuck threads. Now we'll send a fresh list to the compaction code
if notes were found, synchronize on the local byte map before making
any modifications. Thanks to @thatsafunnyname.

Signed-off-by: Chris Larsen <clarsen@yahoo-inc.com>
  • Loading branch information
manolama committed Dec 17, 2016
1 parent b0ae20f commit 76eb740
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions src/core/SaltScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

Expand Down Expand Up @@ -316,8 +317,10 @@ final class ScannerCB implements Callback<Object,
private final List<KeyValue> kvs = new ArrayList<KeyValue>();
private final ByteMap<List<Annotation>> annotations =
new ByteMap<List<Annotation>>();
private final Set<String> skips = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> keepers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> skips = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
private final Set<String> keepers = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());

private long scanner_start = -1;
/** nanosecond timestamps */
Expand Down Expand Up @@ -535,12 +538,6 @@ void processRow(final byte[] key, final ArrayList<KeyValue> row) {
tsdb.getClient().delete(del);
}

List<Annotation> notes = annotations.get(key);
if (notes == null) {
notes = new ArrayList<Annotation>();
annotations.put(key, notes);
}

//TODO rollup doesn't use the column qualifier prefix right now
//Please move this logic to @CompactionQueue.compact API, if the
//qualifier prefix is set for rollup. Right now there is no way to
Expand Down Expand Up @@ -612,7 +609,18 @@ void processRow(final byte[] key, final ArrayList<KeyValue> row) {
// the scanner
final long compaction_start = DateTime.nanoTime();
try {
final List<Annotation> notes = Lists.newArrayList();
compacted = tsdb.compact(row, notes);
if (!notes.isEmpty()) {
synchronized (annotations) {
List<Annotation> map_notes = annotations.get(key);
if (map_notes == null) {
annotations.put(key, notes);
} else {
map_notes.addAll(notes);
}
}
}
} catch (IllegalDataException idex) {
compaction_time += (DateTime.nanoTime() - compaction_start);
close(false);
Expand Down

0 comments on commit 76eb740

Please # to comment.