The ExecutorService returned by Hazelcast[Instance].getExecutorService() does not support invokeAll, but I needed this functionality for work I'm doing, so I rolled my own restricted version:
ExecUtil has several variants of invokeAll. The generic variants take these values:
- an ExecutorService, which must have been returned by a Hazelcast getExecutorService call;
- an Iterable of DistributedTask<T> instances, where T is the common result type; and
- a FutureCallback<T>, which is a Guava interface for specifying what to do with each returned result (and what to do, if anything, with exceptions thrown during task execution).
The use of Iterable<DistributedTask<T>> allows lazy provision of the tasks. (Supporting this feature prevented a simpler implementation with just a CountDownLatch initialized to the number of tasks, because we don't know the number of tasks in advance.)
The main awkwardness is that both the Callable<T> used to create the DistributedTask and T itself must be Serializable, but this is, of course, a requirement imposed by DistributedTask.
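The shape of such an invokeAll can be sketched in plain JDK code. This is a hedged, JDK-only sketch, not the actual ExecUtil: the Callback interface below is a stand-in for Guava's FutureCallback, and plain Callables stand in for DistributedTasks. The key point it illustrates is that because the Iterable is lazy, the task count is only known after iteration, so we accumulate futures as we submit and wait on each afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ExecUtilSketch {
    // Stand-in for Guava's FutureCallback<T>, to keep the sketch JDK-only.
    public interface Callback<T> {
        void onSuccess(T result);
        void onFailure(Throwable t);
    }

    // invokeAll over a lazily-produced Iterable of tasks: the task count is
    // only learned by iterating, so collect the futures as we submit them,
    // then wait on each one and route the outcome through the callback.
    public static <T> int invokeAll(ExecutorService exec,
                                    Iterable<? extends Callable<T>> tasks,
                                    Callback<T> callback) throws InterruptedException {
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(exec.submit(task));   // tasks may be generated on demand
        }
        for (Future<T> f : futures) {
            try {
                callback.onSuccess(f.get());
            } catch (ExecutionException e) {
                callback.onFailure(e.getCause());
            }
        }
        return futures.size();                // number of tasks actually submitted
    }
}
```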
Update (2012-Mar-11): I added a static method to ExecUtil that wraps an ExecutorService, implementing invokeAll in terms of ExecUtil.invokeAll if the argument was created by Hazelcast. I also added a ConcurrentFunction that wraps an existing Function for concurrent or distributed application.
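The idea behind such a wrapper can be sketched as follows. This is a hypothetical, JDK-only illustration (using java.util.function.Function rather than Guava's Function, and local threads rather than Hazelcast), not the real ConcurrentFunction: it applies a wrapped function to each input as a separate task and gathers the results in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ConcurrentFunctionSketch {
    // Applies a wrapped function to many inputs concurrently; each
    // application becomes one task on the supplied ExecutorService.
    // (Illustrative only -- the real ConcurrentFunction can also distribute
    // the work across a Hazelcast cluster via DistributedTasks.)
    public static <F, T> List<T> applyAll(ExecutorService exec,
                                          Function<F, T> fn,
                                          Iterable<F> inputs)
            throws InterruptedException, ExecutionException {
        List<Future<T>> futures = new ArrayList<>();
        for (F input : inputs) {
            futures.add(exec.submit(() -> fn.apply(input)));
        }
        List<T> results = new ArrayList<>(futures.size());
        for (Future<T> f : futures) {
            results.add(f.get());   // preserves input order
        }
        return results;
    }
}
```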
A concurrent Guava CacheLoader
I then realized that this machinery could be used to provide a nice implementation of Guava's CacheLoader, used to build LoadingCache instances with CacheBuilder. LoadingCache has a getAll(Iterable&lt;? extends K&gt; keys) method that returns a Map&lt;K, V&gt; from keys to cached values. It calls CacheLoader.loadAll(Iterable&lt;? extends K&gt; keys) -- if it's implemented -- to load the values in order to cache them. If loadAll isn't implemented, getAll just loads the keys sequentially.
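That contract can be mimicked in plain Java. This is a hedged, JDK-only sketch of the behavior just described -- MiniCacheLoader and MiniCacheDemo are stand-ins I made up, not Guava's actual classes: getAll delegates to loadAll, and the default loadAll falls back to loading each key one at a time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only mimic of the Guava contract described above: getAll delegates to
// loadAll, and the default loadAll just loads the keys one by one.
interface MiniCacheLoader<K, V> {
    V load(K key);

    default Map<K, V> loadAll(Iterable<? extends K> keys) {
        Map<K, V> m = new LinkedHashMap<>();
        for (K k : keys) m.put(k, load(k));  // sequential fallback
        return m;
    }
}

public class MiniCacheDemo {
    // A stripped-down getAll: consult the loader's loadAll for all the keys.
    // An overridden loadAll (e.g. a concurrent one) kicks in automatically.
    public static <K, V> Map<K, V> getAll(MiniCacheLoader<K, V> loader,
                                          Iterable<? extends K> keys) {
        return loader.loadAll(keys);
    }
}
```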
I wrote a CacheLoader implementation that can use ExecUtil.invokeAll to load the values using DistributedTasks. While I was at it, I made it so that if you don't have a Hazelcast-based ExecutorService, you can use a normal ExecutorService to load the values concurrently (in the same JVM).
The result is ConcurrentCacheLoader:
It has a nested Builder class that allows you to specify:
- the ExecutorService used (including shortcuts for default HazelcastInstance ExecutorServices),
- a time limit for loadAll,
- a function to map a cache key to a key object for the DistributedTask, and
- the actual function that maps cache keys to values.
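A builder along those lines might be shaped like this. The names and signatures below are my own illustrative guesses, not the actual ConcurrentCacheLoader.Builder API, and the sketch stays JDK-only: build() yields a bulk-loading function that fans the keys out to the executor and gathers a key-to-value map, with a time limit on each result.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical skeleton mirroring the builder options listed above;
// the real Builder also offers Hazelcast-specific shortcuts.
public class LoaderBuilderSketch<K, V> {
    private ExecutorService executor;                 // must be set before build()
    private long timeoutMillis = Long.MAX_VALUE;      // time limit for loadAll
    private Function<K, ?> taskKeyFn = k -> k;        // cache key -> task key
    private Function<K, V> valueFn;                   // cache key -> value

    public LoaderBuilderSketch<K, V> executorService(ExecutorService e) { executor = e; return this; }
    public LoaderBuilderSketch<K, V> timeout(long millis) { timeoutMillis = millis; return this; }
    public LoaderBuilderSketch<K, V> taskKeyFunction(Function<K, ?> f) { taskKeyFn = f; return this; }
    public LoaderBuilderSketch<K, V> loader(Function<K, V> f) { valueFn = f; return this; }

    // build() yields a bulk loader: submit one task per key, then gather the
    // results into a key -> value map. (taskKeyFn is unused in this local
    // sketch; the distributed version would use it to key DistributedTasks.)
    public Function<Iterable<K>, Map<K, V>> build() {
        final ExecutorService exec = executor;
        final Function<K, V> fn = valueFn;
        return keys -> {
            Map<K, Future<V>> pending = new LinkedHashMap<>();
            for (K key : keys) pending.put(key, exec.submit(() -> fn.apply(key)));
            Map<K, V> out = new LinkedHashMap<>();
            try {
                for (Map.Entry<K, Future<V>> e : pending.entrySet())
                    out.put(e.getKey(), e.getValue().get(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return out;
        };
    }
}
```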
ConcurrentCacheLoaderTest.java
Here's how I'm using it:
That's a ConcurrentCacheLoader.Builder used to build an argument to a CacheBuilder method.
The upshot is that my call to getAll runs on all the nodes in my Hazelcast cluster.
Addendum:
The motivation for all this was the desire to make dozens of calls to JClouds' BlobStore.getBlob() in the space of a single HTTP request. The standard for-loop approach was taking too long, and I suddenly realized I could be asking the Hazelcast cluster to do the work.