From 204f45f8b3233dbea87e2c8065914f0d2a0ded07 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 21 Dec 2015 15:42:35 -0500 Subject: [PATCH] Implement suspend leader in Shard Added code in the ShardCommitCoordinator to invoke the Runnable operation that is passed via pauseLeader when pending transactions are complete. A subsequent patch will add a timer to cap the amount of time to wait for pauseLeader to complete. Change-Id: I3fe0b0ce4e025b2f68ce9c0150732bc4eabf5e0a Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 7 +- .../RaftActorLeadershipTransferCohort.java | 7 ++ .../controller/cluster/datastore/Shard.java | 28 +++++--- .../datastore/ShardCommitCoordinator.java | 22 ++++++ .../jmx/mbeans/shard/ShardStats.java | 4 ++ .../jmx/mbeans/shard/ShardStatsMXBean.java | 3 +- ...butedDataStoreRemotingIntegrationTest.java | 67 +++++++++++++++++++ 7 files changed, 127 insertions(+), 11 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 6851f6aab9..610a7d8f2f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -532,7 +532,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected boolean isLeaderActive() { - return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null; + return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && + !isLeadershipTransferInProgress(); + } + + private boolean isLeadershipTransferInProgress() { + return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 623fa4902c..7105714b0b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -57,6 +57,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { private final List onCompleteCallbacks = new ArrayList<>(); private long newLeaderTimeoutInMillis = 2000; private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private boolean isTransferring; RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) { this.raftActor = raftActor; @@ -94,6 +95,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); // Sanity check... if(behavior instanceof Leader) { + isTransferring = true; ((Leader)behavior).transferLeadership(this); } else { LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId()); @@ -144,6 +146,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { } private void finish(boolean success) { + isTransferring = false; if(transferTimer.isRunning()) { transferTimer.stop(); if(success) { @@ -168,6 +171,10 @@ public class RaftActorLeadershipTransferCohort implements Runnable { onCompleteCallbacks.add(onComplete); } + boolean isTransferring() { + return isTransferring; + } + @VisibleForTesting void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7867c91158..ef4bab44f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -284,6 +284,10 @@ public class Shard extends RaftActor { return commitCoordinator.getQueueSize(); } + public int getCohortCacheSize() { + return commitCoordinator.getCohortCacheSize(); + } + @Override protected Optional getRoleChangeNotifier() { return roleChangeNotifier; @@ -436,12 +440,12 @@ public class Shard extends RaftActor { // the primary/leader shard. However with timing and caching on the front-end, there's a small // window where it could have a stale leader during leadership transitions. // - boolean isIsolatedLeader = isIsolatedLeader(); - if (isLeader() && !isIsolatedLeader) { + boolean isLeaderActive = isLeaderActive(); + if (isLeader() && isLeaderActive) { handleBatchedModificationsLocal(batched, getSender()); } else { ActorSelection leader = getLeader(); - if (isIsolatedLeader || leader == null) { + if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(batched, getSender(), "Could not commit transaction " + batched.getTransactionID()); } else { @@ -473,8 +477,8 @@ public class Shard extends RaftActor { private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID()); - boolean isIsolatedLeader = isIsolatedLeader(); - if (isLeader() && !isIsolatedLeader) { + boolean isLeaderActive = isLeaderActive(); + if (isLeader() && isLeaderActive) { try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { @@ -484,7 +488,7 @@ public class Shard extends RaftActor { } } else { ActorSelection leader = getLeader(); - if (isIsolatedLeader || leader == null) { + if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), "Could not commit transaction " + message.getTransactionID()); } else { @@ -498,12 +502,12 @@ public class Shard extends RaftActor { private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) { LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID()); - boolean isIsolatedLeader = isIsolatedLeader(); - if (isLeader() && !isIsolatedLeader) { + boolean isLeaderActive = isLeaderActive(); + if (isLeader() && isLeaderActive) { commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); - if (isIsolatedLeader || leader == null) { + if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), "Could not commit transaction " + forwardedReady.getTransactionID()); } else { @@ -722,6 +726,12 @@ public class Shard extends RaftActor { } } + @Override + protected void pauseLeader(Runnable operation) { + LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + commitCoordinator.setRunOnPendingTransactionsComplete(operation); + } + @Override public String persistenceId() { return this.name; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 7c45bd0702..fd55ceeed3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -68,6 +68,8 @@ class ShardCommitCoordinator { private ReadyTransactionReply readyTransactionReply; + private Runnable runOnPendingTransactionsComplete; + ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) { @@ -82,6 +84,10 @@ class ShardCommitCoordinator { return queuedCohortEntries.size(); } + int getCohortCacheSize() { + return cohortCache.size(); + } + void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } @@ -524,12 +530,28 @@ class ShardCommitCoordinator { iter.remove(); cohortCache.remove(next.getTransactionID()); } + + maybeRunOperationOnPendingTransactionsComplete(); } void cleanupExpiredCohortEntries() { maybeProcessNextCohortEntry(); } + void setRunOnPendingTransactionsComplete(Runnable operation) { + runOnPendingTransactionsComplete = operation; + maybeRunOperationOnPendingTransactionsComplete(); + } + + private void maybeRunOperationOnPendingTransactionsComplete() { + if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) { + log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete); + + runOnPendingTransactionsComplete.run(); + runOnPendingTransactionsComplete = null; + } + } + @VisibleForTesting void setCohortDecorator(CohortDecorator cohortDecorator) { this.cohortDecorator = cohortDecorator; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 1e6107c786..0f491570a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -347,6 +347,10 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { return shard.getPendingTxCommitQueueSize(); } + public int getTxCohortCacheSize() { + return shard.getCohortCacheSize(); + } + @Override public void captureSnapshot() { if(shard != null) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java index ecc51b0917..39cc22fc2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java @@ -84,6 +84,7 @@ public interface ShardStatsMXBean { int getPendingTxCommitQueueSize(); - void captureSnapshot(); + int getTxCohortCacheSize(); + void captureSnapshot(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index f7a5630336..492a72b291 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -23,6 +23,7 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; @@ -54,6 +55,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; @@ -84,6 +86,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * End-to-end distributed data store tests that exercise remote shards and transactions. @@ -720,6 +725,68 @@ public class DistributedDataStoreRemotingIntegrationTest { verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); } + @Test + public void testLeadershipTransferOnShutdown() throws Exception { + leaderDatastoreContextBuilder.shardBatchedModificationCount(1); + followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); + String testName = "testLeadershipTransferOnShutdown"; + initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); + + IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); + DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false); + + // Create and submit a couple tx's so they're pending. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); + } + }); + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); + } + }); + + // Gracefully stop the leader via a Shutdown message. + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + ActorRef leaderActor = Await.result(future, duration); + + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown()); + + // Commit the 2 transactions. They should finish and succeed. + + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); + + // Wait for the leader actor stopped. + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + // Verify leadership was transferred by reading the committed data from the other nodes. + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + } + @Test public void testTransactionWithIsolatedLeader() throws Throwable { leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); -- 2.36.6