import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
private final OperationState IDLE = new Idle();
+ private final RaftActor raftActor;
+
private final RaftActorContext raftContext;
- private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
+ private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
private OperationState currentOperationState = IDLE;
- RaftActorServerConfigurationSupport(RaftActorContext context) {
- this.raftContext = context;
+ RaftActorServerConfigurationSupport(RaftActor raftActor) {
+ this.raftActor = raftActor;
+ this.raftContext = raftActor.getRaftActorContext();
}
- boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
+ boolean handleMessage(Object message, ActorRef sender) {
if(message instanceof AddServer) {
- onAddServer((AddServer) message, raftActor, sender);
+ onAddServer((AddServer) message, sender);
return true;
} else if(message instanceof RemoveServer) {
- onRemoveServer((RemoveServer) message, raftActor, sender);
+ onRemoveServer((RemoveServer) message, sender);
return true;
} else if (message instanceof ServerOperationTimeout) {
- currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message);
+ currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
return true;
} else if (message instanceof UnInitializedFollowerSnapshotReply) {
- currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
- (UnInitializedFollowerSnapshotReply) message);
+ currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
return true;
} else if(message instanceof ApplyState) {
- return onApplyState((ApplyState) message, raftActor);
+ return onApplyState((ApplyState) message);
} else if(message instanceof SnapshotComplete) {
- currentOperationState.onSnapshotComplete(raftActor);
+ currentOperationState.onSnapshotComplete();
return false;
} else {
return false;
}
}
- private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
+ private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
- if(removeServer.getServerId().equals(raftActor.getLeaderId())){
- // Removing current leader is not supported yet
- // TODO: To properly support current leader removal we need to first implement transfer of leadership
- LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
- sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
- } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
- sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
+ boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
+ if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
+ sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
+ raftActor.getSelf());
} else {
- onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
+ String serverAddress = isSelf ? raftActor.self().path().toString() :
+ raftContext.getPeerAddress(removeServer.getServerId());
+ onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
}
}
- private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+ private boolean onApplyState(ApplyState applyState) {
Payload data = applyState.getReplicatedLogEntry().getData();
if(data instanceof ServerConfigurationPayload) {
- currentOperationState.onApplyState(raftActor, applyState);
+ currentOperationState.onApplyState(applyState);
return true;
}
* <li>Respond to caller with TIMEOUT.</li>
* </ul>
*/
- private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
+ private void onAddServer(AddServer addServer, ActorRef sender) {
LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
- onNewOperation(raftActor, new AddServerContext(addServer, sender));
+ onNewOperation(new AddServerContext(addServer, sender));
}
- private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ private void onNewOperation(ServerOperationContext<?> operationContext) {
if (raftActor.isLeader()) {
- currentOperationState.onNewOperation(raftActor, operationContext);
+ currentOperationState.onNewOperation(operationContext);
} else {
ActorSelection leader = raftActor.getLeader();
if (leader != null) {
* Interface for a server operation FSM state.
*/
private interface OperationState {
- void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
+ void onNewOperation(ServerOperationContext<?> operationContext);
- void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout);
+ void onServerOperationTimeout(ServerOperationTimeout timeout);
- void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
+ void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
- void onApplyState(RaftActor raftActor, ApplyState applyState);
+ void onApplyState(ApplyState applyState);
- void onSnapshotComplete(RaftActor raftActor);
+ void onSnapshotComplete();
}
/**
* Interface for the initial state for a server operation.
*/
private interface InitialOperationState {
- void initiate(RaftActor raftActor);
+ void initiate();
}
/**
*/
private abstract class AbstractOperationState implements OperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
// We're currently processing another operation so queue it to be processed later.
LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
}
@Override
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
LOG.debug("onServerOperationTimeout should not be called in state {}", this);
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
LOG.debug("onApplyState was called in state {}", this);
}
@Override
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
}
- protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
+ protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
raftContext.setDynamicServerConfigurationInUse();
- ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
+
+ boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
+ ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
- currentOperationState = new Persisting(operationContext, newTimer(
- new ServerOperationTimeout(operationContext.getServerId())));
+ currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
- sendReply(raftActor, operationContext, ServerChangeStatus.OK);
+ sendReply(operationContext, ServerChangeStatus.OK);
}
- protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
- @Nullable ServerChangeStatus replyStatus) {
+ protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
if(replyStatus != null) {
- sendReply(raftActor, operationContext, replyStatus);
+ sendReply(operationContext, replyStatus);
}
operationContext.operationComplete(raftActor, replyStatus);
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
if(nextOperation != null) {
- RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
+ RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
}
}
- protected void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
- ServerChangeStatus status) {
+ protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
Cancellable newTimer(Object message) {
return raftContext.getActorSystem().scheduler().scheduleOnce(
- raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(),
- message, raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+ raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
+ raftContext.getActorSystem().dispatcher(), raftContext.getActor());
}
@Override
*/
private class Idle extends AbstractOperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
- operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
+ operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Noop - we override b/c ApplyState is called normally for followers in the idle state.
}
}
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
applyState.getReplicatedLogEntry().getData());
timer.cancel();
- operationComplete(raftActor, operationContext, null);
+ operationComplete(operationContext, null);
}
}
@Override
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
timeout.getServerId());
// Fail any pending operations
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
while(nextOperation != null) {
- sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
nextOperation = pendingOperationsQueue.poll();
}
}
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
if(timedOut) {
- sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
} else {
- super.onNewOperation(raftActor, operationContext);
+ super.onNewOperation(operationContext);
}
}
}
return addServerContext;
}
- Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
+ Cancellable newInstallSnapshotTimer() {
return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
}
- void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
String serverId = timeout.getServerId();
LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
leader.removeFollower(serverId);
}
- operationComplete(raftActor, getAddServerContext(),
- isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+ operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
}
}
}
@Override
- public void initiate(RaftActor raftActor) {
+ public void initiate() {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
AddServer addServer = getAddServerContext().getOperation();
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
- operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
+ operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
return;
}
if(votingState == VotingState.VOTING_NOT_INITIALIZED){
// schedule the install snapshot timeout timer
- Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
+ Cancellable installSnapshotTimer = newInstallSnapshotTimer();
if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
addServer.getNewServerId());
LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
raftContext.getId());
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
}
}
}
}
@Override
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
- handleInstallSnapshotTimeout(raftActor, timeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
timeout.getServerId());
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
String followerId = reply.getFollowerId();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
installSnapshotTimer.cancel();
} else {
}
@Override
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
LOG.debug("{}: onSnapshotComplete", raftContext.getId());
if(!raftActor.isLeader()) {
getAddServerContext().getOperation().getNewServerId());
currentOperationState = new InstallingSnapshot(getAddServerContext(),
- newInstallSnapshotTimer(raftActor));
+ newInstallSnapshotTimer());
snapshotTimer.cancel();
}
}
@Override
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
- handleInstallSnapshotTimeout(raftActor, timeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
raftContext.getId(), timeout.getServerId());
}
@Override
- public void initiate(RaftActor raftActor) {
+ public void initiate() {
raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
- persistNewServerConfiguration(raftActor, getRemoveServerContext());
+ persistNewServerConfiguration(getRemoveServerContext());
}
}