X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=2f7c790269f0f9702157b6468234154e9e1cb43d;hp=0c7575a61ddfeb6521b3ec23612245bbc2662cd9;hb=a7740542c8ce1985c0a35767966c781805dfad84;hpb=6f3c16acf17d0cb4d5f44b666751f8db84a652be diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0c7575a61d..2f7c790269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -43,6 +43,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier; @@ -99,7 +100,7 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class DistributedDataStoreRemotingIntegrationTest { +public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static final String[] CARS_AND_PEOPLE = {"cars", "people"}; private static final String[] CARS = {"cars"}; @@ -121,6 +122,8 @@ public class DistributedDataStoreRemotingIntegrationTest { private final DatastoreContext.Builder followerDatastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5). customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + private final TransactionIdentifier tx1 = nextTransactionId(); + private final TransactionIdentifier tx2 = nextTransactionId(); private DistributedDataStore followerDistributedDataStore; private DistributedDataStore leaderDistributedDataStore; @@ -141,6 +144,13 @@ public class DistributedDataStoreRemotingIntegrationTest { @After public void tearDown() { + if (followerDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + if (leaderDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); JavaTestKit.shutdownActorSystem(follower2System); @@ -517,6 +527,7 @@ public class DistributedDataStoreRemotingIntegrationTest { shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); @@ -568,7 +579,7 @@ public class DistributedDataStoreRemotingIntegrationTest { new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -587,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest { new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction("tx-2" , modification, false); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -604,7 +615,7 @@ public class DistributedDataStoreRemotingIntegrationTest { Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( leaderDistributedDataStore.getActorContext(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -633,9 +644,9 @@ public class DistributedDataStoreRemotingIntegrationTest { MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); - ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1", + ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true); + Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -653,9 +664,9 @@ public class DistributedDataStoreRemotingIntegrationTest { MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); - forwardedReady = new ForwardedReadyTransaction("tx-2", + forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false); + Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -672,7 +683,7 @@ public class DistributedDataStoreRemotingIntegrationTest { Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get(); ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy( leaderDistributedDataStore.getActorContext(), Arrays.asList( - new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2"); + new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2); cohort.canCommit().get(5, TimeUnit.SECONDS); cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); @@ -702,6 +713,13 @@ public class DistributedDataStoreRemotingIntegrationTest { } }); + MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getLastApplied", 0, raftState.getLastApplied()); + } + }); + // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in // the leader shard. @@ -764,6 +782,7 @@ public class DistributedDataStoreRemotingIntegrationTest { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). shardElectionTimeoutFactor(10)); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -940,6 +959,8 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. @@ -980,6 +1001,8 @@ public class DistributedDataStoreRemotingIntegrationTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));