diff --git a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html index 6708d0e9ca..1bda843966 100644 --- a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html +++ b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html @@ -258,6 +258,14 @@

QuickFIX Settings

Y
N N + + PrioritizeResend + Prioritize resend responses over newer entries to ensure that the counterparty + first catches up before receiving (or being overwhelmed by) latter messages. + Y
N + N + + Validation diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index cd83b72206..88ff936519 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -229,6 +229,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf final boolean enableLastMsgSeqNumProcessed = getSetting(settings, sessionID, Session.SETTING_ENABLE_LAST_MSG_SEQ_NUM_PROCESSED, false); final int resendRequestChunkSize = getSetting(settings, sessionID, Session.SETTING_RESEND_REQUEST_CHUNK_SIZE, Session.DEFAULT_RESEND_RANGE_CHUNK_SIZE); final boolean allowPossDup = getSetting(settings, sessionID, Session.SETTING_ALLOW_POS_DUP_MESSAGES, false); + final boolean prioritizeResend = getSetting(settings, sessionID, Session.SETTING_PRIORITIZE_RESEND, false); final int[] logonIntervals = getLogonIntervalsInSeconds(settings, sessionID); final Set allowedRemoteAddresses = getInetAddresses(settings, sessionID); @@ -247,7 +248,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed, - validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup); + validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup, prioritizeResend); session.setLogonTimeout(logonTimeout); session.setLogoutTimeout(logoutTimeout); diff --git a/quickfixj-core/src/main/java/quickfix/Responder.java b/quickfixj-core/src/main/java/quickfix/Responder.java index 1005c39196..4dc5ff2e88 100644 --- a/quickfixj-core/src/main/java/quickfix/Responder.java +++ b/quickfixj-core/src/main/java/quickfix/Responder.java @@ -19,6 +19,10 @@ package quickfix; +import org.apache.mina.core.write.WriteRequestQueue; + +import java.util.List; + /** * Used by a Session to send raw FIX message data and to disconnect a * connection. This interface is used by Acceptor or Initiator implementations. @@ -37,6 +41,18 @@ public interface Responder { */ boolean send(String data); + /** + * Override to prioritize raw FIX {@code messages} over pending messages in the {@link WriteRequestQueue}. + * Typical use case is when sending response for resend request to ensure that the counterparty + * first catches up before receiving(or being overwhelmed by) latter messages. + * + * @param messages List of raw FIX messages to be prioritized in that order over all pending sends + * @return count of entries in {@code messages} list that were successfully scheduled + */ + default int prioritySend(List messages){ + throw new UnsupportedOperationException("Priority send not supported"); + } + /** * Disconnect the underlying connection. */ diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 7e52be07bf..f237e4fefd 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -62,6 +62,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Set; @@ -69,6 +70,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static quickfix.LogUtil.logThrowable; @@ -385,6 +387,12 @@ public class Session implements Closeable { */ public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup"; + /** + * Option to enable prioritization of resend responses allowing lagging counterparty app to catch up + * instead of being overwhelmed by new messages. + */ + public static final String SETTING_PRIORITIZE_RESEND = "PrioritizeResend"; + private static final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final Application application; @@ -436,6 +444,7 @@ public class Session implements Closeable { private boolean enableLastMsgSeqNumProcessed = false; private boolean validateChecksum = true; private boolean allowPosDup = false; + private boolean prioritizeResend = false; private int maxScheduledWriteRequests = 0; @@ -501,7 +510,7 @@ public class Session implements Closeable { messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup, false); } Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory, @@ -520,7 +529,7 @@ public class Session implements Closeable { boolean validateIncomingMessage, int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed, boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier, - boolean allowPossDup) { + boolean allowPossDup, boolean prioritizeResend) { this.application = application; this.sessionID = sessionID; this.sessionSchedule = sessionSchedule; @@ -556,6 +565,7 @@ public class Session implements Closeable { this.validateChecksum = validateChecksum; this.logonTags = logonTags; this.allowPosDup = allowPossDup; + this.prioritizeResend = prioritizeResend; final Log engineLog = (logFactory != null) ? logFactory.create(sessionID) : null; if (engineLog instanceof SessionStateListener) { @@ -2343,6 +2353,9 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN int current = beginSeqNo; boolean appMessageJustSent = false; + final List prioritizedResendAccumulator = prioritizeResend ? new ArrayList<>(messages.size()) : + Collections.emptyList(); + for (final String message : messages) { appMessageJustSent = false; final Message msg; @@ -2375,9 +2388,13 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN generateSequenceReset(receivedMessage, begin, msgSeqNum); } getLog().onEvent("Resending message: " + msgSeqNum); - send(msg.toString()); + if(prioritizeResend) { + prioritizedResendAccumulator.add(msg); + } else { + send(msg.toString()); + appMessageJustSent = true; + } begin = 0; - appMessageJustSent = true; } else { if (begin == 0) { begin = msgSeqNum; @@ -2387,6 +2404,12 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN current = msgSeqNum + 1; } + // schedule prioritized resend messages if any + if (!prioritizedResendAccumulator.isEmpty()) { + sendPrioritized(prioritizedResendAccumulator); + appMessageJustSent = true; + } + int newBegin = beginSeqNo; if (appMessageJustSent) { newBegin = msgSeqNum + 1; @@ -2761,6 +2784,20 @@ public boolean send(Message message, boolean allowPosDup) { return sendRaw(message, 0); } + private int sendPrioritized(List prioritizedMessages) { + Responder responder = getResponder(); + if(responder == null) { + getLog().onEvent("No responder, not sending message: " + prioritizedMessages.toString()); + return 0; + } + final List data = prioritizedMessages.stream().map(m -> { + String msg = m.toString(); + getLog().onOutgoing(msg); + return msg; + }).collect(Collectors.toList()); + return responder.prioritySend(data); + } + private boolean send(String messageString) { getLog().onOutgoing(messageString); Responder responder; diff --git a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java index 1c32dcb367..17e2994d72 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java +++ b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java @@ -21,6 +21,8 @@ import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.core.write.WriteRequestQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import quickfix.Responder; @@ -28,6 +30,8 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; /** * The class that partially integrates the QuickFIX/J Session to @@ -75,6 +79,40 @@ public boolean send(String data) { return true; } + @Override + public int prioritySend(List data){ + final WriteRequestQueue writeRequestQueue = ioSession.getWriteRequestQueue(); + final List pendingWrites = new ArrayList<>(writeRequestQueue.size()); + int successfulMessageCount = 0; + try { + ioSession.suspendWrite(); + // drain existing pending writes, to be rescheduled in the end + // a work around as WriteRequestQueue is currently not a Deque + WriteRequest pending; + while ((pending = writeRequestQueue.poll(ioSession)) != null) { + pendingWrites.add(pending); + } + for (String d : data) { + if (this.send(d)) { + successfulMessageCount++; + } else { + break; + } + } + } finally { + // reschedule de-prioritized over existing priority send to the end of the queue + try { + for (WriteRequest pendingWrite : pendingWrites) { + writeRequestQueue.offer(ioSession, pendingWrite); + } + } catch (Exception e) { + log.error("Failed to reschedule pending writes: {}", e.getMessage()); + } + ioSession.resumeWrite(); + } + return successfulMessageCount; + } + @Override public void disconnect() { // We cannot call join() on the CloseFuture returned diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index 35356fb9e5..7d0bd183b9 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -27,6 +27,16 @@ public static Session createSession(SessionID sessionID, Application application .build(); } + public static Session createSessionWithPersistedMessages(SessionID sessionID, Application application, + boolean isInitiator, boolean prioritizeResend) { + return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator) + .setCheckLatency(true).setMaxLatency(Session.DEFAULT_MAX_LATENCY) + .setCheckCompID(true) + .setPersistMessages(true) + .setPrioritizeResend(prioritizeResend) + .build(); + } + public static Session createFileStoreSession(SessionID sessionID, Application application, boolean isInitiator, SessionSettings settings, SessionSchedule sessionSchedule) { return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator) @@ -113,6 +123,7 @@ public static final class Builder { private final boolean enableLastMsgSeqNumProcessed = false; private final boolean validateChecksum = true; private final boolean allowPosDup = false; + private boolean prioritizeResend = false; private List logonTags = new ArrayList<>(); public Session build() { @@ -125,7 +136,7 @@ public Session build() { resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, - enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup); + enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup, prioritizeResend); } public Builder setBeginString(final String beginString) { @@ -239,5 +250,11 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum; return this; } + + public Builder setPrioritizeResend(final boolean prioritizeResend) { + this.prioritizeResend = prioritizeResend; + return this; + } + } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index 7fb7627e4e..1c257be02c 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -1,8 +1,10 @@ package quickfix; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import quickfix.field.ApplVerID; import quickfix.field.BeginSeqNo; import quickfix.field.BeginString; @@ -52,9 +54,11 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -62,9 +66,11 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -106,7 +112,7 @@ public void testDisposalOfFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false)) { // Simulate socket disconnect session.setResponder(null); } @@ -150,7 +156,7 @@ public void testNondisposableFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false)) { // Simulate socket disconnect session.setResponder(null); @@ -2113,7 +2119,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, false, false, false, true, false, true, false, null, true, - chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2162,7 +2168,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) } } - @Test +@Test public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_number_is_higher_than_the_last_message_resent() throws IOException, InvalidMessage, FieldNotFound, RejectLogon, UnsupportedMessageType, IncorrectTagValue, IncorrectDataFormat, NoSuchFieldException, IllegalAccessException { @@ -2175,7 +2181,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2223,7 +2229,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2272,7 +2278,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception { UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2308,7 +2314,7 @@ public void testTimestampPrecision() throws Exception { UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2360,7 +2366,7 @@ private void testLargeQueue(int N) throws Exception { new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2476,7 +2482,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, false); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2952,6 +2958,21 @@ private Session setUpSession(Application application, boolean isInitiator, final Session session = SessionFactoryTestSupport.createSession( sessionID, application, isInitiator); session.setResponder(responder); + assertSetupSessionAttributes(isInitiator, session); + return session; + } + + private Session setUpSessionForPrioritizedResend(Application application, boolean prioritizeResend, Responder responder) throws Exception{ + final SessionID sessionID = new SessionID( + FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + final Session session = SessionFactoryTestSupport.createSessionWithPersistedMessages( + sessionID, application, false, prioritizeResend); + session.setResponder(responder); + assertSetupSessionAttributes(false, session); + return session; + } + + private void assertSetupSessionAttributes(boolean isInitiator, Session session) throws NoSuchFieldException, IllegalAccessException { final SessionState state = getSessionState(session); assertEquals(isInitiator, state.isInitiator()); assertFalse(state.isLogonSent()); @@ -2962,7 +2983,6 @@ private Session setUpSession(Application application, boolean isInitiator, assertFalse(state.isLogoutSent()); assertFalse(state.isLogoutReceived()); assertFalse(state.isLogoutTimedOut()); - return session; } private Session setUpFileStoreSession(Application application, @@ -2976,16 +2996,7 @@ private Session setUpFileStoreSession(Application application, .createFileStoreSession(sessionID, application, isInitiator, settings, sessionSchedule); session.setResponder(responder); - final SessionState state = getSessionState(session); - assertEquals(isInitiator, state.isInitiator()); - assertFalse(state.isLogonSent()); - assertFalse(state.isLogonReceived()); - assertFalse(state.isLogonAlreadySent()); - assertEquals(isInitiator, state.isLogonSendNeeded()); - assertFalse(state.isLogonTimedOut()); - assertFalse(state.isLogoutSent()); - assertFalse(state.isLogoutReceived()); - assertFalse(state.isLogoutTimedOut()); + assertSetupSessionAttributes(isInitiator, session); return session; } @@ -3145,4 +3156,35 @@ public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupCon assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); } + + //QFJ-271 Prioritize resend if and only if configured + @Test + public void testPrioritizedResend() { + final UnitTestApplication application = new UnitTestApplication(); + // todo upgrade to jupiter @ParameterizedTest + for (boolean prioritizeResend : asList(true, false)) { + final Responder responder = Mockito.mock(Responder.class); + Mockito.when(responder.send(Mockito.any(String.class))).thenReturn(true); + try (Session session = setUpSessionForPrioritizedResend(application, prioritizeResend, responder)) { + SessionState state = getSessionState(session); + + assertEquals(1, state.getNextTargetMsgSeqNum()); + logonTo(session, 1); + processMessage(session, createAppMessage(2)); + session.send(createAppMessage(2)); + assertFalse(state.isResendRequested()); + + processMessage(session, createHeartbeatMessage(7)); + assertTrue(state.isResendRequested()); + processMessage(session, createResendRequest(3, 2)); + if (prioritizeResend) { + verify(responder, times(1)).prioritySend(anyList()); + } else { + verify(responder, never()).prioritySend(anyList()); + } + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + } + } } diff --git a/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java b/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java index 77d2ac6546..863e1573fd 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java @@ -20,16 +20,27 @@ package quickfix.mina; import java.net.InetSocketAddress; +import java.util.Arrays; + import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.core.write.WriteRequestQueue; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import org.junit.Test; - +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -136,4 +147,43 @@ public void testGetRemoteSocketAddress() throws Exception { verify(mockProtocolSession).getRemoteAddress(); verifyNoMoreInteractions(mockProtocolSession); } + + @Test + public void testPrioritySend() { + IoSession ioSession = mock(IoSession.class); + when(ioSession.getRemoteAddress()).thenReturn(new InetSocketAddress("1.2.3.4", 5432)); + WriteRequestQueue writeRequestQueue = mock(WriteRequestQueue.class); + WriteRequest pendingWriteOne = mock(WriteRequest.class); + WriteRequest pendingWriteTwo = mock(WriteRequest.class); + when(writeRequestQueue.poll(ioSession)).thenReturn(pendingWriteOne, pendingWriteTwo, null); + when(ioSession.getWriteRequestQueue()).thenReturn(writeRequestQueue); + IoSessionResponder responder = new IoSessionResponder(ioSession, false, 0, 0); + responder.prioritySend(Arrays.asList("resend1", "resend2")); + + InOrder inOrder = inOrder(ioSession, writeRequestQueue); + inOrder.verify(ioSession).suspendWrite(); + + // verify drain first + inOrder.verify(writeRequestQueue, times(3 /* 2 + null */)).poll(ioSession); + + // verify prioritized writes in order + inOrder.verify(ioSession).write(eq("resend1")); + inOrder.verify(ioSession).write(eq("resend2")); + + // reschedule pending writes + inOrder.verify(writeRequestQueue).offer(ioSession, pendingWriteOne); + inOrder.verify(writeRequestQueue).offer(ioSession, pendingWriteTwo); + + // ensure writes are resumed + inOrder.verify(ioSession).resumeWrite(); + + // ensure writes resume even if exception + Mockito.reset(ioSession); + when(ioSession.getWriteRequestQueue()).thenReturn(writeRequestQueue); + doAnswer(invocation -> { + throw new RuntimeException("Ensure resume writes even if exception"); + }).when(writeRequestQueue).poll(eq(ioSession)); + assertThrows(RuntimeException.class, () -> responder.prioritySend(Arrays.asList("will not be scheduled"))); + verify(ioSession).resumeWrite(); + } }