* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.lenient;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorSelection;
import akka.actor.Props;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedAbstractActor;
import akka.dispatch.Dispatchers;
import akka.dispatch.Futures;
import akka.testkit.TestActorRef;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
-
- @SuppressWarnings("serial")
static class TestException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
}
- private ActorContext actorContext;
+ private ActorUtils actorUtils;
@Mock
private Timer commitTimer;
-
@Mock
private Timer.Context commitTimerContext;
-
@Mock
private Snapshot commitSnapshot;
private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
private final TransactionIdentifier tx = nextTransactionId();
-
@Before
public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
+ actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
new PrimaryShardInfoFutureCache()) {
@Override
}
};
- doReturn(commitTimerContext).when(commitTimer).time();
- doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ lenient().doReturn(commitTimerContext).when(commitTimer).time();
+ lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot();
for (int i = 1; i < 11; i++) {
// Keep on increasing the amount of time it takes to complete transaction for each tenth of a
// percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
- doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
}
}
@Test
public void testCanCommitYesWithOneCohort() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
+ tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithOneCohort() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))),
+ tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
@Test
public void testCanCommitYesWithTwoCohorts() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
+ tx);
verifyCanCommit(proxy.canCommit(), true);
verifyCohortActors();
@Test
public void testCanCommitNoWithThreeCohorts() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.yes(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
- CanCommitTransactionReply.no(CURRENT_VERSION))),
- newCohortInfo(new CohortActor.Builder(tx)));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx))), tx);
verifyCanCommit(proxy.canCommit(), false);
verifyCohortActors();
}
- @Test(expected = TestException.class)
- public void testCanCommitWithExceptionFailure() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
+ @Test
+ public void testCanCommitWithExceptionFailure() {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
- propagateExecutionExceptionCause(proxy.canCommit());
+ propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testCanCommitWithInvalidResponseType() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
+ @Test
+ public void testCanCommitWithInvalidResponseType() {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
- propagateExecutionExceptionCause(proxy.canCommit());
+ assertEquals("Unexpected response type class java.lang.String",
+ propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class));
}
- @Test(expected = TestException.class)
+ @Test
public void testCanCommitWithFailedCohortFuture() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx)),
- newCohortInfoWithFailedFuture(new TestException()),
- newCohortInfo(new CohortActor.Builder(tx)));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx)),
+ newCohortInfoWithFailedFuture(new TestException()),
+ newCohortInfo(new CohortActor.Builder(tx))), tx);
- propagateExecutionExceptionCause(proxy.canCommit());
+ propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
}
@Test
public void testAllThreePhasesSuccessful() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(
- new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
- .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(
- new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
- .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
verifyCohortActors();
}
- @Test(expected = TestException.class)
+ @Test
public void testCommitWithExceptionFailure() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfo(
- new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
- .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
- newCohortInfo(
- new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
- .expectCommit(new TestException())));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(new TestException()))), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
- propagateExecutionExceptionCause(proxy.commit());
+ propagateExecutionExceptionCause(proxy.commit(), TestException.class);
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void testCommitWithInvalidResponseType() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
- .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit("invalid"))),
+ tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
- propagateExecutionExceptionCause(proxy.commit());
+ assertEquals("Unexpected response type class java.lang.String",
+ propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class));
}
@Test
public void testAbort() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectAbort(
- AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(
+ AbortTransactionReply.instance(CURRENT_VERSION)))),
+ tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testAbortWithFailure() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
- newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
// The exception should not get propagated.
verifySuccessfulFuture(proxy.abort());
@Test
public void testAbortWithFailedCohortFuture() throws Exception {
- List<CohortInfo> cohorts = Arrays.asList(
- newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
+ newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx);
verifySuccessfulFuture(proxy.abort());
verifyCohortActors();
@Test
public void testWithNoCohorts() throws Exception {
- ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
- Collections.<CohortInfo>emptyList(), tx);
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx);
verifyCanCommit(proxy.canCommit(), true);
verifySuccessfulFuture(proxy.preCommit());
verifyCohortActors();
}
- private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
-
- try {
- future.get(5, TimeUnit.SECONDS);
- fail("Expected ExecutionException");
- } catch (ExecutionException e) {
- verifyCohortActors();
- Throwables.propagateIfPossible(e.getCause(), Exception.class);
- throw new RuntimeException(e.getCause());
- }
+ private String propagateExecutionExceptionCause(final ListenableFuture<?> future,
+ final Class<? extends Exception> expected) {
+ final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause();
+ verifyCohortActors();
+ assertThat(ex, instanceOf(expected));
+ return ex.getMessage();
}
private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
assertEquals("canCommit", expected, actual);
}
- private static class CohortActor extends UntypedActor {
+ private static class CohortActor extends UntypedAbstractActor {
private final Builder builder;
private final AtomicInteger canCommitCount = new AtomicInteger();
private final AtomicInteger commitCount = new AtomicInteger();
private final AtomicInteger abortCount = new AtomicInteger();
private volatile AssertionError assertionError;
- private CohortActor(final Builder builder) {
+ CohortActor(final Builder builder) {
this.builder = builder;
}
private final TransactionIdentifier transactionId;
Builder(final TransactionIdentifier transactionId) {
- this.transactionId = Preconditions.checkNotNull(transactionId);
+ this.transactionId = requireNonNull(transactionId);
}
Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
- this.expCanCommitType = newExpCanCommitType;
- this.canCommitReply = newCanCommitReply;
+ expCanCommitType = newExpCanCommitType;
+ canCommitReply = newCanCommitReply;
return this;
}
}
Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
- this.expCommitType = newExpCommitType;
- this.commitReply = newCommitReply;
+ expCommitType = newExpCommitType;
+ commitReply = newCommitReply;
return this;
}
}
Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
- this.expAbortType = newExpAbortType;
- this.abortReply = newAbortReply;
+ expAbortType = newExpAbortType;
+ abortReply = newAbortReply;
return this;
}