Remove the leader's FollowerLogInformation on RemoveServer
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.UUID;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
20 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
21 import org.opendaylight.controller.cluster.raft.messages.AddServer;
22 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
23 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
24 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
    private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);

    // The state used whenever no server operation is in progress. Idle is a (non-static) inner class,
    // so this instance is bound to this support object and cannot be a static constant.
    private final OperationState IDLE = new Idle();

    // The actor on whose behalf server configuration messages are handled.
    private final RaftActor raftActor;

    // The actor's RaftActorContext, obtained from raftActor in the constructor.
    private final RaftActorContext raftContext;

    // Operations that arrived while another operation was in progress; polled when that one completes.
    private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();

    // The current FSM state. Must be declared after IDLE so that IDLE is already initialized here.
    private OperationState currentOperationState = IDLE;
49
50     RaftActorServerConfigurationSupport(RaftActor raftActor) {
51         this.raftActor = raftActor;
52         this.raftContext = raftActor.getRaftActorContext();
53     }
54
55     boolean handleMessage(Object message, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer) message, sender);
58             return true;
59         } else if(message instanceof RemoveServer) {
60             onRemoveServer((RemoveServer) message, sender);
61             return true;
62         } else if (message instanceof ServerOperationTimeout) {
63             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
64             return true;
65         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
67             return true;
68         } else if(message instanceof ApplyState) {
69             return onApplyState((ApplyState) message);
70         } else if(message instanceof SnapshotComplete) {
71             currentOperationState.onSnapshotComplete();
72             return false;
73         } else {
74             return false;
75         }
76     }
77
78     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
79         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
80         boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
81         if(isSelf && !raftContext.hasFollowers()) {
82             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
83                     raftActor.getSelf());
84         } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
85             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
86                     raftActor.getSelf());
87         } else {
88             String serverAddress = isSelf ? raftActor.self().path().toString() :
89                 raftContext.getPeerAddress(removeServer.getServerId());
90             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
91         }
92     }
93
94     private boolean onApplyState(ApplyState applyState) {
95         Payload data = applyState.getReplicatedLogEntry().getData();
96         if(data instanceof ServerConfigurationPayload) {
97             currentOperationState.onApplyState(applyState);
98             return true;
99         }
100
101         return false;
102     }
103
104     /**
105      * The algorithm for AddServer is as follows:
106      * <ul>
107      * <li>Add the new server as a peer.</li>
108      * <li>Add the new follower to the leader.</li>
109      * <li>If new server should be voting member</li>
110      * <ul>
111      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
112      *     <li>Initiate install snapshot to the new follower.</li>
113      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
114      * </ul>
115      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
116      * <li>On replication consensus, respond to caller with OK.</li>
117      * </ul>
118      * If the install snapshot times out after a period of 2 * election time out
119      * <ul>
120      *     <li>Remove the new server as a peer.</li>
121      *     <li>Remove the new follower from the leader.</li>
122      *     <li>Respond to caller with TIMEOUT.</li>
123      * </ul>
124      */
125     private void onAddServer(AddServer addServer, ActorRef sender) {
126         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
127
128         onNewOperation(new AddServerContext(addServer, sender));
129     }
130
131     private void onNewOperation(ServerOperationContext<?> operationContext) {
132         if (raftActor.isLeader()) {
133             currentOperationState.onNewOperation(operationContext);
134         } else {
135             ActorSelection leader = raftActor.getLeader();
136             if (leader != null) {
137                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
138                 leader.forward(operationContext.getOperation(), raftActor.getContext());
139             } else {
140                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
141                 operationContext.getClientRequestor().tell(operationContext.newReply(
142                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
143             }
144         }
145     }
146
147     /**
148      * Interface for a server operation FSM state.
149      */
150     private interface OperationState {
151         void onNewOperation(ServerOperationContext<?> operationContext);
152
153         void onServerOperationTimeout(ServerOperationTimeout timeout);
154
155         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
156
157         void onApplyState(ApplyState applyState);
158
159         void onSnapshotComplete();
160     }
161
    /**
     * Interface for the initial state for a server operation. An instance is created by
     * ServerOperationContext#newInitialOperationState and started via {@link #initiate()}.
     */
    private interface InitialOperationState {
        /**
         * Starts processing of the server operation this state was created for.
         */
        void initiate();
    }
168
169     /**
170      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
171      */
172     private abstract class AbstractOperationState implements OperationState {
173         @Override
174         public void onNewOperation(ServerOperationContext<?> operationContext) {
175             // We're currently processing another operation so queue it to be processed later.
176
177             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
178                     operationContext.getOperation());
179
180             pendingOperationsQueue.add(operationContext);
181         }
182
183         @Override
184         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
185             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
186         }
187
188         @Override
189         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
190             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
191         }
192
193         @Override
194         public void onApplyState(ApplyState applyState) {
195             LOG.debug("onApplyState was called in state {}", this);
196         }
197
198         @Override
199         public void onSnapshotComplete() {
200         }
201
202         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
203             raftContext.setDynamicServerConfigurationInUse();
204
205             boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
206             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
207             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
208
209             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
210
211             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
212
213             sendReply(operationContext, ServerChangeStatus.OK);
214         }
215
216         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
217             if(replyStatus != null) {
218                 sendReply(operationContext, replyStatus);
219             }
220
221             operationContext.operationComplete(raftActor, replyStatus);
222
223             currentOperationState = IDLE;
224
225             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
226             if(nextOperation != null) {
227                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
228             }
229         }
230
231         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
232             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
233
234             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
235                     raftActor.self());
236         }
237
238         Cancellable newTimer(Object message) {
239             return raftContext.getActorSystem().scheduler().scheduleOnce(
240                     raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
241                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
242         }
243
244         @Override
245         public String toString() {
246             return getClass().getSimpleName();
247         }
248     }
249
250     /**
251      * The state when no server operation is in progress. It immediately initiates new server operations.
252      */
253     private class Idle extends AbstractOperationState {
254         @Override
255         public void onNewOperation(ServerOperationContext<?> operationContext) {
256             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
257         }
258
259         @Override
260         public void onApplyState(ApplyState applyState) {
261             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
262         }
263     }
264
265     /**
266      * The state when a new server configuration is being persisted and replicated.
267      */
268     private class Persisting extends AbstractOperationState {
269         private final ServerOperationContext<?> operationContext;
270         private final Cancellable timer;
271         private boolean timedOut = false;
272
273         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
274             this.operationContext = operationContext;
275             this.timer = timer;
276         }
277
278         @Override
279         public void onApplyState(ApplyState applyState) {
280             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
281             // sure it's meant for us.
282             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
283                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
284                         applyState.getReplicatedLogEntry().getData());
285
286                 timer.cancel();
287                 operationComplete(operationContext, null);
288             }
289         }
290
291         @Override
292         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
293             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
294                     timeout.getServerId());
295
296             timedOut = true;
297
298             // Fail any pending operations
299             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
300             while(nextOperation != null) {
301                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
302                 nextOperation = pendingOperationsQueue.poll();
303             }
304         }
305
306         @Override
307         public void onNewOperation(ServerOperationContext<?> operationContext) {
308             if(timedOut) {
309                 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
310             } else {
311                 super.onNewOperation(operationContext);
312             }
313         }
314     }
315
316     /**
317      * Abstract base class for an AddServer operation state.
318      */
319     private abstract class AddServerState extends AbstractOperationState {
320         private final AddServerContext addServerContext;
321
322         AddServerState(AddServerContext addServerContext) {
323             this.addServerContext = addServerContext;
324         }
325
326         AddServerContext getAddServerContext() {
327             return addServerContext;
328         }
329
330         Cancellable newInstallSnapshotTimer() {
331             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
332         }
333
334         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
335             String serverId = timeout.getServerId();
336
337             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
338
339             // cleanup
340             raftContext.removePeer(serverId);
341
342             boolean isLeader = raftActor.isLeader();
343             if(isLeader) {
344                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
345                 leader.removeFollower(serverId);
346             }
347
348             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
349         }
350
351     }
352
353     /**
354      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
355      * snapshot capture, if necessary.
356      */
357     private class InitialAddServerState extends AddServerState implements InitialOperationState {
358         InitialAddServerState(AddServerContext addServerContext) {
359             super(addServerContext);
360         }
361
362         @Override
363         public void initiate() {
364             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
365             AddServer addServer = getAddServerContext().getOperation();
366
367             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
368
369             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
370                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
371                 return;
372             }
373
374             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
375                     VotingState.NON_VOTING;
376             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
377
378             leader.addFollower(addServer.getNewServerId());
379
380             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
381                 // schedule the install snapshot timeout timer
382                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
383                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
384                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
385                             addServer.getNewServerId());
386
387                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
388                 } else {
389                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
390
391                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
392                             installSnapshotTimer);
393                 }
394             } else {
395                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
396                         raftContext.getId());
397
398                 persistNewServerConfiguration(getAddServerContext());
399             }
400         }
401     }
402
403     /**
404      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
405      * reply or timeout.
406      */
407     private class InstallingSnapshot extends AddServerState {
408         private final Cancellable installSnapshotTimer;
409
410         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
411             super(addServerContext);
412             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
413         }
414
415         @Override
416         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
417             handleInstallSnapshotTimeout(timeout);
418
419             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
420                     timeout.getServerId());
421         }
422
423         @Override
424         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
425             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
426
427             String followerId = reply.getFollowerId();
428
429             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
430             // add server operation that timed out.
431             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
432                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
433                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
434                 leader.updateMinReplicaCount();
435
436                 persistNewServerConfiguration(getAddServerContext());
437
438                 installSnapshotTimer.cancel();
439             } else {
440                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
441                         raftContext.getId(), followerId,
442                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
443             }
444         }
445     }
446
447     /**
448      * The AddServer operation state for when there is a snapshot already in progress. When the current
449      * snapshot completes, it initiates an install snapshot.
450      */
451     private class WaitingForPriorSnapshotComplete extends AddServerState {
452         private final Cancellable snapshotTimer;
453
454         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
455             super(addServerContext);
456             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
457         }
458
459         @Override
460         public void onSnapshotComplete() {
461             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
462
463             if(!raftActor.isLeader()) {
464                 LOG.debug("{}: No longer the leader", raftContext.getId());
465                 return;
466             }
467
468             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
469             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
470                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
471                         getAddServerContext().getOperation().getNewServerId());
472
473                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
474                         newInstallSnapshotTimer());
475
476                 snapshotTimer.cancel();
477             }
478         }
479
480         @Override
481         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
482             handleInstallSnapshotTimeout(timeout);
483
484             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
485                     raftContext.getId(), timeout.getServerId());
486         }
487     }
488
489     /**
490      * Stores context information for a server operation.
491      *
492      * @param <T> the operation type
493      */
494     private static abstract class ServerOperationContext<T> {
495         private final T operation;
496         private final ActorRef clientRequestor;
497         private final String contextId;
498
499         ServerOperationContext(T operation, ActorRef clientRequestor){
500             this.operation = operation;
501             this.clientRequestor = clientRequestor;
502             contextId = UUID.randomUUID().toString();
503         }
504
505         String getContextId() {
506             return contextId;
507         }
508
509         T getOperation() {
510             return operation;
511         }
512
513         ActorRef getClientRequestor() {
514             return clientRequestor;
515         }
516
517         abstract Object newReply(ServerChangeStatus status, String leaderId);
518
519         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
520
521         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
522
523         abstract String getServerId();
524     }
525
526     /**
527      * Stores context information for an AddServer operation.
528      */
529     private static class AddServerContext extends ServerOperationContext<AddServer> {
530         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
531             super(addServer, clientRequestor);
532         }
533
534         @Override
535         Object newReply(ServerChangeStatus status, String leaderId) {
536             return new AddServerReply(status, leaderId);
537         }
538
539         @Override
540         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
541             return support.new InitialAddServerState(this);
542         }
543
544         @Override
545         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
546
547         }
548
549         @Override
550         String getServerId() {
551             return getOperation().getNewServerId();
552         }
553     }
554
555     private abstract class RemoveServerState extends AbstractOperationState {
556         private final RemoveServerContext removeServerContext;
557
558
559         protected RemoveServerState(RemoveServerContext removeServerContext) {
560             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
561
562         }
563
564         public RemoveServerContext getRemoveServerContext() {
565             return removeServerContext;
566         }
567     }
568
569     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
570
571         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
572             super(removeServerContext);
573         }
574
575         @Override
576         public void initiate() {
577             String serverId = getRemoveServerContext().getOperation().getServerId();
578             raftContext.removePeer(serverId);
579             ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
580
581             persistNewServerConfiguration(getRemoveServerContext());
582         }
583     }
584
585     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
586         private final String peerAddress;
587
588         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
589             super(operation, clientRequestor);
590             this.peerAddress = peerAddress;
591         }
592
593         @Override
594         Object newReply(ServerChangeStatus status, String leaderId) {
595             return new RemoveServerReply(status, leaderId);
596         }
597
598         @Override
599         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
600             return support.new InitialRemoveServerState(this);
601         }
602
603         @Override
604         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
605             if(peerAddress != null) {
606                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
607             }
608         }
609
610         @Override
611         String getServerId() {
612             return getOperation().getServerId();
613         }
614     }
615
616     static class ServerOperationTimeout {
617         private final String serverId;
618
619         ServerOperationTimeout(String serverId){
620            this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
621         }
622
623         String getServerId() {
624             return serverId;
625         }
626     }
627 }