Bug 5450: Query akka cluster state on Follower ElectionTimeout 65/43265/6
authorTom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 20:10:58 +0000 (16:10 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 11 Aug 2016 20:21:52 +0000 (20:21 +0000)
Added changes to query the akka ClusterState to see if the leader is
actually unreachable or not Up on ElectionTimeout. If not, Follower
reschedules election timer and stays as Follower.

Change-Id: I3a054a82edbe975ad9e27c4d208083b19b392d2d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 0472e2d..ba6645f 100644 (file)
@@ -12,8 +12,10 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.function.LongSupplier;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -57,6 +59,13 @@ public interface RaftActorContext {
      */
     ActorRef getActor();
 
+    /**
+     * The akka Cluster singleton for the actor system if one is configured.
+     *
+     * @return an Optional containing the CLuster instance is present.
+     */
+    Optional<Cluster> getCluster();
+
     /**
      * @return the ElectionTerm information
      */
index 5b53a27..926e974 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -21,6 +22,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.LongSupplier;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -72,6 +74,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private int numVotingPeers = -1;
 
+    private Optional<Cluster> cluster;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -124,6 +128,21 @@ public class RaftActorContextImpl implements RaftActorContext {
         return actor;
     }
 
+    @Override
+    public Optional<Cluster> getCluster() {
+        if(cluster == null) {
+            try {
+                cluster = Optional.of(Cluster.get(getActorSystem()));
+            } catch(Exception e) {
+                // An exception means there's no cluster configured. This will only happen in unit tests.
+                LOG.debug("{}: Could not obtain Cluster: {}", getId(), e);
+                cluster = Optional.empty();
+            }
+        }
+
+        return cluster;
+    }
+
     @Override
     public ElectionTerm getTermInformation() {
         return termInformation;
index 5f37af6..73e2cf9 100644 (file)
@@ -9,10 +9,18 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -44,6 +52,8 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
 public class Follower extends AbstractRaftActorBehavior {
     private static final int SYNC_THRESHOLD = 10;
 
+    private static final long MAX_ELECTION_TIMEOUT_FACTOR = 18;
+
     private final SyncStatusTracker initialSyncStatusTracker;
 
     private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
@@ -53,7 +63,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     };
 
-    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
+    private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
     private short leaderPayloadVersion;
@@ -397,16 +407,30 @@ public class Follower extends AbstractRaftActorBehavior {
         // queue but would be processed before the ElectionTimeout message and thus would restart the
         // lastLeaderMessageTimer.
         long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
-        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
-                context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
+        boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() ||
+                lastLeaderMessageInterval >= electionTimeoutInMillis;
 
         if(canStartElection()) {
-            if(message instanceof TimeoutNow || noLeaderMessageReceived) {
-                LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+            if(message instanceof TimeoutNow) {
+                LOG.debug("{}: Received TimeoutNow - switching to Candidate", logName());
                 return internalSwitchBehavior(RaftState.Candidate);
+            } else if(noLeaderMessageReceived) {
+                // Check the cluster state to see if the leader is known to be up before we go to Candidate.
+                // However if we haven't heard from the leader in a long time even though the cluster state
+                // indicates it's up then something is wrong - leader might be stuck indefinitely - so switch
+                // to Candidate,
+                long maxElectionTimeout = electionTimeoutInMillis * MAX_ELECTION_TIMEOUT_FACTOR;
+                if(isLeaderAvailabilityKnown() && lastLeaderMessageInterval < maxElectionTimeout) {
+                    LOG.debug("{}: Received ElectionTimeout but leader appears to be available", logName());
+                    scheduleElection(electionDuration());
+                } else {
+                    LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+                    return internalSwitchBehavior(RaftState.Candidate);
+                }
             } else {
-                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
-                        logName(), lastLeaderMessageInterval);
+                LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout {}",
+                        logName(), lastLeaderMessageInterval, context.getConfigParams().getElectionTimeOutInterval());
                 scheduleElection(electionDuration());
             }
         } else if(message instanceof ElectionTimeout) {
@@ -420,6 +444,55 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    private boolean isLeaderAvailabilityKnown() {
+        if(leaderId == null) {
+            return false;
+        }
+
+        Optional<Cluster> cluster = context.getCluster();
+        if(!cluster.isPresent()) {
+            return false;
+        }
+
+        ActorSelection leaderActor = context.getPeerActorSelection(leaderId);
+        if(leaderActor == null) {
+            return false;
+        }
+
+        Address leaderAddress = leaderActor.anchorPath().address();
+
+        CurrentClusterState state = cluster.get().state();
+        Set<Member> unreachable = state.getUnreachable();
+
+        LOG.debug("{}: Checking for leader {} in the cluster unreachable set {}", logName(), leaderAddress,
+                unreachable);
+
+        for(Member m: unreachable) {
+            if(leaderAddress.equals(m.address())) {
+                LOG.info("{}: Leader {} is unreachable", logName(), leaderAddress);
+                return false;
+            }
+        }
+
+        for(Member m: state.getMembers()) {
+            if(leaderAddress.equals(m.address())) {
+                if(m.status() == MemberStatus.up() || m.status() == MemberStatus.weaklyUp()) {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is available", logName(),
+                            leaderAddress, m.status());
+                    return true;
+                } else {
+                    LOG.debug("{}: Leader {} cluster status is {} - leader is unavailable", logName(),
+                            leaderAddress, m.status());
+                    return false;
+                }
+            }
+        }
+
+        LOG.debug("{}: Leader {} not found in the cluster member set", logName(), leaderAddress);
+
+        return false;
+    }
+
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
index 8469d02..44ffdeb 100644 (file)
@@ -16,9 +16,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
-import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCanCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
@@ -54,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
@@ -322,25 +320,26 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     }
 
     public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
-        final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
-
-        transaction.getSnapshot().write(id, node);
-        final ShardDataTreeCohort cohort = transaction.ready();
-        immediateCanCommit(cohort);
-        immediatePreCommit(cohort);
-        immediateCommit(cohort);
+            final NormalizedNode<?,?> node) throws Exception {
+        BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
+        DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+        batched.apply(modification);
+        modification.ready();
+        store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification));
     }
 
     public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
-        final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(nextTransactionId());
-
-        transaction.getSnapshot().merge(id, node);
-        final ShardDataTreeCohort cohort = transaction.ready();
-        immediateCanCommit(cohort);
-        immediatePreCommit(cohort);
-        immediateCommit(cohort);
+            final NormalizedNode<?,?> node) throws Exception {
+        final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
+        batched.addModification(new MergeModification(id, node));
+        batched.setReady(true);
+        batched.setDoCommitOnReady(true);
+        batched.setTotalMessagesSent(1);
+
+        DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
+        batched.apply(modification);
+        modification.ready();
+        store.applyForeignCandidate(batched.getTransactionID(), store.getDataTree().prepare(modification));
     }
 
     public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
index ffe3226..13b78a0 100644 (file)
@@ -162,8 +162,6 @@ public class DataChangeListenerSupportTest extends AbstractShardTest {
 
     private Shard createShard() {
         TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
-        ShardTestKit.waitUntilLeader(actor);
-
         return actor.underlyingActor();
     }
 }
index e7f56ba..2f7c790 100644 (file)
@@ -527,6 +527,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
 
@@ -781,6 +782,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
                 shardElectionTimeoutFactor(10));
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
@@ -957,6 +959,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
@@ -997,6 +1001,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
             JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+            Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
                 operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.