From 9491b06df9419e58db3089a4c5cd9f5407cb9aac Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 23 Oct 2015 16:09:38 -0400 Subject: [PATCH] Introduce PeerInfo and VotingState We need to store the voting state for each per so I created a PeerInfo class to include, id, address and voting state (represented by a VotingState enum). The RaftActorContext now stores PeerInfo instances in its peer map and added methods to access PeerInfo. As a consequence, RaftActorContext#getPeerAddresses was no longer needed and was removed. AbstractLeader and Candidate were modified to utilize the PeerInfo to calculate the majority vote/min replication count, ie ignore non-voting peers. Previously we had added a FollowerState enum and stored it in the FollowerLogInformation. Since voting state is now stored in the RaftActorContext peer info, I removed the FollowerState from FollowerLogInformation to avoid redundancy and having to keep both up to date. Change-Id: I1394511a8db7f0b9df3ed7879c77c1f44f3b143d Signed-off-by: Tom Pantelis --- .../cluster/raft/FollowerLogInformation.java | 22 -------- .../raft/FollowerLogInformationImpl.java | 36 ++++-------- .../controller/cluster/raft/PeerInfo.java | 54 ++++++++++++++++++ .../controller/cluster/raft/RaftActor.java | 8 ++- .../cluster/raft/RaftActorContext.java | 22 +++++--- .../cluster/raft/RaftActorContextImpl.java | 56 +++++++++++-------- .../RaftActorServerConfigurationSupport.java | 22 +++----- .../controller/cluster/raft/VotingState.java | 19 +++++++ .../raft/behaviors/AbstractLeader.java | 55 +++++++++--------- .../behaviors/AbstractRaftActorBehavior.java | 12 ++-- .../cluster/raft/behaviors/Candidate.java | 18 ++++-- .../raft/FollowerLogInformationImplTest.java | 19 +++---- .../cluster/raft/MockRaftActorContext.java | 22 ++++++-- .../raft/RaftActorContextImplTest.java | 8 ++- .../cluster/raft/behaviors/CandidateTest.java | 48 ++++++++++++++++ 15 files changed, 273 insertions(+), 148 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index b2173c2baf..6618a97f21 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -12,12 +12,6 @@ package org.opendaylight.controller.cluster.raft; */ public interface FollowerLogInformation { - enum FollowerState { - VOTING, - NON_VOTING, - VOTING_NOT_INITIALIZED - }; - /** * Increment the value of the nextIndex * @@ -114,20 +108,4 @@ public interface FollowerLogInformation { * Sets the payload data version of the follower. */ void setPayloadVersion(short payloadVersion); - - /** - * Sets the state of the follower. - */ - void setFollowerState(FollowerState state); - - /** - * @return the state of the follower. - */ - FollowerState getFollowerState(); - - /** - * @return true if the follower is in a state where it can participate in leader elections and - * commitment consensus. - */ - boolean canParticipateInConsensus(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 5bf37d6534..1c8d5e6e10 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -8,12 +8,11 @@ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; public class FollowerLogInformationImpl implements FollowerLogInformation { - private final String id; - private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RaftActorContext context; @@ -28,13 +27,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private short payloadVersion = -1; - private FollowerState state = FollowerState.VOTING; + private final PeerInfo peerInfo; - public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { - this.id = id; + public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) { this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; this.context = context; + this.peerInfo = Preconditions.checkNotNull(peerInfo); } @Override @@ -74,7 +73,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public String getId() { - return id; + return peerInfo.getId(); } @Override @@ -89,7 +88,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { - if(state == FollowerState.VOTING_NOT_INITIALIZED) { + if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } @@ -120,7 +119,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean okToReplicate() { - if(state == FollowerState.VOTING_NOT_INITIALIZED) { + if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } @@ -154,26 +153,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { this.payloadVersion = payloadVersion; } - @Override - public boolean canParticipateInConsensus() { - return state == FollowerState.VOTING; - } - - @Override - public void setFollowerState(FollowerState state) { - this.state = state; - } - - @Override - public FollowerState getFollowerState() { - return state; - } - @Override public String toString() { - return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch=" - + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java new file mode 100644 index 0000000000..3d15d88e38 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerInfo.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +/** + * Stores information about a raft peer. + * + * @author Thomas Pantelis + */ +public class PeerInfo { + private final String id; + private String address; + private VotingState votingState; + + public PeerInfo(String id, String address, VotingState votingState) { + this.id = id; + this.address = address; + this.votingState = votingState; + } + + public String getId() { + return id; + } + + public String getAddress() { + return address; + } + + public VotingState getVotingState() { + return votingState; + } + + public boolean isVoting() { + return votingState == VotingState.VOTING; + } + + public void setAddress(String address) { + this.address = address; + } + + public void setVotingState(VotingState votingState) { + this.votingState = votingState; + } + + @Override + public String toString() { + return "PeerInfo [id=" + id + ", address=" + address + ", votingState=" + votingState + "]"; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 106bbe1549..efcf27da20 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,6 +19,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -273,6 +274,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onGetOnDemandRaftStats() { // Debugging message to retrieve raft stats. + Map peerAddresses = new HashMap<>(); + for(String peerId: context.getPeerIds()) { + peerAddresses.put(peerId, context.getPeerAddress(peerId)); + } + OnDemandRaftState.Builder builder = OnDemandRaftState.builder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) @@ -288,7 +294,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .snapshotIndex(replicatedLog().getSnapshotIndex()) .snapshotTerm(replicatedLog().getSnapshotTerm()) .votedFor(context.getTermInformation().getVotedFor()) - .peerAddresses(context.getPeerAddresses()); + .peerAddresses(peerAddresses); ReplicatedLogEntry lastLogEntry = getLastLogEntry(); if (lastLogEntry != null) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0c9d698cd5..c325875950 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -15,7 +15,6 @@ import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.util.Collection; -import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -103,12 +102,6 @@ public interface RaftActorContext { */ Logger getLogger(); - /** - * @return a copy of the mapping of peerId's to their addresses - * - */ - Map getPeerAddresses(); - /** * Get the address of the peer as a String. This is the same format in * which a consumer would provide the address @@ -119,18 +112,31 @@ public interface RaftActorContext { */ String getPeerAddress(String peerId); + /** + * @return list of PeerInfo + */ + Collection getPeers(); + /** * @return the list of peer IDs. */ Collection getPeerIds(); + /** + * Get the PeerInfo for the given peer. + * + * @param peerId + * @return the PeerInfo + */ + PeerInfo getPeerInfo(String peerId); + /** * Add to actor peers * * @param name * @param address */ - void addToPeers(String name, String address); + void addToPeers(String name, String address, VotingState votingState); /** * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index cebf7b3cc6..bbeaddb240 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -8,20 +8,17 @@ package org.opendaylight.controller.cluster.raft; +import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.actor.UntypedActorContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; -import com.google.common.collect.Maps; - import java.util.Collection; import java.util.HashMap; import java.util.Map; - import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -30,7 +27,7 @@ public class RaftActorContextImpl implements RaftActorContext { private final ActorRef actor; - private final UntypedActorContext context; + private final ActorContext context; private final String id; @@ -42,7 +39,7 @@ public class RaftActorContextImpl implements RaftActorContext { private ReplicatedLog replicatedLog; - private final Map peerAddresses; + private final Map peerInfoMap = new HashMap<>(); private final Logger LOG; @@ -59,7 +56,7 @@ public class RaftActorContextImpl implements RaftActorContext { private short payloadVersion; - public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, + public RaftActorContextImpl(ActorRef actor, ActorContext context, String id, ElectionTerm termInformation, long commitIndex, long lastApplied, Map peerAddresses, ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) { this.actor = actor; @@ -68,10 +65,13 @@ public class RaftActorContextImpl implements RaftActorContext { this.termInformation = termInformation; this.commitIndex = commitIndex; this.lastApplied = lastApplied; - this.peerAddresses = Maps.newHashMap(peerAddresses); this.configParams = configParams; this.persistenceProvider = persistenceProvider; this.LOG = logger; + + for(Map.Entry e: peerAddresses.entrySet()) { + peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING)); + } } void setPayloadVersion(short payloadVersion) { @@ -150,20 +150,30 @@ public class RaftActorContextImpl implements RaftActorContext { } @Override - public Map getPeerAddresses() { - return new HashMap(peerAddresses); + public Collection getPeerIds() { + return peerInfoMap.keySet(); } @Override - public Collection getPeerIds() { - return peerAddresses.keySet(); + public Collection getPeers() { + return peerInfoMap.values(); } - @Override public String getPeerAddress(String peerId) { - String peerAddress = peerAddresses.get(peerId); - if(peerAddress == null) { - peerAddress = configParams.getPeerAddressResolver().resolve(peerId); - peerAddresses.put(peerId, peerAddress); + @Override + public PeerInfo getPeerInfo(String peerId) { + return peerInfoMap.get(peerId); + } + + @Override + public String getPeerAddress(String peerId) { + String peerAddress = null; + PeerInfo peerInfo = peerInfoMap.get(peerId); + if(peerInfo != null) { + peerAddress = peerInfo.getAddress(); + if(peerAddress == null) { + peerAddress = configParams.getPeerAddressResolver().resolve(peerId); + peerInfo.setAddress(peerAddress); + } } return peerAddress; @@ -173,12 +183,13 @@ public class RaftActorContextImpl implements RaftActorContext { return configParams; } - @Override public void addToPeers(String name, String address) { - peerAddresses.put(name, address); + @Override + public void addToPeers(String id, String address, VotingState votingState) { + peerInfoMap.put(id, new PeerInfo(id, address, votingState)); } @Override public void removePeer(String name) { - peerAddresses.remove(name); + peerInfoMap.remove(name); } @Override public ActorSelection getPeerActorSelection(String peerId) { @@ -191,9 +202,10 @@ public class RaftActorContextImpl implements RaftActorContext { @Override public void setPeerAddress(String peerId, String peerAddress) { - if(peerAddresses.containsKey(peerId)) { + PeerInfo peerInfo = peerInfoMap.get(peerId); + if(peerInfo != null) { LOG.info("Peer address for peer {} set to {}", peerId, peerAddress); - peerAddresses.put(peerId, peerAddress); + peerInfo.setAddress(peerAddress); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 7c8888b9f4..7c37b2a68d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; @@ -276,14 +275,13 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); - raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress()); + VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED : + VotingState.NON_VOTING; + raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState); - // if voting member - initialize to VOTING_NOT_INITIALIZED - FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED : - FollowerState.NON_VOTING; - leader.addFollower(addServer.getNewServerId(), initialState); + leader.addFollower(addServer.getNewServerId()); - if(initialState == FollowerState.VOTING_NOT_INITIALIZED){ + if(votingState == VotingState.VOTING_NOT_INITIALIZED){ LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(), addServer.getNewServerId()); @@ -345,14 +343,12 @@ class RaftActorServerConfigurationSupport { // add server operation that timed out. if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - FollowerLogInformation followerLogInformation = leader.getFollower(followerId); - - installSnapshotTimer.cancel(); - - followerLogInformation.setFollowerState(FollowerState.VOTING); - leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); + raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); + leader.updateMinReplicaCount(); persistNewServerConfiguration(raftActor, getAddServerContext()); + + installSnapshotTimer.cancel(); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java new file mode 100644 index 0000000000..c5a22ff7d4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/VotingState.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +/** + * Enumerates voting states for a peer. + * + * @author Thomas Pantelis + */ +public enum VotingState { + VOTING, + NON_VOTING, + VOTING_NOT_INITIALIZED +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index b80a1bab15..4ea02db2d7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -28,12 +28,13 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -88,8 +89,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private int minReplicationCount; - private int minIsolatedLeaderPeerCount; - private Optional snapshot; public AbstractLeader(RaftActorContext context) { @@ -97,18 +96,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setLeaderPayloadVersion(context.getPayloadVersion()); - for (String followerId : context.getPeerIds()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, -1, context); - - followerToLog.put(followerId, followerLogInformation); + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); } leaderId = context.getId(); LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); - updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); + updateMinReplicaCount(); snapshot = Optional.absent(); @@ -131,9 +128,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } - public void addFollower(String followerId, FollowerState followerState) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); - followerLogInformation.setFollowerState(followerState); + public void addFollower(String followerId) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( + context.getPeerInfo(followerId), -1, context); followerToLog.put(followerId, followerLogInformation); if(heartbeatSchedule == null) { @@ -145,20 +142,26 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.remove(followerId); } - public void updateMinReplicaCountAndMinIsolatedLeaderPeerCount(){ - minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); + public void updateMinReplicaCount() { + int numVoting = 0; + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + numVoting++; + } + } + + minReplicationCount = getMajorityVoteCount(numVoting); + } - //the isolated Leader peer count will be 1 less than the majority vote count. + protected int getMinIsolatedLeaderPeerCount(){ + //the isolated Leader peer count will be 1 less than the majority vote count. //this is because the vote count has the self vote counted in it //for e.g //0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0 //2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1 //4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2 - minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; - } - protected int getMinIsolatedLeaderPeerCount(){ - return minIsolatedLeaderPeerCount; + return minReplicationCount > 0 ? (minReplicationCount - 1) : 0; } @VisibleForTesting @@ -432,8 +435,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setSnapshot(null); } wasLastChunk = true; - FollowerState followerState = followerLogInformation.getFollowerState(); - if(followerState == FollowerState.VOTING_NOT_INITIALIZED){ + if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = new UnInitializedFollowerSnapshotReply(followerId); context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); @@ -650,14 +652,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { - ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + String followerId = e.getKey(); + ActorSelection followerActor = context.getPeerActorSelection(followerId); FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED || + long nextIndex = followerLogInfo.getNextIndex(); + if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED || canInstallSnapshot(nextIndex)) { - sendSnapshotChunk(followerActor, e.getKey()); + sendSnapshotChunk(followerActor, followerId); } } } @@ -759,7 +762,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } protected boolean isLeaderIsolated() { - int minPresent = minIsolatedLeaderPeerCount; + int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { if (followerLogInformation.isFollowerActive()) { --minPresent; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 8cb011f7a7..751aeeb15e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; -import java.util.Map; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; @@ -19,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -493,18 +495,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } public void applyServerConfiguration(ServerConfigurationPayload serverConfig) { - Map currentPeers = context.getPeerAddresses(); + Set currentPeers = new HashSet<>(context.getPeerIds()); for(String peerId: serverConfig.getNewServerConfig()) { if(!getId().equals(peerId)) { - if(!currentPeers.containsKey(peerId)) { - context.addToPeers(peerId, null); + if(!currentPeers.contains(peerId)) { + context.addToPeers(peerId, null, VotingState.VOTING); } else { currentPeers.remove(peerId); } } } - for(String peerIdToRemove: currentPeers.keySet()) { + for(String peerIdToRemove: currentPeers) { context.removePeer(peerIdToRemove); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 8baf0d8e81..7f73d0cfb0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import java.util.ArrayList; import java.util.Collection; +import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -44,22 +46,26 @@ public class Candidate extends AbstractRaftActorBehavior { private final int votesRequired; - private final Collection peers; + private final Collection votingPeers = new ArrayList<>(); public Candidate(RaftActorContext context) { super(context, RaftState.Candidate); - peers = context.getPeerIds(); + for(PeerInfo peer: context.getPeers()) { + if(peer.isVoting()) { + votingPeers.add(peer.getId()); + } + } if(LOG.isDebugEnabled()) { - LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers); + LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers); } - votesRequired = getMajorityVoteCount(peers.size()); + votesRequired = getMajorityVoteCount(votingPeers.size()); startNewTerm(); - if(peers.isEmpty()){ + if(votingPeers.isEmpty()){ actor().tell(ELECTION_TIMEOUT, actor()); } else { scheduleElection(electionDuration()); @@ -170,7 +176,7 @@ public class Candidate extends AbstractRaftActorBehavior { // Request for a vote // TODO: Retry request for vote if replies do not arrive in a reasonable // amount of time TBD - for (String peerId : peers) { + for (String peerId : votingPeers) { ActorSelection peerActor = context.getPeerActorSelection(peerId); if(peerActor != null) { RequestVote requestVote = new RequestVote( diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java index 75496f9752..b89f28f401 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -13,7 +13,6 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import scala.concurrent.duration.FiniteDuration; public class FollowerLogInformationImplTest { @@ -30,7 +29,7 @@ public class FollowerLogInformationImplTest { context.setConfigParams(configParams); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl("follower1", 9, context); + new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 9, context); assertFalse("Follower should be termed inactive before stopwatch starts", followerLogInformation.isFollowerActive()); @@ -66,8 +65,7 @@ public class FollowerLogInformationImplTest { public void testOkToReplicate(){ MockRaftActorContext context = new MockRaftActorContext(); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl( - "follower1", 10, context); + new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 10, context); assertTrue(followerLogInformation.okToReplicate()); assertFalse(followerLogInformation.okToReplicate()); @@ -83,19 +81,17 @@ public class FollowerLogInformationImplTest { @Test public void testVotingNotInitializedState() { + final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.VOTING_NOT_INITIALIZED); MockRaftActorContext context = new MockRaftActorContext(); - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); - followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED); assertFalse(followerLogInformation.okToReplicate()); - assertFalse(followerLogInformation.canParticipateInConsensus()); followerLogInformation.markFollowerActive(); assertFalse(followerLogInformation.isFollowerActive()); - followerLogInformation.setFollowerState(FollowerState.VOTING); + peerInfo.setVotingState(VotingState.VOTING); assertTrue(followerLogInformation.okToReplicate()); - assertTrue(followerLogInformation.canParticipateInConsensus()); followerLogInformation.markFollowerActive(); assertTrue(followerLogInformation.isFollowerActive()); @@ -103,12 +99,11 @@ public class FollowerLogInformationImplTest { @Test public void testNonVotingState() { + final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.NON_VOTING); MockRaftActorContext context = new MockRaftActorContext(); - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context); + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); - followerLogInformation.setFollowerState(FollowerState.NON_VOTING); assertTrue(followerLogInformation.okToReplicate()); - assertFalse(followerLogInformation.canParticipateInConsensus()); followerLogInformation.markFollowerActive(); assertTrue(followerLogInformation.isFollowerActive()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 2a1a94c549..228bc87417 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.protobuf.GeneratedMessage; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -163,20 +164,31 @@ public class MockRaftActorContext implements RaftActorContext { return LoggerFactory.getLogger(getClass()); } - @Override public Map getPeerAddresses() { - return peerAddresses; - } - @Override public Collection getPeerIds() { return peerAddresses.keySet(); } + @Override + public Collection getPeers() { + Collection peers = new ArrayList<>(); + for(Map.Entry p: peerAddresses.entrySet()) { + peers.add(new PeerInfo(p.getKey(), p.getValue(), VotingState.VOTING)); + } + + return peers; + } + @Override public String getPeerAddress(String peerId) { return peerAddresses.get(peerId); } - @Override public void addToPeers(String name, String address) { + @Override + public PeerInfo getPeerInfo(String peerId) { + return new PeerInfo(peerId, peerAddresses.get(peerId), VotingState.VOTING); + } + + @Override public void addToPeers(String name, String address, VotingState votingState) { peerAddresses.put(name, address); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java index 5275a303d3..a2f9e78769 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java @@ -18,6 +18,8 @@ import akka.actor.Props; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.util.HashMap; +import java.util.Map; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.NonPersistentDataProvider; @@ -45,11 +47,13 @@ public class RaftActorContextImplTest extends AbstractActorTest { @Test public void testGetPeerAddress() { + Map peerMap = new HashMap<>(); + peerMap.put("peer1", "peerAddress1"); + peerMap.put("peer2", null); DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1, - Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), configParams, - new NonPersistentDataProvider(), log); + peerMap, configParams, new NonPersistentDataProvider(), log); assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1")); assertEquals("getPeerAddress", null, context.getPeerAddress("peer2")); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index c67395b9de..8dd81dab8c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -23,10 +23,16 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.ElectionTerm; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorContextImpl; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -34,8 +40,11 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CandidateTest extends AbstractRaftActorBehaviorTest { + static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class); private final TestActorRef candidateActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("candidate")); @@ -114,9 +123,22 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Test public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){ MockRaftActorContext raftActorContext = createActorContext(); + raftActorContext.getTermInformation().update(2L, "other"); + raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder(). + createEntries(0, 5, 1).build()); raftActorContext.setPeerAddresses(setupPeers(4)); candidate = new Candidate(raftActorContext); + RequestVote requestVote = MessageCollectorActor.expectFirstMatching(peerActors[0], RequestVote.class); + assertEquals("getTerm", 3L, requestVote.getTerm()); + assertEquals("getCandidateId", raftActorContext.getId(), requestVote.getCandidateId()); + assertEquals("getLastLogTerm", 1L, requestVote.getLastLogTerm()); + assertEquals("getLastLogIndex", 4L, requestVote.getLastLogIndex()); + + MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class); + MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class); + MessageCollectorActor.expectFirstMatching(peerActors[3], RequestVote.class); + // First peers denies the vote. candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, false)); @@ -131,6 +153,32 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { assertEquals("Behavior", RaftState.Leader, candidate.state()); } + @Test + public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers(){ + ElectionTerm mockElectionTerm = Mockito.mock(ElectionTerm.class); + Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm(); + RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(), + "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(), + new NonPersistentDataProvider(), LOG); + raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING); + raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING); + candidate = new Candidate(raftActorContext); + + MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class); + MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class); + MessageCollectorActor.assertNoneMatching(peerActors[0], RequestVote.class, 300); + MessageCollectorActor.assertNoneMatching(peerActors[3], RequestVote.class, 100); + + candidate = candidate.handleMessage(peerActors[1], new RequestVoteReply(1, false)); + + assertEquals("Behavior", RaftState.Candidate, candidate.state()); + + candidate = candidate.handleMessage(peerActors[2], new RequestVoteReply(1, true)); + + assertEquals("Behavior", RaftState.Leader, candidate.state()); + } + @Test public void testResponseToHandleAppendEntriesWithLowerTerm() { candidate = new Candidate(createActorContext()); -- 2.36.6