Skip to content

Commit

Permalink
Merge pull request #3 from netifi/fix-fuseable-metrics-issue
Browse files Browse the repository at this point in the history
fix issue with fuseable metrics subscriber
  • Loading branch information
rdegnan authored Sep 13, 2018
2 parents f77c70a + 9713b31 commit 9c055c6
Show file tree
Hide file tree
Showing 13 changed files with 426 additions and 33 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ matrix:
jdk: oraclejdk8
language: generic
before_install:
- wget https://github.com/google/protobuf/releases/download/v3.6.0/protobuf-cpp-3.6.0.tar.gz
- tar -xzvf protobuf-cpp-3.6.0.tar.gz
- pushd protobuf-3.6.0 && ./configure --disable-shared && make && sudo make install
- wget https://github.com/google/protobuf/releases/download/v3.6.1/protobuf-cpp-3.6.1.tar.gz
- tar -xzvf protobuf-cpp-3.6.1.tar.gz
- pushd protobuf-3.6.1 && ./configure --disable-shared && make && sudo make install
&& popd
script: "./buildtravis.sh"
- os: osx
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ RSocket RPC Java uses a Protobuf plugin to generate application code. Add the fo
```
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.6.0'
artifact = 'com.google.protobuf:protoc:3.6.1'
}
plugins {
rsocketRpc {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
group=io.rsocket.rpc
version=0.2.0
version=0.2.1
8 changes: 4 additions & 4 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ dependencies {
testCompile 'junit:junit:4.12'

testCompile 'javax.inject:javax.inject:1'
testCompile 'io.projectreactor:reactor-test:3.1.7.RELEASE'
testCompile 'com.google.protobuf:protobuf-java:3.6.0'
testCompile 'io.projectreactor:reactor-test:3.1.9.RELEASE'
testCompile 'com.google.protobuf:protobuf-java:3.6.1'
testCompile 'org.hdrhistogram:HdrHistogram:2.1.10'
testCompile 'org.apache.logging.log4j:log4j-api:2.8.2'
testCompile 'org.apache.logging.log4j:log4j-core:2.8.2'
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.2'
testCompile 'io.rsocket:rsocket-transport-netty:0.11.5'
testCompile 'io.rsocket:rsocket-transport-local:0.11.5'
testCompile 'io.rsocket:rsocket-transport-netty:0.11.6'
testCompile 'io.rsocket:rsocket-transport-local:0.11.6'
testCompile 'org.mockito:mockito-all:1.10.19'
}

Expand Down
10 changes: 5 additions & 5 deletions rsocket-rpc-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ dependencies {
compile project (':rsocket-rpc-protobuf')
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'javax.inject:javax.inject:1'
compile 'com.google.protobuf:protobuf-java:3.6.0'
compile 'io.rsocket:rsocket-core:0.11.5'
compile 'io.rsocket:rsocket-transport-netty:0.11.5'
compile 'io.micrometer:micrometer-core:1.0.3'
compile 'com.google.protobuf:protobuf-java:3.6.1'
compile 'io.rsocket:rsocket-core:0.11.6'
compile 'io.rsocket:rsocket-transport-netty:0.11.6'
compile 'io.micrometer:micrometer-core:1.0.6'

protobuf project(':rsocket-rpc-metrics-idl')

Expand All @@ -28,7 +28,7 @@ protobuf {
generatedFilesBaseDir = "${projectDir}/src/generated"

protoc {
artifact = 'com.google.protobuf:protoc:3.6.0'
artifact = 'com.google.protobuf:protoc:3.6.1'
}
plugins {
rsocketRpc {
Expand Down
32 changes: 30 additions & 2 deletions rsocket-rpc-core/src/main/java/io/rsocket/rpc/metrics/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.micrometer.core.instrument.*;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;

public class Metrics {
Expand All @@ -13,6 +14,7 @@ public class Metrics {
return timed(registry, name, Tags.of(keyValues));
}

@SuppressWarnings("unchecked")
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> timed(
MeterRegistry registry, String name, Iterable<Tag> tags) {
Counter next =
Expand All @@ -32,7 +34,33 @@ public class Metrics {
.tags(tags)
.register(registry);
return Operators.lift(
(scannable, subscriber) ->
new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer));
(scannable, subscriber) -> {
if (scannable instanceof Fuseable) {
if (subscriber instanceof Fuseable.ConditionalSubscriber) {
return new MetricsFuseableConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<? super T>) subscriber,
next,
complete,
error,
cancelled,
timer);
} else {
return new MetricsFuseableSubscriber<>(
subscriber, next, complete, error, cancelled, timer);
}
} else {
if (subscriber instanceof Fuseable.ConditionalSubscriber) {
return new MetricsFuseableConditionalSubscriber<>(
(Fuseable.ConditionalSubscriber<? super T>) subscriber,
next,
complete,
error,
cancelled,
timer);
} else {
return new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer);
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.rsocket.rpc.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable.ConditionalSubscriber;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class MetricsConditionalSubscriber<T> extends AtomicBoolean
implements Subscription, ConditionalSubscriber<T> {
private final ConditionalSubscriber<? super T> actual;
private final Counter next, complete, error, cancelled;
private final Timer timer;

private Subscription s;
private long start;

MetricsConditionalSubscriber(
ConditionalSubscriber<? super T> actual,
Counter next,
Counter complete,
Counter error,
Counter cancelled,
Timer timer) {
this.actual = actual;
this.next = next;
this.complete = complete;
this.error = error;
this.cancelled = cancelled;
this.timer = timer;
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
this.start = System.nanoTime();

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
next.increment();
actual.onNext(t);
}

@Override
public boolean tryOnNext(T t) {
next.increment();
return actual.tryOnNext(t);
}

@Override
public void onError(Throwable t) {
if (compareAndSet(false, true)) {
error.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
actual.onError(t);
}

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
complete.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
actual.onComplete();
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
if (compareAndSet(false, true)) {
cancelled.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
s.cancel();
}

@Override
public Context currentContext() {
return actual.currentContext();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.rsocket.rpc.metrics;

import static reactor.core.Fuseable.ASYNC;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.core.Fuseable.ConditionalSubscriber;
import reactor.core.Fuseable.QueueSubscription;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

public class MetricsFuseableConditionalSubscriber<T> extends AtomicBoolean
implements QueueSubscription<T>, ConditionalSubscriber<T> {
private final ConditionalSubscriber<? super T> actual;
private final Counter next, complete, error, cancelled;
private final Timer timer;

private QueueSubscription<T> s;
private int sourceMode;

private long start;

MetricsFuseableConditionalSubscriber(
ConditionalSubscriber<? super T> actual,
Counter next,
Counter complete,
Counter error,
Counter cancelled,
Timer timer) {
this.actual = actual;
this.next = next;
this.complete = complete;
this.error = error;
this.cancelled = cancelled;
this.timer = timer;
}

@Override
@SuppressWarnings("unchecked")
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = (QueueSubscription<T>) s;
this.start = System.nanoTime();

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (sourceMode == ASYNC) {
actual.onNext(null);
} else {
next.increment();
actual.onNext(t);
}
}

@Override
public boolean tryOnNext(T t) {
next.increment();
return actual.tryOnNext(t);
}

@Override
public void onError(Throwable t) {
if (compareAndSet(false, true)) {
error.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
actual.onError(t);
}

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
complete.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
actual.onComplete();
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
if (compareAndSet(false, true)) {
cancelled.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
s.cancel();
}

@Override
public Context currentContext() {
return actual.currentContext();
}

@Override
public int requestFusion(int requestedMode) {
int m;
if ((requestedMode & Fuseable.THREAD_BARRIER) != 0) {
return Fuseable.NONE;
} else {
m = s.requestFusion(requestedMode);
}
sourceMode = m;
return m;
}

@Override
@Nullable
public T poll() {
T v = s.poll();
if (v != null) {
next.increment();
return v;
}
return null;
}

@Override
public boolean isEmpty() {
return s.isEmpty();
}

@Override
public void clear() {
s.clear();
}

@Override
public int size() {
return s.size();
}
}
Loading

0 comments on commit 9c055c6

Please # to comment.