Adjust to yangtools-2.0.0 changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 3298fa2c05c2464342e1254cbc0ff9b4dd3c043b..b78d017341da3cc860c725ec4a95be9ad899b1d9 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -23,7 +23,6 @@ import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
-import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
@@ -54,6 +53,7 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -96,9 +97,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -118,12 +118,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class }, { ClientBackedDataStore.class }
+                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
         });
     }
 
-    @Parameter
+    @Parameter(0)
     public Class<? extends AbstractDataStore> testParameter;
+    @Parameter(1)
+    public int commitTimeout;
 
     private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
     private static final String[] CARS = {"cars"};
@@ -136,6 +138,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
+    private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
@@ -197,16 +200,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
             throws Exception {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
 
         leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
         followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
                 testParameter, type, moduleShardsConfig, false, shards);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
+
+        leaderTestKit.waitForMembersUp("member-2");
+        followerTestKit.waitForMembersUp("member-1");
     }
 
     private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
@@ -315,7 +321,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
-        try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
+                commitTimeout)
                 .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
@@ -569,8 +576,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         final String testName = "testSingleShardTransactionsWithLeaderChanges";
         initDatastoresWithCars(testName);
 
@@ -603,9 +609,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
-        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+        IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
 
-        try (final AbstractDataStore ds =
+        try (AbstractDataStore ds =
                 newMember1TestKit.setupAbstractDataStore(
                         testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
 
@@ -635,8 +641,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .findLocalShard("cars");
         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
 
-        final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        dataTree.setSchemaContext(SchemaContextHelper.full());
+        final DataTree dataTree = new InMemoryDataTreeFactory().create(
+            DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
 
         // Send a tx with immediate commit.
 
@@ -779,10 +785,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Wait for the commit to be replicated to the follower.
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "people",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
@@ -842,7 +848,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
                 .shardElectionTimeoutFactor(10));
 
-        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
@@ -857,6 +862,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
                 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars")
+                .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+        IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people")
+                .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
 
         followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
         followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
@@ -879,10 +888,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
 
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
-                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
-        try (final AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+                DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100),
+                commitTimeout);
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
                 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
 
+            followerTestKit.waitForMembersUp("member-3");
+            follower2TestKit.waitForMembersUp("member-1", "member-2");
+
             // Create and submit a couple tx's so they're pending.
 
             DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
@@ -985,10 +998,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderTestKit.doCommit(successTxCohort);
     }
 
-    @Test(expected = AskTimeoutException.class)
+    @Test
     public void testTransactionWithShardLeaderNotResponding() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
 
@@ -1010,19 +1022,21 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         try {
             followerTestKit.doCommit(rwTx.ready());
+            fail("Exception expected");
         } catch (final ExecutionException e) {
-            assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
-                    e.getCause() instanceof ShardLeaderNotRespondingException);
-            assertNotNull("Expected a nested cause", e.getCause().getCause());
-            Throwables.propagateIfInstanceOf(e.getCause().getCause(), Exception.class);
-            Throwables.propagate(e.getCause().getCause());
+            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+            if (DistributedDataStore.class.equals(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
+                        || e.getCause() instanceof ShardLeaderNotRespondingException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
-    @Test(expected = NoShardLeaderException.class)
+    @Test
     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
 
         // Do an initial read to get the primary shard info cached.
@@ -1047,27 +1061,31 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         try {
             followerTestKit.doCommit(rwTx.ready());
+            fail("Exception expected");
         } catch (final ExecutionException e) {
-            Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
-            Throwables.propagate(e.getCause());
+            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+            if (DistributedDataStore.class.equals(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
     @Test
     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
-        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
 
         final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
-                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
-                follower2System, follower2DatastoreContextBuilder);
+                follower2System, follower2DatastoreContextBuilder, commitTimeout);
 
-        try (final AbstractDataStore ds =
+        try (AbstractDataStore ds =
                 follower2TestKit.setupAbstractDataStore(
-                        testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
+                        testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
             follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1084,7 +1102,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
-                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
+                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
 
             final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -1103,8 +1121,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
         // install a snapshot to sync the follower.
 
-        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
-        tree.setSchemaContext(SchemaContextHelper.full());
+        DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
+            SchemaContextHelper.full());
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
                 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
@@ -1133,6 +1151,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 initialSnapshot, snapshotRoot);
     }
 
+    @Test
+    public void testReadWriteMessageSlicing() throws Exception {
+        // The slicing is only implemented for tell-based protocol
+        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+
+        leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+        followerDatastoreContextBuilder.maximumMessageSliceSize(100);
+        initDatastoresWithCars("testLargeReadReplySlicing");
+
+        final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        final NormalizedNode<?, ?> carsNode = CarsModel.create();
+        rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+        verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());