comfortable thread pools using java.util.concurrent

Today I once again had to execute many short tasks in a thread-pooled environment. One option would be to write the pooling myself (as I have done before ;)). But I thought: why reinvent the wheel every time instead of using a standard library like java.util.concurrent.*?

What I want:

  • A thread pool that grows and shrinks with the number of queued tasks (sometimes there is nothing to do, and sometimes many hundreds of tasks arrive within a short time).
  • A configurable upper limit on running threads (we cannot create hundreds of threads just for this job; keep in mind that the tasks are short-running).
  • Queuing of tasks to run later once the thread pool has reached that limit.

So I looked at the examples and at the Executors class, which provides quick and simple factory methods:

  1. newCachedThreadPool: this looks good, but unfortunately you cannot set a maximum number of threads.
  2. newFixedThreadPool: this provides the limit, but the pool does not grow and shrink automatically.
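
To see these limits directly, the following minimal sketch reads the core and maximum pool size from the pools the two factory methods return. The class name FactoryPoolLimits and the helper limits are made up for illustration, and the cast to ThreadPoolExecutor is an assumption that holds for these two factory methods in OpenJDK but is not promised by their return type.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryPoolLimits {

    /** Returns {corePoolSize, maximumPoolSize} of a pool created by Executors. */
    static int[] limits(ExecutorService service) {
        // Assumption: the factory hands back a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        try {
            return new int[] { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] cached = limits(Executors.newCachedThreadPool());
        int[] fixed = limits(Executors.newFixedThreadPool(5));
        // The cached pool has no useful upper bound, the fixed pool never shrinks.
        System.out.println("cached: core=" + cached[0] + " max=" + cached[1]);
        System.out.println("fixed:  core=" + fixed[0] + " max=" + fixed[1]);
    }
}
```

The cached pool reports a maximum of Integer.MAX_VALUE, and the fixed pool reports the same value for core and maximum size, which matches the two objections above.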

So the predefined factory methods do not fit, and we need to instantiate ThreadPoolExecutor directly.

The simplest constructor is:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

Here we can define the minimum pool size (zero in our case) and the maximum pool size (for example 5). keepAliveTime and unit specify how long an idle thread waits for new jobs before it is terminated. The work queue holds all jobs that cannot be executed immediately. The obvious choice among the BlockingQueue implementations that ship with Java is LinkedBlockingQueue, which holds jobs up to a given capacity (unbounded for our purposes). Unfortunately the result is not what we want: with an unbounded queue, the ThreadPoolExecutor only ever uses the threads of the core pool and ignores maximumPoolSize.
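
The effect can be reproduced with a small experiment. This is only a sketch: the class UnboundedQueueDemo and its helper are hypothetical, and it uses a core size of 1 instead of 0 so that the reported numbers do not depend on timing. Every job blocks on a latch, so no worker becomes free while the jobs are being submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    /** Submits blocking jobs and returns {poolSize, queuedJobs} afterwards. */
    static int[] submitBlockingJobs(int core, int max, int jobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < jobs; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = submitBlockingJobs(1, 5, 10);
        System.out.println("threads=" + result[0] + " queued=" + result[1]);
    }
}
```

With a core size of 1, a maximum of 5 and 10 jobs, the pool stays at one thread and the other nine jobs wait in the queue, although four more threads would be allowed.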

When a new task is submitted, the ThreadPoolExecutor proceeds as follows:

  1. If the ThreadPoolExecutor is not running, the job is rejected.
  2. If the current pool size is below the core pool size, a new thread is created for the job.
  3. Otherwise the job is offered to the work queue (using BlockingQueue.offer).
  4. If the queue does not accept it, a new thread is created, as long as the pool is below the maximum pool size.
  5. Otherwise the job is rejected.
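
The order of these steps can be made visible with a bounded queue. The sketch below is an illustration under assumptions of my own choosing (core size 1, maximum 2, an ArrayBlockingQueue with room for one job, and the hypothetical class name ExecutionOrderDemo); it submits four blocking jobs and records the pool state after each one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionOrderDemo {

    /** Submits four blocking jobs and records what happened to each of them. */
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // block so that no worker becomes idle
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    steps.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("rejected"); // default handler (AbortPolicy) throws
                }
            }
            return steps;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints:
        // threads=1 queued=0   (step 2: below core size, new thread)
        // threads=1 queued=1   (step 3: queued)
        // threads=2 queued=1   (step 4: queue full, new thread up to max)
        // rejected             (step 5)
        trace().forEach(System.out::println);
    }
}
```

The second job is queued although a second thread would be allowed; the extra thread only appears once the queue refuses a job.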

The problem is that steps 3 and 4 are in the opposite order from what I expected. (What we wanted was to use as many threads as possible up to the limit and only then queue the job.) But the order is correct: the running threads fetch their next jobs from the work queue, and we want to hand a job to an idle thread first and create a new thread only if none is idle.

One solution could be a SynchronousQueue, which only accepts a job if an idle thread is waiting for one. But if all threads are busy and the maximum pool size is reached, the ThreadPoolExecutor rejects the job. So I had to implement my own BlockingQueue.
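
A short sketch of that rejection, assuming the default rejection handler and blocking jobs so that no thread becomes idle (the class SynchronousQueueDemo and its helpers are hypothetical names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {

    /** Submits blocking jobs and returns how many of them were rejected. */
    static int countRejected(int maxThreads, int jobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, maxThreads, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < jobs; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // no idle thread and the pool is already at its maximum
                }
            }
            return rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(2, 5));
    }
}
```

With a maximum of two threads and five blocking jobs, the first two jobs each get a new thread and the remaining three are rejected, because the SynchronousQueue has no capacity to hold them.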

To do this properly and respect the ThreadPoolExecutor implementation, the queue should only accept an offered job if an idle thread is waiting. But when the pool is full, the job should be queued instead of being rejected. The result is the following implementation of my BufferedBlockingQueue and the associated BufferedBlockingQueueRejectedExecutionHandler.

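import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Unbounded queue whose offer() only succeeds while a consumer is waiting,
 * so that the ThreadPoolExecutor starts new threads before it queues jobs.
 */
public class BufferedBlockingQueue< E > implements BlockingQueue< E > {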
 
    private Queue< E > queue = new LinkedList< E >();
 
    private int inWait = 0;
 
    public synchronized boolean add(E o) {
        boolean result = queue.add(o);
        if (result && inWait > 0)
            notify();
        return result;
    }
 
    public int drainTo(Collection< ? super E > c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
 
    public synchronized int drainTo(Collection< ? super E > c, int maxElements) {
        int n = 0;
        // Stop once maxElements have been transferred.
        while (!queue.isEmpty() && n < maxElements) {
            c.add(queue.poll());
            n++;
        }
        return n;
    }
 
    public synchronized boolean offer(E o) {
        if (inWait == 0)
            return false;
        return add(o);
    }
 
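    // Does not wait: like offer(E), it only accepts the job if a consumer is idle.
    // Synchronized because it reads inWait, which is guarded by this monitor.
    public synchronized boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (inWait == 0)
            return false;
        return add(o);
    }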
 
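    public synchronized E poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        long deadline = System.nanoTime() + remaining;
        if (queue.isEmpty() && remaining > 0) {
            inWait++;
            try {
                // Loop to survive spurious wakeups; give up once the timeout has elapsed.
                while (queue.isEmpty() && remaining > 0) {
                    TimeUnit.NANOSECONDS.timedWait(this, remaining);
                    remaining = deadline - System.nanoTime();
                }
            } finally {
                inWait--;
            }
        }
        return queue.poll();
    }
 
    public synchronized void put(E o) throws InterruptedException {
        queue.add(o);
        notify();
    }
 
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }
 
    public synchronized E take() throws InterruptedException {
        inWait++;
        try {
            // Wait without a timeout until an element is available.
            while (queue.isEmpty())
                wait();
        } finally {
            inWait--;
        }
        return queue.poll();
    }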
 
    public synchronized E element() {
        return queue.element();
    }
 
    public synchronized E peek() {
        return queue.peek();
    }
 
    public synchronized E poll() {
        return queue.poll();
    }
 
    public synchronized E remove() {
        return queue.remove();
    }
 
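    public synchronized boolean addAll(Collection< ? extends E > c) {
        // Delegate to the backing queue (calling addAll(c) here would recurse forever).
        boolean changed = queue.addAll(c);
        if (changed)
            notifyAll(); // several elements may have arrived, so wake every waiter
        return changed;
    }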
 
    public synchronized void clear() {
        queue.clear();
    }
 
    public synchronized boolean contains(Object o) {
        return queue.contains(o);
    }
 
    public synchronized boolean containsAll(Collection< ? > c) {
        return queue.containsAll(c);
    }
 
    public synchronized boolean isEmpty() {
        return queue.isEmpty();
    }
 
    public synchronized Iterator< E > iterator() {
        return queue.iterator();
    }
 
    public synchronized boolean remove(Object o) {
        return queue.remove(o);
    }
 
    public synchronized boolean removeAll(Collection< ? > c) {
        return queue.removeAll(c);
    }
 
    public synchronized boolean retainAll(Collection< ? > c) {
        return queue.retainAll(c);
    }
 
    public synchronized int size() {
        return queue.size();
    }
 
    public synchronized Object[] toArray() {
        return queue.toArray();
    }
 
    public synchronized < T > T[] toArray(T[] a) {
        return queue.toArray(a);
    }
 
}
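import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/** Queues a job that the executor rejected because all threads were busy. */
public class BufferedBlockingQueueRejectedExecutionHandler implements
        RejectedExecutionHandler {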
 
    @SuppressWarnings("unchecked")
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown())
            throw new RejectedExecutionException();
        BlockingQueue< Runnable > blockingQueue = executor.getQueue();
        if (!(blockingQueue instanceof BufferedBlockingQueue)) {
            throw new RejectedExecutionException();
        }
        BufferedBlockingQueue< Runnable > queue = (BufferedBlockingQueue< Runnable >) blockingQueue;
        queue.add(r);
    }
 
}

The final constructor for the thread pool is:

Executor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, 
                       new BufferedBlockingQueue< Runnable >(), 
                       new BufferedBlockingQueueRejectedExecutionHandler());
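
To try the idea without copying both classes, here is a compact, self-contained sketch. Note that it is not the implementation above: IdleAwareQueue is a hypothetical stand-in that subclasses LinkedBlockingQueue and counts waiting consumers, and the rejection handler is written as a lambda that uses put(), because add() would go through the overridden offer(). Class and method names are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GrowingPoolDemo {

    /** Hypothetical stand-in: accepts offers only while a worker is waiting. */
    static class IdleAwareQueue extends LinkedBlockingQueue<Runnable> {
        private final AtomicInteger waiting = new AtomicInteger();

        @Override
        public boolean offer(Runnable job) {
            return waiting.get() > 0 && super.offer(job);
        }

        @Override
        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            waiting.incrementAndGet();
            try {
                return super.poll(timeout, unit);
            } finally {
                waiting.decrementAndGet();
            }
        }

        @Override
        public Runnable take() throws InterruptedException {
            waiting.incrementAndGet();
            try {
                return super.take();
            } finally {
                waiting.decrementAndGet();
            }
        }
    }

    /** Runs short jobs and returns {completedJobs, largestPoolSize}. */
    static int[] run(int maxThreads, int jobs) {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, maxThreads, 60, TimeUnit.SECONDS,
                new IdleAwareQueue(),
                (job, executor) -> {
                    if (executor.isShutdown())
                        throw new RejectedExecutionException();
                    try {
                        executor.getQueue().put(job); // pool is full: queue the job instead
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(e);
                    }
                });
        for (int i = 0; i < jobs; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // simulate a short task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // queued jobs are still executed after shutdown()
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        int[] result = run(5, 20);
        System.out.println("completed=" + result[0] + " largestPoolSize=" + result[1]);
    }
}
```

All jobs should complete while the pool never grows beyond the configured maximum; how quickly it reaches that maximum depends on timing.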