01-11-2013

In this BlockingQueue example we show how to write a very simple producer - consumer with a blocking queue. This example generates SimpleAddition objects that require an addition of two numbers to be performed on the consumer thread. In this case the two values to be added are generated using java.util.Random's nextInt call. They are stored on the queue as a SimpleAddition transfer object and picked up for processing on the consumer thread.

We make the queue smaller than the number of items to be processed, so it is probable that we will need to block while producing, as the call to put may block.

package com.thecoderscorner.test;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * This example shows the use of a bounded blocking queue that is backed by
 * an array. In this example we have an immutable SimpleAddition class that
 * is passed from the producer to the consumer, we generate random numbers
 * using the Random class that form the basis for the addition.
 */
public class BlockingQueueExample {
    // the maximum number of items that can be on the queue
    private static final int MAX_CAPACITY = 100;
    // the number of items to queue, make larger than capacity
    private static final int ITEMS_TO_QUEUE = 250;
    // create a bounded queue based on an array.
    private BlockingQueue<SimpleAddition> queue =
            new ArrayBlockingQueue<SimpleAddition>(MAX_CAPACITY);
    // a latch that counts down the items we have processed
    private CountDownLatch consumeLatch = new CountDownLatch(ITEMS_TO_QUEUE);
    // and a random number generator for creating addition instances
    private Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueExample example = new BlockingQueueExample();
        example.init();
    }
    public void init() throws InterruptedException {
        // run the consumer thread, there could be more than one consumer
        Thread th = new Thread(new QueueProcessor(), "QueueProcessor");
        th.setDaemon(true); // don't hold the VM open for this thread
        th.start();
        // and load the queue with some values, should the queue fill
        // up, the put call will block.
        for(int i=0; i<ITEMS_TO_QUEUE;i++) {
            SimpleAddition addition = new SimpleAddition(
                        random.nextInt(128), random.nextInt(128)
            );
            queue.put(addition);
        }
        // This latch counts down each event in the producer, and once
        // all the events are processed we exit the VM.
        consumeLatch.await();
    }
    /**
     * This runnable simulates the real world producer, it takes items of
     * work from the queue and processes them. In a real world system
     * there could well be more than one processor thread.
     */
    private class QueueProcessor implements Runnable {
        @Override
        public void run() {
            try {
                while(!Thread.currentThread().isInterrupted()) {
                    // attempt to take the next work item off the queue
                    // if we consume quicker than the producer then take
                    // will block until there is work to do.
                    SimpleAddition addition = queue.take();
                    // and now do the work!
                    int result = addition.getN1() + addition.getN2();
                    System.out.printf("Addition: %d + %d = %d",
                            addition.getN1(), addition.getN2(), result);
                    System.out.println();
                    // decrement the countdown latch
                    consumeLatch.countDown();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // no need to re set interrupted as we are outside loop
            }
        }
    }
    /**
     * This immutable object is used to pass the state over to the queue
     * processor. Made a static inner class to avoid the overhead of
     * access back to the instance.
     */
    private final static class SimpleAddition {
        private final int n1;
        private final int n2;
        public SimpleAddition(int n1, int n2) {
            this.n1 = n1;
            this.n2 = n2;
        }
        private int getN1() {
            return n1;
        }
        private int getN2() {
            return n2;
        }
    }
}

Example output (cut down from 250 repeated lines) for formatting

Addition: 84 + 38 = 122
Addition: 48 + 89 = 137
.. cut down for formatting ..
Addition: 17 + 104 = 121
Addition: 95 + 83 = 178
Process finished with exit code 0

These may be of interest

We use cookies to analyse traffic and to personalise content. We also embed Twitter and Youtube on some pages, these companies have their own privacy policies.

See the privacy policy and terms of use of this site should you need more information or wish to adjust your settings.