By dave | May 10, 2015

In this example I demonstrate how to use ConcurrentMap along with Future to build a lazy cache. Concurrent maps are a good choice in situations where absolute atomicity, and some control over the ordering of events, can be traded off for better performance.

Let’s assume for the sake of example that we have an object that is expensive to create. Further, it has to be created on a single thread because it calls into a single-threaded maths library. With this in mind we design a lazy, queue-based cache.
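To see why a single-thread executor gives us the serialization such a library needs, here is a minimal standalone sketch (the task bodies are placeholders, not part of the cache code) showing that everything submitted to Executors.newSingleThreadExecutor() runs sequentially on the same worker thread:

```java
import java.util.concurrent.*;

public class SingleThreadDemo {
    public static void main(String[] args) throws Exception {
        // every task submitted here runs one after another on a single worker
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<String> t1 = exec.submit(() -> Thread.currentThread().getName());
        Future<String> t2 = exec.submit(() -> Thread.currentThread().getName());
        // both tasks report the same thread name, proving serial execution
        System.out.println(t1.get().equals(t2.get())); // true
        exec.shutdown();
    }
}
```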

If we look at the get method on the cache, it first checks if the map already contains the requested key. If it does not, we attempt to create a new key-value pair for the entry. Instead of locking for this operation we store a Future into the cache and submit that future to a work queue (Executor). However, we only add the entry to the work queue and the cache if putIfAbsent() indicates there was no previous entry. Note that putIfAbsent is atomic on the map. See Concurrent Maps and CopyOnWriteArrayList for more on this.
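As a quick refresher on the putIfAbsent contract (a standalone sketch, unrelated to the cache class itself): it returns null when this call installed the mapping, and the existing value otherwise, leaving the map untouched. That null return is exactly what the cache uses to decide whether it won the race:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        // no prior mapping: our entry is installed, null is returned
        System.out.println(map.putIfAbsent("key", "first"));  // null
        // mapping already present: existing value returned, map unchanged
        System.out.println(map.putIfAbsent("key", "second")); // first
        System.out.println(map.get("key"));                   // first
    }
}
```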

In summary, this means we never submit a job more than once, and we never change an entry in the cache once added. Because we store Future objects, other threads cannot obtain the object we are creating until it is fully ready. One assumption we make is that creating a FutureTask is cheap, because when two threads race, the loser's FutureTask is simply discarded.
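If even the occasional throwaway FutureTask is a concern, Java 8's ConcurrentMap.computeIfAbsent can perform the check-then-act atomically, so the task is built at most once per key. Here is a minimal sketch with a placeholder String value type instead of HeavyCreation; note that the mapping function runs while the map holds an internal lock, so it must stay cheap: we only construct and submit the task there, while the heavy work still happens later on the executor.

```java
import java.util.concurrent.*;

public class ComputeIfAbsentCache {
    private final ConcurrentMap<String, Future<String>> cache = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public String get(String key) throws Exception {
        // computeIfAbsent runs the mapping function at most once per key,
        // so no throwaway FutureTask is ever created
        Future<String> f = cache.computeIfAbsent(key, k -> {
            FutureTask<String> task = new FutureTask<>(() -> "value-for-" + k);
            executor.submit(task);
            return task;
        });
        return f.get(5, TimeUnit.SECONDS);
    }

    public void shutdown() { executor.shutdown(); }

    public static void main(String[] args) throws Exception {
        ComputeIfAbsentCache c = new ComputeIfAbsentCache();
        System.out.println(c.get("Item1")); // value-for-Item1
        c.shutdown();
    }
}
```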

FutureExecutionCache.java

package com.thecoderscorner.example.futurecache;

import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This class simulates a lazy cache based on Futures. It is not intended as
 * an exemplar of how to build a cache, rather see this as the starting point
 * of how to use Futures and Executors to build a working cache.
 *
 * For example's sake we have objects of type HeavyCreation that take some
 * time to create and can only be created on a single thread, so we need
 * to queue up requests from multiple threads onto a single executor.
 * Requires Java 8.
 */
public class FutureExecutionCache {
    private static final Logger LOGGER = Logger.getLogger("FutureCache");

    // How long we should wait before timing out
    public static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(5);

    // Our cache stores a future for each HeavyCreation object
    private ConcurrentMap<String, Future<HeavyCreation>> mapKeyToHeavy = new ConcurrentHashMap<>();

    // this is the single thread executor that we use
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * This method gets the HeavyCreation for a given key. In this example we
     * simply generate the object and sleep for 1 second, as an example of a
     * long running process. In real life, the FutureTask could call into a
     * single threaded maths library to perform some data analysis.
     *
     * @param key the key into the cache
     * @return the HeavyCreation associated
     */
    public HeavyCreation get(String key) throws Exception {
        // check first if the item is in the cache, we are safe to use an
        // un-synchronised check here as the ConcurrentMap provides a safe
        // method for addition later.
        if(!mapKeyToHeavy.containsKey(key)) {

            // Create the future task object, that when given to an executor
            // will create the object and do the heavy operation
            FutureTask<HeavyCreation> futureTask = new FutureTask<>(() -> {
                HeavyCreation hco = new HeavyCreation(key);
                // simulate the process taking a long time
                hco.performLongInitTask();
                LOGGER.info("Created a heavy object with key: " + key);
                return hco;
            });

            // Here we add the task into the concurrent map, but ONLY if it does not
            // already exist in this map. Importantly this does not cause execution.
            if(mapKeyToHeavy.putIfAbsent(key, futureTask)==null) {
                // nothing previously in the map for this key, therefore this new task
                // is now in our cache and as such we need to execute the future task
                executorService.submit(futureTask);
            }
        }

        // If we got this far, we must either have created a Future task or one
        // already existed.
        Future<HeavyCreation> f = mapKeyToHeavy.get(key);
        HeavyCreation hc = f.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        LOGGER.info("Returning for key: " + hc.getKey());
        return hc;
    }

    public void printItems() {
        LOGGER.info("Printing out contents of cache:");
        mapKeyToHeavy.values().forEach(val -> {
            try {
                HeavyCreation hc = val.get();
                LOGGER.info("Key: " + hc.getKey() + ", value: " + hc.getValue());
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Unable to read cache entry", e);
            }
        });
    }

    public static void main(String[] args)  {
        FutureExecutionCache theCache = new FutureExecutionCache();

        // lets make requests
        new Thread(() -> {
            try {
                theCache.get("Item1");
                waitForAWhile();
                theCache.get("Item2");
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Exception in thread", e);
            }
        }, "TestingThread").start();


        try {
            theCache.get("Item1");
            waitForAWhile();
            theCache.get("Item2");
            waitForAWhile();
            theCache.get("Item3");
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Exception in main", e);
        }

        theCache.printItems();

        // shut down the executor, otherwise its non-daemon worker thread
        // keeps the JVM alive after main completes
        theCache.executorService.shutdown();
    }

    /**
     * This method just allows us to simulate using the cache in real
     * life across more than one thread; Thread.sleep(0) hints to the
     * scheduler, increasing the probability of a context switch.
     */
    private static void waitForAWhile() throws InterruptedException {
        Thread.sleep(0);
    }

}

HeavyCreation.java

package com.thecoderscorner.example.futurecache;

/**
 * This object is immutable; under the Java memory model its final fields
 * are safe to read from any thread once construction completes, without
 * any further synchronization (memory barrier).
 */
class HeavyCreation {
    private final String key;
    private final String value;

    // actually this is not really heavy to create, but we are simulating
    public HeavyCreation(String key) {
        this.key = key;
        this.value = key + ": " + key.hashCode();
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value;
    }

    public void performLongInitTask() throws InterruptedException {
        Thread.sleep(500);
    }
}

Lastly, here is the output when the above program is run:

May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$get$0
INFO: Created a heavy object with key: Item1
May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache get
INFO: Returning for key: Item1
May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache get
INFO: Returning for key: Item1
May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$get$0
INFO: Created a heavy object with key: Item2
May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache get
INFO: Returning for key: Item2
May 10, 2015 6:50:39 PM com.thecoderscorner.example.futurecache.FutureExecutionCache get
INFO: Returning for key: Item2
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$get$0
INFO: Created a heavy object with key: Item3
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache get
INFO: Returning for key: Item3
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache printItems
INFO: Printing out contents of cache:
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$printItems$1
INFO: Key: Item1, value: Item1: 70973278
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$printItems$1
INFO: Key: Item2, value: Item2: 70973279
May 10, 2015 6:50:40 PM com.thecoderscorner.example.futurecache.FutureExecutionCache lambda$printItems$1
INFO: Key: Item3, value: Item3: 70973280
