
Closing NATS connection may not close all running threads #26

Closed
kevinsookocheff-wf opened this issue Jun 16, 2016 · 8 comments

kevinsookocheff-wf commented Jun 16, 2016

Hi,

I'm working on getting a simplified reproducible example. Will update this issue when that is ready.

The general problem is that closing a NATS connection may not close all threads if work is being done in ConnectionImpl.deliverMsgs. It looks like when Channel.get() is called from deliverMsgs, it calls q.take() (Channel.java:73), which blocks until data is available. I believe that calling interrupt() in Channel.close() would allow Channel.get() to unblock and exit correctly.
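
For illustration, a minimal sketch of that interrupt idea (a hypothetical InterruptibleChannel, not the actual io.nats.client.Channel internals): close() interrupts a consumer blocked in take(), so get() can return null and deliverMsgs can exit.

import java.util.concurrent.LinkedBlockingQueue;

class InterruptibleChannel<T> {
    private final LinkedBlockingQueue<T> q = new LinkedBlockingQueue<>();
    private volatile Thread consumer;           // thread currently blocked in get()
    private volatile boolean closed = false;

    T get() {
        consumer = Thread.currentThread();
        try {
            return closed ? null : q.take();    // blocks until an item arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return null;                        // tells the caller to exit
        } finally {
            consumer = null;
        }
    }

    void put(T item) throws InterruptedException {
        q.put(item);
    }

    void close() {
        closed = true;
        Thread t = consumer;
        if (t != null) {
            t.interrupt();                      // unblocks a pending take()
        }
    }
}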

Below is a thread dump from jstack of an app that cannot exit because of hung NATS threads.


2016-06-16 12:47:13
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode):

"Attach Listener" #28 daemon prio=9 os_prio=31 tid=0x00007f830a001000 nid=0x3d0b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"DestroyJavaVM" #27 prio=5 os_prio=31 tid=0x00007f830925b000 nid=0x1703 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"msgfeeder" #17 prio=5 os_prio=31 tid=0x00007f830b083800 nid=0x6503 waiting on condition [0x0000700001b67000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076c610fd0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at io.nats.client.Channel.get(Channel.java:73)
    at io.nats.client.Channel.get(Channel.java:56)
    at io.nats.client.ConnectionImpl.deliverMsgs(ConnectionImpl.java:1404)
    at io.nats.client.AsyncSubscriptionImpl$1.run(AsyncSubscriptionImpl.java:94)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    at io.nats.client.NATSThread.run(NATSThread.java:63)

"msgfeeder" #15 prio=5 os_prio=31 tid=0x00007f830925a800 nid=0x6103 waiting on condition [0x0000700001961000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076c608710> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at io.nats.client.Channel.get(Channel.java:73)
    at io.nats.client.Channel.get(Channel.java:56)
    at io.nats.client.ConnectionImpl.deliverMsgs(ConnectionImpl.java:1404)
    at io.nats.client.AsyncSubscriptionImpl$1.run(AsyncSubscriptionImpl.java:94)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    at io.nats.client.NATSThread.run(NATSThread.java:63)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007f8309041000 nid=0x5303 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007f830b82a000 nid=0x5103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f830c003000 nid=0x4f03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f8309038000 nid=0x4d03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f830882c800 nid=0x4b03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f8308849800 nid=0x3e0f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f8308818800 nid=0x3803 in Object.wait() [0x0000700000d3a000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f8308817800 nid=0x3603 in Object.wait() [0x0000700000c37000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
    - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)

"VM Thread" os_prio=31 tid=0x00007f830b851800 nid=0x3403 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f830c000800 nid=0x2403 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f8309006800 nid=0x2603 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f8309801800 nid=0x2803 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f830b801000 nid=0x2a03 runnable

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007f8309007000 nid=0x2c03 runnable

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007f830b002000 nid=0x2e03 runnable

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007f8309007800 nid=0x3003 runnable

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007f830c001800 nid=0x3203 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007f830b82d000 nid=0x5503 waiting on condition

JNI global references: 349
kevinsookocheff-wf (Author) commented

Example application that cannot exit:

package com.sookocheff.example;

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Main {

  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory cf = new ConnectionFactory(Constants.DEFAULT_URL);
    Connection nc = cf.createConnection();

    // Anonymous async subscription; the client spawns a msgfeeder thread for it.
    nc.subscribe("foo", m -> {
        System.out.println("Received a message: " + new String(m.getData()));
    });
    nc.close(); // should stop all client threads, but the msgfeeder thread survives

    System.out.println("Exiting application");
  }

}

pires commented Jun 23, 2016

I can confirm this behavior, but I haven't had the time to produce a PR I'm proud of, since AFAIK BlockingQueue.clear() will not wake .take(), as @kevinsookocheff-wf suggested. The JRE documentation is clear about the fact that BlockingQueue doesn't support a shutdown behavior.

Quoting:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
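
As a hedged sketch of that poison-object tactic (hypothetical names, not jnats code): close() enqueues a sentinel; a consumer that takes it re-queues it for any other consumers and reports end-of-stream.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillChannel {
    private static final Object POISON = new Object();  // end-of-stream marker
    private final BlockingQueue<Object> q = new LinkedBlockingQueue<>();

    void put(Object msg) throws InterruptedException {
        q.put(msg);
    }

    // Returns null once the channel has been closed.
    Object get() throws InterruptedException {
        Object item = q.take();      // the pill wakes a blocked take()
        if (item == POISON) {
            q.put(POISON);           // re-queue so other consumers also wake
            return null;
        }
        return item;
    }

    void close() throws InterruptedException {
        q.put(POISON);
    }
}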

pires commented Jun 23, 2016

TBH, I'm scared as hell of manually managing threads. However, @mcqueary is much more of a Java developer than I'll ever be, so I bet he has a solution or two up his sleeve.

kevinsookocheff-wf (Author) commented

I could take a stab at this, but I'm having trouble getting the dev environment set up (Maven can't find the parent pom.xml when checking out a fresh copy of this repo).

pires commented Jun 23, 2016

@kevinsookocheff-wf you need to clone https://github.com/nats-io/nats-parent-pom and run mvn install.

tylertreat added a commit to tylertreat/jnats that referenced this issue Jul 13, 2016
Currently, AsyncSubscriptionImpl leaks the msgfeeder thread on
unsubscribe because Channel.get() is never unblocked. This
overrides unsubscribe so that the thread properly terminates.
I believe this resolves issue nats-io#26.
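
A hedged sketch of the shape of that change (field and method names are illustrative, not the actual AsyncSubscriptionImpl internals): unsubscribe() additionally signals the feeder thread so a Channel.get() blocked in deliverMsgs returns.

class AsyncSubscriptionSketch {
    private volatile Thread msgFeeder;  // the thread running deliverMsgs()

    public void unsubscribe() {
        // (the real implementation would first unsubscribe server-side)
        Thread t = msgFeeder;
        if (t != null) {
            t.interrupt();              // wake a take() blocked in Channel.get()
            msgFeeder = null;
        }
    }
}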
mcqueary commented

@pires you flatter and overestimate me, sir.

@kevinsookocheff-wf I do believe that PR #31 closes this, but will confirm.

mcqueary commented

There are actually two cases to consider here.

  • Explicit unsubscription: the AsyncSubscription::unsubscribe() method wasn't properly terminating the feeder thread, which Tyler's PR #31 (Override unsubscribe in AsyncSubscriptionImpl) addresses.
  • Indirect unsubscription through closing the connection: Connection::close() isn't correctly terminating async subscriptions. @kevinsookocheff-wf's code above illustrates this problem. He's using an anonymous subscription, not explicitly calling unsubscribe(), nor using try-with-resources to automatically close/unsubscribe it (both explicit paths are sketched after this list). In this case, the nc.close() call should still result in all subscriptions being terminated cleanly, and that's broken for async subscriptions. I will fix that and add a corresponding test.
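
For reference, a hedged sketch of the explicit-cleanup paths contrasted above, assuming the subscription type is AutoCloseable as the try-with-resources remark implies:

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Subscription;

public class ExplicitCleanup {

  public static void main(String[] args) throws Exception {
    Connection nc = new ConnectionFactory().createConnection();

    // try-with-resources closes (unsubscribes) the subscription on exit;
    // calling sub.unsubscribe() explicitly is the equivalent manual path.
    try (Subscription sub = nc.subscribe("foo",
        m -> System.out.println("Received: " + new String(m.getData())))) {
      Thread.sleep(1000); // application work while subscribed
    }

    nc.close();
  }
}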

Thanks Kevin/Tyler/Pires.

mcqueary pushed a commit that referenced this issue Aug 31, 2016

…e that the msgFeeder thread is terminated for async subscriptions.

Resolves #26
mcqueary self-assigned this Aug 31, 2016

mcqueary commented Aug 31, 2016

Resolved this for the connection-close scenario (described above) by ensuring that closing the connection triggers the subscription close, which shuts down the msgFeeder thread for async subscriptions. Tests were also added to guard against future regression.
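
In outline, a hedged sketch of that cascade (class and field names are illustrative, not the actual jnats source):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ConnectionSketch implements AutoCloseable {
    // each subscription's close() is expected to stop its own msgFeeder thread
    private final List<AutoCloseable> subscriptions = new CopyOnWriteArrayList<>();

    @Override
    public void close() throws Exception {
        for (AutoCloseable sub : subscriptions) {
            sub.close();        // cascade: subscription close stops the feeder
        }
        subscriptions.clear();
        // ... then tear down the socket and the reader/flusher threads
    }
}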
