BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index e157e429e1d9087de43fb1db1957952949abf749..3b14705613fe00c49604d308b76c9bde983b6552 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +57,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -103,9 +108,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static final String[] CARS = {"cars"};
 
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
-            "akka.tcp://cluster-test@127.0.0.1:2558");
+            "akka://cluster-test@127.0.0.1:2558");
     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
-            "akka.tcp://cluster-test@127.0.0.1:2559");
+            "akka://cluster-test@127.0.0.1:2559");
 
     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
@@ -124,13 +129,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private final TransactionIdentifier tx1 = nextTransactionId();
     private final TransactionIdentifier tx2 = nextTransactionId();
 
-    private DistributedDataStore followerDistributedDataStore;
-    private DistributedDataStore leaderDistributedDataStore;
+    private AbstractDataStore followerDistributedDataStore;
+    private AbstractDataStore leaderDistributedDataStore;
     private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
 
     @Before
     public void setUp() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
@@ -153,17 +161,20 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
         JavaTestKit.shutdownActorSystem(follower2System);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
-    private void initDatastoresWithCars(String type) {
+    private void initDatastoresWithCars(final String type) {
         initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
     }
 
-    private void initDatastoresWithCarsAndPeople(String type) {
+    private void initDatastoresWithCarsAndPeople(final String type) {
         initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
     }
 
-    private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
+    private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
         leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
@@ -175,7 +186,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
     }
 
-    private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
+    private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
+            throws Exception {
         Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
         assertEquals("isPresent", true, optional.isPresent());
 
@@ -187,14 +199,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Car list node", listBuilder.build(), optional.get());
     }
 
-    private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> expNode) throws Exception {
+    private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
+            final NormalizedNode<?, ?> expNode) throws Exception {
         Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
         assertEquals("isPresent", true, optional.isPresent());
         assertEquals("Data node", expNode, optional.get());
     }
 
-    private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
+    private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
+            throws Exception {
         Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
         assertEquals("exists", true, exists);
     }
@@ -205,7 +218,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
+        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -250,7 +263,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
 
-        try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
                 .setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
@@ -508,7 +521,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
+        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
 
         // Write top-level car container from the follower so it uses a remote Tx.
 
@@ -538,7 +551,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
 
-        try (DistributedDataStore ds =
+        try (AbstractDataStore ds =
                 newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
 
             followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
@@ -804,8 +817,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         String testName = "testLeadershipTransferOnShutdown";
         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
 
-        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
-        try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
                 MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
 
             // Create and submit a couple tx's so they're pending.
@@ -910,6 +924,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test(expected = AskTimeoutException.class)
     public void testTransactionWithShardLeaderNotResponding() throws Exception {
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
 
         // Do an initial read to get the primary shard info cached.
@@ -980,7 +995,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
 
-        try (DistributedDataStore ds =
+        try (AbstractDataStore ds =
                 follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
@@ -1008,7 +1023,57 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+    @Test
+    public void testInstallSnapshot() throws Exception {
+        final String testName = "testInstallSnapshot";
+        final String leaderCarShardName = "member-1-shard-cars-" + testName;
+        final String followerCarShardName = "member-2-shard-cars-" + testName;
+
+        // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
+        // install a snapshot to sync the follower.
+
+        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+        tree.setSchemaContext(SchemaContextHelper.full());
+
+        ContainerNode carsNode = CarsModel.newCarsNode(
+                CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+        AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
+
+        NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+        Snapshot initialSnapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
+                Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
+        InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
+        InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
+
+        initDatastoresWithCars(testName);
+
+        Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
+                CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, readOptional.isPresent());
+        assertEquals("Node", carsNode, readOptional.get());
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+    }
+
+    private static void verifySnapshot(Snapshot actual, Snapshot expected, NormalizedNode<?, ?> expRoot) {
+        assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
+        assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
+        assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
+        MetadataShardDataTreeSnapshot shardSnapshot =
+                (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+    }
+
+    private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Answer<DatastoreContext> answer = invocation -> newBuilder.build();