Bug 5450: Query akka cluster state on Follower ElectionTimeout 65/43265/6
authorTom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 20:10:58 +0000 (16:10 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 11 Aug 2016 20:21:52 +0000 (20:21 +0000)
Added changes to query the akka ClusterState to see if the leader is
actually unreachable or not Up on ElectionTimeout. If not, Follower
reschedules election timer and stays as Follower.

Change-Id: I3a054a82edbe975ad9e27c4d208083b19b392d2d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 0472e2d812a5f7f297667415d96632fc50bcb593..ba6645f2407c55afdbbdae8cffd524dfbcc466a7 100644 (file)
@@ -12,8 +12,10 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.function.LongSupplier;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -57,6 +59,13 @@ public interface RaftActorContext {
      */
     ActorRef getActor();
 
+    /**
+     * The akka Cluster singleton for the actor system if one is configured.
+     *
+     * @return an Optional containing the CLuster instance is present.
+     */
+    Optional<Cluster> getCluster();
+
     /**
      * @return the ElectionTerm information
      */
index 5b53a27d9682fe8b78e738a483b9e950d5f885d0..926e9748d477d8c54fcbde9265197e97a1369063 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -21,6 +22,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.LongSupplier;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -72,6 +74,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private int numVotingPeers = -1;
 
+    private Optional<Cluster> cluster;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -124,6 +128,21 @@ public class RaftActorContextImpl implements RaftActorContext {
         return actor;
     }
 
+    @Override
+    public Optional<Cluster> getCluster() {
+        if(cluster == null) {
+            try {
+                cluster = Optional.of(Cluster.get(getActorSystem()));
+            } catch(Exception e) {
+                // An exception means there's no cluster configured. This will only happen in unit tests.
+                LOG.debug("{}: Could not obtain Cluster: {}", getId(), e);
+                cluster = Optional.empty();
+            }
+        }
+
+        return cluster;
+    }
+
     @Override
     public ElectionTerm getTermInformation() {
         return termInformation;
index 5f37af6e9d828482d16aac95178fc47418f27d6d..73e2cf9bc576d7831f9d12ba42cd849e7c1dea7a 100644 (file)
@@ -9,10 +9,18 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -44,6 +52,8 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
 public class Follower extends AbstractRaftActorBehavior {
     private static final int SYNC_THRESHOLD = 10;
 
+    private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
+
     private final SyncStatusTracker initialSyncStatusTracker;
 
     private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
@@ -53,7 +63,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     };
 
-    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
+    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
     private short leaderPayloadVersion;
@@ -397,16 +407,30 @@ public class Follower extends AbstractRaftActorBehavior {
         // queue but would be processed before the ElectionTimeout message and thus would restart the
         // lastLeaderMessageTimer.
         long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
-        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
-                context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
+                lastLeaderMessageInterval >= electionTimeoutInMillis;
 
         if(canStartElection()) {
-            if(message instanceof TimeoutNow || noLeaderMessageReceived) {
-                LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+            if(message instanceof TimeoutNow) {
+                LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
                 return internalSwitchBehavior(RaftState.Candidate);
+            } else if(noLeaderMessageReceived) {
+                // Check the cluster state to see if the leader is known to be up before we go to Candidate.
+                // However if we haven't heard from the leader in a long time even though the cluster state
+                // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch
+                // to Candidate,
+                long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
+                if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+                    LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+                    scheduleElection(electionDuration());
+                } else {
+                    LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+                    return internalSwitchBehavior(RaftState.Candidate);
+                }
             } else {
-                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
-                        logName(), lastLeaderMessageInterval);
+                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+                        logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
                 scheduleElection(electionDuration());
             }
         } else if(message instanceof ElectionTimeout) {
@@ -420,6 +444,55 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean isLeaderAvailabilityKnown() {
+        if(leaderId == null) {
+            return false;
+        }
+
+        Optional<Cluster> cluster = context.getCluster();
+        if(!cluster.isPresent()) {
+            return false;
+        }
+
+        ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
+        if(leaderActor == null) {
+            return false;
+        }
+
+        Address leaderAddress = leaderActor.anchorPath().address();
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+                unreachable);
+
+        for(Member m: unreachable) {
+            if(leaderAddress.equals(m.address())) {
+                LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+                return false;
+            }
+        }
+
+        for(Member m: state.getMembers()) {
+            if(leaderAddress.equals(m.address())) {
+                if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+                            leaderAddress, m.status());
+                    return true;
+                } else {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+                            leaderAddress, m.status());
+                    return false;
+                }
+            }
+        }
+
+        LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
index 8469d027957005d7522b52d5861a6c72837c90ab..44ffdeb97b8dde203fa4314f6709dbf2379b1d19 100644 (file)
@@ -16,9 +16,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
@@ -54,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
@@ -322,25 +320,26 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     }
 
     public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
-        final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
-
-        transaction.getSnapshot().write(id, node);
-        final ShardDataTreeCohort cohort = transaction.ready();
-        immediateCanCommit(cohort);
-        immediatePreCommit(cohort);
-        immediateCommit(cohort);
+            final NormalizedNode<?,?> node) throws Exception {
+        BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
+        DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+        batched.apply(modification);
+        modification.ready();
+        store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification));
     }
 
     public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
-        final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
-
-        transaction.getSnapshot().merge(id, node);
-        final ShardDataTreeCohort cohort = transaction.ready();
-        immediateCanCommit(cohort);
-        immediatePreCommit(cohort);
-        immediateCommit(cohort);
+            final NormalizedNode<?,?> node) throws Exception {
+        final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
+        batched.addModification(new MergeModification(id, node));
+        batched.setReady(true);
+        batched.setDoCommitOnReady(true);
+        batched.setTotalMessagesSent(1);
+
+        DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+        batched.apply(modification);
+        modification.ready();
+        store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification));
     }
 
     public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
index ffe32260422ba1e5283574d0bd8eb4f3ab0cbf97..13b78a01a7a5fb5a1057cec06e646cad304e0b2e 100644 (file)
@@ -162,8 +162,6 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
     private Shard createShard() {
         TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
-        ShardTestKit.waitUntilLeader(actor);
-
         return actor.underlyingActor();
     }
 }
index e7f56baa5f38a8283c086973ed845dcfa3030658..2f7c790269f0f9702157b6468234154e9e1cb43d 100644 (file)
@@ -527,6 +527,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
 
@@ -781,6 +782,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
                 shardElectionTimeoutFactor(10));
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
@@ -957,6 +959,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
@@ -997,6 +1001,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
             JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+            Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
                 operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));