By dave | November 1, 2013

In this BlockingQueue example we show how to write a very simple producer-consumer using a blocking queue. The producer generates SimpleAddition objects, each holding two numbers that the consumer thread adds together. The two values are generated with java.util.Random's nextInt call, stored on the queue as a SimpleAddition transfer object, and picked up for processing on the consumer thread.

We make the queue smaller than the number of items to be processed, so the producer will probably have to wait at some point: when the queue is full, the call to put blocks until the consumer frees up space.
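To see the capacity limit on its own, here is a minimal sketch that is not part of the example below. The class name BoundedQueueDemo and the capacity of 2 are made up for illustration. It uses the non-blocking offer and poll calls so that nothing hangs: offer returns false at exactly the point where put would have blocked.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's example.
public class BoundedQueueDemo {
    // Records whether each offer was accepted by a queue of capacity 2.
    static boolean[] fillPastCapacity() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer(1);   // accepted, one slot left
        boolean second = queue.offer(2);  // accepted, queue is now full
        boolean third = queue.offer(3);   // rejected: put(3) would block here
        queue.poll();                     // remove the head, freeing one slot
        boolean fourth = queue.offer(4);  // accepted again
        return new boolean[] {first, second, third, fourth};
    }

    public static void main(String[] args) {
        // prints [true, true, false, true]
        System.out.println(Arrays.toString(fillPastCapacity()));
    }
}
```

In the main example we use put rather than offer because we want the producer to wait for space instead of dropping work.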

package com.thecoderscorner.test;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
 * This example shows the use of a bounded blocking queue that is backed by
 * an array. In this example we have an immutable SimpleAddition class that
 * is passed from the producer to the consumer, we generate random numbers
 * using the Random class that form the basis for the addition.
 */
public class BlockingQueueExample {
    // the maximum number of items that can be on the queue
    private static final int MAX_CAPACITY = 100;
    // the number of items to queue, make larger than capacity
    private static final int ITEMS_TO_QUEUE = 250;
    // create a bounded queue based on an array.
    private BlockingQueue<SimpleAddition> queue =
            new ArrayBlockingQueue<SimpleAddition>(MAX_CAPACITY);
    // a latch that counts down the items we have processed
    private CountDownLatch consumeLatch = new CountDownLatch(ITEMS_TO_QUEUE);
    // and a random number generator for creating addition instances
    private Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueExample example = new BlockingQueueExample();
        example.init();
    }
    public void init() throws InterruptedException {
        // run the consumer thread, there could be more than one consumer
        Thread th = new Thread(new QueueProcessor(), "QueueProcessor");
        th.setDaemon(true); // don't hold the VM open for this thread
        th.start();
        // and load the queue with some values, should the queue fill
        // up, the put call will block.
        for(int i=0; i<ITEMS_TO_QUEUE;i++) {
            SimpleAddition addition = new SimpleAddition(
                        random.nextInt(128), random.nextInt(128)
            );
            queue.put(addition);
        }
        // The latch is counted down once for each item the consumer
        // processes; once all the items are processed we exit the VM.
        consumeLatch.await();
    }
    /**
     * This runnable simulates the real world consumer, it takes items of
     * work from the queue and processes them. In a real world system
     * there could well be more than one processor thread.
     */
    private class QueueProcessor implements Runnable {
        @Override
        public void run() {
            try {
                while(!Thread.currentThread().isInterrupted()) {
                    // attempt to take the next work item off the queue
                    // if we consume quicker than the producer then take
                    // will block until there is work to do.
                    SimpleAddition addition = queue.take();
                    // and now do the work!
                    int result = addition.getN1() + addition.getN2();
                    System.out.printf("Addition: %d + %d = %d",
                            addition.getN1(), addition.getN2(), result);
                    System.out.println();
                    // decrement the countdown latch
                    consumeLatch.countDown();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // no need to re set interrupted as we are outside loop
            }
        }
    }
    /**
     * This immutable object is used to pass the state over to the queue
     * processor. Made a static nested class so that it does not hold a
     * reference back to the enclosing instance.
     */
    private final static class SimpleAddition {
        private final int n1;
        private final int n2;
        public SimpleAddition(int n1, int n2) {
            this.n1 = n1;
            this.n2 = n2;
        }
        private int getN1() {
            return n1;
        }
        private int getN2() {
            return n2;
        }
    }
}
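The comments above mention that there could be more than one consumer. As a rough sketch of how that might look (an assumption, not something tested alongside the example), several daemon threads can take from the same queue while one latch counts the total work done. The class MultiConsumerSketch, the method runWithConsumers, and the queue capacity of 10 are all made up for illustration, and plain Integer items stand in for SimpleAddition.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: several consumers sharing one bounded queue.
public class MultiConsumerSketch {
    // Queues 'items' integers, processes them on 'consumers' threads and
    // returns how many items were processed in total.
    static int runWithConsumers(int consumers, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        CountDownLatch latch = new CountDownLatch(items);
        AtomicInteger processed = new AtomicInteger();

        for (int c = 0; c < consumers; c++) {
            Thread th = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        queue.take();                // blocks while the queue is empty
                        processed.incrementAndGet(); // stand-in for the real work
                        latch.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            }, "QueueProcessor-" + c);
            th.setDaemon(true); // don't hold the VM open for this thread
            th.start();
        }

        try {
            for (int i = 0; i < items; i++) {
                queue.put(i); // blocks whenever all 10 slots are in use
            }
            latch.await();    // wait until every item has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed: " + runWithConsumers(3, 250));
    }
}
```

Each take hands an item to exactly one consumer, so the threads need no extra locking around the queue. The order in which items are processed across threads is not guaranteed.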

Example output (cut down from 250 lines for formatting):

Addition: 84 + 38 = 122
Addition: 48 + 89 = 137
.. cut down for formatting ..
Addition: 17 + 104 = 121
Addition: 95 + 83 = 178
Process finished with exit code 0
