Monday, March 12, 2012

Further distributed CacheLoader developments

Updated 2012-Mar-13 and 2012-Mar-20; see notes at end.

In a discussion on the Guava mailing list after my previous post, Charles Fry and Louis Wasserman had some great ideas that spurred me to rewrite the distributed CacheLoader functionality from scratch.

The first part is static factory methods to turn an AsyncFunction into a CacheLoader:

CacheLoaders.java

This is independent of any mention of Hazelcast; it works with any AsyncFunction. The Iterable-ness of the keys passed to loadAll is preserved all the way through, so asynchronous processing can start before the keys have been completely iterated. This could be handy if, for example, the keys themselves are being generated asynchronously.
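To illustrate the shape of such an adapter, here's a minimal, hypothetical sketch of a bulk loader built on an async function, using plain JDK CompletableFuture as a stand-in for Guava's AsyncFunction and ListenableFuture (the class and method names here are mine, not from CacheLoaders.java). The key point is that every lookup is kicked off before any result is awaited, so the asynchronous work runs concurrently and can begin before key iteration finishes:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class AsyncLoaderSketch {
    // Turns an async lookup (key -> future of value) into a blocking
    // bulk load, roughly the job a CacheLoader.loadAll built from an
    // AsyncFunction has to do.
    static <K, V> Map<K, V> loadAll(
            Iterable<? extends K> keys,
            Function<K, CompletableFuture<V>> asyncFn,
            long timeout, TimeUnit unit) throws Exception {
        // Start all lookups first so they run concurrently; task
        // submission can interleave with iterating the keys.
        Map<K, CompletableFuture<V>> futures = new LinkedHashMap<>();
        for (K key : keys) {
            futures.put(key, asyncFn.apply(key));
        }
        // Then block for each result, bounding the total wait by one
        // shared deadline rather than per-key timeouts.
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, CompletableFuture<V>> e : futures.entrySet()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                throw new TimeoutException();
            }
            result.put(e.getKey(),
                       e.getValue().get(remaining, TimeUnit.NANOSECONDS));
        }
        return result;
    }
}
```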

The other piece is a class with methods to create an AsyncFunction out of a Hazelcast-based Executor:

HazelcastAsyncFunction.java

Note that the public API of this class does not use any Hazelcast types.

The whole implementation is much cleaner and takes full advantage of existing Guava machinery. Here's what it looks like in use in my code:

    AsyncFunction asyncLookup =
        HazelcastAsyncFunction
            .from(syncLookup())
            .onExecutor(hazelcastExecutor())
            .withTaskKeyFunction(mapAccountToServer());

    LoadingCache cache = CacheBuilder.newBuilder().build(
        fromAsyncFunction(asyncLookup, 30L, TimeUnit.SECONDS));
    ...
    // Later on, this call causes the account lookups to be
    // magically distributed across the cluster:
    Map accountNameToInfo = cache.getAll(accountNames);

Update 2012-Mar-13: Louis Wasserman pointed out (in comments to this post) a race in the implementation of CacheLoaders. I've updated the code to remove the race. It doesn't use as many cool Guava tricks, but it's quite a bit simpler now.

Update 2012-Mar-20: Unpacked the example to make it less frightening.

7 comments:

Louis Wasserman said...

I'm slightly concerned about this because the implementation doesn't currently guarantee that loadAll waits to return until the entries have actually been put into the map. Honestly, I would just go ahead and construct an explicit ImmutableMap of the results in loadAll -- iterating through the keys and results should be much cheaper than the loading itself.

Tembrel said...

I wanted to do that, but I wasn't sure whether ImmutableMap.Builder objects are safe for concurrent access.

Or did you just mean return ImmutableMap.copyOf(results)?

Louis Wasserman said...

Basically! Except I don't trust the way you're constructing results. I fully expect that LoadingCache will just snap up the entrySet of the generated Map, and if some of those callbacks haven't fired yet, you're screwed! So do it the old-fashioned way.

    ImmutableList<K> keyList = ImmutableList.copyOf(keys);
    // (or really just a function that returns the raw ListenableFuture for a key)
    ListenableFuture<List<V>> listFuture =
        Futures.successfulAsList(Iterables.transform(keyList, asyncFunction));
    List<V> valueList = listFuture.get();
    ImmutableMap.Builder<K, V> builder = ImmutableMap.builder();
    for (int i = 0; i < keyList.size(); i++) {
        builder.put(keyList.get(i), valueList.get(i));
    }
    return builder.build();

Louis Wasserman said...

In particular, the doc of e.g. ListenableFutureTask specifies: "There is no guaranteed ordering of execution of listeners, but any listener added through this method is guaranteed to be called once the computation is complete." So successfulAsList (which operates by adding a callback to the future) might return _before_ the entry is put into the map. I'm not sure there's a better way to do this than just the straightforward sequential way.

Tembrel said...

Yeah, now I see the race: The sync state can change to COMPLETED before the callbacks on the ExecutionList fire. *sigh*

But I can go back to using the same AtomicInteger/CountDownLatch technique that I used in ConcurrentCacheLoader over the weekend. The map.put(key, result) happens-before the counter decrement and potential latch opening, and the latch opening happens-before the map is returned. I won't have to build the map after the computation is done, and by avoiding successfulAsList, I won't be building an unused List.

I'm sure there are fancy lock-free ways to do distributed countdown, but I bet this is fine for most purposes.

I still have to copy the whole map if I catch TimeoutException in the timed version, but that's a hit I'm happy to take.
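For illustration, here's a minimal single-JVM sketch of the AtomicInteger/CountDownLatch technique described above (the class and method names are mine, and a plain ExecutorService stands in for the distributed Hazelcast one). The point is the happens-before chain: each put precedes its counter decrement, and the latch opening precedes the return, so the map is fully populated when callers see it:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class LatchLoaderSketch {
    // Each task puts its result into the map and then decrements the
    // shared counter; the task that brings it to zero opens the latch.
    // The put happens-before the decrement (program order plus the
    // AtomicInteger), and the latch opening happens-before await()
    // returns, so the returned map is complete.
    static <K, V> Map<K, V> loadAll(List<K> keys, Function<K, V> fn,
                                    ExecutorService executor)
            throws InterruptedException {
        ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
        if (keys.isEmpty()) {
            return map;  // nothing to wait for
        }
        AtomicInteger remaining = new AtomicInteger(keys.size());
        CountDownLatch done = new CountDownLatch(1);
        for (K key : keys) {
            executor.execute(() -> {
                map.put(key, fn.apply(key));  // put before decrement
                if (remaining.decrementAndGet() == 0) {
                    done.countDown();         // last task opens the latch
                }
            });
        }
        done.await();                         // happens-before the return
        return map;
    }
}
```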

Ashwin Jayaprakash said...

This fragment looks so scary, it almost makes no sense. Why does the builder method keep creating new instances? Doesn't it make the docs more confusing? Just my 2 cents.

    .build(fromAsyncFunction(
        HazelcastAsyncFunction
            .from(new AccountInfoLookupFunction())
            .onExecutor(hazelcast().getExecutorService())
            .withTaskKeyFunction(new Function<String, Object>() {
                public Object apply(String account) {
                    return accountToServer(account).or(account);
                }
            }), ...


Coherence has a cleaner (more verbose) way of doing this - http://coherence.oracle.com/display/COH35UG/Pre-Loading+the+Cache or http://cohfu.wordpress.com/2010/01/06/bulk-loading-a-coherence-cache-from-an-oracle-database/

Tembrel said...

I unpacked the code a bit to make it less scary. But in any event it doesn't "keep creating new instances"; that's one-time code to build the cache. Once built, the only thing you need to do is say "cache.get(key)".

The docs aren't particularly confusing, because there are only two new things, the fromAsyncFunction method and the HazelcastAsyncFunction class. The latter turns a regular function into an asynchronous function, and the former turns an asynchronous function into a CacheLoader.
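As a rough analogy for the first of those two steps, here's how a regular function can be wrapped into an asynchronous one using a plain JDK executor (CompletableFuture stands in for Guava's ListenableFuture, and the class name is mine; in the real thing the executor would be Hazelcast's distributed one):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

public class AsyncifySketch {
    // Wraps a synchronous function so that each apply() submits the
    // work to an executor and immediately returns a future -- the same
    // shape of transformation that turns a regular function into an
    // asynchronous one.
    static <K, V> Function<K, CompletableFuture<V>> asyncify(
            Function<K, V> fn, ExecutorService executor) {
        return key -> CompletableFuture.supplyAsync(
            () -> fn.apply(key), executor);
    }
}
```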

The Coherence examples you cite look (after a very brief skim) similar in nature to what is going on under the covers in my example. Not sure in what way they are cleaner, but they are indeed more verbose.