+
+ @Override
+ String getLoggingContext() {
+ return getOperation().getNewServerId();
+ }
+ }
+
+ private abstract class RemoveServerState extends OperationState {
+ private final RemoveServerContext removeServerContext;
+
+ protected RemoveServerState(RemoveServerContext removeServerContext) {
+ this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
+
+ }
+
+ public RemoveServerContext getRemoveServerContext() {
+ return removeServerContext;
+ }
+ }
+
+ private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState {
+
+ protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
+ super(removeServerContext);
+ }
+
+ @Override
+ public void initiate() {
+ String serverId = getRemoveServerContext().getOperation().getServerId();
+ raftContext.removePeer(serverId);
+ AbstractLeader leader = (AbstractLeader)raftActor.getCurrentBehavior();
+ leader.removeFollower(serverId);
+ leader.updateMinReplicaCount();
+
+ persistNewServerConfiguration(getRemoveServerContext());
+ }
+ }
+
+ private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
+ private final String peerAddress;
+
+ RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
+ super(operation, clientRequestor);
+ this.peerAddress = peerAddress;
+ }
+
+ @Override
+ Object newReply(ServerChangeStatus status, String leaderId) {
+ return new RemoveServerReply(status, leaderId);
+ }
+
+ @Override
+ InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+ return support.new InitialRemoveServerState(this);
+ }
+
+ @Override
+ void operationComplete(RaftActor raftActor, boolean succeeded) {
+ if (peerAddress != null) {
+ raftActor.context().actorSelection(peerAddress).tell(
+ new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
+ }
+ }
+
+ @Override
+ boolean includeSelfInNewConfiguration(RaftActor raftActor) {
+ return !getOperation().getServerId().equals(raftActor.getId());
+ }
+
+ @Override
+ String getLoggingContext() {
+ return getOperation().getServerId();
+ }
+ }
+
+ private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
+ private final boolean tryToElectLeader;
+
+ ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
+ boolean tryToElectLeader) {
+ super(convertMessage, clientRequestor);
+ this.tryToElectLeader = tryToElectLeader;
+ }
+
+ @Override
+ InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+ return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
+ }
+
+ @Override
+ Object newReply(ServerChangeStatus status, String leaderId) {
+ return new ServerChangeReply(status, leaderId);
+ }
+
+ @Override
+ void operationComplete(final RaftActor raftActor, boolean succeeded) {
+ // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
+ // leadership.
+ boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation()
+ .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+ if (succeeded && localServerChangedToNonVoting) {
+ LOG.debug("Leader changed to non-voting - trying leadership transfer");
+ raftActor.becomeNonVoting();
+ }
+ }
+
+ @Override
+ String getLoggingContext() {
+ return getOperation().toString();
+ }
+ }
+
+ private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
+ private final ChangeServersVotingStatusContext changeVotingStatusContext;
+ private final boolean tryToElectLeader;
+
+ ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
+ boolean tryToElectLeader) {
+ this.changeVotingStatusContext = changeVotingStatusContext;
+ this.tryToElectLeader = tryToElectLeader;
+ }
+
+ @Override
+ public void initiate() {
+ LOG.debug("Initiating ChangeServersVotingStatusState");
+
+ if (tryToElectLeader) {
+ initiateLocalLeaderElection();
+ } else if (updateLocalPeerInfo()) {
+ persistNewServerConfiguration(changeVotingStatusContext);
+ }
+ }
+
+ private void initiateLocalLeaderElection() {
+ LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
+
+ ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
+ if (!updateLocalPeerInfo()) {
+ return;
+ }
+
+ raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
+
+ currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
+ }
+
+ private boolean updateLocalPeerInfo() {
+ List<ServerInfo> newServerInfoList = newServerInfoList();
+
+ // Check if new voting state would leave us with no voting members.
+ boolean atLeastOneVoting = false;
+ for (ServerInfo info: newServerInfoList) {
+ if (info.isVoting()) {
+ atLeastOneVoting = true;
+ break;
+ }
+ }
+
+ if (!atLeastOneVoting) {
+ operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
+ return false;
+ }
+
+ raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
+ if (raftActor.getCurrentBehavior() instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ leader.updateMinReplicaCount();
+ }
+
+ return true;
+ }
+
+ private List<ServerInfo> newServerInfoList() {
+ Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation()
+ .getServerVotingStatusMap();
+ List<ServerInfo> newServerInfoList = new ArrayList<>();
+ for (String peerId: raftContext.getPeerIds()) {
+ newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId)
+ ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
+ }
+
+ newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
+ raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId())
+ : raftContext.isVotingMember()));
+
+ return newServerInfoList;
+ }
+ }
+
+ private class WaitingForLeaderElected extends OperationState {
+ private final ServerConfigurationPayload previousServerConfig;
+ private final ChangeServersVotingStatusContext operationContext;
+ private final Cancellable timer;
+
+ WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
+ ServerConfigurationPayload previousServerConfig) {
+ this.operationContext = operationContext;
+ this.previousServerConfig = previousServerConfig;
+
+ timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
+ new ServerOperationTimeout(operationContext.getLoggingContext()));
+ }
+
+ @Override
+ void onNewLeader(String newLeader) {
+ if (newLeader == null) {
+ return;
+ }
+
+ LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
+
+ timer.cancel();
+
+ if (raftActor.isLeader()) {
+ persistNewServerConfiguration(operationContext);
+ } else {
+ // Edge case - some other node became leader so forward the operation.
+ LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+
+ changeToIdleState();
+ RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
+ }
+ }
+
+ @Override
+ void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ LOG.warn("{}: Leader election timed out - cannot apply operation {}",
+ raftContext.getId(), timeout.getLoggingContext());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+ raftActor.initializeBehavior();
+
+ tryToForwardOperationToAnotherServer();
+ }
+
+ private void tryToForwardOperationToAnotherServer() {
+ Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
+
+ LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
+ serversVisited);
+
+ serversVisited.add(raftContext.getId());
+
+ // Try to find another whose state is being changed from non-voting to voting and that we haven't
+ // tried yet.
+ Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
+ ActorSelection forwardToPeerActor = null;
+ for (Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
+ Boolean isVoting = e.getValue();
+ String serverId = e.getKey();
+ PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
+ if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
+ ActorSelection actor = raftContext.getPeerActorSelection(serverId);
+ if (actor != null) {
+ forwardToPeerActor = actor;
+ break;
+ }
+ }
+ }
+
+ if (forwardToPeerActor != null) {
+ LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
+
+ forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
+ operationContext.getClientRequestor());
+ changeToIdleState();
+ } else {
+ operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
+ }
+ }
+ }
+
+ static class ServerOperationTimeout {
+ private final String loggingContext;
+
+ ServerOperationTimeout(String loggingContext) {
+ this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
+ }
+
+ String getLoggingContext() {
+ return loggingContext;
+ }