X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=55024b265aea8cb9b9d29d6a667207b51f669e90;hb=0281535ab08fd795e42df66d25e9a904ff941ad7;hp=3c4c9046c82c4d48e0504cfe4677231e2ef50c25;hpb=a51163ead1d60e66eeaf3691adb70b019ce60fb2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 3c4c9046c8..55024b265a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -23,8 +23,10 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -54,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -84,6 +87,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * End-to-end distributed data store tests that exercise remote shards and transactions. @@ -534,6 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); } + @SuppressWarnings("unchecked") @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); @@ -563,7 +570,7 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); @@ -587,8 +594,11 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -596,6 +606,7 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } + @SuppressWarnings("unchecked") @Test public void testForwardedReadyTransactionForwardedToLeader() throws Exception { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); @@ -626,7 +637,7 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); @@ -651,8 +662,11 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( ((ReadyTransactionReply)resp).getCohortPath()); + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( - leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2"); + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -720,13 +734,83 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @Test(expected=NoShardLeaderException.class) + @Test + public void testLeadershipTransferOnShutdown() throws Exception { + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); + followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); + String testName = "testLeadershipTransferOnShutdown"; + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); + + IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); + DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false); + + // Create and submit a couple tx's so they're pending. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); + } + }); + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); + } + }); + + // Gracefully stop the leader via a Shutdown message. + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + ActorRef leaderActor = Await.result(future, duration); + + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown()); + + // Commit the 2 transactions. They should finish and succeed. + + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); + + // Wait for the leader actor stopped. + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + // Verify leadership was transferred by reading the committed data from the other nodes. + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + } + + @Test public void testTransactionWithIsolatedLeader() throws Throwable { leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); String testName = "testTransactionWithIsolatedLeader"; initDatastoresWithCars(testName); - JavaTestKit.shutdownActorSystem(followerSystem, null, true); + DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerDistributedDataStore.close(); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { @Override @@ -735,14 +819,22 @@ public class DistributedDataStoreRemotingIntegrationTest { } }); - DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(writeTx.ready()); + leaderTestKit.doCommit(failWriteTx.ready()); + fail("Expected NoShardLeaderException"); } catch (ExecutionException e) { - throw e.getCause(); + assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass()); } + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + shardElectionTimeoutFactor(100)); + + DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready(); + + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); + + leaderTestKit.doCommit(writeTxCohort); } @Test(expected=AskTimeoutException.class)