*/
package org.opendaylight.controller.cluster.raft;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
-import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import scala.concurrent.duration.FiniteDuration;
/**
// snapshot installation is successful
onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
return true;
- } else if(message instanceof ApplyState){
- ApplyState applyState = (ApplyState) message;
- Payload data = applyState.getReplicatedLogEntry().getData();
- if( data instanceof ServerConfigurationPayload){
- LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
- (ServerConfigurationPayload)data);
- // respond ok to follower
- respondToClient(raftActor, ServerChangeStatus.OK);
- return true;
- }
- return false;
+ } else if(message instanceof ApplyState) {
+ return onApplyState((ApplyState) message, raftActor);
} else {
return false;
}
}
+ private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+ Payload data = applyState.getReplicatedLogEntry().getData();
+ if(data instanceof ServerConfigurationPayload) {
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
+ LOG.info("{} has been successfully replicated to a majority of followers", data);
+
+ // respond ok to follower
+ respondToClient(raftActor, ServerChangeStatus.OK);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
- LOG.debug("onAddServer: {}", addServer);
+ LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
return;
}
CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
- boolean process = !followerInfoQueue.isEmpty();
+ boolean process = followerInfoQueue.isEmpty();
followerInfoQueue.add(followerInfo);
if(process) {
processAddServer(raftActor);
}
}
+ /**
+ * The algorithm for AddServer is as follows:
+ * <ul>
+ * <li>Add the new server as a peer.</li>
+ * <li>Add the new follower to the leader.</li>
+ * <li>If new server should be voting member</li>
+ * <ul>
+ * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
+ * <li>Initiate install snapshot to the new follower.</li>
+ * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
+ * </ul>
+ * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
+ * <li>On replication consensus, respond to caller with OK.</li>
+ * </ul>
+ * If the install snapshot times out after a period of 2 * election time out
+ * <ul>
+ * <li>Remove the new server as a peer.</li>
+ * <li>Remove the new follower from the leader.</li>
+ * <li>Respond to caller with TIMEOUT.</li>
+ * </ul>
+ */
private void processAddServer(RaftActor raftActor){
- LOG.debug("In processAddServer");
+ LOG.debug("{}: In processAddServer", context.getId());
+
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- AddServer addSrv = followerInfoQueue.peek().getAddServer();
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ AddServer addSrv = followerInfo.getAddServer();
context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
// if voting member - initialize to VOTING_NOT_INITIALIZED
FollowerState.NON_VOTING;
leader.addFollower(addSrv.getNewServerId(), initialState);
- // TODO
- // if initialState == FollowerState.VOTING_NOT_INITIALIZED
- // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
- // Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
- // Set local instance state and wait for message from the AbstractLeader when install snapshot
- // is done and return now
- // When install snapshot message is received, go to step 1
- // else
- // go to step 2
- //
- // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
- // minIsolatedLeaderPeerCount
- // 2) persist and replicate ServerConfigurationPayload via
- // raftActor.persistData(sender, uuid, newServerConfigurationPayload)
- // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
- // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
- // this class.
- //
if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
leader.initiateCaptureSnapshot(addSrv.getNewServerId());
context.getActorSystem().dispatcher(), context.getActor());
} else {
LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId());
- persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
- addSrv.getNewServerId());
+ persistNewServerConfiguration(raftActor, followerInfo);
}
}
private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
RaftActor raftActor, ActorRef sender){
+ CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+ // Sanity check - it's possible we get a reply after it timed out.
+ if(followerInfo == null) {
+ return;
+ }
String followerId = reply.getFollowerId();
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
stopFollowerTimer();
followerLogInformation.setFollowerState(FollowerState.VOTING);
leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
- persistNewServerConfiguration(raftActor, sender, followerId);
+
+ persistNewServerConfiguration(raftActor, followerInfo);
}
- private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
- /* get old server configuration list */
- Map<String, String> tempMap = context.getPeerAddresses();
- List<String> cOld = new ArrayList<String>();
- for (Map.Entry<String, String> entry : tempMap.entrySet()) {
- if(!entry.getKey().equals(followerId)){
- cOld.add(entry.getKey());
- }
- }
- LOG.debug("Cold server configuration : {}", cOld.toString());
- /* get new server configuration list */
- List <String> cNew = new ArrayList<String>(cOld);
- cNew.add(followerId);
- LOG.debug("Cnew server configuration : {}", cNew.toString());
- // construct the peer list
- ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
- /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
- raftActor.persistData(sender, context.getId(), servPayload);
+ private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
+ List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
+ cNew.add(context.getId());
+
+ LOG.debug("New server configuration : {}", cNew.toString());
+
+ ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+
+ raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
}
private void stopFollowerTimer() {
}
private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-
LOG.debug("onFollowerCatchupTimeout: {}", serverId);
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
// cleanup
context.removePeer(serverId);
leader.removeFollower(serverId);
- LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+ LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
}
private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-
- int size = followerInfoQueue.size();
-
// remove the entry from the queue
CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+
// get the sender
ActorRef toClient = fInfo.getClientRequestor();
}
}
- // mantain sender actorRef
- private class CatchupFollowerInfo {
+ // maintain sender actorRef
+ private static class CatchupFollowerInfo {
private final AddServer addServer;
private final ActorRef clientRequestor;
+ private final String contextId;
CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
addServer = addSrv;
clientRequestor = cliReq;
+ contextId = UUID.randomUUID().toString();
}
- public AddServer getAddServer(){
+
+ String getContextId() {
+ return contextId;
+ }
+
+ AddServer getAddServer(){
return addServer;
}
- public ActorRef getClientRequestor(){
+
+ ActorRef getClientRequestor(){
return clientRequestor;
}
}