Implement suspend leader in Shard 36/31936/6
authorTom Pantelis <tpanteli@brocade.com>
Mon, 21 Dec 2015 20:42:35 +0000 (15:42 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 7 Jan 2016 02:35:50 +0000 (02:35 +0000)
Added code in the ShardCommitCoordinator to invoke the Runnable
operation that is passed via pauseLeader when pending transactions are
complete. A subsequent patch will add a timer to cap the amount of time
to wait for pauseLeader to complete.

Change-Id: I3fe0b0ce4e025b2f68ce9c0150732bc4eabf5e0a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 6851f6a..610a7d8 100644 (file)
@@ -532,7 +532,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected boolean isLeaderActive() {
-        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+                !isLeadershipTransferInProgress();
+    }
+
+    private boolean isLeadershipTransferInProgress() {
+        return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
     /**
index 623fa49..7105714 100644 (file)
@@ -57,6 +57,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
     private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private boolean isTransferring;
 
     RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
         this.raftActor = raftActor;
@@ -94,6 +95,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
         // Sanity check...
         if(behavior instanceof Leader) {
+            isTransferring = true;
             ((Leader)behavior).transferLeadership(this);
         } else {
             LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
@@ -144,6 +146,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
     }
 
     private void finish(boolean success) {
+        isTransferring = false;
         if(transferTimer.isRunning()) {
             transferTimer.stop();
             if(success) {
@@ -168,6 +171,10 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
         onCompleteCallbacks.add(onComplete);
     }
 
+    boolean isTransferring() {
+        return isTransferring;
+    }
+
     @VisibleForTesting
     void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
index 7867c91..ef4bab4 100644 (file)
@@ -284,6 +284,10 @@ public class Shard extends RaftActor {
         return commitCoordinator.getQueueSize();
     }
 
+    public int getCohortCacheSize() {
+        return commitCoordinator.getCohortCacheSize();
+    }
+
     @Override
     protected Optional<ActorRef> getRoleChangeNotifier() {
         return roleChangeNotifier;
@@ -436,12 +440,12 @@ public class Shard extends RaftActor {
         // the primary/leader shard. However with timing and caching on the front-end, there's a small
         // window where it could have a stale leader during leadership transitions.
         //
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             handleBatchedModificationsLocal(batched, getSender());
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
                         "Could not commit transaction " + batched.getTransactionID());
             } else {
@@ -473,8 +477,8 @@ public class Shard extends RaftActor {
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
 
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
@@ -484,7 +488,7 @@ public class Shard extends RaftActor {
             }
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
                         "Could not commit transaction " + message.getTransactionID());
             } else {
@@ -498,12 +502,12 @@ public class Shard extends RaftActor {
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
         LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
 
-        boolean isIsolatedLeader = isIsolatedLeader();
-        if (isLeader() && !isIsolatedLeader) {
+        boolean isLeaderActive = isLeaderActive();
+        if (isLeader() && isLeaderActive) {
             commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
         } else {
             ActorSelection leader = getLeader();
-            if (isIsolatedLeader || leader == null) {
+            if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
                         "Could not commit transaction " + forwardedReady.getTransactionID());
             } else {
@@ -722,6 +726,12 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected void pauseLeader(Runnable operation) {
+        LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+        commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+    }
+
     @Override
     public String persistenceId() {
         return this.name;
index 7c45bd0..fd55cee 100644 (file)
@@ -68,6 +68,8 @@ class ShardCommitCoordinator {
 
     private ReadyTransactionReply readyTransactionReply;
 
+    private Runnable runOnPendingTransactionsComplete;
+
     ShardCommitCoordinator(ShardDataTree dataTree,
             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
 
@@ -82,6 +84,10 @@ class ShardCommitCoordinator {
         return queuedCohortEntries.size();
     }
 
+    int getCohortCacheSize() {
+        return cohortCache.size();
+    }
+
     void setQueueCapacity(int queueCapacity) {
         this.queueCapacity = queueCapacity;
     }
@@ -524,12 +530,28 @@ class ShardCommitCoordinator {
             iter.remove();
             cohortCache.remove(next.getTransactionID());
         }
+
+        maybeRunOperationOnPendingTransactionsComplete();
     }
 
     void cleanupExpiredCohortEntries() {
         maybeProcessNextCohortEntry();
     }
 
+    void setRunOnPendingTransactionsComplete(Runnable operation) {
+        runOnPendingTransactionsComplete = operation;
+        maybeRunOperationOnPendingTransactionsComplete();
+    }
+
+    private void maybeRunOperationOnPendingTransactionsComplete() {
+        if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+            log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
+
+            runOnPendingTransactionsComplete.run();
+            runOnPendingTransactionsComplete = null;
+        }
+    }
+
     @VisibleForTesting
     void setCohortDecorator(CohortDecorator cohortDecorator) {
         this.cohortDecorator = cohortDecorator;
index 1e6107c..0f49157 100644 (file)
@@ -347,6 +347,10 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         return shard.getPendingTxCommitQueueSize();
     }
 
+    public int getTxCohortCacheSize() {
+        return shard.getCohortCacheSize();
+    }
+
     @Override
     public void captureSnapshot() {
         if(shard != null) {
index f7a5630..492a72b 100644 (file)
@@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
 import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
@@ -54,6 +55,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -84,6 +86,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * End-to-end distributed data store tests that exercise remote shards and transactions.
@@ -720,6 +725,68 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
+    @Test
+    public void testLeadershipTransferOnShutdown() throws Exception {
+        leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
+        String testName = "testLeadershipTransferOnShutdown";
+        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
+
+        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
+        DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+                MODULE_SHARDS_CARS_PEOPLE_1_2_3, false);
+
+        // Create and submit a couple tx's so they're pending.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+        DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
+            }
+        });
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        writeTx.write(CarsModel.newCarPath("optima"), car);
+        DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
+            }
+        });
+
+        // Gracefully stop the leader via a Shutdown message.
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+        ActorRef leaderActor = Await.result(future, duration);
+
+        Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+        // Commit the 2 transactions. They should finish and succeed.
+
+        followerTestKit.doCommit(cohort1);
+        followerTestKit.doCommit(cohort2);
+
+        // Wait for the leader actor stopped.
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        // Verify leadership was transferred by reading the committed data from the other nodes.
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
+        verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
+    }
+
     @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);