Saturday, March 10, 2012

An invokeAll for Hazelcast ExecutorServices and a distributed CacheLoader

[A more recent posting describes a much cleaner implementation of the functionality described in this article.]

The ExecutorService returned by Hazelcast[Instance].getExecutorService() does not support invokeAll, but I needed this functionality for work I'm doing, so I rolled my own restricted version:

ExecUtil.java

ExecUtil has several variants of invokeAll. The generic variants take these arguments:

  • an ExecutorService, which must have been returned by a Hazelcast getExecutorService call;
  • an Iterable of DistributedTask<T> instances, where T is the common result type; and
  • a FutureCallback<T>, which is a Guava interface for specifying what to do with each returned result (and what to do, if anything, with exceptions thrown during task execution).
The non-generic variants use the Void result type. Both the generic and non-generic forms come in two flavors: one that waits indefinitely and one that waits a given amount of time for all the tasks to finish.

The use of Iterable<DistributedTask<T>> allows lazy provision of the tasks. (Supporting this feature prevented a simpler implementation with just a CountDownLatch initialized to the number of tasks, because we don't know the number of tasks in advance.)
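To make the shape concrete, here is a minimal sketch of the core idea (not the actual ExecUtil code, and ignoring the time-limited variant): submit every DistributedTask as it arrives, remembering that each task is its own Future, then walk the futures and hand each result or failure to the Guava FutureCallback.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    import com.google.common.util.concurrent.FutureCallback;
    import com.hazelcast.core.DistributedTask;

    public final class InvokeAllSketch {

        // Submit every task, then wait for each one in turn. Because tasks are
        // collected as they are submitted, this works even when the Iterable is
        // lazy and its size is unknown up front.
        public static <T> void invokeAll(ExecutorService executor,
                                         Iterable<DistributedTask<T>> tasks,
                                         FutureCallback<T> callback)
                throws InterruptedException {
            List<Future<T>> futures = new ArrayList<Future<T>>();
            for (DistributedTask<T> task : tasks) {
                executor.execute(task);  // a DistributedTask is a FutureTask, so it is its own Future
                futures.add(task);
            }
            for (Future<T> future : futures) {
                try {
                    callback.onSuccess(future.get());
                } catch (ExecutionException e) {
                    callback.onFailure(e.getCause());
                }
            }
        }
    }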

The main awkwardness is that both the Callable<T> used to create the DistributedTask and T itself must be Serializable, but this is, of course, a requirement imposed by DistributedTask.

Update (2012-Mar-11): I added a static method to ExecUtil that wraps an ExecutorService, implementing invokeAll in terms of ExecUtil.invokeAll if the argument was created by Hazelcast. Also added a ConcurrentFunction that wraps an existing Function for concurrent or distributed application.
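To give a rough idea of what that Function wrapper does (the class and method names below are purely illustrative, not the real ConcurrentFunction API): wrap a Guava Function and apply it to a whole batch of inputs by fanning the calls out over an ExecutorService.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    import com.google.common.base.Function;

    // Illustrative only. In the distributed case the Callable would also have to
    // be Serializable and wrapped in a DistributedTask.
    final class ConcurrentFunctionSketch<F, T> {
        private final ExecutorService executor;
        private final Function<F, T> delegate;

        ConcurrentFunctionSketch(ExecutorService executor, Function<F, T> delegate) {
            this.executor = executor;
            this.delegate = delegate;
        }

        Map<F, T> applyToAll(Iterable<F> inputs) throws InterruptedException, ExecutionException {
            // Kick off one submission per input, keyed by that input.
            Map<F, Future<T>> pending = new LinkedHashMap<F, Future<T>>();
            for (final F input : inputs) {
                pending.put(input, executor.submit(new Callable<T>() {
                    public T call() {
                        return delegate.apply(input);
                    }
                }));
            }
            // Collect the results into a map from input to output.
            Map<F, T> results = new LinkedHashMap<F, T>();
            for (Map.Entry<F, Future<T>> entry : pending.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());
            }
            return results;
        }
    }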

A concurrent Guava CacheLoader

I then realized that this machinery could be used to provide a nice implementation of Guava's CacheLoader, used to build LoadingCache instances with CacheBuilder. LoadingCache has a getAll(Iterable<? extends K> keys) method that returns a Map<K, V> from keys to cached values. It calls CacheLoader.loadAll(Iterable<? extends K> keys) -- if it's implemented -- to load the values in order to cache them. If loadAll isn't implemented, it just loads the value for each key sequentially.
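For reference, the plain Guava API looks like this: load is the required single-key method, and loadAll is the optional batch hook that getAll uses when it's overridden. (The key/value types and the toy length-counting loader here are just for illustration.)

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class LoadAllExample {

        static final CacheLoader<String, Integer> LOADER = new CacheLoader<String, Integer>() {
            @Override
            public Integer load(String key) {
                return key.length();  // stand-in for the real (expensive) lookup
            }

            // Optional batch hook: LoadingCache.getAll calls this for the keys it
            // doesn't already have. Without it, getAll falls back to calling
            // load(key) for each missing key, one at a time.
            @Override
            public Map<String, Integer> loadAll(Iterable<? extends String> keys) {
                Map<String, Integer> values = new LinkedHashMap<String, Integer>();
                for (String key : keys) {
                    values.put(key, key.length());  // a real implementation would batch or parallelize
                }
                return values;
            }
        };

        public static void main(String[] args) throws Exception {
            LoadingCache<String, Integer> cache = CacheBuilder.newBuilder().build(LOADER);
            System.out.println(cache.getAll(Arrays.asList("one", "two", "three")));
        }
    }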

I wrote a CacheLoader implementation that can use ExecUtil.invokeAll to load the values using DistributedTasks. While I was at it, I made it so that if you don't have a Hazelcast-based ExecutorService, you can use a normal ExecutorService to load the values concurrently (in the same JVM).

The result is ConcurrentCacheLoader:

ConcurrentCacheLoader.java

It has a nested Builder class that allows you to specify:

  • the ExecutorService used (including shortcuts for default HazelcastInstance ExecutorServices),
  • a time limit for loadAll,
  • a function to map a cache key to a key object for the DistributedTask, and
  • the actual function that maps cache keys to values.
There's a minimal set of tests here:

ConcurrentCacheLoaderTest.java

Here's how I'm using it:

Sample usage

That's a ConcurrentCacheLoader.Builder used to build an argument to a CacheBuilder method.
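In case the embedded sample doesn't render, the general shape is below. The builder method names (executorService, timeLimit, keyMapper, valueFunction) and the variables hazelcastInstance, keyFn, valueFn, keys, and MyValue are invented for illustration; check ConcurrentCacheLoader.java for the real API.

    // Hypothetical sketch; method and variable names are illustrative only.
    CacheLoader<String, MyValue> loader = new ConcurrentCacheLoader.Builder<String, MyValue>()
            .executorService(hazelcastInstance.getExecutorService())  // or any plain ExecutorService
            .timeLimit(30, TimeUnit.SECONDS)                          // bound on loadAll
            .keyMapper(keyFn)                                         // cache key -> DistributedTask key
            .valueFunction(valueFn)                                   // cache key -> value to cache (Serializable)
            .build();

    LoadingCache<String, MyValue> cache = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(loader);

    // One getAll call, many distributed loads.
    Map<String, MyValue> values = cache.getAll(keys);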

The upshot is that my call to getAll runs on all the nodes in my Hazelcast cluster.

Addendum:

The motivation for all this was the desire to make dozens of calls to JClouds' BlobStore.getBlob() in the space of a single HTTP request. The standard for-loop approach was taking too long, and I suddenly realized I could be asking the Hazelcast cluster to do the work.
