From a7740542c8ce1985c0a35767966c781805dfad84 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 5 Aug 2016 16:10:58 -0400 Subject: [PATCH] Bug 5450: Query akka cluster state on Follower ElectionTimeout Added changes to query the akka ClusterState to see if the leader is actually unreachable or not Up on ElectionTimeout. If not, Follower reschedules election timer and stays as Follower. Change-Id: I3a054a82edbe975ad9e27c4d208083b19b392d2d Signed-off-by: Tom Pantelis --- .../cluster/raft/RaftActorContext.java | 9 ++ .../cluster/raft/RaftActorContextImpl.java | 19 ++++ .../cluster/raft/behaviors/Follower.java | 87 +++++++++++++++++-- .../cluster/datastore/AbstractShardTest.java | 37 ++++---- .../DataChangeListenerSupportTest.java | 2 - ...butedDataStoreRemotingIntegrationTest.java | 6 ++ 6 files changed, 132 insertions(+), 28 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0472e2d812..ba6645f240 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -12,8 +12,10 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.cluster.Cluster; import com.google.common.annotations.VisibleForTesting; import java.util.Collection; +import java.util.Optional; import java.util.function.LongSupplier; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; @@ -57,6 +59,13 @@ public interface RaftActorContext { */ ActorRef getActor(); + /** + * The akka Cluster singleton for the actor system if one is configured. + * + * @return an Optional containing the CLuster instance is present. + */ + Optional getCluster(); + /** * @return the ElectionTerm information */ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 5b53a27d96..926e9748d4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -13,6 +13,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.cluster.Cluster; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; @@ -21,6 +22,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.LongSupplier; import org.opendaylight.controller.cluster.DataPersistenceProvider; @@ -72,6 +74,8 @@ public class RaftActorContextImpl implements RaftActorContext { private int numVotingPeers = -1; + private Optional cluster; + public RaftActorContextImpl(ActorRef actor, ActorContext context, String id, ElectionTerm termInformation, long commitIndex, long lastApplied, Map peerAddresses, ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) { @@ -124,6 +128,21 @@ public class RaftActorContextImpl implements RaftActorContext { return actor; } + @Override + public Optional getCluster() { + if(cluster == null) { + try { + cluster = Optional.of(Cluster.get(getActorSystem())); + } catch(Exception e) { + // An exception means there's no cluster configured. This will only happen in unit tests. + LOG.debug("{}: Could not obtain Cluster: {}", getId(), e); + cluster = Optional.empty(); + } + } + + return cluster; + } + @Override public ElectionTerm getTermInformation() { return termInformation; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 5f37af6e9d..73e2cf9bc5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,10 +9,18 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import java.util.ArrayList; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; @@ -44,6 +52,8 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay public class Follower extends AbstractRaftActorBehavior { private static final int SYNC_THRESHOLD = 10; + private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18; + private final SyncStatusTracker initialSyncStatusTracker; private final Procedure appendAndPersistCallback = new Procedure() { @@ -53,7 +63,7 @@ public class Follower extends AbstractRaftActorBehavior { } }; - private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted(); + private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted(); private SnapshotTracker snapshotTracker = null; private String leaderId; private short leaderPayloadVersion; @@ -397,16 +407,30 @@ public class Follower extends AbstractRaftActorBehavior { // queue but would be processed before the ElectionTimeout message and thus would restart the // lastLeaderMessageTimer. long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS); - boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >= - context.getConfigParams().getElectionTimeOutInterval().toMillis(); + long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis(); + boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || + lastLeaderMessageInterval >= electionTimeoutInMillis; if(canStartElection()) { - if(message instanceof TimeoutNow || noLeaderMessageReceived) { - LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName()); + if(message instanceof TimeoutNow) { + LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); + } else if(noLeaderMessageReceived) { + // Check the cluster state to see if the leader is known to be up before we go to Candidate. + // However if we haven't heard from the leader in a long time even though the cluster state + // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch + // to Candidate, + long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR; + if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) { + LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName()); + scheduleElection(electionDuration()); + } else { + LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); + return internalSwitchBehavior(RaftState.Candidate); + } } else { - LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout", - logName(), lastLeaderMessageInterval); + LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}", + logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval()); scheduleElection(electionDuration()); } } else if(message instanceof ElectionTimeout) { @@ -420,6 +444,55 @@ public class Follower extends AbstractRaftActorBehavior { return this; } + private boolean isLeaderAvailabilityKnown() { + if(leaderId == null) { + return false; + } + + Optional cluster = context.getCluster(); + if(!cluster.isPresent()) { + return false; + } + + ActorSelection leaderActor = context.getPeerActorSelection(leaderId); + if(leaderActor == null) { + return false; + } + + Address leaderAddress = leaderActor.anchorPath().address(); + + CurrentClusterState state = cluster.get().state(); + Set unreachable = state.getUnreachable(); + + LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress, + unreachable); + + for(Member m: unreachable) { + if(leaderAddress.equals(m.address())) { + LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress); + return false; + } + } + + for(Member m: state.getMembers()) { + if(leaderAddress.equals(m.address())) { + if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) { + LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(), + leaderAddress, m.status()); + return true; + } else { + LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(), + leaderAddress, m.status()); + return false; + } + } + } + + LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress); + + return false; + } + private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 8469d02795..44ffdeb97b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -16,9 +16,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; -import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit; -import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit; -import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit; import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit; @@ -54,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; @@ -322,25 +320,26 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id, - final NormalizedNode node) throws InterruptedException, ExecutionException { - final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId()); - - transaction.getSnapshot().write(id, node); - final ShardDataTreeCohort cohort = transaction.ready(); - immediateCanCommit(cohort); - immediatePreCommit(cohort); - immediateCommit(cohort); + final NormalizedNode node) throws Exception { + BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1); + DataTreeModification modification = store.getDataTree().takeSnapshot().newModification(); + batched.apply(modification); + modification.ready(); + store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification)); } public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id, - final NormalizedNode node) throws InterruptedException, ExecutionException { - final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId()); - - transaction.getSnapshot().merge(id, node); - final ShardDataTreeCohort cohort = transaction.ready(); - immediateCanCommit(cohort); - immediatePreCommit(cohort); - immediateCommit(cohort); + final NormalizedNode node) throws Exception { + final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION); + batched.addModification(new MergeModification(id, node)); + batched.setReady(true); + batched.setDoCommitOnReady(true); + batched.setTotalMessagesSent(1); + + DataTreeModification modification = store.getDataTree().takeSnapshot().newModification(); + batched.apply(modification); + modification.ready(); + store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification)); } public static void writeToStore(final DataTree store, final YangInstanceIdentifier id, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java index ffe3226042..13b78a01a7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java @@ -162,8 +162,6 @@ public class DataChangeListenerSupportTest extends AbstractShardTest { private Shard createShard() { TestActorRef actor = actorFactory.createTestActor(newShardProps()); - ShardTestKit.waitUntilLeader(actor); - return actor.underlyingActor(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e7f56baa5f..2f7c790269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -527,6 +527,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); @@ -781,6 +782,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()). shardElectionTimeoutFactor(10)); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -957,6 +959,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. @@ -997,6 +1001,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); -- 2.36.6