Deprecate 3PC protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index 3c4c9046c82c4d48e0504cfe4677231e2ef50c25..499d5e22b99463245747d679d4244d1a4725c0b3 100644 (file)
@@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
 import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
@@ -54,6 +55,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -84,6 +86,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * End-to-end distributed data store tests that exercise remote shards and transactions.
@@ -563,7 +568,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
 
@@ -626,7 +631,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
 
@@ -720,13 +725,83 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
-    @Test(expected=NoShardLeaderException.class)
+    @Test
+    public void testLeadershipTransferOnShutdown() throws Exception {
+        leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
+        String testName = "testLeadershipTransferOnShutdown";
+        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
+
+        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
+        DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+                MODULE_SHARDS_CARS_PEOPLE_1_2_3, false);
+
+        // Create and submit a couple tx's so they're pending.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+        DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
+            }
+        });
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        writeTx.write(CarsModel.newCarPath("optima"), car);
+        DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
+            }
+        });
+
+        // Gracefully stop the leader via a Shutdown message.
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+        ActorRef leaderActor = Await.result(future, duration);
+
+        Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+        // Commit the 2 transactions. They should finish and succeed.
+
+        followerTestKit.doCommit(cohort1);
+        followerTestKit.doCommit(cohort2);
+
+        // Wait for the leader actor stopped.
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        // Verify leadership was transferred by reading the committed data from the other nodes.
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
+        verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
+    }
+
+    @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
         String testName = "testTransactionWithIsolatedLeader";
         initDatastoresWithCars(testName);
 
-        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+        DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
+        followerDistributedDataStore.close();
+        followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
 
         MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
             @Override
@@ -735,14 +810,22 @@ public class DistributedDataStoreRemotingIntegrationTest {
             }
         });
 
-        DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
-        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-
         try {
-            followerTestKit.doCommit(writeTx.ready());
+            leaderTestKit.doCommit(failWriteTx.ready());
+            fail("Expected NoShardLeaderException");
         } catch (ExecutionException e) {
-            throw e.getCause();
+            assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
         }
+
+        sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+                shardElectionTimeoutFactor(100));
+
+        DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
+                MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
+
+        leaderTestKit.doCommit(writeTxCohort);
     }
 
     @Test(expected=AskTimeoutException.class)