From 02b3f1f3146cdc849f190725f370929a9e2e9a42 Mon Sep 17 00:00:00 2001 From: 1996fanrui <1996fanrui@gmail.com> Date: Thu, 22 Jun 2023 23:12:19 +0800 Subject: [PATCH] [hotfix] Migrate SourceOperatorAlignmentTest to AssertJ --- .../SourceOperatorAlignmentTest.java | 71 +++++++++---------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index c08221807c2a3..bd51d27be726f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -46,22 +46,17 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link SourceOperator} watermark alignment. */ @SuppressWarnings("serial") -public class SourceOperatorAlignmentTest { +class SourceOperatorAlignmentTest { @Nullable private SourceOperatorTestContext context; @Nullable private SourceOperator operator; @BeforeEach - public void setup() throws Exception { + void setup() throws Exception { context = new SourceOperatorTestContext( false, @@ -73,14 +68,14 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() throws Exception { + void tearDown() throws Exception { context.close(); context = null; operator = null; } @Test - public void testWatermarkAlignment() throws Exception { + void testWatermarkAlignment() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); MockSourceSplit newSplit = new MockSourceSplit(2); @@ -98,39 +93,39 @@ public void testWatermarkAlignment() throws Exception { CollectingDataOutput actualOutput = new CollectingDataOutput<>(); List expectedOutput = new ArrayList<>(); - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE)); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record1); context.getTimeService().advance(1); assertLatestReportedWatermarkEvent(record1); assertOutput(actualOutput, expectedOutput); - assertTrue(operator.isAvailable()); + assertThat(operator.isAvailable()).isTrue(); operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 - 1)); - assertFalse(operator.isAvailable()); - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE)); + assertThat(operator.isAvailable()).isFalse(); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); assertLatestReportedWatermarkEvent(record1); assertOutput(actualOutput, expectedOutput); - assertFalse(operator.isAvailable()); + assertThat(operator.isAvailable()).isFalse(); operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 1)); - assertTrue(operator.isAvailable()); + assertThat(operator.isAvailable()).isTrue(); operator.emitNext(actualOutput); // Try to poll a record second time. Technically speaking previous emitNext call could have // already switch the operator status to unavailable, but that's an implementation detail. // However, this second call can not emit anything and should after that second call // operator must be unavailable. - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE)); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); expectedOutput.add(record2); context.getTimeService().advance(1); assertLatestReportedWatermarkEvent(record2); assertOutput(actualOutput, expectedOutput); - assertFalse(operator.isAvailable()); + assertThat(operator.isAvailable()).isFalse(); } @Test - public void testWatermarkAlignmentWithIdleness() throws Exception { + void testWatermarkAlignmentWithIdleness() throws Exception { // we use a separate context, because we need to enable idleness try (SourceOperatorTestContext context = new SourceOperatorTestContext( @@ -156,17 +151,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception { CollectingDataOutput actualOutput = new CollectingDataOutput<>(); List expectedOutput = new ArrayList<>(); - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE)); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record1); context.getTimeService().advance(1); assertLatestReportedWatermarkEvent(context, record1); // mock WatermarkAlignmentEvent from SourceCoordinator operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100)); assertOutput(actualOutput, expectedOutput); - assertTrue(operator.isAvailable()); + assertThat(operator.isAvailable()).isTrue(); // source becomes idle, it should report Long.MAX_VALUE as the watermark - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE)); + assertThat(operator.emitNext(actualOutput)) + .isEqualTo(DataInputStatus.NOTHING_AVAILABLE); context.getTimeService().advance(1); assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE); // If all source subtasks of the watermark group are idle, @@ -184,7 +180,7 @@ public void testWatermarkAlignmentWithIdleness() throws Exception { new AddSplitEvent<>( Collections.singletonList(newSplit), new MockSourceSplitSerializer())); - assertThat(operator.emitNext(actualOutput), is(DataInputStatus.MORE_AVAILABLE)); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record2); context.getTimeService().advance(1); // becomes active again, should go back to the previously emitted @@ -192,7 +188,7 @@ public void testWatermarkAlignmentWithIdleness() throws Exception { assertLatestReportedWatermarkEvent(context, record1); operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100)); assertOutput(actualOutput, expectedOutput); - assertTrue(operator.isAvailable()); + assertThat(operator.isAvailable()).isTrue(); } } @@ -230,18 +226,18 @@ int record = 10; } @Test - public void testStopWhileWaitingForWatermarkAlignment() throws Exception { + void testStopWhileWaitingForWatermarkAlignment() throws Exception { testWatermarkAlignment(); CompletableFuture availableFuture = operator.getAvailableFuture(); - assertFalse(availableFuture.isDone()); + assertThat(availableFuture).isNotDone(); operator.stop(StopMode.NO_DRAIN); - assertTrue(availableFuture.isDone()); - assertTrue(operator.isAvailable()); + assertThat(availableFuture).isDone(); + assertThat(operator.isAvailable()).isTrue(); } @Test - public void testReportedWatermarkDoNotDecrease() throws Exception { + void testReportedWatermarkDoNotDecrease() throws Exception { operator.initializeState(context.createStateContext()); operator.open(); MockSourceSplit split1 = new MockSourceSplit(2); @@ -273,12 +269,12 @@ public void testReportedWatermarkDoNotDecrease() throws Exception { private void assertOutput( CollectingDataOutput actualOutput, List expectedOutput) { assertThat( - actualOutput.getEvents().stream() - .filter(o -> o instanceof StreamRecord) - .mapToInt(object -> ((StreamRecord) object).getValue()) - .boxed() - .collect(Collectors.toList()), - contains(expectedOutput.toArray())); + actualOutput.getEvents().stream() + .filter(o -> o instanceof StreamRecord) + .mapToInt(object -> ((StreamRecord) object).getValue()) + .boxed() + .collect(Collectors.toList())) + .containsExactly(expectedOutput.toArray(new Integer[0])); } private void assertLatestReportedWatermarkEvent(long expectedWatermark) { @@ -292,8 +288,9 @@ private void assertLatestReportedWatermarkEvent( .filter(event -> event instanceof ReportedWatermarkEvent) .collect(Collectors.toList()); - assertFalse(events.isEmpty()); - assertEquals(new ReportedWatermarkEvent(expectedWatermark), events.get(events.size() - 1)); + assertThat(events).isNotEmpty(); + assertThat(events.get(events.size() - 1)) + .isEqualTo(new ReportedWatermarkEvent(expectedWatermark)); } private void assertNoReportedWatermarkEvent(SourceOperatorTestContext context) {