+ private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
+ private final boolean tryToElectLeader;
+
+ ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
+ boolean tryToElectLeader) {
+ super(convertMessage, clientRequestor);
+ this.tryToElectLeader = tryToElectLeader;
+ }
+
+ @Override
+ InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+ return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
+ }
+
+ @Override
+ Object newReply(ServerChangeStatus status, String leaderId) {
+ return new ServerChangeReply(status, leaderId);
+ }
+
+ @Override
+ void operationComplete(final RaftActor raftActor, boolean succeeded) {
+ // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
+ // leadership.
+ boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
+ getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+ if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
+ raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
+ ensureFollowerState(raftActor);
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer failed after change to non-voting", raftActor.persistenceId());
+ ensureFollowerState(raftActor);
+ }
+
+ private void ensureFollowerState(RaftActor raftActor) {
+ // Whether or not leadership transfer succeeded, we have to step down as leader and
+ // switch to Follower so ensure that.
+ if(raftActor.getRaftState() != RaftState.Follower) {
+ raftActor.initializeBehavior();
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ String getLoggingContext() {
+ return getOperation().toString();
+ }
+ }
+
+ private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
+ private final ChangeServersVotingStatusContext changeVotingStatusContext;
+ private final boolean tryToElectLeader;
+
+ ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
+ boolean tryToElectLeader) {
+ this.changeVotingStatusContext = changeVotingStatusContext;
+ this.tryToElectLeader = tryToElectLeader;
+ }
+
+ @Override
+ public void initiate() {
+ LOG.debug("Initiating ChangeServersVotingStatusState");
+
+ if(tryToElectLeader) {
+ initiateLocalLeaderElection();
+ } else {
+ updateLocalPeerInfo();
+
+ persistNewServerConfiguration(changeVotingStatusContext);
+ }
+ }
+
+ private void initiateLocalLeaderElection() {
+ LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
+
+ ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
+ updateLocalPeerInfo();
+
+ raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
+
+ currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
+ }
+
+ private void updateLocalPeerInfo() {
+ List<ServerInfo> newServerInfoList = newServerInfoList();
+
+ raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
+ if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ leader.updateMinReplicaCount();
+ }
+ }
+
+ private List<ServerInfo> newServerInfoList() {
+ Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
+ List<ServerInfo> newServerInfoList = new ArrayList<>();
+ for(String peerId: raftContext.getPeerIds()) {
+ newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
+ serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
+ }
+
+ newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
+ raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
+
+ return newServerInfoList;
+ }
+ }
+
+ private class WaitingForLeaderElected extends OperationState {
+ private final ServerConfigurationPayload previousServerConfig;
+ private final ChangeServersVotingStatusContext operationContext;
+ private final Cancellable timer;
+
+ WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
+ ServerConfigurationPayload previousServerConfig) {
+ this.operationContext = operationContext;
+ this.previousServerConfig = previousServerConfig;
+
+ timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
+ new ServerOperationTimeout(operationContext.getLoggingContext()));
+ }
+
+ @Override
+ void onNewLeader(String newLeader) {
+ LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
+
+ timer.cancel();
+
+ if(raftActor.isLeader()) {
+ persistNewServerConfiguration(operationContext);
+ } else {
+ // Edge case - some other node became leader so forward the operation.
+ LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+
+ changeToIdleState();
+ RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
+ }
+ }
+
+ @Override
+ void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ LOG.warn("{}: Leader election timed out - cannot apply operation {}",
+ raftContext.getId(), timeout.getLoggingContext());
+
+ // Revert the local server config change.
+ raftContext.updatePeerIds(previousServerConfig);
+ raftActor.initializeBehavior();
+
+ tryToForwardOperationToAnotherServer();
+ }
+
+ private void tryToForwardOperationToAnotherServer() {
+ Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
+
+ LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
+ serversVisited);
+
+ serversVisited.add(raftContext.getId());
+
+ // Try to find another whose state is being changed from non-voting to voting and that we haven't
+ // tried yet.
+ Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
+ ActorSelection forwardToPeerActor = null;
+ for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
+ Boolean isVoting = e.getValue();
+ String serverId = e.getKey();
+ PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
+ if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
+ ActorSelection actor = raftContext.getPeerActorSelection(serverId);
+ if(actor != null) {
+ forwardToPeerActor = actor;
+ break;
+ }
+ }
+ }
+
+ if(forwardToPeerActor != null) {
+ LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
+
+ forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
+ operationContext.getClientRequestor());
+ changeToIdleState();
+ } else {
+ operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
+ }
+ }
+ }
+