Skip to content

Commit

Permalink
Connection::close() now calls Subscription::close(), which will ensur…
Browse files Browse the repository at this point in the history
…e that

the msgFeeder thread is terminated for async subscriptions.

Resolves #26
  • Loading branch information
Larry McQueary committed Aug 31, 2016
1 parent 6f273b5 commit 5eb33aa
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 221 deletions.
5 changes: 4 additions & 1 deletion src/main/java/io/nats/client/AsyncSubscriptionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public void run() {
logger.trace("msgFeeder entering delivery loop for subj: {} sid: {}", subject,
sid);
conn.deliverMsgs(mch);
logger.debug("Leaving msgFeeder for subject: "
+ AsyncSubscriptionImpl.this.getSubject() + " sid: "
+ AsyncSubscriptionImpl.this.getSid());
} catch (Exception e) {
logger.error("Error on async subscription for subject {}",
AsyncSubscriptionImpl.this.getSubject());
Expand All @@ -104,7 +107,7 @@ public void run() {
if (!isStarted()) {
executor = Executors.newSingleThreadExecutor(new NATSThreadFactory("msgfeeder"));
executor.execute(msgFeeder);
logger.trace("Started msgFeeder for subject: " + this.getSubject() + " sid: "
logger.debug("Started msgFeeder for subject: " + this.getSubject() + " sid: "
+ this.getSid());
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/nats/client/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ class ConnectionImpl implements Connection {
private Channel<Boolean> fch = new Channel<Boolean>(FLUSH_CHAN_SIZE);
private List<Thread> threads = new ArrayList<Thread>();

ConnectionImpl() {
}
ConnectionImpl() {}

ConnectionImpl(Options opts) {
this(opts, null);
Expand Down Expand Up @@ -488,6 +487,8 @@ private void close(ConnState closeState, boolean doCBs) {
sub.closed = true;
// Mark connection closed in subscription
sub.connClosed = true;
// Terminate thread executor
sub.close();
sub.mu.unlock();
}
subs.clear();
Expand Down Expand Up @@ -1183,7 +1184,7 @@ public void run() {
}

protected Thread go(final Runnable task, final String name, final String group,
final Phaser ph) {
final Phaser ph) {
NATSThread.setDebug(true);
NATSThread t = new NATSThread(task, name) {
public void run() {
Expand Down Expand Up @@ -1261,7 +1262,7 @@ class ConnectInfo {
private String version = ConnectionImpl.this.version;

public ConnectInfo(boolean verbose, boolean pedantic, String username, String password,
String token, boolean secure, String connectionName) {
String token, boolean secure, String connectionName) {
this.verbose = new Boolean(verbose);
this.pedantic = new Boolean(pedantic);
this.user = username;
Expand Down Expand Up @@ -1401,12 +1402,11 @@ protected void deliverMsgs(Channel<Message> ch) {
}

while (true) {
// logger.trace("Calling ch.get()...");
msg = ch.get();
// logger.trace("ch.get() returned " + m);

if (msg == null) {
// the channel has been closed, exit silently.
logger.debug("Channel closed, exiting msgFeeder loop");
return;
}

Expand Down Expand Up @@ -1965,7 +1965,7 @@ private void writePublishProto(ByteBuffer buffer, byte[] subject, byte[] reply,
}

// Used for handrolled itoa
static final byte[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};
static final byte[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };

void _publish(byte[] subject, byte[] reply, byte[] data) throws IOException {
int msgSize = (data != null) ? data.length : 0;
Expand Down
Loading

0 comments on commit 5eb33aa

Please # to comment.