Threads and Executors

Florian Schmaus edited this page Feb 23, 2018 · 11 revisions

Current Threads/Executors

AbstractXMPPConnection

  • removeCallbacksService (SingleThreadedScheduledExecutor, minSize=1, maxSize=1) used to remove pending callbacks after a timeout
  • cachedExecutorService (CachedThreadPool, minSize=0, maxSize=inf) used for Smack's asyncGo() API, which includes async listeners
  • singleThreadedExecutorService (SingleThreadExecutor, minSize=1, maxSize=1) used to invoke listeners; queue size Integer.MAX_VALUE; minSize set to 0 in Smack 4.2.4
  • executorService (ThreadPoolExecutor, minSize=1, maxSize=1), removed in Smack 4.2.4: used to invoke collectors and eventually a singleThreadedExecutor for listeners; has a queue size of 100
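
The list above can be approximated with plain java.util.concurrent building blocks. This is only a sketch of the pool shapes described here, not Smack's actual code: the class name ConnectionExecutorsSketch and the 60 second keep-alive are assumptions, and the core size of 0 for the listener executor follows the Smack 4.2.4 note above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical holder class for illustration; not part of Smack.
final class ConnectionExecutorsSketch {
    // Single scheduler thread, used to remove pending callbacks after a timeout.
    final ScheduledExecutorService removeCallbacksService =
            Executors.newSingleThreadScheduledExecutor();

    // Core size 0, unbounded maximum: threads only exist while async work is pending.
    final ExecutorService cachedExecutorService = Executors.newCachedThreadPool();

    // At most one thread, so listeners run one at a time in submission order.
    // The no-arg LinkedBlockingQueue has a capacity of Integer.MAX_VALUE.
    final ThreadPoolExecutor singleThreadedExecutorService = new ThreadPoolExecutor(
            0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

    void shutdown() {
        removeCallbacksService.shutdownNow();
        cachedExecutorService.shutdownNow();
        singleThreadedExecutorService.shutdownNow();
    }
}
```

With a core size of 0 the listener thread goes away after the keep-alive elapses and is recreated on the next submitted listener invocation.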

XMPPTCPConnection

  • reader thread
  • writer thread

PingManager

  • executorService (SingleThreadedScheduledExecutor, minSize=1, maxSize=1) used to schedule server pings
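
As a rough illustration of what scheduling server pings on such an executor looks like, here is a minimal sketch. The class PingSchedulerSketch, its schedulePing method, and the cancel-then-reschedule behaviour are assumptions made for the example; they are not taken from PingManager.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not Smack's PingManager.
final class PingSchedulerSketch {
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> nextPing;

    // Schedules a single ping, replacing any ping that is still pending.
    synchronized void schedulePing(Runnable ping, long delay, TimeUnit unit) {
        if (nextPing != null) {
            nextPing.cancel(false); // Do not interrupt a ping that is already running.
        }
        nextPing = executorService.schedule(ping, delay, unit);
    }

    void shutdown() {
        executorService.shutdownNow();
    }
}
```

Every instance of this class holds its own scheduler thread, which is the per-connection cost counted on this page.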

Thoughts

  • XMPPTCPConnection could use NIO and therefore a single thread, but it needs to keep the bundle-and-defer feature.
  • Sharing cachedExecutorService across connections yields no benefit, as its core pool size is 0.
  • The remove callbacks service could eventually be shared.
  • executorService's main (only?) purpose is to act as a buffer, i.e. its queue. The other reason it exists is that its core pool size is one, i.e. there is always one thread ready for work.
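
The "buffer plus one ready thread" role of executorService can be sketched with a ThreadPoolExecutor over a bounded queue. The class and method names below are hypothetical, and prestartCoreThread() is used here only to make the "always one thread ready" point explicit; whether Smack prestarted the thread is not stated on this page.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of Smack.
final class BufferingExecutorSketch {
    // One core thread, one maximum thread, and a bounded queue acting as the buffer.
    static ThreadPoolExecutor newBufferingExecutor(int queueSize) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
        executor.prestartCoreThread(); // The single worker exists before any task arrives.
        return executor;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newBufferingExecutor(100);
        System.out.println("threads ready: " + executor.getPoolSize());
        System.out.println("buffer slots: " + executor.getQueue().remainingCapacity());
        executor.shutdown();
    }
}
```

Once the buffer is full, the default rejection policy throws RejectedExecutionException, so a queue size such as 100 is also a back-pressure limit.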

Possible Savings

Current min threads per XMPPTCPConnection + PingManager: 6

  • -1 - Use NIO in XMPPTCPConnection, so that XMPPTCPConnection uses one thread for read/write
  • -2 - Use the data structure below to replace executorService and singleThreadedExecutorService
  • -1 - Share the remove callbacks service, i.e. make it static (and ensure that it doesn't block VM termination by using daemon threads)
  • -1 - Share the PingManager scheduled executor (and ensure that it doesn't block VM termination by using daemon threads; also ensure that it doesn't cause memory leaks, possibly by using weak references)
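
The last two items could look roughly like the following: one static scheduler backed by a daemon thread, with scheduled tasks reaching their owner only through a weak reference. SharedSchedulerSketch and scheduleWeak are hypothetical names for this sketch, and it assumes the action passed in does not itself capture the owner.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical class for illustration; not part of Smack.
final class SharedSchedulerSketch {
    // One scheduler for all connections. The daemon thread does not block VM termination.
    static final ScheduledExecutorService SHARED =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "shared-scheduler");
                thread.setDaemon(true);
                return thread;
            });

    // Schedules an action on the owner without keeping the owner strongly reachable.
    // Assumption: the action must not capture the owner, or the weak reference is pointless.
    static <T> ScheduledFuture<?> scheduleWeak(
            T owner, Consumer<T> action, long delay, TimeUnit unit) {
        WeakReference<T> ownerRef = new WeakReference<>(owner);
        return SHARED.schedule(() -> {
            T target = ownerRef.get();
            if (target != null) { // The owner was not garbage collected in the meantime.
                action.accept(target);
            }
        }, delay, unit);
    }
}
```

A connection would call something like scheduleWeak(connection, c -> ..., delay, unit), so an abandoned connection can be collected even while a task for it is still queued.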

Achievable min threads: 1 (1x thread for the XMPPTCPConnection NIO reader/writer)

MultipleQueueNonConcurrentExecutionThreadSharingExecutor

DRAFT/WIP: May summon dragons when implemented and executed.

AtomicInteger queuesState;

put(Runnable runnable, int queueId) {
  queues[queueId].put(runnable);
  queuesState.incrementAndGet(); // signal that new work arrived
  if (!inExecution[queueId] && currentWorkers < maxWorkers) {
    // create new worker
  }
}

workerLoop() {
  while (true) {
    boolean didWork = false;
    int savedQueuesState = queuesState.get();
    for (int i = 0; i < maxQueues; i++) {
      Runnable work;
      synchronized (queues[i]) {
        if (queues[i].isEmpty()) continue;
        if (inExecution[i]) continue; // another worker is busy with this queue
        work = queues[i].pop();
        inExecution[i] = true;
      }
      work.run();
      didWork = true;
      synchronized (queues[i]) {
        inExecution[i] = false;
      }
    }
    synchronized (this) {
      // if no new work arrived and this pass found nothing to run
      if (!didWork && savedQueuesState == queuesState.get()) {
        if (currentWorkers > minWorkers) {
          currentWorkers--;
          return;
        } else {
          // sleep until new work
        }
      }
    }
  }
}
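
For comparison, here is one way the draft could be turned into runnable Java. It is a simplified sketch under stated assumptions, not the intended final design: it guards all state with a single lock instead of per-queue locks plus queuesState, it always scans from queue 0 (so it makes no fairness guarantee), and the class name MultiQueueExecutorSketch is made up. What it does keep is the core property: tasks of one queue run in order and never concurrently, while workers are shared across queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch, not part of Smack. All fields are guarded by "this".
final class MultiQueueExecutorSketch {
    private final List<Queue<Runnable>> queues = new ArrayList<>();
    private final boolean[] inExecution;
    private final int minWorkers;
    private final int maxWorkers;
    private int currentWorkers;
    private int idleWorkers;

    MultiQueueExecutorSketch(int maxQueues, int minWorkers, int maxWorkers) {
        for (int i = 0; i < maxQueues; i++) queues.add(new ArrayDeque<Runnable>());
        this.inExecution = new boolean[maxQueues];
        this.minWorkers = minWorkers;
        this.maxWorkers = maxWorkers;
    }

    synchronized void put(Runnable runnable, int queueId) {
        queues.get(queueId).add(runnable);
        if (inExecution[queueId]) return; // The worker busy with this queue rescans afterwards.
        if (idleWorkers > 0) {
            notify(); // Wake a sleeping worker instead of creating a new one.
        } else if (currentWorkers < maxWorkers) {
            currentWorkers++;
            Thread worker = new Thread(this::workerLoop, "multi-queue-worker");
            worker.setDaemon(true); // Must not block VM termination.
            worker.start();
        }
    }

    private void workerLoop() {
        int lastQueue = -1; // Queue whose task this worker just finished, if any.
        while (true) {
            Runnable work = null;
            synchronized (this) {
                if (lastQueue >= 0) inExecution[lastQueue] = false;
                lastQueue = -1;
                while (work == null) {
                    for (int i = 0; i < inExecution.length && work == null; i++) {
                        if (!inExecution[i] && !queues.get(i).isEmpty()) {
                            work = queues.get(i).poll();
                            inExecution[i] = true;
                            lastQueue = i;
                        }
                    }
                    if (work != null) break;
                    if (currentWorkers > minWorkers) { // Surplus worker: terminate.
                        currentWorkers--;
                        return;
                    }
                    idleWorkers++;
                    try {
                        wait(); // Sleep until put() signals new work.
                    } catch (InterruptedException e) {
                        currentWorkers--;
                        return;
                    } finally {
                        idleWorkers--;
                    }
                }
            }
            try {
                work.run(); // Runs outside the lock, so other queues can make progress.
            } catch (RuntimeException e) {
                // Swallowed in this sketch so that the queue is released again.
            }
        }
    }
}
```

A caller would map each logical stream to a queue ID, for example put(task, 0) for collectors and put(task, 1) for listeners; that mapping is an assumption of this example.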