X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=132ac47e7da77de739c6fb1b50fb7d201ce81259;hb=a1f54a11f6c8675404efc1afcca3e0bd5f9a7e36;hp=037a1dec5b076dc9a073ce2d796d4ebf078a09e5;hpb=26721a6f2bc3ce4d524ccb562d7e7a38b4b76068;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 037a1dec5b..132ac47e7d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -28,6 +28,7 @@ import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; @@ -42,6 +43,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; @@ -679,14 +681,14 @@ public class DistributedDataStoreRemotingIntegrationTest { public void testTransactionForwardedToLeaderAfterRetry() throws Exception { followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); - initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry"); + initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); // Do an initial write to get the primary shard info cached. - DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); - writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx1.ready()); + DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + followerTestKit.doCommit(initialWriteTx.ready()); // Wait for the commit to be replicated to the follower. @@ -697,21 +699,51 @@ public class DistributedDataStoreRemotingIntegrationTest { } }); - // Create and prepare wo and rw tx's. + // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in + // the leader shard. + + DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready(); + ListenableFuture writeTx1CanCommit = writeTx1Cohort.canCommit(); + writeTx1CanCommit.get(5, TimeUnit.SECONDS); + + // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued + // in the leader shard. - writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); + DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); LinkedList cars = new LinkedList<>(); int carIndex = 1; - for(; carIndex <= 5; carIndex++) { + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + carIndex++; + NormalizedNode people = PeopleModel.newPersonMapNode(); + writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); + DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); + + // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This + // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the + // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be + // sent on ready. + + DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); + for(int i = 1; i <= 5; i++, carIndex++) { cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); - writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } - DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); + // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications + // message on ready. + + DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); - writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; + // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the + // leader shard on ready. + DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); @@ -731,21 +763,28 @@ public class DistributedDataStoreRemotingIntegrationTest { leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); - // Submit tx's and enable elections on the follower so it becomes the leader, at which point the - // readied tx's should get forwarded from the previous leader. + // Submit all tx's - the messages should get queued for retry. - DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready(); - DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); - DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready(); + ListenableFuture writeTx2CanCommit = writeTx2Cohort.canCommit(); + DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready(); + DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready(); + DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready(); + + // Enable elections on the other follower so it becomes the leader, at which point the + // tx's should get forwarded from the previous leader to the new leader to complete the commits. sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); - followerTestKit.doCommit(cohort1); - followerTestKit.doCommit(cohort2); - followerTestKit.doCommit(cohort3); + followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); + followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort); + followerTestKit.doCommit(writeTx3Cohort); + followerTestKit.doCommit(writeTx4Cohort); + followerTestKit.doCommit(rwTxCohort); - verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()])); + DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); + verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); + verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); } @Test @@ -923,6 +962,9 @@ public class DistributedDataStoreRemotingIntegrationTest { IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS); + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + // Do an initial read to get the primary shard info cached. DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();