X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=2f7c790269f0f9702157b6468234154e9e1cb43d;hp=8100bdc6a2ec7331baa31f26a02682674a60e3ad;hb=a7740542c8ce1985c0a35767966c781805dfad84;hpb=fd38fb36d0992a3a2391837f6e12615ed98a8bb5 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 8100bdc6a2..2f7c790269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -16,18 +16,25 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.dispatch.Futures; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.math.BigInteger; +import java.util.Arrays; +import java.util.LinkedList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -36,14 +43,25 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier; +import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; @@ -65,6 +83,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; @@ -72,31 +91,39 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * End-to-end distributed data store tests that exercise remote shards and transactions. * * @author Thomas Pantelis */ -public class DistributedDataStoreRemotingIntegrationTest { +public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { - private static final String[] SHARD_NAMES = {"cars", "people"}; + private static final String[] CARS_AND_PEOPLE = {"cars", "people"}; + private static final String[] CARS = {"cars"}; private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559"); - private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf"; - private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf"; + private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf"; + private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf"; + private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf"; private ActorSystem leaderSystem; private ActorSystem followerSystem; private ActorSystem follower2System; private final DatastoreContext.Builder leaderDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); private final DatastoreContext.Builder followerDatastoreContextBuilder = - DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5). + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + private final TransactionIdentifier tx1 = nextTransactionId(); + private final TransactionIdentifier tx2 = nextTransactionId(); private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; @@ -117,24 +144,35 @@ public class DistributedDataStoreRemotingIntegrationTest { @After public void tearDown() { + if (followerDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + if (leaderDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); JavaTestKit.shutdownActorSystem(follower2System); } - private void initDatastores(String type) { - initDatastores(type, MODULE_SHARDS_CONFIG_2); + private void initDatastoresWithCars(String type) { + initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS); + } + + private void initDatastoresWithCarsAndPeople(String type) { + initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE); } - private void initDatastores(String type, String moduleShardsConfig) { + private void initDatastores(String type, String moduleShardsConfig, String[] shards) { leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); - leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards); followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); - followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards); - leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); } private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception { @@ -164,7 +202,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testWriteTransactionWithSingleShard() throws Exception { String testName = "testWriteTransactionWithSingleShard"; - initDatastores(testName); + initDatastoresWithCars(testName); String followerCarShardName = "member-2-shard-cars-" + testName; InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class ); @@ -212,17 +250,17 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2")); - DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). - setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES); - - verifyCars(member2Datastore.newReadOnlyTransaction(), car2); + try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). + setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) { + verifyCars(member2Datastore.newReadOnlyTransaction(), car2); + } JavaTestKit.shutdownActorSystem(newSystem); } @Test public void testReadWriteTransactionWithSingleShard() throws Exception { - initDatastores("testReadWriteTransactionWithSingleShard"); + initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", rwTx); @@ -248,7 +286,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception { - initDatastores("testWriteTransactionWithMultipleShards"); + initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards"); DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -271,7 +309,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testReadWriteTransactionWithMultipleShards() throws Exception { - initDatastores("testReadWriteTransactionWithMultipleShards"); + initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards"); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", rwTx); @@ -294,7 +332,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionChainWithSingleShard() throws Exception { - initDatastores("testTransactionChainWithSingleShard"); + initDatastoresWithCars("testTransactionChainWithSingleShard"); DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -341,7 +379,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionChainWithMultipleShards() throws Exception{ - initDatastores("testTransactionChainWithMultipleShards"); + initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards"); DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); @@ -396,7 +434,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testChainedTransactionFailureWithSingleShard() throws Exception { - initDatastores("testChainedTransactionFailureWithSingleShard"); + initDatastoresWithCars("testChainedTransactionFailureWithSingleShard"); ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( @@ -429,7 +467,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception { - initDatastores("testChainedTransactionFailureWithMultipleShards"); + initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards"); ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( @@ -467,7 +505,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { String testName = "testSingleShardTransactionsWithLeaderChanges"; - initDatastores(testName); + initDatastoresWithCars(testName); String followerCarShardName = "member-2-shard-cars-" + testName; InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class ); @@ -485,12 +523,13 @@ public class DistributedDataStoreRemotingIntegrationTest { // Switch the leader to the follower - followerDatastoreContextBuilder.shardElectionTimeoutFactor(1); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS); @@ -498,26 +537,30 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); - newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES); + try (DistributedDataStore ds = + newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - // Write a car entry to the new leader - should switch to local Tx + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); - writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + // Write a car entry to the new leader - should switch to local Tx - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); - writeTx.merge(car1Path, car1); + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - followerTestKit.doCommit(writeTx.ready()); + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); + writeTx.merge(car1Path, car1); + + followerTestKit.doCommit(writeTx.ready()); - verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); + } } + @SuppressWarnings("unchecked") @Test public void testReadyLocalTransactionForwardedToLeader() throws Exception { - initDatastores("testReadyLocalTransactionForwardedToLeader"); + initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); @@ -525,16 +568,18 @@ public class DistributedDataStoreRemotingIntegrationTest { TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); dataTree.setSchemaContext(SchemaContextHelper.full()); - DataTreeModification modification = dataTree.takeSnapshot().newModification(); + // Send a tx with immediate commit. + + DataTreeModification modification = dataTree.takeSnapshot().newModification(); new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification); + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); + modification.ready(); - String transactionID = "tx-1"; - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -542,37 +587,338 @@ public class DistributedDataStoreRemotingIntegrationTest { throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); } - assertTrue("Expected response of type " + CommitTransactionReply.SERIALIZABLE_CLASS, - CommitTransactionReply.SERIALIZABLE_CLASS.equals(resp.getClass())); + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); + + // Send another tx without immediate commit. + + modification = dataTree.takeSnapshot().newModification(); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); + modification.ready(); + + readyLocal = new ReadyLocalTransaction(tx2 , modification, false); + + carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); + + ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ((ReadyTransactionReply)resp).getCohortPath()); + + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); - verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } - @Test(expected=NoShardLeaderException.class) + @SuppressWarnings("unchecked") + @Test + public void testForwardedReadyTransactionForwardedToLeader() throws Exception { + initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); + + Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); + assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + + carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); + DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class); + + // Send a tx with immediate commit. + + DataTreeModification modification = dataTree.takeSnapshot().newModification(); + new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); + new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); + + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, + DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( + Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true); + + carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + Object resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", CommitTransactionReply.class, resp.getClass()); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1); + + // Send another tx without immediate commit. + + modification = dataTree.takeSnapshot().newModification(); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); + + forwardedReady = new ForwardedReadyTransaction(tx2, + DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( + Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false); + + carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); + resp = followerTestKit.expectMsgClass(Object.class); + if(resp instanceof akka.actor.Status.Failure) { + throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause()); + } + + assertEquals("Response type", ReadyTransactionReply.class, resp.getClass()); + + ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection( + ((ReadyTransactionReply)resp).getCohortPath()); + + Supplier versionSupplier = Mockito.mock(Supplier.class); + Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); + ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( + leaderDistributedDataStore.getActorContext(), Arrays.asList( + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); + cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); + } + + @Test + public void testTransactionForwardedToLeaderAfterRetry() throws Exception { + followerDatastoreContextBuilder.shardBatchedModificationCount(2); + leaderDatastoreContextBuilder.shardBatchedModificationCount(2); + initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); + + // Do an initial write to get the primary shard info cached. + + DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + followerTestKit.doCommit(initialWriteTx.ready()); + + // Wait for the commit to be replicated to the follower. + + MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getLastApplied", 0, raftState.getLastApplied()); + } + }); + + MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getLastApplied", 0, raftState.getLastApplied()); + } + }); + + // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in + // the leader shard. + + DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready(); + ListenableFuture writeTx1CanCommit = writeTx1Cohort.canCommit(); + writeTx1CanCommit.get(5, TimeUnit.SECONDS); + + // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued + // in the leader shard. + + DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); + LinkedList cars = new LinkedList<>(); + int carIndex = 1; + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + carIndex++; + NormalizedNode people = PeopleModel.newPersonMapNode(); + writeTx2.write(PeopleModel.PERSON_LIST_PATH, people); + DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready(); + + // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This + // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the + // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be + // sent on ready. + + DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); + for(int i = 1; i <= 5; i++, carIndex++) { + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + } + + // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications + // message on ready. + + DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + carIndex++; + + // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the + // leader shard on ready. + + DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); + cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()); + } + }); + + // Disable elections on the leader so it switches to follower. + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). + shardElectionTimeoutFactor(10)); + + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); + + // Submit all tx's - the messages should get queued for retry. + + ListenableFuture writeTx2CanCommit = writeTx2Cohort.canCommit(); + DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready(); + DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready(); + DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready(); + + // Enable elections on the other follower so it becomes the leader, at which point the + // tx's should get forwarded from the previous leader to the new leader to complete the commits. + + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); + + followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); + followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort); + followerTestKit.doCommit(writeTx3Cohort); + followerTestKit.doCommit(writeTx4Cohort); + followerTestKit.doCommit(rwTxCohort); + + DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction(); + verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()])); + verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people); + } + + @Test + public void testLeadershipTransferOnShutdown() throws Exception { + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); + followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); + String testName = "testLeadershipTransferOnShutdown"; + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); + + IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); + try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { + + // Create and submit a couple tx's so they're pending. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); + } + }); + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); + } + }); + + // Gracefully stop the leader via a Shutdown message. + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + shardElectionTimeoutFactor(100)); + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + ActorRef leaderActor = Await.result(future, duration); + + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); + + // Commit the 2 transactions. They should finish and succeed. + + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); + + // Wait for the leader actor stopped. + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + // Verify leadership was transferred by reading the committed data from the other nodes. + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + } + } + + @Test public void testTransactionWithIsolatedLeader() throws Throwable { - leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300); + leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); String testName = "testTransactionWithIsolatedLeader"; - initDatastores(testName); + initDatastoresWithCars(testName); - JavaTestKit.shutdownActorSystem(followerSystem, null, true); + DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext() - .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS); + DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); + followerDistributedDataStore.close(); + followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); + + MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()); + } + }); try { - followerTestKit.doCommit(writeTx.ready()); + leaderTestKit.doCommit(failWriteTx.ready()); + fail("Expected NoShardLeaderException"); } catch (ExecutionException e) { - throw e.getCause(); + assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass()); } + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + shardElectionTimeoutFactor(100)); + + DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready(); + + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); + + leaderTestKit.doCommit(writeTxCohort); } @Test(expected=AskTimeoutException.class) public void testTransactionWithShardLeaderNotResponding() throws Throwable { - followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); - initDatastores("testTransactionWithShardLeaderNotResponding"); + initDatastoresWithCars("testTransactionWithShardLeaderNotResponding"); // Do an initial read to get the primary shard info cached. @@ -602,7 +948,7 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test(expected=NoShardLeaderException.class) public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable { - initDatastores("testTransactionWithCreateTxFailureDueToNoLeader"); + initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader"); // Do an initial read to get the primary shard info cached. @@ -613,10 +959,12 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -631,40 +979,48 @@ public class DistributedDataStoreRemotingIntegrationTest { @Test public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { - followerDatastoreContextBuilder.shardElectionTimeoutFactor(30); String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; - initDatastores(testName, MODULE_SHARDS_CONFIG_3); + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS); DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); - follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES); - // Do an initial read to get the primary shard info cached. + try (DistributedDataStore ds = + follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) { - DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); - // Shutdown the leader and try to create a new tx. + // Do an initial read to get the primary shard info cached. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); - followerDatastoreContextBuilder.operationTimeoutInMillis(500); - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); + // Shutdown the leader and try to create a new tx. - DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); - rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); - followerTestKit.doCommit(rwTx.ready()); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + followerTestKit.doCommit(rwTx.ready()); + } } private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { + final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); Answer answer = new Answer() { @Override public DatastoreContext answer(InvocationOnMock invocation) { - return builder.build(); + return newBuilder.build(); } }; Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();