import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
@SuppressWarnings("unchecked")
@Test
public void testAsyncCohort() throws Exception {
- ExecutorService executor = Executors.newSingleThreadExecutor();
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
- doReturn(Futures.makeChecked(executeWithDelay(executor, mockPostCanCommit),
- ex -> new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")))
- .when(mockCohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
+ doReturn(executeWithDelay(executor, mockPostCanCommit))
+ .when(mockCohort).canCommit(any(Object.class), any(SchemaContext.class), any(Collection.class));
- doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
- mockPostPreCommit), MoreExecutors.directExecutor())).when(mockPostCanCommit).preCommit();
+ doReturn(executor.submit(() -> mockPostPreCommit)).when(mockPostCanCommit).preCommit();
- doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
- null), MoreExecutors.directExecutor())).when(mockPostPreCommit).commit();
+ doReturn(executor.submit(() -> null)).when(mockPostPreCommit).commit();
ActorRef cohortActor = newCohortActor("testAsyncCohort");
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
@Test
public void testFailureOnCanCommit() throws Exception {
DataValidationFailedException failure = new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock");
- doReturn(Futures.immediateFailedCheckedFuture(failure)).when(mockCohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(FluentFutures.immediateFailedFluentFuture(failure)).when(mockCohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
ActorRef cohortActor = newCohortActor("testFailureOnCanCommit");
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new Abort(txId));
verify(mockPostCanCommit).abort();
resetMockCohort();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
}
@Test
TransactionIdentifier txId = nextTransactionId();
askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
- verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
+ verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
askAndAwait(cohortActor, new PreCommit(txId));
verify(mockPostCanCommit).preCommit();
verify(mockPostPreCommit).abort();
}
- private <T> ListenableFuture<T> executeWithDelay(ExecutorService executor, T result) {
- return JdkFutureAdapters.listenInPoolThread(executor.submit(() -> {
+ private static <T> FluentFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
+ return FluentFuture.from(executor.submit(() -> {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
return result;
- }), MoreExecutors.directExecutor());
+ }));
}
- private ActorRef newCohortActor(String name) {
+ private ActorRef newCohortActor(final String name) {
return actorFactory.createActor(DataTreeCohortActor.props(mockCohort, YangInstanceIdentifier.EMPTY), name);
}
reset(mockCohort);
doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostCanCommit).abort();
doReturn(Futures.immediateFuture(mockPostPreCommit)).when(mockPostCanCommit).preCommit();
- doReturn(Futures.immediateCheckedFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
- any(Collection.class), any(SchemaContext.class));
+ doReturn(FluentFutures.immediateFluentFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
+ any(SchemaContext.class), any(Collection.class));
doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostPreCommit).abort();
doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
}
- private static void askAndAwait(ActorRef actor, CommitProtocolCommand<?> message) throws Exception {
+ private static void askAndAwait(final ActorRef actor, final CommitProtocolCommand<?> message) throws Exception {
Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
Object result = Await.result(Patterns.ask(actor, message, timeout), timeout.duration());
assertTrue("Expected Success but was " + result, result instanceof Success);