29641cb00e441d5dedc5e7cccccd69bb0b0e6d3a
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Cancellable;
15 import java.util.ArrayDeque;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.UUID;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
26 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
27 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
28 import org.opendaylight.controller.cluster.raft.messages.AddServer;
29 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
30 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
31 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
32 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
33 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
34 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
35 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
36 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
38 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
40 import org.opendaylight.yangtools.concepts.Identifier;
41 import org.opendaylight.yangtools.util.AbstractUUIDIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import scala.concurrent.duration.FiniteDuration;
45
46 /**
47  * Handles server configuration related messages for a RaftActor.
48  *
49  * @author Thomas Pantelis
50  */
51 class RaftActorServerConfigurationSupport {
52     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
53
    // The single "no operation in progress" state. It is an instance field rather than a static constant
    // because Idle is an inner (non-static) class of this support instance.
    @SuppressWarnings("checkstyle:MemberName")
    private final OperationState IDLE = new Idle();

    // The actor on whose behalf server configuration messages are handled.
    private final RaftActor raftActor;

    // The actor's context, obtained from raftActor in the constructor.
    private final RaftActorContext raftContext;

    // Operations that arrived while another operation was in progress, in arrival order.
    private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();

    // Current state of the server operation state machine; starts out idle.
    private OperationState currentOperationState = IDLE;
64
65     RaftActorServerConfigurationSupport(final RaftActor raftActor) {
66         this.raftActor = raftActor;
67         this.raftContext = raftActor.getRaftActorContext();
68     }
69
70     boolean handleMessage(final Object message, final ActorRef sender) {
71         if (message instanceof AddServer) {
72             onAddServer((AddServer) message, sender);
73             return true;
74         } else if (message instanceof RemoveServer) {
75             onRemoveServer((RemoveServer) message, sender);
76             return true;
77         } else if (message instanceof ChangeServersVotingStatus) {
78             onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
79             return true;
80         } else if (message instanceof ServerOperationTimeout) {
81             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
82             return true;
83         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
84             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
85             return true;
86         } else if (message instanceof ApplyState) {
87             return onApplyState((ApplyState) message);
88         } else if (message instanceof SnapshotComplete) {
89             currentOperationState.onSnapshotComplete();
90             return false;
91         } else {
92             return false;
93         }
94     }
95
96     void onNewLeader(final String leaderId) {
97         currentOperationState.onNewLeader(leaderId);
98     }
99
100     private void onChangeServersVotingStatus(final ChangeServersVotingStatus message, final ActorRef sender) {
101         LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
102                 currentOperationState);
103
104         // The following check is a special case. Normally we fail an operation if there's no leader.
105         // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
106         // the other a backup such that if the primary cluster is lost, the backup can take over. In this
107         // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
108         // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
109         // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
110         // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
111         // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
112         // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
113         // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
114         // no current leader, we will try to elect a leader using the new server config in order to replicate
115         // the change and progress.
116         boolean localServerChangingToVoting = Boolean.TRUE.equals(message
117                 .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
118         boolean hasNoLeader = raftActor.getLeaderId() == null;
119         if (localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
120             currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
121         } else {
122             onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
123         }
124     }
125
126     private void onRemoveServer(final RemoveServer removeServer, final ActorRef sender) {
127         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
128         boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
129         if (isSelf && !raftContext.hasFollowers()) {
130             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
131                     raftActor.getSelf());
132         } else if (!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
133             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
134                     raftActor.getSelf());
135         } else {
136             String serverAddress = isSelf ? raftActor.self().path().toString() :
137                 raftContext.getPeerAddress(removeServer.getServerId());
138             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
139         }
140     }
141
142     private boolean onApplyState(final ApplyState applyState) {
143         Payload data = applyState.getReplicatedLogEntry().getData();
144         if (data instanceof ServerConfigurationPayload) {
145             currentOperationState.onApplyState(applyState);
146             return true;
147         }
148
149         return false;
150     }
151
152     /**
153      * Add a server. The algorithm for AddServer is as follows:
154      * <ul>
155      * <li>Add the new server as a peer.</li>
156      * <li>Add the new follower to the leader.</li>
157      * <li>If new server should be voting member</li>
158      * <ul>
159      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
160      *     <li>Initiate install snapshot to the new follower.</li>
161      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
162      * </ul>
163      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
164      * <li>On replication consensus, respond to caller with OK.</li>
165      * </ul>
166      * If the install snapshot times out after a period of 2 * election time out
167      * <ul>
168      *     <li>Remove the new server as a peer.</li>
169      *     <li>Remove the new follower from the leader.</li>
170      *     <li>Respond to caller with TIMEOUT.</li>
171      * </ul>
172      */
173     private void onAddServer(final AddServer addServer, final ActorRef sender) {
174         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
175
176         onNewOperation(new AddServerContext(addServer, sender));
177     }
178
179     private void onNewOperation(final ServerOperationContext<?> operationContext) {
180         if (raftActor.isLeader()) {
181             currentOperationState.onNewOperation(operationContext);
182         } else {
183             ActorSelection leader = raftActor.getLeader();
184             if (leader != null) {
185                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
186                 leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
187             } else {
188                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
189                 operationContext.getClientRequestor().tell(operationContext.newReply(
190                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
191             }
192         }
193     }
194
    /**
     * Interface for the initial state for a server operation. Each operation context creates its own
     * implementation via {@code newInitialOperationState}, and the {@code Idle} state invokes
     * {@link #initiate()} on it to start the operation.
     */
    private interface InitialOperationState {
        /**
         * Starts the server operation this state was created for.
         */
        void initiate();
    }
201
202     /**
203      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
204      */
205     private abstract class OperationState {
206         void onNewOperation(final ServerOperationContext<?> operationContext) {
207             // We're currently processing another operation so queue it to be processed later.
208
209             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
210                     operationContext.getOperation());
211
212             pendingOperationsQueue.add(operationContext);
213         }
214
215         void onServerOperationTimeout(final ServerOperationTimeout timeout) {
216             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
217         }
218
219         void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) {
220             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
221         }
222
223         void onApplyState(final ApplyState applyState) {
224             LOG.debug("onApplyState was called in state {}", this);
225         }
226
227         void onSnapshotComplete() {
228
229         }
230
231         void onNewLeader(final String newLeader) {
232         }
233
234         protected void persistNewServerConfiguration(final ServerOperationContext<?> operationContext) {
235             raftContext.setDynamicServerConfigurationInUse();
236
237             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
238                     operationContext.includeSelfInNewConfiguration(raftActor));
239             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
240
241             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(),
242                     payload, false);
243
244             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
245                     operationContext.getLoggingContext())));
246
247             sendReply(operationContext, ServerChangeStatus.OK);
248         }
249
250         protected void operationComplete(final ServerOperationContext<?> operationContext,
251                 final @Nullable ServerChangeStatus replyStatus) {
252             if (replyStatus != null) {
253                 sendReply(operationContext, replyStatus);
254             }
255
256             operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
257
258             changeToIdleState();
259         }
260
261         protected void changeToIdleState() {
262             currentOperationState = IDLE;
263
264             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
265             if (nextOperation != null) {
266                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
267             }
268         }
269
270         protected void sendReply(final ServerOperationContext<?> operationContext, final ServerChangeStatus status) {
271             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status,
272                     operationContext.getOperation());
273
274             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
275                     raftActor.self());
276         }
277
278         Cancellable newTimer(final Object message) {
279             return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
280         }
281
282         Cancellable newTimer(final FiniteDuration timeout, final Object message) {
283             return raftContext.getActorSystem().scheduler().scheduleOnce(
284                     timeout, raftContext.getActor(), message,
285                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
286         }
287
288         @Override
289         public String toString() {
290             return getClass().getSimpleName();
291         }
292     }
293
294     /**
295      * The state when no server operation is in progress. It immediately initiates new server operations.
296      */
297     private final class Idle extends OperationState {
298         @Override
299         public void onNewOperation(final ServerOperationContext<?> operationContext) {
300             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
301         }
302
303         @Override
304         public void onApplyState(final ApplyState applyState) {
305             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
306         }
307     }
308
309     /**
310      * The state when a new server configuration is being persisted and replicated.
311      */
312     private final class Persisting extends OperationState {
313         private final ServerOperationContext<?> operationContext;
314         private final Cancellable timer;
315         private boolean timedOut = false;
316
317         Persisting(final ServerOperationContext<?> operationContext, final Cancellable timer) {
318             this.operationContext = operationContext;
319             this.timer = timer;
320         }
321
322         @Override
323         public void onApplyState(final ApplyState applyState) {
324             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
325             // sure it's meant for us.
326             if (operationContext.getContextId().equals(applyState.getIdentifier())) {
327                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
328                         applyState.getReplicatedLogEntry().getData());
329
330                 timer.cancel();
331                 operationComplete(operationContext, null);
332             }
333         }
334
335         @Override
336         public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
337             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
338                     timeout.getLoggingContext());
339
340             timedOut = true;
341
342             // Fail any pending operations
343             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
344             while (nextOperation != null) {
345                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
346                 nextOperation = pendingOperationsQueue.poll();
347             }
348         }
349
350         @Override
351         public void onNewOperation(final ServerOperationContext<?> newOperationContext) {
352             if (timedOut) {
353                 sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
354             } else {
355                 super.onNewOperation(newOperationContext);
356             }
357         }
358     }
359
360     /**
361      * Abstract base class for an AddServer operation state.
362      */
363     private abstract class AddServerState extends OperationState {
364         private final AddServerContext addServerContext;
365
366         AddServerState(final AddServerContext addServerContext) {
367             this.addServerContext = addServerContext;
368         }
369
370         AddServerContext getAddServerContext() {
371             return addServerContext;
372         }
373
374         Cancellable newInstallSnapshotTimer() {
375             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
376         }
377
378         void handleInstallSnapshotTimeout(final ServerOperationTimeout timeout) {
379             String serverId = timeout.getLoggingContext();
380
381             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
382
383             // cleanup
384             raftContext.removePeer(serverId);
385
386             boolean isLeader = raftActor.isLeader();
387             if (isLeader) {
388                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
389                 leader.removeFollower(serverId);
390             }
391
392             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT
393                     : ServerChangeStatus.NO_LEADER);
394         }
395
396     }
397
398     /**
399      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
400      * snapshot capture, if necessary.
401      */
402     private final class InitialAddServerState extends AddServerState implements InitialOperationState {
403         InitialAddServerState(final AddServerContext addServerContext) {
404             super(addServerContext);
405         }
406
407         @Override
408         public void initiate() {
409             final AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
410             AddServer addServer = getAddServerContext().getOperation();
411
412             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
413
414             if (raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
415                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
416                 return;
417             }
418
419             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
420                     VotingState.NON_VOTING;
421             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
422
423             leader.addFollower(addServer.getNewServerId());
424
425             if (votingState == VotingState.VOTING_NOT_INITIALIZED) {
426                 // schedule the install snapshot timeout timer
427                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
428                 if (leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
429                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
430                             addServer.getNewServerId());
431
432                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
433                 } else {
434                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
435
436                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
437                             installSnapshotTimer);
438                 }
439             } else {
440                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
441                         raftContext.getId());
442
443                 persistNewServerConfiguration(getAddServerContext());
444             }
445         }
446     }
447
448     /**
449      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
450      * reply or timeout.
451      */
452     private final class InstallingSnapshot extends AddServerState {
453         private final Cancellable installSnapshotTimer;
454
455         InstallingSnapshot(final AddServerContext addServerContext, final Cancellable installSnapshotTimer) {
456             super(addServerContext);
457             this.installSnapshotTimer = requireNonNull(installSnapshotTimer);
458         }
459
460         @Override
461         public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
462             handleInstallSnapshotTimeout(timeout);
463
464             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
465                     timeout.getLoggingContext());
466         }
467
468         @Override
469         public void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) {
470             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
471
472             String followerId = reply.getFollowerId();
473
474             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
475             // add server operation that timed out.
476             if (getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
477                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
478                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
479                 leader.updateMinReplicaCount();
480
481                 persistNewServerConfiguration(getAddServerContext());
482
483                 installSnapshotTimer.cancel();
484             } else {
485                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
486                         raftContext.getId(), followerId,
487                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
488             }
489         }
490     }
491
492     /**
493      * The AddServer operation state for when there is a snapshot already in progress. When the current
494      * snapshot completes, it initiates an install snapshot.
495      */
496     private final class WaitingForPriorSnapshotComplete extends AddServerState {
497         private final Cancellable snapshotTimer;
498
499         WaitingForPriorSnapshotComplete(final AddServerContext addServerContext, final Cancellable snapshotTimer) {
500             super(addServerContext);
501             this.snapshotTimer = requireNonNull(snapshotTimer);
502         }
503
504         @Override
505         public void onSnapshotComplete() {
506             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
507
508             if (!raftActor.isLeader()) {
509                 LOG.debug("{}: No longer the leader", raftContext.getId());
510                 return;
511             }
512
513             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
514             if (leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
515                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
516                         getAddServerContext().getOperation().getNewServerId());
517
518                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
519                         newInstallSnapshotTimer());
520
521                 snapshotTimer.cancel();
522             }
523         }
524
525         @Override
526         public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
527             handleInstallSnapshotTimeout(timeout);
528
529             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
530                     raftContext.getId(), timeout.getLoggingContext());
531         }
532     }
533
    /**
     * Identifier assigned to each server operation context. Every instance wraps a freshly generated
     * random UUID.
     */
    private static final class ServerOperationContextIdentifier
            extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
        private static final long serialVersionUID = 1L;

        ServerOperationContextIdentifier() {
            // A new random UUID per instance, so each operation context gets its own identifier.
            super(UUID.randomUUID());
        }
    }
