Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Prioritize Resend, if configured. #688

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,14 @@ <H3>QuickFIX Settings</H3>
<TD> Y<br/>N</TD>
<TD>N</TD>
</TR>
<TR ALIGN="left" VALIGN="middle">
<TD><I>PrioritizeResend</I></TD>
<TD>Prioritize resend responses over newer entries to ensure that the counterparty
first catches up before receiving (or being overwhelmed by) latter messages.</TD>
<TD>Y<br/>N</TD>
<TD>N</TD>
</TR>


<TR ALIGN="center" VALIGN="middle">
<TD COLSPAN="4" class="subsection"><A NAME="Validation">Validation</A></TD>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
final boolean enableLastMsgSeqNumProcessed = getSetting(settings, sessionID, Session.SETTING_ENABLE_LAST_MSG_SEQ_NUM_PROCESSED, false);
final int resendRequestChunkSize = getSetting(settings, sessionID, Session.SETTING_RESEND_REQUEST_CHUNK_SIZE, Session.DEFAULT_RESEND_RANGE_CHUNK_SIZE);
final boolean allowPossDup = getSetting(settings, sessionID, Session.SETTING_ALLOW_POS_DUP_MESSAGES, false);
final boolean prioritizeResend = getSetting(settings, sessionID, Session.SETTING_PRIORITIZE_RESEND, false);

final int[] logonIntervals = getLogonIntervalsInSeconds(settings, sessionID);
final Set<InetAddress> allowedRemoteAddresses = getInetAddresses(settings, sessionID);
Expand All @@ -247,7 +248,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime,
forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage,
resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed,
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup);
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup, prioritizeResend);

session.setLogonTimeout(logonTimeout);
session.setLogoutTimeout(logoutTimeout);
Expand Down
16 changes: 16 additions & 0 deletions quickfixj-core/src/main/java/quickfix/Responder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package quickfix;

import org.apache.mina.core.write.WriteRequestQueue;

import java.util.List;

/**
* Used by a Session to send raw FIX message data and to disconnect a
* connection. This interface is used by Acceptor or Initiator implementations.
Expand All @@ -37,6 +41,18 @@ public interface Responder {
*/
boolean send(String data);

/**
* Override to prioritize raw FIX {@code messages} over pending messages in the {@link WriteRequestQueue}.
* Typical use case is when sending response for resend request to ensure that the counterparty
* first catches up before receiving(or being overwhelmed by) latter messages.
*
* @param messages List of raw FIX messages to be prioritized in that order over all pending sends
* @return count of entries in {@code messages} list that were successfully scheduled
*/
default int prioritySend(List<String> messages){
throw new UnsupportedOperationException("Priority send not supported");
}

/**
* Disconnect the underlying connection.
*/
Expand Down
45 changes: 41 additions & 4 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static quickfix.LogUtil.logThrowable;

