Follow-up patch to https://git.opendaylight.org/gerrit/#/c/28018/.
Got the unit tests working and added more unit tests to cover more code.
Also fixed several bugs in the code that were failing the tests. One bug
was caused by replicating data quickly after install snapshot was
complete. On the final install snapshot chunk the follower sends an
ApplySnapshot message to persist and apply the snapshot. On the reply,
the leader assumes the follower is up-to-date and sets its next index.
However, applying the snapshot, i.e. updating the log and commit index, is
actually done after the async callback from the snapshot persist. In between
that time, if the leader sends the server config AppendEntries, the follower's
log is still empty and it deems itself out-of-sync and reports back failure.
This will cause the leader to eventually send a new install snapshot,
which is not desirable. Also it may delay consensus for the
server config entry.
To fix this, I delayed the final InstallSnapshotReply until after the
ApplySnapshot is complete. I did this by adding a Callback to the
ApplySnapshot message which the SnapshotManager invokes.
Also the new server config was constructed without the leader's ID - it
needs to contain all members.
Also the ServerConfigurationPayload wasn't being applied in the
followers.
Another issue was that, if the leader had no peers initially, the
heartbeat wasn't scheduled so, when the new server was added, heartbeats
weren't occurring. So I changed addFollower to schedule the heartbeat.
I added a test for adding a non-voting server which caused an endless
loop in AbstractLeader#handleAppendEntriesReply where it updates the
commitIndex based on the replicated count. To fix this, I added a break
if the replicatedLogEntry is null.
Change-Id: I5dff351140c611d58357cd58900bed401606038c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
@Override
public void handleCommand(final Object message) {
- if (message instanceof ApplyState){
+ if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ return;
+ } else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- boolean result = serverConfigurationSupport.handleMessage(message, this, getSender());
- if(result){
- return;
- }
-
long elapsedTime = (System.nanoTime() - applyState.getStartTime());
if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message) &&
- !serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ } else if(!snapshotSupport.handleSnapshotMessage(message)) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
*/
package org.opendaylight.controller.cluster.raft;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
-import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import scala.concurrent.duration.FiniteDuration;
/**
// snapshot installation is successful
onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
return true;
- } else if(message instanceof ApplyState){
- ApplyState applyState = (ApplyState) message;
- Payload data = applyState.getReplicatedLogEntry().getData();
- if( data instanceof ServerConfigurationPayload){
- LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
- (ServerConfigurationPayload)data);
- // respond ok to follower
- respondToClient(raftActor, ServerChangeStatus.OK);
- return true;
- }
- return false;
+ } else if(message instanceof ApplyState) {
+ return onApplyState((ApplyState) message, raftActor);
} else {
return false;
}
}
+ private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+ Payload data = applyState.getReplicatedLogEntry().getData();
+ if(data instanceof ServerConfigurationPayload) {
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
+ LOG.info("{} has been successfully replicated to a majority of followers", data);
+
+ // respond ok to follower
+ respondToClient(raftActor, ServerChangeStatus.OK);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
- LOG.debug("onAddServer: {}", addServer);
+ LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
return;
}
CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
- boolean process = !followerInfoQueue.isEmpty();
+ boolean process = followerInfoQueue.isEmpty();
followerInfoQueue.add(followerInfo);
if(process) {
processAddServer(raftActor);
}
}
+ /**
+ * The algorithm for AddServer is as follows:
+ * <ul>
+ * <li>Add the new server as a peer.</li>
+ * <li>Add the new follower to the leader.</li>
+ * <li>If new server should be voting member</li>
+ * <ul>
+ * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
+ * <li>Initiate install snapshot to the new follower.</li>
+ * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
+ * </ul>
+ * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
+ * <li>On replication consensus, respond to caller with OK.</li>
+ * </ul>
+ * If the install snapshot times out after a period of 2 * election time out
+ * <ul>
+ * <li>Remove the new server as a peer.</li>
+ * <li>Remove the new follower from the leader.</li>
+ * <li>Respond to caller with TIMEOUT.</li>
+ * </ul>
+ */
private void processAddServer(RaftActor raftActor){
- LOG.debug("In processAddServer");
+ LOG.debug("{}: In processAddServer", context.getId());
+
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- AddServer addSrv = followerInfoQueue.peek().getAddServer();
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ AddServer addSrv = followerInfo.getAddServer();
context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
// if voting member - initialize to VOTING_NOT_INITIALIZED
FollowerState.NON_VOTING;
leader.addFollower(addSrv.getNewServerId(), initialState);
- // TODO
- // if initialState == FollowerState.VOTING_NOT_INITIALIZED
- // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
- // Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
- // Set local instance state and wait for message from the AbstractLeader when install snapshot
- // is done and return now
- // When install snapshot message is received, go to step 1
- // else
- // go to step 2
- //
- // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
- // minIsolatedLeaderPeerCount
- // 2) persist and replicate ServerConfigurationPayload via
- // raftActor.persistData(sender, uuid, newServerConfigurationPayload)
- // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
- // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
- // this class.
- //
if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
leader.initiateCaptureSnapshot(addSrv.getNewServerId());
context.getActorSystem().dispatcher(), context.getActor());
} else {
LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId());
- persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
- addSrv.getNewServerId());
+ persistNewServerConfiguration(raftActor, followerInfo);
}
}
private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
RaftActor raftActor, ActorRef sender){
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ // Sanity check - it's possible we get a reply after it timed out.
+ if(followerInfo == null) {
+ return;
+ }
String followerId = reply.getFollowerId();
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
stopFollowerTimer();
followerLogInformation.setFollowerState(FollowerState.VOTING);
leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
- persistNewServerConfiguration(raftActor, sender, followerId);
+
+ persistNewServerConfiguration(raftActor, followerInfo);
}
- private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
- /* get old server configuration list */
- Map<String, String> tempMap = context.getPeerAddresses();
- List<String> cOld = new ArrayList<String>();
- for (Map.Entry<String, String> entry : tempMap.entrySet()) {
- if(!entry.getKey().equals(followerId)){
- cOld.add(entry.getKey());
- }
- }
- LOG.debug("Cold server configuration : {}", cOld.toString());
- /* get new server configuration list */
- List <String> cNew = new ArrayList<String>(cOld);
- cNew.add(followerId);
- LOG.debug("Cnew server configuration : {}", cNew.toString());
- // construct the peer list
- ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
- /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
- raftActor.persistData(sender, context.getId(), servPayload);
+ private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
+ List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
+ cNew.add(context.getId());
+
+ LOG.debug("New server configuration : {}", cNew.toString());
+
+ ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+
+ raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
}
private void stopFollowerTimer() {
}
private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-
LOG.debug("onFollowerCatchupTimeout: {}", serverId);
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
// cleanup
context.removePeer(serverId);
leader.removeFollower(serverId);
- LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+ LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
}
private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-
- int size = followerInfoQueue.size();
-
// remove the entry from the queue
CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+
// get the sender
ActorRef toClient = fInfo.getClientRequestor();
}
}
- // mantain sender actorRef
- private class CatchupFollowerInfo {
+ // maintain sender actorRef
+ private static class CatchupFollowerInfo {
private final AddServer addServer;
private final ActorRef clientRequestor;
+ private final String contextId;
CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
addServer = addSrv;
clientRequestor = cliReq;
+ contextId = UUID.randomUUID().toString();
}
- public AddServer getAddServer(){
+
+ String getContextId() {
+ return contextId;
+ }
+
+ AddServer getAddServer(){
return addServer;
}
- public ActorRef getClientRequestor(){
+
+ ActorRef getClientRequestor(){
return clientRequestor;
}
}
boolean handleSnapshotMessage(Object message) {
if(message instanceof ApplySnapshot ) {
- onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+ onApplySnapshot((ApplySnapshot) message);
return true;
} else if (message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
}
- private void onApplySnapshot(Snapshot snapshot) {
+ private void onApplySnapshot(ApplySnapshot message) {
log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
- snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
+ message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
- context.getSnapshotManager().apply(snapshot);
+ context.getSnapshotManager().apply(message);
}
}
public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
return null;
}
+
+    /**
+     * Returns a string containing both the new and the old server configuration.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ServerConfigurationPayload [newServerConfig=");
+        text.append(newServerConfig);
+        text.append(", oldServerConfig=").append(oldServerConfig);
+        text.append("]");
+        return text.toString();
+    }
}
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
private Procedure<Void> createSnapshotProcedure;
- private Snapshot applySnapshot;
+ private ApplySnapshot applySnapshot;
private Procedure<byte[]> applySnapshotProcedure;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.LOG = logger;
}
+    /**
+     * Reports whether this manager currently holds an ApplySnapshot message.
+     *
+     * @return true if the applySnapshot field is non-null
+     */
+    public boolean isApplying() {
+        final boolean applying = applySnapshot != null;
+        return applying;
+    }
+
@Override
public boolean isCapturing() {
return currentState.isCapturing();
}
@Override
- public void apply(Snapshot snapshot) {
+ public void apply(ApplySnapshot snapshot) {
currentState.apply(snapshot);
}
}
@Override
- public void apply(Snapshot snapshot) {
+ public void apply(ApplySnapshot snapshot) {
LOG.debug("apply should not be called in state {}", this);
}
}
@Override
- public void apply(Snapshot snapshot) {
- applySnapshot = snapshot;
+ public void apply(ApplySnapshot applySnapshot) {
+ SnapshotManager.this.applySnapshot = applySnapshot;
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
- context.getPersistenceProvider().saveSnapshot(snapshot);
+ context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
SnapshotManager.this.currentState = PERSISTING;
}
@Override
public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
- LOG.debug("Snapshot success sequence number:", sequenceNumber);
+ LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
if(applySnapshot != null) {
try {
- applySnapshotProcedure.apply(applySnapshot.getState());
+ Snapshot snapshot = applySnapshot.getSnapshot();
+ applySnapshotProcedure.apply(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
- context.setLastApplied(applySnapshot.getLastAppliedIndex());
- context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+ applySnapshot.getCallback().onSuccess();
} catch (Exception e) {
LOG.error("Error applying snapshot", e);
}
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
+ } else {
+ applySnapshot.getCallback().onFailure();
}
lastSequenceNumber = -1;
package org.opendaylight.controller.cluster.raft;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public interface SnapshotState {
*
* @param snapshot the Snapshot to apply.
*/
- void apply(Snapshot snapshot);
+ void apply(ApplySnapshot snapshot);
/**
* Persist the snapshot
package org.opendaylight.controller.cluster.raft.base.messages;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.raft.Snapshot;
-import java.io.Serializable;
-
/**
* Internal message, issued by follower to its actor
*/
-public class ApplySnapshot implements Serializable {
- private static final long serialVersionUID = 1L;
+public class ApplySnapshot {
private final Snapshot snapshot;
+ private final Callback callback;
public ApplySnapshot(Snapshot snapshot) {
- this.snapshot = snapshot;
+ this(snapshot, NOOP_CALLBACK);
+ }
+
+ public ApplySnapshot(@Nonnull Snapshot snapshot, @Nonnull Callback callback) {
+ this.snapshot = Preconditions.checkNotNull(snapshot);
+ this.callback = Preconditions.checkNotNull(callback);
}
+ @Nonnull
public Snapshot getSnapshot() {
return snapshot;
}
+
+ @Nonnull
+ public Callback getCallback() {
+ return callback;
+ }
+
+ public interface Callback {
+ void onSuccess();
+
+ void onFailure();
+ }
+
+    /**
+     * Shared callback whose onSuccess and onFailure methods do nothing. Declared final so the
+     * shared instance cannot be reassigned by other code.
+     */
+    public static final Callback NOOP_CALLBACK = new Callback() {
+        @Override
+        public void onSuccess() {
+            // intentionally empty
+        }
+
+        @Override
+        public void onFailure() {
+            // intentionally empty
+        }
+    };
}
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
followerLogInformation.setFollowerState(followerState);
followerToLog.put(followerId, followerLogInformation);
+
+ if(heartbeatSchedule == null) {
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ }
}
public void removeFollower(String followerId) {
if (replicatedCount >= minReplicationCount) {
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null &&
- replicatedLogEntry.getTerm() == currentTerm()) {
+ if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
context.setCommitIndex(N);
+ } else {
+ break;
}
} else {
break;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
return context.getId();
}
+    /**
+     * Replaces the current peers with the members of the given server configuration.
+     * All existing peers are removed first; every id in the new configuration other than
+     * this member's own id is then added as a peer with a null address.
+     *
+     * @param serverConfig the payload whose new server configuration lists the member ids
+     */
+    public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
+        // Iterate over a copy of the peer ids. If getPeerAddresses() exposes the map that
+        // removePeer() mutates (not visible from here), removing while iterating the live
+        // key set would throw ConcurrentModificationException with two or more peers.
+        for(String peerId: new java.util.ArrayList<String>(context.getPeerAddresses().keySet())) {
+            context.removePeer(peerId);
+        }
+
+        for(String peerId: serverConfig.getNewServerConfig()) {
+            // This member is not its own peer.
+            if(!getId().equals(peerId)) {
+                context.addToPeers(peerId, null);
+            }
+        }
+    }
}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
- if (snapshotTracker != null) {
+ if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
context.getReplicatedLog().appendAndPersist(entry);
+
+ if(entry.getData() instanceof ServerConfigurationPayload) {
+ applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+ }
}
LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
return super.handleMessage(sender, message);
}
- private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
try {
+ final InstallSnapshotReply reply = new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor());
- actor().tell(new ApplySnapshot(snapshot), actor());
+ ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+ @Override
+ public void onSuccess() {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
- snapshotTracker = null;
-
- }
+ sender.tell(reply, actor());
+ }
- InstallSnapshotReply reply = new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+ @Override
+ public void onFailure() {
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+ }
+ };
- LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
- sender.tell(reply, actor());
+ snapshotTracker = null;
+ } else {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ sender.tell(reply, actor());
+ }
} catch (SnapshotTracker.InvalidChunkException e) {
LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
*/
package org.opendaylight.controller.cluster.raft;
-//import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
import akka.actor.ActorRef;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
//import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
-//import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-//import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
-//import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+
/**
* Unit tests for RaftActorServerConfigurationSupport.
*
Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(FOLLOWER_ID));
- private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
- Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
- actorFactory.generateActorId(NEW_SERVER_ID));
+ private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
+ private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
- private RaftActorContext newServerActorContext;
+ private RaftActorContext newFollowerActorContext;
private final JavaTestKit testKit = new JavaTestKit(getSystem());
@Before
public void setup() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
- newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
- NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
- Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
- newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ newFollowerCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
+ newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
+ configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(NEW_SERVER_ID));
+ newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
}
@After
}
@Test
- public void testAddServerWithFollower() throws Exception {
+ public void testAddServerWithExistingFollower() throws Exception {
+ // Adds a voting server to a leader that already has one follower and verifies that the new
+ // server receives a snapshot, that the ServerConfigurationPayload entry lands in all three
+ // logs, and that both followers apply the new configuration.
RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
0, 3, 1).build());
- followerActorContext.setCommitIndex(3);
- followerActorContext.setLastApplied(3);
+ followerActorContext.setCommitIndex(2);
+ followerActorContext.setLastApplied(2);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
- Follower newServer = new Follower(newServerActorContext);
- newServerActor.underlyingActor().setBehavior(newServer);
-
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
- leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+ ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+ List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+ // JUnit order is (message, expected, actual): the leader's state is the expectation.
+ assertEquals("Snapshot state", leaderRaftActor.getState(), snapshotState);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+ // Verify ServerConfigurationPayload entry in leader's log
+
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
+ assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
+ verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+ // Verify ServerConfigurationPayload entry in both followers. The leader only needs a majority
+ // to commit, so wait for each follower's ApplyState before inspecting its log. The collected
+ // messages are deliberately not cleared first: an ApplyState that has already arrived would be
+ // discarded and the wait below would time out.
+
+ expectFirstMatching(followerActor, ApplyState.class);
+ assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
+ verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+ expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+ assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
+ verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+ // Verify new server config was applied in both followers
+
+ assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
+ followerActorContext.getPeerAddresses().keySet());
+
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
+ newFollowerActorContext.getPeerAddresses().keySet());
+
+ assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
+ assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
+ assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
+ assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
+ }
+
+ @Test
+ public void testAddServerWithNoExistingFollower() throws Exception {
+ // Adds a voting server to a leader that starts with no peers and verifies the snapshot
+ // install, the ServerConfigurationPayload entry in both logs, and the new follower's peers.
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(1);
+ initialActorContext.setLastApplied(1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
+ 0, 2, 1).build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+ ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+ List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+ // JUnit order is (message, expected, actual): the leader's state is the expectation.
+ assertEquals("Snapshot state", leaderRaftActor.getState(), snapshotState);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+ // Verify ServerConfigurationPayload entry in leader's log
+
+ assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
+ assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
+ verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+ // Verify ServerConfigurationPayload entry in the new follower. The collected messages are
+ // deliberately not cleared before waiting: if the follower's ApplyState has already been
+ // collected, clearing would discard it and the wait below would time out.
+
+ expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+ assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
+ verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+ // Verify new server config was applied in the new follower
+
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+ newFollowerActorContext.getPeerAddresses().keySet());
+ }
+
+ @Test
+ public void testAddServerAsNonVoting() throws Exception {
+ // Scenario: a leader with an empty log and no peers is asked to add NEW_SERVER_ID as a
+ // non-voting member (third AddServer argument is false).
+ RaftActorContext emptyLeaderSeed = new MockRaftActorContext();
+ emptyLeaderSeed.setCommitIndex(-1);
+ emptyLeaderSeed.setLastApplied(-1);
+ emptyLeaderSeed.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderRef = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ emptyLeaderSeed).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ RaftActorContext leaderContext = leaderRef.underlyingActor().getRaftActorContext();
+
+ String newServerAddress = newFollowerRaftActor.path().toString();
+ leaderRef.tell(new AddServer(NEW_SERVER_ID, newServerAddress, false), testKit.getRef());
+
+ // The requester must be told the add succeeded and who the leader is.
+
+ AddServerReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, reply.getLeaderHint());
+
+ // The server config entry is the first (index 0) entry in the leader's log and is committed/applied.
+
+ assertEquals("Leader journal last index", 0, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 0, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied index", 0, leaderContext.getLastApplied());
+ verifyServerConfigurationPayloadEntry(leaderContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+ // Wait for the new follower to apply the entry, then check its log.
+
+ expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+ assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
+ verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+ // The new follower's only peer is the leader.
+
+ assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+ newFollowerActorContext.getPeerAddresses().keySet());
+
+ // No InstallSnapshot is expected to reach the new follower within 500 ms.
+ MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
+ }
+
+ @Test
+ public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+ newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); // new follower discards InstallSnapshot messages without handling them
- // leader should install snapshot - capture and verify ApplySnapshot contents
- //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
- //List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
- //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+ RaftActorContext initialActorContext = new MockRaftActorContext(); // leader starts with an empty log and nothing committed
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
- // leader should replicate new server config to both followers
- //expectFirstMatching(followerActor, AppendEntries.class);
- //expectFirstMatching(newServerActor, AppendEntries.class);
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor( // leader is created with no peers
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
- // verify ServerConfigurationPayload entry in leader's log
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
- //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
- //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
- ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
- leaderActorContext.getReplicatedLog().lastIndex());
- // verify logEntry contents
- // Also verify ServerConfigurationPayload entry in both followers
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
- //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
- //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
- //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); // waits up to 5 seconds for the leader's reply
+ assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); // the add must be reported as timed out, not OK
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size()); // the failed server must not be left in the leader's peer map
+ assertEquals("Leader followers size", 0, // nor tracked as a follower by the leader behavior
+ ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
}
- //@Test
+ @Test
public void testAddServerWithNoLeader() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
actorFactory.generateActorId(LEADER_ID));
noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
- noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
- //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
- //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+ noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
}
- //@Test
+ @Test
public void testAddServerForwardedToLeader() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, (short)0), leaderActor);
- followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
- //expectFirstMatching(leaderActor, AddServer.class);
+ followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ expectFirstMatching(leaderActor, AddServer.class);
+ }
+
+ private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
+ // Checks that the last entry of the given log carries a ServerConfigurationPayload whose
+ // new server config contains exactly the ids in cNew (order and duplicates ignored).
+ final long lastIndex = log.lastIndex();
+ final Object lastEntryData = log.get(lastIndex).getData();
+ assertEquals("Last log entry payload class", ServerConfigurationPayload.class, lastEntryData.getClass());
+ assertEquals("getNewServerConfig", Sets.newHashSet(cNew),
+ Sets.newHashSet(((ServerConfigurationPayload)lastEntryData).getNewServerConfig()));
}
private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
+ ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
+ termInfo.update(1, LEADER_ID);
RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
+ id, termInfo, -1, -1,
ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
return followerActorContext;
context.setCommitIndex(fromContext.getCommitIndex());
context.setLastApplied(fromContext.getLastApplied());
+ context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
+ fromContext.getTermInformation().getVotedFor());
}
@Override
static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- configParams.setElectionTimeoutFactor(100000);
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); // 100 ms heartbeat instead of 1 day, so the mock leader actually emits heartbeats during a test
+ configParams.setElectionTimeoutFactor(1); // NOTE(review): presumably lowered so timeouts derived from the election timeout expire quickly in tests - confirm
return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
}
}
+
+ /**
+ * Raft actor used as the server being added in these tests. Every message it handles is
+ * also forwarded to a collector actor so tests can wait on and inspect what it received.
+ * Messages of one configurable class can be dropped entirely to simulate an unresponsive
+ * server.
+ */
+ public static class MockNewFollowerRaftActor extends MockRaftActor {
+ private final TestActorRef<MessageCollectorActor> collectorActor;
+ // Written by the test thread and read by the actor's thread, hence volatile.
+ private volatile Class<?> dropMessageOfType;
+
+ public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+ super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
+ this.collectorActor = collectorActor;
+ }
+
+ /**
+ * Sets the exact message class to discard; pass null to stop dropping.
+ */
+ void setDropMessageOfType(Class<?> dropMessageOfType) {
+ this.dropMessageOfType = dropMessageOfType;
+ }
+
+ @Override
+ public void handleCommand(Object message) {
+ // Read the volatile field once: it can be changed concurrently by the test thread, and a
+ // second read after the null check could observe null and throw inside the actor.
+ final Class<?> typeToDrop = dropMessageOfType;
+ if(typeToDrop != null && typeToDrop.equals(message.getClass())) {
+ return;
+ }
+
+ // Process first, then forward, so that once the collector holds a message the actor has
+ // already handled it.
+ super.handleCommand(message);
+ collectorActor.tell(message, getSender());
+ }
+
+ static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+ return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
+ }
+ }
}
Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
- sendMessageToSupport(new ApplySnapshot(snapshot));
+ ApplySnapshot applySnapshot = new ApplySnapshot(snapshot);
+ sendMessageToSupport(applySnapshot);
- verify(mockSnapshotManager).apply(snapshot);
+ verify(mockSnapshotManager).apply(applySnapshot);
}
@Test
Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+ applySnapshot.getCallback().onSuccess();
List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
leaderActor, InstallSnapshotReply.class);