Saturday, March 10, 2012

An invokeAll for Hazelcast ExecutorServices and a distributed CacheLoader

[A more recent posting describes a much cleaner implementation of the functionality described in this article.]

The ExecutorService returned by Hazelcast[Instance].getExecutorService() does not support invokeAll, but I needed this functionality for work I'm doing, so I rolled my own restricted version:

ExecUtil has several variants of invokeAll. The generic variants take these values:

  • an ExecutorService, which must have been returned by a Hazelcast getExecutorService call;
  • an Iterable of DistributedTask<T> instances, where T is the common result type; and
  • a FutureCallback<T>, which is a Guava interface for specifying what to do with each returned result (and what to do, if anything, with exceptions thrown during task execution).
The non-generic variants use the Void result type. The generic and non-generic forms each come in two variants: one that waits indefinitely and one that waits up to a given amount of time for all the tasks to finish.

The use of Iterable<DistributedTask<T>> allows lazy provision of the tasks. (Supporting this feature prevented a simpler implementation with just a CountDownLatch initialized to the number of tasks, because we don't know the number of tasks in advance.)
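The counting problem can be illustrated without Hazelcast or Guava. The following is only a sketch of one way to handle a lazily supplied Iterable, not the ExecUtil code: it keeps a running count of outstanding tasks plus one extra "ticket" held by the submitting thread, and releases a single-count latch when the count reaches zero. The names LazyInvokeAll and ResultCallback are hypothetical; ResultCallback stands in for Guava's FutureCallback<T>, and plain Callables stand in for DistributedTasks.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Guava's FutureCallback<T>, so the sketch has no dependencies.
interface ResultCallback<T> {
    void onSuccess(T result);
    void onFailure(Throwable t);
}

final class LazyInvokeAll {
    private LazyInvokeAll() {}

    /**
     * Submits tasks as the Iterable yields them and waits for all of them.
     * Returns true if every task finished within the timeout.
     */
    static <T> boolean invokeAll(ExecutorService executor,
                                 Iterable<? extends Callable<T>> tasks,
                                 ResultCallback<? super T> callback,
                                 long timeout, TimeUnit unit)
            throws InterruptedException {
        // Starts at 1: the submitting thread holds a ticket until iteration ends,
        // so the count cannot hit zero while tasks are still being handed out.
        final AtomicInteger pending = new AtomicInteger(1);
        final CountDownLatch done = new CountDownLatch(1);

        for (Callable<T> task : tasks) {
            pending.incrementAndGet();
            executor.execute(() -> {
                try {
                    callback.onSuccess(task.call());
                } catch (Throwable t) {
                    // Failures in the task (or in onSuccess) are reported here.
                    callback.onFailure(t);
                } finally {
                    if (pending.decrementAndGet() == 0) {
                        done.countDown();
                    }
                }
            });
        }

        // Give up the submitter's ticket now that the total is known.
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
        return done.await(timeout, unit);
    }
}
```

Because the total number of tasks is never needed up front, the Iterable can be a generator that produces tasks on demand.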

The main awkwardness is that both the Callable<T> used to create the DistributedTask and T itself must be Serializable, but this is, of course, a requirement imposed by DistributedTask.
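To make the constraint concrete, here is a small sketch of a Callable that satisfies it, together with a Java-serialization round trip that roughly approximates shipping the task to another node. FetchLengthTask and SerializationCheck are made-up names for illustration, and the round trip uses plain java.io serialization rather than anything Hazelcast-specific.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

// A named class (not an anonymous inner class) so it carries no hidden
// reference to a non-serializable enclosing instance.
final class FetchLengthTask implements Callable<Integer>, Serializable {
    private static final long serialVersionUID = 1L;

    private final String key; // all state must itself be serializable

    FetchLengthTask(String key) {
        this.key = key;
    }

    @Override
    public Integer call() {
        return key.length(); // Integer, the result type, is Serializable too
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    // Serializes the value to bytes and reads it back, as a cheap local test
    // that a task could survive being sent over the wire.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

Running a candidate task through a check like roundTrip in a unit test catches a NotSerializableException before the task ever reaches the cluster.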

Update (2012-Mar-11): I added a static method to ExecUtil that wraps an ExecutorService, implementing invokeAll in terms of ExecUtil.invokeAll if the argument was created by Hazelcast. I also added a ConcurrentFunction that wraps an existing Function for concurrent or distributed application.

A concurrent Guava CacheLoader

I then realized that this machinery could be used to provide a nice implementation of Guava's CacheLoader, used to build LoadingCache instances with CacheBuilder. LoadingCache has a getAll(Iterable<? extends K> keys) method that returns a Map<K, V> from keys to cached values. It calls CacheLoader.loadAll(Iterable<? extends K> keys) -- if it's implemented -- to load the values in order to cache them. If loadAll isn't implemented, it just loads the keys sequentially.

I wrote a CacheLoader implementation that can use ExecUtil.invokeAll to load the values using DistributedTasks. While I was at it, I made it so that if you don't have a Hazelcast-based ExecutorService, you can use a normal ExecutorService to load the values concurrently (in the same JVM).
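For the same-JVM case, the core of the idea can be sketched with nothing but java.util.concurrent. This is not the ConcurrentCacheLoader source; ConcurrentLoadAllSketch and its loadAll method are hypothetical names, and the method only shows the shape of what a CacheLoader.loadAll override might delegate to: one task per key, run through a standard ExecutorService, with the results gathered into a key-to-value map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ConcurrentLoadAllSketch {
    private ConcurrentLoadAllSketch() {}

    // Loads the value for every key concurrently and returns them as a map.
    static <K, V> Map<K, V> loadAll(ExecutorService executor,
                                    Iterable<? extends K> keys,
                                    Function<? super K, ? extends V> loader)
            throws InterruptedException, ExecutionException {
        List<K> keyList = new ArrayList<>();
        List<Callable<V>> tasks = new ArrayList<>();
        for (K key : keys) {
            keyList.add(key);
            tasks.add(() -> loader.apply(key)); // one task per cache key
        }

        // The standard invokeAll blocks until all tasks complete and returns
        // the futures in the same order as the submitted tasks.
        List<Future<V>> futures = executor.invokeAll(tasks);

        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < keyList.size(); i++) {
            // get() rethrows a loader failure wrapped in ExecutionException.
            result.put(keyList.get(i), futures.get(i).get());
        }
        return result;
    }
}
```

The map contains an entry for every requested key, which is what LoadingCache.getAll expects from loadAll. A distributed version would swap the plain Callables for DistributedTasks and the standard invokeAll for the Hazelcast-aware one.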

The result is ConcurrentCacheLoader:

It has a nested Builder class that allows you to specify:

  • the ExecutorService used (including shortcuts for default HazelcastInstance ExecutorServices),
  • a time limit for loadAll,
  • a function to map a cache key to a key object for the DistributedTask, and
  • the actual function that maps cache keys to values.
There's a minimal set of tests here:

Here's how I'm using it:

Sample usage

That's a ConcurrentCacheLoader.Builder used to build an argument to a CacheBuilder method.

The upshot is that my call to getAll runs on all the nodes in my Hazelcast cluster.


The motivation for all this was the desire to make dozens of calls to JClouds' BlobStore.getBlob() in the space of a single HTTP request. The standard for-loop approach was taking too long, and I suddenly realized I could be asking the Hazelcast cluster to do the work.

