+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.actor.UntypedAbstractActor;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.Futures;
+import akka.testkit.TestActorRef;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
-import junit.framework.Assert;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
-import static org.junit.Assert.assertNotNull;
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
-public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+ private ActorUtils actorUtils;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+ private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
+ private final TransactionIdentifier tx = nextTransactionId();
- private ThreePhaseCommitCohortProxy proxy;
- private Props props;
- private ActorRef actorRef;
- private MockActorContext actorContext;
- private ExecutorService executor = Executors.newSingleThreadExecutor();
@Before
- public void setUp(){
- props = Props.create(MessageCollectorActor.class);
- actorRef = getSystem().actorOf(props);
- actorContext = new MockActorContext(this.getSystem());
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
+ new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
+ new PrimaryShardInfoFutureCache()) {
+ @Override
+ public Timer getOperationTimer(final String operationName) {
+ return commitTimer;
+ }
- proxy =
- new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(actorRef.path()), "txn-1", executor);
+ @Override
+ public double getTxCreationLimit() {
+ return 10.0;
+ }
+ };
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ for (int i = 1; i < 11; i++) {
+ // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
+ // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
+ doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
+ }
}
@Test
- public void testCanCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
+ public void testCanCommitYesWithOneCohort() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
- ListenableFuture<Boolean> future = proxy.canCommit();
+ verifyCanCommit(proxy.canCommit(), true);
+ verifyCohortActors();
+ }
- Assert.assertTrue(future.get().booleanValue());
+ @Test
+ public void testCanCommitNoWithOneCohort() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
+ verifyCanCommit(proxy.canCommit(), false);
+ verifyCohortActors();
}
@Test
- public void testPreCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
+ public void testCanCommitYesWithTwoCohorts() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
- ListenableFuture<Void> future = proxy.preCommit();
+ verifyCanCommit(proxy.canCommit(), true);
+ verifyCohortActors();
+ }
- future.get();
+ @Test
+ public void testCanCommitNoWithThreeCohorts() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.yes(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
+ CanCommitTransactionReply.no(CURRENT_VERSION))),
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
+ verifyCanCommit(proxy.canCommit(), false);
+ verifyCohortActors();
+ }
+
+ @Test(expected = TestException.class)
+ public void testCanCommitWithExceptionFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
+
+ propagateExecutionExceptionCause(proxy.canCommit());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCanCommitWithInvalidResponseType() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
+
+ propagateExecutionExceptionCause(proxy.canCommit());
+ }
+
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortFuture() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx)),
+ newCohortInfoWithFailedFuture(new TestException()),
+ newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
+
+ propagateExecutionExceptionCause(proxy.canCommit());
}
@Test
- public void testAbort() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
+ public void testAllThreePhasesSuccessful() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(
+ new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(
+ new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
+ }
+
+ @Test(expected = TestException.class)
+ public void testCommitWithExceptionFailure() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfo(
+ new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
+ newCohortInfo(
+ new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
+ .expectCommit(new TestException())));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
- ListenableFuture<Void> future = proxy.abort();
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ propagateExecutionExceptionCause(proxy.commit());
+ }
- future.get();
+ @Test(expected = IllegalArgumentException.class)
+ public void testCommitWithInvalidResponseType() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
+ .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test
- public void testCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
+ public void testAbort() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(
+ AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
- ListenableFuture<Void> future = proxy.commit();
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
+ }
- future.get();
+ @Test
+ public void testAbortWithFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
+ newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
+
+ // The exception should not get propagated.
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
}
@Test
- public void testGetCohortPaths() throws Exception {
- assertNotNull(proxy.getCohortPaths());
+ public void testAbortWithFailedCohortFuture() throws Exception {
+ List<CohortInfo> cohorts = Arrays.asList(
+ newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
+
+ verifySuccessfulFuture(proxy.abort());
+ verifyCohortActors();
+ }
+
+ @Test
+ public void testWithNoCohorts() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
+ Collections.<CohortInfo>emptyList(), tx);
+
+ verifyCanCommit(proxy.canCommit(), true);
+ verifySuccessfulFuture(proxy.preCommit());
+ verifySuccessfulFuture(proxy.commit());
+ verifyCohortActors();
+ }
+
+ @SuppressWarnings("checkstyle:avoidHidingCauseException")
+ private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ verifyCohortActors();
+ Throwables.propagateIfPossible(e.getCause(), Exception.class);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
+ TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
+ cohortActors.add(actor);
+ return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
+ }
+
+ private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
+ return newCohortInfo(builder, CURRENT_VERSION);
+ }
+
+ private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
+ return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
+ }
+
+ private void verifyCohortActors() {
+ for (TestActorRef<CohortActor> actor: cohortActors) {
+ actor.underlyingActor().verify();
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
+ try {
+ return future.get(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ verifyCohortActors();
+ throw e;
+ }
+ }
+
+ private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
+ Boolean actual = verifySuccessfulFuture(future);
+ assertEquals("canCommit", expected, actual);
+ }
+
+ private static class CohortActor extends UntypedAbstractActor {
+ private final Builder builder;
+ private final AtomicInteger canCommitCount = new AtomicInteger();
+ private final AtomicInteger commitCount = new AtomicInteger();
+ private final AtomicInteger abortCount = new AtomicInteger();
+ private volatile AssertionError assertionError;
+
+ CohortActor(final Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void onReceive(final Object message) {
+ if (CanCommitTransaction.isSerializedType(message)) {
+ canCommitCount.incrementAndGet();
+ onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
+ builder.expCanCommitType, builder.canCommitReply);
+ } else if (CommitTransaction.isSerializedType(message)) {
+ commitCount.incrementAndGet();
+ onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
+ builder.expCommitType, builder.commitReply);
+ } else if (AbortTransaction.isSerializedType(message)) {
+ abortCount.incrementAndGet();
+ onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
+ builder.expAbortType, builder.abortReply);
+ } else {
+ assertionError = new AssertionError("Unexpected message " + message);
+ }
+ }
+
+ private void onMessage(final String name, final Object rawMessage,
+ final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
+ try {
+ assertNotNull("Unexpected " + name, expType);
+ assertEquals(name + " type", expType, rawMessage.getClass());
+ assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
+
+ if (reply instanceof Throwable) {
+ getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
+ } else {
+ getSender().tell(reply, self());
+ }
+ } catch (AssertionError e) {
+ assertionError = e;
+ }
+ }
+
+ void verify() {
+ if (assertionError != null) {
+ throw assertionError;
+ }
+
+ if (builder.expCanCommitType != null) {
+ assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
+ }
+
+ if (builder.expCommitType != null) {
+ assertEquals("CommitTransaction count", 1, commitCount.get());
+ }
+
+ if (builder.expAbortType != null) {
+ assertEquals("AbortTransaction count", 1, abortCount.get());
+ }
+ }
+
+ static class Builder {
+ private Class<?> expCanCommitType;
+ private Class<?> expCommitType;
+ private Class<?> expAbortType;
+ private Object canCommitReply;
+ private Object commitReply;
+ private Object abortReply;
+ private final TransactionIdentifier transactionId;
+
+ Builder(final TransactionIdentifier transactionId) {
+ this.transactionId = requireNonNull(transactionId);
+ }
+
+ Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
+ this.expCanCommitType = newExpCanCommitType;
+ this.canCommitReply = newCanCommitReply;
+ return this;
+ }
+
+ Builder expectCanCommit(final Object newCanCommitReply) {
+ return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
+ }
+
+ Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
+ this.expCommitType = newExpCommitType;
+ this.commitReply = newCommitReply;
+ return this;
+ }
+
+ Builder expectCommit(final Object newCommitReply) {
+ return expectCommit(CommitTransaction.class, newCommitReply);
+ }
+
+ Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
+ this.expAbortType = newExpAbortType;
+ this.abortReply = newAbortReply;
+ return this;
+ }
+
+ Builder expectAbort(final Object newAbortReply) {
+ return expectAbort(AbortTransaction.class, newAbortReply);
+ }
+
+ Props props() {
+ return Props.create(CohortActor.class, this);
+ }
+ }
}
}