Skip to content

Commit 1104c09

Browse files
Adding onDropped callback to throttleLast as a part of #7458 (#7488)
1 parent 80f7caa commit 1104c09

File tree

7 files changed

+359
-28
lines changed

7 files changed

+359
-28
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+85-2
Original file line numberDiff line numberDiff line change
@@ -14672,7 +14672,7 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emi
1467214672
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1467314673
Objects.requireNonNull(unit, "unit is null");
1467414674
Objects.requireNonNull(scheduler, "scheduler is null");
14675-
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
14675+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null));
1467614676
}
1467714677

1467814678
/**
@@ -14713,7 +14713,51 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
1471314713
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
1471414714
Objects.requireNonNull(unit, "unit is null");
1471514715
Objects.requireNonNull(scheduler, "scheduler is null");
14716-
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
14716+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
14717+
}
14718+
14719+
/**
14720+
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14721+
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
14722+
* and optionally emit the very last upstream item when the upstream completes.
14723+
* <p>
14724+
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
14725+
* <dl>
14726+
* <dt><b>Backpressure:</b></dt>
14727+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14728+
* <dt><b>Scheduler:</b></dt>
14729+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14730+
* </dl>
14731+
*
14732+
* <p>History: 2.0.5 - experimental
14733+
* @param period
14734+
* the sampling rate
14735+
* @param unit
14736+
* the {@link TimeUnit} in which {@code period} is defined
14737+
* @param scheduler
14738+
* the {@code Scheduler} to use when sampling
14739+
* @param emitLast
14740+
* if {@code true} and the upstream completes while there is still an unsampled item available,
14741+
* that item is emitted to downstream before completion
14742+
* if {@code false}, an unsampled last item is ignored.
14743+
* @param onDropped
14744+
* called with the current entry when it has been replaced by a new one
14745+
* @return the new {@code Flowable} instance
14746+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
14747+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14748+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14749+
* @see #throttleLast(long, TimeUnit, Scheduler)
14750+
* @since 2.1
14751+
*/
14752+
@CheckReturnValue
14753+
@NonNull
14754+
@BackpressureSupport(BackpressureKind.ERROR)
14755+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14756+
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<T> onDropped) {
14757+
Objects.requireNonNull(unit, "unit is null");
14758+
Objects.requireNonNull(scheduler, "scheduler is null");
14759+
Objects.requireNonNull(onDropped, "onDropped is null");
14760+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
1471714761
}
1471814762

1471914763
/**
@@ -17211,6 +17255,45 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
1721117255
return sample(intervalDuration, unit, scheduler);
1721217256
}
1721317257

17258+
/**
17259+
* Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
17260+
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
17261+
* <p>
17262+
* This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
17263+
* {@code throttleFirst} does not tick, it just tracks the passage of time.
17264+
* <p>
17265+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
17266+
* <dl>
17267+
* <dt><b>Backpressure:</b></dt>
17268+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17269+
* <dt><b>Scheduler:</b></dt>
17270+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
17271+
* </dl>
17272+
*
17273+
* @param intervalDuration
17274+
* duration of windows within which the last item emitted by the current {@code Flowable} will be
17275+
* emitted
17276+
* @param unit
17277+
* the unit of time of {@code intervalDuration}
17278+
* @param scheduler
17279+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
17280+
* event
17281+
* @param onDropped
17282+
* called with the current entry when it has been replaced by a new one
17283+
* @return the new {@code Flowable} instance
17284+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
17285+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17286+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17287+
* @see #sample(long, TimeUnit, Scheduler)
17288+
*/
17289+
@CheckReturnValue
17290+
@BackpressureSupport(BackpressureKind.ERROR)
17291+
@SchedulerSupport(SchedulerSupport.CUSTOM)
17292+
@NonNull
17293+
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
17294+
return sample(intervalDuration, unit, scheduler, false, onDropped);
17295+
}
17296+
1721417297
/**
1721517298
* Throttles items from the upstream {@code Flowable} by first emitting the next
1721617299
* item from upstream, then periodically emitting the latest item (if any) when

src/main/java/io/reactivex/rxjava3/core/Observable.java

+70-2
Original file line numberDiff line numberDiff line change
@@ -12128,7 +12128,40 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, boolean e
1212812128
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1212912129
Objects.requireNonNull(unit, "unit is null");
1213012130
Objects.requireNonNull(scheduler, "scheduler is null");
12131-
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false));
12131+
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, null));
12132+
}
12133+
12134+
/**
12135+
* Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable}
12136+
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
12137+
* <p>
12138+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.v3.png" alt="">
12139+
* <dl>
12140+
* <dt><b>Scheduler:</b></dt>
12141+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
12142+
* </dl>
12143+
*
12144+
* @param period
12145+
* the sampling rate
12146+
* @param unit
12147+
* the {@link TimeUnit} in which {@code period} is defined
12148+
* @param scheduler
12149+
* the {@code Scheduler} to use when sampling
12150+
* @param onDropped
12151+
* called with the current entry when it has been replaced by a new one
12152+
* @return the new {@code Observable} instance
12153+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
12154+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
12155+
* @see #throttleLast(long, TimeUnit, Scheduler)
12156+
*/
12157+
@CheckReturnValue
12158+
@SchedulerSupport(SchedulerSupport.CUSTOM)
12159+
@NonNull
12160+
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
12161+
Objects.requireNonNull(unit, "unit is null");
12162+
Objects.requireNonNull(scheduler, "scheduler is null");
12163+
Objects.requireNonNull(onDropped, "onDropped is null");
12164+
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, onDropped));
1213212165
}
1213312166

