Convert mdsal submit() calls to commit()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index dad3f1b223cb4a60646c7a90f5d5725bc65e9bb4..aaab6b19e18d21b0d960daaf4db3250eaa76afac 100644 (file)
@@ -24,7 +24,7 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
@@ -42,7 +42,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
@@ -54,6 +53,7 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
@@ -80,26 +80,24 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -119,7 +117,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 120 }
+                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
         });
     }
 
@@ -139,6 +137,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
     private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
+    private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
@@ -182,9 +181,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             leaderDistributedDataStore.close();
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
-        JavaTestKit.shutdownActorSystem(follower2System);
+        TestKit.shutdownActorSystem(leaderSystem);
+        TestKit.shutdownActorSystem(followerSystem);
+        TestKit.shutdownActorSystem(follower2System);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
@@ -316,8 +315,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
+        TestKit.shutdownActorSystem(leaderSystem, Boolean.TRUE);
+        TestKit.shutdownActorSystem(followerSystem, Boolean.TRUE);
 
         final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
@@ -516,7 +515,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-        final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
         final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
                 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
@@ -525,9 +524,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
         try {
-            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            writeTx.commit().get(5, TimeUnit.SECONDS);
             fail("Expected TransactionCommitFailedException");
-        } catch (final TransactionCommitFailedException e) {
+        } catch (final ExecutionException e) {
             // Expected
         }
 
@@ -549,7 +548,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
         final DOMTransactionChain txChain = broker.createTransactionChain(listener);
 
-        final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
         writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
@@ -562,9 +561,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
 
         try {
-            writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+            writeTx.commit().get(5, TimeUnit.SECONDS);
             fail("Expected TransactionCommitFailedException");
-        } catch (final TransactionCommitFailedException e) {
+        } catch (final ExecutionException e) {
             // Expected
         }
 
@@ -576,6 +575,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         final String testName = "testSingleShardTransactionsWithLeaderChanges";
         initDatastoresWithCars(testName);
 
@@ -598,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
                 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
@@ -640,8 +640,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .findLocalShard("cars");
         assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
 
-        final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        dataTree.setSchemaContext(SchemaContextHelper.full());
+        final DataTree dataTree = new InMemoryDataTreeFactory().create(
+            DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
 
         // Send a tx with immediate commit.
 
@@ -784,10 +784,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Wait for the commit to be replicated to the follower.
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         MemberNode.verifyRaftState(followerDistributedDataStore, "people",
-            raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+            raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
@@ -999,6 +999,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithShardLeaderNotResponding() throws Exception {
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
         initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
 
@@ -1009,7 +1010,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Shutdown the leader and try to create a new tx.
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
 
         followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
@@ -1023,17 +1024,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
-                    || e.getCause() instanceof ShardLeaderNotRespondingException);
-            assertEquals(DistributedDataStore.class, testParameter);
-        } catch (final TimeoutException e) {
-            // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
-            assertEquals(ClientBackedDataStore.class, testParameter);
+            if (DistributedDataStore.class.equals(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
+                        || e.getCause() instanceof ShardLeaderNotRespondingException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
     @Test
     public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
+        followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
         initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
 
         // Do an initial read to get the primary shard info cached.
@@ -1043,7 +1045,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Shutdown the leader and try to create a new tx.
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        TestKit.shutdownActorSystem(leaderSystem, true);
 
         Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
@@ -1060,29 +1062,29 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             followerTestKit.doCommit(rwTx.ready());
             fail("Exception expected");
         } catch (final ExecutionException e) {
-            final String msg = "Expected instance of NoShardLeaderException, actual: \n"
-                    + Throwables.getStackTraceAsString(e.getCause());
-            assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
-            assertEquals(DistributedDataStore.class, testParameter);
-        } catch (TimeoutException e) {
-            // ClientBackedDataStore doesn't set cause to ExecutionException, future just time outs
-            assertEquals(ClientBackedDataStore.class, testParameter);
+            final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+            if (DistributedDataStore.class.equals(testParameter)) {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+            } else {
+                assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+            }
         }
     }
 
     @Test
     public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
+        followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
         String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
-        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
+        initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
 
         final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
-                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+                .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
         final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
                 follower2System, follower2DatastoreContextBuilder, commitTimeout);
 
         try (AbstractDataStore ds =
                 follower2TestKit.setupAbstractDataStore(
-                        testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
+                        testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
             follower2TestKit.waitForMembersUp("member-1", "member-2");
@@ -1094,12 +1096,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
             // Shutdown the leader and try to create a new tx.
 
-            JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+            TestKit.shutdownActorSystem(leaderSystem, true);
 
             Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
-                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
+                .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
 
             final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
 
@@ -1118,8 +1120,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
         // install a snapshot to sync the follower.
 
-        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
-        tree.setSchemaContext(SchemaContextHelper.full());
+        DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
+            SchemaContextHelper.full());
 
         final ContainerNode carsNode = CarsModel.newCarsNode(
                 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
@@ -1148,6 +1150,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 initialSnapshot, snapshotRoot);
     }
 
+    @Test
+    public void testReadWriteMessageSlicing() throws Exception {
+        // The slicing is only implemented for tell-based protocol
+        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+
+        leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+        followerDatastoreContextBuilder.maximumMessageSliceSize(100);
+        initDatastoresWithCars("testLargeReadReplySlicing");
+
+        final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        final NormalizedNode<?, ?> carsNode = CarsModel.create();
+        rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+        verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+    }
+
     private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
                                        final NormalizedNode<?, ?> expRoot) {
         assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());