+ /**
+ * Put the given statistics transaction container into the cache.
+ *
+ * @param key Key that specifies the given transaction container.
+ * @param container Transaction container.
+ */
+ private synchronized void putTransaction(
+ String key, TransactionCacheContainer<? super TransactionAware> container) {
+ txCache.put(key, container);
+
+ SettableFuture<Boolean> future = txFutureCache.asMap().remove(key);
+ if (future != null) {
+ // Wake up a thread waiting for this transaction container.
+ future.set(true);
+ }
+ }
+
+ /**
+ * Check to see if the specified transaction container is cached in
+ * {@link #txCache}.
+ *
+ * @param key Key that specifies the transaction container.
+ * @return A future that will contain the result.
+ */
+ private synchronized Future<Boolean> isExpectedStatistics(String key) {
+ Future<Boolean> future;
+ TransactionCacheContainer<?> container = txCache.getIfPresent(key);
+ if (container == null) {
+ // Wait for the transaction container to be put into the cache.
+ SettableFuture<Boolean> f = SettableFuture.<Boolean>create();
+ SettableFuture<Boolean> current =
+ txFutureCache.asMap().putIfAbsent(key, f);
+ future = (current == null) ? f : current;
+ } else {
+ future = Futures.immediateFuture(Boolean.TRUE);
+ }
+
+ return future;
+ }
+