Skip to content

Commit

Permalink
2.x: Avoid using System.getProperties(), some cleanup, up RS 1.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 28, 2019
1 parent 13772a1 commit cc690ff
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 65 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
// ---------------------------------------

def junitVersion = "4.12"
def reactiveStreamsVersion = "1.0.2"
def reactiveStreamsVersion = "1.0.3"
def mockitoVersion = "2.1.0"
def jmhLibVersion = "1.20"
def testNgVersion = "6.11"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ static final class NextObserver implements CompletableObserver {

final CompletableObserver downstream;

public NextObserver(AtomicReference<Disposable> parent, CompletableObserver downstream) {
NextObserver(AtomicReference<Disposable> parent, CompletableObserver downstream) {
this.parent = parent;
this.downstream = downstream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public Publisher<T> source() {
}

/**
* The internal buffer size of this FloawblePublish operator.
* @return The internal buffer size of this FloawblePublish operator.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public Publisher<T> source() {
}

/**
* The internal buffer size of this FloawblePublishAlt operator.
* @return The internal buffer size of this FloawblePublishAlt operator.
*/
public int publishBufferSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
public interface FlowablePublishClassic<T> {

/**
* The upstream source of this publish operator.
* @return the upstream source of this publish operator
*/
Publisher<T> publishSource();

/**
* The internal buffer size of this publish operator.
* @return the internal buffer size of this publish operator
*/
int publishBufferSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static final class PublishConnection<T>
Throwable error;

@SuppressWarnings("unchecked")
public PublishConnection(AtomicReference<PublishConnection<T>> current) {
PublishConnection(AtomicReference<PublishConnection<T>> current) {
this.connect = new AtomicBoolean();
this.current = current;
this.upstream = new AtomicReference<Disposable>();
Expand Down Expand Up @@ -261,7 +261,7 @@ static final class InnerDisposable<T>

final Observer<? super T> downstream;

public InnerDisposable(Observer<? super T> downstream, PublishConnection<T> parent) {
InnerDisposable(Observer<? super T> downstream, PublishConnection<T> parent) {
this.downstream = downstream;
lazySet(parent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public interface ObservablePublishClassic<T> {

/**
* The upstream source of this publish operator.
* @return the upstream source of this publish operator
*/
ObservableSource<T> publishSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.functions.Function;

/**
* Manages the creating of ScheduledExecutorServices and sets up purging.
*/
Expand Down Expand Up @@ -90,40 +92,48 @@ public static void shutdown() {
}

static {
Properties properties = System.getProperties();

PurgeProperties pp = new PurgeProperties();
pp.load(properties);

PURGE_ENABLED = pp.purgeEnable;
PURGE_PERIOD_SECONDS = pp.purgePeriod;
SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor();
PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor);
PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor);

start();
}

static final class PurgeProperties {

boolean purgeEnable;

int purgePeriod;

void load(Properties properties) {
if (properties.containsKey(PURGE_ENABLED_KEY)) {
purgeEnable = Boolean.parseBoolean(properties.getProperty(PURGE_ENABLED_KEY));
} else {
purgeEnable = true;
static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function<String, String> propertyAccessor) {
if (enabled) {
try {
String value = propertyAccessor.apply(key);
if (value == null) {
return defaultNotFound;
}
return Integer.parseInt(value);
} catch (Throwable ex) {
return defaultNotFound;
}
}
return defaultNotEnabled;
}

if (purgeEnable && properties.containsKey(PURGE_PERIOD_SECONDS_KEY)) {
try {
purgePeriod = Integer.parseInt(properties.getProperty(PURGE_PERIOD_SECONDS_KEY));
} catch (NumberFormatException ex) {
purgePeriod = 1;
static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function<String, String> propertyAccessor) {
if (enabled) {
try {
String value = propertyAccessor.apply(key);
if (value == null) {
return defaultNotFound;
}
} else {
purgePeriod = 1;
return "true".equals(value);
} catch (Throwable ex) {
return defaultNotFound;
}
}
return defaultNotEnabled;
}

static final class SystemPropertyAccessor implements Function<String, String> {
@Override
public String apply(String t) throws Exception {
return System.getProperty(t);
}
}

/**
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/reactivex/observers/BaseTestConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@ public final U assertValues(T... values) {
* @return this
* @since 2.2
*/
@SuppressWarnings("unchecked")
public final U assertValuesOnly(T... values) {
return assertSubscribed()
.assertValues(values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@

import static org.junit.Assert.*;

import java.util.Properties;

import org.junit.Test;

import io.reactivex.TestHelper;
import io.reactivex.internal.schedulers.SchedulerPoolFactory.PurgeProperties;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.schedulers.Schedulers;

public class SchedulerPoolFactoryTest {
Expand Down Expand Up @@ -78,53 +77,66 @@ public void run() {
}

@Test
public void loadPurgeProperties() {
Properties props1 = new Properties();

PurgeProperties pp = new PurgeProperties();
pp.load(props1);

assertTrue(pp.purgeEnable);
assertEquals(pp.purgePeriod, 1);
public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor));
assertFalse(SchedulerPoolFactory.getBooleanProperty(false, "key", true, false, failingPropertiesAccessor));
}

@Test
public void loadPurgePropertiesDisabled() {
Properties props1 = new Properties();
props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "false");
public void boolPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "key", true, false, missingPropertiesAccessor));
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "key", false, true, missingPropertiesAccessor));
}

PurgeProperties pp = new PurgeProperties();
pp.load(props1);
@Test
public void boolPropertiesFailureReturnsDefaultMissing() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "key", true, false, failingPropertiesAccessor));
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "key", false, true, failingPropertiesAccessor));
}

assertFalse(pp.purgeEnable);
assertEquals(pp.purgePeriod, 1);
@Test
public void boolPropertiesReturnsValue() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(true, "true", true, false, Functions.<String>identity()));
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.<String>identity()));
}

@Test
public void loadPurgePropertiesEnabledCustomPeriod() {
Properties props1 = new Properties();
props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "true");
props1.setProperty(SchedulerPoolFactory.PURGE_PERIOD_SECONDS_KEY, "2");
public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor));
}

PurgeProperties pp = new PurgeProperties();
pp.load(props1);
@Test
public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor));
}

assertTrue(pp.purgeEnable);
assertEquals(pp.purgePeriod, 2);
@Test
public void intPropertiesFailureReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor));
}

@Test
public void loadPurgePropertiesEnabledCustomPeriodNaN() {
Properties props1 = new Properties();
props1.setProperty(SchedulerPoolFactory.PURGE_ENABLED_KEY, "true");
props1.setProperty(SchedulerPoolFactory.PURGE_PERIOD_SECONDS_KEY, "abc");
public void intPropertiesReturnsValue() throws Throwable {
assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.<String>identity()));
assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.<String>identity()));
}

PurgeProperties pp = new PurgeProperties();
pp.load(props1);
static final Function<String, String> failingPropertiesAccessor = new Function<String, String>() {
@Override
public String apply(String v) throws Exception {
throw new SecurityException();
}
};

assertTrue(pp.purgeEnable);
assertEquals(pp.purgePeriod, 1);
}
static final Function<String, String> missingPropertiesAccessor = new Function<String, String>() {
@Override
public String apply(String v) throws Exception {
return null;
}
};

@Test
public void putIntoPoolNoPurge() {
Expand Down

0 comments on commit cc690ff

Please # to comment.