Introduce PeerInfo and VotingState 20/28720/10
authorTom Pantelis <tpanteli@brocade.com>
Fri, 23 Oct 2015 20:09:38 +0000 (16:09 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 28 Oct 2015 18:24:22 +0000 (18:24 +0000)
We need to store the voting state for each per so I created a
PeerInfo class to include, id, address and voting state (represented by a
VotingState enum). The RaftActorContext now stores PeerInfo instances
in its peer map and added methods to access PeerInfo. As a consequence,
RaftActorContext#getPeerAddresses was no longer needed and was removed.

AbstractLeader and Candidate were modified to utilize the PeerInfo to
calculate the majority vote/min replication count, ie ignore non-voting peers.

Previously we had added a FollowerState enum and stored it in the
FollowerLogInformation. Since voting state is now stored in the
RaftActorContext peer info, I removed the FollowerState from
FollowerLogInformation to avoid redundancy and having to keep both
up to date.

Change-Id: I1394511a8db7f0b9df3ed7879c77c1f44f3b143d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
15 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java

index b2173c2baf3d9caec83207c28a383590aea61640..6618a97f21f0d1b7145abc77171a950d9bf9d1e8 100644 (file)
@@ -12,12 +12,6 @@ package org.opendaylight.controller.cluster.raft;
  */
 public interface FollowerLogInformation {
 
-    enum FollowerState {
-        VOTING,
-        NON_VOTING,
-        VOTING_NOT_INITIALIZED
-    };
-
     /**
      * Increment the value of the nextIndex
      *
@@ -114,20 +108,4 @@ public interface FollowerLogInformation {
      * Sets the payload data version of the follower.
      */
     void setPayloadVersion(short payloadVersion);
-
-    /**
-     * Sets the state of the follower.
-     */
-    void setFollowerState(FollowerState state);
-
-    /**
-     * @return the state of the follower.
-     */
-    FollowerState getFollowerState();
-
-    /**
-     * @return true if the follower is in a state where it can participate in leader elections and
-     *              commitment consensus.
-     */
-    boolean canParticipateInConsensus();
 }
index 5bf37d6534e5b7b38e5664f67b2dbf0d2d86f113..1c8d5e6e10647bcbac9ad7f55dd247e6f51f94c6 100644 (file)
@@ -8,12 +8,11 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
-    private final String id;
-
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
     private final RaftActorContext context;
@@ -28,13 +27,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private short payloadVersion = -1;
 
-    private FollowerState state = FollowerState.VOTING;
+    private final PeerInfo peerInfo;
 
-    public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
-        this.id = id;
+    public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
         this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
         this.context = context;
+        this.peerInfo = Preconditions.checkNotNull(peerInfo);
     }
 
     @Override
@@ -74,7 +73,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public String getId() {
-        return id;
+        return peerInfo.getId();
     }
 
     @Override