Expand Down Expand Up @@ -385,6 +387,12 @@ public class Session implements Closeable {
*/
public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup";

/**
* Option to enable prioritization of resend responses allowing lagging counterparty app to catch up
* instead of being overwhelmed by new messages.
*/
public static final String SETTING_PRIORITIZE_RESEND = "PrioritizeResend";

private static final ConcurrentMap<SessionID, Session> sessions = new ConcurrentHashMap<>();

private final Application application;
Expand Down Expand Up @@ -436,6 +444,7 @@ public class Session implements Closeable {
private boolean enableLastMsgSeqNumProcessed = false;
private boolean validateChecksum = true;
private boolean allowPosDup = false;
private boolean prioritizeResend = false;

private int maxScheduledWriteRequests = 0;

Expand Down Expand Up @@ -501,7 +510,7 @@ public class Session implements Closeable {
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup);
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup, false);
}

Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory,
Expand All @@ -520,7 +529,7 @@ public class Session implements Closeable {
boolean validateIncomingMessage, int resendRequestChunkSize,
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
boolean allowPossDup) {
boolean allowPossDup, boolean prioritizeResend) {
this.application = application;
this.sessionID = sessionID;
this.sessionSchedule = sessionSchedule;
Expand Down Expand Up @@ -556,6 +565,7 @@ public class Session implements Closeable {
this.validateChecksum = validateChecksum;
this.logonTags = logonTags;
this.allowPosDup = allowPossDup;
this.prioritizeResend = prioritizeResend;

final Log engineLog = (logFactory != null) ? logFactory.create(sessionID) : null;
if (engineLog instanceof SessionStateListener) {
Expand Down Expand Up @@ -2343,6 +2353,9 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
int current = beginSeqNo;
boolean appMessageJustSent = false;

final List<Message> prioritizedResendAccumulator = prioritizeResend ? new ArrayList<>(messages.size()) :
Collections.emptyList();

for (final String message : messages) {
appMessageJustSent = false;
final Message msg;
Expand Down Expand Up @@ -2375,9 +2388,13 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
send(msg.toString());
if(prioritizeResend) {
prioritizedResendAccumulator.add(msg);
} else {
send(msg.toString());
appMessageJustSent = true;
}
begin = 0;
appMessageJustSent = true;
} else {
if (begin == 0) {
begin = msgSeqNum;
Expand All @@ -2387,6 +2404,12 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
current = msgSeqNum + 1;
}

// schedule prioritized resend messages if any
if (!prioritizedResendAccumulator.isEmpty()) {
sendPrioritized(prioritizedResendAccumulator);
appMessageJustSent = true;
}

int newBegin = beginSeqNo;
if (appMessageJustSent) {
newBegin = msgSeqNum + 1;
Expand Down Expand Up @@ -2761,6 +2784,20 @@ public boolean send(Message message, boolean allowPosDup) {
return sendRaw(message, 0);
}

private int sendPrioritized(List<Message> prioritizedMessages) {
Responder responder = getResponder();
if(responder == null) {
getLog().onEvent("No responder, not sending message: " + prioritizedMessages.toString());
return 0;
}
final List<String> data = prioritizedMessages.stream().map(m -> {
String msg = m.toString();
getLog().onOutgoing(msg);
return msg;
}).collect(Collectors.toList());
return responder.prioritySend(data);
}

private boolean send(String messageString) {
getLog().onOutgoing(messageString);
Responder responder;
Expand Down
38 changes: 38 additions & 0 deletions quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@

import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.core.write.WriteRequestQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.Responder;
import quickfix.Session;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
* The class that partially integrates the QuickFIX/J Session to
Expand Down Expand Up @@ -75,6 +79,40 @@ public boolean send(String data) {
return true;
}

@Override
public int prioritySend(List<String> data){
final WriteRequestQueue writeRequestQueue = ioSession.getWriteRequestQueue();
final List<WriteRequest> pendingWrites = new ArrayList<>(writeRequestQueue.size());
int successfulMessageCount = 0;
try {
ioSession.suspendWrite();
// drain existing pending writes, to be rescheduled in the end
// a work around as WriteRequestQueue is currently not a Deque
WriteRequest pending;
while ((pending = writeRequestQueue.poll(ioSession)) != null) {
pendingWrites.add(pending);
}
for (String d : data) {
if (this.send(d)) {
successfulMessageCount++;
} else {
break;
}
}
} finally {
// reschedule de-prioritized over existing priority send to the end of the queue
try {
for (WriteRequest pendingWrite : pendingWrites) {
writeRequestQueue.offer(ioSession, pendingWrite);
}
} catch (Exception e) {
log.error("Failed to reschedule pending writes: {}", e.getMessage());
}
ioSession.resumeWrite();
}
return successfulMessageCount;
}

@Override
public void disconnect() {
// We cannot call join() on the CloseFuture returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public static Session createSession(SessionID sessionID, Application application
.build();
}

public static Session createSessionWithPersistedMessages(SessionID sessionID, Application application,
boolean isInitiator, boolean prioritizeResend) {
return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator)
.setCheckLatency(true).setMaxLatency(Session.DEFAULT_MAX_LATENCY)
.setCheckCompID(true)
.setPersistMessages(true)
.setPrioritizeResend(prioritizeResend)
.build();
}

public static Session createFileStoreSession(SessionID sessionID, Application application,
boolean isInitiator, SessionSettings settings, SessionSchedule sessionSchedule) {
return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator)
Expand Down Expand Up @@ -113,6 +123,7 @@ public static final class Builder {
private final boolean enableLastMsgSeqNumProcessed = false;
private final boolean validateChecksum = true;
private final boolean allowPosDup = false;
private boolean prioritizeResend = false;
private List<StringField> logonTags = new ArrayList<>();

public Session build() {
Expand All @@ -125,7 +136,7 @@ public Session build() {
resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage,
rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore,
allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum,
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup);
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup, prioritizeResend);
}

public Builder setBeginString(final String beginString) {
Expand Down Expand Up @@ -239,5 +250,11 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs
this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum;
return this;
}

public Builder setPrioritizeResend(final boolean prioritizeResend) {
this.prioritizeResend = prioritizeResend;
return this;
}

}
}
Loading