X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FThreePhaseCommitCohortProxyTest.java;h=ba7295bcea75a7a3b4f9806549065211a0290f13;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hp=992518e100a0381ab4c915128cc7975510bed2fc;hpb=ed693440aa741fee9b94447f8404d89b4020f616;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 992518e100..ba7295bcea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,85 +1,424 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; + +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; +import akka.dispatch.Futures; +import akka.testkit.TestActorRef; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; -import junit.framework.Assert; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.Arrays; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { -import static org.junit.Assert.assertNotNull; + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } -public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { + private ActorContext actorContext; + + @Mock + private Timer commitTimer; + + @Mock + private Timer.Context commitTimerContext; + + @Mock + private Snapshot commitSnapshot; + + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final List> cohortActors = new ArrayList<>(); + private final TransactionIdentifier tx = nextTransactionId(); - private ThreePhaseCommitCohortProxy proxy; - private Props props; - private ActorRef actorRef; - private MockActorContext actorContext; - private ExecutorService executor = Executors.newSingleThreadExecutor(); @Before - public void setUp(){ - props = Props.create(MessageCollectorActor.class); - actorRef = getSystem().actorOf(props); - actorContext = new MockActorContext(this.getSystem()); + public void setUp() { + MockitoAnnotations.initMocks(this); - proxy = - new ThreePhaseCommitCohortProxy(actorContext, - Arrays.asList(actorRef.path()), "txn-1", executor); + actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)), + new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), + new PrimaryShardInfoFutureCache()) { + @Override + public Timer getOperationTimer(final String operationName) { + return commitTimer; + } + @Override + public double getTxCreationLimit() { + return 10.0; + } + }; + + doReturn(commitTimerContext).when(commitTimer).time(); + doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + for (int i = 1; i < 11; i++) { + // Keep on increasing the amount of time it takes to complete transaction for each tenth of a + // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on. + doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1); + } } @Test - public void testCanCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable()); + public void testCanCommitYesWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx); - ListenableFuture future = proxy.canCommit(); + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); + } - Assert.assertTrue(future.get().booleanValue()); + @Test + public void testCanCommitNoWithOneCohort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION)))), tx); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); } @Test - public void testPreCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable()); + public void testCanCommitYesWithTwoCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); - ListenableFuture future = proxy.preCommit(); + verifyCanCommit(proxy.canCommit(), true); + verifyCohortActors(); + } - future.get(); + @Test + public void testCanCommitNoWithThreeCohorts() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.yes(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit( + CanCommitTransactionReply.no(CURRENT_VERSION))), + newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + verifyCanCommit(proxy.canCommit(), false); + verifyCohortActors(); + } + + @Test(expected = TestException.class) + public void testCanCommitWithExceptionFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx); + + propagateExecutionExceptionCause(proxy.canCommit()); + } + + @Test(expected = IllegalArgumentException.class) + public void testCanCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx); + + propagateExecutionExceptionCause(proxy.canCommit()); + } + + @Test(expected = TestException.class) + public void testCanCommitWithFailedCohortFuture() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx)), + newCohortInfoWithFailedFuture(new TestException()), + newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + propagateExecutionExceptionCause(proxy.canCommit()); } @Test - public void testAbort() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable()); + public void testAllThreePhasesSuccessful() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); - ListenableFuture future = proxy.abort(); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); + } + + @Test(expected = TestException.class) + public void testCommitWithExceptionFailure() throws Exception { + List cohorts = Arrays.asList( + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))), + newCohortInfo( + new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)) + .expectCommit(new TestException()))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); + } + + @Test(expected = IllegalArgumentException.class) + public void testCommitWithInvalidResponseType() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Arrays.asList(newCohortInfo(new CohortActor.Builder(tx) + .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx); - future.get(); + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + propagateExecutionExceptionCause(proxy.commit()); + } + + @Test + public void testAbort() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectAbort( + AbortTransactionReply.instance(CURRENT_VERSION)))), tx); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable()); + public void testAbortWithFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList( + newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx); - ListenableFuture future = proxy.commit(); + // The exception should not get propagated. + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); + } + + @Test + public void testAbortWithFailedCohortFuture() throws Exception { + List cohorts = Arrays.asList( + newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))); + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx); - future.get(); + verifySuccessfulFuture(proxy.abort()); + verifyCohortActors(); } @Test - public void testGetCohortPaths() throws Exception { - assertNotNull(proxy.getCohortPaths()); + public void testWithNoCohorts() throws Exception { + ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, + Collections.emptyList(), tx); + + verifyCanCommit(proxy.canCommit(), true); + verifySuccessfulFuture(proxy.preCommit()); + verifySuccessfulFuture(proxy.commit()); + verifyCohortActors(); + } + + @SuppressWarnings("checkstyle:avoidHidingCauseException") + private void propagateExecutionExceptionCause(final ListenableFuture future) throws Exception { + try { + future.get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + verifyCohortActors(); + Throwables.propagateIfPossible(e.getCause(), Exception.class); + throw new RuntimeException(e.getCause()); + } + } + + private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) { + TestActorRef actor = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort")); + cohortActors.add(actor); + return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version); + } + + private CohortInfo newCohortInfo(final CohortActor.Builder builder) { + return newCohortInfo(builder, CURRENT_VERSION); + } + + private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) { + return new CohortInfo(Futures.failed(failure), () -> CURRENT_VERSION); + } + + private void verifyCohortActors() { + for (TestActorRef actor: cohortActors) { + actor.underlyingActor().verify(); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private T verifySuccessfulFuture(final ListenableFuture future) throws Exception { + try { + return future.get(5, TimeUnit.SECONDS); + } catch (Exception e) { + verifyCohortActors(); + throw e; + } + } + + private void verifyCanCommit(final ListenableFuture future, final boolean expected) throws Exception { + Boolean actual = verifySuccessfulFuture(future); + assertEquals("canCommit", expected, actual); + } + + private static class CohortActor extends UntypedActor { + private final Builder builder; + private final AtomicInteger canCommitCount = new AtomicInteger(); + private final AtomicInteger commitCount = new AtomicInteger(); + private final AtomicInteger abortCount = new AtomicInteger(); + private volatile AssertionError assertionError; + + CohortActor(final Builder builder) { + this.builder = builder; + } + + @Override + public void onReceive(final Object message) { + if (CanCommitTransaction.isSerializedType(message)) { + canCommitCount.incrementAndGet(); + onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message), + builder.expCanCommitType, builder.canCommitReply); + } else if (CommitTransaction.isSerializedType(message)) { + commitCount.incrementAndGet(); + onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message), + builder.expCommitType, builder.commitReply); + } else if (AbortTransaction.isSerializedType(message)) { + abortCount.incrementAndGet(); + onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message), + builder.expAbortType, builder.abortReply); + } else { + assertionError = new AssertionError("Unexpected message " + message); + } + } + + private void onMessage(final String name, final Object rawMessage, + final AbstractThreePhaseCommitMessage actualMessage, final Class expType, final Object reply) { + try { + assertNotNull("Unexpected " + name, expType); + assertEquals(name + " type", expType, rawMessage.getClass()); + assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId()); + + if (reply instanceof Throwable) { + getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self()); + } else { + getSender().tell(reply, self()); + } + } catch (AssertionError e) { + assertionError = e; + } + } + + void verify() { + if (assertionError != null) { + throw assertionError; + } + + if (builder.expCanCommitType != null) { + assertEquals("CanCommitTransaction count", 1, canCommitCount.get()); + } + + if (builder.expCommitType != null) { + assertEquals("CommitTransaction count", 1, commitCount.get()); + } + + if (builder.expAbortType != null) { + assertEquals("AbortTransaction count", 1, abortCount.get()); + } + } + + static class Builder { + private Class expCanCommitType; + private Class expCommitType; + private Class expAbortType; + private Object canCommitReply; + private Object commitReply; + private Object abortReply; + private final TransactionIdentifier transactionId; + + Builder(final TransactionIdentifier transactionId) { + this.transactionId = Preconditions.checkNotNull(transactionId); + } + + Builder expectCanCommit(final Class newExpCanCommitType, final Object newCanCommitReply) { + this.expCanCommitType = newExpCanCommitType; + this.canCommitReply = newCanCommitReply; + return this; + } + + Builder expectCanCommit(final Object newCanCommitReply) { + return expectCanCommit(CanCommitTransaction.class, newCanCommitReply); + } + + Builder expectCommit(final Class newExpCommitType, final Object newCommitReply) { + this.expCommitType = newExpCommitType; + this.commitReply = newCommitReply; + return this; + } + + Builder expectCommit(final Object newCommitReply) { + return expectCommit(CommitTransaction.class, newCommitReply); + } + + Builder expectAbort(final Class newExpAbortType, final Object newAbortReply) { + this.expAbortType = newExpAbortType; + this.abortReply = newAbortReply; + return this; + } + + Builder expectAbort(final Object newAbortReply) { + return expectAbort(AbortTransaction.class, newAbortReply); + } + + Props props() { + return Props.create(CohortActor.class, this); + } + } } }