Skip to content

Commit

Permalink
Merge pull request #78 from lyft/kbilton/fix_upstream_test
Browse files Browse the repository at this point in the history
[hotfix] Migrate SourceOperatorAlignmentTest to AssertJ
  • Loading branch information
kjbilton authored Feb 10, 2025
2 parents 7af6700 + 02b3f1f commit 4bd608c
Showing 1 changed file with 34 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,17 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit test for {@link SourceOperator} watermark alignment. */
@SuppressWarnings("serial")
public class SourceOperatorAlignmentTest {
class SourceOperatorAlignmentTest {

@Nullable private SourceOperatorTestContext context;
@Nullable private SourceOperator<Integer, MockSourceSplit> operator;

@BeforeEach
public void setup() throws Exception {
void setup() throws Exception {
context =
new SourceOperatorTestContext(
false,
Expand All @@ -73,14 +68,14 @@ public void setup() throws Exception {
}

@AfterEach
public void tearDown() throws Exception {
void tearDown() throws Exception {
context.close();
context = null;
operator = null;
}

@Test
public void testWatermarkAlignment() throws Exception {
void testWatermarkAlignment() throws Exception {
operator.initializeState(context.createStateContext());
operator.open();
MockSourceSplit newSplit = new MockSourceSplit(2);
Expand All @@ -98,39 +93,39 @@ public void testWatermarkAlignment() throws Exception {
CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
List<Integer> expectedOutput = new ArrayList<>();

assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE));
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
expectedOutput.add(record1);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(record1);
assertOutput(actualOutput, expectedOutput);
assertTrue(operator.isAvailable());
assertThat(operator.isAvailable()).isTrue();

operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 - 1));

assertFalse(operator.isAvailable());
assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
assertThat(operator.isAvailable()).isFalse();
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
assertLatestReportedWatermarkEvent(record1);
assertOutput(actualOutput, expectedOutput);
assertFalse(operator.isAvailable());
assertThat(operator.isAvailable()).isFalse();

operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 1));

assertTrue(operator.isAvailable());
assertThat(operator.isAvailable()).isTrue();
operator.emitNext(actualOutput);
// Try to poll a record second time. Technically speaking previous emitNext call could have
// already switch the operator status to unavailable, but that's an implementation detail.
// However, this second call can not emit anything and should after that second call
// operator must be unavailable.
assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
expectedOutput.add(record2);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(record2);
assertOutput(actualOutput, expectedOutput);
assertFalse(operator.isAvailable());
assertThat(operator.isAvailable()).isFalse();
}

@Test
public void testWatermarkAlignmentWithIdleness() throws Exception {
void testWatermarkAlignmentWithIdleness() throws Exception {
// we use a separate context, because we need to enable idleness
try (SourceOperatorTestContext context =
new SourceOperatorTestContext(
Expand All @@ -156,17 +151,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
List<Integer> expectedOutput = new ArrayList<>();

assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE));
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
expectedOutput.add(record1);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(context, record1);
// mock WatermarkAlignmentEvent from SourceCoordinator
operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
assertOutput(actualOutput, expectedOutput);
assertTrue(operator.isAvailable());
assertThat(operator.isAvailable()).isTrue();

// source becomes idle, it should report Long.MAX_VALUE as the watermark
assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
assertThat(operator.emitNext(actualOutput))
.isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
// If all source subtasks of the watermark group are idle,
Expand All @@ -184,15 +180,15 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
new AddSplitEvent<>(
Collections.singletonList(newSplit), new MockSourceSplitSerializer()));

assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE));
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
expectedOutput.add(record2);
context.getTimeService().advance(1);
// becomes active again, should go back to the previously emitted
// watermark, as the record2 does not emit watermarks
assertLatestReportedWatermarkEvent(context, record1);
operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
assertOutput(actualOutput, expectedOutput);
assertTrue(operator.isAvailable());
assertThat(operator.isAvailable()).isTrue();
}
}

Expand Down Expand Up @@ -230,18 +226,18 @@ int record = 10;
}

@Test
public void testStopWhileWaitingForWatermarkAlignment() throws Exception {
void testStopWhileWaitingForWatermarkAlignment() throws Exception {
testWatermarkAlignment();

CompletableFuture<?> availableFuture = operator.getAvailableFuture();
assertFalse(availableFuture.isDone());
assertThat(availableFuture).isNotDone();
operator.stop(StopMode.NO_DRAIN);
assertTrue(availableFuture.isDone());
assertTrue(operator.isAvailable());
assertThat(availableFuture).isDone();
assertThat(operator.isAvailable()).isTrue();
}

@Test
public void testReportedWatermarkDoNotDecrease() throws Exception {
void testReportedWatermarkDoNotDecrease() throws Exception {
operator.initializeState(context.createStateContext());
operator.open();
MockSourceSplit split1 = new MockSourceSplit(2);
Expand Down Expand Up @@ -273,12 +269,12 @@ public void testReportedWatermarkDoNotDecrease() throws Exception {
private void assertOutput(
CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) {
assertThat(
actualOutput.getEvents().stream()
.filter(o -> o instanceof StreamRecord)
.mapToInt(object -> ((StreamRecord<Integer>) object).getValue())
.boxed()
.collect(Collectors.toList()),
contains(expectedOutput.toArray()));
actualOutput.getEvents().stream()
.filter(o -> o instanceof StreamRecord)
.mapToInt(object -> ((StreamRecord<Integer>) object).getValue())
.boxed()
.collect(Collectors.toList()))
.containsExactly(expectedOutput.toArray(new Integer[0]));
}

private void assertLatestReportedWatermarkEvent(long expectedWatermark) {
Expand All @@ -292,8 +288,9 @@ private void assertLatestReportedWatermarkEvent(
.filter(event -> event instanceof ReportedWatermarkEvent)
.collect(Collectors.toList());

assertFalse(events.isEmpty());
assertEquals(new ReportedWatermarkEvent(expectedWatermark), events.get(events.size() - 1));
assertThat(events).isNotEmpty();
assertThat(events.get(events.size() - 1))
.isEqualTo(new ReportedWatermarkEvent(expectedWatermark));
}

private void assertNoReportedWatermarkEvent(SourceOperatorTestContext context) {
Expand Down

0 comments on commit 4bd608c

Please # to comment.