Fix warnings and javadocs in sal-akka-raft
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.UUID;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
24 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
25 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
26 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
27 import org.opendaylight.controller.cluster.raft.messages.AddServer;
28 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
29 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
30 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
31 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
32 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
33 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
34 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
35 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
39 import org.opendaylight.yangtools.concepts.Identifier;
40 import org.opendaylight.yangtools.util.AbstractUUIDIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.concurrent.duration.FiniteDuration;
44
45 /**
46  * Handles server configuration related messages for a RaftActor.
47  *
48  * @author Thomas Pantelis
49  */
50 class RaftActorServerConfigurationSupport {
51     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
52
53     private final OperationState IDLE = new Idle();
54
55     private final RaftActor raftActor;
56
57     private final RaftActorContext raftContext;
58
59     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
60
61     private OperationState currentOperationState = IDLE;
62
/**
 * Creates the server configuration support for an actor.
 *
 * @param raftActor the owning actor; its RaftActorContext is captured once here
 */
RaftActorServerConfigurationSupport(RaftActor raftActor) {
    raftContext = raftActor.getRaftActorContext();
    this.raftActor = raftActor;
}
67
68     boolean handleMessage(Object message, ActorRef sender) {
69         if(message instanceof AddServer) {
70             onAddServer((AddServer) message, sender);
71             return true;
72         } else if(message instanceof RemoveServer) {
73             onRemoveServer((RemoveServer) message, sender);
74             return true;
75         } else if(message instanceof ChangeServersVotingStatus) {
76             onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
77             return true;
78         } else if (message instanceof ServerOperationTimeout) {
79             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
80             return true;
81         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
82             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
83             return true;
84         } else if(message instanceof ApplyState) {
85             return onApplyState((ApplyState) message);
86         } else if(message instanceof SnapshotComplete) {
87             currentOperationState.onSnapshotComplete();
88             return false;
89         } else {
90             return false;
91         }
92     }
93
94     void onNewLeader(String leaderId) {
95         currentOperationState.onNewLeader(leaderId);
96     }
97
98     private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
99         LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
100                 currentOperationState);
101
102         // The following check is a special case. Normally we fail an operation if there's no leader.
103         // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
104         // the other a backup such that if the primary cluster is lost, the backup can take over. In this
105         // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
106         // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
107         // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
108         // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
109         // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
110         // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
111         // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
112         // no current leader, we will try to elect a leader using the new server config in order to replicate
113         // the change and progress.
114         boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
115                 getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
116         boolean hasNoLeader = raftActor.getLeaderId() == null;
117         if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
118             currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
119         } else {
120             onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
121         }
122     }
123
124     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
125         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
126         boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
127         if(isSelf && !raftContext.hasFollowers()) {
128             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
129                     raftActor.getSelf());
130         } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
131             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
132                     raftActor.getSelf());
133         } else {
134             String serverAddress = isSelf ? raftActor.self().path().toString() :
135                 raftContext.getPeerAddress(removeServer.getServerId());
136             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
137         }
138     }
139
140     private boolean onApplyState(ApplyState applyState) {
141         Payload data = applyState.getReplicatedLogEntry().getData();
142         if(data instanceof ServerConfigurationPayload) {
143             currentOperationState.onApplyState(applyState);
144             return true;
145         }
146
147         return false;
148     }
149
150     /**
151      * The algorithm for AddServer is as follows:
152      * <ul>
153      * <li>Add the new server as a peer.</li>
154      * <li>Add the new follower to the leader.</li>
155      * <li>If new server should be voting member</li>
156      * <ul>
157      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
158      *     <li>Initiate install snapshot to the new follower.</li>
159      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
160      * </ul>
161      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
162      * <li>On replication consensus, respond to caller with OK.</li>
163      * </ul>
164      * If the install snapshot times out after a period of 2 * election time out
165      * <ul>
166      *     <li>Remove the new server as a peer.</li>
167      *     <li>Remove the new follower from the leader.</li>
168      *     <li>Respond to caller with TIMEOUT.</li>
169      * </ul>
170      */
171     private void onAddServer(AddServer addServer, ActorRef sender) {
172         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
173
174         onNewOperation(new AddServerContext(addServer, sender));
175     }
176
177     private void onNewOperation(ServerOperationContext<?> operationContext) {
178         if (raftActor.isLeader()) {
179             currentOperationState.onNewOperation(operationContext);
180         } else {
181             ActorSelection leader = raftActor.getLeader();
182             if (leader != null) {
183                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
184                 leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
185             } else {
186                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
187                 operationContext.getClientRequestor().tell(operationContext.newReply(
188                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
189             }
190         }
191     }
192
193     /**
194      * Interface for the initial state for a server operation.
195      */
196     private interface InitialOperationState {
197         void initiate();
198     }
199
200     /**
201      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
202      */
203     private abstract class OperationState {
204         void onNewOperation(ServerOperationContext<?> operationContext) {
205             // We're currently processing another operation so queue it to be processed later.
206
207             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
208                     operationContext.getOperation());
209
210             pendingOperationsQueue.add(operationContext);
211         }
212
213         void onServerOperationTimeout(ServerOperationTimeout timeout) {
214             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
215         }
216
217         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
218             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
219         }
220
221         void onApplyState(ApplyState applyState) {
222             LOG.debug("onApplyState was called in state {}", this);
223         }
224
225         void onSnapshotComplete() {
226
227         }
228
229         void onNewLeader(String newLeader) {
230         }
231
232         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
233             raftContext.setDynamicServerConfigurationInUse();
234
235             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
236                     operationContext.includeSelfInNewConfiguration(raftActor));
237             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
238
239             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
240
241             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
242                     operationContext.getLoggingContext())));
243
244             sendReply(operationContext, ServerChangeStatus.OK);
245         }
246
247         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
248             if(replyStatus != null) {
249                 sendReply(operationContext, replyStatus);
250             }
251
252             operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
253
254             changeToIdleState();
255         }
256
257         protected void changeToIdleState() {
258             currentOperationState = IDLE;
259
260             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
261             if(nextOperation != null) {
262                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
263             }
264         }
265
266         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
267             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
268
269             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
270                     raftActor.self());
271         }
272
273         Cancellable newTimer(Object message) {
274             return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
275         }
276
277         Cancellable newTimer(FiniteDuration timeout, Object message) {
278             return raftContext.getActorSystem().scheduler().scheduleOnce(
279                     timeout, raftContext.getActor(), message,
280                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
281         }
282
283         @Override
284         public String toString() {
285             return getClass().getSimpleName();
286         }
287     }
288
289     /**
290      * The state when no server operation is in progress. It immediately initiates new server operations.
291      */
292     private final class Idle extends OperationState {
293         @Override
294         public void onNewOperation(ServerOperationContext<?> operationContext) {
295             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
296         }
297
298         @Override
299         public void onApplyState(ApplyState applyState) {
300             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
301         }
302     }
303
304     /**
305      * The state when a new server configuration is being persisted and replicated.
306      */
307     private final class Persisting extends OperationState {
308         private final ServerOperationContext<?> operationContext;
309         private final Cancellable timer;
310         private boolean timedOut = false;
311
312         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
313             this.operationContext = operationContext;
314             this.timer = timer;
315         }
316
317         @Override
318         public void onApplyState(ApplyState applyState) {
319             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
320             // sure it's meant for us.
321             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
322                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
323                         applyState.getReplicatedLogEntry().getData());
324
325                 timer.cancel();
326                 operationComplete(operationContext, null);
327             }
328         }
329
330         @Override
331         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
332             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
333                     timeout.getLoggingContext());
334
335             timedOut = true;
336
337             // Fail any pending operations
338             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
339             while(nextOperation != null) {
340                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
341                 nextOperation = pendingOperationsQueue.poll();
342             }
343         }
344
345         @Override
346         public void onNewOperation(ServerOperationContext<?> newOperationContext) {
347             if(timedOut) {
348                 sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
349             } else {
350                 super.onNewOperation(newOperationContext);
351             }
352         }
353     }
354
355     /**
356      * Abstract base class for an AddServer operation state.
357      */
358     private abstract class AddServerState extends OperationState {
359         private final AddServerContext addServerContext;
360
361         AddServerState(AddServerContext addServerContext) {
362             this.addServerContext = addServerContext;
363         }
364
365         AddServerContext getAddServerContext() {
366             return addServerContext;
367         }
368
369         Cancellable newInstallSnapshotTimer() {
370             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
371         }
372
373         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
374             String serverId = timeout.getLoggingContext();
375
376             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
377
378             // cleanup
379             raftContext.removePeer(serverId);
380
381             boolean isLeader = raftActor.isLeader();
382             if(isLeader) {
383                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
384                 leader.removeFollower(serverId);
385             }
386
387             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
388         }
389
390     }
391
392     /**
393      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
394      * snapshot capture, if necessary.
395      */
396     private final class InitialAddServerState extends AddServerState implements InitialOperationState {
397         InitialAddServerState(AddServerContext addServerContext) {
398             super(addServerContext);
399         }
400
401         @Override
402         public void initiate() {
403             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
404             AddServer addServer = getAddServerContext().getOperation();
405
406             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
407
408             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
409                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
410                 return;
411             }
412
413             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
414                     VotingState.NON_VOTING;
415             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
416
417             leader.addFollower(addServer.getNewServerId());
418
419             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
420                 // schedule the install snapshot timeout timer
421                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
422                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
423                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
424                             addServer.getNewServerId());
425
426                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
427                 } else {
428                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
429
430                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
431                             installSnapshotTimer);
432                 }
433             } else {
434                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
435                         raftContext.getId());
436
437                 persistNewServerConfiguration(getAddServerContext());
438             }
439         }
440     }
441
442     /**
443      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
444      * reply or timeout.
445      */
446     private final class InstallingSnapshot extends AddServerState {
447         private final Cancellable installSnapshotTimer;
448
449         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
450             super(addServerContext);
451             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
452         }
453
454         @Override
455         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
456             handleInstallSnapshotTimeout(timeout);
457
458             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
459                     timeout.getLoggingContext());
460         }
461
462         @Override
463         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
464             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
465
466             String followerId = reply.getFollowerId();
467
468             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
469             // add server operation that timed out.
470             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
471                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
472                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
473                 leader.updateMinReplicaCount();
474
475                 persistNewServerConfiguration(getAddServerContext());
476
477                 installSnapshotTimer.cancel();
478             } else {
479                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
480                         raftContext.getId(), followerId,
481                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
482             }
483         }
484     }
485
486     /**
487      * The AddServer operation state for when there is a snapshot already in progress. When the current
488      * snapshot completes, it initiates an install snapshot.
489      */
490     private final class WaitingForPriorSnapshotComplete extends AddServerState {
491         private final Cancellable snapshotTimer;
492
493         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
494             super(addServerContext);
495             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
496         }
497
498         @Override
499         public void onSnapshotComplete() {
500             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
501
502             if(!raftActor.isLeader()) {
503                 LOG.debug("{}: No longer the leader", raftContext.getId());
504                 return;
505             }
506
507             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
508             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
509                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
510                         getAddServerContext().getOperation().getNewServerId());
511
512                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
513                         newInstallSnapshotTimer());
514
515                 snapshotTimer.cancel();
516             }
517         }
518
519         @Override
520         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
521             handleInstallSnapshotTimeout(timeout);
522
523             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
524                     raftContext.getId(), timeout.getLoggingContext());
525         }
526     }
527
528     private static final class ServerOperationContextIdentifier extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
529         private static final long serialVersionUID = 1L;
530
531         ServerOperationContextIdentifier() {
532             super(UUID.randomUUID());
533         }
534     }
535
536     /**
537      * Stores context information for a server operation.
538      *
539      * @param <T> the operation type
540      */
541     private abstract static class ServerOperationContext<T> {
542         private final T operation;
543         private final ActorRef clientRequestor;
544         private final Identifier contextId;
545
546         ServerOperationContext(T operation, ActorRef clientRequestor){
547             this.operation = operation;
548             this.clientRequestor = clientRequestor;
549             contextId = new ServerOperationContextIdentifier();
550         }
551
552         Identifier getContextId() {
553             return contextId;
554         }
555
556         T getOperation() {
557             return operation;
558         }
559
560         ActorRef getClientRequestor() {
561             return clientRequestor;
562         }
563
564         void operationComplete(RaftActor raftActor, boolean succeeded) {
565         }
566
567         boolean includeSelfInNewConfiguration(RaftActor raftActor) {
568             return true;
569         }
570
571         abstract Object newReply(ServerChangeStatus status, String leaderId);
572
573         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
574
575         abstract String getLoggingContext();
576     }
577
578     /**
579      * Stores context information for an AddServer operation.
580      */
581     private static class AddServerContext extends ServerOperationContext<AddServer> {
582         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
583             super(addServer, clientRequestor);
584         }
585
586         @Override
587         Object newReply(ServerChangeStatus status, String leaderId) {
588             return new AddServerReply(status, leaderId);
589         }
590
591         @Override
592         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
593             return support.new InitialAddServerState(this);
594         }
595
596         @Override
597         String getLoggingContext() {
598             return getOperation().getNewServerId();
599         }
600     }
601
602     private abstract class RemoveServerState extends OperationState {
603         private final RemoveServerContext removeServerContext;
604
605         protected RemoveServerState(RemoveServerContext removeServerContext) {
606             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
607
608         }
609
610         public RemoveServerContext getRemoveServerContext() {
611             return removeServerContext;
612         }
613     }
614
615     private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
616
617         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
618             super(removeServerContext);
619         }
620
621         @Override
622         public void initiate() {
623             String serverId = getRemoveServerContext().getOperation().getServerId();
624             raftContext.removePeer(serverId);
625             ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
626
627             persistNewServerConfiguration(getRemoveServerContext());
628         }
629     }
630
631     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
632         private final String peerAddress;
633
634         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
635             super(operation, clientRequestor);
636             this.peerAddress = peerAddress;
637         }
638
639         @Override
640         Object newReply(ServerChangeStatus status, String leaderId) {
641             return new RemoveServerReply(status, leaderId);
642         }
643
644         @Override
645         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
646             return support.new InitialRemoveServerState(this);
647         }
648
649         @Override
650         void operationComplete(RaftActor raftActor, boolean succeeded) {
651             if(peerAddress != null) {
652                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
653             }
654         }
655
656         @Override
657         boolean includeSelfInNewConfiguration(RaftActor raftActor) {
658             return !getOperation().getServerId().equals(raftActor.getId());
659         }
660
661         @Override
662         String getLoggingContext() {
663             return getOperation().getServerId();
664         }
665     }
666
667     private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
668         private final boolean tryToElectLeader;
669
670         ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
671                 boolean tryToElectLeader) {
672             super(convertMessage, clientRequestor);
673             this.tryToElectLeader = tryToElectLeader;
674         }
675
676         @Override
677         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
678             return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
679         }
680
681         @Override
682         Object newReply(ServerChangeStatus status, String leaderId) {
683             return new ServerChangeReply(status, leaderId);
684         }
685
686         @Override
687         void operationComplete(final RaftActor raftActor, boolean succeeded) {
688             // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
689             // leadership.
690             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
691                     getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
692             if (succeeded && localServerChangedToNonVoting) {
693                 LOG.debug("Leader changed to non-voting - trying leadership transfer");
694                 raftActor.becomeNonVoting();
695             }
696         }
697
698         @Override
699         String getLoggingContext() {
700             return getOperation().toString();
701         }
702     }
703
704     private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
705         private final ChangeServersVotingStatusContext changeVotingStatusContext;
706         private final boolean tryToElectLeader;
707
708         ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
709                 boolean tryToElectLeader) {
710             this.changeVotingStatusContext = changeVotingStatusContext;
711             this.tryToElectLeader = tryToElectLeader;
712         }
713
714         @Override
715         public void initiate() {
716             LOG.debug("Initiating ChangeServersVotingStatusState");
717
718             if(tryToElectLeader) {
719                 initiateLocalLeaderElection();
720             } else if(updateLocalPeerInfo()) {
721                 persistNewServerConfiguration(changeVotingStatusContext);
722             }
723         }
724
725         private void initiateLocalLeaderElection() {
726             LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
727
728             ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
729             if(!updateLocalPeerInfo()) {
730                 return;
731             }
732
733             raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
734
735             currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
736         }
737
738         private boolean updateLocalPeerInfo() {
739             List<ServerInfo> newServerInfoList = newServerInfoList();
740
741             // Check if new voting state would leave us with no voting members.
742             boolean atLeastOneVoting = false;
743             for(ServerInfo info: newServerInfoList) {
744                 if(info.isVoting()) {
745                     atLeastOneVoting = true;
746                     break;
747                 }
748             }
749
750             if(!atLeastOneVoting) {
751                 operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
752                 return false;
753             }
754
755             raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
756             if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
757                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
758                 leader.updateMinReplicaCount();
759             }
760
761             return true;
762         }
763
764         private List<ServerInfo> newServerInfoList() {
765             Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
766             List<ServerInfo> newServerInfoList = new ArrayList<>();
767             for(String peerId: raftContext.getPeerIds()) {
768                 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
769                         serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
770             }
771
772             newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
773                     raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
774
775             return newServerInfoList;
776         }
777     }
778
779     private class WaitingForLeaderElected extends OperationState {
780         private final ServerConfigurationPayload previousServerConfig;
781         private final ChangeServersVotingStatusContext operationContext;
782         private final Cancellable timer;
783
784         WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
785                 ServerConfigurationPayload previousServerConfig) {
786             this.operationContext = operationContext;
787             this.previousServerConfig = previousServerConfig;
788
789             timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
790                     new ServerOperationTimeout(operationContext.getLoggingContext()));
791         }
792
793         @Override
794         void onNewLeader(String newLeader) {
795             if(newLeader == null) {
796                 return;
797             }
798
799             LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
800
801             timer.cancel();
802
803             if(raftActor.isLeader()) {
804                 persistNewServerConfiguration(operationContext);
805             } else {
806                 // Edge case - some other node became leader so forward the operation.
807                 LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
808
809                 // Revert the local server config change.
810                 raftContext.updatePeerIds(previousServerConfig);
811
812                 changeToIdleState();
813                 RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
814             }
815         }
816
817         @Override
818         void onServerOperationTimeout(ServerOperationTimeout timeout) {
819             LOG.warn("{}: Leader election timed out - cannot apply operation {}",
820                     raftContext.getId(), timeout.getLoggingContext());
821
822             // Revert the local server config change.
823             raftContext.updatePeerIds(previousServerConfig);
824             raftActor.initializeBehavior();
825
826             tryToForwardOperationToAnotherServer();
827         }
828
829         private void tryToForwardOperationToAnotherServer() {
830             Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
831
832             LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
833                     serversVisited);
834
835             serversVisited.add(raftContext.getId());
836
837             // Try to find another whose state is being changed from non-voting to voting and that we haven't
838             // tried yet.
839             Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
840             ActorSelection forwardToPeerActor = null;
841             for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
842                 Boolean isVoting = e.getValue();
843                 String serverId = e.getKey();
844                 PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
845                 if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
846                     ActorSelection actor = raftContext.getPeerActorSelection(serverId);
847                     if(actor != null) {
848                         forwardToPeerActor = actor;
849                         break;
850                     }
851                 }
852             }
853
854             if(forwardToPeerActor != null) {
855                 LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
856
857                 forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
858                         operationContext.getClientRequestor());
859                 changeToIdleState();
860             } else {
861                 operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
862             }
863         }
864     }
865
866     static class ServerOperationTimeout {
867         private final String loggingContext;
868
869         ServerOperationTimeout(String loggingContext){
870            this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
871         }
872
873         String getLoggingContext() {
874             return loggingContext;
875         }
876     }
877 }