import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
/**
* Handles server configuration related messages for a RaftActor.
private final OperationState IDLE = new Idle();
+ private final RaftActor raftActor;
+
private final RaftActorContext raftContext;
private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
private OperationState currentOperationState = IDLE;
- RaftActorServerConfigurationSupport(RaftActorContext context) {
- this.raftContext = context;
+ RaftActorServerConfigurationSupport(RaftActor raftActor) {
+ this.raftActor = raftActor;
+ this.raftContext = raftActor.getRaftActorContext();
}
- boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
+ boolean handleMessage(Object message, ActorRef sender) {
if(message instanceof AddServer) {
- onAddServer((AddServer)message, raftActor, sender);
+ onAddServer((AddServer) message, sender);
+ return true;
+ } else if(message instanceof RemoveServer) {
+ onRemoveServer((RemoveServer) message, sender);
return true;
- } else if (message instanceof FollowerCatchUpTimeout) {
- currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
+ } else if (message instanceof ServerOperationTimeout) {
+ currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
return true;
} else if (message instanceof UnInitializedFollowerSnapshotReply) {
- currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
- (UnInitializedFollowerSnapshotReply)message);
+ currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
return true;
} else if(message instanceof ApplyState) {
- return onApplyState((ApplyState) message, raftActor);
+ return onApplyState((ApplyState) message);
} else if(message instanceof SnapshotComplete) {
- currentOperationState.onSnapshotComplete(raftActor);
+ currentOperationState.onSnapshotComplete();
return false;
} else {
return false;
}
}
- private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+ private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
+ LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
+ if(removeServer.getServerId().equals(raftActor.getLeaderId())){
+ // Removing current leader is not supported yet
+ // TODO: To properly support current leader removal we need to first implement transfer of leadership
+ LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
+ sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
+ } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
+ sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
+ } else {
+ onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
+ }
+ }
+
+ private boolean onApplyState(ApplyState applyState) {
Payload data = applyState.getReplicatedLogEntry().getData();
if(data instanceof ServerConfigurationPayload) {
- currentOperationState.onApplyState(raftActor, applyState);
+ currentOperationState.onApplyState(applyState);
return true;
}
* <li>Respond to caller with TIMEOUT.</li>
* </ul>
*/
- private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
- LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
+ private void onAddServer(AddServer addServer, ActorRef sender) {
+ LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
- onNewOperation(raftActor, new AddServerContext(addServer, sender));
+ onNewOperation(new AddServerContext(addServer, sender));
}
- private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ private void onNewOperation(ServerOperationContext<?> operationContext) {
if (raftActor.isLeader()) {
- currentOperationState.onNewOperation(raftActor, operationContext);
+ currentOperationState.onNewOperation(operationContext);
} else {
ActorSelection leader = raftActor.getLeader();
if (leader != null) {
* Interface for a server operation FSM state.
*/
private interface OperationState {
- void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
+ void onNewOperation(ServerOperationContext<?> operationContext);
- void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
+ void onServerOperationTimeout(ServerOperationTimeout timeout);
- void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
+ void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
- void onApplyState(RaftActor raftActor, ApplyState applyState);
+ void onApplyState(ApplyState applyState);
- void onSnapshotComplete(RaftActor raftActor);
+ void onSnapshotComplete();
}
/**
* Interface for the initial state for a server operation.
*/
private interface InitialOperationState {
- void initiate(RaftActor raftActor);
+ void initiate();
}
/**
- * Abstract base class for server operation FSM state. Handles common behavior for all states.
+ * Abstract base class for a server operation FSM state. Handles common behavior for all states.
*/
private abstract class AbstractOperationState implements OperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
// We're currently processing another operation so queue it to be processed later.
LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
}
@Override
- public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
- LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ LOG.debug("onServerOperationTimeout should not be called in state {}", this);
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
LOG.debug("onApplyState was called in state {}", this);
}
@Override
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
}
- protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
- Collection<PeerInfo> peers = raftContext.getPeers();
- List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
- for(PeerInfo peer: peers) {
- newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
- }
-
- newConfig.add(new ServerInfo(raftContext.getId(), true));
-
- LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
-
- ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig);
+ protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
+ raftContext.setDynamicServerConfigurationInUse();
+ ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
+ LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
- currentOperationState = new Persisting(operationContext);
- }
+ currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
- protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
- ServerChangeStatus status) {
+ sendReply(operationContext, ServerChangeStatus.OK);
+ }
- LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
+ protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
+ if(replyStatus != null) {
+ sendReply(operationContext, replyStatus);
+ }
- operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
- raftActor.self());
+ operationContext.operationComplete(raftActor, replyStatus);
currentOperationState = IDLE;
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
if(nextOperation != null) {
- RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
+ RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
}
}
+ protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
+ LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
+
+ operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
+ raftActor.self());
+ }
+
+ Cancellable newTimer(Object message) {
+ return raftContext.getActorSystem().scheduler().scheduleOnce(
+ raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
+ raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+ }
+
@Override
public String toString() {
return getClass().getSimpleName();
*/
private class Idle extends AbstractOperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
- operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
+ operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Noop - we override b/c ApplyState is called normally for followers in the idle state.
}
}
*/
private class Persisting extends AbstractOperationState {
private final ServerOperationContext<?> operationContext;
+ private final Cancellable timer;
+ private boolean timedOut = false;
- Persisting(ServerOperationContext<?> operationContext) {
+ Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
this.operationContext = operationContext;
+ this.timer = timer;
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
- LOG.info("{}: {} has been successfully replicated to a majority of followers",
+ LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
applyState.getReplicatedLogEntry().getData());
- operationComplete(raftActor, operationContext, ServerChangeStatus.OK);
+ timer.cancel();
+ operationComplete(operationContext, null);
+ }
+ }
+
+ @Override
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
+ timeout.getServerId());
+
+ timedOut = true;
+
+ // Fail any pending operations
+ ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
+ while(nextOperation != null) {
+ sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ nextOperation = pendingOperationsQueue.poll();
+ }
+ }
+
+ @Override
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
+ if(timedOut) {
+ sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ } else {
+ super.onNewOperation(operationContext);
}
}
}
return addServerContext;
}
- Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
- return raftContext.getActorSystem().scheduler().scheduleOnce(
- new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
- TimeUnit.MILLISECONDS), raftContext.getActor(),
- new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
- raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+ Cancellable newInstallSnapshotTimer() {
+ return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
}
- void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
- String serverId = followerTimeout.getNewServerId();
+ void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
+ String serverId = timeout.getServerId();
- LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
+ LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
// cleanup
raftContext.removePeer(serverId);
leader.removeFollower(serverId);
}
- operationComplete(raftActor, getAddServerContext(),
- isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+ operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
}
+
}
/**
}
@Override
- public void initiate(RaftActor raftActor) {
+ public void initiate() {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-
AddServer addServer = getAddServerContext().getOperation();
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
+ if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
+ operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
+ return;
+ }
+
VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
VotingState.NON_VOTING;
raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
if(votingState == VotingState.VOTING_NOT_INITIALIZED){
// schedule the install snapshot timeout timer
- Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
+ Cancellable installSnapshotTimer = newInstallSnapshotTimer();
if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
addServer.getNewServerId());
LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
raftContext.getId());
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
}
}
}
}
@Override
- public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
- handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
- followerTimeout.getNewServerId());
+ timeout.getServerId());
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
String followerId = reply.getFollowerId();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
installSnapshotTimer.cancel();
} else {
}
@Override
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
LOG.debug("{}: onSnapshotComplete", raftContext.getId());
if(!raftActor.isLeader()) {
getAddServerContext().getOperation().getNewServerId());
currentOperationState = new InstallingSnapshot(getAddServerContext(),
- newInstallSnapshotTimer(raftActor));
+ newInstallSnapshotTimer());
snapshotTimer.cancel();
}
}
@Override
- public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
- handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
- raftContext.getId(), followerTimeout.getNewServerId());
+ raftContext.getId(), timeout.getServerId());
}
}
abstract Object newReply(ServerChangeStatus status, String leaderId);
abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
+
+ abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
+
+ abstract String getServerId();
}
/**
InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
return support.new InitialAddServerState(this);
}
+
+ @Override
+ void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
+
+ }
+
+ @Override
+ String getServerId() {
+ return getOperation().getNewServerId();
+ }
+ }
+
+ private abstract class RemoveServerState extends AbstractOperationState {
+ private final RemoveServerContext removeServerContext;
+
+
+ protected RemoveServerState(RemoveServerContext removeServerContext) {
+ this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
+
+ }
+
+ public RemoveServerContext getRemoveServerContext() {
+ return removeServerContext;
+ }
+ }
+
+ private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
+
+ protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
+ super(removeServerContext);
+ }
+
+ @Override
+ public void initiate() {
+ raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
+ persistNewServerConfiguration(getRemoveServerContext());
+ }
+ }
+
+ private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
+ private final String peerAddress;
+
+ RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
+ super(operation, clientRequestor);
+ this.peerAddress = peerAddress;
+ }
+
+ @Override
+ Object newReply(ServerChangeStatus status, String leaderId) {
+ return new RemoveServerReply(status, leaderId);
+ }
+
+ @Override
+ InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+ return support.new InitialRemoveServerState(this);
+ }
+
+ @Override
+ void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
+ if(peerAddress != null) {
+ raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
+ }
+ }
+
+ @Override
+ String getServerId() {
+ return getOperation().getServerId();
+ }
+ }
+
+ static class ServerOperationTimeout {
+ private final String serverId;
+
+ ServerOperationTimeout(String serverId){
+ this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
+ }
+
+ String getServerId() {
+ return serverId;
+ }
}
}