*/
public interface FollowerLogInformation {
- enum FollowerState {
- VOTING,
- NON_VOTING,
- VOTING_NOT_INITIALIZED
- };
-
/**
* Increment the value of the nextIndex
*
* Sets the payload data version of the follower.
*/
void setPayloadVersion(short payloadVersion);
-
- /**
- * Sets the state of the follower.
- */
- void setFollowerState(FollowerState state);
-
- /**
- * @return the state of the follower.
- */
- FollowerState getFollowerState();
-
- /**
- * @return true if the follower is in a state where it can participate in leader elections and
- * commitment consensus.
- */
- boolean canParticipateInConsensus();
}
package org.opendaylight.controller.cluster.raft;
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private final String id;
-
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
private short payloadVersion = -1;
- private FollowerState state = FollowerState.VOTING;
+ private final PeerInfo peerInfo;
- public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
- this.id = id;
+ public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
this.context = context;
+ this.peerInfo = Preconditions.checkNotNull(peerInfo);
}
@Override
@Override
public String getId() {
- return id;
+ return peerInfo.getId();
}
@Override
@Override
public boolean isFollowerActive() {
- if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+ if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
@Override
public boolean okToReplicate() {
- if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+ if(peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
this.payloadVersion = payloadVersion;
}
- @Override
- public boolean canParticipateInConsensus() {
- return state == FollowerState.VOTING;
- }
-
- @Override
- public void setFollowerState(FollowerState state) {
- this.state = state;
- }
-
- @Override
- public FollowerState getFollowerState() {
- return state;
- }
-
@Override
public String toString() {
- return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
- + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch="
- + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+ return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+ context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Stores information about a raft peer.
+ *
+ * @author Thomas Pantelis
+ */
+public class PeerInfo {
+ private final String id;
+ private String address;
+ private VotingState votingState;
+
+ public PeerInfo(String id, String address, VotingState votingState) {
+ this.id = id;
+ this.address = address;
+ this.votingState = votingState;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public VotingState getVotingState() {
+ return votingState;
+ }
+
+ public boolean isVoting() {
+ return votingState == VotingState.VOTING;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public void setVotingState(VotingState votingState) {
+ this.votingState = votingState;
+ }
+
+ @Override
+ public String toString() {
+ return "PeerInfo [id=" + id + ", address=" + address + ", votingState=" + votingState + "]";
+ }
+}
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
private void onGetOnDemandRaftStats() {
// Debugging message to retrieve raft stats.
+ Map<String, String> peerAddresses = new HashMap<>();
+ for(String peerId: context.getPeerIds()) {
+ peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ }
+
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(context.getPeerAddresses());
+ .peerAddresses(peerAddresses);
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import java.util.Collection;
-import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
*/
Logger getLogger();
- /**
- * @return a copy of the mapping of peerId's to their addresses
- *
- */
- Map<String, String> getPeerAddresses();
-
/**
* Get the address of the peer as a String. This is the same format in
* which a consumer would provide the address
*/
String getPeerAddress(String peerId);
+ /**
+ * @return list of PeerInfo
+ */
+ Collection<PeerInfo> getPeers();
+
/**
* @return the list of peer IDs.
*/
Collection<String> getPeerIds();
+ /**
+ * Get the PeerInfo for the given peer.
+ *
+ * @param peerId
+ * @return the PeerInfo
+ */
+ PeerInfo getPeerInfo(String peerId);
+
/**
* Add to actor peers
*
* @param name
* @param address
*/
- void addToPeers(String name, String address);
+ void addToPeers(String name, String address, VotingState votingState);
/**
*
package org.opendaylight.controller.cluster.raft;
+import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.actor.UntypedActorContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
private final ActorRef actor;
- private final UntypedActorContext context;
+ private final ActorContext context;
private final String id;
private ReplicatedLog replicatedLog;
- private final Map<String, String> peerAddresses;
+ private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
private final Logger LOG;
private short payloadVersion;
- public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+ public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
this.actor = actor;
this.termInformation = termInformation;
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
- this.peerAddresses = Maps.newHashMap(peerAddresses);
this.configParams = configParams;
this.persistenceProvider = persistenceProvider;
this.LOG = logger;
+
+ for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
+ }
}
void setPayloadVersion(short payloadVersion) {
}
@Override
- public Map<String, String> getPeerAddresses() {
- return new HashMap<String, String>(peerAddresses);
+ public Collection<String> getPeerIds() {
+ return peerInfoMap.keySet();
}
@Override
- public Collection<String> getPeerIds() {
- return peerAddresses.keySet();
+ public Collection<PeerInfo> getPeers() {
+ return peerInfoMap.values();
}
- @Override public String getPeerAddress(String peerId) {
- String peerAddress = peerAddresses.get(peerId);
- if(peerAddress == null) {
- peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
- peerAddresses.put(peerId, peerAddress);
+ @Override
+ public PeerInfo getPeerInfo(String peerId) {
+ return peerInfoMap.get(peerId);
+ }
+
+ @Override
+ public String getPeerAddress(String peerId) {
+ String peerAddress = null;
+ PeerInfo peerInfo = peerInfoMap.get(peerId);
+ if(peerInfo != null) {
+ peerAddress = peerInfo.getAddress();
+ if(peerAddress == null) {
+ peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
+ peerInfo.setAddress(peerAddress);
+ }
}
return peerAddress;
return configParams;
}
- @Override public void addToPeers(String name, String address) {
- peerAddresses.put(name, address);
+ @Override
+ public void addToPeers(String id, String address, VotingState votingState) {
+ peerInfoMap.put(id, new PeerInfo(id, address, votingState));
}
@Override public void removePeer(String name) {
- peerAddresses.remove(name);
+ peerInfoMap.remove(name);
}
@Override public ActorSelection getPeerActorSelection(String peerId) {
@Override
public void setPeerAddress(String peerId, String peerAddress) {
- if(peerAddresses.containsKey(peerId)) {
+ PeerInfo peerInfo = peerInfoMap.get(peerId);
+ if(peerInfo != null) {
LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
- peerAddresses.put(peerId, peerAddress);
+ peerInfo.setAddress(peerAddress);
}
}
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
- raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+ VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
+ VotingState.NON_VOTING;
+ raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
- // if voting member - initialize to VOTING_NOT_INITIALIZED
- FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
- FollowerState.NON_VOTING;
- leader.addFollower(addServer.getNewServerId(), initialState);
+ leader.addFollower(addServer.getNewServerId());
- if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
+ if(votingState == VotingState.VOTING_NOT_INITIALIZED){
LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
addServer.getNewServerId());
// add server operation that timed out.
if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
-
- installSnapshotTimer.cancel();
-
- followerLogInformation.setFollowerState(FollowerState.VOTING);
- leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+ raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
+ leader.updateMinReplicaCount();
persistNewServerConfiguration(raftActor, getAddServerContext());
+
+ installSnapshotTimer.cancel();
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Enumerates voting states for a peer.
+ *
+ * @author Thomas Pantelis
+ */
+public enum VotingState {
+ VOTING,
+ NON_VOTING,
+ VOTING_NOT_INITIALIZED
+}
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
private int minReplicationCount;
- private int minIsolatedLeaderPeerCount;
-
private Optional<SnapshotHolder> snapshot;
public AbstractLeader(RaftActorContext context) {
setLeaderPayloadVersion(context.getPayloadVersion());
- for (String followerId : context.getPeerIds()) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId, -1, context);
-
- followerToLog.put(followerId, followerLogInformation);
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
}
leaderId = context.getId();
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
- updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+ updateMinReplicaCount();
snapshot = Optional.absent();
return followerToLog.keySet();
}
- public void addFollower(String followerId, FollowerState followerState) {
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
- followerLogInformation.setFollowerState(followerState);
+ public void addFollower(String followerId) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
+ context.getPeerInfo(followerId), -1, context);
followerToLog.put(followerId, followerLogInformation);
if(heartbeatSchedule == null) {
followerToLog.remove(followerId);
}
- public void updateMinReplicaCountAndMinIsolatedLeaderPeerCount(){
- minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
+ public void updateMinReplicaCount() {
+ int numVoting = 0;
+ for(PeerInfo peer: context.getPeers()) {
+ if(peer.isVoting()) {
+ numVoting++;
+ }
+ }
+
+ minReplicationCount = getMajorityVoteCount(numVoting);
+ }
- //the isolated Leader peer count will be 1 less than the majority vote count.
+ protected int getMinIsolatedLeaderPeerCount(){
+ //the isolated Leader peer count will be 1 less than the majority vote count.
//this is because the vote count has the self vote counted in it
//for e.g
//0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
//2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
//4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
- minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
- }
- protected int getMinIsolatedLeaderPeerCount(){
- return minIsolatedLeaderPeerCount;
+ return minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
}
@VisibleForTesting
setSnapshot(null);
}
wasLastChunk = true;
- FollowerState followerState = followerLogInformation.getFollowerState();
- if(followerState == FollowerState.VOTING_NOT_INITIALIZED){
+ if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
new UnInitializedFollowerSnapshotReply(followerId);
context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
- ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ String followerId = e.getKey();
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
- if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+ long nextIndex = followerLogInfo.getNextIndex();
+ if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
canInstallSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, e.getKey());
+ sendSnapshotChunk(followerActor, followerId);
}
}
}
}
protected boolean isLeaderIsolated() {
- int minPresent = minIsolatedLeaderPeerCount;
+ int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
if (followerLogInformation.isFollowerActive()) {
--minPresent;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
-import java.util.Map;
+import java.util.HashSet;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
}
public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
- Map<String, String> currentPeers = context.getPeerAddresses();
+ Set<String> currentPeers = new HashSet<>(context.getPeerIds());
for(String peerId: serverConfig.getNewServerConfig()) {
if(!getId().equals(peerId)) {
- if(!currentPeers.containsKey(peerId)) {
- context.addToPeers(peerId, null);
+ if(!currentPeers.contains(peerId)) {
+ context.addToPeers(peerId, null, VotingState.VOTING);
} else {
currentPeers.remove(peerId);
}
}
}
- for(String peerIdToRemove: currentPeers.keySet()) {
+ for(String peerIdToRemove: currentPeers) {
context.removePeer(peerIdToRemove);
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.ArrayList;
import java.util.Collection;
+import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
private final int votesRequired;
- private final Collection<String> peers;
+ private final Collection<String> votingPeers = new ArrayList<>();
public Candidate(RaftActorContext context) {
super(context, RaftState.Candidate);
- peers = context.getPeerIds();
+ for(PeerInfo peer: context.getPeers()) {
+ if(peer.isVoting()) {
+ votingPeers.add(peer.getId());
+ }
+ }
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
+ LOG.debug("{}: Election: Candidate has following voting peers: {}", logName(), votingPeers);
}
- votesRequired = getMajorityVoteCount(peers.size());
+ votesRequired = getMajorityVoteCount(votingPeers.size());
startNewTerm();
- if(peers.isEmpty()){
+ if(votingPeers.isEmpty()){
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
// amount of time TBD
- for (String peerId : peers) {
+ for (String peerId : votingPeers) {
ActorSelection peerActor = context.getPeerActorSelection(peerId);
if(peerActor != null) {
RequestVote requestVote = new RequestVote(
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import scala.concurrent.duration.FiniteDuration;
public class FollowerLogInformationImplTest {
context.setConfigParams(configParams);
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl("follower1", 9, context);
+ new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 9, context);
assertFalse("Follower should be termed inactive before stopwatch starts",
followerLogInformation.isFollowerActive());
public void testOkToReplicate(){
MockRaftActorContext context = new MockRaftActorContext();
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(
- "follower1", 10, context);
+ new FollowerLogInformationImpl(new PeerInfo("follower1", null, VotingState.VOTING), 10, context);
assertTrue(followerLogInformation.okToReplicate());
assertFalse(followerLogInformation.okToReplicate());
@Test
public void testVotingNotInitializedState() {
+ final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.VOTING_NOT_INITIALIZED);
MockRaftActorContext context = new MockRaftActorContext();
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED);
assertFalse(followerLogInformation.okToReplicate());
- assertFalse(followerLogInformation.canParticipateInConsensus());
followerLogInformation.markFollowerActive();
assertFalse(followerLogInformation.isFollowerActive());
- followerLogInformation.setFollowerState(FollowerState.VOTING);
+ peerInfo.setVotingState(VotingState.VOTING);
assertTrue(followerLogInformation.okToReplicate());
- assertTrue(followerLogInformation.canParticipateInConsensus());
followerLogInformation.markFollowerActive();
assertTrue(followerLogInformation.isFollowerActive());
@Test
public void testNonVotingState() {
+ final PeerInfo peerInfo = new PeerInfo("follower1", null, VotingState.NON_VOTING);
MockRaftActorContext context = new MockRaftActorContext();
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- followerLogInformation.setFollowerState(FollowerState.NON_VOTING);
assertTrue(followerLogInformation.okToReplicate());
- assertFalse(followerLogInformation.canParticipateInConsensus());
followerLogInformation.markFollowerActive();
assertTrue(followerLogInformation.isFollowerActive());
import com.google.common.base.Supplier;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
return LoggerFactory.getLogger(getClass());
}
- @Override public Map<String, String> getPeerAddresses() {
- return peerAddresses;
- }
-
@Override
public Collection<String> getPeerIds() {
return peerAddresses.keySet();
}
+ @Override
+ public Collection<PeerInfo> getPeers() {
+ Collection<PeerInfo> peers = new ArrayList<>();
+ for(Map.Entry<String, String> p: peerAddresses.entrySet()) {
+ peers.add(new PeerInfo(p.getKey(), p.getValue(), VotingState.VOTING));
+ }
+
+ return peers;
+ }
+
@Override public String getPeerAddress(String peerId) {
return peerAddresses.get(peerId);
}
- @Override public void addToPeers(String name, String address) {
+ @Override
+ public PeerInfo getPeerInfo(String peerId) {
+ return new PeerInfo(peerId, peerAddresses.get(peerId), VotingState.VOTING);
+ }
+
+ @Override public void addToPeers(String name, String address, VotingState votingState) {
peerAddresses.put(name, address);
}
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
@Test
public void testGetPeerAddress() {
+ Map<String, String> peerMap = new HashMap<>();
+ peerMap.put("peer1", "peerAddress1");
+ peerMap.put("peer2", null);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
"test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
- Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
- new NonPersistentDataProvider(), log);
+ peerMap, configParams, new NonPersistentDataProvider(), log);
assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.ElectionTerm;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContextImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CandidateTest extends AbstractRaftActorBehaviorTest {
+ static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class);
private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("candidate"));
@Test
public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
MockRaftActorContext raftActorContext = createActorContext();
+ raftActorContext.getTermInformation().update(2L, "other");
+ raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
+ createEntries(0, 5, 1).build());
raftActorContext.setPeerAddresses(setupPeers(4));
candidate = new Candidate(raftActorContext);
+ RequestVote requestVote = MessageCollectorActor.expectFirstMatching(peerActors[0], RequestVote.class);
+ assertEquals("getTerm", 3L, requestVote.getTerm());
+ assertEquals("getCandidateId", raftActorContext.getId(), requestVote.getCandidateId());
+ assertEquals("getLastLogTerm", 1L, requestVote.getLastLogTerm());
+ assertEquals("getLastLogIndex", 4L, requestVote.getLastLogIndex());
+
+ MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[3], RequestVote.class);
+
// First peers denies the vote.
candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, false));
assertEquals("Behavior", RaftState.Leader, candidate.state());
}
+ @Test
+ public void testBecomeLeaderOnReceivingMajorityVotesWithNonVotingPeers(){
+ ElectionTerm mockElectionTerm = Mockito.mock(ElectionTerm.class);
+ Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
+ RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
+ "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
+ new NonPersistentDataProvider(), LOG);
+ raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
+ raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
+ candidate = new Candidate(raftActorContext);
+
+ MessageCollectorActor.expectFirstMatching(peerActors[1], RequestVote.class);
+ MessageCollectorActor.expectFirstMatching(peerActors[2], RequestVote.class);
+ MessageCollectorActor.assertNoneMatching(peerActors[0], RequestVote.class, 300);
+ MessageCollectorActor.assertNoneMatching(peerActors[3], RequestVote.class, 100);
+
+ candidate = candidate.handleMessage(peerActors[1], new RequestVoteReply(1, false));
+
+ assertEquals("Behavior", RaftState.Candidate, candidate.state());
+
+ candidate = candidate.handleMessage(peerActors[2], new RequestVoteReply(1, true));
+
+ assertEquals("Behavior", RaftState.Leader, candidate.state());
+ }
+
@Test
public void testResponseToHandleAppendEntriesWithLowerTerm() {
candidate = new Candidate(createActorContext());