1213412167
/**
@@ -12165,7 +12198,7 @@ public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull
1216512198
public final Observable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
1216612199
Objects.requireNonNull(unit, "unit is null");
1216712200
Objects.requireNonNull(scheduler, "scheduler is null");
12168-
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast));
12201+
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
1216912202
}
1217012203

1217112204
/**
@@ -14233,6 +14266,41 @@ public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit
1423314266
return sample(intervalDuration, unit);
1423414267
}
1423514268

14269+
/**
14270+
* Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
14271+
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
14272+
* <p>
14273+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas
14274+
* {@code throttleFirst} does not tick, it just tracks passage of time.
14275+
* <p>
14276+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
14277+
* <dl>
14278+
* <dt><b>Scheduler:</b></dt>
14279+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14280+
* </dl>
14281+
*
14282+
* @param intervalDuration
14283+
* duration of windows within which the last item emitted by the current {@code Observable} will be
14284+
* emitted
14285+
* @param unit
14286+
* the unit of time of {@code intervalDuration}
14287+
* @param scheduler
14288+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
14289+
* event
14290+
* @param onDropped
14291+
* called with the current entry when it has been replaced by a new one
14292+
* @return the new {@code Observable} instance
14293+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
14294+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14295+
* @see #sample(long, TimeUnit, Scheduler)
14296+
*/
14297+
@CheckReturnValue
14298+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14299+
@NonNull
14300+
public final Observable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
14301+
return sample(intervalDuration, unit, scheduler, onDropped);
14302+
}
14303+
1423614304
/**
1423714305
* Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
1423814306
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java

+25-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.atomic.*;
1818

19+
import io.reactivex.rxjava3.exceptions.Exceptions;
20+
import io.reactivex.rxjava3.functions.Consumer;
1921
import org.reactivestreams.*;
2022

2123
import io.reactivex.rxjava3.core.*;
@@ -29,24 +31,25 @@ public final class FlowableSampleTimed<T> extends AbstractFlowableWithUpstream<T
2931
final long period;
3032
final TimeUnit unit;
3133
final Scheduler scheduler;
32-
3334
final boolean emitLast;
35+
final Consumer<T> onDropped;
3436

35-
public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
37+
public FlowableSampleTimed(Flowable<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer<T> onDropped) {
3638
super(source);
3739
this.period = period;
3840
this.unit = unit;
3941
this.scheduler = scheduler;
4042
this.emitLast = emitLast;
43+
this.onDropped = onDropped;
4144
}
4245

4346
@Override
4447
protected void subscribeActual(Subscriber<? super T> s) {
4548
SerializedSubscriber<T> serial = new SerializedSubscriber<>(s);
4649
if (emitLast) {
47-
source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler));
50+
source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped));
4851
} else {
49-
source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler));
52+
source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped));
5053
}
5154
}
5255

@@ -58,18 +61,20 @@ abstract static class SampleTimedSubscriber<T> extends AtomicReference<T> implem
5861
final long period;
5962
final TimeUnit unit;
6063
final Scheduler scheduler;
64+
final Consumer<T> onDropped;
6165

6266
final AtomicLong requested = new AtomicLong();
6367

6468
final SequentialDisposable timer = new SequentialDisposable();
6569

6670
Subscription upstream;
6771

68-
SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
72+
SampleTimedSubscriber(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
6973
this.downstream = actual;
7074
this.period = period;
7175
this.unit = unit;
7276
this.scheduler = scheduler;
77+
this.onDropped = onDropped;
7378
}
7479

7580
@Override
@@ -84,7 +89,17 @@ public void onSubscribe(Subscription s) {
8489

8590
@Override
8691
public void onNext(T t) {
87-
lazySet(t);
92+
T oldValue = getAndSet(t);
93+
if (oldValue != null && onDropped != null) {
94+
try {
95+
onDropped.accept(oldValue);
96+
} catch (Throwable throwable) {
97+
Exceptions.throwIfFatal(throwable);
98+
cancelTimer();
99+
upstream.cancel();
100+
downstream.onError(throwable);
101+
}
102+
}
88103
}
89104

90105
@Override
@@ -137,8 +152,8 @@ static final class SampleTimedNoLast<T> extends SampleTimedSubscriber<T> {
137152

138153
private static final long serialVersionUID = -7139995637533111443L;
139154

140-
SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
141-
super(actual, period, unit, scheduler);
155+
SampleTimedNoLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
156+
super(actual, period, unit, scheduler, onDropped);
142157
}
143158

144159
@Override
@@ -158,8 +173,8 @@ static final class SampleTimedEmitLast<T> extends SampleTimedSubscriber<T> {
158173

159174
final AtomicInteger wip;
160175

161-
SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
162-
super(actual, period, unit, scheduler);
176+
SampleTimedEmitLast(Subscriber<? super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
177+
super(actual, period, unit, scheduler, onDropped);
163178
this.wip = new AtomicInteger(1);
164179
}
165180

0 commit comments

Comments
 (0)