From: Tom Pantelis Date: Wed, 21 Oct 2015 09:02:17 +0000 (-0400) Subject: Bug 2187: AddServer unit test and bug fixes X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3fda1a923defdbf18849c6080c3aa19f1ebf2c5f Bug 2187: AddServer unit test and bug fixes Follow-up patch to https://git.opendaylight.org/gerrit/#/c/28018/. Got the unit tests working and added more unit tests to cover more code. Also fixed several bugs in the code that were failing the tests. One bug was caused by replicating data quickly after install snapshot was complete. On the final install snapshot chunk the follower sends an ApplySnaphot message to persist and apply the snapshot. On the reply, the leader assumes the follower is up-to-date and sets its next index. However, applying the snapshot, ie updating the log and commit index, is actually done after the async callback from the snapshot persist. In between that time, if the leader sends the server config AppendEntries, the follower's log is still empty and it deems itself out-of-sync and reports back failure. This will cause the leader to eventually send a new install snaphot which isn't which is not desirable. Also it may delay consensus for the server config entry. To fix this, I delayed the final InstallSnapshotReply until after the ApplySnapshot is complete. I did this by adding a Callback to the ApplySnapshot message which the SnapshotManager invokes. Also the new server config was constructed without the leader's ID - it needs to contain all members. Also the ServerConfigurationPayload wasn't being applied in the followers. Another issue was that, if the leader had no peers initially, the heartbeat wasn't scheduled so, when the new server was added, heartbeats weren't occurring. So I change addFollower to schedule the heartbeat. I added a test for adding a non-voting server which caused an endless loop in AbstractLeader#handleAppendEntriesReply where it updates the commitIndex based on the replicated count. To fix this, I added a break if the replicatedLogEntry is null. Change-Id: I5dff351140c611d58357cd58900bed401606038c Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1996a814db..c5d81c18cc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -192,14 +192,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleCommand(final Object message) { - if (message instanceof ApplyState){ + if(serverConfigurationSupport.handleMessage(message, this, getSender())) { + return; + } else if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; - boolean result = serverConfigurationSupport.handleMessage(message, this, getSender()); - if(result){ - return; - } - long elapsedTime = (System.nanoTime() - applyState.getStartTime()); if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", @@ -244,8 +241,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } else if(message instanceof SwitchBehavior){ switchBehavior(((SwitchBehavior) message)); - } else if(!snapshotSupport.handleSnapshotMessage(message) && - !serverConfigurationSupport.handleMessage(message, this, getSender())) { + } else if(!snapshotSupport.handleSnapshotMessage(message)) { switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 0c34158ca3..ae23140114 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -7,26 +7,27 @@ */ package org.opendaylight.controller.cluster.raft; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout; -import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import scala.concurrent.duration.FiniteDuration; /** @@ -59,40 +60,71 @@ class RaftActorServerConfigurationSupport { // snapshot installation is successful onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender); return true; - } else if(message instanceof ApplyState){ - ApplyState applyState = (ApplyState) message; - Payload data = applyState.getReplicatedLogEntry().getData(); - if( data instanceof ServerConfigurationPayload){ - LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully", - (ServerConfigurationPayload)data); - // respond ok to follower - respondToClient(raftActor, ServerChangeStatus.OK); - return true; - } - return false; + } else if(message instanceof ApplyState) { + return onApplyState((ApplyState) message, raftActor); } else { return false; } } + private boolean onApplyState(ApplyState applyState, RaftActor raftActor) { + Payload data = applyState.getReplicatedLogEntry().getData(); + if(data instanceof ServerConfigurationPayload) { + CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); + if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) { + LOG.info("{} has been successfully replicated to a majority of followers", data); + + // respond ok to follower + respondToClient(raftActor, ServerChangeStatus.OK); + } + + return true; + } + + return false; + } + private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { - LOG.debug("onAddServer: {}", addServer); + LOG.debug("{}: onAddServer: {}", context.getId(), addServer); if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) { return; } CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender); - boolean process = !followerInfoQueue.isEmpty(); + boolean process = followerInfoQueue.isEmpty(); followerInfoQueue.add(followerInfo); if(process) { processAddServer(raftActor); } } + /** + * The algorithm for AddServer is as follows: + * + * If the install snapshot times out after a period of 2 * election time out + * + */ private void processAddServer(RaftActor raftActor){ - LOG.debug("In processAddServer"); + LOG.debug("{}: In processAddServer", context.getId()); + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - AddServer addSrv = followerInfoQueue.peek().getAddServer(); + CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); + AddServer addSrv = followerInfo.getAddServer(); context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress()); // if voting member - initialize to VOTING_NOT_INITIALIZED @@ -100,24 +132,6 @@ class RaftActorServerConfigurationSupport { FollowerState.NON_VOTING; leader.addFollower(addSrv.getNewServerId(), initialState); - // TODO - // if initialState == FollowerState.VOTING_NOT_INITIALIZED - // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId()) - // Start a timer to abort the operation after a period of time (maybe 2 times election timeout) - // Set local instance state and wait for message from the AbstractLeader when install snapshot - // is done and return now - // When install snapshot message is received, go to step 1 - // else - // go to step 2 - // - // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and - // minIsolatedLeaderPeerCount - // 2) persist and replicate ServerConfigurationPayload via - // raftActor.persistData(sender, uuid, newServerConfigurationPayload) - // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor, - // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call - // this class. - // if(initialState == FollowerState.VOTING_NOT_INITIALIZED){ LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId()); leader.initiateCaptureSnapshot(addSrv.getNewServerId()); @@ -129,8 +143,7 @@ class RaftActorServerConfigurationSupport { context.getActorSystem().dispatcher(), context.getActor()); } else { LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId()); - persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(), - addSrv.getNewServerId()); + persistNewServerConfiguration(raftActor, followerInfo); } } @@ -153,6 +166,11 @@ class RaftActorServerConfigurationSupport { private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply, RaftActor raftActor, ActorRef sender){ + CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); + // Sanity check - it's possible we get a reply after it timed out. + if(followerInfo == null) { + return; + } String followerId = reply.getFollowerId(); AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); @@ -160,27 +178,19 @@ class RaftActorServerConfigurationSupport { stopFollowerTimer(); followerLogInformation.setFollowerState(FollowerState.VOTING); leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); - persistNewServerConfiguration(raftActor, sender, followerId); + + persistNewServerConfiguration(raftActor, followerInfo); } - private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){ - /* get old server configuration list */ - Map tempMap = context.getPeerAddresses(); - List cOld = new ArrayList(); - for (Map.Entry entry : tempMap.entrySet()) { - if(!entry.getKey().equals(followerId)){ - cOld.add(entry.getKey()); - } - } - LOG.debug("Cold server configuration : {}", cOld.toString()); - /* get new server configuration list */ - List cNew = new ArrayList(cOld); - cNew.add(followerId); - LOG.debug("Cnew server configuration : {}", cNew.toString()); - // construct the peer list - ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew); - /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */ - raftActor.persistData(sender, context.getId(), servPayload); + private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){ + List cNew = new ArrayList(context.getPeerAddresses().keySet()); + cNew.add(context.getId()); + + LOG.debug("New server configuration : {}", cNew.toString()); + + ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.emptyList()); + + raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload); } private void stopFollowerTimer() { @@ -190,22 +200,19 @@ class RaftActorServerConfigurationSupport { } private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){ - LOG.debug("onFollowerCatchupTimeout: {}", serverId); AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); // cleanup context.removePeer(serverId); leader.removeFollower(serverId); - LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId); + LOG.warn("Timeout occured for new server {} while installing snapshot", serverId); respondToClient(raftActor,ServerChangeStatus.TIMEOUT); } private void respondToClient(RaftActor raftActor, ServerChangeStatus result){ - - int size = followerInfoQueue.size(); - // remove the entry from the queue CatchupFollowerInfo fInfo = followerInfoQueue.remove(); + // get the sender ActorRef toClient = fInfo.getClientRequestor(); @@ -216,19 +223,27 @@ class RaftActorServerConfigurationSupport { } } - // mantain sender actorRef - private class CatchupFollowerInfo { + // maintain sender actorRef + private static class CatchupFollowerInfo { private final AddServer addServer; private final ActorRef clientRequestor; + private final String contextId; CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){ addServer = addSrv; clientRequestor = cliReq; + contextId = UUID.randomUUID().toString(); } - public AddServer getAddServer(){ + + String getContextId() { + return contextId; + } + + AddServer getAddServer(){ return addServer; } - public ActorRef getClientRequestor(){ + + ActorRef getClientRequestor(){ return clientRequestor; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 2db595d743..bf0fc10aad 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -55,7 +55,7 @@ class RaftActorSnapshotMessageSupport { boolean handleSnapshotMessage(Object message) { if(message instanceof ApplySnapshot ) { - onApplySnapshot(((ApplySnapshot) message).getSnapshot()); + onApplySnapshot((ApplySnapshot) message); return true; } else if (message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess) message); @@ -95,10 +95,10 @@ class RaftActorSnapshotMessageSupport { context.getSnapshotManager().commit(sequenceNumber, currentBehavior); } - private void onApplySnapshot(Snapshot snapshot) { + private void onApplySnapshot(ApplySnapshot message) { log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(), - snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm()); + message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm()); - context.getSnapshotManager().apply(snapshot); + context.getSnapshotManager().apply(message); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java index 32cb458c20..db1f193cba 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java @@ -79,4 +79,10 @@ public class ServerConfigurationPayload extends Payload implements Serializable public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) { return null; } + + @Override + public String toString() { + return "ServerConfigurationPayload [newServerConfig=" + newServerConfig + ", oldServerConfig=" + + oldServerConfig + "]"; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index c553a397f6..8e0d2f820b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -12,6 +12,7 @@ import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -37,7 +38,7 @@ public class SnapshotManager implements SnapshotState { private Procedure createSnapshotProcedure; - private Snapshot applySnapshot; + private ApplySnapshot applySnapshot; private Procedure applySnapshotProcedure; public SnapshotManager(RaftActorContext context, Logger logger) { @@ -45,6 +46,10 @@ public class SnapshotManager implements SnapshotState { this.LOG = logger; } + public boolean isApplying() { + return applySnapshot != null; + } + @Override public boolean isCapturing() { return currentState.isCapturing(); @@ -61,7 +66,7 @@ public class SnapshotManager implements SnapshotState { } @Override - public void apply(Snapshot snapshot) { + public void apply(ApplySnapshot snapshot) { currentState.apply(snapshot); } @@ -130,7 +135,7 @@ public class SnapshotManager implements SnapshotState { } @Override - public void apply(Snapshot snapshot) { + public void apply(ApplySnapshot snapshot) { LOG.debug("apply should not be called in state {}", this); } @@ -260,14 +265,14 @@ public class SnapshotManager implements SnapshotState { } @Override - public void apply(Snapshot snapshot) { - applySnapshot = snapshot; + public void apply(ApplySnapshot applySnapshot) { + SnapshotManager.this.applySnapshot = applySnapshot; lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber); - context.getPersistenceProvider().saveSnapshot(snapshot); + context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot()); SnapshotManager.this.currentState = PERSISTING; } @@ -374,16 +379,19 @@ public class SnapshotManager implements SnapshotState { @Override public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { - LOG.debug("Snapshot success sequence number:", sequenceNumber); + LOG.debug("Snapshot success sequence number: {}", sequenceNumber); if(applySnapshot != null) { try { - applySnapshotProcedure.apply(applySnapshot.getState()); + Snapshot snapshot = applySnapshot.getSnapshot(); + applySnapshotProcedure.apply(snapshot.getState()); //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior)); - context.setLastApplied(applySnapshot.getLastAppliedIndex()); - context.setCommitIndex(applySnapshot.getLastAppliedIndex()); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); + context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); + + applySnapshot.getCallback().onSuccess(); } catch (Exception e) { LOG.error("Error applying snapshot", e); } @@ -412,6 +420,8 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), context.getReplicatedLog().size()); + } else { + applySnapshot.getCallback().onFailure(); } lastSequenceNumber = -1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 46e0c87fc2..f5a175f389 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; public interface SnapshotState { @@ -42,7 +43,7 @@ public interface SnapshotState { * * @param snapshot the Snapshot to apply. */ - void apply(Snapshot snapshot); + void apply(ApplySnapshot snapshot); /** * Persist the snapshot diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java index 54ee02a057..3c43d809b9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java @@ -8,22 +8,49 @@ package org.opendaylight.controller.cluster.raft.base.messages; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.raft.Snapshot; -import java.io.Serializable; - /** * Internal message, issued by follower to its actor */ -public class ApplySnapshot implements Serializable { - private static final long serialVersionUID = 1L; +public class ApplySnapshot { private final Snapshot snapshot; + private final Callback callback; public ApplySnapshot(Snapshot snapshot) { - this.snapshot = snapshot; + this(snapshot, NOOP_CALLBACK); + } + + public ApplySnapshot(@Nonnull Snapshot snapshot, @Nonnull Callback callback) { + this.snapshot = Preconditions.checkNotNull(snapshot); + this.callback = Preconditions.checkNotNull(callback); } + @Nonnull public Snapshot getSnapshot() { return snapshot; } + + @Nonnull + public Callback getCallback() { + return callback; + } + + public interface Callback { + void onSuccess(); + + void onFailure(); + } + + public static Callback NOOP_CALLBACK = new Callback() { + @Override + public void onSuccess() { + } + + @Override + public void onFailure() { + } + }; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 605a5c21a4..3c5ad0428f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -135,6 +135,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); followerLogInformation.setFollowerState(followerState); followerToLog.put(followerId, followerLogInformation); + + if(heartbeatSchedule == null) { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } } public void removeFollower(String followerId) { @@ -254,9 +258,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); - if (replicatedLogEntry != null && - replicatedLogEntry.getTerm() == currentTerm()) { + if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) { context.setCommitIndex(N); + } else { + break; } } else { break; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 7e6175d933..787bd74629 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -490,4 +491,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getId(); } + public void applyServerConfiguration(ServerConfigurationPayload serverConfig) { + for(String peerId: context.getPeerAddresses().keySet()) { + context.removePeer(peerId); + } + + for(String peerId: serverConfig.getNewServerConfig()) { + if(!getId().equals(peerId)) { + context.addToPeers(peerId, null); + } + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d516f9ccb3..8f41147537 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -103,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior { // to make it easier to read. Before refactoring ensure tests // cover the code properly - if (snapshotTracker != null) { + if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { // if snapshot install is in progress, follower should just acknowledge append entries with a reply. AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm(), context.getPayloadVersion()); @@ -189,6 +190,10 @@ public class Follower extends AbstractRaftActorBehavior { LOG.debug("{}: Append entry to log {}", logName(), entry.getData()); context.getReplicatedLog().appendAndPersist(entry); + + if(entry.getData() instanceof ServerConfigurationPayload) { + applyServerConfiguration((ServerConfigurationPayload)entry.getData()); + } } LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size()); @@ -335,7 +340,7 @@ public class Follower extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { + private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}", logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(), @@ -348,6 +353,9 @@ public class Follower extends AbstractRaftActorBehavior { updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); try { + final InstallSnapshotReply reply = new InstallSnapshotReply( + currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true); + if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())){ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(), @@ -359,19 +367,28 @@ public class Follower extends AbstractRaftActorBehavior { context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor()); - actor().tell(new ApplySnapshot(snapshot), actor()); + ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { + @Override + public void onSuccess() { + LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); - snapshotTracker = null; - - } + sender.tell(reply, actor()); + } - InstallSnapshotReply reply = new InstallSnapshotReply( - currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true); + @Override + public void onFailure() { + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor()); + } + }; - LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); + actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor()); - sender.tell(reply, actor()); + snapshotTracker = null; + } else { + LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); + sender.tell(reply, actor()); + } } catch (SnapshotTracker.InvalidChunkException e) { LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 538681e8df..5c9ba13b8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -7,7 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; -//import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertEquals; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; import akka.actor.ActorRef; @@ -19,7 +19,9 @@ import akka.testkit.TestActorRef; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.Collections; +import java.util.List; import java.util.Map; //import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,20 +30,26 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.messages.AddServer; -//import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -//import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; -//import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; + /** * Unit tests for RaftActorServerConfigurationSupport. * @@ -60,23 +68,29 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(FOLLOWER_ID)); - private final TestActorRef newServerActor = actorFactory.createTestActor( - Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), - actorFactory.generateActorId(NEW_SERVER_ID)); + private TestActorRef newFollowerRaftActor; + private TestActorRef newFollowerCollectorActor; - private RaftActorContext newServerActorContext; + private RaftActorContext newFollowerActorContext; private final JavaTestKit testKit = new JavaTestKit(getSystem()); @Before public void setup() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); - newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(), - NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1, - Maps.newHashMap(), configParams, NO_PERSISTENCE, LOG); - newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + newFollowerCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(NEW_SERVER_ID + "Collector")); + newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props( + configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(NEW_SERVER_ID)); + newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext(); } @After @@ -85,19 +99,16 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } @Test - public void testAddServerWithFollower() throws Exception { + public void testAddServerWithExistingFollower() throws Exception { RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( 0, 3, 1).build()); - followerActorContext.setCommitIndex(3); - followerActorContext.setLastApplied(3); + followerActorContext.setCommitIndex(2); + followerActorContext.setLastApplied(2); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); - Follower newServer = new Follower(newServerActorContext); - newServerActor.underlyingActor().setBehavior(newServer); - TestActorRef leaderActor = actorFactory.createTestActor( MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -109,33 +120,173 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); - leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + // Leader should install snapshot - capture and verify ApplySnapshot contents + + ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); + List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in both followers + + assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in both followers + + assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID), + followerActorContext.getPeerAddresses().keySet()); + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + + clearMessages(followerActor); + clearMessages(newFollowerCollectorActor); + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + expectFirstMatching(followerActor, ApplyState.class); + + assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied()); + assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex()); + assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied()); + } + + @Test + public void testAddServerWithNoExistingFollower() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(1); + initialActorContext.setLastApplied(1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( + 0, 2, 1).build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + // Leader should install snapshot - capture and verify ApplySnapshot contents + + ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); + List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); + assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in the new follower + + clearMessages(newFollowerCollectorActor); + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in the new follower + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + } + + @Test + public void testAddServerAsNonVoting() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef()); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + // Verify ServerConfigurationPayload entry in leader's log + + assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify ServerConfigurationPayload entry in the new follower + + expectFirstMatching(newFollowerCollectorActor, ApplyState.class); + assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex()); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + + // Verify new server config was applied in the new follower + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), + newFollowerActorContext.getPeerAddresses().keySet()); + + MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500); + } + + @Test + public void testAddServerWithInstallSnapshotTimeout() throws Exception { + newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); - // leader should install snapshot - capture and verify ApplySnapshot contents - //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class); - //List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); - //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); - // leader should replicate new server config to both followers - //expectFirstMatching(followerActor, AppendEntries.class); - //expectFirstMatching(newServerActor, AppendEntries.class); + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); - // verify ServerConfigurationPayload entry in leader's log + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); - //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size()); - //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); - ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get( - leaderActorContext.getReplicatedLog().lastIndex()); - // verify logEntry contents - // Also verify ServerConfigurationPayload entry in both followers + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); - //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); - //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); - //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); + + assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size()); + assertEquals("Leader followers size", 0, + ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size()); } - //@Test + @Test public void testAddServerWithNoLeader() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -146,12 +297,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { actorFactory.generateActorId(LEADER_ID)); noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete(); - noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); - //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); - //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); } - //@Test + @Test public void testAddServerForwardedToLeader() { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); @@ -169,16 +320,25 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.emptyList(), -1, -1, (short)0), leaderActor); - followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); - //expectFirstMatching(leaderActor, AddServer.class); + followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + expectFirstMatching(leaderActor, AddServer.class); + } + + private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) { + ReplicatedLogEntry logEntry = log.get(log.lastIndex()); + assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass()); + ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData(); + assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig())); } private RaftActorContext newFollowerContext(String id, TestActorRef actor) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); + ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG); + termInfo.update(1, LEADER_ID); RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), - id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1, + id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG); return followerActorContext; @@ -198,6 +358,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { context.setCommitIndex(fromContext.getCommitIndex()); context.setLastApplied(fromContext.getLastApplied()); + context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(), + fromContext.getTermInformation().getVotedFor()); } @Override @@ -217,9 +379,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static Props props(Map peerAddresses, RaftActorContext fromContext) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - configParams.setElectionTimeoutFactor(100000); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(1); return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext); } } + + public static class MockNewFollowerRaftActor extends MockRaftActor { + private final TestActorRef collectorActor; + private volatile Class dropMessageOfType; + + public MockNewFollowerRaftActor(ConfigParams config, TestActorRef collectorActor) { + super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), null); + this.collectorActor = collectorActor; + } + + void setDropMessageOfType(Class dropMessageOfType) { + this.dropMessageOfType = dropMessageOfType; + } + + @Override + public void handleCommand(Object message) { + if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) { + return; + } + + super.handleCommand(message); + collectorActor.tell(message, getSender()); + } + + static Props props(ConfigParams config, TestActorRef collectorActor) { + return Props.create(MockNewFollowerRaftActor.class, config, collectorActor); + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 7d41508c3f..d79a48357a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -97,9 +97,10 @@ public class RaftActorSnapshotMessageSupportTest { Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.emptyList(), lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1); - sendMessageToSupport(new ApplySnapshot(snapshot)); + ApplySnapshot applySnapshot = new ApplySnapshot(snapshot); + sendMessageToSupport(applySnapshot); - verify(mockSnapshotManager).apply(snapshot); + verify(mockSnapshotManager).apply(applySnapshot); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index a06d086e99..51818d15b6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -785,6 +785,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); + applySnapshot.getCallback().onSuccess(); List replies = MessageCollectorActor.getAllMatching( leaderActor, InstallSnapshotReply.class);