Implement cluster admin RPCs to change member voting states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 83d0a52b90a1d7e0a622229efa04fa68bff8eda4..2fe3bfdc8764aca78f7cdf942f3ba5dbba96e589 100644 (file)
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
@@ -84,26 +85,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
-    private Cancellable heartbeatSchedule = null;
-
-    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
-    private int minReplicationCount;
+    /**
+     * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+     * expect the entries to be modified in sequence, hence we open-code the lookup.
+     *
+     * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+     *       but we already expect those to be far from frequent.
+     */
+    private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshot;
+    private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state) {
         super(context, state);
 
-        setLeaderPayloadVersion(context.getPayloadVersion());
-
         for(PeerInfo peerInfo: context.getPeers()) {
             FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
             followerToLog.put(peerInfo.getId(), followerLogInformation);
         }
 
-        leaderId = context.getId();
-
         LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
 
         updateMinReplicaCount();
@@ -329,7 +331,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        final Iterator<ClientRequestTracker> it = trackers.iterator();
         while (it.hasNext()) {
             final ClientRequestTracker t = it.next();
             if (t.getIndex() == logIndex) {
@@ -341,16 +343,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override
-    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
-        for (ClientRequestTracker tracker : trackerList) {
-            if (tracker.getIndex() == logIndex) {
-                return tracker;
-            }
-        }
-        return null;
-    }
-
     @Override
     protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
@@ -384,23 +376,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             beforeSendHeartbeat();
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-            return this;
-
         } else if(message instanceof SendInstallSnapshot) {
             // received from RaftActor
             setSnapshot(((SendInstallSnapshot) message).getSnapshot());
             sendInstallSnapshot();
-
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
-
-        } else if (message instanceof InstallSnapshotReply){
+        } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+        } else {
+            return super.handleMessage(sender, message);
         }
 
-
-        return super.handleMessage(sender, message);
+        return this;
     }
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
@@ -501,7 +489,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        trackerList.add(
+        trackers.add(
             new ClientRequestTrackerImpl(replicate.getClientActor(),
                 replicate.getIdentifier(),
                 logIndex)
@@ -770,22 +758,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         stopHeartBeat();
     }
 
     @Override
-    public String getLeaderId() {
+    public final String getLeaderId() {
         return context.getId();
     }
 
+    @Override
+    public final short getLeaderPayloadVersion() {
+        return context.getPayloadVersion();
+    }
+
     protected boolean isLeaderIsolated() {
         int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            if (followerLogInformation.isFollowerActive()) {
+            final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+            if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
                 --minPresent;
                 if (minPresent == 0) {
-                    break;
+                    return false;
                 }
             }
         }