*/
package org.opendaylight.controller.cluster.raft;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import scala.concurrent.duration.FiniteDuration;
/**
* Handles server configuration related messages for a RaftActor.
*/
class RaftActorServerConfigurationSupport {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
-
private final RaftActorContext context;
+ // client follower queue
+ private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
+ // timeout handle
+ private Cancellable followerTimeout = null;
RaftActorServerConfigurationSupport(RaftActorContext context) {
this.context = context;
if(message instanceof AddServer) {
onAddServer((AddServer)message, raftActor, sender);
return true;
+ } else if (message instanceof FollowerCatchUpTimeout){
+ FollowerCatchUpTimeout followerTimeout = (FollowerCatchUpTimeout)message;
+ // abort follower catchup on timeout
+ onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
+ return true;
+ } else if (message instanceof UnInitializedFollowerSnapshotReply){
+ // snapshot installation is successful
+ onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
+ return true;
+ } else if(message instanceof ApplyState){
+ ApplyState applyState = (ApplyState) message;
+ Payload data = applyState.getReplicatedLogEntry().getData();
+ if( data instanceof ServerConfigurationPayload){
+ LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
+ (ServerConfigurationPayload)data);
+ // respond ok to follower
+ respondToClient(raftActor, ServerChangeStatus.OK);
+ return true;
+ }
+ return false;
} else {
return false;
}
private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
LOG.debug("onAddServer: {}", addServer);
-
if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
return;
}
- // TODO - check if a server config is in progress. If so, cache this AddServer request to be processed
- // after the current one is done.
-
- context.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+ CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
+ boolean process = !followerInfoQueue.isEmpty();
+ followerInfoQueue.add(followerInfo);
+ if(process) {
+ processAddServer(raftActor);
+ }
+ }
+ private void processAddServer(RaftActor raftActor){
+ LOG.debug("In processAddServer");
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
+ AddServer addSrv = followerInfoQueue.peek().getAddServer();
+ context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
+
+ // if voting member - initialize to VOTING_NOT_INITIALIZED
+ FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
FollowerState.NON_VOTING;
- leader.addFollower(addServer.getNewServerId(), initialState);
+ leader.addFollower(addSrv.getNewServerId(), initialState);
// TODO
// if initialState == FollowerState.VOTING_NOT_INITIALIZED
// Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
// Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
- // Set local instance state and wait for message from the AbstractLeader when install snapshot is done and return now
+ // Set local instance state and wait for message from the AbstractLeader when install snapshot
+ // is done and return now
// When install snapshot message is received, go to step 1
// else
// go to step 2
// on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
// this class.
//
-
- // TODO - temporary
- sender.tell(new AddServerReply(ServerChangeStatus.OK, raftActor.getLeaderId()), raftActor.self());
+ if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
+ LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
+ leader.initiateCaptureSnapshot(addSrv.getNewServerId());
+ // schedule the catchup timeout timer
+ followerTimeout = context.getActorSystem().scheduler()
+ .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
+ TimeUnit.MILLISECONDS),
+ context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
+ context.getActorSystem().dispatcher(), context.getActor());
+ } else {
+ LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId());
+ persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
+ addSrv.getNewServerId());
+ }
}
private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
return true;
}
+
+ private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
+ RaftActor raftActor, ActorRef sender){
+
+ String followerId = reply.getFollowerId();
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
+ stopFollowerTimer();
+ followerLogInformation.setFollowerState(FollowerState.VOTING);
+ leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+ persistNewServerConfiguration(raftActor, sender, followerId);
+ }
+
+ private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
+ /* get old server configuration list */
+ Map<String, String> tempMap = context.getPeerAddresses();
+ List<String> cOld = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : tempMap.entrySet()) {
+ if(!entry.getKey().equals(followerId)){
+ cOld.add(entry.getKey());
+ }
+ }
+ LOG.debug("Cold server configuration : {}", cOld.toString());
+ /* get new server configuration list */
+ List <String> cNew = new ArrayList<String>(cOld);
+ cNew.add(followerId);
+ LOG.debug("Cnew server configuration : {}", cNew.toString());
+ // construct the peer list
+ ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
+ /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
+ raftActor.persistData(sender, context.getId(), servPayload);
+ }
+
+ private void stopFollowerTimer() {
+ if (followerTimeout != null && !followerTimeout.isCancelled()) {
+ followerTimeout.cancel();
+ }
+ }
+
+ private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
+
+ LOG.debug("onFollowerCatchupTimeout: {}", serverId);
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ // cleanup
+ context.removePeer(serverId);
+ leader.removeFollower(serverId);
+ LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+ respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
+ }
+
+ private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
+
+ int size = followerInfoQueue.size();
+
+ // remove the entry from the queue
+ CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+ // get the sender
+ ActorRef toClient = fInfo.getClientRequestor();
+
+ toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
+ LOG.debug("Response returned is {} for server {} ", result, fInfo.getAddServer().getNewServerId());
+ if(!followerInfoQueue.isEmpty()){
+ processAddServer(raftActor);
+ }
+ }
+
+ // mantain sender actorRef
+ private class CatchupFollowerInfo {
+ private final AddServer addServer;
+ private final ActorRef clientRequestor;
+
+ CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
+ addServer = addSrv;
+ clientRequestor = cliReq;
+ }
+ public AddServer getAddServer(){
+ return addServer;
+ }
+ public ActorRef getClientRequestor(){
+ return clientRequestor;
+ }
+ }
}