X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=499d5e22b99463245747d679d4244d1a4725c0b3;hp=3c4c9046c82c4d48e0504cfe4677231e2ef50c25;hb=5a4560d475f0ed328275f1a5c7a5dae292acfb02;hpb=a51163ead1d60e66eeaf3691adb70b019ce60fb2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 3c4c9046c8..499d5e22b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; @@ -54,6 +55,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -84,6 +86,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * End-to-end distributed data store tests that exercise remote shards and transactions. @@ -563,7 +568,7 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); @@ -626,7 +631,7 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass()); + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); @@ -720,13 +725,83 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @Test(expected=NoShardLeaderException.class) + @Test + public void testLeadershipTransferOnShutdown() throws Exception { + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); + followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); + String testName = "testLeadershipTransferOnShutdown"; + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); + + IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); + DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false); + + // Create and submit a couple tx's so they're pending. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); + } + }); + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); + } + }); + + // Gracefully stop the leader via a Shutdown message. + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + ActorRef leaderActor = Await.result(future, duration); + + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown()); + + // Commit the 2 transactions. They should finish and succeed. + + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); + + // Wait for the leader actor stopped. + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + // Verify leadership was transferred by reading the committed data from the other nodes. + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + } + + @Test public void testTransactionWithIsolatedLeader() throws Throwable { leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); String testName = "testTransactionWithIsolatedLeader"; initDatastoresWithCars(testName); - JavaTestKit.shutdownActorSystem(followerSystem, null, true); + DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerDistributedDataStore.close(); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { @Override @@ -735,14 +810,22 @@ public class DistributedDataStoreRemotingIntegrationTest { } }); - DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - try { - followerTestKit.doCommit(writeTx.ready()); + leaderTestKit.doCommit(failWriteTx.ready()); + fail("Expected NoShardLeaderException"); } catch (ExecutionException e) { - throw e.getCause(); + assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass()); } + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + shardElectionTimeoutFactor(100)); + + DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready(); + + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); + + leaderTestKit.doCommit(writeTxCohort); } @Test(expected=AskTimeoutException.class)