542
543     /**
544      * Stores context information for a server operation.
545      *
546      * @param <T> the operation type
547      */
548     private abstract static class ServerOperationContext<T> {
549         private final T operation;
550         private final ActorRef clientRequestor;
551         private final Identifier contextId;
552
553         ServerOperationContext(final T operation, final ActorRef clientRequestor) {
554             this.operation = operation;
555             this.clientRequestor = clientRequestor;
556             contextId = new ServerOperationContextIdentifier();
557         }
558
559         Identifier getContextId() {
560             return contextId;
561         }
562
563         T getOperation() {
564             return operation;
565         }
566
567         ActorRef getClientRequestor() {
568             return clientRequestor;
569         }
570
571         void operationComplete(final RaftActor raftActor, final boolean succeeded) {
572         }
573
574         boolean includeSelfInNewConfiguration(final RaftActor raftActor) {
575             return true;
576         }
577
578         abstract Object newReply(ServerChangeStatus status, String leaderId);
579
580         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
581
582         abstract String getLoggingContext();
583     }
584
585     /**
586      * Stores context information for an AddServer operation.
587      */
588     private static class AddServerContext extends ServerOperationContext<AddServer> {
589         AddServerContext(final AddServer addServer, final ActorRef clientRequestor) {
590             super(addServer, clientRequestor);
591         }
592
593         @Override
594         Object newReply(final ServerChangeStatus status, final String leaderId) {
595             return new AddServerReply(status, leaderId);
596         }
597
598         @Override
599         InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
600             return support.new InitialAddServerState(this);
601         }
602
603         @Override
604         String getLoggingContext() {
605             return getOperation().getNewServerId();
606         }
607     }
608
609     private abstract class RemoveServerState extends OperationState {
610         private final RemoveServerContext removeServerContext;
611
612         protected RemoveServerState(final RemoveServerContext removeServerContext) {
613             this.removeServerContext = requireNonNull(removeServerContext);
614
615         }
616
617         public RemoveServerContext getRemoveServerContext() {
618             return removeServerContext;
619         }
620     }
621
622     private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState {
623
624         protected InitialRemoveServerState(final RemoveServerContext removeServerContext) {
625             super(removeServerContext);
626         }
627
628         @Override
629         public void initiate() {
630             String serverId = getRemoveServerContext().getOperation().getServerId();
631             raftContext.removePeer(serverId);
632             AbstractLeader leader = (AbstractLeader)raftActor.getCurrentBehavior();
633             leader.removeFollower(serverId);
634             leader.updateMinReplicaCount();
635
636             persistNewServerConfiguration(getRemoveServerContext());
637         }
638     }
639
640     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
641         private final String peerAddress;
642
643         RemoveServerContext(final RemoveServer operation, final String peerAddress, final ActorRef clientRequestor) {
644             super(operation, clientRequestor);
645             this.peerAddress = peerAddress;
646         }
647
648         @Override
649         Object newReply(final ServerChangeStatus status, final String leaderId) {
650             return new RemoveServerReply(status, leaderId);
651         }
652
653         @Override
654         InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
655             return support.new InitialRemoveServerState(this);
656         }
657
658         @Override
659         void operationComplete(final RaftActor raftActor, final boolean succeeded) {
660             if (peerAddress != null) {
661                 raftActor.context().actorSelection(peerAddress).tell(
662                         new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
663             }
664         }
665
666         @Override
667         boolean includeSelfInNewConfiguration(final RaftActor raftActor) {
668             return !getOperation().getServerId().equals(raftActor.getId());
669         }
670
671         @Override
672         String getLoggingContext() {
673             return getOperation().getServerId();
674         }
675     }
676
677     private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
678         private final boolean tryToElectLeader;
679
680         ChangeServersVotingStatusContext(final ChangeServersVotingStatus convertMessage, final ActorRef clientRequestor,
681                 final boolean tryToElectLeader) {
682             super(convertMessage, clientRequestor);
683             this.tryToElectLeader = tryToElectLeader;
684         }
685
686         @Override
687         InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
688             return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
689         }
690
691         @Override
692         Object newReply(final ServerChangeStatus status, final String leaderId) {
693             return new ServerChangeReply(status, leaderId);
694         }
695
696         @Override
697         void operationComplete(final RaftActor raftActor, final boolean succeeded) {
698             // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
699             // leadership.
700             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation()
701                     .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
702             if (succeeded && localServerChangedToNonVoting) {
703                 LOG.debug("Leader changed to non-voting - trying leadership transfer");
704                 raftActor.becomeNonVoting();
705             } else if (raftActor.isLeader()) {
706                 raftActor.onVotingStateChangeComplete();
707             }
708         }
709
710         @Override
711         String getLoggingContext() {
712             return getOperation().toString();
713         }
714     }
715
716     private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
717         private final ChangeServersVotingStatusContext changeVotingStatusContext;
718         private final boolean tryToElectLeader;
719
720         ChangeServersVotingStatusState(final ChangeServersVotingStatusContext changeVotingStatusContext,
721                 final boolean tryToElectLeader) {
722             this.changeVotingStatusContext = changeVotingStatusContext;
723             this.tryToElectLeader = tryToElectLeader;
724         }
725
726         @Override
727         public void initiate() {
728             LOG.debug("Initiating ChangeServersVotingStatusState");
729
730             if (tryToElectLeader) {
731                 initiateLocalLeaderElection();
732             } else if (updateLocalPeerInfo()) {
733                 persistNewServerConfiguration(changeVotingStatusContext);
734             }
735         }
736
737         private void initiateLocalLeaderElection() {
738             LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
739
740             ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
741             if (!updateLocalPeerInfo()) {
742                 return;
743             }
744
745             raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
746
747             currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
748         }
749
750         private boolean updateLocalPeerInfo() {
751             List<ServerInfo> newServerInfoList = newServerInfoList();
752
753             // Check if new voting state would leave us with no voting members.
754             boolean atLeastOneVoting = false;
755             for (ServerInfo info: newServerInfoList) {
756                 if (info.isVoting()) {
757                     atLeastOneVoting = true;
758                     break;
759                 }
760             }
761
762             if (!atLeastOneVoting) {
763                 operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
764                 return false;
765             }
766
767             raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
768             if (raftActor.getCurrentBehavior() instanceof AbstractLeader) {
769                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
770                 leader.updateMinReplicaCount();
771             }
772
773             return true;
774         }
775
776         private List<ServerInfo> newServerInfoList() {
777             Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation()
778                     .getServerVotingStatusMap();
779             List<ServerInfo> newServerInfoList = new ArrayList<>();
780             for (String peerId: raftContext.getPeerIds()) {
781                 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId)
782                         ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
783             }
784
785             newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
786                     raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId())
787                             : raftContext.isVotingMember()));
788
789             return newServerInfoList;
790         }
791     }
792
793     private class WaitingForLeaderElected extends OperationState {
794         private final ServerConfigurationPayload previousServerConfig;
795         private final ChangeServersVotingStatusContext operationContext;
796         private final Cancellable timer;
797
798         WaitingForLeaderElected(final ChangeServersVotingStatusContext operationContext,
799                 final ServerConfigurationPayload previousServerConfig) {
800             this.operationContext = operationContext;
801             this.previousServerConfig = previousServerConfig;
802
803             timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
804                     new ServerOperationTimeout(operationContext.getLoggingContext()));
805         }
806
807         @Override
808         void onNewLeader(final String newLeader) {
809             if (newLeader == null) {
810                 return;
811             }
812
813             LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
814
815             timer.cancel();
816
817             if (raftActor.isLeader()) {
818                 persistNewServerConfiguration(operationContext);
819             } else {
820                 // Edge case - some other node became leader so forward the operation.
821                 LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
822
823                 // Revert the local server config change.
824                 raftContext.updatePeerIds(previousServerConfig);
825
826                 changeToIdleState();
827                 RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
828             }
829         }
830
831         @Override
832         void onServerOperationTimeout(final ServerOperationTimeout timeout) {
833             LOG.warn("{}: Leader election timed out - cannot apply operation {}",
834                     raftContext.getId(), timeout.getLoggingContext());
835
836             // Revert the local server config change.
837             raftContext.updatePeerIds(previousServerConfig);
838             raftActor.initializeBehavior();
839
840             tryToForwardOperationToAnotherServer();
841         }
842
843         private void tryToForwardOperationToAnotherServer() {
844             Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
845
846             LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
847                     serversVisited);
848
849             serversVisited.add(raftContext.getId());
850
851             // Try to find another whose state is being changed from non-voting to voting and that we haven't
852             // tried yet.
853             Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
854             ActorSelection forwardToPeerActor = null;
855             for (Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
856                 Boolean isVoting = e.getValue();
857                 String serverId = e.getKey();
858                 PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
859                 if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
860                     ActorSelection actor = raftContext.getPeerActorSelection(serverId);
861                     if (actor != null) {
862                         forwardToPeerActor = actor;
863                         break;
864                     }
865                 }
866             }
867
868             if (forwardToPeerActor != null) {
869                 LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
870
871                 forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
872                         operationContext.getClientRequestor());
873                 changeToIdleState();
874             } else {
875                 operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
876             }
877         }
878     }
879
880     static class ServerOperationTimeout {
881         private final String loggingContext;
882
883         ServerOperationTimeout(final String loggingContext) {
884             this.loggingContext = requireNonNull(loggingContext, "loggingContext should not be null");
885         }
886
887         String getLoggingContext() {
888             return loggingContext;
889         }
890     }
891 }