import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@SuppressWarnings("unchecked")
@Test
public void testAsyncCohort() throws Exception {
- ExecutorService executor = Executors.newSingleThreadExecutor();
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
doReturn(Futures.makeChecked(executeWithDelay(executor, mockPostCanCommit),
ex -> new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")))
.when(mockCohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
- doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
- mockPostPreCommit), MoreExecutors.directExecutor())).when(mockPostCanCommit).preCommit();
+ doReturn(executor.submit(() -> mockPostPreCommit)).when(mockPostCanCommit).preCommit();
- doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
- null), MoreExecutors.directExecutor())).when(mockPostPreCommit).commit();
+ doReturn(executor.submit(() -> null)).when(mockPostPreCommit).commit();
ActorRef cohortActor = newCohortActor("testAsyncCohort");
verify(mockPostPreCommit).abort();
}
- private <T> ListenableFuture<T> executeWithDelay(ExecutorService executor, T result) {
- return JdkFutureAdapters.listenInPoolThread(executor.submit(() -> {
+ private static <T> ListenableFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
+ return executor.submit(() -> {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
return result;
- }), MoreExecutors.directExecutor());
+ });
}
- private ActorRef newCohortActor(String name) {
+ private ActorRef newCohortActor(final String name) {
return actorFactory.createActor(DataTreeCohortActor.props(mockCohort, YangInstanceIdentifier.EMPTY), name);
}
doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
}
- private static void askAndAwait(ActorRef actor, CommitProtocolCommand<?> message) throws Exception {
+ private static void askAndAwait(final ActorRef actor, final CommitProtocolCommand<?> message) throws Exception {
Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Object result = Await.result(Patterns.ask(actor, message, timeout), timeout.duration());
assertTrue("Expected Success but was " + result, result instanceof Success);