Remove DelegatingRaftActorBehavior
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.UUID;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
26 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
27 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
28 import org.opendaylight.controller.cluster.raft.messages.AddServer;
29 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
30 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
31 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
32 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
33 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
34 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
35 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
36 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.concurrent.duration.FiniteDuration;
41
42 /**
43  * Handles server configuration related messages for a RaftActor.
44  *
45  * @author Thomas Pantelis
46  */
47 class RaftActorServerConfigurationSupport {
48     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
49
50     private final OperationState IDLE = new Idle();
51
52     private final RaftActor raftActor;
53
54     private final RaftActorContext raftContext;
55
56     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
57
58     private OperationState currentOperationState = IDLE;
59
60     RaftActorServerConfigurationSupport(RaftActor raftActor) {
61         this.raftActor = raftActor;
62         this.raftContext = raftActor.getRaftActorContext();
63     }
64
65     boolean handleMessage(Object message, ActorRef sender) {
66         if(message instanceof AddServer) {
67             onAddServer((AddServer) message, sender);
68             return true;
69         } else if(message instanceof RemoveServer) {
70             onRemoveServer((RemoveServer) message, sender);
71             return true;
72         } else if(message instanceof ChangeServersVotingStatus) {
73             onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
74             return true;
75         } else if (message instanceof ServerOperationTimeout) {
76             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
77             return true;
78         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
79             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
80             return true;
81         } else if(message instanceof ApplyState) {
82             return onApplyState((ApplyState) message);
83         } else if(message instanceof SnapshotComplete) {
84             currentOperationState.onSnapshotComplete();
85             return false;
86         } else {
87             return false;
88         }
89     }
90
91     void onNewLeader(String leaderId) {
92         currentOperationState.onNewLeader(leaderId);
93     }
94
95     private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
96         LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
97                 currentOperationState);
98
99         // The following check is a special case. Normally we fail an operation if there's no leader.
100         // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
101         // the other a backup such that if the primary cluster is lost, the backup can take over. In this
102         // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
103         // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
104         // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
105         // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
106         // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
107         // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
108         // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
109         // no current leader, we will try to elect a leader using the new server config in order to replicate
110         // the change and progress.
111         boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
112                 getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
113         boolean hasNoLeader = raftActor.getLeaderId() == null;
114         if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
115             currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
116         } else {
117             onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
118         }
119     }
120
121     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
122         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
123         boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
124         if(isSelf && !raftContext.hasFollowers()) {
125             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
126                     raftActor.getSelf());
127         } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
128             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
129                     raftActor.getSelf());
130         } else {
131             String serverAddress = isSelf ? raftActor.self().path().toString() :
132                 raftContext.getPeerAddress(removeServer.getServerId());
133             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
134         }
135     }
136
137     private boolean onApplyState(ApplyState applyState) {
138         Payload data = applyState.getReplicatedLogEntry().getData();
139         if(data instanceof ServerConfigurationPayload) {
140             currentOperationState.onApplyState(applyState);
141             return true;
142         }
143
144         return false;
145     }
146
147     /**
148      * The algorithm for AddServer is as follows:
149      * <ul>
150      * <li>Add the new server as a peer.</li>
151      * <li>Add the new follower to the leader.</li>
152      * <li>If new server should be voting member</li>
153      * <ul>
154      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
155      *     <li>Initiate install snapshot to the new follower.</li>
156      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
157      * </ul>
158      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
159      * <li>On replication consensus, respond to caller with OK.</li>
160      * </ul>
161      * If the install snapshot times out after a period of 2 * election time out
162      * <ul>
163      *     <li>Remove the new server as a peer.</li>
164      *     <li>Remove the new follower from the leader.</li>
165      *     <li>Respond to caller with TIMEOUT.</li>
166      * </ul>
167      */
168     private void onAddServer(AddServer addServer, ActorRef sender) {
169         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
170
171         onNewOperation(new AddServerContext(addServer, sender));
172     }
173
174     private void onNewOperation(ServerOperationContext<?> operationContext) {
175         if (raftActor.isLeader()) {
176             currentOperationState.onNewOperation(operationContext);
177         } else {
178             ActorSelection leader = raftActor.getLeader();
179             if (leader != null) {
180                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
181                 leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
182             } else {
183                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
184                 operationContext.getClientRequestor().tell(operationContext.newReply(
185                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
186             }
187         }
188     }
189
190     /**
191      * Interface for the initial state for a server operation.
192      */
193     private interface InitialOperationState {
194         void initiate();
195     }
196
197     /**
198      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
199      */
200     private abstract class OperationState {
201         void onNewOperation(ServerOperationContext<?> operationContext) {
202             // We're currently processing another operation so queue it to be processed later.
203
204             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
205                     operationContext.getOperation());
206
207             pendingOperationsQueue.add(operationContext);
208         }
209
210         void onServerOperationTimeout(ServerOperationTimeout timeout) {
211             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
212         }
213
214         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
215             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
216         }
217
218         void onApplyState(ApplyState applyState) {
219             LOG.debug("onApplyState was called in state {}", this);
220         }
221
222         void onSnapshotComplete() {
223
224         }
225
226         void onNewLeader(String newLeader) {
227         }
228
229         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
230             raftContext.setDynamicServerConfigurationInUse();
231
232             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
233                     operationContext.includeSelfInNewConfiguration(raftActor));
234             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
235
236             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
237
238             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
239                     operationContext.getLoggingContext())));
240
241             sendReply(operationContext, ServerChangeStatus.OK);
242         }
243
244         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
245             if(replyStatus != null) {
246                 sendReply(operationContext, replyStatus);
247             }
248
249             operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
250
251             changeToIdleState();
252         }
253
254         protected void changeToIdleState() {
255             currentOperationState = IDLE;
256
257             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
258             if(nextOperation != null) {
259                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
260             }
261         }
262
263         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
264             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
265
266             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
267                     raftActor.self());
268         }
269
270         Cancellable newTimer(Object message) {
271             return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
272         }
273
274         Cancellable newTimer(FiniteDuration timeout, Object message) {
275             return raftContext.getActorSystem().scheduler().scheduleOnce(
276                     timeout, raftContext.getActor(), message,
277                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
278         }
279
280         @Override
281         public String toString() {
282             return getClass().getSimpleName();
283         }
284     }
285
286     /**
287      * The state when no server operation is in progress. It immediately initiates new server operations.
288      */
289     private final class Idle extends OperationState {
290         @Override
291         public void onNewOperation(ServerOperationContext<?> operationContext) {
292             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
293         }
294
295         @Override
296         public void onApplyState(ApplyState applyState) {
297             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
298         }
299     }
300
301     /**
302      * The state when a new server configuration is being persisted and replicated.
303      */
304     private final class Persisting extends OperationState {
305         private final ServerOperationContext<?> operationContext;
306         private final Cancellable timer;
307         private boolean timedOut = false;
308
309         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
310             this.operationContext = operationContext;
311             this.timer = timer;
312         }
313
314         @Override
315         public void onApplyState(ApplyState applyState) {
316             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
317             // sure it's meant for us.
318             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
319                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
320                         applyState.getReplicatedLogEntry().getData());
321
322                 timer.cancel();
323                 operationComplete(operationContext, null);
324             }
325         }
326
327         @Override
328         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
329             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
330                     timeout.getLoggingContext());
331
332             timedOut = true;
333
334             // Fail any pending operations
335             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
336             while(nextOperation != null) {
337                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
338                 nextOperation = pendingOperationsQueue.poll();
339             }
340         }
341
342         @Override
343         public void onNewOperation(ServerOperationContext<?> operationContext) {
344             if(timedOut) {
345                 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
346             } else {
347                 super.onNewOperation(operationContext);
348             }
349         }
350     }
351
352     /**
353      * Abstract base class for an AddServer operation state.
354      */
355     private abstract class AddServerState extends OperationState {
356         private final AddServerContext addServerContext;
357
358         AddServerState(AddServerContext addServerContext) {
359             this.addServerContext = addServerContext;
360         }
361
362         AddServerContext getAddServerContext() {
363             return addServerContext;
364         }
365
366         Cancellable newInstallSnapshotTimer() {
367             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
368         }
369
370         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
371             String serverId = timeout.getLoggingContext();
372
373             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
374
375             // cleanup
376             raftContext.removePeer(serverId);
377
378             boolean isLeader = raftActor.isLeader();
379             if(isLeader) {
380                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
381                 leader.removeFollower(serverId);
382             }
383
384             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
385         }
386
387     }
388
389     /**
390      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
391      * snapshot capture, if necessary.
392      */
393     private final class InitialAddServerState extends AddServerState implements InitialOperationState {
394         InitialAddServerState(AddServerContext addServerContext) {
395             super(addServerContext);
396         }
397
398         @Override
399         public void initiate() {
400             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
401             AddServer addServer = getAddServerContext().getOperation();
402
403             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
404
405             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
406                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
407                 return;
408             }
409
410             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
411                     VotingState.NON_VOTING;
412             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
413
414             leader.addFollower(addServer.getNewServerId());
415
416             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
417                 // schedule the install snapshot timeout timer
418                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
419                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
420                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
421                             addServer.getNewServerId());
422
423                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
424                 } else {
425                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
426
427                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
428                             installSnapshotTimer);
429                 }
430             } else {
431                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
432                         raftContext.getId());
433
434                 persistNewServerConfiguration(getAddServerContext());
435             }
436         }
437     }
438
439     /**
440      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
441      * reply or timeout.
442      */
443     private final class InstallingSnapshot extends AddServerState {
444         private final Cancellable installSnapshotTimer;
445
446         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
447             super(addServerContext);
448             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
449         }
450
451         @Override
452         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
453             handleInstallSnapshotTimeout(timeout);
454
455             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
456                     timeout.getLoggingContext());
457         }
458
459         @Override
460         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
461             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
462
463             String followerId = reply.getFollowerId();
464
465             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
466             // add server operation that timed out.
467             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
468                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
469                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
470                 leader.updateMinReplicaCount();
471
472                 persistNewServerConfiguration(getAddServerContext());
473
474                 installSnapshotTimer.cancel();
475             } else {
476                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
477                         raftContext.getId(), followerId,
478                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
479             }
480         }
481     }
482
483     /**
484      * The AddServer operation state for when there is a snapshot already in progress. When the current
485      * snapshot completes, it initiates an install snapshot.
486      */
487     private final class WaitingForPriorSnapshotComplete extends AddServerState {
488         private final Cancellable snapshotTimer;
489
490         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
491             super(addServerContext);
492             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
493         }
494
495         @Override
496         public void onSnapshotComplete() {
497             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
498
499             if(!raftActor.isLeader()) {
500                 LOG.debug("{}: No longer the leader", raftContext.getId());
501                 return;
502             }
503
504             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
505             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
506                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
507                         getAddServerContext().getOperation().getNewServerId());
508
509                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
510                         newInstallSnapshotTimer());
511
512                 snapshotTimer.cancel();
513             }
514         }
515
516         @Override
517         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
518             handleInstallSnapshotTimeout(timeout);
519
520             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
521                     raftContext.getId(), timeout.getLoggingContext());
522         }
523     }
524
525     /**
526      * Stores context information for a server operation.
527      *
528      * @param <T> the operation type
529      */
530     private static abstract class ServerOperationContext<T> {
531         private final T operation;
532         private final ActorRef clientRequestor;
533         private final String contextId;
534
535         ServerOperationContext(T operation, ActorRef clientRequestor){
536             this.operation = operation;
537             this.clientRequestor = clientRequestor;
538             contextId = UUID.randomUUID().toString();
539         }
540
541         String getContextId() {
542             return contextId;
543         }
544
545         T getOperation() {
546             return operation;
547         }
548
549         ActorRef getClientRequestor() {
550             return clientRequestor;
551         }
552
553         void operationComplete(RaftActor raftActor, boolean succeeded) {
554         }
555
556         boolean includeSelfInNewConfiguration(RaftActor raftActor) {
557             return true;
558         }
559
560         abstract Object newReply(ServerChangeStatus status, String leaderId);
561
562         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
563
564         abstract String getLoggingContext();
565     }
566
567     /**
568      * Stores context information for an AddServer operation.
569      */
570     private static class AddServerContext extends ServerOperationContext<AddServer> {
571         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
572             super(addServer, clientRequestor);
573         }
574
575         @Override
576         Object newReply(ServerChangeStatus status, String leaderId) {
577             return new AddServerReply(status, leaderId);
578         }
579
580         @Override
581         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
582             return support.new InitialAddServerState(this);
583         }
584
585         @Override
586         String getLoggingContext() {
587             return getOperation().getNewServerId();
588         }
589     }
590
591     private abstract class RemoveServerState extends OperationState {
592         private final RemoveServerContext removeServerContext;
593
594         protected RemoveServerState(RemoveServerContext removeServerContext) {
595             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
596
597         }
598
599         public RemoveServerContext getRemoveServerContext() {
600             return removeServerContext;
601         }
602     }
603
604     private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
605
606         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
607             super(removeServerContext);
608         }
609
610         @Override
611         public void initiate() {
612             String serverId = getRemoveServerContext().getOperation().getServerId();
613             raftContext.removePeer(serverId);
614             ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
615
616             persistNewServerConfiguration(getRemoveServerContext());
617         }
618     }
619
620     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
621         private final String peerAddress;
622
623         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
624             super(operation, clientRequestor);
625             this.peerAddress = peerAddress;
626         }
627
628         @Override
629         Object newReply(ServerChangeStatus status, String leaderId) {
630             return new RemoveServerReply(status, leaderId);
631         }
632
633         @Override
634         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
635             return support.new InitialRemoveServerState(this);
636         }
637
638         @Override
639         void operationComplete(RaftActor raftActor, boolean succeeded) {
640             if(peerAddress != null) {
641                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
642             }
643         }
644
645         @Override
646         boolean includeSelfInNewConfiguration(RaftActor raftActor) {
647             return !getOperation().getServerId().equals(raftActor.getId());
648         }
649
650         @Override
651         String getLoggingContext() {
652             return getOperation().getServerId();
653         }
654     }
655
656     private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
657         private final boolean tryToElectLeader;
658
659         ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
660                 boolean tryToElectLeader) {
661             super(convertMessage, clientRequestor);
662             this.tryToElectLeader = tryToElectLeader;
663         }
664
665         @Override
666         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
667             return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
668         }
669
670         @Override
671         Object newReply(ServerChangeStatus status, String leaderId) {
672             return new ServerChangeReply(status, leaderId);
673         }
674
675         @Override
676         void operationComplete(final RaftActor raftActor, boolean succeeded) {
677             // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
678             // leadership.
679             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
680                     getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
681             if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
682                 raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
683                     @Override
684                     public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
685                         LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
686                         ensureFollowerState(raftActor);
687                     }
688
689                     @Override
690                     public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
691                         LOG.debug("{}: leader transfer failed after change to non-voting", raftActor.persistenceId());
692                         ensureFollowerState(raftActor);
693                     }
694
695                     private void ensureFollowerState(RaftActor raftActor) {
696                         // Whether or not leadership transfer succeeded, we have to step down as leader and
697                         // switch to Follower so ensure that.
698                         if(raftActor.getRaftState() != RaftState.Follower) {
699                             raftActor.initializeBehavior();
700                         }
701                     }
702                 });
703             }
704         }
705
706         @Override
707         String getLoggingContext() {
708             return getOperation().toString();
709         }
710     }
711
712     private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
713         private final ChangeServersVotingStatusContext changeVotingStatusContext;
714         private final boolean tryToElectLeader;
715
716         ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
717                 boolean tryToElectLeader) {
718             this.changeVotingStatusContext = changeVotingStatusContext;
719             this.tryToElectLeader = tryToElectLeader;
720         }
721
722         @Override
723         public void initiate() {
724             LOG.debug("Initiating ChangeServersVotingStatusState");
725
726             if(tryToElectLeader) {
727                 initiateLocalLeaderElection();
728             } else {
729                 updateLocalPeerInfo();
730
731                 persistNewServerConfiguration(changeVotingStatusContext);
732             }
733         }
734
735         private void initiateLocalLeaderElection() {
736             LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
737
738             ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
739             updateLocalPeerInfo();
740
741             raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
742
743             currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
744         }
745
746         private void updateLocalPeerInfo() {
747             List<ServerInfo> newServerInfoList = newServerInfoList();
748
749             raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
750             if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
751                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
752                 leader.updateMinReplicaCount();
753             }
754         }
755
756         private List<ServerInfo> newServerInfoList() {
757             Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
758             List<ServerInfo> newServerInfoList = new ArrayList<>();
759             for(String peerId: raftContext.getPeerIds()) {
760                 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
761                         serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
762             }
763
764             newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
765                     raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
766
767             return newServerInfoList;
768         }
769     }
770
771     private class WaitingForLeaderElected extends OperationState {
772         private final ServerConfigurationPayload previousServerConfig;
773         private final ChangeServersVotingStatusContext operationContext;
774         private final Cancellable timer;
775
776         WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
777                 ServerConfigurationPayload previousServerConfig) {
778             this.operationContext = operationContext;
779             this.previousServerConfig = previousServerConfig;
780
781             timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
782                     new ServerOperationTimeout(operationContext.getLoggingContext()));
783         }
784
785         @Override
786         void onNewLeader(String newLeader) {
787             LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
788
789             timer.cancel();
790
791             if(raftActor.isLeader()) {
792                 persistNewServerConfiguration(operationContext);
793             } else {
794                 // Edge case - some other node became leader so forward the operation.
795                 LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
796
797                 // Revert the local server config change.
798                 raftContext.updatePeerIds(previousServerConfig);
799
800                 changeToIdleState();
801                 RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
802             }
803         }
804
805         @Override
806         void onServerOperationTimeout(ServerOperationTimeout timeout) {
807             LOG.warn("{}: Leader election timed out - cannot apply operation {}",
808                     raftContext.getId(), timeout.getLoggingContext());
809
810             // Revert the local server config change.
811             raftContext.updatePeerIds(previousServerConfig);
812             raftActor.initializeBehavior();
813
814             tryToForwardOperationToAnotherServer();
815         }
816
817         private void tryToForwardOperationToAnotherServer() {
818             Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
819
820             LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
821                     serversVisited);
822
823             serversVisited.add(raftContext.getId());
824
825             // Try to find another whose state is being changed from non-voting to voting and that we haven't
826             // tried yet.
827             Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
828             ActorSelection forwardToPeerActor = null;
829             for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
830                 Boolean isVoting = e.getValue();
831                 String serverId = e.getKey();
832                 PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
833                 if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
834                     ActorSelection actor = raftContext.getPeerActorSelection(serverId);
835                     if(actor != null) {
836                         forwardToPeerActor = actor;
837                         break;
838                     }
839                 }
840             }
841
842             if(forwardToPeerActor != null) {
843                 LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
844
845                 forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
846                         operationContext.getClientRequestor());
847                 changeToIdleState();
848             } else {
849                 operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
850             }
851         }
852     }
853
854     static class ServerOperationTimeout {
855         private final String loggingContext;
856
857         ServerOperationTimeout(String loggingContext){
858            this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
859         }
860
861         String getLoggingContext() {
862             return loggingContext;
863         }
864     }
865 }