@@ -89,7 +88,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean isFollowerActive() {
-        if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
@@ -120,7 +119,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean okToReplicate() {
-        if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+        if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
@@ -154,26 +153,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         this.payloadVersion = payloadVersion;
     }
 
-    @Override
-    public boolean canParticipateInConsensus() {
-        return state == FollowerState.VOTING;
-    }
-
-    @Override
-    public void setFollowerState(FollowerState state) {
-        this.state = state;
-    }
-
-    @Override
-    public FollowerState getFollowerState() {
-        return state;
-    }
-
     @Override
     public String toString() {
-        return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
-                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch="
-                + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+        return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
+                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
                 + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java
new file mode 100644 (file)
index 0000000..3d15d88
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Stores information about a raft peer.
+ *
+ * @author Thomas Pantelis
+ */
+public class PeerInfo {
+    private final String id;
+    private String address;
+    private VotingState votingState;
+
+    public PeerInfo(String id, String address, VotingState votingState) {
+        this.id = id;
+        this.address = address;
+        this.votingState = votingState;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public VotingState getVotingState() {
+        return votingState;
+    }
+
+    public boolean isVoting() {
+        return votingState == VotingState.VOTING;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public void setVotingState(VotingState votingState) {
+        this.votingState = votingState;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerInfo [id=" + id + ", address=" + address + ", votingState=" + votingState + "]";
+    }
+}
index 106bbe1549c4c440200ce986695c162fb71dcf3b..efcf27da20283bda77cb6b2ac2fa8bc3bbd700ad 100644 (file)
@@ -19,6 +19,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -273,6 +274,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
+        Map<String, String> peerAddresses = new HashMap<>();
+        for(String peerId: context.getPeerIds()) {
+            peerAddresses.put(peerId, context.getPeerAddress(peerId));
+        }
+
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
@@ -288,7 +294,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
-                .peerAddresses(context.getPeerAddresses());
+                .peerAddresses(peerAddresses);
 
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
         if (lastLogEntry != null) {
index 0c9d698cd5404f0675d11df935351df529b448f2..c325875950169854685c2746178524155eb5f6fc 100644 (file)
@@ -15,7 +15,6 @@ import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Collection;
-import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
@@ -103,12 +102,6 @@ public interface RaftActorContext {
      */
     Logger getLogger();
 
-    /**
-     * @return a copy of the mapping of peerId's to their addresses
-     *
-     */
-    Map<String, String> getPeerAddresses();
-
     /**
      * Get the address of the peer as a String. This is the same format in
      * which a consumer would provide the address
@@ -119,18 +112,31 @@ public interface RaftActorContext {
      */
     String getPeerAddress(String peerId);
 
+    /**
+     * @return list of PeerInfo
+     */
+    Collection<PeerInfo> getPeers();
+
     /**
      * @return the list of peer IDs.
      */
     Collection<String> getPeerIds();
 
+    /**
+     * Get the PeerInfo for the given peer.
+     *
+     * @param peerId
+     * @return the PeerInfo
+     */
+    PeerInfo getPeerInfo(String peerId);
+
     /**
      * Add to actor peers
      *
      * @param name
      * @param address
      */
-    void addToPeers(String name, String address);
+    void addToPeers(String name, String address, VotingState votingState);
 
     /**
      *
index cebf7b3cc66626b3bbe7742ca4e459b956398c33..bbeaddb240c2ab9bb220178c04c9627c3826bd94 100644 (file)
@@ -8,20 +8,17 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.actor.UntypedActorContext;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
@@ -30,7 +27,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ActorRef actor;
 
-    private final UntypedActorContext context;
+    private final ActorContext context;
 
     private final String id;
 
@@ -42,7 +39,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private ReplicatedLog replicatedLog;
 
-    private final Map<String, String> peerAddresses;
+    private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
 
     private final Logger LOG;
 
@@ -59,7 +56,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private short payloadVersion;
 
-    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+    public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
         this.actor = actor;
@@ -68,10 +65,13 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.termInformation = termInformation;
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.peerAddresses = Maps.newHashMap(peerAddresses);
         this.configParams = configParams;
         this.persistenceProvider = persistenceProvider;
         this.LOG = logger;
+
+        for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
+            peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
+        }
     }
 
     void setPayloadVersion(short payloadVersion) {
@@ -150,20 +150,30 @@ public class RaftActorContextImpl implements RaftActorContext {
     }
 
     @Override
-    public Map<String, String> getPeerAddresses() {
-        return new HashMap<String, String>(peerAddresses);
+    public Collection<String> getPeerIds() {
+        return peerInfoMap.keySet();
     }
 
     @Override
-    public Collection<String> getPeerIds() {
-        return peerAddresses.keySet();
+    public Collection<PeerInfo> getPeers() {
+        return peerInfoMap.values();
     }
 
-    @Override public String getPeerAddress(String peerId) {
-        String peerAddress = peerAddresses.get(peerId);
-        if(peerAddress == null) {
-            peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
-            peerAddresses.put(peerId, peerAddress);
+    @Override
+    public PeerInfo getPeerInfo(String peerId) {
+        return peerInfoMap.get(peerId);
+    }
+
+    @Override
+    public String getPeerAddress(String peerId) {
+        String peerAddress = null;
+        PeerInfo peerInfo = peerInfoMap.get(peerId);
+        if(peerInfo != null) {
+            peerAddress = peerInfo.getAddress();
+            if(peerAddress == null) {
+                peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
+                peerInfo.setAddress(peerAddress);
+            }
         }
 
         return peerAddress;
@@ -173,12 +183,13 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
-    @Override public void addToPeers(String name, String address) {
-        peerAddresses.put(name, address);
+    @Override
+    public void addToPeers(String id, String address, VotingState votingState) {
+        peerInfoMap.put(id, new PeerInfo(id, address, votingState));
     }
 
     @Override public void removePeer(String name) {
-        peerAddresses.remove(name);
+        peerInfoMap.remove(name);
     }
 
     @Override public ActorSelection getPeerActorSelection(String peerId) {
@@ -191,9 +202,10 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     @Override
     public void setPeerAddress(String peerId, String peerAddress) {
-        if(peerAddresses.containsKey(peerId)) {
+        PeerInfo peerInfo = peerInfoMap.get(peerId);
+        if(peerInfo != null) {
             LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
-            peerAddresses.put(peerId, peerAddress);
+            peerInfo.setAddress(peerAddress);
         }
     }
 
index 7c8888b9f4c60735adfc8fed6e39df97244d1c29..7c37b2a68daa62768407bfd66365a2b0509f1b8e 100644 (file)
@@ -18,7 +18,6 @@ import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
@@ -276,14 +275,13 @@ class RaftActorServerConfigurationSupport {
 
             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
 
-            raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+            VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
+                    VotingState.NON_VOTING;
+            raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
 
-            // if voting member - initialize to VOTING_NOT_INITIALIZED
-            FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
-                FollowerState.NON_VOTING;
-            leader.addFollower(addServer.getNewServerId(), initialState);
+            leader.addFollower(addServer.getNewServerId());
 
-            if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
+            if(votingState == VotingState.VOTING_NOT_INITIALIZED){
                 LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
                         addServer.getNewServerId());
 
@@ -345,14 +343,12 @@ class RaftActorServerConfigurationSupport {
             // add server operation that timed out.
             if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-                FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
-
-                installSnapshotTimer.cancel();
-
-                followerLogInformation.setFollowerState(FollowerState.VOTING);
-                leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+                raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
+                leader.updateMinReplicaCount();
 
                 persistNewServerConfiguration(raftActor, getAddServerContext());
+
+                installSnapshotTimer.cancel();
             }
         }
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java
new file mode 100644 (file)
index 0000000..c5a22ff
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Enumerates voting states for a peer.
+ *
+ * @author Thomas Pantelis
+ */
+public enum VotingState {
+    VOTING,
+    NON_VOTING,
+    VOTING_NOT_INITIALIZED
+}
index b80a1bab15a9d248fa43a8628fdf810ef5bb0289..4ea02db2d7a9ed5db91cba9ee90ca7ac92b4e55f 100644 (file)
@@ -28,12 +28,13 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -88,8 +89,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private int minReplicationCount;
 
-    private int minIsolatedLeaderPeerCount;
-
     private Optional<SnapshotHolder> snapshot;
 
     public AbstractLeader(RaftActorContext context) {
@@ -97,18 +96,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         setLeaderPayloadVersion(context.getPayloadVersion());
 
-        for (String followerId : context.getPeerIds()) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId, -1, context);
-
-            followerToLog.put(followerId, followerLogInformation);
+        for(PeerInfo peerInfo: context.getPeers()) {
+            FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+            followerToLog.put(peerInfo.getId(), followerLogInformation);
         }
 
         leaderId = context.getId();
 
         LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
 
-        updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+        updateMinReplicaCount();
 
         snapshot = Optional.absent();
 
@@ -131,9 +128,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.keySet();
     }
 
-    public void addFollower(String followerId, FollowerState followerState) {
-        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
-        followerLogInformation.setFollowerState(followerState);
+    public void addFollower(String followerId) {
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
+                context.getPeerInfo(followerId), -1, context);
         followerToLog.put(followerId, followerLogInformation);
 
         if(heartbeatSchedule == null) {
@@ -145,20 +142,26 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerToLog.remove(followerId);
     }
 
-    public void updateMinReplicaCountAndMinIsolatedLeaderPeerCount(){
-        minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
+    public void updateMinReplicaCount() {
+        int numVoting = 0;
+        for(PeerInfo peer: context.getPeers()) {
+            if(peer.isVoting()) {
+                numVoting++;
+            }
+        }
+
+        minReplicationCount = getMajorityVoteCount(numVoting);
+    }
 
-        //the isolated Leader peer count will be 1 less than the majority vote count.
+    protected int getMinIsolatedLeaderPeerCount(){
+      //the isolated Leader peer count will be 1 less than the majority vote count.
         //this is because the vote count has the self vote counted in it
         //for e.g
         //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
         //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
         //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
-        minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
-    }
 
-    protected int getMinIsolatedLeaderPeerCount(){
-        return minIsolatedLeaderPeerCount;
+        return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
     }
 
     @VisibleForTesting
@@ -432,8 +435,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         setSnapshot(null);
                     }
                     wasLastChunk = true;
-                    FollowerState followerState = followerLogInformation.getFollowerState();
-                    if(followerState == FollowerState.VOTING_NOT_INITIALIZED){
+                    if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
                         UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
                                              new UnInitializedFollowerSnapshotReply(followerId);
                         context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
@@ -650,14 +652,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendInstallSnapshot() {
         LOG.debug("{}: sendInstallSnapshot", logName());
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
-            ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+            String followerId = e.getKey();
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
             FollowerLogInformation followerLogInfo = e.getValue();
 
             if (followerActor != null) {
-                long nextIndex = e.getValue().getNextIndex();
-                if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+                long nextIndex = followerLogInfo.getNextIndex();
+                if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
                         canInstallSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, e.getKey());
+                    sendSnapshotChunk(followerActor, followerId);
                 }
             }
         }
@@ -759,7 +762,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     protected boolean isLeaderIsolated() {
-        int minPresent = minIsolatedLeaderPeerCount;
+        int minPresent = getMinIsolatedLeaderPeerCount();
         for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
             if (followerLogInformation.isFollowerActive()) {
                 --minPresent;
index 8cb011f7a71d1b54b76bba86a261612f4f555bb0..751aeeb15e25b4edf5beebe00424879d780f92a0 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
-import java.util.Map;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -19,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -493,18 +495,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
-        Map<String, String> currentPeers = context.getPeerAddresses();
+        Set<String> currentPeers = new HashSet<>(context.getPeerIds());
         for(String peerId: serverConfig.getNewServerConfig()) {
             if(!getId().equals(peerId)) {
-                if(!currentPeers.containsKey(peerId)) {
-                    context.addToPeers(peerId, null);
+                if(!currentPeers.contains(peerId)) {
+                    context.addToPeers(peerId, null, VotingState.VOTING);
                 } else {
                     currentPeers.remove(peerId);
                 }
             }
         }
 
-        for(String peerIdToRemove: currentPeers.keySet()) {
+        for(String peerIdToRemove: currentPeers) {
             context.removePeer(peerIdToRemove);
         }
     }
index 8baf0d8e815d96957c1d4e6cee3ad55b1bd21f07..7f73d0cfb0a24a8b85960c2b1fdb031c3131cfdd 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.ArrayList;
 import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -44,22 +46,26 @@ public class Candidate extends AbstractRaftActorBehavior {
 
     private final int votesRequired;
 
-    private final Collection<String> peers;
+    private final Collection<String> votingPeers = new ArrayList<>();
 
     public Candidate(RaftActorContext context) {
         super(context, RaftState.Candidate);
 
-        peers = context.getPeerIds();
+        for(PeerInfo peer: context.getPeers()) {
+            if(peer.isVoting()) {
+                votingPeers.add(peer.getId());
+            }
+        }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
+            LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
         }
 
-        votesRequired = getMajorityVoteCount(peers.size());
+        votesRequired = getMajorityVoteCount(votingPeers.size());
 
         startNewTerm();
 
-        if(peers.isEmpty()){
+        if(votingPeers.isEmpty()){
             actor().tell(ELECTION_TIMEOUT, actor());
         } else {
             scheduleElection(electionDuration());
@@ -170,7 +176,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
-        for (String peerId : peers) {
+        for (String peerId : votingPeers) {
             ActorSelection peerActor = context.getPeerActorSelection(peerId);
             if(peerActor != null) {
                 RequestVote requestVote = new RequestVote(
index 75496f97526d70eeef6fbdb1fd49b8127550b79a..b89f28f40166c113243972267e51e5b1ba595577 100644 (file)
@@ -13,7 +13,6 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImplTest {
@@ -30,7 +29,7 @@ public class FollowerLogInformationImplTest {
         context.setConfigParams(configParams);
 
         FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl("follower1", 9, context);
+                new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 9, context);
 
         assertFalse("Follower should be termed inactive before stopwatch starts",
                 followerLogInformation.isFollowerActive());
@@ -66,8 +65,7 @@ public class FollowerLogInformationImplTest {
     public void testOkToReplicate(){
         MockRaftActorContext context = new MockRaftActorContext();
         FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(
-                        "follower1", 10, context);
+                new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 10, context);
 
         assertTrue(followerLogInformation.okToReplicate());
         assertFalse(followerLogInformation.okToReplicate());
@@ -83,19 +81,17 @@ public class FollowerLogInformationImplTest {
 
     @Test
     public void testVotingNotInitializedState() {
+        final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.VOTING_NOT_INITIALIZED);
         MockRaftActorContext context = new MockRaftActorContext();
-        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
 
-        followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED);
         assertFalse(followerLogInformation.okToReplicate());
-        assertFalse(followerLogInformation.canParticipateInConsensus());
 
         followerLogInformation.markFollowerActive();
         assertFalse(followerLogInformation.isFollowerActive());
 
-        followerLogInformation.setFollowerState(FollowerState.VOTING);
+        peerInfo.setVotingState(VotingState.VOTING);
         assertTrue(followerLogInformation.okToReplicate());
-        assertTrue(followerLogInformation.canParticipateInConsensus());
 
         followerLogInformation.markFollowerActive();
         assertTrue(followerLogInformation.isFollowerActive());
@@ -103,12 +99,11 @@ public class FollowerLogInformationImplTest {
 
     @Test
     public void testNonVotingState() {
+        final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.NON_VOTING);
         MockRaftActorContext context = new MockRaftActorContext();
-        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
 
-        followerLogInformation.setFollowerState(FollowerState.NON_VOTING);
         assertTrue(followerLogInformation.okToReplicate());
-        assertFalse(followerLogInformation.canParticipateInConsensus());
 
         followerLogInformation.markFollowerActive();
         assertTrue(followerLogInformation.isFollowerActive());
index 2a1a94c549458a8515da3e8fbcfd4e8d04d63d1b..228bc8741745b34d04017cd643824ff9a930e116 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -163,20 +164,31 @@ public class MockRaftActorContext implements RaftActorContext {
         return LoggerFactory.getLogger(getClass());
     }
 
-    @Override public Map<String, String> getPeerAddresses() {
-        return peerAddresses;
-    }
-
     @Override
     public Collection<String> getPeerIds() {
         return peerAddresses.keySet();
     }
 
+    @Override
+    public Collection<PeerInfo> getPeers() {
+        Collection<PeerInfo> peers = new ArrayList<>();
+        for(Map.Entry<String, String> p: peerAddresses.entrySet()) {
+            peers.add(new PeerInfo(p.getKey(), p.getValue(), VotingState.VOTING));
+        }
+
+        return peers;
+    }
+
     @Override public String getPeerAddress(String peerId) {
         return peerAddresses.get(peerId);
     }
 
-    @Override public void addToPeers(String name, String address) {
+    @Override
+    public PeerInfo getPeerInfo(String peerId) {
+        return new PeerInfo(peerId, peerAddresses.get(peerId), VotingState.VOTING);
+    }
+
+    @Override public void addToPeers(String name, String address, VotingState votingState) {
         peerAddresses.put(name, address);
     }
 
index 5275a303d37d7a54d73286056f8db625a8f6c5dc..a2f9e78769420e5675183fa54c1fc6b123b4c21d 100644 (file)
@@ -18,6 +18,8 @@ import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
@@ -45,11 +47,13 @@ public class RaftActorContextImplTest extends AbstractActorTest {
 
     @Test
     public void testGetPeerAddress() {
+        Map<String, String> peerMap = new HashMap<>();
+        peerMap.put("peer1", "peerAddress1");
+        peerMap.put("peer2", null);
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
-                Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), log);
+                peerMap, configParams, new NonPersistentDataProvider(), log);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
index c67395b9ded202b8a179241d9f46e3b804703cb3..8dd81dab8cf2c79d225bcff4dd0d77b649292150 100644 (file)
@@ -23,10 +23,16 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.ElectionTerm;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContextImpl;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -34,8 +40,11 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
+    static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class);
 
     private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("candidate"));
@@ -114,9 +123,22 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
         MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.getTermInformation().update(2L, "other");
+        raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
+                createEntries(0, 5, 1).build());
         raftActorContext.setPeerAddresses(setupPeers(4));
         candidate = new Candidate(raftActorContext);
 
+        RequestVote requestVote = MessageCollectorActor.expectFirstMatching(peerActors[0], RequestVote.class);
+        assertEquals("getTerm", 3L, requestVote.getTerm());
+        assertEquals("getCandidateId", raftActorContext.getId(), requestVote.getCandidateId());
+        assertEquals("getLastLogTerm", 1L, requestVote.getLastLogTerm());
+        assertEquals("getLastLogIndex", 4L, requestVote.getLastLogIndex());
+
+        MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+        MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+        MessageCollectorActor.expectFirstMatching(peerActors[3], RequestVote.class);
+
         // First peers denies the vote.
         candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, false));
 
@@ -131,6 +153,32 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         assertEquals("Behavior", RaftState.Leader, candidate.state());
     }
 
+    @Test
+    public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers(){
+        ElectionTerm mockElectionTerm = Mockito.mock(ElectionTerm.class);
+        Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
+        RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
+                "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
+                new NonPersistentDataProvider(), LOG);
+        raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
+        raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
+        candidate = new Candidate(raftActorContext);
+
+        MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+        MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+        MessageCollectorActor.assertNoneMatching(peerActors[0], RequestVote.class, 300);
+        MessageCollectorActor.assertNoneMatching(peerActors[3], RequestVote.class, 100);
+
+        candidate = candidate.handleMessage(peerActors[1], new RequestVoteReply(1, false));
+
+        assertEquals("Behavior", RaftState.Candidate, candidate.state());
+
+        candidate = candidate.handleMessage(peerActors[2], new RequestVoteReply(1, true));
+
+        assertEquals("Behavior", RaftState.Leader, candidate.state());
+    }
+
     @Test
     public void testResponseToHandleAppendEntriesWithLowerTerm() {
         candidate = new Candidate(createActorContext());