Today I had another time the problem to execute many short tasks in a thread pooled environment. One way could be to write the pooling itselfs (like the other times in history ;)). But I thought: Why to reinvent the wheel every time and not using standardized libraries like the java.util.concurrent.*?
What I want:
- A self enlarging and shrinking ThreadPool depending on the queued tasks. (Because sometimes it is nothing to do and sometimes it could be to get many hundreds of tasks in a short time)
- A definable Limit of maximum running Threads (Because we cannot create hundreds of threads just for this task. Keep in mind: the tasks have a short run time)
- Queuing the Tasks to run later, if the ThreadPool has reached the given limits
So I looked at the given Examples and the Executors-Class, which provides fast and simple methods:
- newCachedThreadPool : This looks good, but unfortunately you can’t define a maximum limit of threads.
- newFixedThreadPool : This handles the wanted limitation, but not the automatic enlarging and shrinking.
So we cannot use the predefined methods and need to instantiate ThreadPoolExecutor directly.
The simple Constructor is:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
Here we can define the minimum pool size (would be zero) and the maximum pool size (for example 5). The keepAliveTime and unit would be the time a idle thread will wait for new jobs, until it will be terminated. The given WorkQueue handles all jobs, which cannot executed directly. So the best Java provided BlockingQueue would be LinkedBlockingQueue. This queue holds up to a given count (for us unlimited) jobs. Unfortunately the result is not the wanted. The ThreadPoolExecutor only uses the threads in the core pool and ignores the maximumPoolSize.
If a new task should be executed, the ThreadPoolExecutor will try the following:
- If the ThreadPoolExecutor is not running, the job will be rejected
- If the current pool size is under the core pool size a new thread is created
- The job will be queued in the work queue (using BlockingQueue.offer)
- If not possible a new thread is created up to max pool size
- otherwise the job will be rejected
The problem here is that step 3 and 4 are unfortunately changed as I expected. (We wanted result would be use as many threads as possible up to the limit and otherwise queue it).But it is correct, because the running threads uses the work queue to get the following jobs they should do. Because we want to give the job to idle threads first and create new otherwise.
So one solution could be a SynchronousQueue which only accept a job if a idle thread is waiting. But if all threads are busy the ThreadPoolExecutor will reject the Job. So I had to implement an own BlockQueue.
To implement the right way and honour the ThreadPoolExecutor-Implementation, we Queue should only accept a new job, if an idle thread is waiting. But instead of rejecting the the job, the job should be queued. The result is the following implementation of my BufferedBlockingQueue and the associated BufferedBlockingQueueRejectedExecutionHandler.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | public class BufferedBlockingQueue< E > implements BlockingQueue< E > { private Queue< E > queue = new LinkedList< E >(); private int inWait = 0; public synchronized boolean add(E o) { boolean result = queue.add(o); if (result && inWait > 0) notify(); return result; } public int drainTo(Collection< ? super E > c) { return drainTo(c, Integer.MAX_VALUE); } public synchronized int drainTo(Collection< ? super E > c, int maxElements) { int n = 0; while (!queue.isEmpty() && maxElements > 0) { c.add(queue.poll()); n++; } return n; } public synchronized boolean offer(E o) { if (inWait == 0) return false; return add(o); } public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { if (inWait == 0) return false; return add(o); } public synchronized E poll(long timeout, TimeUnit unit) throws InterruptedException { if (queue.isEmpty()) { inWait++; try { wait(unit.toMillis(timeout)); } finally { inWait--; } } return queue.poll(); } public synchronized void put(E o) throws InterruptedException { queue.add(o); notify(); } public int remainingCapacity() { return Integer.MAX_VALUE; } public synchronized E take() throws InterruptedException { return poll(0, TimeUnit.MILLISECONDS); } public synchronized E element() { return queue.element(); } public synchronized E peek() { return queue.peek(); } public synchronized E poll() { return queue.poll(); } public synchronized E remove() { return queue.remove(); } public synchronized boolean addAll(Collection< ? extends E > c) { notify(); return addAll(c); } public synchronized void clear() { queue.clear(); } public synchronized boolean contains(Object o) { return queue.contains(o); } public synchronized boolean containsAll(Collection< ? > c) { return queue.containsAll(c); } public synchronized boolean isEmpty() { return queue.isEmpty(); } public synchronized Iterator< E > iterator() { return queue.iterator(); } public synchronized boolean remove(Object o) { return queue.remove(o); } public synchronized boolean removeAll(Collection< ? > c) { return queue.removeAll(c); } public synchronized boolean retainAll(Collection< ? > c) { return queue.retainAll(c); } public synchronized int size() { return queue.size(); } public synchronized Object[] toArray() { return queue.toArray(); } public synchronized < T > T[] toArray(T[] a) { return queue.toArray(a); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public class BufferedBlockingQueueRejectedExecutionHandler implements RejectedExecutionHandler { @SuppressWarnings("unchecked") public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (executor.isShutdown()) throw new RejectedExecutionException(); BlockingQueue< Runnable > blockingQueue = executor.getQueue(); if (!(blockingQueue instanceof BufferedBlockingQueue)) { throw new RejectedExecutionException(); } BufferedBlockingQueue< Runnable > queue = (BufferedBlockingQueue< Runnable >) blockingQueue; queue.add(r); } } |
The final constructor for the thread pool is:
1 2 3 | Executor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, new BufferedBlockingQueue< Runnable >(), new BufferedBlockingQueueRejectedExecutionHandler()); |