Implement cluster admin RPCs to change member voting states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 6065103b13dbd4d54d1e1ffb9352e9c2b072952a..2fe3bfdc8764aca78f7cdf942f3ba5dbba96e589 100644 (file)
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
@@ -84,13 +85,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
-    private Cancellable heartbeatSchedule = null;
-
-    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
-    private int minReplicationCount;
+    /**
+     * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+     * expect the entries to be modified in sequence, hence we open-code the lookup.
+     *
+     * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+     *       but we already expect those to be far from frequent.
+     */
+    private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshot;
+    private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state) {
         super(context, state);
@@ -325,7 +331,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
             if (t.getIndex() == logIndex) {
@@ -337,16 +343,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override
-    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
-        for (ClientRequestTracker tracker : trackerList) {
-            if (tracker.getIndex() == logIndex) {
-                return tracker;
-            }
-        }
-        return null;
-    }
-
     @Override
     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
@@ -380,23 +376,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             beforeSendHeartbeat();
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-            return this;
-
         } else if(message instanceof SendInstallSnapshot) {
             // received from RaftActor
             setSnapshot(((SendInstallSnapshot) message).getSnapshot());
             sendInstallSnapshot();
-
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
-
-        } else if (message instanceof InstallSnapshotReply){
+        } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+        } else {
+            return super.handleMessage(sender, message);
         }
 
-
-        return super.handleMessage(sender, message);
+        return this;
     }
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
@@ -497,7 +489,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        trackerList.add(
+        trackers.add(
             new ClientRequestTrackerImpl(replicate.getClientActor(),
                 replicate.getIdentifier(),
                 logIndex)
@@ -783,10 +775,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected boolean isLeaderIsolated() {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            if (followerLogInformation.isFollowerActive()) {
+            final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
-                    break;
+                    return false;
                 }
             }
         }