Bump odlparent to 6.0.0
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
index 3943e7ee563325f2fb371182ef6d18413b1dd7f9..94813e84991aef33c558e94b294dbce120dbe779 100644
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -23,13 +24,15 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -38,15 +41,19 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -58,7 +65,9 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -68,10 +77,15 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -118,7 +132,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+                { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 }
         });
     }
 
@@ -328,6 +342,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
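+        // Commit a car entry per iteration and follow each commit with a snapshot read; each
+        // completed transaction should eventually be purged from the frontend metadata.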
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i),
+                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+            followerTestKit.doCommit(writeTx.ready());
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        // Wait for the shard to catch up with the purged transactions
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
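+                        // Locate the metadata for history 1, i.e. the transaction chain created above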
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
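+                        // The chain has seen 11 transactions so far: the initial commit plus five car
+                        // writes and five reads, so the purged range must cover transaction ids [0, 11).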
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+                                metadata.getPurgedTransactions().asRanges().iterator().next());
+                    } else {
+                        // The ask-based protocol tracks no frontend metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
+    @Test
+    @Ignore("Flushes out tell based leak needs to be handled separately")
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // The ask-based frontend seems to have some issues with back-to-back transaction closes
+        Assume.assumeTrue(TestClientBackedDataStore.class.isAssignableFrom(testParameter));
+
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
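+        // Open and immediately close write transactions without readying them; the aborted
+        // transactions must still be purged from the frontend metadata.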
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // Wait for the shard to catch up with the purged transactions
+        await("Close transaction purge leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
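+                        // Locate the metadata for history 1, i.e. the transaction chain created above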
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(1, ranges.size());
+                    } else {
+                        // The ask-based protocol tracks no frontend metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        // No cars were actually committed in this test, so the car list must be present but empty
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", 0, ((Collection<?>) optional.get().getValue()).size());
+    }
+
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@@ -637,7 +776,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+        final Optional<ActorRef> carsFollowerShard =
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
@@ -704,7 +843,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+        final Optional<ActorRef> carsFollowerShard =
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
@@ -771,8 +910,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@@ -843,7 +982,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
         IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
-            stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
+            stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
 
         // Disable elections on the leader so it switches to follower.
 
@@ -883,8 +1022,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
@@ -948,8 +1087,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -1028,7 +1167,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
                         || e.getCause() instanceof ShardLeaderNotRespondingException);
             } else {
@@ -1067,7 +1206,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
             } else {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@@ -1115,6 +1254,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
+        final String testName = "testSemiReachableCandidateNotDroppingLeader";
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+        final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
+        final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
+
+        final AbstractDataStore ds2 = follower2TestKit.setupAbstractDataStore(
+                testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+
+        followerTestKit.waitForMembersUp("member-1", "member-3");
+        follower2TestKit.waitForMembersUp("member-1", "member-2");
+
+        TestKit.shutdownActorSystem(follower2System);
+
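+        // Record the leader shard's current RAFT term before member-3 starts campaigning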
+        ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+        OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+        Cluster leaderCluster = Cluster.get(leaderSystem);
+        Cluster followerCluster = Cluster.get(followerSystem);
+        Cluster follower2Cluster = Cluster.get(follower2System);
+
+        Member follower2Member = follower2Cluster.readView().self();
+
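+        // Wait until the remaining members see the downed member-3 as unreachable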
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
+        await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
+
+        ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+
+        // To simulate a follower that cannot receive messages, but can still send them and become a
+        // candidate, send a couple of RequestVote messages to both the leader and the follower.
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+        cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+        followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+
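+        // RequestVotes from the unreachable candidate must not cause either member to bump its term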
+        OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+                .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+        OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
+                .executeOperation(followerCars, GetOnDemandRaftState.INSTANCE);
+
+        assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
+        assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
+
+        ds2.close();
+    }
+
     @Test
     public void testInstallSnapshot() throws Exception {
         final String testName = "testInstallSnapshot";
@@ -1131,7 +1324,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
         AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
 
-        final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+        final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
         final Snapshot initialSnapshot = Snapshot.create(
                 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
                 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
@@ -1157,7 +1350,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
         // The slicing is only implemented for tell-based protocol
-        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+        Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
 
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);